1#![forbid(unsafe_code)]
24
25use heapless::Vec as HeaplessVec;
26
27use crate::btree::node::{
28 decode_node, encode_node, max_inline_value, max_key_len, DecodedNode, InternalEntry, LeafEntry,
29 NodeKind, INTERNAL_LEFTMOST_CHILD_BYTES, INTERNAL_SLOT_BYTES, LEAF_SLOT_BYTES,
30};
31use crate::btree::{BTree, MAX_BTREE_DEPTH};
32use crate::error::{Error, Result};
33use crate::pager::page::{Page, PageId, PAGE_SIZE, PAGE_TRAILER_SIZE};
34use crate::pager::Pager;
35use crate::platform::FileBackend;
36
37struct PathFrame {
40 page_id: PageId,
41 node: DecodedNode,
42 child_index: usize,
45}
46
47enum ReplaceOutcome {
51 Fits { new_id: PageId },
54 Split {
59 left_id: PageId,
60 right_id: PageId,
61 promoted_key: Vec<u8>,
62 },
63}
64
65const PAYLOAD_BYTES: usize = PAGE_SIZE - PAGE_TRAILER_SIZE - crate::btree::node::NODE_HEADER_SIZE;
68
69impl<F: FileBackend> BTree<F> {
70 pub fn insert(&mut self, pager: &mut Pager<F>, key: &[u8], value: &[u8]) -> Result<()> {
90 check_key_value_size(key, value)?;
91 let path = self.descend_with_path(pager, key)?;
92 self.apply_insert(pager, path, key, value)
93 }
94
95 fn descend_with_path(
98 &self,
99 pager: &mut Pager<F>,
100 key: &[u8],
101 ) -> Result<HeaplessVec<PathFrame, MAX_BTREE_DEPTH>> {
102 let mut path: HeaplessVec<PathFrame, MAX_BTREE_DEPTH> = HeaplessVec::new();
103 let mut current = self.root;
104 loop {
105 let decoded = {
106 let page_ref = pager.read_page(current)?;
107 decode_node(page_ref.as_bytes())?
108 };
109 match decoded.kind {
110 NodeKind::Leaf => {
111 let frame = PathFrame {
112 page_id: current,
113 node: decoded,
114 child_index: 0,
115 };
116 if path.push(frame).is_err() {
117 return Err(Error::BTreeDepthExceeded {
118 limit: MAX_BTREE_DEPTH,
119 });
120 }
121 return Ok(path);
122 }
123 NodeKind::Internal => {
124 let child_index = pivot_index(&decoded, key);
125 let raw = decoded.children[child_index];
126 let next = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
127 reason: "internal node had zero child page-id",
128 })?;
129 let frame = PathFrame {
130 page_id: current,
131 node: decoded,
132 child_index,
133 };
134 if path.push(frame).is_err() {
135 return Err(Error::BTreeDepthExceeded {
136 limit: MAX_BTREE_DEPTH,
137 });
138 }
139 current = next;
140 }
141 }
142 }
143 }
144
145 fn apply_insert(
148 &mut self,
149 pager: &mut Pager<F>,
150 mut path: HeaplessVec<PathFrame, MAX_BTREE_DEPTH>,
151 key: &[u8],
152 value: &[u8],
153 ) -> Result<()> {
154 let mut freed: HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }> = HeaplessVec::new();
155 let Some(leaf_frame) = path.pop() else {
156 return Err(Error::BTreeInvariantViolated {
157 reason: "insert: descend returned empty path",
158 });
159 };
160 let mut outcome = replace_leaf(pager, leaf_frame, key, value, &mut freed)?;
161 while let Some(parent_frame) = path.pop() {
162 outcome = replace_internal(pager, parent_frame, outcome, &mut freed)?;
163 }
164 let new_root = build_new_root(pager, outcome)?;
165 self.root = new_root;
166 for old_id in freed.iter().copied() {
171 pager.free_page(old_id)?;
172 }
173 Ok(())
174 }
175}
176
177fn replace_leaf<F: FileBackend>(
178 pager: &mut Pager<F>,
179 frame: PathFrame,
180 key: &[u8],
181 value: &[u8],
182 freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }>,
183) -> Result<ReplaceOutcome> {
184 let mut leaf = frame.node;
185 if leaf.leaves.iter().any(|e| e.key.as_slice() == key) {
186 return Err(Error::BTreeKeyExists);
187 }
188 let insert_at = leaf
189 .leaves
190 .iter()
191 .position(|e| e.key.as_slice() > key)
192 .unwrap_or(leaf.leaves.len());
193 leaf.leaves.insert(
194 insert_at,
195 LeafEntry {
196 key: key.to_vec(),
197 value: value.to_vec(),
198 },
199 );
200 push_freed(freed, frame.page_id)?;
201 if leaf.occupied_bytes() <= PAYLOAD_BYTES {
202 let new_id = write_new_node(pager, &leaf)?;
203 return Ok(ReplaceOutcome::Fits { new_id });
204 }
205 split_leaf(pager, leaf)
206}
207
208fn replace_internal<F: FileBackend>(
209 pager: &mut Pager<F>,
210 frame: PathFrame,
211 child_outcome: ReplaceOutcome,
212 freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }>,
213) -> Result<ReplaceOutcome> {
214 let mut internal = frame.node;
215 let idx = frame.child_index;
216 match child_outcome {
217 ReplaceOutcome::Fits { new_id } => {
218 internal.children[idx] = new_id.get();
219 }
220 ReplaceOutcome::Split {
221 left_id,
222 right_id,
223 promoted_key,
224 } => {
225 internal.children[idx] = left_id.get();
226 internal
227 .internals
228 .insert(idx, InternalEntry { key: promoted_key });
229 internal.children.insert(idx + 1, right_id.get());
230 }
231 }
232 push_freed(freed, frame.page_id)?;
233 if internal.occupied_bytes() <= PAYLOAD_BYTES {
234 let new_id = write_new_node(pager, &internal)?;
235 return Ok(ReplaceOutcome::Fits { new_id });
236 }
237 split_internal(pager, internal)
238}
239
240fn build_new_root<F: FileBackend>(pager: &mut Pager<F>, outcome: ReplaceOutcome) -> Result<PageId> {
244 let (left_id, right_id, promoted_key) = match outcome {
245 ReplaceOutcome::Fits { new_id } => return Ok(new_id),
246 ReplaceOutcome::Split {
247 left_id,
248 right_id,
249 promoted_key,
250 } => (left_id, right_id, promoted_key),
251 };
252 let level = node_level_after_split(pager, left_id)?;
253 let next_level = level.checked_add(1).ok_or(Error::BTreeDepthExceeded {
254 limit: MAX_BTREE_DEPTH,
255 })?;
256 let root_node = DecodedNode {
257 kind: NodeKind::Internal,
258 level: next_level,
259 next_sibling: 0,
260 children: vec![left_id.get(), right_id.get()],
261 leaves: Vec::new(),
262 internals: vec![InternalEntry { key: promoted_key }],
263 };
264 write_new_node(pager, &root_node)
265}
266
267fn push_freed(freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }>, id: PageId) -> Result<()> {
268 freed.push(id).map_err(|_| Error::BTreeInvariantViolated {
269 reason: "insert: too many displaced pages to track",
270 })
271}
272
273fn pivot_index(node: &DecodedNode, key: &[u8]) -> usize {
275 let mut idx = node.internals.len();
276 for (i, pivot) in node.internals.iter().enumerate() {
277 if pivot.key.as_slice() > key {
278 idx = i;
279 break;
280 }
281 }
282 idx
283}
284
285fn check_key_value_size(key: &[u8], value: &[u8]) -> Result<()> {
287 if key.len() > max_key_len() {
288 return Err(Error::BTreeKeyTooLarge {
289 key_len: key.len(),
290 max: max_key_len(),
291 });
292 }
293 let v_max = max_inline_value(key.len());
294 if value.len() > v_max {
295 return Err(Error::BTreeValueTooLarge {
296 value_len: value.len(),
297 max: v_max,
298 });
299 }
300 Ok(())
301}
302
303pub(crate) fn write_new_node<F: FileBackend>(
306 pager: &mut Pager<F>,
307 node: &DecodedNode,
308) -> Result<PageId> {
309 let new_id = pager.alloc_page()?;
310 let mut page = Page::zeroed();
311 encode_node(node, &mut page)?;
312 pager.write_page(new_id, &page)?;
313 Ok(new_id)
314}
315
316fn split_leaf<F: FileBackend>(
323 pager: &mut Pager<F>,
324 mut leaf: DecodedNode,
325) -> Result<ReplaceOutcome> {
326 let mid = leaf.leaves.len() / 2;
327 let original_sibling = leaf.next_sibling;
328 let right_entries: Vec<LeafEntry> = leaf.leaves.split_off(mid);
332 let promoted_key = right_entries[0].key.clone();
333 let right_node = DecodedNode {
334 kind: NodeKind::Leaf,
335 level: 0,
336 next_sibling: original_sibling,
337 children: Vec::new(),
338 leaves: right_entries,
339 internals: Vec::new(),
340 };
341 let right_id = write_new_node(pager, &right_node)?;
342 let left_node = DecodedNode {
343 kind: NodeKind::Leaf,
344 level: 0,
345 next_sibling: right_id.get(),
346 children: Vec::new(),
347 leaves: leaf.leaves,
348 internals: Vec::new(),
349 };
350 let left_id = write_new_node(pager, &left_node)?;
351 Ok(ReplaceOutcome::Split {
352 left_id,
353 right_id,
354 promoted_key,
355 })
356}
357
358fn split_internal<F: FileBackend>(
362 pager: &mut Pager<F>,
363 mut internal: DecodedNode,
364) -> Result<ReplaceOutcome> {
365 let k = internal.internals.len();
366 debug_assert!(k >= 2, "internal split needs ≥ 2 pivots");
367 let mid = k / 2;
368 let right_pivots: Vec<InternalEntry> = internal.internals.split_off(mid + 1);
371 let right_children: Vec<u64> = internal.children.split_off(mid + 1);
372 let promoted_pivot = internal
375 .internals
376 .pop()
377 .ok_or(Error::BTreeInvariantViolated {
378 reason: "internal split: missing promoted pivot",
379 })?;
380 let level = internal.level;
381 let right_node = DecodedNode {
382 kind: NodeKind::Internal,
383 level,
384 next_sibling: 0,
385 children: right_children,
386 leaves: Vec::new(),
387 internals: right_pivots,
388 };
389 let right_id = write_new_node(pager, &right_node)?;
390 let left_node = DecodedNode {
391 kind: NodeKind::Internal,
392 level,
393 next_sibling: 0,
394 children: internal.children,
395 leaves: Vec::new(),
396 internals: internal.internals,
397 };
398 let left_id = write_new_node(pager, &left_node)?;
399 Ok(ReplaceOutcome::Split {
400 left_id,
401 right_id,
402 promoted_key: promoted_pivot.key,
403 })
404}
405
406fn node_level_after_split<F: FileBackend>(pager: &mut Pager<F>, id: PageId) -> Result<u8> {
410 let page_ref = pager.read_page(id)?;
411 let decoded = decode_node(page_ref.as_bytes())?;
412 Ok(decoded.level)
413}
414
415const _: usize = PAYLOAD_BYTES;
419
420const _UNUSED_CHECKS: () = {
424 let _ = INTERNAL_LEFTMOST_CHILD_BYTES;
425 let _ = INTERNAL_SLOT_BYTES;
426 let _ = LEAF_SLOT_BYTES;
427};
428
429#[cfg(test)]
430mod tests {
431 use super::*;
432 use crate::pager::{Config, Pager};
433 use crate::platform::FileHandle;
434
435 use proptest::prelude::*;
436 use rand::prelude::IndexedRandom;
437 use rand::SeedableRng;
438 use rand_chacha::ChaCha8Rng;
439 use std::collections::BTreeMap;
440
441 fn config() -> Config {
442 Config::default()
443 }
444
445 #[test]
446 fn insert_single_key_round_trip() {
447 let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
448 let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
449 tree.insert(&mut pager, b"hello", b"world").expect("ins");
450 assert_eq!(
451 tree.get(&mut pager, b"hello").expect("get"),
452 Some(b"world".to_vec())
453 );
454 }
455
456 #[test]
457 fn duplicate_key_errors() {
458 let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
459 let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
460 tree.insert(&mut pager, b"k", b"v1").expect("ins");
461 let err = tree
462 .insert(&mut pager, b"k", b"v2")
463 .expect_err("dup must fail");
464 assert!(matches!(err, Error::BTreeKeyExists));
465 }
466
467 #[test]
468 fn insert_growth_splits_root() {
469 let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
470 let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
471 let value = vec![0xABu8; 256];
473 for i in 0..200u32 {
474 let key = format!("key-{i:08}");
475 tree.insert(&mut pager, key.as_bytes(), &value)
476 .expect("ins");
477 }
478 for i in 0..200u32 {
479 let key = format!("key-{i:08}");
480 assert_eq!(
481 tree.get(&mut pager, key.as_bytes()).expect("get"),
482 Some(value.clone()),
483 "key {key}"
484 );
485 }
486 let root = tree.root();
488 let page_ref = pager.read_page(root).expect("read root");
489 let decoded = decode_node(page_ref.as_bytes()).expect("decode root");
490 assert!(
491 decoded.level >= 1,
492 "expected internal root, got {decoded:?}"
493 );
494 }
495
496 proptest! {
497 #![proptest_config(ProptestConfig {
498 cases: 16,
499 max_shrink_iters: 32,
500 .. ProptestConfig::default()
501 })]
502
503 #[test]
504 fn insert_oracle_property(seed in any::<u64>()) {
505 run_insert_oracle(seed, 200);
506 }
507 }
508
509 #[test]
513 fn insert_oracle_10k() {
514 for seed in 0..3u64 {
515 run_insert_oracle(seed, 10_000);
516 }
517 }
518
519 fn run_insert_oracle(seed: u64, ops: usize) {
520 let mut rng = ChaCha8Rng::seed_from_u64(seed);
521 let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
522 let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
523 let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
524 for op in 0..ops {
525 let key = random_key(&mut rng);
526 let value = random_value(&mut rng);
527 let key_already = oracle.contains_key(&key);
528 let res = tree.insert(&mut pager, &key, &value);
529 if key_already {
530 assert!(
531 matches!(res, Err(Error::BTreeKeyExists)),
532 "seed {seed} op {op}: expected BTreeKeyExists, got {res:?}"
533 );
534 } else {
535 res.unwrap_or_else(|e| panic!("seed {seed} op {op}: insert err {e:?}"));
536 oracle.insert(key.clone(), value.clone());
537 }
538 if op.is_multiple_of(127) {
539 let keys: Vec<&Vec<u8>> = oracle.keys().collect();
541 if !keys.is_empty() {
542 let sample: Vec<&Vec<u8>> =
543 keys.choose_multiple(&mut rng, 4).copied().collect();
544 for k in sample {
545 assert_eq!(
546 tree.get(&mut pager, k).expect("get").as_ref(),
547 oracle.get(k),
548 "seed {seed} op {op}: key {k:?}"
549 );
550 }
551 }
552 }
553 }
554 for (k, v) in &oracle {
556 assert_eq!(
557 tree.get(&mut pager, k).expect("get").as_ref(),
558 Some(v),
559 "seed {seed} final: key {k:?}"
560 );
561 }
562 }
563
564 fn random_key(rng: &mut ChaCha8Rng) -> Vec<u8> {
565 use rand::Rng;
566 let len = rng.random_range(1..16);
567 (0..len).map(|_| rng.random_range(b'a'..=b'z')).collect()
568 }
569
570 fn random_value(rng: &mut ChaCha8Rng) -> Vec<u8> {
571 use rand::Rng;
572 let len = rng.random_range(0..64);
573 (0..len).map(|_| rng.random()).collect()
574 }
575}