cesiumdb 0.2.2

Blazing fast, persistent key-value store for Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
// Copyright (c) Sienna Meridian Satterwhite
// SPDX-License-Identifier: GPL-3.0-only WITH Classpath-exception-2.0

//! Internal database implementation details.
//!
//! [`DbInner`] holds the actual state and implements the core get, scan,
//! batch, and sync logic behind the public [`Db`] API.

use std::{
    sync::{
        Arc,
        atomic::{
            AtomicU64,
            Ordering,
        },
    },
    thread,
    time::Duration,
};

use bytes::Bytes;
use parking_lot::{
    Mutex,
    RwLock,
};

use crate::{
    Batch::{
        Delete,
        DeleteNs,
        Put,
        PutNs,
    },
    CesiumError,
    CesiumError::MemtableError,
    DbScanIterator,
    OwnedSegmentIterator,
    ReadAmpStats,
    VersionStats,
    db_options::Batch,
    errs::MemtableError as MtError,
    keypair::{
        DEFAULT_NS,
        KeyBytes,
        ValueBytes,
    },
    memtable::Memtable,
    merge,
    state::DbStorageState,
    utils::Serializer,
    version::VersionManager,
};

/// Internal database state and operations.
///
/// Shared state behind the public database handle: the lock-protected
/// storage state, a lock-light cache of the active memtable, and relaxed
/// atomic counters used for read-amplification reporting.
// `#[repr(C)]` fixes the field layout to declaration order; do not reorder
// fields without checking why the layout was pinned.
#[repr(C)]
pub(crate) struct DbInner {
    /// Storage state (current/frozen memtables, version manager, sync).
    /// Guarded by a mutex; the methods in this file lock it before touching
    /// `curr_memtable` when they need both.
    pub(crate) state: Mutex<DbStorageState>,
    /// Cached current memtable to avoid state lock on hot write/get paths.
    /// Updated whenever `new_memtable()` is called under state lock.
    pub(crate) curr_memtable: RwLock<Arc<Memtable>>,
    /// Version manager for checking L0 size without state lock
    pub(crate) version_manager: Arc<VersionManager>,
    /// Warm thread pool for parallel LSM reads across levels
    // NOTE(review): `get` and `scan` as written in this file walk levels
    // sequentially and do not reference this pool.
    pub(crate) read_pool: rayon::ThreadPool,
    /// Cumulative read amplification counters
    ///
    /// Number of `get` calls; incremented once at the start of every `get`.
    pub(crate) total_gets: AtomicU64,
    /// Number of L0 segments scanned by `get`, counted after a segment
    /// passes its bloom-filter check.
    pub(crate) l0_reads: AtomicU64,
    /// Number of segments in the levels below L0 scanned by `get`, counted
    /// after a segment passes its bloom-filter check.
    pub(crate) ln_reads: AtomicU64,
}

impl DbInner {
    /// Point lookup for `key`, probing storage layers from top to bottom.
    ///
    /// Probe order: the cached current memtable (no state lock), the frozen
    /// memtables, the L0 segments in reverse order, then every segment of each
    /// entry in `version.levels`. The first layer that yields an entry decides
    /// the result; if that entry is a tombstone the key is reported as absent.
    ///
    /// # Errors
    ///
    /// Returns [`CesiumError::SegmentError`] when a segment reader cannot be
    /// obtained.
    pub(crate) fn get(&self, key: KeyBytes) -> Result<Option<ValueBytes>, CesiumError> {
        use std::ops::Bound;

        use bytes::{
            BufMut,
            BytesMut,
        };

        // A tombstone hides the key, so it is reported as "not found".
        fn visible(val: ValueBytes) -> Option<ValueBytes> {
            if val.is_tombstone() {
                None
            } else {
                Some(val)
            }
        }

        // Counted for read-amplification stats whether or not the key exists.
        self.total_gets.fetch_add(1, Ordering::Relaxed);

        // 1. Current memtable, through the cache so the state lock is not taken.
        {
            let mtable = self.curr_memtable.read().clone();
            if let Some(val) = mtable.get(&key) {
                return Ok(visible(val));
            }
        }

        // 2. Frozen memtables, then 3. on-disk segments, under a single state
        //    lock acquisition (same approach as `scan`).
        // NOTE(review): the lock stays held while segments are read. If
        // `current()` returns an owned snapshot, the guard could be released
        // before the segment loops - confirm against `VersionManager`.
        let guard = self.state.lock();
        if let Some(val) = guard.get_from_frozen(&key) {
            return Ok(visible(val));
        }
        let version = guard.version_manager.current();

        // Namespace + user key without a timestamp: this is what the bloom
        // filters are queried with, and the shared prefix of both scan bounds.
        let key_for_bloom = {
            let user_key = key.as_bytes();
            let mut buf = BytesMut::with_capacity(8 + user_key.len());
            buf.put_u64_le(key.ns());
            buf.put_slice(user_key.as_ref());
            buf.freeze()
        };

        // Appends a 16-byte timestamp suffix to the shared prefix.
        let with_ts_suffix = |ts: u128| {
            let mut buf = BytesMut::with_capacity(key_for_bloom.len() + 16);
            buf.put_slice(&key_for_bloom[..]);
            buf.put_u128_le(ts);
            buf.freeze()
        };

        // All-zero and all-ones suffixes bracket every possible timestamp, so
        // the inclusive range [lower, upper] matches any stored version of
        // this namespace + key.
        let lower = with_ts_suffix(0);
        let upper = with_ts_suffix(u128::MAX);

        // L0 segments may overlap, so they are probed one by one in reverse
        // order (newest first, per the original author's note) and the first
        // hit wins.
        for segment in version.l0.iter().rev() {
            let reader = segment.reader().map_err(CesiumError::SegmentError)?;

            // Bloom filter says "definitely absent": skip without counting a read.
            if !reader.may_contain(&key_for_bloom) {
                continue;
            }
            self.l0_reads.fetch_add(1, Ordering::Relaxed);

            let mut hits = reader.scan(
                Bound::Included(lower.as_ref()),
                Bound::Included(upper.as_ref()),
            );

            // Only the first entry is consulted; the original code relies on
            // it being the newest version within the segment.
            // NOTE(review): an `Err` item here is ignored and the lookup
            // falls through to older segments - consider propagating it.
            if let Some(Ok((_, val))) = hits.next() {
                return Ok(visible(val));
            }
        }

        // Remaining levels are walked sequentially in `version.levels` order so
        // that a deeper level can never answer before a shallower one.
        for level in &version.levels {
            for segment in &level.segments {
                let reader = segment.reader().map_err(CesiumError::SegmentError)?;

                if !reader.may_contain(&key_for_bloom) {
                    continue;
                }
                self.ln_reads.fetch_add(1, Ordering::Relaxed);

                let mut hits = reader.scan(
                    Bound::Included(lower.as_ref()),
                    Bound::Included(upper.as_ref()),
                );
                // NOTE(review): as above, an `Err` item is silently skipped.
                if let Some(Ok((_, val))) = hits.next() {
                    return Ok(visible(val));
                }
            }
        }

        // 4. Not present in any layer.
        Ok(None)
    }

    pub(crate) fn scan(
        &self,
        ns: u64,
        lower: std::ops::Bound<&[u8]>,
        upper: std::ops::Bound<&[u8]>,
    ) -> Result<DbScanIterator, CesiumError> {
        use std::ops::Bound;

        // Convert bounds to KeyBytes format (with namespace and timestamp)
        // For namespace isolation, we need to ensure we only scan within the given
        // namespace
        //
        // IMPORTANT: KeyBytes serializes timestamps as `u128::MAX - ts`, so:
        // - ts=0 (newest) serializes to MAX (sorts LAST in byte order)
        // - ts=MAX (oldest) serializes to 0 (sorts FIRST in byte order)
        // Therefore, to scan forward seeing newest versions first, we need ts=MAX in
        // lower bound.
        let lower_key = match lower {
            | Bound::Included(k) => {
                // Start with oldest version (ts=MAX serializes to 0, sorts first)
                Bound::Included(KeyBytes::new(ns, Bytes::copy_from_slice(k), u128::MAX))
            },
            | Bound::Excluded(k) => {
                // Exclude oldest version
                Bound::Excluded(KeyBytes::new(ns, Bytes::copy_from_slice(k), u128::MAX))
            },
            | Bound::Unbounded => {
                // Start from the beginning of this namespace
                Bound::Included(KeyBytes::new(ns, Bytes::new(), u128::MAX))
            },
        };

        let upper_key = match upper {
            | Bound::Included(k) => {
                // Include newest version (ts=0 serializes to MAX, sorts last)
                Bound::Included(KeyBytes::new(ns, Bytes::copy_from_slice(k), 0))
            },
            | Bound::Excluded(k) => {
                // Exclude all versions (ts=MAX serializes to 0, sorts first, so excluded bound
                // excludes all)
                Bound::Excluded(KeyBytes::new(ns, Bytes::copy_from_slice(k), u128::MAX))
            },
            | Bound::Unbounded => {
                // End at the last possible key in this namespace
                // Use next namespace's first key as excluded upper bound
                Bound::Excluded(KeyBytes::new(ns + 1, Bytes::new(), u128::MAX))
            },
        };

        let mut iters: Vec<Box<dyn Iterator<Item = (KeyBytes, ValueBytes)> + Send>> = Vec::new();

        // 1. Add current memtable iterator (without state lock)
        {
            let mtable = self.curr_memtable.read().clone();
            let memtable_iter = mtable.scan(lower_key.clone(), upper_key.clone());
            iters
                .push(Box::new(memtable_iter)
                    as Box<dyn Iterator<Item = (KeyBytes, ValueBytes)> + Send>);
        }

        // 2. Add frozen memtables and segment iterators under a single state lock
        {
            let guard = self.state.lock();
            let frozen = guard.frozen_memtables_for_scan();
            for memtable in frozen.iter().rev() {
                let iter = memtable.scan(lower_key.clone(), upper_key.clone());
                iters.push(Box::new(iter) as Box<dyn Iterator<Item = (KeyBytes, ValueBytes)> + Send>);
            }

            let version = guard.version_manager.current();

            // Add L0 segments (can overlap, so all must be scanned)
            for segment in &version.l0 {
                let reader = match segment.reader() {
                    | Ok(r) => r,
                    | Err(e) => return Err(CesiumError::SegmentError(e)),
                };
                let owned_iter =
                    OwnedSegmentIterator::new(reader, lower_key.clone(), upper_key.clone());
                iters
                    .push(Box::new(owned_iter)
                        as Box<dyn Iterator<Item = (KeyBytes, ValueBytes)> + Send>);
            }

            // Add segments from L1-L7
            for level in &version.levels {
                for segment in &level.segments {
                    let reader = match segment.reader() {
                        | Ok(r) => r,
                        | Err(e) => return Err(CesiumError::SegmentError(e)),
                    };
                    let owned_iter =
                        OwnedSegmentIterator::new(reader, lower_key.clone(), upper_key.clone());
                    iters.push(Box::new(owned_iter)
                        as Box<dyn Iterator<Item = (KeyBytes, ValueBytes)> + Send>);
                }
            }
        }

        // Create merge iterator
        let merge_iter = merge::MergeIterator::new(iters);

        Ok(DbScanIterator {
            inner: merge_iter,
            last_key: None,
        })
    }

    /// Blocks the calling thread while the storage state reports that writes
    /// should stall, re-checking every 10 ms.
    fn stall_if_needed(&self) {
        // The lock guard is a temporary of the loop condition, so it is
        // released before the body runs; the thread never sleeps while
        // holding the state lock.
        while self.state.lock().should_stall_writes() {
            thread::sleep(Duration::from_millis(10));
        }
    }

    /// Writes `batch[*offset..]`, rotating to a new memtable whenever the one
    /// that was just tried cannot take more data.
    ///
    /// Shared by the partial-write and error paths of `batch`.
    ///
    /// * `offset` - index of the first entry not yet written; advanced as
    ///   entries are accepted.
    /// * `last_attempted` - the memtable most recently tried. A new memtable
    ///   is created only if this is still the storage state's current one;
    ///   otherwise the already-rotated current memtable is reused.
    ///
    /// Returns `Ok(())` once every entry has been written.
    fn write_batch_with_swap(
        &self,
        batch: &[(KeyBytes, ValueBytes)],
        offset: &mut usize,
        last_attempted: &mut Arc<Memtable>,
    ) -> Result<(), CesiumError> {
        while *offset < batch.len() {
            // Apply backpressure before potentially freezing another memtable.
            self.wait_for_frozen_capacity();

            let target = {
                let mut state = self.state.lock();
                // Rotate only when nobody else has rotated since our last try.
                let still_current = Arc::ptr_eq(last_attempted, &state.current_memtable());
                if still_current {
                    state.new_memtable();
                }
                let active = state.current_memtable();
                // Refresh the lock-free cache while the state lock is held.
                *self.curr_memtable.write() = Arc::clone(&active);
                active
            };

            let remaining = &batch[*offset..];
            match target.put_batch(remaining) {
                | Ok(accepted) => {
                    *offset += accepted;
                    if *offset >= batch.len() {
                        return Ok(());
                    }
                },
                | Err(err) => {
                    let retryable =
                        matches!(err, MtError::MemtableIsFrozen | MtError::DataExceedsMaximum);
                    if !retryable {
                        return Err(MemtableError(err));
                    }
                },
            }

            // Partial write or retryable error: remember which memtable was
            // tried so the next round knows whether a rotation is needed.
            *last_attempted = target;
        }
        Ok(())
    }

    pub(crate) fn batch<K: AsRef<[u8]>, V: AsRef<[u8]>>(
        &self,
        ops: &[Batch<K, V>],
    ) -> Result<(), CesiumError> {
        self.stall_if_needed();

        let mut _batch = Vec::with_capacity(ops.len());
        for b in ops.iter() {
            match b {
                | PutNs(ns, k, v, ts) => {
                    _batch.push((
                        KeyBytes::new(*ns, Bytes::copy_from_slice(k.as_ref()), *ts),
                        ValueBytes::new(*ns, Bytes::copy_from_slice(v.as_ref())),
                    ));
                },
                | DeleteNs(ns, k, ts) => {
                    _batch.push((
                        KeyBytes::new(*ns, Bytes::copy_from_slice(k.as_ref()), *ts),
                        ValueBytes::new_tombstone(*ns),
                    ));
                },
                | Put(k, v, ts) => {
                    _batch.push((
                        KeyBytes::new(DEFAULT_NS, Bytes::copy_from_slice(k.as_ref()), *ts),
                        ValueBytes::new(DEFAULT_NS, Bytes::copy_from_slice(v.as_ref())),
                    ));
                },
                | Delete(k, ts) => {
                    _batch.push((
                        KeyBytes::new(DEFAULT_NS, Bytes::copy_from_slice(k.as_ref()), *ts),
                        ValueBytes::new_tombstone(DEFAULT_NS),
                    ));
                },
            }
        }

        // Fast path: try to write entire batch to current memtable
        let mtable = self.curr_memtable.read().clone();

        match mtable.put_batch(_batch.as_ref()) {
            | Ok(written) if written == _batch.len() => {
                // All written, done!
                Ok(())
            },
            | Ok(written) => {
                // Partial write - need to handle remaining with memtable swaps
                let mut offset = written;
                let mut last_attempted = mtable.clone();
                self.write_batch_with_swap(&_batch, &mut offset, &mut last_attempted)
            },
            | Err(e) => {
                match e {
                    | MtError::DataExceedsMaximum => {
                        // First entry doesn't fit - same logic as partial write loop
                        let mut offset = 0usize;
                        let mut last_attempted = mtable.clone();
                        self.write_batch_with_swap(&_batch, &mut offset, &mut last_attempted)
                    },
                    | MtError::MemtableIsFrozen => {
                        // Memtable was frozen during write - get current and retry
                        let new_mtable = {
                            let guard = self.state.lock();
                            let new = guard.current_memtable();
                            *self.curr_memtable.write() = new.clone();
                            new
                        };
                        match new_mtable.put_batch(_batch.as_ref()) {
                            | Ok(_) => Ok(()),
                            | Err(e) => Err(MemtableError(e)),
                        }
                    },
                }
            },
        }
    }

    /// Runs `sync()` on the storage state and then refreshes the cached
    /// current memtable.
    ///
    /// # Errors
    ///
    /// Propagates the error from the storage state's `sync()`; in that case
    /// the cache is left untouched.
    pub(crate) fn sync(&self) -> Result<(), CesiumError> {
        let mut state = self.state.lock();
        state.sync()?;

        // Per the original author's note, `sync()` installs a new empty
        // memtable. The cache must follow it, otherwise `get` would keep
        // reading the old, already-flushed memtable.
        *self.curr_memtable.write() = state.current_memtable();
        Ok(())
    }

    /// Blocks until the number of frozen memtables drops below the
    /// configured limit, polling every 1 ms.
    ///
    /// Acts as backpressure so memory does not grow without bound when the
    /// flusher cannot keep up. A limit of `0` disables the wait entirely.
    pub(crate) fn wait_for_frozen_capacity(&self) {
        // The limit is read once up front; the guard is dropped at the end of
        // the statement.
        let configured = self.state.lock().memtable_limit();
        if configured == 0 {
            // Backpressure disabled.
            return;
        }
        let limit = configured as usize;

        // The guard in the condition is released before the body sleeps.
        while self.state.lock().frozen_count() >= limit {
            thread::sleep(Duration::from_millis(1));
        }
    }

    /// Returns the [`VersionStats`] reported by the storage state, read while
    /// holding the state lock.
    pub(crate) fn version_stats(&self) -> VersionStats {
        let state = self.state.lock();
        state.version_stats()
    }

    pub(crate) fn read_amp_stats(&self) -> ReadAmpStats {
        ReadAmpStats {
            total_gets: self.total_gets.load(Ordering::Relaxed),
            l0_segments_checked: self.l0_reads.load(Ordering::Relaxed),
            ln_segments_checked: self.ln_reads.load(Ordering::Relaxed),
        }
    }

    /// Returns the number of frozen memtables reported by the storage state,
    /// read while holding the state lock.
    pub(crate) fn frozen_memtable_count(&self) -> usize {
        let state = self.state.lock();
        state.frozen_count()
    }
}