Skip to main content

surrealdb_core/kvs/
scanner.rs

1use std::ops::Range;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::stream::Stream;
6use futures::{Future, FutureExt};
7
8use super::api::ScanLimit;
9use super::tr::Transactor;
10use super::{Key, Result, Val};
11use crate::cnf::NORMAL_FETCH_SIZE;
12
13#[cfg(not(target_family = "wasm"))]
14type FutureResult<'a, I> = Pin<Box<dyn Future<Output = Result<Vec<I>>> + 'a + Send>>;
15
16#[cfg(target_family = "wasm")]
17type FutureResult<'a, I> = Pin<Box<dyn Future<Output = Result<Vec<I>>> + 'a>>;
18
/// The direction of a scan.
///
/// The scanner uses this to pick the forward or reverse scan call on the
/// store, and to decide which end of the remaining key range is moved after
/// each batch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Direction {
	/// Scan forwards: after each batch the start of the remaining range is
	/// moved past the last key returned.
	Forward,
	/// Scan backwards: after each batch the (exclusive) end of the remaining
	/// range is moved down to the last key returned.
	Backward,
}
25
26/// A batch scanner that streams batches of keys or key-value pairs from a range.
27///
28/// The scanner fetches data in batches with configurable behavior:
29/// - Initial batch: fetches up to NORMAL_FETCH_SIZE items (default 500)
30/// - Subsequent batches: fetches up to 4 MiB of data
31/// - Prefetching: optionally prefetches the next batch concurrently
32///
33/// Use the builder methods to configure the scanner before using it as a stream:
34/// ```ignore
35/// let scanner = Scanner::<Key>::new(&tx, range, limit, Direction::Forward)
36///     .version(1234567890)
37///     .initial_batch_size(ScanLimit::Count(50))
38///     .subsequent_batch_size(ScanLimit::Bytes(8 * 1024 * 1024))
39///     .prefetch(false);
40/// ```
41pub struct Scanner<'a, I> {
42	/// The store which started this range scan
43	store: &'a Transactor,
44	/// The key range for this range scan
45	range: Range<Key>,
46	/// The currently running future to be polled
47	future: Option<FutureResult<'a, I>>,
48	/// A prefetched result ready to be returned
49	prefetched: Option<Result<Vec<I>>>,
50	/// Whether this is the first batch (uses count-based limit)
51	first_batch: bool,
52	/// Whether this stream should try to fetch more
53	exhausted: bool,
54	/// An optional maximum number of keys to scan
55	limit: Option<usize>,
56	/// The number of entries to skip (applied to first batch only)
57	skip: u32,
58	/// The scan direction
59	dir: Direction,
60	/// Version as timestamp, 0 means latest.
61	version: Option<u64>,
62	/// Whether to enable prefetching of the next batch
63	enable_prefetch: bool,
64	/// The initial batch size (default: NORMAL_FETCH_SIZE, typically 500 items)
65	initial_batch_size: ScanLimit,
66	/// The subsequent batch size (default: 16 MiB bytes)
67	subsequent_batch_size: ScanLimit,
68}
69
70impl<'a, I> Scanner<'a, I> {
71	/// Creates a new Scanner with default configuration.
72	pub fn new(
73		store: &'a Transactor,
74		range: Range<Key>,
75		limit: Option<usize>,
76		dir: Direction,
77	) -> Self {
78		// Check if the range is exhausted
79		let exhausted = range.start >= range.end;
80		// Initialize the scanner with defaults.
81		// The initial batch size uses NORMAL_FETCH_SIZE (default 500) to
82		// avoid under-fetching on the first round-trip, which is especially
83		// important for remote backends like TiKV where each scan is a
84		// network call.
85		Scanner {
86			store,
87			range,
88			limit,
89			dir,
90			skip: 0,
91			exhausted,
92			future: None,
93			prefetched: None,
94			first_batch: true,
95			version: None,
96			enable_prefetch: false,
97			initial_batch_size: ScanLimit::Count(*NORMAL_FETCH_SIZE),
98			subsequent_batch_size: ScanLimit::Bytes(4 * 1024 * 1024),
99		}
100	}
101
102	/// Set the number of entries to skip (applied to first batch only)
103	pub fn skip(mut self, skip: u32) -> Self {
104		self.skip = skip;
105		self
106	}
107
108	/// Set the version timestamp for the scan.
109	///
110	/// When set, the scanner will read data as it existed at the specified version.
111	pub fn version(mut self, version: u64) -> Self {
112		self.version = Some(version);
113		self
114	}
115
116	/// Enable or disable background prefetching.
117	///
118	/// When enabled, the scanner will start fetching the next batch while the current
119	/// batch is being processed, improving throughput at the cost of additional resources.
120	/// Default: false
121	pub fn prefetch(mut self, enabled: bool) -> Self {
122		self.enable_prefetch = enabled;
123		self
124	}
125
126	/// Set the initial batch size for the first batch.
127	///
128	/// The first batch fetched will contain up to this many items or bytes of data.
129	/// Default: NORMAL_FETCH_SIZE (500 items)
130	pub fn initial_batch_size(mut self, size: ScanLimit) -> Self {
131		self.initial_batch_size = size;
132		self
133	}
134
135	/// Set the subsequent batch size for subsequent batches.
136	///
137	/// After the first batch, subsequent batches will fetch up to this many items or bytes of data.
138	/// Default: 4 MiB
139	pub fn subsequent_batch_size(mut self, size: ScanLimit) -> Self {
140		self.subsequent_batch_size = size;
141		self
142	}
143
144	/// Updates the range for the next batch based on the last key fetched.
145	#[inline]
146	fn update_range(&mut self, last_key: &Key) {
147		match self.dir {
148			Direction::Forward => {
149				self.range.start.clone_from(last_key);
150				self.range.start.push(0xff);
151			}
152			Direction::Backward => {
153				self.range.end.clone_from(last_key);
154			}
155		}
156	}
157
158	/// Calculate the scan limit for the next batch.
159	#[inline]
160	fn next_scan_limit(&self) -> ScanLimit {
161		// Check if this is the first batch
162		let batch_size = if self.first_batch {
163			self.initial_batch_size
164		} else {
165			self.subsequent_batch_size
166		};
167		// Apply the limit to the batch size
168		match batch_size {
169			ScanLimit::Count(c) => match self.limit {
170				Some(l) => ScanLimit::Count(c.min(l as u32)),
171				None => ScanLimit::Count(c),
172			},
173			ScanLimit::Bytes(b) => match self.limit {
174				Some(l) => ScanLimit::BytesOrCount(b, l as u32),
175				None => ScanLimit::Bytes(b),
176			},
177			ScanLimit::BytesOrCount(b, c) => match self.limit {
178				Some(l) => ScanLimit::BytesOrCount(b, c.min(l as u32)),
179				None => ScanLimit::BytesOrCount(b, c),
180			},
181		}
182	}
183
184	#[inline]
185	fn start_prefetch<S>(&mut self, cx: &mut Context, scan: S)
186	where
187		S: Fn(Range<Key>, ScanLimit, u32) -> FutureResult<'a, I>,
188	{
189		if self.enable_prefetch && !self.exhausted {
190			// Calculate the limit for the next batch
191			let limit = self.next_scan_limit();
192			// Get the skip value for the first batch
193			let skip = self.skip;
194			// Setup the next range scan
195			let mut future = scan(self.range.clone(), limit, skip);
196			// Poll the future to kick off I/O
197			match future.poll_unpin(cx) {
198				Poll::Pending => {
199					// I/O started, store for later
200					self.future = Some(future);
201				}
202				Poll::Ready(result) => {
203					// We received a result immediately
204					self.prefetched = Some(result);
205				}
206			}
207		}
208	}
209
210	/// Process a completed fetch result, updating internal state.
211	/// Returns the batch if successful, or None if the stream is exhausted.
212	fn process_result<K>(&mut self, result: Result<Vec<I>>, key: &K) -> Poll<Option<Result<Vec<I>>>>
213	where
214		K: Fn(&I) -> &Key,
215	{
216		match result {
217			// There were some results returned
218			Ok(batch) if !batch.is_empty() => {
219				// Update limit
220				if let Some(l) = &mut self.limit {
221					*l = l.saturating_sub(batch.len());
222					if *l == 0 {
223						self.exhausted = true;
224					}
225				}
226				// Fetch the limiter for the next batch
227				let limiter = if self.first_batch {
228					self.initial_batch_size
229				} else {
230					self.subsequent_batch_size
231				};
232				// Check if the batch is exhausted
233				if let ScanLimit::Count(l) = limiter
234					&& batch.len() < l as usize
235				{
236					self.exhausted = true;
237				}
238				// Get the last key to update range for the next batch
239				let last = batch.last().expect("batch should not be empty");
240				let last_key = key(last);
241				// Update the range for the next batch
242				self.update_range(last_key);
243				// Mark that we've fetched the first batch
244				self.first_batch = false;
245				// Reset skip after the first batch
246				self.skip = 0;
247				// Return the batch
248				Poll::Ready(Some(Ok(batch)))
249			}
250			// There were no results returned
251			Ok(_) => {
252				// Empty result means we've reached the end
253				self.exhausted = true;
254				// Return no more results
255				Poll::Ready(None)
256			}
257			// We received an error
258			Err(error) => {
259				// An error means we've reached the end
260				self.exhausted = true;
261				// Return the error
262				Poll::Ready(Some(Err(error)))
263			}
264		}
265	}
266
267	fn next_poll<S, K>(&mut self, cx: &mut Context, scan: S, key: K) -> Poll<Option<Result<Vec<I>>>>
268	where
269		S: Fn(Range<Key>, ScanLimit, u32) -> FutureResult<'a, I>,
270		K: Fn(&I) -> &Key,
271	{
272		// Return early if exhausted
273		if self.exhausted {
274			return Poll::Ready(None);
275		}
276		// Check if we have a prefetched result ready
277		if let Some(result) = self.prefetched.take() {
278			// Process the last fetches result batch
279			let poll = self.process_result(result, &key);
280			// If prefetch is enabled, start the next scan
281			self.start_prefetch(cx, &scan);
282			// Return the result
283			return poll;
284		}
285		// Calculate the limit for this fetch
286		let limit = self.next_scan_limit();
287		// Get the skip value (only applies to first batch)
288		let skip = self.skip;
289		// Fetch or start a new fetch if none is pending
290		let future = self.future.get_or_insert_with(|| scan(self.range.clone(), limit, skip));
291		// Try to resolve the main future
292		match future.poll_unpin(cx) {
293			// The future is pending
294			Poll::Pending => Poll::Pending,
295			// The future is ready
296			Poll::Ready(result) => {
297				// Drop the completed future
298				self.future = None;
299				// Process the last fetches result batch
300				let poll = self.process_result(result, &key);
301				// If prefetch is enabled, start the next scan
302				self.start_prefetch(cx, &scan);
303				// Return the result
304				poll
305			}
306		}
307	}
308}
309
310impl Stream for Scanner<'_, Key> {
311	type Item = Result<Vec<Key>>;
312	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Vec<Key>>>> {
313		let (store, version) = (self.store, self.version);
314		match self.dir {
315			Direction::Forward => self.next_poll(
316				cx,
317				move |range, limit, skip| Box::pin(store.keys(range, limit, skip, version)),
318				|v| v,
319			),
320			Direction::Backward => self.next_poll(
321				cx,
322				move |range, limit, skip| Box::pin(store.keysr(range, limit, skip, version)),
323				|v| v,
324			),
325		}
326	}
327}
328
329impl Stream for Scanner<'_, (Key, Val)> {
330	type Item = Result<Vec<(Key, Val)>>;
331	fn poll_next(
332		mut self: Pin<&mut Self>,
333		cx: &mut Context,
334	) -> Poll<Option<Result<Vec<(Key, Val)>>>> {
335		let (store, version) = (self.store, self.version);
336		match self.dir {
337			Direction::Forward => self.next_poll(
338				cx,
339				move |range, limit, skip| Box::pin(store.scan(range, limit, skip, version)),
340				|v| &v.0,
341			),
342			Direction::Backward => self.next_poll(
343				cx,
344				move |range, limit, skip| Box::pin(store.scanr(range, limit, skip, version)),
345				|v| &v.0,
346			),
347		}
348	}
349}