/// Compression choice carried by `OpenOptions::compression_policy`.
///
/// `Zlib` is the derived `Default` and is also the value that
/// `OpenOptions::default()` selects.
/// NOTE(review): how each variant maps onto the stored data format is decided
/// by code outside this file — confirm there before relying on it.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CompressionPolicy {
/// No compression requested.
None,
/// Snappy compression requested.
Snappy,
/// Zlib compression requested; this is the default variant.
#[default]
Zlib,
}
/// Options supplied when opening a database.
///
/// `OpenOptions::default()` yields: not read-only, create-if-missing on,
/// error-if-exists off, paranoid checks on, `Zlib` compression, a
/// `64 * 1024 * 1024` cache size and a `4 * 1024 * 1024` write buffer size.
/// NOTE(review): how each option is enforced is decided by the code that
/// consumes this struct, which is not visible in this file.
// Four independent `bool` switches exceed clippy's excessive-bools threshold;
// presumably they are kept as plain flags on purpose — TODO confirm.
#[allow(clippy::struct_excessive_bools)]
#[derive(Debug, Clone)]
pub struct OpenOptions {
/// Whether the database is opened read-only. Default: `false`.
pub read_only: bool,
/// Whether a missing database should be created. Default: `true`.
pub create_if_missing: bool,
/// Whether an already existing database is treated as an error. Default: `false`.
pub error_if_exists: bool,
/// Whether stricter consistency checking is requested. Default: `true`.
pub paranoid_checks: bool,
/// Compression to use. Default: `CompressionPolicy::Zlib`.
pub compression_policy: CompressionPolicy,
/// Cache size; presumably in bytes (default `64 * 1024 * 1024`) — TODO confirm unit.
pub cache_size: usize,
/// Write buffer size; presumably in bytes (default `4 * 1024 * 1024`) — TODO confirm unit.
pub write_buffer_size: usize,
}
impl Default for OpenOptions {
    /// Builds the stock open configuration.
    ///
    /// The result is writable (`read_only == false`), creates a missing
    /// database, does not fail when one already exists, has paranoid checks
    /// enabled, uses `Zlib` compression, and sizes the cache and write buffer
    /// at 64 and 4 times `1024 * 1024` respectively.
    fn default() -> Self {
        // One mebibyte; keeps the two size defaults below easy to read.
        const MIB: usize = 1024 * 1024;

        let cache_size = 64 * MIB;
        let write_buffer_size = 4 * MIB;

        Self {
            // Sizing and encoding.
            compression_policy: CompressionPolicy::Zlib,
            cache_size,
            write_buffer_size,
            // Switches that default to off.
            read_only: false,
            error_if_exists: false,
            // Switches that default to on.
            create_if_missing: true,
            paranoid_checks: true,
        }
    }
}
use crate::error::{LevelDbError, Result};
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
/// Per-read / per-scan configuration.
///
/// `ReadOptions::default()` selects `ChecksumMode::Inherit`,
/// `CachePolicy::Bypass`, `ReadStrategy::Shared`, `ThreadingOptions::Auto`,
/// `ScanMode::Sequential`, default pipeline sizing, and no cancel flag or
/// progress sink.
#[derive(Debug, Clone)]
pub struct ReadOptions {
/// Checksum handling for this read.
pub checksum: ChecksumMode,
/// Whether the read goes through the cache or bypasses it.
pub cache_policy: CachePolicy,
/// How values are handed back (borrowed, shared or owned).
pub read_strategy: ReadStrategy,
/// How the worker-thread count is chosen; see `ThreadingOptions::resolve`.
pub threading: ThreadingOptions,
/// Sequential or table-parallel scanning.
pub scan_mode: ScanMode,
/// Queue depth, table batch size and progress interval for scans; a field
/// value of `0` is resolved automatically by the `resolve_*` methods.
pub pipeline: ScanPipelineOptions,
/// Optional cancellation flag; `None` by default.
pub cancel: Option<ScanCancelFlag>,
/// Optional progress callback; `None` by default.
pub progress: Option<ScanProgressSink>,
}
impl Default for ReadOptions {
    /// Builds the stock read configuration: inherited checksum handling,
    /// cache bypass, shared values, automatic threading, sequential scanning,
    /// default pipeline sizing, and neither a cancel flag nor a progress sink.
    fn default() -> Self {
        // Optional hooks start out detached.
        let cancel = None;
        let progress = None;
        // Pipeline sizing keeps its own default; its zeroed fields are
        // resolved automatically by `ScanPipelineOptions::resolve_*`.
        let pipeline = ScanPipelineOptions::default();

        Self {
            scan_mode: ScanMode::Sequential,
            threading: ThreadingOptions::Auto,
            read_strategy: ReadStrategy::Shared,
            cache_policy: CachePolicy::Bypass,
            checksum: ChecksumMode::Inherit,
            pipeline,
            progress,
            cancel,
        }
    }
}
/// Sizing knobs for the scan pipeline.
///
/// Every field defaults to `0` (derived `Default`). A value of `0` means
/// "choose automatically": the matching `resolve_*` method substitutes a
/// computed value, while any non-zero value is used as given.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ScanPipelineOptions {
/// Requested queue depth; `0` lets `resolve_queue_depth` derive it from the
/// worker and table counts.
pub queue_depth: usize,
/// Requested tables per batch; `0` lets `resolve_table_batch_size` derive it
/// from the worker and table counts.
pub table_batch_size: usize,
/// Requested progress interval; `0` makes `resolve_progress_interval` return `8192`.
pub progress_interval: usize,
}
/// Per-write configuration. The derived `Default` leaves `sync` as `false`.
#[derive(Debug, Clone, Copy, Default)]
pub struct WriteOptions {
/// Synchronous-write switch.
/// NOTE(review): presumably requests a durable flush before the write
/// returns — confirm against the writer implementation.
pub sync: bool,
}
/// How the number of worker threads is chosen; resolved to a concrete count
/// by `ThreadingOptions::resolve`, `resolve_unchecked` or `resolve_checked`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ThreadingOptions {
/// Use `std::thread::available_parallelism()` (falling back to `1` when it
/// is unavailable), capped by the number of work items. This is the default.
#[default]
Auto,
/// Use the given count. The lenient resolvers clamp it into
/// `1..=MAX_LEVELDB_THREADS`; `resolve_checked` rejects values outside that
/// range with an error.
Fixed(usize),
/// Always resolves to exactly one thread.
Single,
}
/// Scan execution mode carried by `ReadOptions::scan_mode`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ScanMode {
/// Sequential scanning; the default variant.
#[default]
Sequential,
/// Table-parallel scanning.
/// NOTE(review): presumably distributes tables across worker threads —
/// confirm against the scan implementation.
ParallelTables,
}
/// Largest thread count accepted for `ThreadingOptions::Fixed`: the lenient
/// resolvers clamp to it and `resolve_checked` rejects anything above it.
pub const MAX_LEVELDB_THREADS: usize = 512;
/// Checksum handling carried by `ReadOptions::checksum`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ChecksumMode {
/// The default variant.
/// NOTE(review): presumably defers to a database-level setting such as
/// `OpenOptions::paranoid_checks` — confirm against the reader.
#[default]
Inherit,
/// Request checksum verification for this read.
Verify,
/// Request that checksum verification be skipped for this read.
Skip,
}
/// Cache behaviour carried by `ReadOptions::cache_policy`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CachePolicy {
/// Go through the cache.
Use,
/// Bypass the cache; the default variant.
#[default]
Bypass,
}
/// How read values are handed back, carried by `ReadOptions::read_strategy`.
/// NOTE(review): the concrete buffer types behind each variant are chosen by
/// the reader, which is not visible in this file.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ReadStrategy {
/// Values are borrowed.
Borrowed,
/// Values are shared; the default variant.
#[default]
Shared,
/// Values are owned copies.
Owned,
}
/// Signal with two states, continue or stop. It has no `Default`, so a value
/// must always be chosen explicitly.
/// NOTE(review): presumably returned by scan visitor callbacks to steer the
/// scan — confirm against the scan API.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VisitorControl {
/// Keep going.
Continue,
/// Stop.
Stop,
}
/// Counters describing a scan. All fields start at zero / `false`
/// (`ScanOutcome::empty()` or the derived `Default`), and partial outcomes can
/// be combined with `ScanOutcome::merge`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ScanOutcome {
/// Number of entries counted; `record` adds one per call.
pub visited: usize,
/// Sum of the value lengths passed to `record`.
pub bytes_read: usize,
/// Stop indicator; `merge` combines it with logical OR.
/// NOTE(review): presumably set when a visitor or cancel flag ended the scan early.
pub stopped: bool,
/// Number of tables scanned.
pub tables_scanned: usize,
/// Worker thread count; `merge` keeps the maximum rather than the sum.
pub worker_threads: usize,
/// Accumulated queue wait; the name indicates milliseconds.
pub queue_wait_ms: u128,
/// Number of cancellation checks performed.
pub cancel_checks: usize,
/// Number of exact-key lookups.
pub exact_gets: usize,
/// Number of exact-key lookup batches.
pub exact_get_batches: usize,
/// Table index hits.
pub table_index_hits: usize,
/// Table index misses.
pub table_index_misses: usize,
/// Data block hits.
pub data_block_hits: usize,
/// Data block misses.
pub data_block_misses: usize,
}
impl ScanOutcome {
    /// Returns an outcome with every counter at zero and `stopped` unset.
    ///
    /// This is a `const fn`, so it can initialise constants and statics.
    #[must_use]
    pub const fn empty() -> Self {
        Self {
            // Entry-level counters.
            visited: 0,
            bytes_read: 0,
            stopped: false,
            // Pipeline-level counters.
            tables_scanned: 0,
            worker_threads: 0,
            queue_wait_ms: 0,
            cancel_checks: 0,
            // Lookup and cache counters.
            exact_gets: 0,
            exact_get_batches: 0,
            table_index_hits: 0,
            table_index_misses: 0,
            data_block_hits: 0,
            data_block_misses: 0,
        }
    }

    /// Accounts for one visited entry whose value is `value_len` bytes long.
    ///
    /// Both counters saturate instead of overflowing.
    pub fn record(&mut self, value_len: usize) {
        let Self {
            visited,
            bytes_read,
            ..
        } = self;
        *visited = visited.saturating_add(1);
        *bytes_read = bytes_read.saturating_add(value_len);
    }

    /// Folds `other` into `self`.
    ///
    /// Counters are added with saturation, `stopped` is OR-ed, and
    /// `worker_threads` keeps the larger of the two values.
    pub fn merge(&mut self, other: Self) {
        // Destructure exhaustively so that a newly added field cannot be
        // forgotten here without a compile error.
        let Self {
            visited,
            bytes_read,
            stopped,
            tables_scanned,
            worker_threads,
            queue_wait_ms,
            cancel_checks,
            exact_gets,
            exact_get_batches,
            table_index_hits,
            table_index_misses,
            data_block_hits,
            data_block_misses,
        } = other;

        self.stopped |= stopped;
        self.worker_threads = self.worker_threads.max(worker_threads);

        self.visited = self.visited.saturating_add(visited);
        self.bytes_read = self.bytes_read.saturating_add(bytes_read);
        self.tables_scanned = self.tables_scanned.saturating_add(tables_scanned);
        self.queue_wait_ms = self.queue_wait_ms.saturating_add(queue_wait_ms);
        self.cancel_checks = self.cancel_checks.saturating_add(cancel_checks);
        self.exact_gets = self.exact_gets.saturating_add(exact_gets);
        self.exact_get_batches = self.exact_get_batches.saturating_add(exact_get_batches);
        self.table_index_hits = self.table_index_hits.saturating_add(table_index_hits);
        self.table_index_misses = self.table_index_misses.saturating_add(table_index_misses);
        self.data_block_hits = self.data_block_hits.saturating_add(data_block_hits);
        self.data_block_misses = self.data_block_misses.saturating_add(data_block_misses);
    }
}
/// Cancellation flag backed by an `Arc<AtomicBool>`.
///
/// Cloning copies the `Arc`, so every clone observes and sets the same flag.
/// The derived `Default` starts with the flag cleared (`false`).
#[derive(Debug, Clone, Default)]
pub struct ScanCancelFlag(Arc<AtomicBool>);
impl ScanCancelFlag {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn cancel(&self) {
self.0.store(true, Ordering::Relaxed);
}
#[must_use]
pub fn from_shared(cancelled: Arc<AtomicBool>) -> Self {
Self(cancelled)
}
#[must_use]
pub fn is_cancelled(&self) -> bool {
self.0.load(Ordering::Relaxed)
}
}
impl ScanPipelineOptions {
    /// Picks the queue depth to use for a scan.
    ///
    /// A configured `queue_depth` of `0` means "automatic": the larger of
    /// `256 * max(workers, 1)` (saturating) and `max(tables, 1)`. Any non-zero
    /// configured value is returned unchanged. The result is never `0`.
    #[must_use]
    pub fn resolve_queue_depth(self, workers: usize, tables: usize) -> usize {
        match self.queue_depth {
            0 => {
                let per_worker_budget = workers.max(1).saturating_mul(256);
                let at_least_one_table = tables.max(1);
                per_worker_budget.max(at_least_one_table)
            }
            configured => configured,
        }
    }

    /// Picks how many tables go into one batch.
    ///
    /// A configured `table_batch_size` of `0` means "automatic":
    /// `ceil(tables / (2 * max(workers, 1)))`, but at least `1`. Any non-zero
    /// configured value is returned unchanged. The result is never `0`.
    #[must_use]
    pub fn resolve_table_batch_size(self, workers: usize, tables: usize) -> usize {
        match self.table_batch_size {
            0 => {
                // Two batches per worker; the divisor is always >= 2.
                let batch_slots = workers.max(1).saturating_mul(2);
                tables.div_ceil(batch_slots).max(1)
            }
            configured => configured,
        }
    }

    /// Picks the progress interval: `8192` when the configured value is `0`,
    /// otherwise the configured value itself.
    #[must_use]
    pub fn resolve_progress_interval(self) -> usize {
        match self.progress_interval {
            0 => 8192,
            configured => configured,
        }
    }
}
/// Handle to a progress callback.
///
/// The callback is stored behind an `Arc`, so cloning the sink shares the same
/// callback instead of copying it. The callback must be `Send + Sync`.
/// `Debug` is implemented by hand elsewhere in this file because the stored
/// closure has no `Debug` implementation of its own.
#[derive(Clone)]
pub struct ScanProgressSink {
// Type-erased callback invoked by `emit` with each `ScanProgress` value.
inner: Arc<dyn Fn(ScanProgress) + Send + Sync>,
}
impl std::fmt::Debug for ScanProgressSink {
    /// Prints only the type name followed by `{ .. }`; the wrapped callback is
    /// opaque and therefore omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut opaque = f.debug_struct("ScanProgressSink");
        opaque.finish_non_exhaustive()
    }
}
impl ScanProgressSink {
    /// Wraps `callback` so it can be cloned cheaply and shared across threads.
    #[must_use]
    pub fn new(callback: impl Fn(ScanProgress) + Send + Sync + 'static) -> Self {
        // Erase the concrete closure type behind a shared trait object.
        let inner: Arc<dyn Fn(ScanProgress) + Send + Sync> = Arc::new(callback);
        Self { inner }
    }

    /// Invokes the wrapped callback with `progress`, synchronously on the
    /// calling thread.
    pub fn emit(&self, progress: ScanProgress) {
        let callback = &*self.inner;
        callback(progress);
    }
}
/// Progress snapshot passed by value to a `ScanProgressSink` callback.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ScanProgress {
/// Entries visited.
/// NOTE(review): presumably cumulative since the scan started — confirm with the emitter.
pub visited: usize,
/// Bytes read.
/// NOTE(review): presumably cumulative since the scan started — confirm with the emitter.
pub bytes_read: usize,
}
impl ThreadingOptions {
    /// Resolves to a concrete thread count without ever failing.
    ///
    /// Identical to [`Self::resolve_unchecked`].
    #[must_use]
    pub fn resolve(self, work_items: usize) -> usize {
        Self::resolve_unchecked(self, work_items)
    }

    /// Resolves to a concrete thread count, silently correcting bad input.
    ///
    /// * `Single` gives `1`.
    /// * `Fixed(n)` gives `n` clamped into `1..=MAX_LEVELDB_THREADS`; it is not
    ///   reduced to `work_items`.
    /// * `Auto` gives `std::thread::available_parallelism()` (or `1` when that
    ///   query fails), capped at `max(work_items, 1)`.
    ///
    /// NOTE(review): the `Auto` result is not clamped to `MAX_LEVELDB_THREADS`,
    /// so it could exceed the bound enforced for `Fixed` on a very large
    /// machine — confirm that this is intended.
    #[must_use]
    pub fn resolve_unchecked(self, work_items: usize) -> usize {
        match self {
            Self::Auto => {
                let available = std::thread::available_parallelism()
                    .map(usize::from)
                    .unwrap_or(1);
                // Cap at the number of work items, treating zero items as one.
                let useful = work_items.max(1);
                available.min(useful)
            }
            Self::Fixed(requested) => requested.clamp(1, MAX_LEVELDB_THREADS),
            Self::Single => 1,
        }
    }

    /// Resolves to a concrete thread count, rejecting invalid fixed counts.
    ///
    /// # Errors
    ///
    /// Returns an invalid-argument error when `self` is `Fixed(n)` with
    /// `n == 0` or `n > MAX_LEVELDB_THREADS`. All other inputs resolve exactly
    /// as [`Self::resolve_unchecked`] does.
    pub fn resolve_checked(self, work_items: usize) -> Result<usize> {
        match self {
            Self::Fixed(requested) if requested == 0 || requested > MAX_LEVELDB_THREADS => Err(
                LevelDbError::invalid_argument("thread count must be in 1..=512"),
            ),
            Self::Auto | Self::Fixed(_) | Self::Single => Ok(self.resolve_unchecked(work_items)),
        }
    }
}
// Unit tests for the option-resolution helpers and default values above.
#[cfg(test)]
mod tests {
use super::*;
// `Auto` must follow the machine's available parallelism (capped only by the
// number of work items), `Fixed` must accept exactly 1..=MAX_LEVELDB_THREADS,
// and `resolve_checked` must reject 0 and MAX_LEVELDB_THREADS + 1.
#[test]
fn threading_validates_fixed_range_and_auto_is_not_capped_to_eight() {
// Mirror the `Auto` computation so the assertion holds on any machine.
let expected_auto = std::thread::available_parallelism()
.map(usize::from)
.unwrap_or(1)
.min(10_000);
assert_eq!(
ThreadingOptions::Auto
.resolve_checked(10_000)
.expect("auto threads"),
expected_auto
);
assert_eq!(
ThreadingOptions::Fixed(MAX_LEVELDB_THREADS)
.resolve_checked(10_000)
.expect("max fixed threads"),
MAX_LEVELDB_THREADS
);
assert!(ThreadingOptions::Fixed(0).resolve_checked(10).is_err());
assert!(
ThreadingOptions::Fixed(MAX_LEVELDB_THREADS + 1)
.resolve_checked(10)
.is_err()
);
}
// Zeroed pipeline options resolve to non-zero automatic values, while
// explicit non-zero settings are passed through unchanged.
#[test]
fn scan_pipeline_options_resolve_automatic_bounds() {
let options = ScanPipelineOptions::default();
assert!(options.resolve_queue_depth(4, 128) >= 1);
assert!(options.resolve_table_batch_size(4, 128) >= 1);
assert_eq!(options.resolve_progress_interval(), 8192);
let explicit = ScanPipelineOptions {
queue_depth: 7,
table_batch_size: 3,
progress_interval: 11,
};
assert_eq!(explicit.resolve_queue_depth(4, 128), 7);
assert_eq!(explicit.resolve_table_batch_size(4, 128), 3);
assert_eq!(explicit.resolve_progress_interval(), 11);
}
// Pins the default read behaviour: bypass the cache and hand out shared values.
#[test]
fn default_reads_bypass_shared_cache_and_use_shared_values() {
let options = ReadOptions::default();
assert_eq!(options.cache_policy, CachePolicy::Bypass);
assert_eq!(options.read_strategy, ReadStrategy::Shared);
}
}