redb_extras/buckets/
iterator.rs

1//! Bucket range iterator implementation.
2//!
3//! Provides efficient iteration over bucket ranges for specific base keys.
4
5use crate::buckets::key::{BucketedKey, KeyBuilder};
6use crate::buckets::BucketError;
7use redb::{MultimapValue, ReadOnlyMultimapTable, ReadOnlyTable};
8
9/// Iterator over a range of buckets for a specific base key.
10///
11/// BucketRangeIterator performs point lookups for each bucket in the
12/// requested sequence range, yielding only values that match the base key.
13///
14/// Implements `DoubleEndedIterator` for reverse iteration.
15pub struct BucketRangeIterator<'a, V>
16where
17    V: redb::Value + 'static,
18    for<'b> V: From<V::SelfType<'b>>,
19{
20    table: &'a ReadOnlyTable<BucketedKey<u64>, V>,
21    base_key: u64,
22    start_bucket: u64,
23    end_bucket: u64,
24    front_bucket: i64,
25    back_bucket: i64,
26    finished: bool,
27}
28
29impl<'a, V> BucketRangeIterator<'a, V>
30where
31    V: redb::Value + 'static,
32    for<'b> V: From<V::SelfType<'b>>,
33{
34    /// Create a new bucket range iterator.
35    pub fn new(
36        table: &'a ReadOnlyTable<BucketedKey<u64>, V>,
37        key_builder: &KeyBuilder,
38        base_key: u64,
39        start_sequence: u64,
40        end_sequence: u64,
41    ) -> Result<Self, BucketError> {
42        if start_sequence > end_sequence {
43            return Err(BucketError::InvalidRange {
44                start: start_sequence,
45                end: end_sequence,
46            });
47        }
48
49        let bucket_size = key_builder.bucket_size();
50        let start_bucket = start_sequence / bucket_size;
51        let end_bucket = end_sequence / bucket_size;
52
53        Ok(Self {
54            table,
55            base_key,
56            start_bucket,
57            end_bucket,
58            front_bucket: start_bucket as i64,
59            back_bucket: end_bucket as i64,
60            finished: false,
61        })
62    }
63
64    /// Get the bucket range.
65    pub fn bucket_range(&self) -> (u64, u64) {
66        (self.start_bucket, self.end_bucket)
67    }
68}
69
70impl<'a, V> Iterator for BucketRangeIterator<'a, V>
71where
72    V: redb::Value + 'static,
73    for<'b> V: From<V::SelfType<'b>>,
74{
75    type Item = Result<V, BucketError>;
76
77    fn next(&mut self) -> Option<Self::Item> {
78        if self.finished {
79            return None;
80        }
81
82        while self.front_bucket <= self.back_bucket {
83            let bucket = self.front_bucket as u64;
84            self.front_bucket += 1;
85
86            match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
87                Ok(Some(value_guard)) => {
88                    return Some(Ok(V::from(value_guard.value())));
89                }
90                Ok(None) => continue,
91                Err(err) => {
92                    self.finished = true;
93                    return Some(Err(BucketError::IterationError(format!(
94                        "Database error during point lookup: {}",
95                        err
96                    ))));
97                }
98            }
99        }
100
101        self.finished = true;
102        None
103    }
104}
105
106impl<'a, V> DoubleEndedIterator for BucketRangeIterator<'a, V>
107where
108    V: redb::Value + 'static,
109    for<'b> V: From<V::SelfType<'b>>,
110{
111    fn next_back(&mut self) -> Option<Self::Item> {
112        if self.finished {
113            return None;
114        }
115
116        while self.front_bucket <= self.back_bucket {
117            let bucket = self.back_bucket as u64;
118            self.back_bucket -= 1;
119
120            match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
121                Ok(Some(value_guard)) => {
122                    return Some(Ok(V::from(value_guard.value())));
123                }
124                Ok(None) => continue,
125                Err(err) => {
126                    self.finished = true;
127                    return Some(Err(BucketError::IterationError(format!(
128                        "Database error during point lookup: {}",
129                        err
130                    ))));
131                }
132            }
133        }
134
135        self.finished = true;
136        None
137    }
138}
139
140/// Iterator over a range of buckets for a specific base key in multimap tables.
141///
142/// This iterator flattens the multimap values, yielding each value in order
143/// across the requested bucket range via per-bucket point lookups.
144///
145/// Implements `DoubleEndedIterator` to iterate buckets and values in reverse.
146///
147/// ```
148/// use redb::{Database, MultimapTableDefinition, ReadableDatabase};
149/// use redb_extras::buckets::{BucketMultimapIterExt, BucketedKey, KeyBuilder};
150///
151/// const TABLE: MultimapTableDefinition<'static, BucketedKey<u64>, u64> =
152///     MultimapTableDefinition::new("bucketed_values");
153///
154/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
155/// let db = Database::create("example.redb")?;
156/// let key_builder = KeyBuilder::new(100)?;
157///
158/// let write_txn = db.begin_write()?;
159/// {
160///     let mut table = write_txn.open_multimap_table(TABLE)?;
161///     table.insert(key_builder.bucketed_key(42u64, 10), 1u64)?;
162///     table.insert(key_builder.bucketed_key(42u64, 110), 2u64)?;
163/// }
164/// write_txn.commit()?;
165///
166/// let read_txn = db.begin_read()?;
167/// let table = read_txn.open_multimap_table(TABLE)?;
168/// let values: Vec<u64> = table
169///     .bucket_range(&key_builder, 42u64, 0, 199)?
170///     .collect::<Result<_, _>>()?;
171/// assert_eq!(values, vec![1u64, 2u64]);
172/// # Ok(())
173/// # }
174/// ```
175pub struct BucketRangeMultimapIterator<'a, V>
176where
177    V: redb::Key + 'static,
178    for<'b> V: From<V::SelfType<'b>>,
179{
180    table: &'a ReadOnlyMultimapTable<BucketedKey<u64>, V>,
181    base_key: u64,
182    start_bucket: u64,
183    end_bucket: u64,
184    front_bucket: i64,
185    back_bucket: i64,
186    finished: bool,
187    front_values: Option<MultimapValue<'a, V>>,
188    back_values: Option<MultimapValue<'a, V>>,
189}
190
191impl<'a, V> BucketRangeMultimapIterator<'a, V>
192where
193    V: redb::Key + 'static,
194    for<'b> V: From<V::SelfType<'b>>,
195{
196    /// Create a new bucket range iterator for a multimap table.
197    pub fn new(
198        table: &'a ReadOnlyMultimapTable<BucketedKey<u64>, V>,
199        key_builder: &KeyBuilder,
200        base_key: u64,
201        start_sequence: u64,
202        end_sequence: u64,
203    ) -> Result<Self, BucketError> {
204        if start_sequence > end_sequence {
205            return Err(BucketError::InvalidRange {
206                start: start_sequence,
207                end: end_sequence,
208            });
209        }
210
211        let bucket_size = key_builder.bucket_size();
212        let start_bucket = start_sequence / bucket_size;
213        let end_bucket = end_sequence / bucket_size;
214
215        Ok(Self {
216            table,
217            base_key,
218            start_bucket,
219            end_bucket,
220            front_bucket: start_bucket as i64,
221            back_bucket: end_bucket as i64,
222            finished: false,
223            front_values: None,
224            back_values: None,
225        })
226    }
227
228    /// Get the bucket range.
229    pub fn bucket_range(&self) -> (u64, u64) {
230        (self.start_bucket, self.end_bucket)
231    }
232}
233
234impl<'a, V> Iterator for BucketRangeMultimapIterator<'a, V>
235where
236    V: redb::Key + 'static,
237    for<'b> V: From<V::SelfType<'b>>,
238{
239    type Item = Result<V, BucketError>;
240
241    fn next(&mut self) -> Option<Self::Item> {
242        if self.finished {
243            return None;
244        }
245
246        loop {
247            if let Some(values) = self.front_values.as_mut() {
248                match values.next() {
249                    Some(Ok(value_guard)) => {
250                        return Some(Ok(V::from(value_guard.value())));
251                    }
252                    Some(Err(err)) => {
253                        self.finished = true;
254                        return Some(Err(BucketError::IterationError(format!(
255                            "Database error during point lookup: {}",
256                            err
257                        ))));
258                    }
259                    None => {
260                        self.front_values = None;
261                    }
262                }
263            }
264
265            if self.front_bucket > self.back_bucket {
266                self.finished = true;
267                return None;
268            }
269
270            let bucket = self.front_bucket as u64;
271            self.front_bucket += 1;
272
273            match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
274                Ok(values) => {
275                    self.front_values = Some(values);
276                }
277                Err(err) => {
278                    self.finished = true;
279                    return Some(Err(BucketError::IterationError(format!(
280                        "Database error during point lookup: {}",
281                        err
282                    ))));
283                }
284            }
285        }
286    }
287}
288
289impl<'a, V> DoubleEndedIterator for BucketRangeMultimapIterator<'a, V>
290where
291    V: redb::Key + 'static,
292    for<'b> V: From<V::SelfType<'b>>,
293{
294    fn next_back(&mut self) -> Option<Self::Item> {
295        if self.finished {
296            return None;
297        }
298
299        loop {
300            if let Some(values) = self.back_values.as_mut() {
301                match values.next_back() {
302                    Some(Ok(value_guard)) => {
303                        return Some(Ok(V::from(value_guard.value())));
304                    }
305                    Some(Err(err)) => {
306                        self.finished = true;
307                        return Some(Err(BucketError::IterationError(format!(
308                            "Database error during point lookup: {}",
309                            err
310                        ))));
311                    }
312                    None => {
313                        self.back_values = None;
314                    }
315                }
316            }
317
318            if self.front_bucket > self.back_bucket {
319                self.finished = true;
320                return None;
321            }
322
323            let bucket = self.back_bucket as u64;
324            self.back_bucket -= 1;
325
326            match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
327                Ok(values) => {
328                    self.back_values = Some(values);
329                }
330                Err(err) => {
331                    self.finished = true;
332                    return Some(Err(BucketError::IterationError(format!(
333                        "Database error during point lookup: {}",
334                        err
335                    ))));
336                }
337            }
338        }
339    }
340}
341
342/// Extension trait for bucket iteration on read-only tables.
343///
344/// Bucket iteration uses per-bucket point lookups for the requested
345/// sequence range, skipping buckets that have no stored value.
346pub trait BucketIterExt<V>
347where
348    V: redb::Value + 'static,
349    for<'b> V: From<V::SelfType<'b>>,
350{
351    fn bucket_range(
352        &self,
353        key_builder: &KeyBuilder,
354        base_key: u64,
355        start_sequence: u64,
356        end_sequence: u64,
357    ) -> Result<BucketRangeIterator<'_, V>, BucketError>;
358}
359
360impl<V> BucketIterExt<V> for ReadOnlyTable<BucketedKey<u64>, V>
361where
362    V: redb::Value + 'static,
363    for<'b> V: From<V::SelfType<'b>>,
364{
365    fn bucket_range(
366        &self,
367        key_builder: &KeyBuilder,
368        base_key: u64,
369        start_sequence: u64,
370        end_sequence: u64,
371    ) -> Result<BucketRangeIterator<'_, V>, BucketError> {
372        BucketRangeIterator::new(self, key_builder, base_key, start_sequence, end_sequence)
373    }
374}
375
376/// Extension trait for bucket iteration on read-only multimap tables.
377///
378/// Returns a flattened iterator over values for the base key within the
379/// requested bucket range, using per-bucket point lookups.
380pub trait BucketMultimapIterExt<V>
381where
382    V: redb::Key + 'static,
383    for<'b> V: From<V::SelfType<'b>>,
384{
385    fn bucket_range(
386        &self,
387        key_builder: &KeyBuilder,
388        base_key: u64,
389        start_sequence: u64,
390        end_sequence: u64,
391    ) -> Result<BucketRangeMultimapIterator<'_, V>, BucketError>;
392}
393
394impl<V> BucketMultimapIterExt<V> for ReadOnlyMultimapTable<BucketedKey<u64>, V>
395where
396    V: redb::Key + 'static,
397    for<'b> V: From<V::SelfType<'b>>,
398{
399    fn bucket_range(
400        &self,
401        key_builder: &KeyBuilder,
402        base_key: u64,
403        start_sequence: u64,
404        end_sequence: u64,
405    ) -> Result<BucketRangeMultimapIterator<'_, V>, BucketError> {
406        BucketRangeMultimapIterator::new(self, key_builder, base_key, start_sequence, end_sequence)
407    }
408}
409
#[cfg(test)]
mod tests {
    use super::*;
    use redb::{Database, MultimapTableDefinition, ReadableDatabase, TableDefinition};
    use tempfile::NamedTempFile;

    const TEST_TABLE: TableDefinition<'static, BucketedKey<u64>, String> =
        TableDefinition::new("test_table");
    const TEST_MULTIMAP: MultimapTableDefinition<'static, BucketedKey<u64>, u64> =
        MultimapTableDefinition::new("test_multimap");

    /// Covers construction, range validation, forward/reverse iteration and
    /// base-key isolation for the single-value table iterator.
    #[test]
    fn test_basic_functionality() -> Result<(), Box<dyn std::error::Error>> {
        let temp_file = NamedTempFile::new()?;
        let db = Database::create(temp_file.path())?;
        let key_builder = KeyBuilder::new(100)?;

        // Seed the table: three buckets for base key 123, plus two rows for
        // base key 456 in the same buckets that must not leak into 123's results.
        let write_txn = db.begin_write()?;
        {
            let mut table = write_txn.open_table(TEST_TABLE)?;
            let rows = [
                (123u64, 50, "value_50"),
                (123u64, 150, "value_150"),
                (123u64, 250, "value_250"),
                (456u64, 50, "other_50"),
                (456u64, 150, "other_150"),
            ];
            for (base, sequence, label) in rows {
                table.insert(key_builder.bucketed_key(base, sequence), label.to_string())?;
            }
        }
        write_txn.commit()?;

        // Construction: bucket bounds are reported and inverted ranges rejected.
        {
            let read_txn = db.begin_read()?;
            let table = read_txn.open_table(TEST_TABLE)?;

            let iter = BucketRangeIterator::new(&table, &key_builder, 123u64, 0, 199)?;
            assert_eq!(iter.bucket_range(), (0, 1));

            assert!(BucketRangeIterator::new(&table, &key_builder, 123u64, 200, 100).is_err());
        }

        // Iteration: forward order, reverse order, and filtering by base key.
        {
            let read_txn = db.begin_read()?;
            let table = read_txn.open_table(TEST_TABLE)?;

            let forward: Vec<String> =
                BucketRangeIterator::new(&table, &key_builder, 123u64, 0, 299)?
                    .collect::<Result<_, _>>()?;
            assert_eq!(forward, ["value_50", "value_150", "value_250"]);

            let backward: Vec<String> =
                BucketRangeIterator::new(&table, &key_builder, 123u64, 0, 299)?
                    .rev()
                    .collect::<Result<_, _>>()?;
            assert_eq!(backward, ["value_250", "value_150", "value_50"]);

            let other: Vec<String> = table
                .bucket_range(&key_builder, 456u64, 0, 299)?
                .collect::<Result<_, _>>()?;
            assert_eq!(other, ["other_50", "other_150"]);
        }

        Ok(())
    }

    /// Covers bucket bounds, flattened forward/reverse iteration and
    /// base-key isolation for the multimap iterator.
    #[test]
    fn test_multimap_functionality() -> Result<(), Box<dyn std::error::Error>> {
        let temp_file = NamedTempFile::new()?;
        let db = Database::create(temp_file.path())?;
        let key_builder = KeyBuilder::new(100)?;

        // Two values per bucket for base key 123, two values for base key 456.
        let write_txn = db.begin_write()?;
        {
            let mut table = write_txn.open_multimap_table(TEST_MULTIMAP)?;
            let rows = [
                (123u64, 50, 10u64),
                (123u64, 50, 20u64),
                (123u64, 150, 30u64),
                (123u64, 150, 40u64),
                (456u64, 50, 99u64),
                (456u64, 50, 100u64),
            ];
            for (base, sequence, value) in rows {
                table.insert(key_builder.bucketed_key(base, sequence), value)?;
            }
        }
        write_txn.commit()?;

        {
            let read_txn = db.begin_read()?;
            let table = read_txn.open_multimap_table(TEST_MULTIMAP)?;

            let iter = BucketRangeMultimapIterator::new(&table, &key_builder, 123u64, 0, 199)?;
            assert_eq!(iter.bucket_range(), (0, 1));

            let forward: Vec<u64> = iter.collect::<Result<_, _>>()?;
            assert_eq!(forward, [10u64, 20u64, 30u64, 40u64]);

            let backward: Vec<u64> =
                BucketRangeMultimapIterator::new(&table, &key_builder, 123u64, 0, 199)?
                    .rev()
                    .collect::<Result<_, _>>()?;
            assert_eq!(backward, [40u64, 30u64, 20u64, 10u64]);

            let other: Vec<u64> = table
                .bucket_range(&key_builder, 456u64, 0, 99)?
                .collect::<Result<_, _>>()?;
            assert_eq!(other, [99u64, 100u64]);
        }

        Ok(())
    }
}