Skip to main content

icydb_core/db/data/
store.rs

1//! Module: data::store
2//! Responsibility: stable-or-heap row storage behind the data-store boundary.
3//! Does not own: key/row validation policy beyond type boundaries.
4//! Boundary: commit/executor call into this layer after prevalidation.
5
6use crate::{
7    db::{
8        data::{CanonicalRow, RawDataStoreKey, RawRow},
9        direction::Direction,
10        key_taxonomy::RawDataStoreKeyRange,
11        ordered_overlay::{OrderedOverlayEntry, OrderedOverlayVisit, visit_ordered_overlay},
12    },
13    types::EntityTag,
14};
15use ic_memory::stable_structures::{
16    BTreeMap as StableBTreeMap, DefaultMemoryImpl, memory_manager::VirtualMemory,
17};
18#[cfg(feature = "diagnostics")]
19use std::cell::Cell;
20use std::collections::{BTreeMap as HeapBTreeMap, BTreeSet};
21use std::convert::Infallible;
22use std::ops::{Bound, RangeBounds};
23
24#[cfg(feature = "diagnostics")]
25thread_local! {
26    static DATA_STORE_GET_CALL_COUNT: Cell<u64> = const { Cell::new(0) };
27}
28
29#[cfg(feature = "diagnostics")]
30fn record_data_store_get_call() {
31    DATA_STORE_GET_CALL_COUNT.with(|count| {
32        count.set(count.get().saturating_add(1));
33    });
34}
35
36///
37/// DataStore
38///
39/// Thin persistence wrapper over one stable or heap BTreeMap.
40///
41/// Invariant: callers provide already-validated `RawDataStoreKey` and canonical row bytes.
42/// This type intentionally does not enforce commit-phase ordering.
43///
44
45pub struct DataStore {
46    backend: DataStoreBackend,
47}
48
49enum DataStoreBackend {
50    Stable(StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>),
51    Heap(HeapBTreeMap<RawDataStoreKey, RawRow>),
52    Journaled {
53        canonical: StableBTreeMap<RawDataStoreKey, RawRow, VirtualMemory<DefaultMemoryImpl>>,
54        live: HeapBTreeMap<RawDataStoreKey, RawRow>,
55        tombstones: BTreeSet<RawDataStoreKey>,
56    },
57}
58
59/// Control-flow result for store traversal visitors.
60#[derive(Clone, Copy, Debug, Eq, PartialEq)]
61pub(in crate::db) enum StoreVisit {
62    Continue,
63    Stop,
64}
65
66impl StoreVisit {
67    const fn should_stop(self) -> bool {
68        matches!(self, Self::Stop)
69    }
70}
71
72impl DataStore {
73    /// Initialize a data store with the provided backing memory.
74    #[must_use]
75    pub fn init(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
76        Self {
77            backend: DataStoreBackend::Stable(StableBTreeMap::init(memory)),
78        }
79    }
80
81    /// Initialize a volatile heap-backed data store.
82    #[must_use]
83    pub const fn init_heap() -> Self {
84        Self {
85            backend: DataStoreBackend::Heap(HeapBTreeMap::new()),
86        }
87    }
88
89    /// Initialize a journaled cached-stable data store.
90    ///
91    /// Normal writes update only the live projection. The canonical stable map
92    /// is the future fold target and is not mutated by this wrapper's write
93    /// methods.
94    #[must_use]
95    pub fn init_journaled(memory: VirtualMemory<DefaultMemoryImpl>) -> Self {
96        Self {
97            backend: DataStoreBackend::Journaled {
98                canonical: StableBTreeMap::init(memory),
99                live: HeapBTreeMap::new(),
100                tombstones: BTreeSet::new(),
101            },
102        }
103    }
104
105    /// Insert or replace one row by raw key.
106    pub(in crate::db) fn insert(
107        &mut self,
108        key: RawDataStoreKey,
109        row: CanonicalRow,
110    ) -> Option<RawRow> {
111        let row = row.into_raw_row();
112        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
113            self.get(&key)
114        } else {
115            None
116        };
117        match &mut self.backend {
118            DataStoreBackend::Stable(map) => map.insert(key, row),
119            DataStoreBackend::Heap(map) => map.insert(key, row),
120            DataStoreBackend::Journaled {
121                live, tombstones, ..
122            } => {
123                tombstones.remove(&key);
124                live.insert(key, row);
125                previous_journaled
126            }
127        }
128    }
129
130    /// Insert one raw row directly for corruption-focused test setup only.
131    #[cfg(test)]
132    pub(in crate::db) fn insert_raw_for_test(
133        &mut self,
134        key: RawDataStoreKey,
135        row: RawRow,
136    ) -> Option<RawRow> {
137        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
138            self.get(&key)
139        } else {
140            None
141        };
142        match &mut self.backend {
143            DataStoreBackend::Stable(map) => map.insert(key, row),
144            DataStoreBackend::Heap(map) => map.insert(key, row),
145            DataStoreBackend::Journaled {
146                live, tombstones, ..
147            } => {
148                tombstones.remove(&key);
149                live.insert(key, row);
150                previous_journaled
151            }
152        }
153    }
154
155    /// Remove one row by raw key.
156    pub(in crate::db) fn remove(&mut self, key: &RawDataStoreKey) -> Option<RawRow> {
157        let previous_journaled = if matches!(self.backend, DataStoreBackend::Journaled { .. }) {
158            self.get(key)
159        } else {
160            None
161        };
162        match &mut self.backend {
163            DataStoreBackend::Stable(map) => map.remove(key),
164            DataStoreBackend::Heap(map) => map.remove(key),
165            DataStoreBackend::Journaled {
166                live, tombstones, ..
167            } => {
168                live.remove(key);
169                tombstones.insert(key.clone());
170                previous_journaled
171            }
172        }
173    }
174
175    /// Reset the volatile projection for journaled recovery without mutating
176    /// the canonical stable base.
177    pub(in crate::db) fn reset_journaled_live_projection(
178        &mut self,
179    ) -> Result<(), crate::error::InternalError> {
180        let DataStoreBackend::Journaled {
181            live, tombstones, ..
182        } = &mut self.backend
183        else {
184            return Err(crate::error::InternalError::store_invariant(
185                "journaled live projection reset requires a journaled data store",
186            ));
187        };
188
189        live.clear();
190        tombstones.clear();
191
192        Ok(())
193    }
194
195    /// Apply one recovered journal row put into the volatile projection.
196    pub(in crate::db) fn apply_recovered_journal_put(
197        &mut self,
198        key: RawDataStoreKey,
199        row: RawRow,
200    ) -> Result<Option<RawRow>, crate::error::InternalError> {
201        let DataStoreBackend::Journaled {
202            canonical,
203            live,
204            tombstones,
205        } = &mut self.backend
206        else {
207            return Err(crate::error::InternalError::store_invariant(
208                "journal row replay requires a journaled data store",
209            ));
210        };
211
212        let previous = if tombstones.contains(&key) {
213            None
214        } else {
215            live.get(&key).cloned().or_else(|| canonical.get(&key))
216        };
217        tombstones.remove(&key);
218        live.insert(key, row);
219
220        Ok(previous)
221    }
222
223    /// Apply one recovered journal row delete into the volatile projection.
224    pub(in crate::db) fn apply_recovered_journal_delete(
225        &mut self,
226        key: &RawDataStoreKey,
227    ) -> Result<Option<RawRow>, crate::error::InternalError> {
228        let DataStoreBackend::Journaled {
229            canonical,
230            live,
231            tombstones,
232        } = &mut self.backend
233        else {
234            return Err(crate::error::InternalError::store_invariant(
235                "journal row replay requires a journaled data store",
236            ));
237        };
238
239        let previous = if tombstones.contains(key) {
240            None
241        } else {
242            live.get(key).cloned().or_else(|| canonical.get(key))
243        };
244        live.remove(key);
245        tombstones.insert(key.clone());
246
247        Ok(previous)
248    }
249
250    /// Apply one folded journal row put into the canonical stable base.
251    pub(in crate::db) fn fold_recovered_journal_put(
252        &mut self,
253        key: RawDataStoreKey,
254        row: RawRow,
255    ) -> Result<Option<RawRow>, crate::error::InternalError> {
256        let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
257            return Err(crate::error::InternalError::store_invariant(
258                "journal row fold requires a journaled data store",
259            ));
260        };
261
262        Ok(canonical.insert(key, row))
263    }
264
265    /// Apply one folded journal row delete into the canonical stable base.
266    pub(in crate::db) fn fold_recovered_journal_delete(
267        &mut self,
268        key: &RawDataStoreKey,
269    ) -> Result<Option<RawRow>, crate::error::InternalError> {
270        let DataStoreBackend::Journaled { canonical, .. } = &mut self.backend else {
271            return Err(crate::error::InternalError::store_invariant(
272                "journal row fold requires a journaled data store",
273            ));
274        };
275
276        Ok(canonical.remove(key))
277    }
278
279    /// Load one row by raw key.
280    pub(in crate::db) fn get(&self, key: &RawDataStoreKey) -> Option<RawRow> {
281        #[cfg(feature = "diagnostics")]
282        record_data_store_get_call();
283
284        match &self.backend {
285            DataStoreBackend::Stable(map) => map.get(key),
286            DataStoreBackend::Heap(map) => map.get(key).cloned(),
287            DataStoreBackend::Journaled { .. } => Self::journaled_get_raw(&self.backend, key),
288        }
289    }
290
291    /// Return whether one raw key exists without cloning the row payload.
292    #[must_use]
293    pub(in crate::db) fn contains(&self, key: &RawDataStoreKey) -> bool {
294        match &self.backend {
295            DataStoreBackend::Stable(map) => map.contains_key(key),
296            DataStoreBackend::Heap(map) => map.contains_key(key),
297            DataStoreBackend::Journaled { .. } => {
298                Self::journaled_get_raw(&self.backend, key).is_some()
299            }
300        }
301    }
302
303    /// Clear all stored rows from the data store.
304    #[cfg(test)]
305    pub(in crate::db) fn clear(&mut self) {
306        match &mut self.backend {
307            DataStoreBackend::Stable(map) => map.clear_new(),
308            DataStoreBackend::Heap(map) => map.clear(),
309            DataStoreBackend::Journaled {
310                canonical,
311                live,
312                tombstones,
313            } => {
314                live.clear();
315                tombstones.clear();
316                for entry in canonical.iter() {
317                    tombstones.insert(entry.key().clone());
318                }
319            }
320        }
321    }
322
323    /// Return the number of stored rows without exposing the backing map.
324    #[must_use]
325    pub(in crate::db) fn len(&self) -> u64 {
326        match &self.backend {
327            DataStoreBackend::Stable(map) => map.len(),
328            DataStoreBackend::Heap(map) => u64::try_from(map.len()).unwrap_or(u64::MAX),
329            DataStoreBackend::Journaled { .. } => {
330                let mut count = 0_u64;
331                let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
332                    count = count.saturating_add(1);
333                    Ok(StoreVisit::Continue)
334                });
335                count
336            }
337        }
338    }
339
340    /// Return whether the data store currently contains no rows.
341    #[must_use]
342    #[cfg(test)]
343    pub(in crate::db) fn is_empty(&self) -> bool {
344        match &self.backend {
345            DataStoreBackend::Stable(map) => map.is_empty(),
346            DataStoreBackend::Heap(map) => map.is_empty(),
347            DataStoreBackend::Journaled { .. } => {
348                let mut empty = true;
349                let _: Result<(), Infallible> = self.visit_entries(|_key, _row| {
350                    empty = false;
351                    Ok(StoreVisit::Stop)
352                });
353                empty
354            }
355        }
356    }
357
358    /// Visit raw row entries in canonical storage order.
359    pub(in crate::db) fn visit_entries<E>(
360        &self,
361        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
362    ) -> Result<(), E> {
363        match &self.backend {
364            DataStoreBackend::Stable(map) => {
365                for entry in map.iter() {
366                    if visitor(entry.key(), &entry.value())?.should_stop() {
367                        break;
368                    }
369                }
370            }
371            DataStoreBackend::Heap(map) => {
372                for (key, row) in map {
373                    if visitor(key, row)?.should_stop() {
374                        break;
375                    }
376                }
377            }
378            DataStoreBackend::Journaled {
379                canonical: _,
380                live: _,
381                tombstones: _,
382            } => Self::visit_journaled_entries_in_bounds(
383                &self.backend,
384                (Bound::Unbounded, Bound::Unbounded),
385                false,
386                visitor,
387            )?,
388        }
389
390        Ok(())
391    }
392
393    /// Visit raw row entries in reverse canonical storage order.
394    pub(in crate::db) fn visit_entries_rev<E>(
395        &self,
396        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
397    ) -> Result<(), E> {
398        match &self.backend {
399            DataStoreBackend::Stable(map) => {
400                for entry in map.iter().rev() {
401                    if visitor(entry.key(), &entry.value())?.should_stop() {
402                        break;
403                    }
404                }
405            }
406            DataStoreBackend::Heap(map) => {
407                for (key, row) in map.iter().rev() {
408                    if visitor(key, row)?.should_stop() {
409                        break;
410                    }
411                }
412            }
413            DataStoreBackend::Journaled {
414                canonical: _,
415                live: _,
416                tombstones: _,
417            } => Self::visit_journaled_entries_in_bounds(
418                &self.backend,
419                (Bound::Unbounded, Bound::Unbounded),
420                true,
421                visitor,
422            )?,
423        }
424
425        Ok(())
426    }
427
428    /// Visit raw row entries whose keys belong to the provided storage range.
429    pub(in crate::db) fn visit_range<E>(
430        &self,
431        key_range: impl RangeBounds<RawDataStoreKey>,
432        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
433    ) -> Result<(), E> {
434        let bounds = Self::owned_range_bounds(&key_range);
435        match &self.backend {
436            DataStoreBackend::Stable(map) => {
437                for entry in map.range((bounds.0.clone(), bounds.1)) {
438                    if visitor(entry.key(), &entry.value())?.should_stop() {
439                        break;
440                    }
441                }
442            }
443            DataStoreBackend::Heap(map) => {
444                for (key, row) in map.range((bounds.0.clone(), bounds.1)) {
445                    if visitor(key, row)?.should_stop() {
446                        break;
447                    }
448                }
449            }
450            DataStoreBackend::Journaled {
451                canonical: _,
452                live: _,
453                tombstones: _,
454            } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, false, visitor)?,
455        }
456
457        Ok(())
458    }
459
460    /// Visit raw row entries in reverse order whose keys belong to the provided storage range.
461    pub(in crate::db) fn visit_range_rev<E>(
462        &self,
463        key_range: impl RangeBounds<RawDataStoreKey>,
464        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
465    ) -> Result<(), E> {
466        let bounds = Self::owned_range_bounds(&key_range);
467        match &self.backend {
468            DataStoreBackend::Stable(map) => {
469                for entry in map.range((bounds.0.clone(), bounds.1)).rev() {
470                    if visitor(entry.key(), &entry.value())?.should_stop() {
471                        break;
472                    }
473                }
474            }
475            DataStoreBackend::Heap(map) => {
476                for (key, row) in map.range((bounds.0.clone(), bounds.1)).rev() {
477                    if visitor(key, row)?.should_stop() {
478                        break;
479                    }
480                }
481            }
482            DataStoreBackend::Journaled {
483                canonical: _,
484                live: _,
485                tombstones: _,
486            } => Self::visit_journaled_entries_in_bounds(&self.backend, bounds, true, visitor)?,
487        }
488
489        Ok(())
490    }
491
492    /// Visit raw row entries for one entity using compact prefix bounds.
493    pub(in crate::db) fn visit_entity<E>(
494        &self,
495        entity: EntityTag,
496        visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
497    ) -> Result<(), E> {
498        let range = RawDataStoreKeyRange::entity_prefix(entity);
499        self.visit_range(RawDataStoreKey::store_range_bounds(&range), visitor)
500    }
501
502    /// Sum of bytes used by all stored rows.
503    pub(in crate::db) fn memory_bytes(&self) -> u64 {
504        // Report map footprint as key bytes + row bytes per entry.
505        let mut bytes = 0u64;
506        let _: Result<(), Infallible> = self.visit_entries(|key, row| {
507            bytes = bytes.saturating_add(key.as_bytes().len() as u64 + row.len() as u64);
508            Ok(StoreVisit::Continue)
509        });
510        bytes
511    }
512
513    /// Return the monotonic perf-only count of stable row fetches seen by this process.
514    #[cfg(feature = "diagnostics")]
515    pub(in crate::db) fn current_get_call_count() -> u64 {
516        DATA_STORE_GET_CALL_COUNT.with(Cell::get)
517    }
518
519    #[cfg(test)]
520    #[must_use]
521    pub(in crate::db) fn canonical_len_for_tests(&self) -> u64 {
522        match &self.backend {
523            DataStoreBackend::Stable(map) | DataStoreBackend::Journaled { canonical: map, .. } => {
524                map.len()
525            }
526            DataStoreBackend::Heap(_) => 0,
527        }
528    }
529
530    fn journaled_get_raw(backend: &DataStoreBackend, key: &RawDataStoreKey) -> Option<RawRow> {
531        let DataStoreBackend::Journaled {
532            canonical,
533            live,
534            tombstones,
535        } = backend
536        else {
537            return None;
538        };
539
540        if tombstones.contains(key) {
541            return None;
542        }
543        live.get(key).cloned().or_else(|| canonical.get(key))
544    }
545
546    fn owned_range_bounds(
547        key_range: &impl RangeBounds<RawDataStoreKey>,
548    ) -> (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>) {
549        let lower = match key_range.start_bound() {
550            Bound::Included(key) => Bound::Included(key.clone()),
551            Bound::Excluded(key) => Bound::Excluded(key.clone()),
552            Bound::Unbounded => Bound::Unbounded,
553        };
554        let upper = match key_range.end_bound() {
555            Bound::Included(key) => Bound::Included(key.clone()),
556            Bound::Excluded(key) => Bound::Excluded(key.clone()),
557            Bound::Unbounded => Bound::Unbounded,
558        };
559
560        (lower, upper)
561    }
562
563    fn visit_journaled_entries_in_bounds<E>(
564        backend: &DataStoreBackend,
565        bounds: (Bound<RawDataStoreKey>, Bound<RawDataStoreKey>),
566        reverse: bool,
567        mut visitor: impl FnMut(&RawDataStoreKey, &RawRow) -> Result<StoreVisit, E>,
568    ) -> Result<(), E> {
569        let DataStoreBackend::Journaled {
570            canonical,
571            live,
572            tombstones,
573        } = backend
574        else {
575            return Ok(());
576        };
577
578        if canonical.is_empty() {
579            if reverse {
580                for (key, row) in live.range(bounds).rev() {
581                    if visitor(key, row)?.should_stop() {
582                        return Ok(());
583                    }
584                }
585            } else {
586                for (key, row) in live.range(bounds) {
587                    if visitor(key, row)?.should_stop() {
588                        return Ok(());
589                    }
590                }
591            }
592            return Ok(());
593        }
594
595        if live.is_empty() && tombstones.is_empty() {
596            if reverse {
597                for entry in canonical.range(bounds).rev() {
598                    if visitor(entry.key(), &entry.value())?.should_stop() {
599                        return Ok(());
600                    }
601                }
602            } else {
603                for entry in canonical.range(bounds) {
604                    if visitor(entry.key(), &entry.value())?.should_stop() {
605                        return Ok(());
606                    }
607                }
608            }
609            return Ok(());
610        }
611
612        match if reverse {
613            Direction::Desc
614        } else {
615            Direction::Asc
616        } {
617            Direction::Asc => visit_ordered_overlay(
618                canonical.range((bounds.0.clone(), bounds.1.clone())),
619                live.range((bounds.0, bounds.1)),
620                Direction::Asc,
621                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
622                |canonical_entry| !tombstones.contains(canonical_entry.key()),
623                |live_entry| !tombstones.contains(live_entry.0),
624                |entry| {
625                    let visit = match entry {
626                        OrderedOverlayEntry::Canonical(canonical_entry) => {
627                            visitor(canonical_entry.key(), &canonical_entry.value())?
628                        }
629                        OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
630                    };
631                    Ok(if visit.should_stop() {
632                        OrderedOverlayVisit::Stop
633                    } else {
634                        OrderedOverlayVisit::Continue
635                    })
636                },
637            ),
638            Direction::Desc => visit_ordered_overlay(
639                canonical.range((bounds.0.clone(), bounds.1.clone())).rev(),
640                live.range((bounds.0, bounds.1)).rev(),
641                Direction::Desc,
642                |canonical_entry, live_entry| canonical_entry.key().cmp(live_entry.0),
643                |canonical_entry| !tombstones.contains(canonical_entry.key()),
644                |live_entry| !tombstones.contains(live_entry.0),
645                |entry| {
646                    let visit = match entry {
647                        OrderedOverlayEntry::Canonical(canonical_entry) => {
648                            visitor(canonical_entry.key(), &canonical_entry.value())?
649                        }
650                        OrderedOverlayEntry::Live((key, row)) => visitor(key, row)?,
651                    };
652                    Ok(if visit.should_stop() {
653                        OrderedOverlayVisit::Stop
654                    } else {
655                        OrderedOverlayVisit::Continue
656                    })
657                },
658            ),
659        }
660    }
661}
662
663#[cfg(test)]
664mod tests {
665    use super::*;
666    use crate::{
667        db::{
668            data::DecodedDataStoreKey,
669            key_taxonomy::{PrimaryKeyComponent, PrimaryKeyValue},
670        },
671        testing::test_memory,
672    };
673    use std::ops::Bound;
674
675    fn raw_key(entity: u64, id: u64) -> RawDataStoreKey {
676        DecodedDataStoreKey::new(
677            EntityTag::new(entity),
678            &PrimaryKeyValue::Scalar(PrimaryKeyComponent::Nat64(id)),
679        )
680        .to_raw()
681        .expect("test data key should encode")
682    }
683
684    fn raw_row(value: u8) -> RawRow {
685        RawRow::try_new(vec![value]).expect("test raw row should be bounded")
686    }
687
688    fn seed_store(memory_id: u8, entries: &[(u64, u64, u8)]) -> DataStore {
689        let mut store = DataStore::init(test_memory(memory_id));
690        for (entity, id, row) in entries {
691            store.insert_raw_for_test(raw_key(*entity, *id), raw_row(*row));
692        }
693        store
694    }
695
696    fn seed_heap_store(entries: &[(u64, u64, u8)]) -> DataStore {
697        let mut store = DataStore::init_heap();
698        for (entity, id, row) in entries {
699            store.insert_raw_for_test(raw_key(*entity, *id), raw_row(*row));
700        }
701        store
702    }
703
704    fn collect_keys(store: &DataStore) -> Vec<RawDataStoreKey> {
705        let mut keys = Vec::new();
706        let _: Result<(), Infallible> = store.visit_entries(|key, _row| {
707            keys.push(key.clone());
708            Ok(StoreVisit::Continue)
709        });
710        keys
711    }
712
713    #[test]
714    fn data_store_visit_entries_preserves_storage_key_order() {
715        let store = seed_store(221, &[(2, 1, 21), (1, 3, 13), (1, 1, 11), (1, 2, 12)]);
716
717        let mut expected = vec![raw_key(2, 1), raw_key(1, 3), raw_key(1, 1), raw_key(1, 2)];
718        expected.sort();
719
720        assert_eq!(collect_keys(&store), expected);
721    }
722
723    #[test]
724    fn data_store_visit_range_preserves_raw_key_bounds() {
725        let store = seed_store(222, &[(1, 1, 11), (1, 2, 12), (1, 3, 13), (1, 4, 14)]);
726
727        let mut visited = Vec::new();
728        let _: Result<(), Infallible> = store.visit_range(
729            (
730                Bound::Included(raw_key(1, 2)),
731                Bound::Excluded(raw_key(1, 4)),
732            ),
733            |key, row| {
734                visited.push((key.clone(), row.as_bytes()[0]));
735                Ok(StoreVisit::Continue)
736            },
737        );
738
739        assert_eq!(visited, vec![(raw_key(1, 2), 12), (raw_key(1, 3), 13)]);
740    }
741
742    #[test]
743    fn data_store_visit_entity_preserves_compact_entity_prefix_bounds() {
744        let store = seed_store(223, &[(2, 1, 21), (1, 2, 12), (2, 3, 23), (1, 1, 11)]);
745
746        let mut visited = Vec::new();
747        let _: Result<(), Infallible> = store.visit_entity(EntityTag::new(2), |key, row| {
748            visited.push((key.clone(), row.as_bytes()[0]));
749            Ok(StoreVisit::Continue)
750        });
751
752        assert_eq!(visited, vec![(raw_key(2, 1), 21), (raw_key(2, 3), 23)]);
753    }
754
755    #[test]
756    fn data_store_visit_entries_can_stop_without_error() {
757        let store = seed_store(224, &[(1, 1, 11), (1, 2, 12), (1, 3, 13)]);
758
759        let mut visited = Vec::new();
760        let _: Result<(), Infallible> = store.visit_entries(|key, _row| {
761            visited.push(key.clone());
762            Ok(if visited.len() == 2 {
763                StoreVisit::Stop
764            } else {
765                StoreVisit::Continue
766            })
767        });
768
769        assert_eq!(visited, vec![raw_key(1, 1), raw_key(1, 2)]);
770    }
771
772    #[test]
773    fn heap_data_store_preserves_order_bounds_and_early_stop() {
774        let store = seed_heap_store(&[(2, 1, 21), (1, 3, 13), (1, 1, 11), (1, 2, 12)]);
775
776        let mut expected = vec![raw_key(2, 1), raw_key(1, 3), raw_key(1, 1), raw_key(1, 2)];
777        expected.sort();
778        assert_eq!(collect_keys(&store), expected);
779
780        let mut ranged = Vec::new();
781        let _: Result<(), Infallible> = store.visit_range(
782            (
783                Bound::Included(raw_key(1, 1)),
784                Bound::Excluded(raw_key(1, 3)),
785            ),
786            |key, row| {
787                ranged.push((key.clone(), row.as_bytes()[0]));
788                Ok(StoreVisit::Continue)
789            },
790        );
791        assert_eq!(ranged, vec![(raw_key(1, 1), 11), (raw_key(1, 2), 12)]);
792
793        let mut entity = Vec::new();
794        let _: Result<(), Infallible> = store.visit_entity(EntityTag::new(2), |key, row| {
795            entity.push((key.clone(), row.as_bytes()[0]));
796            Ok(StoreVisit::Continue)
797        });
798        assert_eq!(entity, vec![(raw_key(2, 1), 21)]);
799
800        let mut stopped = Vec::new();
801        let _: Result<(), Infallible> = store.visit_entries(|key, _| {
802            stopped.push(key.clone());
803            Ok(if stopped.len() == 2 {
804                StoreVisit::Stop
805            } else {
806                StoreVisit::Continue
807            })
808        });
809        assert_eq!(stopped, vec![raw_key(1, 1), raw_key(1, 2)]);
810    }
811
812    #[test]
813    fn journaled_mixed_data_range_traversal_streams_without_snapshot() {
814        let mut store = DataStore::init_journaled(test_memory(225));
815        store
816            .fold_recovered_journal_put(raw_key(1, 1), raw_row(11))
817            .expect("canonical seed should fold");
818        store
819            .fold_recovered_journal_put(raw_key(1, 3), raw_row(13))
820            .expect("canonical seed should fold");
821        store
822            .fold_recovered_journal_put(raw_key(1, 5), raw_row(15))
823            .expect("canonical seed should fold");
824        store
825            .apply_recovered_journal_put(raw_key(1, 0), raw_row(10))
826            .expect("live put should apply");
827        store
828            .apply_recovered_journal_put(raw_key(1, 4), raw_row(14))
829            .expect("live put should apply");
830        store
831            .apply_recovered_journal_put(raw_key(1, 5), raw_row(55))
832            .expect("live override should apply");
833        store
834            .apply_recovered_journal_delete(&raw_key(1, 1))
835            .expect("live delete should apply");
836
837        let mut asc = Vec::new();
838        let _: Result<(), Infallible> = store.visit_range(
839            (
840                Bound::Included(raw_key(1, 0)),
841                Bound::Included(raw_key(1, 5)),
842            ),
843            |key, row| {
844                asc.push((key.clone(), row.as_bytes()[0]));
845                Ok(if asc.len() == 2 {
846                    StoreVisit::Stop
847                } else {
848                    StoreVisit::Continue
849                })
850            },
851        );
852        assert_eq!(asc, vec![(raw_key(1, 0), 10), (raw_key(1, 3), 13)]);
853
854        let mut desc = Vec::new();
855        let _: Result<(), Infallible> = store.visit_range_rev(
856            (
857                Bound::Included(raw_key(1, 0)),
858                Bound::Included(raw_key(1, 5)),
859            ),
860            |key, row| {
861                desc.push((key.clone(), row.as_bytes()[0]));
862                Ok(if desc.len() == 2 {
863                    StoreVisit::Stop
864                } else {
865                    StoreVisit::Continue
866                })
867            },
868        );
869        assert_eq!(desc, vec![(raw_key(1, 5), 55), (raw_key(1, 4), 14)]);
870    }
871}