redb_extras/key_buckets/
iterator.rs

1//! Bucket range iterator implementation.
2//!
3//! Provides efficient iteration over bucket ranges for specific base keys.
4
5use crate::key_buckets::key::{BucketedKey, KeyBuilder};
6use crate::key_buckets::BucketError;
7use redb::{ReadOnlyMultimapTable, ReadOnlyTable};
8use std::collections::VecDeque;
9
/// Iterator over a range of buckets for a specific base key.
///
/// BucketRangeIterator performs point lookups for each bucket in the
/// requested sequence range, yielding only values that match the base key.
/// Buckets with no stored value are skipped.
///
/// Implements `DoubleEndedIterator` for reverse iteration.
pub struct BucketRangeIterator<V>
where
    V: redb::Value + 'static,
    for<'b> V: From<V::SelfType<'b>>,
{
    /// Owned read-only table handle that every point lookup goes through.
    table: ReadOnlyTable<BucketedKey<u64>, V>,
    /// Base key combined with each bucket index to form the lookup key.
    base_key: u64,
    /// First bucket of the requested range (inclusive), fixed at construction.
    start_bucket: u64,
    /// Last bucket of the requested range (inclusive), fixed at construction.
    end_bucket: u64,
    /// Next bucket to probe from the front; advanced by `next`.
    front_bucket: i64,
    /// Next bucket to probe from the back; decremented by `next_back`.
    /// Signed, presumably so it can step to -1 after bucket 0 has been probed.
    back_bucket: i64,
    /// Set once the range is exhausted or a lookup failed; afterwards the
    /// iterator only returns `None`.
    finished: bool,
}
29
30impl<V> BucketRangeIterator<V>
31where
32    V: redb::Value + 'static,
33    for<'b> V: From<V::SelfType<'b>>,
34{
35    /// Create a new bucket range iterator.
36    pub fn new(
37        table: ReadOnlyTable<BucketedKey<u64>, V>,
38        key_builder: &KeyBuilder,
39        base_key: u64,
40        start_sequence: u64,
41        end_sequence: u64,
42    ) -> Result<Self, BucketError> {
43        if start_sequence > end_sequence {
44            return Err(BucketError::InvalidRange {
45                start: start_sequence,
46                end: end_sequence,
47            });
48        }
49
50        let bucket_size = key_builder.bucket_size();
51        let start_bucket = start_sequence / bucket_size;
52        let end_bucket = end_sequence / bucket_size;
53
54        Ok(Self {
55            table,
56            base_key,
57            start_bucket,
58            end_bucket,
59            front_bucket: start_bucket as i64,
60            back_bucket: end_bucket as i64,
61            finished: false,
62        })
63    }
64
65    /// Get the bucket range.
66    pub fn bucket_range(&self) -> (u64, u64) {
67        (self.start_bucket, self.end_bucket)
68    }
69}
70
71impl<V> Iterator for BucketRangeIterator<V>
72where
73    V: redb::Value + 'static,
74    for<'b> V: From<V::SelfType<'b>>,
75{
76    type Item = Result<V, BucketError>;
77
78    fn next(&mut self) -> Option<Self::Item> {
79        if self.finished {
80            return None;
81        }
82
83        while self.front_bucket <= self.back_bucket {
84            let bucket = self.front_bucket as u64;
85            self.front_bucket += 1;
86
87            match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
88                Ok(Some(value_guard)) => {
89                    return Some(Ok(V::from(value_guard.value())));
90                }
91                Ok(None) => continue,
92                Err(err) => {
93                    self.finished = true;
94                    return Some(Err(BucketError::IterationError(format!(
95                        "Database error during point lookup: {}",
96                        err
97                    ))));
98                }
99            }
100        }
101
102        self.finished = true;
103        None
104    }
105}
106
107impl<V> DoubleEndedIterator for BucketRangeIterator<V>
108where
109    V: redb::Value + 'static,
110    for<'b> V: From<V::SelfType<'b>>,
111{
112    fn next_back(&mut self) -> Option<Self::Item> {
113        if self.finished {
114            return None;
115        }
116
117        while self.front_bucket <= self.back_bucket {
118            let bucket = self.back_bucket as u64;
119            self.back_bucket -= 1;
120
121            match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
122                Ok(Some(value_guard)) => {
123                    return Some(Ok(V::from(value_guard.value())));
124                }
125                Ok(None) => continue,
126                Err(err) => {
127                    self.finished = true;
128                    return Some(Err(BucketError::IterationError(format!(
129                        "Database error during point lookup: {}",
130                        err
131                    ))));
132                }
133            }
134        }
135
136        self.finished = true;
137        None
138    }
139}
140
/// Iterator over a range of buckets for a specific base key in multimap tables.
///
/// This iterator flattens the multimap values, yielding each value in order
/// across the requested bucket range via per-bucket point lookups. All values
/// of a bucket are buffered in memory when that bucket is first visited.
///
/// Implements `DoubleEndedIterator` to iterate buckets and values in reverse.
///
/// ```
/// use redb::{Database, MultimapTableDefinition, ReadableDatabase};
/// use redb_extras::key_buckets::{BucketMultimapIterExt, BucketedKey, KeyBuilder};
///
/// const TABLE: MultimapTableDefinition<'static, BucketedKey<u64>, u64> =
///     MultimapTableDefinition::new("bucketed_values");
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let db = Database::create("example.redb")?;
/// let key_builder = KeyBuilder::new(100)?;
///
/// let write_txn = db.begin_write()?;
/// {
///     let mut table = write_txn.open_multimap_table(TABLE)?;
///     table.insert(key_builder.bucketed_key(42u64, 10), 1u64)?;
///     table.insert(key_builder.bucketed_key(42u64, 110), 2u64)?;
/// }
/// write_txn.commit()?;
///
/// let read_txn = db.begin_read()?;
/// let table = read_txn.open_multimap_table(TABLE)?;
/// let values: Vec<u64> = table
///     .bucket_range(&key_builder, 42u64, 0, 199)?
///     .collect::<Result<_, _>>()?;
/// assert_eq!(values, vec![1u64, 2u64]);
/// # Ok(())
/// # }
/// ```
pub struct BucketRangeMultimapIterator<V>
where
    V: redb::Key + 'static,
    for<'b> V: From<V::SelfType<'b>>,
{
    /// Owned read-only multimap handle that every point lookup goes through.
    table: ReadOnlyMultimapTable<BucketedKey<u64>, V>,
    /// Base key combined with each bucket index to form the lookup key.
    base_key: u64,
    /// First bucket of the requested range (inclusive), fixed at construction.
    start_bucket: u64,
    /// Last bucket of the requested range (inclusive), fixed at construction.
    end_bucket: u64,
    /// Next bucket to load from the front; advanced by `next`.
    front_bucket: i64,
    /// Next bucket to load from the back; decremented by `next_back`.
    /// Signed, presumably so it can step to -1 after bucket 0 has been loaded.
    back_bucket: i64,
    /// Set once iteration ended or a lookup failed; afterwards the iterator
    /// only returns `None`.
    finished: bool,
    /// Values of the bucket most recently loaded by `next` that have not been
    /// yielded yet.
    front_values: Option<VecDeque<V>>,
    /// Values of the bucket most recently loaded by `next_back` that have not
    /// been yielded yet.
    back_values: Option<VecDeque<V>>,
}
191
192impl<V> BucketRangeMultimapIterator<V>
193where
194    V: redb::Key + 'static,
195    for<'b> V: From<V::SelfType<'b>>,
196{
197    /// Create a new bucket range iterator for a multimap table.
198    pub fn new(
199        table: ReadOnlyMultimapTable<BucketedKey<u64>, V>,
200        key_builder: &KeyBuilder,
201        base_key: u64,
202        start_sequence: u64,
203        end_sequence: u64,
204    ) -> Result<Self, BucketError> {
205        if start_sequence > end_sequence {
206            return Err(BucketError::InvalidRange {
207                start: start_sequence,
208                end: end_sequence,
209            });
210        }
211
212        let bucket_size = key_builder.bucket_size();
213        let start_bucket = start_sequence / bucket_size;
214        let end_bucket = end_sequence / bucket_size;
215
216        Ok(Self {
217            table,
218            base_key,
219            start_bucket,
220            end_bucket,
221            front_bucket: start_bucket as i64,
222            back_bucket: end_bucket as i64,
223            finished: false,
224            front_values: None,
225            back_values: None,
226        })
227    }
228
229    /// Get the bucket range.
230    pub fn bucket_range(&self) -> (u64, u64) {
231        (self.start_bucket, self.end_bucket)
232    }
233}
234
235impl<V> Iterator for BucketRangeMultimapIterator<V>
236where
237    V: redb::Key + 'static,
238    for<'b> V: From<V::SelfType<'b>>,
239{
240    type Item = Result<V, BucketError>;
241
242    fn next(&mut self) -> Option<Self::Item> {
243        if self.finished {
244            return None;
245        }
246
247        loop {
248            if let Some(values) = self.front_values.as_mut() {
249                if let Some(value) = values.pop_front() {
250                    return Some(Ok(value));
251                }
252                self.front_values = None;
253            }
254
255            if self.front_bucket > self.back_bucket {
256                self.finished = true;
257                return None;
258            }
259
260            let bucket = self.front_bucket as u64;
261            self.front_bucket += 1;
262
263            match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
264                Ok(values) => {
265                    let mut collected = VecDeque::new();
266                    for value_result in values {
267                        match value_result {
268                            Ok(value_guard) => {
269                                collected.push_back(V::from(value_guard.value()));
270                            }
271                            Err(err) => {
272                                self.finished = true;
273                                return Some(Err(BucketError::IterationError(format!(
274                                    "Database error during point lookup: {}",
275                                    err
276                                ))));
277                            }
278                        }
279                    }
280                    if collected.is_empty() {
281                        continue;
282                    }
283                    self.front_values = Some(collected);
284                }
285                Err(err) => {
286                    self.finished = true;
287                    return Some(Err(BucketError::IterationError(format!(
288                        "Database error during point lookup: {}",
289                        err
290                    ))));
291                }
292            }
293        }
294    }
295}
296
297impl<V> DoubleEndedIterator for BucketRangeMultimapIterator<V>
298where
299    V: redb::Key + 'static,
300    for<'b> V: From<V::SelfType<'b>>,
301{
302    fn next_back(&mut self) -> Option<Self::Item> {
303        if self.finished {
304            return None;
305        }
306
307        loop {
308            if let Some(values) = self.back_values.as_mut() {
309                if let Some(value) = values.pop_back() {
310                    return Some(Ok(value));
311                }
312                self.back_values = None;
313            }
314
315            if self.front_bucket > self.back_bucket {
316                self.finished = true;
317                return None;
318            }
319
320            let bucket = self.back_bucket as u64;
321            self.back_bucket -= 1;
322
323            match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
324                Ok(values) => {
325                    let mut collected = VecDeque::new();
326                    for value_result in values {
327                        match value_result {
328                            Ok(value_guard) => {
329                                collected.push_back(V::from(value_guard.value()));
330                            }
331                            Err(err) => {
332                                self.finished = true;
333                                return Some(Err(BucketError::IterationError(format!(
334                                    "Database error during point lookup: {}",
335                                    err
336                                ))));
337                            }
338                        }
339                    }
340                    if collected.is_empty() {
341                        continue;
342                    }
343                    self.back_values = Some(collected);
344                }
345                Err(err) => {
346                    self.finished = true;
347                    return Some(Err(BucketError::IterationError(format!(
348                        "Database error during point lookup: {}",
349                        err
350                    ))));
351                }
352            }
353        }
354    }
355}
356
/// Extension trait for bucket iteration on read-only tables.
///
/// Bucket iteration uses per-bucket point lookups for the requested
/// sequence range, skipping buckets that have no stored value.
///
/// This consumes the table handle so the iterator can own it.
pub trait BucketIterExt<V>
where
    V: redb::Value + 'static,
    for<'b> V: From<V::SelfType<'b>>,
{
    /// Turn the table into an iterator over the values stored for `base_key`
    /// in the buckets covering `start_sequence..=end_sequence`.
    ///
    /// `key_builder` supplies the bucket size used to map sequence numbers to
    /// bucket indices.
    ///
    /// # Errors
    ///
    /// The implementation for `ReadOnlyTable` in this module returns
    /// `BucketError::InvalidRange` when `start_sequence > end_sequence`.
    fn bucket_range(
        self,
        key_builder: &KeyBuilder,
        base_key: u64,
        start_sequence: u64,
        end_sequence: u64,
    ) -> Result<BucketRangeIterator<V>, BucketError>;
}
376
377impl<V> BucketIterExt<V> for ReadOnlyTable<BucketedKey<u64>, V>
378where
379    V: redb::Value + 'static,
380    for<'b> V: From<V::SelfType<'b>>,
381{
382    fn bucket_range(
383        self,
384        key_builder: &KeyBuilder,
385        base_key: u64,
386        start_sequence: u64,
387        end_sequence: u64,
388    ) -> Result<BucketRangeIterator<V>, BucketError> {
389        BucketRangeIterator::new(self, key_builder, base_key, start_sequence, end_sequence)
390    }
391}
392
/// Extension trait for bucket iteration on read-only multimap tables.
///
/// Returns a flattened iterator over values for the base key within the
/// requested bucket range, using per-bucket point lookups.
///
/// This consumes the table handle so the iterator can own it.
pub trait BucketMultimapIterExt<V>
where
    V: redb::Key + 'static,
    for<'b> V: From<V::SelfType<'b>>,
{
    /// Turn the multimap table into a flattened iterator over all values
    /// stored for `base_key` in the buckets covering
    /// `start_sequence..=end_sequence`.
    ///
    /// `key_builder` supplies the bucket size used to map sequence numbers to
    /// bucket indices.
    ///
    /// # Errors
    ///
    /// The implementation for `ReadOnlyMultimapTable` in this module returns
    /// `BucketError::InvalidRange` when `start_sequence > end_sequence`.
    fn bucket_range(
        self,
        key_builder: &KeyBuilder,
        base_key: u64,
        start_sequence: u64,
        end_sequence: u64,
    ) -> Result<BucketRangeMultimapIterator<V>, BucketError>;
}
412
413impl<V> BucketMultimapIterExt<V> for ReadOnlyMultimapTable<BucketedKey<u64>, V>
414where
415    V: redb::Key + 'static,
416    for<'b> V: From<V::SelfType<'b>>,
417{
418    fn bucket_range(
419        self,
420        key_builder: &KeyBuilder,
421        base_key: u64,
422        start_sequence: u64,
423        end_sequence: u64,
424    ) -> Result<BucketRangeMultimapIterator<V>, BucketError> {
425        BucketRangeMultimapIterator::new(self, key_builder, base_key, start_sequence, end_sequence)
426    }
427}
428
// Integration-style tests: each test creates a throwaway redb database in a
// temporary file, writes a few bucketed entries and reads them back through
// the bucket range iterators.
#[cfg(test)]
mod tests {
    use super::*;
    use redb::{Database, MultimapTableDefinition, ReadableDatabase, TableDefinition};
    use tempfile::NamedTempFile;

    // Single-value table used by `test_basic_functionality`.
    const TEST_TABLE: TableDefinition<'static, BucketedKey<u64>, String> =
        TableDefinition::new("test_table");
    // Multimap table used by `test_multimap_functionality`.
    const TEST_MULTIMAP: MultimapTableDefinition<'static, BucketedKey<u64>, u64> =
        MultimapTableDefinition::new("test_multimap");

    // Covers construction, range validation, forward and reverse iteration,
    // and isolation between base keys for the single-value table iterator.
    #[test]
    fn test_basic_functionality() -> Result<(), Box<dyn std::error::Error>> {
        let temp_file = NamedTempFile::new()?;
        let db = Database::create(temp_file.path())?;
        let key_builder = KeyBuilder::new(100)?;

        // Insert test data; the inner scope drops the table before the commit.
        {
            let write_txn = db.begin_write()?;
            {
                let mut table = write_txn.open_table(TEST_TABLE)?;

                // Values for base key 123 at sequences 50, 150 and 250
                // (presumably buckets 0, 1 and 2 with a bucket size of 100).
                table.insert(key_builder.bucketed_key(123u64, 50), "value_50".to_string())?;
                table.insert(
                    key_builder.bucketed_key(123u64, 150),
                    "value_150".to_string(),
                )?;
                table.insert(
                    key_builder.bucketed_key(123u64, 250),
                    "value_250".to_string(),
                )?;

                // Values for base key 456 at the same sequences; they must not
                // show up when iterating base key 123.
                table.insert(key_builder.bucketed_key(456u64, 50), "other_50".to_string())?;
                table.insert(
                    key_builder.bucketed_key(456u64, 150),
                    "other_150".to_string(),
                )?;
            }
            write_txn.commit()?;
        }

        // Construction: sequences 0..=199 map to buckets 0..=1.
        {
            let read_txn = db.begin_read()?;
            let iter = BucketRangeIterator::new(
                read_txn.open_table(TEST_TABLE)?,
                &key_builder,
                123u64,
                0,
                199,
            )?;
            assert_eq!(iter.bucket_range(), (0, 1));

            // A range whose start is greater than its end is rejected.
            let invalid_iter = BucketRangeIterator::new(
                read_txn.open_table(TEST_TABLE)?,
                &key_builder,
                123u64,
                200,
                100,
            );
            assert!(invalid_iter.is_err());
        }

        // Value iteration and base key filtering.
        {
            let read_txn = db.begin_read()?;
            // Forward iteration yields base key 123's values in bucket order.
            let iter = BucketRangeIterator::new(
                read_txn.open_table(TEST_TABLE)?,
                &key_builder,
                123u64,
                0,
                299,
            )?;
            let values: Vec<String> = iter.collect::<Result<_, _>>()?;
            assert_eq!(
                values,
                vec![
                    "value_50".to_string(),
                    "value_150".to_string(),
                    "value_250".to_string()
                ]
            );

            // Reverse iteration yields the same values in the opposite order.
            let iter = BucketRangeIterator::new(
                read_txn.open_table(TEST_TABLE)?,
                &key_builder,
                123u64,
                0,
                299,
            )?;
            let values: Vec<String> = iter.rev().collect::<Result<_, _>>()?;
            assert_eq!(
                values,
                vec![
                    "value_250".to_string(),
                    "value_150".to_string(),
                    "value_50".to_string()
                ]
            );

            // The extension trait gives the same behavior for base key 456;
            // the bucket without a value for that key is skipped.
            let iter =
                read_txn
                    .open_table(TEST_TABLE)?
                    .bucket_range(&key_builder, 456u64, 0, 299)?;
            let values: Vec<String> = iter.collect::<Result<_, _>>()?;
            assert_eq!(
                values,
                vec!["other_50".to_string(), "other_150".to_string()]
            );
        }

        Ok(())
    }

    // Covers flattening of multimap values across buckets, in both directions,
    // and isolation between base keys.
    #[test]
    fn test_multimap_functionality() -> Result<(), Box<dyn std::error::Error>> {
        let temp_file = NamedTempFile::new()?;
        let db = Database::create(temp_file.path())?;
        let key_builder = KeyBuilder::new(100)?;

        // Insert test data; the inner scope drops the table before the commit.
        {
            let write_txn = db.begin_write()?;
            {
                let mut table = write_txn.open_multimap_table(TEST_MULTIMAP)?;

                // Two values per bucketed key for base key 123.
                table.insert(key_builder.bucketed_key(123u64, 50), 10u64)?;
                table.insert(key_builder.bucketed_key(123u64, 50), 20u64)?;
                table.insert(key_builder.bucketed_key(123u64, 150), 30u64)?;
                table.insert(key_builder.bucketed_key(123u64, 150), 40u64)?;

                // Values for a different base key at the same sequence.
                table.insert(key_builder.bucketed_key(456u64, 50), 99u64)?;
                table.insert(key_builder.bucketed_key(456u64, 50), 100u64)?;
            }
            write_txn.commit()?;
        }

        {
            let read_txn = db.begin_read()?;
            // Sequences 0..=199 map to buckets 0..=1.
            let iter = BucketRangeMultimapIterator::new(
                read_txn.open_multimap_table(TEST_MULTIMAP)?,
                &key_builder,
                123u64,
                0,
                199,
            )?;
            assert_eq!(iter.bucket_range(), (0, 1));

            // Forward: buckets in ascending order, values within a bucket in
            // stored order.
            let values: Vec<u64> = iter.collect::<Result<_, _>>()?;
            assert_eq!(values, vec![10u64, 20u64, 30u64, 40u64]);

            // Reverse: exact mirror of the forward sequence.
            let iter = BucketRangeMultimapIterator::new(
                read_txn.open_multimap_table(TEST_MULTIMAP)?,
                &key_builder,
                123u64,
                0,
                199,
            )?;
            let values: Vec<u64> = iter.rev().collect::<Result<_, _>>()?;
            assert_eq!(values, vec![40u64, 30u64, 20u64, 10u64]);

            // The extension trait yields only base key 456's values.
            let iter = read_txn.open_multimap_table(TEST_MULTIMAP)?.bucket_range(
                &key_builder,
                456u64,
                0,
                99,
            )?;
            let values: Vec<u64> = iter.collect::<Result<_, _>>()?;
            assert_eq!(values, vec![99u64, 100u64]);
        }

        Ok(())
    }
}