use std::collections::HashSet;
use std::sync::{
Arc, Condvar, Mutex,
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc::{self, RecvTimeoutError, SyncSender},
};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use bytes::Bytes;
use noxu_log::LogManager;
use noxu_recovery::{LnOperation, LogEntry, LogScanner, PositionedEntry};
use crate::database_id::DatabaseId;
use crate::error::{DbiError, Result};
use crate::file_manager_scanner::FileManagerLogScanner;
/// One message on the producer → cursor channel: either a `(key, data)`
/// record or a terminal error raised by the producer thread.
type DocItem = core::result::Result<(Vec<u8>, Vec<u8>), DbiError>;
/// Tuning knobs for opening a disk-ordered cursor.
#[derive(Debug, Clone)]
pub struct DiskOrderedCursorOptions {
    /// Capacity of the bounded producer → cursor channel. Values below 1 are
    /// treated as 1 when the cursor is opened.
    pub queue_size: usize,
    /// Cap, in bytes of key + data, on records handed to the channel but not
    /// yet consumed. `usize::MAX` disables the cap. A single record is always
    /// admitted when nothing is outstanding, so the cap is soft.
    pub internal_memory_limit: usize,
    /// Upper bound on how many log entries the producer processes between
    /// cancellation checks. It checks at least every 64 entries regardless.
    pub lsn_batch_size: usize,
    /// When `true`, records are delivered with an empty data vector.
    pub keys_only: bool,
    /// When `true`, only the first occurrence of each key in scan order is
    /// delivered.
    pub dedup_keys: bool,
}

impl Default for DiskOrderedCursorOptions {
    /// A 1000-slot queue, no memory cap, no extra cancellation polling,
    /// full records, and no key de-duplication.
    fn default() -> Self {
        const UNBOUNDED: usize = usize::MAX;
        Self {
            queue_size: 1000,
            internal_memory_limit: UNBOUNDED,
            lsn_batch_size: UNBOUNDED,
            keys_only: false,
            dedup_keys: false,
        }
    }
}
/// Byte budget shared between the producer thread and the cursor.
///
/// The producer calls `reserve` before queueing a record, and the consumer
/// calls `release` after taking it off the channel. `in_use` therefore tracks
/// the bytes handed to the queue but not yet consumed. A `limit` of
/// `usize::MAX` means "unbounded": `reserve` never blocks in that mode.
struct MemoryBudget {
    /// Bytes currently reserved.
    in_use: AtomicUsize,
    /// Upper bound on `in_use`. `usize::MAX` disables the bound.
    limit: usize,
    /// Notified on `release` so a blocked `reserve` can re-check the budget.
    cv: Condvar,
    /// Mutex paired with `cv`. It guards no data of its own.
    mu: Mutex<()>,
}

impl MemoryBudget {
    /// Creates an empty budget capped at `limit` bytes.
    fn new(limit: usize) -> Self {
        Self {
            in_use: AtomicUsize::new(0),
            limit,
            cv: Condvar::new(),
            mu: Mutex::new(()),
        }
    }

    /// Reserves `bytes`, blocking while that would push `in_use` past `limit`.
    ///
    /// A reservation is always granted when nothing is outstanding
    /// (`in_use == 0`), so a single record larger than `limit` cannot wedge
    /// the producer. Returns `false` without reserving anything once `cancel`
    /// is observed, in both bounded and unbounded mode.
    fn reserve(&self, bytes: usize, cancel: &AtomicBool) -> bool {
        if self.limit == usize::MAX {
            // Unbounded: never block, but still honour cancellation so the
            // producer stops as promptly as it does in bounded mode.
            if cancel.load(Ordering::Acquire) {
                return false;
            }
            self.in_use.fetch_add(bytes, Ordering::Relaxed);
            return true;
        }
        let mut guard = self.mu.lock().unwrap_or_else(|p| p.into_inner());
        loop {
            if cancel.load(Ordering::Acquire) {
                return false;
            }
            let cur = self.in_use.load(Ordering::Acquire);
            // `cur + bytes` can overflow for limits close to `usize::MAX`.
            // A saturated sum (usize::MAX) correctly fails the bound check,
            // because the unbounded case returned above.
            if cur == 0 || cur.saturating_add(bytes) <= self.limit {
                self.in_use.fetch_add(bytes, Ordering::Relaxed);
                return true;
            }
            // Timed wait: re-checks `cancel` periodically even if no release
            // or shutdown notification arrives.
            let (g, _) = self
                .cv
                .wait_timeout(guard, Duration::from_millis(50))
                .unwrap_or_else(|p| p.into_inner());
            guard = g;
        }
    }

    /// Returns `bytes` to the budget and wakes any blocked reservers.
    fn release(&self, bytes: usize) {
        self.in_use.fetch_sub(bytes, Ordering::Relaxed);
        if self.limit != usize::MAX {
            // Notify while holding `mu` so a reserver that has just checked
            // the budget (and still holds the lock) cannot miss this wakeup.
            let _g = self.mu.lock();
            self.cv.notify_all();
        }
    }
}
/// Streams `(key, data)` records for a set of databases in the order their
/// LN entries appear in the log files.
///
/// A background producer thread scans the log and pushes matching records
/// into a bounded channel, and `next_entry` pops them. Dropping the cursor
/// cancels the producer and joins it.
pub struct DiskOrderedCursorImpl {
    /// Receiving end of the producer channel.
    rx: mpsc::Receiver<DocItem>,
    /// Producer thread. `None` when no log manager was supplied, or after
    /// `shutdown` has joined it.
    handle: Option<JoinHandle<()>>,
    /// Set by `shutdown` to ask the producer to stop.
    cancel: Arc<AtomicBool>,
    /// Byte budget shared with the producer. Released as records are consumed.
    budget: Arc<MemoryBudget>,
    /// Once `true`, `next_entry` returns `Ok(None)` without touching `rx`.
    drained: bool,
    /// First error received from the producer. Replayed by later calls.
    terminal_err: Option<DbiError>,
}

impl DiskOrderedCursorImpl {
    /// Opens a cursor over the databases in `target_db_ids`.
    ///
    /// With `log_manager == None`, no producer is started and the cursor is
    /// immediately exhausted. Otherwise a thread named
    /// `noxu-disk-ordered-cursor` is spawned to feed the cursor.
    ///
    /// # Errors
    /// Returns `DbiError::OperationFailed` if the producer thread cannot be
    /// spawned.
    pub fn open(
        log_manager: Option<Arc<LogManager>>,
        target_db_ids: Vec<DatabaseId>,
        opts: DiskOrderedCursorOptions,
    ) -> Result<Self> {
        // A zero-capacity sync channel is a rendezvous channel; always buffer
        // at least one record.
        let queue_size = opts.queue_size.max(1);
        let (tx, rx) = mpsc::sync_channel::<DocItem>(queue_size);
        let cancel = Arc::new(AtomicBool::new(false));
        let budget = Arc::new(MemoryBudget::new(opts.internal_memory_limit));
        let handle = match log_manager {
            Some(lm) => {
                let cancel_p = Arc::clone(&cancel);
                let budget_p = Arc::clone(&budget);
                let h = thread::Builder::new()
                    .name("noxu-disk-ordered-cursor".to_string())
                    .spawn(move || produce(lm, target_db_ids, opts, tx, cancel_p, budget_p))
                    .map_err(|e| {
                        DbiError::OperationFailed(format!(
                            "failed to spawn disk-ordered-cursor producer: {e}"
                        ))
                    })?;
                Some(h)
            }
            None => {
                // Dropping the only sender disconnects the channel, so
                // `next_entry` reports end-of-stream straight away.
                drop(tx);
                None
            }
        };
        Ok(Self {
            rx,
            handle,
            cancel,
            budget,
            drained: false,
            terminal_err: None,
        })
    }

    /// Returns the next `(key, data)` record, or `Ok(None)` once the producer
    /// has finished or the cursor has been shut down.
    ///
    /// # Errors
    /// If the producer sends an error, a copy of it (via `clone_dbi_err`) is
    /// returned now and on every later call.
    pub fn next_entry(&mut self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
        if let Some(e) = &self.terminal_err {
            return Err(clone_dbi_err(e));
        }
        if self.drained {
            return Ok(None);
        }
        loop {
            match self.rx.recv_timeout(Duration::from_millis(100)) {
                Ok(Ok((k, d))) => {
                    // Hand the bytes back so a budget-blocked producer can continue.
                    let n = k.len() + d.len();
                    self.budget.release(n);
                    return Ok(Some((k, d)));
                }
                Ok(Err(e)) => {
                    let cloned = clone_dbi_err(&e);
                    self.terminal_err = Some(e);
                    return Err(cloned);
                }
                Err(RecvTimeoutError::Timeout) => {
                    if self.cancel.load(Ordering::Acquire) {
                        self.drained = true;
                        return Ok(None);
                    }
                    continue;
                }
                Err(RecvTimeoutError::Disconnected) => {
                    self.drained = true;
                    return Ok(None);
                }
            }
        }
    }

    /// Cancels the producer, waits for it to exit, and marks the cursor
    /// drained. Calling it more than once is harmless.
    pub fn shutdown(&mut self) {
        self.cancel.store(true, Ordering::Release);
        {
            // Wake a producer blocked in `MemoryBudget::reserve`.
            let _g = self.budget.mu.lock();
            self.budget.cv.notify_all();
        }
        // Drop the real receiver by swapping in one whose sender is already
        // gone. Merely draining with `try_recv` is racy: the producer can
        // refill a small queue after the drain and block in `send` forever,
        // deadlocking the `join` below. With the receiver dropped, every
        // pending or future `send` fails at once and the producer exits.
        let (_, disconnected) = mpsc::sync_channel::<DocItem>(0);
        drop(std::mem::replace(&mut self.rx, disconnected));
        if let Some(h) = self.handle.take() {
            if let Err(e) = h.join() {
                log::warn!(
                    target: "noxu-disk-ordered-cursor",
                    "producer thread panicked during shutdown: {e:?}"
                );
            }
        }
        self.drained = true;
    }
}

impl Drop for DiskOrderedCursorImpl {
    /// Stops and joins the producer thread by delegating to `shutdown`.
    fn drop(&mut self) {
        self.shutdown();
    }
}
/// Builds an owned copy of a producer error, so the original can stay cached
/// in the cursor while a copy is returned to the caller.
///
/// `OperationFailed` keeps its message verbatim. Every other variant is
/// flattened into `OperationFailed` with a descriptive prefix. NOTE(review):
/// this presumably exists because `DbiError` is not `Clone` — confirm.
fn clone_dbi_err(e: &DbiError) -> DbiError {
    let message = match e {
        DbiError::OperationFailed(msg) => msg.clone(),
        DbiError::IoError(io) => {
            format!("disk-ordered-cursor producer I/O error: {io}")
        }
        other => format!("disk-ordered-cursor producer error: {other}"),
    };
    DbiError::OperationFailed(message)
}
/// Body of the producer thread spawned by `DiskOrderedCursorImpl::open`.
///
/// Scans every log file returned by `list_file_numbers`, one file at a time.
/// It forwards, as `Ok((key, data))` on `tx`, each LN entry that:
/// - is not a delete,
/// - carries data, and
/// - belongs to one of `target_db_ids`.
///
/// Before each send it reserves `key.len() + data.len()` bytes from `budget`;
/// the cursor releases them when it receives the record.
///
/// The thread returns early when:
/// - `cancel` is observed (checked before every file and every
///   `min(64, lsn_batch_size)` entries),
/// - the budget refuses a reservation, or
/// - the receiver has gone away.
///
/// A failure to list log files is reported as a single `Err` item.
fn produce(
    log_manager: Arc<LogManager>,
    target_db_ids: Vec<DatabaseId>,
    opts: DiskOrderedCursorOptions,
    tx: SyncSender<DocItem>,
    cancel: Arc<AtomicBool>,
    budget: Arc<MemoryBudget>,
) {
    // NOTE(review): assumes LN `db_id` values use the same u64 encoding as
    // `DatabaseId::as_i64() as u64` — confirm.
    let wanted_dbs: HashSet<u64> = target_db_ids
        .iter()
        .map(|db| db.as_i64() as u64)
        .collect();
    let scanner = FileManagerLogScanner::new(Arc::clone(log_manager.file_manager()));
    let log_files = match log_manager.file_manager().list_file_numbers() {
        Ok(files) => files,
        Err(err) => {
            // Best effort: if the cursor is already gone there is nobody to tell.
            let _ = tx.send(Err(DbiError::OperationFailed(format!(
                "list_file_numbers: {err}"
            ))));
            return;
        }
    };

    // `n >= 64 || n >= lsn_batch_size` is exactly `n >= min(64, lsn_batch_size)`.
    let cancel_poll_interval = opts.lsn_batch_size.min(64);
    let mut entries_since_poll = 0usize;
    let mut seen_keys: Option<HashSet<Vec<u8>>> = if opts.dedup_keys {
        Some(HashSet::new())
    } else {
        None
    };

    for &file_num in &log_files {
        if cancel.load(Ordering::Acquire) {
            return;
        }
        // Scan exactly one file: [file_num:0, file_num+1:0).
        let from = noxu_util::Lsn::new(file_num, 0);
        let until = noxu_util::Lsn::new(file_num.saturating_add(1), 0);
        let entries: Vec<PositionedEntry> = scanner.scan_forward(from, until);

        for positioned in entries {
            entries_since_poll += 1;
            if entries_since_poll >= cancel_poll_interval {
                entries_since_poll = 0;
                if cancel.load(Ordering::Acquire) {
                    return;
                }
            }

            let LogEntry::Ln(ln) = positioned.entry else {
                continue;
            };
            let is_delete = matches!(ln.operation, LnOperation::Delete);
            if is_delete || ln.data.is_none() || !wanted_dbs.contains(&ln.db_id) {
                continue;
            }

            let key_bytes: Bytes = ln.key;
            let data_bytes: Bytes = ln.data.unwrap_or_default();
            let key = key_bytes.to_vec();
            let data = match opts.keys_only {
                true => Vec::new(),
                false => data_bytes.to_vec(),
            };

            // The first occurrence of a key in scan order wins; later ones are skipped.
            if seen_keys
                .as_mut()
                .is_some_and(|seen| !seen.insert(key.clone()))
            {
                continue;
            }

            let cost = key.len() + data.len();
            if !budget.reserve(cost, &cancel) {
                return;
            }
            if tx.send(Ok((key, data))).is_err() {
                // The cursor dropped its receiver: undo the reservation and stop.
                budget.release(cost);
                return;
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a cursor with no log manager, so no producer thread is spawned.
    fn cursor_without_log() -> DiskOrderedCursorImpl {
        DiskOrderedCursorImpl::open(
            None,
            vec![DatabaseId::new(1)],
            DiskOrderedCursorOptions::default(),
        )
        .unwrap()
    }

    #[test]
    fn open_with_no_log_manager_yields_empty() {
        let mut cursor = cursor_without_log();
        // End-of-stream must be sticky across repeated calls.
        for _ in 0..2 {
            assert_eq!(cursor.next_entry().unwrap(), None);
        }
    }

    #[test]
    fn shutdown_is_idempotent() {
        let mut cursor = cursor_without_log();
        cursor.shutdown();
        cursor.shutdown();
        assert_eq!(cursor.next_entry().unwrap(), None);
    }

    #[test]
    fn budget_release_balances_reserve() {
        let budget = MemoryBudget::new(1024);
        let never = AtomicBool::new(false);
        assert!(budget.reserve(512, &never));
        assert_eq!(budget.in_use.load(Ordering::Relaxed), 512);
        budget.release(512);
        assert_eq!(budget.in_use.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn budget_unbounded_short_circuits() {
        let budget = MemoryBudget::new(usize::MAX);
        let never = AtomicBool::new(false);
        assert!(budget.reserve(1_000_000, &never));
        budget.release(1_000_000);
        assert_eq!(budget.in_use.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn budget_cancel_unblocks_reserve() {
        let budget = Arc::new(MemoryBudget::new(8));
        let cancel = Arc::new(AtomicBool::new(false));
        // Fill the budget so the waiter's reservation has to block.
        assert!(budget.reserve(8, &cancel));
        let waiter = {
            let budget = Arc::clone(&budget);
            let cancel = Arc::clone(&cancel);
            std::thread::spawn(move || budget.reserve(8, &cancel))
        };
        std::thread::sleep(Duration::from_millis(20));
        cancel.store(true, Ordering::Release);
        {
            let _g = budget.mu.lock();
            budget.cv.notify_all();
        }
        let granted = waiter.join().unwrap();
        assert!(!granted, "reserve should return false when cancel fires");
    }
}