1use crate::key_buckets::BucketError;
7use crate::table_buckets::TableBucketBuilder;
8use redb::{ReadOnlyMultimapTable, ReadOnlyTable, ReadTransaction, TableError};
9use std::borrow::Borrow;
10use std::collections::VecDeque;
11use std::marker::PhantomData;
12
13pub struct TableBucketRangeIterator<'a, K, V>
20where
21 K: redb::Key + Clone + 'static,
22 for<'b> K: Borrow<K::SelfType<'b>>,
23 V: redb::Value + 'static,
24 for<'b> V: From<V::SelfType<'b>>,
25{
26 txn: &'a ReadTransaction,
27 builder: &'a TableBucketBuilder,
28 base_key: K,
29 start_bucket: u64,
30 end_bucket: u64,
31 front_bucket: i64,
32 back_bucket: i64,
33 finished: bool,
34 _phantom: PhantomData<V>,
35}
36
37impl<'a, K, V> TableBucketRangeIterator<'a, K, V>
38where
39 K: redb::Key + Clone + 'static,
40 for<'b> K: Borrow<K::SelfType<'b>>,
41 V: redb::Value + 'static,
42 for<'b> V: From<V::SelfType<'b>>,
43{
44 pub fn new(
46 txn: &'a ReadTransaction,
47 builder: &'a TableBucketBuilder,
48 base_key: K,
49 start_sequence: u64,
50 end_sequence: u64,
51 ) -> Result<Self, BucketError> {
52 if start_sequence > end_sequence {
53 return Err(BucketError::InvalidRange {
54 start: start_sequence,
55 end: end_sequence,
56 });
57 }
58
59 let bucket_size = builder.bucket_size();
60 let start_bucket = start_sequence / bucket_size;
61 let end_bucket = end_sequence / bucket_size;
62
63 Ok(Self {
64 txn,
65 builder,
66 base_key,
67 start_bucket,
68 end_bucket,
69 front_bucket: start_bucket as i64,
70 back_bucket: end_bucket as i64,
71 finished: false,
72 _phantom: PhantomData,
73 })
74 }
75
76 pub fn bucket_range(&self) -> (u64, u64) {
78 (self.start_bucket, self.end_bucket)
79 }
80
81 fn open_table(&self, bucket: u64) -> Result<Option<ReadOnlyTable<K, V>>, BucketError> {
82 let definition = self.builder.table_definition::<K, V>(bucket);
83 match self.txn.open_table(definition) {
84 Ok(table) => Ok(Some(table)),
85 Err(TableError::TableDoesNotExist(_)) => Ok(None),
86 Err(err) => Err(BucketError::IterationError(format!(
87 "Failed to open bucket table {}: {}",
88 bucket, err
89 ))),
90 }
91 }
92}
93
94impl<'a, K, V> Iterator for TableBucketRangeIterator<'a, K, V>
95where
96 K: redb::Key + Clone + 'static,
97 for<'b> K: Borrow<K::SelfType<'b>>,
98 V: redb::Value + 'static,
99 for<'b> V: From<V::SelfType<'b>>,
100{
101 type Item = Result<V, BucketError>;
102
103 fn next(&mut self) -> Option<Self::Item> {
104 if self.finished {
105 return None;
106 }
107
108 while self.front_bucket <= self.back_bucket {
109 let bucket = self.front_bucket as u64;
110 self.front_bucket += 1;
111
112 let table = match self.open_table(bucket) {
113 Ok(Some(table)) => table,
114 Ok(None) => continue,
115 Err(err) => {
116 self.finished = true;
117 return Some(Err(err));
118 }
119 };
120
121 match table.get(self.base_key.clone()) {
122 Ok(Some(value_guard)) => {
123 return Some(Ok(V::from(value_guard.value())));
124 }
125 Ok(None) => continue,
126 Err(err) => {
127 self.finished = true;
128 return Some(Err(BucketError::IterationError(format!(
129 "Database error during point lookup: {}",
130 err
131 ))));
132 }
133 }
134 }
135
136 self.finished = true;
137 None
138 }
139}
140
141impl<'a, K, V> DoubleEndedIterator for TableBucketRangeIterator<'a, K, V>
142where
143 K: redb::Key + Clone + 'static,
144 for<'b> K: Borrow<K::SelfType<'b>>,
145 V: redb::Value + 'static,
146 for<'b> V: From<V::SelfType<'b>>,
147{
148 fn next_back(&mut self) -> Option<Self::Item> {
149 if self.finished {
150 return None;
151 }
152
153 while self.front_bucket <= self.back_bucket {
154 let bucket = self.back_bucket as u64;
155 self.back_bucket -= 1;
156
157 let table = match self.open_table(bucket) {
158 Ok(Some(table)) => table,
159 Ok(None) => continue,
160 Err(err) => {
161 self.finished = true;
162 return Some(Err(err));
163 }
164 };
165
166 match table.get(self.base_key.clone()) {
167 Ok(Some(value_guard)) => {
168 return Some(Ok(V::from(value_guard.value())));
169 }
170 Ok(None) => continue,
171 Err(err) => {
172 self.finished = true;
173 return Some(Err(BucketError::IterationError(format!(
174 "Database error during point lookup: {}",
175 err
176 ))));
177 }
178 }
179 }
180
181 self.finished = true;
182 None
183 }
184}
185
186pub struct TableBucketRangeMultimapIterator<'a, K, V>
193where
194 K: redb::Key + Clone + 'static,
195 for<'b> K: Borrow<K::SelfType<'b>>,
196 V: redb::Key + 'static,
197 for<'b> V: From<V::SelfType<'b>>,
198{
199 txn: &'a ReadTransaction,
200 builder: &'a TableBucketBuilder,
201 base_key: K,
202 start_bucket: u64,
203 end_bucket: u64,
204 front_bucket: i64,
205 back_bucket: i64,
206 finished: bool,
207 front_values: Option<VecDeque<V>>,
208 back_values: Option<VecDeque<V>>,
209}
210
211impl<'a, K, V> TableBucketRangeMultimapIterator<'a, K, V>
212where
213 K: redb::Key + Clone + 'static,
214 for<'b> K: Borrow<K::SelfType<'b>>,
215 V: redb::Key + 'static,
216 for<'b> V: From<V::SelfType<'b>>,
217{
218 pub fn new(
220 txn: &'a ReadTransaction,
221 builder: &'a TableBucketBuilder,
222 base_key: K,
223 start_sequence: u64,
224 end_sequence: u64,
225 ) -> Result<Self, BucketError> {
226 if start_sequence > end_sequence {
227 return Err(BucketError::InvalidRange {
228 start: start_sequence,
229 end: end_sequence,
230 });
231 }
232
233 let bucket_size = builder.bucket_size();
234 let start_bucket = start_sequence / bucket_size;
235 let end_bucket = end_sequence / bucket_size;
236
237 Ok(Self {
238 txn,
239 builder,
240 base_key,
241 start_bucket,
242 end_bucket,
243 front_bucket: start_bucket as i64,
244 back_bucket: end_bucket as i64,
245 finished: false,
246 front_values: None,
247 back_values: None,
248 })
249 }
250
251 pub fn bucket_range(&self) -> (u64, u64) {
253 (self.start_bucket, self.end_bucket)
254 }
255
256 fn open_table(&self, bucket: u64) -> Result<Option<ReadOnlyMultimapTable<K, V>>, BucketError> {
257 let definition = self.builder.multimap_table_definition::<K, V>(bucket);
258 match self.txn.open_multimap_table(definition) {
259 Ok(table) => Ok(Some(table)),
260 Err(TableError::TableDoesNotExist(_)) => Ok(None),
261 Err(err) => Err(BucketError::IterationError(format!(
262 "Failed to open bucket table {}: {}",
263 bucket, err
264 ))),
265 }
266 }
267}
268
269impl<'a, K, V> Iterator for TableBucketRangeMultimapIterator<'a, K, V>
270where
271 K: redb::Key + Clone + 'static,
272 for<'b> K: Borrow<K::SelfType<'b>>,
273 V: redb::Key + 'static,
274 for<'b> V: From<V::SelfType<'b>>,
275{
276 type Item = Result<V, BucketError>;
277
278 fn next(&mut self) -> Option<Self::Item> {
279 if self.finished {
280 return None;
281 }
282
283 loop {
284 if let Some(values) = self.front_values.as_mut() {
285 if let Some(value) = values.pop_front() {
286 return Some(Ok(value));
287 }
288 self.front_values = None;
289 }
290
291 if self.front_bucket > self.back_bucket {
292 self.finished = true;
293 return None;
294 }
295
296 let bucket = self.front_bucket as u64;
297 self.front_bucket += 1;
298
299 let table = match self.open_table(bucket) {
300 Ok(Some(table)) => table,
301 Ok(None) => continue,
302 Err(err) => {
303 self.finished = true;
304 return Some(Err(err));
305 }
306 };
307
308 match table.get(self.base_key.clone()) {
309 Ok(values) => {
310 let mut collected = VecDeque::new();
311 for value_result in values {
312 match value_result {
313 Ok(value_guard) => {
314 collected.push_back(V::from(value_guard.value()));
315 }
316 Err(err) => {
317 self.finished = true;
318 return Some(Err(BucketError::IterationError(format!(
319 "Database error during point lookup: {}",
320 err
321 ))));
322 }
323 }
324 }
325 if collected.is_empty() {
326 continue;
327 }
328 self.front_values = Some(collected);
329 }
330 Err(err) => {
331 self.finished = true;
332 return Some(Err(BucketError::IterationError(format!(
333 "Database error during point lookup: {}",
334 err
335 ))));
336 }
337 }
338 }
339 }
340}
341
342impl<'a, K, V> DoubleEndedIterator for TableBucketRangeMultimapIterator<'a, K, V>
343where
344 K: redb::Key + Clone + 'static,
345 for<'b> K: Borrow<K::SelfType<'b>>,
346 V: redb::Key + 'static,
347 for<'b> V: From<V::SelfType<'b>>,
348{
349 fn next_back(&mut self) -> Option<Self::Item> {
350 if self.finished {
351 return None;
352 }
353
354 loop {
355 if let Some(values) = self.back_values.as_mut() {
356 if let Some(value) = values.pop_back() {
357 return Some(Ok(value));
358 }
359 self.back_values = None;
360 }
361
362 if self.front_bucket > self.back_bucket {
363 self.finished = true;
364 return None;
365 }
366
367 let bucket = self.back_bucket as u64;
368 self.back_bucket -= 1;
369
370 let table = match self.open_table(bucket) {
371 Ok(Some(table)) => table,
372 Ok(None) => continue,
373 Err(err) => {
374 self.finished = true;
375 return Some(Err(err));
376 }
377 };
378
379 match table.get(self.base_key.clone()) {
380 Ok(values) => {
381 let mut collected = VecDeque::new();
382 for value_result in values {
383 match value_result {
384 Ok(value_guard) => {
385 collected.push_back(V::from(value_guard.value()));
386 }
387 Err(err) => {
388 self.finished = true;
389 return Some(Err(BucketError::IterationError(format!(
390 "Database error during point lookup: {}",
391 err
392 ))));
393 }
394 }
395 }
396 if collected.is_empty() {
397 continue;
398 }
399 self.back_values = Some(collected);
400 }
401 Err(err) => {
402 self.finished = true;
403 return Some(Err(BucketError::IterationError(format!(
404 "Database error during point lookup: {}",
405 err
406 ))));
407 }
408 }
409 }
410 }
411}
412
413pub trait TableBucketIterExt {
415 fn table_bucket_range<'a, K, V>(
416 &'a self,
417 builder: &'a TableBucketBuilder,
418 base_key: K,
419 start_sequence: u64,
420 end_sequence: u64,
421 ) -> Result<TableBucketRangeIterator<'a, K, V>, BucketError>
422 where
423 K: redb::Key + Clone + 'static,
424 for<'b> K: Borrow<K::SelfType<'b>>,
425 V: redb::Value + 'static,
426 for<'b> V: From<V::SelfType<'b>>;
427}
428
429impl TableBucketIterExt for ReadTransaction {
430 fn table_bucket_range<'a, K, V>(
431 &'a self,
432 builder: &'a TableBucketBuilder,
433 base_key: K,
434 start_sequence: u64,
435 end_sequence: u64,
436 ) -> Result<TableBucketRangeIterator<'a, K, V>, BucketError>
437 where
438 K: redb::Key + Clone + 'static,
439 for<'b> K: Borrow<K::SelfType<'b>>,
440 V: redb::Value + 'static,
441 for<'b> V: From<V::SelfType<'b>>,
442 {
443 TableBucketRangeIterator::<K, V>::new(self, builder, base_key, start_sequence, end_sequence)
444 }
445}
446
447pub trait TableBucketMultimapIterExt {
449 fn table_bucket_multimap_range<'a, K, V>(
450 &'a self,
451 builder: &'a TableBucketBuilder,
452 base_key: K,
453 start_sequence: u64,
454 end_sequence: u64,
455 ) -> Result<TableBucketRangeMultimapIterator<'a, K, V>, BucketError>
456 where
457 K: redb::Key + Clone + 'static,
458 for<'b> K: Borrow<K::SelfType<'b>>,
459 V: redb::Key + 'static,
460 for<'b> V: From<V::SelfType<'b>>;
461}
462
463impl TableBucketMultimapIterExt for ReadTransaction {
464 fn table_bucket_multimap_range<'a, K, V>(
465 &'a self,
466 builder: &'a TableBucketBuilder,
467 base_key: K,
468 start_sequence: u64,
469 end_sequence: u64,
470 ) -> Result<TableBucketRangeMultimapIterator<'a, K, V>, BucketError>
471 where
472 K: redb::Key + Clone + 'static,
473 for<'b> K: Borrow<K::SelfType<'b>>,
474 V: redb::Key + 'static,
475 for<'b> V: From<V::SelfType<'b>>,
476 {
477 TableBucketRangeMultimapIterator::<K, V>::new(
478 self,
479 builder,
480 base_key,
481 start_sequence,
482 end_sequence,
483 )
484 }
485}
486
#[cfg(test)]
mod tests {
    use super::*;
    use crate::table_buckets::TableBucketBuilder;
    use redb::{Database, ReadableDatabase};
    use tempfile::NamedTempFile;

    /// Point lookups across regular bucket tables: forward order, reverse
    /// order, the extension-trait entry point, and rejection of an inverted range.
    #[test]
    fn test_table_bucket_iteration() -> Result<(), Box<dyn std::error::Error>> {
        let temp_file = NamedTempFile::new()?;
        let db = Database::create(temp_file.path())?;
        let builder = TableBucketBuilder::new(100, "table_bucket")?;

        // Key 123 lives in buckets 0, 1 and 2; key 456 only in bucket 0.
        let write_txn = db.begin_write()?;
        let seed = |bucket: u64, rows: &[(u64, &str)]| -> Result<(), Box<dyn std::error::Error>> {
            let mut table = write_txn.open_table(builder.table_definition::<u64, String>(bucket))?;
            for &(key, value) in rows {
                table.insert(key, value.to_string())?;
            }
            Ok(())
        };
        seed(0, &[(123, "value_50"), (456, "other_50")])?;
        seed(1, &[(123, "value_150")])?;
        seed(2, &[(123, "value_250")])?;
        write_txn.commit()?;

        let read_txn = db.begin_read()?;

        // Sequences 0..=299 with a bucket size of 100 span buckets 0..=2.
        let forward = TableBucketRangeIterator::new(&read_txn, &builder, 123u64, 0, 299)?;
        assert_eq!(forward.bucket_range(), (0, 2));
        let ascending = forward.collect::<Result<Vec<String>, _>>()?;
        assert_eq!(ascending, ["value_50", "value_150", "value_250"]);

        let backward = TableBucketRangeIterator::new(&read_txn, &builder, 123u64, 0, 299)?;
        let descending = backward.rev().collect::<Result<Vec<String>, _>>()?;
        assert_eq!(descending, ["value_250", "value_150", "value_50"]);

        let via_trait = read_txn.table_bucket_range(&builder, 456u64, 0, 299)?;
        let sparse = via_trait.collect::<Result<Vec<String>, _>>()?;
        assert_eq!(sparse, ["other_50"]);

        // A start beyond the end must be refused at construction time.
        assert!(
            TableBucketRangeIterator::<u64, String>::new(&read_txn, &builder, 123u64, 200, 100)
                .is_err()
        );

        Ok(())
    }

    /// Multimap lookups across bucket tables: forward order, reverse order,
    /// and the extension-trait entry point.
    #[test]
    fn test_table_bucket_multimap_iteration() -> Result<(), Box<dyn std::error::Error>> {
        let temp_file = NamedTempFile::new()?;
        let db = Database::create(temp_file.path())?;
        let builder = TableBucketBuilder::new(100, "table_bucket_multimap")?;

        // Key 123 has two values in each of buckets 0 and 1; key 456 has two in bucket 0.
        let write_txn = db.begin_write()?;
        let seed = |bucket: u64, rows: &[(u64, u64)]| -> Result<(), Box<dyn std::error::Error>> {
            let mut table = write_txn
                .open_multimap_table(builder.multimap_table_definition::<u64, u64>(bucket))?;
            for &(key, value) in rows {
                table.insert(key, value)?;
            }
            Ok(())
        };
        seed(0, &[(123, 10), (123, 20), (456, 99), (456, 100)])?;
        seed(1, &[(123, 30), (123, 40)])?;
        write_txn.commit()?;

        let read_txn = db.begin_read()?;

        // Sequences 0..=199 with a bucket size of 100 span buckets 0..=1.
        let forward = TableBucketRangeMultimapIterator::new(&read_txn, &builder, 123u64, 0, 199)?;
        assert_eq!(forward.bucket_range(), (0, 1));
        let ascending = forward.collect::<Result<Vec<u64>, _>>()?;
        assert_eq!(ascending, [10u64, 20, 30, 40]);

        let backward = TableBucketRangeMultimapIterator::new(&read_txn, &builder, 123u64, 0, 199)?;
        let descending = backward.rev().collect::<Result<Vec<u64>, _>>()?;
        assert_eq!(descending, [40u64, 30, 20, 10]);

        let via_trait = read_txn.table_bucket_multimap_range(&builder, 456u64, 0, 99)?;
        let single_bucket = via_trait.collect::<Result<Vec<u64>, _>>()?;
        assert_eq!(single_bucket, [99u64, 100]);

        Ok(())
    }
}