1use crate::key_buckets::key::{BucketedKey, KeyBuilder};
6use crate::key_buckets::BucketError;
7use redb::{ReadOnlyMultimapTable, ReadOnlyTable};
8use std::collections::VecDeque;
9
/// Iterator over the values stored under a single base key across an
/// inclusive range of buckets in a read-only table.
///
/// Every bucket between the two cursors is probed with a point lookup on
/// `(base_key, bucket)`; buckets without an entry are skipped. The iterator
/// can be driven from both ends.
pub struct BucketRangeIterator<V>
where
    V: redb::Value + 'static,
    for<'b> V: From<V::SelfType<'b>>,
{
    /// Table the point lookups are issued against.
    table: ReadOnlyTable<BucketedKey<u64>, V>,
    /// Base key shared by every lookup performed by this iterator.
    base_key: u64,
    /// First bucket of the requested range, as computed by the constructor.
    start_bucket: u64,
    /// Last bucket of the requested range (inclusive), as computed by the constructor.
    end_bucket: u64,
    /// Next bucket to probe from the front. Signed; initialised from
    /// `start_bucket` with an `as i64` cast.
    front_bucket: i64,
    /// Next bucket to probe from the back. Signed, so it can step to `-1`
    /// once bucket `0` has been consumed from the back.
    back_bucket: i64,
    /// Set once the range is exhausted or a lookup has failed; afterwards the
    /// iterator only yields `None`.
    finished: bool,
}
29
30impl<V> BucketRangeIterator<V>
31where
32 V: redb::Value + 'static,
33 for<'b> V: From<V::SelfType<'b>>,
34{
35 pub fn new(
37 table: ReadOnlyTable<BucketedKey<u64>, V>,
38 key_builder: &KeyBuilder,
39 base_key: u64,
40 start_sequence: u64,
41 end_sequence: u64,
42 ) -> Result<Self, BucketError> {
43 if start_sequence > end_sequence {
44 return Err(BucketError::InvalidRange {
45 start: start_sequence,
46 end: end_sequence,
47 });
48 }
49
50 let bucket_size = key_builder.bucket_size();
51 let start_bucket = start_sequence / bucket_size;
52 let end_bucket = end_sequence / bucket_size;
53
54 Ok(Self {
55 table,
56 base_key,
57 start_bucket,
58 end_bucket,
59 front_bucket: start_bucket as i64,
60 back_bucket: end_bucket as i64,
61 finished: false,
62 })
63 }
64
65 pub fn bucket_range(&self) -> (u64, u64) {
67 (self.start_bucket, self.end_bucket)
68 }
69}
70
71impl<V> Iterator for BucketRangeIterator<V>
72where
73 V: redb::Value + 'static,
74 for<'b> V: From<V::SelfType<'b>>,
75{
76 type Item = Result<V, BucketError>;
77
78 fn next(&mut self) -> Option<Self::Item> {
79 if self.finished {
80 return None;
81 }
82
83 while self.front_bucket <= self.back_bucket {
84 let bucket = self.front_bucket as u64;
85 self.front_bucket += 1;
86
87 match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
88 Ok(Some(value_guard)) => {
89 return Some(Ok(V::from(value_guard.value())));
90 }
91 Ok(None) => continue,
92 Err(err) => {
93 self.finished = true;
94 return Some(Err(BucketError::IterationError(format!(
95 "Database error during point lookup: {}",
96 err
97 ))));
98 }
99 }
100 }
101
102 self.finished = true;
103 None
104 }
105}
106
107impl<V> DoubleEndedIterator for BucketRangeIterator<V>
108where
109 V: redb::Value + 'static,
110 for<'b> V: From<V::SelfType<'b>>,
111{
112 fn next_back(&mut self) -> Option<Self::Item> {
113 if self.finished {
114 return None;
115 }
116
117 while self.front_bucket <= self.back_bucket {
118 let bucket = self.back_bucket as u64;
119 self.back_bucket -= 1;
120
121 match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
122 Ok(Some(value_guard)) => {
123 return Some(Ok(V::from(value_guard.value())));
124 }
125 Ok(None) => continue,
126 Err(err) => {
127 self.finished = true;
128 return Some(Err(BucketError::IterationError(format!(
129 "Database error during point lookup: {}",
130 err
131 ))));
132 }
133 }
134 }
135
136 self.finished = true;
137 None
138 }
139}
140
/// Iterator over the values stored under a single base key across an
/// inclusive range of buckets in a read-only multimap table.
///
/// A bucket's values are read into an owned queue when the bucket is first
/// reached and are then handed out one at a time. The iterator can be driven
/// from both ends; each end keeps its own queue.
pub struct BucketRangeMultimapIterator<V>
where
    V: redb::Key + 'static,
    for<'b> V: From<V::SelfType<'b>>,
{
    /// Multimap table the point lookups are issued against.
    table: ReadOnlyMultimapTable<BucketedKey<u64>, V>,
    /// Base key shared by every lookup performed by this iterator.
    base_key: u64,
    /// First bucket of the requested range, as computed by the constructor.
    start_bucket: u64,
    /// Last bucket of the requested range (inclusive), as computed by the constructor.
    end_bucket: u64,
    /// Next bucket to load from the front. Signed; initialised from
    /// `start_bucket` with an `as i64` cast.
    front_bucket: i64,
    /// Next bucket to load from the back. Signed, so it can step to `-1`
    /// once bucket `0` has been loaded from the back.
    back_bucket: i64,
    /// Set once iteration has ended or a lookup has failed; afterwards the
    /// iterator only yields `None`.
    finished: bool,
    /// Not-yet-yielded values of the bucket most recently loaded by `next`.
    front_values: Option<VecDeque<V>>,
    /// Not-yet-yielded values of the bucket most recently loaded by `next_back`.
    back_values: Option<VecDeque<V>>,
}
191
192impl<V> BucketRangeMultimapIterator<V>
193where
194 V: redb::Key + 'static,
195 for<'b> V: From<V::SelfType<'b>>,
196{
197 pub fn new(
199 table: ReadOnlyMultimapTable<BucketedKey<u64>, V>,
200 key_builder: &KeyBuilder,
201 base_key: u64,
202 start_sequence: u64,
203 end_sequence: u64,
204 ) -> Result<Self, BucketError> {
205 if start_sequence > end_sequence {
206 return Err(BucketError::InvalidRange {
207 start: start_sequence,
208 end: end_sequence,
209 });
210 }
211
212 let bucket_size = key_builder.bucket_size();
213 let start_bucket = start_sequence / bucket_size;
214 let end_bucket = end_sequence / bucket_size;
215
216 Ok(Self {
217 table,
218 base_key,
219 start_bucket,
220 end_bucket,
221 front_bucket: start_bucket as i64,
222 back_bucket: end_bucket as i64,
223 finished: false,
224 front_values: None,
225 back_values: None,
226 })
227 }
228
229 pub fn bucket_range(&self) -> (u64, u64) {
231 (self.start_bucket, self.end_bucket)
232 }
233}
234
235impl<V> Iterator for BucketRangeMultimapIterator<V>
236where
237 V: redb::Key + 'static,
238 for<'b> V: From<V::SelfType<'b>>,
239{
240 type Item = Result<V, BucketError>;
241
242 fn next(&mut self) -> Option<Self::Item> {
243 if self.finished {
244 return None;
245 }
246
247 loop {
248 if let Some(values) = self.front_values.as_mut() {
249 if let Some(value) = values.pop_front() {
250 return Some(Ok(value));
251 }
252 self.front_values = None;
253 }
254
255 if self.front_bucket > self.back_bucket {
256 self.finished = true;
257 return None;
258 }
259
260 let bucket = self.front_bucket as u64;
261 self.front_bucket += 1;
262
263 match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
264 Ok(values) => {
265 let mut collected = VecDeque::new();
266 for value_result in values {
267 match value_result {
268 Ok(value_guard) => {
269 collected.push_back(V::from(value_guard.value()));
270 }
271 Err(err) => {
272 self.finished = true;
273 return Some(Err(BucketError::IterationError(format!(
274 "Database error during point lookup: {}",
275 err
276 ))));
277 }
278 }
279 }
280 if collected.is_empty() {
281 continue;
282 }
283 self.front_values = Some(collected);
284 }
285 Err(err) => {
286 self.finished = true;
287 return Some(Err(BucketError::IterationError(format!(
288 "Database error during point lookup: {}",
289 err
290 ))));
291 }
292 }
293 }
294 }
295}
296
297impl<V> DoubleEndedIterator for BucketRangeMultimapIterator<V>
298where
299 V: redb::Key + 'static,
300 for<'b> V: From<V::SelfType<'b>>,
301{
302 fn next_back(&mut self) -> Option<Self::Item> {
303 if self.finished {
304 return None;
305 }
306
307 loop {
308 if let Some(values) = self.back_values.as_mut() {
309 if let Some(value) = values.pop_back() {
310 return Some(Ok(value));
311 }
312 self.back_values = None;
313 }
314
315 if self.front_bucket > self.back_bucket {
316 self.finished = true;
317 return None;
318 }
319
320 let bucket = self.back_bucket as u64;
321 self.back_bucket -= 1;
322
323 match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
324 Ok(values) => {
325 let mut collected = VecDeque::new();
326 for value_result in values {
327 match value_result {
328 Ok(value_guard) => {
329 collected.push_back(V::from(value_guard.value()));
330 }
331 Err(err) => {
332 self.finished = true;
333 return Some(Err(BucketError::IterationError(format!(
334 "Database error during point lookup: {}",
335 err
336 ))));
337 }
338 }
339 }
340 if collected.is_empty() {
341 continue;
342 }
343 self.back_values = Some(collected);
344 }
345 Err(err) => {
346 self.finished = true;
347 return Some(Err(BucketError::IterationError(format!(
348 "Database error during point lookup: {}",
349 err
350 ))));
351 }
352 }
353 }
354 }
355}
356
/// Extension trait that exposes [`BucketRangeIterator`] construction as a
/// method on the table itself.
///
/// Implemented in this file for `ReadOnlyTable<BucketedKey<u64>, V>`.
pub trait BucketIterExt<V>
where
    V: redb::Value + 'static,
    for<'b> V: From<V::SelfType<'b>>,
{
    /// Consumes the table and returns an iterator over the values stored for
    /// `base_key` in the buckets covering `start_sequence` through
    /// `end_sequence`.
    ///
    /// `key_builder` supplies the bucket size used to map sequences to
    /// buckets. Failures are reported as [`BucketError`].
    fn bucket_range(
        self,
        key_builder: &KeyBuilder,
        base_key: u64,
        start_sequence: u64,
        end_sequence: u64,
    ) -> Result<BucketRangeIterator<V>, BucketError>;
}
376
377impl<V> BucketIterExt<V> for ReadOnlyTable<BucketedKey<u64>, V>
378where
379 V: redb::Value + 'static,
380 for<'b> V: From<V::SelfType<'b>>,
381{
382 fn bucket_range(
383 self,
384 key_builder: &KeyBuilder,
385 base_key: u64,
386 start_sequence: u64,
387 end_sequence: u64,
388 ) -> Result<BucketRangeIterator<V>, BucketError> {
389 BucketRangeIterator::new(self, key_builder, base_key, start_sequence, end_sequence)
390 }
391}
392
/// Extension trait that exposes [`BucketRangeMultimapIterator`] construction
/// as a method on the multimap table itself.
///
/// Implemented in this file for `ReadOnlyMultimapTable<BucketedKey<u64>, V>`.
pub trait BucketMultimapIterExt<V>
where
    V: redb::Key + 'static,
    for<'b> V: From<V::SelfType<'b>>,
{
    /// Consumes the table and returns an iterator over the values stored for
    /// `base_key` in the buckets covering `start_sequence` through
    /// `end_sequence`.
    ///
    /// `key_builder` supplies the bucket size used to map sequences to
    /// buckets. Failures are reported as [`BucketError`].
    fn bucket_range(
        self,
        key_builder: &KeyBuilder,
        base_key: u64,
        start_sequence: u64,
        end_sequence: u64,
    ) -> Result<BucketRangeMultimapIterator<V>, BucketError>;
}
412
413impl<V> BucketMultimapIterExt<V> for ReadOnlyMultimapTable<BucketedKey<u64>, V>
414where
415 V: redb::Key + 'static,
416 for<'b> V: From<V::SelfType<'b>>,
417{
418 fn bucket_range(
419 self,
420 key_builder: &KeyBuilder,
421 base_key: u64,
422 start_sequence: u64,
423 end_sequence: u64,
424 ) -> Result<BucketRangeMultimapIterator<V>, BucketError> {
425 BucketRangeMultimapIterator::new(self, key_builder, base_key, start_sequence, end_sequence)
426 }
427}
428
#[cfg(test)]
mod tests {
    use super::*;
    use redb::{Database, MultimapTableDefinition, ReadableDatabase, TableDefinition};
    use tempfile::NamedTempFile;

    // Table definitions shared by the tests below; each test creates its own
    // database file, so the names do not clash between tests.
    const TEST_TABLE: TableDefinition<'static, BucketedKey<u64>, String> =
        TableDefinition::new("test_table");
    const TEST_MULTIMAP: MultimapTableDefinition<'static, BucketedKey<u64>, u64> =
        MultimapTableDefinition::new("test_multimap");

    /// Covers construction, range validation, forward and reverse iteration,
    /// and the extension-trait entry point for the single-value table.
    #[test]
    fn test_basic_functionality() -> Result<(), Box<dyn std::error::Error>> {
        let temp_file = NamedTempFile::new()?;
        let db = Database::create(temp_file.path())?;
        let key_builder = KeyBuilder::new(100)?;

        {
            // Populate the table with entries for two different base keys.
            let write_txn = db.begin_write()?;
            {
                let mut table = write_txn.open_table(TEST_TABLE)?;

                // Base key 123 at sequences 50, 150 and 250 (presumably
                // buckets 0, 1 and 2 with the bucket size of 100 used here).
                table.insert(key_builder.bucketed_key(123u64, 50), "value_50".to_string())?;
                table.insert(
                    key_builder.bucketed_key(123u64, 150),
                    "value_150".to_string(),
                )?;
                table.insert(
                    key_builder.bucketed_key(123u64, 250),
                    "value_250".to_string(),
                )?;

                // Base key 456 must not show up when iterating base key 123.
                table.insert(key_builder.bucketed_key(456u64, 50), "other_50".to_string())?;
                table.insert(
                    key_builder.bucketed_key(456u64, 150),
                    "other_150".to_string(),
                )?;
            }
            write_txn.commit()?;
        }

        {
            // Construction: sequences 0..=199 map to buckets 0..=1.
            let read_txn = db.begin_read()?;
            let iter = BucketRangeIterator::new(
                read_txn.open_table(TEST_TABLE)?,
                &key_builder,
                123u64,
                0,
                199,
            )?;
            assert_eq!(iter.bucket_range(), (0, 1));

            // A start sequence greater than the end sequence is rejected.
            let invalid_iter = BucketRangeIterator::new(
                read_txn.open_table(TEST_TABLE)?,
                &key_builder,
                123u64,
                200,
                100,
            );
            assert!(invalid_iter.is_err());
        }

        {
            // Forward iteration yields the values in ascending bucket order.
            let read_txn = db.begin_read()?;
            let iter = BucketRangeIterator::new(
                read_txn.open_table(TEST_TABLE)?,
                &key_builder,
                123u64,
                0,
                299,
            )?;
            let values: Vec<String> = iter.collect::<Result<_, _>>()?;
            assert_eq!(
                values,
                vec![
                    "value_50".to_string(),
                    "value_150".to_string(),
                    "value_250".to_string()
                ]
            );

            // Reverse iteration yields the same values in descending bucket order.
            let iter = BucketRangeIterator::new(
                read_txn.open_table(TEST_TABLE)?,
                &key_builder,
                123u64,
                0,
                299,
            )?;
            let values: Vec<String> = iter.rev().collect::<Result<_, _>>()?;
            assert_eq!(
                values,
                vec![
                    "value_250".to_string(),
                    "value_150".to_string(),
                    "value_50".to_string()
                ]
            );

            // Extension-trait entry point; bucket 2 has no entry for base key
            // 456 and is skipped.
            let iter =
                read_txn
                    .open_table(TEST_TABLE)?
                    .bucket_range(&key_builder, 456u64, 0, 299)?;
            let values: Vec<String> = iter.collect::<Result<_, _>>()?;
            assert_eq!(
                values,
                vec!["other_50".to_string(), "other_150".to_string()]
            );
        }

        Ok(())
    }

    /// Covers construction, forward and reverse iteration, and the
    /// extension-trait entry point for the multimap table.
    #[test]
    fn test_multimap_functionality() -> Result<(), Box<dyn std::error::Error>> {
        let temp_file = NamedTempFile::new()?;
        let db = Database::create(temp_file.path())?;
        let key_builder = KeyBuilder::new(100)?;

        {
            // Two values per key for base key 123, plus a second base key
            // that must stay isolated from it.
            let write_txn = db.begin_write()?;
            {
                let mut table = write_txn.open_multimap_table(TEST_MULTIMAP)?;

                table.insert(key_builder.bucketed_key(123u64, 50), 10u64)?;
                table.insert(key_builder.bucketed_key(123u64, 50), 20u64)?;
                table.insert(key_builder.bucketed_key(123u64, 150), 30u64)?;
                table.insert(key_builder.bucketed_key(123u64, 150), 40u64)?;

                table.insert(key_builder.bucketed_key(456u64, 50), 99u64)?;
                table.insert(key_builder.bucketed_key(456u64, 50), 100u64)?;
            }
            write_txn.commit()?;
        }

        {
            let read_txn = db.begin_read()?;
            let iter = BucketRangeMultimapIterator::new(
                read_txn.open_multimap_table(TEST_MULTIMAP)?,
                &key_builder,
                123u64,
                0,
                199,
            )?;
            assert_eq!(iter.bucket_range(), (0, 1));

            // Forward: buckets ascending, values of each bucket in table order.
            let values: Vec<u64> = iter.collect::<Result<_, _>>()?;
            assert_eq!(values, vec![10u64, 20u64, 30u64, 40u64]);

            // Reverse: exact mirror of the forward sequence.
            let iter = BucketRangeMultimapIterator::new(
                read_txn.open_multimap_table(TEST_MULTIMAP)?,
                &key_builder,
                123u64,
                0,
                199,
            )?;
            let values: Vec<u64> = iter.rev().collect::<Result<_, _>>()?;
            assert_eq!(values, vec![40u64, 30u64, 20u64, 10u64]);

            // Extension-trait entry point for the other base key.
            let iter = read_txn.open_multimap_table(TEST_MULTIMAP)?.bucket_range(
                &key_builder,
                456u64,
                0,
                99,
            )?;
            let values: Vec<u64> = iter.collect::<Result<_, _>>()?;
            assert_eq!(values, vec![99u64, 100u64]);
        }

        Ok(())
    }
}