1use std::ops::Range;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::stream::Stream;
6use futures::{Future, FutureExt};
7
8use super::api::ScanLimit;
9use super::tr::Transactor;
10use super::{Key, Result, Val};
11use crate::cnf::NORMAL_FETCH_SIZE;
12
13#[cfg(not(target_family = "wasm"))]
14type FutureResult<'a, I> = Pin<Box<dyn Future<Output = Result<Vec<I>>> + 'a + Send>>;
15
16#[cfg(target_family = "wasm")]
17type FutureResult<'a, I> = Pin<Box<dyn Future<Output = Result<Vec<I>>> + 'a>>;
18
/// The order in which a [`Scanner`] walks its key range.
///
/// `Forward` scans advance the start of the remaining range after every
/// batch; `Backward` scans pull the end of the remaining range back instead.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Direction {
    /// Scan from the start of the range towards its end.
    Forward,
    /// Scan from the end of the range towards its start.
    Backward,
}
25
26pub struct Scanner<'a, I> {
42 store: &'a Transactor,
44 range: Range<Key>,
46 future: Option<FutureResult<'a, I>>,
48 prefetched: Option<Result<Vec<I>>>,
50 first_batch: bool,
52 exhausted: bool,
54 limit: Option<usize>,
56 skip: u32,
58 dir: Direction,
60 version: Option<u64>,
62 enable_prefetch: bool,
64 initial_batch_size: ScanLimit,
66 subsequent_batch_size: ScanLimit,
68}
69
70impl<'a, I> Scanner<'a, I> {
71 pub fn new(
73 store: &'a Transactor,
74 range: Range<Key>,
75 limit: Option<usize>,
76 dir: Direction,
77 ) -> Self {
78 let exhausted = range.start >= range.end;
80 Scanner {
86 store,
87 range,
88 limit,
89 dir,
90 skip: 0,
91 exhausted,
92 future: None,
93 prefetched: None,
94 first_batch: true,
95 version: None,
96 enable_prefetch: false,
97 initial_batch_size: ScanLimit::Count(*NORMAL_FETCH_SIZE),
98 subsequent_batch_size: ScanLimit::Bytes(4 * 1024 * 1024),
99 }
100 }
101
102 pub fn skip(mut self, skip: u32) -> Self {
104 self.skip = skip;
105 self
106 }
107
108 pub fn version(mut self, version: u64) -> Self {
112 self.version = Some(version);
113 self
114 }
115
116 pub fn prefetch(mut self, enabled: bool) -> Self {
122 self.enable_prefetch = enabled;
123 self
124 }
125
126 pub fn initial_batch_size(mut self, size: ScanLimit) -> Self {
131 self.initial_batch_size = size;
132 self
133 }
134
135 pub fn subsequent_batch_size(mut self, size: ScanLimit) -> Self {
140 self.subsequent_batch_size = size;
141 self
142 }
143
144 #[inline]
146 fn update_range(&mut self, last_key: &Key) {
147 match self.dir {
148 Direction::Forward => {
149 self.range.start.clone_from(last_key);
150 self.range.start.push(0xff);
151 }
152 Direction::Backward => {
153 self.range.end.clone_from(last_key);
154 }
155 }
156 }
157
158 #[inline]
160 fn next_scan_limit(&self) -> ScanLimit {
161 let batch_size = if self.first_batch {
163 self.initial_batch_size
164 } else {
165 self.subsequent_batch_size
166 };
167 match batch_size {
169 ScanLimit::Count(c) => match self.limit {
170 Some(l) => ScanLimit::Count(c.min(l as u32)),
171 None => ScanLimit::Count(c),
172 },
173 ScanLimit::Bytes(b) => match self.limit {
174 Some(l) => ScanLimit::BytesOrCount(b, l as u32),
175 None => ScanLimit::Bytes(b),
176 },
177 ScanLimit::BytesOrCount(b, c) => match self.limit {
178 Some(l) => ScanLimit::BytesOrCount(b, c.min(l as u32)),
179 None => ScanLimit::BytesOrCount(b, c),
180 },
181 }
182 }
183
184 #[inline]
185 fn start_prefetch<S>(&mut self, cx: &mut Context, scan: S)
186 where
187 S: Fn(Range<Key>, ScanLimit, u32) -> FutureResult<'a, I>,
188 {
189 if self.enable_prefetch && !self.exhausted {
190 let limit = self.next_scan_limit();
192 let skip = self.skip;
194 let mut future = scan(self.range.clone(), limit, skip);
196 match future.poll_unpin(cx) {
198 Poll::Pending => {
199 self.future = Some(future);
201 }
202 Poll::Ready(result) => {
203 self.prefetched = Some(result);
205 }
206 }
207 }
208 }
209
210 fn process_result<K>(&mut self, result: Result<Vec<I>>, key: &K) -> Poll<Option<Result<Vec<I>>>>
213 where
214 K: Fn(&I) -> &Key,
215 {
216 match result {
217 Ok(batch) if !batch.is_empty() => {
219 if let Some(l) = &mut self.limit {
221 *l = l.saturating_sub(batch.len());
222 if *l == 0 {
223 self.exhausted = true;
224 }
225 }
226 let limiter = if self.first_batch {
228 self.initial_batch_size
229 } else {
230 self.subsequent_batch_size
231 };
232 if let ScanLimit::Count(l) = limiter
234 && batch.len() < l as usize
235 {
236 self.exhausted = true;
237 }
238 let last = batch.last().expect("batch should not be empty");
240 let last_key = key(last);
241 self.update_range(last_key);
243 self.first_batch = false;
245 self.skip = 0;
247 Poll::Ready(Some(Ok(batch)))
249 }
250 Ok(_) => {
252 self.exhausted = true;
254 Poll::Ready(None)
256 }
257 Err(error) => {
259 self.exhausted = true;
261 Poll::Ready(Some(Err(error)))
263 }
264 }
265 }
266
267 fn next_poll<S, K>(&mut self, cx: &mut Context, scan: S, key: K) -> Poll<Option<Result<Vec<I>>>>
268 where
269 S: Fn(Range<Key>, ScanLimit, u32) -> FutureResult<'a, I>,
270 K: Fn(&I) -> &Key,
271 {
272 if self.exhausted {
274 return Poll::Ready(None);
275 }
276 if let Some(result) = self.prefetched.take() {
278 let poll = self.process_result(result, &key);
280 self.start_prefetch(cx, &scan);
282 return poll;
284 }
285 let limit = self.next_scan_limit();
287 let skip = self.skip;
289 let future = self.future.get_or_insert_with(|| scan(self.range.clone(), limit, skip));
291 match future.poll_unpin(cx) {
293 Poll::Pending => Poll::Pending,
295 Poll::Ready(result) => {
297 self.future = None;
299 let poll = self.process_result(result, &key);
301 self.start_prefetch(cx, &scan);
303 poll
305 }
306 }
307 }
308}
309
310impl Stream for Scanner<'_, Key> {
311 type Item = Result<Vec<Key>>;
312 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Vec<Key>>>> {
313 let (store, version) = (self.store, self.version);
314 match self.dir {
315 Direction::Forward => self.next_poll(
316 cx,
317 move |range, limit, skip| Box::pin(store.keys(range, limit, skip, version)),
318 |v| v,
319 ),
320 Direction::Backward => self.next_poll(
321 cx,
322 move |range, limit, skip| Box::pin(store.keysr(range, limit, skip, version)),
323 |v| v,
324 ),
325 }
326 }
327}
328
329impl Stream for Scanner<'_, (Key, Val)> {
330 type Item = Result<Vec<(Key, Val)>>;
331 fn poll_next(
332 mut self: Pin<&mut Self>,
333 cx: &mut Context,
334 ) -> Poll<Option<Result<Vec<(Key, Val)>>>> {
335 let (store, version) = (self.store, self.version);
336 match self.dir {
337 Direction::Forward => self.next_poll(
338 cx,
339 move |range, limit, skip| Box::pin(store.scan(range, limit, skip, version)),
340 |v| &v.0,
341 ),
342 Direction::Backward => self.next_poll(
343 cx,
344 move |range, limit, skip| Box::pin(store.scanr(range, limit, skip, version)),
345 |v| &v.0,
346 ),
347 }
348 }
349}