redb_extras/table_buckets/
iterator.rs

1//! Table bucket range iterator implementation.
2//!
3//! Provides efficient iteration over bucket ranges for specific base keys
4//! by opening bucket-specific tables on demand.
5
6use crate::key_buckets::BucketError;
7use crate::table_buckets::TableBucketBuilder;
8use redb::{ReadOnlyMultimapTable, ReadOnlyTable, ReadTransaction, TableError};
9use std::borrow::Borrow;
10use std::collections::VecDeque;
11use std::marker::PhantomData;
12
13/// Iterator over a range of buckets for a specific base key.
14///
15/// Each bucket is stored in its own table. The iterator opens each bucket
16/// table and performs a point lookup for the base key.
17///
18/// Implements `DoubleEndedIterator` for reverse iteration.
19pub struct TableBucketRangeIterator<'a, K, V>
20where
21    K: redb::Key + Clone + 'static,
22    for<'b> K: Borrow<K::SelfType<'b>>,
23    V: redb::Value + 'static,
24    for<'b> V: From<V::SelfType<'b>>,
25{
26    txn: &'a ReadTransaction,
27    builder: &'a TableBucketBuilder,
28    base_key: K,
29    start_bucket: u64,
30    end_bucket: u64,
31    front_bucket: i64,
32    back_bucket: i64,
33    finished: bool,
34    _phantom: PhantomData<V>,
35}
36
37impl<'a, K, V> TableBucketRangeIterator<'a, K, V>
38where
39    K: redb::Key + Clone + 'static,
40    for<'b> K: Borrow<K::SelfType<'b>>,
41    V: redb::Value + 'static,
42    for<'b> V: From<V::SelfType<'b>>,
43{
44    /// Create a new table bucket range iterator.
45    pub fn new(
46        txn: &'a ReadTransaction,
47        builder: &'a TableBucketBuilder,
48        base_key: K,
49        start_sequence: u64,
50        end_sequence: u64,
51    ) -> Result<Self, BucketError> {
52        if start_sequence > end_sequence {
53            return Err(BucketError::InvalidRange {
54                start: start_sequence,
55                end: end_sequence,
56            });
57        }
58
59        let bucket_size = builder.bucket_size();
60        let start_bucket = start_sequence / bucket_size;
61        let end_bucket = end_sequence / bucket_size;
62
63        Ok(Self {
64            txn,
65            builder,
66            base_key,
67            start_bucket,
68            end_bucket,
69            front_bucket: start_bucket as i64,
70            back_bucket: end_bucket as i64,
71            finished: false,
72            _phantom: PhantomData,
73        })
74    }
75
76    /// Get the bucket range.
77    pub fn bucket_range(&self) -> (u64, u64) {
78        (self.start_bucket, self.end_bucket)
79    }
80
81    fn open_table(&self, bucket: u64) -> Result<Option<ReadOnlyTable<K, V>>, BucketError> {
82        let definition = self.builder.table_definition::<K, V>(bucket);
83        match self.txn.open_table(definition) {
84            Ok(table) => Ok(Some(table)),
85            Err(TableError::TableDoesNotExist(_)) => Ok(None),
86            Err(err) => Err(BucketError::IterationError(format!(
87                "Failed to open bucket table {}: {}",
88                bucket, err
89            ))),
90        }
91    }
92}
93
94impl<'a, K, V> Iterator for TableBucketRangeIterator<'a, K, V>
95where
96    K: redb::Key + Clone + 'static,
97    for<'b> K: Borrow<K::SelfType<'b>>,
98    V: redb::Value + 'static,
99    for<'b> V: From<V::SelfType<'b>>,
100{
101    type Item = Result<V, BucketError>;
102
103    fn next(&mut self) -> Option<Self::Item> {
104        if self.finished {
105            return None;
106        }
107
108        while self.front_bucket <= self.back_bucket {
109            let bucket = self.front_bucket as u64;
110            self.front_bucket += 1;
111
112            let table = match self.open_table(bucket) {
113                Ok(Some(table)) => table,
114                Ok(None) => continue,
115                Err(err) => {
116                    self.finished = true;
117                    return Some(Err(err));
118                }
119            };
120
121            match table.get(self.base_key.clone()) {
122                Ok(Some(value_guard)) => {
123                    return Some(Ok(V::from(value_guard.value())));
124                }
125                Ok(None) => continue,
126                Err(err) => {
127                    self.finished = true;
128                    return Some(Err(BucketError::IterationError(format!(
129                        "Database error during point lookup: {}",
130                        err
131                    ))));
132                }
133            }
134        }
135
136        self.finished = true;
137        None
138    }
139}
140
141impl<'a, K, V> DoubleEndedIterator for TableBucketRangeIterator<'a, K, V>
142where
143    K: redb::Key + Clone + 'static,
144    for<'b> K: Borrow<K::SelfType<'b>>,
145    V: redb::Value + 'static,
146    for<'b> V: From<V::SelfType<'b>>,
147{
148    fn next_back(&mut self) -> Option<Self::Item> {
149        if self.finished {
150            return None;
151        }
152
153        while self.front_bucket <= self.back_bucket {
154            let bucket = self.back_bucket as u64;
155            self.back_bucket -= 1;
156
157            let table = match self.open_table(bucket) {
158                Ok(Some(table)) => table,
159                Ok(None) => continue,
160                Err(err) => {
161                    self.finished = true;
162                    return Some(Err(err));
163                }
164            };
165
166            match table.get(self.base_key.clone()) {
167                Ok(Some(value_guard)) => {
168                    return Some(Ok(V::from(value_guard.value())));
169                }
170                Ok(None) => continue,
171                Err(err) => {
172                    self.finished = true;
173                    return Some(Err(BucketError::IterationError(format!(
174                        "Database error during point lookup: {}",
175                        err
176                    ))));
177                }
178            }
179        }
180
181        self.finished = true;
182        None
183    }
184}
185
186/// Iterator over a range of buckets for a specific base key in multimap tables.
187///
188/// This iterator flattens the multimap values, yielding each value in order
189/// across the requested bucket range via per-bucket point lookups.
190///
191/// Implements `DoubleEndedIterator` to iterate buckets and values in reverse.
192pub struct TableBucketRangeMultimapIterator<'a, K, V>
193where
194    K: redb::Key + Clone + 'static,
195    for<'b> K: Borrow<K::SelfType<'b>>,
196    V: redb::Key + 'static,
197    for<'b> V: From<V::SelfType<'b>>,
198{
199    txn: &'a ReadTransaction,
200    builder: &'a TableBucketBuilder,
201    base_key: K,
202    start_bucket: u64,
203    end_bucket: u64,
204    front_bucket: i64,
205    back_bucket: i64,
206    finished: bool,
207    front_values: Option<VecDeque<V>>,
208    back_values: Option<VecDeque<V>>,
209}
210
211impl<'a, K, V> TableBucketRangeMultimapIterator<'a, K, V>
212where
213    K: redb::Key + Clone + 'static,
214    for<'b> K: Borrow<K::SelfType<'b>>,
215    V: redb::Key + 'static,
216    for<'b> V: From<V::SelfType<'b>>,
217{
218    /// Create a new table bucket range iterator for a multimap table.
219    pub fn new(
220        txn: &'a ReadTransaction,
221        builder: &'a TableBucketBuilder,
222        base_key: K,
223        start_sequence: u64,
224        end_sequence: u64,
225    ) -> Result<Self, BucketError> {
226        if start_sequence > end_sequence {
227            return Err(BucketError::InvalidRange {
228                start: start_sequence,
229                end: end_sequence,
230            });
231        }
232
233        let bucket_size = builder.bucket_size();
234        let start_bucket = start_sequence / bucket_size;
235        let end_bucket = end_sequence / bucket_size;
236
237        Ok(Self {
238            txn,
239            builder,
240            base_key,
241            start_bucket,
242            end_bucket,
243            front_bucket: start_bucket as i64,
244            back_bucket: end_bucket as i64,
245            finished: false,
246            front_values: None,
247            back_values: None,
248        })
249    }
250
251    /// Get the bucket range.
252    pub fn bucket_range(&self) -> (u64, u64) {
253        (self.start_bucket, self.end_bucket)
254    }
255
256    fn open_table(&self, bucket: u64) -> Result<Option<ReadOnlyMultimapTable<K, V>>, BucketError> {
257        let definition = self.builder.multimap_table_definition::<K, V>(bucket);
258        match self.txn.open_multimap_table(definition) {
259            Ok(table) => Ok(Some(table)),
260            Err(TableError::TableDoesNotExist(_)) => Ok(None),
261            Err(err) => Err(BucketError::IterationError(format!(
262                "Failed to open bucket table {}: {}",
263                bucket, err
264            ))),
265        }
266    }
267}
268
269impl<'a, K, V> Iterator for TableBucketRangeMultimapIterator<'a, K, V>
270where
271    K: redb::Key + Clone + 'static,
272    for<'b> K: Borrow<K::SelfType<'b>>,
273    V: redb::Key + 'static,
274    for<'b> V: From<V::SelfType<'b>>,
275{
276    type Item = Result<V, BucketError>;
277
278    fn next(&mut self) -> Option<Self::Item> {
279        if self.finished {
280            return None;
281        }
282
283        loop {
284            if let Some(values) = self.front_values.as_mut() {
285                if let Some(value) = values.pop_front() {
286                    return Some(Ok(value));
287                }
288                self.front_values = None;
289            }
290
291            if self.front_bucket > self.back_bucket {
292                self.finished = true;
293                return None;
294            }
295
296            let bucket = self.front_bucket as u64;
297            self.front_bucket += 1;
298
299            let table = match self.open_table(bucket) {
300                Ok(Some(table)) => table,
301                Ok(None) => continue,
302                Err(err) => {
303                    self.finished = true;
304                    return Some(Err(err));
305                }
306            };
307
308            match table.get(self.base_key.clone()) {
309                Ok(values) => {
310                    let mut collected = VecDeque::new();
311                    for value_result in values {
312                        match value_result {
313                            Ok(value_guard) => {
314                                collected.push_back(V::from(value_guard.value()));
315                            }
316                            Err(err) => {
317                                self.finished = true;
318                                return Some(Err(BucketError::IterationError(format!(
319                                    "Database error during point lookup: {}",
320                                    err
321                                ))));
322                            }
323                        }
324                    }
325                    if collected.is_empty() {
326                        continue;
327                    }
328                    self.front_values = Some(collected);
329                }
330                Err(err) => {
331                    self.finished = true;
332                    return Some(Err(BucketError::IterationError(format!(
333                        "Database error during point lookup: {}",
334                        err
335                    ))));
336                }
337            }
338        }
339    }
340}
341
342impl<'a, K, V> DoubleEndedIterator for TableBucketRangeMultimapIterator<'a, K, V>
343where
344    K: redb::Key + Clone + 'static,
345    for<'b> K: Borrow<K::SelfType<'b>>,
346    V: redb::Key + 'static,
347    for<'b> V: From<V::SelfType<'b>>,
348{
349    fn next_back(&mut self) -> Option<Self::Item> {
350        if self.finished {
351            return None;
352        }
353
354        loop {
355            if let Some(values) = self.back_values.as_mut() {
356                if let Some(value) = values.pop_back() {
357                    return Some(Ok(value));
358                }
359                self.back_values = None;
360            }
361
362            if self.front_bucket > self.back_bucket {
363                self.finished = true;
364                return None;
365            }
366
367            let bucket = self.back_bucket as u64;
368            self.back_bucket -= 1;
369
370            let table = match self.open_table(bucket) {
371                Ok(Some(table)) => table,
372                Ok(None) => continue,
373                Err(err) => {
374                    self.finished = true;
375                    return Some(Err(err));
376                }
377            };
378
379            match table.get(self.base_key.clone()) {
380                Ok(values) => {
381                    let mut collected = VecDeque::new();
382                    for value_result in values {
383                        match value_result {
384                            Ok(value_guard) => {
385                                collected.push_back(V::from(value_guard.value()));
386                            }
387                            Err(err) => {
388                                self.finished = true;
389                                return Some(Err(BucketError::IterationError(format!(
390                                    "Database error during point lookup: {}",
391                                    err
392                                ))));
393                            }
394                        }
395                    }
396                    if collected.is_empty() {
397                        continue;
398                    }
399                    self.back_values = Some(collected);
400                }
401                Err(err) => {
402                    self.finished = true;
403                    return Some(Err(BucketError::IterationError(format!(
404                        "Database error during point lookup: {}",
405                        err
406                    ))));
407                }
408            }
409        }
410    }
411}
412
413/// Extension trait for table bucket iteration on read transactions.
414pub trait TableBucketIterExt {
415    fn table_bucket_range<'a, K, V>(
416        &'a self,
417        builder: &'a TableBucketBuilder,
418        base_key: K,
419        start_sequence: u64,
420        end_sequence: u64,
421    ) -> Result<TableBucketRangeIterator<'a, K, V>, BucketError>
422    where
423        K: redb::Key + Clone + 'static,
424        for<'b> K: Borrow<K::SelfType<'b>>,
425        V: redb::Value + 'static,
426        for<'b> V: From<V::SelfType<'b>>;
427}
428
429impl TableBucketIterExt for ReadTransaction {
430    fn table_bucket_range<'a, K, V>(
431        &'a self,
432        builder: &'a TableBucketBuilder,
433        base_key: K,
434        start_sequence: u64,
435        end_sequence: u64,
436    ) -> Result<TableBucketRangeIterator<'a, K, V>, BucketError>
437    where
438        K: redb::Key + Clone + 'static,
439        for<'b> K: Borrow<K::SelfType<'b>>,
440        V: redb::Value + 'static,
441        for<'b> V: From<V::SelfType<'b>>,
442    {
443        TableBucketRangeIterator::<K, V>::new(self, builder, base_key, start_sequence, end_sequence)
444    }
445}
446
447/// Extension trait for table bucket iteration on read transactions for multimap tables.
448pub trait TableBucketMultimapIterExt {
449    fn table_bucket_multimap_range<'a, K, V>(
450        &'a self,
451        builder: &'a TableBucketBuilder,
452        base_key: K,
453        start_sequence: u64,
454        end_sequence: u64,
455    ) -> Result<TableBucketRangeMultimapIterator<'a, K, V>, BucketError>
456    where
457        K: redb::Key + Clone + 'static,
458        for<'b> K: Borrow<K::SelfType<'b>>,
459        V: redb::Key + 'static,
460        for<'b> V: From<V::SelfType<'b>>;
461}
462
463impl TableBucketMultimapIterExt for ReadTransaction {
464    fn table_bucket_multimap_range<'a, K, V>(
465        &'a self,
466        builder: &'a TableBucketBuilder,
467        base_key: K,
468        start_sequence: u64,
469        end_sequence: u64,
470    ) -> Result<TableBucketRangeMultimapIterator<'a, K, V>, BucketError>
471    where
472        K: redb::Key + Clone + 'static,
473        for<'b> K: Borrow<K::SelfType<'b>>,
474        V: redb::Key + 'static,
475        for<'b> V: From<V::SelfType<'b>>,
476    {
477        TableBucketRangeMultimapIterator::<K, V>::new(
478            self,
479            builder,
480            base_key,
481            start_sequence,
482            end_sequence,
483        )
484    }
485}
486
487#[cfg(test)]
488mod tests {
489    use super::*;
490    use crate::table_buckets::TableBucketBuilder;
491    use redb::{Database, ReadableDatabase};
492    use tempfile::NamedTempFile;
493
494    #[test]
495    fn test_table_bucket_iteration() -> Result<(), Box<dyn std::error::Error>> {
496        let temp_file = NamedTempFile::new()?;
497        let db = Database::create(temp_file.path())?;
498        let builder = TableBucketBuilder::new(100, "table_bucket")?;
499
500        {
501            let write_txn = db.begin_write()?;
502            {
503                {
504                    let mut table =
505                        write_txn.open_table(builder.table_definition::<u64, String>(0))?;
506                    table.insert(123u64, "value_50".to_string())?;
507                    table.insert(456u64, "other_50".to_string())?;
508                }
509
510                {
511                    let mut table =
512                        write_txn.open_table(builder.table_definition::<u64, String>(1))?;
513                    table.insert(123u64, "value_150".to_string())?;
514                }
515
516                {
517                    let mut table =
518                        write_txn.open_table(builder.table_definition::<u64, String>(2))?;
519                    table.insert(123u64, "value_250".to_string())?;
520                }
521            }
522            write_txn.commit()?;
523        }
524
525        let read_txn = db.begin_read()?;
526        let iter = TableBucketRangeIterator::new(&read_txn, &builder, 123u64, 0, 299)?;
527        assert_eq!(iter.bucket_range(), (0, 2));
528
529        let values: Vec<String> = iter.collect::<Result<_, _>>()?;
530        assert_eq!(
531            values,
532            vec![
533                "value_50".to_string(),
534                "value_150".to_string(),
535                "value_250".to_string()
536            ]
537        );
538
539        let iter = TableBucketRangeIterator::new(&read_txn, &builder, 123u64, 0, 299)?;
540        let values: Vec<String> = iter.rev().collect::<Result<_, _>>()?;
541        assert_eq!(
542            values,
543            vec![
544                "value_250".to_string(),
545                "value_150".to_string(),
546                "value_50".to_string()
547            ]
548        );
549
550        let iter = read_txn.table_bucket_range(&builder, 456u64, 0, 299)?;
551        let values: Vec<String> = iter.collect::<Result<_, _>>()?;
552        assert_eq!(values, vec!["other_50".to_string()]);
553
554        let invalid_iter =
555            TableBucketRangeIterator::<u64, String>::new(&read_txn, &builder, 123u64, 200, 100);
556        assert!(invalid_iter.is_err());
557
558        Ok(())
559    }
560
561    #[test]
562    fn test_table_bucket_multimap_iteration() -> Result<(), Box<dyn std::error::Error>> {
563        let temp_file = NamedTempFile::new()?;
564        let db = Database::create(temp_file.path())?;
565        let builder = TableBucketBuilder::new(100, "table_bucket_multimap")?;
566
567        {
568            let write_txn = db.begin_write()?;
569            {
570                {
571                    let mut table = write_txn
572                        .open_multimap_table(builder.multimap_table_definition::<u64, u64>(0))?;
573                    table.insert(123u64, 10u64)?;
574                    table.insert(123u64, 20u64)?;
575                    table.insert(456u64, 99u64)?;
576                    table.insert(456u64, 100u64)?;
577                }
578
579                {
580                    let mut table = write_txn
581                        .open_multimap_table(builder.multimap_table_definition::<u64, u64>(1))?;
582                    table.insert(123u64, 30u64)?;
583                    table.insert(123u64, 40u64)?;
584                }
585            }
586            write_txn.commit()?;
587        }
588
589        let read_txn = db.begin_read()?;
590        let iter = TableBucketRangeMultimapIterator::new(&read_txn, &builder, 123u64, 0, 199)?;
591        assert_eq!(iter.bucket_range(), (0, 1));
592
593        let values: Vec<u64> = iter.collect::<Result<_, _>>()?;
594        assert_eq!(values, vec![10u64, 20u64, 30u64, 40u64]);
595
596        let iter = TableBucketRangeMultimapIterator::new(&read_txn, &builder, 123u64, 0, 199)?;
597        let values: Vec<u64> = iter.rev().collect::<Result<_, _>>()?;
598        assert_eq!(values, vec![40u64, 30u64, 20u64, 10u64]);
599
600        let iter = read_txn.table_bucket_multimap_range(&builder, 456u64, 0, 99)?;
601        let values: Vec<u64> = iter.collect::<Result<_, _>>()?;
602        assert_eq!(values, vec![99u64, 100u64]);
603
604        Ok(())
605    }
606}