use std::marker::PhantomData;
use noxu_dbi::DiskOrderedCursorImpl;
use crate::database::Database;
use crate::database_entry::DatabaseEntry;
use crate::error::{NoxuError, Result};
use crate::operation_status::OperationStatus;
/// Options controlling how a [`DiskOrderedCursor`] is opened.
///
/// Start from [`DiskOrderedCursorConfig::new`] (identical to `Default`) and
/// adjust individual settings with the chainable `with_*` methods. The fields
/// are public, so they may also be set directly; only the builder methods
/// apply the "at least 1" clamp to the numeric settings.
#[derive(Debug, Clone)]
pub struct DiskOrderedCursorConfig {
    /// Passed through as the `queue_size` option of the implementation cursor.
    pub queue_size: usize,
    /// Passed through as the `lsn_batch_size` option of the implementation
    /// cursor.
    pub lsn_batch_size: usize,
    /// Passed through as the `internal_memory_limit` option of the
    /// implementation cursor.
    pub internal_memory_limit: usize,
    /// Requests keys-only mode on the implementation cursor.
    pub keys_only: bool,
    /// When opening a cursor in this file, this flag is OR-ed into the
    /// implementation's `keys_only` option.
    pub bins_only: bool,
    /// When opening a cursor in this file, this flag is OR-ed into the
    /// implementation's `keys_only` option.
    pub count_only: bool,
    /// Passed through as the `dedup_keys` option of the implementation cursor.
    pub dedup_keys: bool,
}

impl Default for DiskOrderedCursorConfig {
    /// Queue of 1000 entries, no LSN batch or memory cap (`usize::MAX`), and
    /// every boolean flag switched off.
    fn default() -> Self {
        Self {
            // Sizing knobs.
            queue_size: 1000,
            lsn_batch_size: usize::MAX,
            internal_memory_limit: usize::MAX,
            // Mode flags: all disabled unless explicitly requested.
            keys_only: false,
            bins_only: false,
            count_only: false,
            dedup_keys: false,
        }
    }
}

impl DiskOrderedCursorConfig {
    /// Creates a configuration holding the default values.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the queue size; a value of `0` is raised to `1`.
    pub fn with_queue_size(self, queue_size: usize) -> Self {
        Self {
            queue_size: queue_size.max(1),
            ..self
        }
    }

    /// Sets the LSN batch size; a value of `0` is raised to `1`.
    pub fn with_lsn_batch_size(self, lsn_batch_size: usize) -> Self {
        Self {
            lsn_batch_size: lsn_batch_size.max(1),
            ..self
        }
    }

    /// Sets the internal memory limit; a value of `0` is raised to `1`.
    pub fn with_internal_memory_limit(
        self,
        internal_memory_limit: usize,
    ) -> Self {
        Self {
            internal_memory_limit: internal_memory_limit.max(1),
            ..self
        }
    }

    /// Enables or disables the `keys_only` flag.
    pub fn with_keys_only(self, keys_only: bool) -> Self {
        Self { keys_only, ..self }
    }

    /// Enables or disables the `bins_only` flag.
    pub fn with_bins_only(self, bins_only: bool) -> Self {
        Self { bins_only, ..self }
    }

    /// Enables or disables the `count_only` flag.
    pub fn with_count_only(self, count_only: bool) -> Self {
        Self { count_only, ..self }
    }

    /// Enables or disables the `dedup_keys` flag.
    pub fn with_dedup_keys(self, dedup_keys: bool) -> Self {
        Self { dedup_keys, ..self }
    }
}
/// Public handle over a [`DiskOrderedCursorImpl`].
///
/// The cursor remembers the most recently returned key/data pair so that
/// [`DiskOrderedCursor::current`] can hand it out again. The `'env` lifetime
/// is carried by a `PhantomData` marker only; no borrowed data is stored.
pub struct DiskOrderedCursor<'env> {
    /// Implementation cursor that produces the entries.
    inner: DiskOrderedCursorImpl,
    /// Last `(key, data)` pair returned by `next`, if any.
    last: Option<(Vec<u8>, Vec<u8>)>,
    /// Set once the cursor has been closed (explicitly or on drop).
    closed: bool,
    /// Ties the handle to the `'env` lifetime without storing a reference.
    _marker: PhantomData<&'env ()>,
}

impl<'env> DiskOrderedCursor<'env> {
    /// Wraps an already-opened implementation cursor in a fresh, open handle
    /// with no remembered entry.
    pub(crate) fn from_impl(inner: DiskOrderedCursorImpl) -> Self {
        Self {
            inner,
            last: None,
            closed: false,
            _marker: PhantomData,
        }
    }

    /// Fails with `NoxuError::CursorClosed` once the cursor has been closed.
    fn ensure_open(&self) -> Result<()> {
        if self.closed {
            Err(NoxuError::CursorClosed)
        } else {
            Ok(())
        }
    }

    /// Fetches the next entry from the implementation cursor.
    ///
    /// On `OperationStatus::Success` the entry is copied into `key` and `data`
    /// and remembered for later calls to `current`. When the implementation
    /// yields no entry, `OperationStatus::NotFound` is returned and `key`,
    /// `data` and the remembered entry are left untouched.
    ///
    /// # Errors
    ///
    /// Returns `NoxuError::CursorClosed` if the cursor is closed, or the error
    /// reported by the implementation's `next_entry`.
    pub fn next(
        &mut self,
        key: &mut DatabaseEntry,
        data: &mut DatabaseEntry,
    ) -> Result<OperationStatus> {
        self.ensure_open()?;

        let status = if let Some((k, d)) = self.inner.next_entry()? {
            key.set_data(&k);
            data.set_data(&d);
            // Keep the pair so `current` can replay it.
            self.last = Some((k, d));
            OperationStatus::Success
        } else {
            OperationStatus::NotFound
        };
        Ok(status)
    }

    /// Copies the entry most recently returned by `next` into `key` and
    /// `data`.
    ///
    /// Returns `OperationStatus::NotFound` (leaving `key` and `data`
    /// untouched) if `next` has not produced an entry yet.
    ///
    /// # Errors
    ///
    /// Returns `NoxuError::CursorClosed` if the cursor is closed.
    pub fn current(
        &self,
        key: &mut DatabaseEntry,
        data: &mut DatabaseEntry,
    ) -> Result<OperationStatus> {
        self.ensure_open()?;

        if let Some((k, d)) = self.last.as_ref() {
            key.set_data(k);
            data.set_data(d);
            return Ok(OperationStatus::Success);
        }
        Ok(OperationStatus::NotFound)
    }

    /// Closes the cursor, consuming the handle.
    ///
    /// The `Drop` impl runs afterwards but finds the cursor already closed
    /// and does nothing further.
    pub fn close(mut self) -> Result<()> {
        self.close_in_place()
    }

    /// Marks the cursor closed and calls `shutdown` on the implementation
    /// cursor. Calling it again is a no-op; it always returns `Ok(())`.
    fn close_in_place(&mut self) -> Result<()> {
        if !self.closed {
            self.closed = true;
            self.inner.shutdown();
        }
        Ok(())
    }
}

impl Drop for DiskOrderedCursor<'_> {
    /// Closes the cursor if the caller did not do so explicitly.
    fn drop(&mut self) {
        // Nothing useful can be done with an error while dropping.
        let _ = self.close_in_place();
    }
}
impl Database {
    /// Opens a [`DiskOrderedCursor`] over this single database.
    ///
    /// `config` is translated into `noxu_dbi::DiskOrderedCursorOptions`; the
    /// implementation's `keys_only` option is enabled when any of
    /// `keys_only`, `bins_only` or `count_only` is set in `config`.
    ///
    /// # Errors
    ///
    /// Returns the error from `check_open_for_doc` if this handle fails that
    /// check, or the error reported by `DiskOrderedCursorImpl::open`.
    pub fn open_disk_ordered_cursor(
        &self,
        config: DiskOrderedCursorConfig,
    ) -> Result<DiskOrderedCursor<'_>> {
        // Validate the handle before building the implementation cursor, the
        // same order `open_disk_ordered_cursor_multi` uses, so that a handle
        // failing the check never causes an implementation cursor to be
        // opened and immediately thrown away.
        self.check_open_for_doc()?;

        let inner = DiskOrderedCursorImpl::open(
            self.cached_log_manager().cloned(),
            vec![self.database_id_for_doc()],
            noxu_dbi::DiskOrderedCursorOptions {
                queue_size: config.queue_size,
                lsn_batch_size: config.lsn_batch_size,
                internal_memory_limit: config.internal_memory_limit,
                keys_only: config.keys_only
                    || config.bins_only
                    || config.count_only,
                dedup_keys: config.dedup_keys,
            },
        )?;
        Ok(DiskOrderedCursor::from_impl(inner))
    }
}
/// Opens a single [`DiskOrderedCursor`] spanning every database in
/// `databases`.
///
/// All databases must agree on their cached log manager: either every one
/// reports the same `Arc` (compared by pointer), or none reports one at all.
/// Each database is then checked with `check_open_for_doc` before its id is
/// handed to the implementation cursor.
///
/// # Errors
///
/// * `NoxuError::IllegalArgument` if `databases` is empty or the databases do
///   not share the same log manager.
/// * The error from `check_open_for_doc` for the first database failing it.
/// * The error reported by `DiskOrderedCursorImpl::open`.
pub fn open_disk_ordered_cursor_multi<'env>(
    databases: &'env [&'env Database],
    config: DiskOrderedCursorConfig,
) -> Result<DiskOrderedCursor<'env>> {
    let (first, rest) = match databases.split_first() {
        Some(parts) => parts,
        None => {
            return Err(NoxuError::IllegalArgument(
                "open_disk_ordered_cursor: at least one database is required"
                    .into(),
            ));
        }
    };

    // The first database's log manager is the reference every other database
    // is compared against.
    let log_manager = first.cached_log_manager().cloned();
    for db in rest {
        let candidate = db.cached_log_manager().cloned();
        let shares_env = match (&log_manager, &candidate) {
            (Some(ours), Some(theirs)) => std::sync::Arc::ptr_eq(ours, theirs),
            (None, None) => true,
            (Some(_), None) | (None, Some(_)) => false,
        };
        if !shares_env {
            return Err(NoxuError::IllegalArgument(
                "open_disk_ordered_cursor: all databases must share \
                 the same environment"
                    .into(),
            ));
        }
    }

    // Stops at the first database that fails the open check.
    let db_ids = databases
        .iter()
        .map(|db| -> Result<_> {
            db.check_open_for_doc()?;
            Ok(db.database_id_for_doc())
        })
        .collect::<Result<Vec<_>>>()?;

    // Any of the three "no data" modes turns on keys-only in the
    // implementation.
    let keys_only = config.keys_only || config.bins_only || config.count_only;
    let options = noxu_dbi::DiskOrderedCursorOptions {
        queue_size: config.queue_size,
        lsn_batch_size: config.lsn_batch_size,
        internal_memory_limit: config.internal_memory_limit,
        keys_only,
        dedup_keys: config.dedup_keys,
    };

    let inner = DiskOrderedCursorImpl::open(log_manager, db_ids, options)?;
    Ok(DiskOrderedCursor::from_impl(inner))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` yields a 1000-entry queue, unbounded batch/memory limits and
    /// all flags off.
    #[test]
    fn config_defaults_match_je_shape() {
        let DiskOrderedCursorConfig {
            queue_size,
            lsn_batch_size,
            internal_memory_limit,
            keys_only,
            bins_only,
            count_only,
            dedup_keys,
        } = DiskOrderedCursorConfig::default();

        assert_eq!(queue_size, 1000);
        assert_eq!(lsn_batch_size, usize::MAX);
        assert_eq!(internal_memory_limit, usize::MAX);
        assert!(!keys_only);
        assert!(!bins_only);
        assert!(!count_only);
        assert!(!dedup_keys);
    }

    /// The numeric builders never store zero.
    #[test]
    fn config_builders_clamp_zero_to_one() {
        let clamped = DiskOrderedCursorConfig::new()
            .with_queue_size(0)
            .with_lsn_batch_size(0)
            .with_internal_memory_limit(0);

        assert_eq!(clamped.queue_size, 1);
        assert_eq!(clamped.lsn_batch_size, 1);
        assert_eq!(clamped.internal_memory_limit, 1);
    }

    /// Builder calls can be chained and each one sticks.
    #[test]
    fn config_builders_chain() {
        let chained = DiskOrderedCursorConfig::new()
            .with_queue_size(8)
            .with_keys_only(true)
            .with_dedup_keys(true);

        assert_eq!(chained.queue_size, 8);
        assert!(chained.keys_only);
        assert!(chained.dedup_keys);
    }
}