Skip to main content

triblespace_core/
patch.rs

1//! Persistent Adaptive Trie with Cuckoo-compression and
2//! Hash-maintenance (PATCH).
3//!
4//! See the [PATCH](../book/src/deep-dive/patch.md) chapter of the Tribles Book
5//! for the full design description and hashing scheme.
6//!
7//! Values stored in leaves are not part of hashing or equality comparisons.
8//! Two [`PATCH`](crate::patch::PATCH)es are considered equal if they contain the same set of keys,
9//! even if the associated values differ. This allows using the structure as an
10//! idempotent blobstore where a value's hash determines its key.
11//!
12#![allow(unstable_name_collisions)]
13
14mod branch;
15/// Byte-indexed lookup tables used by PATCH branch nodes.
16pub mod bytetable;
17mod entry;
18mod leaf;
19
20use arrayvec::ArrayVec;
21
22use branch::*;
23/// Re-export of [`Entry`](entry::Entry).
24pub use entry::Entry;
25use leaf::*;
26
27/// Re-export of all byte table utilities.
28pub use bytetable::*;
29use rand::thread_rng;
30use rand::RngCore;
31use std::cmp::Reverse;
32use std::convert::TryInto;
33use std::fmt;
34use std::fmt::Debug;
35use std::marker::PhantomData;
36use std::ptr::NonNull;
37use std::sync::Once;
38
39#[cfg(not(target_pointer_width = "64"))]
40compile_error!("PATCH tagged pointers require 64-bit targets");
41
42static mut SIP_KEY: [u8; 16] = [0; 16];
43static INIT: Once = Once::new();
44
45/// Minimum `other.leaf_count` at which [`Head::union`] takes the
46/// scatter-pair + bitset + adaptive-rayon path on the equal-depth-
47/// branch arm. Below this, the per-key `modify_child` loop wins
48/// because asymmetric merges only touch a handful of slots. Tuned
49/// for the `entities/union*` bench family.
50#[cfg(feature = "parallel")]
51const PARALLEL_PATCH_UNION_THRESHOLD: usize = 4096;
52
/// Demand-driven parallel resolver used by the equal-depth-branch arm
/// of [`Head::union`].
///
/// The producer holds a list of `(key, this, other)` triples and is
/// handed to rayon through `bridge_unindexed`. Rayon asks it to split
/// only when a worker wants to steal, so the list is bisected on
/// demand instead of being partitioned up-front.
///
/// The number of children is a poor predictor of cost: how expensive a
/// recursive `Head::union` is depends on how much the two subtrees
/// overlap, which is not cheap to measure in advance. Letting stealing
/// pressure decide where to split avoids needing such an estimate.
#[cfg(feature = "parallel")]
mod parallel_union {
    use super::{Head, KeySchema};
    use rayon::iter::plumbing::{
        bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer,
    };
    use rayon::iter::ParallelIterator;

    /// Work list of child pairs that still have to be unioned.
    pub(crate) struct UnionProducer<const KEY_LEN: usize, O, V>
    where
        O: KeySchema<KEY_LEN> + Send + Sync,
        V: Send + Sync,
    {
        /// `(key byte, child of this, child of other)` triples.
        work: Vec<(u8, Head<KEY_LEN, O, V>, Head<KEY_LEN, O, V>)>,
        /// Depth passed through to every recursive `Head::union` call.
        depth: usize,
        /// How many more times this producer may still be split.
        split_budget: usize,
    }

    impl<const KEY_LEN: usize, O, V> UnionProducer<KEY_LEN, O, V>
    where
        O: KeySchema<KEY_LEN> + Send + Sync,
        V: Send + Sync,
    {
        /// Wraps `work` in a producer whose split budget is the square
        /// of the current rayon thread count, but never less than 2.
        ///
        /// The original note describes this as the same heuristic as
        /// `QueryParIter`, bounding the split tree depth at roughly
        /// 2·log₂(N) even under sustained stealing pressure.
        pub(crate) fn new(
            work: Vec<(u8, Head<KEY_LEN, O, V>, Head<KEY_LEN, O, V>)>,
            depth: usize,
        ) -> Self {
            let threads = rayon::current_num_threads();
            Self {
                work,
                depth,
                split_budget: threads.saturating_mul(threads).max(2),
            }
        }
    }

    impl<const KEY_LEN: usize, O, V> UnindexedProducer for UnionProducer<KEY_LEN, O, V>
    where
        O: KeySchema<KEY_LEN> + Send + Sync,
        V: Send + Sync,
    {
        type Item = (u8, Head<KEY_LEN, O, V>);

        /// Halves the work list when rayon asks for a split.
        ///
        /// Returns `(self, None)` — "fold me sequentially" — once the
        /// budget is exhausted or fewer than two items remain.
        fn split(mut self) -> (Self, Option<Self>) {
            if self.work.len() < 2 {
                return (self, None);
            }
            // One unit of budget pays for this split; the rest is
            // shared between the two halves.
            let Some(remaining) = self.split_budget.checked_sub(1) else {
                return (self, None);
            };

            let midpoint = self.work.len() / 2;
            let tail = self.work.split_off(midpoint);

            // The left half gets the rounded-down share.
            let left_share = remaining / 2;
            self.split_budget = left_share;

            let right = Self {
                work: tail,
                depth: self.depth,
                split_budget: remaining - left_share,
            };
            (self, Some(right))
        }

        /// Unions every remaining pair in order and feeds the results
        /// to `folder`, stopping early once the folder reports `full`.
        fn fold_with<F: Folder<Self::Item>>(self, mut folder: F) -> F {
            let Self { work, depth, .. } = self;
            for (key, ours, theirs) in work {
                if folder.full() {
                    break;
                }
                let merged = Head::union(ours, theirs, depth);
                folder = folder.consume((key, merged));
            }
            folder
        }
    }

    impl<const KEY_LEN: usize, O, V> ParallelIterator for UnionProducer<KEY_LEN, O, V>
    where
        O: KeySchema<KEY_LEN> + Send + Sync,
        V: Send + Sync,
    {
        type Item = (u8, Head<KEY_LEN, O, V>);

        /// Drives the producer through rayon's unindexed bridge.
        fn drive_unindexed<Con>(self, consumer: Con) -> Con::Result
        where
            Con: UnindexedConsumer<Self::Item>,
        {
            bridge_unindexed(self, consumer)
        }
    }
}
161
162/// Initializes the SIP key used for key hashing.
163/// This function is called automatically when a new PATCH is created.
164fn init_sip_key() {
165    INIT.call_once(|| {
166        bytetable::init();
167
168        let mut rng = thread_rng();
169        unsafe {
170            rng.fill_bytes(&mut SIP_KEY[..]);
171        }
172    });
173}
174
/// Expands a list of segment lengths into a per-byte segment table.
///
/// Entry `i` of the result is the index of the segment that key byte
/// `i` belongs to. Bytes beyond the sum of `lens` keep the initial
/// value `0`; a sum larger than `N` indexes out of bounds.
pub const fn build_segmentation<const N: usize, const M: usize>(lens: [usize; M]) -> [usize; N] {
    let mut table = [0; N];
    // Next key byte to be labelled; runs across all segments.
    let mut cursor = 0;
    let mut segment = 0;
    while segment < M {
        let segment_end = cursor + lens[segment];
        while cursor < segment_end {
            table[cursor] = segment;
            cursor += 1;
        }
        segment += 1;
    }
    table
}
194
/// Returns the identity permutation of length `N`, i.e. the table
/// whose entry at position `i` is `i`.
pub const fn identity_map<const N: usize>() -> [usize; N] {
    let mut table = [0; N];
    // Fill from the back; every slot is written exactly once.
    let mut remaining = N;
    while remaining > 0 {
        remaining -= 1;
        table[remaining] = remaining;
    }
    table
}
205
/// Builds the table that translates a byte index in key order into
/// the corresponding byte index in tree order.
///
/// `lens` holds the segment lengths in key order. `perm` lists the
/// segments in the order in which they appear in the tree.
pub const fn build_key_to_tree<const N: usize, const M: usize>(
    lens: [usize; M],
    perm: [usize; M],
) -> [usize; N] {
    // Where each segment begins in the tree layout, found by walking
    // the segments in tree order and accumulating their lengths.
    let mut tree_offsets = [0; M];
    let mut tree_cursor = 0;
    let mut rank = 0;
    while rank < M {
        let segment = perm[rank];
        tree_offsets[segment] = tree_cursor;
        tree_cursor += lens[segment];
        rank += 1;
    }

    // Walk the key layout front to back; `key_pos` is the running
    // start of the current segment in key order.
    let mut table = [0; N];
    let mut key_pos = 0;
    let mut segment = 0;
    while segment < M {
        let tree_base = tree_offsets[segment];
        let mut within = 0;
        while within < lens[segment] {
            table[key_pos] = tree_base + within;
            key_pos += 1;
            within += 1;
        }
        segment += 1;
    }
    table
}
248
/// Computes the inverse of a permutation table: if `arr[i] == j` then
/// the result holds `i` at position `j`.
pub const fn invert<const N: usize>(arr: [usize; N]) -> [usize; N] {
    let mut inverse = [0; N];
    let mut source = 0;
    loop {
        if source == N {
            break;
        }
        inverse[arr[source]] = source;
        source += 1;
    }
    inverse
}
259
// Declares a key segmentation type.
//
// Invocation shape: `key_segmentation!(Name, KEY_LEN, [len0, len1, ...])`,
// optionally preceded by attributes that are forwarded to the generated
// struct. The expansion defines a unit struct `Name` with an inherent
// `SEG_LENS` constant holding the listed lengths, and implements
// `KeySegmentation<KEY_LEN>` for it with `SEGMENTS` computed by
// `build_segmentation` from `SEG_LENS`.
#[doc(hidden)]
#[macro_export]
macro_rules! key_segmentation {
    // Internal: counts the expressions by building an array of `()` with
    // one element per expression and taking its length.
    (@count $($e:expr),* $(,)?) => {
        <[()]>::len(&[$($crate::key_segmentation!(@sub $e)),*])
    };
    // Internal: maps any expression to `()` for the counting rule above.
    (@sub $e:expr) => { () };
    // Public entry point.
    ($(#[$meta:meta])* $name:ident, $len:expr, [$($seg_len:expr),+ $(,)?]) => {
        $(#[$meta])*
        #[derive(Copy, Clone, Debug)]
        pub struct $name;
        impl $name {
            pub const SEG_LENS: [usize; $crate::key_segmentation!(@count $($seg_len),*)] = [$($seg_len),*];
        }
        impl $crate::patch::KeySegmentation<$len> for $name {
            const SEGMENTS: [usize; $len] = $crate::patch::build_segmentation::<$len, {$crate::key_segmentation!(@count $($seg_len),*)}>(Self::SEG_LENS);
        }
    };
}
279
// Declares a key schema (segment ordering) type.
//
// Invocation shape:
// `key_schema!(Name, SegmentationType, KEY_LEN, [perm0, perm1, ...])`,
// optionally preceded by attributes that are forwarded to the generated
// struct. The expansion defines a unit struct `Name` and implements
// `KeySchema<KEY_LEN>` for it: `SEGMENT_PERM` is the listed permutation,
// `KEY_TO_TREE` is computed by `build_key_to_tree` from the
// segmentation's `SEG_LENS` and the permutation, and `TREE_TO_KEY` is
// its inverse.
//
// The segmentation type must expose an inherent `SEG_LENS` constant.
#[doc(hidden)]
#[macro_export]
macro_rules! key_schema {
    // Internal: counts the expressions by building an array of `()` with
    // one element per expression and taking its length.
    (@count $($e:expr),* $(,)?) => {
        <[()]>::len(&[$($crate::key_schema!(@sub $e)),*])
    };
    // Internal: maps any expression to `()` for the counting rule above.
    (@sub $e:expr) => { () };
    // Public entry point.
    ($(#[$meta:meta])* $name:ident, $seg:ty, $len:expr, [$($perm:expr),+ $(,)?]) => {
        $(#[$meta])*
        #[derive(Copy, Clone, Debug)]
        pub struct $name;
        impl $crate::patch::KeySchema<$len> for $name {
            type Segmentation = $seg;
            const SEGMENT_PERM: &'static [usize] = &[$($perm),*];
            const KEY_TO_TREE: [usize; $len] = $crate::patch::build_key_to_tree::<$len, {$crate::key_schema!(@count $($perm),*)}>(<$seg>::SEG_LENS, [$($perm),*]);
            const TREE_TO_KEY: [usize; $len] = $crate::patch::invert(Self::KEY_TO_TREE);
        }
    };
}
299
300/// A trait is used to provide a re-ordered view of the keys stored in the PATCH.
301/// This allows for different PATCH instances share the same leaf nodes,
302/// independent of the key ordering used in the tree.
303pub trait KeySchema<const KEY_LEN: usize>: Copy + Clone + Debug {
304    /// The segmentation this ordering operates over.
305    type Segmentation: KeySegmentation<KEY_LEN>;
306    /// Order of segments from key layout to tree layout.
307    const SEGMENT_PERM: &'static [usize];
308    /// Maps each key index to its position in the tree view.
309    const KEY_TO_TREE: [usize; KEY_LEN];
310    /// Maps each tree index to its position in the key view.
311    const TREE_TO_KEY: [usize; KEY_LEN];
312
313    /// Reorders the key from the shared key ordering to the tree ordering.
314    fn tree_ordered(key: &[u8; KEY_LEN]) -> [u8; KEY_LEN] {
315        let mut new_key = [0; KEY_LEN];
316        let mut i = 0;
317        while i < KEY_LEN {
318            new_key[Self::KEY_TO_TREE[i]] = key[i];
319            i += 1;
320        }
321        new_key
322    }
323
324    /// Reorders the key from the tree ordering to the shared key ordering.
325    fn key_ordered(tree_key: &[u8; KEY_LEN]) -> [u8; KEY_LEN] {
326        let mut new_key = [0; KEY_LEN];
327        let mut i = 0;
328        while i < KEY_LEN {
329            new_key[Self::TREE_TO_KEY[i]] = tree_key[i];
330            i += 1;
331        }
332        new_key
333    }
334
335    /// Return the segment index for the byte at `at_depth` in tree ordering.
336    ///
337    /// Default implementation reads the static segmentation table and the
338    /// tree->key mapping. Having this as a method makes call sites clearer and
339    /// reduces the verbosity of expressions that access the segmentation table.
340    fn segment_of_tree_depth(at_depth: usize) -> usize {
341        <Self::Segmentation as KeySegmentation<KEY_LEN>>::SEGMENTS[Self::TREE_TO_KEY[at_depth]]
342    }
343
344    /// Return true if the tree-ordered bytes at `a` and `b` belong to the same
345    /// logical segment.
346    fn same_segment_tree(a: usize, b: usize) -> bool {
347        <Self::Segmentation as KeySegmentation<KEY_LEN>>::SEGMENTS[Self::TREE_TO_KEY[a]]
348            == <Self::Segmentation as KeySegmentation<KEY_LEN>>::SEGMENTS[Self::TREE_TO_KEY[b]]
349    }
350}
351
/// Splits the keys stored in a PATCH into segments.
///
/// The segmentation determines the sub-fields of a key, allowing
/// segment-based operations such as counting the number of elements in
/// a segment with a given prefix without traversing the tree.
///
/// Note that the segmentation is defined on the shared key ordering and
/// should therefore be implemented only once, independent of any
/// additional key orderings.
///
/// See [TribleSegmentation](crate::trible::TribleSegmentation) for an example that segments keys into entity,
/// attribute, and value segments.
pub trait KeySegmentation<const KEY_LEN: usize>: Copy + Clone + Debug {
    /// Segment index for each byte position of the key, in key order.
    const SEGMENTS: [usize; KEY_LEN];
}
366
/// A `KeySchema` that does not reorder the keys: tree order equals key
/// order.
///
/// This is useful for keys that are already ordered in the desired way.
/// This is the default ordering.
#[derive(Copy, Clone, Debug)]
pub struct IdentitySchema {}
372
/// A `KeySegmentation` that does not segment the keys: every byte
/// belongs to segment `0`.
///
/// This is useful for keys that do not have a segment structure.
/// This is the default segmentation.
#[derive(Copy, Clone, Debug)]
pub struct SingleSegmentation {}
// Identity ordering for keys of any length: a single segment and
// identity tables in both directions.
impl<const KEY_LEN: usize> KeySchema<KEY_LEN> for IdentitySchema {
    type Segmentation = SingleSegmentation;
    // Only one segment exists, so the permutation is trivial.
    const SEGMENT_PERM: &'static [usize] = &[0];
    const KEY_TO_TREE: [usize; KEY_LEN] = identity_map::<KEY_LEN>();
    const TREE_TO_KEY: [usize; KEY_LEN] = identity_map::<KEY_LEN>();
}
384
// Every key byte maps to segment 0, for keys of any length.
impl<const KEY_LEN: usize> KeySegmentation<KEY_LEN> for SingleSegmentation {
    const SEGMENTS: [usize; KEY_LEN] = [0; KEY_LEN];
}
388
/// Node kind stored in the low 4 bits of `Head::tptr` (see `Head::new`).
///
/// Branch values encode log2(branch_size) (i.e. `Branch2 == 1`,
/// `Branch256 == 8`). `0` is reserved for leaf nodes, which lets the
/// branch size be computed as `1 << tag` without any offset.
// Not every variant is constructed by name; some only arise through
// `from_raw`, hence the `dead_code` allowance.
#[allow(dead_code)]
#[derive(Debug, PartialEq, Copy, Clone)]
#[repr(u8)]
pub(crate) enum HeadTag {
    /// Leaf node.
    Leaf = 0,
    /// Branch with a 2-slot child table.
    Branch2 = 1,
    /// Branch with a 4-slot child table.
    Branch4 = 2,
    /// Branch with an 8-slot child table.
    Branch8 = 3,
    /// Branch with a 16-slot child table.
    Branch16 = 4,
    /// Branch with a 32-slot child table.
    Branch32 = 5,
    /// Branch with a 64-slot child table.
    Branch64 = 6,
    /// Branch with a 128-slot child table.
    Branch128 = 7,
    /// Branch with a 256-slot child table.
    Branch256 = 8,
}

impl HeadTag {
    /// Decodes a raw tag value into a `HeadTag`.
    ///
    /// Valid inputs are the discriminants `0..=8`.
    ///
    /// # Panics
    ///
    /// Panics if `raw` is not a valid discriminant. The tag field is
    /// four bits wide, so `9..=15` can be represented in a pointer but
    /// never denote a node kind; decoding them with a `transmute` would
    /// be undefined behaviour, so they are rejected explicitly in every
    /// build profile.
    #[inline]
    fn from_raw(raw: u8) -> Self {
        match raw {
            0 => HeadTag::Leaf,
            1 => HeadTag::Branch2,
            2 => HeadTag::Branch4,
            3 => HeadTag::Branch8,
            4 => HeadTag::Branch16,
            5 => HeadTag::Branch32,
            6 => HeadTag::Branch64,
            7 => HeadTag::Branch128,
            8 => HeadTag::Branch256,
            _ => panic!("invalid HeadTag discriminant: {raw}"),
        }
    }
}
419
/// Untagged raw pointer to the body a `Head` refers to, discriminated
/// by node kind. Produced by `Head::body()`.
pub(crate) enum BodyPtr<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    /// Pointer to a leaf body.
    Leaf(NonNull<Leaf<KEY_LEN, V>>),
    /// Pointer to a branch body.
    Branch(branch::BranchNN<KEY_LEN, O, V>),
}
424
/// Immutable borrow view of a Head body.
/// Returned by `body_ref()` and tied to the lifetime of the `&Head`.
pub(crate) enum BodyRef<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    /// Shared reference to a leaf body.
    Leaf(&'a Leaf<KEY_LEN, V>),
    /// Shared reference to a branch body whose child table is a slice
    /// of optional heads.
    Branch(&'a Branch<KEY_LEN, O, [Option<Head<KEY_LEN, O, V>>], V>),
}
431
/// Mutable borrow view of a Head body.
/// Returned by `body_mut()` and tied to the lifetime of the `&mut Head`.
pub(crate) enum BodyMut<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    /// Exclusive reference to a leaf body.
    Leaf(&'a mut Leaf<KEY_LEN, V>),
    /// Exclusive reference to a branch body whose child table is a
    /// slice of optional heads.
    Branch(&'a mut Branch<KEY_LEN, O, [Option<Head<KEY_LEN, O, V>>], V>),
}
438
/// Implemented by node body types so that `Head::new` and
/// `Head::set_body` can derive the tag to store in the tagged pointer.
pub(crate) trait Body {
    /// Returns the [`HeadTag`] describing the body behind `body`.
    fn tag(body: NonNull<Self>) -> HeadTag;
}
442
/// Handle to a PATCH node, stored as a single tagged pointer.
///
/// `tptr` packs the node tag, the body address and one key byte into
/// one pointer-sized value; the exact bit layout is described next to
/// the mask constants in the inherent `impl`. The remaining fields are
/// zero-sized markers that tie the handle to its schema, segmentation
/// and value types.
#[repr(C)]
pub(crate) struct Head<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    // Tagged pointer: tag bits, body address bits and key byte.
    tptr: std::ptr::NonNull<u8>,
    key_ordering: PhantomData<O>,
    key_segments: PhantomData<O::Segmentation>,
    value: PhantomData<V>,
}
450
// `Head` holds a raw `NonNull`, so it is neither `Send` nor `Sync`
// automatically; these impls opt in manually.
//
// NOTE(review): neither impl places a `Send`/`Sync` bound on `V` (or
// `O`), so a `Head` is treated as sendable and shareable even when `V`
// is not thread-safe. `Head::union` itself requires
// `O: Send + Sync, V: Send + Sync`. Confirm whether the same bounds
// belong here, and that the body reference counting used by
// `Branch::rc_cow` is safe across threads.
unsafe impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Send for Head<KEY_LEN, O, V> {}
unsafe impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Sync for Head<KEY_LEN, O, V> {}
453
454impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Head<KEY_LEN, O, V> {
455    // Tagged pointer layout (64-bit only):
456    // - bits 0..=3:   HeadTag (requires 16-byte aligned bodies)
457    // - bits 4..=55:  body pointer bits (52 bits)
458    // - bits 56..=63: key byte for cuckoo table lookup
459    const TAG_MASK: u64 = 0x0f;
460    const BODY_MASK: u64 = 0x00_ff_ff_ff_ff_ff_ff_f0;
461    const KEY_MASK: u64 = 0xff_00_00_00_00_00_00_00;
462
463    pub(crate) fn new<T: Body + ?Sized>(key: u8, body: NonNull<T>) -> Self {
464        unsafe {
465            let tptr =
466                std::ptr::NonNull::new_unchecked((body.as_ptr() as *mut u8).map_addr(|addr| {
467                    debug_assert_eq!(addr as u64 & Self::TAG_MASK, 0);
468                    ((addr as u64 & Self::BODY_MASK)
469                        | ((key as u64) << 56)
470                        | (<T as Body>::tag(body) as u64)) as usize
471                }));
472            Self {
473                tptr,
474                key_ordering: PhantomData,
475                key_segments: PhantomData,
476                value: PhantomData,
477            }
478        }
479    }
480
481    #[inline]
482    pub(crate) fn tag(&self) -> HeadTag {
483        HeadTag::from_raw((self.tptr.as_ptr() as u64 & Self::TAG_MASK) as u8)
484    }
485
486    #[inline]
487    pub(crate) fn key(&self) -> u8 {
488        (self.tptr.as_ptr() as u64 >> 56) as u8
489    }
490
491    #[inline]
492    pub(crate) fn with_key(mut self, key: u8) -> Self {
493        self.tptr =
494            std::ptr::NonNull::new(self.tptr.as_ptr().map_addr(|addr| {
495                ((addr as u64 & !Self::KEY_MASK) | ((key as u64) << 56)) as usize
496            }))
497            .unwrap();
498        self
499    }
500
501    #[inline]
502    pub(crate) fn set_body<T: Body + ?Sized>(&mut self, body: NonNull<T>) {
503        unsafe {
504            self.tptr = NonNull::new_unchecked((body.as_ptr() as *mut u8).map_addr(|addr| {
505                debug_assert_eq!(addr as u64 & Self::TAG_MASK, 0);
506                ((addr as u64 & Self::BODY_MASK)
507                    | (self.tptr.as_ptr() as u64 & Self::KEY_MASK)
508                    | (<T as Body>::tag(body) as u64)) as usize
509            }))
510        }
511    }
512
513    pub(crate) fn with_start(self, new_start_depth: usize) -> Head<KEY_LEN, O, V> {
514        let leaf_key = self.childleaf_key();
515        let i = O::TREE_TO_KEY[new_start_depth];
516        let key = leaf_key[i];
517        self.with_key(key)
518    }
519
520    // Removed childleaf_matches_key_from in favor of composing the existing
521    // has_prefix primitives directly at call sites. Use
522    // `self.has_prefix::<KEY_LEN>(at_depth, key)` or for partial checks
523    // `self.childleaf().has_prefix::<O>(at_depth, &key[..limit])` instead.
524
525    pub(crate) fn body(&self) -> BodyPtr<KEY_LEN, O, V> {
526        unsafe {
527            let ptr = NonNull::new_unchecked(self.tptr.as_ptr().map_addr(|addr| {
528                let masked = (addr as u64) & Self::BODY_MASK;
529                masked as usize
530            }));
531            match self.tag() {
532                HeadTag::Leaf => BodyPtr::Leaf(ptr.cast()),
533                branch_tag => {
534                    let count = 1 << (branch_tag as usize);
535                    BodyPtr::Branch(NonNull::new_unchecked(std::ptr::slice_from_raw_parts(
536                        ptr.as_ptr(),
537                        count,
538                    )
539                        as *mut Branch<KEY_LEN, O, [Option<Head<KEY_LEN, O, V>>], V>))
540                }
541            }
542        }
543    }
544
545    pub(crate) fn body_mut(&mut self) -> BodyMut<'_, KEY_LEN, O, V> {
546        unsafe {
547            match self.body() {
548                BodyPtr::Leaf(mut leaf) => BodyMut::Leaf(leaf.as_mut()),
549                BodyPtr::Branch(mut branch) => {
550                    // Ensure ownership: try copy-on-write and update local pointer if needed.
551                    let mut branch_nn = branch;
552                    if Branch::rc_cow(&mut branch_nn).is_some() {
553                        self.set_body(branch_nn);
554                        BodyMut::Branch(branch_nn.as_mut())
555                    } else {
556                        BodyMut::Branch(branch.as_mut())
557                    }
558                }
559            }
560        }
561    }
562
563    /// Returns an immutable borrow of the body (Leaf or Branch) tied to &self.
564    pub(crate) fn body_ref(&self) -> BodyRef<'_, KEY_LEN, O, V> {
565        match self.body() {
566            BodyPtr::Leaf(nn) => BodyRef::Leaf(unsafe { nn.as_ref() }),
567            BodyPtr::Branch(nn) => BodyRef::Branch(unsafe { nn.as_ref() }),
568        }
569    }
570
571    pub(crate) fn count(&self) -> u64 {
572        match self.body_ref() {
573            BodyRef::Leaf(_) => 1,
574            BodyRef::Branch(branch) => branch.leaf_count,
575        }
576    }
577
578    pub(crate) fn count_segment(&self, at_depth: usize) -> u64 {
579        match self.body_ref() {
580            BodyRef::Leaf(_) => 1,
581            BodyRef::Branch(branch) => branch.count_segment(at_depth),
582        }
583    }
584
585    pub(crate) fn hash(&self) -> u128 {
586        match self.body_ref() {
587            BodyRef::Leaf(leaf) => leaf.hash,
588            BodyRef::Branch(branch) => branch.hash,
589        }
590    }
591
592    pub(crate) fn end_depth(&self) -> usize {
593        match self.body_ref() {
594            BodyRef::Leaf(_) => KEY_LEN,
595            BodyRef::Branch(branch) => branch.end_depth as usize,
596        }
597    }
598
599    /// Return the raw pointer to the child leaf for use in low-level
600    /// operations (for example when constructing a Branch). Prefer
601    /// `childleaf_key()` or other safe accessors when you only need the
602    /// key or value; those avoid unsafe dereferences.
603    pub(crate) fn childleaf_ptr(&self) -> *const Leaf<KEY_LEN, V> {
604        match self.body_ref() {
605            BodyRef::Leaf(leaf) => leaf as *const Leaf<KEY_LEN, V>,
606            BodyRef::Branch(branch) => branch.childleaf_ptr(),
607        }
608    }
609
610    pub(crate) fn childleaf_key(&self) -> &[u8; KEY_LEN] {
611        match self.body_ref() {
612            BodyRef::Leaf(leaf) => &leaf.key,
613            BodyRef::Branch(branch) => &branch.childleaf().key,
614        }
615    }
616
617    // Slot wrapper defined at module level (moved to below the impl block)
618
619    /// Find the first depth in [start_depth, limit) where the tree-ordered
620    /// bytes of `self` and `other` differ. The comparison limit is computed
621    /// as min(self.end_depth(), other.end_depth(), KEY_LEN) which is the
622    /// natural bound for comparing two heads. Returns `Some((depth, a, b))`
623    /// where `a` and `b` are the differing bytes at that depth, or `None`
624    /// if no divergence is found in the range.
625    pub(crate) fn first_divergence(
626        &self,
627        other: &Self,
628        start_depth: usize,
629    ) -> Option<(usize, u8, u8)> {
630        let limit = std::cmp::min(std::cmp::min(self.end_depth(), other.end_depth()), KEY_LEN);
631        debug_assert!(limit <= KEY_LEN);
632        let this_key = self.childleaf_key();
633        let other_key = other.childleaf_key();
634        let mut depth = start_depth;
635        while depth < limit {
636            let i = O::TREE_TO_KEY[depth];
637            let a = this_key[i];
638            let b = other_key[i];
639            if a != b {
640                return Some((depth, a, b));
641            }
642            depth += 1;
643        }
644        None
645    }
646
647    // Mutable access to the child slots for this head. If the head is a
648    // branch, returns a mutable slice referencing the underlying child table
649    // (each element is Option<Head>). If the head is a leaf an empty slice
650    // is returned.
651    //
652    // The caller receives a &mut slice tied to the borrow of `self` and may
653    // reorder entries in-place (e.g., sort_unstable) and then take them using
654    // `Option::take()` to extract Head values. The call uses `body_mut()` so
655    // COW semantics are preserved and callers have exclusive access to the
656    // branch storage while the mutable borrow lasts.
    // NOTE: mut_children removed — prefer matching on the BodyMut returned by
    // `body_mut()` and operating directly on the `&mut Branch` reference.
659
    /// Removes the leaf with key `leaf_key` from the subtree held in
    /// `slot`, if it is present.
    ///
    /// Nothing happens when the slot is empty or when the head in it
    /// does not match `leaf_key` from `start_depth` on (as decided by
    /// `has_prefix`). A matching leaf empties the slot. A matching
    /// branch recurses into the child selected by the key byte at the
    /// branch's end depth; if that leaves the branch with a single
    /// leaf, the branch is replaced in `slot` by its one remaining
    /// child, re-keyed for `start_depth`.
    pub(crate) fn remove_leaf(
        slot: &mut Option<Self>,
        leaf_key: &[u8; KEY_LEN],
        start_depth: usize,
    ) {
        if let Some(this) = slot {
            let end_depth = std::cmp::min(this.end_depth(), KEY_LEN);
            // Check reachable equality by asking the head to test the prefix
            // up to its end_depth. Using the head/leaf primitive centralises the
            // unsafe deref into Branch::childleaf()/Leaf::has_prefix.
            if !this.has_prefix::<KEY_LEN>(start_depth, leaf_key) {
                return;
            }
            if this.tag() == HeadTag::Leaf {
                slot.take();
            } else {
                let mut ed = crate::patch::branch::BranchMut::from_head(this);
                // NOTE(review): `leaf_key` is indexed directly by tree
                // depth here, whereas `with_start` and `first_divergence`
                // translate depths through `O::TREE_TO_KEY`. Presumably
                // callers pass a tree-ordered key — confirm.
                let key = leaf_key[end_depth];
                ed.modify_child(key, |mut opt| {
                    Self::remove_leaf(&mut opt, leaf_key, end_depth);
                    opt
                });

                // If the branch now contains a single remaining child we
                // collapse the branch upward into that child. We must pull
                // the remaining child out while `ed` is still borrowed,
                // then drop `ed` before writing back into `slot` to avoid
                // double mutable borrows of the slot.
                if ed.leaf_count == 1 {
                    let mut remaining: Option<Head<KEY_LEN, O, V>> = None;
                    for slot_child in &mut ed.child_table {
                        if let Some(child) = slot_child.take() {
                            // Re-key the survivor for the depth at which
                            // the collapsed branch used to start.
                            remaining = Some(child.with_start(start_depth));
                            break;
                        }
                    }
                    drop(ed);
                    if let Some(child) = remaining {
                        slot.replace(child);
                    }
                } else {
                    // ensure we drop the editor when not collapsing so the
                    // final pointer is committed back into the head.
                    drop(ed);
                }
            }
        }
    }
708
709    // NOTE: slot-level wrappers removed; callers should take the slot and call
710    // the owned helpers (insert_leaf / replace_leaf / union)
711    // directly. This reduces the indirection and keeps ownership semantics
712    // explicit at the call site.
713
714    // Owned variants of the slot-based helpers. These accept the existing
715    // Head by value and return the new Head after performing the
716    // modification. They are used with the split `insert_child` /
717    // `update_child` APIs so we no longer need `Branch::upsert_child`.
718    pub(crate) fn insert_leaf(mut this: Self, leaf: Self, start_depth: usize) -> Self {
719        if let Some((depth, this_byte_key, leaf_byte_key)) =
720            this.first_divergence(&leaf, start_depth)
721        {
722            let old_key = this.key();
723            let new_body = Branch::new(
724                depth,
725                this.with_key(this_byte_key),
726                leaf.with_key(leaf_byte_key),
727            );
728            return Head::new(old_key, new_body);
729        }
730
731        let end_depth = this.end_depth();
732        if end_depth != KEY_LEN {
733            // Use the editable BranchMut view to perform mutations without
734            // exposing pointer juggling at the call site.
735            let mut ed = crate::patch::branch::BranchMut::from_head(&mut this);
736            let inserted = leaf.with_start(ed.end_depth as usize);
737            let key = inserted.key();
738            ed.modify_child(key, |opt| match opt {
739                Some(old) => Some(Head::insert_leaf(old, inserted, end_depth)),
740                None => Some(inserted),
741            });
742        }
743        this
744    }
745
746    pub(crate) fn replace_leaf(mut this: Self, leaf: Self, start_depth: usize) -> Self {
747        if let Some((depth, this_byte_key, leaf_byte_key)) =
748            this.first_divergence(&leaf, start_depth)
749        {
750            let old_key = this.key();
751            let new_body = Branch::new(
752                depth,
753                this.with_key(this_byte_key),
754                leaf.with_key(leaf_byte_key),
755            );
756
757            return Head::new(old_key, new_body);
758        }
759
760        let end_depth = this.end_depth();
761        if end_depth == KEY_LEN {
762            let old_key = this.key();
763            return leaf.with_key(old_key);
764        } else {
765            // Use the editor view for branch mutation instead of raw pointer ops.
766            let mut ed = crate::patch::branch::BranchMut::from_head(&mut this);
767            let inserted = leaf.with_start(ed.end_depth as usize);
768            let key = inserted.key();
769            ed.modify_child(key, |opt| match opt {
770                Some(old) => Some(Head::replace_leaf(old, inserted, end_depth)),
771                None => Some(inserted),
772            });
773        }
774        this
775    }
776
    /// Merges `other` into `this`, consuming both heads, and returns the
    /// merged head.
    ///
    /// `at_depth` is the depth from which the two heads are compared for a
    /// diverging key byte (it is forwarded to `first_divergence`).
    ///
    /// The cases are handled in this order:
    /// 1. Equal hashes: `this` is returned unchanged and `other` is dropped.
    /// 2. A diverging byte exists: a new two-child branch is created at the
    ///    divergence depth; the returned head keeps `this`'s original key.
    /// 3. `this` ends shallower than `other`: `other` becomes (or is
    ///    recursively merged into) a child of `this`.
    /// 4. `other` ends shallower than `this`: the mirror image of case 3; the
    ///    result is re-keyed with `this`'s original key.
    /// 5. Equal end depths: the children of `other` are merged into `this`
    ///    one by one, or through the scatter/parallel path when the
    ///    `parallel` feature is enabled and `other` is large enough.
    pub(crate) fn union(mut this: Self, mut other: Self, at_depth: usize) -> Self
    where
        O: Send + Sync,
        V: Send + Sync,
    {
        // Identical hash: nothing to merge, keep `this` as is.
        if this.hash() == other.hash() {
            return this;
        }

        // The two heads disagree on a byte: both become siblings under a
        // freshly created branch at that depth, each re-keyed with its own
        // diverging byte.
        if let Some((depth, this_byte_key, other_byte_key)) =
            this.first_divergence(&other, at_depth)
        {
            let old_key = this.key();
            let new_body = Branch::new(
                depth,
                this.with_key(this_byte_key),
                other.with_key(other_byte_key),
            );

            return Head::new(old_key, new_body);
        }

        let this_depth = this.end_depth();
        let other_depth = other.end_depth();
        if this_depth < other_depth {
            // Use BranchMut to edit `this` safely and avoid pointer juggling.
            let mut ed = crate::patch::branch::BranchMut::from_head(&mut this);
            // Re-key `other` for its position below `this`.
            let inserted = other.with_start(ed.end_depth as usize);
            let key = inserted.key();
            ed.modify_child(key, |opt| match opt {
                Some(old) => Some(Head::union(old, inserted, this_depth)),
                None => Some(inserted),
            });

            drop(ed);
            return this;
        }

        if other_depth < this_depth {
            // Mirror image of the case above: `this` is inserted below
            // `other`, and the result takes over `this`'s original key.
            let old_key = this.key();
            let this_head = this;
            let mut ed = crate::patch::branch::BranchMut::from_head(&mut other);
            let inserted = this_head.with_start(ed.end_depth as usize);
            let key = inserted.key();
            ed.modify_child(key, |opt| match opt {
                Some(old) => Some(Head::union(old, inserted, other_depth)),
                None => Some(inserted),
            });
            drop(ed);

            return other.with_key(old_key);
        }

        // Both depths are equal and the hashes differ: merge children.
        //
        // For small `other` (the common case in serial folds where each
        // `+=` adds a handful of tribles) the existing `modify_child`
        // loop wins: it only touches keys present in `other`, with one
        // cuckoo lookup + drain + closure-resolve + install per key.
        //
        // When `other` is large (e.g. final pair-merges in a parallel
        // reduce, where both sides hold millions of tribles), we
        // amortise the scatter overhead and `par_iter_mut` across
        // rayon: scatter both child tables into a 256-slot pair array
        // keyed by byte, parallel-resolve, install all results, then
        // recompute aggregates in one pass at the end.
        let BodyMut::Branch(other_branch_ref) = other.body_mut() else {
            unreachable!();
        };

        #[cfg(feature = "parallel")]
        if other_branch_ref.leaf_count >= PARALLEL_PATCH_UNION_THRESHOLD as u64 {
            use rayon::iter::ParallelIterator;

            {
                let mut ed = crate::patch::branch::BranchMut::from_head(&mut this);
                let end_depth = ed.end_depth as usize;

                // Scatter both child tables into key-indexed 256-slot
                // arrays plus `present` bitsets. The bitsets identify
                // which keys actually need a recursive union
                // (intersection) vs simple pass-through (symmetric
                // difference), so the parallel work list is sized to
                // popcount(both) rather than the full 256.
                let mut this_arr: [Option<Head<KEY_LEN, O, V>>; 256] =
                    std::array::from_fn(|_| None);
                let mut other_arr: [Option<Head<KEY_LEN, O, V>>; 256] =
                    std::array::from_fn(|_| None);
                let mut this_present = crate::patch::bytetable::ByteSet::new_empty();
                let mut other_present = crate::patch::bytetable::ByteSet::new_empty();

                // Empty `this`'s table; every child moves into `this_arr`.
                for slot in ed.child_table.iter_mut() {
                    if let Some(head) = slot.take() {
                        let key = head.key();
                        this_present.insert(key);
                        this_arr[key as usize] = Some(head);
                    }
                }
                // Same for `other`, re-keying each child for `this`'s depth.
                for slot in other_branch_ref.child_table.iter_mut() {
                    if let Some(head) = slot.take() {
                        let head = head.with_start(end_depth);
                        let key = head.key();
                        other_present.insert(key);
                        other_arr[key as usize] = Some(head);
                    }
                }

                let mut both = this_present.intersect(&other_present);
                let mut only = this_present.symmetric_difference(&other_present);

                // Build owned (key, this, other) work tuples.
                let mut work: Vec<(u8, Head<KEY_LEN, O, V>, Head<KEY_LEN, O, V>)> =
                    Vec::with_capacity(both.popcount() as usize);
                while let Some(k) = both.drain_next_ascending() {
                    let i = k as usize;
                    let t = this_arr[i].take().expect("both ⇒ this present");
                    let o = other_arr[i].take().expect("both ⇒ other present");
                    work.push((k, t, o));
                }

                // Adaptive rayon: split happens on stealing demand, not
                // up-front chunking. For idle thread pools the producer
                // folds entirely sequentially.
                let resolved: Vec<(u8, Head<KEY_LEN, O, V>)> =
                    parallel_union::UnionProducer::new(work, this_depth).collect();

                // Re-install the merged children, then the pass-through ones.
                for (_, head) in resolved {
                    ed.install_child_growing(head);
                }
                while let Some(k) = only.drain_next_ascending() {
                    let i = k as usize;
                    let head = this_arr[i]
                        .take()
                        .or_else(|| other_arr[i].take())
                        .expect("only ⇒ exactly one side present");
                    ed.install_child_growing(head);
                }

                ed.recompute_aggregates();
            }
            return this;
        }

        // Serial path — small `other`; per-key modify_child wins here.
        {
            let mut ed = crate::patch::branch::BranchMut::from_head(&mut this);
            for other_child in other_branch_ref
                .child_table
                .iter_mut()
                .filter_map(Option::take)
            {
                let inserted = other_child.with_start(ed.end_depth as usize);
                let key = inserted.key();
                ed.modify_child(key, |opt| match opt {
                    Some(old) => Some(Head::union(old, inserted, this_depth)),
                    None => Some(inserted),
                });
            }
        }
        this
    }
938
939    pub(crate) fn infixes<const PREFIX_LEN: usize, const INFIX_LEN: usize, F>(
940        &self,
941        prefix: &[u8; PREFIX_LEN],
942        at_depth: usize,
943        f: &mut F,
944    ) where
945        F: FnMut(&[u8; INFIX_LEN]),
946    {
947        match self.body_ref() {
948            BodyRef::Leaf(leaf) => leaf.infixes::<PREFIX_LEN, INFIX_LEN, O, F>(prefix, at_depth, f),
949            BodyRef::Branch(branch) => {
950                branch.infixes::<PREFIX_LEN, INFIX_LEN, F>(prefix, at_depth, f)
951            }
952        }
953    }
954
955    pub(crate) fn infixes_range<const PREFIX_LEN: usize, const INFIX_LEN: usize, F>(
956        &self,
957        prefix: &[u8; PREFIX_LEN],
958        at_depth: usize,
959        min_infix: &[u8; INFIX_LEN],
960        max_infix: &[u8; INFIX_LEN],
961        f: &mut F,
962    ) where
963        F: FnMut(&[u8; INFIX_LEN]),
964    {
965        match self.body_ref() {
966            BodyRef::Leaf(leaf) => leaf.infixes_range::<PREFIX_LEN, INFIX_LEN, O, F>(
967                prefix, at_depth, min_infix, max_infix, f,
968            ),
969            BodyRef::Branch(branch) => branch.infixes_range::<PREFIX_LEN, INFIX_LEN, F>(
970                prefix, at_depth, min_infix, max_infix, f,
971            ),
972        }
973    }
974
975    pub(crate) fn count_range<const PREFIX_LEN: usize, const INFIX_LEN: usize>(
976        &self,
977        prefix: &[u8; PREFIX_LEN],
978        at_depth: usize,
979        min_infix: &[u8; INFIX_LEN],
980        max_infix: &[u8; INFIX_LEN],
981    ) -> u64 {
982        match self.body_ref() {
983            BodyRef::Leaf(leaf) => {
984                leaf.count_range::<PREFIX_LEN, INFIX_LEN, O>(prefix, at_depth, min_infix, max_infix)
985            }
986            BodyRef::Branch(branch) => {
987                branch.count_range::<PREFIX_LEN, INFIX_LEN>(prefix, at_depth, min_infix, max_infix)
988            }
989        }
990    }
991
992    pub(crate) fn has_prefix<const PREFIX_LEN: usize>(
993        &self,
994        at_depth: usize,
995        prefix: &[u8; PREFIX_LEN],
996    ) -> bool {
997        const {
998            assert!(PREFIX_LEN <= KEY_LEN);
999        }
1000        match self.body_ref() {
1001            BodyRef::Leaf(leaf) => leaf.has_prefix::<O>(at_depth, prefix),
1002            BodyRef::Branch(branch) => branch.has_prefix::<PREFIX_LEN>(at_depth, prefix),
1003        }
1004    }
1005
1006    pub(crate) fn get<'a>(&'a self, at_depth: usize, key: &[u8; KEY_LEN]) -> Option<&'a V>
1007    where
1008        O: 'a,
1009    {
1010        match self.body_ref() {
1011            BodyRef::Leaf(leaf) => leaf.get::<O>(at_depth, key),
1012            BodyRef::Branch(branch) => branch.get(at_depth, key),
1013        }
1014    }
1015
1016    pub(crate) fn segmented_len<const PREFIX_LEN: usize>(
1017        &self,
1018        at_depth: usize,
1019        prefix: &[u8; PREFIX_LEN],
1020    ) -> u64 {
1021        match self.body_ref() {
1022            BodyRef::Leaf(leaf) => leaf.segmented_len::<O, PREFIX_LEN>(at_depth, prefix),
1023            BodyRef::Branch(branch) => branch.segmented_len::<PREFIX_LEN>(at_depth, prefix),
1024        }
1025    }
1026
1027    // NOTE: slot-level union wrapper removed; callers should take the slot and
1028    // call the owned helper `union` directly.
1029
    /// Returns the intersection of `self` and `other`, or `None` when the
    /// two heads have nothing in common.
    ///
    /// `at_depth` is the depth from which the two heads are compared for a
    /// diverging key byte (it is forwarded to `first_divergence`).
    ///
    /// The key of the returned head is not normalised here: a freshly built
    /// branch is created with key `0`, and a single surviving child is
    /// returned as is. Callers re-key the result (see the `with_start` calls
    /// below and in `PATCH::intersect`).
    pub(crate) fn intersect(&self, other: &Self, at_depth: usize) -> Option<Self> {
        // Identical hash: the whole of `self` is the intersection.
        if self.hash() == other.hash() {
            return Some(self.clone());
        }

        // The heads disagree on a byte, so they have nothing in common.
        if self.first_divergence(other, at_depth).is_some() {
            return None;
        }

        let self_depth = self.end_depth();
        let other_depth = other.end_depth();
        if self_depth < other_depth {
            // This means that there can be at most one child in self
            // that might intersect with other.
            let BodyRef::Branch(branch) = self.body_ref() else {
                unreachable!();
            };
            // NOTE(review): receiver and argument are swapped in the
            // recursive call below (`other.intersect(self_child, ..)`), so an
            // equal-hash match in the callee clones from `other` rather than
            // from `self`. The other arms clone from `self` — confirm this
            // asymmetry is intended.
            return branch
                .child_table
                .table_get(other.childleaf_key()[O::TREE_TO_KEY[self_depth]])
                .and_then(|self_child| other.intersect(self_child, self_depth));
        }

        if other_depth < self_depth {
            // This means that there can be at most one child in other
            // that might intersect with self.
            // If the depth of other is less than the depth of self, then it can't be a leaf.
            let BodyRef::Branch(other_branch) = other.body_ref() else {
                unreachable!();
            };
            return other_branch
                .child_table
                .table_get(self.childleaf_key()[O::TREE_TO_KEY[other_depth]])
                .and_then(|other_child| self.intersect(other_child, other_depth));
        }

        // If we reached this point then the depths are equal. The only way to have a leaf
        // is if the other is a leaf as well, which is already handled by the hash check if they are equal,
        // and by the key check if they are not equal.
        // If one of them is a leaf and the other is a branch, then they would also have different depths,
        // which is already handled by the above code.
        let BodyRef::Branch(self_branch) = self.body_ref() else {
            unreachable!();
        };
        let BodyRef::Branch(other_branch) = other.body_ref() else {
            unreachable!();
        };

        // Lazy iterator over the non-empty pairwise child intersections; it
        // is consumed step by step below, so nothing is computed up front.
        let mut intersected_children = self_branch
            .child_table
            .iter()
            .filter_map(Option::as_ref)
            .filter_map(|self_child| {
                let other_child = other_branch.child_table.table_get(self_child.key())?;
                self_child.intersect(other_child, self_depth)
            });
        // No surviving child: empty intersection. Exactly one: return it
        // directly instead of wrapping it in a branch.
        let first_child = intersected_children.next()?;
        let Some(second_child) = intersected_children.next() else {
            return Some(first_child);
        };
        let new_branch = Branch::new(
            self_depth,
            first_child.with_start(self_depth),
            second_child.with_start(self_depth),
        );
        // Use a BranchMut editor to perform all child insertions via the
        // safe editor API instead of manipulating the NonNull pointer
        // directly. The editor will perform COW and commit the final
        // pointer into the Head when it is dropped.
        let mut head_for_branch = Head::new(0, new_branch);
        {
            let mut ed = crate::patch::branch::BranchMut::from_head(&mut head_for_branch);
            for child in intersected_children {
                let inserted = child.with_start(self_depth);
                let k = inserted.key();
                ed.modify_child(k, |_opt| Some(inserted));
            }
            // ed dropped here commits the final branch pointer into head_for_branch
        }
        Some(head_for_branch)
    }
1111
1112    /// Returns the difference between self and other.
1113    /// This is the set of elements that are in self but not in other.
1114    /// If the difference is empty, None is returned.
1115    pub(crate) fn difference(&self, other: &Self, at_depth: usize) -> Option<Self> {
1116        if self.hash() == other.hash() {
1117            return None;
1118        }
1119
1120        if self.first_divergence(other, at_depth).is_some() {
1121            return Some(self.clone());
1122        }
1123
1124        let self_depth = self.end_depth();
1125        let other_depth = other.end_depth();
1126        if self_depth < other_depth {
1127            // This means that there can be at most one child in self
1128            // that might intersect with other. It's the only child that may not be in the difference.
1129            // The other children are definitely in the difference, as they have no corresponding byte in other.
1130            // Thus the cheapest way to compute the difference is compute the difference of the only child
1131            // that might intersect with other, copy self with it's correctly filled byte table, then
1132            // remove the old child, and insert the new child.
1133            let mut new_branch = self.clone();
1134            let other_byte_key = other.childleaf_key()[O::TREE_TO_KEY[self_depth]];
1135            {
1136                let mut ed = crate::patch::branch::BranchMut::from_head(&mut new_branch);
1137                ed.modify_child(other_byte_key, |opt| {
1138                    opt.and_then(|child| child.difference(other, self_depth))
1139                });
1140            }
1141            return Some(new_branch);
1142        }
1143
1144        if other_depth < self_depth {
1145            // This means that we need to check if there is a child in other
1146            // that matches the path at the current depth of self.
1147            // There is no such child, then then self must be in the difference.
1148            // If there is such a child, then we have to compute the difference
1149            // between self and that child.
1150            // We know that other must be a branch.
1151            let BodyRef::Branch(other_branch) = other.body_ref() else {
1152                unreachable!();
1153            };
1154            let self_byte_key = self.childleaf_key()[O::TREE_TO_KEY[other_depth]];
1155            if let Some(other_child) = other_branch.child_table.table_get(self_byte_key) {
1156                return self.difference(other_child, at_depth);
1157            } else {
1158                return Some(self.clone());
1159            }
1160        }
1161
1162        // If we reached this point then the depths are equal. The only way to have a leaf
1163        // is if the other is a leaf as well, which is already handled by the hash check if they are equal,
1164        // and by the key check if they are not equal.
1165        // If one of them is a leaf and the other is a branch, then they would also have different depths,
1166        // which is already handled by the above code.
1167        let BodyRef::Branch(self_branch) = self.body_ref() else {
1168            unreachable!();
1169        };
1170        let BodyRef::Branch(other_branch) = other.body_ref() else {
1171            unreachable!();
1172        };
1173
1174        let mut differenced_children = self_branch
1175            .child_table
1176            .iter()
1177            .filter_map(Option::as_ref)
1178            .filter_map(|self_child| {
1179                if let Some(other_child) = other_branch.child_table.table_get(self_child.key()) {
1180                    self_child.difference(other_child, self_depth)
1181                } else {
1182                    Some(self_child.clone())
1183                }
1184            });
1185
1186        let first_child = differenced_children.next()?;
1187        let second_child = match differenced_children.next() {
1188            Some(sc) => sc,
1189            None => return Some(first_child),
1190        };
1191
1192        let new_branch = Branch::new(
1193            self_depth,
1194            first_child.with_start(self_depth),
1195            second_child.with_start(self_depth),
1196        );
1197        let mut head_for_branch = Head::new(0, new_branch);
1198        {
1199            let mut ed = crate::patch::branch::BranchMut::from_head(&mut head_for_branch);
1200            for child in differenced_children {
1201                let inserted = child.with_start(self_depth);
1202                let k = inserted.key();
1203                ed.modify_child(k, |_opt| Some(inserted));
1204            }
1205            // ed dropped here commits the final branch pointer into head_for_branch
1206        }
1207        // The key will be set later, because we don't know it yet.
1208        // The difference might remove multiple levels of branches,
1209        // so we can't just take the key from self or other.
1210        Some(head_for_branch)
1211    }
1212}
1213
// Exposes a head's key byte through the `ByteEntry` trait.
// NOTE(review): `ByteEntry` is an `unsafe` trait whose contract is not
// visible here — confirm and document the invariant this impl upholds.
unsafe impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> ByteEntry for Head<KEY_LEN, O, V> {
    fn key(&self) -> u8 {
        // Presumably resolves to the inherent `Head::key` (inherent methods
        // take precedence over trait methods), not to this trait method.
        self.key()
    }
}
1219
1220impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> fmt::Debug for Head<KEY_LEN, O, V> {
1221    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1222        self.tag().fmt(f)
1223    }
1224}
1225
// Cloning a head creates a second head with the same key, wrapping whatever
// `rc_inc` returns for the current leaf or branch body.
impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Clone for Head<KEY_LEN, O, V> {
    fn clone(&self) -> Self {
        // NOTE(review): no SAFETY comment exists for this block. Presumably
        // `self.body()` yields a live pointer for as long as `self` exists
        // and `rc_inc` bumps a reference count — confirm against
        // `Leaf::rc_inc` / `Branch::rc_inc`.
        unsafe {
            match self.body() {
                BodyPtr::Leaf(leaf) => Self::new(self.key(), Leaf::rc_inc(leaf)),
                BodyPtr::Branch(branch) => Self::new(self.key(), Branch::rc_inc(branch)),
            }
        }
    }
}
1236
1237// The Slot wrapper was removed in favor of using BranchMut::from_slot(&mut
1238// Option<Head<...>>) directly. This keeps the API surface smaller and
1239// avoids an extra helper type that simply forwarded to BranchMut.
1240
// Dropping a head hands its leaf or branch body to the matching `rc_dec`.
impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Drop for Head<KEY_LEN, O, V> {
    fn drop(&mut self) {
        // NOTE(review): no SAFETY comment exists for this block. Presumably
        // `rc_dec` decrements a reference count and frees the body once it
        // reaches zero — confirm against `Leaf::rc_dec` / `Branch::rc_dec`.
        unsafe {
            match self.body() {
                BodyPtr::Leaf(leaf) => Leaf::rc_dec(leaf),
                BodyPtr::Branch(branch) => Branch::rc_dec(branch),
            }
        }
    }
}
1251
/// A PATCH is a persistent data structure that stores a set of keys.
/// Each key can be reordered and segmented, based on the provided key ordering and segmentation.
///
/// The patch supports efficient set operations, like union, intersection, and difference,
/// because it efficiently maintains a hash for all keys that are part of a sub-tree.
///
/// The tree itself is a path- and node-compressed 256-ary trie.
/// Each node stores its children in a byte-oriented cuckoo hash table,
/// allowing for O(1) access to children, while keeping the memory overhead low.
/// Table sizes are powers of two, starting at 2.
///
/// Having a single node type for all branching factors simplifies the implementation,
/// compared to other adaptive trie implementations, like ARTs or Judy Arrays.
///
/// The PATCH allows for cheap copy-on-write operations, with `clone` being O(1).
#[derive(Debug)]
pub struct PATCH<const KEY_LEN: usize, O = IdentitySchema, V = ()>
where
    O: KeySchema<KEY_LEN>,
{
    // Root of the trie; `None` for a PATCH that holds no keys.
    root: Option<Head<KEY_LEN, O, V>>,
}
1274
1275impl<const KEY_LEN: usize, O, V> Clone for PATCH<KEY_LEN, O, V>
1276where
1277    O: KeySchema<KEY_LEN>,
1278{
1279    fn clone(&self) -> Self {
1280        Self {
1281            root: self.root.clone(),
1282        }
1283    }
1284}
1285
1286impl<const KEY_LEN: usize, O, V> Default for PATCH<KEY_LEN, O, V>
1287where
1288    O: KeySchema<KEY_LEN>,
1289{
1290    fn default() -> Self {
1291        Self::new()
1292    }
1293}
1294
1295impl<const KEY_LEN: usize, O, V> PATCH<KEY_LEN, O, V>
1296where
1297    O: KeySchema<KEY_LEN>,
1298{
1299    /// Creates a new empty PATCH.
1300    pub fn new() -> Self {
1301        init_sip_key();
1302        PATCH { root: None }
1303    }
1304
1305    /// Inserts a shared key into the PATCH.
1306    ///
1307    /// Takes an [Entry] object that can be created from a key,
1308    /// and inserted into multiple PATCH instances.
1309    ///
1310    /// If the key is already present, this is a no-op.
1311    pub fn insert(&mut self, entry: &Entry<KEY_LEN, V>) {
1312        if self.root.is_some() {
1313            let this = self.root.take().expect("root should not be empty");
1314            let new_head = Head::insert_leaf(this, entry.leaf(), 0);
1315            self.root.replace(new_head);
1316        } else {
1317            self.root.replace(entry.leaf());
1318        }
1319    }
1320
1321    /// Inserts a key into the PATCH, replacing the value if it already exists.
1322    pub fn replace(&mut self, entry: &Entry<KEY_LEN, V>) {
1323        if self.root.is_some() {
1324            let this = self.root.take().expect("root should not be empty");
1325            let new_head = Head::replace_leaf(this, entry.leaf(), 0);
1326            self.root.replace(new_head);
1327        } else {
1328            self.root.replace(entry.leaf());
1329        }
1330    }
1331
1332    /// Removes a key from the PATCH.
1333    ///
1334    /// If the key is not present, this is a no-op.
1335    pub fn remove(&mut self, key: &[u8; KEY_LEN]) {
1336        Head::remove_leaf(&mut self.root, key, 0);
1337    }
1338
1339    /// Returns the number of keys in the PATCH.
1340    pub fn len(&self) -> u64 {
1341        if let Some(root) = &self.root {
1342            root.count()
1343        } else {
1344            0
1345        }
1346    }
1347
1348    /// Returns true if the PATCH contains no keys.
1349    pub fn is_empty(&self) -> bool {
1350        self.len() == 0
1351    }
1352
1353    pub(crate) fn root_hash(&self) -> Option<u128> {
1354        self.root.as_ref().map(|root| root.hash())
1355    }
1356
1357    /// Returns the value associated with `key` if present.
1358    pub fn get(&self, key: &[u8; KEY_LEN]) -> Option<&V> {
1359        self.root.as_ref().and_then(|root| root.get(0, key))
1360    }
1361
1362    /// Allows iteratig over all infixes of a given length with a given prefix.
1363    /// Each infix is passed to the provided closure.
1364    ///
1365    /// The entire operation is performed over the tree view ordering of the keys.
1366    ///
1367    /// The length of the prefix and the infix is provided as type parameters,
1368    /// but will usually inferred from the arguments.
1369    ///
1370    /// The sum of `PREFIX_LEN` and `INFIX_LEN` must be less than or equal to `KEY_LEN`
1371    /// or a compile-time assertion will fail.
1372    ///
1373    /// Because all infixes are iterated in one go, less bookkeeping is required,
1374    /// than when using an Iterator, allowing for better performance.
1375    pub fn infixes<const PREFIX_LEN: usize, const INFIX_LEN: usize, F>(
1376        &self,
1377        prefix: &[u8; PREFIX_LEN],
1378        mut for_each: F,
1379    ) where
1380        F: FnMut(&[u8; INFIX_LEN]),
1381    {
1382        const {
1383            assert!(PREFIX_LEN + INFIX_LEN <= KEY_LEN);
1384        }
1385        assert!(
1386            O::same_segment_tree(PREFIX_LEN, PREFIX_LEN + INFIX_LEN - 1)
1387                && (PREFIX_LEN + INFIX_LEN == KEY_LEN
1388                    || !O::same_segment_tree(PREFIX_LEN + INFIX_LEN - 1, PREFIX_LEN + INFIX_LEN)),
1389            "INFIX_LEN must cover a whole segment"
1390        );
1391        if let Some(root) = &self.root {
1392            root.infixes(prefix, 0, &mut for_each);
1393        }
1394    }
1395
1396    /// Like [`infixes`](Self::infixes) but only yields infixes in the
1397    /// byte range `[min_infix, max_infix]` (inclusive).
1398    ///
1399    /// The trie is pruned at each depth: branches whose byte key falls
1400    /// outside the range at the current infix position are skipped
1401    /// entirely, avoiding traversal of irrelevant subtrees.
1402    pub fn infixes_range<const PREFIX_LEN: usize, const INFIX_LEN: usize, F>(
1403        &self,
1404        prefix: &[u8; PREFIX_LEN],
1405        min_infix: &[u8; INFIX_LEN],
1406        max_infix: &[u8; INFIX_LEN],
1407        mut for_each: F,
1408    ) where
1409        F: FnMut(&[u8; INFIX_LEN]),
1410    {
1411        const {
1412            assert!(PREFIX_LEN + INFIX_LEN <= KEY_LEN);
1413        }
1414        assert!(
1415            O::same_segment_tree(PREFIX_LEN, PREFIX_LEN + INFIX_LEN - 1)
1416                && (PREFIX_LEN + INFIX_LEN == KEY_LEN
1417                    || !O::same_segment_tree(PREFIX_LEN + INFIX_LEN - 1, PREFIX_LEN + INFIX_LEN)),
1418            "INFIX_LEN must cover a whole segment"
1419        );
1420        if let Some(root) = &self.root {
1421            root.infixes_range(prefix, 0, min_infix, max_infix, &mut for_each);
1422        }
1423    }
1424
1425    /// Count entries whose infix falls within [min_infix, max_infix].
1426    ///
1427    /// Uses cached `leaf_count` on branches to skip entire subtrees that
1428    /// are fully inside the range, making the count O(boundary_nodes)
1429    /// rather than O(matching_leaves).
1430    pub fn count_range<const PREFIX_LEN: usize, const INFIX_LEN: usize>(
1431        &self,
1432        prefix: &[u8; PREFIX_LEN],
1433        min_infix: &[u8; INFIX_LEN],
1434        max_infix: &[u8; INFIX_LEN],
1435    ) -> u64 {
1436        const {
1437            assert!(PREFIX_LEN + INFIX_LEN <= KEY_LEN);
1438        }
1439        match &self.root {
1440            Some(root) => root.count_range(prefix, 0, min_infix, max_infix),
1441            None => 0,
1442        }
1443    }
1444
1445    /// Returns true if the PATCH has a key with the given prefix.
1446    ///
1447    /// `PREFIX_LEN` must be less than or equal to `KEY_LEN` or a compile-time
1448    /// assertion will fail.
1449    pub fn has_prefix<const PREFIX_LEN: usize>(&self, prefix: &[u8; PREFIX_LEN]) -> bool {
1450        const {
1451            assert!(PREFIX_LEN <= KEY_LEN);
1452        }
1453        if let Some(root) = &self.root {
1454            root.has_prefix(0, prefix)
1455        } else {
1456            PREFIX_LEN == 0
1457        }
1458    }
1459
1460    /// Returns the number of unique segments in keys with the given prefix.
1461    pub fn segmented_len<const PREFIX_LEN: usize>(&self, prefix: &[u8; PREFIX_LEN]) -> u64 {
1462        const {
1463            assert!(PREFIX_LEN <= KEY_LEN);
1464            if PREFIX_LEN > 0 && PREFIX_LEN < KEY_LEN {
1465                assert!(
1466                    <O as KeySchema<KEY_LEN>>::Segmentation::SEGMENTS
1467                        [O::TREE_TO_KEY[PREFIX_LEN - 1]]
1468                        != <O as KeySchema<KEY_LEN>>::Segmentation::SEGMENTS
1469                            [O::TREE_TO_KEY[PREFIX_LEN]],
1470                    "PREFIX_LEN must align to segment boundary",
1471                );
1472            }
1473        }
1474        if let Some(root) = &self.root {
1475            root.segmented_len(0, prefix)
1476        } else {
1477            0
1478        }
1479    }
1480
    /// Iterates over all keys in the PATCH.
    ///
    /// Each key is yielded in key ordering (its bytes are laid out as in the
    /// original key), but the sequence in which the keys are visited is
    /// effectively random and must not be relied upon.
    pub fn iter<'a>(&'a self) -> PATCHIterator<'a, KEY_LEN, O, V> {
        PATCHIterator::new(self)
    }
1486
    /// Iterates over all keys in the PATCH in key order.
    ///
    /// The traversal visits every key in lexicographic key order, without
    /// accepting a prefix filter. For prefix-aware iteration, see
    /// [`PATCH::iter_prefix_count`].
    ///
    /// This is a thin constructor around `PATCHOrderedIterator::new`.
    pub fn iter_ordered<'a>(&'a self) -> PATCHOrderedIterator<'a, KEY_LEN, O, V> {
        PATCHOrderedIterator::new(self)
    }
1495
    /// Iterates over all prefixes of the given length in the PATCH.
    /// The prefixes are naturally returned in tree ordering and tree order.
    /// A count of the number of elements for the given prefix is also returned.
    ///
    /// This is a thin constructor around `PATCHPrefixIterator::new`; the
    /// prefix length is selected through the `PREFIX_LEN` const parameter.
    pub fn iter_prefix_count<'a, const PREFIX_LEN: usize>(
        &'a self,
    ) -> PATCHPrefixIterator<'a, KEY_LEN, PREFIX_LEN, O, V> {
        PATCHPrefixIterator::new(self)
    }
1504
1505    /// Unions this PATCH with another PATCH.
1506    ///
1507    /// The other PATCH is consumed, and this PATCH is updated in place.
1508    pub fn union(&mut self, other: Self)
1509    where
1510        O: Send + Sync,
1511        V: Send + Sync,
1512    {
1513        if let Some(other) = other.root {
1514            if self.root.is_some() {
1515                let this = self.root.take().expect("root should not be empty");
1516                let merged = Head::union(this, other, 0);
1517                self.root.replace(merged);
1518            } else {
1519                self.root.replace(other);
1520            }
1521        }
1522    }
1523
1524    /// Intersects this PATCH with another PATCH.
1525    ///
1526    /// Returns a new PATCH that contains only the keys that are present in both PATCHes.
1527    pub fn intersect(&self, other: &Self) -> Self {
1528        if let Some(root) = &self.root {
1529            if let Some(other_root) = &other.root {
1530                return Self {
1531                    root: root.intersect(other_root, 0).map(|root| root.with_start(0)),
1532                };
1533            }
1534        }
1535        Self::new()
1536    }
1537
1538    /// Returns the difference between this PATCH and another PATCH.
1539    ///
1540    /// Returns a new PATCH that contains only the keys that are present in this PATCH,
1541    /// but not in the other PATCH.
1542    pub fn difference(&self, other: &Self) -> Self {
1543        if let Some(root) = &self.root {
1544            if let Some(other_root) = &other.root {
1545                Self {
1546                    root: root.difference(other_root, 0),
1547                }
1548            } else {
1549                (*self).clone()
1550            }
1551        } else {
1552            Self::new()
1553        }
1554    }
1555
1556    /// Calculates the average fill level for branch nodes grouped by their
1557    /// branching factor. The returned array contains eight entries for branch
1558    /// sizes `2`, `4`, `8`, `16`, `32`, `64`, `128` and `256` in that order.
1559    //#[cfg(debug_assertions)]
1560    pub fn debug_branch_fill(&self) -> [f32; 8] {
1561        let mut counts = [0u64; 8];
1562        let mut used = [0u64; 8];
1563
1564        if let Some(root) = &self.root {
1565            let mut stack = Vec::new();
1566            stack.push(root);
1567
1568            while let Some(head) = stack.pop() {
1569                match head.body_ref() {
1570                    BodyRef::Leaf(_) => {}
1571                    BodyRef::Branch(b) => {
1572                        let size = b.child_table.len();
1573                        let idx = size.trailing_zeros() as usize - 1;
1574                        counts[idx] += 1;
1575                        used[idx] += b.child_table.iter().filter(|c| c.is_some()).count() as u64;
1576                        for child in b.child_table.iter().filter_map(|c| c.as_ref()) {
1577                            stack.push(child);
1578                        }
1579                    }
1580                }
1581            }
1582        }
1583
1584        let mut avg = [0f32; 8];
1585        for i in 0..8 {
1586            if counts[i] > 0 {
1587                let size = 1u64 << (i + 1);
1588                avg[i] = used[i] as f32 / (counts[i] as f32 * size as f32);
1589            }
1590        }
1591        avg
1592    }
1593}
1594
1595impl<const KEY_LEN: usize, O, V> PartialEq for PATCH<KEY_LEN, O, V>
1596where
1597    O: KeySchema<KEY_LEN>,
1598{
1599    fn eq(&self, other: &Self) -> bool {
1600        self.root.as_ref().map(|root| root.hash()) == other.root.as_ref().map(|root| root.hash())
1601    }
1602}
1603
// `eq` compares root hashes, which is reflexive, symmetric and transitive, so
// the full `Eq` marker applies.
impl<const KEY_LEN: usize, O, V> Eq for PATCH<KEY_LEN, O, V> where O: KeySchema<KEY_LEN> {}
1605
1606impl<'a, const KEY_LEN: usize, O, V> IntoIterator for &'a PATCH<KEY_LEN, O, V>
1607where
1608    O: KeySchema<KEY_LEN>,
1609{
1610    type Item = &'a [u8; KEY_LEN];
1611    type IntoIter = PATCHIterator<'a, KEY_LEN, O, V>;
1612
1613    fn into_iter(self) -> Self::IntoIter {
1614        PATCHIterator::new(self)
1615    }
1616}
1617
/// An iterator over all keys in a PATCH.
///
/// Keys are yielded in the order in which the branch child tables are
/// traversed; no sorting is applied. Use [`PATCHOrderedIterator`] when the
/// keys are needed in key order.
pub struct PATCHIterator<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    // One slot iterator per tree level currently being walked. The bottom
    // entry iterates over the single root slot of the PATCH.
    // NOTE(review): the stack holds the root-slot iterator plus one iterator
    // per nested branch, and `ArrayVec::push` panics when full — confirm that
    // a capacity of KEY_LEN is enough for a maximally deep path.
    stack: ArrayVec<std::slice::Iter<'a, Option<Head<KEY_LEN, O, V>>>, KEY_LEN>,
    // Number of keys not yet yielded; reported through `size_hint`.
    remaining: usize,
}
1624
1625impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> PATCHIterator<'a, KEY_LEN, O, V> {
1626    /// Creates an iterator over all keys in `patch`.
1627    pub fn new(patch: &'a PATCH<KEY_LEN, O, V>) -> Self {
1628        let mut r = PATCHIterator {
1629            stack: ArrayVec::new(),
1630            remaining: patch.len().min(usize::MAX as u64) as usize,
1631        };
1632        r.stack.push(std::slice::from_ref(&patch.root).iter());
1633        r
1634    }
1635}
1636
1637impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Iterator
1638    for PATCHIterator<'a, KEY_LEN, O, V>
1639{
1640    type Item = &'a [u8; KEY_LEN];
1641
1642    fn next(&mut self) -> Option<Self::Item> {
1643        let mut iter = self.stack.last_mut()?;
1644        loop {
1645            if let Some(child) = iter.next() {
1646                if let Some(child) = child {
1647                    match child.body_ref() {
1648                        BodyRef::Leaf(_) => {
1649                            self.remaining = self.remaining.saturating_sub(1);
1650                            // Use the safe accessor on the child reference to obtain the leaf key bytes.
1651                            return Some(child.childleaf_key());
1652                        }
1653                        BodyRef::Branch(branch) => {
1654                            self.stack.push(branch.child_table.iter());
1655                            iter = self.stack.last_mut()?;
1656                        }
1657                    }
1658                }
1659            } else {
1660                self.stack.pop();
1661                iter = self.stack.last_mut()?;
1662            }
1663        }
1664    }
1665
1666    fn size_hint(&self) -> (usize, Option<usize>) {
1667        (self.remaining, Some(self.remaining))
1668    }
1669}
1670
// `size_hint` reports `remaining` as both the lower and the upper bound, so
// the default `len()` of `ExactSizeIterator` applies.
impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> ExactSizeIterator
    for PATCHIterator<'a, KEY_LEN, O, V>
{
}

// Once the stack is empty, `next` keeps returning `None`.
impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> std::iter::FusedIterator
    for PATCHIterator<'a, KEY_LEN, O, V>
{
}
1680
/// An iterator over every key in a PATCH, returned in key order.
///
/// Keys are yielded in lexicographic key order regardless of their physical
/// layout in the underlying tree. This iterator walks the full tree and does
/// not accept a prefix filter. For prefix-aware iteration, use
/// [`PATCHPrefixIterator`], constructed via [`PATCH::iter_prefix_count`].
pub struct PATCHOrderedIterator<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    // One entry per tree level being walked. Each level holds the not yet
    // visited children, sorted in descending byte-key order so that popping
    // from the back yields them in ascending order.
    stack: Vec<ArrayVec<&'a Head<KEY_LEN, O, V>, 256>>,
    // Number of keys not yet yielded; reported through `size_hint`.
    remaining: usize,
}
1691
1692impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> PATCHOrderedIterator<'a, KEY_LEN, O, V> {
1693    pub fn new(patch: &'a PATCH<KEY_LEN, O, V>) -> Self {
1694        let mut r = PATCHOrderedIterator {
1695            stack: Vec::with_capacity(KEY_LEN),
1696            remaining: patch.len().min(usize::MAX as u64) as usize,
1697        };
1698        if let Some(root) = &patch.root {
1699            r.stack.push(ArrayVec::new());
1700            match root.body_ref() {
1701                BodyRef::Leaf(_) => {
1702                    r.stack[0].push(root);
1703                }
1704                BodyRef::Branch(branch) => {
1705                    let first_level = &mut r.stack[0];
1706                    first_level.extend(branch.child_table.iter().filter_map(|c| c.as_ref()));
1707                    first_level.sort_unstable_by_key(|&k| Reverse(k.key())); // We need to reverse here because we pop from the vec.
1708                }
1709            }
1710        }
1711        r
1712    }
1713}
1714
// --- Owned consuming iterators ---
/// Iterator that consumes a PATCH and yields its keys by value.
///
/// The iterator owns the heads it still has to visit. Children are visited
/// in child-table order without sorting; use [`PATCHIntoOrderedIterator`]
/// (via [`PATCH::into_iter_ordered`]) when keys are needed in key order.
pub struct PATCHIntoIterator<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    // Heads still to be visited; used as a LIFO stack (`pop` / `push`).
    queue: Vec<Head<KEY_LEN, O, V>>,
    // Key count taken from the PATCH at construction, decremented once per
    // yielded key.
    remaining: usize,
}

// No inherent methods: instances are created by `IntoIterator for PATCH`.
impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> PATCHIntoIterator<KEY_LEN, O, V> {}
1725
1726impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Iterator for PATCHIntoIterator<KEY_LEN, O, V> {
1727    type Item = [u8; KEY_LEN];
1728
1729    fn next(&mut self) -> Option<Self::Item> {
1730        let q = &mut self.queue;
1731        while let Some(mut head) = q.pop() {
1732            // Match on the mutable body directly. For leaves we can return the
1733            // stored key (the array is Copy), for branches we take children out
1734            // of the table and push them onto the stack so they are visited
1735            // depth-first.
1736            match head.body_mut() {
1737                BodyMut::Leaf(leaf) => {
1738                    self.remaining = self.remaining.saturating_sub(1);
1739                    return Some(leaf.key);
1740                }
1741                BodyMut::Branch(branch) => {
1742                    for slot in branch.child_table.iter_mut().rev() {
1743                        if let Some(c) = slot.take() {
1744                            q.push(c);
1745                        }
1746                    }
1747                }
1748            }
1749        }
1750        None
1751    }
1752}
1753
/// Iterator that owns a PATCH and yields keys in key order.
///
/// Created by [`PATCH::into_iter_ordered`]. Keys are returned by value.
pub struct PATCHIntoOrderedIterator<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    // Heads still to be visited; used as a LIFO stack (`pop` / `push`).
    queue: Vec<Head<KEY_LEN, O, V>>,
    // Key count taken from the PATCH at construction, decremented once per
    // yielded key.
    remaining: usize,
}
1759
1760impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Iterator
1761    for PATCHIntoOrderedIterator<KEY_LEN, O, V>
1762{
1763    type Item = [u8; KEY_LEN];
1764
1765    fn next(&mut self) -> Option<Self::Item> {
1766        let q = &mut self.queue;
1767        while let Some(mut head) = q.pop() {
1768            // Match the mutable body directly — we own `head` so calling
1769            // `body_mut()` is safe and allows returning the copied leaf key
1770            // or mutating the branch child table in-place.
1771            match head.body_mut() {
1772                BodyMut::Leaf(leaf) => {
1773                    self.remaining = self.remaining.saturating_sub(1);
1774                    return Some(leaf.key);
1775                }
1776                BodyMut::Branch(branch) => {
1777                    let slice: &mut [Option<Head<KEY_LEN, O, V>>] = &mut branch.child_table;
1778                    // Sort children by their byte-key, placing empty slots (None)
1779                    // after all occupied slots. Using `sort_unstable_by_key` with
1780                    // a simple key projection is clearer than a custom
1781                    // comparator; it also avoids allocating temporaries. The
1782                    // old comparator manually handled None/Some cases — we
1783                    // express that intent directly by sorting on the tuple
1784                    // (is_none, key_opt).
1785                    slice
1786                        .sort_unstable_by_key(|opt| (opt.is_none(), opt.as_ref().map(|h| h.key())));
1787                    for slot in slice.iter_mut().rev() {
1788                        if let Some(c) = slot.take() {
1789                            q.push(c);
1790                        }
1791                    }
1792                }
1793            }
1794        }
1795        None
1796    }
1797}
1798
1799impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> IntoIterator for PATCH<KEY_LEN, O, V> {
1800    type Item = [u8; KEY_LEN];
1801    type IntoIter = PATCHIntoIterator<KEY_LEN, O, V>;
1802
1803    fn into_iter(self) -> Self::IntoIter {
1804        let remaining = self.len().min(usize::MAX as u64) as usize;
1805        let mut q = Vec::new();
1806        if let Some(root) = self.root {
1807            q.push(root);
1808        }
1809        PATCHIntoIterator {
1810            queue: q,
1811            remaining,
1812        }
1813    }
1814}
1815
1816impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> PATCH<KEY_LEN, O, V> {
1817    /// Consume and return an iterator that yields keys in key order.
1818    pub fn into_iter_ordered(self) -> PATCHIntoOrderedIterator<KEY_LEN, O, V> {
1819        let remaining = self.len().min(usize::MAX as u64) as usize;
1820        let mut q = Vec::new();
1821        if let Some(root) = self.root {
1822            q.push(root);
1823        }
1824        PATCHIntoOrderedIterator {
1825            queue: q,
1826            remaining,
1827        }
1828    }
1829}
1830
1831impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Iterator
1832    for PATCHOrderedIterator<'a, KEY_LEN, O, V>
1833{
1834    type Item = &'a [u8; KEY_LEN];
1835
1836    fn next(&mut self) -> Option<Self::Item> {
1837        let mut level = self.stack.last_mut()?;
1838        loop {
1839            if let Some(child) = level.pop() {
1840                match child.body_ref() {
1841                    BodyRef::Leaf(_) => {
1842                        self.remaining = self.remaining.saturating_sub(1);
1843                        return Some(child.childleaf_key());
1844                    }
1845                    BodyRef::Branch(branch) => {
1846                        self.stack.push(ArrayVec::new());
1847                        level = self.stack.last_mut()?;
1848                        level.extend(branch.child_table.iter().filter_map(|c| c.as_ref()));
1849                        level.sort_unstable_by_key(|&k| Reverse(k.key())); // We need to reverse here because we pop from the vec.
1850                    }
1851                }
1852            } else {
1853                self.stack.pop();
1854                level = self.stack.last_mut()?;
1855            }
1856        }
1857    }
1858
1859    fn size_hint(&self) -> (usize, Option<usize>) {
1860        (self.remaining, Some(self.remaining))
1861    }
1862}
1863
// `size_hint` reports `remaining` as both the lower and the upper bound, so
// the default `len()` of `ExactSizeIterator` applies.
impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> ExactSizeIterator
    for PATCHOrderedIterator<'a, KEY_LEN, O, V>
{
}

// Once the stack is empty, `next` keeps returning `None`.
impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> std::iter::FusedIterator
    for PATCHOrderedIterator<'a, KEY_LEN, O, V>
{
}
1873
/// An iterator over the `PREFIX_LEN`-byte key prefixes found in a PATCH.
///
/// Each item is a `(prefix, count)` pair: the first `PREFIX_LEN` bytes of a
/// key in tree ordering (as produced by `O::tree_ordered`), together with the
/// count reported by the subtree that covers that prefix. Children of each
/// branch are visited in ascending byte-key order.
pub struct PATCHPrefixIterator<
    'a,
    const KEY_LEN: usize,
    const PREFIX_LEN: usize,
    O: KeySchema<KEY_LEN>,
    V,
> {
    // One entry per tree level being walked. Each level holds the not yet
    // visited children, sorted in descending byte-key order so that popping
    // from the back yields them in ascending order.
    stack: Vec<ArrayVec<&'a Head<KEY_LEN, O, V>, 256>>,
}
1885
1886impl<'a, const KEY_LEN: usize, const PREFIX_LEN: usize, O: KeySchema<KEY_LEN>, V>
1887    PATCHPrefixIterator<'a, KEY_LEN, PREFIX_LEN, O, V>
1888{
1889    fn new(patch: &'a PATCH<KEY_LEN, O, V>) -> Self {
1890        const {
1891            assert!(PREFIX_LEN <= KEY_LEN);
1892        }
1893        let mut r = PATCHPrefixIterator {
1894            stack: Vec::with_capacity(PREFIX_LEN),
1895        };
1896        if let Some(root) = &patch.root {
1897            r.stack.push(ArrayVec::new());
1898            if root.end_depth() >= PREFIX_LEN {
1899                r.stack[0].push(root);
1900            } else {
1901                let BodyRef::Branch(branch) = root.body_ref() else {
1902                    unreachable!();
1903                };
1904                let first_level = &mut r.stack[0];
1905                first_level.extend(branch.child_table.iter().filter_map(|c| c.as_ref()));
1906                first_level.sort_unstable_by_key(|&k| Reverse(k.key())); // We need to reverse here because we pop from the vec.
1907            }
1908        }
1909        r
1910    }
1911}
1912
1913impl<'a, const KEY_LEN: usize, const PREFIX_LEN: usize, O: KeySchema<KEY_LEN>, V> Iterator
1914    for PATCHPrefixIterator<'a, KEY_LEN, PREFIX_LEN, O, V>
1915{
1916    type Item = ([u8; PREFIX_LEN], u64);
1917
1918    fn next(&mut self) -> Option<Self::Item> {
1919        let mut level = self.stack.last_mut()?;
1920        loop {
1921            if let Some(child) = level.pop() {
1922                if child.end_depth() >= PREFIX_LEN {
1923                    let key = O::tree_ordered(child.childleaf_key());
1924                    let suffix_count = child.count();
1925                    return Some((key[0..PREFIX_LEN].try_into().unwrap(), suffix_count));
1926                } else {
1927                    let BodyRef::Branch(branch) = child.body_ref() else {
1928                        unreachable!();
1929                    };
1930                    self.stack.push(ArrayVec::new());
1931                    level = self.stack.last_mut()?;
1932                    level.extend(branch.child_table.iter().filter_map(|c| c.as_ref()));
1933                    level.sort_unstable_by_key(|&k| Reverse(k.key())); // We need to reverse here because we pop from the vec.
1934                }
1935            } else {
1936                self.stack.pop();
1937                level = self.stack.last_mut()?;
1938            }
1939        }
1940    }
1941}
1942
1943#[cfg(test)]
1944mod tests {
1945    use super::*;
1946    use itertools::Itertools;
1947    use proptest::prelude::*;
1948    use std::collections::HashSet;
1949    use std::convert::TryInto;
1950    use std::iter::FromIterator;
1951    use std::mem;
1952
1953    #[test]
1954    fn head_tag() {
1955        let head = Head::<64, IdentitySchema, ()>::new::<Leaf<64, ()>>(0, NonNull::dangling());
1956        assert_eq!(head.tag(), HeadTag::Leaf);
1957        mem::forget(head);
1958    }
1959
1960    #[test]
1961    fn head_key() {
1962        for k in 0..=255 {
1963            let head = Head::<64, IdentitySchema, ()>::new::<Leaf<64, ()>>(k, NonNull::dangling());
1964            assert_eq!(head.key(), k);
1965            mem::forget(head);
1966        }
1967    }
1968
1969    #[test]
1970    fn head_size() {
1971        assert_eq!(mem::size_of::<Head<64, IdentitySchema, ()>>(), 8);
1972    }
1973
1974    #[test]
1975    fn option_head_size() {
1976        assert_eq!(mem::size_of::<Option<Head<64, IdentitySchema, ()>>>(), 8);
1977    }
1978
1979    #[test]
1980    fn empty_tree() {
1981        let _tree = PATCH::<64, IdentitySchema, ()>::new();
1982    }
1983
1984    #[test]
1985    fn tree_put_one() {
1986        const KEY_SIZE: usize = 64;
1987        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, ()>::new();
1988        let entry = Entry::new(&[0; KEY_SIZE]);
1989        tree.insert(&entry);
1990    }
1991
1992    #[test]
1993    fn tree_clone_one() {
1994        const KEY_SIZE: usize = 64;
1995        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, ()>::new();
1996        let entry = Entry::new(&[0; KEY_SIZE]);
1997        tree.insert(&entry);
1998        let _clone = tree.clone();
1999    }
2000
2001    #[test]
2002    fn tree_put_same() {
2003        const KEY_SIZE: usize = 64;
2004        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, ()>::new();
2005        let entry = Entry::new(&[0; KEY_SIZE]);
2006        tree.insert(&entry);
2007        tree.insert(&entry);
2008    }
2009
2010    #[test]
2011    fn tree_replace_existing() {
2012        const KEY_SIZE: usize = 64;
2013        let key = [1u8; KEY_SIZE];
2014        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2015        let entry1 = Entry::with_value(&key, 1);
2016        tree.insert(&entry1);
2017        let entry2 = Entry::with_value(&key, 2);
2018        tree.replace(&entry2);
2019        assert_eq!(tree.get(&key), Some(&2));
2020    }
2021
2022    #[test]
2023    fn tree_replace_childleaf_updates_branch() {
2024        const KEY_SIZE: usize = 64;
2025        let key1 = [0u8; KEY_SIZE];
2026        let key2 = [1u8; KEY_SIZE];
2027        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2028        let entry1 = Entry::with_value(&key1, 1);
2029        let entry2 = Entry::with_value(&key2, 2);
2030        tree.insert(&entry1);
2031        tree.insert(&entry2);
2032        let entry1b = Entry::with_value(&key1, 3);
2033        tree.replace(&entry1b);
2034        assert_eq!(tree.get(&key1), Some(&3));
2035        assert_eq!(tree.get(&key2), Some(&2));
2036    }
2037
2038    #[test]
2039    fn update_child_refreshes_childleaf_on_replace() {
2040        const KEY_SIZE: usize = 4;
2041        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2042
2043        let key1 = [0u8; KEY_SIZE];
2044        let key2 = [1u8; KEY_SIZE];
2045        tree.insert(&Entry::with_value(&key1, 1));
2046        tree.insert(&Entry::with_value(&key2, 2));
2047
2048        // Determine which child currently provides the branch childleaf.
2049        let root_ref = tree.root.as_ref().expect("root exists");
2050        let before_childleaf = *root_ref.childleaf_key();
2051
2052        // Find the slot key (the byte index used in the branch table) for the child
2053        // that currently provides the childleaf.
2054        let slot_key = match root_ref.body_ref() {
2055            BodyRef::Branch(branch) => branch
2056                .child_table
2057                .iter()
2058                .filter_map(|c| c.as_ref())
2059                .find(|c| c.childleaf_key() == &before_childleaf)
2060                .expect("child exists")
2061                .key(),
2062            BodyRef::Leaf(_) => panic!("root should be a branch"),
2063        };
2064
2065        // Replace that child with a new leaf that has a different childleaf key.
2066        let new_key = [2u8; KEY_SIZE];
2067        {
2068            let mut ed = crate::patch::branch::BranchMut::from_slot(&mut tree.root);
2069            ed.modify_child(slot_key, |_| {
2070                Some(Entry::with_value(&new_key, 42).leaf::<IdentitySchema>())
2071            });
2072            // drop(ed) commits
2073        }
2074
2075        let after = tree.root.as_ref().expect("root exists");
2076        assert_eq!(after.childleaf_key(), &new_key);
2077    }
2078
2079    #[test]
2080    fn remove_childleaf_updates_branch() {
2081        const KEY_SIZE: usize = 4;
2082        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2083
2084        let key1 = [0u8; KEY_SIZE];
2085        let key2 = [1u8; KEY_SIZE];
2086        tree.insert(&Entry::with_value(&key1, 1));
2087        tree.insert(&Entry::with_value(&key2, 2));
2088
2089        let childleaf_before = *tree.root.as_ref().unwrap().childleaf_key();
2090        // remove the leaf that currently provides the branch.childleaf
2091        tree.remove(&childleaf_before);
2092
2093        // Ensure the removed key is gone and the other key remains and is now the childleaf.
2094        let other = if childleaf_before == key1 { key2 } else { key1 };
2095        assert_eq!(tree.get(&childleaf_before), None);
2096        assert_eq!(tree.get(&other), Some(&2u32));
2097        let after_childleaf = tree.root.as_ref().unwrap().childleaf_key();
2098        assert_eq!(after_childleaf, &other);
2099    }
2100
2101    #[test]
2102    fn remove_collapses_branch_to_single_child() {
2103        const KEY_SIZE: usize = 4;
2104        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2105
2106        let key1 = [0u8; KEY_SIZE];
2107        let key2 = [1u8; KEY_SIZE];
2108        tree.insert(&Entry::with_value(&key1, 1));
2109        tree.insert(&Entry::with_value(&key2, 2));
2110
2111        // Remove one key and ensure the root collapses to the remaining child.
2112        tree.remove(&key1);
2113        assert_eq!(tree.get(&key1), None);
2114        assert_eq!(tree.get(&key2), Some(&2u32));
2115        let root = tree.root.as_ref().expect("root exists");
2116        match root.body_ref() {
2117            BodyRef::Leaf(_) => {}
2118            BodyRef::Branch(_) => panic!("root should have collapsed to a leaf"),
2119        }
2120    }
2121
2122    #[test]
2123    fn branch_size() {
2124        assert_eq!(
2125            mem::size_of::<Branch<64, IdentitySchema, [Option<Head<64, IdentitySchema, ()>>; 2], ()>>(
2126            ),
2127            64
2128        );
2129        assert_eq!(
2130            mem::size_of::<Branch<64, IdentitySchema, [Option<Head<64, IdentitySchema, ()>>; 4], ()>>(
2131            ),
2132            48 + 16 * 2
2133        );
2134        assert_eq!(
2135            mem::size_of::<Branch<64, IdentitySchema, [Option<Head<64, IdentitySchema, ()>>; 8], ()>>(
2136            ),
2137            48 + 16 * 4
2138        );
2139        assert_eq!(
2140            mem::size_of::<
2141                Branch<64, IdentitySchema, [Option<Head<64, IdentitySchema, ()>>; 16], ()>,
2142            >(),
2143            48 + 16 * 8
2144        );
2145        assert_eq!(
2146            mem::size_of::<
2147                Branch<64, IdentitySchema, [Option<Head<32, IdentitySchema, ()>>; 32], ()>,
2148            >(),
2149            48 + 16 * 16
2150        );
2151        assert_eq!(
2152            mem::size_of::<
2153                Branch<64, IdentitySchema, [Option<Head<64, IdentitySchema, ()>>; 64], ()>,
2154            >(),
2155            48 + 16 * 32
2156        );
2157        assert_eq!(
2158            mem::size_of::<
2159                Branch<64, IdentitySchema, [Option<Head<64, IdentitySchema, ()>>; 128], ()>,
2160            >(),
2161            48 + 16 * 64
2162        );
2163        assert_eq!(
2164            mem::size_of::<
2165                Branch<64, IdentitySchema, [Option<Head<64, IdentitySchema, ()>>; 256], ()>,
2166            >(),
2167            48 + 16 * 128
2168        );
2169    }
2170
2171    /// Checks what happens if we join two PATCHes that
2172    /// only contain a single element each, that differs in the last byte.
2173    #[test]
2174    fn tree_union_single() {
2175        const KEY_SIZE: usize = 8;
2176        let mut left = PATCH::<KEY_SIZE, IdentitySchema, ()>::new();
2177        let mut right = PATCH::<KEY_SIZE, IdentitySchema, ()>::new();
2178        let left_entry = Entry::new(&[0, 0, 0, 0, 0, 0, 0, 0]);
2179        let right_entry = Entry::new(&[0, 0, 0, 0, 0, 0, 0, 1]);
2180        left.insert(&left_entry);
2181        right.insert(&right_entry);
2182        left.union(right);
2183        assert_eq!(left.len(), 2);
2184    }
2185
    // The small unit tests covering the BranchMut-based editing behind the
    // higher-level set operations (intersect/difference) are ordinary unit
    // tests, not proptest cases, so they are defined after the `proptest!`
    // block below rather than inside it.
2190
    proptest! {
        // Inserting arbitrary 64-byte keys must not panic.
        #[test]
        fn tree_insert(keys in prop::collection::vec(prop::collection::vec(0u8..=255, 64), 1..1024)) {
            let mut tree = PATCH::<64, IdentitySchema, ()>::new();
            for key in keys {
                let key: [u8; 64] = key.try_into().unwrap();
                let entry = Entry::new(&key);
                tree.insert(&entry);
            }
        }

        // `len` must equal the number of distinct keys inserted.
        #[test]
        fn tree_len(keys in prop::collection::vec(prop::collection::vec(0u8..=255, 64), 1..1024)) {
            let mut tree = PATCH::<64, IdentitySchema, ()>::new();
            let mut set = HashSet::new();
            for key in keys {
                let key: [u8; 64] = key.try_into().unwrap();
                let entry = Entry::new(&key);
                tree.insert(&entry);
                set.insert(key);
            }

            prop_assert_eq!(set.len() as u64, tree.len())
        }

        // `infixes` with an empty prefix must report exactly the inserted keys.
        #[test]
        fn tree_infixes(keys in prop::collection::vec(prop::collection::vec(0u8..=255, 64), 1..1024)) {
            let mut tree = PATCH::<64, IdentitySchema, ()>::new();
            let mut set = HashSet::new();
            for key in keys {
                let key: [u8; 64] = key.try_into().unwrap();
                let entry = Entry::new(&key);
                tree.insert(&entry);
                set.insert(key);
            }
            let mut set_vec = Vec::from_iter(set.into_iter());
            let mut tree_vec = vec![];
            tree.infixes(&[0; 0], &mut |&x: &[u8; 64]| tree_vec.push(x));

            // Both sides are sorted because neither has a guaranteed order.
            set_vec.sort();
            tree_vec.sort();

            prop_assert_eq!(set_vec, tree_vec);
        }

        // Borrowing iteration (`for key in &tree`) must yield exactly the
        // inserted keys.
        #[test]
        fn tree_iter(keys in prop::collection::vec(prop::collection::vec(0u8..=255, 64), 1..1024)) {
            let mut tree = PATCH::<64, IdentitySchema, ()>::new();
            let mut set = HashSet::new();
            for key in keys {
                let key: [u8; 64] = key.try_into().unwrap();
                let entry = Entry::new(&key);
                tree.insert(&entry);
                set.insert(key);
            }
            let mut set_vec = Vec::from_iter(set.into_iter());
            let mut tree_vec = vec![];
            for key in &tree {
                tree_vec.push(*key);
            }

            // Both sides are sorted because neither has a guaranteed order.
            set_vec.sort();
            tree_vec.sort();

            prop_assert_eq!(set_vec, tree_vec);
        }

        // The union of two trees must hold exactly the keys of both inputs.
        #[test]
        fn tree_union(left in prop::collection::vec(prop::collection::vec(0u8..=255, 64), 200),
                        right in prop::collection::vec(prop::collection::vec(0u8..=255, 64), 200)) {
            let mut set = HashSet::new();

            let mut left_tree = PATCH::<64, IdentitySchema, ()>::new();
            for entry in left {
                let mut key = [0; 64];
                key.iter_mut().set_from(entry.iter().cloned());
                let entry = Entry::new(&key);
                left_tree.insert(&entry);
                set.insert(key);
            }

            let mut right_tree = PATCH::<64, IdentitySchema, ()>::new();
            for entry in right {
                let mut key = [0; 64];
                key.iter_mut().set_from(entry.iter().cloned());
                let entry = Entry::new(&key);
                right_tree.insert(&entry);
                set.insert(key);
            }

            left_tree.union(right_tree);

            let mut set_vec = Vec::from_iter(set.into_iter());
            let mut tree_vec = vec![];
            left_tree.infixes(&[0; 0], &mut |&x: &[u8;64]| tree_vec.push(x));

            set_vec.sort();
            tree_vec.sort();

            prop_assert_eq!(set_vec, tree_vec);
            }

        // Union with an empty tree must leave the left tree's keys unchanged.
        #[test]
        fn tree_union_empty(left in prop::collection::vec(prop::collection::vec(0u8..=255, 64), 2)) {
            let mut set = HashSet::new();

            let mut left_tree = PATCH::<64, IdentitySchema, ()>::new();
            for entry in left {
                let mut key = [0; 64];
                key.iter_mut().set_from(entry.iter().cloned());
                let entry = Entry::new(&key);
                left_tree.insert(&entry);
                set.insert(key);
            }

            let right_tree = PATCH::<64, IdentitySchema, ()>::new();

            left_tree.union(right_tree);

            let mut set_vec = Vec::from_iter(set.into_iter());
            let mut tree_vec = vec![];
            left_tree.infixes(&[0; 0], &mut |&x: &[u8;64]| tree_vec.push(x));

            set_vec.sort();
            tree_vec.sort();

            prop_assert_eq!(set_vec, tree_vec);
            }

        // Copy-on-write checks: the two tests below verify that a tree keeps
        // its contents after a clone of it has been modified, first by
        // inserting new keys and then by a union.

    #[test]
    fn cow_on_insert(base_keys in prop::collection::vec(prop::collection::vec(0u8..=255, 8), 1..1024),
                         new_keys in prop::collection::vec(prop::collection::vec(0u8..=255, 8), 1..1024)) {
            // Note that we can't compare the trees directly, as that uses the hash,
            // which might not be affected by nodes in lower levels being changed accidentally.
            // Instead we need to iterate over the keys and check if they are the same.

            let mut tree = PATCH::<8, IdentitySchema, ()>::new();
            for key in base_keys {
                let key: [u8; 8] = key[..].try_into().unwrap();
                let entry = Entry::new(&key);
                tree.insert(&entry);
            }
            // Snapshot of the original tree's keys before the clone is touched.
            let base_tree_content: Vec<[u8; 8]> = tree.iter().copied().collect();

            let mut tree_clone = tree.clone();
            for key in new_keys {
                let key: [u8; 8] = key[..].try_into().unwrap();
                let entry = Entry::new(&key);
                tree_clone.insert(&entry);
            }

            // Re-read the original tree (not the clone) and compare.
            let new_tree_content: Vec<[u8; 8]> = tree.iter().copied().collect();
            prop_assert_eq!(base_tree_content, new_tree_content);
        }

        #[test]
    fn cow_on_union(base_keys in prop::collection::vec(prop::collection::vec(0u8..=255, 8), 1..1024),
                         new_keys in prop::collection::vec(prop::collection::vec(0u8..=255, 8), 1..1024)) {
            // Note that we can't compare the trees directly, as that uses the hash,
            // which might not be affected by nodes in lower levels being changed accidentally.
            // Instead we need to iterate over the keys and check if they are the same.

            let mut tree = PATCH::<8, IdentitySchema, ()>::new();
            for key in base_keys {
                let key: [u8; 8] = key[..].try_into().unwrap();
                let entry = Entry::new(&key);
                tree.insert(&entry);
            }
            // Snapshot of the original tree's keys before the clone is touched.
            let base_tree_content: Vec<[u8; 8]> = tree.iter().copied().collect();

            let mut tree_clone = tree.clone();
            let mut new_tree = PATCH::<8, IdentitySchema, ()>::new();
            for key in new_keys {
                let key: [u8; 8] = key[..].try_into().unwrap();
                let entry = Entry::new(&key);
                new_tree.insert(&entry);
            }
            tree_clone.union(new_tree);

            // Re-read the original tree (not the clone) and compare.
            let new_tree_content: Vec<[u8; 8]> = tree.iter().copied().collect();
            prop_assert_eq!(base_tree_content, new_tree_content);
        }
    }
2378
2379    #[test]
2380    fn intersect_multiple_common_children_commits_branchmut() {
2381        const KEY_SIZE: usize = 4;
2382        let mut left = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2383        let mut right = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2384
2385        let a = [0u8, 0u8, 0u8, 1u8];
2386        let b = [0u8, 0u8, 0u8, 2u8];
2387        let c = [0u8, 0u8, 0u8, 3u8];
2388        let d = [2u8, 0u8, 0u8, 0u8];
2389        let e = [3u8, 0u8, 0u8, 0u8];
2390
2391        left.insert(&Entry::with_value(&a, 1));
2392        left.insert(&Entry::with_value(&b, 2));
2393        left.insert(&Entry::with_value(&c, 3));
2394        left.insert(&Entry::with_value(&d, 4));
2395
2396        right.insert(&Entry::with_value(&a, 10));
2397        right.insert(&Entry::with_value(&b, 11));
2398        right.insert(&Entry::with_value(&c, 12));
2399        right.insert(&Entry::with_value(&e, 13));
2400
2401        let res = left.intersect(&right);
2402        // A, B, C are common
2403        assert_eq!(res.len(), 3);
2404        assert!(res.get(&a).is_some());
2405        assert!(res.get(&b).is_some());
2406        assert!(res.get(&c).is_some());
2407    }
2408
2409    #[test]
2410    fn difference_multiple_children_commits_branchmut() {
2411        const KEY_SIZE: usize = 4;
2412        let mut left = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2413        let mut right = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2414
2415        let a = [0u8, 0u8, 0u8, 1u8];
2416        let b = [0u8, 0u8, 0u8, 2u8];
2417        let c = [0u8, 0u8, 0u8, 3u8];
2418        let d = [2u8, 0u8, 0u8, 0u8];
2419        let e = [3u8, 0u8, 0u8, 0u8];
2420
2421        left.insert(&Entry::with_value(&a, 1));
2422        left.insert(&Entry::with_value(&b, 2));
2423        left.insert(&Entry::with_value(&c, 3));
2424        left.insert(&Entry::with_value(&d, 4));
2425
2426        right.insert(&Entry::with_value(&a, 10));
2427        right.insert(&Entry::with_value(&b, 11));
2428        right.insert(&Entry::with_value(&c, 12));
2429        right.insert(&Entry::with_value(&e, 13));
2430
2431        let res = left.difference(&right);
2432        // left only has d
2433        assert_eq!(res.len(), 1);
2434        assert!(res.get(&d).is_some());
2435    }
2436
2437    #[test]
2438    fn difference_empty_left_is_empty() {
2439        const KEY_SIZE: usize = 4;
2440        let left = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2441        let mut right = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2442        let key = [1u8, 2u8, 3u8, 4u8];
2443        right.insert(&Entry::with_value(&key, 7));
2444
2445        let res = left.difference(&right);
2446        assert_eq!(res.len(), 0);
2447    }
2448
2449    #[test]
2450    fn difference_empty_right_returns_left() {
2451        const KEY_SIZE: usize = 4;
2452        let mut left = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2453        let right = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2454        let key = [1u8, 2u8, 3u8, 4u8];
2455        left.insert(&Entry::with_value(&key, 7));
2456
2457        let res = left.difference(&right);
2458        assert_eq!(res.len(), 1);
2459        assert!(res.get(&key).is_some());
2460    }
2461
2462    #[test]
2463    fn slot_edit_branchmut_insert_update() {
2464        // Small unit test demonstrating the Slot::edit -> BranchMut insert/update pattern.
2465        const KEY_SIZE: usize = 8;
2466        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2467
2468        let entry1 = Entry::with_value(&[0u8; KEY_SIZE], 1u32);
2469        let entry2 = Entry::with_value(&[1u8; KEY_SIZE], 2u32);
2470        tree.insert(&entry1);
2471        tree.insert(&entry2);
2472        assert_eq!(tree.len(), 2);
2473
2474        // Edit the root slot in-place using the BranchMut editor.
2475        {
2476            let mut ed = crate::patch::branch::BranchMut::from_slot(&mut tree.root);
2477
2478            // Compute the insertion start depth first to avoid borrowing `ed` inside the closure.
2479            let start_depth = ed.end_depth as usize;
2480            let inserted = Entry::with_value(&[2u8; KEY_SIZE], 3u32)
2481                .leaf::<IdentitySchema>()
2482                .with_start(start_depth);
2483            let key = inserted.key();
2484
2485            ed.modify_child(key, |opt| match opt {
2486                Some(old) => Some(Head::insert_leaf(old, inserted, start_depth)),
2487                None => Some(inserted),
2488            });
2489            // BranchMut is dropped here and commits the updated branch pointer back into the head.
2490        }
2491
2492        assert_eq!(tree.len(), 3);
2493        assert_eq!(tree.get(&[2u8; KEY_SIZE]), Some(&3u32));
2494    }
2495}