Skip to main content

noxu_db/
disk_ordered_cursor.rs

1//! Disk-ordered cursor for high-throughput unordered scans.
2//!
3//! A [`DiskOrderedCursor`] returns user records in approximate **on-disk
4//! order** — the order in which their LN log entries appear in the
5//! write-ahead log — rather than in B-tree key order.  This enables bulk
6//! export, full-DB replication catch-up, and analytical scans without paying
7//! the cost of B-tree traversal or lock acquisition.
8//!
9//! # Trade-offs vs [`crate::Cursor`]
10//!
11//! | Property | `Cursor` | `DiskOrderedCursor` |
12//! |---|---|---|
13//! | Order of returned keys | Key order (B-tree) | Disk order (log append order) |
14//! | Lock acquisition       | Yes (per record)   | **No** |
15//! | Isolation              | Per-txn isolation  | At best `READ_UNCOMMITTED` |
16//! | Throughput             | Limited by random I/O on B-tree pages | Sequential log read |
17//! | Deduplication of stale versions | Yes — only the latest committed value is returned | **No** by default — the same key may appear multiple times if it was updated, and a deleted key may still appear |
18//!
19//! # Consistency guarantees
20//!
21//! The records returned by a `DiskOrderedCursor` correspond to the state of
22//! the database at the moment each LN was written to the log, which may
23//! include uncommitted writes.  Concurrent inserts/updates/deletes performed
24//! during the scan are **not** required to be visible.  Applications that
25//! need a transactionally-consistent snapshot should drain in-flight writers
26//! before opening the cursor (e.g. by holding a quiesce barrier).
27//!
28//! # Stale versions
29//!
30//! By default the cursor matches BDB JE: every LN that survives in the log
31//! and belongs to one of the targeted databases is yielded, even if a newer
//! version of the same key follows.  This is the JE-correct behaviour for
//! bulk-export workflows that want to observe every logged mutation (which,
//! as noted under *Consistency guarantees*, may include uncommitted writes).
34//! Set [`DiskOrderedCursorConfig::dedup_keys`] to `true` to filter stale
35//! versions client-side (see field docs for caveats).
36//!
37//! # Producer thread
38//!
39//! Opening a cursor spawns a single background producer thread that reads
40//! the log files sequentially and pushes decoded `(db_idx, key, data)`
41//! tuples through a bounded channel.  The thread is joined when the cursor
42//! is dropped or [`DiskOrderedCursor::close`] is called.
43
44use std::marker::PhantomData;
45
46use noxu_dbi::DiskOrderedCursorImpl;
47
48use crate::database::Database;
49use crate::database_entry::DatabaseEntry;
50use crate::error::{NoxuError, Result};
51use crate::operation_status::OperationStatus;
52
/// Configuration for a [`DiskOrderedCursor`].
///
/// Mirrors the field set of JE's `DiskOrderedCursorConfig` but uses
/// Rust-idiomatic builder methods.  All fields have sensible defaults
/// matching JE.
///
/// The fields are public, so a value assigned directly to a field bypasses
/// the zero-clamping applied by the `with_*` builder methods.
///
/// The type is `PartialEq + Eq` so that whole configurations can be
/// compared (for example in tests) instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DiskOrderedCursorConfig {
    /// Maximum number of `(key, data)` entries the producer thread may queue
    /// before blocking.
    ///
    /// Default: `1000` (matches JE's `DOS_PRODUCER_QUEUE_SIZE`).
    pub queue_size: usize,
    /// Maximum number of LSNs the producer accumulates before yielding a
    /// batch downstream.  Currently advisory — the producer streams entries
    /// one at a time; this field is preserved for JE shape compatibility
    /// and future batched-fetch support.
    ///
    /// Default: `usize::MAX`.
    pub lsn_batch_size: usize,
    /// Maximum number of bytes the in-flight queue may occupy before the
    /// producer thread blocks.  Approximate — measured as the sum of key +
    /// data lengths of buffered entries.
    ///
    /// Default: `usize::MAX`.
    pub internal_memory_limit: usize,
    /// If `true`, only keys are read from the log; data is left empty.
    /// Slightly faster because the on-disk LN value bytes are skipped.
    ///
    /// Default: `false`.
    pub keys_only: bool,
    /// JE legacy flag — scan only BIN entries.  Honoured as an alias for
    /// `keys_only` in this implementation because Noxu's log iterator
    /// always emits LN payloads (no separate BIN-only scan path is
    /// available at this layer).
    ///
    /// Default: `false`.
    pub bins_only: bool,
    /// JE legacy flag — count records without materialising key/data.
    /// Currently honoured as `keys_only` plus a discard policy on data.
    /// `next()` still returns one `Success` per record so the application
    /// can compute the count by iterating.
    ///
    /// Default: `false`.
    pub count_only: bool,
    /// **Noxu extension.** If `true`, the cursor maintains a `HashSet` of
    /// `(db_idx, key)` pairs already returned and skips duplicates.  This
    /// can be expensive on large scans — the set grows linearly with
    /// distinct keys.
    ///
    /// Note: even with `dedup_keys = true`, the cursor returns the *first*
    /// version of a key that the log scan encounters, which is the
    /// **oldest** version — not the latest.  For latest-only semantics
    /// the application must run a regular B-tree scan.
    ///
    /// Default: `false` (matches JE).
    pub dedup_keys: bool,
}
108
109impl Default for DiskOrderedCursorConfig {
110    fn default() -> Self {
111        Self {
112            queue_size: 1000,
113            lsn_batch_size: usize::MAX,
114            internal_memory_limit: usize::MAX,
115            keys_only: false,
116            bins_only: false,
117            count_only: false,
118            dedup_keys: false,
119        }
120    }
121}
122
123impl DiskOrderedCursorConfig {
124    /// Returns a configuration with default settings.
125    pub fn new() -> Self {
126        Self::default()
127    }
128
129    /// Sets the producer queue size.
130    pub fn with_queue_size(mut self, queue_size: usize) -> Self {
131        self.queue_size = queue_size.max(1);
132        self
133    }
134
135    /// Sets the LSN batch size (advisory).
136    pub fn with_lsn_batch_size(mut self, lsn_batch_size: usize) -> Self {
137        self.lsn_batch_size = lsn_batch_size.max(1);
138        self
139    }
140
141    /// Sets the internal memory limit in bytes.
142    pub fn with_internal_memory_limit(
143        mut self,
144        internal_memory_limit: usize,
145    ) -> Self {
146        self.internal_memory_limit = internal_memory_limit.max(1);
147        self
148    }
149
150    /// Sets keys-only mode (no data is read).
151    pub fn with_keys_only(mut self, keys_only: bool) -> Self {
152        self.keys_only = keys_only;
153        self
154    }
155
156    /// Sets BINs-only mode (alias for `keys_only` in Noxu).
157    pub fn with_bins_only(mut self, bins_only: bool) -> Self {
158        self.bins_only = bins_only;
159        self
160    }
161
162    /// Sets count-only mode.
163    pub fn with_count_only(mut self, count_only: bool) -> Self {
164        self.count_only = count_only;
165        self
166    }
167
168    /// Enables client-side dedup of repeated keys.
169    pub fn with_dedup_keys(mut self, dedup_keys: bool) -> Self {
170        self.dedup_keys = dedup_keys;
171        self
172    }
173}
174
175/// A cursor that returns records in on-disk order rather than key order.
176///
177/// See the [module-level docs][self] for trade-offs and consistency
178/// guarantees.
179///
180/// # Lifetime
181///
182/// The lifetime parameter `'env` ties the cursor to the borrow of the
183/// `Database` slice passed to [`Database::open_disk_ordered_cursor`] and
184/// [`open_disk_ordered_cursor_multi`], preventing the application from
185/// closing a database while the cursor is still scanning.
186///
187/// # Example
188///
189/// ```ignore
190/// use noxu_db::{DatabaseEntry, DiskOrderedCursorConfig, OperationStatus};
191///
192/// # fn example(db: &noxu_db::Database) -> noxu_db::Result<()> {
193/// let mut cursor = db.open_disk_ordered_cursor(
194///     DiskOrderedCursorConfig::new().with_queue_size(64),
195/// )?;
196/// let mut key = DatabaseEntry::new();
197/// let mut data = DatabaseEntry::new();
198///
199/// while cursor.next(&mut key, &mut data)? == OperationStatus::Success {
200///     // ...process key + data...
201/// }
202/// cursor.close()?;
203/// # Ok(())
204/// # }
205/// ```
206pub struct DiskOrderedCursor<'env> {
207    inner: DiskOrderedCursorImpl,
208    /// Cached value of the most-recent successful `next()` so that
209    /// [`Self::current`] can re-emit it without re-reading the queue.
210    last: Option<(Vec<u8>, Vec<u8>)>,
211    /// `true` once [`Self::close`] has been called or the producer is
212    /// drained.  After this, all operations return `OperationStatus::NotFound`.
213    closed: bool,
214    /// Borrows the slice of `&Database` handles to keep them alive.
215    _marker: PhantomData<&'env ()>,
216}
217
218impl<'env> DiskOrderedCursor<'env> {
219    pub(crate) fn from_impl(inner: DiskOrderedCursorImpl) -> Self {
220        Self { inner, last: None, closed: false, _marker: PhantomData }
221    }
222
223    /// Advances the cursor to the next record.
224    ///
225    /// On `Ok(Success)` the `key` and `data` `DatabaseEntry`s are populated
226    /// with the next record's bytes.  On `Ok(NotFound)` the cursor has
227    /// reached end-of-log and no further records will be returned (it is
228    /// safe — and idempotent — to call again).
229    ///
230    /// # Errors
231    /// * [`NoxuError::CursorClosed`] if [`Self::close`] has been called.
232    /// * [`NoxuError::IoError`] / [`NoxuError::LogChecksumMismatch`] if the
233    ///   producer thread reported a permanent log-read error.
234    pub fn next(
235        &mut self,
236        key: &mut DatabaseEntry,
237        data: &mut DatabaseEntry,
238    ) -> Result<OperationStatus> {
239        if self.closed {
240            return Err(NoxuError::CursorClosed);
241        }
242        match self.inner.next_entry()? {
243            Some((k, d)) => {
244                key.set_data(&k);
245                data.set_data(&d);
246                self.last = Some((k, d));
247                Ok(OperationStatus::Success)
248            }
249            None => Ok(OperationStatus::NotFound),
250        }
251    }
252
253    /// Returns the most recent record yielded by [`Self::next`] without
254    /// advancing.
255    ///
256    /// Returns `OperationStatus::NotFound` if the cursor has not yet been
257    /// advanced or has reached end-of-log.
258    pub fn current(
259        &self,
260        key: &mut DatabaseEntry,
261        data: &mut DatabaseEntry,
262    ) -> Result<OperationStatus> {
263        if self.closed {
264            return Err(NoxuError::CursorClosed);
265        }
266        match &self.last {
267            Some((k, d)) => {
268                key.set_data(k);
269                data.set_data(d);
270                Ok(OperationStatus::Success)
271            }
272            None => Ok(OperationStatus::NotFound),
273        }
274    }
275
276    /// Closes the cursor, signalling and joining the producer thread.
277    ///
278    /// Idempotent — calling `close` on an already-closed cursor is a no-op
279    /// and returns `Ok(())`.  This is also called automatically when the
280    /// cursor is dropped, so applications using RAII can rely on the drop
281    /// glue rather than calling `close` explicitly.
282    pub fn close(mut self) -> Result<()> {
283        self.close_in_place()
284    }
285
286    fn close_in_place(&mut self) -> Result<()> {
287        if self.closed {
288            return Ok(());
289        }
290        self.closed = true;
291        self.inner.shutdown();
292        Ok(())
293    }
294}
295
296impl Drop for DiskOrderedCursor<'_> {
297    fn drop(&mut self) {
298        // Close-on-drop matches JE behaviour and ensures the producer
299        // thread is always joined.
300        let _ = self.close_in_place();
301    }
302}
303
304impl Database {
305    /// Opens a single-database disk-ordered cursor.
306    ///
307    /// This is a convenience for the common case; for a multi-database
308    /// scan use [`open_disk_ordered_cursor_multi`].
309    pub fn open_disk_ordered_cursor(
310        &self,
311        config: DiskOrderedCursorConfig,
312    ) -> Result<DiskOrderedCursor<'_>> {
313        let dbs: [&Database; 1] = [self];
314        // Build a vector view that owns the slice for the call (the impl
315        // copies what it needs out of the slice before returning).
316        let inner = DiskOrderedCursorImpl::open(
317            self.cached_log_manager().cloned(),
318            vec![self.database_id_for_doc()],
319            noxu_dbi::DiskOrderedCursorOptions {
320                queue_size: config.queue_size,
321                lsn_batch_size: config.lsn_batch_size,
322                internal_memory_limit: config.internal_memory_limit,
323                keys_only: config.keys_only
324                    || config.bins_only
325                    || config.count_only,
326                dedup_keys: config.dedup_keys,
327            },
328        )?;
329        // Validate the database is open and reserve the dbs binding to
330        // satisfy the borrow checker (forces &self to live for 'env).
331        self.check_open_for_doc()?;
332        let _ = dbs;
333        Ok(DiskOrderedCursor::from_impl(inner))
334    }
335}
336
337/// Opens a disk-ordered cursor that scans entries from any of the given
338/// databases.
339///
340/// All databases must belong to the same [`crate::Environment`].  The cursor
341/// holds a borrow of the slice for its entire lifetime, which prevents any of
342/// the databases from being closed mid-scan.
343///
344/// # Errors
345///
346/// * [`NoxuError::IllegalArgument`] if `databases` is empty or contains
347///   handles from different environments.
348/// * [`NoxuError::DatabaseClosed`] if any of the databases has been closed.
349/// * [`NoxuError::IoError`] if the producer thread cannot be spawned.
350pub fn open_disk_ordered_cursor_multi<'env>(
351    databases: &'env [&'env Database],
352    config: DiskOrderedCursorConfig,
353) -> Result<DiskOrderedCursor<'env>> {
354    if databases.is_empty() {
355        return Err(NoxuError::IllegalArgument(
356            "open_disk_ordered_cursor: at least one database is required"
357                .into(),
358        ));
359    }
360
361    // Each Database has a snapshot LogManager; verify they all share the
362    // same one (i.e. the same environment).  If a database is non-WAL the
363    // disk-ordered scan returns no entries, but the construction still
364    // succeeds for API consistency.
365    let log_manager = databases[0].cached_log_manager().cloned();
366    for db in &databases[1..] {
367        let other = db.cached_log_manager().cloned();
368        match (&log_manager, &other) {
369            (Some(a), Some(b)) if !std::sync::Arc::ptr_eq(a, b) => {
370                return Err(NoxuError::IllegalArgument(
371                    "open_disk_ordered_cursor: all databases must share \
372                     the same environment"
373                        .into(),
374                ));
375            }
376            (Some(_), None) | (None, Some(_)) => {
377                return Err(NoxuError::IllegalArgument(
378                    "open_disk_ordered_cursor: all databases must share \
379                     the same environment"
380                        .into(),
381                ));
382            }
383            _ => {}
384        }
385    }
386
387    let mut db_ids = Vec::with_capacity(databases.len());
388    for db in databases {
389        db.check_open_for_doc()?;
390        db_ids.push(db.database_id_for_doc());
391    }
392
393    let inner = DiskOrderedCursorImpl::open(
394        log_manager,
395        db_ids,
396        noxu_dbi::DiskOrderedCursorOptions {
397            queue_size: config.queue_size,
398            lsn_batch_size: config.lsn_batch_size,
399            internal_memory_limit: config.internal_memory_limit,
400            keys_only: config.keys_only
401                || config.bins_only
402                || config.count_only,
403            dedup_keys: config.dedup_keys,
404        },
405    )?;
406
407    Ok(DiskOrderedCursor::from_impl(inner))
408}
409
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration must keep the JE-shaped values.
    #[test]
    fn config_defaults_match_je_shape() {
        // Destructure exhaustively so a newly added field forces this test
        // to be revisited.
        let DiskOrderedCursorConfig {
            queue_size,
            lsn_batch_size,
            internal_memory_limit,
            keys_only,
            bins_only,
            count_only,
            dedup_keys,
        } = DiskOrderedCursorConfig::default();

        assert_eq!(queue_size, 1000);
        assert_eq!(lsn_batch_size, usize::MAX);
        assert_eq!(internal_memory_limit, usize::MAX);
        assert!(!keys_only);
        assert!(!bins_only);
        assert!(!count_only);
        assert!(!dedup_keys);
    }

    /// The size-like builders raise a zero argument to one.
    #[test]
    fn config_builders_clamp_zero_to_one() {
        let clamped = DiskOrderedCursorConfig::new()
            .with_queue_size(0)
            .with_lsn_batch_size(0)
            .with_internal_memory_limit(0);

        let sizes = [
            clamped.queue_size,
            clamped.lsn_batch_size,
            clamped.internal_memory_limit,
        ];
        for size in sizes {
            assert_eq!(size, 1);
        }
    }

    /// Builder calls compose, each one updating its own field.
    #[test]
    fn config_builders_chain() {
        let chained = DiskOrderedCursorConfig::new()
            .with_queue_size(8)
            .with_keys_only(true)
            .with_dedup_keys(true);

        assert_eq!(chained.queue_size, 8);
        assert!(chained.keys_only);
        assert!(chained.dedup_keys);
    }
}