Skip to main content

armdb/
var_tree.rs

1use std::mem::size_of;
2use std::ops::Bound;
3use std::sync::Arc;
4
5use crate::Key;
6
7use crate::byte_view::ByteView;
8use crate::cache::{BlockCache, BlockKey};
9use crate::compaction::{CompactionIndex, compact_shard};
10use crate::config::Config;
11use crate::disk_loc::DiskLoc;
12use crate::engine::Engine;
13use crate::error::{DbError, DbResult};
14use crate::hook::{NoHook, WriteHook};
15use crate::io::aligned_buf::AlignedBuf;
16use crate::recovery::recover_var_tree;
17use crate::shard::ShardInner;
18use crate::skiplist::node::{SkipNode, VarNode, random_height};
19use crate::skiplist::{InsertResult, SkipList};
20use crate::sync::MutexGuard;
21
/// Upper bound on how many times a lock-free value read is retried after it
/// observes a stale `DiskLoc` (a data file removed by compaction between the
/// index snapshot and the read). Presumably consumed by the retry loop in
/// `VarTree::read_value_cached`, whose body lies outside this view — confirm.
const MAX_STALE_RETRIES: usize = 3;
23
/// A tree with fixed-size keys and variable-length values.
/// Values are stored as `ByteView` (inline ≤20 bytes, heap with ref counting for larger).
/// Disk reads are cached at 4096-byte block granularity via `BlockCache`.
///
/// Each `VarTree` owns its storage engine — one tree = one database directory.
///
/// # Usage
///
/// ```ignore
/// // `K` must be a type implementing `Key`; this assumes `[u8; 16]: Key`.
/// let tree = VarTree::<[u8; 16]>::open("data/messages", Config::default())?;
/// tree.put(&key, b"hello world")?;
/// tree.close()?;
/// ```
///
/// # Iteration
///
/// `iter()`, `range()`, `range_bounds()`, and `prefix_iter()` all return [`VarIter`]
/// which implements `Iterator + DoubleEndedIterator` with `Item = (K, ByteView)`.
/// Each `next()` / `next_back()` may perform disk I/O on a block-cache miss.
///
/// ```ignore
/// let latest = tree.prefix_iter(&group_id).take(50).collect::<Vec<_>>();
/// let oldest = tree.prefix_iter(&group_id).rev().take(10);  // DoubleEndedIterator
/// ```
pub struct VarTree<K: Key, H: WriteHook<K> = NoHook> {
    /// In-memory skip-list index. Each node holds its key plus an atomically
    /// swappable pointer to the `DiskLoc` of the key's current entry.
    index: SkipList<VarNode<K>>,
    /// Storage engine owning the shards (data files, write buffers, hint files).
    engine: Engine,
    /// Block cache for value reads, keyed by `(shard_id, file_id, block_offset)`.
    cache: BlockCache,
    /// Copied from `Config::compaction_threshold`; passed to `compact_shard`.
    compaction_threshold: f64,
    /// Copied from `Config::shard_prefix_bits`; presumably used by `shard_for`
    /// to route keys to shards (not visible here — confirm).
    shard_prefix_bits: usize,
    /// Index order: `true` yields keys in descending order, `false` ascending.
    reversed: bool,
    /// Write hook: `on_write` after mutations, `on_init` during replay/migration.
    hook: H,
}
57
58impl<K: Key> VarTree<K> {
59    /// Open or create a `VarTree` at the given path.
60    /// Recovers the index from existing data files on disk.
61    pub fn open(path: impl AsRef<std::path::Path>, config: Config) -> DbResult<Self> {
62        Self::open_inner(path, config, NoHook)
63    }
64}
65
66impl<K: Key, H: WriteHook<K>> VarTree<K, H> {
67    /// Open or create a `VarTree` with a write hook for secondary index maintenance.
68    pub fn open_hooked(
69        path: impl AsRef<std::path::Path>,
70        config: Config,
71        hook: H,
72    ) -> DbResult<Self> {
73        Self::open_inner(path, config, hook)
74    }
75
76    fn open_inner(path: impl AsRef<std::path::Path>, config: Config, hook: H) -> DbResult<Self> {
77        let compaction_threshold = config.compaction_threshold;
78        let shard_prefix_bits = config.shard_prefix_bits;
79        let reversed = config.reversed;
80        let cache = BlockCache::new(&config.cache);
81        let engine = Engine::open(path, config)?;
82
83        let tree = Self {
84            index: SkipList::new(reversed),
85            engine,
86            cache,
87            compaction_threshold,
88            shard_prefix_bits,
89            reversed,
90            hook,
91        };
92
93        // Recover index from disk
94        let shard_dirs = tree.engine.shard_dirs();
95        let shard_dir_refs = Engine::shard_dir_refs(&shard_dirs);
96        let shard_ids = tree.engine.shard_ids();
97
98        let hints = tree.engine.hints();
99        let outcome = recover_var_tree::<K>(
100            &shard_dir_refs,
101            &shard_ids,
102            tree.index(),
103            hints,
104            #[cfg(feature = "encryption")]
105            tree.engine.cipher(),
106        )?;
107        for tail in &outcome.active_tails {
108            tree.engine.shards()[tail.shard_idx].apply_recovery_tail(tail)?;
109        }
110        for (shard_idx, dead) in outcome.shard_dead_bytes {
111            tree.engine.shards()[shard_idx].install_dead_bytes(dead);
112        }
113        let max_gsn = outcome.max_gsn;
114
115        tree.engine
116            .gsn()
117            .fetch_max(max_gsn + 1, std::sync::atomic::Ordering::Relaxed);
118        if hints {
119            for shard in tree.engine.shards().iter() {
120                shard.set_key_len(size_of::<K>());
121            }
122        }
123        tracing::info!(
124            key_size = size_of::<K>(),
125            entries = tree.len(),
126            "var_tree recovered"
127        );
128
129        Ok(tree)
130    }
131
132    /// Graceful shutdown: write hint files (if enabled), flush write buffers + fsync.
133    pub fn close(self) -> DbResult<()> {
134        if self.engine.hints() {
135            self.sync_hints()?;
136        }
137        self.engine.flush()
138    }
139
140    /// Flush all shard write buffers to disk (without fsync).
141    pub fn flush_buffers(&self) -> DbResult<()> {
142        self.engine.flush_buffers()
143    }
144
145    /// Get the database configuration.
146    pub fn config(&self) -> &Config {
147        self.engine.config()
148    }
149}
150
151impl<K: Key, H: WriteHook<K>> CompactionIndex<K> for VarTree<K, H> {
152    fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool {
153        let guard = self.index.collector().enter();
154        if let Some(node) = self.index.get(key.as_bytes(), &guard) {
155            let current_ptr = node.load_disk_ptr();
156            let current_disk = unsafe { *current_ptr };
157            if current_disk == old_loc {
158                let new_disk_ptr = Box::into_raw(Box::new(new_loc));
159                match node.compare_exchange_disk(current_ptr, new_disk_ptr) {
160                    Ok(old_ptr) => {
161                        unsafe {
162                            self.index
163                                .collector()
164                                .retire(old_ptr, seize::reclaim::boxed::<DiskLoc>);
165                        }
166                        return true;
167                    }
168                    Err(_) => {
169                        // Concurrent put changed the DiskLoc — entry is no longer
170                        // at old_loc, treat as dead in the compacted file.
171                        unsafe {
172                            drop(Box::from_raw(new_disk_ptr));
173                        }
174                        return false;
175                    }
176                }
177            }
178        }
179        false
180    }
181
182    fn invalidate_blocks(&self, shard_id: u8, file_id: u32, total_bytes: u64) {
183        self.cache.invalidate_file(shard_id, file_id, total_bytes);
184    }
185
186    fn contains_key(&self, key: &K) -> bool {
187        self.contains(key)
188    }
189}
190
191impl<K: Key, H: WriteHook<K>> VarTree<K, H> {
192    /// Trigger a background compaction pass across all shards.
193    pub fn compact(&self) -> DbResult<usize> {
194        let mut total_compacted = 0;
195        for shard in self.engine.shards().iter() {
196            total_compacted += compact_shard(shard, self, self.compaction_threshold)?;
197        }
198        Ok(total_compacted)
199    }
200
201    /// Get a value by key. Checks block cache first, then reads from disk.
202    pub fn get(&self, key: &K) -> Option<ByteView> {
203        metrics::counter!("armdb.ops", "op" => "get", "tree" => "var_tree").increment(1);
204        #[cfg(feature = "hot-path-tracing")]
205        tracing::trace!("var_tree.get");
206        let guard = self.index.collector().enter();
207        let node = match self.index.get(key.as_bytes(), &guard) {
208            Some(n) => n,
209            None => {
210                #[cfg(feature = "hot-path-tracing")]
211                tracing::error!(
212                    "VarTree get error: index.get returned None for key {:?}",
213                    key.as_bytes()
214                );
215                return None;
216            }
217        };
218        self.read_value_cached(node, &guard)
219    }
220
221    /// Get a value by key, returning `Err(KeyNotFound)` if absent.
222    pub fn get_or_err(&self, key: &K) -> DbResult<ByteView> {
223        self.get(key).ok_or(DbError::KeyNotFound)
224    }
225
226    /// Insert or update a key-value pair.
227    pub fn put(&self, key: &K, value: &[u8]) -> DbResult<()> {
228        metrics::counter!("armdb.ops", "op" => "put", "tree" => "var_tree").increment(1);
229        #[cfg(feature = "hot-path-tracing")]
230        tracing::trace!("var_tree.put");
231        let shard_id = self.shard_for(key);
232        let mut inner = self.engine.shards()[shard_id].lock();
233        let guard = self.index.collector().enter();
234        let old_value = if H::NEEDS_OLD_VALUE {
235            if let Some(node) = self.index.get(key.as_bytes(), &guard) {
236                let disk = *node.load_disk();
237                Some(self.read_value_locked_result(&disk, &inner)?)
238            } else {
239                None
240            }
241        } else {
242            None
243        };
244        self.put_locked(shard_id, &mut inner, &guard, key, value)?;
245        drop(inner);
246        self.hook.on_write(key, old_value.as_deref(), Some(value));
247        Ok(())
248    }
249
250    /// Insert a key-value pair only if the key does not exist.
251    /// Returns `Err(KeyExists)` if the key is already present.
252    pub fn insert(&self, key: &K, value: &[u8]) -> DbResult<()> {
253        metrics::counter!("armdb.ops", "op" => "insert", "tree" => "var_tree").increment(1);
254        #[cfg(feature = "hot-path-tracing")]
255        tracing::trace!("var_tree.insert");
256        let shard_id = self.shard_for(key);
257        let mut inner = self.engine.shards()[shard_id].lock();
258        let guard = self.index.collector().enter();
259        self.insert_locked(shard_id, &mut inner, &guard, key, value)?;
260        drop(inner);
261        self.hook.on_write(key, None, Some(value));
262        Ok(())
263    }
264
265    /// Delete a key. Returns `true` if the key existed.
266    pub fn delete(&self, key: &K) -> DbResult<bool> {
267        metrics::counter!("armdb.ops", "op" => "delete", "tree" => "var_tree").increment(1);
268        #[cfg(feature = "hot-path-tracing")]
269        tracing::trace!("var_tree.delete");
270        let shard_id = self.shard_for(key);
271        let mut inner = self.engine.shards()[shard_id].lock();
272        let guard = self.index.collector().enter();
273        let old_value = if H::NEEDS_OLD_VALUE {
274            if let Some(node) = self.index.get(key.as_bytes(), &guard) {
275                let disk = *node.load_disk();
276                Some(self.read_value_locked_result(&disk, &inner)?)
277            } else {
278                None
279            }
280        } else {
281            None
282        };
283        let existed = self.delete_locked(shard_id, &mut inner, &guard, key)?;
284        drop(inner);
285        if existed {
286            self.hook.on_write(key, old_value.as_deref(), None);
287        }
288        Ok(existed)
289    }
290
291    /// Atomically execute multiple operations on a single shard.
292    /// All keys must route to the same shard as `shard_key`.
293    /// The closure must be short — shard lock is held for its duration.
294    pub fn atomic<R>(
295        &self,
296        shard_key: &K,
297        f: impl FnOnce(&mut VarShard<'_, K, H>) -> DbResult<R>,
298    ) -> DbResult<R> {
299        let shard_id = self.shard_for(shard_key);
300        let inner = self.engine.shards()[shard_id].lock();
301        let guard = self.index.collector().enter();
302        let mut shard = VarShard {
303            tree: self,
304            inner,
305            shard_id,
306            guard,
307        };
308        f(&mut shard)
309    }
310
311    fn put_locked(
312        &self,
313        shard_id: usize,
314        inner: &mut ShardInner,
315        guard: &seize::LocalGuard<'_>,
316        key: &K,
317        value: &[u8],
318    ) -> DbResult<()> {
319        let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), value, false)?;
320
321        // Fast path: key exists — atomic swap, no write_lock, no node allocation
322        if let Some(existing) = self.index.get(key.as_bytes(), guard) {
323            let new_disk = Box::into_raw(Box::new(disk_loc));
324            let old_disk_ptr = existing.swap_disk(new_disk);
325            let old_disk = unsafe { *old_disk_ptr };
326            inner.add_dead_bytes(
327                old_disk.file_id,
328                crate::entry::entry_size(size_of::<K>(), old_disk.len),
329            );
330            unsafe {
331                self.index
332                    .collector()
333                    .retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
334            }
335            return Ok(());
336        }
337
338        // Slow path: new key — allocate node + take write_lock via insert
339        let height = random_height();
340        let node_ptr = VarNode::alloc(*key, disk_loc, height);
341
342        match self.index.insert(node_ptr, guard) {
343            InsertResult::Inserted => {}
344            InsertResult::Exists(existing) => {
345                // Race: another shard inserted same key between get and insert
346                let new_disk = Box::into_raw(Box::new(disk_loc));
347                let old_disk_ptr = existing.swap_disk(new_disk);
348                let old_disk = unsafe { *old_disk_ptr };
349                inner.add_dead_bytes(
350                    old_disk.file_id,
351                    crate::entry::entry_size(size_of::<K>(), old_disk.len),
352                );
353                unsafe {
354                    self.index
355                        .collector()
356                        .retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
357                }
358                unsafe {
359                    (*node_ptr)
360                        .disk
361                        .store(std::ptr::null_mut(), std::sync::atomic::Ordering::Relaxed);
362                    VarNode::<K>::dealloc_node(node_ptr);
363                }
364            }
365        }
366
367        Ok(())
368    }
369
370    fn insert_locked(
371        &self,
372        shard_id: usize,
373        inner: &mut ShardInner,
374        guard: &seize::LocalGuard<'_>,
375        key: &K,
376        value: &[u8],
377    ) -> DbResult<()> {
378        if self.index.get(key.as_bytes(), guard).is_some() {
379            return Err(DbError::KeyExists);
380        }
381
382        let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), value, false)?;
383        let height = random_height();
384        let node_ptr = VarNode::alloc(*key, disk_loc, height);
385
386        match self.index.insert(node_ptr, guard) {
387            InsertResult::Inserted => Ok(()),
388            InsertResult::Exists(_existing) => {
389                // Race: another path inserted this key after our `get` check and
390                // before `index.insert`. Account for the entry we already appended
391                // as dead bytes, free the unpublished node, and report KeyExists.
392                inner.add_dead_bytes(
393                    disk_loc.file_id,
394                    crate::entry::entry_size(size_of::<K>(), disk_loc.len),
395                );
396                // SAFETY: node_ptr was just allocated by VarNode::alloc and has
397                // not been published into the SkipList. It uniquely owns its
398                // freshly-allocated DiskLoc box.
399                unsafe { VarNode::<K>::dealloc_node(node_ptr) };
400                Err(DbError::KeyExists)
401            }
402        }
403    }
404
405    fn delete_locked(
406        &self,
407        shard_id: usize,
408        inner: &mut ShardInner,
409        guard: &seize::LocalGuard<'_>,
410        key: &K,
411    ) -> DbResult<bool> {
412        if self.index.get(key.as_bytes(), guard).is_none() {
413            return Ok(false);
414        }
415
416        inner.append_entry(shard_id as u8, key.as_bytes(), &[], true)?;
417
418        let removed = self.index.remove(key.as_bytes(), guard);
419
420        if let Some(node_ptr) = removed {
421            let disk = *unsafe { &*node_ptr }.load_disk();
422            inner.add_dead_bytes(
423                disk.file_id,
424                crate::entry::entry_size(size_of::<K>(), disk.len),
425            );
426        }
427
428        Ok(removed.is_some())
429    }
430
431    /// Check if a key exists.
432    pub fn contains(&self, key: &K) -> bool {
433        let guard = self.index.collector().enter();
434        self.index.get(key.as_bytes(), &guard).is_some()
435    }
436
437    /// Encoded value byte length for `key`, or `None` if absent.
438    /// Reads only the in-memory index entry (`DiskLoc::len`); no disk I/O.
439    pub fn entry_len(&self, key: &K) -> Option<u32> {
440        let guard = self.index.collector().enter();
441        self.index
442            .get(key.as_bytes(), &guard)
443            .map(|node| node.load_disk().len)
444    }
445
446    /// Return the first entry in index order, or `None` if empty.
447    /// With `reversed=true` (default): the entry with the largest key.
448    /// O(1) index lookup. May perform disk I/O on block-cache miss.
449    pub fn first(&self) -> Option<(K, ByteView)> {
450        let guard = self.index.collector().enter();
451        let mut ptr = crate::skiplist::strip_mark(unsafe {
452            (*self.index.head_ptr())
453                .tower(0)
454                .load(std::sync::atomic::Ordering::Acquire)
455        });
456        while !ptr.is_null() {
457            let node = unsafe { &*ptr };
458            if !node.is_marked() {
459                return self.read_value_cached(node, &guard).map(|v| (node.key, v));
460            }
461            ptr = crate::skiplist::strip_mark(
462                node.tower(0).load(std::sync::atomic::Ordering::Acquire),
463            );
464        }
465        None
466    }
467
468    /// Return the last entry in index order, or `None` if empty.
469    /// With `reversed=true` (default): the entry with the smallest key.
470    /// May perform disk I/O on block-cache miss.
471    pub fn last(&self) -> Option<(K, ByteView)> {
472        self.iter().next_back()
473    }
474
475    // -- Range helpers (front/back pointer positioning) -------------------------
476
477    fn resolve_front(&self, bound: &Bound<&K>, guard: &seize::LocalGuard<'_>) -> *mut VarNode<K> {
478        match bound {
479            Bound::Included(k) => self.index.find_first_ge(k.as_bytes(), guard),
480            Bound::Excluded(k) => {
481                let ge = self.index.find_first_ge(k.as_bytes(), guard);
482                if !ge.is_null()
483                    && !unsafe { &*ge }.is_marked()
484                    && unsafe { &*ge }.key_bytes() == k.as_bytes()
485                {
486                    crate::skiplist::strip_mark(unsafe {
487                        (*ge).tower(0).load(std::sync::atomic::Ordering::Acquire)
488                    })
489                } else {
490                    ge
491                }
492            }
493            Bound::Unbounded => crate::skiplist::strip_mark(unsafe {
494                (*self.index.head_ptr())
495                    .tower(0)
496                    .load(std::sync::atomic::Ordering::Acquire)
497            }),
498        }
499    }
500
501    fn prefix_bounds(&self, prefix: &[u8]) -> (K, Bound<K>) {
502        if self.reversed {
503            let mut search = K::zeroed();
504            search.as_bytes_mut().fill(0xFF);
505            search.as_bytes_mut()[..prefix.len()].copy_from_slice(prefix);
506            let mut end_key = K::zeroed();
507            end_key.as_bytes_mut()[..prefix.len()].copy_from_slice(prefix);
508            (search, Bound::Included(end_key))
509        } else {
510            let mut search = K::zeroed();
511            search.as_bytes_mut()[..prefix.len()].copy_from_slice(prefix);
512            let end = prefix_to_end_bound::<K>(prefix);
513            (search, end)
514        }
515    }
516
517    /// Iterate entries whose keys start with `prefix`.
518    ///
519    /// `reversed=true` (default): yields matching keys in DESC order.
520    /// `next()` is O(1), `next_back()` is O(log n). Both may perform disk I/O on cache miss.
521    pub fn prefix_iter(&self, prefix: &[u8]) -> VarIter<'_, K, H> {
522        let guard = self.index.collector().enter();
523        let (search_key, end) = self.prefix_bounds(prefix);
524        let front = self.index.find_first_ge(search_key.as_bytes(), &guard);
525        VarIter {
526            tree: self,
527            front,
528            back: None,
529            end,
530            start: Bound::Included(search_key),
531            reversed: self.reversed,
532            done: false,
533            _guard: guard,
534        }
535    }
536
537    /// Iterate all entries in index order.
538    ///
539    /// `reversed=true` (default): DESC. `reversed=false`: ASC.
540    /// `next()` is O(1), `next_back()` is O(log n). Both may perform disk I/O on cache miss.
541    pub fn iter(&self) -> VarIter<'_, K, H> {
542        let guard = self.index.collector().enter();
543        let front = crate::skiplist::strip_mark(unsafe {
544            (*self.index.head_ptr())
545                .tower(0)
546                .load(std::sync::atomic::Ordering::Acquire)
547        });
548        VarIter {
549            tree: self,
550            front,
551            back: None,
552            end: Bound::Unbounded,
553            start: Bound::Unbounded,
554            reversed: self.reversed,
555            done: false,
556            _guard: guard,
557        }
558    }
559
560    /// Iterate entries in `[start, end)` — start inclusive, end exclusive.
561    ///
562    /// `reversed=true` (default): DESC within range. `reversed=false`: ASC.
563    /// `next()` is O(1), `next_back()` is O(log n). Both may perform disk I/O on cache miss.
564    pub fn range(&self, start: &K, end: &K) -> VarIter<'_, K, H> {
565        self.range_bounds(Bound::Included(start), Bound::Excluded(end))
566    }
567
568    /// Iterate entries in range defined by `start` and `end` bounds.
569    ///
570    /// Unlike [`range()`](Self::range), allows `Included`, `Excluded`, or `Unbounded`
571    /// for each bound independently.
572    ///
573    /// `reversed=true` (default): DESC within range. `reversed=false`: ASC.
574    /// `next()` is O(1), `next_back()` is O(log n). Both may perform disk I/O on cache miss.
575    pub fn range_bounds(&self, start: Bound<&K>, end: Bound<&K>) -> VarIter<'_, K, H> {
576        let guard = self.index.collector().enter();
577        if self.reversed {
578            let front = self.resolve_front(&end, &guard);
579            VarIter {
580                tree: self,
581                front,
582                back: None,
583                end: bound_owned(&start),
584                start: bound_owned(&end),
585                reversed: true,
586                done: false,
587                _guard: guard,
588            }
589        } else {
590            let front = self.resolve_front(&start, &guard);
591            VarIter {
592                tree: self,
593                front,
594                back: None,
595                end: bound_owned(&end),
596                start: bound_owned(&start),
597                reversed: false,
598                done: false,
599                _guard: guard,
600            }
601        }
602    }
603
604    pub fn len(&self) -> usize {
605        self.index.len()
606    }
607
608    pub fn is_empty(&self) -> bool {
609        self.index.is_empty()
610    }
611
612    /// Write hint files for all active shard files. Call during graceful shutdown.
613    pub fn sync_hints(&self) -> DbResult<()> {
614        for shard in self.engine.shards().iter() {
615            shard.write_active_hint(size_of::<K>())?;
616        }
617        Ok(())
618    }
619
620    /// Pre-populate the block cache with blocks containing live values.
621    ///
622    /// Walks the index to collect unique block offsets, sorts them for sequential I/O,
623    /// then reads each block into the cache. Only blocks with live data are loaded —
624    /// dead entries and garbage are skipped.
625    pub fn warmup(&self) -> DbResult<()> {
626        use std::collections::BTreeSet;
627
628        let guard = self.index.collector().enter();
629
630        // Collect unique (shard_id, file_id, block_offset) from live index entries
631        let mut blocks: BTreeSet<(u8, u32, u64)> = BTreeSet::new();
632        let mut current = crate::skiplist::strip_mark(unsafe {
633            (*self.index.head_ptr())
634                .tower(0)
635                .load(std::sync::atomic::Ordering::Acquire)
636        });
637        while !current.is_null() {
638            let node = unsafe { &*current };
639            current = crate::skiplist::strip_mark(
640                node.tower(0).load(std::sync::atomic::Ordering::Acquire),
641            );
642            if node.is_marked() {
643                continue;
644            }
645            let disk = *node.load_disk();
646            let block_offset = disk.offset as u64 & !4095;
647            blocks.insert((disk.shard_id, disk.file_id, block_offset));
648        }
649        drop(guard);
650
651        // Read blocks in sorted order (sequential I/O per shard/file)
652        for (shard_id, file_id, block_offset) in &blocks {
653            let key = BlockKey {
654                shard_id: *shard_id,
655                file_id: *file_id,
656                block_offset: *block_offset,
657            };
658            if self.cache.get(&key).is_some() {
659                continue;
660            }
661            let shard = &self.engine.shards()[*shard_id as usize];
662            let (buf, is_full_block) = shard.read_block(*file_id, *block_offset)?;
663            if is_full_block {
664                self.cache.insert(key, Arc::new(buf));
665            }
666        }
667
668        Ok(())
669    }
670
671    pub(crate) fn index(&self) -> &SkipList<VarNode<K>> {
672        &self.index
673    }
674
675    /// Iterate all entries and optionally mutate them. Call once at startup.
676    ///
677    /// The callback receives each (key, value_bytes) and returns `MigrateAction`:
678    /// - `Keep` — no change (fires `on_init` if `H::NEEDS_INIT`); not counted
679    /// - `Update(new_value)` — replace value (hook-free write, fires `on_init`)
680    /// - `Delete` — remove entry (hook-free tombstone, no hooks)
681    ///
682    /// `on_write` is NEVER fired during migrate — see `docs/hooks.md` in the armdb crate.
683    /// Returns the number of mutated entries.
684    pub fn migrate(
685        &self,
686        f: impl Fn(&K, &[u8]) -> crate::MigrateAction<ByteView>,
687    ) -> DbResult<usize> {
688        use crate::MigrateAction;
689
690        let guard = self.index.collector().enter();
691        let mut current = crate::skiplist::strip_mark(unsafe {
692            (*self.index.head_ptr())
693                .tower(0)
694                .load(std::sync::atomic::Ordering::Acquire)
695        });
696        let mut count = 0;
697        while !current.is_null() {
698            let node = unsafe { &*current };
699            current = crate::skiplist::strip_mark(
700                node.tower(0).load(std::sync::atomic::Ordering::Acquire),
701            );
702            if node.is_marked() {
703                continue;
704            }
705            let value = match self.read_value_cached(node, &guard) {
706                Some(v) => v,
707                None => {
708                    tracing::warn!(
709                        key = ?node.key.as_bytes(),
710                        "var_tree migrate: skipping entry — value read failed"
711                    );
712                    continue;
713                }
714            };
715            match f(&node.key, &value) {
716                MigrateAction::Keep => {
717                    if H::NEEDS_INIT {
718                        self.hook.on_init(&node.key, &value);
719                    }
720                }
721                MigrateAction::Update(new_value) => {
722                    let shard_id = self.shard_for(&node.key);
723                    {
724                        let mut inner = self.engine.shards()[shard_id].lock();
725                        self.put_locked(shard_id, &mut inner, &guard, &node.key, &new_value)?;
726                    }
727                    if H::NEEDS_INIT {
728                        self.hook.on_init(&node.key, &new_value);
729                    }
730                    count += 1;
731                }
732                MigrateAction::Delete => {
733                    let shard_id = self.shard_for(&node.key);
734                    let mut inner = self.engine.shards()[shard_id].lock();
735                    self.delete_locked(shard_id, &mut inner, &guard, &node.key)?;
736                    count += 1;
737                }
738            }
739        }
740
741        tracing::info!(mutations = count, "var_tree migration complete");
742        Ok(count)
743    }
744
745    /// Replay `on_init` for every live entry. Used when no migration runs
746    /// (Db calls this after `run_migration` returns `false`). Public users
747    /// should invoke `migrate(|_, _| MigrateAction::Keep)` instead.
748    pub(crate) fn replay_init(&self) {
749        if !H::NEEDS_INIT {
750            return;
751        }
752        let guard = self.index.collector().enter();
753        let mut current = crate::skiplist::strip_mark(unsafe {
754            (*self.index.head_ptr())
755                .tower(0)
756                .load(std::sync::atomic::Ordering::Acquire)
757        });
758        let mut count = 0usize;
759        while !current.is_null() {
760            let node = unsafe { &*current };
761            current = crate::skiplist::strip_mark(
762                node.tower(0).load(std::sync::atomic::Ordering::Acquire),
763            );
764            if node.is_marked() {
765                continue;
766            }
767            let value = match self.read_value_cached(node, &guard) {
768                Some(v) => v,
769                None => {
770                    tracing::warn!(
771                        key = ?node.key.as_bytes(),
772                        "var_tree replay_init: skipping entry — value read failed"
773                    );
774                    continue;
775                }
776            };
777            self.hook.on_init(&node.key, &value);
778            count += 1;
779        }
780        tracing::debug!(replayed = count, "var_tree replay_init complete");
781    }
782
783    /// Lock-free read body. Returns `Ok(v)` on success, `Err(StaleDiskLoc)` if
784    /// compaction removed the file referenced by `disk` between the caller's
785    /// snapshot and the read. Other `DbError`s indicate I/O or decryption
786    /// failure.
787    fn read_value_cached_inner(&self, disk: &DiskLoc) -> DbResult<ByteView> {
788        let len = disk.len as usize;
789        let start = (disk.offset & 4095) as usize;
790
791        // Large values (>2 blocks) bypass the cached path. This check MUST
792        // sit before the cache lookup: a warm first block would otherwise
793        // route a large value into `extract_from_block`, which only supports
794        // two blocks and would panic on `next[..second_len]` when
795        // `second_len > 4096`. StaleDiskLoc propagates so the outer retry
796        // loop (`read_value_cached`) takes a fresh DiskLoc snapshot.
797        if start + len > 8192 {
798            let shard = &self.engine.shards()[disk.shard_id as usize];
799            let inner = shard.lock();
800            return self.read_value_locked_result(disk, &inner);
801        }
802
803        let block_offset = disk.offset as u64 & !4095;
804        let cache_key = BlockKey {
805            shard_id: disk.shard_id,
806            file_id: disk.file_id,
807            block_offset,
808        };
809
810        // 1. Block Cache (lock-free)
811        if let Some(block) = self.cache.get(&cache_key) {
812            metrics::counter!("armdb.cache.hit").increment(1);
813            return Self::extract_from_block(&block, start, len, || {
814                self.get_or_read_block(disk.shard_id, disk.file_id, block_offset + 4096)
815            });
816        }
817
818        // 2. Write buffer — for unflushed data in active file (brief shard lock)
819        {
820            let shard = &self.engine.shards()[disk.shard_id as usize];
821            let inner = shard.lock();
822            if inner.active.file_id == disk.file_id
823                && let Some(bytes) = inner.write_buf.read(disk.offset as u64, len)
824            {
825                return Ok(ByteView::new(bytes));
826            }
827        }
828
829        // 3. Disk read + cache
830        metrics::counter!("armdb.cache.miss").increment(1);
831        let block = self.get_or_read_block(disk.shard_id, disk.file_id, block_offset)?;
832        Self::extract_from_block(&block, start, len, || {
833            self.get_or_read_block(disk.shard_id, disk.file_id, block_offset + 4096)
834        })
835    }
836
837    /// Lock-free read with bounded retry on compaction race.
838    ///
839    /// The caller must hold a `seize::LocalGuard` for the duration of the
840    /// call (passed by reference so it cannot be dropped early); `node` must
841    /// have been obtained from this tree's index under the same guard. The
842    /// guard's lifetime, encoded in the parameter signature, prevents the
843    /// retire collector from reclaiming `node` between retry iterations.
844    fn read_value_cached(
845        &self,
846        node: &VarNode<K>,
847        _guard: &seize::LocalGuard<'_>,
848    ) -> Option<ByteView> {
849        for _ in 0..MAX_STALE_RETRIES {
850            let disk = *node.load_disk();
851            match self.read_value_cached_inner(&disk) {
852                Ok(v) => return Some(v),
853                Err(DbError::StaleDiskLoc) => {
854                    metrics::counter!("armdb.read.stale_retry", "tree" => "var_tree").increment(1);
855                    continue;
856                }
857                Err(_e) => {
858                    #[cfg(feature = "hot-path-tracing")]
859                    tracing::error!("VarTree read_value_cached error: {:?}", _e);
860                    return None;
861                }
862            }
863        }
864        None
865    }
866
867    fn extract_from_block(
868        block: &AlignedBuf,
869        start: usize,
870        len: usize,
871        next_block: impl FnOnce() -> DbResult<Arc<AlignedBuf>>,
872    ) -> DbResult<ByteView> {
873        debug_assert!(
874            start + len <= 8192,
875            "extract_from_block supports at most 2 blocks (8192 bytes)"
876        );
877        if start + len <= 4096 {
878            Ok(ByteView::new(&block[start..start + len]))
879        } else {
880            let next = next_block()?;
881            let first_part = &block[start..];
882            let second_len = len - first_part.len();
883            let mut combined = Vec::with_capacity(len);
884            combined.extend_from_slice(first_part);
885            combined.extend_from_slice(&next[..second_len]);
886            Ok(ByteView::from_vec(combined))
887        }
888    }
889
890    /// Get a block from cache, or read it from disk and cache it.
891    fn get_or_read_block(
892        &self,
893        shard_id: u8,
894        file_id: u32,
895        block_offset: u64,
896    ) -> DbResult<Arc<AlignedBuf>> {
897        let key = BlockKey {
898            shard_id,
899            file_id,
900            block_offset,
901        };
902        if let Some(cached) = self.cache.get(&key) {
903            return Ok(cached);
904        }
905        let shard = &self.engine.shards()[shard_id as usize];
906        let (buf, is_full_block) = shard.read_block(file_id, block_offset)?;
907        let arc = Arc::new(buf);
908        // Only cache full blocks (entirely within the file's data region).
909        // Partial blocks at the end of a file have zero-padded tails that
910        // would become stale after file rotation or further writes.
911        if is_full_block {
912            self.cache.insert(key, arc.clone());
913        }
914        Ok(arc)
915    }
916
917    /// Compare-and-swap: if current value == expected, replace with new_value.
918    /// Returns `Ok(())` on success, `Err(CasMismatch)` if current != expected,
919    /// `Err(KeyNotFound)` if key doesn't exist.
920    ///
921    /// **Caveat:** Holds the shard lock while reading the current value. On a
922    /// block-cache miss this performs disk I/O under the lock, blocking all
923    /// writes to the same shard. Pre-warm the cache to avoid latency spikes.
924    pub fn cas(&self, key: &K, expected: &[u8], new_value: &[u8]) -> DbResult<()> {
925        metrics::counter!("armdb.ops", "op" => "cas", "tree" => "var_tree").increment(1);
926        #[cfg(feature = "hot-path-tracing")]
927        tracing::trace!("var_tree.cas");
928        let shard_id = self.shard_for(key);
929        let shard = &self.engine.shards()[shard_id];
930        let mut inner = shard.lock();
931
932        let guard = self.index.collector().enter();
933        let existing = self
934            .index
935            .get(key.as_bytes(), &guard)
936            .ok_or(DbError::KeyNotFound)?;
937
938        let disk = *existing.load_disk();
939        let current = self
940            .read_value_locked(&disk, &inner)
941            .ok_or(DbError::KeyNotFound)?;
942        if current.as_ref() != expected {
943            return Err(DbError::CasMismatch);
944        }
945
946        let (new_disk_loc, _gsn) =
947            inner.append_entry(shard_id as u8, key.as_bytes(), new_value, false)?;
948
949        let new_disk = Box::into_raw(Box::new(new_disk_loc));
950        let old_disk_ptr = existing.swap_disk(new_disk);
951        let old_disk = unsafe { *old_disk_ptr };
952        inner.add_dead_bytes(
953            old_disk.file_id,
954            crate::entry::entry_size(size_of::<K>(), old_disk.len),
955        );
956        unsafe {
957            self.index
958                .collector()
959                .retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
960        }
961
962        drop(inner);
963        self.hook.on_write(
964            key,
965            if H::NEEDS_OLD_VALUE {
966                Some(&*current)
967            } else {
968                None
969            },
970            Some(new_value),
971        );
972        Ok(())
973    }
974
975    /// Atomically read-modify-write. Returns `Some(new_value)` if key existed, `None` otherwise.
976    /// The closure receives the current value and returns a new ByteView.
977    /// The closure must not be heavy (shard lock is held).
978    ///
979    /// **Caveat:** Holds the shard lock while reading the current value. On a
980    /// block-cache miss this performs disk I/O under the lock, blocking all
981    /// writes to the same shard. Pre-warm the cache to avoid latency spikes.
982    pub fn update(&self, key: &K, f: impl FnOnce(&[u8]) -> ByteView) -> DbResult<Option<ByteView>> {
983        self.update_inner(key, f, false)
984    }
985
986    /// Like [`update()`](Self::update), but returns `Some(old_value)` instead of the new one.
987    pub fn fetch_update(
988        &self,
989        key: &K,
990        f: impl FnOnce(&[u8]) -> ByteView,
991    ) -> DbResult<Option<ByteView>> {
992        self.update_inner(key, f, true)
993    }
994
995    pub(crate) fn try_update_inner(
996        &self,
997        key: &K,
998        f: impl FnOnce(&[u8]) -> DbResult<Option<ByteView>>,
999        return_old: bool,
1000    ) -> DbResult<Option<ByteView>> {
1001        metrics::counter!("armdb.ops", "op" => "update", "tree" => "var_tree").increment(1);
1002        #[cfg(feature = "hot-path-tracing")]
1003        tracing::trace!("var_tree.update");
1004        let shard_id = self.shard_for(key);
1005        let shard = &self.engine.shards()[shard_id];
1006        let mut inner = shard.lock();
1007
1008        let guard = self.index.collector().enter();
1009        let existing = match self.index.get(key.as_bytes(), &guard) {
1010            Some(n) => n,
1011            None => return Ok(None),
1012        };
1013
1014        let disk = *existing.load_disk();
1015        let current = match self.read_value_locked(&disk, &inner) {
1016            Some(v) => v,
1017            None => return Ok(None),
1018        };
1019
1020        let new_value = match f(&current)? {
1021            Some(v) => v,
1022            None => return Ok(Some(current)),
1023        };
1024
1025        let (new_disk_loc, _gsn) =
1026            inner.append_entry(shard_id as u8, key.as_bytes(), &new_value, false)?;
1027
1028        let new_disk = Box::into_raw(Box::new(new_disk_loc));
1029        let old_disk_ptr = existing.swap_disk(new_disk);
1030        let old_disk = unsafe { *old_disk_ptr };
1031        inner.add_dead_bytes(
1032            old_disk.file_id,
1033            crate::entry::entry_size(size_of::<K>(), old_disk.len),
1034        );
1035        unsafe {
1036            self.index
1037                .collector()
1038                .retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
1039        }
1040
1041        drop(inner);
1042        self.hook.on_write(
1043            key,
1044            if H::NEEDS_OLD_VALUE {
1045                Some(&*current)
1046            } else {
1047                None
1048            },
1049            Some(&*new_value),
1050        );
1051        Ok(Some(if return_old { current } else { new_value }))
1052    }
1053
1054    fn update_inner(
1055        &self,
1056        key: &K,
1057        f: impl FnOnce(&[u8]) -> ByteView,
1058        return_old: bool,
1059    ) -> DbResult<Option<ByteView>> {
1060        self.try_update_inner(key, |bytes| Ok(Some(f(bytes))), return_old)
1061    }
1062
1063    /// Read value when shard lock is already held, propagating `DbError`.
1064    /// Used by both `read_value_locked` (Option wrapper) and the large-value
1065    /// fallback in `read_value_cached_inner`.
1066    fn read_value_locked_result(&self, disk: &DiskLoc, inner: &ShardInner) -> DbResult<ByteView> {
1067        let len = disk.len as usize;
1068
1069        // 1. Write buffer (for unflushed data)
1070        if inner.active.file_id == disk.file_id
1071            && let Some(bytes) = inner.write_buf.read(disk.offset as u64, len)
1072        {
1073            return Ok(ByteView::new(bytes));
1074        }
1075
1076        // 2. Block cache (lock-free, single-block fast path)
1077        let block_offset = disk.offset as u64 & !4095;
1078        let start = (disk.offset & 4095) as usize;
1079        if start + len <= 4096 {
1080            let cache_key = BlockKey {
1081                shard_id: disk.shard_id,
1082                file_id: disk.file_id,
1083                block_offset,
1084            };
1085            if let Some(block) = self.cache.get(&cache_key) {
1086                return Ok(ByteView::new(&block[start..start + len]));
1087            }
1088
1089            // 2b. Cache miss — read block under held lock, populate cache
1090            let (buf, is_full_block) = inner.read_block_locked(disk.file_id, block_offset)?;
1091            let arc = Arc::new(buf);
1092            if is_full_block {
1093                self.cache.insert(cache_key, arc.clone());
1094            }
1095            return Ok(ByteView::new(&arc[start..start + len]));
1096        }
1097
1098        // 3. Multi-block: disk read via encryption-aware helper (no cache).
1099        let bytes = inner.read_value_from_disk_locked(disk)?;
1100        Ok(ByteView::new(&bytes))
1101    }
1102
1103    /// Read value when shard lock is already held. Used by CAS/update.
1104    ///
1105    /// Thin `Option` wrapper around `read_value_locked_result` that preserves
1106    /// the existing call-site contract: `StaleDiskLoc` is logged at error
1107    /// level and collapsed to `None`; other errors collapse to `None` silently.
1108    fn read_value_locked(&self, disk: &DiskLoc, inner: &ShardInner) -> Option<ByteView> {
1109        match self.read_value_locked_result(disk, inner) {
1110            Ok(v) => Some(v),
1111            Err(DbError::StaleDiskLoc) => {
1112                tracing::error!(
1113                    file_id = disk.file_id,
1114                    shard_id = disk.shard_id,
1115                    "stale DiskLoc under shard lock - programming bug",
1116                );
1117                None
1118            }
1119            Err(_) => None,
1120        }
1121    }
1122
1123    pub fn shard_for(&self, key: &K) -> usize {
1124        crate::shard_for_key(key, self.shard_prefix_bits, self.engine.shards().len())
1125    }
1126}
1127
#[cfg(feature = "replication")]
impl<K: Key, H: WriteHook<K>> crate::replication::ReplicationTarget for VarTree<K, H> {
    /// Apply one replicated entry to the in-memory index.
    ///
    /// Only a `DiskLoc` pointing at the value inside `file_id` of shard
    /// `shard_id` is recorded; the `_value` bytes themselves are not copied.
    /// NOTE(review): this presumes the caller has already written the entry
    /// to that file — confirm.
    ///
    /// - Tombstones remove the key from the index. The result is
    ///   `TombstoneRemoved(old)`, or `Inserted` when the key was absent
    ///   (a no-op that creates no dead bytes).
    /// - Puts either insert a new node (`Inserted`) or swap the existing
    ///   node's `DiskLoc` (`Replaced(old)`).
    fn apply_entry(
        &self,
        _shard_inner: &mut crate::shard::ShardInner,
        shard_id: u8,
        file_id: u32,
        entry_offset: u64,
        header: &crate::entry::EntryHeader,
        key: &[u8],
        _value: &[u8],
    ) -> DbResult<crate::replication::ApplyOutcome> {
        use crate::replication::ApplyOutcome;

        let key: K = K::from_bytes(key);

        // The value follows the entry header and the fixed-size key.
        // NOTE(review): `as u32` truncates offsets above u32::MAX; presumably
        // `max_file_size` keeps offsets in range — confirm.
        let value_offset =
            entry_offset + size_of::<crate::entry::EntryHeader>() as u64 + size_of::<K>() as u64;
        let disk = DiskLoc::new(shard_id, file_id, value_offset as u32, header.value_len);

        if header.is_tombstone() {
            let guard = self.index.collector().enter();
            let removed = self.index.remove(key.as_bytes(), &guard);
            match removed {
                Some(node_ptr) => {
                    // Presumably `remove` retires the node through the
                    // collector, so it stays readable while `guard` is held
                    // — confirm.
                    let old_disk = *unsafe { &*node_ptr }.load_disk();
                    Ok(ApplyOutcome::TombstoneRemoved(old_disk))
                }
                None => Ok(ApplyOutcome::Inserted), // no-op tombstone — no dead bytes
            }
        } else {
            let guard = self.index.collector().enter();
            let height = random_height();
            let node_ptr = VarNode::alloc(key, disk, height);
            match self.index.insert(node_ptr, &guard) {
                InsertResult::Inserted => Ok(ApplyOutcome::Inserted),
                InsertResult::Exists(existing) => {
                    // Hand the boxed DiskLoc that `VarNode::alloc` created for
                    // the rejected node over to `existing`. Previously a second
                    // box was allocated here, and the node's own box was nulled
                    // out before `dealloc_node`. That leaked one DiskLoc per
                    // replicated overwrite.
                    //
                    // SAFETY: `node_ptr` was just allocated and `insert`
                    // rejected it (`Exists`), so no other thread can observe
                    // it and we have exclusive access to its fields.
                    let new_disk_ptr =
                        unsafe { (*node_ptr).disk.load(std::sync::atomic::Ordering::Relaxed) };
                    let old_disk_ptr = existing.swap_disk(new_disk_ptr);
                    // SAFETY: `old_disk_ptr` was `existing`'s live DiskLoc until
                    // the swap above and is only retired below.
                    let old_disk = unsafe { *old_disk_ptr };
                    // SAFETY: the old DiskLoc is unreachable through the node
                    // now; lock-free readers may still hold it, so it is
                    // retired through the collector rather than freed.
                    unsafe {
                        self.index
                            .collector()
                            .retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
                    }
                    // Deallocate the unused node. Its `disk` pointer is now
                    // owned by `existing`, so it is nulled first (the same state
                    // in which this node was always passed to `dealloc_node`).
                    // SAFETY: the node was never published (see above).
                    unsafe {
                        (*node_ptr)
                            .disk
                            .store(std::ptr::null_mut(), std::sync::atomic::Ordering::Relaxed);
                        VarNode::<K>::dealloc_node(node_ptr);
                    }
                    Ok(ApplyOutcome::Replaced(old_disk))
                }
            }
        }
    }

    /// Validate a raw entry body (key followed by value) and apply it.
    ///
    /// Returns `NotMatched` when the buffer is too short for this tree's key
    /// size plus `header.value_len`, or when the CRC over
    /// (gsn, value_len, key, value) does not match `header.crc32`.
    /// Otherwise it delegates to `apply_entry`.
    fn try_apply_entry(
        &self,
        shard_inner: &mut crate::shard::ShardInner,
        shard_id: u8,
        file_id: u32,
        entry_offset: u64,
        header: &crate::entry::EntryHeader,
        raw_after_header: &[u8],
    ) -> DbResult<crate::replication::ApplyOutcome> {
        use crate::replication::ApplyOutcome;

        if raw_after_header.len() < size_of::<K>() + header.value_len as usize {
            return Ok(ApplyOutcome::NotMatched);
        }
        let key = &raw_after_header[..size_of::<K>()];
        let value = &raw_after_header[size_of::<K>()..size_of::<K>() + header.value_len as usize];
        let crc = crate::entry::compute_crc32(header.gsn, header.value_len, key, value);
        if crc != header.crc32 {
            return Ok(ApplyOutcome::NotMatched);
        }
        self.apply_entry(
            shard_inner,
            shard_id,
            file_id,
            entry_offset,
            header,
            key,
            value,
        )
    }

    /// Key width in bytes, i.e. `size_of::<K>()`.
    fn key_len(&self) -> usize {
        size_of::<K>()
    }
}
1221
#[cfg(feature = "replication")]
impl<K: Key, H: WriteHook<K>> VarTree<K, H> {
    /// Install SPSC replication producers into every shard and start a
    /// `ReplicationServer` bound to `bind_addr`.
    ///
    /// # Single-call contract
    ///
    /// Every call installs fresh SPSC producers, replacing whatever was
    /// installed before. Call this at most once per `VarTree` instance. A
    /// second call orphans the in-flight producer of any active streaming
    /// connection on the first server; that connection then observes an empty
    /// ring buffer and silently stops forwarding entries.
    pub fn start_replication_server(
        &self,
        bind_addr: std::net::SocketAddr,
        signal: crate::shutdown::ShutdownSignal,
    ) -> crate::error::DbResult<crate::replication::ReplicationServer> {
        let consumers = self.install_replication_producers()?;
        let shards = self.engine.shards().clone();
        let max_file_size = self.engine.config().max_file_size;
        crate::replication::ReplicationServer::start(
            bind_addr,
            shards,
            consumers,
            max_file_size,
            signal,
        )
    }

    /// Like [`Self::start_replication_server`], with explicit
    /// `ReplicationServerOptions`.
    ///
    /// It installs producers the same way, so the single-call contract applies
    /// here too.
    pub fn start_replication_server_with_options(
        &self,
        bind_addr: std::net::SocketAddr,
        signal: crate::shutdown::ShutdownSignal,
        options: crate::replication::ReplicationServerOptions,
    ) -> crate::error::DbResult<crate::replication::ReplicationServer> {
        let consumers = self.install_replication_producers()?;
        let shards = self.engine.shards().clone();
        let max_file_size = self.engine.config().max_file_size;
        crate::replication::ReplicationServer::start_with_options(
            bind_addr,
            shards,
            consumers,
            max_file_size,
            signal,
            options,
        )
    }

    /// Create one SPSC ring buffer per shard (capacity 4096), install its
    /// producer on the shard, and return the consumers in shard order.
    fn install_replication_producers(
        &self,
    ) -> crate::error::DbResult<Vec<rtrb::Consumer<crate::replication::ReplicationEntry>>> {
        const SPSC_CAPACITY: usize = 4096;
        let consumers: Vec<_> = self
            .engine
            .shards()
            .iter()
            .map(|shard| {
                let (producer, consumer) = rtrb::RingBuffer::new(SPSC_CAPACITY);
                shard.set_replication_producer(producer);
                consumer
            })
            .collect();
        Ok(consumers)
    }

    /// Start a `ReplicationClient` that streams entries from `leader_addr`
    /// into `registry`. Symmetric to [`Self::start_replication_server`].
    ///
    /// `key_len` is derived from `size_of::<K>()`.
    pub fn start_replication_client(
        &self,
        leader_addr: std::net::SocketAddr,
        registry: std::sync::Arc<crate::replication::ReplicationRegistry>,
        signal: crate::shutdown::ShutdownSignal,
    ) -> crate::error::DbResult<crate::replication::ReplicationClient> {
        let shards = self.engine.shards().clone();
        let key_len = size_of::<K>() as u16;
        crate::replication::ReplicationClient::start(leader_addr, shards, registry, key_len, signal)
    }

    /// Like [`Self::start_replication_client`], with explicit
    /// `ReplicationClientOptions`.
    pub fn start_replication_client_with_options(
        &self,
        leader_addr: std::net::SocketAddr,
        registry: std::sync::Arc<crate::replication::ReplicationRegistry>,
        signal: crate::shutdown::ShutdownSignal,
        options: crate::replication::ReplicationClientOptions,
    ) -> crate::error::DbResult<crate::replication::ReplicationClient> {
        let shards = self.engine.shards().clone();
        let key_len = size_of::<K>() as u16;
        crate::replication::ReplicationClient::start_with_options(
            leader_addr,
            shards,
            registry,
            key_len,
            signal,
            options,
        )
    }
}
1316
#[cfg(feature = "replication")]
impl<K, H> VarTree<K, H>
where
    K: Key + Send + Sync + 'static,
    H: WriteHook<K> + Send + Sync + 'static,
{
    /// Box a shared handle to this tree as a `dyn ReplicationTarget`.
    ///
    /// The box owns an `Arc` clone, so the caller keeps full access to the
    /// tree through its own `Arc` while the registry owns the box. This is the
    /// intended follower-side wiring:
    ///
    /// ```ignore
    /// let follower = Arc::new(VarTree::<[u8; 8]>::open(path, cfg)?);
    /// let registry = ReplicationRegistry::new(follower.as_replication_target());
    /// // `follower` remains usable for .get() etc.
    /// ```
    pub fn as_replication_target(
        self: &std::sync::Arc<Self>,
    ) -> Box<dyn crate::replication::ReplicationTarget> {
        let shared: std::sync::Arc<Self> = std::sync::Arc::clone(self);
        Box::new(shared)
    }
}
1340
/// Handle for atomic multi-key operations within a single shard.
/// Obtained via [`VarTree::atomic`]. The shard lock is held for the
/// lifetime of this struct — keep the closure short.
///
/// Field order matters: Rust drops fields in declaration order, so the shard
/// lock (`inner`) is released before the `seize` guard.
pub struct VarShard<'a, K: Key, H: WriteHook<K> = NoHook> {
    /// Tree whose index and storage this handle operates on.
    tree: &'a VarTree<K, H>,
    /// Held lock on the shard's state; locked writes and locked reads go through it.
    inner: MutexGuard<'a, ShardInner>,
    /// Index of the locked shard; `check_shard` rejects keys routed elsewhere.
    shard_id: usize,
    /// `seize` guard that keeps index nodes returned by lookups from being reclaimed.
    guard: seize::LocalGuard<'a>,
}
1350
1351impl<K: Key, H: WriteHook<K>> VarShard<'_, K, H> {
1352    pub fn put(&mut self, key: &K, value: &[u8]) -> DbResult<()> {
1353        self.check_shard(key)?;
1354        self.tree
1355            .put_locked(self.shard_id, &mut self.inner, &self.guard, key, value)
1356    }
1357
1358    pub fn insert(&mut self, key: &K, value: &[u8]) -> DbResult<()> {
1359        self.check_shard(key)?;
1360        self.tree
1361            .insert_locked(self.shard_id, &mut self.inner, &self.guard, key, value)
1362    }
1363
1364    pub fn delete(&mut self, key: &K) -> DbResult<bool> {
1365        self.check_shard(key)?;
1366        self.tree
1367            .delete_locked(self.shard_id, &mut self.inner, &self.guard, key)
1368    }
1369
1370    pub fn get(&self, key: &K) -> Option<ByteView> {
1371        let node = self.tree.index.get(key.as_bytes(), &self.guard)?;
1372        let disk = *node.load_disk();
1373        self.tree.read_value_locked(&disk, &self.inner)
1374    }
1375
1376    pub fn get_or_err(&self, key: &K) -> DbResult<ByteView> {
1377        self.get(key).ok_or(DbError::KeyNotFound)
1378    }
1379
1380    pub fn contains(&self, key: &K) -> bool {
1381        self.tree.index.get(key.as_bytes(), &self.guard).is_some()
1382    }
1383
1384    fn check_shard(&self, key: &K) -> DbResult<()> {
1385        if self.tree.shard_for(key) != self.shard_id {
1386            return Err(DbError::ShardMismatch);
1387        }
1388        Ok(())
1389    }
1390}
1391
/// Turn a borrowed-key bound into an owned-key bound with the same variant.
fn bound_owned<K: Copy>(b: &Bound<&K>) -> Bound<K> {
    // `Bound<&K>` is `Copy`; `cloned` copies the key out of whichever variant holds one.
    (*b).cloned()
}
1399
1400fn prefix_to_end_bound<K: Key>(prefix: &[u8]) -> Bound<K> {
1401    let mut incremented = prefix.to_vec();
1402    let mut carry = true;
1403    for byte in incremented.iter_mut().rev() {
1404        if carry {
1405            if *byte == 0xFF {
1406                *byte = 0x00;
1407            } else {
1408                *byte += 1;
1409                carry = false;
1410                break;
1411            }
1412        }
1413    }
1414    if carry {
1415        Bound::Unbounded
1416    } else {
1417        let mut end = K::zeroed();
1418        end.as_bytes_mut()[..incremented.len()].copy_from_slice(&incremented);
1419        Bound::Excluded(end)
1420    }
1421}
1422
/// Iterator over entries in a `VarTree`. Returned by `iter()`, `range()`, and `prefix_iter()`.
///
/// Weakly-consistent: concurrent inserts/updates may be visible during iteration.
/// Deleted entries are skipped. The `seize` guard prevents use-after-free.
/// Each `next()` may perform disk I/O on a block-cache miss.
pub struct VarIter<'a, K: Key, H: WriteHook<K> = NoHook> {
    /// Tree being iterated; values are read through its `read_value_cached`.
    tree: &'a VarTree<K, H>,
    /// Next node `next()` will examine; null once the forward walk has run out.
    front: *mut VarNode<K>,
    /// `None` = not yet resolved (lazy). Computed on first `next_back()` call.
    back: Option<*mut VarNode<K>>,
    /// Bound enforced by `next()` through `check_end`.
    end: Bound<K>,
    /// Bound enforced by `next_back()` through `check_start`.
    start: Bound<K>,
    /// Inverts the byte comparisons in `check_end` / `check_start`.
    /// NOTE(review): presumably mirrors the tree's `reversed` ordering — confirm.
    reversed: bool,
    /// Set once the two ends meet or a bound is crossed; afterwards every call yields `None`.
    done: bool,
    /// Keeps the nodes referenced by `front` / `back` from being reclaimed while iterating.
    _guard: seize::LocalGuard<'a>,
}
1439
impl<K: Key, H: WriteHook<K>> Iterator for VarIter<'_, K, H> {
    type Item = (K, ByteView);

    /// Walk `front` along level 0 of the skiplist and yield the next entry
    /// that is unmarked, inside the `end` bound, and has a readable value.
    ///
    /// Marked nodes (deleted entries) and entries whose value read fails are
    /// skipped. When `front` reaches the node `back` currently points at, that
    /// node is the last one this side may examine and `done` is set.
    ///
    /// NOTE(review): convergence is detected by pointer equality with `back`.
    /// If the `back` node were physically unlinked before `front` reached it,
    /// `front` could walk past it into entries already yielded by
    /// `next_back()`. Confirm whether the skiplist can unlink a node an
    /// iterator still references.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if self.done || self.front.is_null() {
                return None;
            }
            // SAFETY (per the struct docs): the `seize` guard held in `_guard`
            // keeps nodes reachable from the index alive during iteration.
            let node = unsafe { &*self.front };
            // The two ends meet here; only possible once `back` has been resolved.
            let converged = self.back.is_some_and(|back| std::ptr::eq(self.front, back));
            // Advance before deciding whether to yield `node`.
            self.front = crate::skiplist::strip_mark(
                node.tower(0).load(std::sync::atomic::Ordering::Acquire),
            );
            if converged {
                self.done = true;
            }
            if node.is_marked() {
                if converged {
                    return None;
                }
                continue;
            }
            // Keys arrive in order, so the first key past `end` ends the walk.
            if !self.check_end(&node.key) {
                self.done = true;
                return None;
            }
            match self.tree.read_value_cached(node, &self._guard) {
                Some(value) => return Some((node.key, value)),
                // Unreadable value: skip the entry rather than abort iteration.
                None => {
                    if converged {
                        return None;
                    }
                    continue;
                }
            }
        }
    }
}
1478
impl<K: Key, H: WriteHook<K>> DoubleEndedIterator for VarIter<'_, K, H> {
    /// Yield the next entry from the back end, moving `back` toward `front`.
    ///
    /// `back` is resolved lazily on the first call. Each step finds the
    /// predecessor with a fresh `find_last_lt` search, because the level-0
    /// links only point forward. Marked nodes and unreadable values are
    /// skipped. Meeting `front` sets `done`, and so does crossing the `start`
    /// bound.
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.back.is_none() {
            self.back = Some(self.resolve_back());
            // A null `front` means the forward side has nothing left, so the
            // back side has nothing left either.
            if self.front.is_null() {
                self.done = true;
            }
        }
        loop {
            let back = self.back.unwrap_or(std::ptr::null_mut());
            if self.done || back.is_null() {
                return None;
            }
            // SAFETY (per the struct docs): the `seize` guard held in `_guard`
            // keeps nodes reachable from the index alive during iteration.
            let node = unsafe { &*back };
            let key = node.key;
            // Both ends are on the same node: it is the last one to examine.
            let converged = std::ptr::eq(self.front, back);
            // Step to the predecessor before deciding whether to yield `node`.
            self.back = Some(self.tree.index().find_last_lt(key.as_bytes(), &self._guard));
            if converged {
                self.done = true;
            }
            if node.is_marked() {
                if converged {
                    return None;
                }
                continue;
            }
            // Walking backward, the first key before `start` ends the walk.
            if !self.check_start(&key) {
                self.done = true;
                return None;
            }
            match self.tree.read_value_cached(node, &self._guard) {
                Some(value) => return Some((key, value)),
                // Unreadable value: skip the entry rather than abort iteration.
                None => {
                    if converged {
                        return None;
                    }
                    continue;
                }
            }
        }
    }
}
1521
1522impl<K: Key, H: WriteHook<K>> VarIter<'_, K, H> {
1523    /// Lazily resolve the back pointer for DoubleEndedIterator.
1524    fn resolve_back(&self) -> *mut VarNode<K> {
1525        let index = self.tree.index();
1526        match &self.end {
1527            Bound::Unbounded => index.find_last(&self._guard),
1528            Bound::Excluded(k) => index.find_last_lt(k.as_bytes(), &self._guard),
1529            Bound::Included(k) => {
1530                let ge = index.find_first_ge(k.as_bytes(), &self._guard);
1531                if !ge.is_null()
1532                    && !unsafe { &*ge }.is_marked()
1533                    && unsafe { &*ge }.key_bytes() == k.as_bytes()
1534                {
1535                    ge
1536                } else {
1537                    index.find_last_lt(k.as_bytes(), &self._guard)
1538                }
1539            }
1540        }
1541    }
1542
1543    #[inline(always)]
1544    fn check_end(&self, key: &K) -> bool {
1545        match &self.end {
1546            Bound::Unbounded => true,
1547            Bound::Excluded(end) => {
1548                if self.reversed {
1549                    key.as_bytes() > end.as_bytes()
1550                } else {
1551                    key.as_bytes() < end.as_bytes()
1552                }
1553            }
1554            Bound::Included(end) => {
1555                if self.reversed {
1556                    key.as_bytes() >= end.as_bytes()
1557                } else {
1558                    key.as_bytes() <= end.as_bytes()
1559                }
1560            }
1561        }
1562    }
1563
1564    #[inline(always)]
1565    fn check_start(&self, key: &K) -> bool {
1566        match &self.start {
1567            Bound::Unbounded => true,
1568            Bound::Excluded(s) => {
1569                if self.reversed {
1570                    key.as_bytes() < s.as_bytes()
1571                } else {
1572                    key.as_bytes() > s.as_bytes()
1573                }
1574            }
1575            Bound::Included(s) => {
1576                if self.reversed {
1577                    key.as_bytes() <= s.as_bytes()
1578                } else {
1579                    key.as_bytes() >= s.as_bytes()
1580                }
1581            }
1582        }
1583    }
1584    /// Collect only the keys. Convenience for backward compatibility.
1585    pub fn collect_keys(&mut self) -> Vec<K> {
1586        self.map(|(k, _)| k).collect()
1587    }
1588
1589    /// Collect all remaining entries. Convenience for backward compatibility.
1590    pub fn collect_entries(&mut self) -> Vec<(K, ByteView)> {
1591        self.collect()
1592    }
1593}
1594
1595#[cfg(test)]
1596mod tests {
1597    use super::*;
1598    use crate::Config;
1599    use crate::compaction::compact_shard;
1600    use tempfile::tempdir;
1601
1602    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1603
1604    /// Test hook with counters for on_write / on_init calls and the last observed new value for
1605    /// on_write. NEEDS_INIT and NEEDS_OLD_VALUE are parameterized by const generics.
1606    #[derive(Default)]
1607    struct CountingHook<const NEEDS_INIT: bool, const NEEDS_OLD: bool> {
1608        writes: AtomicUsize,
1609        writes_with_old: AtomicUsize,
1610        inits: AtomicUsize,
1611        last_write_new: crate::sync::Mutex<Option<Vec<u8>>>,
1612        last_init_value: crate::sync::Mutex<Option<Vec<u8>>>,
1613    }
1614
1615    impl<const NEEDS_INIT: bool, const NEEDS_OLD: bool> WriteHook<[u8; 8]>
1616        for CountingHook<NEEDS_INIT, NEEDS_OLD>
1617    {
1618        const NEEDS_OLD_VALUE: bool = NEEDS_OLD;
1619        const NEEDS_INIT: bool = NEEDS_INIT;
1620
1621        fn on_write(&self, _key: &[u8; 8], old: Option<&[u8]>, new: Option<&[u8]>) {
1622            self.writes.fetch_add(1, AtomicOrdering::Relaxed);
1623            if old.is_some() {
1624                self.writes_with_old.fetch_add(1, AtomicOrdering::Relaxed);
1625            }
1626            *crate::sync::lock(&self.last_write_new) = new.map(<[u8]>::to_vec);
1627        }
1628
1629        fn on_init(&self, _key: &[u8; 8], value: &[u8]) {
1630            self.inits.fetch_add(1, AtomicOrdering::Relaxed);
1631            *crate::sync::lock(&self.last_init_value) = Some(value.to_vec());
1632        }
1633    }
1634
1635    fn open_test_tree(dir: &std::path::Path) -> VarTree<[u8; 8]> {
1636        let mut cfg = Config::test();
1637        cfg.shard_count = 1;
1638        cfg.max_file_size = 8192;
1639        cfg.write_buffer_size = 8192;
1640        cfg.compaction_threshold = 0.0;
1641        VarTree::open(dir, cfg).expect("open test tree")
1642    }
1643
1644    fn open_test_tree_hooked<const NEEDS_INIT: bool, const NEEDS_OLD: bool>(
1645        dir: &std::path::Path,
1646        hook: CountingHook<NEEDS_INIT, NEEDS_OLD>,
1647    ) -> VarTree<[u8; 8], CountingHook<NEEDS_INIT, NEEDS_OLD>> {
1648        let mut cfg = Config::test();
1649        cfg.shard_count = 1;
1650        cfg.max_file_size = 8192;
1651        cfg.write_buffer_size = 8192;
1652        cfg.compaction_threshold = 0.0;
1653        VarTree::open_hooked(dir, cfg, hook).expect("open hooked test tree")
1654    }
1655
1656    fn put_until_compactable(tree: &VarTree<[u8; 8]>, key: [u8; 8]) -> DiskLoc {
1657        // Capture the DiskLoc after the first put — this points to an early
1658        // file that will be erased by compaction (all later puts overwrite
1659        // the index, leaving this entry dead in its file).
1660        tree.put(&key, &[0u8; 256]).expect("first put");
1661        let snap = {
1662            let guard = tree.index.collector().enter();
1663            let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
1664            *node.load_disk()
1665        };
1666        // Each put is 280 bytes (16 header + 8 key + 256 value, 8-byte aligned).
1667        // With max_file_size=8192, ~30 puts cross the first rotation boundary.
1668        // Use 65 puts so the snap file is sealed as immutable AND fully dead.
1669        for i in 1..65u8 {
1670            tree.put(&key, &[i; 256]).expect("put");
1671        }
1672        tree.put(&key, b"final-value-payload-XX")
1673            .expect("final put");
1674        snap
1675    }
1676
1677    #[test]
1678    fn read_value_cached_inner_returns_stale_after_compaction() {
1679        let dir = tempdir().unwrap();
1680        let tree = open_test_tree(dir.path());
1681
1682        let key = 7u64.to_be_bytes();
1683        let snap = put_until_compactable(&tree, key);
1684
1685        let shard = &tree.engine.shards()[snap.shard_id as usize];
1686        let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
1687
1688        match tree.read_value_cached_inner(&snap) {
1689            Err(DbError::StaleDiskLoc) => {}
1690            Ok(v) => panic!("expected StaleDiskLoc, got Ok({:?})", v.as_bytes()),
1691            Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
1692        }
1693    }
1694
1695    #[test]
1696    fn read_value_cached_returns_some_after_compaction() {
1697        let dir = tempdir().unwrap();
1698        let tree = open_test_tree(dir.path());
1699
1700        let key = 11u64.to_be_bytes();
1701        let _snap = put_until_compactable(&tree, key);
1702        let shard = &tree.engine.shards()[0];
1703        let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
1704
1705        let guard = tree.index.collector().enter();
1706        let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
1707        let v = tree
1708            .read_value_cached(node, &guard)
1709            .expect("post-compaction value must be readable");
1710        assert_eq!(v.as_bytes(), b"final-value-payload-XX");
1711    }
1712
1713    #[test]
1714    fn get_during_compaction_returns_some() {
1715        let dir = tempdir().unwrap();
1716        let tree = open_test_tree(dir.path());
1717
1718        let key = 13u64.to_be_bytes();
1719        let _snap = put_until_compactable(&tree, key);
1720        let shard = &tree.engine.shards()[0];
1721        let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
1722
1723        let v = tree.get(&key).expect("post-compaction get");
1724        assert_eq!(v.as_bytes(), b"final-value-payload-XX");
1725    }
1726
1727    #[test]
1728    fn iter_during_compaction_yields_all_live_keys() {
1729        let dir = tempdir().unwrap();
1730        let tree = open_test_tree(dir.path());
1731
1732        for k in 1u64..=3 {
1733            put_until_compactable(&tree, k.to_be_bytes());
1734        }
1735        let shard = &tree.engine.shards()[0];
1736        let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
1737
1738        let collected: std::collections::BTreeMap<[u8; 8], Vec<u8>> = tree
1739            .iter()
1740            .map(|(k, v)| (k, v.as_bytes().to_vec()))
1741            .collect();
1742        assert_eq!(collected.len(), 3);
1743        for k in 1u64..=3 {
1744            let bytes = collected
1745                .get(&k.to_be_bytes())
1746                .expect("every original key must remain");
1747            assert_eq!(bytes.as_slice(), b"final-value-payload-XX");
1748        }
1749    }
1750
1751    #[test]
1752    fn get_or_read_block_returns_stale_for_unknown_file_id() {
1753        let dir = tempdir().unwrap();
1754        let tree = open_test_tree(dir.path());
1755
1756        match tree.get_or_read_block(0, 9999, 0) {
1757            Err(DbError::StaleDiskLoc) => {}
1758            Ok(_) => panic!("expected StaleDiskLoc, got Ok"),
1759            Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
1760        }
1761    }
1762
1763    #[test]
1764    fn extract_from_block_propagates_next_block_error() {
1765        let block = AlignedBuf::zeroed(4096);
1766        let start = 4090;
1767        let len = 32;
1768        let result: DbResult<ByteView> =
1769            VarTree::<[u8; 8]>::extract_from_block(&block, start, len, || {
1770                Err(DbError::StaleDiskLoc)
1771            });
1772        match result {
1773            Err(DbError::StaleDiskLoc) => {}
1774            Ok(_) => panic!("expected StaleDiskLoc, got Ok"),
1775            Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
1776        }
1777    }
1778
1779    #[test]
1780    fn extract_from_block_multi_block_first_cached_second_stale() {
1781        let dir = tempdir().unwrap();
1782        let tree = open_test_tree(dir.path());
1783
1784        let key = 21u64.to_be_bytes();
1785        let value = vec![0xCDu8; 4073];
1786        tree.put(&key, &value).expect("initial put");
1787
1788        // Capture the multi-block DiskLoc BEFORE overwriting.
1789        let snap = {
1790            let guard = tree.index.collector().enter();
1791            let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
1792            *node.load_disk()
1793        };
1794
1795        // Make the initial entry dead by overwriting many times so compaction
1796        // erases the file containing `snap` (need enough writes to force file
1797        // rotation past max_file_size=8192 and seal the file as immutable).
1798        // 65 overwrites ensures the snap file is fully dead after two rotations.
1799        for i in 0..65u8 {
1800            tree.put(&key, &[i; 256]).expect("overwrite");
1801        }
1802        tree.put(&key, b"final").expect("final");
1803
1804        let shard = &tree.engine.shards()[0];
1805        let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
1806
1807        let block_offset = snap.offset as u64 & !4095;
1808        let first_block = AlignedBuf::zeroed(4096);
1809        let cache_key = BlockKey {
1810            shard_id: snap.shard_id,
1811            file_id: snap.file_id,
1812            block_offset,
1813        };
1814        tree.cache.insert(cache_key, Arc::new(first_block));
1815
1816        match tree.read_value_cached_inner(&snap) {
1817            Err(DbError::StaleDiskLoc) => {}
1818            Ok(_) => panic!("expected StaleDiskLoc from second block, got Ok"),
1819            Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
1820        }
1821    }
1822
1823    #[test]
1824    fn retry_limit_returns_none_on_persistent_stale() {
1825        let dir = tempdir().unwrap();
1826        let tree = open_test_tree(dir.path());
1827
1828        let key = 99u64.to_be_bytes();
1829        tree.put(&key, b"payload").expect("put");
1830
1831        // Force every read_value_cached_inner call to see Stale by manually
1832        // clearing inner.immutable AND moving the active.file_id out of the
1833        // way. node.load_disk() still points at the original (now-unreachable)
1834        // file_id, so each retry hits Shard::read_block, which returns
1835        // StaleDiskLoc.
1836        let guard = tree.index.collector().enter();
1837        let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
1838        let snap = *node.load_disk();
1839        drop(guard);
1840
1841        {
1842            let shard = &tree.engine.shards()[snap.shard_id as usize];
1843            let mut inner = shard.lock();
1844            inner.immutable = Vec::new();
1845            // Reassign active.file_id to a sentinel value that does not match
1846            // snap.file_id, so the active-file branch in Shard::read_block
1847            // never matches either.
1848            inner.active.file_id = u32::MAX;
1849        }
1850
1851        let guard = tree.index.collector().enter();
1852        let node = tree
1853            .index
1854            .get(key.as_bytes(), &guard)
1855            .expect("still indexed");
1856        assert!(
1857            tree.read_value_cached(node, &guard).is_none(),
1858            "MAX_STALE_RETRIES must terminate the loop and return None"
1859        );
1860    }
1861
1862    #[test]
1863    fn var_tree_replay_init_fires_on_init_per_live_key_raw() {
1864        let dir = tempdir().unwrap();
1865        let tree = open_test_tree_hooked::<true, false>(dir.path(), CountingHook::default());
1866
1867        for i in 0u64..5 {
1868            tree.put(&i.to_be_bytes(), &[i as u8; 16]).expect("put");
1869        }
1870        // Delete one — replay_init must not see it.
1871        tree.delete(&3u64.to_be_bytes()).expect("delete");
1872
1873        // Reset counters so we measure only the effect of replay_init.
1874        tree.hook.writes.store(0, AtomicOrdering::Relaxed);
1875        tree.hook.inits.store(0, AtomicOrdering::Relaxed);
1876
1877        tree.replay_init();
1878
1879        assert_eq!(
1880            tree.hook.inits.load(AtomicOrdering::Relaxed),
1881            4,
1882            "4 live keys"
1883        );
1884        assert_eq!(
1885            tree.hook.writes.load(AtomicOrdering::Relaxed),
1886            0,
1887            "no on_write"
1888        );
1889    }
1890
1891    #[test]
1892    fn var_tree_replay_init_no_hook_is_noop() {
1893        let dir = tempdir().unwrap();
1894        let tree = open_test_tree(dir.path());
1895
1896        for i in 0u64..3 {
1897            tree.put(&i.to_be_bytes(), &[i as u8; 8]).expect("put");
1898        }
1899        // Must finish instantly with no effects.
1900        tree.replay_init();
1901        // sanity: tree is intact.
1902        assert!(tree.get(&0u64.to_be_bytes()).is_some());
1903    }
1904
1905    #[test]
1906    fn var_tree_migrate_keep_fires_on_init_not_on_write_raw() {
1907        use crate::MigrateAction;
1908        let dir = tempdir().unwrap();
1909        let tree = open_test_tree_hooked::<true, false>(dir.path(), CountingHook::default());
1910
1911        for i in 0u64..4 {
1912            tree.put(&i.to_be_bytes(), &[i as u8; 16]).expect("put");
1913        }
1914        tree.hook.writes.store(0, AtomicOrdering::Relaxed);
1915        tree.hook.inits.store(0, AtomicOrdering::Relaxed);
1916
1917        let mutated = tree.migrate(|_, _| MigrateAction::Keep).expect("migrate");
1918
1919        assert_eq!(mutated, 0);
1920        assert_eq!(
1921            tree.hook.inits.load(AtomicOrdering::Relaxed),
1922            4,
1923            "4 keeps -> 4 on_init"
1924        );
1925        assert_eq!(
1926            tree.hook.writes.load(AtomicOrdering::Relaxed),
1927            0,
1928            "Keep must not fire on_write"
1929        );
1930    }
1931
1932    #[test]
1933    fn var_tree_migrate_update_fires_on_init_with_new_value_raw() {
1934        use crate::MigrateAction;
1935        let dir = tempdir().unwrap();
1936        let tree = open_test_tree_hooked::<true, false>(dir.path(), CountingHook::default());
1937
1938        let key = 42u64.to_be_bytes();
1939        tree.put(&key, b"old-value").expect("put");
1940        tree.hook.writes.store(0, AtomicOrdering::Relaxed);
1941        tree.hook.inits.store(0, AtomicOrdering::Relaxed);
1942
1943        let new = ByteView::new(b"new-value");
1944        let mutated = tree
1945            .migrate(move |_, _| MigrateAction::Update(new.clone()))
1946            .expect("migrate");
1947
1948        assert_eq!(mutated, 1);
1949        assert_eq!(tree.hook.inits.load(AtomicOrdering::Relaxed), 1);
1950        assert_eq!(
1951            crate::sync::lock(&tree.hook.last_init_value).as_deref(),
1952            Some(b"new-value".as_ref()),
1953            "on_init must receive the NEW value"
1954        );
1955        assert_eq!(
1956            tree.hook.writes.load(AtomicOrdering::Relaxed),
1957            0,
1958            "Update must NOT fire on_write (was double-firing through self.put)"
1959        );
1960        assert_eq!(tree.get(&key).unwrap().as_bytes(), b"new-value");
1961    }
1962
1963    #[test]
1964    fn var_tree_migrate_delete_fires_no_hooks_raw() {
1965        use crate::MigrateAction;
1966        let dir = tempdir().unwrap();
1967        let tree = open_test_tree_hooked::<true, false>(dir.path(), CountingHook::default());
1968
1969        let key = 7u64.to_be_bytes();
1970        tree.put(&key, b"x").expect("put");
1971        tree.hook.writes.store(0, AtomicOrdering::Relaxed);
1972        tree.hook.inits.store(0, AtomicOrdering::Relaxed);
1973
1974        let mutated = tree.migrate(|_, _| MigrateAction::Delete).expect("migrate");
1975
1976        assert_eq!(mutated, 1);
1977        assert_eq!(tree.hook.inits.load(AtomicOrdering::Relaxed), 0);
1978        assert_eq!(tree.hook.writes.load(AtomicOrdering::Relaxed), 0);
1979        assert!(tree.get(&key).is_none());
1980    }
1981
1982    #[test]
1983    fn var_tree_migrate_no_init_hook_is_silent_for_keep_and_update() {
1984        use crate::MigrateAction;
1985        let dir = tempdir().unwrap();
1986        // NEEDS_INIT = false: on_init must never be called regardless of action.
1987        let tree = open_test_tree_hooked::<false, false>(dir.path(), CountingHook::default());
1988
1989        for i in 0u64..3 {
1990            tree.put(&i.to_be_bytes(), &[i as u8; 16]).expect("put");
1991        }
1992        tree.hook.writes.store(0, AtomicOrdering::Relaxed);
1993        tree.hook.inits.store(0, AtomicOrdering::Relaxed);
1994
1995        // Keep
1996        tree.migrate(|_, _| MigrateAction::Keep)
1997            .expect("migrate keep");
1998        assert_eq!(
1999            tree.hook.inits.load(AtomicOrdering::Relaxed),
2000            0,
2001            "Keep with NEEDS_INIT=false"
2002        );
2003        assert_eq!(tree.hook.writes.load(AtomicOrdering::Relaxed), 0);
2004
2005        // Update
2006        let new = ByteView::new(b"new");
2007        tree.migrate(move |_, _| MigrateAction::Update(new.clone()))
2008            .expect("migrate update");
2009        assert_eq!(
2010            tree.hook.inits.load(AtomicOrdering::Relaxed),
2011            0,
2012            "Update with NEEDS_INIT=false"
2013        );
2014        assert_eq!(
2015            tree.hook.writes.load(AtomicOrdering::Relaxed),
2016            0,
2017            "Update must not fire on_write either"
2018        );
2019    }
2020
2021    #[test]
2022    fn var_tree_public_put_still_fires_on_write_once() {
2023        let dir = tempdir().unwrap();
2024        let tree = open_test_tree_hooked::<true, false>(dir.path(), CountingHook::default());
2025
2026        tree.put(&1u64.to_be_bytes(), b"v").expect("put");
2027        assert_eq!(tree.hook.writes.load(AtomicOrdering::Relaxed), 1);
2028    }
2029
2030    #[test]
2031    fn var_tree_atomic_does_not_fire_hooks() {
2032        let dir = tempdir().unwrap();
2033        let tree = open_test_tree_hooked::<true, false>(dir.path(), CountingHook::default());
2034
2035        let key = 1u64.to_be_bytes();
2036        tree.atomic(&key, |shard| {
2037            shard.put(&key, b"a")?;
2038            shard.delete(&key)?;
2039            Ok(())
2040        })
2041        .expect("atomic");
2042
2043        assert_eq!(tree.hook.writes.load(AtomicOrdering::Relaxed), 0);
2044        assert_eq!(tree.hook.inits.load(AtomicOrdering::Relaxed), 0);
2045    }
2046
2047    /// `_result` returns Ok when the value sits in the active write buffer.
2048    #[test]
2049    fn read_value_locked_result_ok_from_write_buf() {
2050        let dir = tempdir().unwrap();
2051        let tree = open_test_tree(dir.path());
2052
2053        let key = 1u64.to_be_bytes();
2054        let payload = b"in-write-buffer-value";
2055        tree.put(&key, payload).expect("put");
2056
2057        let guard = tree.index.collector().enter();
2058        let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2059        let disk = *node.load_disk();
2060        drop(guard);
2061
2062        let shard = &tree.engine.shards()[disk.shard_id as usize];
2063        let inner = shard.lock();
2064        let v = tree
2065            .read_value_locked_result(&disk, &inner)
2066            .expect("write-buf read must succeed");
2067        assert_eq!(v.as_bytes(), payload);
2068    }
2069
2070    /// `_result` returns Ok when the value is on an immutable file and the
2071    /// entry sits within a single 4 KiB block.
2072    #[test]
2073    fn read_value_locked_result_ok_from_disk_immutable() {
2074        let dir = tempdir().unwrap();
2075        let tree = open_test_tree(dir.path());
2076
2077        let key = 2u64.to_be_bytes();
2078        let payload = b"single-block-immutable";
2079        tree.put(&key, payload).expect("put");
2080        // Force rotation so the entry moves out of the active write buffer.
2081        // Each put is 280 bytes; with max_file_size=8192, need >30 puts to
2082        // cross the rotation boundary. Use keys 100..135 to avoid overwriting key 2.
2083        for i in 100u64..135 {
2084            tree.put(&i.to_be_bytes(), &[i as u8; 256])
2085                .expect("rotator");
2086        }
2087
2088        let guard = tree.index.collector().enter();
2089        let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2090        let disk = *node.load_disk();
2091        drop(guard);
2092
2093        // `open_test_tree` uses the default CacheConfig (max_size = 0), so the
2094        // cache is disabled and step 2 (single-block cache lookup) returns
2095        // None unconditionally — the read naturally exercises step 3 (disk).
2096
2097        let shard = &tree.engine.shards()[disk.shard_id as usize];
2098        let inner = shard.lock();
2099        // Sanity: the entry must NOT be in the write buffer anymore — the rotator
2100        // loop above is sized to force rotation past max_file_size=8192.
2101        assert_ne!(
2102            disk.file_id, inner.active.file_id,
2103            "test setup failed: key=2 entry is still in the active file's write buffer",
2104        );
2105        let v = tree
2106            .read_value_locked_result(&disk, &inner)
2107            .expect("disk read must succeed");
2108        assert_eq!(v.as_bytes(), payload);
2109    }
2110
2111    /// `_result` propagates `StaleDiskLoc` from the disk read.
2112    #[test]
2113    fn read_value_locked_result_propagates_stale_disk_loc() {
2114        let dir = tempdir().unwrap();
2115        let tree = open_test_tree(dir.path());
2116
2117        let key = 3u64.to_be_bytes();
2118        let snap = put_until_compactable(&tree, key);
2119
2120        let shard = &tree.engine.shards()[snap.shard_id as usize];
2121        let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
2122
2123        let inner = shard.lock();
2124        match tree.read_value_locked_result(&snap, &inner) {
2125            Err(DbError::StaleDiskLoc) => {}
2126            Ok(v) => panic!("expected StaleDiskLoc, got Ok({:?})", v.as_bytes()),
2127            Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
2128        }
2129    }
2130
2131    /// Backward-compat: the `Option` wrapper still returns `None` on
2132    /// `StaleDiskLoc`, so callers like `cas`/`update` keep their contract.
2133    /// We intentionally do NOT assert on the tracing output (no capture
2134    /// helper exists in this crate); the `None` is the load-bearing
2135    /// invariant.
2136    #[test]
2137    fn read_value_locked_returns_none_on_stale() {
2138        let dir = tempdir().unwrap();
2139        let tree = open_test_tree(dir.path());
2140
2141        let key = 4u64.to_be_bytes();
2142        let snap = put_until_compactable(&tree, key);
2143
2144        let shard = &tree.engine.shards()[snap.shard_id as usize];
2145        let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
2146
2147        let inner = shard.lock();
2148        assert!(tree.read_value_locked(&snap, &inner).is_none());
2149    }
2150
2151    /// Pins the "size check sits before cache lookup" invariant.
2152    ///
2153    /// Manually inject the first 4 KiB block of a large value into the
2154    /// cache. Without the early `start + len > 8192` check, the cached path
2155    /// would call `extract_from_block`, which only supports two blocks and
2156    /// would panic on `next[..second_len]` for `second_len > 4096`.
2157    ///
2158    /// With the early check, the call routes to the locked path and returns
2159    /// the correct bytes.
2160    #[test]
2161    fn large_value_with_first_block_cached_uses_fallback() {
2162        let dir = tempdir().unwrap();
2163        // Custom geometry: write_buffer_size big enough for 20 KiB entry,
2164        // max_file_size == write_buffer_size (required by Config::validate);
2165        // rotation is forced explicitly via rotate_active_for_test.
2166        let mut cfg = Config::test();
2167        cfg.shard_count = 1;
2168        cfg.max_file_size = 128 * 1024;
2169        cfg.write_buffer_size = 128 * 1024;
2170        cfg.compaction_threshold = 0.0;
2171        // Enable the block cache so `cache.insert` actually takes effect.
2172        // Default `CacheConfig::default` has `max_size = 0` which makes the
2173        // cache a no-op; we need real inserts to set up the warm-cache scenario.
2174        cfg.cache.max_size = 1 << 20;
2175        let tree: VarTree<[u8; 8]> = VarTree::open(dir.path(), cfg).expect("open");
2176
2177        let key = 42u64.to_be_bytes();
2178        // Build a value whose body spans ~5 blocks: > 8192 bytes.
2179        let payload: Vec<u8> = (0..20_000u32).map(|i| i as u8).collect();
2180        tree.put(&key, &payload).expect("put large");
2181        // Force rotation so the large entry moves to an immutable file.
2182        tree.engine.shards()[0]
2183            .rotate_active_for_test(8)
2184            .expect("rotate");
2185
2186        let guard = tree.index.collector().enter();
2187        let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2188        let disk = *node.load_disk();
2189        drop(guard);
2190
2191        // Sanity: this is a multi-block large value.
2192        let start = (disk.offset & 4095) as usize;
2193        assert!(
2194            start + disk.len as usize > 8192,
2195            "test precondition: large value must span >2 blocks",
2196        );
2197
2198        // Inject the first block (zeroes) into the cache to ensure the cache
2199        // lookup would succeed for the first block — exactly the warm-cache
2200        // scenario that would have triggered the old panic.
2201        let block_offset = disk.offset as u64 & !4095;
2202        let cache_key = BlockKey {
2203            shard_id: disk.shard_id,
2204            file_id: disk.file_id,
2205            block_offset,
2206        };
2207        tree.cache
2208            .insert(cache_key, Arc::new(AlignedBuf::zeroed(4096)));
2209        // Sanity: insert took effect.
2210        assert!(
2211            tree.cache.get(&cache_key).is_some(),
2212            "cache must be enabled for warm-cache scenario",
2213        );
2214
2215        let v = tree
2216            .read_value_cached_inner(&disk)
2217            .expect("large value must read via locked fallback");
2218        assert_eq!(v.as_bytes(), payload.as_slice());
2219    }
2220
2221    /// Value entirely in one block: `start + len <= 4096`.
2222    #[test]
2223    fn extract_from_block_single_block() {
2224        let mut block = AlignedBuf::zeroed(4096);
2225        for (i, byte) in block.iter_mut().enumerate() {
2226            *byte = i as u8;
2227        }
2228        let v = VarTree::<[u8; 8]>::extract_from_block(&block, 100, 50, || {
2229            panic!("next_block must not be called for single-block reads")
2230        })
2231        .expect("ok");
2232        let expected: Vec<u8> = (100u8..150u8).collect();
2233        assert_eq!(v.as_bytes(), expected.as_slice());
2234    }
2235
2236    /// Value ends exactly at the second block's end: `start + len == 8192`.
2237    #[test]
2238    fn extract_from_block_two_blocks_exact() {
2239        let mut first = AlignedBuf::zeroed(4096);
2240        for byte in first.iter_mut() {
2241            *byte = 0xAA;
2242        }
2243        let mut second = AlignedBuf::zeroed(4096);
2244        for byte in second.iter_mut() {
2245            *byte = 0xBB;
2246        }
2247        // start = 4096 - 1 (last byte of first block); len = 4097.
2248        // start + len = 8192 — the boundary that must remain supported.
2249        let v = VarTree::<[u8; 8]>::extract_from_block(&first, 4095, 4097, || Ok(Arc::new(second)))
2250            .expect("ok");
2251        let bytes = v.as_bytes();
2252        assert_eq!(bytes.len(), 4097);
2253        assert_eq!(bytes[0], 0xAA);
2254        assert_eq!(bytes[1], 0xBB);
2255        assert_eq!(bytes[4096], 0xBB);
2256    }
2257
2258    /// Value spans two blocks partially: `4096 < start + len < 8192`.
2259    #[test]
2260    fn extract_from_block_two_blocks_partial() {
2261        let mut first = AlignedBuf::zeroed(4096);
2262        for byte in first.iter_mut() {
2263            *byte = 0x11;
2264        }
2265        let mut second = AlignedBuf::zeroed(4096);
2266        for byte in second.iter_mut() {
2267            *byte = 0x22;
2268        }
2269        // start = 4000, len = 200 -> first 96 bytes from first, next 104 from second.
2270        let v = VarTree::<[u8; 8]>::extract_from_block(&first, 4000, 200, || Ok(Arc::new(second)))
2271            .expect("ok");
2272        let bytes = v.as_bytes();
2273        assert_eq!(bytes.len(), 200);
2274        assert!(bytes[..96].iter().all(|b| *b == 0x11));
2275        assert!(bytes[96..].iter().all(|b| *b == 0x22));
2276    }
2277
2278    /// Fixed geometry that forces large entries onto an immutable file:
2279    /// `write_buffer_size` fits the 50 KiB entry; rotation is done explicitly
2280    /// via `rotate_active_for_test` so max_file_size == write_buffer_size
2281    /// satisfies Config::validate.
2282    fn open_large_value_tree(dir: &std::path::Path) -> VarTree<[u8; 8]> {
2283        let mut cfg = Config::test();
2284        cfg.shard_count = 1;
2285        cfg.max_file_size = 128 * 1024;
2286        cfg.write_buffer_size = 128 * 1024;
2287        cfg.compaction_threshold = 0.0;
2288        VarTree::open(dir, cfg).expect("open large-value test tree")
2289    }
2290
2291    fn build_large_payload(seed: u8) -> Vec<u8> {
2292        (0..50_000u32)
2293            .map(|i| (i as u8).wrapping_add(seed))
2294            .collect()
2295    }
2296
2297    /// End-to-end: 50 KiB value, force rotation to immutable, then `get`
2298    /// must return correct bytes via read_value_from_disk_locked.
2299    ///
2300    /// `open_large_value_tree` uses the default CacheConfig (max_size = 0),
2301    /// so the block cache is disabled and the read goes through the
2302    /// large-value locked fallback → disk read on every call.
2303    #[test]
2304    fn large_value_read_via_locked_fallback() {
2305        let dir = tempdir().unwrap();
2306        let tree = open_large_value_tree(dir.path());
2307
2308        let key = 100u64.to_be_bytes();
2309        let payload = build_large_payload(0);
2310        tree.put(&key, &payload).expect("put large");
2311        tree.engine.shards()[0]
2312            .rotate_active_for_test(8)
2313            .expect("rotate");
2314
2315        let guard = tree.index.collector().enter();
2316        let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2317        let v = tree
2318            .read_value_cached(node, &guard)
2319            .expect("read must succeed via locked fallback");
2320        assert_eq!(v.as_bytes(), payload.as_slice());
2321    }
2322
2323    #[cfg(feature = "encryption")]
2324    #[test]
2325    fn large_value_read_encrypted() {
2326        let dir = tempdir().unwrap();
2327        let mut cfg = Config::test();
2328        cfg.shard_count = 1;
2329        cfg.max_file_size = 128 * 1024;
2330        cfg.write_buffer_size = 128 * 1024;
2331        cfg.compaction_threshold = 0.0;
2332        cfg.encryption_key = Some([7u8; 32]);
2333        // Cache stays at default (max_size = 0, disabled). The locked
2334        // fallback always goes to step 3 -> read_value_from_disk_locked,
2335        // which routes to pread_value_encrypted under this feature gate.
2336        let tree: VarTree<[u8; 8]> = VarTree::open(dir.path(), cfg).expect("open enc");
2337
2338        let key = 101u64.to_be_bytes();
2339        let payload = build_large_payload(0xAB);
2340        tree.put(&key, &payload).expect("put encrypted large");
2341        tree.engine.shards()[0]
2342            .rotate_active_for_test(8)
2343            .expect("rotate");
2344
2345        let guard = tree.index.collector().enter();
2346        let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2347        let v = tree
2348            .read_value_cached(node, &guard)
2349            .expect("encrypted large read must succeed via pread_value_encrypted");
2350        assert_eq!(v.as_bytes(), payload.as_slice());
2351    }
2352
2353    /// Deterministic stale-DiskLoc test for large values: build a large
2354    /// entry, capture its DiskLoc, overwrite enough to make the file fully
2355    /// dead, compact (removes the file), then the captured DiskLoc must
2356    /// surface as StaleDiskLoc via the locked fallback. The public
2357    /// retry path then returns the live value.
2358    #[test]
2359    fn large_value_stale_disk_loc_deterministic() {
2360        let dir = tempdir().unwrap();
2361        let tree = open_large_value_tree(dir.path());
2362
2363        let key = 102u64.to_be_bytes();
2364        let payload = build_large_payload(0x42);
2365        tree.put(&key, &payload).expect("first large put");
2366
2367        // Snapshot the DiskLoc of the first put — points at the file we
2368        // will erase via compaction.
2369        let snap = {
2370            let guard = tree.index.collector().enter();
2371            let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2372            *node.load_disk()
2373        };
2374
2375        // Rotate the active file so the large entry moves to an immutable file.
2376        tree.engine.shards()[0]
2377            .rotate_active_for_test(8)
2378            .expect("rotate after large put");
2379
2380        // Overwrite many times with small payloads to move the live pointer off
2381        // the original file (making it 100% dead so compaction erases it).
2382        for i in 1..20u8 {
2383            tree.put(&key, &[i; 256]).expect("overwrite");
2384        }
2385        tree.put(&key, b"live-after-compaction").expect("final put");
2386
2387        let shard = &tree.engine.shards()[snap.shard_id as usize];
2388        let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
2389
2390        // Direct call on the stale snapshot — must propagate StaleDiskLoc
2391        // through the large-value locked fallback.
2392        match tree.read_value_cached_inner(&snap) {
2393            Err(DbError::StaleDiskLoc) => {}
2394            Ok(v) => panic!("expected StaleDiskLoc, got Ok({:?})", v.as_bytes()),
2395            Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
2396        }
2397
2398        // Public retry path picks up the live value.
2399        let guard = tree.index.collector().enter();
2400        let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2401        let v = tree
2402            .read_value_cached(node, &guard)
2403            .expect("public path must retry and return live value");
2404        assert_eq!(v.as_bytes(), b"live-after-compaction");
2405    }
2406
2407    /// `_result` step 2: single-block cache fast path.
2408    /// Writes a value, snapshots its DiskLoc, manually inserts the matching
2409    /// 4 KiB block into the cache, then calls `_result` and verifies the
2410    /// cache-hit path returns the correct bytes.
2411    #[test]
2412    fn read_value_locked_result_ok_from_cache_single_block() {
2413        let dir = tempdir().unwrap();
2414        // Enable the cache so `cache.insert` actually takes effect.
2415        let mut cfg = Config::test();
2416        cfg.shard_count = 1;
2417        cfg.max_file_size = 8192;
2418        cfg.write_buffer_size = 8192;
2419        cfg.compaction_threshold = 0.0;
2420        cfg.cache.max_size = 1 << 20;
2421        let tree: VarTree<[u8; 8]> = VarTree::open(dir.path(), cfg).expect("open");
2422
2423        let key = 9u64.to_be_bytes();
2424        let payload = b"small-single-block-value";
2425        tree.put(&key, payload).expect("put");
2426        // Force rotation so the entry leaves the active write buffer (otherwise
2427        // step 1 short-circuits and step 2 never runs). Need enough writes to
2428        // push write_offset past max_file_size=8192.
2429        for i in 100u64..135 {
2430            tree.put(&i.to_be_bytes(), &[i as u8; 256])
2431                .expect("rotator");
2432        }
2433
2434        let guard = tree.index.collector().enter();
2435        let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2436        let disk = *node.load_disk();
2437        drop(guard);
2438
2439        // Sanity: this value fits in a single block (step 2 only runs when
2440        // start + len <= 4096).
2441        let start = (disk.offset & 4095) as usize;
2442        let len = disk.len as usize;
2443        assert!(
2444            start + len <= 4096,
2445            "test precondition: value must fit in a single block",
2446        );
2447
2448        // Pre-warm the cache BEFORE acquiring the shard lock.
2449        // get_or_read_block → shard.read_block → sync::lock(&inner), so it
2450        // must not be called while the shard lock is already held.
2451        let block_offset = disk.offset as u64 & !4095;
2452        let cache_key = BlockKey {
2453            shard_id: disk.shard_id,
2454            file_id: disk.file_id,
2455            block_offset,
2456        };
2457        let block = tree
2458            .get_or_read_block(disk.shard_id, disk.file_id, block_offset)
2459            .expect("read block");
2460        tree.cache.insert(cache_key, block);
2461        assert!(
2462            tree.cache.get(&cache_key).is_some(),
2463            "cache must contain the block for step 2 to fire",
2464        );
2465
2466        let shard = &tree.engine.shards()[disk.shard_id as usize];
2467        let inner = shard.lock();
2468        assert_ne!(
2469            disk.file_id, inner.active.file_id,
2470            "test setup failed: key entry is still in the active write buffer",
2471        );
2472
2473        let v = tree
2474            .read_value_locked_result(&disk, &inner)
2475            .expect("cache-hit read must succeed");
2476        assert_eq!(v.as_bytes(), payload);
2477    }
2478
2479    /// End-to-end roundtrip with file_id above u16::MAX.
2480    ///
2481    /// Uses write_buffer_size=128 KiB so a single 512-byte entry fits in one
2482    /// flush cycle; max_file_size=4096 forces per-entry rotation; cache
2483    /// enabled so the fast path is also exercised.  After bumping
2484    /// next_file_id to 70_000 and rotating, the active file gets an id that
2485    /// exceeds u16::MAX.  The entry is flushed + rotated to make it
2486    /// immutable, then `get` must return the correct bytes via the locked
2487    /// disk-read path.  Before the file_id u32 widening this would either
2488    /// panic or return wrong bytes because the high bits were truncated.
2489    #[test]
2490    fn var_tree_get_with_file_id_above_u16() {
2491        let dir = tempdir().unwrap();
2492        let mut cfg = Config::test();
2493        cfg.shard_count = 1;
2494        cfg.max_file_size = 128 * 1024;
2495        cfg.write_buffer_size = 128 * 1024;
2496        cfg.compaction_threshold = 0.0;
2497        cfg.cache.max_size = 1 << 20;
2498        let tree: VarTree<[u8; 8]> = VarTree::open(dir.path(), cfg).expect("open");
2499
2500        let shard = &tree.engine.shards()[0];
2501
2502        // Bump file id well past u16::MAX and rotate so the next active file
2503        // carries the new id.
2504        shard.set_next_file_id(70_000);
2505        shard.rotate_active_for_test(8).expect("first rotate");
2506        assert!(
2507            shard.active_file_id() >= 70_000,
2508            "active_file_id should be >= 70_000 after rotation"
2509        );
2510
2511        let key = 42u64.to_be_bytes();
2512        let value = vec![0xC3u8; 512];
2513        tree.put(&key, &value).expect("put");
2514
2515        // Confirm the DiskLoc has file_id > u16::MAX.
2516        {
2517            let guard = tree.index.collector().enter();
2518            let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2519            let disk = *node.load_disk();
2520            assert!(
2521                disk.file_id > u16::MAX as u32,
2522                "DiskLoc.file_id must be above u16::MAX, got {}",
2523                disk.file_id,
2524            );
2525        }
2526
2527        // Flush the write buffer and rotate so the entry lands on an
2528        // immutable file — reads will go through Shard::read_block.
2529        shard.flush().expect("flush");
2530        shard.rotate_active_for_test(8).expect("second rotate");
2531
2532        let got = tree.get(&key).expect("get must return Some");
2533        assert_eq!(got.as_bytes(), value.as_slice());
2534    }
2535
2536    /// Recovery roundtrip with file_id above u16::MAX.
2537    ///
2538    /// Phase A: open, bump next_file_id past u16::MAX, rotate once (so the
2539    /// active file gets an id > 65535), write entries, close.  `close()` calls
2540    /// `sync_hints()` which flushes the write buffer and writes the hint file
2541    /// with the correct key length (8), then `engine.flush()` fsyncs.
2542    /// Phase B: reopen from the same tempdir, read every entry back, confirm
2543    /// at least one on-disk file_id still exceeds u16::MAX.
2544    ///
2545    /// Before the file_id u32 widening the high bits were truncated during
2546    /// hint-file serialization and recovery; this test pins that code path so
2547    /// a future regression cannot go undetected.
2548    #[test]
2549    fn recovery_handles_file_id_above_u16() {
2550        let dir = tempdir().unwrap();
2551
2552        let mut cfg = Config::test();
2553        cfg.shard_count = 1;
2554        cfg.max_file_size = 128 * 1024;
2555        cfg.write_buffer_size = 128 * 1024;
2556        cfg.compaction_threshold = 0.0;
2557        cfg.cache.max_size = 1 << 20;
2558
2559        // --- Phase A: open, bump file_id past u16, write, close ---
2560        {
2561            let tree: VarTree<[u8; 8]> = VarTree::open(dir.path(), cfg.clone()).expect("open A");
2562            let shard = &tree.engine.shards()[0];
2563
2564            // Bump the id counter and rotate so the active file gets id 70_000.
2565            shard.set_next_file_id(70_000);
2566            shard
2567                .rotate_active_for_test(8)
2568                .expect("rotate to file_id 70_000");
2569            assert!(
2570                shard.active_file_id() >= 70_000,
2571                "active_file_id should be >= 70_000 after rotation"
2572            );
2573
2574            for i in 0u64..4 {
2575                let key = i.to_be_bytes();
2576                let value = vec![i as u8; 200];
2577                tree.put(&key, &value).expect("put phase A");
2578            }
2579
2580            tree.close().expect("close phase A");
2581        }
2582
2583        // --- Phase B: reopen, read entries, assert wide file_id persists ---
2584        {
2585            let tree: VarTree<[u8; 8]> = VarTree::open(dir.path(), cfg).expect("open B");
2586
2587            assert_eq!(tree.len(), 4, "all 4 entries must survive recovery");
2588
2589            for i in 0u64..4 {
2590                let key = i.to_be_bytes();
2591                let expected = vec![i as u8; 200];
2592                let got = tree
2593                    .get(&key)
2594                    .unwrap_or_else(|| panic!("key {i} not found after recovery"));
2595                assert_eq!(
2596                    got.as_bytes(),
2597                    expected.as_slice(),
2598                    "value mismatch for key {i} after recovery"
2599                );
2600            }
2601
2602            // Verify at least one on-disk file_id exceeded u16::MAX so that
2603            // the test actually exercises the wide-id path.
2604            let shard = &tree.engine.shards()[0];
2605            let max_fid = shard.file_ids().into_iter().max().expect("non-empty");
2606            assert!(
2607                max_fid > u16::MAX as u32,
2608                "max file_id should exceed u16::MAX after recovery (got {})",
2609                max_fid
2610            );
2611        }
2612    }
2613}