1#![forbid(unsafe_code)]
2
3use std::collections::BTreeMap;
4use std::mem::size_of;
5use std::sync::atomic::AtomicUsize;
6
7pub mod block_meta;
8pub mod slatedb_meta;
9pub use block_meta::{
10 BlockHash, BlockMeta, InMemoryMetadataIndex, LayoutTag, MetadataIndex, ModelDigest,
11};
12pub use slatedb_meta::{SlateDbMetaError, SlateDbMetadataIndex};
13
/// Returns the stable identifier string of this crate.
///
/// The value is a compile-time constant, so the returned `&'static str` can
/// be stored or compared without any allocation.
#[must_use]
pub fn crate_id() -> &'static str {
    const CRATE_ID: &str = "tp-radix";
    CRATE_ID
}
19
/// Host memory size used by [`MemoryGuardrail::m3_max_96gb`]: 96 GiB, in bytes.
pub const M3_MAX_UNIFIED_MEMORY_BYTES: u64 = 96_u64 << 30;

/// String prefix `wombatkv/v1/block/b3=`.
///
/// NOTE(review): nothing in this file reads this constant; presumably it is
/// prepended to a block hash to form a storage key — confirm against callers.
pub const BLOCK_KEY_PREFIX: &str = "wombatkv/v1/block/b3=";

/// String prefix `wombatkv/v1/sidecar/raw_tail/b3=`.
///
/// NOTE(review): nothing in this file reads this constant; presumably it
/// namespaces "raw tail" sidecar objects — confirm against callers.
pub const SIDECAR_RAW_TAIL_KEY_PREFIX: &str = "wombatkv/v1/sidecar/raw_tail/b3=";

/// Per-entry map overhead, in bytes, used by the default [`MemoryGuardrail`]
/// when estimating how much memory the index occupies.
const DEFAULT_MAP_OVERHEAD_BYTES_PER_ENTRY: u64 = 64;
36
/// Storage tier recorded for an indexed block.
///
/// [`RadixIndex::lookup_blocks_into`] reports `Ram` and `Nvme` entries as
/// resident handles, while a `RemoteOnly` entry (and every matched key after
/// it in the chain) is reported as a prefetch key instead.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Residency {
    /// Reported as a resident handle by lookups.
    Ram,
    /// Reported as a resident handle by lookups.
    Nvme,
    /// Reported as a prefetch key by lookups.
    RemoteOnly,
}
44
45#[derive(Clone, Debug, PartialEq, Eq)]
47pub struct BlockHandle {
48 pub key: [u8; 32],
49 pub block_index: u32,
50 pub residency: Residency,
51}
52
53#[derive(Clone, Debug, PartialEq, Eq)]
55pub struct LookupResult {
56 pub matched_depth: u32,
57 pub resident_handles: Vec<BlockHandle>,
58 pub prefetch_keys: Vec<[u8; 32]>,
59}
60
61#[derive(Clone, Debug, Default, PartialEq, Eq)]
63pub struct LookupBuffers {
64 pub resident_handles: Vec<BlockHandle>,
65 pub prefetch_keys: Vec<[u8; 32]>,
66}
67
68impl LookupBuffers {
69 #[must_use]
70 pub fn with_capacity(capacity: usize) -> Self {
71 Self {
72 resident_handles: Vec::with_capacity(capacity),
73 prefetch_keys: Vec::with_capacity(capacity),
74 }
75 }
76
77 pub fn clear(&mut self) {
78 self.resident_handles.clear();
79 self.prefetch_keys.clear();
80 }
81
82 #[must_use]
83 pub fn combined_capacity(&self) -> usize {
84 self.resident_handles.capacity().saturating_add(self.prefetch_keys.capacity())
85 }
86}
87
88#[derive(Debug)]
90pub struct LookupBufferPool {
91 free: Vec<LookupBuffers>,
92 max_pool_size: usize,
93 high_watermark: usize,
94}
95
96impl Default for LookupBufferPool {
97 fn default() -> Self {
98 Self::new(32)
99 }
100}
101
102impl LookupBufferPool {
103 #[must_use]
104 pub fn new(max_pool_size: usize) -> Self {
105 Self { free: Vec::new(), max_pool_size: max_pool_size.max(1), high_watermark: 0 }
106 }
107
108 #[must_use]
109 pub fn acquire(&mut self) -> LookupBuffers {
110 self.free.pop().unwrap_or_default()
111 }
112
113 pub fn release(&mut self, mut buffers: LookupBuffers) {
114 buffers.clear();
115 if self.free.len() < self.max_pool_size {
116 self.free.push(buffers);
117 self.high_watermark = self.high_watermark.max(self.free.len());
118 }
119 }
120
121 #[must_use]
122 pub const fn high_watermark(&self) -> usize {
123 self.high_watermark
124 }
125
126 #[must_use]
127 pub fn available(&self) -> usize {
128 self.free.len()
129 }
130}
131
/// Builds fixed-width 32-byte index keys, reusing an internal scratch array.
#[derive(Clone, Debug, Default)]
pub struct KeyEncoder {
    /// Working area that holds the key produced by the most recent `encode`.
    scratch: [u8; 32],
}

impl KeyEncoder {
    /// Encodes one key and returns a copy of it.
    ///
    /// Layout of the returned array:
    /// * bytes `0..16`  — `fingerprint`, verbatim;
    /// * bytes `16..20` — `block_index`, little-endian;
    /// * bytes `20..24` — `depth`, little-endian;
    /// * bytes `24..32` — byte `i` of the fingerprint XOR-ed with byte
    ///   `16 + i` (the index/depth bytes) rotated left by one bit, for
    ///   `i` in `0..8`.
    ///
    /// The result depends only on the arguments, never on earlier calls.
    #[must_use]
    pub fn encode(&mut self, fingerprint: [u8; 16], block_index: u32, depth: u32) -> [u8; 32] {
        self.scratch.fill(0);
        self.scratch[..16].copy_from_slice(&fingerprint);
        self.scratch[16..20].copy_from_slice(&block_index.to_le_bytes());
        self.scratch[20..24].copy_from_slice(&depth.to_le_bytes());

        // The trailing 8 bytes are derived purely from the first 24, so the
        // array can be split into a read-only head and a written tail.
        let (head, tail) = self.scratch.split_at_mut(24);
        for (offset, slot) in tail.iter_mut().enumerate() {
            *slot = head[offset] ^ head[offset + 16].rotate_left(1);
        }
        self.scratch
    }

    /// Replaces the contents of `out` with one key per element of `depths`.
    ///
    /// Each value in `depths` is passed to [`KeyEncoder::encode`] as both the
    /// `block_index` and the `depth` argument. Any previous contents of `out`
    /// are discarded; its allocation is reused when it is large enough.
    pub fn encode_chain(
        &mut self,
        fingerprint: [u8; 16],
        depths: &[u32],
        out: &mut Vec<[u8; 32]>,
    ) {
        out.clear();
        // `reserve` counts from the current length (now 0), so the full
        // `depths.len()` must be requested to avoid regrowth while pushing.
        out.reserve(depths.len());
        out.extend(depths.iter().map(|&depth| self.encode(fingerprint, depth, depth)));
    }
}
165
166#[derive(Clone, Copy, Debug, PartialEq, Eq)]
168pub struct MemoryGuardrail {
169 pub host_memory_bytes: u64,
170 pub reserved_system_bytes: u64,
171 pub max_index_fraction_pct: u8,
172 pub map_overhead_bytes_per_entry: u64,
173}
174
175impl Default for MemoryGuardrail {
176 fn default() -> Self {
177 Self::m3_max_96gb()
178 }
179}
180
181impl MemoryGuardrail {
182 #[must_use]
183 pub const fn m3_max_96gb() -> Self {
184 Self {
185 host_memory_bytes: M3_MAX_UNIFIED_MEMORY_BYTES,
186 reserved_system_bytes: 48_u64 * 1024 * 1024 * 1024,
187 max_index_fraction_pct: 30,
188 map_overhead_bytes_per_entry: DEFAULT_MAP_OVERHEAD_BYTES_PER_ENTRY,
189 }
190 }
191
192 #[must_use]
193 pub fn index_budget_bytes(self) -> u64 {
194 let available = self.host_memory_bytes.saturating_sub(self.reserved_system_bytes);
195 available.saturating_mul(u64::from(self.max_index_fraction_pct)) / 100
196 }
197
198 #[must_use]
199 pub fn estimate_entry_bytes(self) -> u64 {
200 let key_size = u64::try_from(size_of::<[u8; 32]>()).unwrap_or(u64::MAX);
201 let entry_size = u64::try_from(size_of::<Entry>()).unwrap_or(u64::MAX);
202 key_size.saturating_add(entry_size).saturating_add(self.map_overhead_bytes_per_entry)
203 }
204
205 #[must_use]
206 pub fn estimate_index_bytes(self, entry_count: usize) -> u64 {
207 let entry_count_u64 = u64::try_from(entry_count).unwrap_or(u64::MAX);
208 self.estimate_entry_bytes().saturating_mul(entry_count_u64)
209 }
210}
211
/// Error returned when a guarded index mutation is refused.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into boxed or wrapping error types.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GuardrailError {
    /// The estimated index size would exceed the configured budget.
    MemoryBudgetExceeded {
        /// Estimated index size, in bytes, had the mutation been applied.
        estimated_bytes: u64,
        /// Budget, in bytes, that the estimate was compared against.
        budget_bytes: u64,
    },
}

impl std::fmt::Display for GuardrailError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::MemoryBudgetExceeded { estimated_bytes, budget_bytes } => write!(
                f,
                "index memory budget exceeded: estimated {estimated_bytes} bytes, \
                 budget {budget_bytes} bytes"
            ),
        }
    }
}

impl std::error::Error for GuardrailError {}
217
218#[derive(Clone, Copy, Debug, PartialEq, Eq)]
219struct Entry {
220 block_index: u32,
221 residency: Residency,
222}
223
224#[derive(Default)]
226pub struct RadixIndex {
227 entries: BTreeMap<[u8; 32], Entry>,
228}
229
/// Process-wide counter, initialised to zero.
///
/// Within this file it is only touched by the unit tests, which reset it to
/// zero and assert that it is still zero after a lookup.
/// NOTE(review): presumably I/O paths elsewhere increment it so tests can
/// detect accidental I/O — confirm against the rest of the crate.
pub static IO_GUARD: AtomicUsize = AtomicUsize::new(0);
232
233impl RadixIndex {
234 pub fn upsert(&mut self, key: [u8; 32], block_index: u32, residency: Residency) {
236 self.entries.insert(key, Entry { block_index, residency });
237 }
238
239 pub fn upsert_guarded(
241 &mut self,
242 key: [u8; 32],
243 block_index: u32,
244 residency: Residency,
245 guardrail: MemoryGuardrail,
246 ) -> Result<(), GuardrailError> {
247 let next_entry_count = if self.entries.contains_key(&key) {
248 self.entries.len()
249 } else {
250 self.entries.len().saturating_add(1)
251 };
252 let estimated_bytes = guardrail.estimate_index_bytes(next_entry_count);
253 let budget_bytes = guardrail.index_budget_bytes();
254 if estimated_bytes > budget_bytes {
255 return Err(GuardrailError::MemoryBudgetExceeded { estimated_bytes, budget_bytes });
256 }
257
258 self.upsert(key, block_index, residency);
259 Ok(())
260 }
261
262 #[must_use]
264 pub fn estimated_heap_bytes(&self, guardrail: MemoryGuardrail) -> u64 {
265 guardrail.estimate_index_bytes(self.entries.len())
266 }
267
268 #[must_use]
270 pub fn len(&self) -> usize {
271 self.entries.len()
272 }
273
274 #[must_use]
276 pub fn is_empty(&self) -> bool {
277 self.entries.is_empty()
278 }
279
280 #[must_use]
286 pub fn lookup_blocks_into(&self, chain: &[[u8; 32]], buffers: &mut LookupBuffers) -> u32 {
287 buffers.clear();
288
289 let mut matched_depth = 0_u32;
290 let mut remote_seen = false;
291
292 for key in chain {
293 let Some(entry) = self.entries.get(key) else {
294 break;
295 };
296
297 matched_depth = matched_depth.saturating_add(1);
298 if remote_seen {
299 buffers.prefetch_keys.push(*key);
300 continue;
301 }
302
303 match entry.residency {
304 Residency::Ram | Residency::Nvme => buffers.resident_handles.push(BlockHandle {
305 key: *key,
306 block_index: entry.block_index,
307 residency: entry.residency,
308 }),
309 Residency::RemoteOnly => {
310 remote_seen = true;
311 buffers.prefetch_keys.push(*key);
312 }
313 }
314 }
315
316 matched_depth
317 }
318
319 #[must_use]
321 pub fn lookup_chain(&self, chain: &[[u8; 32]]) -> LookupResult {
322 let mut buffers = LookupBuffers::with_capacity(chain.len());
323 let matched_depth = self.lookup_blocks_into(chain, &mut buffers);
324 LookupResult {
325 matched_depth,
326 resident_handles: buffers.resident_handles,
327 prefetch_keys: buffers.prefetch_keys,
328 }
329 }
330}
331
#[cfg(test)]
mod tests {
    use super::{
        GuardrailError, KeyEncoder, LookupBufferPool, LookupBuffers, MemoryGuardrail, RadixIndex,
        Residency, IO_GUARD, M3_MAX_UNIFIED_MEMORY_BYTES,
    };
    use std::sync::atomic::Ordering;

    /// Builds a 32-byte key whose bytes all equal `byte`.
    fn key(byte: u8) -> [u8; 32] {
        [byte; 32]
    }

    /// Builds an index holding `key(byte)` for every listed pair; block
    /// indices are assigned from 0 in listing order.
    fn index_with(blocks: &[(u8, Residency)]) -> RadixIndex {
        let mut index = RadixIndex::default();
        for (block_index, (byte, residency)) in (0_u32..).zip(blocks.iter().copied()) {
            index.upsert(key(byte), block_index, residency);
        }
        index
    }

    #[test]
    fn crate_id_is_stable() {
        assert_eq!(super::crate_id(), "tp-radix");
    }

    #[test]
    fn longest_prefix_match_stops_at_first_missing_key() {
        let index = index_with(&[(1, Residency::Ram), (2, Residency::Ram)]);

        // Keys 3 and 4 are absent, so only the first two chain links match.
        let outcome = index.lookup_chain(&[key(1), key(2), key(3), key(4)]);
        assert_eq!(outcome.matched_depth, 2);
        assert_eq!(outcome.resident_handles.len(), 2);
        assert!(outcome.prefetch_keys.is_empty());
    }

    #[test]
    fn lookup_is_memory_only_no_io_side_effects() {
        IO_GUARD.store(0, Ordering::SeqCst);
        let index = index_with(&[(1, Residency::Ram)]);
        let _ = index.lookup_chain(&[key(1)]);
        // The counter must be untouched by a lookup.
        assert_eq!(IO_GUARD.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn mixed_residency_returns_partial_handles_and_prefetch_keys() {
        let index = index_with(&[
            (1, Residency::Ram),
            (2, Residency::Nvme),
            (3, Residency::RemoteOnly),
            (4, Residency::Ram),
        ]);

        // Everything from the first remote-only block onwards is prefetched,
        // including the RAM-resident key 4.
        let outcome = index.lookup_chain(&[key(1), key(2), key(3), key(4)]);
        assert_eq!(outcome.matched_depth, 4);
        assert_eq!(outcome.resident_handles.len(), 2);
        assert_eq!(outcome.prefetch_keys, vec![key(3), key(4)]);
    }

    #[test]
    fn m3_guardrail_has_conservative_budget() {
        let profile = MemoryGuardrail::m3_max_96gb();
        let budget = profile.index_budget_bytes();
        assert_eq!(profile.host_memory_bytes, M3_MAX_UNIFIED_MEMORY_BYTES);
        assert!(budget < M3_MAX_UNIFIED_MEMORY_BYTES);
        assert!(budget > 0);
    }

    #[test]
    fn memory_guardrail_rejects_insert_when_budget_exceeded() {
        // 1% of 1024 bytes is far less than a single entry's estimate.
        let tiny = MemoryGuardrail {
            host_memory_bytes: 1024,
            reserved_system_bytes: 0,
            max_index_fraction_pct: 1,
            map_overhead_bytes_per_entry: 256,
        };
        let mut index = RadixIndex::default();

        let outcome = index.upsert_guarded(key(1), 0, Residency::Ram, tiny);
        assert!(matches!(outcome, Err(GuardrailError::MemoryBudgetExceeded { .. })));
        assert!(index.is_empty());
    }

    #[test]
    fn lookup_blocks_into_reuses_buffer_capacity() {
        let index = index_with(&[(1, Residency::Ram), (2, Residency::Ram), (3, Residency::Ram)]);
        let chain = [key(1), key(2), key(3)];
        let mut buffers = LookupBuffers::with_capacity(8);

        // Warm-up lookup, then remember where the allocation lives.
        let _ = index.lookup_blocks_into(&chain, &mut buffers);
        let baseline_ptr = buffers.resident_handles.as_ptr();
        let baseline_cap = buffers.resident_handles.capacity();

        for _ in 0..16 {
            let matched = index.lookup_blocks_into(&chain, &mut buffers);
            assert_eq!(matched, 3);
        }

        // Same pointer and capacity: no reallocation happened.
        assert_eq!(buffers.resident_handles.as_ptr(), baseline_ptr);
        assert_eq!(buffers.resident_handles.capacity(), baseline_cap);
    }

    #[test]
    fn lookup_buffer_pool_recycles_allocations() {
        let mut pool = LookupBufferPool::new(1);

        let mut grown = pool.acquire();
        grown.resident_handles.reserve(32);
        let grown_capacity = grown.resident_handles.capacity();
        pool.release(grown);

        // The recycled buffers keep the capacity reserved above.
        let recycled = pool.acquire();
        assert!(recycled.resident_handles.capacity() >= grown_capacity);
        assert_eq!(pool.available(), 0);
        pool.release(recycled);

        assert_eq!(pool.available(), 1);
        assert_eq!(pool.high_watermark(), 1);
    }

    #[test]
    fn key_encoder_is_deterministic_and_reuses_chain_buffer() {
        let fingerprint = [7; 16];
        let depths = [1, 2, 3, 4];
        let mut encoder = KeyEncoder::default();
        let mut out = Vec::with_capacity(8);

        encoder.encode_chain(fingerprint, &depths, &mut out);
        let first_run = out.clone();
        let cap = out.capacity();

        // A second pass yields the same keys in the same allocation.
        encoder.encode_chain(fingerprint, &depths, &mut out);
        assert_eq!(out, first_run);
        assert_eq!(out.capacity(), cap);

        // Chain element 1 corresponds to depth 2 used as both arguments.
        let single = encoder.encode(fingerprint, 2, 2);
        assert_eq!(single, out[1]);
    }
}