noxu_db/disk_ordered_cursor.rs
1//! Disk-ordered cursor for high-throughput unordered scans.
2//!
3//! A [`DiskOrderedCursor`] returns user records in approximate **on-disk
4//! order** — the order in which their LN log entries appear in the
5//! write-ahead log — rather than in B-tree key order. This enables bulk
6//! export, full-DB replication catch-up, and analytical scans without paying
7//! the cost of B-tree traversal or lock acquisition.
8//!
9//! # Trade-offs vs [`crate::Cursor`]
10//!
11//! | Property | `Cursor` | `DiskOrderedCursor` |
12//! |---|---|---|
13//! | Order of returned keys | Key order (B-tree) | Disk order (log append order) |
14//! | Lock acquisition | Yes (per record) | **No** |
15//! | Isolation | Per-txn isolation | At best `READ_UNCOMMITTED` |
16//! | Throughput | Limited by random I/O on B-tree pages | Sequential log read |
17//! | Deduplication of stale versions | Yes — only the latest committed value is returned | **No** by default — the same key may appear multiple times if it was updated, and a deleted key may still appear |
18//!
19//! # Consistency guarantees
20//!
21//! The records returned by a `DiskOrderedCursor` correspond to the state of
22//! the database at the moment each LN was written to the log, which may
23//! include uncommitted writes. Concurrent inserts/updates/deletes performed
24//! during the scan are **not** required to be visible. Applications that
25//! need a transactionally-consistent snapshot should drain in-flight writers
26//! before opening the cursor (e.g. by holding a quiesce barrier).
27//!
28//! # Stale versions
29//!
30//! By default the cursor matches BDB JE: every LN that survives in the log
31//! and belongs to one of the targeted databases is yielded, even if a newer
32//! version of the same key follows. This is the JE-correct behaviour for
33//! bulk-export workflows that want to observe every committed mutation.
34//! Set [`DiskOrderedCursorConfig::dedup_keys`] to `true` to filter stale
35//! versions client-side (see field docs for caveats).
36//!
37//! # Producer thread
38//!
39//! Opening a cursor spawns a single background producer thread that reads
40//! the log files sequentially and pushes decoded `(db_idx, key, data)`
41//! tuples through a bounded channel. The thread is joined when the cursor
42//! is dropped or [`DiskOrderedCursor::close`] is called.
43
44use std::marker::PhantomData;
45
46use noxu_dbi::DiskOrderedCursorImpl;
47
48use crate::database::Database;
49use crate::database_entry::DatabaseEntry;
50use crate::error::{NoxuError, Result};
51use crate::operation_status::OperationStatus;
52
/// Configuration for a [`DiskOrderedCursor`].
///
/// Mirrors the field set of JE's `DiskOrderedCursorConfig` but uses
/// Rust-idiomatic builder methods. All fields have sensible defaults
/// matching JE.
///
/// Every field is public, so a value can also be written as a struct
/// literal; in that case the zero-to-one clamping performed by the
/// `with_*` builder methods is **not** applied.
///
/// The type implements `PartialEq`/`Eq` so two configurations can be
/// compared directly (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DiskOrderedCursorConfig {
    /// Maximum number of `(key, data)` entries the producer thread may queue
    /// before blocking.
    ///
    /// Default: `1000` (matches JE's `DOS_PRODUCER_QUEUE_SIZE`).
    pub queue_size: usize,
    /// Maximum number of LSNs the producer accumulates before yielding a
    /// batch downstream. Currently advisory — the producer streams entries
    /// one at a time; this field is preserved for JE shape compatibility
    /// and future batched-fetch support.
    ///
    /// Default: `usize::MAX`.
    pub lsn_batch_size: usize,
    /// Maximum number of bytes the in-flight queue may occupy before the
    /// producer thread blocks. Approximate — measured as the sum of key +
    /// data lengths of buffered entries.
    ///
    /// Default: `usize::MAX`.
    pub internal_memory_limit: usize,
    /// If `true`, only keys are read from the log; data is left empty.
    /// Slightly faster because the on-disk LN value bytes are skipped.
    ///
    /// Default: `false`.
    pub keys_only: bool,
    /// JE legacy flag — scan only BIN entries. Honoured as an alias for
    /// `keys_only` in this implementation because Noxu's log iterator
    /// always emits LN payloads (no separate BIN-only scan path is
    /// available at this layer).
    ///
    /// Default: `false`.
    pub bins_only: bool,
    /// JE legacy flag — count records without materialising key/data.
    /// Currently honoured as `keys_only` plus a discard policy on data.
    /// `next()` still returns one `Success` per record so the application
    /// can compute the count by iterating.
    ///
    /// Default: `false`.
    pub count_only: bool,
    /// **Noxu extension.** If `true`, the cursor maintains a `HashSet` of
    /// `(db_idx, key)` pairs already returned and skips duplicates. This
    /// can be expensive on large scans — the set grows linearly with
    /// distinct keys.
    ///
    /// Note: even with `dedup_keys = true`, the cursor returns the *first*
    /// version of a key that the log scan encounters, which is the
    /// **oldest** version — not the latest. For latest-only semantics
    /// the application must run a regular B-tree scan.
    ///
    /// Default: `false` (matches JE).
    pub dedup_keys: bool,
}
108
109impl Default for DiskOrderedCursorConfig {
110 fn default() -> Self {
111 Self {
112 queue_size: 1000,
113 lsn_batch_size: usize::MAX,
114 internal_memory_limit: usize::MAX,
115 keys_only: false,
116 bins_only: false,
117 count_only: false,
118 dedup_keys: false,
119 }
120 }
121}
122
123impl DiskOrderedCursorConfig {
124 /// Returns a configuration with default settings.
125 pub fn new() -> Self {
126 Self::default()
127 }
128
129 /// Sets the producer queue size.
130 pub fn with_queue_size(mut self, queue_size: usize) -> Self {
131 self.queue_size = queue_size.max(1);
132 self
133 }
134
135 /// Sets the LSN batch size (advisory).
136 pub fn with_lsn_batch_size(mut self, lsn_batch_size: usize) -> Self {
137 self.lsn_batch_size = lsn_batch_size.max(1);
138 self
139 }
140
141 /// Sets the internal memory limit in bytes.
142 pub fn with_internal_memory_limit(
143 mut self,
144 internal_memory_limit: usize,
145 ) -> Self {
146 self.internal_memory_limit = internal_memory_limit.max(1);
147 self
148 }
149
150 /// Sets keys-only mode (no data is read).
151 pub fn with_keys_only(mut self, keys_only: bool) -> Self {
152 self.keys_only = keys_only;
153 self
154 }
155
156 /// Sets BINs-only mode (alias for `keys_only` in Noxu).
157 pub fn with_bins_only(mut self, bins_only: bool) -> Self {
158 self.bins_only = bins_only;
159 self
160 }
161
162 /// Sets count-only mode.
163 pub fn with_count_only(mut self, count_only: bool) -> Self {
164 self.count_only = count_only;
165 self
166 }
167
168 /// Enables client-side dedup of repeated keys.
169 pub fn with_dedup_keys(mut self, dedup_keys: bool) -> Self {
170 self.dedup_keys = dedup_keys;
171 self
172 }
173}
174
/// A cursor that returns records in on-disk order rather than key order.
///
/// See the [module-level docs][self] for trade-offs and consistency
/// guarantees.
///
/// # Lifetime
///
/// The lifetime parameter `'env` ties the cursor to the `Database`
/// borrow(s) handed to [`Database::open_disk_ordered_cursor`] or
/// [`open_disk_ordered_cursor_multi`], so those borrows must outlive the
/// cursor. The intent is to prevent the application from closing a
/// database while the cursor is still scanning.
///
/// # Example
///
/// ```ignore
/// use noxu_db::{DatabaseEntry, DiskOrderedCursorConfig, OperationStatus};
///
/// # fn example(db: &noxu_db::Database) -> noxu_db::Result<()> {
/// let mut cursor = db.open_disk_ordered_cursor(
///     DiskOrderedCursorConfig::new().with_queue_size(64),
/// )?;
/// let mut key = DatabaseEntry::new();
/// let mut data = DatabaseEntry::new();
///
/// while cursor.next(&mut key, &mut data)? == OperationStatus::Success {
///     // ...process key + data...
/// }
/// cursor.close()?;
/// # Ok(())
/// # }
/// ```
pub struct DiskOrderedCursor<'env> {
    /// Lower-level cursor that supplies the `(key, data)` entries and is
    /// shut down when this cursor is closed.
    inner: DiskOrderedCursorImpl,
    /// Copy of the record handed out by the most recent successful
    /// `next()`, kept so that [`Self::current`] can re-emit it without
    /// re-reading the queue. `None` until the first record is returned.
    last: Option<(Vec<u8>, Vec<u8>)>,
    /// `true` once the close path has run. While set, `next` and
    /// `current` fail with `NoxuError::CursorClosed`. Reaching end-of-log
    /// does not set this flag.
    closed: bool,
    /// Zero-sized marker that carries the `'env` lifetime; it stores no
    /// reference itself.
    _marker: PhantomData<&'env ()>,
}
217
218impl<'env> DiskOrderedCursor<'env> {
219 pub(crate) fn from_impl(inner: DiskOrderedCursorImpl) -> Self {
220 Self { inner, last: None, closed: false, _marker: PhantomData }
221 }
222
223 /// Advances the cursor to the next record.
224 ///
225 /// On `Ok(Success)` the `key` and `data` `DatabaseEntry`s are populated
226 /// with the next record's bytes. On `Ok(NotFound)` the cursor has
227 /// reached end-of-log and no further records will be returned (it is
228 /// safe — and idempotent — to call again).
229 ///
230 /// # Errors
231 /// * [`NoxuError::CursorClosed`] if [`Self::close`] has been called.
232 /// * [`NoxuError::IoError`] / [`NoxuError::LogChecksumMismatch`] if the
233 /// producer thread reported a permanent log-read error.
234 pub fn next(
235 &mut self,
236 key: &mut DatabaseEntry,
237 data: &mut DatabaseEntry,
238 ) -> Result<OperationStatus> {
239 if self.closed {
240 return Err(NoxuError::CursorClosed);
241 }
242 match self.inner.next_entry()? {
243 Some((k, d)) => {
244 key.set_data(&k);
245 data.set_data(&d);
246 self.last = Some((k, d));
247 Ok(OperationStatus::Success)
248 }
249 None => Ok(OperationStatus::NotFound),
250 }
251 }
252
253 /// Returns the most recent record yielded by [`Self::next`] without
254 /// advancing.
255 ///
256 /// Returns `OperationStatus::NotFound` if the cursor has not yet been
257 /// advanced or has reached end-of-log.
258 pub fn current(
259 &self,
260 key: &mut DatabaseEntry,
261 data: &mut DatabaseEntry,
262 ) -> Result<OperationStatus> {
263 if self.closed {
264 return Err(NoxuError::CursorClosed);
265 }
266 match &self.last {
267 Some((k, d)) => {
268 key.set_data(k);
269 data.set_data(d);
270 Ok(OperationStatus::Success)
271 }
272 None => Ok(OperationStatus::NotFound),
273 }
274 }
275
276 /// Closes the cursor, signalling and joining the producer thread.
277 ///
278 /// Idempotent — calling `close` on an already-closed cursor is a no-op
279 /// and returns `Ok(())`. This is also called automatically when the
280 /// cursor is dropped, so applications using RAII can rely on the drop
281 /// glue rather than calling `close` explicitly.
282 pub fn close(mut self) -> Result<()> {
283 self.close_in_place()
284 }
285
286 fn close_in_place(&mut self) -> Result<()> {
287 if self.closed {
288 return Ok(());
289 }
290 self.closed = true;
291 self.inner.shutdown();
292 Ok(())
293 }
294}
295
296impl Drop for DiskOrderedCursor<'_> {
297 fn drop(&mut self) {
298 // Close-on-drop matches JE behaviour and ensures the producer
299 // thread is always joined.
300 let _ = self.close_in_place();
301 }
302}
303
304impl Database {
305 /// Opens a single-database disk-ordered cursor.
306 ///
307 /// This is a convenience for the common case; for a multi-database
308 /// scan use [`open_disk_ordered_cursor_multi`].
309 pub fn open_disk_ordered_cursor(
310 &self,
311 config: DiskOrderedCursorConfig,
312 ) -> Result<DiskOrderedCursor<'_>> {
313 let dbs: [&Database; 1] = [self];
314 // Build a vector view that owns the slice for the call (the impl
315 // copies what it needs out of the slice before returning).
316 let inner = DiskOrderedCursorImpl::open(
317 self.cached_log_manager().cloned(),
318 vec![self.database_id_for_doc()],
319 noxu_dbi::DiskOrderedCursorOptions {
320 queue_size: config.queue_size,
321 lsn_batch_size: config.lsn_batch_size,
322 internal_memory_limit: config.internal_memory_limit,
323 keys_only: config.keys_only
324 || config.bins_only
325 || config.count_only,
326 dedup_keys: config.dedup_keys,
327 },
328 )?;
329 // Validate the database is open and reserve the dbs binding to
330 // satisfy the borrow checker (forces &self to live for 'env).
331 self.check_open_for_doc()?;
332 let _ = dbs;
333 Ok(DiskOrderedCursor::from_impl(inner))
334 }
335}
336
337/// Opens a disk-ordered cursor that scans entries from any of the given
338/// databases.
339///
340/// All databases must belong to the same [`crate::Environment`]. The cursor
341/// holds a borrow of the slice for its entire lifetime, which prevents any of
342/// the databases from being closed mid-scan.
343///
344/// # Errors
345///
346/// * [`NoxuError::IllegalArgument`] if `databases` is empty or contains
347/// handles from different environments.
348/// * [`NoxuError::DatabaseClosed`] if any of the databases has been closed.
349/// * [`NoxuError::IoError`] if the producer thread cannot be spawned.
350pub fn open_disk_ordered_cursor_multi<'env>(
351 databases: &'env [&'env Database],
352 config: DiskOrderedCursorConfig,
353) -> Result<DiskOrderedCursor<'env>> {
354 if databases.is_empty() {
355 return Err(NoxuError::IllegalArgument(
356 "open_disk_ordered_cursor: at least one database is required"
357 .into(),
358 ));
359 }
360
361 // Each Database has a snapshot LogManager; verify they all share the
362 // same one (i.e. the same environment). If a database is non-WAL the
363 // disk-ordered scan returns no entries, but the construction still
364 // succeeds for API consistency.
365 let log_manager = databases[0].cached_log_manager().cloned();
366 for db in &databases[1..] {
367 let other = db.cached_log_manager().cloned();
368 match (&log_manager, &other) {
369 (Some(a), Some(b)) if !std::sync::Arc::ptr_eq(a, b) => {
370 return Err(NoxuError::IllegalArgument(
371 "open_disk_ordered_cursor: all databases must share \
372 the same environment"
373 .into(),
374 ));
375 }
376 (Some(_), None) | (None, Some(_)) => {
377 return Err(NoxuError::IllegalArgument(
378 "open_disk_ordered_cursor: all databases must share \
379 the same environment"
380 .into(),
381 ));
382 }
383 _ => {}
384 }
385 }
386
387 let mut db_ids = Vec::with_capacity(databases.len());
388 for db in databases {
389 db.check_open_for_doc()?;
390 db_ids.push(db.database_id_for_doc());
391 }
392
393 let inner = DiskOrderedCursorImpl::open(
394 log_manager,
395 db_ids,
396 noxu_dbi::DiskOrderedCursorOptions {
397 queue_size: config.queue_size,
398 lsn_batch_size: config.lsn_batch_size,
399 internal_memory_limit: config.internal_memory_limit,
400 keys_only: config.keys_only
401 || config.bins_only
402 || config.count_only,
403 dedup_keys: config.dedup_keys,
404 },
405 )?;
406
407 Ok(DiskOrderedCursor::from_impl(inner))
408}
409
#[cfg(test)]
mod tests {
    use super::*;

    /// The defaults must keep the documented JE-shaped values.
    #[test]
    fn config_defaults_match_je_shape() {
        // Exhaustive destructuring: a newly added field forces this test
        // to be revisited.
        let DiskOrderedCursorConfig {
            queue_size,
            lsn_batch_size,
            internal_memory_limit,
            keys_only,
            bins_only,
            count_only,
            dedup_keys,
        } = DiskOrderedCursorConfig::default();

        assert_eq!(queue_size, 1000);
        assert_eq!(lsn_batch_size, usize::MAX);
        assert_eq!(internal_memory_limit, usize::MAX);
        assert!(!keys_only);
        assert!(!bins_only);
        assert!(!count_only);
        assert!(!dedup_keys);
    }

    /// Size-like builders raise a zero argument to one.
    #[test]
    fn config_builders_clamp_zero_to_one() {
        let clamped = DiskOrderedCursorConfig::new()
            .with_queue_size(0)
            .with_lsn_batch_size(0)
            .with_internal_memory_limit(0);

        let observed = (
            clamped.queue_size,
            clamped.lsn_batch_size,
            clamped.internal_memory_limit,
        );
        assert_eq!(observed, (1, 1, 1));
    }

    /// Builders compose: each call keeps what earlier calls set.
    #[test]
    fn config_builders_chain() {
        let chained = DiskOrderedCursorConfig::new()
            .with_queue_size(8)
            .with_keys_only(true)
            .with_dedup_keys(true);

        assert_eq!(chained.queue_size, 8);
        assert!(chained.keys_only);
        assert!(chained.dedup_keys);
    }
}
447}