Skip to main content

bedrock_leveldb/
options.rs

/// Block compression applied to native `LevelDB` tables that this crate writes.
///
/// zlib is selected when no policy is specified.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CompressionPolicy {
    /// Write table blocks without any compression.
    None,
    /// Write table blocks compressed with Snappy.
    Snappy,
    /// Write table blocks compressed with zlib (the default).
    #[default]
    Zlib,
}
12
13/// Options used when opening a database directory.
14#[allow(clippy::struct_excessive_bools)]
15#[derive(Debug, Clone)]
16pub struct OpenOptions {
17    /// Open without performing writes, initialization, repair, or flushes.
18    pub read_only: bool,
19    /// Create the database directory and initial native manifest when missing.
20    pub create_if_missing: bool,
21    /// Fail if the target directory already contains files.
22    pub error_if_exists: bool,
23    /// Verify checksums while replaying logs and reading table blocks by default.
24    pub paranoid_checks: bool,
25    /// Compression used for native tables written by this crate.
26    pub compression_policy: CompressionPolicy,
27    /// Maximum decoded native table block cache size, in bytes.
28    pub cache_size: usize,
29    /// Approximate overlay size that triggers a flush to a native table.
30    pub write_buffer_size: usize,
31}
32
33impl Default for OpenOptions {
34    fn default() -> Self {
35        Self {
36            read_only: false,
37            create_if_missing: true,
38            error_if_exists: false,
39            paranoid_checks: true,
40            compression_policy: CompressionPolicy::Zlib,
41            cache_size: 64 * 1024 * 1024,
42            write_buffer_size: 4 * 1024 * 1024,
43        }
44    }
45}
46
47use crate::error::{LevelDbError, Result};
48use std::sync::{
49    Arc,
50    atomic::{AtomicBool, Ordering},
51};
52
53/// Per-read behavior for point lookups and scans.
54#[derive(Debug, Clone)]
55pub struct ReadOptions {
56    /// Checksum behavior for this read.
57    pub checksum: ChecksumMode,
58    /// Whether the native decoded block cache may be used.
59    pub cache_policy: CachePolicy,
60    /// Preferred ownership model for values returned by read APIs.
61    pub read_strategy: ReadStrategy,
62    /// Worker selection for parallel table scans.
63    pub threading: ThreadingOptions,
64    /// Sequential or table-parallel scan execution.
65    pub scan_mode: ScanMode,
66    /// Bounded scan pipeline behavior for parallel scans and progress cadence.
67    pub pipeline: ScanPipelineOptions,
68    /// Optional cooperative cancellation flag checked during scans.
69    pub cancel: Option<ScanCancelFlag>,
70    /// Optional progress callback emitted during scans.
71    pub progress: Option<ScanProgressSink>,
72}
73
74impl Default for ReadOptions {
75    fn default() -> Self {
76        Self {
77            checksum: ChecksumMode::Inherit,
78            cache_policy: CachePolicy::Bypass,
79            read_strategy: ReadStrategy::Shared,
80            threading: ThreadingOptions::Auto,
81            scan_mode: ScanMode::Sequential,
82            pipeline: ScanPipelineOptions::default(),
83            cancel: None,
84            progress: None,
85        }
86    }
87}
88
/// Tuning for the bounded pipeline that drives table scans.
///
/// Every field treats `0` as "choose automatically". The `resolve_*` methods
/// turn these settings into concrete values for a particular scan.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ScanPipelineOptions {
    /// Upper bound on queued scan messages before worker threads are made to wait.
    pub queue_depth: usize,
    /// How many table files form one Rayon work item.
    pub table_batch_size: usize,
    /// Number of visited records between progress emissions.
    pub progress_interval: usize,
}
103
/// Durability settings for writes that pass through the overlay and the WAL.
#[derive(Debug, Clone, Copy, Default)]
pub struct WriteOptions {
    /// When `true`, `File::sync_data` runs after the write batch has been
    /// appended to the log. Defaults to `false`.
    pub sync: bool,
}
110
/// Worker-count policy for table-parallel scans.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ThreadingOptions {
    /// Match the machine's available parallelism, never exceeding the number
    /// of table files. This is the default.
    #[default]
    Auto,
    /// Request an exact worker count; valid values are `1..=512`.
    Fixed(usize),
    /// Run with exactly one worker.
    Single,
}
122
/// How a scan distributes its work.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ScanMode {
    /// Walk table files one after another on the calling thread, in manifest
    /// order. This is the default.
    #[default]
    Sequential,
    /// Split table files among a bounded set of workers.
    ParallelTables,
}
132
/// Largest worker count accepted for [`ThreadingOptions::Fixed`] scans.
pub const MAX_LEVELDB_THREADS: usize = 512;
135
/// Per-read checksum verification setting.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ChecksumMode {
    /// Use whatever `OpenOptions::paranoid_checks` specifies. This is the default.
    #[default]
    Inherit,
    /// Always verify checksums for this read.
    Verify,
    /// Never verify checksums for this read.
    Skip,
}
147
/// Per-read block cache setting.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CachePolicy {
    /// Read through the database's native block cache.
    Use,
    /// Leave the native block cache untouched for this read. This is the default.
    #[default]
    Bypass,
}
157
/// Ownership model for values returned by point reads and passed to visitors.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ReadStrategy {
    /// Favor borrowed views when the backend can bind a value's lifetime to an
    /// in-memory table or block buffer.
    Borrowed,
    /// Favor shared, reference-counted buffers. This is the default, since it
    /// behaves the same for compressed blocks and overlay values.
    #[default]
    Shared,
    /// Make APIs that can return borrowed or shared values copy them into
    /// owned buffers before returning.
    Owned,
}
172
/// Value a scan visitor returns to tell the scan whether to keep going.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VisitorControl {
    /// Keep visiting records.
    Continue,
    /// End the scan early; this is not reported as an error.
    Stop,
}
181
/// Summary statistics reported once a scan has finished.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ScanOutcome {
    /// Count of visible records that were visited.
    pub visited: usize,
    /// Total length, in bytes, of all visited values.
    pub bytes_read: usize,
    /// Set when the visitor ended the scan early.
    pub stopped: bool,
    /// Count of table files opened and scanned.
    pub tables_scanned: usize,
    /// Count of worker threads the scan ran on.
    pub worker_threads: usize,
    /// Milliseconds that workers spent blocked on bounded scan queues.
    pub queue_wait_ms: u128,
    /// Count of cooperative cancellation checks made during the scan.
    pub cancel_checks: usize,
    /// Count of exact point lookups issued by batch read APIs.
    pub exact_gets: usize,
    /// Count of exact point lookup batches issued.
    pub exact_get_batches: usize,
    /// Table-index cache hits observed during exact point lookups.
    pub table_index_hits: usize,
    /// Table-index cache misses observed during exact point lookups.
    pub table_index_misses: usize,
    /// Data-block cache hits observed during exact point lookups.
    pub data_block_hits: usize,
    /// Data-block cache misses observed during exact point lookups.
    pub data_block_misses: usize,
}

impl ScanOutcome {
    /// Returns an outcome with every counter at zero and `stopped` unset.
    ///
    /// Matches `ScanOutcome::default()` but can be used in `const` contexts.
    #[must_use]
    pub const fn empty() -> Self {
        Self {
            visited: 0,
            bytes_read: 0,
            stopped: false,
            tables_scanned: 0,
            worker_threads: 0,
            queue_wait_ms: 0,
            cancel_checks: 0,
            exact_gets: 0,
            exact_get_batches: 0,
            table_index_hits: 0,
            table_index_misses: 0,
            data_block_hits: 0,
            data_block_misses: 0,
        }
    }

    /// Accounts for one visited record whose value is `value_len` bytes long.
    ///
    /// Both counters saturate at their maximum instead of overflowing.
    pub fn record(&mut self, value_len: usize) {
        self.bytes_read = self.bytes_read.saturating_add(value_len);
        self.visited = self.visited.saturating_add(1);
    }

    /// Folds `other` into `self`.
    ///
    /// Counters are summed with saturation, `stopped` is OR-ed, and
    /// `worker_threads` keeps the larger of the two values.
    pub fn merge(&mut self, other: Self) {
        // Exhaustive destructuring: adding a field without deciding how it
        // merges becomes a compile error instead of a silently dropped value.
        let Self {
            visited,
            bytes_read,
            stopped,
            tables_scanned,
            worker_threads,
            queue_wait_ms,
            cancel_checks,
            exact_gets,
            exact_get_batches,
            table_index_hits,
            table_index_misses,
            data_block_hits,
            data_block_misses,
        } = other;

        self.visited = self.visited.saturating_add(visited);
        self.bytes_read = self.bytes_read.saturating_add(bytes_read);
        self.stopped = self.stopped || stopped;
        self.tables_scanned = self.tables_scanned.saturating_add(tables_scanned);
        self.worker_threads = self.worker_threads.max(worker_threads);
        self.queue_wait_ms = self.queue_wait_ms.saturating_add(queue_wait_ms);
        self.cancel_checks = self.cancel_checks.saturating_add(cancel_checks);
        self.exact_gets = self.exact_gets.saturating_add(exact_gets);
        self.exact_get_batches = self.exact_get_batches.saturating_add(exact_get_batches);
        self.table_index_hits = self.table_index_hits.saturating_add(table_index_hits);
        self.table_index_misses = self.table_index_misses.saturating_add(table_index_misses);
        self.data_block_hits = self.data_block_hits.saturating_add(data_block_hits);
        self.data_block_misses = self.data_block_misses.saturating_add(data_block_misses);
    }
}
263
/// Cooperative cancellation token shared between a long scan and its controller.
///
/// Clones share the same underlying atomic, so cancelling through any clone
/// is visible through all of them.
#[derive(Debug, Clone, Default)]
pub struct ScanCancelFlag(Arc<AtomicBool>);

impl ScanCancelFlag {
    /// Returns a fresh flag that has not been cancelled.
    #[must_use]
    pub fn new() -> Self {
        Self(Arc::new(AtomicBool::new(false)))
    }

    /// Adopts an atomic flag the caller already owns, sharing it rather than copying it.
    #[must_use]
    pub fn from_shared(cancelled: Arc<AtomicBool>) -> Self {
        Self(cancelled)
    }

    /// Requests cancellation.
    pub fn cancel(&self) {
        // Relaxed suffices: the flag publishes only its own value, no other data.
        self.0.store(true, Ordering::Relaxed);
    }

    /// Reports whether cancellation has been requested.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        let flag = &self.0;
        flag.load(Ordering::Relaxed)
    }
}
292
293impl ScanPipelineOptions {
294    /// Resolves the bounded queue depth for a scan.
295    #[must_use]
296    pub fn resolve_queue_depth(self, workers: usize, tables: usize) -> usize {
297        self.queue_depth
298            .max(if self.queue_depth == 0 {
299                workers.max(1).saturating_mul(256).max(tables.max(1))
300            } else {
301                1
302            })
303            .max(1)
304    }
305
306    /// Resolves the table batch size for one Rayon task.
307    #[must_use]
308    pub fn resolve_table_batch_size(self, workers: usize, tables: usize) -> usize {
309        self.table_batch_size
310            .max(if self.table_batch_size == 0 {
311                tables.div_ceil(workers.max(1).saturating_mul(2)).max(1)
312            } else {
313                1
314            })
315            .max(1)
316    }
317
318    /// Resolves the progress emission interval.
319    #[must_use]
320    pub fn resolve_progress_interval(self) -> usize {
321        self.progress_interval
322            .max(if self.progress_interval == 0 { 8192 } else { 1 })
323    }
324}
325
/// Shareable callback that receives [`ScanProgress`] samples.
///
/// Clones are cheap and share the same callback through reference counting.
#[derive(Clone)]
pub struct ScanProgressSink {
    inner: Arc<dyn Fn(ScanProgress) + Send + Sync>,
}

impl std::fmt::Debug for ScanProgressSink {
    // The callback is opaque, so only the type name is printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanProgressSink").finish_non_exhaustive()
    }
}

impl ScanProgressSink {
    /// Wraps `callback` so it can be attached to scans and cloned freely.
    #[must_use]
    pub fn new(callback: impl Fn(ScanProgress) + Send + Sync + 'static) -> Self {
        let inner: Arc<dyn Fn(ScanProgress) + Send + Sync> = Arc::new(callback);
        Self { inner }
    }

    /// Passes one progress sample to the wrapped callback.
    pub fn emit(&self, progress: ScanProgress) {
        let callback: &(dyn Fn(ScanProgress) + Send + Sync) = &*self.inner;
        callback(progress);
    }
}

/// Snapshot of scan progress handed to a [`ScanProgressSink`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ScanProgress {
    /// Visible records visited up to the time of this sample.
    pub visited: usize,
    /// Total bytes of visited values up to the time of this sample.
    pub bytes_read: usize,
}
358    /// Number of visible records visited so far.
359    pub visited: usize,
360    /// Sum of visited value lengths in bytes so far.
361    pub bytes_read: usize,
362}
363
364impl ThreadingOptions {
365    /// Resolves this setting to a concrete worker count.
366    #[must_use]
367    pub fn resolve(self, work_items: usize) -> usize {
368        self.resolve_unchecked(work_items)
369    }
370
371    /// Resolves this setting without returning validation errors.
372    #[must_use]
373    pub fn resolve_unchecked(self, work_items: usize) -> usize {
374        match self {
375            Self::Single => 1,
376            Self::Fixed(threads) => threads.clamp(1, MAX_LEVELDB_THREADS),
377            Self::Auto => std::thread::available_parallelism()
378                .map(usize::from)
379                .unwrap_or(1)
380                .min(work_items.max(1)),
381        }
382    }
383
384    /// Resolves this setting and rejects invalid fixed worker counts.
385    ///
386    /// # Errors
387    ///
388    /// Returns [`LevelDbError::InvalidArgument`] when `Fixed(0)` or a value
389    /// above 512 is requested.
390    pub fn resolve_checked(self, work_items: usize) -> Result<usize> {
391        match self {
392            Self::Fixed(0) => Err(LevelDbError::invalid_argument(
393                "thread count must be in 1..=512",
394            )),
395            Self::Fixed(threads) if threads > MAX_LEVELDB_THREADS => Err(
396                LevelDbError::invalid_argument("thread count must be in 1..=512"),
397            ),
398            _ => Ok(self.resolve_unchecked(work_items)),
399        }
400    }
401}
402
403#[cfg(test)]
404mod tests {
405    use super::*;
406
407    #[test]
408    fn threading_validates_fixed_range_and_auto_is_not_capped_to_eight() {
409        let expected_auto = std::thread::available_parallelism()
410            .map(usize::from)
411            .unwrap_or(1)
412            .min(10_000);
413        assert_eq!(
414            ThreadingOptions::Auto
415                .resolve_checked(10_000)
416                .expect("auto threads"),
417            expected_auto
418        );
419        assert_eq!(
420            ThreadingOptions::Fixed(MAX_LEVELDB_THREADS)
421                .resolve_checked(10_000)
422                .expect("max fixed threads"),
423            MAX_LEVELDB_THREADS
424        );
425        assert!(ThreadingOptions::Fixed(0).resolve_checked(10).is_err());
426        assert!(
427            ThreadingOptions::Fixed(MAX_LEVELDB_THREADS + 1)
428                .resolve_checked(10)
429                .is_err()
430        );
431    }
432
433    #[test]
434    fn scan_pipeline_options_resolve_automatic_bounds() {
435        let options = ScanPipelineOptions::default();
436
437        assert!(options.resolve_queue_depth(4, 128) >= 1);
438        assert!(options.resolve_table_batch_size(4, 128) >= 1);
439        assert_eq!(options.resolve_progress_interval(), 8192);
440
441        let explicit = ScanPipelineOptions {
442            queue_depth: 7,
443            table_batch_size: 3,
444            progress_interval: 11,
445        };
446        assert_eq!(explicit.resolve_queue_depth(4, 128), 7);
447        assert_eq!(explicit.resolve_table_batch_size(4, 128), 3);
448        assert_eq!(explicit.resolve_progress_interval(), 11);
449    }
450
451    #[test]
452    fn default_reads_bypass_shared_cache_and_use_shared_values() {
453        let options = ReadOptions::default();
454        assert_eq!(options.cache_policy, CachePolicy::Bypass);
455        assert_eq!(options.read_strategy, ReadStrategy::Shared);
456    }
457}