1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use async_trait::async_trait;
use crate::error::SlateDBError;
use crate::types::RowEntry;
/// Direction in which entries are visited during iteration.
// NOTE(review): presumably the ordering is by key; confirm against the callers
// that consume this enum, since none are visible in this part of the file.
#[derive(Clone, Copy, Debug)]
pub enum IterationOrder {
/// Visit entries in ascending order.
Ascending,
/// Visit entries in descending order.
Descending,
}
/// Asynchronous iterator over [`RowEntry`] values.
///
/// Note: this is intentionally its own trait instead of an
/// `Iterator<Item = KeyValue>`, because `next` has to be async to support SSTs,
/// which are loaded over the network.
/// See: <https://github.com/slatedb/slatedb/issues/12>
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub(crate) trait RowEntryIterator: Send + Sync {
/// Performs any expensive initialization required before regular iteration.
///
/// This method should be idempotent: it can be called multiple times, and only
/// the first call should perform the expensive operations.
///
/// # Errors
///
/// Returns a `SlateDBError` if initialization fails.
async fn init(&mut self) -> Result<(), SlateDBError>;
/// Returns the next entry in the iterator, which may be a key-value pair or
/// a tombstone of a deleted key-value pair. `Ok(None)` means there is no
/// entry to return.
///
/// # Errors
///
/// Will fail with `SlateDBError::IteratorNotInitialized` if the iterator is
/// not yet initialized.
///
/// NOTE: we don't initialize the iterator when calling `next` and instead
/// require the caller to explicitly initialize the iterator. This ensures
/// that optimizations which eagerly initialize the iterator are not silently
/// lost in a refactor; losing them surfaces as an error instead.
async fn next(&mut self) -> Result<Option<RowEntry>, SlateDBError>;
/// Seeks the iterator to `next_key` (inclusive).
// NOTE(review): presumably the following `next` call then yields the first
// entry at or after `next_key`; confirm against the implementors.
///
/// # Errors
///
/// Will fail with `SlateDBError::IteratorNotInitialized` if the iterator is
/// not yet initialized.
///
/// NOTE: we don't initialize the iterator when calling `seek` and instead
/// require the caller to explicitly initialize the iterator. This ensures
/// that optimizations which eagerly initialize the iterator are not silently
/// lost in a refactor; losing them surfaces as an error instead.
async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError>;
}
/// A [`RowEntryIterator`] that also reports how many bytes it has processed,
/// for progress reporting.
///
/// Only iterators used in the compaction pipeline implement this trait.
/// The bottom-most iterator (`MergeIterator`) tracks actual bytes, while
/// wrapper iterators delegate to their inner iterator.
// NOTE(review): `MergeIterator` and the compaction pipeline are defined outside
// this part of the file; the paragraph above is carried over from the original
// comment and should be confirmed against those implementations.
pub(crate) trait TrackedRowEntryIterator: RowEntryIterator {
/// Returns the total bytes processed (key + value length) by this iterator.
fn bytes_processed(&self) -> u64;
}
/// Runs [`RowEntryIterator::init`] on the iterator held in `iter`, if there is one.
///
/// Returns `Ok(None)` when `iter` is `None`, otherwise hands the initialized
/// iterator back wrapped in `Some`.
///
/// # Errors
///
/// Propagates whatever error the iterator's `init` returns.
pub(crate) async fn init_optional_iterator<T: RowEntryIterator>(
    iter: Option<T>,
) -> Result<Option<T>, SlateDBError> {
    // Nothing to initialize: pass the absence straight through.
    let Some(mut inner) = iter else {
        return Ok(None);
    };
    inner.init().await?;
    Ok(Some(inner))
}
/// Lets a boxed `dyn RowEntryIterator` be used wherever a `RowEntryIterator`
/// is expected by forwarding every call to the boxed trait object.
#[async_trait]
impl<'a> RowEntryIterator for Box<dyn RowEntryIterator + 'a> {
    /// Forwards to the boxed iterator's `init`.
    async fn init(&mut self) -> Result<(), SlateDBError> {
        (**self).init().await
    }

    /// Forwards to the boxed iterator's `next`.
    async fn next(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
        (**self).next().await
    }

    /// Forwards to the boxed iterator's `seek`.
    async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError> {
        (**self).seek(next_key).await
    }
}
/// Lets a boxed `dyn TrackedRowEntryIterator` be used wherever a
/// `RowEntryIterator` is expected by forwarding every call to the boxed
/// trait object.
#[async_trait]
impl<'a> RowEntryIterator for Box<dyn TrackedRowEntryIterator + 'a> {
    /// Forwards to the boxed iterator's `init`.
    async fn init(&mut self) -> Result<(), SlateDBError> {
        (**self).init().await
    }

    /// Forwards to the boxed iterator's `next`.
    async fn next(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
        (**self).next().await
    }

    /// Forwards to the boxed iterator's `seek`.
    async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError> {
        (**self).seek(next_key).await
    }
}
/// Exposes the boxed iterator's byte counter through the box itself.
impl<'a> TrackedRowEntryIterator for Box<dyn TrackedRowEntryIterator + 'a> {
    /// Returns the value reported by the boxed iterator's `bytes_processed`.
    fn bytes_processed(&self) -> u64 {
        (**self).bytes_processed()
    }
}
/// Stateless unit type; it carries no data.
pub(crate) struct EmptyIterator;

impl EmptyIterator {
    /// Creates an `EmptyIterator`.
    pub(crate) fn new() -> Self {
        EmptyIterator
    }
}
/// `RowEntryIterator` implementation that never yields an entry: `init` and
/// `seek` succeed without doing anything, and `next` always returns `Ok(None)`.
#[async_trait]
impl RowEntryIterator for EmptyIterator {
    /// No setup is needed; always succeeds.
    async fn init(&mut self) -> Result<(), SlateDBError> {
        Ok(())
    }

    /// Always reports that there is no entry.
    async fn next(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
        let nothing: Option<RowEntry> = None;
        Ok(nothing)
    }

    /// Ignores the requested key and succeeds.
    async fn seek(&mut self, _target: &[u8]) -> Result<(), SlateDBError> {
        Ok(())
    }
}