// armdb/zero_tree.rs
1use std::marker::PhantomData;
2use std::ops::Bound;
3
4use crate::Key;
5use crate::compaction::CompactionIndex;
6use crate::config::Config;
7use crate::const_tree::{ConstIter, ConstShard, ConstTree};
8use crate::disk_loc::DiskLoc;
9use crate::durability::{Bitcask, Durability, Fixed};
10use crate::error::{DbError, DbResult};
11use crate::fixed::config::FixedConfig;
12use crate::hook::{NoHook, TypedWriteHook, ZeroHookAdapter};
13use crate::key::Location;
14
/// A tree with fixed-size keys and zerocopy-compatible values.
///
/// Thin wrapper around [`ConstTree<K, V, D>`] that provides a typed API for values
/// implementing [`FromBytes`] + [`IntoBytes`] + [`Immutable`]. Conversions are
/// zero-cost — values are transmuted without serialization.
///
/// Generic over `D: Durability` (default: `Bitcask`). Use `Fixed` backend for
/// frequent updates without compaction: `ZeroTree::<K, V, Fixed, T>::open(...)`.
///
/// Requires `size_of::<T>() == V` (compile-time assertion in constructor).
///
/// # Write hooks
///
/// Uses [`TypedWriteHook<K, T>`] — the hook receives `&T` directly (not raw bytes).
/// `on_write` fires on `put`/`insert`/`delete`/`cas`/`update`.
/// Does **not** fire inside `atomic()`. Old value is always provided (it lives in
/// memory) — `NEEDS_OLD_VALUE` is ignored.
///
/// `on_init` fires once per live entry during `migrate()` or `replay_init()`
/// (enable via `NEEDS_INIT = true`).
///
/// # When to use
///
/// For `T` with trivial representation (no heap allocations, fixed layout):
/// structs with `#[derive(FromBytes, IntoBytes, Immutable, KnownLayout)]`.
/// All values live in memory — reads never touch disk.
///
/// For complex types (String, Vec, nested structs) use [`TypedTree`](crate::TypedTree) instead.
///
/// # Usage
///
/// ```ignore
/// use zerocopy::{FromBytes, IntoBytes, Immutable, KnownLayout};
///
/// #[derive(FromBytes, IntoBytes, Immutable, KnownLayout, Clone, Copy)]
/// #[repr(C)]
/// struct Counters {
///     views: u64,
///     likes: u64,
/// }
///
/// let tree = ZeroTree::<[u8; 16], { size_of::<Counters>() }, Counters>::open(
///     "data/counters",
///     Config::default(),
/// )?;
/// tree.put(&key, &Counters { views: 1, likes: 0 })?;
/// if let Some(c) = tree.get(&key) {
///     println!("views: {}", c.views);
/// }
/// ```
///
/// # Iteration
///
/// `iter()`, `range()`, and `prefix_iter()` return [`ZeroIter`] which
/// implements `Iterator + DoubleEndedIterator` with `Item = (K, T)`.
pub struct ZeroTree<
    K: Key,
    const V: usize,
    T: Copy = [u8; V],
    H: TypedWriteHook<K, T> = NoHook,
    D: Durability = Bitcask,
> {
    // Byte-level tree; the typed hook H is adapted to the byte-level hook API.
    inner: ConstTree<K, V, ZeroHookAdapter<K, T, H>, D>,
    // Carries the value type T without storing one (zero-sized).
    _marker: PhantomData<T>,
}
80
81// ==========================================================================
82// Bitcask-specific impl blocks
83// ==========================================================================
84
85impl<K: Key, const V: usize, T: Copy> ZeroTree<K, V, T, NoHook, Bitcask> {
86    /// Open or create a `ZeroTree` at the given path.
87    /// Recovers the index from existing data files on disk.
88    pub fn open(path: impl AsRef<std::path::Path>, config: Config) -> DbResult<Self> {
89        const { assert!(size_of::<T>() == V) }
90        let adapter = ZeroHookAdapter {
91            inner: NoHook,
92            _marker: PhantomData,
93        };
94        Ok(Self {
95            inner: ConstTree::open_hooked(path, config, adapter)?,
96            _marker: PhantomData,
97        })
98    }
99}
100
101impl<K: Key, const V: usize, T: Copy, H: TypedWriteHook<K, T>> ZeroTree<K, V, T, H, Bitcask> {
102    /// Open or create a `ZeroTree` with a write hook for secondary index maintenance.
103    pub fn open_hooked(
104        path: impl AsRef<std::path::Path>,
105        config: Config,
106        hook: H,
107    ) -> DbResult<Self> {
108        const { assert!(size_of::<T>() == V) }
109        let adapter = ZeroHookAdapter {
110            inner: hook,
111            _marker: PhantomData,
112        };
113        Ok(Self {
114            inner: ConstTree::open_hooked(path, config, adapter)?,
115            _marker: PhantomData,
116        })
117    }
118
119    /// Graceful shutdown: write hint files, flush write buffers + fsync.
120    pub fn close(self) -> DbResult<()> {
121        self.inner.close()
122    }
123
124    /// Flush all shard write buffers to disk (without fsync).
125    pub fn flush_buffers(&self) -> DbResult<()> {
126        self.inner.flush_buffers()
127    }
128
129    /// Get the database configuration.
130    pub fn config(&self) -> &Config {
131        self.inner.config()
132    }
133
134    /// Trigger a compaction pass across all shards.
135    pub fn compact(&self) -> DbResult<usize> {
136        self.inner.compact()
137    }
138
139    /// Write hint files for all active shard files. Call during graceful shutdown.
140    pub fn sync_hints(&self) -> DbResult<()> {
141        self.inner.sync_hints()
142    }
143
144    /// Iterate all entries and optionally mutate them. Call once at startup.
145    ///
146    /// The callback receives each (key, &T) and returns `MigrateAction`:
147    /// - `Keep` — no change (fires `on_init` if `NEEDS_INIT`)
148    /// - `Update(new_value)` — replace value (hook-free write, fires `on_init`)
149    /// - `Delete` — remove entry (hook-free tombstone, no `on_init`)
150    ///
151    /// `on_write` is **never** fired during migration.
152    /// Returns the number of mutated entries.
153    pub fn migrate(&self, f: impl Fn(&K, &T) -> crate::MigrateAction<T>) -> DbResult<usize> {
154        self.inner.migrate(|key, bytes| {
155            let val: T = from_value_bytes(bytes);
156            match f(key, &val) {
157                crate::MigrateAction::Keep => crate::MigrateAction::Keep,
158                crate::MigrateAction::Update(new) => {
159                    crate::MigrateAction::Update(to_bytes::<V, T>(&new))
160                }
161                crate::MigrateAction::Delete => crate::MigrateAction::Delete,
162            }
163        })
164    }
165
166    /// Replay `on_init` for every live entry. Used when no migration runs.
167    pub(crate) fn replay_init(&self) {
168        self.inner.replay_init();
169    }
170
171    /// Access the underlying `ConstTree`.
172    pub fn as_inner(&self) -> &ConstTree<K, V, ZeroHookAdapter<K, T, H>, Bitcask> {
173        &self.inner
174    }
175}
176
177// ==========================================================================
178// Fixed-specific impl blocks
179// ==========================================================================
180
181impl<K: Key, const V: usize, T: Copy> ZeroTree<K, V, T, NoHook, Fixed> {
182    /// Open or create a `ZeroTree` with Fixed (fixed-slot) backend.
183    /// Recovers the index from existing data files on disk.
184    pub fn open(path: impl AsRef<std::path::Path>, config: FixedConfig) -> DbResult<Self> {
185        const { assert!(size_of::<T>() == V) }
186        let adapter = ZeroHookAdapter {
187            inner: NoHook,
188            _marker: PhantomData,
189        };
190        Ok(Self {
191            inner: ConstTree::open_with_hook(path, config, adapter)?,
192            _marker: PhantomData,
193        })
194    }
195}
196
197impl<K: Key, const V: usize, T: Copy, H: TypedWriteHook<K, T>> ZeroTree<K, V, T, H, Fixed> {
198    /// Open or create a `ZeroTree` with a write hook, using Fixed (fixed-slot) backend.
199    pub fn open_with_hook(
200        path: impl AsRef<std::path::Path>,
201        config: FixedConfig,
202        hook: H,
203    ) -> DbResult<Self> {
204        const { assert!(size_of::<T>() == V) }
205        let adapter = ZeroHookAdapter {
206            inner: hook,
207            _marker: PhantomData,
208        };
209        Ok(Self {
210            inner: ConstTree::open_with_hook(path, config, adapter)?,
211            _marker: PhantomData,
212        })
213    }
214
215    /// Perform a clean shutdown (Fixed backend).
216    pub fn close(self) -> DbResult<()> {
217        self.inner.close()
218    }
219}
220
221// ==========================================================================
222// Generic impl block — works with any D: Durability
223// ==========================================================================
224
225impl<K: Key, const V: usize, T: Copy, H: TypedWriteHook<K, T>, D: Durability>
226    ZeroTree<K, V, T, H, D>
227{
228    // -- Reads ----------------------------------------------------------------
229
230    /// Get a value by key. Lock-free, zero disk I/O. Returns a copy of `T`.
231    pub fn get(&self, key: &K) -> Option<T> {
232        let bytes = self.inner.get(key)?;
233        Some(from_value_bytes::<V, T>(&bytes))
234    }
235
236    /// Get a value by key, returning `Err(KeyNotFound)` if absent.
237    pub fn get_or_err(&self, key: &K) -> DbResult<T> {
238        self.get(key).ok_or(DbError::KeyNotFound)
239    }
240
241    /// Check if a key exists.
242    pub fn contains(&self, key: &K) -> bool {
243        self.inner.contains(key)
244    }
245
246    /// Return the first entry in index order, or `None` if empty.
247    /// With `reversed=true` (default): the entry with the largest key.
248    /// O(1) — follows head's level-0 pointer.
249    pub fn first(&self) -> Option<(K, T)> {
250        self.inner
251            .first()
252            .map(|(k, v)| (k, from_value_bytes::<V, T>(&v)))
253    }
254
255    /// Return the last entry in index order, or `None` if empty.
256    /// With `reversed=true` (default): the entry with the smallest key.
257    pub fn last(&self) -> Option<(K, T)> {
258        self.inner
259            .last()
260            .map(|(k, v)| (k, from_value_bytes::<V, T>(&v)))
261    }
262
263    // -- Writes ---------------------------------------------------------------
264
265    /// Insert or update a key-value pair. Returns the old value if the key existed.
266    pub fn put(&self, key: &K, value: &T) -> DbResult<Option<T>> {
267        let bytes = to_bytes::<V, T>(value);
268        self.inner
269            .put(key, &bytes)
270            .map(|opt| opt.map(|b| from_value_bytes::<V, T>(&b)))
271    }
272
273    /// Insert a key-value pair only if the key does not exist.
274    /// Returns `Err(KeyExists)` if the key is already present.
275    pub fn insert(&self, key: &K, value: &T) -> DbResult<()> {
276        let bytes = to_bytes::<V, T>(value);
277        self.inner.insert(key, &bytes)
278    }
279
280    /// Delete a key. Returns the old value if the key existed.
281    pub fn delete(&self, key: &K) -> DbResult<Option<T>> {
282        self.inner
283            .delete(key)
284            .map(|opt| opt.map(|b| from_value_bytes::<V, T>(&b)))
285    }
286
287    /// Compare-and-swap: if current value == expected, replace with new_value.
288    /// Returns `Ok(())` on success, `Err(CasMismatch)` if current != expected,
289    /// `Err(KeyNotFound)` if key doesn't exist.
290    pub fn cas(&self, key: &K, expected: &T, new_value: &T) -> DbResult<()> {
291        let exp_bytes = to_bytes::<V, T>(expected);
292        let new_bytes = to_bytes::<V, T>(new_value);
293        self.inner.cas(key, &exp_bytes, &new_bytes)
294    }
295
296    /// Atomically read-modify-write. Returns `Some(T)` (the **new** value)
297    /// if key existed, `None` otherwise.
298    /// The closure must not be heavy (shard lock is held).
299    pub fn update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<T>> {
300        self.inner
301            .update(key, |bytes| {
302                let val = from_value_bytes::<V, T>(bytes);
303                let new_val = f(&val);
304                to_bytes::<V, T>(&new_val)
305            })
306            .map(|opt| opt.map(|b| from_value_bytes::<V, T>(&b)))
307    }
308
309    /// Like [`update()`](Self::update), but returns `Some(T)` with the **old** value.
310    pub fn fetch_update(&self, key: &K, f: impl FnOnce(&T) -> T) -> DbResult<Option<T>> {
311        self.inner
312            .fetch_update(key, |bytes| {
313                let val = from_value_bytes::<V, T>(bytes);
314                let new_val = f(&val);
315                to_bytes::<V, T>(&new_val)
316            })
317            .map(|opt| opt.map(|b| from_value_bytes::<V, T>(&b)))
318    }
319
320    // -- Atomic ---------------------------------------------------------------
321
322    /// Atomically execute multiple operations on a single shard.
323    /// All keys must route to the same shard as `shard_key`.
324    /// The closure must be short — shard lock is held for its duration.
325    pub fn atomic<R>(
326        &self,
327        shard_key: &K,
328        f: impl FnOnce(&mut ZeroShard<'_, K, V, T, D>) -> DbResult<R>,
329    ) -> DbResult<R> {
330        self.inner.atomic(shard_key, |const_shard| {
331            // SAFETY: ZeroShard is a transparent wrapper over ConstShard with same layout
332            // (PhantomData is ZST). The hook type parameter doesn't affect ConstShard layout.
333            let shard = unsafe {
334                &mut *(const_shard as *mut ConstShard<'_, K, V, ZeroHookAdapter<K, T, H>, D>
335                    as *mut ZeroShard<'_, K, V, T, D>)
336            };
337            f(shard)
338        })
339    }
340
341    // -- Iteration ------------------------------------------------------------
342
343    /// Iterate entries whose keys start with `prefix`.
344    ///
345    /// `reversed=true` (default): yields matching keys in DESC order.
346    /// `next()` is O(1), `next_back()` is O(log n).
347    pub fn prefix_iter(&self, prefix: &[u8]) -> ZeroIter<'_, K, V, T, D::Loc> {
348        ZeroIter {
349            inner: self.inner.prefix_iter(prefix),
350            _marker: PhantomData,
351        }
352    }
353
354    /// Iterate all entries in index order.
355    ///
356    /// `reversed=true` (default): DESC. `reversed=false`: ASC.
357    /// `next()` is O(1), `next_back()` is O(log n).
358    pub fn iter(&self) -> ZeroIter<'_, K, V, T, D::Loc> {
359        ZeroIter {
360            inner: self.inner.iter(),
361            _marker: PhantomData,
362        }
363    }
364
365    /// Iterate entries in `[start, end)` — start inclusive, end exclusive.
366    ///
367    /// `reversed=true` (default): DESC within range. `reversed=false`: ASC.
368    /// `next()` is O(1), `next_back()` is O(log n).
369    pub fn range(&self, start: &K, end: &K) -> ZeroIter<'_, K, V, T, D::Loc> {
370        self.range_bounds(Bound::Included(start), Bound::Excluded(end))
371    }
372
373    /// Iterate entries in range defined by `start` and `end` bounds.
374    ///
375    /// Unlike [`range()`](Self::range), allows `Included`, `Excluded`, or `Unbounded`
376    /// for each bound independently.
377    ///
378    /// `reversed=true` (default): DESC within range. `reversed=false`: ASC.
379    /// `next()` is O(1), `next_back()` is O(log n).
380    pub fn range_bounds(&self, start: Bound<&K>, end: Bound<&K>) -> ZeroIter<'_, K, V, T, D::Loc> {
381        ZeroIter {
382            inner: self.inner.range_bounds(start, end),
383            _marker: PhantomData,
384        }
385    }
386
387    // -- Info -----------------------------------------------------------------
388
389    pub fn len(&self) -> usize {
390        self.inner.len()
391    }
392
393    pub fn is_empty(&self) -> bool {
394        self.inner.is_empty()
395    }
396
397    pub fn shard_for(&self, key: &K) -> usize {
398        self.inner.shard_for(key)
399    }
400
401    /// Flush all shard data to disk.
402    pub fn flush(&self) -> DbResult<()> {
403        self.inner.flush()
404    }
405}
406
407impl<K: Key, const V: usize, T: Copy + Send + Sync, H: TypedWriteHook<K, T>> CompactionIndex<K>
408    for ZeroTree<K, V, T, H, Bitcask>
409{
410    fn update_if_match(
411        &self,
412        key: &K,
413        old_loc: crate::disk_loc::DiskLoc,
414        new_loc: crate::disk_loc::DiskLoc,
415    ) -> bool {
416        self.inner.update_if_match(key, old_loc, new_loc)
417    }
418
419    fn contains_key(&self, key: &K) -> bool {
420        self.contains(key)
421    }
422}
423
424// ---------------------------------------------------------------------------
425// ZeroIter
426// ---------------------------------------------------------------------------
427
/// Iterator over entries in a `ZeroTree`. Wraps [`ConstIter`] and converts
/// `[u8; V]` values to `T` via zerocopy.
pub struct ZeroIter<'a, K: Key, const V: usize, T = [u8; V], L: Location = DiskLoc> {
    // Underlying byte-level iterator borrowed from the tree.
    inner: ConstIter<'a, K, V, L>,
    // Carries the value type T without storing one (zero-sized).
    _marker: PhantomData<T>,
}
434
435impl<'a, K: Key, const V: usize, T: Copy, L: Location> Iterator for ZeroIter<'a, K, V, T, L> {
436    type Item = (K, T);
437
438    fn next(&mut self) -> Option<Self::Item> {
439        self.inner
440            .next()
441            .map(|(k, v)| (k, from_value_bytes::<V, T>(&v)))
442    }
443}
444
445impl<'a, K: Key, const V: usize, T: Copy, L: Location> DoubleEndedIterator
446    for ZeroIter<'a, K, V, T, L>
447{
448    fn next_back(&mut self) -> Option<Self::Item> {
449        self.inner
450            .next_back()
451            .map(|(k, v)| (k, from_value_bytes::<V, T>(&v)))
452    }
453}
454
455// ---------------------------------------------------------------------------
456// ZeroShard
457// ---------------------------------------------------------------------------
458
/// Handle for atomic multi-key operations within a single shard.
/// Obtained via [`ZeroTree::atomic`].
#[repr(transparent)]
pub struct ZeroShard<'a, K: Key, const V: usize, T: Copy = [u8; V], D: Durability = Bitcask> {
    // The actual hook type is ZeroHookAdapter, but ZeroShard is accessed via
    // unsafe pointer cast in atomic(). ConstShard layout doesn't depend on H
    // (it's stored in the parent ConstTree, not in the shard), and
    // #[repr(transparent)] pins this struct's layout to ConstShard's.
    inner: ConstShard<'a, K, V, NoHook, D>,
    // Carries the value type T without storing one (zero-sized).
    _marker: PhantomData<T>,
}
469
470impl<K: Key, const V: usize, T: Copy, D: Durability> ZeroShard<'_, K, V, T, D> {
471    pub fn put(&mut self, key: &K, value: &T) -> DbResult<Option<T>> {
472        let bytes = to_bytes::<V, T>(value);
473        self.inner
474            .put(key, &bytes)
475            .map(|opt| opt.map(|b| from_value_bytes::<V, T>(&b)))
476    }
477
478    pub fn insert(&mut self, key: &K, value: &T) -> DbResult<()> {
479        let bytes = to_bytes::<V, T>(value);
480        self.inner.insert(key, &bytes)
481    }
482
483    pub fn delete(&mut self, key: &K) -> DbResult<Option<T>> {
484        self.inner
485            .delete(key)
486            .map(|opt| opt.map(|b| from_value_bytes::<V, T>(&b)))
487    }
488
489    pub fn get(&self, key: &K) -> Option<T> {
490        let bytes = self.inner.get(key)?;
491        Some(from_value_bytes::<V, T>(&bytes))
492    }
493
494    pub fn get_or_err(&self, key: &K) -> DbResult<T> {
495        self.get(key).ok_or(DbError::KeyNotFound)
496    }
497
498    pub fn contains(&self, key: &K) -> bool {
499        self.inner.contains(key)
500    }
501}
502
503// ---------------------------------------------------------------------------
504// Helpers
505// ---------------------------------------------------------------------------
506
#[inline(always)]
pub(crate) fn to_bytes<const V: usize, T: Copy>(value: &T) -> [u8; V] {
    debug_assert_eq!(size_of::<T>(), V);
    // SAFETY: size equality is enforced by the const assertions in the
    // ZeroTree constructors, and a `[u8; V]` has no alignment or validity
    // requirements, so copying T's bytes into one is sound.
    unsafe { std::mem::transmute_copy::<T, [u8; V]>(value) }
}
512
#[inline(always)]
pub(crate) fn from_value_bytes<const V: usize, T: Copy>(bytes: &[u8; V]) -> T {
    debug_assert_eq!(V, size_of::<T>());
    // SAFETY: size equality is enforced by the const assertions in the
    // ZeroTree constructors, and callers only use T types valid for any bit
    // pattern (zerocopy FromBytes per the ZeroTree contract).
    // `read_unaligned` is required here: `bytes` has alignment 1, which may
    // be below `align_of::<T>()`, and `ptr::read` through an under-aligned
    // pointer is undefined behavior.
    unsafe { std::ptr::read_unaligned(bytes.as_ptr().cast()) }
}
518
// Exposes a ZeroTree to the `armour` maintenance layer as a generic
// collection, delegating to the tree's own accessors. Only compiled when the
// `armour` feature is enabled.
#[cfg(feature = "armour")]
impl<T, const V: usize, H> crate::armour::collection::Collection
    for ZeroTree<T::SelfId, V, T, H, Bitcask>
where
    T: crate::CollectionMeta + Copy + Send + Sync,
    H: crate::hook::TypedWriteHook<T::SelfId, T>,
    T::SelfId: crate::Key + Ord,
{
    // Collection name comes from the value type's metadata, not the path.
    fn name(&self) -> &str {
        T::NAME
    }
    // Delegates to the inherent len() on the generic impl.
    fn len(&self) -> usize {
        self.len()
    }
    // Delegates to the inherent compact() on the Bitcask impl.
    fn compact(&self) -> crate::DbResult<usize> {
        self.compact()
    }
}