Skip to main content

shodh_redb/
table.rs

1use crate::cdc::types::{CdcEvent, ChangeOp};
2use crate::compat::Mutex;
3use crate::db::TransactionGuard;
4use crate::merge::MergeOperator;
5use crate::sealed::Sealed;
6use crate::tree_store::{
7    AccessGuardMutInPlace, Btree, BtreeExtractIf, BtreeHeader, BtreeMut, BtreeRangeIter,
8    MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, PageHint, PageNumber, PageTrackerPolicy, RawBtree,
9    RawEntryIter, TransactionalMemory,
10};
11use crate::types::{Key, MutInPlaceValue, Value};
12use crate::{AccessGuard, AccessGuardMut, StorageError, WriteTransaction};
13use crate::{Result, TableHandle};
14use alloc::string::{String, ToString};
15use alloc::sync::Arc;
16use alloc::vec::Vec;
17use core::borrow::Borrow;
18use core::fmt::{Debug, Formatter};
19use core::marker::PhantomData;
20use core::ops::RangeBounds;
21
/// Informational storage stats about a table
#[derive(Debug, Clone, Copy)]
pub struct TableStats {
    /// Maximum traversal distance to the deepest (key, value) pair
    pub(crate) tree_height: u32,
    /// Count of leaf pages holding user data
    pub(crate) leaf_pages: u64,
    /// Count of branch pages in the btree
    pub(crate) branch_pages: u64,
    /// Bytes taken by inserted keys and values, excluding indexing overhead
    pub(crate) stored_leaf_bytes: u64,
    /// Bytes taken by branch-page keys and other metadata
    pub(crate) metadata_bytes: u64,
    /// Bytes lost to fragmentation
    pub(crate) fragmented_bytes: u64,
}

impl TableStats {
    /// Longest path, in pages, from the root down to any (key, value) pair in the table
    pub fn tree_height(&self) -> u32 {
        let Self { tree_height, .. } = *self;
        tree_height
    }

    /// How many leaf pages hold user data
    pub fn leaf_pages(&self) -> u64 {
        let Self { leaf_pages, .. } = *self;
        leaf_pages
    }

    /// How many branch pages of the btree hold user data
    pub fn branch_pages(&self) -> u64 {
        let Self { branch_pages, .. } = *self;
        branch_pages
    }

    /// Bytes occupied by the keys and values that have been inserted.
    /// Indexing overhead is not counted
    pub fn stored_bytes(&self) -> u64 {
        let Self {
            stored_leaf_bytes, ..
        } = *self;
        stored_leaf_bytes
    }

    /// Bytes occupied by keys in internal branch pages, together with other metadata
    pub fn metadata_bytes(&self) -> u64 {
        let Self { metadata_bytes, .. } = *self;
        metadata_bytes
    }

    /// Bytes lost to fragmentation, in data pages as well as internal metadata tables
    pub fn fragmented_bytes(&self) -> u64 {
        let Self {
            fragmented_bytes, ..
        } = *self;
        fragmented_bytes
    }
}
65
66/// A table containing key-value mappings
67pub struct Table<'txn, K: Key + 'static, V: Value + 'static> {
68    name: String,
69    transaction: &'txn WriteTransaction,
70    tree: BtreeMut<'txn, K, V>,
71}
72
73impl<K: Key + 'static, V: Value + 'static> TableHandle for Table<'_, K, V> {
74    fn name(&self) -> &str {
75        &self.name
76    }
77}
78
79impl<'txn, K: Key + 'static, V: Value + 'static> Table<'txn, K, V> {
80    pub(crate) fn new(
81        name: &str,
82        table_root: Option<BtreeHeader>,
83        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
84        allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
85        mem: Arc<TransactionalMemory>,
86        transaction: &'txn WriteTransaction,
87    ) -> Table<'txn, K, V> {
88        Table {
89            name: name.to_string(),
90            transaction,
91            tree: BtreeMut::new(
92                table_root,
93                transaction.transaction_guard(),
94                mem,
95                freed_pages,
96                allocated_pages,
97            ),
98        }
99    }
100
101    #[allow(dead_code)]
102    #[cfg(feature = "std")]
103    pub(crate) fn print_debug(&self, include_values: bool) -> Result {
104        self.tree.print_debug(include_values)
105    }
106
107    /// Returns an accessor, which allows mutation, to the value corresponding to the given key
108    pub fn get_mut<'k>(
109        &mut self,
110        key: impl Borrow<K::SelfType<'k>>,
111    ) -> Result<Option<AccessGuardMut<'_, V>>> {
112        self.tree.get_mut(key.borrow())
113    }
114
115    /// Removes and returns the first key-value pair in the table
116    pub fn pop_first(&mut self) -> Result<Option<(AccessGuard<'_, K>, AccessGuard<'_, V>)>> {
117        self.tree.pop_first()
118    }
119
120    /// Removes and returns the last key-value pair in the table
121    pub fn pop_last(&mut self) -> Result<Option<(AccessGuard<'_, K>, AccessGuard<'_, V>)>> {
122        self.tree.pop_last()
123    }
124
125    /// Applies `predicate` to all key-value pairs. All entries for which
126    /// `predicate` evaluates to `true` are returned in an iterator, and those which are read from the iterator are removed
127    ///
128    /// Note: values not read from the iterator will not be removed
129    pub fn extract_if<F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
130        &mut self,
131        predicate: F,
132    ) -> Result<ExtractIf<'_, K, V, F>> {
133        self.extract_from_if::<K::SelfType<'_>, F>(.., predicate)
134    }
135
136    /// Applies `predicate` to all key-value pairs in the specified range. All entries for which
137    /// `predicate` evaluates to `true` are returned in an iterator, and those which are read from the iterator are removed
138    ///
139    /// Note: values not read from the iterator will not be removed
140    pub fn extract_from_if<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
141        &mut self,
142        range: impl RangeBounds<KR> + 'a,
143        predicate: F,
144    ) -> Result<ExtractIf<'_, K, V, F>>
145    where
146        KR: Borrow<K::SelfType<'a>> + 'a,
147    {
148        let inner = self.tree.extract_from_if(&range, predicate)?;
149        if self.transaction.cdc_log.is_some() {
150            Ok(ExtractIf::with_cdc(
151                inner,
152                self.transaction,
153                self.name.clone(),
154            ))
155        } else {
156            Ok(ExtractIf::new(inner))
157        }
158    }
159
160    /// Applies `predicate` to all key-value pairs. All entries for which
161    /// `predicate` evaluates to `false` are removed.
162    ///
163    pub fn retain<F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
164        &mut self,
165        predicate: F,
166    ) -> Result {
167        self.retain_in::<K::SelfType<'_>, F>(.., predicate)
168    }
169
170    /// Applies `predicate` to all key-value pairs in the range `start..end`. All entries for which
171    /// `predicate` evaluates to `false` are removed.
172    ///
173    pub fn retain_in<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
174        &mut self,
175        range: impl RangeBounds<KR> + 'a,
176        mut predicate: F,
177    ) -> Result
178    where
179        KR: Borrow<K::SelfType<'a>> + 'a,
180    {
181        if self.transaction.cdc_log.is_some() {
182            // Route through extract_from_if so CDC events are recorded for each removal
183            for result in self.extract_from_if(range, move |k, v| !predicate(k, v))? {
184                result?;
185            }
186            Ok(())
187        } else {
188            self.tree.retain_in(predicate, range)
189        }
190    }
191
192    /// Removes all key-value pairs in the given range
193    ///
194    /// Returns the number of entries removed
195    pub fn drain<'a, KR>(&mut self, range: impl RangeBounds<KR> + 'a) -> Result<u64>
196    where
197        KR: Borrow<K::SelfType<'a>> + 'a,
198    {
199        let mut count = 0u64;
200        for result in self.extract_from_if(range, |_, _| true)? {
201            result?;
202            count += 1;
203        }
204        Ok(count)
205    }
206
207    /// Removes all key-value pairs from the table
208    ///
209    /// Returns the number of entries removed
210    pub fn drain_all(&mut self) -> Result<u64> {
211        self.drain::<K::SelfType<'_>>(..)
212    }
213
214    /// Insert mapping of the given key to the given value
215    ///
216    /// If key is already present it is replaced
217    ///
218    /// Returns the old value, if the key was present in the table, otherwise None is returned
219    pub fn insert<'k, 'v>(
220        &mut self,
221        key: impl Borrow<K::SelfType<'k>>,
222        value: impl Borrow<V::SelfType<'v>>,
223    ) -> Result<Option<AccessGuard<'_, V>>> {
224        let value_len = V::as_bytes(value.borrow()).as_ref().len();
225        if value_len > MAX_VALUE_LENGTH {
226            return Err(StorageError::ValueTooLarge(value_len));
227        }
228        let key_len = K::as_bytes(key.borrow()).as_ref().len();
229        if key_len > MAX_VALUE_LENGTH {
230            return Err(StorageError::ValueTooLarge(key_len));
231        }
232        if value_len + key_len > MAX_PAIR_LENGTH {
233            return Err(StorageError::ValueTooLarge(value_len + key_len));
234        }
235        let result = self.tree.insert(key.borrow(), value.borrow())?;
236        if self.transaction.cdc_log.is_some() {
237            let old_value = result
238                .as_ref()
239                .map(|g| V::as_bytes(&g.value()).as_ref().to_vec());
240            self.transaction.record_cdc(CdcEvent {
241                table_name: self.name.clone(),
242                op: if old_value.is_some() {
243                    ChangeOp::Update
244                } else {
245                    ChangeOp::Insert
246                },
247                key: K::as_bytes(key.borrow()).as_ref().to_vec(),
248                new_value: Some(V::as_bytes(value.borrow()).as_ref().to_vec()),
249                old_value,
250            });
251        }
252        Ok(result)
253    }
254
255    /// Removes the given key
256    ///
257    /// Returns the old value, if the key was present in the table
258    pub fn remove<'a>(
259        &mut self,
260        key: impl Borrow<K::SelfType<'a>>,
261    ) -> Result<Option<AccessGuard<'_, V>>> {
262        let result = self.tree.remove(key.borrow())?;
263        if self.transaction.cdc_log.is_some() && result.is_some() {
264            let old_value = result
265                .as_ref()
266                .map(|g| V::as_bytes(&g.value()).as_ref().to_vec());
267            self.transaction.record_cdc(CdcEvent {
268                table_name: self.name.clone(),
269                op: ChangeOp::Delete,
270                key: K::as_bytes(key.borrow()).as_ref().to_vec(),
271                new_value: None,
272                old_value,
273            });
274        }
275        Ok(result)
276    }
277
278    /// Atomically read-modify-write a value using a [`MergeOperator`].
279    ///
280    /// Reads the current value for `key` (if any), passes the raw bytes to
281    /// `operator.merge()` along with `operand`, and writes the result back.
282    /// If the operator returns `None`, the key is removed.
283    ///
284    /// This is equivalent to a get-then-insert but expressed as a single call.
285    /// The operation is atomic within the current write transaction.
286    pub fn merge<'k>(
287        &mut self,
288        key: impl Borrow<K::SelfType<'k>>,
289        operand: &[u8],
290        operator: &dyn MergeOperator,
291    ) -> Result<()> {
292        let key_ref = key.borrow();
293
294        // Copy key and existing value bytes, then drop the access guard
295        // to release the immutable borrow before calling insert/remove.
296        // Copy key and existing value bytes, then drop the access guard
297        // to release the immutable borrow before calling insert/remove.
298        let key_bytes = K::as_bytes(key_ref).as_ref().to_vec();
299        let existing_bytes: Option<Vec<u8>> = {
300            let guard = self.get(key_ref)?;
301            guard.map(|g| V::as_bytes(&g.value()).as_ref().to_vec())
302        };
303
304        let merged = operator.merge(&key_bytes, existing_bytes.as_deref(), operand);
305
306        let key_ref = K::from_bytes(&key_bytes);
307        match merged {
308            Some(new_bytes) => {
309                let new_value = V::from_bytes(&new_bytes);
310                self.insert(&key_ref, &new_value)?;
311            }
312            None => {
313                self.remove(&key_ref)?;
314            }
315        }
316        Ok(())
317    }
318
319    /// Type-safe variant of [`merge()`](Self::merge) that serializes the operand
320    /// via `V::as_bytes()` before passing it to the operator.
321    pub fn merge_in<'k, 'v>(
322        &mut self,
323        key: impl Borrow<K::SelfType<'k>>,
324        operand: impl Borrow<V::SelfType<'v>>,
325        operator: &dyn MergeOperator,
326    ) -> Result<()> {
327        let operand_bytes = V::as_bytes(operand.borrow()).as_ref().to_vec();
328        self.merge(key, &operand_bytes, operator)
329    }
330}
331
332impl<K: Key + 'static, V: MutInPlaceValue + 'static> Table<'_, K, V> {
333    /// Reserve space to insert a key-value pair
334    ///
335    /// If key is already present it is replaced
336    ///
337    /// The returned reference will have length equal to `value_length`
338    pub fn insert_reserve<'a>(
339        &mut self,
340        key: impl Borrow<K::SelfType<'a>>,
341        value_length: usize,
342    ) -> Result<AccessGuardMutInPlace<'_, V>> {
343        if value_length > MAX_VALUE_LENGTH {
344            return Err(StorageError::ValueTooLarge(value_length));
345        }
346        let key_len = K::as_bytes(key.borrow()).as_ref().len();
347        if key_len > MAX_VALUE_LENGTH {
348            return Err(StorageError::ValueTooLarge(key_len));
349        }
350        if value_length + key_len > MAX_PAIR_LENGTH {
351            return Err(StorageError::ValueTooLarge(value_length + key_len));
352        }
353        self.tree.insert_reserve(key.borrow(), value_length)
354    }
355}
356
357impl<K: Key + 'static, V: Value + 'static> ReadableTableMetadata for Table<'_, K, V> {
358    fn stats(&self) -> Result<TableStats> {
359        let tree_stats = self.tree.stats()?;
360
361        Ok(TableStats {
362            tree_height: tree_stats.tree_height,
363            leaf_pages: tree_stats.leaf_pages,
364            branch_pages: tree_stats.branch_pages,
365            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
366            metadata_bytes: tree_stats.metadata_bytes,
367            fragmented_bytes: tree_stats.fragmented_bytes,
368        })
369    }
370
371    fn len(&self) -> Result<u64> {
372        self.tree.len()
373    }
374}
375
376impl<K: Key + 'static, V: Value + 'static> ReadableTable<K, V> for Table<'_, K, V> {
377    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<'_, V>>> {
378        self.tree.get(key.borrow())
379    }
380
381    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<'_, K, V>>
382    where
383        KR: Borrow<K::SelfType<'a>> + 'a,
384    {
385        self.tree
386            .range(&range)
387            .map(|x| Range::new(x, self.transaction.transaction_guard()))
388    }
389
390    fn first(&self) -> Result<Option<(AccessGuard<'_, K>, AccessGuard<'_, V>)>> {
391        self.tree.first()
392    }
393
394    fn last(&self) -> Result<Option<(AccessGuard<'_, K>, AccessGuard<'_, V>)>> {
395        self.tree.last()
396    }
397}
398
399impl<K: Key, V: Value> Sealed for Table<'_, K, V> {}
400
401impl<K: Key + 'static, V: Value + 'static> Drop for Table<'_, K, V> {
402    fn drop(&mut self) {
403        self.transaction.close_table(
404            &self.name,
405            &self.tree,
406            self.tree.get_root().map(|x| x.length).unwrap_or_default(),
407        );
408    }
409}
410
411fn debug_helper<K: Key + 'static, V: Value + 'static>(
412    f: &mut Formatter<'_>,
413    name: &str,
414    len: Result<u64>,
415    first: Result<Option<(AccessGuard<K>, AccessGuard<V>)>>,
416    last: Result<Option<(AccessGuard<K>, AccessGuard<V>)>>,
417) -> core::fmt::Result {
418    write!(f, "Table [ name: \"{name}\", ")?;
419    if let Ok(len) = len {
420        if len == 0 {
421            write!(f, "No entries")?;
422        } else if len == 1 {
423            if let Ok(first) = first {
424                let (key, value) = first.as_ref().unwrap();
425                write!(f, "One key-value: {:?} = {:?}", key.value(), value.value())?;
426            } else {
427                write!(f, "I/O Error accessing table!")?;
428            }
429        } else {
430            if let Ok(first) = first {
431                let (key, value) = first.as_ref().unwrap();
432                write!(f, "first: {:?} = {:?}, ", key.value(), value.value())?;
433            } else {
434                write!(f, "I/O Error accessing table!")?;
435            }
436            if len > 2 {
437                write!(f, "...{} more entries..., ", len - 2)?;
438            }
439            if let Ok(last) = last {
440                let (key, value) = last.as_ref().unwrap();
441                write!(f, "last: {:?} = {:?}", key.value(), value.value())?;
442            } else {
443                write!(f, "I/O Error accessing table!")?;
444            }
445        }
446    } else {
447        write!(f, "I/O Error accessing table!")?;
448    }
449    write!(f, " ]")?;
450
451    Ok(())
452}
453
454impl<K: Key + 'static, V: Value + 'static> Debug for Table<'_, K, V> {
455    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
456        debug_helper(f, &self.name, self.len(), self.first(), self.last())
457    }
458}
459
460pub trait ReadableTableMetadata {
461    /// Retrieves information about storage usage for the table
462    fn stats(&self) -> Result<TableStats>;
463
464    /// Returns the number of entries in the table
465    fn len(&self) -> Result<u64>;
466
467    /// Returns `true` if the table is empty
468    fn is_empty(&self) -> Result<bool> {
469        Ok(self.len()? == 0)
470    }
471}
472
473pub trait ReadableTable<K: Key + 'static, V: Value + 'static>: ReadableTableMetadata {
474    /// Returns the value corresponding to the given key
475    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<'_, V>>>;
476
477    /// Returns a double-ended iterator over a range of elements in the table
478    ///
479    /// # Examples
480    ///
481    /// Usage:
482    /// ```rust
483    /// use shodh_redb::*;
484    /// # use tempfile::NamedTempFile;
485    /// const TABLE: TableDefinition<&str, u64> = TableDefinition::new("my_data");
486    ///
487    /// # fn main() -> Result<(), Error> {
488    /// # #[cfg(not(target_os = "wasi"))]
489    /// # let tmpfile = NamedTempFile::new().unwrap();
490    /// # #[cfg(target_os = "wasi")]
491    /// # let tmpfile = NamedTempFile::new_in("/tmp").unwrap();
492    /// # let filename = tmpfile.path();
493    /// let db = Database::create(filename)?;
494    /// let write_txn = db.begin_write()?;
495    /// {
496    ///     let mut table = write_txn.open_table(TABLE)?;
497    ///     table.insert("a", &0)?;
498    ///     table.insert("b", &1)?;
499    ///     table.insert("c", &2)?;
500    /// }
501    /// write_txn.commit()?;
502    ///
503    /// let read_txn = db.begin_read()?;
504    /// let table = read_txn.open_table(TABLE)?;
505    /// let mut iter = table.range("a".."c")?;
506    /// let (key, value) = iter.next().unwrap()?;
507    /// assert_eq!("a", key.value());
508    /// assert_eq!(0, value.value());
509    /// # Ok(())
510    /// # }
511    /// ```
512    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<'_, K, V>>
513    where
514        KR: Borrow<K::SelfType<'a>> + 'a;
515
516    /// Returns the first key-value pair in the table, if it exists
517    fn first(&self) -> Result<Option<(AccessGuard<'_, K>, AccessGuard<'_, V>)>>;
518
519    /// Returns the last key-value pair in the table, if it exists
520    fn last(&self) -> Result<Option<(AccessGuard<'_, K>, AccessGuard<'_, V>)>>;
521
522    /// Returns a double-ended iterator over all elements in the table
523    fn iter(&self) -> Result<Range<'_, K, V>> {
524        self.range::<K::SelfType<'_>>(..)
525    }
526}
527
528/// A read-only untyped table
529pub struct ReadOnlyUntypedTable {
530    tree: RawBtree,
531}
532
533impl Sealed for ReadOnlyUntypedTable {}
534
535impl ReadableTableMetadata for ReadOnlyUntypedTable {
536    /// Retrieves information about storage usage for the table
537    fn stats(&self) -> Result<TableStats> {
538        let tree_stats = self.tree.stats()?;
539
540        Ok(TableStats {
541            tree_height: tree_stats.tree_height,
542            leaf_pages: tree_stats.leaf_pages,
543            branch_pages: tree_stats.branch_pages,
544            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
545            metadata_bytes: tree_stats.metadata_bytes,
546            fragmented_bytes: tree_stats.fragmented_bytes,
547        })
548    }
549
550    fn len(&self) -> Result<u64> {
551        self.tree.len()
552    }
553}
554
555impl ReadOnlyUntypedTable {
556    pub(crate) fn new(
557        root_page: Option<BtreeHeader>,
558        fixed_key_size: Option<usize>,
559        fixed_value_size: Option<usize>,
560        mem: Arc<TransactionalMemory>,
561    ) -> Self {
562        Self {
563            tree: RawBtree::new(root_page, fixed_key_size, fixed_value_size, mem),
564        }
565    }
566
567    /// Iterate all entries as raw key/value byte slices.
568    /// Useful for CLI tools, debugging, and migration without knowing types at compile time.
569    pub fn iter_raw(&self) -> Result<RawEntryIter> {
570        self.tree.raw_iter()
571    }
572}
573
574/// A read-only table
575pub struct ReadOnlyTable<K: Key + 'static, V: Value + 'static> {
576    name: String,
577    tree: Btree<K, V>,
578    transaction_guard: Arc<TransactionGuard>,
579}
580
581impl<K: Key + 'static, V: Value + 'static> TableHandle for ReadOnlyTable<K, V> {
582    fn name(&self) -> &str {
583        &self.name
584    }
585}
586
587impl<K: Key + 'static, V: Value + 'static> ReadOnlyTable<K, V> {
588    pub(crate) fn new(
589        name: String,
590        root_page: Option<BtreeHeader>,
591        hint: PageHint,
592        guard: Arc<TransactionGuard>,
593        mem: Arc<TransactionalMemory>,
594    ) -> Result<ReadOnlyTable<K, V>> {
595        Ok(ReadOnlyTable {
596            name,
597            tree: Btree::new(root_page, hint, guard.clone(), mem)?,
598            transaction_guard: guard,
599        })
600    }
601
602    /// Create a `ReadOnlyTable` that never applies value-level decompression.
603    /// Used for system/internal tables whose values are always stored uncompressed.
604    pub(crate) fn new_uncompressed(
605        name: String,
606        root_page: Option<BtreeHeader>,
607        hint: PageHint,
608        guard: Arc<TransactionGuard>,
609        mem: Arc<TransactionalMemory>,
610    ) -> Result<ReadOnlyTable<K, V>> {
611        Ok(ReadOnlyTable {
612            name,
613            tree: Btree::new_uncompressed(root_page, hint, guard.clone(), mem)?,
614            transaction_guard: guard,
615        })
616    }
617
618    /// This method is like [`ReadableTable::get()`], but the [`AccessGuard`] is reference counted
619    /// and keeps the transaction alive until it is dropped.
620    pub fn get<'a>(
621        &self,
622        key: impl Borrow<K::SelfType<'a>>,
623    ) -> Result<Option<AccessGuard<'static, V>>> {
624        self.tree.get(key.borrow())
625    }
626
627    /// This method is like [`ReadableTable::range()`], but the iterator is reference counted and keeps the transaction
628    /// alive until it is dropped.
629    pub fn range<'a, KR>(&self, range: impl RangeBounds<KR>) -> Result<Range<'static, K, V>>
630    where
631        KR: Borrow<K::SelfType<'a>>,
632    {
633        self.tree
634            .range(&range)
635            .map(|x| Range::new(x, self.transaction_guard.clone()))
636    }
637}
638
639impl<K: Key + 'static, V: Value + 'static> ReadableTableMetadata for ReadOnlyTable<K, V> {
640    fn stats(&self) -> Result<TableStats> {
641        let tree_stats = self.tree.stats()?;
642
643        Ok(TableStats {
644            tree_height: tree_stats.tree_height,
645            leaf_pages: tree_stats.leaf_pages,
646            branch_pages: tree_stats.branch_pages,
647            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
648            metadata_bytes: tree_stats.metadata_bytes,
649            fragmented_bytes: tree_stats.fragmented_bytes,
650        })
651    }
652
653    fn len(&self) -> Result<u64> {
654        self.tree.len()
655    }
656}
657
658impl<K: Key + 'static, V: Value + 'static> ReadableTable<K, V> for ReadOnlyTable<K, V> {
659    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<'_, V>>> {
660        self.tree.get(key.borrow())
661    }
662
663    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<'_, K, V>>
664    where
665        KR: Borrow<K::SelfType<'a>> + 'a,
666    {
667        self.tree
668            .range(&range)
669            .map(|x| Range::new(x, self.transaction_guard.clone()))
670    }
671
672    fn first(&self) -> Result<Option<(AccessGuard<'_, K>, AccessGuard<'_, V>)>> {
673        self.tree.first()
674    }
675
676    fn last(&self) -> Result<Option<(AccessGuard<'_, K>, AccessGuard<'_, V>)>> {
677        self.tree.last()
678    }
679}
680
681impl<K: Key, V: Value> Sealed for ReadOnlyTable<K, V> {}
682
683impl<K: Key + 'static, V: Value + 'static> Debug for ReadOnlyTable<K, V> {
684    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
685        debug_helper(f, &self.name, self.len(), self.first(), self.last())
686    }
687}
688
689pub struct ExtractIf<
690    'a,
691    K: Key + 'static,
692    V: Value + 'static,
693    F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
694> {
695    inner: BtreeExtractIf<'a, K, V, F>,
696    cdc: Option<(&'a WriteTransaction, String)>,
697}
698
699impl<
700    'a,
701    K: Key + 'static,
702    V: Value + 'static,
703    F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
704> ExtractIf<'a, K, V, F>
705{
706    pub(crate) fn new(inner: BtreeExtractIf<'a, K, V, F>) -> Self {
707        Self { inner, cdc: None }
708    }
709
710    pub(crate) fn with_cdc(
711        inner: BtreeExtractIf<'a, K, V, F>,
712        transaction: &'a WriteTransaction,
713        table_name: String,
714    ) -> Self {
715        Self {
716            inner,
717            cdc: Some((transaction, table_name)),
718        }
719    }
720}
721
722impl<
723    'a,
724    K: Key + 'static,
725    V: Value + 'static,
726    F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
727> Iterator for ExtractIf<'a, K, V, F>
728{
729    type Item = Result<(AccessGuard<'a, K>, AccessGuard<'a, V>)>;
730
731    fn next(&mut self) -> Option<Self::Item> {
732        let entry = self.inner.next()?;
733        Some(entry.map(|entry| {
734            let (page, key_range, value_range, decompressed_value) = entry.into_raw();
735            let key = AccessGuard::with_page(page.clone(), key_range);
736            let value = if let Some(bytes) = decompressed_value {
737                AccessGuard::with_owned_value(bytes)
738            } else {
739                AccessGuard::with_page(page, value_range)
740            };
741            if let Some((txn, table_name)) = &self.cdc {
742                txn.record_cdc(CdcEvent {
743                    table_name: table_name.clone(),
744                    op: ChangeOp::Delete,
745                    key: K::as_bytes(&key.value()).as_ref().to_vec(),
746                    new_value: None,
747                    old_value: Some(V::as_bytes(&value.value()).as_ref().to_vec()),
748                });
749            }
750            (key, value)
751        }))
752    }
753}
754
755impl<
756    K: Key + 'static,
757    V: Value + 'static,
758    F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
759> DoubleEndedIterator for ExtractIf<'_, K, V, F>
760{
761    fn next_back(&mut self) -> Option<Self::Item> {
762        let entry = self.inner.next_back()?;
763        Some(entry.map(|entry| {
764            let (page, key_range, value_range, decompressed_value) = entry.into_raw();
765            let key = AccessGuard::with_page(page.clone(), key_range);
766            let value = if let Some(bytes) = decompressed_value {
767                AccessGuard::with_owned_value(bytes)
768            } else {
769                AccessGuard::with_page(page, value_range)
770            };
771            if let Some((txn, table_name)) = &self.cdc {
772                txn.record_cdc(CdcEvent {
773                    table_name: table_name.clone(),
774                    op: ChangeOp::Delete,
775                    key: K::as_bytes(&key.value()).as_ref().to_vec(),
776                    new_value: None,
777                    old_value: Some(V::as_bytes(&value.value()).as_ref().to_vec()),
778                });
779            }
780            (key, value)
781        }))
782    }
783}
784
785#[derive(Clone)]
786pub struct Range<'a, K: Key + 'static, V: Value + 'static> {
787    inner: BtreeRangeIter<K, V>,
788    _transaction_guard: Arc<TransactionGuard>,
789    // This lifetime is here so that `&` can be held on `Table` preventing concurrent mutation
790    _lifetime: PhantomData<&'a ()>,
791}
792
793impl<K: Key + 'static, V: Value + 'static> Range<'_, K, V> {
794    pub(super) fn new(inner: BtreeRangeIter<K, V>, guard: Arc<TransactionGuard>) -> Self {
795        Self {
796            inner,
797            _transaction_guard: guard,
798            _lifetime: Default::default(),
799        }
800    }
801}
802
803impl<'a, K: Key + 'static, V: Value + 'static> Iterator for Range<'a, K, V> {
804    type Item = Result<(AccessGuard<'a, K>, AccessGuard<'a, V>)>;
805
806    fn next(&mut self) -> Option<Self::Item> {
807        self.inner.next().map(|x| {
808            x.map(|entry| {
809                let (page, key_range, value_range, decompressed_value) = entry.into_raw();
810                let key = AccessGuard::with_page(page.clone(), key_range);
811                let value = if let Some(bytes) = decompressed_value {
812                    AccessGuard::with_owned_value(bytes)
813                } else {
814                    AccessGuard::with_page(page, value_range)
815                };
816                (key, value)
817            })
818        })
819    }
820}
821
822impl<K: Key + 'static, V: Value + 'static> DoubleEndedIterator for Range<'_, K, V> {
823    fn next_back(&mut self) -> Option<Self::Item> {
824        self.inner.next_back().map(|x| {
825            x.map(|entry| {
826                let (page, key_range, value_range, decompressed_value) = entry.into_raw();
827                let key = AccessGuard::with_page(page.clone(), key_range);
828                let value = if let Some(bytes) = decompressed_value {
829                    AccessGuard::with_owned_value(bytes)
830                } else {
831                    AccessGuard::with_page(page, value_range)
832                };
833                (key, value)
834            })
835        })
836    }
837}
838
839// ---------------------------------------------------------------------------
840// storage_traits::WriteTable implementation for legacy Table
841// ---------------------------------------------------------------------------
842
843use crate::storage_traits::OwnedKv;
844
845/// Iterator adapter that converts `(AccessGuard<K>, AccessGuard<V>)` pairs
846/// into `(OwnedKv<K>, OwnedKv<V>)` pairs for the storage trait interface.
847pub struct LegacyRangeIter<'a, K: Key + 'static, V: Value + 'static> {
848    inner: Range<'a, K, V>,
849}
850
851impl<K: Key + 'static, V: Value + 'static> Iterator for LegacyRangeIter<'_, K, V> {
852    type Item = crate::Result<(OwnedKv<K>, OwnedKv<V>)>;
853
854    fn next(&mut self) -> Option<Self::Item> {
855        self.inner.next().map(|result| {
856            result.map(|(k_guard, v_guard)| {
857                let k_bytes = K::as_bytes(&k_guard.value()).as_ref().to_vec();
858                let v_bytes = V::as_bytes(&v_guard.value()).as_ref().to_vec();
859                (OwnedKv::new(k_bytes), OwnedKv::new(v_bytes))
860            })
861        })
862    }
863}
864
865impl<K: Key + 'static, V: Value + 'static> crate::storage_traits::WriteTable<K, V>
866    for Table<'_, K, V>
867{
868    type RangeIter<'a>
869        = LegacyRangeIter<'a, K, V>
870    where
871        Self: 'a;
872
873    fn st_get(&self, key: &K::SelfType<'_>) -> crate::Result<Option<OwnedKv<V>>> {
874        ReadableTable::get(self, key).map(|opt| {
875            opt.map(|guard| {
876                let bytes = V::as_bytes(&guard.value()).as_ref().to_vec();
877                OwnedKv::new(bytes)
878            })
879        })
880    }
881
882    fn st_insert(
883        &mut self,
884        key: &K::SelfType<'_>,
885        value: &V::SelfType<'_>,
886    ) -> crate::Result<Option<OwnedKv<V>>> {
887        self.insert(key, value).map(|opt| {
888            opt.map(|guard| {
889                let bytes = V::as_bytes(&guard.value()).as_ref().to_vec();
890                OwnedKv::new(bytes)
891            })
892        })
893    }
894
895    fn st_remove(&mut self, key: &K::SelfType<'_>) -> crate::Result<Option<OwnedKv<V>>> {
896        self.remove(key).map(|opt| {
897            opt.map(|guard| {
898                let bytes = V::as_bytes(&guard.value()).as_ref().to_vec();
899                OwnedKv::new(bytes)
900            })
901        })
902    }
903
904    fn st_range<'a>(
905        &'a self,
906        start: Option<&K::SelfType<'_>>,
907        end: Option<&K::SelfType<'_>>,
908        start_inclusive: bool,
909        end_inclusive: bool,
910    ) -> crate::Result<Self::RangeIter<'a>> {
911        let inner =
912            legacy_build_range::<K, V, Self>(self, start, end, start_inclusive, end_inclusive)?;
913        Ok(LegacyRangeIter { inner })
914    }
915
916    fn st_drain_all(&mut self) -> crate::Result<u64> {
917        self.drain_all()
918    }
919}
920
921// ---------------------------------------------------------------------------
922// storage_traits::ReadTable for Table (writable tables are also readable)
923// ---------------------------------------------------------------------------
924
925impl<K: Key + 'static, V: Value + 'static> crate::storage_traits::ReadTable<K, V>
926    for Table<'_, K, V>
927{
928    type RangeIter<'a>
929        = LegacyRangeIter<'a, K, V>
930    where
931        Self: 'a;
932
933    fn st_get(&self, key: &K::SelfType<'_>) -> crate::Result<Option<OwnedKv<V>>> {
934        ReadableTable::get(self, key).map(|opt| {
935            opt.map(|guard| {
936                let bytes = V::as_bytes(&guard.value()).as_ref().to_vec();
937                OwnedKv::new(bytes)
938            })
939        })
940    }
941
942    fn st_range<'a>(
943        &'a self,
944        start: Option<&K::SelfType<'_>>,
945        end: Option<&K::SelfType<'_>>,
946        start_inclusive: bool,
947        end_inclusive: bool,
948    ) -> crate::Result<Self::RangeIter<'a>> {
949        let inner =
950            legacy_build_range::<K, V, Self>(self, start, end, start_inclusive, end_inclusive)?;
951        Ok(LegacyRangeIter { inner })
952    }
953}
954
955// ---------------------------------------------------------------------------
956// storage_traits::ReadTable for ReadOnlyTable
957// ---------------------------------------------------------------------------
958
959/// Iterator adapter for read-only table range scans.
960pub struct LegacyReadOnlyRangeIter<'a, K: Key + 'static, V: Value + 'static> {
961    inner: Range<'a, K, V>,
962}
963
964impl<K: Key + 'static, V: Value + 'static> Iterator for LegacyReadOnlyRangeIter<'_, K, V> {
965    type Item = crate::Result<(OwnedKv<K>, OwnedKv<V>)>;
966
967    fn next(&mut self) -> Option<Self::Item> {
968        self.inner.next().map(|result| {
969            result.map(|(k_guard, v_guard)| {
970                let k_bytes = K::as_bytes(&k_guard.value()).as_ref().to_vec();
971                let v_bytes = V::as_bytes(&v_guard.value()).as_ref().to_vec();
972                (OwnedKv::new(k_bytes), OwnedKv::new(v_bytes))
973            })
974        })
975    }
976}
977
978impl<K: Key + 'static, V: Value + 'static> crate::storage_traits::ReadTable<K, V>
979    for ReadOnlyTable<K, V>
980{
981    type RangeIter<'a>
982        = LegacyReadOnlyRangeIter<'a, K, V>
983    where
984        Self: 'a;
985
986    fn st_get(&self, key: &K::SelfType<'_>) -> crate::Result<Option<OwnedKv<V>>> {
987        self.get(key).map(|opt| {
988            opt.map(|guard| {
989                let bytes = V::as_bytes(&guard.value()).as_ref().to_vec();
990                OwnedKv::new(bytes)
991            })
992        })
993    }
994
995    fn st_range<'a>(
996        &'a self,
997        start: Option<&K::SelfType<'_>>,
998        end: Option<&K::SelfType<'_>>,
999        start_inclusive: bool,
1000        end_inclusive: bool,
1001    ) -> crate::Result<Self::RangeIter<'a>> {
1002        let inner =
1003            legacy_build_range_ro::<K, V>(self, start, end, start_inclusive, end_inclusive)?;
1004        Ok(LegacyReadOnlyRangeIter { inner })
1005    }
1006}
1007
1008// ---------------------------------------------------------------------------
1009// Range construction helpers (shared logic for building Range from bounds)
1010// ---------------------------------------------------------------------------
1011
1012/// Build a `Range` from optional start/end keys for types implementing `ReadableTable`.
1013fn legacy_build_range<'a, K: Key + 'static, V: Value + 'static, T: ReadableTable<K, V>>(
1014    table: &'a T,
1015    start: Option<&K::SelfType<'_>>,
1016    end: Option<&K::SelfType<'_>>,
1017    start_inclusive: bool,
1018    end_inclusive: bool,
1019) -> crate::Result<Range<'a, K, V>> {
1020    use core::ops::Bound;
1021
1022    // Hoist byte vecs so they outlive the Bound references.
1023    let s_bytes = start.map(|s| K::as_bytes(s).as_ref().to_vec());
1024    let e_bytes = end.map(|e| K::as_bytes(e).as_ref().to_vec());
1025
1026    let start_bound = match s_bytes.as_deref() {
1027        Some(b) => {
1028            let s_val = K::from_bytes(b);
1029            if start_inclusive {
1030                Bound::Included(s_val)
1031            } else {
1032                Bound::Excluded(s_val)
1033            }
1034        }
1035        None => Bound::Unbounded,
1036    };
1037
1038    let end_bound = match e_bytes.as_deref() {
1039        Some(b) => {
1040            let e_val = K::from_bytes(b);
1041            if end_inclusive {
1042                Bound::Included(e_val)
1043            } else {
1044                Bound::Excluded(e_val)
1045            }
1046        }
1047        None => Bound::Unbounded,
1048    };
1049
1050    table.range::<K::SelfType<'_>>((start_bound, end_bound))
1051}
1052
1053/// Build a `Range` from optional start/end keys for `ReadOnlyTable`.
1054///
1055/// `ReadOnlyTable` has its own `range()` method (not via `ReadableTable` trait)
1056/// with a slightly different signature (returns `Range<'static, K, V>`).
1057fn legacy_build_range_ro<'a, K: Key + 'static, V: Value + 'static>(
1058    table: &'a ReadOnlyTable<K, V>,
1059    start: Option<&K::SelfType<'_>>,
1060    end: Option<&K::SelfType<'_>>,
1061    start_inclusive: bool,
1062    end_inclusive: bool,
1063) -> crate::Result<Range<'a, K, V>> {
1064    use core::ops::Bound;
1065
1066    let s_bytes = start.map(|s| K::as_bytes(s).as_ref().to_vec());
1067    let e_bytes = end.map(|e| K::as_bytes(e).as_ref().to_vec());
1068
1069    let start_bound = match s_bytes.as_deref() {
1070        Some(b) => {
1071            let s_val = K::from_bytes(b);
1072            if start_inclusive {
1073                Bound::Included(s_val)
1074            } else {
1075                Bound::Excluded(s_val)
1076            }
1077        }
1078        None => Bound::Unbounded,
1079    };
1080
1081    let end_bound = match e_bytes.as_deref() {
1082        Some(b) => {
1083            let e_val = K::from_bytes(b);
1084            if end_inclusive {
1085                Bound::Included(e_val)
1086            } else {
1087                Bound::Excluded(e_val)
1088            }
1089        }
1090        None => Bound::Unbounded,
1091    };
1092
1093    table.range::<K::SelfType<'_>>((start_bound, end_bound))
1094}