1use std::collections::HashSet;
41use std::sync::{
42 Arc, Condvar, Mutex,
43 atomic::{AtomicBool, AtomicUsize, Ordering},
44 mpsc::{self, RecvTimeoutError, SyncSender},
45};
46use std::thread::{self, JoinHandle};
47use std::time::Duration;
48
49use bytes::Bytes;
50use noxu_log::LogManager;
51use noxu_recovery::{LnOperation, LogEntry, LogScanner, PositionedEntry};
52
53use crate::database_id::DatabaseId;
54use crate::error::{DbiError, Result};
55use crate::file_manager_scanner::FileManagerLogScanner;
56
57type DocItem = std::result::Result<(Vec<u8>, Vec<u8>), DbiError>;
59
/// Tuning knobs for a disk-ordered cursor.
#[derive(Debug, Clone)]
pub struct DiskOrderedCursorOptions {
    /// Capacity of the bounded channel between the producer thread and the
    /// cursor. Values below 1 are raised to 1 when the cursor is opened.
    pub queue_size: usize,
    /// Ceiling, in bytes of key + data payload, on records that have been
    /// queued but not yet consumed. `usize::MAX` disables the limit.
    pub internal_memory_limit: usize,
    /// Upper bound on how many log entries the producer handles between two
    /// cancellation checks (the producer also checks at least every 64).
    pub lsn_batch_size: usize,
    /// When `true`, records are emitted with an empty data vector.
    pub keys_only: bool,
    /// When `true`, only the first record seen for each key is emitted.
    pub dedup_keys: bool,
}

impl Default for DiskOrderedCursorOptions {
    /// Defaults: a 1000-slot queue, no memory limit, no batch-size limit,
    /// full records, and no key de-duplication.
    fn default() -> Self {
        // `usize::MAX` is the "no limit" sentinel for both size settings.
        let unlimited = usize::MAX;
        DiskOrderedCursorOptions {
            queue_size: 1_000,
            internal_memory_limit: unlimited,
            lsn_batch_size: unlimited,
            keys_only: false,
            dedup_keys: false,
        }
    }
}
91
/// Byte budget shared by the producer thread and the consuming cursor.
///
/// The producer calls [`MemoryBudget::reserve`] before queueing a record and
/// the consumer calls [`MemoryBudget::release`] after taking it off the
/// queue, so `in_use` tracks the payload bytes currently in flight.
struct MemoryBudget {
    /// Bytes currently reserved.
    in_use: AtomicUsize,
    /// Maximum bytes in flight; `usize::MAX` means "no limit".
    limit: usize,
    /// Signalled by `release` (and by cancellation) to wake a waiting `reserve`.
    cv: Condvar,
    /// Guards the wait/notify handshake on `cv`; protects no data of its own.
    mu: Mutex<()>,
}

impl MemoryBudget {
    /// Creates an empty budget capped at `limit` bytes.
    fn new(limit: usize) -> Self {
        Self {
            in_use: AtomicUsize::new(0),
            limit,
            cv: Condvar::new(),
            mu: Mutex::new(()),
        }
    }

    /// Reserves `bytes`, blocking while the budget is exhausted.
    ///
    /// Returns `true` once the reservation has been recorded and `false` if
    /// `cancel` is observed set first (in which case nothing is reserved).
    /// The cancel flag is honoured in unbounded mode as well, so a cancelled
    /// producer stops at its next reservation regardless of configuration.
    ///
    /// A request is always admitted when nothing is in use, which lets a
    /// single item larger than `limit` through instead of waiting forever.
    fn reserve(&self, bytes: usize, cancel: &AtomicBool) -> bool {
        if self.limit == usize::MAX {
            // Unbounded: no waiting, but cancellation still wins.
            if cancel.load(Ordering::Acquire) {
                return false;
            }
            self.in_use.fetch_add(bytes, Ordering::Relaxed);
            return true;
        }

        // A poisoned mutex guards no data, so it is safe to keep using it.
        let mut guard = self.mu.lock().unwrap_or_else(|p| p.into_inner());
        loop {
            if cancel.load(Ordering::Acquire) {
                return false;
            }
            let cur = self.in_use.load(Ordering::Acquire);
            // `checked_add` keeps an enormous request from wrapping around
            // (release builds) or panicking (debug builds); an overflowing
            // total simply does not fit.
            let fits = cur
                .checked_add(bytes)
                .is_some_and(|total| total <= self.limit);
            if cur == 0 || fits {
                self.in_use.fetch_add(bytes, Ordering::Relaxed);
                return true;
            }
            // Bounded wait so a missed notification or a cancel that was not
            // followed by a notify is still noticed within 50 ms.
            let (reacquired, _) = self
                .cv
                .wait_timeout(guard, Duration::from_millis(50))
                .unwrap_or_else(|p| p.into_inner());
            guard = reacquired;
        }
    }

    /// Returns `bytes` to the budget and, in bounded mode, wakes any waiter.
    fn release(&self, bytes: usize) {
        self.in_use.fetch_sub(bytes, Ordering::Relaxed);
        if self.limit == usize::MAX {
            // Nobody ever waits on an unbounded budget.
            return;
        }
        // Take the lock before notifying so the wake-up cannot slip in
        // between a waiter's check of `in_use` and its call to wait.
        let _g = self.mu.lock();
        self.cv.notify_all();
    }
}
153
154pub struct DiskOrderedCursorImpl {
160 rx: mpsc::Receiver<DocItem>,
162 handle: Option<JoinHandle<()>>,
164 cancel: Arc<AtomicBool>,
166 budget: Arc<MemoryBudget>,
168 drained: bool,
170 terminal_err: Option<DbiError>,
173}
174
175impl DiskOrderedCursorImpl {
176 pub fn open(
183 log_manager: Option<Arc<LogManager>>,
184 target_db_ids: Vec<DatabaseId>,
185 opts: DiskOrderedCursorOptions,
186 ) -> Result<Self> {
187 let queue_size = opts.queue_size.max(1);
188 let (tx, rx) = mpsc::sync_channel::<DocItem>(queue_size);
189 let cancel = Arc::new(AtomicBool::new(false));
190 let budget = Arc::new(MemoryBudget::new(opts.internal_memory_limit));
191
192 let handle = match log_manager {
193 Some(lm) => {
194 let cancel_p = Arc::clone(&cancel);
195 let budget_p = Arc::clone(&budget);
196 let tx_p = tx;
197 let opts_p = opts;
198 let target = target_db_ids;
199 let builder = thread::Builder::new()
200 .name("noxu-disk-ordered-cursor".to_string());
201 let h = builder
202 .spawn(move || {
203 produce(lm, target, opts_p, tx_p, cancel_p, budget_p)
204 })
205 .map_err(|e| {
206 DbiError::OperationFailed(format!(
207 "failed to spawn disk-ordered-cursor producer: {e}"
208 ))
209 })?;
210 Some(h)
211 }
212 None => {
213 drop(tx);
215 None
216 }
217 };
218
219 Ok(Self {
220 rx,
221 handle,
222 cancel,
223 budget,
224 drained: false,
225 terminal_err: None,
226 })
227 }
228
229 pub fn next_entry(&mut self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
235 if let Some(e) = &self.terminal_err {
236 return Err(clone_dbi_err(e));
237 }
238 if self.drained {
239 return Ok(None);
240 }
241 loop {
242 match self.rx.recv_timeout(Duration::from_millis(100)) {
243 Ok(Ok((k, d))) => {
244 let n = k.len() + d.len();
245 self.budget.release(n);
246 return Ok(Some((k, d)));
247 }
248 Ok(Err(e)) => {
249 let cloned = clone_dbi_err(&e);
250 self.terminal_err = Some(e);
251 return Err(cloned);
252 }
253 Err(RecvTimeoutError::Timeout) => {
254 if self.cancel.load(Ordering::Acquire) {
255 self.drained = true;
256 return Ok(None);
257 }
258 continue;
259 }
260 Err(RecvTimeoutError::Disconnected) => {
261 self.drained = true;
262 return Ok(None);
263 }
264 }
265 }
266 }
267
268 pub fn shutdown(&mut self) {
274 self.cancel.store(true, Ordering::Release);
275 {
277 let _g = self.budget.mu.lock();
278 self.budget.cv.notify_all();
279 }
280 while self.rx.try_recv().is_ok() {}
282 if let Some(h) = self.handle.take() {
283 if let Err(e) = h.join() {
288 log::warn!(
289 target: "noxu-disk-ordered-cursor",
290 "producer thread panicked during shutdown: {e:?}"
291 );
292 }
293 }
294 self.drained = true;
295 }
296}
297
298impl Drop for DiskOrderedCursorImpl {
299 fn drop(&mut self) {
300 self.shutdown();
301 }
302}
303
304fn clone_dbi_err(e: &DbiError) -> DbiError {
309 match e {
310 DbiError::OperationFailed(s) => DbiError::OperationFailed(s.clone()),
311 DbiError::IoError(io) => DbiError::OperationFailed(format!(
312 "disk-ordered-cursor producer I/O error: {io}"
313 )),
314 other => DbiError::OperationFailed(format!(
315 "disk-ordered-cursor producer error: {other}"
316 )),
317 }
318}
319
320fn produce(
326 log_manager: Arc<LogManager>,
327 target_db_ids: Vec<DatabaseId>,
328 opts: DiskOrderedCursorOptions,
329 tx: SyncSender<DocItem>,
330 cancel: Arc<AtomicBool>,
331 budget: Arc<MemoryBudget>,
332) {
333 let target_set: HashSet<u64> =
334 target_db_ids.iter().map(|d| d.as_i64() as u64).collect();
335 let fm = Arc::clone(log_manager.file_manager());
336 let scanner = FileManagerLogScanner::new(fm);
337
338 let file_nums = match log_manager.file_manager().list_file_numbers() {
339 Ok(v) => v,
340 Err(e) => {
341 let _ = tx.send(Err(DbiError::OperationFailed(format!(
342 "list_file_numbers: {e}"
343 ))));
344 return;
345 }
346 };
347
348 let mut dedup: Option<HashSet<Vec<u8>>> =
349 opts.dedup_keys.then(HashSet::new);
350 let mut counter_since_check = 0usize;
351
352 for &file_num in &file_nums {
353 if cancel.load(Ordering::Acquire) {
354 return;
355 }
356 let start = noxu_util::Lsn::new(file_num, 0);
357 let end = noxu_util::Lsn::new(file_num.saturating_add(1), 0);
358 let entries: Vec<PositionedEntry> = scanner.scan_forward(start, end);
359 for pe in entries {
360 counter_since_check += 1;
361 if counter_since_check >= 64
362 || counter_since_check >= opts.lsn_batch_size
363 {
364 counter_since_check = 0;
365 if cancel.load(Ordering::Acquire) {
366 return;
367 }
368 }
369
370 let LogEntry::Ln(ln) = pe.entry else { continue };
371
372 if matches!(ln.operation, LnOperation::Delete) || ln.data.is_none()
374 {
375 continue;
376 }
377 if !target_set.contains(&ln.db_id) {
378 continue;
379 }
380
381 let key_bytes: Bytes = ln.key;
382 let data_bytes: Bytes = ln.data.unwrap_or_default();
383
384 let key_vec = key_bytes.to_vec();
385 let data_vec =
386 if opts.keys_only { Vec::new() } else { data_bytes.to_vec() };
387
388 if let Some(set) = dedup.as_mut()
389 && !set.insert(key_vec.clone())
390 {
391 continue;
392 }
393
394 let n = key_vec.len() + data_vec.len();
397 if !budget.reserve(n, &cancel) {
398 return;
399 }
400
401 if tx.send(Ok((key_vec, data_vec))).is_err() {
404 budget.release(n);
406 return;
407 }
408 }
409 }
410 }
413
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a cursor without a log manager, so no producer thread exists.
    fn producerless_cursor() -> DiskOrderedCursorImpl {
        DiskOrderedCursorImpl::open(
            None,
            vec![DatabaseId::new(1)],
            DiskOrderedCursorOptions::default(),
        )
        .unwrap()
    }

    /// Without a producer the stream is empty, and stays empty.
    #[test]
    fn open_with_no_log_manager_yields_empty() {
        let mut cursor = producerless_cursor();
        for _ in 0..2 {
            assert_eq!(cursor.next_entry().unwrap(), None);
        }
    }

    /// Calling `shutdown` twice is harmless and leaves the cursor drained.
    #[test]
    fn shutdown_is_idempotent() {
        let mut cursor = producerless_cursor();
        cursor.shutdown();
        cursor.shutdown();
        assert_eq!(cursor.next_entry().unwrap(), None);
    }

    /// A bounded reserve followed by a matching release nets out to zero.
    #[test]
    fn budget_release_balances_reserve() {
        let budget = MemoryBudget::new(1024);
        let never_cancelled = AtomicBool::new(false);

        assert!(budget.reserve(512, &never_cancelled));
        assert_eq!(budget.in_use.load(Ordering::Relaxed), 512);

        budget.release(512);
        assert_eq!(budget.in_use.load(Ordering::Relaxed), 0);
    }

    /// An unbounded budget admits any size and still balances.
    #[test]
    fn budget_unbounded_short_circuits() {
        let budget = MemoryBudget::new(usize::MAX);
        let never_cancelled = AtomicBool::new(false);

        assert!(budget.reserve(1_000_000, &never_cancelled));
        budget.release(1_000_000);
        assert_eq!(budget.in_use.load(Ordering::Relaxed), 0);
    }

    /// A reserve blocked on a full budget gives up once cancel is raised.
    #[test]
    fn budget_cancel_unblocks_reserve() {
        use std::thread;

        let budget = Arc::new(MemoryBudget::new(8));
        let cancel = Arc::new(AtomicBool::new(false));

        // Fill the budget so the next reserve has to wait.
        assert!(budget.reserve(8, &cancel));

        let waiter = {
            let budget = Arc::clone(&budget);
            let cancel = Arc::clone(&cancel);
            thread::spawn(move || budget.reserve(8, &cancel))
        };

        thread::sleep(Duration::from_millis(20));
        cancel.store(true, Ordering::Release);
        {
            let _held = budget.mu.lock();
            budget.cv.notify_all();
        }

        let admitted = waiter.join().unwrap();
        assert!(!admitted, "reserve should return false when cancel fires");
    }
}