/// Compression choice carried by [`OpenOptions::compression_policy`].
///
/// [`CompressionPolicy::Zlib`] is the `Default` variant.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CompressionPolicy {
    /// Presumably "store blocks uncompressed" — the consumer is not visible here.
    None,
    /// Presumably selects Snappy compression — TODO confirm against the writer.
    Snappy,
    /// Presumably selects zlib compression; this is the derived default.
    #[default]
    Zlib,
}
12
// The struct carries four independent boolean flags, which trips this lint.
#[allow(clippy::struct_excessive_bools)]
/// Flags and sizes grouped for opening a store.
///
/// NOTE(review): the code that consumes these options is not in this file, so
/// the per-field notes below only state the type and the value assigned by
/// the `Default` implementation.
#[derive(Debug, Clone)]
pub struct OpenOptions {
    /// Defaults to `false`.
    pub read_only: bool,
    /// Defaults to `true`.
    pub create_if_missing: bool,
    /// Defaults to `false`.
    pub error_if_exists: bool,
    /// Defaults to `true`.
    pub paranoid_checks: bool,
    /// Defaults to [`CompressionPolicy::Zlib`].
    pub compression_policy: CompressionPolicy,
    /// Defaults to `64 * 1024 * 1024`; presumably a byte count — TODO confirm.
    pub cache_size: usize,
    /// Defaults to `4 * 1024 * 1024`; presumably a byte count — TODO confirm.
    pub write_buffer_size: usize,
}
32
33impl Default for OpenOptions {
34 fn default() -> Self {
35 Self {
36 read_only: false,
37 create_if_missing: true,
38 error_if_exists: false,
39 paranoid_checks: true,
40 compression_policy: CompressionPolicy::Zlib,
41 cache_size: 64 * 1024 * 1024,
42 write_buffer_size: 4 * 1024 * 1024,
43 }
44 }
45}
46
47use crate::error::{LevelDbError, Result};
48use std::sync::{
49 Arc,
50 atomic::{AtomicBool, Ordering},
51};
52
/// Per-read configuration bundle.
///
/// NOTE(review): the readers that interpret these fields are not in this
/// file; the notes below state each field's type and the value assigned by
/// the `Default` implementation.
#[derive(Debug, Clone)]
pub struct ReadOptions {
    /// Defaults to [`ChecksumMode::Inherit`].
    pub checksum: ChecksumMode,
    /// Defaults to [`CachePolicy::Bypass`].
    pub cache_policy: CachePolicy,
    /// Defaults to [`ReadStrategy::Shared`].
    pub read_strategy: ReadStrategy,
    /// Defaults to [`ThreadingOptions::Auto`].
    pub threading: ThreadingOptions,
    /// Defaults to [`ScanMode::Sequential`].
    pub scan_mode: ScanMode,
    /// Defaults to `ScanPipelineOptions::default()` (all zeros, which the
    /// `resolve_*` helpers treat as "pick automatically").
    pub pipeline: ScanPipelineOptions,
    /// Optional cancellation handle; `None` by default.
    pub cancel: Option<ScanCancelFlag>,
    /// Optional progress callback; `None` by default.
    pub progress: Option<ScanProgressSink>,
}
73
74impl Default for ReadOptions {
75 fn default() -> Self {
76 Self {
77 checksum: ChecksumMode::Inherit,
78 cache_policy: CachePolicy::Bypass,
79 read_strategy: ReadStrategy::Shared,
80 threading: ThreadingOptions::Auto,
81 scan_mode: ScanMode::Sequential,
82 pipeline: ScanPipelineOptions::default(),
83 cancel: None,
84 progress: None,
85 }
86 }
87}
88
/// Tuning knobs for the scan pipeline.
///
/// Every field defaults to `0`. The `resolve_*` methods on this type treat
/// `0` as "derive a value automatically" and return any non-zero value
/// unchanged.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ScanPipelineOptions {
    /// Requested queue depth; `0` lets `resolve_queue_depth` derive one from
    /// the worker and table counts.
    pub queue_depth: usize,
    /// Requested table batch size; `0` lets `resolve_table_batch_size` derive
    /// one from the worker and table counts.
    pub table_batch_size: usize,
    /// Requested progress interval; `0` makes `resolve_progress_interval`
    /// return `8192`. Presumably counted in visited entries — TODO confirm.
    pub progress_interval: usize,
}
103
/// Per-write configuration.
#[derive(Debug, Clone, Copy, Default)]
pub struct WriteOptions {
    /// Derived default is `false`. Presumably requests a durable (synced)
    /// write when `true` — the writer is not visible here, so confirm.
    pub sync: bool,
}
110
/// How many worker threads an operation should use.
///
/// The variants are turned into a concrete count by the `resolve*` methods
/// on this type.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ThreadingOptions {
    /// Use `std::thread::available_parallelism()` (falling back to 1 when it
    /// is unavailable), limited to the number of work items. This is the
    /// derived default.
    #[default]
    Auto,
    /// Use an explicit thread count. `resolve`/`resolve_unchecked` clamp it
    /// to `1..=MAX_LEVELDB_THREADS`; `resolve_checked` rejects values outside
    /// that range instead.
    Fixed(usize),
    /// Always resolve to exactly one thread.
    Single,
}
122
/// Scan scheduling mode carried by `ReadOptions::scan_mode`.
///
/// NOTE(review): the scanner that interprets these variants is not in this
/// file; the variant descriptions are inferred from their names.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ScanMode {
    /// Derived default. Presumably a single in-order pass.
    #[default]
    Sequential,
    /// Presumably scans multiple tables concurrently — TODO confirm.
    ParallelTables,
}
132
/// Largest explicit thread count accepted for `ThreadingOptions::Fixed`:
/// `resolve_unchecked` clamps requests to this value and `resolve_checked`
/// rejects anything above it.
pub const MAX_LEVELDB_THREADS: usize = 512;
135
/// Checksum handling selector carried by `ReadOptions::checksum`.
///
/// NOTE(review): the reader that interprets these variants is not in this
/// file; the variant descriptions are inferred from their names.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ChecksumMode {
    /// Derived default. Presumably defers to a store-level setting — TODO
    /// confirm which one.
    #[default]
    Inherit,
    /// Presumably forces checksum verification on reads.
    Verify,
    /// Presumably disables checksum verification on reads.
    Skip,
}
147
/// Cache usage selector carried by `ReadOptions::cache_policy`.
///
/// NOTE(review): the reader that interprets these variants is not in this
/// file; the variant descriptions are inferred from their names.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CachePolicy {
    /// Presumably reads go through the shared cache.
    Use,
    /// Derived default. Presumably reads skip the shared cache.
    #[default]
    Bypass,
}
157
/// Value hand-off strategy carried by `ReadOptions::read_strategy`.
///
/// NOTE(review): the reader that interprets these variants is not in this
/// file; the variant descriptions are inferred from their names.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ReadStrategy {
    /// Presumably yields values borrowed from an underlying buffer.
    Borrowed,
    /// Derived default. Presumably yields reference-counted values.
    #[default]
    Shared,
    /// Presumably yields freshly allocated copies.
    Owned,
}
172
/// Two-state control signal; unlike the other enums in this file it has no
/// `Default`.
///
/// NOTE(review): presumably returned by scan visitors to steer iteration —
/// the scan loop is not visible here, so confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VisitorControl {
    /// Presumably: keep visiting.
    Continue,
    /// Presumably: end the scan early.
    Stop,
}
181
/// Counters describing one scan, or the combination of several.
///
/// The derived `Default` and [`ScanOutcome::empty`] both produce an all-zero,
/// not-stopped value. All counter arithmetic in this type saturates rather
/// than wrapping.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ScanOutcome {
    /// Incremented by one on every [`ScanOutcome::record`] call.
    pub visited: usize,
    /// Running total of the `value_len` arguments passed to `record`.
    pub bytes_read: usize,
    /// OR-ed together on merge. Presumably set when a scan ended early —
    /// TODO confirm against the scan loop.
    pub stopped: bool,
    /// Summed on merge.
    pub tables_scanned: usize,
    /// Merge keeps the larger of the two values instead of summing.
    pub worker_threads: usize,
    /// Summed on merge; milliseconds, judging by the name.
    pub queue_wait_ms: u128,
    /// Summed on merge.
    pub cancel_checks: usize,
    /// Summed on merge.
    pub exact_gets: usize,
    /// Summed on merge.
    pub exact_get_batches: usize,
    /// Summed on merge.
    pub table_index_hits: usize,
    /// Summed on merge.
    pub table_index_misses: usize,
    /// Summed on merge.
    pub data_block_hits: usize,
    /// Summed on merge.
    pub data_block_misses: usize,
}

impl ScanOutcome {
    /// Returns an outcome with every counter at zero and `stopped == false`.
    ///
    /// Usable in `const` contexts, unlike the derived `Default`.
    #[must_use]
    pub const fn empty() -> Self {
        Self {
            stopped: false,
            // Volume counters.
            visited: 0,
            bytes_read: 0,
            tables_scanned: 0,
            // Scheduling counters.
            worker_threads: 0,
            queue_wait_ms: 0,
            cancel_checks: 0,
            // Point-lookup counters.
            exact_gets: 0,
            exact_get_batches: 0,
            // Hit/miss counters.
            table_index_hits: 0,
            table_index_misses: 0,
            data_block_hits: 0,
            data_block_misses: 0,
        }
    }

    /// Accounts for one visited entry whose value is `value_len` long.
    pub fn record(&mut self, value_len: usize) {
        let Self {
            visited,
            bytes_read,
            ..
        } = self;
        *visited = visited.saturating_add(1);
        *bytes_read = bytes_read.saturating_add(value_len);
    }

    /// Folds `other` into `self`.
    ///
    /// Counters are added with saturation, `stopped` is OR-ed, and
    /// `worker_threads` keeps the maximum of both sides.
    pub fn merge(&mut self, other: Self) {
        // Exhaustive destructuring: adding a field without merging it becomes
        // a compile error here.
        let Self {
            visited,
            bytes_read,
            stopped,
            tables_scanned,
            worker_threads,
            queue_wait_ms,
            cancel_checks,
            exact_gets,
            exact_get_batches,
            table_index_hits,
            table_index_misses,
            data_block_hits,
            data_block_misses,
        } = other;

        self.stopped = self.stopped || stopped;
        self.worker_threads = self.worker_threads.max(worker_threads);

        self.visited = self.visited.saturating_add(visited);
        self.bytes_read = self.bytes_read.saturating_add(bytes_read);
        self.tables_scanned = self.tables_scanned.saturating_add(tables_scanned);
        self.queue_wait_ms = self.queue_wait_ms.saturating_add(queue_wait_ms);
        self.cancel_checks = self.cancel_checks.saturating_add(cancel_checks);
        self.exact_gets = self.exact_gets.saturating_add(exact_gets);
        self.exact_get_batches = self.exact_get_batches.saturating_add(exact_get_batches);
        self.table_index_hits = self.table_index_hits.saturating_add(table_index_hits);
        self.table_index_misses = self.table_index_misses.saturating_add(table_index_misses);
        self.data_block_hits = self.data_block_hits.saturating_add(data_block_hits);
        self.data_block_misses = self.data_block_misses.saturating_add(data_block_misses);
    }
}
263
/// Cloneable cancellation handle backed by a shared `AtomicBool`.
///
/// Clones share the same underlying flag, so cancelling through any clone is
/// visible through all of them.
#[derive(Debug, Clone, Default)]
pub struct ScanCancelFlag(Arc<AtomicBool>);

impl ScanCancelFlag {
    /// Creates a fresh flag in the not-cancelled state.
    #[must_use]
    pub fn new() -> Self {
        Self(Arc::new(AtomicBool::new(false)))
    }

    /// Wraps an existing shared boolean, so code that already owns the
    /// `Arc<AtomicBool>` can drive cancellation directly.
    #[must_use]
    pub fn from_shared(cancelled: Arc<AtomicBool>) -> Self {
        Self(cancelled)
    }

    /// Sets the flag. There is no way to reset it through this type.
    pub fn cancel(&self) {
        let Self(flag) = self;
        // Relaxed: only the boolean itself is communicated.
        // NOTE(review): confirm no caller relies on this store to publish other data.
        flag.store(true, Ordering::Relaxed);
    }

    /// Reports whether the flag has been set.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        let Self(flag) = self;
        flag.load(Ordering::Relaxed)
    }
}
292
293impl ScanPipelineOptions {
294 #[must_use]
296 pub fn resolve_queue_depth(self, workers: usize, tables: usize) -> usize {
297 self.queue_depth
298 .max(if self.queue_depth == 0 {
299 workers.max(1).saturating_mul(256).max(tables.max(1))
300 } else {
301 1
302 })
303 .max(1)
304 }
305
306 #[must_use]
308 pub fn resolve_table_batch_size(self, workers: usize, tables: usize) -> usize {
309 self.table_batch_size
310 .max(if self.table_batch_size == 0 {
311 tables.div_ceil(workers.max(1).saturating_mul(2)).max(1)
312 } else {
313 1
314 })
315 .max(1)
316 }
317
318 #[must_use]
320 pub fn resolve_progress_interval(self) -> usize {
321 self.progress_interval
322 .max(if self.progress_interval == 0 { 8192 } else { 1 })
323 }
324}
325
326#[derive(Clone)]
328pub struct ScanProgressSink {
329 inner: Arc<dyn Fn(ScanProgress) + Send + Sync>,
330}
331
332impl std::fmt::Debug for ScanProgressSink {
333 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
334 formatter
335 .debug_struct("ScanProgressSink")
336 .finish_non_exhaustive()
337 }
338}
339
340impl ScanProgressSink {
341 #[must_use]
343 pub fn new(callback: impl Fn(ScanProgress) + Send + Sync + 'static) -> Self {
344 Self {
345 inner: Arc::new(callback),
346 }
347 }
348
349 pub fn emit(&self, progress: ScanProgress) {
351 (self.inner)(progress);
352 }
353}
354
/// Payload handed to the callback wrapped by `ScanProgressSink`.
///
/// NOTE(review): the code that fills these fields is not in this file;
/// they presumably mirror the same-named `ScanOutcome` counters at the
/// time of the report — confirm against the emitter.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ScanProgress {
    /// Presumably the number of entries visited so far.
    pub visited: usize,
    /// Presumably the number of value bytes read so far.
    pub bytes_read: usize,
}
363
364impl ThreadingOptions {
365 #[must_use]
367 pub fn resolve(self, work_items: usize) -> usize {
368 self.resolve_unchecked(work_items)
369 }
370
371 #[must_use]
373 pub fn resolve_unchecked(self, work_items: usize) -> usize {
374 match self {
375 Self::Single => 1,
376 Self::Fixed(threads) => threads.clamp(1, MAX_LEVELDB_THREADS),
377 Self::Auto => std::thread::available_parallelism()
378 .map(usize::from)
379 .unwrap_or(1)
380 .min(work_items.max(1)),
381 }
382 }
383
384 pub fn resolve_checked(self, work_items: usize) -> Result<usize> {
391 match self {
392 Self::Fixed(0) => Err(LevelDbError::invalid_argument(
393 "thread count must be in 1..=512",
394 )),
395 Self::Fixed(threads) if threads > MAX_LEVELDB_THREADS => Err(
396 LevelDbError::invalid_argument("thread count must be in 1..=512"),
397 ),
398 _ => Ok(self.resolve_unchecked(work_items)),
399 }
400 }
401}
402
#[cfg(test)]
mod tests {
    use super::*;

    /// `Auto` follows the detected parallelism (bounded only by the work-item
    /// count), the maximum `Fixed` value is accepted, and out-of-range
    /// `Fixed` values are rejected by `resolve_checked`.
    #[test]
    fn threading_validates_fixed_range_and_auto_is_not_capped_to_eight() {
        const WORK_ITEMS: usize = 10_000;

        let detected = std::thread::available_parallelism()
            .map(usize::from)
            .unwrap_or(1);
        let auto_threads = ThreadingOptions::Auto
            .resolve_checked(WORK_ITEMS)
            .expect("auto threads");
        assert_eq!(auto_threads, detected.min(WORK_ITEMS));

        let at_limit = ThreadingOptions::Fixed(MAX_LEVELDB_THREADS)
            .resolve_checked(WORK_ITEMS)
            .expect("max fixed threads");
        assert_eq!(at_limit, MAX_LEVELDB_THREADS);

        // One value below the accepted range and one above it.
        for rejected in [0, MAX_LEVELDB_THREADS + 1] {
            assert!(
                ThreadingOptions::Fixed(rejected)
                    .resolve_checked(10)
                    .is_err()
            );
        }
    }

    /// Zeroed pipeline options resolve to usable values; explicit values are
    /// passed through unchanged.
    #[test]
    fn scan_pipeline_options_resolve_automatic_bounds() {
        let (workers, tables) = (4, 128);

        let automatic = ScanPipelineOptions::default();
        assert!(automatic.resolve_queue_depth(workers, tables) >= 1);
        assert!(automatic.resolve_table_batch_size(workers, tables) >= 1);
        assert_eq!(automatic.resolve_progress_interval(), 8192);

        let pinned = ScanPipelineOptions {
            queue_depth: 7,
            table_batch_size: 3,
            progress_interval: 11,
        };
        assert_eq!(pinned.resolve_queue_depth(workers, tables), 7);
        assert_eq!(pinned.resolve_table_batch_size(workers, tables), 3);
        assert_eq!(pinned.resolve_progress_interval(), 11);
    }

    /// Default reads bypass the cache and use the shared read strategy.
    #[test]
    fn default_reads_bypass_shared_cache_and_use_shared_values() {
        let ReadOptions {
            cache_policy,
            read_strategy,
            ..
        } = ReadOptions::default();

        assert_eq!(cache_policy, CachePolicy::Bypass);
        assert_eq!(read_strategy, ReadStrategy::Shared);
    }
}