1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
use async_trait::async_trait;
use crate::error::SlateDBError;
use crate::types::RowEntry;
use crate::types::{KeyValue, ValueDeletable};
#[derive(Clone, Copy, Debug)]
pub(crate) enum IterationOrder {
Ascending,
#[allow(dead_code)]
Descending,
}
/// Note: this is intentionally its own trait instead of an Iterator<Item=KeyValue>,
/// because next will need to be made async to support SSTs, which are loaded over
/// the network.
/// See: https://github.com/slatedb/slatedb/issues/12
#[async_trait]
pub trait KeyValueIterator: Send + Sync {
/// Performs any expensive initialization required before regular iteration.
///
/// This method should be idempotent and can be called multiple times, only
/// the first initialization should perform expensive operations.
async fn init(&mut self) -> Result<(), SlateDBError>;
/// Returns the next non-deleted key-value pair in the iterator.
async fn next(&mut self) -> Result<Option<KeyValue>, SlateDBError> {
loop {
let entry = self.next_entry().await?;
if let Some(kv) = entry {
match kv.value {
ValueDeletable::Value(v) => {
return Ok(Some(KeyValue {
key: kv.key,
value: v,
}))
}
// next() should only be called at the top level and therefore
// if a merge is returned it's because there was no base value.
// Since the merge is fully resolved at this point we can just
// return it as is
ValueDeletable::Merge(value) => {
return Ok(Some(KeyValue { key: kv.key, value }))
}
ValueDeletable::Tombstone => continue,
}
} else {
return Ok(None);
}
}
}
/// Returns the next entry in the iterator, which may be a key-value pair or
/// a tombstone of a deleted key-value pair.
///
/// Will fail with `SlateDBError::IteratorNotInitialized` if the iterator is
/// not yet initialized.
///
/// NOTE: we don't initialize the iterator when calling next_entry and instead
/// require the caller to explicitly initialize the iterator. This is in order
/// to ensure that optimizations which eagerly initialize the iterator are not
/// lost in a refactor and instead would throw errors.
async fn next_entry(&mut self) -> Result<Option<RowEntry>, SlateDBError>;
/// Seek to the next (inclusive) key
///
/// Will fail with `SlateDBError::IteratorNotInitialized` if the iterator is
/// not yet initialized.
///
/// NOTE: we don't initialize the iterator when calling seek and instead
/// require the caller to explicitly initialize the iterator. This is in order
/// to ensure that optimizations which eagerly initialize the iterator are not
/// lost in a refactor and instead would throw errors.
async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError>;
}
/// Initializes the iterator contained in the option, propagating `None` unchanged.
pub async fn init_optional_iterator<T: KeyValueIterator>(
iter: Option<T>,
) -> Result<Option<T>, SlateDBError> {
match iter {
Some(mut it) => {
it.init().await?;
Ok(Some(it))
}
None => Ok(None),
}
}
#[async_trait]
impl<'a> KeyValueIterator for Box<dyn KeyValueIterator + 'a> {
async fn init(&mut self) -> Result<(), SlateDBError> {
self.as_mut().init().await
}
async fn next(&mut self) -> Result<Option<KeyValue>, SlateDBError> {
self.as_mut().next().await
}
async fn next_entry(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
self.as_mut().next_entry().await
}
async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError> {
self.as_mut().seek(next_key).await
}
}
pub(crate) struct EmptyIterator;
impl EmptyIterator {
pub(crate) fn new() -> Self {
Self
}
}
#[async_trait]
impl KeyValueIterator for EmptyIterator {
async fn init(&mut self) -> Result<(), SlateDBError> {
Ok(())
}
async fn next_entry(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
Ok(None)
}
async fn seek(&mut self, _next_key: &[u8]) -> Result<(), SlateDBError> {
Ok(())
}
}