1#![forbid(unsafe_code)]
2use std::sync::Arc;
19use std::time::Instant;
20
21use bytes::Bytes;
22use wombatkv_store::wal_store::{ObjectStore, WalStoreError};
23
24use crate::compression::{
25 decode_if_compressed, encode_with_header, BlockCompressionConfig, CompressAlgo,
26};
27use crate::embed_metrics::{metrics, Op};
28use crate::foyer_cache::{FoyerCacheConfig, FoyerCacheError, FoyerHitTier, FoyerHybridCache};
29
/// Object-key prefix used by `EmbedConfig::default()`.
const DEFAULT_S3_PREFIX: &str = "kv";

/// Listed-key count at or above which `bootstrap_world_knowledge` logs a
/// `key_count_high` warning to stderr.
const BOOTSTRAP_KEY_LIMIT_WARN: usize = 100_000;

/// Listed-key count at or above which `bootstrap_world_knowledge` logs a
/// `key_count_exceeded_hard_limit` warning to stderr; the bootstrap itself
/// still runs to completion.
const BOOTSTRAP_KEY_LIMIT_HARD: usize = 1_000_000;
43
/// Construction-time settings consumed by `WombatKVKvStore::new`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EmbedConfig {
    /// Leading component of every object key; `new` rejects an empty value.
    pub s3_prefix: String,
    /// Settings handed to `FoyerHybridCache::open`. Its `ssd_dir` also hosts the
    /// flat-file cache, in a `_flat` subdirectory.
    pub foyer: FoyerCacheConfig,
    /// When `true`, `put_kv` returns object-store PUT failures to the caller.
    /// When `false`, the PUT is still attempted but a failure is only logged.
    pub write_through_s3: bool,
    /// Compression settings applied to payloads before they are written to the
    /// object store.
    pub compression: BlockCompressionConfig,
}
62
63impl Default for EmbedConfig {
64 fn default() -> Self {
65 Self {
66 s3_prefix: DEFAULT_S3_PREFIX.to_string(),
67 foyer: FoyerCacheConfig::default(),
68 write_through_s3: true,
69 compression: BlockCompressionConfig::default(),
70 }
71 }
72}
73
/// Errors surfaced by the embedded KV store.
#[derive(Debug)]
pub enum EmbedError {
    /// Failure reported by the foyer hybrid cache.
    Foyer(FoyerCacheError),
    /// Failure reported by the backing object store.
    ObjectStore(WalStoreError),
    /// Rejected configuration. Also used in this file to carry flat-cache open
    /// failures, compression failures, and injected `dst` faults.
    InvalidConfig(String),
}
81
82impl std::fmt::Display for EmbedError {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 match self {
85 Self::Foyer(err) => write!(f, "WombatKV puffer error: {err}"),
86 Self::ObjectStore(err) => write!(f, "object store error: {err:?}"),
87 Self::InvalidConfig(msg) => write!(f, "invalid config: {msg}"),
88 }
89 }
90}
91
/// Marks `EmbedError` as a standard error type; no trait methods are overridden.
impl std::error::Error for EmbedError {}
93
94impl From<FoyerCacheError> for EmbedError {
95 fn from(value: FoyerCacheError) -> Self {
96 Self::Foyer(value)
97 }
98}
99
100impl From<WalStoreError> for EmbedError {
101 fn from(value: WalStoreError) -> Self {
102 Self::ObjectStore(value)
103 }
104}
105
/// Storage tier that satisfied a lookup.
///
/// Derives `PartialEq`/`Eq` so callers can compare tiers with `==` and
/// `assert_eq!` instead of having to pattern-match with `matches!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HitTier {
    /// Served from a local cache. `get_kv` reports this for both flat-file
    /// cache hits and foyer (RAM or SSD) hits.
    Foyer,
    /// Fetched from the backing object store.
    ObjectStore,
}
112
/// Result of a `get_kv` lookup.
#[derive(Debug, Clone)]
pub enum GetOutcome {
    /// The key was found; `tier` says where and `payload` holds the
    /// uncompressed bytes.
    Hit { tier: HitTier, payload: Bytes },
    /// The key was not found in any cache tier nor in the object store.
    Miss,
}
119
/// KV store layering a flat-file cache and a foyer hybrid cache in front of an
/// object store `S`.
pub struct WombatKVKvStore<S: ObjectStore> {
    /// Hybrid cache probed by `get_kv` after the flat cache misses.
    foyer: Arc<FoyerHybridCache>,
    /// Flat-file blob cache rooted at `<ssd_dir>/_flat`; first tier probed by
    /// `get_kv`.
    flat: Arc<crate::kv_blob_cache::FlatFileKvBlobCache>,
    /// Backing object store; last tier probed by `get_kv`.
    object_store: S,
    /// Leading component of every object key.
    s3_prefix: String,
    /// Whether `put_kv` surfaces object-store PUT failures to the caller.
    write_through_s3: bool,
    /// Compression settings used when encoding payloads for the object store.
    compression: BlockCompressionConfig,
    /// In-memory block metadata index filled by the `bootstrap_*` methods and
    /// shared with the eviction and prefetch workers.
    metadata_index: Arc<wombatkv_radix::InMemoryMetadataIndex>,
}
145
146impl<S: ObjectStore> WombatKVKvStore<S> {
147 pub fn new(config: EmbedConfig, object_store: S) -> Result<Self, EmbedError> {
151 if config.s3_prefix.is_empty() {
152 return Err(EmbedError::InvalidConfig("s3_prefix must be non-empty".to_string()));
153 }
154 let flat_root = config.foyer.ssd_dir.join("_flat");
155 let flat = Arc::new(
156 crate::kv_blob_cache::FlatFileKvBlobCache::open(flat_root)
157 .map_err(|err| EmbedError::InvalidConfig(format!("flat cache open: {err}")))?,
158 );
159 let foyer = FoyerHybridCache::open(config.foyer)?;
160 Ok(Self {
161 foyer,
162 flat,
163 object_store,
164 s3_prefix: config.s3_prefix,
165 write_through_s3: config.write_through_s3,
166 compression: config.compression,
167 metadata_index: Arc::new(wombatkv_radix::InMemoryMetadataIndex::new()),
168 })
169 }
170
171 pub fn with_foyer(
174 foyer: Arc<FoyerHybridCache>,
175 object_store: S,
176 s3_prefix: impl Into<String>,
177 write_through_s3: bool,
178 ) -> Result<Self, EmbedError> {
179 let s3_prefix = s3_prefix.into();
180 if s3_prefix.is_empty() {
181 return Err(EmbedError::InvalidConfig("s3_prefix must be non-empty".to_string()));
182 }
183 let flat_root = foyer.ssd_dir().join("_flat");
186 let flat = Arc::new(
187 crate::kv_blob_cache::FlatFileKvBlobCache::open(flat_root)
188 .map_err(|err| EmbedError::InvalidConfig(format!("flat cache open: {err}")))?,
189 );
190 Ok(Self {
191 foyer,
192 flat,
193 object_store,
194 s3_prefix,
195 write_through_s3,
196 compression: BlockCompressionConfig::from_env(),
197 metadata_index: Arc::new(wombatkv_radix::InMemoryMetadataIndex::new()),
198 })
199 }
200
    /// Returns a copy of the compression settings this store encodes with.
    #[must_use]
    pub fn compression(&self) -> BlockCompressionConfig {
        self.compression
    }
207
208 pub fn metadata_index(&self) -> Arc<wombatkv_radix::InMemoryMetadataIndex> {
212 self.metadata_index.clone()
213 }
214
    /// Rebuilds in-memory knowledge of a namespace from an object-store listing.
    ///
    /// Lists every key under `{s3_prefix}/{namespace}/` (or `{s3_prefix}/` when
    /// `namespace` is empty). For each key whose tail after the block-key infix
    /// is a 64-character hex hash, a placeholder root `BlockMeta` is inserted
    /// into the metadata index. Every sidecar key is then read through `get_kv`
    /// to pre-warm the caches. A timing/summary line is written to stderr.
    ///
    /// Returns the number of block hashes inserted into the metadata index.
    ///
    /// # Errors
    /// Returns `EmbedError::ObjectStore` when the prefix listing fails. Misses
    /// and errors while pre-warming individual sidecars are ignored.
    pub fn bootstrap_world_knowledge(&self, namespace: &str) -> Result<usize, EmbedError> {
        use wombatkv_radix::MetadataIndex;
        let started = Instant::now();
        let prefix = if namespace.is_empty() {
            format!("{}/", self.s3_prefix)
        } else {
            format!("{}/{}/", self.s3_prefix, namespace)
        };
        let keys = self.object_store.list_prefix(&prefix).map_err(EmbedError::from)?;
        let key_count = keys.len();
        // Simulation-testing assertions, compiled only with the `dst` feature.
        #[cfg(feature = "dst")]
        wombatkv_dst::assert_always(
            key_count <= BOOTSTRAP_KEY_LIMIT_HARD.saturating_mul(2),
            "bootstrap key_count within 2× hard limit",
            format!("got {key_count} keys, hard limit {BOOTSTRAP_KEY_LIMIT_HARD}"),
        );
        #[cfg(feature = "dst")]
        wombatkv_dst::assert_sometimes(
            key_count == 0,
            "bootstrap saw empty namespace",
            "DST coverage gate, exercises the empty-bucket cold path",
        );
        // Both thresholds only log; neither aborts the bootstrap.
        if key_count >= BOOTSTRAP_KEY_LIMIT_HARD {
            eprintln!(
                "[MyelonInstr] {{\"scope\":\"wmbt_kv_warn\",\"fn\":\"bootstrap_world_knowledge\",\
                 \"event\":\"key_count_exceeded_hard_limit\",\"keys\":{key_count},\
                 \"limit\":{BOOTSTRAP_KEY_LIMIT_HARD},\"prefix\":\"{prefix}\"}}"
            );
        } else if key_count >= BOOTSTRAP_KEY_LIMIT_WARN {
            eprintln!(
                "[MyelonInstr] {{\"scope\":\"wmbt_kv_warn\",\"fn\":\"bootstrap_world_knowledge\",\
                 \"event\":\"key_count_high\",\"keys\":{key_count},\
                 \"warn_at\":{BOOTSTRAP_KEY_LIMIT_WARN},\"prefix\":\"{prefix}\"}}"
            );
        }
        let mut blocks_loaded: usize = 0;
        let mut skipped_unrecognized: usize = 0;
        let mut sidecars_prewarmed: usize = 0;
        let mut sidecar_bytes_total: usize = 0;
        let mut sidecar_keys: Vec<&String> = Vec::new();

        let block_key_infix = format!("/{}", wombatkv_radix::BLOCK_KEY_PREFIX);
        let sidecar_key_infix = format!("/{}", wombatkv_radix::SIDECAR_RAW_TAIL_KEY_PREFIX);

        // Pass 1: classify each listed key as block, sidecar, or unrecognized.
        for key in &keys {
            if let Some(idx) = key.find(&block_key_infix) {
                // Everything after the infix must be the 64-hex-digit block hash.
                let hex = &key[idx + block_key_infix.len()..];
                if hex.len() == 64 {
                    let mut hash = [0u8; 32];
                    if decode_hex32(hex, &mut hash) {
                        // Only the hash is known from the listing, so a zeroed
                        // root meta is registered as a placeholder.
                        let meta = wombatkv_radix::BlockMeta::new_root(0, [0u8; 24], [0u8; 16]);
                        self.metadata_index.insert(hash, meta);
                        blocks_loaded += 1;
                    }
                }
                // Block-shaped keys with a malformed hash are dropped without
                // being counted as unrecognized.
                continue;
            }
            if key.contains(&sidecar_key_infix) {
                sidecar_keys.push(key);
                continue;
            }
            skipped_unrecognized += 1;
        }
        // Pass 2: pull each sidecar through `get_kv` so it lands in the caches.
        for full_key in &sidecar_keys {
            // Keep the key from the `wombatkv/v1/` segment onward, because
            // `get_kv` re-applies the prefix and namespace. Keys without that
            // segment are passed through unchanged.
            let rel_idx = full_key.find("/wombatkv/v1/").map_or(0, |i| i + 1);
            let rel_key = full_key[rel_idx..].to_string();
            match self.get_kv(namespace, &rel_key) {
                Ok(GetOutcome::Hit { payload, .. }) => {
                    sidecar_bytes_total = sidecar_bytes_total.saturating_add(payload.len());
                    sidecars_prewarmed += 1;
                }
                // Pre-warming is best-effort: misses and errors are skipped.
                Ok(GetOutcome::Miss) | Err(_) => {
                }
            }
        }
        let elapsed_ms = started.elapsed().as_millis();
        eprintln!(
            "[MyelonInstr] {{\"scope\":\"wmbt_kv_timing\",\"fn\":\"bootstrap_world_knowledge\",\
             \"stages\":{{\"total_ms\":{elapsed_ms},\"blocks_indexed\":{blocks_loaded},\
             \"sidecars_prewarmed\":{sidecars_prewarmed},\
             \"sidecar_bytes_total\":{sidecar_bytes_total},\
             \"unrecognized_keys\":{skipped_unrecognized},\
             \"namespace\":\"{namespace}\"}}}}"
        );
        Ok(blocks_loaded)
    }
351
352 pub fn bootstrap_from_slatedb(
368 &self,
369 slatedb_index: &wombatkv_radix::SlateDbMetadataIndex,
370 ) -> Result<usize, EmbedError> {
371 use wombatkv_radix::MetadataIndex;
372 let started = Instant::now();
373 let entries = slatedb_index.entries();
374 let count = entries.len();
375 self.metadata_index.bulk_load(entries);
376 let elapsed_ms = started.elapsed().as_millis();
377 eprintln!(
378 "[MyelonInstr] {{\"scope\":\"wmbt_kv_timing\",\"fn\":\"bootstrap_from_slatedb\",\
379 \"stages\":{{\"total_ms\":{elapsed_ms},\"blocks_loaded\":{count}}}}}"
380 );
381 Ok(count)
382 }
383
384 #[must_use]
388 pub fn object_key(&self, namespace: &str, key: &str) -> String {
389 if namespace.is_empty() {
390 format!("{}/{}", self.s3_prefix, key)
391 } else {
392 format!("{}/{}/{}", self.s3_prefix, namespace, key)
393 }
394 }
395
    /// Returns the key used by both cache tiers; it is the same string as
    /// `object_key` produces.
    fn cache_key(&self, namespace: &str, key: &str) -> String {
        self.object_key(namespace, key)
    }
401
    /// Stores `payload` under `key` in `namespace`.
    ///
    /// The uncompressed payload is written to the flat cache and the foyer
    /// cache first. It is then encoded per the store's compression settings and
    /// PUT to the object store on the calling thread.
    ///
    /// # Errors
    /// Returns `EmbedError::InvalidConfig` when encoding fails (or when a `dst`
    /// fault is injected). With `write_through_s3` enabled, an object-store PUT
    /// failure is returned as `EmbedError::ObjectStore`; otherwise it is only
    /// logged. The caches are not rolled back when an error is returned.
    pub fn put_kv(&self, namespace: &str, key: &str, payload: Bytes) -> Result<(), EmbedError> {
        // Fault-injection hooks, compiled only with the `dst` feature. They
        // fail the call before any cache or object-store write happens.
        #[cfg(feature = "dst")]
        {
            if wombatkv_dst::dst_buggify!() {
                return Err(EmbedError::InvalidConfig(
                    "dst buggify: simulated put_kv S3 PUT failure".to_string(),
                ));
            }
            if wombatkv_dst::dst_plan::is_put_suppressed() {
                return Err(EmbedError::InvalidConfig(
                    "dst_plan: scheduled put-suppressing fault for current op".to_string(),
                ));
            }
        }
        let started = Instant::now();
        let cache_key = self.cache_key(namespace, key);
        let object_key = self.object_key(namespace, key);
        let bytes_len = payload.len() as u64;

        // Populate both local tiers before touching the object store.
        crate::kv_blob_cache::KvBlobCache::put(self.flat.as_ref(), &cache_key, payload.clone());
        self.foyer.put(&cache_key, payload.clone());

        // An encoding failure returns here, before metrics are recorded.
        let on_wire = encode_for_storage(&payload, self.compression, key, "put_kv")?;
        let result = if self.write_through_s3 {
            self.object_store.put_object(&object_key, &on_wire).map_err(EmbedError::from)
        } else {
            // Best-effort mode: the PUT is still synchronous, but a failure is
            // logged instead of being returned.
            if let Err(err) = self.object_store.put_object(&object_key, &on_wire) {
                eprintln!("wombatkv: best-effort S3 PUT failed for {object_key}: {err:?}");
            }
            Ok(())
        };

        // Metrics count the uncompressed payload size, whatever the PUT result.
        let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
        metrics().observe(Op::Stash, elapsed_us, bytes_len);
        emit_timing(
            "WombatKVKvStore.put_kv",
            if self.write_through_s3 { "s3_write_through" } else { "s3_best_effort" },
            &[("total_us", elapsed_us), ("payload_bytes", bytes_len)],
        );
        result
    }
470
    /// Stores `payload` in both caches synchronously and hands the object-store
    /// write to a freshly spawned thread.
    ///
    /// Nothing is returned to the caller: encoding failures, PUT failures, and a
    /// failed thread spawn are all reported on stderr only.
    pub fn put_kv_async_s3(this: Arc<Self>, namespace: &str, key: &str, payload: Bytes) {
        let cache_key = this.cache_key(namespace, key);
        let object_key = this.object_key(namespace, key);
        let bytes_len = payload.len() as u64;

        crate::kv_blob_cache::KvBlobCache::put(this.flat.as_ref(), &cache_key, payload.clone());
        this.foyer.put(&cache_key, payload.clone());

        // Copy or clone everything the worker thread needs, so the closure owns
        // its captures and does not borrow from `this`.
        let compression_cfg = this.compression;
        let object_store = this.object_store.clone();
        let object_key_owned = object_key;
        let key_owned = key.to_string();
        match std::thread::Builder::new()
            .name(format!("wombatkv-embed-async-s3-{bytes_len}"))
            .spawn(move || {
                let on_wire = match encode_for_storage(
                    &payload,
                    compression_cfg,
                    &key_owned,
                    "put_kv_async_s3",
                ) {
                    Ok(buf) => buf,
                    Err(err) => {
                        eprintln!("wombatkv[embed-async-s3]: compress {object_key_owned}: {err}");
                        return;
                    }
                };
                if let Err(err) = object_store.put_object(&object_key_owned, &on_wire) {
                    eprintln!("wombatkv[embed-async-s3]: put_object {object_key_owned}: {err:?}");
                }
            }) {
            // The join handle is dropped, so the thread runs detached.
            Ok(_) => {}
            Err(err) => {
                eprintln!(
                    "wombatkv[embed-async-s3]: spawn failed (foyer has the bytes, S3 write skipped): {err}"
                );
            }
        }
    }
537
    /// Looks up `key` in `namespace`, probing the tiers in order: flat-file
    /// cache, foyer hybrid cache, then the object store.
    ///
    /// A foyer hit is copied into the flat cache. An object-store hit is decoded
    /// (when compressed) and copied into both caches. Every successful outcome
    /// records a metric and emits tier and timing events.
    ///
    /// Flat-cache hits are reported as `HitTier::Foyer` and counted under
    /// `Op::LoadFoyerRam`.
    ///
    /// # Errors
    /// Returns `EmbedError::ObjectStore` for any object-store failure other than
    /// `ObjectNotFound`, which is reported as `GetOutcome::Miss`.
    pub fn get_kv(&self, namespace: &str, key: &str) -> Result<GetOutcome, EmbedError> {
        // The `t_*` values are microsecond offsets from `started`; stage
        // durations are the differences between consecutive checkpoints.
        let started = Instant::now();
        let cache_key = self.cache_key(namespace, key);
        let t_cache_key = started.elapsed().as_micros() as u64;

        // Tier 1: flat-file cache.
        if let Some((payload, op_label)) =
            crate::kv_blob_cache::KvBlobCache::get(self.flat.as_ref(), &cache_key)
        {
            let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
            let bytes_len = payload.len() as u64;
            metrics().observe(Op::LoadFoyerRam, elapsed_us, bytes_len);
            emit_tier_event(op_label, "flat", key, bytes_len, elapsed_us);
            emit_timing(
                "WombatKVKvStore.get_kv",
                "flat_hit",
                &[
                    ("cache_key_us", t_cache_key),
                    ("flat_call_us", elapsed_us - t_cache_key),
                    ("total_us", elapsed_us),
                    ("payload_bytes", bytes_len),
                ],
            );
            return Ok(GetOutcome::Hit { tier: HitTier::Foyer, payload });
        }
        // Tier 2: foyer hybrid cache (RAM or SSD).
        let t_flat_done = started.elapsed().as_micros() as u64;
        let foyer_result = self.foyer.get_with_tier(&cache_key);
        let t_foyer_done = started.elapsed().as_micros() as u64;
        if let Some((payload, tier)) = foyer_result {
            let bytes_len = payload.len() as u64;
            let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
            let op = match tier {
                FoyerHitTier::Ram => Op::LoadFoyerRam,
                FoyerHitTier::Ssd => Op::LoadFoyerSsd,
            };
            metrics().observe(op, elapsed_us, bytes_len);
            emit_tier_event(op.as_str(), tier.as_str(), key, bytes_len, elapsed_us);
            // Promote into the flat cache so the next read stops at tier 1.
            crate::kv_blob_cache::KvBlobCache::put(self.flat.as_ref(), &cache_key, payload.clone());
            emit_timing(
                "WombatKVKvStore.get_kv",
                match tier {
                    FoyerHitTier::Ram => "foyer_ram_hit",
                    FoyerHitTier::Ssd => "foyer_ssd_hit",
                },
                &[
                    ("cache_key_us", t_cache_key),
                    ("flat_miss_us", t_flat_done - t_cache_key),
                    ("foyer_call_us", t_foyer_done - t_flat_done),
                    ("total_us", elapsed_us),
                    ("payload_bytes", bytes_len),
                ],
            );
            return Ok(GetOutcome::Hit { tier: HitTier::Foyer, payload });
        }

        // Tier 3: object store.
        let object_key = self.object_key(namespace, key);
        let t_obj_key = started.elapsed().as_micros() as u64;
        match self.object_store.get_object(&object_key) {
            Ok(payload) => {
                let t_s3_done = started.elapsed().as_micros() as u64;
                let on_wire_bytes = payload.len() as u64;
                // A borrowed result means nothing was decoded, so the fetched
                // buffer is reused as-is instead of being copied.
                let bytes = match decode_if_compressed(&payload) {
                    std::borrow::Cow::Borrowed(_) => Bytes::from(payload),
                    std::borrow::Cow::Owned(decoded) => Bytes::from(decoded),
                };
                let uncompressed_bytes_len = bytes.len() as u64;
                let t_from_vec = started.elapsed().as_micros() as u64;
                // Populate both cache tiers with the decoded bytes.
                crate::kv_blob_cache::KvBlobCache::put(
                    self.flat.as_ref(),
                    &cache_key,
                    bytes.clone(),
                );
                self.foyer.put(&cache_key, bytes.clone());
                let t_foyer_put = started.elapsed().as_micros() as u64;
                let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
                metrics().observe(Op::LoadS3, elapsed_us, uncompressed_bytes_len);
                emit_tier_event("load_s3", "s3", key, uncompressed_bytes_len, elapsed_us);
                emit_timing(
                    "WombatKVKvStore.get_kv",
                    "s3_hit",
                    &[
                        ("flat_miss_us", t_flat_done - t_cache_key),
                        ("foyer_miss_us", t_foyer_done - t_flat_done),
                        ("obj_key_us", t_obj_key - t_foyer_done),
                        ("s3_get_us", t_s3_done - t_obj_key),
                        ("bytes_wrap_us", t_from_vec - t_s3_done),
                        ("cache_put_us", t_foyer_put - t_from_vec),
                        ("total_us", elapsed_us),
                        ("payload_bytes", uncompressed_bytes_len),
                        ("on_wire_bytes", on_wire_bytes),
                    ],
                );
                Ok(GetOutcome::Hit { tier: HitTier::ObjectStore, payload: bytes })
            }
            Err(WalStoreError::ObjectNotFound(_)) => {
                let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
                metrics().observe(Op::Miss, elapsed_us, 0);
                emit_tier_event("miss", "miss", key, 0, elapsed_us);
                emit_timing("WombatKVKvStore.get_kv", "miss", &[("total_us", elapsed_us)]);
                Ok(GetOutcome::Miss)
            }
            // Other object-store failures are returned without recording a metric.
            Err(other) => Err(EmbedError::ObjectStore(other)),
        }
    }
664
665 pub fn exists_kv(&self, namespace: &str, key: &str) -> Result<bool, EmbedError> {
672 let cache_key = self.cache_key(namespace, key);
673 if self.foyer.contains(&cache_key) {
674 return Ok(true);
675 }
676
677 let object_key = self.object_key(namespace, key);
678 self.object_store.head_object(&object_key).map_err(EmbedError::ObjectStore)
679 }
680
681 pub fn list_namespace(&self, namespace: &str) -> Result<Vec<String>, EmbedError> {
683 let prefix = if namespace.is_empty() {
684 format!("{}/", self.s3_prefix)
685 } else {
686 format!("{}/{}/", self.s3_prefix, namespace)
687 };
688 Ok(self.object_store.list_prefix(&prefix)?)
689 }
690
691 pub fn list_kv_keys(&self, namespace: &str) -> Result<Vec<String>, EmbedError> {
693 let prefix = if namespace.is_empty() {
694 format!("{}/", self.s3_prefix)
695 } else {
696 format!("{}/{}/", self.s3_prefix, namespace)
697 };
698 let mut keys = Vec::new();
699 for object_key in self.object_store.list_prefix(&prefix)? {
700 if let Some(key) = object_key.strip_prefix(&prefix) {
701 keys.push(key.to_string());
702 }
703 }
704 Ok(keys)
705 }
706
707 pub fn restore_from_s3(&self, namespace: &str) -> Result<usize, EmbedError> {
711 let started = Instant::now();
712 let object_keys = self.list_namespace(namespace)?;
713 let mut restored = 0_usize;
714 let mut bytes_total: u64 = 0;
715 for object_key in object_keys {
716 let payload = match self.object_store.get_object(&object_key) {
717 Ok(value) => value,
718 Err(WalStoreError::ObjectNotFound(_)) => continue,
719 Err(other) => return Err(EmbedError::ObjectStore(other)),
720 };
721 let bytes = match decode_if_compressed(&payload) {
725 std::borrow::Cow::Borrowed(_) => Bytes::from(payload),
726 std::borrow::Cow::Owned(decoded) => Bytes::from(decoded),
727 };
728 bytes_total = bytes_total.saturating_add(bytes.len() as u64);
729 self.foyer.put(&object_key, bytes);
730 restored += 1;
731 }
732 let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
733 metrics().observe(Op::RestoreFromS3, elapsed_us, bytes_total);
734 Ok(restored)
735 }
736
737 pub fn delete_kv(&self, namespace: &str, key: &str) -> Result<bool, EmbedError> {
749 let cache_key = self.cache_key(namespace, key);
750 let object_key = self.object_key(namespace, key);
751
752 let _ = crate::kv_blob_cache::KvBlobCache::remove(self.flat.as_ref(), &cache_key);
756
757 let deleted = self.object_store.delete_object(&object_key)?;
758 Ok(deleted)
759 }
760
    /// Clears the foyer hybrid cache. The flat-file cache and the object store
    /// are left untouched.
    pub fn clear_foyer(&self) {
        self.foyer.clear();
    }
765
    /// Clears the flat-file cache. The foyer cache and the object store are
    /// left untouched.
    pub fn clear_flat_cache(&self) {
        crate::kv_blob_cache::KvBlobCache::clear(self.flat.as_ref());
    }
775
    /// Borrows the shared handle to the foyer hybrid cache.
    #[must_use]
    pub fn foyer(&self) -> &Arc<FoyerHybridCache> {
        &self.foyer
    }
781
    /// Borrows the backing object store.
    #[must_use]
    pub fn object_store(&self) -> &S {
        &self.object_store
    }
787
788 #[must_use]
817 pub fn start_eviction_worker(
818 self: &Arc<Self>,
819 config: crate::lru::LruConfig,
820 slatedb: Option<Arc<wombatkv_radix::SlateDbMetadataIndex>>,
821 ) -> crate::lru::LruEvictionWorker {
822 let deleter: Arc<dyn crate::lru::EvictionDeleter> =
823 Arc::new(KvStoreEvictionDeleter::new(self.clone()));
824 let emit = crate::lru::default_emit(config.namespace.clone());
825 crate::lru::spawn_worker(self.metadata_index.clone(), slatedb, deleter, config, emit)
826 }
827
828 #[must_use]
829 pub fn start_prefetcher(
830 self: &Arc<Self>,
831 config: crate::block_prefetch::PrefetchConfig,
832 ) -> crate::block_prefetch::PrefetchWorker {
833 let index: Arc<dyn wombatkv_radix::MetadataIndex> = self.metadata_index.clone();
834 if crate::block_prefetch::dry_run_enabled() {
835 eprintln!("wombatkv[prefetch]: WMBT_KV_PREFETCH_DRY_RUN=1 → v1 log-only path");
836 return crate::block_prefetch::spawn_worker(
837 index,
838 config,
839 crate::block_prefetch::default_emit(),
840 );
841 }
842 let fetcher: Arc<dyn crate::block_prefetch::PrefetchFetcher> =
843 Arc::new(KvStorePrefetchFetcher::new(self.clone()));
844 crate::block_prefetch::spawn_worker_v2(
845 index,
846 config,
847 fetcher,
848 crate::block_prefetch::default_v2_emit(),
849 )
850 }
851}
852
/// Adapter that lets the LRU eviction worker delete blocks through a shared
/// `WombatKVKvStore`.
struct KvStoreEvictionDeleter<S: ObjectStore> {
    /// Store whose `delete_kv` performs the actual deletion.
    store: Arc<WombatKVKvStore<S>>,
}

impl<S: ObjectStore> KvStoreEvictionDeleter<S> {
    /// Wraps a shared store handle.
    fn new(store: Arc<WombatKVKvStore<S>>) -> Self {
        Self { store }
    }
}
867
868impl<S: ObjectStore> crate::lru::EvictionDeleter for KvStoreEvictionDeleter<S> {
869 fn delete_block(&self, namespace: &str, key: &str) -> Result<bool, String> {
870 self.store.delete_kv(namespace, key).map_err(|err| format!("{err}"))
871 }
872}
873
/// Adapter that lets the prefetch worker probe and fetch blocks through a
/// shared `WombatKVKvStore`.
struct KvStorePrefetchFetcher<S: ObjectStore> {
    /// Store whose flat cache is probed and whose `get_kv` performs fetches.
    store: Arc<WombatKVKvStore<S>>,
}

impl<S: ObjectStore> KvStorePrefetchFetcher<S> {
    /// Wraps a shared store handle.
    fn new(store: Arc<WombatKVKvStore<S>>) -> Self {
        Self { store }
    }
}
888
889impl<S: ObjectStore> crate::block_prefetch::PrefetchFetcher for KvStorePrefetchFetcher<S> {
890 fn contains_flat(&self, namespace: &str, key: &str) -> bool {
891 let cache_key = self.store.cache_key(namespace, key);
892 crate::kv_blob_cache::KvBlobCache::contains(self.store.flat.as_ref(), &cache_key)
893 }
894
895 fn fetch_block(&self, namespace: &str, key: &str) -> Result<Option<u64>, String> {
896 match self.store.get_kv(namespace, key) {
897 Ok(GetOutcome::Hit { payload, .. }) => Ok(Some(payload.len() as u64)),
898 Ok(GetOutcome::Miss) => Ok(None),
899 Err(err) => Err(format!("{err}")),
900 }
901 }
902}
903
904fn emit_tier_event(op: &str, tier: &str, key: &str, bytes: u64, elapsed_us: u64) {
919 if !tier_events_enabled() {
920 return;
921 }
922 let key_hash: String = key.chars().take(16).collect();
923 eprintln!(
924 "[MyelonInstr] {{\"scope\":\"wombatkv_tier\",\"op\":\"{op}\",\"tier\":\"{tier}\",\"key_hash\":\"{key_hash}\",\"bytes\":{bytes},\"elapsed_us\":{elapsed_us}}}"
925 );
926}
927
928fn encode_for_storage(
947 payload: &[u8],
948 cfg: BlockCompressionConfig,
949 key: &str,
950 func: &'static str,
951) -> Result<Vec<u8>, EmbedError> {
952 if !cfg.is_enabled() {
953 return Ok(payload.to_vec());
956 }
957 let started = Instant::now();
958 let encoded = encode_with_header(payload, cfg)
959 .map_err(|err| EmbedError::InvalidConfig(format!("compress {key}: {err}")))?;
960 let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
961 let uncompressed = payload.len() as u64;
962 let compressed = encoded.len() as u64;
963 let ratio_bps = compressed.saturating_mul(10_000).checked_div(uncompressed).unwrap_or(0);
964 if timing_enabled() {
965 let algo = match cfg.algo {
968 CompressAlgo::Zstd => "zstd",
969 CompressAlgo::Lz4 => "lz4",
970 CompressAlgo::None => "none",
971 };
972 eprintln!(
973 "[MyelonInstr] {{\"scope\":\"wmbt_kv_compress\",\"fn\":\"{func}\",\
974 \"algo\":\"{algo}\",\"level\":{level},\"stages\":{{\
975 \"compress_us\":{elapsed_us},\
976 \"uncompressed_bytes\":{uncompressed},\
977 \"compressed_bytes\":{compressed},\
978 \"ratio_bps\":{ratio_bps}}}}}",
979 level = cfg.level,
980 );
981 }
982 Ok(encoded)
983}
984
985pub fn emit_timing(func: &str, path: &str, stages: &[(&str, u64)]) {
986 for (name, us) in stages {
994 let tag = format!("{func}:{path}:{name}");
995 crate::latency_histogram::record_global(&tag, *us);
996 }
997
998 if !timing_enabled() {
999 return;
1000 }
1001 let mut buf = String::with_capacity(160);
1002 buf.push_str("[MyelonInstr] {\"scope\":\"wmbt_kv_timing\",\"fn\":\"");
1003 buf.push_str(func);
1004 buf.push_str("\",\"path\":\"");
1005 buf.push_str(path);
1006 buf.push_str("\",\"stages\":{");
1007 for (i, (name, us)) in stages.iter().enumerate() {
1008 if i > 0 {
1009 buf.push(',');
1010 }
1011 buf.push('"');
1012 buf.push_str(name);
1013 buf.push_str("\":");
1014 buf.push_str(&us.to_string());
1015 }
1016 buf.push_str("}}");
1017 eprintln!("{buf}");
1018}
1019
1020fn timing_enabled() -> bool {
1021 static ENABLED: once_cell::sync::OnceCell<bool> = once_cell::sync::OnceCell::new();
1022 *ENABLED.get_or_init(|| {
1023 std::env::var("WMBT_KV_TIMING")
1024 .is_ok_and(|v| matches!(v.as_str(), "1" | "true" | "TRUE" | "stderr"))
1025 })
1026}
1027
1028fn tier_events_enabled() -> bool {
1029 static ENABLED: once_cell::sync::OnceCell<bool> = once_cell::sync::OnceCell::new();
1030 *ENABLED.get_or_init(|| {
1031 std::env::var("WMBT_KV_TIER_EVENTS")
1032 .is_ok_and(|v| matches!(v.as_str(), "1" | "true" | "TRUE" | "stderr"))
1033 })
1034}
1035
/// Decodes a 64-character hex string into `out`.
///
/// Upper- and lower-case digits are accepted. Returns `false` when `hex` is not
/// exactly 64 bytes long or contains a non-hex byte. In the latter case, bytes
/// decoded before the offending pair have already been written to `out`.
fn decode_hex32(hex: &str, out: &mut [u8; 32]) -> bool {
    /// Maps one ASCII hex digit to its 4-bit value.
    fn nibble(digit: u8) -> Option<u8> {
        match digit {
            b'0'..=b'9' => Some(digit - b'0'),
            b'a'..=b'f' => Some(digit - b'a' + 10),
            b'A'..=b'F' => Some(digit - b'A' + 10),
            _ => None,
        }
    }

    if hex.len() != 64 {
        return false;
    }
    for (slot, pair) in out.iter_mut().zip(hex.as_bytes().chunks_exact(2)) {
        let Some(hi) = nibble(pair[0]) else {
            return false;
        };
        let Some(lo) = nibble(pair[1]) else {
            return false;
        };
        *slot = (hi << 4) | lo;
    }
    true
}
1062
1063#[cfg(test)]
1064mod tests {
1065 use super::{EmbedConfig, GetOutcome, HitTier, WombatKVKvStore};
1066 use crate::compression::BlockCompressionConfig;
1067 use crate::foyer_cache::FoyerCacheConfig;
1068 use bytes::Bytes;
1069 use tempfile::tempdir;
1070 use wombatkv_store::wal_store::InMemoryObjectStore;
1071
    /// Builds a small foyer config for tests: 8 MiB RAM, 32 MiB SSD under `dir`,
    /// 1 MiB blocks, a 4 MiB buffer pool, and `iouring` turned off.
    fn small_foyer(dir: std::path::PathBuf) -> FoyerCacheConfig {
        FoyerCacheConfig {
            ram_bytes: 8 * 1024 * 1024,
            ssd_dir: dir,
            ssd_bytes: 32 * 1024 * 1024,
            block_size: 1024 * 1024,
            buffer_pool_size: 4 * 1024 * 1024,
            iouring: false,
        }
    }
1082
1083 fn build_store(dir: std::path::PathBuf) -> WombatKVKvStore<InMemoryObjectStore> {
1084 let cfg = EmbedConfig {
1085 s3_prefix: "test/kv".to_string(),
1086 foyer: small_foyer(dir),
1087 write_through_s3: true,
1088 compression: BlockCompressionConfig::default(),
1089 };
1090 WombatKVKvStore::new(cfg, InMemoryObjectStore::default()).expect("build store")
1091 }
1092
    /// Same as `build_store`, but with Zstd compression at level 3.
    fn build_store_with_compression(
        dir: std::path::PathBuf,
    ) -> WombatKVKvStore<InMemoryObjectStore> {
        let cfg = EmbedConfig {
            s3_prefix: "test/kv".to_string(),
            foyer: small_foyer(dir),
            write_through_s3: true,
            compression: BlockCompressionConfig {
                algo: crate::compression::CompressAlgo::Zstd,
                level: 3,
            },
        };
        WombatKVKvStore::new(cfg, InMemoryObjectStore::default()).expect("build store")
    }
1107
    /// A compressible payload is stored compressed (magic header present and
    /// smaller on the wire) and reads back unchanged from the object store.
    #[test]
    fn compressed_round_trip_through_object_store() {
        let dir = tempdir().expect("tempdir");
        let store = build_store_with_compression(dir.path().to_path_buf());

        // 64 KiB of zeros plus a short trailer: highly compressible.
        let mut payload = vec![0_u8; 64 * 1024];
        payload.extend_from_slice(b"trailer bytes");
        let bytes = Bytes::from(payload.clone());

        store.put_kv("ns", "k1", bytes.clone()).expect("put");

        // Inspect the raw stored object, bypassing the store's decode path.
        let object_key = store.object_key("ns", "k1");
        let on_wire = store.object_store().get_object(&object_key).expect("s3 get");
        assert!(crate::compression::has_magic(&on_wire), "expected WBZ1 magic");
        assert!(
            on_wire.len() < payload.len(),
            "compressed payload should be smaller than the original; got {} vs {}",
            on_wire.len(),
            payload.len()
        );

        // Drop both cache tiers so the read has to come from the object store.
        store.clear_flat_cache();
        store.clear_foyer();

        match store.get_kv("ns", "k1").expect("get") {
            GetOutcome::Hit { tier, payload: got } => {
                assert!(matches!(tier, HitTier::ObjectStore));
                assert_eq!(got.as_ref(), payload.as_slice());
            }
            GetOutcome::Miss => panic!("expected hit"),
        }
    }
1147
    /// A blob written raw to the object store and a blob written through the
    /// compressing store are both readable through `get_kv`.
    #[test]
    fn mixed_compressed_and_legacy_blobs_both_readable() {
        let dir = tempdir().expect("tempdir");
        let store = build_store_with_compression(dir.path().to_path_buf());

        // Written directly to the object store, bypassing `put_kv`'s encoding.
        let legacy_payload = b"legacy uncompressed payload".to_vec();
        let legacy_key = store.object_key("ns", "legacy");
        store.object_store().put_object(&legacy_key, &legacy_payload).expect("legacy put");

        let fresh_payload = vec![7_u8; 32 * 1024];
        store.put_kv("ns", "fresh", Bytes::from(fresh_payload.clone())).expect("fresh put");

        // Drop both cache tiers so both reads come from the object store.
        store.clear_flat_cache();
        store.clear_foyer();

        match store.get_kv("ns", "legacy").expect("get legacy") {
            GetOutcome::Hit { payload, .. } => {
                assert_eq!(payload.as_ref(), legacy_payload.as_slice());
            }
            GetOutcome::Miss => panic!("legacy miss"),
        }
        match store.get_kv("ns", "fresh").expect("get fresh") {
            GetOutcome::Hit { payload, .. } => {
                assert_eq!(payload.as_ref(), fresh_payload.as_slice());
            }
            GetOutcome::Miss => panic!("fresh miss"),
        }
    }
1182
1183 #[test]
1184 fn put_get_round_trip_serves_from_foyer_warm_path() {
1185 let dir = tempdir().expect("tempdir");
1186 let store = build_store(dir.path().to_path_buf());
1187
1188 let payload = Bytes::from_static(b"qwen3-pd-payload");
1189 store.put_kv("ns-a", "seq-1", payload.clone()).expect("put");
1190
1191 match store.get_kv("ns-a", "seq-1").expect("get") {
1192 GetOutcome::Hit { tier, payload: got } => {
1193 assert!(matches!(tier, HitTier::Foyer));
1194 assert_eq!(got, payload);
1195 }
1196 GetOutcome::Miss => panic!("expected hit"),
1197 }
1198 }
1199
1200 #[test]
1201 fn write_through_s3_persists_to_object_store() {
1202 let dir = tempdir().expect("tempdir");
1203 let store = build_store(dir.path().to_path_buf());
1204
1205 let payload = Bytes::from_static(b"qwen3-prefill-bytes");
1206 store.put_kv("ns-a", "seq-7", payload.clone()).expect("put");
1207
1208 let object_key = store.object_key("ns-a", "seq-7");
1209 let raw = store.object_store().get_object(&object_key).expect("s3 get");
1210 let decoded = crate::compression::decode_if_compressed(&raw);
1213 assert_eq!(&*decoded, payload.as_ref());
1214 }
1215
    /// With both cache tiers cleared, the first read falls back to the object
    /// store and the second read is served from a cache tier again.
    #[test]
    fn s3_fallback_serves_value_when_foyer_was_cleared() {
        let dir = tempdir().expect("tempdir");
        let store = build_store(dir.path().to_path_buf());

        let payload = Bytes::from_static(b"survives-foyer-clear");
        store.put_kv("ns-a", "seq-9", payload.clone()).expect("put");
        store.clear_foyer();
        store.clear_flat_cache();

        match store.get_kv("ns-a", "seq-9").expect("get") {
            GetOutcome::Hit { tier, payload: got } => {
                assert!(matches!(tier, HitTier::ObjectStore));
                assert_eq!(got, payload);
            }
            GetOutcome::Miss => panic!("expected S3 fallback hit"),
        }

        // The object-store hit above re-populated the caches.
        match store.get_kv("ns-a", "seq-9").expect("get-2") {
            GetOutcome::Hit { tier, .. } => assert!(matches!(tier, HitTier::Foyer)),
            GetOutcome::Miss => panic!("expected foyer promotion"),
        }
    }
1244
    /// A second store with fresh cache directories, sharing only the object
    /// store, can restore every value via `restore_from_s3` and then serve it
    /// from cache.
    #[test]
    fn restart_pattern_rebuilds_foyer_from_s3_only() {
        let dir = tempdir().expect("tempdir");
        let cfg_a = EmbedConfig {
            s3_prefix: "test/kv".to_string(),
            foyer: small_foyer(dir.path().join("a")),
            write_through_s3: true,
            compression: BlockCompressionConfig::default(),
        };
        let object_store = InMemoryObjectStore::default();
        let store_a = WombatKVKvStore::new(cfg_a, object_store.clone()).expect("a");
        for idx in 0..6_u32 {
            let key = format!("seq-{idx}");
            store_a.put_kv("ns", &key, Bytes::from(vec![idx as u8; 1024])).expect("put");
        }
        drop(store_a);
        // Store B uses a different cache directory ("b"), so it starts cold.
        let cfg_b = EmbedConfig {
            s3_prefix: "test/kv".to_string(),
            foyer: small_foyer(dir.path().join("b")),
            write_through_s3: true,
            compression: BlockCompressionConfig::default(),
        };
        let store_b = WombatKVKvStore::new(cfg_b, object_store).expect("b");

        let restored = store_b.restore_from_s3("ns").expect("restore");
        assert_eq!(restored, 6);

        for idx in 0..6_u32 {
            let key = format!("seq-{idx}");
            match store_b.get_kv("ns", &key).expect("get") {
                GetOutcome::Hit { tier, payload } => {
                    assert!(matches!(tier, HitTier::Foyer));
                    assert_eq!(payload.as_ref(), vec![idx as u8; 1024].as_slice());
                }
                GetOutcome::Miss => panic!("expected foyer hit after restore"),
            }
        }
    }
1285
1286 #[test]
1287 fn miss_returns_miss_without_falling_through_on_unknown_key() {
1288 let dir = tempdir().expect("tempdir");
1289 let store = build_store(dir.path().to_path_buf());
1290
1291 assert!(matches!(store.get_kv("ns", "missing").expect("get"), GetOutcome::Miss));
1292 }
1293
    /// `exists_kv` is false for an unknown key, true right after a put, and
    /// still true once the foyer cache has been cleared.
    #[test]
    fn exists_kv_checks_foyer_and_object_store_without_loading_payload() {
        let dir = tempdir().expect("tempdir");
        let store = build_store(dir.path().to_path_buf());

        assert!(!store.exists_kv("ns", "missing").expect("missing exists"));

        store.put_kv("ns", "present", Bytes::from_static(b"exists-payload")).expect("put");
        assert!(store.exists_kv("ns", "present").expect("foyer exists"));

        // With foyer emptied, the answer has to come from the object store.
        store.clear_foyer();
        assert!(store.exists_kv("ns", "present").expect("object store exists"));
    }
1307
/// Puts keys into two namespaces and checks that `list_namespace` returns
/// only the full keys of the requested namespace, while `list_kv_keys`
/// returns the same keys without the prefix.
#[test]
fn list_namespace_returns_only_prefix_matching_keys() {
    let tmp = tempdir().expect("tempdir");
    let kv = build_store(tmp.path().to_path_buf());

    // (namespace, key, payload text); the payload text doubles as the `expect` label.
    for (ns, key, text) in [("ns-a", "k1", "a1"), ("ns-a", "k2", "a2"), ("ns-b", "k1", "b1")] {
        kv.put_kv(ns, key, Bytes::from_static(text.as_bytes())).expect(text);
    }

    // Results are sorted before comparison, so listing order does not matter.
    let mut full_a = kv.list_namespace("ns-a").expect("list-a");
    let mut full_b = kv.list_namespace("ns-b").expect("list-b");
    full_a.sort();
    assert_eq!(full_a, vec!["test/kv/ns-a/k1", "test/kv/ns-a/k2"]);
    full_b.sort();
    assert_eq!(full_b, vec!["test/kv/ns-b/k1"]);

    let mut relative_a = kv.list_kv_keys("ns-a").expect("relative-a");
    relative_a.sort();
    assert_eq!(relative_a, vec!["k1", "k2"]);
}
1328
/// Constructing a store with an empty `s3_prefix` must fail.
#[test]
fn empty_s3_prefix_is_rejected() {
    let tmp = tempdir().expect("tempdir");
    let foyer = small_foyer(tmp.path().to_path_buf());

    // Everything is valid-looking except the empty prefix.
    let cfg = EmbedConfig {
        s3_prefix: String::new(),
        foyer,
        write_through_s3: true,
        compression: BlockCompressionConfig::default(),
    };

    assert!(WombatKVKvStore::new(cfg, InMemoryObjectStore::default()).is_err());
}
1341
1342 fn mk_meta_for_test(seq: u32, parent: wombatkv_radix::BlockHash) -> wombatkv_radix::BlockMeta {
1343 wombatkv_radix::BlockMeta::new_successor(
1344 parent,
1345 seq,
1346 1024,
1347 [42u8; 24],
1348 *b"test-v1\0\0\0\0\0\0\0\0\0",
1349 )
1350 }
1351
/// Inserts a ten-entry parent-linked chain into a local SlateDB index, then
/// checks that `bootstrap_from_slatedb` reports ten entries and that the
/// store's in-memory metadata index holds the first and last of them.
#[test]
fn bootstrap_from_slatedb_roundtrip() {
    use wombatkv_radix::{BlockMeta, MetadataIndex, SlateDbMetadataIndex};

    let tmp = tempdir().expect("tempdir");
    let slatedb_root = tmp.path().join("slatedb-root");
    let store = build_store(tmp.path().join("kv-store"));

    let slatedb_index =
        SlateDbMetadataIndex::open_local(&slatedb_root, "node-bootstrap-rt", "tenant-a")
            .expect("open slatedb");

    // Hash k is 32 copies of the byte k + 1, for k in 0..10.
    let hashes: Vec<[u8; 32]> = (1..=10u8).map(|fill| [fill; 32]).collect();

    // Each entry names the previous hash as its parent; the first hangs off ZERO_HASH.
    let mut parent = BlockMeta::ZERO_HASH;
    for (seq, hash) in (0u32..).zip(hashes.iter().copied()) {
        slatedb_index.insert(hash, mk_meta_for_test(seq, parent));
        parent = hash;
    }
    assert_eq!(slatedb_index.len(), 10);

    let count = store.bootstrap_from_slatedb(&slatedb_index).expect("bootstrap_from_slatedb");
    assert_eq!(count, 10);

    let in_mem = store.metadata_index();
    assert_eq!(in_mem.len(), 10);

    // Spot-check both ends of the chain.
    let head = in_mem.get(&hashes[0]).expect("h0 must be in RAM index");
    assert_eq!(head.block_seq, 0);

    let tail = in_mem.get(&hashes[9]).expect("h9 must be in RAM index");
    assert_eq!(tail.block_seq, 9);
    assert_eq!(tail.parent_hash, hashes[8]);
}
1388
/// Bootstrapping from a freshly opened SlateDB index with no inserts must
/// report zero entries and leave the in-memory metadata index empty.
#[test]
fn bootstrap_from_slatedb_empty() {
    use wombatkv_radix::{MetadataIndex, SlateDbMetadataIndex};

    let tmp = tempdir().expect("tempdir");
    let slatedb_root = tmp.path().join("slatedb-root");
    let store = build_store(tmp.path().join("kv-store"));

    let empty_index =
        SlateDbMetadataIndex::open_local(&slatedb_root, "node-bootstrap-empty", "tenant-a")
            .expect("open slatedb");

    let loaded =
        store.bootstrap_from_slatedb(&empty_index).expect("bootstrap_from_slatedb on empty");

    assert_eq!(loaded, 0);
    assert_eq!(store.metadata_index().len(), 0);
}
1406
/// Two-phase restart test. Phase one opens a store plus a SlateDB index,
/// writes three chained entries to both the in-memory index and SlateDB, and
/// closes SlateDB. Phase two builds a fresh store, reopens SlateDB at the same
/// root, and checks that `bootstrap_from_slatedb` brings all three entries
/// back into the new store's in-memory index.
#[test]
fn daemon_slatedb_writethrough_survives_restart() {
    use wombatkv_radix::{BlockMeta, MetadataIndex, SlateDbMetadataIndex};

    let tmp = tempdir().expect("tempdir");
    let slatedb_root = tmp.path().join("slatedb-root");
    let node_id = "node-restart";
    let ns = "tenant-a";

    let [h0, h1, h2] = [[21u8; 32], [22u8; 32], [23u8; 32]];

    // Phase one: everything created here goes out of scope at the closing brace.
    {
        let first_store = build_store(tmp.path().join("kv-store-1"));
        let first_index = SlateDbMetadataIndex::open_local(&slatedb_root, node_id, ns)
            .expect("open slatedb #1");

        // Nothing has been written yet, so the bootstrap finds nothing.
        let initially_loaded = first_store.bootstrap_from_slatedb(&first_index).expect("bootstrap #1");
        assert_eq!(initially_loaded, 0);

        let ram = first_store.metadata_index();
        let chain = [
            (h0, BlockMeta::new_root(1024, [0u8; 24], [0u8; 16])),
            (h1, BlockMeta::new_successor(h0, 1, 1024, [0u8; 24], [0u8; 16])),
            (h2, BlockMeta::new_successor(h1, 2, 1024, [0u8; 24], [0u8; 16])),
        ];
        // Write every entry to RAM first, then to SlateDB, in chain order.
        for (hash, meta) in chain {
            ram.insert(hash, meta);
            first_index.insert(hash, meta);
        }

        assert_eq!(ram.longest_prefix(&[h0, h1, h2]), 3);
        assert_eq!(first_index.len(), 3);

        first_index.close().expect("close slatedb #1");
    }

    // Phase two: new store directory, same SlateDB root / node id / namespace.
    let second_store = build_store(tmp.path().join("kv-store-2"));
    let second_index = SlateDbMetadataIndex::open_local(&slatedb_root, node_id, ns)
        .expect("reopen slatedb #2");

    let reloaded = second_store.bootstrap_from_slatedb(&second_index).expect("bootstrap #2");
    assert_eq!(reloaded, 3, "second-process bootstrap must see the 3 prior writes");

    let ram = second_store.metadata_index();
    assert_eq!(ram.longest_prefix(&[h0, h1, h2]), 3);
    assert_eq!(ram.longest_prefix(&[h0]), 1);

    let tail = ram.get(&h2).expect("h2 must be present");
    assert_eq!(tail.block_seq, 2);
    assert_eq!(tail.parent_hash, h1);
}
1491
/// Seeds the object store with 2000 block keys under the `ns-pagetest`
/// namespace and checks that `bootstrap_world_knowledge` indexes every one.
/// NOTE(review): presumably 2000 exceeds the listing page size; the page size
/// itself is not visible from this test, so confirm against the store.
#[test]
fn bootstrap_world_knowledge_indexes_all_tier_b_keys_above_page_size() {
    use wombatkv_radix::MetadataIndex;

    let tmp = tempdir().expect("tempdir");
    let store = build_store(tmp.path().to_path_buf());
    let object_store = store.object_store();

    // Hash i carries the big-endian bytes of i in both its first and last
    // four bytes; the 24 bytes in between stay zero.
    let expected_hashes: Vec<[u8; 32]> = (0..2000u32)
        .map(|i| {
            let be = i.to_be_bytes();
            let mut hash = [0u8; 32];
            hash[..4].copy_from_slice(&be);
            hash[28..].copy_from_slice(&be);
            hash
        })
        .collect();

    // One object per hash, written in index order with a 4-byte zero payload.
    for hash in &expected_hashes {
        let key = format!("test/kv/ns-pagetest/wombatkv/v1/block/b3={}", hex32_lower(hash));
        object_store.put_object(&key, &[0u8; 4]).expect("put");
    }

    let loaded = store.bootstrap_world_knowledge("ns-pagetest").expect("bootstrap");
    assert_eq!(
        loaded, 2000,
        "bootstrap must index every block key returned by list_prefix, \
         not just the first page"
    );

    let index = store.metadata_index();
    assert_eq!(index.len(), 2000);

    // Spot-check offsets at the start, around 1000, and at the very end.
    for probe in [0usize, 999, 1000, 1500, 1999] {
        let present = index.get(&expected_hashes[probe]).is_some();
        assert!(present, "hash at offset {probe} missing from metadata index");
    }
}
1538
/// Renders a 32-byte value as 64 lowercase hexadecimal characters, most
/// significant nibble of each byte first.
fn hex32_lower(h: &[u8; 32]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(h.len() * 2);
    // Each byte expands to two characters: high nibble, then low nibble.
    out.extend(h.iter().flat_map(|&byte| {
        [DIGITS[usize::from(byte >> 4)], DIGITS[usize::from(byte & 0x0f)]].map(char::from)
    }));
    out
}
1548
/// Running `bootstrap_from_slatedb` twice against the same five-entry index
/// must return the same count both times and leave exactly five entries in
/// the in-memory metadata index.
#[test]
fn bootstrap_from_slatedb_idempotent() {
    use wombatkv_radix::{BlockMeta, MetadataIndex, SlateDbMetadataIndex};

    let tmp = tempdir().expect("tempdir");
    let slatedb_root = tmp.path().join("slatedb-root");
    let store = build_store(tmp.path().join("kv-store"));

    let slatedb_index =
        SlateDbMetadataIndex::open_local(&slatedb_root, "node-bootstrap-idem", "tenant-a")
            .expect("open slatedb");

    // Five entries: sequence numbers 0..5, hash bytes 1..=5, all parented on ZERO_HASH.
    for (seq, fill) in (0u32..).zip(1..=5u8) {
        slatedb_index.insert([fill; 32], mk_meta_for_test(seq, BlockMeta::ZERO_HASH));
    }

    let first = store.bootstrap_from_slatedb(&slatedb_index).expect("first bootstrap");
    assert_eq!(first, 5);

    // The repeat run must not change the reported count or duplicate entries.
    let second = store.bootstrap_from_slatedb(&slatedb_index).expect("second bootstrap");
    assert_eq!(second, first);
    assert_eq!(store.metadata_index().len(), 5);
}
1574}