1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use crate::values::ValueLog;
use crate::sorted_table::TableIterator;
use crate::memtable::MemtableIterator;
use crate::KV_Trait;
use bincode::Options;
use std::future::Future;
use futures::stream::Stream;
use std::task::{Context, Poll};
use std::marker::PhantomData;
use std::sync::Arc;
use std::pin::Pin;
use crate::sync_iter::DbIterator as SyncIter;
/// Unsized future type driving one iteration step.
///
/// Resolves to the iterator state (handed back so it can be reused for the
/// following step) paired with the item produced by this step, if any.
type IterFuture<K, V> =
    dyn Future<Output = (DbIteratorInner<K, V>, Option<(K, V)>)> + Send;
/// Asynchronous iterator yielding `(K, V)` pairs through the `Stream` trait.
pub struct DbIterator<K: KV_Trait, V: KV_Trait> {
    /// Future computing the next item; `None` once the iterator has been
    /// exhausted (or while a poll is temporarily holding the future).
    state: Option<Pin<Box<IterFuture<K, V>>>>,
}
impl<K: KV_Trait, V: KV_Trait> DbIterator<K, V> {
    /// Creates an iterator over the given memtable and table iterators.
    ///
    /// The future for the first step is constructed here but, like every
    /// future, performs no work until the stream is polled.
    pub(crate) fn new(
        mem_iters: Vec<MemtableIterator>,
        table_iters: Vec<TableIterator>,
        value_log: Arc<ValueLog>,
    ) -> Self {
        let first_step = DbIteratorInner::next(DbIteratorInner::<K, V>::new(
            mem_iters,
            table_iters,
            value_log,
        ));

        Self {
            state: Some(Box::pin(first_step)),
        }
    }
}
impl<K: KV_Trait, V: KV_Trait> Stream for DbIterator<K, V> {
type Item = (K, V);
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let (inner, res) = if let Some(mut fut) = self.state.take() {
match Future::poll(fut.as_mut(), ctx) {
Poll::Pending => {
self.state = Some(fut);
return Poll::Pending
}
Poll::Ready((inner, res)) => (inner, res),
}
} else {
return Poll::Ready(None)
};
if res.is_some() {
self.state = Some(Box::pin(DbIteratorInner::next(inner)));
} else {
self.state = None;
}
Poll::Ready(res)
}
}
/// State threaded through the step futures of `DbIterator`.
struct DbIteratorInner<K: KV_Trait, V: KV_Trait> {
    /// Ties the struct to `K` and `V` without storing values of those types.
    _marker: PhantomData<fn(K, V)>,
    /// Serialized key of the entry returned by the previous step; `None`
    /// until the first entry has been produced.
    last_key: Option<Vec<u8>>,
    /// Memtable iterators consulted on every step.
    mem_iters: Vec<MemtableIterator>,
    /// Table iterators consulted on every step, after the memtable ones.
    table_iters: Vec<TableIterator>,
    /// Value log the selected entry's value is read from.
    value_log: Arc<ValueLog>,
}
impl<K: KV_Trait, V: KV_Trait> DbIteratorInner<K, V> {
    /// Builds the initial state: no key has been yielded yet.
    fn new(
        mem_iters: Vec<MemtableIterator>,
        table_iters: Vec<TableIterator>,
        value_log: Arc<ValueLog>,
    ) -> Self {
        Self {
            _marker: PhantomData,
            last_key: None,
            mem_iters,
            table_iters,
            value_log,
        }
    }

    /// Performs one iteration step and hands the state back to the caller.
    ///
    /// Every memtable iterator and then every table iterator is passed to
    /// `SyncIter::parse_iter` together with the last yielded key and a shared
    /// candidate slot. If a candidate remains afterwards, its key is
    /// deserialized, its value is fetched from the value log, and the pair is
    /// returned; otherwise `None` signals the end of iteration.
    ///
    /// # Panics
    ///
    /// Panics if the candidate key cannot be deserialized into `K`.
    async fn next(mut slf: Self) -> (Self, Option<(K, V)>) {
        let mut candidate = None;
        // Stays `true` unless `parse_iter` returns `true` for a table iterator.
        // NOTE(review): presumably `true` means the iterator supplied the
        // current candidate - confirm against `parse_iter`.
        let mut use_pending = true;

        for mem_iter in slf.mem_iters.iter_mut() {
            SyncIter::<K,V>::parse_iter(&slf.last_key, mem_iter, &mut candidate);
        }

        for table_iter in slf.table_iters.iter_mut() {
            let reported = SyncIter::<K,V>::parse_iter(&slf.last_key, table_iter, &mut candidate);
            if reported {
                use_pending = false;
            }
        }

        match candidate {
            None => (slf, None),
            Some((key, entry)) => {
                let decoded_key = super::get_encoder().deserialize(&key).unwrap();

                let value_ref = entry.value_ref;
                let value = if use_pending {
                    slf.value_log.get_pending(value_ref).await
                } else {
                    slf.value_log.get(value_ref).await
                };

                // Remember the raw key so the next step can be driven from it.
                slf.last_key = Some(key.clone());
                (slf, Some((decoded_key, value)))
            }
        }
    }
}