Skip to main content

noxu_dbi/
disk_ordered_cursor_impl.rs

1//! Disk-ordered cursor implementation.
2//!
3//! Sits behind the public `noxu_db::DiskOrderedCursor` API.  Spawns a
4//! background producer thread that walks the log files sequentially, decodes
5//! LN entries that belong to the targeted databases, and pushes
6//! `(key, data)` tuples through a bounded channel for the consumer to drain
7//! via [`DiskOrderedCursorImpl::next_entry`].
8//!
9//! See `crates/noxu-db/src/disk_ordered_cursor.rs` for the public-API
10//! contract and consistency guarantees.
11//!
12//! # Producer-thread lifecycle
13//!
14//! ```text
15//!   open() ──spawn──> Producer thread (filescan + decode + send)
16//!                        │
17//!                        ▼
18//!   next_entry() <──── bounded mpsc channel
19//!                        ▲
20//!                        │
21//!   shutdown() ──signal──┘   (joins on drop)
22//! ```
23//!
//! Two cross-thread primitives are used:
//!
//! * **Data channel** — `sync_channel::<DocItem>(queue_size)`: producer sends
//!   results, consumer receives.
//! * **Shutdown flag** — `Arc<AtomicBool>` (a flag, not a channel): the
//!   consumer sets it; the producer polls it periodically between entries
//!   and terminates promptly once it is set.
//!
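//! # Memory budget
//!
//! `internal_memory_limit` is interpreted as a soft cap on the *cumulative
//! key+data byte size* of items currently buffered in the channel.  The
//! producer tracks it with an atomic byte counter (shared through an `Arc`)
//! that the consumer decrements as it drains; the producer blocks on a
//! `Condvar` when the next item would exceed the limit.  A single item that
//! is larger than the whole limit is still admitted when nothing else is
//! buffered, so oversized records cannot stall the scan.  The accounting is
//! approximate (there are race windows around send/recv), in line with
//! Berkeley DB Java Edition's (JE's) own approximation.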
39
40use std::collections::HashSet;
41use std::sync::{
42    Arc, Condvar, Mutex,
43    atomic::{AtomicBool, AtomicUsize, Ordering},
44    mpsc::{self, RecvTimeoutError, SyncSender},
45};
46use std::thread::{self, JoinHandle};
47use std::time::Duration;
48
49use bytes::Bytes;
50use noxu_log::LogManager;
51use noxu_recovery::{LnOperation, LogEntry, LogScanner, PositionedEntry};
52
53use crate::database_id::DatabaseId;
54use crate::error::{DbiError, Result};
55use crate::file_manager_scanner::FileManagerLogScanner;
56
57/// Items pushed by the producer through the data channel.
58type DocItem = std::result::Result<(Vec<u8>, Vec<u8>), DbiError>;
59
/// Tuning knobs consumed by [`DiskOrderedCursorImpl::open`].
///
/// This is a plain-data mirror of the public
/// `noxu-db::DiskOrderedCursorConfig`.  The outer crate converts its config
/// into this struct at the API boundary, so this crate never needs to know
/// about the public config type.
#[derive(Debug, Clone)]
pub struct DiskOrderedCursorOptions {
    /// Capacity, in entries, of the bounded producer → consumer channel.
    pub queue_size: usize,
    /// Soft upper bound on the total key+data bytes buffered in the channel;
    /// `usize::MAX` means "no limit".
    pub internal_memory_limit: usize,
    /// Advisory batch size in LSNs.  Today it only caps how many log entries
    /// the producer handles between two checks of the shutdown flag.
    pub lsn_batch_size: usize,
    /// When `true`, returned tuples carry the key only and the data is empty.
    pub keys_only: bool,
    /// When `true`, the producer remembers `(db_idx, key)` pairs in a
    /// HashSet and suppresses repeats.
    pub dedup_keys: bool,
}

impl Default for DiskOrderedCursorOptions {
    /// 1000-entry queue, no memory cap, no batch cap, full records, no dedup.
    fn default() -> Self {
        let unbounded = usize::MAX;
        DiskOrderedCursorOptions {
            queue_size: 1000,
            internal_memory_limit: unbounded,
            lsn_batch_size: unbounded,
            keys_only: false,
            dedup_keys: false,
        }
    }
}
90}
91
/// Byte-budget tracker shared (via `Arc`) between the producer and consumer.
///
/// The producer calls [`MemoryBudget::reserve`] before sending an item and
/// the consumer calls [`MemoryBudget::release`] after receiving it.
struct MemoryBudget {
    /// Bytes currently reserved (sent but not yet released by the consumer).
    in_use: AtomicUsize,
    /// Soft cap on `in_use`.  `usize::MAX` means "unbounded" and disables
    /// all blocking and condvar traffic.
    limit: usize,
    /// The producer parks here when the next item would exceed `limit`.
    /// Notified by `release()` after it decrements `in_use`.
    cv: Condvar,
    /// Mutex paired with `cv`.  It guards no data; holding it while checking
    /// `in_use` (in `reserve`) and while notifying (in `release`) prevents a
    /// wake-up from slipping in between the check and the wait.
    mu: Mutex<()>,
}

impl MemoryBudget {
    /// Create a tracker with nothing reserved and the given byte `limit`.
    fn new(limit: usize) -> Self {
        Self {
            in_use: AtomicUsize::new(0),
            limit,
            cv: Condvar::new(),
            mu: Mutex::new(()),
        }
    }

    /// Reserve `bytes` of budget, blocking until they fit or `cancel` is set.
    ///
    /// Returns `true` once `bytes` have been added to `in_use`, or `false` if
    /// `cancel` was observed before room became available.  In unbounded
    /// mode (`limit == usize::MAX`) this never blocks and never consults
    /// `cancel`.  An item is always admitted when nothing else is reserved,
    /// so a payload larger than the whole limit still makes progress.
    fn reserve(&self, bytes: usize, cancel: &AtomicBool) -> bool {
        if self.limit == usize::MAX {
            self.in_use.fetch_add(bytes, Ordering::Relaxed);
            return true;
        }
        let mut guard = self.mu.lock().unwrap_or_else(|p| p.into_inner());
        loop {
            if cancel.load(Ordering::Acquire) {
                return false;
            }
            let cur = self.in_use.load(Ordering::Acquire);
            // `saturating_add`: a plain `+` can overflow (and panic in debug
            // builds) when `limit` is close to `usize::MAX`.  A saturated sum
            // equals `usize::MAX`, which is above any finite limit, so the
            // item correctly "does not fit".
            if cur == 0 || cur.saturating_add(bytes) <= self.limit {
                self.in_use.fetch_add(bytes, Ordering::Relaxed);
                return true;
            }
            // Wait for a release().  The wait is bounded so cancellation is
            // observed even if nobody ever notifies.
            let (g, _) = self
                .cv
                .wait_timeout(guard, Duration::from_millis(50))
                .unwrap_or_else(|p| p.into_inner());
            guard = g;
        }
    }

    /// Give back `bytes` previously obtained from [`MemoryBudget::reserve`]
    /// and, in bounded mode, wake any producer parked in `reserve`.
    fn release(&self, bytes: usize) {
        self.in_use.fetch_sub(bytes, Ordering::Relaxed);
        if self.limit == usize::MAX {
            // Unbounded mode never parks, so there is nobody to wake.
            return;
        }
        // Notify under the lock so the wake-up cannot fall between the
        // producer's capacity check and its wait.
        let _guard = self.mu.lock();
        self.cv.notify_all();
    }
}
153
154/// Internals of a disk-ordered cursor.
155///
156/// Owned by the public `DiskOrderedCursor` and lives on the consumer thread.
157/// Sending `Self` between threads is not supported (the receiver is a
158/// non-`Sync` `Receiver`).
159pub struct DiskOrderedCursorImpl {
160    /// Receives `(key, data)` items (or errors) from the producer thread.
161    rx: mpsc::Receiver<DocItem>,
162    /// Joined by `shutdown()`.
163    handle: Option<JoinHandle<()>>,
164    /// Set to true to signal the producer to stop.
165    cancel: Arc<AtomicBool>,
166    /// Memory budget — consumer releases bytes on each recv.
167    budget: Arc<MemoryBudget>,
168    /// Whether next_entry() has observed end-of-stream.
169    drained: bool,
170    /// Sticky terminal error from the producer (latched on the first
171    /// `Err` received so subsequent calls keep returning it).
172    terminal_err: Option<DbiError>,
173}
174
175impl DiskOrderedCursorImpl {
176    /// Construct and start a disk-ordered cursor over the given target
177    /// databases.
178    ///
179    /// `log_manager == None` produces a cursor that immediately reaches
180    /// end-of-stream — no entries can be returned because the environment
181    /// has no WAL to scan.
182    pub fn open(
183        log_manager: Option<Arc<LogManager>>,
184        target_db_ids: Vec<DatabaseId>,
185        opts: DiskOrderedCursorOptions,
186    ) -> Result<Self> {
187        let queue_size = opts.queue_size.max(1);
188        let (tx, rx) = mpsc::sync_channel::<DocItem>(queue_size);
189        let cancel = Arc::new(AtomicBool::new(false));
190        let budget = Arc::new(MemoryBudget::new(opts.internal_memory_limit));
191
192        let handle = match log_manager {
193            Some(lm) => {
194                let cancel_p = Arc::clone(&cancel);
195                let budget_p = Arc::clone(&budget);
196                let tx_p = tx;
197                let opts_p = opts;
198                let target = target_db_ids;
199                let builder = thread::Builder::new()
200                    .name("noxu-disk-ordered-cursor".to_string());
201                let h = builder
202                    .spawn(move || {
203                        produce(lm, target, opts_p, tx_p, cancel_p, budget_p)
204                    })
205                    .map_err(|e| {
206                        DbiError::OperationFailed(format!(
207                            "failed to spawn disk-ordered-cursor producer: {e}"
208                        ))
209                    })?;
210                Some(h)
211            }
212            None => {
213                // No log: drop tx so rx returns Disconnected immediately.
214                drop(tx);
215                None
216            }
217        };
218
219        Ok(Self {
220            rx,
221            handle,
222            cancel,
223            budget,
224            drained: false,
225            terminal_err: None,
226        })
227    }
228
229    /// Receive the next `(key, data)` tuple.
230    ///
231    /// Returns `Ok(None)` at end-of-log.  Once `None` has been returned,
232    /// every subsequent call also returns `Ok(None)`.  After a producer
233    /// error every subsequent call returns the same error (latched).
234    pub fn next_entry(&mut self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
235        if let Some(e) = &self.terminal_err {
236            return Err(clone_dbi_err(e));
237        }
238        if self.drained {
239            return Ok(None);
240        }
241        loop {
242            match self.rx.recv_timeout(Duration::from_millis(100)) {
243                Ok(Ok((k, d))) => {
244                    let n = k.len() + d.len();
245                    self.budget.release(n);
246                    return Ok(Some((k, d)));
247                }
248                Ok(Err(e)) => {
249                    let cloned = clone_dbi_err(&e);
250                    self.terminal_err = Some(e);
251                    return Err(cloned);
252                }
253                Err(RecvTimeoutError::Timeout) => {
254                    if self.cancel.load(Ordering::Acquire) {
255                        self.drained = true;
256                        return Ok(None);
257                    }
258                    continue;
259                }
260                Err(RecvTimeoutError::Disconnected) => {
261                    self.drained = true;
262                    return Ok(None);
263                }
264            }
265        }
266    }
267
268    /// Signal the producer thread to stop and join it.
269    ///
270    /// Idempotent.  Called by the public `DiskOrderedCursor::close()` and
271    /// also by its `Drop` impl, so applications never observe a leaked
272    /// thread.
273    pub fn shutdown(&mut self) {
274        self.cancel.store(true, Ordering::Release);
275        // Wake the producer if it is parked in MemoryBudget::reserve().
276        {
277            let _g = self.budget.mu.lock();
278            self.budget.cv.notify_all();
279        }
280        // Drain remaining items so the producer never blocks on send.
281        while self.rx.try_recv().is_ok() {}
282        if let Some(h) = self.handle.take() {
283            // Join is best-effort — the producer exits promptly once it
284            // sees `cancel`.  A panic in the producer is converted to a
285            // log message; we don't propagate it because shutdown() is
286            // also called from Drop.
287            if let Err(e) = h.join() {
288                log::warn!(
289                    target: "noxu-disk-ordered-cursor",
290                    "producer thread panicked during shutdown: {e:?}"
291                );
292            }
293        }
294        self.drained = true;
295    }
296}
297
298impl Drop for DiskOrderedCursorImpl {
299    fn drop(&mut self) {
300        self.shutdown();
301    }
302}
303
304/// Best-effort clone of a `DbiError` so the consumer can latch a copy of
305/// the producer's terminal error.  `DbiError` does not derive `Clone`
306/// because of its embedded `io::Error`; we degrade those to a string
307/// representation.
308fn clone_dbi_err(e: &DbiError) -> DbiError {
309    match e {
310        DbiError::OperationFailed(s) => DbiError::OperationFailed(s.clone()),
311        DbiError::IoError(io) => DbiError::OperationFailed(format!(
312            "disk-ordered-cursor producer I/O error: {io}"
313        )),
314        other => DbiError::OperationFailed(format!(
315            "disk-ordered-cursor producer error: {other}"
316        )),
317    }
318}
319
320/// Producer thread body.
321///
322/// Walks every log file in ascending order, scans entries via
323/// `FileManagerLogScanner::scan_forward` per file, filters to LN entries
324/// belonging to a target db, and pushes results onto `tx`.
325fn produce(
326    log_manager: Arc<LogManager>,
327    target_db_ids: Vec<DatabaseId>,
328    opts: DiskOrderedCursorOptions,
329    tx: SyncSender<DocItem>,
330    cancel: Arc<AtomicBool>,
331    budget: Arc<MemoryBudget>,
332) {
333    let target_set: HashSet<u64> =
334        target_db_ids.iter().map(|d| d.as_i64() as u64).collect();
335    let fm = Arc::clone(log_manager.file_manager());
336    let scanner = FileManagerLogScanner::new(fm);
337
338    let file_nums = match log_manager.file_manager().list_file_numbers() {
339        Ok(v) => v,
340        Err(e) => {
341            let _ = tx.send(Err(DbiError::OperationFailed(format!(
342                "list_file_numbers: {e}"
343            ))));
344            return;
345        }
346    };
347
348    let mut dedup: Option<HashSet<Vec<u8>>> =
349        opts.dedup_keys.then(HashSet::new);
350    let mut counter_since_check = 0usize;
351
352    for &file_num in &file_nums {
353        if cancel.load(Ordering::Acquire) {
354            return;
355        }
356        let start = noxu_util::Lsn::new(file_num, 0);
357        let end = noxu_util::Lsn::new(file_num.saturating_add(1), 0);
358        let entries: Vec<PositionedEntry> = scanner.scan_forward(start, end);
359        for pe in entries {
360            counter_since_check += 1;
361            if counter_since_check >= 64
362                || counter_since_check >= opts.lsn_batch_size
363            {
364                counter_since_check = 0;
365                if cancel.load(Ordering::Acquire) {
366                    return;
367                }
368            }
369
370            let LogEntry::Ln(ln) = pe.entry else { continue };
371
372            // Skip deletes — JE returns only live records.
373            if matches!(ln.operation, LnOperation::Delete) || ln.data.is_none()
374            {
375                continue;
376            }
377            if !target_set.contains(&ln.db_id) {
378                continue;
379            }
380
381            let key_bytes: Bytes = ln.key;
382            let data_bytes: Bytes = ln.data.unwrap_or_default();
383
384            let key_vec = key_bytes.to_vec();
385            let data_vec =
386                if opts.keys_only { Vec::new() } else { data_bytes.to_vec() };
387
388            if let Some(set) = dedup.as_mut()
389                && !set.insert(key_vec.clone())
390            {
391                continue;
392            }
393
394            // Reserve budget before sending.  If reserve returns false,
395            // the consumer cancelled — exit promptly.
396            let n = key_vec.len() + data_vec.len();
397            if !budget.reserve(n, &cancel) {
398                return;
399            }
400
401            // Backpressure on the channel itself: send blocks when the
402            // bounded queue is full.
403            if tx.send(Ok((key_vec, data_vec))).is_err() {
404                // Receiver dropped — consumer is gone.
405                budget.release(n);
406                return;
407            }
408        }
409    }
410    // Falling out of the loop closes `tx` (drop on return), which the
411    // consumer observes as Disconnected → end-of-log.
412}
413
#[cfg(test)]
mod tests {
    use super::*;

    /// A cursor opened without a log manager; no producer thread is spawned.
    fn logless_cursor() -> DiskOrderedCursorImpl {
        DiskOrderedCursorImpl::open(
            None,
            vec![DatabaseId::new(1)],
            DiskOrderedCursorOptions::default(),
        )
        .unwrap()
    }

    #[test]
    fn open_with_no_log_manager_yields_empty() {
        let mut cursor = logless_cursor();
        // End-of-stream must be sticky across repeated calls.
        for _ in 0..2 {
            assert_eq!(cursor.next_entry().unwrap(), None);
        }
    }

    #[test]
    fn shutdown_is_idempotent() {
        let mut cursor = logless_cursor();
        cursor.shutdown();
        cursor.shutdown();
        assert_eq!(cursor.next_entry().unwrap(), None);
    }

    #[test]
    fn budget_release_balances_reserve() {
        let budget = MemoryBudget::new(1024);
        let never = AtomicBool::new(false);
        assert!(budget.reserve(512, &never));
        assert_eq!(budget.in_use.load(Ordering::Relaxed), 512);
        budget.release(512);
        assert_eq!(budget.in_use.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn budget_unbounded_short_circuits() {
        let budget = MemoryBudget::new(usize::MAX);
        let never = AtomicBool::new(false);
        assert!(budget.reserve(1_000_000, &never));
        budget.release(1_000_000);
        assert_eq!(budget.in_use.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn budget_cancel_unblocks_reserve() {
        use std::thread;
        let budget = Arc::new(MemoryBudget::new(8));
        let cancel = Arc::new(AtomicBool::new(false));
        // Fill the budget so the next reservation has to wait.
        assert!(budget.reserve(8, &cancel));
        let waiter = {
            let budget = Arc::clone(&budget);
            let cancel = Arc::clone(&cancel);
            // This call should block until cancel fires.
            thread::spawn(move || budget.reserve(8, &cancel))
        };
        thread::sleep(Duration::from_millis(20));
        cancel.store(true, Ordering::Release);
        {
            let _guard = budget.mu.lock();
            budget.cv.notify_all();
        }
        let reserved = waiter.join().unwrap();
        assert!(!reserved, "reserve should return false when cancel fires");
    }
}