1use std::mem::size_of;
2use std::ops::Bound;
3use std::sync::Arc;
4
5use crate::Key;
6
7use crate::byte_view::ByteView;
8use crate::cache::{BlockCache, BlockKey};
9use crate::compaction::{CompactionIndex, compact_shard};
10use crate::config::Config;
11use crate::disk_loc::DiskLoc;
12use crate::engine::Engine;
13use crate::error::{DbError, DbResult};
14use crate::hook::{NoHook, WriteHook};
15use crate::io::aligned_buf::AlignedBuf;
16use crate::recovery::recover_var_tree;
17use crate::shard::ShardInner;
18use crate::skiplist::node::{SkipNode, VarNode, random_height};
19use crate::skiplist::{InsertResult, SkipList};
20use crate::sync::MutexGuard;
21
/// Maximum number of attempts `read_value_cached` makes for one node while the
/// read keeps failing with `DbError::StaleDiskLoc`.
const MAX_STALE_RETRIES: usize = 3;
23
/// Tree keyed by `K` whose in-memory index is a skip list of [`VarNode`]s, each
/// carrying the [`DiskLoc`] of the key's current value; value bytes are read
/// back through the engine's shards and the block cache.
///
/// `H` is the [`WriteHook`] notified after mutations; it defaults to [`NoHook`].
pub struct VarTree<K: Key, H: WriteHook<K> = NoHook> {
    /// Skip list mapping each key to the location of its current value.
    index: SkipList<VarNode<K>>,
    /// Storage engine: provides the shards, the GSN counter and the configuration.
    engine: Engine,
    /// Cache of blocks keyed by shard, file and 4096-aligned block offset.
    cache: BlockCache,
    /// Threshold forwarded to `compact_shard` by `compact`.
    compaction_threshold: f64,
    /// Prefix-bit count forwarded to `shard_for_key` when routing a key.
    shard_prefix_bits: usize,
    /// Flag the skip list is constructed with; also selects the reversed
    /// branches of the prefix and range iterators.
    reversed: bool,
    /// Write hook invoked by the mutating operations.
    hook: H,
}
57
58impl<K: Key> VarTree<K> {
59 pub fn open(path: impl AsRef<std::path::Path>, config: Config) -> DbResult<Self> {
62 Self::open_inner(path, config, NoHook)
63 }
64}
65
66impl<K: Key, H: WriteHook<K>> VarTree<K, H> {
67 pub fn open_hooked(
69 path: impl AsRef<std::path::Path>,
70 config: Config,
71 hook: H,
72 ) -> DbResult<Self> {
73 Self::open_inner(path, config, hook)
74 }
75
76 fn open_inner(path: impl AsRef<std::path::Path>, config: Config, hook: H) -> DbResult<Self> {
77 let compaction_threshold = config.compaction_threshold;
78 let shard_prefix_bits = config.shard_prefix_bits;
79 let reversed = config.reversed;
80 let cache = BlockCache::new(&config.cache);
81 let engine = Engine::open(path, config)?;
82
83 let tree = Self {
84 index: SkipList::new(reversed),
85 engine,
86 cache,
87 compaction_threshold,
88 shard_prefix_bits,
89 reversed,
90 hook,
91 };
92
93 let shard_dirs = tree.engine.shard_dirs();
95 let shard_dir_refs = Engine::shard_dir_refs(&shard_dirs);
96 let shard_ids = tree.engine.shard_ids();
97
98 let hints = tree.engine.hints();
99 let outcome = recover_var_tree::<K>(
100 &shard_dir_refs,
101 &shard_ids,
102 tree.index(),
103 hints,
104 #[cfg(feature = "encryption")]
105 tree.engine.cipher(),
106 )?;
107 for tail in &outcome.active_tails {
108 tree.engine.shards()[tail.shard_idx].apply_recovery_tail(tail)?;
109 }
110 for (shard_idx, dead) in outcome.shard_dead_bytes {
111 tree.engine.shards()[shard_idx].install_dead_bytes(dead);
112 }
113 let max_gsn = outcome.max_gsn;
114
115 tree.engine
116 .gsn()
117 .fetch_max(max_gsn + 1, std::sync::atomic::Ordering::Relaxed);
118 if hints {
119 for shard in tree.engine.shards().iter() {
120 shard.set_key_len(size_of::<K>());
121 }
122 }
123 tracing::info!(
124 key_size = size_of::<K>(),
125 entries = tree.len(),
126 "var_tree recovered"
127 );
128
129 Ok(tree)
130 }
131
132 pub fn close(self) -> DbResult<()> {
134 if self.engine.hints() {
135 self.sync_hints()?;
136 }
137 self.engine.flush()
138 }
139
140 pub fn flush_buffers(&self) -> DbResult<()> {
142 self.engine.flush_buffers()
143 }
144
145 pub fn config(&self) -> &Config {
147 self.engine.config()
148 }
149}
150
151impl<K: Key, H: WriteHook<K>> CompactionIndex<K> for VarTree<K, H> {
152 fn update_if_match(&self, key: &K, old_loc: DiskLoc, new_loc: DiskLoc) -> bool {
153 let guard = self.index.collector().enter();
154 if let Some(node) = self.index.get(key.as_bytes(), &guard) {
155 let current_ptr = node.load_disk_ptr();
156 let current_disk = unsafe { *current_ptr };
157 if current_disk == old_loc {
158 let new_disk_ptr = Box::into_raw(Box::new(new_loc));
159 match node.compare_exchange_disk(current_ptr, new_disk_ptr) {
160 Ok(old_ptr) => {
161 unsafe {
162 self.index
163 .collector()
164 .retire(old_ptr, seize::reclaim::boxed::<DiskLoc>);
165 }
166 return true;
167 }
168 Err(_) => {
169 unsafe {
172 drop(Box::from_raw(new_disk_ptr));
173 }
174 return false;
175 }
176 }
177 }
178 }
179 false
180 }
181
182 fn invalidate_blocks(&self, shard_id: u8, file_id: u32, total_bytes: u64) {
183 self.cache.invalidate_file(shard_id, file_id, total_bytes);
184 }
185
186 fn contains_key(&self, key: &K) -> bool {
187 self.contains(key)
188 }
189}
190
191impl<K: Key, H: WriteHook<K>> VarTree<K, H> {
192 pub fn compact(&self) -> DbResult<usize> {
194 let mut total_compacted = 0;
195 for shard in self.engine.shards().iter() {
196 total_compacted += compact_shard(shard, self, self.compaction_threshold)?;
197 }
198 Ok(total_compacted)
199 }
200
201 pub fn get(&self, key: &K) -> Option<ByteView> {
203 metrics::counter!("armdb.ops", "op" => "get", "tree" => "var_tree").increment(1);
204 #[cfg(feature = "hot-path-tracing")]
205 tracing::trace!("var_tree.get");
206 let guard = self.index.collector().enter();
207 let node = match self.index.get(key.as_bytes(), &guard) {
208 Some(n) => n,
209 None => {
210 #[cfg(feature = "hot-path-tracing")]
211 tracing::error!(
212 "VarTree get error: index.get returned None for key {:?}",
213 key.as_bytes()
214 );
215 return None;
216 }
217 };
218 self.read_value_cached(node, &guard)
219 }
220
221 pub fn get_or_err(&self, key: &K) -> DbResult<ByteView> {
223 self.get(key).ok_or(DbError::KeyNotFound)
224 }
225
226 pub fn put(&self, key: &K, value: &[u8]) -> DbResult<()> {
228 metrics::counter!("armdb.ops", "op" => "put", "tree" => "var_tree").increment(1);
229 #[cfg(feature = "hot-path-tracing")]
230 tracing::trace!("var_tree.put");
231 let shard_id = self.shard_for(key);
232 let mut inner = self.engine.shards()[shard_id].lock();
233 let guard = self.index.collector().enter();
234 let old_value = if H::NEEDS_OLD_VALUE {
235 if let Some(node) = self.index.get(key.as_bytes(), &guard) {
236 let disk = *node.load_disk();
237 Some(self.read_value_locked_result(&disk, &inner)?)
238 } else {
239 None
240 }
241 } else {
242 None
243 };
244 self.put_locked(shard_id, &mut inner, &guard, key, value)?;
245 drop(inner);
246 self.hook.on_write(key, old_value.as_deref(), Some(value));
247 Ok(())
248 }
249
250 pub fn insert(&self, key: &K, value: &[u8]) -> DbResult<()> {
253 metrics::counter!("armdb.ops", "op" => "insert", "tree" => "var_tree").increment(1);
254 #[cfg(feature = "hot-path-tracing")]
255 tracing::trace!("var_tree.insert");
256 let shard_id = self.shard_for(key);
257 let mut inner = self.engine.shards()[shard_id].lock();
258 let guard = self.index.collector().enter();
259 self.insert_locked(shard_id, &mut inner, &guard, key, value)?;
260 drop(inner);
261 self.hook.on_write(key, None, Some(value));
262 Ok(())
263 }
264
265 pub fn delete(&self, key: &K) -> DbResult<bool> {
267 metrics::counter!("armdb.ops", "op" => "delete", "tree" => "var_tree").increment(1);
268 #[cfg(feature = "hot-path-tracing")]
269 tracing::trace!("var_tree.delete");
270 let shard_id = self.shard_for(key);
271 let mut inner = self.engine.shards()[shard_id].lock();
272 let guard = self.index.collector().enter();
273 let old_value = if H::NEEDS_OLD_VALUE {
274 if let Some(node) = self.index.get(key.as_bytes(), &guard) {
275 let disk = *node.load_disk();
276 Some(self.read_value_locked_result(&disk, &inner)?)
277 } else {
278 None
279 }
280 } else {
281 None
282 };
283 let existed = self.delete_locked(shard_id, &mut inner, &guard, key)?;
284 drop(inner);
285 if existed {
286 self.hook.on_write(key, old_value.as_deref(), None);
287 }
288 Ok(existed)
289 }
290
291 pub fn atomic<R>(
295 &self,
296 shard_key: &K,
297 f: impl FnOnce(&mut VarShard<'_, K, H>) -> DbResult<R>,
298 ) -> DbResult<R> {
299 let shard_id = self.shard_for(shard_key);
300 let inner = self.engine.shards()[shard_id].lock();
301 let guard = self.index.collector().enter();
302 let mut shard = VarShard {
303 tree: self,
304 inner,
305 shard_id,
306 guard,
307 };
308 f(&mut shard)
309 }
310
311 fn put_locked(
312 &self,
313 shard_id: usize,
314 inner: &mut ShardInner,
315 guard: &seize::LocalGuard<'_>,
316 key: &K,
317 value: &[u8],
318 ) -> DbResult<()> {
319 let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), value, false)?;
320
321 if let Some(existing) = self.index.get(key.as_bytes(), guard) {
323 let new_disk = Box::into_raw(Box::new(disk_loc));
324 let old_disk_ptr = existing.swap_disk(new_disk);
325 let old_disk = unsafe { *old_disk_ptr };
326 inner.add_dead_bytes(
327 old_disk.file_id,
328 crate::entry::entry_size(size_of::<K>(), old_disk.len),
329 );
330 unsafe {
331 self.index
332 .collector()
333 .retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
334 }
335 return Ok(());
336 }
337
338 let height = random_height();
340 let node_ptr = VarNode::alloc(*key, disk_loc, height);
341
342 match self.index.insert(node_ptr, guard) {
343 InsertResult::Inserted => {}
344 InsertResult::Exists(existing) => {
345 let new_disk = Box::into_raw(Box::new(disk_loc));
347 let old_disk_ptr = existing.swap_disk(new_disk);
348 let old_disk = unsafe { *old_disk_ptr };
349 inner.add_dead_bytes(
350 old_disk.file_id,
351 crate::entry::entry_size(size_of::<K>(), old_disk.len),
352 );
353 unsafe {
354 self.index
355 .collector()
356 .retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
357 }
358 unsafe {
359 (*node_ptr)
360 .disk
361 .store(std::ptr::null_mut(), std::sync::atomic::Ordering::Relaxed);
362 VarNode::<K>::dealloc_node(node_ptr);
363 }
364 }
365 }
366
367 Ok(())
368 }
369
370 fn insert_locked(
371 &self,
372 shard_id: usize,
373 inner: &mut ShardInner,
374 guard: &seize::LocalGuard<'_>,
375 key: &K,
376 value: &[u8],
377 ) -> DbResult<()> {
378 if self.index.get(key.as_bytes(), guard).is_some() {
379 return Err(DbError::KeyExists);
380 }
381
382 let (disk_loc, _gsn) = inner.append_entry(shard_id as u8, key.as_bytes(), value, false)?;
383 let height = random_height();
384 let node_ptr = VarNode::alloc(*key, disk_loc, height);
385
386 match self.index.insert(node_ptr, guard) {
387 InsertResult::Inserted => Ok(()),
388 InsertResult::Exists(_existing) => {
389 inner.add_dead_bytes(
393 disk_loc.file_id,
394 crate::entry::entry_size(size_of::<K>(), disk_loc.len),
395 );
396 unsafe { VarNode::<K>::dealloc_node(node_ptr) };
400 Err(DbError::KeyExists)
401 }
402 }
403 }
404
405 fn delete_locked(
406 &self,
407 shard_id: usize,
408 inner: &mut ShardInner,
409 guard: &seize::LocalGuard<'_>,
410 key: &K,
411 ) -> DbResult<bool> {
412 if self.index.get(key.as_bytes(), guard).is_none() {
413 return Ok(false);
414 }
415
416 inner.append_entry(shard_id as u8, key.as_bytes(), &[], true)?;
417
418 let removed = self.index.remove(key.as_bytes(), guard);
419
420 if let Some(node_ptr) = removed {
421 let disk = *unsafe { &*node_ptr }.load_disk();
422 inner.add_dead_bytes(
423 disk.file_id,
424 crate::entry::entry_size(size_of::<K>(), disk.len),
425 );
426 }
427
428 Ok(removed.is_some())
429 }
430
431 pub fn contains(&self, key: &K) -> bool {
433 let guard = self.index.collector().enter();
434 self.index.get(key.as_bytes(), &guard).is_some()
435 }
436
437 pub fn entry_len(&self, key: &K) -> Option<u32> {
440 let guard = self.index.collector().enter();
441 self.index
442 .get(key.as_bytes(), &guard)
443 .map(|node| node.load_disk().len)
444 }
445
446 pub fn first(&self) -> Option<(K, ByteView)> {
450 let guard = self.index.collector().enter();
451 let mut ptr = crate::skiplist::strip_mark(unsafe {
452 (*self.index.head_ptr())
453 .tower(0)
454 .load(std::sync::atomic::Ordering::Acquire)
455 });
456 while !ptr.is_null() {
457 let node = unsafe { &*ptr };
458 if !node.is_marked() {
459 return self.read_value_cached(node, &guard).map(|v| (node.key, v));
460 }
461 ptr = crate::skiplist::strip_mark(
462 node.tower(0).load(std::sync::atomic::Ordering::Acquire),
463 );
464 }
465 None
466 }
467
468 pub fn last(&self) -> Option<(K, ByteView)> {
472 self.iter().next_back()
473 }
474
475 fn resolve_front(&self, bound: &Bound<&K>, guard: &seize::LocalGuard<'_>) -> *mut VarNode<K> {
478 match bound {
479 Bound::Included(k) => self.index.find_first_ge(k.as_bytes(), guard),
480 Bound::Excluded(k) => {
481 let ge = self.index.find_first_ge(k.as_bytes(), guard);
482 if !ge.is_null()
483 && !unsafe { &*ge }.is_marked()
484 && unsafe { &*ge }.key_bytes() == k.as_bytes()
485 {
486 crate::skiplist::strip_mark(unsafe {
487 (*ge).tower(0).load(std::sync::atomic::Ordering::Acquire)
488 })
489 } else {
490 ge
491 }
492 }
493 Bound::Unbounded => crate::skiplist::strip_mark(unsafe {
494 (*self.index.head_ptr())
495 .tower(0)
496 .load(std::sync::atomic::Ordering::Acquire)
497 }),
498 }
499 }
500
501 fn prefix_bounds(&self, prefix: &[u8]) -> (K, Bound<K>) {
502 if self.reversed {
503 let mut search = K::zeroed();
504 search.as_bytes_mut().fill(0xFF);
505 search.as_bytes_mut()[..prefix.len()].copy_from_slice(prefix);
506 let mut end_key = K::zeroed();
507 end_key.as_bytes_mut()[..prefix.len()].copy_from_slice(prefix);
508 (search, Bound::Included(end_key))
509 } else {
510 let mut search = K::zeroed();
511 search.as_bytes_mut()[..prefix.len()].copy_from_slice(prefix);
512 let end = prefix_to_end_bound::<K>(prefix);
513 (search, end)
514 }
515 }
516
517 pub fn prefix_iter(&self, prefix: &[u8]) -> VarIter<'_, K, H> {
522 let guard = self.index.collector().enter();
523 let (search_key, end) = self.prefix_bounds(prefix);
524 let front = self.index.find_first_ge(search_key.as_bytes(), &guard);
525 VarIter {
526 tree: self,
527 front,
528 back: None,
529 end,
530 start: Bound::Included(search_key),
531 reversed: self.reversed,
532 done: false,
533 _guard: guard,
534 }
535 }
536
537 pub fn iter(&self) -> VarIter<'_, K, H> {
542 let guard = self.index.collector().enter();
543 let front = crate::skiplist::strip_mark(unsafe {
544 (*self.index.head_ptr())
545 .tower(0)
546 .load(std::sync::atomic::Ordering::Acquire)
547 });
548 VarIter {
549 tree: self,
550 front,
551 back: None,
552 end: Bound::Unbounded,
553 start: Bound::Unbounded,
554 reversed: self.reversed,
555 done: false,
556 _guard: guard,
557 }
558 }
559
560 pub fn range(&self, start: &K, end: &K) -> VarIter<'_, K, H> {
565 self.range_bounds(Bound::Included(start), Bound::Excluded(end))
566 }
567
568 pub fn range_bounds(&self, start: Bound<&K>, end: Bound<&K>) -> VarIter<'_, K, H> {
576 let guard = self.index.collector().enter();
577 if self.reversed {
578 let front = self.resolve_front(&end, &guard);
579 VarIter {
580 tree: self,
581 front,
582 back: None,
583 end: bound_owned(&start),
584 start: bound_owned(&end),
585 reversed: true,
586 done: false,
587 _guard: guard,
588 }
589 } else {
590 let front = self.resolve_front(&start, &guard);
591 VarIter {
592 tree: self,
593 front,
594 back: None,
595 end: bound_owned(&end),
596 start: bound_owned(&start),
597 reversed: false,
598 done: false,
599 _guard: guard,
600 }
601 }
602 }
603
604 pub fn len(&self) -> usize {
605 self.index.len()
606 }
607
608 pub fn is_empty(&self) -> bool {
609 self.index.is_empty()
610 }
611
612 pub fn sync_hints(&self) -> DbResult<()> {
614 for shard in self.engine.shards().iter() {
615 shard.write_active_hint(size_of::<K>())?;
616 }
617 Ok(())
618 }
619
620 pub fn warmup(&self) -> DbResult<()> {
626 use std::collections::BTreeSet;
627
628 let guard = self.index.collector().enter();
629
630 let mut blocks: BTreeSet<(u8, u32, u64)> = BTreeSet::new();
632 let mut current = crate::skiplist::strip_mark(unsafe {
633 (*self.index.head_ptr())
634 .tower(0)
635 .load(std::sync::atomic::Ordering::Acquire)
636 });
637 while !current.is_null() {
638 let node = unsafe { &*current };
639 current = crate::skiplist::strip_mark(
640 node.tower(0).load(std::sync::atomic::Ordering::Acquire),
641 );
642 if node.is_marked() {
643 continue;
644 }
645 let disk = *node.load_disk();
646 let block_offset = disk.offset as u64 & !4095;
647 blocks.insert((disk.shard_id, disk.file_id, block_offset));
648 }
649 drop(guard);
650
651 for (shard_id, file_id, block_offset) in &blocks {
653 let key = BlockKey {
654 shard_id: *shard_id,
655 file_id: *file_id,
656 block_offset: *block_offset,
657 };
658 if self.cache.get(&key).is_some() {
659 continue;
660 }
661 let shard = &self.engine.shards()[*shard_id as usize];
662 let (buf, is_full_block) = shard.read_block(*file_id, *block_offset)?;
663 if is_full_block {
664 self.cache.insert(key, Arc::new(buf));
665 }
666 }
667
668 Ok(())
669 }
670
671 pub(crate) fn index(&self) -> &SkipList<VarNode<K>> {
672 &self.index
673 }
674
675 pub fn migrate(
685 &self,
686 f: impl Fn(&K, &[u8]) -> crate::MigrateAction<ByteView>,
687 ) -> DbResult<usize> {
688 use crate::MigrateAction;
689
690 let guard = self.index.collector().enter();
691 let mut current = crate::skiplist::strip_mark(unsafe {
692 (*self.index.head_ptr())
693 .tower(0)
694 .load(std::sync::atomic::Ordering::Acquire)
695 });
696 let mut count = 0;
697 while !current.is_null() {
698 let node = unsafe { &*current };
699 current = crate::skiplist::strip_mark(
700 node.tower(0).load(std::sync::atomic::Ordering::Acquire),
701 );
702 if node.is_marked() {
703 continue;
704 }
705 let value = match self.read_value_cached(node, &guard) {
706 Some(v) => v,
707 None => {
708 tracing::warn!(
709 key = ?node.key.as_bytes(),
710 "var_tree migrate: skipping entry — value read failed"
711 );
712 continue;
713 }
714 };
715 match f(&node.key, &value) {
716 MigrateAction::Keep => {
717 if H::NEEDS_INIT {
718 self.hook.on_init(&node.key, &value);
719 }
720 }
721 MigrateAction::Update(new_value) => {
722 let shard_id = self.shard_for(&node.key);
723 {
724 let mut inner = self.engine.shards()[shard_id].lock();
725 self.put_locked(shard_id, &mut inner, &guard, &node.key, &new_value)?;
726 }
727 if H::NEEDS_INIT {
728 self.hook.on_init(&node.key, &new_value);
729 }
730 count += 1;
731 }
732 MigrateAction::Delete => {
733 let shard_id = self.shard_for(&node.key);
734 let mut inner = self.engine.shards()[shard_id].lock();
735 self.delete_locked(shard_id, &mut inner, &guard, &node.key)?;
736 count += 1;
737 }
738 }
739 }
740
741 tracing::info!(mutations = count, "var_tree migration complete");
742 Ok(count)
743 }
744
745 pub(crate) fn replay_init(&self) {
749 if !H::NEEDS_INIT {
750 return;
751 }
752 let guard = self.index.collector().enter();
753 let mut current = crate::skiplist::strip_mark(unsafe {
754 (*self.index.head_ptr())
755 .tower(0)
756 .load(std::sync::atomic::Ordering::Acquire)
757 });
758 let mut count = 0usize;
759 while !current.is_null() {
760 let node = unsafe { &*current };
761 current = crate::skiplist::strip_mark(
762 node.tower(0).load(std::sync::atomic::Ordering::Acquire),
763 );
764 if node.is_marked() {
765 continue;
766 }
767 let value = match self.read_value_cached(node, &guard) {
768 Some(v) => v,
769 None => {
770 tracing::warn!(
771 key = ?node.key.as_bytes(),
772 "var_tree replay_init: skipping entry — value read failed"
773 );
774 continue;
775 }
776 };
777 self.hook.on_init(&node.key, &value);
778 count += 1;
779 }
780 tracing::debug!(replayed = count, "var_tree replay_init complete");
781 }
782
783 fn read_value_cached_inner(&self, disk: &DiskLoc) -> DbResult<ByteView> {
788 let len = disk.len as usize;
789 let start = (disk.offset & 4095) as usize;
790
791 if start + len > 8192 {
798 let shard = &self.engine.shards()[disk.shard_id as usize];
799 let inner = shard.lock();
800 return self.read_value_locked_result(disk, &inner);
801 }
802
803 let block_offset = disk.offset as u64 & !4095;
804 let cache_key = BlockKey {
805 shard_id: disk.shard_id,
806 file_id: disk.file_id,
807 block_offset,
808 };
809
810 if let Some(block) = self.cache.get(&cache_key) {
812 metrics::counter!("armdb.cache.hit").increment(1);
813 return Self::extract_from_block(&block, start, len, || {
814 self.get_or_read_block(disk.shard_id, disk.file_id, block_offset + 4096)
815 });
816 }
817
818 {
820 let shard = &self.engine.shards()[disk.shard_id as usize];
821 let inner = shard.lock();
822 if inner.active.file_id == disk.file_id
823 && let Some(bytes) = inner.write_buf.read(disk.offset as u64, len)
824 {
825 return Ok(ByteView::new(bytes));
826 }
827 }
828
829 metrics::counter!("armdb.cache.miss").increment(1);
831 let block = self.get_or_read_block(disk.shard_id, disk.file_id, block_offset)?;
832 Self::extract_from_block(&block, start, len, || {
833 self.get_or_read_block(disk.shard_id, disk.file_id, block_offset + 4096)
834 })
835 }
836
837 fn read_value_cached(
845 &self,
846 node: &VarNode<K>,
847 _guard: &seize::LocalGuard<'_>,
848 ) -> Option<ByteView> {
849 for _ in 0..MAX_STALE_RETRIES {
850 let disk = *node.load_disk();
851 match self.read_value_cached_inner(&disk) {
852 Ok(v) => return Some(v),
853 Err(DbError::StaleDiskLoc) => {
854 metrics::counter!("armdb.read.stale_retry", "tree" => "var_tree").increment(1);
855 continue;
856 }
857 Err(_e) => {
858 #[cfg(feature = "hot-path-tracing")]
859 tracing::error!("VarTree read_value_cached error: {:?}", _e);
860 return None;
861 }
862 }
863 }
864 None
865 }
866
867 fn extract_from_block(
868 block: &AlignedBuf,
869 start: usize,
870 len: usize,
871 next_block: impl FnOnce() -> DbResult<Arc<AlignedBuf>>,
872 ) -> DbResult<ByteView> {
873 debug_assert!(
874 start + len <= 8192,
875 "extract_from_block supports at most 2 blocks (8192 bytes)"
876 );
877 if start + len <= 4096 {
878 Ok(ByteView::new(&block[start..start + len]))
879 } else {
880 let next = next_block()?;
881 let first_part = &block[start..];
882 let second_len = len - first_part.len();
883 let mut combined = Vec::with_capacity(len);
884 combined.extend_from_slice(first_part);
885 combined.extend_from_slice(&next[..second_len]);
886 Ok(ByteView::from_vec(combined))
887 }
888 }
889
890 fn get_or_read_block(
892 &self,
893 shard_id: u8,
894 file_id: u32,
895 block_offset: u64,
896 ) -> DbResult<Arc<AlignedBuf>> {
897 let key = BlockKey {
898 shard_id,
899 file_id,
900 block_offset,
901 };
902 if let Some(cached) = self.cache.get(&key) {
903 return Ok(cached);
904 }
905 let shard = &self.engine.shards()[shard_id as usize];
906 let (buf, is_full_block) = shard.read_block(file_id, block_offset)?;
907 let arc = Arc::new(buf);
908 if is_full_block {
912 self.cache.insert(key, arc.clone());
913 }
914 Ok(arc)
915 }
916
917 pub fn cas(&self, key: &K, expected: &[u8], new_value: &[u8]) -> DbResult<()> {
925 metrics::counter!("armdb.ops", "op" => "cas", "tree" => "var_tree").increment(1);
926 #[cfg(feature = "hot-path-tracing")]
927 tracing::trace!("var_tree.cas");
928 let shard_id = self.shard_for(key);
929 let shard = &self.engine.shards()[shard_id];
930 let mut inner = shard.lock();
931
932 let guard = self.index.collector().enter();
933 let existing = self
934 .index
935 .get(key.as_bytes(), &guard)
936 .ok_or(DbError::KeyNotFound)?;
937
938 let disk = *existing.load_disk();
939 let current = self
940 .read_value_locked(&disk, &inner)
941 .ok_or(DbError::KeyNotFound)?;
942 if current.as_ref() != expected {
943 return Err(DbError::CasMismatch);
944 }
945
946 let (new_disk_loc, _gsn) =
947 inner.append_entry(shard_id as u8, key.as_bytes(), new_value, false)?;
948
949 let new_disk = Box::into_raw(Box::new(new_disk_loc));
950 let old_disk_ptr = existing.swap_disk(new_disk);
951 let old_disk = unsafe { *old_disk_ptr };
952 inner.add_dead_bytes(
953 old_disk.file_id,
954 crate::entry::entry_size(size_of::<K>(), old_disk.len),
955 );
956 unsafe {
957 self.index
958 .collector()
959 .retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
960 }
961
962 drop(inner);
963 self.hook.on_write(
964 key,
965 if H::NEEDS_OLD_VALUE {
966 Some(&*current)
967 } else {
968 None
969 },
970 Some(new_value),
971 );
972 Ok(())
973 }
974
975 pub fn update(&self, key: &K, f: impl FnOnce(&[u8]) -> ByteView) -> DbResult<Option<ByteView>> {
983 self.update_inner(key, f, false)
984 }
985
986 pub fn fetch_update(
988 &self,
989 key: &K,
990 f: impl FnOnce(&[u8]) -> ByteView,
991 ) -> DbResult<Option<ByteView>> {
992 self.update_inner(key, f, true)
993 }
994
995 pub(crate) fn try_update_inner(
996 &self,
997 key: &K,
998 f: impl FnOnce(&[u8]) -> DbResult<Option<ByteView>>,
999 return_old: bool,
1000 ) -> DbResult<Option<ByteView>> {
1001 metrics::counter!("armdb.ops", "op" => "update", "tree" => "var_tree").increment(1);
1002 #[cfg(feature = "hot-path-tracing")]
1003 tracing::trace!("var_tree.update");
1004 let shard_id = self.shard_for(key);
1005 let shard = &self.engine.shards()[shard_id];
1006 let mut inner = shard.lock();
1007
1008 let guard = self.index.collector().enter();
1009 let existing = match self.index.get(key.as_bytes(), &guard) {
1010 Some(n) => n,
1011 None => return Ok(None),
1012 };
1013
1014 let disk = *existing.load_disk();
1015 let current = match self.read_value_locked(&disk, &inner) {
1016 Some(v) => v,
1017 None => return Ok(None),
1018 };
1019
1020 let new_value = match f(¤t)? {
1021 Some(v) => v,
1022 None => return Ok(Some(current)),
1023 };
1024
1025 let (new_disk_loc, _gsn) =
1026 inner.append_entry(shard_id as u8, key.as_bytes(), &new_value, false)?;
1027
1028 let new_disk = Box::into_raw(Box::new(new_disk_loc));
1029 let old_disk_ptr = existing.swap_disk(new_disk);
1030 let old_disk = unsafe { *old_disk_ptr };
1031 inner.add_dead_bytes(
1032 old_disk.file_id,
1033 crate::entry::entry_size(size_of::<K>(), old_disk.len),
1034 );
1035 unsafe {
1036 self.index
1037 .collector()
1038 .retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
1039 }
1040
1041 drop(inner);
1042 self.hook.on_write(
1043 key,
1044 if H::NEEDS_OLD_VALUE {
1045 Some(&*current)
1046 } else {
1047 None
1048 },
1049 Some(&*new_value),
1050 );
1051 Ok(Some(if return_old { current } else { new_value }))
1052 }
1053
1054 fn update_inner(
1055 &self,
1056 key: &K,
1057 f: impl FnOnce(&[u8]) -> ByteView,
1058 return_old: bool,
1059 ) -> DbResult<Option<ByteView>> {
1060 self.try_update_inner(key, |bytes| Ok(Some(f(bytes))), return_old)
1061 }
1062
1063 fn read_value_locked_result(&self, disk: &DiskLoc, inner: &ShardInner) -> DbResult<ByteView> {
1067 let len = disk.len as usize;
1068
1069 if inner.active.file_id == disk.file_id
1071 && let Some(bytes) = inner.write_buf.read(disk.offset as u64, len)
1072 {
1073 return Ok(ByteView::new(bytes));
1074 }
1075
1076 let block_offset = disk.offset as u64 & !4095;
1078 let start = (disk.offset & 4095) as usize;
1079 if start + len <= 4096 {
1080 let cache_key = BlockKey {
1081 shard_id: disk.shard_id,
1082 file_id: disk.file_id,
1083 block_offset,
1084 };
1085 if let Some(block) = self.cache.get(&cache_key) {
1086 return Ok(ByteView::new(&block[start..start + len]));
1087 }
1088
1089 let (buf, is_full_block) = inner.read_block_locked(disk.file_id, block_offset)?;
1091 let arc = Arc::new(buf);
1092 if is_full_block {
1093 self.cache.insert(cache_key, arc.clone());
1094 }
1095 return Ok(ByteView::new(&arc[start..start + len]));
1096 }
1097
1098 let bytes = inner.read_value_from_disk_locked(disk)?;
1100 Ok(ByteView::new(&bytes))
1101 }
1102
1103 fn read_value_locked(&self, disk: &DiskLoc, inner: &ShardInner) -> Option<ByteView> {
1109 match self.read_value_locked_result(disk, inner) {
1110 Ok(v) => Some(v),
1111 Err(DbError::StaleDiskLoc) => {
1112 tracing::error!(
1113 file_id = disk.file_id,
1114 shard_id = disk.shard_id,
1115 "stale DiskLoc under shard lock - programming bug",
1116 );
1117 None
1118 }
1119 Err(_) => None,
1120 }
1121 }
1122
1123 pub fn shard_for(&self, key: &K) -> usize {
1124 crate::shard_for_key(key, self.shard_prefix_bits, self.engine.shards().len())
1125 }
1126}
1127
1128#[cfg(feature = "replication")]
1129impl<K: Key, H: WriteHook<K>> crate::replication::ReplicationTarget for VarTree<K, H> {
1130 fn apply_entry(
1131 &self,
1132 _shard_inner: &mut crate::shard::ShardInner,
1133 shard_id: u8,
1134 file_id: u32,
1135 entry_offset: u64,
1136 header: &crate::entry::EntryHeader,
1137 key: &[u8],
1138 _value: &[u8],
1139 ) -> DbResult<crate::replication::ApplyOutcome> {
1140 use crate::replication::ApplyOutcome;
1141
1142 let key: K = K::from_bytes(key);
1143
1144 let value_offset =
1145 entry_offset + size_of::<crate::entry::EntryHeader>() as u64 + size_of::<K>() as u64;
1146 let disk = DiskLoc::new(shard_id, file_id, value_offset as u32, header.value_len);
1147
1148 if header.is_tombstone() {
1149 let guard = self.index.collector().enter();
1150 let removed = self.index.remove(key.as_bytes(), &guard);
1151 match removed {
1152 Some(node_ptr) => {
1153 let old_disk = *unsafe { &*node_ptr }.load_disk();
1154 Ok(ApplyOutcome::TombstoneRemoved(old_disk))
1155 }
1156 None => Ok(ApplyOutcome::Inserted), }
1158 } else {
1159 let guard = self.index.collector().enter();
1160 let height = random_height();
1161 let node_ptr = VarNode::alloc(key, disk, height);
1162 match self.index.insert(node_ptr, &guard) {
1163 InsertResult::Inserted => Ok(ApplyOutcome::Inserted),
1164 InsertResult::Exists(existing) => {
1165 let new_disk_ptr = Box::into_raw(Box::new(disk));
1166 let old_disk_ptr = existing.swap_disk(new_disk_ptr);
1167 let old_disk = unsafe { *old_disk_ptr };
1168 unsafe {
1169 self.index
1170 .collector()
1171 .retire(old_disk_ptr, seize::reclaim::boxed::<DiskLoc>);
1172 }
1173 unsafe {
1175 (*node_ptr)
1176 .disk
1177 .store(std::ptr::null_mut(), std::sync::atomic::Ordering::Relaxed);
1178 VarNode::<K>::dealloc_node(node_ptr);
1179 }
1180 Ok(ApplyOutcome::Replaced(old_disk))
1181 }
1182 }
1183 }
1184 }
1185
1186 fn try_apply_entry(
1187 &self,
1188 shard_inner: &mut crate::shard::ShardInner,
1189 shard_id: u8,
1190 file_id: u32,
1191 entry_offset: u64,
1192 header: &crate::entry::EntryHeader,
1193 raw_after_header: &[u8],
1194 ) -> DbResult<crate::replication::ApplyOutcome> {
1195 use crate::replication::ApplyOutcome;
1196
1197 if raw_after_header.len() < size_of::<K>() + header.value_len as usize {
1198 return Ok(ApplyOutcome::NotMatched);
1199 }
1200 let key = &raw_after_header[..size_of::<K>()];
1201 let value = &raw_after_header[size_of::<K>()..size_of::<K>() + header.value_len as usize];
1202 let crc = crate::entry::compute_crc32(header.gsn, header.value_len, key, value);
1203 if crc != header.crc32 {
1204 return Ok(ApplyOutcome::NotMatched);
1205 }
1206 self.apply_entry(
1207 shard_inner,
1208 shard_id,
1209 file_id,
1210 entry_offset,
1211 header,
1212 key,
1213 value,
1214 )
1215 }
1216
1217 fn key_len(&self) -> usize {
1218 size_of::<K>()
1219 }
1220}
1221
#[cfg(feature = "replication")]
impl<K: Key, H: WriteHook<K>> VarTree<K, H> {
    /// Starts a replication server listening on `bind_addr`.
    ///
    /// A replication producer is installed on every shard first; the matching
    /// consumers are handed to the server together with the shard set, the
    /// configured `max_file_size` and the shutdown `signal`.
    ///
    /// NOTE(review): every call installs new producers on all shards;
    /// presumably consumers returned by an earlier call stop receiving
    /// entries — confirm before starting more than one server per tree.
    ///
    /// # Errors
    /// Propagates any error from producer installation or from
    /// `ReplicationServer::start`.
    pub fn start_replication_server(
        &self,
        bind_addr: std::net::SocketAddr,
        signal: crate::shutdown::ShutdownSignal,
    ) -> crate::error::DbResult<crate::replication::ReplicationServer> {
        use crate::replication::ReplicationServer;

        let consumers = self.install_replication_producers()?;
        let shards = self.engine.shards().clone();
        let max_file_size = self.engine.config().max_file_size;
        ReplicationServer::start(bind_addr, shards, consumers, max_file_size, signal)
    }

    /// Same as [`Self::start_replication_server`], but forwards caller-supplied
    /// `options` to `ReplicationServer::start_with_options`.
    ///
    /// # Errors
    /// Propagates any error from producer installation or from
    /// `ReplicationServer::start_with_options`.
    pub fn start_replication_server_with_options(
        &self,
        bind_addr: std::net::SocketAddr,
        signal: crate::shutdown::ShutdownSignal,
        options: crate::replication::ReplicationServerOptions,
    ) -> crate::error::DbResult<crate::replication::ReplicationServer> {
        use crate::replication::ReplicationServer;

        let consumers = self.install_replication_producers()?;
        let shards = self.engine.shards().clone();
        let max_file_size = self.engine.config().max_file_size;
        ReplicationServer::start_with_options(
            bind_addr,
            shards,
            consumers,
            max_file_size,
            signal,
            options,
        )
    }

    /// Creates one ring buffer per shard, installs its producer half on the
    /// shard and returns the consumer halves in shard order.
    ///
    /// Each ring buffer is created with room for `SPSC_CAPACITY` entries.
    fn install_replication_producers(
        &self,
    ) -> crate::error::DbResult<Vec<rtrb::Consumer<crate::replication::ReplicationEntry>>> {
        const SPSC_CAPACITY: usize = 4096;

        let consumers = self
            .engine
            .shards()
            .iter()
            .map(|shard| {
                let (producer, consumer) = rtrb::RingBuffer::new(SPSC_CAPACITY);
                shard.set_replication_producer(producer);
                consumer
            })
            .collect();
        Ok(consumers)
    }

    /// Starts a replication client that connects to `leader_addr`.
    ///
    /// The client receives the shard set, the `registry`, the key width
    /// (`size_of::<K>()` narrowed to `u16`) and the shutdown `signal`.
    /// The `as u16` cast truncates silently if the key type is wider than
    /// `u16::MAX` bytes.
    ///
    /// # Errors
    /// Propagates any error from `ReplicationClient::start`.
    pub fn start_replication_client(
        &self,
        leader_addr: std::net::SocketAddr,
        registry: std::sync::Arc<crate::replication::ReplicationRegistry>,
        signal: crate::shutdown::ShutdownSignal,
    ) -> crate::error::DbResult<crate::replication::ReplicationClient> {
        use crate::replication::ReplicationClient;

        let key_len = size_of::<K>() as u16;
        let shards = self.engine.shards().clone();
        ReplicationClient::start(leader_addr, shards, registry, key_len, signal)
    }

    /// Same as [`Self::start_replication_client`], but forwards caller-supplied
    /// `options` to `ReplicationClient::start_with_options`.
    ///
    /// # Errors
    /// Propagates any error from `ReplicationClient::start_with_options`.
    pub fn start_replication_client_with_options(
        &self,
        leader_addr: std::net::SocketAddr,
        registry: std::sync::Arc<crate::replication::ReplicationRegistry>,
        signal: crate::shutdown::ShutdownSignal,
        options: crate::replication::ReplicationClientOptions,
    ) -> crate::error::DbResult<crate::replication::ReplicationClient> {
        use crate::replication::ReplicationClient;

        let key_len = size_of::<K>() as u16;
        let shards = self.engine.shards().clone();
        ReplicationClient::start_with_options(
            leader_addr,
            shards,
            registry,
            key_len,
            signal,
            options,
        )
    }
}
1316
#[cfg(feature = "replication")]
impl<K, H> VarTree<K, H>
where
    K: Key + Send + Sync + 'static,
    H: WriteHook<K> + Send + Sync + 'static,
{
    /// Returns a boxed `ReplicationTarget` backed by a new `Arc` handle to
    /// this tree; the tree itself is shared, not copied.
    pub fn as_replication_target(
        self: &std::sync::Arc<Self>,
    ) -> Box<dyn crate::replication::ReplicationTarget> {
        let shared = Arc::clone(self);
        Box::new(shared)
    }
}
1340
/// Handle for operating on a single shard of a [`VarTree`] while holding
/// that shard's guard.
///
/// Writes through this handle are rejected with `DbError::ShardMismatch`
/// when the key maps to a different shard.
pub struct VarShard<'a, K: Key, H: WriteHook<K> = NoHook> {
    /// Tree the handle reads from and writes to.
    tree: &'a VarTree<K, H>,
    /// Guard over the shard's `ShardInner`, held for as long as the handle lives.
    inner: MutexGuard<'a, ShardInner>,
    /// Id of the shard this handle covers; compared against `tree.shard_for(key)`.
    shard_id: usize,
    /// `seize` guard passed to index lookups and to the locked write paths.
    guard: seize::LocalGuard<'a>,
}
1350
1351impl<K: Key, H: WriteHook<K>> VarShard<'_, K, H> {
1352 pub fn put(&mut self, key: &K, value: &[u8]) -> DbResult<()> {
1353 self.check_shard(key)?;
1354 self.tree
1355 .put_locked(self.shard_id, &mut self.inner, &self.guard, key, value)
1356 }
1357
1358 pub fn insert(&mut self, key: &K, value: &[u8]) -> DbResult<()> {
1359 self.check_shard(key)?;
1360 self.tree
1361 .insert_locked(self.shard_id, &mut self.inner, &self.guard, key, value)
1362 }
1363
1364 pub fn delete(&mut self, key: &K) -> DbResult<bool> {
1365 self.check_shard(key)?;
1366 self.tree
1367 .delete_locked(self.shard_id, &mut self.inner, &self.guard, key)
1368 }
1369
1370 pub fn get(&self, key: &K) -> Option<ByteView> {
1371 let node = self.tree.index.get(key.as_bytes(), &self.guard)?;
1372 let disk = *node.load_disk();
1373 self.tree.read_value_locked(&disk, &self.inner)
1374 }
1375
1376 pub fn get_or_err(&self, key: &K) -> DbResult<ByteView> {
1377 self.get(key).ok_or(DbError::KeyNotFound)
1378 }
1379
1380 pub fn contains(&self, key: &K) -> bool {
1381 self.tree.index.get(key.as_bytes(), &self.guard).is_some()
1382 }
1383
1384 fn check_shard(&self, key: &K) -> DbResult<()> {
1385 if self.tree.shard_for(key) != self.shard_id {
1386 return Err(DbError::ShardMismatch);
1387 }
1388 Ok(())
1389 }
1390}
1391
/// Converts a bound over a borrowed key into a bound that owns a copy of it.
/// The bound kind (included / excluded / unbounded) is preserved.
fn bound_owned<K: Copy>(b: &Bound<&K>) -> Bound<K> {
    match *b {
        Bound::Unbounded => Bound::Unbounded,
        Bound::Included(&key) => Bound::Included(key),
        Bound::Excluded(&key) => Bound::Excluded(key),
    }
}
1399
1400fn prefix_to_end_bound<K: Key>(prefix: &[u8]) -> Bound<K> {
1401 let mut incremented = prefix.to_vec();
1402 let mut carry = true;
1403 for byte in incremented.iter_mut().rev() {
1404 if carry {
1405 if *byte == 0xFF {
1406 *byte = 0x00;
1407 } else {
1408 *byte += 1;
1409 carry = false;
1410 break;
1411 }
1412 }
1413 }
1414 if carry {
1415 Bound::Unbounded
1416 } else {
1417 let mut end = K::zeroed();
1418 end.as_bytes_mut()[..incremented.len()].copy_from_slice(&incremented);
1419 Bound::Excluded(end)
1420 }
1421}
1422
/// Double-ended iterator over `(key, value)` pairs of a [`VarTree`].
///
/// Forward iteration walks level-0 successor links from `front`; backward
/// iteration repeatedly asks the index for the last node strictly before
/// the current `back` key. Iteration stops once both cursors meet or a
/// bound is crossed.
pub struct VarIter<'a, K: Key, H: WriteHook<K> = NoHook> {
    /// Tree whose index is walked and whose values are read.
    tree: &'a VarTree<K, H>,
    /// Next node to examine from the front; null once the list is exhausted.
    front: *mut VarNode<K>,
    /// Next node to examine from the back; `None` until `next_back` is first called.
    back: Option<*mut VarNode<K>>,
    /// Bound enforced by forward iteration and used to locate the initial `back`.
    end: Bound<K>,
    /// Bound enforced by backward iteration.
    start: Bound<K>,
    /// When set, the byte comparisons against `start` / `end` are flipped.
    reversed: bool,
    /// Set once the cursors have converged or a bound has been crossed.
    done: bool,
    /// `seize` guard handed to index lookups and value reads.
    /// NOTE(review): presumably also what keeps the nodes behind `front` /
    /// `back` from being reclaimed while iterating — confirm.
    _guard: seize::LocalGuard<'a>,
}
1439
impl<K: Key, H: WriteHook<K>> Iterator for VarIter<'_, K, H> {
    type Item = (K, ByteView);

    /// Yields the next live entry from the front, skipping marked nodes and
    /// nodes whose value cannot be read.
    ///
    /// Returns `None` once the iterator is done, the list is exhausted, the
    /// `end` bound is crossed, or the front cursor has met the back cursor.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if self.done || self.front.is_null() {
                return None;
            }
            // SAFETY: `front` was checked to be non-null just above.
            // NOTE(review): validity of the pointee presumably relies on
            // `_guard` being held — confirm.
            let node = unsafe { &*self.front };
            // Only meaningful once `next_back` has resolved a back cursor.
            let converged = self.back.is_some_and(|back| std::ptr::eq(self.front, back));
            // Advance before inspecting the node so every `continue` below
            // resumes from the successor.
            self.front = crate::skiplist::strip_mark(
                node.tower(0).load(std::sync::atomic::Ordering::Acquire),
            );
            if converged {
                // This is the last node either end may yield.
                self.done = true;
            }
            if node.is_marked() {
                if converged {
                    return None;
                }
                continue;
            }
            if !self.check_end(&node.key) {
                self.done = true;
                return None;
            }
            match self.tree.read_value_cached(node, &self._guard) {
                Some(value) => return Some((node.key, value)),
                None => {
                    // Value unavailable: skip the node unless it was the last one.
                    if converged {
                        return None;
                    }
                    continue;
                }
            }
        }
    }
}
1478
impl<K: Key, H: WriteHook<K>> DoubleEndedIterator for VarIter<'_, K, H> {
    /// Yields the next live entry from the back, skipping marked nodes and
    /// nodes whose value cannot be read.
    ///
    /// The back cursor is resolved lazily on the first call. Returns `None`
    /// once the iterator is done, no earlier node exists, the `start` bound
    /// is crossed, or the back cursor has met the front cursor.
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.back.is_none() {
            self.back = Some(self.resolve_back());
            if self.front.is_null() {
                // Forward side is already exhausted, so nothing is left to yield.
                self.done = true;
            }
        }
        loop {
            let back = self.back.unwrap_or(std::ptr::null_mut());
            if self.done || back.is_null() {
                return None;
            }
            // SAFETY: `back` was checked to be non-null just above.
            // NOTE(review): validity of the pointee presumably relies on
            // `_guard` being held — confirm.
            let node = unsafe { &*back };
            let key = node.key;
            let converged = std::ptr::eq(self.front, back);
            // Step to the predecessor before inspecting the node so every
            // `continue` below resumes from it.
            self.back = Some(self.tree.index().find_last_lt(key.as_bytes(), &self._guard));
            if converged {
                // This is the last node either end may yield.
                self.done = true;
            }
            if node.is_marked() {
                if converged {
                    return None;
                }
                continue;
            }
            if !self.check_start(&key) {
                self.done = true;
                return None;
            }
            match self.tree.read_value_cached(node, &self._guard) {
                Some(value) => return Some((key, value)),
                None => {
                    // Value unavailable: skip the node unless it was the last one.
                    if converged {
                        return None;
                    }
                    continue;
                }
            }
        }
    }
}
1521
1522impl<K: Key, H: WriteHook<K>> VarIter<'_, K, H> {
1523 fn resolve_back(&self) -> *mut VarNode<K> {
1525 let index = self.tree.index();
1526 match &self.end {
1527 Bound::Unbounded => index.find_last(&self._guard),
1528 Bound::Excluded(k) => index.find_last_lt(k.as_bytes(), &self._guard),
1529 Bound::Included(k) => {
1530 let ge = index.find_first_ge(k.as_bytes(), &self._guard);
1531 if !ge.is_null()
1532 && !unsafe { &*ge }.is_marked()
1533 && unsafe { &*ge }.key_bytes() == k.as_bytes()
1534 {
1535 ge
1536 } else {
1537 index.find_last_lt(k.as_bytes(), &self._guard)
1538 }
1539 }
1540 }
1541 }
1542
1543 #[inline(always)]
1544 fn check_end(&self, key: &K) -> bool {
1545 match &self.end {
1546 Bound::Unbounded => true,
1547 Bound::Excluded(end) => {
1548 if self.reversed {
1549 key.as_bytes() > end.as_bytes()
1550 } else {
1551 key.as_bytes() < end.as_bytes()
1552 }
1553 }
1554 Bound::Included(end) => {
1555 if self.reversed {
1556 key.as_bytes() >= end.as_bytes()
1557 } else {
1558 key.as_bytes() <= end.as_bytes()
1559 }
1560 }
1561 }
1562 }
1563
1564 #[inline(always)]
1565 fn check_start(&self, key: &K) -> bool {
1566 match &self.start {
1567 Bound::Unbounded => true,
1568 Bound::Excluded(s) => {
1569 if self.reversed {
1570 key.as_bytes() < s.as_bytes()
1571 } else {
1572 key.as_bytes() > s.as_bytes()
1573 }
1574 }
1575 Bound::Included(s) => {
1576 if self.reversed {
1577 key.as_bytes() <= s.as_bytes()
1578 } else {
1579 key.as_bytes() >= s.as_bytes()
1580 }
1581 }
1582 }
1583 }
1584 pub fn collect_keys(&mut self) -> Vec<K> {
1586 self.map(|(k, _)| k).collect()
1587 }
1588
1589 pub fn collect_entries(&mut self) -> Vec<(K, ByteView)> {
1591 self.collect()
1592 }
1593}
1594
1595#[cfg(test)]
1596mod tests {
1597 use super::*;
1598 use crate::Config;
1599 use crate::compaction::compact_shard;
1600 use tempfile::tempdir;
1601
1602 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1603
    /// Test hook that counts callbacks and remembers the most recent payloads.
    ///
    /// The const parameters feed the `NEEDS_INIT` / `NEEDS_OLD_VALUE`
    /// associated constants of its `WriteHook` impl.
    #[derive(Default)]
    struct CountingHook<const NEEDS_INIT: bool, const NEEDS_OLD: bool> {
        /// Number of `on_write` calls.
        writes: AtomicUsize,
        /// Number of `on_write` calls that received an old value.
        writes_with_old: AtomicUsize,
        /// Number of `on_init` calls.
        inits: AtomicUsize,
        /// `new` argument of the latest `on_write` call (`None` if it had none).
        last_write_new: crate::sync::Mutex<Option<Vec<u8>>>,
        /// `value` argument of the latest `on_init` call.
        last_init_value: crate::sync::Mutex<Option<Vec<u8>>>,
    }
1614
1615 impl<const NEEDS_INIT: bool, const NEEDS_OLD: bool> WriteHook<[u8; 8]>
1616 for CountingHook<NEEDS_INIT, NEEDS_OLD>
1617 {
1618 const NEEDS_OLD_VALUE: bool = NEEDS_OLD;
1619 const NEEDS_INIT: bool = NEEDS_INIT;
1620
1621 fn on_write(&self, _key: &[u8; 8], old: Option<&[u8]>, new: Option<&[u8]>) {
1622 self.writes.fetch_add(1, AtomicOrdering::Relaxed);
1623 if old.is_some() {
1624 self.writes_with_old.fetch_add(1, AtomicOrdering::Relaxed);
1625 }
1626 *crate::sync::lock(&self.last_write_new) = new.map(<[u8]>::to_vec);
1627 }
1628
1629 fn on_init(&self, _key: &[u8; 8], value: &[u8]) {
1630 self.inits.fetch_add(1, AtomicOrdering::Relaxed);
1631 *crate::sync::lock(&self.last_init_value) = Some(value.to_vec());
1632 }
1633 }
1634
1635 fn open_test_tree(dir: &std::path::Path) -> VarTree<[u8; 8]> {
1636 let mut cfg = Config::test();
1637 cfg.shard_count = 1;
1638 cfg.max_file_size = 8192;
1639 cfg.write_buffer_size = 8192;
1640 cfg.compaction_threshold = 0.0;
1641 VarTree::open(dir, cfg).expect("open test tree")
1642 }
1643
1644 fn open_test_tree_hooked<const NEEDS_INIT: bool, const NEEDS_OLD: bool>(
1645 dir: &std::path::Path,
1646 hook: CountingHook<NEEDS_INIT, NEEDS_OLD>,
1647 ) -> VarTree<[u8; 8], CountingHook<NEEDS_INIT, NEEDS_OLD>> {
1648 let mut cfg = Config::test();
1649 cfg.shard_count = 1;
1650 cfg.max_file_size = 8192;
1651 cfg.write_buffer_size = 8192;
1652 cfg.compaction_threshold = 0.0;
1653 VarTree::open_hooked(dir, cfg, hook).expect("open hooked test tree")
1654 }
1655
1656 fn put_until_compactable(tree: &VarTree<[u8; 8]>, key: [u8; 8]) -> DiskLoc {
1657 tree.put(&key, &[0u8; 256]).expect("first put");
1661 let snap = {
1662 let guard = tree.index.collector().enter();
1663 let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
1664 *node.load_disk()
1665 };
1666 for i in 1..65u8 {
1670 tree.put(&key, &[i; 256]).expect("put");
1671 }
1672 tree.put(&key, b"final-value-payload-XX")
1673 .expect("final put");
1674 snap
1675 }
1676
1677 #[test]
1678 fn read_value_cached_inner_returns_stale_after_compaction() {
1679 let dir = tempdir().unwrap();
1680 let tree = open_test_tree(dir.path());
1681
1682 let key = 7u64.to_be_bytes();
1683 let snap = put_until_compactable(&tree, key);
1684
1685 let shard = &tree.engine.shards()[snap.shard_id as usize];
1686 let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
1687
1688 match tree.read_value_cached_inner(&snap) {
1689 Err(DbError::StaleDiskLoc) => {}
1690 Ok(v) => panic!("expected StaleDiskLoc, got Ok({:?})", v.as_bytes()),
1691 Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
1692 }
1693 }
1694
1695 #[test]
1696 fn read_value_cached_returns_some_after_compaction() {
1697 let dir = tempdir().unwrap();
1698 let tree = open_test_tree(dir.path());
1699
1700 let key = 11u64.to_be_bytes();
1701 let _snap = put_until_compactable(&tree, key);
1702 let shard = &tree.engine.shards()[0];
1703 let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
1704
1705 let guard = tree.index.collector().enter();
1706 let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
1707 let v = tree
1708 .read_value_cached(node, &guard)
1709 .expect("post-compaction value must be readable");
1710 assert_eq!(v.as_bytes(), b"final-value-payload-XX");
1711 }
1712
1713 #[test]
1714 fn get_during_compaction_returns_some() {
1715 let dir = tempdir().unwrap();
1716 let tree = open_test_tree(dir.path());
1717
1718 let key = 13u64.to_be_bytes();
1719 let _snap = put_until_compactable(&tree, key);
1720 let shard = &tree.engine.shards()[0];
1721 let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
1722
1723 let v = tree.get(&key).expect("post-compaction get");
1724 assert_eq!(v.as_bytes(), b"final-value-payload-XX");
1725 }
1726
1727 #[test]
1728 fn iter_during_compaction_yields_all_live_keys() {
1729 let dir = tempdir().unwrap();
1730 let tree = open_test_tree(dir.path());
1731
1732 for k in 1u64..=3 {
1733 put_until_compactable(&tree, k.to_be_bytes());
1734 }
1735 let shard = &tree.engine.shards()[0];
1736 let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
1737
1738 let collected: std::collections::BTreeMap<[u8; 8], Vec<u8>> = tree
1739 .iter()
1740 .map(|(k, v)| (k, v.as_bytes().to_vec()))
1741 .collect();
1742 assert_eq!(collected.len(), 3);
1743 for k in 1u64..=3 {
1744 let bytes = collected
1745 .get(&k.to_be_bytes())
1746 .expect("every original key must remain");
1747 assert_eq!(bytes.as_slice(), b"final-value-payload-XX");
1748 }
1749 }
1750
1751 #[test]
1752 fn get_or_read_block_returns_stale_for_unknown_file_id() {
1753 let dir = tempdir().unwrap();
1754 let tree = open_test_tree(dir.path());
1755
1756 match tree.get_or_read_block(0, 9999, 0) {
1757 Err(DbError::StaleDiskLoc) => {}
1758 Ok(_) => panic!("expected StaleDiskLoc, got Ok"),
1759 Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
1760 }
1761 }
1762
1763 #[test]
1764 fn extract_from_block_propagates_next_block_error() {
1765 let block = AlignedBuf::zeroed(4096);
1766 let start = 4090;
1767 let len = 32;
1768 let result: DbResult<ByteView> =
1769 VarTree::<[u8; 8]>::extract_from_block(&block, start, len, || {
1770 Err(DbError::StaleDiskLoc)
1771 });
1772 match result {
1773 Err(DbError::StaleDiskLoc) => {}
1774 Ok(_) => panic!("expected StaleDiskLoc, got Ok"),
1775 Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
1776 }
1777 }
1778
1779 #[test]
1780 fn extract_from_block_multi_block_first_cached_second_stale() {
1781 let dir = tempdir().unwrap();
1782 let tree = open_test_tree(dir.path());
1783
1784 let key = 21u64.to_be_bytes();
1785 let value = vec![0xCDu8; 4073];
1786 tree.put(&key, &value).expect("initial put");
1787
1788 let snap = {
1790 let guard = tree.index.collector().enter();
1791 let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
1792 *node.load_disk()
1793 };
1794
1795 for i in 0..65u8 {
1800 tree.put(&key, &[i; 256]).expect("overwrite");
1801 }
1802 tree.put(&key, b"final").expect("final");
1803
1804 let shard = &tree.engine.shards()[0];
1805 let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
1806
1807 let block_offset = snap.offset as u64 & !4095;
1808 let first_block = AlignedBuf::zeroed(4096);
1809 let cache_key = BlockKey {
1810 shard_id: snap.shard_id,
1811 file_id: snap.file_id,
1812 block_offset,
1813 };
1814 tree.cache.insert(cache_key, Arc::new(first_block));
1815
1816 match tree.read_value_cached_inner(&snap) {
1817 Err(DbError::StaleDiskLoc) => {}
1818 Ok(_) => panic!("expected StaleDiskLoc from second block, got Ok"),
1819 Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
1820 }
1821 }
1822
1823 #[test]
1824 fn retry_limit_returns_none_on_persistent_stale() {
1825 let dir = tempdir().unwrap();
1826 let tree = open_test_tree(dir.path());
1827
1828 let key = 99u64.to_be_bytes();
1829 tree.put(&key, b"payload").expect("put");
1830
1831 let guard = tree.index.collector().enter();
1837 let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
1838 let snap = *node.load_disk();
1839 drop(guard);
1840
1841 {
1842 let shard = &tree.engine.shards()[snap.shard_id as usize];
1843 let mut inner = shard.lock();
1844 inner.immutable = Vec::new();
1845 inner.active.file_id = u32::MAX;
1849 }
1850
1851 let guard = tree.index.collector().enter();
1852 let node = tree
1853 .index
1854 .get(key.as_bytes(), &guard)
1855 .expect("still indexed");
1856 assert!(
1857 tree.read_value_cached(node, &guard).is_none(),
1858 "MAX_STALE_RETRIES must terminate the loop and return None"
1859 );
1860 }
1861
1862 #[test]
1863 fn var_tree_replay_init_fires_on_init_per_live_key_raw() {
1864 let dir = tempdir().unwrap();
1865 let tree = open_test_tree_hooked::<true, false>(dir.path(), CountingHook::default());
1866
1867 for i in 0u64..5 {
1868 tree.put(&i.to_be_bytes(), &[i as u8; 16]).expect("put");
1869 }
1870 tree.delete(&3u64.to_be_bytes()).expect("delete");
1872
1873 tree.hook.writes.store(0, AtomicOrdering::Relaxed);
1875 tree.hook.inits.store(0, AtomicOrdering::Relaxed);
1876
1877 tree.replay_init();
1878
1879 assert_eq!(
1880 tree.hook.inits.load(AtomicOrdering::Relaxed),
1881 4,
1882 "4 live keys"
1883 );
1884 assert_eq!(
1885 tree.hook.writes.load(AtomicOrdering::Relaxed),
1886 0,
1887 "no on_write"
1888 );
1889 }
1890
1891 #[test]
1892 fn var_tree_replay_init_no_hook_is_noop() {
1893 let dir = tempdir().unwrap();
1894 let tree = open_test_tree(dir.path());
1895
1896 for i in 0u64..3 {
1897 tree.put(&i.to_be_bytes(), &[i as u8; 8]).expect("put");
1898 }
1899 tree.replay_init();
1901 assert!(tree.get(&0u64.to_be_bytes()).is_some());
1903 }
1904
1905 #[test]
1906 fn var_tree_migrate_keep_fires_on_init_not_on_write_raw() {
1907 use crate::MigrateAction;
1908 let dir = tempdir().unwrap();
1909 let tree = open_test_tree_hooked::<true, false>(dir.path(), CountingHook::default());
1910
1911 for i in 0u64..4 {
1912 tree.put(&i.to_be_bytes(), &[i as u8; 16]).expect("put");
1913 }
1914 tree.hook.writes.store(0, AtomicOrdering::Relaxed);
1915 tree.hook.inits.store(0, AtomicOrdering::Relaxed);
1916
1917 let mutated = tree.migrate(|_, _| MigrateAction::Keep).expect("migrate");
1918
1919 assert_eq!(mutated, 0);
1920 assert_eq!(
1921 tree.hook.inits.load(AtomicOrdering::Relaxed),
1922 4,
1923 "4 keeps -> 4 on_init"
1924 );
1925 assert_eq!(
1926 tree.hook.writes.load(AtomicOrdering::Relaxed),
1927 0,
1928 "Keep must not fire on_write"
1929 );
1930 }
1931
1932 #[test]
1933 fn var_tree_migrate_update_fires_on_init_with_new_value_raw() {
1934 use crate::MigrateAction;
1935 let dir = tempdir().unwrap();
1936 let tree = open_test_tree_hooked::<true, false>(dir.path(), CountingHook::default());
1937
1938 let key = 42u64.to_be_bytes();
1939 tree.put(&key, b"old-value").expect("put");
1940 tree.hook.writes.store(0, AtomicOrdering::Relaxed);
1941 tree.hook.inits.store(0, AtomicOrdering::Relaxed);
1942
1943 let new = ByteView::new(b"new-value");
1944 let mutated = tree
1945 .migrate(move |_, _| MigrateAction::Update(new.clone()))
1946 .expect("migrate");
1947
1948 assert_eq!(mutated, 1);
1949 assert_eq!(tree.hook.inits.load(AtomicOrdering::Relaxed), 1);
1950 assert_eq!(
1951 crate::sync::lock(&tree.hook.last_init_value).as_deref(),
1952 Some(b"new-value".as_ref()),
1953 "on_init must receive the NEW value"
1954 );
1955 assert_eq!(
1956 tree.hook.writes.load(AtomicOrdering::Relaxed),
1957 0,
1958 "Update must NOT fire on_write (was double-firing through self.put)"
1959 );
1960 assert_eq!(tree.get(&key).unwrap().as_bytes(), b"new-value");
1961 }
1962
1963 #[test]
1964 fn var_tree_migrate_delete_fires_no_hooks_raw() {
1965 use crate::MigrateAction;
1966 let dir = tempdir().unwrap();
1967 let tree = open_test_tree_hooked::<true, false>(dir.path(), CountingHook::default());
1968
1969 let key = 7u64.to_be_bytes();
1970 tree.put(&key, b"x").expect("put");
1971 tree.hook.writes.store(0, AtomicOrdering::Relaxed);
1972 tree.hook.inits.store(0, AtomicOrdering::Relaxed);
1973
1974 let mutated = tree.migrate(|_, _| MigrateAction::Delete).expect("migrate");
1975
1976 assert_eq!(mutated, 1);
1977 assert_eq!(tree.hook.inits.load(AtomicOrdering::Relaxed), 0);
1978 assert_eq!(tree.hook.writes.load(AtomicOrdering::Relaxed), 0);
1979 assert!(tree.get(&key).is_none());
1980 }
1981
1982 #[test]
1983 fn var_tree_migrate_no_init_hook_is_silent_for_keep_and_update() {
1984 use crate::MigrateAction;
1985 let dir = tempdir().unwrap();
1986 let tree = open_test_tree_hooked::<false, false>(dir.path(), CountingHook::default());
1988
1989 for i in 0u64..3 {
1990 tree.put(&i.to_be_bytes(), &[i as u8; 16]).expect("put");
1991 }
1992 tree.hook.writes.store(0, AtomicOrdering::Relaxed);
1993 tree.hook.inits.store(0, AtomicOrdering::Relaxed);
1994
1995 tree.migrate(|_, _| MigrateAction::Keep)
1997 .expect("migrate keep");
1998 assert_eq!(
1999 tree.hook.inits.load(AtomicOrdering::Relaxed),
2000 0,
2001 "Keep with NEEDS_INIT=false"
2002 );
2003 assert_eq!(tree.hook.writes.load(AtomicOrdering::Relaxed), 0);
2004
2005 let new = ByteView::new(b"new");
2007 tree.migrate(move |_, _| MigrateAction::Update(new.clone()))
2008 .expect("migrate update");
2009 assert_eq!(
2010 tree.hook.inits.load(AtomicOrdering::Relaxed),
2011 0,
2012 "Update with NEEDS_INIT=false"
2013 );
2014 assert_eq!(
2015 tree.hook.writes.load(AtomicOrdering::Relaxed),
2016 0,
2017 "Update must not fire on_write either"
2018 );
2019 }
2020
2021 #[test]
2022 fn var_tree_public_put_still_fires_on_write_once() {
2023 let dir = tempdir().unwrap();
2024 let tree = open_test_tree_hooked::<true, false>(dir.path(), CountingHook::default());
2025
2026 tree.put(&1u64.to_be_bytes(), b"v").expect("put");
2027 assert_eq!(tree.hook.writes.load(AtomicOrdering::Relaxed), 1);
2028 }
2029
2030 #[test]
2031 fn var_tree_atomic_does_not_fire_hooks() {
2032 let dir = tempdir().unwrap();
2033 let tree = open_test_tree_hooked::<true, false>(dir.path(), CountingHook::default());
2034
2035 let key = 1u64.to_be_bytes();
2036 tree.atomic(&key, |shard| {
2037 shard.put(&key, b"a")?;
2038 shard.delete(&key)?;
2039 Ok(())
2040 })
2041 .expect("atomic");
2042
2043 assert_eq!(tree.hook.writes.load(AtomicOrdering::Relaxed), 0);
2044 assert_eq!(tree.hook.inits.load(AtomicOrdering::Relaxed), 0);
2045 }
2046
2047 #[test]
2049 fn read_value_locked_result_ok_from_write_buf() {
2050 let dir = tempdir().unwrap();
2051 let tree = open_test_tree(dir.path());
2052
2053 let key = 1u64.to_be_bytes();
2054 let payload = b"in-write-buffer-value";
2055 tree.put(&key, payload).expect("put");
2056
2057 let guard = tree.index.collector().enter();
2058 let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2059 let disk = *node.load_disk();
2060 drop(guard);
2061
2062 let shard = &tree.engine.shards()[disk.shard_id as usize];
2063 let inner = shard.lock();
2064 let v = tree
2065 .read_value_locked_result(&disk, &inner)
2066 .expect("write-buf read must succeed");
2067 assert_eq!(v.as_bytes(), payload);
2068 }
2069
2070 #[test]
2073 fn read_value_locked_result_ok_from_disk_immutable() {
2074 let dir = tempdir().unwrap();
2075 let tree = open_test_tree(dir.path());
2076
2077 let key = 2u64.to_be_bytes();
2078 let payload = b"single-block-immutable";
2079 tree.put(&key, payload).expect("put");
2080 for i in 100u64..135 {
2084 tree.put(&i.to_be_bytes(), &[i as u8; 256])
2085 .expect("rotator");
2086 }
2087
2088 let guard = tree.index.collector().enter();
2089 let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2090 let disk = *node.load_disk();
2091 drop(guard);
2092
2093 let shard = &tree.engine.shards()[disk.shard_id as usize];
2098 let inner = shard.lock();
2099 assert_ne!(
2102 disk.file_id, inner.active.file_id,
2103 "test setup failed: key=2 entry is still in the active file's write buffer",
2104 );
2105 let v = tree
2106 .read_value_locked_result(&disk, &inner)
2107 .expect("disk read must succeed");
2108 assert_eq!(v.as_bytes(), payload);
2109 }
2110
2111 #[test]
2113 fn read_value_locked_result_propagates_stale_disk_loc() {
2114 let dir = tempdir().unwrap();
2115 let tree = open_test_tree(dir.path());
2116
2117 let key = 3u64.to_be_bytes();
2118 let snap = put_until_compactable(&tree, key);
2119
2120 let shard = &tree.engine.shards()[snap.shard_id as usize];
2121 let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
2122
2123 let inner = shard.lock();
2124 match tree.read_value_locked_result(&snap, &inner) {
2125 Err(DbError::StaleDiskLoc) => {}
2126 Ok(v) => panic!("expected StaleDiskLoc, got Ok({:?})", v.as_bytes()),
2127 Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
2128 }
2129 }
2130
2131 #[test]
2137 fn read_value_locked_returns_none_on_stale() {
2138 let dir = tempdir().unwrap();
2139 let tree = open_test_tree(dir.path());
2140
2141 let key = 4u64.to_be_bytes();
2142 let snap = put_until_compactable(&tree, key);
2143
2144 let shard = &tree.engine.shards()[snap.shard_id as usize];
2145 let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
2146
2147 let inner = shard.lock();
2148 assert!(tree.read_value_locked(&snap, &inner).is_none());
2149 }
2150
2151 #[test]
2161 fn large_value_with_first_block_cached_uses_fallback() {
2162 let dir = tempdir().unwrap();
2163 let mut cfg = Config::test();
2167 cfg.shard_count = 1;
2168 cfg.max_file_size = 128 * 1024;
2169 cfg.write_buffer_size = 128 * 1024;
2170 cfg.compaction_threshold = 0.0;
2171 cfg.cache.max_size = 1 << 20;
2175 let tree: VarTree<[u8; 8]> = VarTree::open(dir.path(), cfg).expect("open");
2176
2177 let key = 42u64.to_be_bytes();
2178 let payload: Vec<u8> = (0..20_000u32).map(|i| i as u8).collect();
2180 tree.put(&key, &payload).expect("put large");
2181 tree.engine.shards()[0]
2183 .rotate_active_for_test(8)
2184 .expect("rotate");
2185
2186 let guard = tree.index.collector().enter();
2187 let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2188 let disk = *node.load_disk();
2189 drop(guard);
2190
2191 let start = (disk.offset & 4095) as usize;
2193 assert!(
2194 start + disk.len as usize > 8192,
2195 "test precondition: large value must span >2 blocks",
2196 );
2197
2198 let block_offset = disk.offset as u64 & !4095;
2202 let cache_key = BlockKey {
2203 shard_id: disk.shard_id,
2204 file_id: disk.file_id,
2205 block_offset,
2206 };
2207 tree.cache
2208 .insert(cache_key, Arc::new(AlignedBuf::zeroed(4096)));
2209 assert!(
2211 tree.cache.get(&cache_key).is_some(),
2212 "cache must be enabled for warm-cache scenario",
2213 );
2214
2215 let v = tree
2216 .read_value_cached_inner(&disk)
2217 .expect("large value must read via locked fallback");
2218 assert_eq!(v.as_bytes(), payload.as_slice());
2219 }
2220
2221 #[test]
2223 fn extract_from_block_single_block() {
2224 let mut block = AlignedBuf::zeroed(4096);
2225 for (i, byte) in block.iter_mut().enumerate() {
2226 *byte = i as u8;
2227 }
2228 let v = VarTree::<[u8; 8]>::extract_from_block(&block, 100, 50, || {
2229 panic!("next_block must not be called for single-block reads")
2230 })
2231 .expect("ok");
2232 let expected: Vec<u8> = (100u8..150u8).collect();
2233 assert_eq!(v.as_bytes(), expected.as_slice());
2234 }
2235
2236 #[test]
2238 fn extract_from_block_two_blocks_exact() {
2239 let mut first = AlignedBuf::zeroed(4096);
2240 for byte in first.iter_mut() {
2241 *byte = 0xAA;
2242 }
2243 let mut second = AlignedBuf::zeroed(4096);
2244 for byte in second.iter_mut() {
2245 *byte = 0xBB;
2246 }
2247 let v = VarTree::<[u8; 8]>::extract_from_block(&first, 4095, 4097, || Ok(Arc::new(second)))
2250 .expect("ok");
2251 let bytes = v.as_bytes();
2252 assert_eq!(bytes.len(), 4097);
2253 assert_eq!(bytes[0], 0xAA);
2254 assert_eq!(bytes[1], 0xBB);
2255 assert_eq!(bytes[4096], 0xBB);
2256 }
2257
2258 #[test]
2260 fn extract_from_block_two_blocks_partial() {
2261 let mut first = AlignedBuf::zeroed(4096);
2262 for byte in first.iter_mut() {
2263 *byte = 0x11;
2264 }
2265 let mut second = AlignedBuf::zeroed(4096);
2266 for byte in second.iter_mut() {
2267 *byte = 0x22;
2268 }
2269 let v = VarTree::<[u8; 8]>::extract_from_block(&first, 4000, 200, || Ok(Arc::new(second)))
2271 .expect("ok");
2272 let bytes = v.as_bytes();
2273 assert_eq!(bytes.len(), 200);
2274 assert!(bytes[..96].iter().all(|b| *b == 0x11));
2275 assert!(bytes[96..].iter().all(|b| *b == 0x22));
2276 }
2277
2278 fn open_large_value_tree(dir: &std::path::Path) -> VarTree<[u8; 8]> {
2283 let mut cfg = Config::test();
2284 cfg.shard_count = 1;
2285 cfg.max_file_size = 128 * 1024;
2286 cfg.write_buffer_size = 128 * 1024;
2287 cfg.compaction_threshold = 0.0;
2288 VarTree::open(dir, cfg).expect("open large-value test tree")
2289 }
2290
2291 fn build_large_payload(seed: u8) -> Vec<u8> {
2292 (0..50_000u32)
2293 .map(|i| (i as u8).wrapping_add(seed))
2294 .collect()
2295 }
2296
2297 #[test]
2304 fn large_value_read_via_locked_fallback() {
2305 let dir = tempdir().unwrap();
2306 let tree = open_large_value_tree(dir.path());
2307
2308 let key = 100u64.to_be_bytes();
2309 let payload = build_large_payload(0);
2310 tree.put(&key, &payload).expect("put large");
2311 tree.engine.shards()[0]
2312 .rotate_active_for_test(8)
2313 .expect("rotate");
2314
2315 let guard = tree.index.collector().enter();
2316 let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2317 let v = tree
2318 .read_value_cached(node, &guard)
2319 .expect("read must succeed via locked fallback");
2320 assert_eq!(v.as_bytes(), payload.as_slice());
2321 }
2322
2323 #[cfg(feature = "encryption")]
2324 #[test]
2325 fn large_value_read_encrypted() {
2326 let dir = tempdir().unwrap();
2327 let mut cfg = Config::test();
2328 cfg.shard_count = 1;
2329 cfg.max_file_size = 128 * 1024;
2330 cfg.write_buffer_size = 128 * 1024;
2331 cfg.compaction_threshold = 0.0;
2332 cfg.encryption_key = Some([7u8; 32]);
2333 let tree: VarTree<[u8; 8]> = VarTree::open(dir.path(), cfg).expect("open enc");
2337
2338 let key = 101u64.to_be_bytes();
2339 let payload = build_large_payload(0xAB);
2340 tree.put(&key, &payload).expect("put encrypted large");
2341 tree.engine.shards()[0]
2342 .rotate_active_for_test(8)
2343 .expect("rotate");
2344
2345 let guard = tree.index.collector().enter();
2346 let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2347 let v = tree
2348 .read_value_cached(node, &guard)
2349 .expect("encrypted large read must succeed via pread_value_encrypted");
2350 assert_eq!(v.as_bytes(), payload.as_slice());
2351 }
2352
2353 #[test]
2359 fn large_value_stale_disk_loc_deterministic() {
2360 let dir = tempdir().unwrap();
2361 let tree = open_large_value_tree(dir.path());
2362
2363 let key = 102u64.to_be_bytes();
2364 let payload = build_large_payload(0x42);
2365 tree.put(&key, &payload).expect("first large put");
2366
2367 let snap = {
2370 let guard = tree.index.collector().enter();
2371 let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2372 *node.load_disk()
2373 };
2374
2375 tree.engine.shards()[0]
2377 .rotate_active_for_test(8)
2378 .expect("rotate after large put");
2379
2380 for i in 1..20u8 {
2383 tree.put(&key, &[i; 256]).expect("overwrite");
2384 }
2385 tree.put(&key, b"live-after-compaction").expect("final put");
2386
2387 let shard = &tree.engine.shards()[snap.shard_id as usize];
2388 let _ = compact_shard(shard, &tree, 0.0).expect("compaction");
2389
2390 match tree.read_value_cached_inner(&snap) {
2393 Err(DbError::StaleDiskLoc) => {}
2394 Ok(v) => panic!("expected StaleDiskLoc, got Ok({:?})", v.as_bytes()),
2395 Err(e) => panic!("expected StaleDiskLoc, got Err({e})"),
2396 }
2397
2398 let guard = tree.index.collector().enter();
2400 let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2401 let v = tree
2402 .read_value_cached(node, &guard)
2403 .expect("public path must retry and return live value");
2404 assert_eq!(v.as_bytes(), b"live-after-compaction");
2405 }
2406
2407 #[test]
2412 fn read_value_locked_result_ok_from_cache_single_block() {
2413 let dir = tempdir().unwrap();
2414 let mut cfg = Config::test();
2416 cfg.shard_count = 1;
2417 cfg.max_file_size = 8192;
2418 cfg.write_buffer_size = 8192;
2419 cfg.compaction_threshold = 0.0;
2420 cfg.cache.max_size = 1 << 20;
2421 let tree: VarTree<[u8; 8]> = VarTree::open(dir.path(), cfg).expect("open");
2422
2423 let key = 9u64.to_be_bytes();
2424 let payload = b"small-single-block-value";
2425 tree.put(&key, payload).expect("put");
2426 for i in 100u64..135 {
2430 tree.put(&i.to_be_bytes(), &[i as u8; 256])
2431 .expect("rotator");
2432 }
2433
2434 let guard = tree.index.collector().enter();
2435 let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2436 let disk = *node.load_disk();
2437 drop(guard);
2438
2439 let start = (disk.offset & 4095) as usize;
2442 let len = disk.len as usize;
2443 assert!(
2444 start + len <= 4096,
2445 "test precondition: value must fit in a single block",
2446 );
2447
2448 let block_offset = disk.offset as u64 & !4095;
2452 let cache_key = BlockKey {
2453 shard_id: disk.shard_id,
2454 file_id: disk.file_id,
2455 block_offset,
2456 };
2457 let block = tree
2458 .get_or_read_block(disk.shard_id, disk.file_id, block_offset)
2459 .expect("read block");
2460 tree.cache.insert(cache_key, block);
2461 assert!(
2462 tree.cache.get(&cache_key).is_some(),
2463 "cache must contain the block for step 2 to fire",
2464 );
2465
2466 let shard = &tree.engine.shards()[disk.shard_id as usize];
2467 let inner = shard.lock();
2468 assert_ne!(
2469 disk.file_id, inner.active.file_id,
2470 "test setup failed: key entry is still in the active write buffer",
2471 );
2472
2473 let v = tree
2474 .read_value_locked_result(&disk, &inner)
2475 .expect("cache-hit read must succeed");
2476 assert_eq!(v.as_bytes(), payload);
2477 }
2478
2479 #[test]
2490 fn var_tree_get_with_file_id_above_u16() {
2491 let dir = tempdir().unwrap();
2492 let mut cfg = Config::test();
2493 cfg.shard_count = 1;
2494 cfg.max_file_size = 128 * 1024;
2495 cfg.write_buffer_size = 128 * 1024;
2496 cfg.compaction_threshold = 0.0;
2497 cfg.cache.max_size = 1 << 20;
2498 let tree: VarTree<[u8; 8]> = VarTree::open(dir.path(), cfg).expect("open");
2499
2500 let shard = &tree.engine.shards()[0];
2501
2502 shard.set_next_file_id(70_000);
2505 shard.rotate_active_for_test(8).expect("first rotate");
2506 assert!(
2507 shard.active_file_id() >= 70_000,
2508 "active_file_id should be >= 70_000 after rotation"
2509 );
2510
2511 let key = 42u64.to_be_bytes();
2512 let value = vec![0xC3u8; 512];
2513 tree.put(&key, &value).expect("put");
2514
2515 {
2517 let guard = tree.index.collector().enter();
2518 let node = tree.index.get(key.as_bytes(), &guard).expect("indexed");
2519 let disk = *node.load_disk();
2520 assert!(
2521 disk.file_id > u16::MAX as u32,
2522 "DiskLoc.file_id must be above u16::MAX, got {}",
2523 disk.file_id,
2524 );
2525 }
2526
2527 shard.flush().expect("flush");
2530 shard.rotate_active_for_test(8).expect("second rotate");
2531
2532 let got = tree.get(&key).expect("get must return Some");
2533 assert_eq!(got.as_bytes(), value.as_slice());
2534 }
2535
2536 #[test]
2549 fn recovery_handles_file_id_above_u16() {
2550 let dir = tempdir().unwrap();
2551
2552 let mut cfg = Config::test();
2553 cfg.shard_count = 1;
2554 cfg.max_file_size = 128 * 1024;
2555 cfg.write_buffer_size = 128 * 1024;
2556 cfg.compaction_threshold = 0.0;
2557 cfg.cache.max_size = 1 << 20;
2558
2559 {
2561 let tree: VarTree<[u8; 8]> = VarTree::open(dir.path(), cfg.clone()).expect("open A");
2562 let shard = &tree.engine.shards()[0];
2563
2564 shard.set_next_file_id(70_000);
2566 shard
2567 .rotate_active_for_test(8)
2568 .expect("rotate to file_id 70_000");
2569 assert!(
2570 shard.active_file_id() >= 70_000,
2571 "active_file_id should be >= 70_000 after rotation"
2572 );
2573
2574 for i in 0u64..4 {
2575 let key = i.to_be_bytes();
2576 let value = vec![i as u8; 200];
2577 tree.put(&key, &value).expect("put phase A");
2578 }
2579
2580 tree.close().expect("close phase A");
2581 }
2582
2583 {
2585 let tree: VarTree<[u8; 8]> = VarTree::open(dir.path(), cfg).expect("open B");
2586
2587 assert_eq!(tree.len(), 4, "all 4 entries must survive recovery");
2588
2589 for i in 0u64..4 {
2590 let key = i.to_be_bytes();
2591 let expected = vec![i as u8; 200];
2592 let got = tree
2593 .get(&key)
2594 .unwrap_or_else(|| panic!("key {i} not found after recovery"));
2595 assert_eq!(
2596 got.as_bytes(),
2597 expected.as_slice(),
2598 "value mismatch for key {i} after recovery"
2599 );
2600 }
2601
2602 let shard = &tree.engine.shards()[0];
2605 let max_fid = shard.file_ids().into_iter().max().expect("non-empty");
2606 assert!(
2607 max_fid > u16::MAX as u32,
2608 "max file_id should exceed u16::MAX after recovery (got {})",
2609 max_fid
2610 );
2611 }
2612 }
2613}