Skip to main content

wombatkv_radix/
lib.rs

1#![forbid(unsafe_code)]
2
3use std::collections::BTreeMap;
4use std::mem::size_of;
5use std::sync::atomic::AtomicUsize;
6
7pub mod block_meta;
8pub mod slatedb_meta;
9pub use block_meta::{
10    BlockHash, BlockMeta, InMemoryMetadataIndex, LayoutTag, MetadataIndex, ModelDigest,
11};
12pub use slatedb_meta::{SlateDbMetaError, SlateDbMetadataIndex};
13
/// Identity string for this crate, checked by smoke tests.
#[must_use]
pub fn crate_id() -> &'static str {
    const IDENTITY: &str = "tp-radix";
    IDENTITY
}
19
/// Unified memory size (96 GiB) of the M3 Max host targeted by the default
/// guardrail preset.
pub const M3_MAX_UNIFIED_MEMORY_BYTES: u64 = 96 * (1_u64 << 30);

/// Relative S3 key prefix for content-addressed block payloads.
///
/// The full S3 key is `<s3_prefix>/<namespace>/wombatkv/v1/block/b3=<hex>`.
/// This is the single source of truth shared by `wombatkv-cabi` (write/read
/// path) and `wombatkv-node::block_prefetch` (prefetch path). Both call sites
/// MUST use this constant rather than redefining the prefix literal: any skew
/// between them silently breaks the prefetch worker.
pub const BLOCK_KEY_PREFIX: &str = "wombatkv/v1/block/b3=";

/// Relative S3 key prefix for `raw_tail` sidecar payloads.
///
/// The full S3 key is
/// `<s3_prefix>/<namespace>/wombatkv/v1/sidecar/raw_tail/b3=<chain_tip_hex>`.
pub const SIDECAR_RAW_TAIL_KEY_PREFIX: &str = "wombatkv/v1/sidecar/raw_tail/b3=";

/// Per-entry map overhead, in bytes, assumed by the default guardrail preset
/// when estimating the index's memory footprint.
const DEFAULT_MAP_OVERHEAD_BYTES_PER_ENTRY: u64 = 64;
36
/// Residency class recorded for a cached block.
///
/// During lookup, `Ram` and `Nvme` entries are returned as resident handles,
/// while the first `RemoteOnly` entry ends the resident run and is reported
/// as a prefetch key.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Residency {
    /// Treated as resident by lookup.
    Ram,
    /// Treated as resident by lookup.
    Nvme,
    /// Treated as non-resident by lookup; reported for prefetch.
    RemoteOnly,
}
44
45/// Handle returned to callers for resident blocks.
46#[derive(Clone, Debug, PartialEq, Eq)]
47pub struct BlockHandle {
48    pub key: [u8; 32],
49    pub block_index: u32,
50    pub residency: Residency,
51}
52
53/// Lookup output contract for the hot path.
54#[derive(Clone, Debug, PartialEq, Eq)]
55pub struct LookupResult {
56    pub matched_depth: u32,
57    pub resident_handles: Vec<BlockHandle>,
58    pub prefetch_keys: Vec<[u8; 32]>,
59}
60
61/// Reusable buffers for allocation-stable lookup execution.
62#[derive(Clone, Debug, Default, PartialEq, Eq)]
63pub struct LookupBuffers {
64    pub resident_handles: Vec<BlockHandle>,
65    pub prefetch_keys: Vec<[u8; 32]>,
66}
67
68impl LookupBuffers {
69    #[must_use]
70    pub fn with_capacity(capacity: usize) -> Self {
71        Self {
72            resident_handles: Vec::with_capacity(capacity),
73            prefetch_keys: Vec::with_capacity(capacity),
74        }
75    }
76
77    pub fn clear(&mut self) {
78        self.resident_handles.clear();
79        self.prefetch_keys.clear();
80    }
81
82    #[must_use]
83    pub fn combined_capacity(&self) -> usize {
84        self.resident_handles.capacity().saturating_add(self.prefetch_keys.capacity())
85    }
86}
87
88/// Small pool for recycling lookup buffers and reducing allocator churn.
89#[derive(Debug)]
90pub struct LookupBufferPool {
91    free: Vec<LookupBuffers>,
92    max_pool_size: usize,
93    high_watermark: usize,
94}
95
96impl Default for LookupBufferPool {
97    fn default() -> Self {
98        Self::new(32)
99    }
100}
101
102impl LookupBufferPool {
103    #[must_use]
104    pub fn new(max_pool_size: usize) -> Self {
105        Self { free: Vec::new(), max_pool_size: max_pool_size.max(1), high_watermark: 0 }
106    }
107
108    #[must_use]
109    pub fn acquire(&mut self) -> LookupBuffers {
110        self.free.pop().unwrap_or_default()
111    }
112
113    pub fn release(&mut self, mut buffers: LookupBuffers) {
114        buffers.clear();
115        if self.free.len() < self.max_pool_size {
116            self.free.push(buffers);
117            self.high_watermark = self.high_watermark.max(self.free.len());
118        }
119    }
120
121    #[must_use]
122    pub const fn high_watermark(&self) -> usize {
123        self.high_watermark
124    }
125
126    #[must_use]
127    pub fn available(&self) -> usize {
128        self.free.len()
129    }
130}
131
/// Lightweight key encoder that avoids temporary allocations.
///
/// Keys are assembled in a fixed 32-byte scratch array and returned by value,
/// so encoding a single key never touches the heap.
#[derive(Clone, Debug, Default)]
pub struct KeyEncoder {
    scratch: [u8; 32],
}

impl KeyEncoder {
    /// Encode one block key from a fingerprint and block/depth coordinates.
    ///
    /// Layout of the returned key:
    /// - bytes `0..16`: `fingerprint`
    /// - bytes `16..20`: `block_index`, little-endian
    /// - bytes `20..24`: `depth`, little-endian
    /// - bytes `24..32`: `key[i] ^ key[i + 16].rotate_left(1)` for `i` in `0..8`,
    ///   mixing the first half of the fingerprint with the coordinates
    ///
    /// The result depends only on the arguments, never on earlier calls.
    #[must_use]
    pub fn encode(&mut self, fingerprint: [u8; 16], block_index: u32, depth: u32) -> [u8; 32] {
        // Every byte of `scratch` is overwritten below, so no pre-zeroing is
        // needed and nothing from a previous call can leak into this key.
        self.scratch[..16].copy_from_slice(&fingerprint);
        self.scratch[16..20].copy_from_slice(&block_index.to_le_bytes());
        self.scratch[20..24].copy_from_slice(&depth.to_le_bytes());

        // The mix reads only bytes 0..8 and 16..24 (both inside `head`) and
        // writes only bytes 24..32 (`tail`), so write order does not matter.
        let (head, tail) = self.scratch.split_at_mut(24);
        for (offset, byte) in tail.iter_mut().enumerate() {
            *byte = head[offset] ^ head[offset + 16].rotate_left(1);
        }
        self.scratch
    }

    /// Encode a full chain into `out`, reusing its existing allocation.
    ///
    /// `out` is cleared first; on return it holds exactly one key per element
    /// of `depths`, in order. Each depth is used as both the `block_index` and
    /// the `depth` coordinate, so `out[i] == self.encode(fingerprint, d, d)`
    /// with `d == depths[i]`.
    pub fn encode_chain(&mut self, fingerprint: [u8; 16], depths: &[u32], out: &mut Vec<[u8; 32]>) {
        out.clear();
        // `reserve` is relative to `len`, which is 0 after `clear`, so this
        // guarantees room for the whole chain up front and is a no-op when the
        // existing capacity already suffices. Subtracting the current capacity
        // from the request would under-reserve whenever `out` had some, but
        // not enough, capacity, forcing extra reallocations mid-chain.
        out.reserve(depths.len());
        out.extend(depths.iter().map(|&depth| self.encode(fingerprint, depth, depth)));
    }
}
165
166/// Memory guardrail used to bound in-memory radix footprint.
167#[derive(Clone, Copy, Debug, PartialEq, Eq)]
168pub struct MemoryGuardrail {
169    pub host_memory_bytes: u64,
170    pub reserved_system_bytes: u64,
171    pub max_index_fraction_pct: u8,
172    pub map_overhead_bytes_per_entry: u64,
173}
174
175impl Default for MemoryGuardrail {
176    fn default() -> Self {
177        Self::m3_max_96gb()
178    }
179}
180
181impl MemoryGuardrail {
182    #[must_use]
183    pub const fn m3_max_96gb() -> Self {
184        Self {
185            host_memory_bytes: M3_MAX_UNIFIED_MEMORY_BYTES,
186            reserved_system_bytes: 48_u64 * 1024 * 1024 * 1024,
187            max_index_fraction_pct: 30,
188            map_overhead_bytes_per_entry: DEFAULT_MAP_OVERHEAD_BYTES_PER_ENTRY,
189        }
190    }
191
192    #[must_use]
193    pub fn index_budget_bytes(self) -> u64 {
194        let available = self.host_memory_bytes.saturating_sub(self.reserved_system_bytes);
195        available.saturating_mul(u64::from(self.max_index_fraction_pct)) / 100
196    }
197
198    #[must_use]
199    pub fn estimate_entry_bytes(self) -> u64 {
200        let key_size = u64::try_from(size_of::<[u8; 32]>()).unwrap_or(u64::MAX);
201        let entry_size = u64::try_from(size_of::<Entry>()).unwrap_or(u64::MAX);
202        key_size.saturating_add(entry_size).saturating_add(self.map_overhead_bytes_per_entry)
203    }
204
205    #[must_use]
206    pub fn estimate_index_bytes(self, entry_count: usize) -> u64 {
207        let entry_count_u64 = u64::try_from(entry_count).unwrap_or(u64::MAX);
208        self.estimate_entry_bytes().saturating_mul(entry_count_u64)
209    }
210}
211
/// Guardrail violations reported by guarded inserts.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GuardrailError {
    /// Accepting the insert would push the estimated index footprint above
    /// the budget.
    MemoryBudgetExceeded {
        /// Estimated footprint in bytes the index would have after the insert.
        estimated_bytes: u64,
        /// Budget in bytes the estimate was compared against.
        budget_bytes: u64,
    },
}

impl std::fmt::Display for GuardrailError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::MemoryBudgetExceeded { estimated_bytes, budget_bytes } => write!(
                f,
                "index memory budget exceeded: estimated {estimated_bytes} bytes, \
                 budget {budget_bytes} bytes"
            ),
        }
    }
}

// Lets callers propagate guardrail failures with `?` into
// `Box<dyn std::error::Error>` or other generic error types.
impl std::error::Error for GuardrailError {}
217
218#[derive(Clone, Copy, Debug, PartialEq, Eq)]
219struct Entry {
220    block_index: u32,
221    residency: Residency,
222}
223
224/// Local-memory-only radix-like lookup index for block keys.
225#[derive(Default)]
226pub struct RadixIndex {
227    entries: BTreeMap<[u8; 32], Entry>,
228}
229
/// Process-wide counter that tests read to confirm lookups stay memory-only.
///
/// Nothing in this file increments it; the lookup test resets it to 0 and
/// asserts it is still 0 after a lookup.
pub static IO_GUARD: AtomicUsize = AtomicUsize::new(0_usize);
232
233impl RadixIndex {
234    /// Upsert a key with block index and residency state.
235    pub fn upsert(&mut self, key: [u8; 32], block_index: u32, residency: Residency) {
236        self.entries.insert(key, Entry { block_index, residency });
237    }
238
239    /// Upsert only when guardrail budget remains under limit.
240    pub fn upsert_guarded(
241        &mut self,
242        key: [u8; 32],
243        block_index: u32,
244        residency: Residency,
245        guardrail: MemoryGuardrail,
246    ) -> Result<(), GuardrailError> {
247        let next_entry_count = if self.entries.contains_key(&key) {
248            self.entries.len()
249        } else {
250            self.entries.len().saturating_add(1)
251        };
252        let estimated_bytes = guardrail.estimate_index_bytes(next_entry_count);
253        let budget_bytes = guardrail.index_budget_bytes();
254        if estimated_bytes > budget_bytes {
255            return Err(GuardrailError::MemoryBudgetExceeded { estimated_bytes, budget_bytes });
256        }
257
258        self.upsert(key, block_index, residency);
259        Ok(())
260    }
261
262    /// Estimated heap footprint for current entry count.
263    #[must_use]
264    pub fn estimated_heap_bytes(&self, guardrail: MemoryGuardrail) -> u64 {
265        guardrail.estimate_index_bytes(self.entries.len())
266    }
267
268    /// Number of entries in the index.
269    #[must_use]
270    pub fn len(&self) -> usize {
271        self.entries.len()
272    }
273
274    /// Returns true when index is empty.
275    #[must_use]
276    pub fn is_empty(&self) -> bool {
277        self.entries.is_empty()
278    }
279
280    /// Allocation-stable longest-prefix lookup.
281    ///
282    /// Partial-hit rule:
283    /// - resident handles stop at the first `RemoteOnly` block
284    /// - all subsequent matched blocks are returned as prefetch keys
285    #[must_use]
286    pub fn lookup_blocks_into(&self, chain: &[[u8; 32]], buffers: &mut LookupBuffers) -> u32 {
287        buffers.clear();
288
289        let mut matched_depth = 0_u32;
290        let mut remote_seen = false;
291
292        for key in chain {
293            let Some(entry) = self.entries.get(key) else {
294                break;
295            };
296
297            matched_depth = matched_depth.saturating_add(1);
298            if remote_seen {
299                buffers.prefetch_keys.push(*key);
300                continue;
301            }
302
303            match entry.residency {
304                Residency::Ram | Residency::Nvme => buffers.resident_handles.push(BlockHandle {
305                    key: *key,
306                    block_index: entry.block_index,
307                    residency: entry.residency,
308                }),
309                Residency::RemoteOnly => {
310                    remote_seen = true;
311                    buffers.prefetch_keys.push(*key);
312                }
313            }
314        }
315
316        matched_depth
317    }
318
319    /// Longest-prefix lookup across a hashed prefix chain.
320    #[must_use]
321    pub fn lookup_chain(&self, chain: &[[u8; 32]]) -> LookupResult {
322        let mut buffers = LookupBuffers::with_capacity(chain.len());
323        let matched_depth = self.lookup_blocks_into(chain, &mut buffers);
324        LookupResult {
325            matched_depth,
326            resident_handles: buffers.resident_handles,
327            prefetch_keys: buffers.prefetch_keys,
328        }
329    }
330}
331
// Unit tests for lookup semantics, memory guardrails, buffer reuse, and key
// encoding.
#[cfg(test)]
mod tests {
    use super::{
        GuardrailError, KeyEncoder, LookupBufferPool, LookupBuffers, MemoryGuardrail, RadixIndex,
        Residency, IO_GUARD, M3_MAX_UNIFIED_MEMORY_BYTES,
    };
    use std::sync::atomic::Ordering;

    // Builds a distinct 32-byte key by repeating `byte`.
    fn key(byte: u8) -> [u8; 32] {
        [byte; 32]
    }

    #[test]
    fn crate_id_is_stable() {
        assert_eq!(super::crate_id(), "tp-radix");
    }

    // Matching stops at the first chain key absent from the index.
    #[test]
    fn longest_prefix_match_stops_at_first_missing_key() {
        let mut index = RadixIndex::default();
        index.upsert(key(1), 0, Residency::Ram);
        index.upsert(key(2), 1, Residency::Ram);
        // key(3) missing in index

        let result = index.lookup_chain(&[key(1), key(2), key(3), key(4)]);
        assert_eq!(result.matched_depth, 2);
        assert_eq!(result.resident_handles.len(), 2);
        assert!(result.prefetch_keys.is_empty());
    }

    // The counter is reset to 0 and must still be 0 after a lookup.
    #[test]
    fn lookup_is_memory_only_no_io_side_effects() {
        IO_GUARD.store(0, Ordering::SeqCst);
        let mut index = RadixIndex::default();
        index.upsert(key(1), 0, Residency::Ram);
        let _ = index.lookup_chain(&[key(1)]);
        assert_eq!(IO_GUARD.load(Ordering::SeqCst), 0);
    }

    // key(4) is `Ram` but follows the `RemoteOnly` key(3), so it is reported
    // as a prefetch key rather than a resident handle.
    #[test]
    fn mixed_residency_returns_partial_handles_and_prefetch_keys() {
        let mut index = RadixIndex::default();
        index.upsert(key(1), 0, Residency::Ram);
        index.upsert(key(2), 1, Residency::Nvme);
        index.upsert(key(3), 2, Residency::RemoteOnly);
        index.upsert(key(4), 3, Residency::Ram);

        let result = index.lookup_chain(&[key(1), key(2), key(3), key(4)]);
        assert_eq!(result.matched_depth, 4);
        assert_eq!(result.resident_handles.len(), 2);
        assert_eq!(result.prefetch_keys, vec![key(3), key(4)]);
    }

    #[test]
    fn m3_guardrail_has_conservative_budget() {
        let guardrail = MemoryGuardrail::m3_max_96gb();
        assert_eq!(guardrail.host_memory_bytes, M3_MAX_UNIFIED_MEMORY_BYTES);
        assert!(guardrail.index_budget_bytes() < M3_MAX_UNIFIED_MEMORY_BYTES);
        assert!(guardrail.index_budget_bytes() > 0);
    }

    // Budget is 1% of 1024 bytes (10 bytes), below a single entry's estimate
    // (at least the 32-byte key plus 256 bytes of overhead), so the first
    // insert is refused and the index stays empty.
    #[test]
    fn memory_guardrail_rejects_insert_when_budget_exceeded() {
        let mut index = RadixIndex::default();
        let guardrail = MemoryGuardrail {
            host_memory_bytes: 1024,
            reserved_system_bytes: 0,
            max_index_fraction_pct: 1,
            map_overhead_bytes_per_entry: 256,
        };

        let result = index.upsert_guarded(key(1), 0, Residency::Ram, guardrail);
        assert!(matches!(result, Err(GuardrailError::MemoryBudgetExceeded { .. })));
        assert!(index.is_empty());
    }

    // After the first call settles the allocation, repeated lookups must
    // neither move nor regrow the resident-handle vector.
    #[test]
    fn lookup_blocks_into_reuses_buffer_capacity() {
        let mut index = RadixIndex::default();
        index.upsert(key(1), 0, Residency::Ram);
        index.upsert(key(2), 1, Residency::Ram);
        index.upsert(key(3), 2, Residency::Ram);

        let chain = [key(1), key(2), key(3)];
        let mut buffers = LookupBuffers::with_capacity(8);

        let _ = index.lookup_blocks_into(&chain, &mut buffers);
        let ptr = buffers.resident_handles.as_ptr();
        let cap = buffers.resident_handles.capacity();

        for _ in 0..16 {
            let matched = index.lookup_blocks_into(&chain, &mut buffers);
            assert_eq!(matched, 3);
        }

        assert_eq!(buffers.resident_handles.as_ptr(), ptr);
        assert_eq!(buffers.resident_handles.capacity(), cap);
    }

    // With a pool of size 1, the released buffer (and its capacity) comes back
    // on the next acquire, and is retained again on the next release.
    #[test]
    fn lookup_buffer_pool_recycles_allocations() {
        let mut pool = LookupBufferPool::new(1);
        let mut first = pool.acquire();
        first.resident_handles.reserve(32);
        let first_capacity = first.resident_handles.capacity();
        pool.release(first);

        let second = pool.acquire();
        assert!(second.resident_handles.capacity() >= first_capacity);
        assert_eq!(pool.available(), 0);
        pool.release(second);

        assert_eq!(pool.available(), 1);
        assert_eq!(pool.high_watermark(), 1);
    }

    // `encode_chain` uses each depth as both block_index and depth, so out[1]
    // (depth 2) must equal `encode(.., 2, 2)`.
    #[test]
    fn key_encoder_is_deterministic_and_reuses_chain_buffer() {
        let mut encoder = KeyEncoder::default();
        let mut out = Vec::with_capacity(8);
        encoder.encode_chain([7; 16], &[1, 2, 3, 4], &mut out);
        let first_run = out.clone();
        let cap = out.capacity();

        encoder.encode_chain([7; 16], &[1, 2, 3, 4], &mut out);
        assert_eq!(out, first_run);
        assert_eq!(out.capacity(), cap);

        let single = encoder.encode([7; 16], 2, 2);
        assert_eq!(single, out[1]);
    }
}