Skip to main content

wombatkv_node/
embed.rs

1#![forbid(unsafe_code)]
2//! Embeddable KV-cache store API for inference engines.
3//!
4//! Combines the in-process [`FoyerHybridCache`]
5//! (G2 RAM + G3 `NVMe`) with a durable object store (G4: S3, `MinIO`,
6//! local fs via [`wombatkv_store::wal_store::InMemoryObjectStore`] for tests) so
7//! a single inference engine can:
8//!
9//! 1. Write a KV blob through both tiers (foyer hot, S3 cold) on prefill.
10//! 2. Read with foyer-first, S3-fallback semantics on decode / cold start.
11//! 3. Restart cleanly: a fresh process can rehydrate foyer from S3 on
12//!    boot, so no work is lost when the engine restarts.
13//!
14//! The store is generic over [`wombatkv_store::wal_store::ObjectStore`], so the
15//! same code path is exercised by unit tests (in-memory) and by live
16//! MinIO/S3 integration tests.
17
18use std::sync::Arc;
19use std::time::Instant;
20
21use bytes::Bytes;
22use wombatkv_store::wal_store::{ObjectStore, WalStoreError};
23
24use crate::compression::{
25    decode_if_compressed, encode_with_header, BlockCompressionConfig, CompressAlgo,
26};
27use crate::embed_metrics::{metrics, Op};
28use crate::foyer_cache::{FoyerCacheConfig, FoyerCacheError, FoyerHitTier, FoyerHybridCache};
29
/// Key prefix used for [`EmbedConfig::s3_prefix`] by `EmbedConfig::default()`.
const DEFAULT_S3_PREFIX: &str = "kv";
31
/// Soft warn threshold for [`WombatKVKvStore::bootstrap_world_knowledge`].
/// A `key_count_high` event is logged on each bootstrap call whose
/// `ObjectStore::list_prefix` result holds at least this many keys (and
/// fewer than `BOOTSTRAP_KEY_LIMIT_HARD`). Multi-tenant production buckets
/// approaching this size are healthy but worth an operator look
/// (cf. RFC 0008 §5).
const BOOTSTRAP_KEY_LIMIT_WARN: usize = 100_000;
37
/// Hard log threshold for [`WombatKVKvStore::bootstrap_world_knowledge`].
/// At or above this count we still process every key (the prior contract
/// is preserved) but emit a louder `key_count_exceeded_hard_limit` event
/// so the caller can decide whether to bound the bootstrap on the next run.
const BOOTSTRAP_KEY_LIMIT_HARD: usize = 1_000_000;
43
/// Tuning knobs for the embeddable KV store.
///
/// Consumed by `WombatKVKvStore::new`; `Default` yields the `"kv"` prefix,
/// default foyer settings, write-through enabled and the default
/// compression policy.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EmbedConfig {
    /// Prefix prepended to every S3 key. Combined with `namespace` per put.
    /// Must be non-empty: `WombatKVKvStore::new` rejects an empty prefix
    /// with `EmbedError::InvalidConfig`.
    pub s3_prefix: String,
    /// Foyer config. Reused verbatim for the hybrid memory + disk tier.
    /// Its `ssd_dir` also hosts the sibling `_flat` flat-file cache directory.
    pub foyer: FoyerCacheConfig,
    /// When true, `put_kv` surfaces an object-store PUT failure to the
    /// caller. When false, `put_kv` still issues the PUT inline, but a
    /// failure is only logged to stderr and the call returns `Ok(())`
    /// (best-effort durability). The local cache tiers are updated
    /// synchronously in both modes.
    pub write_through_s3: bool,
    /// Transparent block-storage compression policy. Default: off. When
    /// enabled, the put path encodes payloads with the `WBZ1` envelope
    /// before the object-store PUT; the get path detects the envelope
    /// on cold reads and decompresses transparently. In-memory tiers
    /// (flat + foyer) always hold uncompressed bytes so cache hits stay
    /// allocation-cheap. See `crate::compression`.
    pub compression: BlockCompressionConfig,
}
62
63impl Default for EmbedConfig {
64    fn default() -> Self {
65        Self {
66            s3_prefix: DEFAULT_S3_PREFIX.to_string(),
67            foyer: FoyerCacheConfig::default(),
68            write_through_s3: true,
69            compression: BlockCompressionConfig::default(),
70        }
71    }
72}
73
/// Errors surfaced by the embeddable KV store.
#[derive(Debug)]
pub enum EmbedError {
    /// Failure reported by the foyer hybrid cache (e.g. while opening it).
    Foyer(FoyerCacheError),
    /// Failure reported by the backing object store (list / get / put).
    ObjectStore(WalStoreError),
    /// The supplied configuration was rejected (e.g. empty `s3_prefix`).
    /// Also carries flat-cache open failures and, in `dst` builds, the
    /// simulated put failures injected by the chaos hooks.
    InvalidConfig(String),
}
81
82impl std::fmt::Display for EmbedError {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        match self {
85            Self::Foyer(err) => write!(f, "WombatKV puffer error: {err}"),
86            Self::ObjectStore(err) => write!(f, "object store error: {err:?}"),
87            Self::InvalidConfig(msg) => write!(f, "invalid config: {msg}"),
88        }
89    }
90}
91
// Marker impl: relies on the derived `Debug` and the hand-written `Display`.
// `source()` is left at its default, so wrapped errors are not exposed as a
// cause chain.
impl std::error::Error for EmbedError {}
93
94impl From<FoyerCacheError> for EmbedError {
95    fn from(value: FoyerCacheError) -> Self {
96        Self::Foyer(value)
97    }
98}
99
100impl From<WalStoreError> for EmbedError {
101    fn from(value: WalStoreError) -> Self {
102        Self::ObjectStore(value)
103    }
104}
105
/// A KV cache lookup hit, with the tier the value came from.
///
/// Implements `PartialEq`/`Eq` so callers and tests can compare tiers
/// directly (e.g. `assert_eq!(tier, HitTier::ObjectStore)`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HitTier {
    /// Served from a process-local cache. `get_kv` reports this for both
    /// flat-file cache hits and foyer (RAM or SSD) hits.
    Foyer,
    /// Fetched from the backing object store (cold path).
    ObjectStore,
}
112
/// Outcome of a `get_kv` call.
#[derive(Debug, Clone)]
pub enum GetOutcome {
    /// The key was found. `tier` records where the bytes came from and
    /// `payload` holds the uncompressed value.
    Hit { tier: HitTier, payload: Bytes },
    /// The key is absent from the local caches and the object store
    /// reported it as not found.
    Miss,
}
119
/// Embeddable KV cache store.
///
/// Wraps a [`FoyerHybridCache`] for hot-path lookups and an [`ObjectStore`]
/// (typically S3 / `MinIO`) for durability. Designed to be embedded in an
/// inference engine binary (e.g. `vllm.rs`) without dragging in any
/// additional async runtime.
pub struct WombatKVKvStore<S: ObjectStore> {
    /// Hybrid RAM + SSD cache; probed after the flat tier on reads.
    foyer: Arc<FoyerHybridCache>,
    /// Profile-driven addition (2026-05-15 _debug branch): foyer's
    /// `block_on(inner.get())` was 17.96 ms of 18.36 ms warm C ABI time.
    /// Flat file + page-cache hit drops that to ~9 ms. Foyer stays for
    /// the cold-via-S3 path and the multi-tenant SSD-spill story; the
    /// flat tier handles the same-machine warm hot path.
    flat: Arc<crate::kv_blob_cache::FlatFileKvBlobCache>,
    /// Durable backend used for PUT / GET / list.
    object_store: S,
    /// Leading component of every composed object key (see `object_key`).
    s3_prefix: String,
    /// Whether `put_kv` surfaces object-store PUT failures (true) or only
    /// logs them (false).
    write_through_s3: bool,
    /// Block compression policy applied at the object-store boundary.
    /// Cache tiers always hold uncompressed bytes. See `EmbedConfig`.
    compression: BlockCompressionConfig,
    /// World-knowledge index: on startup, the puffer can populate this
    /// from `list_prefix` (manifest reads) so it knows what's in the
    /// bucket without lookup-by-lookup S3 round-trips. RFC 0008 §5.
    metadata_index: Arc<wombatkv_radix::InMemoryMetadataIndex>,
}
145
146impl<S: ObjectStore> WombatKVKvStore<S> {
147    /// Build a new store. The foyer cache is created up front; the object
148    /// store handle is passed in so callers can configure S3 credentials
149    /// or supply an in-memory backend for tests.
150    pub fn new(config: EmbedConfig, object_store: S) -> Result<Self, EmbedError> {
151        if config.s3_prefix.is_empty() {
152            return Err(EmbedError::InvalidConfig("s3_prefix must be non-empty".to_string()));
153        }
154        let flat_root = config.foyer.ssd_dir.join("_flat");
155        let flat = Arc::new(
156            crate::kv_blob_cache::FlatFileKvBlobCache::open(flat_root)
157                .map_err(|err| EmbedError::InvalidConfig(format!("flat cache open: {err}")))?,
158        );
159        let foyer = FoyerHybridCache::open(config.foyer)?;
160        Ok(Self {
161            foyer,
162            flat,
163            object_store,
164            s3_prefix: config.s3_prefix,
165            write_through_s3: config.write_through_s3,
166            compression: config.compression,
167            metadata_index: Arc::new(wombatkv_radix::InMemoryMetadataIndex::new()),
168        })
169    }
170
171    /// Build a store reusing an already-opened foyer instance. Useful when
172    /// the engine wants to share one foyer across multiple stores.
173    pub fn with_foyer(
174        foyer: Arc<FoyerHybridCache>,
175        object_store: S,
176        s3_prefix: impl Into<String>,
177        write_through_s3: bool,
178    ) -> Result<Self, EmbedError> {
179        let s3_prefix = s3_prefix.into();
180        if s3_prefix.is_empty() {
181            return Err(EmbedError::InvalidConfig("s3_prefix must be non-empty".to_string()));
182        }
183        // Sibling flat-file cache next to foyer's SSD tier. Same dir
184        // tree, distinct subdir so we don't collide with foyer's blocks.
185        let flat_root = foyer.ssd_dir().join("_flat");
186        let flat = Arc::new(
187            crate::kv_blob_cache::FlatFileKvBlobCache::open(flat_root)
188                .map_err(|err| EmbedError::InvalidConfig(format!("flat cache open: {err}")))?,
189        );
190        Ok(Self {
191            foyer,
192            flat,
193            object_store,
194            s3_prefix,
195            write_through_s3,
196            compression: BlockCompressionConfig::from_env(),
197            metadata_index: Arc::new(wombatkv_radix::InMemoryMetadataIndex::new()),
198        })
199    }
200
201    /// Expose the block-compression policy currently in force. Test
202    /// helper, production code reads from the put/get paths instead.
203    #[must_use]
204    pub fn compression(&self) -> BlockCompressionConfig {
205        self.compression
206    }
207
208    /// Expose the metadata index. Callers (e.g. the FFI Handle on
209    /// startup) can call `bootstrap_world_knowledge` to populate it
210    /// from S3, or query it directly for chain-aware lookups.
211    pub fn metadata_index(&self) -> Arc<wombatkv_radix::InMemoryMetadataIndex> {
212        self.metadata_index.clone()
213    }
214
215    /// World-knowledge bootstrap (RFC 0008 §5): walk the S3 prefix for
216    /// `namespace`, read each manifest, decode the chunk-hash chain,
217    /// populate the in-memory metadata index. After this returns, the
218    /// puffer "knows what's in the bucket" without per-request S3
219    /// round-trips.
220    ///
221    /// Cost is O(M) S3 GETs where M = manifest count. Each manifest is
222    /// tiny (~800 bytes for 23 chunks). For 100 cached prompts: ~100
223    /// small GETs ≈ 1-2 s on local `MinIO`. Run at startup behind an env
224    /// gate; not on the request hot path.
225    ///
226    /// Pagination: the underlying `ObjectStore::list_prefix` is
227    /// responsible for exhausting S3's 1000-keys-per-page continuation
228    /// loop and returning every match, see the `S3ObjectStore::list_prefix`
229    /// impl, which iterates the `rust-s3` `Vec<ListBucketResult>` already
230    /// returned in fully-paged form. We add a defensive log + warn here
231    /// so an unexpectedly large bucket surfaces in operator logs instead
232    /// of silently bottling up: real production buckets above
233    /// `BOOTSTRAP_KEY_LIMIT_WARN` keys deserve an operator look (likely
234    /// a stale-data sweep is overdue), and above `BOOTSTRAP_KEY_LIMIT_HARD`
235    /// we still process them but emit an explicit "exceeded" event so the
236    /// caller can decide to bound the work.
237    pub fn bootstrap_world_knowledge(&self, namespace: &str) -> Result<usize, EmbedError> {
238        use wombatkv_radix::MetadataIndex;
239        let started = Instant::now();
240        let prefix = if namespace.is_empty() {
241            format!("{}/", self.s3_prefix)
242        } else {
243            format!("{}/{}/", self.s3_prefix, namespace)
244        };
245        let keys = self.object_store.list_prefix(&prefix).map_err(EmbedError::from)?;
246        let key_count = keys.len();
247        // DST invariant: bootstrap_world_knowledge must never load more
248        // keys than the hard limit emits as warning. Catches future
249        // regressions where the limit-warn path silently grows past
250        // its envelope. Inert in non-dst builds.
251        #[cfg(feature = "dst")]
252        wombatkv_dst::assert_always(
253            key_count <= BOOTSTRAP_KEY_LIMIT_HARD.saturating_mul(2),
254            "bootstrap key_count within 2× hard limit",
255            format!("got {key_count} keys, hard limit {BOOTSTRAP_KEY_LIMIT_HARD}"),
256        );
257        // DST coverage: empty-namespace bootstrap should be exercised
258        // by some seeded run. Sticks once any namespace returns 0 keys.
259        #[cfg(feature = "dst")]
260        wombatkv_dst::assert_sometimes(
261            key_count == 0,
262            "bootstrap saw empty namespace",
263            "DST coverage gate, exercises the empty-bucket cold path",
264        );
265        if key_count >= BOOTSTRAP_KEY_LIMIT_HARD {
266            eprintln!(
267                "[MyelonInstr] {{\"scope\":\"wmbt_kv_warn\",\"fn\":\"bootstrap_world_knowledge\",\
268                 \"event\":\"key_count_exceeded_hard_limit\",\"keys\":{key_count},\
269                 \"limit\":{BOOTSTRAP_KEY_LIMIT_HARD},\"prefix\":\"{prefix}\"}}"
270            );
271        } else if key_count >= BOOTSTRAP_KEY_LIMIT_WARN {
272            eprintln!(
273                "[MyelonInstr] {{\"scope\":\"wmbt_kv_warn\",\"fn\":\"bootstrap_world_knowledge\",\
274                 \"event\":\"key_count_high\",\"keys\":{key_count},\
275                 \"warn_at\":{BOOTSTRAP_KEY_LIMIT_WARN},\"prefix\":\"{prefix}\"}}"
276            );
277        }
278        // Production block-keys land at `wombatkv/v1/block/b3=<hex>` -
279        // content-addressed, no per-prompt manifest blob (chain lives only
280        // in the in-process metadata index at write time). Bootstrap parses
281        // each block key directly and inserts it as a standalone root entry.
282        // `lookup_block_prefix` only checks presence, so chain wiring is
283        // optional here.
284        //
285        // Sidecar keys (`wombatkv/v1/sidecar/raw_tail/b3=<hex>`) carry the
286        // 28-byte raw_tail payload for warm-restore. We GET each and stuff
287        // it into the flat blob cache so the subsequent prompt-path
288        // `get_raw_tail_borrowed` is a cache hit instead of an S3 RTT.
289        // This shaves ~60ms off the cell-B warm restore on Mac MinIO.
290        let mut blocks_loaded: usize = 0;
291        let mut skipped_unrecognized: usize = 0;
292        let mut sidecars_prewarmed: usize = 0;
293        let mut sidecar_bytes_total: usize = 0;
294        let mut sidecar_keys: Vec<&String> = Vec::new();
295
296        // Match keys via the canonical prefixes from wombatkv-radix so a
297        // future rename touches a single source of truth (the cabi PUT
298        // path and the prefetch path read from the same constants).
299        let block_key_infix = format!("/{}", wombatkv_radix::BLOCK_KEY_PREFIX);
300        let sidecar_key_infix = format!("/{}", wombatkv_radix::SIDECAR_RAW_TAIL_KEY_PREFIX);
301
302        for key in &keys {
303            if let Some(idx) = key.find(&block_key_infix) {
304                let hex = &key[idx + block_key_infix.len()..];
305                if hex.len() == 64 {
306                    let mut hash = [0u8; 32];
307                    if decode_hex32(hex, &mut hash) {
308                        let meta = wombatkv_radix::BlockMeta::new_root(0, [0u8; 24], [0u8; 16]);
309                        self.metadata_index.insert(hash, meta);
310                        blocks_loaded += 1;
311                    }
312                }
313                continue;
314            }
315            if key.contains(&sidecar_key_infix) {
316                sidecar_keys.push(key);
317                continue;
318            }
319            skipped_unrecognized += 1;
320        }
321        // Pre-warm raw_tail sidecars into the flat blob cache so the prompt
322        // path skips the S3 RTT. We call `self.get_kv(ns, rel_key)` which
323        // already does the canonical "GET + decode_if_compressed + flat-cache
324        // PUT" pipeline, reusing it means the prewarm cache shape is
325        // guaranteed to match what the prompt path looks for.
326        for full_key in &sidecar_keys {
327            let rel_idx = full_key.find("/wombatkv/v1/").map_or(0, |i| i + 1);
328            let rel_key = full_key[rel_idx..].to_string();
329            match self.get_kv(namespace, &rel_key) {
330                Ok(GetOutcome::Hit { payload, .. }) => {
331                    sidecar_bytes_total = sidecar_bytes_total.saturating_add(payload.len());
332                    sidecars_prewarmed += 1;
333                }
334                Ok(GetOutcome::Miss) | Err(_) => {
335                    // Best-effort prewarm, a miss here just means the prompt
336                    // path will pay the S3 RTT for raw_tail. Not fatal.
337                }
338            }
339        }
340        let elapsed_ms = started.elapsed().as_millis();
341        eprintln!(
342            "[MyelonInstr] {{\"scope\":\"wmbt_kv_timing\",\"fn\":\"bootstrap_world_knowledge\",\
343             \"stages\":{{\"total_ms\":{elapsed_ms},\"blocks_indexed\":{blocks_loaded},\
344             \"sidecars_prewarmed\":{sidecars_prewarmed},\
345             \"sidecar_bytes_total\":{sidecar_bytes_total},\
346             \"unrecognized_keys\":{skipped_unrecognized},\
347             \"namespace\":\"{namespace}\"}}}}"
348        );
349        Ok(blocks_loaded)
350    }
351
352    /// L1 bootstrap from `SlateDbMetadataIndex` (RFC 0008 §5 fast path).
353    ///
354    /// Snapshots all (hash, meta) pairs from the persistent SlateDB-backed
355    /// index and bulk-loads them into the in-memory `metadata_index`.
356    /// On a fresh process this lets us rehydrate "what's in the world"
357    /// in milliseconds, one local `SlateDB` scan vs the O(M) S3 GETs the
358    /// S3-based `bootstrap_world_knowledge` would issue.
359    ///
360    /// Idempotent: `InMemoryMetadataIndex::bulk_load` skips already-present
361    /// hashes, so a second call with the same `SlateDB` returns the same
362    /// loaded count and does not clobber any in-memory access stamps.
363    ///
364    /// Returns the number of entries pulled out of `SlateDB` (this is the
365    /// `SlateDB` row count, not the net-new RAM-index inserts, by design,
366    /// mirroring `bootstrap_world_knowledge`'s "blocks loaded" semantic).
367    pub fn bootstrap_from_slatedb(
368        &self,
369        slatedb_index: &wombatkv_radix::SlateDbMetadataIndex,
370    ) -> Result<usize, EmbedError> {
371        use wombatkv_radix::MetadataIndex;
372        let started = Instant::now();
373        let entries = slatedb_index.entries();
374        let count = entries.len();
375        self.metadata_index.bulk_load(entries);
376        let elapsed_ms = started.elapsed().as_millis();
377        eprintln!(
378            "[MyelonInstr] {{\"scope\":\"wmbt_kv_timing\",\"fn\":\"bootstrap_from_slatedb\",\
379             \"stages\":{{\"total_ms\":{elapsed_ms},\"blocks_loaded\":{count}}}}}"
380        );
381        Ok(count)
382    }
383
384    /// Compose the S3 object key for a given namespace + cache key.
385    /// Layout: `{s3_prefix}/{namespace}/{key}`. Namespace and key MUST be
386    /// callers' responsibility to keep filesystem-safe.
387    #[must_use]
388    pub fn object_key(&self, namespace: &str, key: &str) -> String {
389        if namespace.is_empty() {
390            format!("{}/{}", self.s3_prefix, key)
391        } else {
392            format!("{}/{}/{}", self.s3_prefix, namespace, key)
393        }
394    }
395
396    /// Local-only cache key (foyer's key space). Mirrors `object_key` so a
397    /// single string identifies the same blob in both tiers.
398    fn cache_key(&self, namespace: &str, key: &str) -> String {
399        self.object_key(namespace, key)
400    }
401
402    /// Write a payload through both tiers.
403    ///
404    /// Always inserts into foyer synchronously. When `write_through_s3` is
405    /// true the call also blocks on the S3 PUT and surfaces any error;
406    /// when false the S3 write is best-effort and only logs on failure.
407    pub fn put_kv(&self, namespace: &str, key: &str, payload: Bytes) -> Result<(), EmbedError> {
408        // DST chaos site:
409        // Two complementary fault-injection paths:
410        //  (a) dst_buggify!(), probabilistic per-callsite roll driven
411        //      by WMBT_KV_DST_BUGGIFY env vars. Used for general chaos
412        //      exploration across seeds.
413        //  (b) dst_plan::is_put_suppressed(), plan-aware fault dispatch.
414        //      Returns true if the loaded FaultPlan has a put-suppressing
415        //      event (S3PutFailure / S3PutLatency / KillBeforeChainHead /
416        //      KillBeforeSidecar) scheduled for the current op counter.
417        //      The DST runner calls dst_plan::set_plan() at startup and
418        //      dst_plan::advance_op() between each op so this consult
419        //      lines up with the scheduled trigger.
420        // Both are inert in non-dst builds. Either path returning true
421        // simulates the "S3 PUT failed mid-chain" failure class, the
422        // caller's partial-chain-recovery path is what we're exercising.
423        #[cfg(feature = "dst")]
424        {
425            if wombatkv_dst::dst_buggify!() {
426                return Err(EmbedError::InvalidConfig(
427                    "dst buggify: simulated put_kv S3 PUT failure".to_string(),
428                ));
429            }
430            if wombatkv_dst::dst_plan::is_put_suppressed() {
431                return Err(EmbedError::InvalidConfig(
432                    "dst_plan: scheduled put-suppressing fault for current op".to_string(),
433                ));
434            }
435        }
436        let started = Instant::now();
437        let cache_key = self.cache_key(namespace, key);
438        let object_key = self.object_key(namespace, key);
439        let bytes_len = payload.len() as u64;
440
441        // Write to flat cache first (fast warm-read path). Caches always
442        // hold uncompressed bytes so the warm-read TTFT story doesn't
443        // pay a per-hit decode tax.
444        crate::kv_blob_cache::KvBlobCache::put(self.flat.as_ref(), &cache_key, payload.clone());
445        self.foyer.put(&cache_key, payload.clone());
446
447        // Compress (or pass through) at the object-store boundary.
448        let on_wire = encode_for_storage(&payload, self.compression, key, "put_kv")?;
449        let result = if self.write_through_s3 {
450            self.object_store.put_object(&object_key, &on_wire).map_err(EmbedError::from)
451        } else {
452            if let Err(err) = self.object_store.put_object(&object_key, &on_wire) {
453                eprintln!("wombatkv: best-effort S3 PUT failed for {object_key}: {err:?}");
454            }
455            Ok(())
456        };
457
458        let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
459        metrics().observe(Op::Stash, elapsed_us, bytes_len);
460        // emit per-component timing so
461        // the global latency_histogram registry picks it up. Tag is
462        // <func>:<path>:<stage> per emit_timing's record_global convention.
463        emit_timing(
464            "WombatKVKvStore.put_kv",
465            if self.write_through_s3 { "s3_write_through" } else { "s3_best_effort" },
466            &[("total_us", elapsed_us), ("payload_bytes", bytes_len)],
467        );
468        result
469    }
470
471    /// Foyer-sync write, S3 write spawned on a detached thread.
472    ///
473    /// Returns as soon as foyer has the bytes, typically within a few
474    /// hundred microseconds, so callers (e.g. ds4 right after Metal
475    /// prefill) can move on to decode while the slow `ObjectStore` PUT
476    /// happens off-thread. Foyer is updated atomically inside this call,
477    /// so any subsequent `get_kv` against the same key (from the same
478    /// process) will hit foyer-RAM, not race the in-flight S3 write.
479    ///
480    /// Trade-offs vs the synchronous [`Self::put_kv`]:
481    /// - Cross-process GET against the same key racing the background
482    ///   S3 PUT will miss in S3 until the write completes (foyer is
483    ///   process-local). Not safe for cross-engine sharing under that
484    ///   pattern; safe for the single-client-per-host shape ds4 uses.
485    /// - Spawn failure (rare) loses the S3 write entirely; foyer still
486    ///   has it. We log to stderr and otherwise swallow because the
487    ///   caller (ds4) has already moved on.
488    /// - One detached thread per call. For sustained high-rate puts a
489    ///   bounded executor would be safer; the current ds4 pattern is
490    ///   one put per chat completion (a few per minute at most), so the
491    ///   thread cost is negligible.
492    pub fn put_kv_async_s3(this: Arc<Self>, namespace: &str, key: &str, payload: Bytes) {
493        let cache_key = this.cache_key(namespace, key);
494        let object_key = this.object_key(namespace, key);
495        let bytes_len = payload.len() as u64;
496
497        // Write to flat cache first (fast warm-read path). Caches always
498        // hold uncompressed bytes.
499        crate::kv_blob_cache::KvBlobCache::put(this.flat.as_ref(), &cache_key, payload.clone());
500        this.foyer.put(&cache_key, payload.clone());
501
502        let compression_cfg = this.compression;
503        let object_store = this.object_store.clone();
504        let object_key_owned = object_key;
505        let key_owned = key.to_string();
506        match std::thread::Builder::new()
507            .name(format!("wombatkv-embed-async-s3-{bytes_len}"))
508            .spawn(move || {
509                // Compress at the wire boundary, same envelope as the
510                // synchronous put_kv. Block writes never exceed the
511                // user-tunable WMBT_KV_BLOCK_TOKENS so no chunking
512                // (the legacy byte-chunked path was deleted).
513                let on_wire = match encode_for_storage(
514                    &payload,
515                    compression_cfg,
516                    &key_owned,
517                    "put_kv_async_s3",
518                ) {
519                    Ok(buf) => buf,
520                    Err(err) => {
521                        eprintln!("wombatkv[embed-async-s3]: compress {object_key_owned}: {err}");
522                        return;
523                    }
524                };
525                if let Err(err) = object_store.put_object(&object_key_owned, &on_wire) {
526                    eprintln!("wombatkv[embed-async-s3]: put_object {object_key_owned}: {err:?}");
527                }
528            }) {
529            Ok(_) => {}
530            Err(err) => {
531                eprintln!(
532                    "wombatkv[embed-async-s3]: spawn failed (foyer has the bytes, S3 write skipped): {err}"
533                );
534            }
535        }
536    }
537
538    /// Look up a payload, foyer-first, S3-fallback. On S3 hit the value is
539    /// promoted into foyer so subsequent calls hit the warm path.
540    ///
541    /// Each load emits a `[MyelonInstr]` JSON line attributing latency to
542    /// the actual tier that served the hit (foyer RAM, foyer SSD, S3, or
543    /// miss). Critical for diagnosing the "blob too big for RAM" pattern, e.g. qwen3's pre-allocated 4.7 GiB KV cache against a 2 GiB RAM
544    /// budget always streams from SSD, which a single `LoadFoyer` bucket
545    /// can't tell apart from a fast in-memory hit.
546    pub fn get_kv(&self, namespace: &str, key: &str) -> Result<GetOutcome, EmbedError> {
547        let started = Instant::now();
548        let cache_key = self.cache_key(namespace, key);
549        let t_cache_key = started.elapsed().as_micros() as u64;
550
551        // Flat-file fast path, std::fs::read from OS page cache.
552        // Profile (2026-05-15) showed foyer's block_on(inner.get()) was
553        // 17.96 ms / 18.36 ms warm TTFT. Flat file drops that to ~9 ms.
554        if let Some((payload, op_label)) =
555            crate::kv_blob_cache::KvBlobCache::get(self.flat.as_ref(), &cache_key)
556        {
557            let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
558            let bytes_len = payload.len() as u64;
559            // Reuse the LoadFoyerRam Op for flat hits, same shape
560            // (local cache hit, no S3) for the metrics aggregator. We
561            // distinguish via the [MyelonInstr] op_label.
562            metrics().observe(Op::LoadFoyerRam, elapsed_us, bytes_len);
563            emit_tier_event(op_label, "flat", key, bytes_len, elapsed_us);
564            emit_timing(
565                "WombatKVKvStore.get_kv",
566                "flat_hit",
567                &[
568                    ("cache_key_us", t_cache_key),
569                    ("flat_call_us", elapsed_us - t_cache_key),
570                    ("total_us", elapsed_us),
571                    ("payload_bytes", bytes_len),
572                ],
573            );
574            return Ok(GetOutcome::Hit { tier: HitTier::Foyer, payload });
575        }
576        let t_flat_done = started.elapsed().as_micros() as u64;
577        let foyer_result = self.foyer.get_with_tier(&cache_key);
578        let t_foyer_done = started.elapsed().as_micros() as u64;
579        if let Some((payload, tier)) = foyer_result {
580            let bytes_len = payload.len() as u64;
581            let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
582            let op = match tier {
583                FoyerHitTier::Ram => Op::LoadFoyerRam,
584                FoyerHitTier::Ssd => Op::LoadFoyerSsd,
585            };
586            metrics().observe(op, elapsed_us, bytes_len);
587            emit_tier_event(op.as_str(), tier.as_str(), key, bytes_len, elapsed_us);
588            // Foyer hit but flat missed (file evicted/never written). Repair
589            // the flat tier for next time.
590            crate::kv_blob_cache::KvBlobCache::put(self.flat.as_ref(), &cache_key, payload.clone());
591            emit_timing(
592                "WombatKVKvStore.get_kv",
593                match tier {
594                    FoyerHitTier::Ram => "foyer_ram_hit",
595                    FoyerHitTier::Ssd => "foyer_ssd_hit",
596                },
597                &[
598                    ("cache_key_us", t_cache_key),
599                    ("flat_miss_us", t_flat_done - t_cache_key),
600                    ("foyer_call_us", t_foyer_done - t_flat_done),
601                    ("total_us", elapsed_us),
602                    ("payload_bytes", bytes_len),
603                ],
604            );
605            return Ok(GetOutcome::Hit { tier: HitTier::Foyer, payload });
606        }
607
608        let object_key = self.object_key(namespace, key);
609        let t_obj_key = started.elapsed().as_micros() as u64;
610        match self.object_store.get_object(&object_key) {
611            Ok(payload) => {
612                let t_s3_done = started.elapsed().as_micros() as u64;
613                // `bytes_len` is now recomputed per-branch: the manifest
614                // Transparent decompression at the wire boundary. If the
615                // blob carries the `WBZ1` envelope, decode into an owned
616                // Vec; otherwise pass `payload` through unchanged (the
617                // `Cow::Borrowed` branch). The cache tiers always see
618                // uncompressed bytes.
619                let on_wire_bytes = payload.len() as u64;
620                let bytes = match decode_if_compressed(&payload) {
621                    std::borrow::Cow::Borrowed(_) => Bytes::from(payload),
622                    std::borrow::Cow::Owned(decoded) => Bytes::from(decoded),
623                };
624                let uncompressed_bytes_len = bytes.len() as u64;
625                let t_from_vec = started.elapsed().as_micros() as u64;
626                // Populate both tiers so future reads hit flat first.
627                crate::kv_blob_cache::KvBlobCache::put(
628                    self.flat.as_ref(),
629                    &cache_key,
630                    bytes.clone(),
631                );
632                self.foyer.put(&cache_key, bytes.clone());
633                let t_foyer_put = started.elapsed().as_micros() as u64;
634                let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
635                metrics().observe(Op::LoadS3, elapsed_us, uncompressed_bytes_len);
636                emit_tier_event("load_s3", "s3", key, uncompressed_bytes_len, elapsed_us);
637                emit_timing(
638                    "WombatKVKvStore.get_kv",
639                    "s3_hit",
640                    &[
641                        ("flat_miss_us", t_flat_done - t_cache_key),
642                        ("foyer_miss_us", t_foyer_done - t_flat_done),
643                        ("obj_key_us", t_obj_key - t_foyer_done),
644                        ("s3_get_us", t_s3_done - t_obj_key),
645                        ("bytes_wrap_us", t_from_vec - t_s3_done),
646                        ("cache_put_us", t_foyer_put - t_from_vec),
647                        ("total_us", elapsed_us),
648                        ("payload_bytes", uncompressed_bytes_len),
649                        ("on_wire_bytes", on_wire_bytes),
650                    ],
651                );
652                Ok(GetOutcome::Hit { tier: HitTier::ObjectStore, payload: bytes })
653            }
654            Err(WalStoreError::ObjectNotFound(_)) => {
655                let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
656                metrics().observe(Op::Miss, elapsed_us, 0);
657                emit_tier_event("miss", "miss", key, 0, elapsed_us);
658                emit_timing("WombatKVKvStore.get_kv", "miss", &[("total_us", elapsed_us)]);
659                Ok(GetOutcome::Miss)
660            }
661            Err(other) => Err(EmbedError::ObjectStore(other)),
662        }
663    }
664
665    /// Check for a key without materializing the payload.
666    ///
667    /// This is intentionally separate from `get_kv`: vLLM's scheduler
668    /// calls `exists` while deciding whether a prefix can be loaded. If
669    /// `exists` falls through to a full GET, large KV payloads traverse the
670    /// daemon once during lookup and then again during the real load.
671    pub fn exists_kv(&self, namespace: &str, key: &str) -> Result<bool, EmbedError> {
672        let cache_key = self.cache_key(namespace, key);
673        if self.foyer.contains(&cache_key) {
674            return Ok(true);
675        }
676
677        let object_key = self.object_key(namespace, key);
678        self.object_store.head_object(&object_key).map_err(EmbedError::ObjectStore)
679    }
680
681    /// List keys for a namespace as their S3 object keys.
682    pub fn list_namespace(&self, namespace: &str) -> Result<Vec<String>, EmbedError> {
683        let prefix = if namespace.is_empty() {
684            format!("{}/", self.s3_prefix)
685        } else {
686            format!("{}/{}/", self.s3_prefix, namespace)
687        };
688        Ok(self.object_store.list_prefix(&prefix)?)
689    }
690
691    /// List keys for a namespace relative to that namespace.
692    pub fn list_kv_keys(&self, namespace: &str) -> Result<Vec<String>, EmbedError> {
693        let prefix = if namespace.is_empty() {
694            format!("{}/", self.s3_prefix)
695        } else {
696            format!("{}/{}/", self.s3_prefix, namespace)
697        };
698        let mut keys = Vec::new();
699        for object_key in self.object_store.list_prefix(&prefix)? {
700            if let Some(key) = object_key.strip_prefix(&prefix) {
701                keys.push(key.to_string());
702            }
703        }
704        Ok(keys)
705    }
706
707    /// Rehydrate foyer from S3. Useful at engine startup so the warm tier
708    /// is primed with whatever survived the previous process. Returns the
709    /// number of keys restored.
710    pub fn restore_from_s3(&self, namespace: &str) -> Result<usize, EmbedError> {
711        let started = Instant::now();
712        let object_keys = self.list_namespace(namespace)?;
713        let mut restored = 0_usize;
714        let mut bytes_total: u64 = 0;
715        for object_key in object_keys {
716            let payload = match self.object_store.get_object(&object_key) {
717                Ok(value) => value,
718                Err(WalStoreError::ObjectNotFound(_)) => continue,
719                Err(other) => return Err(EmbedError::ObjectStore(other)),
720            };
721            // Decompress at the wire boundary so foyer holds the
722            // uncompressed bytes the warm-read path expects. Skips a
723            // wasted memcpy on legacy uncompressed blobs via Cow.
724            let bytes = match decode_if_compressed(&payload) {
725                std::borrow::Cow::Borrowed(_) => Bytes::from(payload),
726                std::borrow::Cow::Owned(decoded) => Bytes::from(decoded),
727            };
728            bytes_total = bytes_total.saturating_add(bytes.len() as u64);
729            self.foyer.put(&object_key, bytes);
730            restored += 1;
731        }
732        let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
733        metrics().observe(Op::RestoreFromS3, elapsed_us, bytes_total);
734        Ok(restored)
735    }
736
737    /// Delete one block from the object store (and best-effort from the
738    /// flat tier). Used by the LRU eviction worker (RFC 0009 §4); not
739    /// called on the hot path.
740    ///
741    /// Returns true iff the object store reported a delete. Foyer is
742    /// left untouched: `foyer::HybridCache` does not expose a single-
743    /// key remove on its public API in the version we pin; the bytes
744    /// will age out naturally as new inserts evict them. The metadata
745    /// index (the authority for the per-namespace byte budget) is
746    /// updated separately by the worker so the budget accounting stays
747    /// correct even while foyer still holds the bytes briefly.
748    pub fn delete_kv(&self, namespace: &str, key: &str) -> Result<bool, EmbedError> {
749        let cache_key = self.cache_key(namespace, key);
750        let object_key = self.object_key(namespace, key);
751
752        // Best-effort flat-tier delete first. If a block leaves the
753        // metadata index but remains in flat, the next `get_kv` would
754        // still return it (which would defeat eviction).
755        let _ = crate::kv_blob_cache::KvBlobCache::remove(self.flat.as_ref(), &cache_key);
756
757        let deleted = self.object_store.delete_object(&object_key)?;
758        Ok(deleted)
759    }
760
761    /// Drop foyer state. Object-store data is unaffected.
762    pub fn clear_foyer(&self) {
763        self.foyer.clear();
764    }
765
766    /// Drop flat-file blob-cache state. Object-store data is unaffected.
767    ///
768    /// The flat tier sits in front of foyer on the get path (see commit
769    /// `2ca65cb`). Tests that want to exercise the object-store fallback
770    /// must clear both tiers, otherwise the flat hit short-circuits before
771    /// foyer is consulted. Use [`Self::clear_foyer`] for the foyer tier.
772    pub fn clear_flat_cache(&self) {
773        crate::kv_blob_cache::KvBlobCache::clear(self.flat.as_ref());
774    }
775
776    /// Borrow the underlying foyer cache (e.g. for stats or sharing).
777    #[must_use]
778    pub fn foyer(&self) -> &Arc<FoyerHybridCache> {
779        &self.foyer
780    }
781
782    /// Borrow the underlying object store (e.g. for direct list/delete).
783    #[must_use]
784    pub fn object_store(&self) -> &S {
785        &self.object_store
786    }
787
788    /// Spawn the background block-prefetch worker (RFC 0008 §6).
789    ///
790    /// The worker periodically scores the metadata index per the
791    /// recency / chain-head / model-affinity heuristic and (v2) issues
792    /// `get_kv` GETs for the top-K candidates, materializing the
793    /// payloads into the local flat tier so subsequent requests hit
794    /// warm. See [`crate::block_prefetch`] for the heuristic.
795    ///
796    /// Behavior is selected at construction time by the
797    /// `WMBT_KV_PREFETCH_DRY_RUN=1` env: when set, the worker scores
798    /// and logs only (the v1 escape hatch). Default is v2.
799    ///
800    /// Holds an `Arc` to self via the `PrefetchFetcher` impl, so the
801    /// worker can issue GETs. Dropping the returned worker signals
802    /// stop and joins the thread.
803    /// Spawn the background LRU eviction worker (RFC 0009 §4).
804    ///
805    /// The worker periodically scans the in-memory metadata index for
806    /// the configured namespace, sums `payload_bytes`, and when the
807    /// sum exceeds `LruConfig::namespace_max_bytes`, evicts the
808    /// oldest entries (by `last_access_ns`) until the budget has a
809    /// 10% headroom.
810    ///
811    /// Caller is responsible for handing in the optional `SlateDB`
812    /// index so the L1 persistence is kept in sync. If `None`, only
813    /// the L0 in-memory index and the object store are touched.
814    ///
815    /// Dropping the returned worker signals stop and joins the thread.
816    #[must_use]
817    pub fn start_eviction_worker(
818        self: &Arc<Self>,
819        config: crate::lru::LruConfig,
820        slatedb: Option<Arc<wombatkv_radix::SlateDbMetadataIndex>>,
821    ) -> crate::lru::LruEvictionWorker {
822        let deleter: Arc<dyn crate::lru::EvictionDeleter> =
823            Arc::new(KvStoreEvictionDeleter::new(self.clone()));
824        let emit = crate::lru::default_emit(config.namespace.clone());
825        crate::lru::spawn_worker(self.metadata_index.clone(), slatedb, deleter, config, emit)
826    }
827
828    #[must_use]
829    pub fn start_prefetcher(
830        self: &Arc<Self>,
831        config: crate::block_prefetch::PrefetchConfig,
832    ) -> crate::block_prefetch::PrefetchWorker {
833        let index: Arc<dyn wombatkv_radix::MetadataIndex> = self.metadata_index.clone();
834        if crate::block_prefetch::dry_run_enabled() {
835            eprintln!("wombatkv[prefetch]: WMBT_KV_PREFETCH_DRY_RUN=1 → v1 log-only path");
836            return crate::block_prefetch::spawn_worker(
837                index,
838                config,
839                crate::block_prefetch::default_emit(),
840            );
841        }
842        let fetcher: Arc<dyn crate::block_prefetch::PrefetchFetcher> =
843            Arc::new(KvStorePrefetchFetcher::new(self.clone()));
844        crate::block_prefetch::spawn_worker_v2(
845            index,
846            config,
847            fetcher,
848            crate::block_prefetch::default_v2_emit(),
849        )
850    }
851}
852
853/// `EvictionDeleter` adapter binding the LRU worker to
854/// `WombatKVKvStore<S>::delete_kv`. Holds an `Arc<WombatKVKvStore<S>>`
855/// so the worker's lifetime is independent of the FFI handle that
856/// built the store. The adapter is `S`-generic and we lift it back to
857/// `Arc<dyn EvictionDeleter>` at the call site.
858struct KvStoreEvictionDeleter<S: ObjectStore> {
859    store: Arc<WombatKVKvStore<S>>,
860}
861
862impl<S: ObjectStore> KvStoreEvictionDeleter<S> {
863    fn new(store: Arc<WombatKVKvStore<S>>) -> Self {
864        Self { store }
865    }
866}
867
868impl<S: ObjectStore> crate::lru::EvictionDeleter for KvStoreEvictionDeleter<S> {
869    fn delete_block(&self, namespace: &str, key: &str) -> Result<bool, String> {
870        self.store.delete_kv(namespace, key).map_err(|err| format!("{err}"))
871    }
872}
873
874/// `PrefetchFetcher` adapter binding the algorithm-crate worker to
875/// `WombatKVKvStore<S>::get_kv`. Holds an `Arc<WombatKVKvStore<S>>` so
876/// the worker's lifetime is independent of the FFI handle that built
877/// the store. The adapter is `S`-generic and we lift it back to
878/// `Arc<dyn PrefetchFetcher>` at the call site.
879struct KvStorePrefetchFetcher<S: ObjectStore> {
880    store: Arc<WombatKVKvStore<S>>,
881}
882
883impl<S: ObjectStore> KvStorePrefetchFetcher<S> {
884    fn new(store: Arc<WombatKVKvStore<S>>) -> Self {
885        Self { store }
886    }
887}
888
889impl<S: ObjectStore> crate::block_prefetch::PrefetchFetcher for KvStorePrefetchFetcher<S> {
890    fn contains_flat(&self, namespace: &str, key: &str) -> bool {
891        let cache_key = self.store.cache_key(namespace, key);
892        crate::kv_blob_cache::KvBlobCache::contains(self.store.flat.as_ref(), &cache_key)
893    }
894
895    fn fetch_block(&self, namespace: &str, key: &str) -> Result<Option<u64>, String> {
896        match self.store.get_kv(namespace, key) {
897            Ok(GetOutcome::Hit { payload, .. }) => Ok(Some(payload.len() as u64)),
898            Ok(GetOutcome::Miss) => Ok(None),
899            Err(err) => Err(format!("{err}")),
900        }
901    }
902}
903
904/// Emit a single-line `[MyelonInstr]` JSON event attributing one load
905/// to a specific tier. Off by default to keep production logs quiet;
906/// opt in with `WMBT_KV_TIER_EVENTS=1` (or `=stderr`).
907///
908/// Format mirrors the existing aggregate metrics envelope so downstream
909/// log parsers don't need a second schema:
910///
911///   [`MyelonInstr`] {"`scope":"wombatkv_tier","op":"load_foyer_ssd`",
912///                  "`tier":"ssd","key_hash":"...","bytes":4697949641`,
913///                  "`elapsed_us":340512`}
914///
915/// `key_hash` is the first 16 hex chars of `key` to avoid leaking full
916/// content-addressed digests into log aggregation systems while still
917/// letting operators correlate adjacent events for the same blob.
918fn emit_tier_event(op: &str, tier: &str, key: &str, bytes: u64, elapsed_us: u64) {
919    if !tier_events_enabled() {
920        return;
921    }
922    let key_hash: String = key.chars().take(16).collect();
923    eprintln!(
924        "[MyelonInstr] {{\"scope\":\"wombatkv_tier\",\"op\":\"{op}\",\"tier\":\"{tier}\",\"key_hash\":\"{key_hash}\",\"bytes\":{bytes},\"elapsed_us\":{elapsed_us}}}"
925    );
926}
927
928/// Fine-grained per-stage timing log for the _debug branch. Gated by
929/// `WMBT_KV_TIMING=1`. Format:
930///
931///   [`MyelonInstr`] {"`scope":"wmbt_kv_timing","fn":"foyer_get_with_tier`",
932///                  "`path":"ram_hit","stages":{"ram_probe_us":4`,...}}
933///
934/// `stages` is a flat (name, u64) list rendered as JSON object. Lets us
935/// attribute the warm overhead to specific lines, not just tier-level
936/// aggregates.
937/// Encode `payload` for object-store transit with the optional
938/// `WBZ1` envelope. Returns the raw payload as a fresh `Vec<u8>` when
939/// compression is disabled, the cost is one memcpy, same as we already
940/// paid for the previous put-path `payload.clone()` into S3.
941///
942/// Emits a `wmbt_kv_compress` timing event when compression actually
943/// ran: stages include compress duration, `uncompressed_bytes`,
944/// `compressed_bytes`, and the ratio in basis points so downstream
945/// dashboards don't have to division on parse.
946fn encode_for_storage(
947    payload: &[u8],
948    cfg: BlockCompressionConfig,
949    key: &str,
950    func: &'static str,
951) -> Result<Vec<u8>, EmbedError> {
952    if !cfg.is_enabled() {
953        // Fast path: caller's bytes verbatim. One copy; the S3 SDK
954        // requires an owned buffer anyway.
955        return Ok(payload.to_vec());
956    }
957    let started = Instant::now();
958    let encoded = encode_with_header(payload, cfg)
959        .map_err(|err| EmbedError::InvalidConfig(format!("compress {key}: {err}")))?;
960    let elapsed_us = u64::try_from(started.elapsed().as_micros()).unwrap_or(u64::MAX);
961    let uncompressed = payload.len() as u64;
962    let compressed = encoded.len() as u64;
963    let ratio_bps = compressed.saturating_mul(10_000).checked_div(uncompressed).unwrap_or(0);
964    if timing_enabled() {
965        // Hand-rolled emit so we can include the algo + level alongside
966        // the integer stages.
967        let algo = match cfg.algo {
968            CompressAlgo::Zstd => "zstd",
969            CompressAlgo::Lz4 => "lz4",
970            CompressAlgo::None => "none",
971        };
972        eprintln!(
973            "[MyelonInstr] {{\"scope\":\"wmbt_kv_compress\",\"fn\":\"{func}\",\
974             \"algo\":\"{algo}\",\"level\":{level},\"stages\":{{\
975             \"compress_us\":{elapsed_us},\
976             \"uncompressed_bytes\":{uncompressed},\
977             \"compressed_bytes\":{compressed},\
978             \"ratio_bps\":{ratio_bps}}}}}",
979            level = cfg.level,
980        );
981    }
982    Ok(encoded)
983}
984
985pub fn emit_timing(func: &str, path: &str, stages: &[(&str, u64)]) {
986    // ALWAYS record into the global histogram registry -
987    // tags are `<func>:<path>:<stage>` for fine-grained per-step
988    // percentile tracking. The eprintln below is the verbose
989    // single-event log gated by WMBT_KV_TIMING; histogram
990    // aggregation runs even when verbose logging is off so the
991    // tail-latency surface is always available via
992    // `latency_histogram::snapshot_all()` or the periodic dumper.
993    for (name, us) in stages {
994        let tag = format!("{func}:{path}:{name}");
995        crate::latency_histogram::record_global(&tag, *us);
996    }
997
998    if !timing_enabled() {
999        return;
1000    }
1001    let mut buf = String::with_capacity(160);
1002    buf.push_str("[MyelonInstr] {\"scope\":\"wmbt_kv_timing\",\"fn\":\"");
1003    buf.push_str(func);
1004    buf.push_str("\",\"path\":\"");
1005    buf.push_str(path);
1006    buf.push_str("\",\"stages\":{");
1007    for (i, (name, us)) in stages.iter().enumerate() {
1008        if i > 0 {
1009            buf.push(',');
1010        }
1011        buf.push('"');
1012        buf.push_str(name);
1013        buf.push_str("\":");
1014        buf.push_str(&us.to_string());
1015    }
1016    buf.push_str("}}");
1017    eprintln!("{buf}");
1018}
1019
/// Read the environment variable `name` as an opt-in switch.
///
/// Only the exact values `1`, `true`, `TRUE` and `stderr` enable it.
/// An unset, non-Unicode or otherwise-valued variable counts as off.
fn env_switch_enabled(name: &str) -> bool {
    std::env::var(name).is_ok_and(|v| matches!(v.as_str(), "1" | "true" | "TRUE" | "stderr"))
}

/// Whether verbose per-stage timing events (`WMBT_KV_TIMING`) are on.
///
/// The environment is read once, on first call; the answer is cached
/// for the rest of the process.
fn timing_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *ENABLED.get_or_init(|| env_switch_enabled("WMBT_KV_TIMING"))
}

/// Whether per-load tier attribution events (`WMBT_KV_TIER_EVENTS`)
/// are on.
///
/// The environment is read once, on first call; the answer is cached
/// for the rest of the process.
fn tier_events_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *ENABLED.get_or_init(|| env_switch_enabled("WMBT_KV_TIER_EVENTS"))
}
1035
/// Parse a 64-character hex string (either case) into a 32-byte hash.
///
/// Returns `true` and writes the decoded bytes to `out` on success.
/// Returns `false` on a wrong length or any non-hex character, in
/// which case `out` is left exactly as the caller passed it: decoding
/// happens in a scratch buffer that is committed only once every digit
/// has been validated.
///
/// Used by `bootstrap_world_knowledge` to extract block hashes directly
/// from the S3 list output without round-tripping through GET.
fn decode_hex32(hex: &str, out: &mut [u8; 32]) -> bool {
    /// Value of one ASCII hex digit, or `None` for anything else.
    fn nibble(digit: u8) -> Option<u8> {
        match digit {
            b'0'..=b'9' => Some(digit - b'0'),
            b'a'..=b'f' => Some(digit - b'a' + 10),
            b'A'..=b'F' => Some(digit - b'A' + 10),
            _ => None,
        }
    }

    // `len()` counts bytes, so a 64-byte string holding multi-byte
    // characters gets this far and is then rejected by `nibble`.
    if hex.len() != 64 {
        return false;
    }
    let mut decoded = [0_u8; 32];
    for (slot, pair) in decoded.iter_mut().zip(hex.as_bytes().chunks_exact(2)) {
        let (Some(hi), Some(lo)) = (nibble(pair[0]), nibble(pair[1])) else {
            return false;
        };
        *slot = (hi << 4) | lo;
    }
    *out = decoded;
    true
}
1062
1063#[cfg(test)]
1064mod tests {
1065    use super::{EmbedConfig, GetOutcome, HitTier, WombatKVKvStore};
1066    use crate::compression::BlockCompressionConfig;
1067    use crate::foyer_cache::FoyerCacheConfig;
1068    use bytes::Bytes;
1069    use tempfile::tempdir;
1070    use wombatkv_store::wal_store::InMemoryObjectStore;
1071
1072    fn small_foyer(dir: std::path::PathBuf) -> FoyerCacheConfig {
1073        FoyerCacheConfig {
1074            ram_bytes: 8 * 1024 * 1024,
1075            ssd_dir: dir,
1076            ssd_bytes: 32 * 1024 * 1024,
1077            block_size: 1024 * 1024,
1078            buffer_pool_size: 4 * 1024 * 1024,
1079            iouring: false,
1080        }
1081    }
1082
1083    fn build_store(dir: std::path::PathBuf) -> WombatKVKvStore<InMemoryObjectStore> {
1084        let cfg = EmbedConfig {
1085            s3_prefix: "test/kv".to_string(),
1086            foyer: small_foyer(dir),
1087            write_through_s3: true,
1088            compression: BlockCompressionConfig::default(),
1089        };
1090        WombatKVKvStore::new(cfg, InMemoryObjectStore::default()).expect("build store")
1091    }
1092
1093    fn build_store_with_compression(
1094        dir: std::path::PathBuf,
1095    ) -> WombatKVKvStore<InMemoryObjectStore> {
1096        let cfg = EmbedConfig {
1097            s3_prefix: "test/kv".to_string(),
1098            foyer: small_foyer(dir),
1099            write_through_s3: true,
1100            compression: BlockCompressionConfig {
1101                algo: crate::compression::CompressAlgo::Zstd,
1102                level: 3,
1103            },
1104        };
1105        WombatKVKvStore::new(cfg, InMemoryObjectStore::default()).expect("build store")
1106    }
1107
1108    /// End-to-end: when compression is on, the S3 object on the wire
1109    /// carries the `WBZ1` envelope; `get_kv` still hands back the
1110    /// uncompressed bytes; cache hits stay uncompressed too.
1111    #[test]
1112    fn compressed_round_trip_through_object_store() {
1113        let dir = tempdir().expect("tempdir");
1114        let store = build_store_with_compression(dir.path().to_path_buf());
1115
1116        // Highly compressible payload: 64 KiB of zeros + a small trailer.
1117        let mut payload = vec![0_u8; 64 * 1024];
1118        payload.extend_from_slice(b"trailer bytes");
1119        let bytes = Bytes::from(payload.clone());
1120
1121        store.put_kv("ns", "k1", bytes.clone()).expect("put");
1122
1123        // The actual S3 object carries the compression envelope.
1124        let object_key = store.object_key("ns", "k1");
1125        let on_wire = store.object_store().get_object(&object_key).expect("s3 get");
1126        assert!(crate::compression::has_magic(&on_wire), "expected WBZ1 magic");
1127        assert!(
1128            on_wire.len() < payload.len(),
1129            "compressed payload should be smaller than the original; got {} vs {}",
1130            on_wire.len(),
1131            payload.len()
1132        );
1133
1134        // Clearing the local caches forces get_kv to traverse the S3
1135        // path, exercises the decode-on-read branch.
1136        store.clear_flat_cache();
1137        store.clear_foyer();
1138
1139        match store.get_kv("ns", "k1").expect("get") {
1140            GetOutcome::Hit { tier, payload: got } => {
1141                assert!(matches!(tier, HitTier::ObjectStore));
1142                assert_eq!(got.as_ref(), payload.as_slice());
1143            }
1144            GetOutcome::Miss => panic!("expected hit"),
1145        }
1146    }
1147
1148    /// Mixed-state bucket: an uncompressed legacy blob and a freshly
1149    /// compressed blob written by the same compressed-mode store are
1150    /// both readable.
1151    #[test]
1152    fn mixed_compressed_and_legacy_blobs_both_readable() {
1153        let dir = tempdir().expect("tempdir");
1154        let store = build_store_with_compression(dir.path().to_path_buf());
1155
1156        // Inject a legacy uncompressed blob directly into the object store.
1157        let legacy_payload = b"legacy uncompressed payload".to_vec();
1158        let legacy_key = store.object_key("ns", "legacy");
1159        store.object_store().put_object(&legacy_key, &legacy_payload).expect("legacy put");
1160
1161        // Write a new blob via the compressed put path.
1162        let fresh_payload = vec![7_u8; 32 * 1024];
1163        store.put_kv("ns", "fresh", Bytes::from(fresh_payload.clone())).expect("fresh put");
1164
1165        // Drop caches so both reads have to touch S3.
1166        store.clear_flat_cache();
1167        store.clear_foyer();
1168
1169        match store.get_kv("ns", "legacy").expect("get legacy") {
1170            GetOutcome::Hit { payload, .. } => {
1171                assert_eq!(payload.as_ref(), legacy_payload.as_slice());
1172            }
1173            GetOutcome::Miss => panic!("legacy miss"),
1174        }
1175        match store.get_kv("ns", "fresh").expect("get fresh") {
1176            GetOutcome::Hit { payload, .. } => {
1177                assert_eq!(payload.as_ref(), fresh_payload.as_slice());
1178            }
1179            GetOutcome::Miss => panic!("fresh miss"),
1180        }
1181    }
1182
1183    #[test]
1184    fn put_get_round_trip_serves_from_foyer_warm_path() {
1185        let dir = tempdir().expect("tempdir");
1186        let store = build_store(dir.path().to_path_buf());
1187
1188        let payload = Bytes::from_static(b"qwen3-pd-payload");
1189        store.put_kv("ns-a", "seq-1", payload.clone()).expect("put");
1190
1191        match store.get_kv("ns-a", "seq-1").expect("get") {
1192            GetOutcome::Hit { tier, payload: got } => {
1193                assert!(matches!(tier, HitTier::Foyer));
1194                assert_eq!(got, payload);
1195            }
1196            GetOutcome::Miss => panic!("expected hit"),
1197        }
1198    }
1199
1200    #[test]
1201    fn write_through_s3_persists_to_object_store() {
1202        let dir = tempdir().expect("tempdir");
1203        let store = build_store(dir.path().to_path_buf());
1204
1205        let payload = Bytes::from_static(b"qwen3-prefill-bytes");
1206        store.put_kv("ns-a", "seq-7", payload.clone()).expect("put");
1207
1208        let object_key = store.object_key("ns-a", "seq-7");
1209        let raw = store.object_store().get_object(&object_key).expect("s3 get");
1210        // S3 holds the encoded form (zstd by default); round-trip-decode to
1211        // assert payload equality regardless of compression status.
1212        let decoded = crate::compression::decode_if_compressed(&raw);
1213        assert_eq!(&*decoded, payload.as_ref());
1214    }
1215
1216    #[test]
1217    fn s3_fallback_serves_value_when_foyer_was_cleared() {
1218        let dir = tempdir().expect("tempdir");
1219        let store = build_store(dir.path().to_path_buf());
1220
1221        let payload = Bytes::from_static(b"survives-foyer-clear");
1222        store.put_kv("ns-a", "seq-9", payload.clone()).expect("put");
1223        // Flat sits in front of foyer on the get path (commit 2ca65cb);
1224        // both tiers must be dropped to exercise the S3 fallback.
1225        store.clear_foyer();
1226        store.clear_flat_cache();
1227
1228        match store.get_kv("ns-a", "seq-9").expect("get") {
1229            GetOutcome::Hit { tier, payload: got } => {
1230                assert!(matches!(tier, HitTier::ObjectStore));
1231                assert_eq!(got, payload);
1232            }
1233            GetOutcome::Miss => panic!("expected S3 fallback hit"),
1234        }
1235
1236        // Subsequent get should be a foyer/flat hit because the previous
1237        // call promoted the value back into the warm tiers (HitTier::Foyer
1238        // covers both flat and foyer hits in the current API).
1239        match store.get_kv("ns-a", "seq-9").expect("get-2") {
1240            GetOutcome::Hit { tier, .. } => assert!(matches!(tier, HitTier::Foyer)),
1241            GetOutcome::Miss => panic!("expected foyer promotion"),
1242        }
1243    }
1244
1245    #[test]
1246    fn restart_pattern_rebuilds_foyer_from_s3_only() {
1247        let dir = tempdir().expect("tempdir");
1248        let cfg_a = EmbedConfig {
1249            s3_prefix: "test/kv".to_string(),
1250            foyer: small_foyer(dir.path().join("a")),
1251            write_through_s3: true,
1252            compression: BlockCompressionConfig::default(),
1253        };
1254        let object_store = InMemoryObjectStore::default();
1255        let store_a = WombatKVKvStore::new(cfg_a, object_store.clone()).expect("a");
1256        for idx in 0..6_u32 {
1257            let key = format!("seq-{idx}");
1258            store_a.put_kv("ns", &key, Bytes::from(vec![idx as u8; 1024])).expect("put");
1259        }
1260        drop(store_a); // simulate process crash; foyer state is gone
1261
1262        // Fresh process: same object store handle, new foyer dir.
1263        let cfg_b = EmbedConfig {
1264            s3_prefix: "test/kv".to_string(),
1265            foyer: small_foyer(dir.path().join("b")),
1266            write_through_s3: true,
1267            compression: BlockCompressionConfig::default(),
1268        };
1269        let store_b = WombatKVKvStore::new(cfg_b, object_store).expect("b");
1270
1271        let restored = store_b.restore_from_s3("ns").expect("restore");
1272        assert_eq!(restored, 6);
1273
1274        for idx in 0..6_u32 {
1275            let key = format!("seq-{idx}");
1276            match store_b.get_kv("ns", &key).expect("get") {
1277                GetOutcome::Hit { tier, payload } => {
1278                    assert!(matches!(tier, HitTier::Foyer));
1279                    assert_eq!(payload.as_ref(), vec![idx as u8; 1024].as_slice());
1280                }
1281                GetOutcome::Miss => panic!("expected foyer hit after restore"),
1282            }
1283        }
1284    }
1285
1286    #[test]
1287    fn miss_returns_miss_without_falling_through_on_unknown_key() {
1288        let dir = tempdir().expect("tempdir");
1289        let store = build_store(dir.path().to_path_buf());
1290
1291        assert!(matches!(store.get_kv("ns", "missing").expect("get"), GetOutcome::Miss));
1292    }
1293
1294    #[test]
1295    fn exists_kv_checks_foyer_and_object_store_without_loading_payload() {
1296        let dir = tempdir().expect("tempdir");
1297        let store = build_store(dir.path().to_path_buf());
1298
1299        assert!(!store.exists_kv("ns", "missing").expect("missing exists"));
1300
1301        store.put_kv("ns", "present", Bytes::from_static(b"exists-payload")).expect("put");
1302        assert!(store.exists_kv("ns", "present").expect("foyer exists"));
1303
1304        store.clear_foyer();
1305        assert!(store.exists_kv("ns", "present").expect("object store exists"));
1306    }
1307
1308    #[test]
1309    fn list_namespace_returns_only_prefix_matching_keys() {
1310        let dir = tempdir().expect("tempdir");
1311        let store = build_store(dir.path().to_path_buf());
1312
1313        store.put_kv("ns-a", "k1", Bytes::from_static(b"a1")).expect("a1");
1314        store.put_kv("ns-a", "k2", Bytes::from_static(b"a2")).expect("a2");
1315        store.put_kv("ns-b", "k1", Bytes::from_static(b"b1")).expect("b1");
1316
1317        let mut a_keys = store.list_namespace("ns-a").expect("list-a");
1318        let mut b_keys = store.list_namespace("ns-b").expect("list-b");
1319        a_keys.sort();
1320        b_keys.sort();
1321        assert_eq!(a_keys, vec!["test/kv/ns-a/k1", "test/kv/ns-a/k2"]);
1322        assert_eq!(b_keys, vec!["test/kv/ns-b/k1"]);
1323
1324        let mut a_relative = store.list_kv_keys("ns-a").expect("relative-a");
1325        a_relative.sort();
1326        assert_eq!(a_relative, vec!["k1", "k2"]);
1327    }
1328
1329    #[test]
1330    fn empty_s3_prefix_is_rejected() {
1331        let dir = tempdir().expect("tempdir");
1332        let cfg = EmbedConfig {
1333            s3_prefix: String::new(),
1334            foyer: small_foyer(dir.path().to_path_buf()),
1335            write_through_s3: true,
1336            compression: BlockCompressionConfig::default(),
1337        };
1338        let result = WombatKVKvStore::new(cfg, InMemoryObjectStore::default());
1339        assert!(result.is_err());
1340    }
1341
1342    fn mk_meta_for_test(seq: u32, parent: wombatkv_radix::BlockHash) -> wombatkv_radix::BlockMeta {
1343        wombatkv_radix::BlockMeta::new_successor(
1344            parent,
1345            seq,
1346            1024,
1347            [42u8; 24],
1348            *b"test-v1\0\0\0\0\0\0\0\0\0",
1349        )
1350    }
1351
1352    #[test]
1353    fn bootstrap_from_slatedb_roundtrip() {
1354        use wombatkv_radix::{BlockMeta, MetadataIndex, SlateDbMetadataIndex};
1355
1356        let dir = tempdir().expect("tempdir");
1357        let slatedb_root = dir.path().join("slatedb-root");
1358        let store = build_store(dir.path().join("kv-store"));
1359
1360        let slatedb_index =
1361            SlateDbMetadataIndex::open_local(&slatedb_root, "node-bootstrap-rt", "tenant-a")
1362                .expect("open slatedb");
1363
1364        // Seed 10 entries forming a chain: h0 is root, h_i has parent h_{i-1}.
1365        let mut hashes: Vec<[u8; 32]> = Vec::with_capacity(10);
1366        for i in 0..10u8 {
1367            hashes.push([i + 1; 32]);
1368        }
1369        let mut parent = BlockMeta::ZERO_HASH;
1370        for (i, h) in hashes.iter().enumerate() {
1371            slatedb_index.insert(*h, mk_meta_for_test(i as u32, parent));
1372            parent = *h;
1373        }
1374        assert_eq!(slatedb_index.len(), 10);
1375
1376        let count = store.bootstrap_from_slatedb(&slatedb_index).expect("bootstrap_from_slatedb");
1377        assert_eq!(count, 10);
1378
1379        let in_mem = store.metadata_index();
1380        assert_eq!(in_mem.len(), 10);
1381        // Spot-check a couple of entries copied correctly.
1382        let got = in_mem.get(&hashes[0]).expect("h0 must be in RAM index");
1383        assert_eq!(got.block_seq, 0);
1384        let got_last = in_mem.get(&hashes[9]).expect("h9 must be in RAM index");
1385        assert_eq!(got_last.block_seq, 9);
1386        assert_eq!(got_last.parent_hash, hashes[8]);
1387    }
1388
1389    #[test]
1390    fn bootstrap_from_slatedb_empty() {
1391        use wombatkv_radix::{MetadataIndex, SlateDbMetadataIndex};
1392
1393        let dir = tempdir().expect("tempdir");
1394        let slatedb_root = dir.path().join("slatedb-root");
1395        let store = build_store(dir.path().join("kv-store"));
1396
1397        let slatedb_index =
1398            SlateDbMetadataIndex::open_local(&slatedb_root, "node-bootstrap-empty", "tenant-a")
1399                .expect("open slatedb");
1400
1401        let count =
1402            store.bootstrap_from_slatedb(&slatedb_index).expect("bootstrap_from_slatedb on empty");
1403        assert_eq!(count, 0);
1404        assert_eq!(store.metadata_index().len(), 0);
1405    }
1406
1407    /// Simulates the daemon's metadata-persistence contract:
1408    /// `put_kv_blocks` writes through to both the in-memory index and
1409    /// `SlateDB`. Across a "daemon restart" (close + reopen at the same
1410    /// `SlateDB` path), the new process's `bootstrap_from_slatedb` hydrates
1411    /// the in-memory index so `longest_prefix` matches the previously-
1412    /// saved chain.
1413    ///
1414    /// This is the unit-test equivalent of "daemon starts → client
1415    /// `put_kv_blocks` → daemon restarts → client `lookup_block_prefix`
1416    /// finds the chain": we exercise the same code paths the daemon
1417    /// binary uses (`SlateDbMetadataIndex::insert` per hash + a fresh
1418    /// store + `bootstrap_from_slatedb` on the next startup) without
1419    /// spinning up SHM rings or a child process.
1420    #[test]
1421    fn daemon_slatedb_writethrough_survives_restart() {
1422        use wombatkv_radix::{BlockMeta, MetadataIndex, SlateDbMetadataIndex};
1423
1424        let dir = tempdir().expect("tempdir");
1425        let slatedb_root = dir.path().join("slatedb-root");
1426        let node_id = "node-restart";
1427        let ns = "tenant-a";
1428
1429        // Chain of three blocks: h0 root, h1 child of h0, h2 child of h1.
1430        let h0 = [21u8; 32];
1431        let h1 = [22u8; 32];
1432        let h2 = [23u8; 32];
1433
1434        // ---- "First daemon process" ----
1435        {
1436            let store = build_store(dir.path().join("kv-store-1"));
1437            let slatedb_index = SlateDbMetadataIndex::open_local(&slatedb_root, node_id, ns)
1438                .expect("open slatedb #1");
1439
1440            // Optional but recommended: bootstrap on first run too, must
1441            // be empty.
1442            let n0 = store.bootstrap_from_slatedb(&slatedb_index).expect("bootstrap #1");
1443            assert_eq!(n0, 0);
1444
1445            // Mirrors `dispatch_put_kv_blocks_batch`: insert each hash
1446            // into BOTH the in-memory index AND the SlateDB index.
1447            let ram = store.metadata_index();
1448            let m0 = BlockMeta::new_root(1024, [0u8; 24], [0u8; 16]);
1449            let m1 = BlockMeta::new_successor(h0, 1, 1024, [0u8; 24], [0u8; 16]);
1450            let m2 = BlockMeta::new_successor(h1, 2, 1024, [0u8; 24], [0u8; 16]);
1451            ram.insert(h0, m0);
1452            slatedb_index.insert(h0, m0);
1453            ram.insert(h1, m1);
1454            slatedb_index.insert(h1, m1);
1455            ram.insert(h2, m2);
1456            slatedb_index.insert(h2, m2);
1457
1458            // Pre-restart sanity: chain reports as fully present.
1459            assert_eq!(ram.longest_prefix(&[h0, h1, h2]), 3);
1460            assert_eq!(slatedb_index.len(), 3);
1461
1462            // Close SlateDB cleanly so the WAL is flushed to disk.
1463            // This is what the daemon's Drop path SHOULD do at shutdown;
1464            // here we do it explicitly to model the next-process read.
1465            slatedb_index.close().expect("close slatedb #1");
1466            // store is dropped at end of scope, its in-memory index is
1467            // lost. This is the bug the persistence fix solves.
1468        }
1469
1470        // ---- "Second daemon process" (post-restart) ----
1471        let store = build_store(dir.path().join("kv-store-2"));
1472        let slatedb_index = SlateDbMetadataIndex::open_local(&slatedb_root, node_id, ns)
1473            .expect("reopen slatedb #2");
1474
1475        // The bootstrap_from_slatedb hydrate is what restores the
1476        // metadata that the prior `put_kv_blocks` write-through wrote.
1477        let n = store.bootstrap_from_slatedb(&slatedb_index).expect("bootstrap #2");
1478        assert_eq!(n, 3, "second-process bootstrap must see the 3 prior writes");
1479
1480        // Without the write-through fix this would be 0, the prior
1481        // process's in-memory index dropped at exit. With the fix the
1482        // RAM index is rehydrated from SlateDB and the chain is intact.
1483        let ram = store.metadata_index();
1484        assert_eq!(ram.longest_prefix(&[h0, h1, h2]), 3);
1485        assert_eq!(ram.longest_prefix(&[h0]), 1);
1486        // Sequence / parent wiring also survives.
1487        let got2 = ram.get(&h2).expect("h2 must be present");
1488        assert_eq!(got2.block_seq, 2);
1489        assert_eq!(got2.parent_hash, h1);
1490    }
1491
1492    /// Regression: a Tier-B bucket with >1000 keys (the legacy S3 page
1493    /// size) must be indexed in full. Prior to the explicit "consume the
1494    /// paginated list to completion" contract, a single-page list call
1495    /// would silently drop everything beyond the first 1000 keys. We
1496    /// don't have a paginated `InMemoryObjectStore`, instead we rely on
1497    /// `S3ObjectStore::list_prefix` to exhaust pages via `rust-s3`'s
1498    /// `Bucket::list` (returning a `Vec<ListBucketResult>` of every
1499    /// page) and here we assert the embed-side bootstrap iterates EVERY
1500    /// returned key, so the call contract bottles up the entire prefix.
1501    #[test]
1502    fn bootstrap_world_knowledge_indexes_all_tier_b_keys_above_page_size() {
1503        use wombatkv_radix::MetadataIndex;
1504        let dir = tempdir().expect("tempdir");
1505        let store = build_store(dir.path().to_path_buf());
1506        // 2000 distinct block keys, twice the legacy S3 page
1507        // size. Each key is `test/kv/ns-pagetest/wombatkv/v1/block/b3=<hex>`.
1508        let object_store = store.object_store();
1509        let mut expected_hashes: Vec<[u8; 32]> = Vec::with_capacity(2000);
1510        for i in 0..2000u32 {
1511            let mut h = [0u8; 32];
1512            // Deterministic but non-trivial spread so insert() doesn't
1513            // see collisions on a tiny prefix.
1514            let bytes = i.to_be_bytes();
1515            h[0..4].copy_from_slice(&bytes);
1516            h[28..32].copy_from_slice(&bytes);
1517            expected_hashes.push(h);
1518            let hex = hex32_lower(&h);
1519            let key = format!("test/kv/ns-pagetest/wombatkv/v1/block/b3={hex}");
1520            object_store.put_object(&key, &[0u8; 4]).expect("put");
1521        }
1522        let loaded = store.bootstrap_world_knowledge("ns-pagetest").expect("bootstrap");
1523        assert_eq!(
1524            loaded, 2000,
1525            "bootstrap must index every block key returned by list_prefix, \
1526             not just the first page"
1527        );
1528        let idx = store.metadata_index();
1529        assert_eq!(idx.len(), 2000);
1530        // Spot-check a few hashes from different positions of the input.
1531        for probe in [0, 999, 1000, 1500, 1999] {
1532            assert!(
1533                idx.get(&expected_hashes[probe]).is_some(),
1534                "hash at offset {probe} missing from metadata index"
1535            );
1536        }
1537    }
1538
1539    fn hex32_lower(h: &[u8; 32]) -> String {
1540        let mut s = String::with_capacity(64);
1541        const HEX: &[u8; 16] = b"0123456789abcdef";
1542        for byte in h {
1543            s.push(HEX[(byte >> 4) as usize] as char);
1544            s.push(HEX[(byte & 0x0f) as usize] as char);
1545        }
1546        s
1547    }
1548
1549    #[test]
1550    fn bootstrap_from_slatedb_idempotent() {
1551        use wombatkv_radix::{BlockMeta, MetadataIndex, SlateDbMetadataIndex};
1552
1553        let dir = tempdir().expect("tempdir");
1554        let slatedb_root = dir.path().join("slatedb-root");
1555        let store = build_store(dir.path().join("kv-store"));
1556
1557        let slatedb_index =
1558            SlateDbMetadataIndex::open_local(&slatedb_root, "node-bootstrap-idem", "tenant-a")
1559                .expect("open slatedb");
1560
1561        for i in 0..5u8 {
1562            slatedb_index.insert([i + 1; 32], mk_meta_for_test(u32::from(i), BlockMeta::ZERO_HASH));
1563        }
1564
1565        let first = store.bootstrap_from_slatedb(&slatedb_index).expect("first bootstrap");
1566        assert_eq!(first, 5);
1567
1568        // Second call returns the same SlateDB row count. `bulk_load` skips
1569        // already-present hashes, so the RAM index does not grow.
1570        let second = store.bootstrap_from_slatedb(&slatedb_index).expect("second bootstrap");
1571        assert_eq!(second, first);
1572        assert_eq!(store.metadata_index().len(), 5);
1573    }
1574}