1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
use super::tx::Transaction;
use super::Key;
use super::Val;
use crate::err::Error;
use futures::stream::Stream;
use futures::Future;
use futures::FutureExt;
use std::collections::VecDeque;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};
pub(super) struct Scanner<'a, I> {
/// The store which started this range scan
store: &'a Transaction,
/// The number of keys to fetch at once
batch: u32,
// The key range for this range scan
range: Range<Key>,
// The results from the last range scan
results: VecDeque<I>,
#[allow(clippy::type_complexity)]
/// The currently running future to be polled
#[cfg(not(target_family = "wasm"))]
future: Option<Pin<Box<dyn Future<Output = Result<Vec<I>, Error>> + 'a + Send>>>,
#[cfg(target_family = "wasm")]
future: Option<Pin<Box<dyn Future<Output = Result<Vec<I>, Error>> + 'a>>>,
/// Whether this stream should try to fetch more
exhausted: bool,
/// Version as timestamp, 0 means latest.
version: Option<u64>,
}
impl<'a, I> Scanner<'a, I> {
pub fn new(
store: &'a Transaction,
batch: u32,
range: Range<Key>,
version: Option<u64>,
) -> Self {
Scanner {
store,
batch,
range,
future: None,
results: VecDeque::new(),
exhausted: false,
version,
}
}
}
/// Streams the key-value pairs of the scanner's range, fetching in batches.
impl Stream for Scanner<'_, (Key, Val)> {
    type Item = Result<(Key, Val), Error>;

    /// Yields the next key-value pair, scanning the store for another batch
    /// whenever the local buffer is empty.
    ///
    /// Returns `Poll::Pending` while a scan is in flight, and
    /// `Poll::Ready(None)` once a scan returned no entries or every entry of
    /// a short (final) batch has been handed out. A failed scan is yielded as
    /// an `Err` item; the range start is left untouched in that case, so a
    /// subsequent poll issues the same scan again.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
    ) -> Poll<Option<Result<(Key, Val), Error>>> {
        // Serve buffered entries before touching the store again
        if let Some(entry) = self.results.pop_front() {
            return Poll::Ready(Some(Ok(entry)));
        }
        // Nothing is buffered and nothing is left to fetch
        if self.exhausted {
            return Poll::Ready(None);
        }
        // Work on a plain mutable reference so that the individual fields
        // can be borrowed independently of one another below
        let this = &mut *self;
        let store = this.store;
        let (batch, version) = (this.batch, this.version);
        let range = &this.range;
        // Reuse the scan which is in flight, or start a new one from the
        // current start of the range
        let future = this
            .future
            .get_or_insert_with(move || Box::pin(store.scan(range.clone(), batch, version)));
        // Drive the scan, and bail out while it has not completed
        let outcome = match future.poll_unpin(cx) {
            Poll::Ready(outcome) => outcome,
            Poll::Pending => return Poll::Pending,
        };
        // The scan completed, so drop it to let the next poll create a new one
        this.future = None;
        // Surface a failed scan to the consumer as a stream item
        let fetched = match outcome {
            Ok(fetched) => fetched,
            Err(error) => return Poll::Ready(Some(Err(error))),
        };
        // An empty batch means the range is drained. Remember this, so that
        // polling again after completion does not trigger another scan.
        let Some(last) = fetched.last() else {
            this.exhausted = true;
            return Poll::Ready(None);
        };
        // Resume the next scan at the immediate successor of the last key.
        // Appending 0x00 yields the smallest key strictly greater than `last`,
        // so the last entry is not seen again and no key sorting directly
        // after it (such as `last` followed by a low byte) is skipped.
        this.range.start.clone_from(&last.0);
        this.range.start.push(0x00);
        // A short batch means that these were the final entries in the range
        if fetched.len() < this.batch as usize {
            this.exhausted = true;
        }
        // Yield the first entry now and buffer the remaining ones
        let mut fetched = fetched.into_iter();
        let first = fetched.next();
        this.results.extend(fetched);
        Poll::Ready(first.map(Ok))
    }
}
/// Streams the keys of the scanner's range, fetching in batches.
impl Stream for Scanner<'_, Key> {
    type Item = Result<Key, Error>;

    /// Yields the next key, scanning the store for another batch of keys
    /// whenever the local buffer is empty.
    ///
    /// Returns `Poll::Pending` while a scan is in flight, and
    /// `Poll::Ready(None)` once a scan returned no keys or every key of a
    /// short (final) batch has been handed out. A failed scan is yielded as
    /// an `Err` item; the range start is left untouched in that case, so a
    /// subsequent poll issues the same scan again.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Key, Error>>> {
        // Serve buffered keys before touching the store again
        if let Some(key) = self.results.pop_front() {
            return Poll::Ready(Some(Ok(key)));
        }
        // Nothing is buffered and nothing is left to fetch
        if self.exhausted {
            return Poll::Ready(None);
        }
        // Work on a plain mutable reference so that the individual fields
        // can be borrowed independently of one another below
        let this = &mut *self;
        let store = this.store;
        let (batch, version) = (this.batch, this.version);
        let range = &this.range;
        // Reuse the scan which is in flight, or start a new one from the
        // current start of the range
        let future = this
            .future
            .get_or_insert_with(move || Box::pin(store.keys(range.clone(), batch, version)));
        // Drive the scan, and bail out while it has not completed
        let outcome = match future.poll_unpin(cx) {
            Poll::Ready(outcome) => outcome,
            Poll::Pending => return Poll::Pending,
        };
        // The scan completed, so drop it to let the next poll create a new one
        this.future = None;
        // Surface a failed scan to the consumer as a stream item
        let fetched = match outcome {
            Ok(fetched) => fetched,
            Err(error) => return Poll::Ready(Some(Err(error))),
        };
        // An empty batch means the range is drained. Remember this, so that
        // polling again after completion does not trigger another scan.
        let Some(last) = fetched.last() else {
            this.exhausted = true;
            return Poll::Ready(None);
        };
        // Resume the next scan at the immediate successor of the last key.
        // Appending 0x00 yields the smallest key strictly greater than `last`,
        // so the last key is not seen again and no key sorting directly
        // after it (such as `last` followed by a low byte) is skipped.
        this.range.start.clone_from(last);
        this.range.start.push(0x00);
        // A short batch means that these were the final keys in the range
        if fetched.len() < this.batch as usize {
            this.exhausted = true;
        }
        // Yield the first key now and buffer the remaining ones
        let mut fetched = fetched.into_iter();
        let first = fetched.next();
        this.results.extend(fetched);
        Poll::Ready(first.map(Ok))
    }
}