1use crate::buckets::key::{BucketedKey, KeyBuilder};
6use crate::buckets::BucketError;
7use redb::{MultimapValue, ReadOnlyMultimapTable, ReadOnlyTable};
8
9pub struct BucketRangeIterator<'a, V>
16where
17 V: redb::Value + 'static,
18 for<'b> V: From<V::SelfType<'b>>,
19{
20 table: &'a ReadOnlyTable<BucketedKey<u64>, V>,
21 base_key: u64,
22 start_bucket: u64,
23 end_bucket: u64,
24 front_bucket: i64,
25 back_bucket: i64,
26 finished: bool,
27}
28
29impl<'a, V> BucketRangeIterator<'a, V>
30where
31 V: redb::Value + 'static,
32 for<'b> V: From<V::SelfType<'b>>,
33{
34 pub fn new(
36 table: &'a ReadOnlyTable<BucketedKey<u64>, V>,
37 key_builder: &KeyBuilder,
38 base_key: u64,
39 start_sequence: u64,
40 end_sequence: u64,
41 ) -> Result<Self, BucketError> {
42 if start_sequence > end_sequence {
43 return Err(BucketError::InvalidRange {
44 start: start_sequence,
45 end: end_sequence,
46 });
47 }
48
49 let bucket_size = key_builder.bucket_size();
50 let start_bucket = start_sequence / bucket_size;
51 let end_bucket = end_sequence / bucket_size;
52
53 Ok(Self {
54 table,
55 base_key,
56 start_bucket,
57 end_bucket,
58 front_bucket: start_bucket as i64,
59 back_bucket: end_bucket as i64,
60 finished: false,
61 })
62 }
63
64 pub fn bucket_range(&self) -> (u64, u64) {
66 (self.start_bucket, self.end_bucket)
67 }
68}
69
70impl<'a, V> Iterator for BucketRangeIterator<'a, V>
71where
72 V: redb::Value + 'static,
73 for<'b> V: From<V::SelfType<'b>>,
74{
75 type Item = Result<V, BucketError>;
76
77 fn next(&mut self) -> Option<Self::Item> {
78 if self.finished {
79 return None;
80 }
81
82 while self.front_bucket <= self.back_bucket {
83 let bucket = self.front_bucket as u64;
84 self.front_bucket += 1;
85
86 match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
87 Ok(Some(value_guard)) => {
88 return Some(Ok(V::from(value_guard.value())));
89 }
90 Ok(None) => continue,
91 Err(err) => {
92 self.finished = true;
93 return Some(Err(BucketError::IterationError(format!(
94 "Database error during point lookup: {}",
95 err
96 ))));
97 }
98 }
99 }
100
101 self.finished = true;
102 None
103 }
104}
105
106impl<'a, V> DoubleEndedIterator for BucketRangeIterator<'a, V>
107where
108 V: redb::Value + 'static,
109 for<'b> V: From<V::SelfType<'b>>,
110{
111 fn next_back(&mut self) -> Option<Self::Item> {
112 if self.finished {
113 return None;
114 }
115
116 while self.front_bucket <= self.back_bucket {
117 let bucket = self.back_bucket as u64;
118 self.back_bucket -= 1;
119
120 match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
121 Ok(Some(value_guard)) => {
122 return Some(Ok(V::from(value_guard.value())));
123 }
124 Ok(None) => continue,
125 Err(err) => {
126 self.finished = true;
127 return Some(Err(BucketError::IterationError(format!(
128 "Database error during point lookup: {}",
129 err
130 ))));
131 }
132 }
133 }
134
135 self.finished = true;
136 None
137 }
138}
139
140pub struct BucketRangeMultimapIterator<'a, V>
176where
177 V: redb::Key + 'static,
178 for<'b> V: From<V::SelfType<'b>>,
179{
180 table: &'a ReadOnlyMultimapTable<BucketedKey<u64>, V>,
181 base_key: u64,
182 start_bucket: u64,
183 end_bucket: u64,
184 front_bucket: i64,
185 back_bucket: i64,
186 finished: bool,
187 front_values: Option<MultimapValue<'a, V>>,
188 back_values: Option<MultimapValue<'a, V>>,
189}
190
191impl<'a, V> BucketRangeMultimapIterator<'a, V>
192where
193 V: redb::Key + 'static,
194 for<'b> V: From<V::SelfType<'b>>,
195{
196 pub fn new(
198 table: &'a ReadOnlyMultimapTable<BucketedKey<u64>, V>,
199 key_builder: &KeyBuilder,
200 base_key: u64,
201 start_sequence: u64,
202 end_sequence: u64,
203 ) -> Result<Self, BucketError> {
204 if start_sequence > end_sequence {
205 return Err(BucketError::InvalidRange {
206 start: start_sequence,
207 end: end_sequence,
208 });
209 }
210
211 let bucket_size = key_builder.bucket_size();
212 let start_bucket = start_sequence / bucket_size;
213 let end_bucket = end_sequence / bucket_size;
214
215 Ok(Self {
216 table,
217 base_key,
218 start_bucket,
219 end_bucket,
220 front_bucket: start_bucket as i64,
221 back_bucket: end_bucket as i64,
222 finished: false,
223 front_values: None,
224 back_values: None,
225 })
226 }
227
228 pub fn bucket_range(&self) -> (u64, u64) {
230 (self.start_bucket, self.end_bucket)
231 }
232}
233
234impl<'a, V> Iterator for BucketRangeMultimapIterator<'a, V>
235where
236 V: redb::Key + 'static,
237 for<'b> V: From<V::SelfType<'b>>,
238{
239 type Item = Result<V, BucketError>;
240
241 fn next(&mut self) -> Option<Self::Item> {
242 if self.finished {
243 return None;
244 }
245
246 loop {
247 if let Some(values) = self.front_values.as_mut() {
248 match values.next() {
249 Some(Ok(value_guard)) => {
250 return Some(Ok(V::from(value_guard.value())));
251 }
252 Some(Err(err)) => {
253 self.finished = true;
254 return Some(Err(BucketError::IterationError(format!(
255 "Database error during point lookup: {}",
256 err
257 ))));
258 }
259 None => {
260 self.front_values = None;
261 }
262 }
263 }
264
265 if self.front_bucket > self.back_bucket {
266 self.finished = true;
267 return None;
268 }
269
270 let bucket = self.front_bucket as u64;
271 self.front_bucket += 1;
272
273 match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
274 Ok(values) => {
275 self.front_values = Some(values);
276 }
277 Err(err) => {
278 self.finished = true;
279 return Some(Err(BucketError::IterationError(format!(
280 "Database error during point lookup: {}",
281 err
282 ))));
283 }
284 }
285 }
286 }
287}
288
289impl<'a, V> DoubleEndedIterator for BucketRangeMultimapIterator<'a, V>
290where
291 V: redb::Key + 'static,
292 for<'b> V: From<V::SelfType<'b>>,
293{
294 fn next_back(&mut self) -> Option<Self::Item> {
295 if self.finished {
296 return None;
297 }
298
299 loop {
300 if let Some(values) = self.back_values.as_mut() {
301 match values.next_back() {
302 Some(Ok(value_guard)) => {
303 return Some(Ok(V::from(value_guard.value())));
304 }
305 Some(Err(err)) => {
306 self.finished = true;
307 return Some(Err(BucketError::IterationError(format!(
308 "Database error during point lookup: {}",
309 err
310 ))));
311 }
312 None => {
313 self.back_values = None;
314 }
315 }
316 }
317
318 if self.front_bucket > self.back_bucket {
319 self.finished = true;
320 return None;
321 }
322
323 let bucket = self.back_bucket as u64;
324 self.back_bucket -= 1;
325
326 match self.table.get(&BucketedKey::new(self.base_key, bucket)) {
327 Ok(values) => {
328 self.back_values = Some(values);
329 }
330 Err(err) => {
331 self.finished = true;
332 return Some(Err(BucketError::IterationError(format!(
333 "Database error during point lookup: {}",
334 err
335 ))));
336 }
337 }
338 }
339 }
340}
341
342pub trait BucketIterExt<V>
347where
348 V: redb::Value + 'static,
349 for<'b> V: From<V::SelfType<'b>>,
350{
351 fn bucket_range(
352 &self,
353 key_builder: &KeyBuilder,
354 base_key: u64,
355 start_sequence: u64,
356 end_sequence: u64,
357 ) -> Result<BucketRangeIterator<'_, V>, BucketError>;
358}
359
360impl<V> BucketIterExt<V> for ReadOnlyTable<BucketedKey<u64>, V>
361where
362 V: redb::Value + 'static,
363 for<'b> V: From<V::SelfType<'b>>,
364{
365 fn bucket_range(
366 &self,
367 key_builder: &KeyBuilder,
368 base_key: u64,
369 start_sequence: u64,
370 end_sequence: u64,
371 ) -> Result<BucketRangeIterator<'_, V>, BucketError> {
372 BucketRangeIterator::new(self, key_builder, base_key, start_sequence, end_sequence)
373 }
374}
375
376pub trait BucketMultimapIterExt<V>
381where
382 V: redb::Key + 'static,
383 for<'b> V: From<V::SelfType<'b>>,
384{
385 fn bucket_range(
386 &self,
387 key_builder: &KeyBuilder,
388 base_key: u64,
389 start_sequence: u64,
390 end_sequence: u64,
391 ) -> Result<BucketRangeMultimapIterator<'_, V>, BucketError>;
392}
393
394impl<V> BucketMultimapIterExt<V> for ReadOnlyMultimapTable<BucketedKey<u64>, V>
395where
396 V: redb::Key + 'static,
397 for<'b> V: From<V::SelfType<'b>>,
398{
399 fn bucket_range(
400 &self,
401 key_builder: &KeyBuilder,
402 base_key: u64,
403 start_sequence: u64,
404 end_sequence: u64,
405 ) -> Result<BucketRangeMultimapIterator<'_, V>, BucketError> {
406 BucketRangeMultimapIterator::new(self, key_builder, base_key, start_sequence, end_sequence)
407 }
408}
409
#[cfg(test)]
mod tests {
    use super::*;
    use redb::{Database, MultimapTableDefinition, ReadableDatabase, TableDefinition};
    use tempfile::NamedTempFile;

    // One string value per bucketed key; exercises `BucketRangeIterator`.
    const TEST_TABLE: TableDefinition<'static, BucketedKey<u64>, String> =
        TableDefinition::new("test_table");
    // Several `u64` values per bucketed key; exercises `BucketRangeMultimapIterator`.
    const TEST_MULTIMAP: MultimapTableDefinition<'static, BucketedKey<u64>, u64> =
        MultimapTableDefinition::new("test_multimap");

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Covers construction, range validation, forward and reverse iteration,
    /// and the extension-trait entry point for the single-value table.
    #[test]
    fn test_basic_functionality() -> TestResult {
        let temp_file = NamedTempFile::new()?;
        let db = Database::create(temp_file.path())?;
        let key_builder = KeyBuilder::new(100)?;

        // Seed two base keys; with a bucket size of 100 the sequences
        // 50 / 150 / 250 land in buckets 0 / 1 / 2.
        {
            let write_txn = db.begin_write()?;
            {
                let mut table = write_txn.open_table(TEST_TABLE)?;
                let rows = [
                    (123u64, 50, "value_50"),
                    (123u64, 150, "value_150"),
                    (123u64, 250, "value_250"),
                    (456u64, 50, "other_50"),
                    (456u64, 150, "other_150"),
                ];
                for (base_key, sequence, text) in rows {
                    table.insert(
                        key_builder.bucketed_key(base_key, sequence),
                        text.to_string(),
                    )?;
                }
            }
            write_txn.commit()?;
        }

        // Bucket bounds and rejection of an inverted range.
        {
            let read_txn = db.begin_read()?;
            let table = read_txn.open_table(TEST_TABLE)?;

            let iter = BucketRangeIterator::new(&table, &key_builder, 123u64, 0, 199)?;
            assert_eq!(iter.bucket_range(), (0, 1));

            let invalid_iter = BucketRangeIterator::new(&table, &key_builder, 123u64, 200, 100);
            assert!(invalid_iter.is_err());
        }

        // Forward, reverse, and extension-trait iteration.
        {
            let read_txn = db.begin_read()?;
            let table = read_txn.open_table(TEST_TABLE)?;

            let ascending = BucketRangeIterator::new(&table, &key_builder, 123u64, 0, 299)?
                .collect::<Result<Vec<String>, _>>()?;
            assert_eq!(ascending, ["value_50", "value_150", "value_250"]);

            let descending = BucketRangeIterator::new(&table, &key_builder, 123u64, 0, 299)?
                .rev()
                .collect::<Result<Vec<String>, _>>()?;
            assert_eq!(descending, ["value_250", "value_150", "value_50"]);

            let other = table
                .bucket_range(&key_builder, 456u64, 0, 299)?
                .collect::<Result<Vec<String>, _>>()?;
            assert_eq!(other, ["other_50", "other_150"]);
        }

        Ok(())
    }

    /// Covers bucket bounds, forward and reverse iteration, and the
    /// extension-trait entry point for the multimap table.
    #[test]
    fn test_multimap_functionality() -> TestResult {
        let temp_file = NamedTempFile::new()?;
        let db = Database::create(temp_file.path())?;
        let key_builder = KeyBuilder::new(100)?;

        // Two values per bucket for base key 123, plus an unrelated base key.
        {
            let write_txn = db.begin_write()?;
            {
                let mut table = write_txn.open_multimap_table(TEST_MULTIMAP)?;
                let rows = [
                    (123u64, 50, 10u64),
                    (123u64, 50, 20u64),
                    (123u64, 150, 30u64),
                    (123u64, 150, 40u64),
                    (456u64, 50, 99u64),
                    (456u64, 50, 100u64),
                ];
                for (base_key, sequence, value) in rows {
                    table.insert(key_builder.bucketed_key(base_key, sequence), value)?;
                }
            }
            write_txn.commit()?;
        }

        {
            let read_txn = db.begin_read()?;
            let table = read_txn.open_multimap_table(TEST_MULTIMAP)?;

            let iter = BucketRangeMultimapIterator::new(&table, &key_builder, 123u64, 0, 199)?;
            assert_eq!(iter.bucket_range(), (0, 1));

            let ascending = iter.collect::<Result<Vec<u64>, _>>()?;
            assert_eq!(ascending, [10u64, 20u64, 30u64, 40u64]);

            let descending =
                BucketRangeMultimapIterator::new(&table, &key_builder, 123u64, 0, 199)?
                    .rev()
                    .collect::<Result<Vec<u64>, _>>()?;
            assert_eq!(descending, [40u64, 30u64, 20u64, 10u64]);

            let other = table
                .bucket_range(&key_builder, 456u64, 0, 99)?
                .collect::<Result<Vec<u64>, _>>()?;
            assert_eq!(other, [99u64, 100u64]);
        }

        Ok(())
    }
}