Skip to main content

triblespace_core/
patch.rs

1//! Persistent Adaptive Trie with Cuckoo-compression and
2//! Hash-maintenance (PATCH).
3//!
4//! See the [PATCH](../book/src/deep-dive/patch.md) chapter of the Tribles Book
5//! for the full design description and hashing scheme.
6//!
7//! Values stored in leaves are not part of hashing or equality comparisons.
8//! Two [`PATCH`](crate::patch::PATCH)es are considered equal if they contain the same set of keys,
9//! even if the associated values differ. This allows using the structure as an
10//! idempotent blobstore where a value's hash determines its key.
11//!
12#![allow(unstable_name_collisions)]
13
14mod branch;
15/// Byte-indexed lookup tables used by PATCH branch nodes.
16pub mod bytetable;
17mod entry;
18mod leaf;
19
20use arrayvec::ArrayVec;
21
22use branch::*;
23/// Re-export of [`Entry`](entry::Entry).
24pub use entry::Entry;
25use leaf::*;
26
27/// Re-export of all byte table utilities.
28pub use bytetable::*;
29use rand::thread_rng;
30use rand::RngCore;
31use std::cmp::Reverse;
32use std::convert::TryInto;
33use std::fmt;
34use std::fmt::Debug;
35use std::marker::PhantomData;
36use std::ptr::NonNull;
37use std::sync::Once;
38
39#[cfg(not(target_pointer_width = "64"))]
40compile_error!("PATCH tagged pointers require 64-bit targets");
41
/// Process-wide SIP key used for key hashing. All zeroes until
/// `init_sip_key` overwrites it with random bytes.
static mut SIP_KEY: [u8; 16] = [0; 16];
/// Guard that lets `init_sip_key` run its initialisation closure at most once.
static INIT: Once = Once::new();
44
45/// Minimum `other.leaf_count` at which [`Head::par_union`] takes the
46/// scatter + bitset + rayon::scope-spawn path on the equal-depth-
47/// branch arm. Below this, the per-key `modify_child` loop wins
48/// because asymmetric merges only touch a handful of slots.
49#[cfg(feature = "parallel")]
50const PARALLEL_PATCH_UNION_THRESHOLD: usize = 4096;
51
52/// Parallel-aware PATCH union, with a shared work-stealing budget
53/// carried across the entire recursive descent.
54///
55/// Two-phase model per parallel call:
56///   1. Spawn phase (collect sequentially, dispatch per child):
57///      drain "both" pairs, for each: claim 1 unit from the
58///      shared budget — if successful, spawn the child union as
59///      a `rayon::scope` task; if budget is exhausted, run the
60///      child serially via `Head::union`.
61///   2. Install phase (purely serial): scatter-collected resolved
62///      heads + single-side pass-throughs land in the parent
63///      branch, then `recompute_aggregates` rebuilds the
64///      hash/leaf_count/segment_count/childleaf in one pass.
65///
66/// The budget is a single shared atomic — `num_threads²` total
67/// spawns across the entire descent, after which everything is
68/// sequential. This caps overhead without restricting the depth
69/// at which parallelism is reached: a heavy subtree near the
70/// root claims many units; a balanced descent spreads them.
71#[cfg(feature = "parallel")]
72mod parallel_union {
73    use core::sync::atomic::{AtomicUsize, Ordering};
74
75    /// Carries the shared spawn budget across recursive
76    /// `par_union_with_ctx` calls.
77    pub(crate) struct ParUnionCtx {
78        pub(crate) budget: AtomicUsize,
79    }
80
81    impl ParUnionCtx {
82        pub(crate) fn new() -> Self {
83            let n = rayon::current_num_threads();
84            Self {
85                budget: AtomicUsize::new(n.saturating_mul(n).max(2)),
86            }
87        }
88
89        /// Try to claim one spawn unit. Returns `true` if a unit was
90        /// claimed (caller should spawn), `false` if the budget was
91        /// already exhausted (caller should run serially).
92        ///
93        /// A naive `fetch_sub(1)` would wrap `0 → usize::MAX` on
94        /// over-subtract, briefly letting other threads see a huge
95        /// budget — so we use compare-exchange to refuse the claim
96        /// without ever observing the underflow.
97        pub(crate) fn try_claim(&self) -> bool {
98            let mut current = self.budget.load(Ordering::Relaxed);
99            loop {
100                if current == 0 {
101                    return false;
102                }
103                match self.budget.compare_exchange_weak(
104                    current,
105                    current - 1,
106                    Ordering::Relaxed,
107                    Ordering::Relaxed,
108                ) {
109                    Ok(_) => return true,
110                    Err(observed) => current = observed,
111                }
112            }
113        }
114    }
115
116    /// Raw-pointer wrapper for the scatter-write target. Each
117    /// spawned task writes to `resolved[k]` for its specific key
118    /// byte `k`; keys are pairwise distinct by construction (each
119    /// "both" bit in the partition uniquely identifies a slot), so
120    /// the writes are non-aliasing despite sharing a `*mut` across
121    /// threads.
122    ///
123    /// `write_at` exists as an inherent method (rather than callers
124    /// reading the `*mut` field directly) so that move closures
125    /// capture the whole wrapper — Rust 2021 precise-capture would
126    /// otherwise grab the raw pointer field, dropping the manual
127    /// `Send`/`Sync` impls and triggering a Send error.
128    pub(crate) struct ScatterPtr<T>(pub *mut T);
129
130    // Manual `Copy`/`Clone` impls so `T` doesn't get a spurious
131    // `T: Copy` / `T: Clone` bound from derive — the wrapper holds a
132    // raw pointer, which is always `Copy` regardless of `T`.
133    impl<T> Clone for ScatterPtr<T> {
134        fn clone(&self) -> Self {
135            *self
136        }
137    }
138    impl<T> Copy for ScatterPtr<T> {}
139
140    unsafe impl<T> Send for ScatterPtr<T> {}
141    unsafe impl<T> Sync for ScatterPtr<T> {}
142
143    impl<T> ScatterPtr<T> {
144        /// SAFETY: `i` must be in-bounds of the underlying buffer,
145        /// and the caller must guarantee no other thread is writing
146        /// to slot `i` concurrently.
147        pub(crate) unsafe fn write_at(self, i: usize, v: T) {
148            self.0.add(i).write(v);
149        }
150    }
151}
152
153/// Initializes the SIP key used for key hashing.
154/// This function is called automatically when a new PATCH is created.
155fn init_sip_key() {
156    INIT.call_once(|| {
157        bytetable::init();
158
159        let mut rng = thread_rng();
160        unsafe {
161            rng.fill_bytes(&mut SIP_KEY[..]);
162        }
163    });
164}
165
/// Expands a list of segment lengths into a per-byte segment table.
///
/// Entry `i` of the result is the index of the segment that key byte `i`
/// falls into, with segments laid out back to back in the order given by
/// `lens`. Bytes beyond the sum of `lens` keep the initial value `0`.
///
/// # Panics
///
/// Panics (a compile error in const context) if the lengths add up to more
/// than `N`.
pub const fn build_segmentation<const N: usize, const M: usize>(lens: [usize; M]) -> [usize; N] {
    let mut table = [0; N];
    let mut pos = 0;
    let mut segment = 0;
    while segment < M {
        // Stamp the current segment index onto the next `lens[segment]` bytes.
        let mut remaining = lens[segment];
        while remaining > 0 {
            table[pos] = segment;
            pos += 1;
            remaining -= 1;
        }
        segment += 1;
    }
    table
}
185
/// Returns the identity permutation of length `N`, i.e. `[0, 1, .., N - 1]`.
pub const fn identity_map<const N: usize>() -> [usize; N] {
    let mut table = [0; N];
    // Fill from the back; every slot simply receives its own index.
    let mut idx = N;
    while idx > 0 {
        idx -= 1;
        table[idx] = idx;
    }
    table
}
196
/// Builds the table that maps a byte position in key order to its position
/// in tree order.
///
/// `lens` holds the segment lengths in key order. `perm` lists the segment
/// indices in the order in which the segments appear in the tree. Bytes
/// inside one segment keep their relative order.
pub const fn build_key_to_tree<const N: usize, const M: usize>(
    lens: [usize; M],
    perm: [usize; M],
) -> [usize; N] {
    // Tree-order start offset of every segment, indexed by segment id.
    let mut tree_base = [0; M];
    let mut cursor = 0;
    let mut slot = 0;
    while slot < M {
        let segment = perm[slot];
        tree_base[segment] = cursor;
        cursor += lens[segment];
        slot += 1;
    }

    // Walk the key front to back; `src` is the key-order position and `dst`
    // the matching tree-order position inside the current segment.
    let mut table = [0; N];
    let mut src = 0;
    let mut segment = 0;
    while segment < M {
        let mut dst = tree_base[segment];
        let mut remaining = lens[segment];
        while remaining > 0 {
            table[src] = dst;
            src += 1;
            dst += 1;
            remaining -= 1;
        }
        segment += 1;
    }
    table
}
239
/// Inverts a permutation table: if `arr[i] == j` then the result holds `i`
/// at index `j`.
///
/// Entries are processed in ascending index order, so if `arr` contains a
/// value more than once the highest index wins.
pub const fn invert<const N: usize>(arr: [usize; N]) -> [usize; N] {
    let mut inverse = [0; N];
    let mut pos = 0;
    while pos != N {
        let target = arr[pos];
        inverse[target] = pos;
        pos += 1;
    }
    inverse
}
250
/// Declares a unit struct `$name` that implements
/// [`KeySegmentation`](crate::patch::KeySegmentation) for keys of `$len` bytes.
///
/// The bracketed list gives the segment lengths in key order. They are
/// exposed as the associated constant `SEG_LENS` and expanded into the
/// per-byte `SEGMENTS` table by `build_segmentation`. Attributes written
/// before the name are forwarded to the generated struct.
#[doc(hidden)]
#[macro_export]
macro_rules! key_segmentation {
    // Internal helper: counts the comma-separated expressions by mapping
    // each one to `()` and taking the length of the resulting array.
    (@count $($e:expr),* $(,)?) => {
        <[()]>::len(&[$($crate::key_segmentation!(@sub $e)),*])
    };
    // Internal helper: replaces any expression with `()`.
    (@sub $e:expr) => { () };
    // Public form: `key_segmentation!(Name, KEY_LEN, [len0, len1, ...]);`
    ($(#[$meta:meta])* $name:ident, $len:expr, [$($seg_len:expr),+ $(,)?]) => {
        $(#[$meta])*
        #[derive(Copy, Clone, Debug)]
        pub struct $name;
        impl $name {
            pub const SEG_LENS: [usize; $crate::key_segmentation!(@count $($seg_len),*)] = [$($seg_len),*];
        }
        impl $crate::patch::KeySegmentation<$len> for $name {
            const SEGMENTS: [usize; $len] = $crate::patch::build_segmentation::<$len, {$crate::key_segmentation!(@count $($seg_len),*)}>(Self::SEG_LENS);
        }
    };
}
270
/// Declares a unit struct `$name` that implements
/// [`KeySchema`](crate::patch::KeySchema) for keys of `$len` bytes.
///
/// `$seg` is the segmentation type; the macro reads its `SEG_LENS`
/// constant. The bracketed list is the segment permutation, stored as
/// `SEGMENT_PERM` and used by `build_key_to_tree` to derive `KEY_TO_TREE`;
/// `TREE_TO_KEY` is the inverse of that table. Attributes written before
/// the name are forwarded to the generated struct.
#[doc(hidden)]
#[macro_export]
macro_rules! key_schema {
    // Internal helper: counts the comma-separated expressions by mapping
    // each one to `()` and taking the length of the resulting array.
    (@count $($e:expr),* $(,)?) => {
        <[()]>::len(&[$($crate::key_schema!(@sub $e)),*])
    };
    // Internal helper: replaces any expression with `()`.
    (@sub $e:expr) => { () };
    // Public form: `key_schema!(Name, Segmentation, KEY_LEN, [perm0, perm1, ...]);`
    ($(#[$meta:meta])* $name:ident, $seg:ty, $len:expr, [$($perm:expr),+ $(,)?]) => {
        $(#[$meta])*
        #[derive(Copy, Clone, Debug)]
        pub struct $name;
        impl $crate::patch::KeySchema<$len> for $name {
            type Segmentation = $seg;
            const SEGMENT_PERM: &'static [usize] = &[$($perm),*];
            const KEY_TO_TREE: [usize; $len] = $crate::patch::build_key_to_tree::<$len, {$crate::key_schema!(@count $($perm),*)}>(<$seg>::SEG_LENS, [$($perm),*]);
            const TREE_TO_KEY: [usize; $len] = $crate::patch::invert(Self::KEY_TO_TREE);
        }
    };
}
290
291/// A trait is used to provide a re-ordered view of the keys stored in the PATCH.
292/// This allows for different PATCH instances share the same leaf nodes,
293/// independent of the key ordering used in the tree.
294pub trait KeySchema<const KEY_LEN: usize>: Copy + Clone + Debug {
295    /// The segmentation this ordering operates over.
296    type Segmentation: KeySegmentation<KEY_LEN>;
297    /// Order of segments from key layout to tree layout.
298    const SEGMENT_PERM: &'static [usize];
299    /// Maps each key index to its position in the tree view.
300    const KEY_TO_TREE: [usize; KEY_LEN];
301    /// Maps each tree index to its position in the key view.
302    const TREE_TO_KEY: [usize; KEY_LEN];
303
304    /// Reorders the key from the shared key ordering to the tree ordering.
305    fn tree_ordered(key: &[u8; KEY_LEN]) -> [u8; KEY_LEN] {
306        let mut new_key = [0; KEY_LEN];
307        let mut i = 0;
308        while i < KEY_LEN {
309            new_key[Self::KEY_TO_TREE[i]] = key[i];
310            i += 1;
311        }
312        new_key
313    }
314
315    /// Reorders the key from the tree ordering to the shared key ordering.
316    fn key_ordered(tree_key: &[u8; KEY_LEN]) -> [u8; KEY_LEN] {
317        let mut new_key = [0; KEY_LEN];
318        let mut i = 0;
319        while i < KEY_LEN {
320            new_key[Self::TREE_TO_KEY[i]] = tree_key[i];
321            i += 1;
322        }
323        new_key
324    }
325
326    /// Return the segment index for the byte at `at_depth` in tree ordering.
327    ///
328    /// Default implementation reads the static segmentation table and the
329    /// tree->key mapping. Having this as a method makes call sites clearer and
330    /// reduces the verbosity of expressions that access the segmentation table.
331    fn segment_of_tree_depth(at_depth: usize) -> usize {
332        <Self::Segmentation as KeySegmentation<KEY_LEN>>::SEGMENTS[Self::TREE_TO_KEY[at_depth]]
333    }
334
335    /// Return true if the tree-ordered bytes at `a` and `b` belong to the same
336    /// logical segment.
337    fn same_segment_tree(a: usize, b: usize) -> bool {
338        <Self::Segmentation as KeySegmentation<KEY_LEN>>::SEGMENTS[Self::TREE_TO_KEY[a]]
339            == <Self::Segmentation as KeySegmentation<KEY_LEN>>::SEGMENTS[Self::TREE_TO_KEY[b]]
340    }
341}
342
/// Describes how the keys stored in a PATCH are split into segments.
///
/// The segmentation determines the sub-fields of a key, which enables
/// segment-based operations such as counting the number of elements in a
/// segment with a given prefix without traversing the tree.
///
/// Note that the segmentation is defined on the shared key ordering,
/// and should thus be implemented only once, independent of additional key orderings.
///
/// See [TribleSegmentation](crate::trible::TribleSegmentation) for an example that segments keys into entity,
/// attribute, and value segments.
pub trait KeySegmentation<const KEY_LEN: usize>: Copy + Clone + Debug {
    /// Segment index for each position in the key.
    const SEGMENTS: [usize; KEY_LEN];
}
357
/// A `KeySchema` that does not reorder the keys.
/// This is useful for keys that are already ordered in the desired way.
/// This is the default ordering.
#[derive(Copy, Clone, Debug)]
pub struct IdentitySchema {}

/// A `KeySegmentation` that does not segment the keys.
/// This is useful for keys that do not have a segment structure.
/// This is the default segmentation.
#[derive(Copy, Clone, Debug)]
pub struct SingleSegmentation {}
// Identity ordering for every key length: one segment, and both translation
// tables are the identity map, so tree order equals key order.
impl<const KEY_LEN: usize> KeySchema<KEY_LEN> for IdentitySchema {
    type Segmentation = SingleSegmentation;
    const SEGMENT_PERM: &'static [usize] = &[0];
    const KEY_TO_TREE: [usize; KEY_LEN] = identity_map::<KEY_LEN>();
    const TREE_TO_KEY: [usize; KEY_LEN] = identity_map::<KEY_LEN>();
}

// Every key byte belongs to segment 0.
impl<const KEY_LEN: usize> KeySegmentation<KEY_LEN> for SingleSegmentation {
    const SEGMENTS: [usize; KEY_LEN] = [0; KEY_LEN];
}
379
/// Discriminates the kind of body a `Head` points to: a leaf, or a branch
/// with a child table of 2 to 256 slots.
#[allow(dead_code)]
#[derive(Debug, PartialEq, Copy, Clone)]
#[repr(u8)]
pub(crate) enum HeadTag {
    // Stored in the low 4 bits of `Head::tptr` (see Head::new).
    //
    // Branch values encode log2(branch_size) (i.e. `Branch2 == 1`, `Branch256
    // == 8`). `0` is reserved for leaf nodes, which lets us compute the branch
    // size as `1 << tag` without any offset.
    Leaf = 0,
    Branch2 = 1,
    Branch4 = 2,
    Branch8 = 3,
    Branch16 = 4,
    Branch32 = 5,
    Branch64 = 6,
    Branch128 = 7,
    Branch256 = 8,
}
399
400impl HeadTag {
401    #[inline]
402    fn from_raw(raw: u8) -> Self {
403        debug_assert!(raw <= HeadTag::Branch256 as u8);
404        // SAFETY: `HeadTag` is `#[repr(u8)]` with a contiguous discriminant
405        // range 0..=8. The tag bits are written by Head::new/set_body and
406        // Branch::tag, which only emit valid discriminants.
407        unsafe { std::mem::transmute(raw) }
408    }
409}
410
/// Raw, lifetime-free pointer to the body of a `Head`, as decoded by
/// `Head::body()`.
pub(crate) enum BodyPtr<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    Leaf(NonNull<Leaf<KEY_LEN, V>>),
    Branch(branch::BranchNN<KEY_LEN, O, V>),
}

/// Immutable borrow view of a Head body.
/// Returned by `body_ref()` and tied to the lifetime of the `&Head`.
pub(crate) enum BodyRef<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    Leaf(&'a Leaf<KEY_LEN, V>),
    Branch(&'a Branch<KEY_LEN, O, [Option<Head<KEY_LEN, O, V>>], V>),
}

/// Mutable borrow view of a Head body.
/// Returned by `body_mut()` and tied to the lifetime of the `&mut Head`.
pub(crate) enum BodyMut<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    Leaf(&'a mut Leaf<KEY_LEN, V>),
    Branch(&'a mut Branch<KEY_LEN, O, [Option<Head<KEY_LEN, O, V>>], V>),
}

/// Implemented by node body types that a `Head` can point to.
pub(crate) trait Body {
    /// Returns the `HeadTag` that `Head::new`/`Head::set_body` store in the
    /// tagged pointer for `body`.
    fn tag(body: NonNull<Self>) -> HeadTag;
}
433
/// Tagged-pointer handle to a PATCH node body (leaf or branch).
///
/// `tptr` packs three things into one pointer-sized word: the `HeadTag`,
/// the address of the body, and the key byte used for cuckoo table lookup
/// (the bit layout is described next to the mask constants in the `impl`).
/// The `PhantomData` fields only carry the schema, segmentation and value
/// types; they occupy no space.
#[repr(C)]
pub(crate) struct Head<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    tptr: std::ptr::NonNull<u8>,
    key_ordering: PhantomData<O>,
    key_segments: PhantomData<O::Segmentation>,
    value: PhantomData<V>,
}

// NOTE(review): these impls place no `Send`/`Sync` bound on `V` (or `O`),
// while `par_union` asks for `V: Send + Sync` explicitly. Presumably the
// bodies reachable through a `Head` are safe to share for every `V` —
// confirm, or add the bounds here.
unsafe impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Send for Head<KEY_LEN, O, V> {}
unsafe impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Sync for Head<KEY_LEN, O, V> {}
444
445impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Head<KEY_LEN, O, V> {
446    // Tagged pointer layout (64-bit only):
447    // - bits 0..=3:   HeadTag (requires 16-byte aligned bodies)
448    // - bits 4..=55:  body pointer bits (52 bits)
449    // - bits 56..=63: key byte for cuckoo table lookup
450    const TAG_MASK: u64 = 0x0f;
451    const BODY_MASK: u64 = 0x00_ff_ff_ff_ff_ff_ff_f0;
452    const KEY_MASK: u64 = 0xff_00_00_00_00_00_00_00;
453
454    pub(crate) fn new<T: Body + ?Sized>(key: u8, body: NonNull<T>) -> Self {
455        unsafe {
456            let tptr =
457                std::ptr::NonNull::new_unchecked((body.as_ptr() as *mut u8).map_addr(|addr| {
458                    debug_assert_eq!(addr as u64 & Self::TAG_MASK, 0);
459                    ((addr as u64 & Self::BODY_MASK)
460                        | ((key as u64) << 56)
461                        | (<T as Body>::tag(body) as u64)) as usize
462                }));
463            Self {
464                tptr,
465                key_ordering: PhantomData,
466                key_segments: PhantomData,
467                value: PhantomData,
468            }
469        }
470    }
471
472    #[inline]
473    pub(crate) fn tag(&self) -> HeadTag {
474        HeadTag::from_raw((self.tptr.as_ptr() as u64 & Self::TAG_MASK) as u8)
475    }
476
477    #[inline]
478    pub(crate) fn key(&self) -> u8 {
479        (self.tptr.as_ptr() as u64 >> 56) as u8
480    }
481
482    #[inline]
483    pub(crate) fn with_key(mut self, key: u8) -> Self {
484        self.tptr =
485            std::ptr::NonNull::new(self.tptr.as_ptr().map_addr(|addr| {
486                ((addr as u64 & !Self::KEY_MASK) | ((key as u64) << 56)) as usize
487            }))
488            .unwrap();
489        self
490    }
491
492    #[inline]
493    pub(crate) fn set_body<T: Body + ?Sized>(&mut self, body: NonNull<T>) {
494        unsafe {
495            self.tptr = NonNull::new_unchecked((body.as_ptr() as *mut u8).map_addr(|addr| {
496                debug_assert_eq!(addr as u64 & Self::TAG_MASK, 0);
497                ((addr as u64 & Self::BODY_MASK)
498                    | (self.tptr.as_ptr() as u64 & Self::KEY_MASK)
499                    | (<T as Body>::tag(body) as u64)) as usize
500            }))
501        }
502    }
503
504    pub(crate) fn with_start(self, new_start_depth: usize) -> Head<KEY_LEN, O, V> {
505        let leaf_key = self.childleaf_key();
506        let i = O::TREE_TO_KEY[new_start_depth];
507        let key = leaf_key[i];
508        self.with_key(key)
509    }
510
511    // Removed childleaf_matches_key_from in favor of composing the existing
512    // has_prefix primitives directly at call sites. Use
513    // `self.has_prefix::<KEY_LEN>(at_depth, key)` or for partial checks
514    // `self.childleaf().has_prefix::<O>(at_depth, &key[..limit])` instead.
515
    /// Decodes the tagged pointer into a typed raw pointer to the body.
    ///
    /// The key byte and tag bits are masked away with `BODY_MASK`. The tag
    /// then decides whether the address is returned as a leaf pointer or
    /// as a branch pointer with its child-table length attached.
    pub(crate) fn body(&self) -> BodyPtr<KEY_LEN, O, V> {
        unsafe {
            let ptr = NonNull::new_unchecked(self.tptr.as_ptr().map_addr(|addr| {
                let masked = (addr as u64) & Self::BODY_MASK;
                masked as usize
            }));
            match self.tag() {
                HeadTag::Leaf => BodyPtr::Leaf(ptr.cast()),
                branch_tag => {
                    // Branch tags hold log2 of the child-table size, so the
                    // slice length of the unsized branch is `1 << tag`.
                    let count = 1 << (branch_tag as usize);
                    BodyPtr::Branch(NonNull::new_unchecked(std::ptr::slice_from_raw_parts(
                        ptr.as_ptr(),
                        count,
                    )
                        as *mut Branch<KEY_LEN, O, [Option<Head<KEY_LEN, O, V>>], V>))
                }
            }
        }
    }
535
    /// Returns a mutable borrow of the body (Leaf or Branch) tied to `&mut self`.
    ///
    /// For a branch, `Branch::rc_cow` is invoked first; when it returns
    /// `Some`, this head is re-pointed at the pointer `rc_cow` left in
    /// `branch_nn` before the borrow is handed out.
    // NOTE(review): the leaf arm hands out `&mut Leaf` without any
    // copy-on-write step — presumably callers never mutate a leaf that is
    // shared; confirm.
    pub(crate) fn body_mut(&mut self) -> BodyMut<'_, KEY_LEN, O, V> {
        unsafe {
            match self.body() {
                BodyPtr::Leaf(mut leaf) => BodyMut::Leaf(leaf.as_mut()),
                BodyPtr::Branch(mut branch) => {
                    // Ensure ownership: try copy-on-write and update local pointer if needed.
                    let mut branch_nn = branch;
                    if Branch::rc_cow(&mut branch_nn).is_some() {
                        self.set_body(branch_nn);
                        BodyMut::Branch(branch_nn.as_mut())
                    } else {
                        BodyMut::Branch(branch.as_mut())
                    }
                }
            }
        }
    }
553
554    /// Returns an immutable borrow of the body (Leaf or Branch) tied to &self.
555    pub(crate) fn body_ref(&self) -> BodyRef<'_, KEY_LEN, O, V> {
556        match self.body() {
557            BodyPtr::Leaf(nn) => BodyRef::Leaf(unsafe { nn.as_ref() }),
558            BodyPtr::Branch(nn) => BodyRef::Branch(unsafe { nn.as_ref() }),
559        }
560    }
561
562    pub(crate) fn count(&self) -> u64 {
563        match self.body_ref() {
564            BodyRef::Leaf(_) => 1,
565            BodyRef::Branch(branch) => branch.leaf_count,
566        }
567    }
568
569    pub(crate) fn count_segment(&self, at_depth: usize) -> u64 {
570        match self.body_ref() {
571            BodyRef::Leaf(_) => 1,
572            BodyRef::Branch(branch) => branch.count_segment(at_depth),
573        }
574    }
575
576    pub(crate) fn hash(&self) -> u128 {
577        match self.body_ref() {
578            BodyRef::Leaf(leaf) => leaf.hash,
579            BodyRef::Branch(branch) => branch.hash,
580        }
581    }
582
583    pub(crate) fn end_depth(&self) -> usize {
584        match self.body_ref() {
585            BodyRef::Leaf(_) => KEY_LEN,
586            BodyRef::Branch(branch) => branch.end_depth as usize,
587        }
588    }
589
590    /// Return the raw pointer to the child leaf for use in low-level
591    /// operations (for example when constructing a Branch). Prefer
592    /// `childleaf_key()` or other safe accessors when you only need the
593    /// key or value; those avoid unsafe dereferences.
594    pub(crate) fn childleaf_ptr(&self) -> *const Leaf<KEY_LEN, V> {
595        match self.body_ref() {
596            BodyRef::Leaf(leaf) => leaf as *const Leaf<KEY_LEN, V>,
597            BodyRef::Branch(branch) => branch.childleaf_ptr(),
598        }
599    }
600
601    pub(crate) fn childleaf_key(&self) -> &[u8; KEY_LEN] {
602        match self.body_ref() {
603            BodyRef::Leaf(leaf) => &leaf.key,
604            BodyRef::Branch(branch) => &branch.childleaf().key,
605        }
606    }
607
608    // Slot wrapper defined at module level (moved to below the impl block)
609
610    /// Find the first depth in [start_depth, limit) where the tree-ordered
611    /// bytes of `self` and `other` differ. The comparison limit is computed
612    /// as min(self.end_depth(), other.end_depth(), KEY_LEN) which is the
613    /// natural bound for comparing two heads. Returns `Some((depth, a, b))`
614    /// where `a` and `b` are the differing bytes at that depth, or `None`
615    /// if no divergence is found in the range.
616    pub(crate) fn first_divergence(
617        &self,
618        other: &Self,
619        start_depth: usize,
620    ) -> Option<(usize, u8, u8)> {
621        let limit = std::cmp::min(std::cmp::min(self.end_depth(), other.end_depth()), KEY_LEN);
622        debug_assert!(limit <= KEY_LEN);
623        let this_key = self.childleaf_key();
624        let other_key = other.childleaf_key();
625        let mut depth = start_depth;
626        while depth < limit {
627            let i = O::TREE_TO_KEY[depth];
628            let a = this_key[i];
629            let b = other_key[i];
630            if a != b {
631                return Some((depth, a, b));
632            }
633            depth += 1;
634        }
635        None
636    }
637
638    // Mutable access to the child slots for this head. If the head is a
639    // branch, returns a mutable slice referencing the underlying child table
640    // (each element is Option<Head>). If the head is a leaf an empty slice
641    // is returned.
642    //
643    // The caller receives a &mut slice tied to the borrow of `self` and may
644    // reorder entries in-place (e.g., sort_unstable) and then take them using
645    // `Option::take()` to extract Head values. The call uses `body_mut()` so
646    // COW semantics are preserved and callers have exclusive access to the
647    // branch storage while the mutable borrow lasts.
    // NOTE: mut_children removed — prefer matching on the BodyMut returned by
    // `body_mut()` and operating directly on the `&mut Branch` reference.
650
    /// Removes the leaf with key `leaf_key` from the subtree stored in
    /// `slot`, if present.
    ///
    /// Does nothing when `slot` is empty or when the head in `slot` fails
    /// the `has_prefix` check from `start_depth` on. A matching leaf head is
    /// taken out of `slot`. For a branch the removal recurses into the child
    /// selected by the key byte at the branch's end depth; if the branch is
    /// then left with a single leaf, the branch is replaced in `slot` by its
    /// remaining child, re-keyed for `start_depth`.
    // NOTE(review): `leaf_key[end_depth]` indexes the key directly by tree
    // depth, without going through `O::TREE_TO_KEY` as `with_start` and
    // `first_divergence` do — presumably `leaf_key` is expected in tree
    // order here; confirm against the callers.
    pub(crate) fn remove_leaf(
        slot: &mut Option<Self>,
        leaf_key: &[u8; KEY_LEN],
        start_depth: usize,
    ) {
        if let Some(this) = slot {
            let end_depth = std::cmp::min(this.end_depth(), KEY_LEN);
            // Check reachable equality by asking the head to test the prefix
            // up to its end_depth. Using the head/leaf primitive centralises the
            // unsafe deref into Branch::childleaf()/Leaf::has_prefix.
            if !this.has_prefix::<KEY_LEN>(start_depth, leaf_key) {
                return;
            }
            if this.tag() == HeadTag::Leaf {
                slot.take();
            } else {
                let mut ed = crate::patch::branch::BranchMut::from_head(this);
                let key = leaf_key[end_depth];
                // Recurse into the child slot for `key`; the closure hands the
                // (possibly emptied) slot back to the branch.
                ed.modify_child(key, |mut opt| {
                    Self::remove_leaf(&mut opt, leaf_key, end_depth);
                    opt
                });

                // If the branch now contains a single remaining child we
                // collapse the branch upward into that child. We must pull
                // the remaining child out while `ed` is still borrowed,
                // then drop `ed` before writing back into `slot` to avoid
                // double mutable borrows of the slot.
                if ed.leaf_count == 1 {
                    let mut remaining: Option<Head<KEY_LEN, O, V>> = None;
                    for slot_child in &mut ed.child_table {
                        if let Some(child) = slot_child.take() {
                            remaining = Some(child.with_start(start_depth));
                            break;
                        }
                    }
                    drop(ed);
                    if let Some(child) = remaining {
                        slot.replace(child);
                    }
                } else {
                    // ensure we drop the editor when not collapsing so the
                    // final pointer is committed back into the head.
                    drop(ed);
                }
            }
        }
    }
699
700    // NOTE: slot-level wrappers removed; callers should take the slot and call
701    // the owned helpers (insert_leaf / replace_leaf / union)
702    // directly. This reduces the indirection and keeps ownership semantics
703    // explicit at the call site.
704
705    // Owned variants of the slot-based helpers. These accept the existing
706    // Head by value and return the new Head after performing the
707    // modification. They are used with the split `insert_child` /
708    // `update_child` APIs so we no longer need `Branch::upsert_child`.
    /// Inserts `leaf` into the subtree `this` and returns the resulting head.
    ///
    /// * If the two keys diverge at some depth from `start_depth` on, a new
    ///   two-child branch is created at that depth and returned under
    ///   `this`'s original key byte.
    /// * Otherwise, if `this` is a branch, `leaf` is placed into the child
    ///   slot for its byte at the branch's end depth, recursing when that
    ///   slot is already occupied.
    /// * Otherwise `this` ends at `KEY_LEN` with no differing byte; `this`
    ///   is returned unchanged and `leaf` is dropped.
    pub(crate) fn insert_leaf(mut this: Self, leaf: Self, start_depth: usize) -> Self {
        if let Some((depth, this_byte_key, leaf_byte_key)) =
            this.first_divergence(&leaf, start_depth)
        {
            let old_key = this.key();
            let new_body = Branch::new(
                depth,
                this.with_key(this_byte_key),
                leaf.with_key(leaf_byte_key),
            );
            return Head::new(old_key, new_body);
        }

        let end_depth = this.end_depth();
        if end_depth != KEY_LEN {
            // Use the editable BranchMut view to perform mutations without
            // exposing pointer juggling at the call site.
            let mut ed = crate::patch::branch::BranchMut::from_head(&mut this);
            // Re-key the leaf with its byte at this branch's end depth so it
            // selects the right child slot.
            let inserted = leaf.with_start(ed.end_depth as usize);
            let key = inserted.key();
            ed.modify_child(key, |opt| match opt {
                Some(old) => Some(Head::insert_leaf(old, inserted, end_depth)),
                None => Some(inserted),
            });
        }
        this
    }
736
    /// Like `insert_leaf`, except that an existing leaf with the same key is
    /// replaced by `leaf` instead of being kept.
    ///
    /// * If the two keys diverge at some depth from `start_depth` on, a new
    ///   two-child branch is created at that depth and returned under
    ///   `this`'s original key byte.
    /// * If `this` ends at `KEY_LEN` with no differing byte, `leaf` is
    ///   returned with `this`'s key byte and `this` is dropped.
    /// * Otherwise `this` is a branch: `leaf` goes into the child slot for
    ///   its byte at the branch's end depth, recursing when that slot is
    ///   already occupied.
    pub(crate) fn replace_leaf(mut this: Self, leaf: Self, start_depth: usize) -> Self {
        if let Some((depth, this_byte_key, leaf_byte_key)) =
            this.first_divergence(&leaf, start_depth)
        {
            let old_key = this.key();
            let new_body = Branch::new(
                depth,
                this.with_key(this_byte_key),
                leaf.with_key(leaf_byte_key),
            );

            return Head::new(old_key, new_body);
        }

        let end_depth = this.end_depth();
        if end_depth == KEY_LEN {
            // Same key: the new leaf takes over the old leaf's position.
            let old_key = this.key();
            return leaf.with_key(old_key);
        } else {
            // Use the editor view for branch mutation instead of raw pointer ops.
            let mut ed = crate::patch::branch::BranchMut::from_head(&mut this);
            let inserted = leaf.with_start(ed.end_depth as usize);
            let key = inserted.key();
            ed.modify_child(key, |opt| match opt {
                Some(old) => Some(Head::replace_leaf(old, inserted, end_depth)),
                None => Some(inserted),
            });
        }
        this
    }
767
    /// Sequential PATCH-trie union. Always serial; the parallel
    /// dispatch lives in [`Self::par_union`] which calls back into
    /// `union` once budget is exhausted.
    ///
    /// Merges `other` into `this`, comparing keys from `at_depth` on, and
    /// returns the merged head under `this`'s key byte. The cases, in order:
    ///
    /// 1. Equal hashes: `this` is returned as is and `other` is dropped.
    /// 2. The keys diverge before either node ends: a new two-child branch
    ///    is created at the divergence depth.
    /// 3. `this` ends before `other`: `other` becomes (or is merged into) a
    ///    child of `this`.
    /// 4. `other` ends before `this`: the mirror image of case 3; `other`
    ///    becomes the result and takes over `this`'s key byte.
    /// 5. Both end at the same depth: every child of `other` is moved into
    ///    `this`, merging recursively where both have a child for the same
    ///    key byte.
    pub(crate) fn union(mut this: Self, mut other: Self, at_depth: usize) -> Self {
        if this.hash() == other.hash() {
            return this;
        }

        if let Some((depth, this_byte_key, other_byte_key)) =
            this.first_divergence(&other, at_depth)
        {
            let old_key = this.key();
            let new_body = Branch::new(
                depth,
                this.with_key(this_byte_key),
                other.with_key(other_byte_key),
            );

            return Head::new(old_key, new_body);
        }

        let this_depth = this.end_depth();
        let other_depth = other.end_depth();
        if this_depth < other_depth {
            // `this` is the shallower branch: hang `other` below it, keyed
            // by its byte at `this`'s end depth.
            let mut ed = crate::patch::branch::BranchMut::from_head(&mut this);
            let inserted = other.with_start(ed.end_depth as usize);
            let key = inserted.key();
            ed.modify_child(key, |opt| match opt {
                Some(old) => Some(Head::union(old, inserted, this_depth)),
                None => Some(inserted),
            });
            drop(ed);
            return this;
        }

        if other_depth < this_depth {
            // `other` is the shallower branch: hang `this` below it and
            // return `other` under the key byte `this` had.
            let old_key = this.key();
            let this_head = this;
            let mut ed = crate::patch::branch::BranchMut::from_head(&mut other);
            let inserted = this_head.with_start(ed.end_depth as usize);
            let key = inserted.key();
            ed.modify_child(key, |opt| match opt {
                Some(old) => Some(Head::union(old, inserted, other_depth)),
                None => Some(inserted),
            });
            drop(ed);
            return other.with_key(old_key);
        }

        // Equal depth, hashes differ → walk `other`'s children,
        // resolving collisions via recursive `Head::union` and the
        // `modify_child`'s per-call accounting.
        // NOTE(review): the `unreachable!()` presumably relies on two leaves
        // with identical keys always having equal hashes (handled by the
        // first check above) — confirm.
        let BodyMut::Branch(other_branch_ref) = other.body_mut() else {
            unreachable!();
        };
        let mut ed = crate::patch::branch::BranchMut::from_head(&mut this);
        // `Option::take` moves each child out of `other`'s table, leaving
        // `None` behind.
        for other_child in other_branch_ref
            .child_table
            .iter_mut()
            .filter_map(Option::take)
        {
            let inserted = other_child.with_start(ed.end_depth as usize);
            let key = inserted.key();
            ed.modify_child(key, |opt| match opt {
                Some(old) => Some(Head::union(old, inserted, this_depth)),
                None => Some(inserted),
            });
        }
        drop(ed);
        this
    }
839
840    /// Parallel-aware top-level union entry. Allocates a fresh
841    /// [`parallel_union::ParUnionCtx`] with a budget of
842    /// `num_threads²` shared spawns, then delegates to
843    /// [`Self::par_union_with_ctx`]. The budget persists across the
844    /// entire recursive descent — once exhausted, the rest is
845    /// sequential.
846    #[cfg(feature = "parallel")]
847    pub(crate) fn par_union(this: Self, other: Self, at_depth: usize) -> Self
848    where
849        O: Send + Sync,
850        V: Send + Sync,
851    {
852        let ctx = parallel_union::ParUnionCtx::new();
853        Self::par_union_with_ctx(this, other, at_depth, &ctx)
854    }
855
    /// Recursive parallel-aware union: at the equal-depth-branch
    /// arm, drains the "both" pairs and, for each pair, either
    /// claims a budget unit and spawns a parallel task or falls
    /// back to serial `Self::union`. All other arms (hash-equal,
    /// divergence, asymmetric depth) delegate to `Self::union` —
    /// they don't generate fan-out work for the budget to spend.
    ///
    /// Consumes `this` and `other` and returns the merged head.
    /// `at_depth` is the depth from which `first_divergence` starts
    /// comparing; `ctx` is the shared spawn budget queried through
    /// `ctx.try_claim()`.
    #[cfg(feature = "parallel")]
    pub(crate) fn par_union_with_ctx(
        mut this: Self,
        mut other: Self,
        at_depth: usize,
        ctx: &parallel_union::ParUnionCtx,
    ) -> Self
    where
        O: Send + Sync,
        V: Send + Sync,
    {
        // Equal hashes: nothing to merge, keep `this`.
        if this.hash() == other.hash() {
            return this;
        }

        // The paths split before either node ends: fork into a new
        // two-child branch that takes over `this`'s key.
        if let Some((depth, this_byte_key, other_byte_key)) =
            this.first_divergence(&other, at_depth)
        {
            let old_key = this.key();
            let new_body = Branch::new(
                depth,
                this.with_key(this_byte_key),
                other.with_key(other_byte_key),
            );
            return Head::new(old_key, new_body);
        }

        let this_depth = this.end_depth();
        let other_depth = other.end_depth();
        if this_depth != other_depth {
            // Asymmetric — no fan-out opportunity, serial path wins.
            return Self::union(this, other, at_depth);
        }

        // Equal depth, hashes differ → branch merge.
        let BodyMut::Branch(other_branch_ref) = other.body_mut() else {
            unreachable!();
        };

        if (other_branch_ref.leaf_count as usize) < PARALLEL_PATCH_UNION_THRESHOLD {
            // Small merge — the per-key modify_child loop in the serial
            // `union` wins, so hand both heads over to it.
            return Self::union(this, other, at_depth);
        }

        {
            let mut ed = crate::patch::branch::BranchMut::from_head(&mut this);
            let end_depth = ed.end_depth as usize;

            // Scatter both child tables into key-indexed 256-slot
            // arrays + present bitsets. The bitset partition tells us
            // which keys need a recursive union ("both") vs which are
            // simple pass-throughs ("only").
            let mut this_arr: [Option<Head<KEY_LEN, O, V>>; 256] =
                std::array::from_fn(|_| None);
            let mut other_arr: [Option<Head<KEY_LEN, O, V>>; 256] =
                std::array::from_fn(|_| None);
            let mut this_present = crate::patch::bytetable::ByteSet::new_empty();
            let mut other_present = crate::patch::bytetable::ByteSet::new_empty();

            // Move every child out of `this`; the table is refilled
            // through `install_child_growing` below.
            for slot in ed.child_table.iter_mut() {
                if let Some(head) = slot.take() {
                    let key = head.key();
                    this_present.insert(key);
                    this_arr[key as usize] = Some(head);
                }
            }
            // Children of `other` are re-keyed for `this`'s end depth
            // before they are indexed.
            for slot in other_branch_ref.child_table.iter_mut() {
                if let Some(head) = slot.take() {
                    let head = head.with_start(end_depth);
                    let key = head.key();
                    other_present.insert(key);
                    other_arr[key as usize] = Some(head);
                }
            }

            let mut both = this_present.intersect(&other_present);
            let mut only = this_present.symmetric_difference(&other_present);

            // Pre-allocated scatter-write target. Each spawned task
            // writes to `resolved[k]` for its specific key byte —
            // disjoint by construction. The raw pointer wrapper
            // (`ScatterPtr`) makes the cross-thread sharing explicit.
            let mut resolved: [Option<Head<KEY_LEN, O, V>>; 256] =
                std::array::from_fn(|_| None);
            let resolved_ptr = parallel_union::ScatterPtr(resolved.as_mut_ptr());

            rayon::scope(|s| {
                // Drain `both` pairs serially in the parent; per
                // pair, either claim a spawn unit and dispatch as a
                // task, or run serially via `Head::union` here on
                // the parent thread. The atomic budget is shared
                // with all nested `par_union_with_ctx` calls.
                while let Some(k) = both.drain_next_ascending() {
                    let i = k as usize;
                    let t = this_arr[i].take().expect("both ⇒ this");
                    let o = other_arr[i].take().expect("both ⇒ other");
                    if ctx.try_claim() {
                        s.spawn(move |_| {
                            let head = Self::par_union_with_ctx(t, o, this_depth, ctx);
                            // SAFETY: each task has a distinct
                            // key `k`, so the writes to
                            // `resolved[i]` are non-aliasing.
                            unsafe {
                                resolved_ptr.write_at(i, Some(head));
                            }
                        });
                    } else {
                        // Budget exhausted — fall back to fully
                        // serial union on this pair, then scatter
                        // the result. SAFETY: same disjointness
                        // invariant; the parent thread races only
                        // with tasks targeting distinct keys.
                        let head = Self::union(t, o, this_depth);
                        unsafe {
                            resolved_ptr.write_at(i, Some(head));
                        }
                    }
                }
            });
            // After scope: all spawned tasks have completed; the
            // scatter writes to `resolved` are all sequenced-before
            // here by rayon's join semantics.

            // Reinstall the merged "both" children first, then the
            // pass-through children that exist on one side only.
            for slot in resolved.iter_mut() {
                if let Some(head) = slot.take() {
                    ed.install_child_growing(head);
                }
            }
            while let Some(k) = only.drain_next_ascending() {
                let i = k as usize;
                let head = this_arr[i]
                    .take()
                    .or_else(|| other_arr[i].take())
                    .expect("only ⇒ exactly one side");
                ed.install_child_growing(head);
            }

            // NOTE(review): presumably refreshes the branch's cached
            // hash/count aggregates after the bulk reinstall — confirm
            // against `BranchMut::recompute_aggregates`.
            ed.recompute_aggregates();
        }
        this
    }
1004
1005    pub(crate) fn infixes<const PREFIX_LEN: usize, const INFIX_LEN: usize, F>(
1006        &self,
1007        prefix: &[u8; PREFIX_LEN],
1008        at_depth: usize,
1009        f: &mut F,
1010    ) where
1011        F: FnMut(&[u8; INFIX_LEN]),
1012    {
1013        match self.body_ref() {
1014            BodyRef::Leaf(leaf) => leaf.infixes::<PREFIX_LEN, INFIX_LEN, O, F>(prefix, at_depth, f),
1015            BodyRef::Branch(branch) => {
1016                branch.infixes::<PREFIX_LEN, INFIX_LEN, F>(prefix, at_depth, f)
1017            }
1018        }
1019    }
1020
1021    pub(crate) fn infixes_range<const PREFIX_LEN: usize, const INFIX_LEN: usize, F>(
1022        &self,
1023        prefix: &[u8; PREFIX_LEN],
1024        at_depth: usize,
1025        min_infix: &[u8; INFIX_LEN],
1026        max_infix: &[u8; INFIX_LEN],
1027        f: &mut F,
1028    ) where
1029        F: FnMut(&[u8; INFIX_LEN]),
1030    {
1031        match self.body_ref() {
1032            BodyRef::Leaf(leaf) => leaf.infixes_range::<PREFIX_LEN, INFIX_LEN, O, F>(
1033                prefix, at_depth, min_infix, max_infix, f,
1034            ),
1035            BodyRef::Branch(branch) => branch.infixes_range::<PREFIX_LEN, INFIX_LEN, F>(
1036                prefix, at_depth, min_infix, max_infix, f,
1037            ),
1038        }
1039    }
1040
1041    pub(crate) fn count_range<const PREFIX_LEN: usize, const INFIX_LEN: usize>(
1042        &self,
1043        prefix: &[u8; PREFIX_LEN],
1044        at_depth: usize,
1045        min_infix: &[u8; INFIX_LEN],
1046        max_infix: &[u8; INFIX_LEN],
1047    ) -> u64 {
1048        match self.body_ref() {
1049            BodyRef::Leaf(leaf) => {
1050                leaf.count_range::<PREFIX_LEN, INFIX_LEN, O>(prefix, at_depth, min_infix, max_infix)
1051            }
1052            BodyRef::Branch(branch) => {
1053                branch.count_range::<PREFIX_LEN, INFIX_LEN>(prefix, at_depth, min_infix, max_infix)
1054            }
1055        }
1056    }
1057
1058    pub(crate) fn has_prefix<const PREFIX_LEN: usize>(
1059        &self,
1060        at_depth: usize,
1061        prefix: &[u8; PREFIX_LEN],
1062    ) -> bool {
1063        const {
1064            assert!(PREFIX_LEN <= KEY_LEN);
1065        }
1066        match self.body_ref() {
1067            BodyRef::Leaf(leaf) => leaf.has_prefix::<O>(at_depth, prefix),
1068            BodyRef::Branch(branch) => branch.has_prefix::<PREFIX_LEN>(at_depth, prefix),
1069        }
1070    }
1071
1072    pub(crate) fn get<'a>(&'a self, at_depth: usize, key: &[u8; KEY_LEN]) -> Option<&'a V>
1073    where
1074        O: 'a,
1075    {
1076        match self.body_ref() {
1077            BodyRef::Leaf(leaf) => leaf.get::<O>(at_depth, key),
1078            BodyRef::Branch(branch) => branch.get(at_depth, key),
1079        }
1080    }
1081
1082    pub(crate) fn segmented_len<const PREFIX_LEN: usize>(
1083        &self,
1084        at_depth: usize,
1085        prefix: &[u8; PREFIX_LEN],
1086    ) -> u64 {
1087        match self.body_ref() {
1088            BodyRef::Leaf(leaf) => leaf.segmented_len::<O, PREFIX_LEN>(at_depth, prefix),
1089            BodyRef::Branch(branch) => branch.segmented_len::<PREFIX_LEN>(at_depth, prefix),
1090        }
1091    }
1092
1093    // NOTE: slot-level union wrapper removed; callers should take the slot and
1094    // call the owned helper `union` directly.
1095
1096    pub(crate) fn intersect(&self, other: &Self, at_depth: usize) -> Option<Self> {
1097        if self.hash() == other.hash() {
1098            return Some(self.clone());
1099        }
1100
1101        if self.first_divergence(other, at_depth).is_some() {
1102            return None;
1103        }
1104
1105        let self_depth = self.end_depth();
1106        let other_depth = other.end_depth();
1107        if self_depth < other_depth {
1108            // This means that there can be at most one child in self
1109            // that might intersect with other.
1110            let BodyRef::Branch(branch) = self.body_ref() else {
1111                unreachable!();
1112            };
1113            return branch
1114                .child_table
1115                .table_get(other.childleaf_key()[O::TREE_TO_KEY[self_depth]])
1116                .and_then(|self_child| other.intersect(self_child, self_depth));
1117        }
1118
1119        if other_depth < self_depth {
1120            // This means that there can be at most one child in other
1121            // that might intersect with self.
1122            // If the depth of other is less than the depth of self, then it can't be a leaf.
1123            let BodyRef::Branch(other_branch) = other.body_ref() else {
1124                unreachable!();
1125            };
1126            return other_branch
1127                .child_table
1128                .table_get(self.childleaf_key()[O::TREE_TO_KEY[other_depth]])
1129                .and_then(|other_child| self.intersect(other_child, other_depth));
1130        }
1131
1132        // If we reached this point then the depths are equal. The only way to have a leaf
1133        // is if the other is a leaf as well, which is already handled by the hash check if they are equal,
1134        // and by the key check if they are not equal.
1135        // If one of them is a leaf and the other is a branch, then they would also have different depths,
1136        // which is already handled by the above code.
1137        let BodyRef::Branch(self_branch) = self.body_ref() else {
1138            unreachable!();
1139        };
1140        let BodyRef::Branch(other_branch) = other.body_ref() else {
1141            unreachable!();
1142        };
1143
1144        let mut intersected_children = self_branch
1145            .child_table
1146            .iter()
1147            .filter_map(Option::as_ref)
1148            .filter_map(|self_child| {
1149                let other_child = other_branch.child_table.table_get(self_child.key())?;
1150                self_child.intersect(other_child, self_depth)
1151            });
1152        let first_child = intersected_children.next()?;
1153        let Some(second_child) = intersected_children.next() else {
1154            return Some(first_child);
1155        };
1156        let new_branch = Branch::new(
1157            self_depth,
1158            first_child.with_start(self_depth),
1159            second_child.with_start(self_depth),
1160        );
1161        // Use a BranchMut editor to perform all child insertions via the
1162        // safe editor API instead of manipulating the NonNull pointer
1163        // directly. The editor will perform COW and commit the final
1164        // pointer into the Head when it is dropped.
1165        let mut head_for_branch = Head::new(0, new_branch);
1166        {
1167            let mut ed = crate::patch::branch::BranchMut::from_head(&mut head_for_branch);
1168            for child in intersected_children {
1169                let inserted = child.with_start(self_depth);
1170                let k = inserted.key();
1171                ed.modify_child(k, |_opt| Some(inserted));
1172            }
1173            // ed dropped here commits the final branch pointer into head_for_branch
1174        }
1175        Some(head_for_branch)
1176    }
1177
1178    /// Returns the difference between self and other.
1179    /// This is the set of elements that are in self but not in other.
1180    /// If the difference is empty, None is returned.
1181    pub(crate) fn difference(&self, other: &Self, at_depth: usize) -> Option<Self> {
1182        if self.hash() == other.hash() {
1183            return None;
1184        }
1185
1186        if self.first_divergence(other, at_depth).is_some() {
1187            return Some(self.clone());
1188        }
1189
1190        let self_depth = self.end_depth();
1191        let other_depth = other.end_depth();
1192        if self_depth < other_depth {
1193            // This means that there can be at most one child in self
1194            // that might intersect with other. It's the only child that may not be in the difference.
1195            // The other children are definitely in the difference, as they have no corresponding byte in other.
1196            // Thus the cheapest way to compute the difference is compute the difference of the only child
1197            // that might intersect with other, copy self with it's correctly filled byte table, then
1198            // remove the old child, and insert the new child.
1199            let mut new_branch = self.clone();
1200            let other_byte_key = other.childleaf_key()[O::TREE_TO_KEY[self_depth]];
1201            {
1202                let mut ed = crate::patch::branch::BranchMut::from_head(&mut new_branch);
1203                ed.modify_child(other_byte_key, |opt| {
1204                    opt.and_then(|child| child.difference(other, self_depth))
1205                });
1206            }
1207            return Some(new_branch);
1208        }
1209
1210        if other_depth < self_depth {
1211            // This means that we need to check if there is a child in other
1212            // that matches the path at the current depth of self.
1213            // There is no such child, then then self must be in the difference.
1214            // If there is such a child, then we have to compute the difference
1215            // between self and that child.
1216            // We know that other must be a branch.
1217            let BodyRef::Branch(other_branch) = other.body_ref() else {
1218                unreachable!();
1219            };
1220            let self_byte_key = self.childleaf_key()[O::TREE_TO_KEY[other_depth]];
1221            if let Some(other_child) = other_branch.child_table.table_get(self_byte_key) {
1222                return self.difference(other_child, at_depth);
1223            } else {
1224                return Some(self.clone());
1225            }
1226        }
1227
1228        // If we reached this point then the depths are equal. The only way to have a leaf
1229        // is if the other is a leaf as well, which is already handled by the hash check if they are equal,
1230        // and by the key check if they are not equal.
1231        // If one of them is a leaf and the other is a branch, then they would also have different depths,
1232        // which is already handled by the above code.
1233        let BodyRef::Branch(self_branch) = self.body_ref() else {
1234            unreachable!();
1235        };
1236        let BodyRef::Branch(other_branch) = other.body_ref() else {
1237            unreachable!();
1238        };
1239
1240        let mut differenced_children = self_branch
1241            .child_table
1242            .iter()
1243            .filter_map(Option::as_ref)
1244            .filter_map(|self_child| {
1245                if let Some(other_child) = other_branch.child_table.table_get(self_child.key()) {
1246                    self_child.difference(other_child, self_depth)
1247                } else {
1248                    Some(self_child.clone())
1249                }
1250            });
1251
1252        let first_child = differenced_children.next()?;
1253        let second_child = match differenced_children.next() {
1254            Some(sc) => sc,
1255            None => return Some(first_child),
1256        };
1257
1258        let new_branch = Branch::new(
1259            self_depth,
1260            first_child.with_start(self_depth),
1261            second_child.with_start(self_depth),
1262        );
1263        let mut head_for_branch = Head::new(0, new_branch);
1264        {
1265            let mut ed = crate::patch::branch::BranchMut::from_head(&mut head_for_branch);
1266            for child in differenced_children {
1267                let inserted = child.with_start(self_depth);
1268                let k = inserted.key();
1269                ed.modify_child(k, |_opt| Some(inserted));
1270            }
1271            // ed dropped here commits the final branch pointer into head_for_branch
1272        }
1273        // The key will be set later, because we don't know it yet.
1274        // The difference might remove multiple levels of branches,
1275        // so we can't just take the key from self or other.
1276        Some(head_for_branch)
1277    }
1278}
1279
/// Lets a `Head` be stored in a byte table by exposing its key byte.
///
/// NOTE(review): this is an `unsafe impl`; the contract `ByteEntry`
/// imposes on `key` is defined with the trait and is not visible here.
unsafe impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> ByteEntry for Head<KEY_LEN, O, V> {
    fn key(&self) -> u8 {
        // Inherent methods take precedence over trait methods in method
        // resolution, so this forwards to the inherent `Head::key`
        // accessor rather than recursing into this trait method.
        // NOTE(review): assumes `Head` defines an inherent `key()`.
        self.key()
    }
}
1285
1286impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> fmt::Debug for Head<KEY_LEN, O, V> {
1287    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1288        self.tag().fmt(f)
1289    }
1290}
1291
1292impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Clone for Head<KEY_LEN, O, V> {
1293    fn clone(&self) -> Self {
1294        unsafe {
1295            match self.body() {
1296                BodyPtr::Leaf(leaf) => Self::new(self.key(), Leaf::rc_inc(leaf)),
1297                BodyPtr::Branch(branch) => Self::new(self.key(), Branch::rc_inc(branch)),
1298            }
1299        }
1300    }
1301}
1302
1303// The Slot wrapper was removed in favor of using BranchMut::from_slot(&mut
1304// Option<Head<...>>) directly. This keeps the API surface smaller and
1305// avoids an extra helper type that simply forwarded to BranchMut.
1306
1307impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Drop for Head<KEY_LEN, O, V> {
1308    fn drop(&mut self) {
1309        unsafe {
1310            match self.body() {
1311                BodyPtr::Leaf(leaf) => Leaf::rc_dec(leaf),
1312                BodyPtr::Branch(branch) => Branch::rc_dec(branch),
1313            }
1314        }
1315    }
1316}
1317
/// A PATCH is a persistent data structure that stores a set of keys.
/// Each key can be reordered and segmented, based on the provided key ordering and segmentation.
///
/// The patch supports efficient set operations, like union, intersection, and difference,
/// because it efficiently maintains a hash for all keys that are part of a sub-tree.
///
/// The tree itself is a path- and node-compressed 256-ary trie.
/// Each node stores its children in a byte-oriented cuckoo hash table,
/// allowing for O(1) access to children, while keeping the memory overhead low.
/// Table sizes are powers of two, starting at 2.
///
/// Having a single node type for all branching factors simplifies the implementation,
/// compared to other adaptive trie implementations, like ARTs or Judy Arrays.
///
/// The PATCH allows for cheap copy-on-write operations, with `clone` being O(1).
#[derive(Debug)]
pub struct PATCH<const KEY_LEN: usize, O = IdentitySchema, V = ()>
where
    O: KeySchema<KEY_LEN>,
{
    // Root of the trie; `None` for an empty PATCH.
    root: Option<Head<KEY_LEN, O, V>>,
}
1340
1341impl<const KEY_LEN: usize, O, V> Clone for PATCH<KEY_LEN, O, V>
1342where
1343    O: KeySchema<KEY_LEN>,
1344{
1345    fn clone(&self) -> Self {
1346        Self {
1347            root: self.root.clone(),
1348        }
1349    }
1350}
1351
/// The default PATCH is the empty PATCH.
impl<const KEY_LEN: usize, O, V> Default for PATCH<KEY_LEN, O, V>
where
    O: KeySchema<KEY_LEN>,
{
    /// Delegates to [`PATCH::new`], so the same initialisation runs as
    /// for an explicitly constructed PATCH.
    fn default() -> Self {
        Self::new()
    }
}
1360
1361impl<const KEY_LEN: usize, O, V> PATCH<KEY_LEN, O, V>
1362where
1363    O: KeySchema<KEY_LEN>,
1364{
1365    /// Creates a new empty PATCH.
1366    pub fn new() -> Self {
1367        init_sip_key();
1368        PATCH { root: None }
1369    }
1370
1371    /// Inserts a shared key into the PATCH.
1372    ///
1373    /// Takes an [Entry] object that can be created from a key,
1374    /// and inserted into multiple PATCH instances.
1375    ///
1376    /// If the key is already present, this is a no-op.
1377    pub fn insert(&mut self, entry: &Entry<KEY_LEN, V>) {
1378        if self.root.is_some() {
1379            let this = self.root.take().expect("root should not be empty");
1380            let new_head = Head::insert_leaf(this, entry.leaf(), 0);
1381            self.root.replace(new_head);
1382        } else {
1383            self.root.replace(entry.leaf());
1384        }
1385    }
1386
1387    /// Inserts a key into the PATCH, replacing the value if it already exists.
1388    pub fn replace(&mut self, entry: &Entry<KEY_LEN, V>) {
1389        if self.root.is_some() {
1390            let this = self.root.take().expect("root should not be empty");
1391            let new_head = Head::replace_leaf(this, entry.leaf(), 0);
1392            self.root.replace(new_head);
1393        } else {
1394            self.root.replace(entry.leaf());
1395        }
1396    }
1397
1398    /// Removes a key from the PATCH.
1399    ///
1400    /// If the key is not present, this is a no-op.
1401    pub fn remove(&mut self, key: &[u8; KEY_LEN]) {
1402        Head::remove_leaf(&mut self.root, key, 0);
1403    }
1404
1405    /// Returns the number of keys in the PATCH.
1406    pub fn len(&self) -> u64 {
1407        if let Some(root) = &self.root {
1408            root.count()
1409        } else {
1410            0
1411        }
1412    }
1413
1414    /// Returns true if the PATCH contains no keys.
1415    pub fn is_empty(&self) -> bool {
1416        self.len() == 0
1417    }
1418
1419    pub(crate) fn root_hash(&self) -> Option<u128> {
1420        self.root.as_ref().map(|root| root.hash())
1421    }
1422
1423    /// Returns the value associated with `key` if present.
1424    pub fn get(&self, key: &[u8; KEY_LEN]) -> Option<&V> {
1425        self.root.as_ref().and_then(|root| root.get(0, key))
1426    }
1427
1428    /// Allows iteratig over all infixes of a given length with a given prefix.
1429    /// Each infix is passed to the provided closure.
1430    ///
1431    /// The entire operation is performed over the tree view ordering of the keys.
1432    ///
1433    /// The length of the prefix and the infix is provided as type parameters,
1434    /// but will usually inferred from the arguments.
1435    ///
1436    /// The sum of `PREFIX_LEN` and `INFIX_LEN` must be less than or equal to `KEY_LEN`
1437    /// or a compile-time assertion will fail.
1438    ///
1439    /// Because all infixes are iterated in one go, less bookkeeping is required,
1440    /// than when using an Iterator, allowing for better performance.
1441    pub fn infixes<const PREFIX_LEN: usize, const INFIX_LEN: usize, F>(
1442        &self,
1443        prefix: &[u8; PREFIX_LEN],
1444        mut for_each: F,
1445    ) where
1446        F: FnMut(&[u8; INFIX_LEN]),
1447    {
1448        const {
1449            assert!(PREFIX_LEN + INFIX_LEN <= KEY_LEN);
1450        }
1451        assert!(
1452            O::same_segment_tree(PREFIX_LEN, PREFIX_LEN + INFIX_LEN - 1)
1453                && (PREFIX_LEN + INFIX_LEN == KEY_LEN
1454                    || !O::same_segment_tree(PREFIX_LEN + INFIX_LEN - 1, PREFIX_LEN + INFIX_LEN)),
1455            "INFIX_LEN must cover a whole segment"
1456        );
1457        if let Some(root) = &self.root {
1458            root.infixes(prefix, 0, &mut for_each);
1459        }
1460    }
1461
1462    /// Like [`infixes`](Self::infixes) but only yields infixes in the
1463    /// byte range `[min_infix, max_infix]` (inclusive).
1464    ///
1465    /// The trie is pruned at each depth: branches whose byte key falls
1466    /// outside the range at the current infix position are skipped
1467    /// entirely, avoiding traversal of irrelevant subtrees.
1468    pub fn infixes_range<const PREFIX_LEN: usize, const INFIX_LEN: usize, F>(
1469        &self,
1470        prefix: &[u8; PREFIX_LEN],
1471        min_infix: &[u8; INFIX_LEN],
1472        max_infix: &[u8; INFIX_LEN],
1473        mut for_each: F,
1474    ) where
1475        F: FnMut(&[u8; INFIX_LEN]),
1476    {
1477        const {
1478            assert!(PREFIX_LEN + INFIX_LEN <= KEY_LEN);
1479        }
1480        assert!(
1481            O::same_segment_tree(PREFIX_LEN, PREFIX_LEN + INFIX_LEN - 1)
1482                && (PREFIX_LEN + INFIX_LEN == KEY_LEN
1483                    || !O::same_segment_tree(PREFIX_LEN + INFIX_LEN - 1, PREFIX_LEN + INFIX_LEN)),
1484            "INFIX_LEN must cover a whole segment"
1485        );
1486        if let Some(root) = &self.root {
1487            root.infixes_range(prefix, 0, min_infix, max_infix, &mut for_each);
1488        }
1489    }
1490
1491    /// Count entries whose infix falls within [min_infix, max_infix].
1492    ///
1493    /// Uses cached `leaf_count` on branches to skip entire subtrees that
1494    /// are fully inside the range, making the count O(boundary_nodes)
1495    /// rather than O(matching_leaves).
1496    pub fn count_range<const PREFIX_LEN: usize, const INFIX_LEN: usize>(
1497        &self,
1498        prefix: &[u8; PREFIX_LEN],
1499        min_infix: &[u8; INFIX_LEN],
1500        max_infix: &[u8; INFIX_LEN],
1501    ) -> u64 {
1502        const {
1503            assert!(PREFIX_LEN + INFIX_LEN <= KEY_LEN);
1504        }
1505        match &self.root {
1506            Some(root) => root.count_range(prefix, 0, min_infix, max_infix),
1507            None => 0,
1508        }
1509    }
1510
1511    /// Returns true if the PATCH has a key with the given prefix.
1512    ///
1513    /// `PREFIX_LEN` must be less than or equal to `KEY_LEN` or a compile-time
1514    /// assertion will fail.
1515    pub fn has_prefix<const PREFIX_LEN: usize>(&self, prefix: &[u8; PREFIX_LEN]) -> bool {
1516        const {
1517            assert!(PREFIX_LEN <= KEY_LEN);
1518        }
1519        if let Some(root) = &self.root {
1520            root.has_prefix(0, prefix)
1521        } else {
1522            PREFIX_LEN == 0
1523        }
1524    }
1525
1526    /// Returns the number of unique segments in keys with the given prefix.
1527    pub fn segmented_len<const PREFIX_LEN: usize>(&self, prefix: &[u8; PREFIX_LEN]) -> u64 {
1528        const {
1529            assert!(PREFIX_LEN <= KEY_LEN);
1530            if PREFIX_LEN > 0 && PREFIX_LEN < KEY_LEN {
1531                assert!(
1532                    <O as KeySchema<KEY_LEN>>::Segmentation::SEGMENTS
1533                        [O::TREE_TO_KEY[PREFIX_LEN - 1]]
1534                        != <O as KeySchema<KEY_LEN>>::Segmentation::SEGMENTS
1535                            [O::TREE_TO_KEY[PREFIX_LEN]],
1536                    "PREFIX_LEN must align to segment boundary",
1537                );
1538            }
1539        }
1540        if let Some(root) = &self.root {
1541            root.segmented_len(0, prefix)
1542        } else {
1543            0
1544        }
1545    }
1546
1547    /// Iterates over all keys in the PATCH.
1548    /// The keys are returned in key ordering but random order.
1549    pub fn iter<'a>(&'a self) -> PATCHIterator<'a, KEY_LEN, O, V> {
1550        PATCHIterator::new(self)
1551    }
1552
    /// Iterates over all keys in the PATCH in key order.
    ///
    /// The traversal visits every key in lexicographic key order, without
    /// accepting a prefix filter. For prefix-aware iteration, see
    /// [`PATCH::iter_prefix_count`].
    ///
    /// The returned iterator borrows the PATCH and reports the exact number
    /// of remaining keys through `size_hint`.
    pub fn iter_ordered<'a>(&'a self) -> PATCHOrderedIterator<'a, KEY_LEN, O, V> {
        PATCHOrderedIterator::new(self)
    }
1561
    /// Iterates over all distinct prefixes of length `PREFIX_LEN` in the PATCH.
    ///
    /// Each item is a `([u8; PREFIX_LEN], u64)` pair: the prefix bytes in tree
    /// ordering, and the count of elements stored below that prefix. Prefixes
    /// are produced in ascending tree order.
    ///
    /// `PREFIX_LEN` must not exceed `KEY_LEN`; this is asserted at compile time.
    pub fn iter_prefix_count<'a, const PREFIX_LEN: usize>(
        &'a self,
    ) -> PATCHPrefixIterator<'a, KEY_LEN, PREFIX_LEN, O, V> {
        PATCHPrefixIterator::new(self)
    }
1570
1571    /// Unions this PATCH with another PATCH.
1572    ///
1573    /// The other PATCH is consumed, and this PATCH is updated in place.
1574    pub fn union(&mut self, other: Self)
1575    where
1576        O: Send + Sync,
1577        V: Send + Sync,
1578    {
1579        if let Some(other) = other.root {
1580            if self.root.is_some() {
1581                let this = self.root.take().expect("root should not be empty");
1582                #[cfg(feature = "parallel")]
1583                let merged = Head::par_union(this, other, 0);
1584                #[cfg(not(feature = "parallel"))]
1585                let merged = Head::union(this, other, 0);
1586                self.root.replace(merged);
1587            } else {
1588                self.root.replace(other);
1589            }
1590        }
1591    }
1592
1593    /// Intersects this PATCH with another PATCH.
1594    ///
1595    /// Returns a new PATCH that contains only the keys that are present in both PATCHes.
1596    pub fn intersect(&self, other: &Self) -> Self {
1597        if let Some(root) = &self.root {
1598            if let Some(other_root) = &other.root {
1599                return Self {
1600                    root: root.intersect(other_root, 0).map(|root| root.with_start(0)),
1601                };
1602            }
1603        }
1604        Self::new()
1605    }
1606
1607    /// Returns the difference between this PATCH and another PATCH.
1608    ///
1609    /// Returns a new PATCH that contains only the keys that are present in this PATCH,
1610    /// but not in the other PATCH.
1611    pub fn difference(&self, other: &Self) -> Self {
1612        if let Some(root) = &self.root {
1613            if let Some(other_root) = &other.root {
1614                Self {
1615                    root: root.difference(other_root, 0),
1616                }
1617            } else {
1618                (*self).clone()
1619            }
1620        } else {
1621            Self::new()
1622        }
1623    }
1624
1625    /// Calculates the average fill level for branch nodes grouped by their
1626    /// branching factor. The returned array contains eight entries for branch
1627    /// sizes `2`, `4`, `8`, `16`, `32`, `64`, `128` and `256` in that order.
1628    //#[cfg(debug_assertions)]
1629    pub fn debug_branch_fill(&self) -> [f32; 8] {
1630        let mut counts = [0u64; 8];
1631        let mut used = [0u64; 8];
1632
1633        if let Some(root) = &self.root {
1634            let mut stack = Vec::new();
1635            stack.push(root);
1636
1637            while let Some(head) = stack.pop() {
1638                match head.body_ref() {
1639                    BodyRef::Leaf(_) => {}
1640                    BodyRef::Branch(b) => {
1641                        let size = b.child_table.len();
1642                        let idx = size.trailing_zeros() as usize - 1;
1643                        counts[idx] += 1;
1644                        used[idx] += b.child_table.iter().filter(|c| c.is_some()).count() as u64;
1645                        for child in b.child_table.iter().filter_map(|c| c.as_ref()) {
1646                            stack.push(child);
1647                        }
1648                    }
1649                }
1650            }
1651        }
1652
1653        let mut avg = [0f32; 8];
1654        for i in 0..8 {
1655            if counts[i] > 0 {
1656                let size = 1u64 << (i + 1);
1657                avg[i] = used[i] as f32 / (counts[i] as f32 * size as f32);
1658            }
1659        }
1660        avg
1661    }
1662}
1663
1664impl<const KEY_LEN: usize, O, V> PartialEq for PATCH<KEY_LEN, O, V>
1665where
1666    O: KeySchema<KEY_LEN>,
1667{
1668    fn eq(&self, other: &Self) -> bool {
1669        self.root.as_ref().map(|root| root.hash()) == other.root.as_ref().map(|root| root.hash())
1670    }
1671}
1672
1673impl<const KEY_LEN: usize, O, V> Eq for PATCH<KEY_LEN, O, V> where O: KeySchema<KEY_LEN> {}
1674
// Lets `for key in &patch` borrow the PATCH and walk its keys; this is the
// same iterator that `PATCH::iter` returns.
impl<'a, const KEY_LEN: usize, O, V> IntoIterator for &'a PATCH<KEY_LEN, O, V>
where
    O: KeySchema<KEY_LEN>,
{
    type Item = &'a [u8; KEY_LEN];
    type IntoIter = PATCHIterator<'a, KEY_LEN, O, V>;

    /// Creates a borrowing iterator over all keys of the PATCH.
    fn into_iter(self) -> Self::IntoIter {
        PATCHIterator::new(self)
    }
}
1686
/// A borrowing iterator over all keys in a PATCH.
///
/// Each key is yielded as a reference to its key-ordered bytes. The sequence
/// in which keys appear follows the child tables of the tree and is otherwise
/// unspecified.
pub struct PATCHIterator<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    // Slot iterators for the path currently being walked; the innermost
    // level is on top.
    stack: ArrayVec<std::slice::Iter<'a, Option<Head<KEY_LEN, O, V>>>, KEY_LEN>,
    // Number of keys not yet yielded; reported by `size_hint`.
    remaining: usize,
}
1693
1694impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> PATCHIterator<'a, KEY_LEN, O, V> {
1695    /// Creates an iterator over all keys in `patch`.
1696    pub fn new(patch: &'a PATCH<KEY_LEN, O, V>) -> Self {
1697        let mut r = PATCHIterator {
1698            stack: ArrayVec::new(),
1699            remaining: patch.len().min(usize::MAX as u64) as usize,
1700        };
1701        r.stack.push(std::slice::from_ref(&patch.root).iter());
1702        r
1703    }
1704}
1705
1706impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Iterator
1707    for PATCHIterator<'a, KEY_LEN, O, V>
1708{
1709    type Item = &'a [u8; KEY_LEN];
1710
1711    fn next(&mut self) -> Option<Self::Item> {
1712        let mut iter = self.stack.last_mut()?;
1713        loop {
1714            if let Some(child) = iter.next() {
1715                if let Some(child) = child {
1716                    match child.body_ref() {
1717                        BodyRef::Leaf(_) => {
1718                            self.remaining = self.remaining.saturating_sub(1);
1719                            // Use the safe accessor on the child reference to obtain the leaf key bytes.
1720                            return Some(child.childleaf_key());
1721                        }
1722                        BodyRef::Branch(branch) => {
1723                            self.stack.push(branch.child_table.iter());
1724                            iter = self.stack.last_mut()?;
1725                        }
1726                    }
1727                }
1728            } else {
1729                self.stack.pop();
1730                iter = self.stack.last_mut()?;
1731            }
1732        }
1733    }
1734
1735    fn size_hint(&self) -> (usize, Option<usize>) {
1736        (self.remaining, Some(self.remaining))
1737    }
1738}
1739
// `size_hint` returns `remaining` as both lower and upper bound, so the
// default `len()` of `ExactSizeIterator` can be used as is.
impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> ExactSizeIterator
    for PATCHIterator<'a, KEY_LEN, O, V>
{
}

// `next` only returns `None` once the traversal stack is empty, and nothing
// refills it afterwards, so every later call returns `None` as well.
impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> std::iter::FusedIterator
    for PATCHIterator<'a, KEY_LEN, O, V>
{
}
1749
/// An iterator over every key in a PATCH, returned in key order.
///
/// Keys are yielded in lexicographic key order regardless of their physical
/// layout in the underlying tree. This iterator walks the full tree and does
/// not accept a prefix filter. For prefix-aware iteration, use
/// [`PATCHPrefixIterator`], constructed via [`PATCH::iter_prefix_count`].
pub struct PATCHOrderedIterator<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    // One entry per level being walked. Each level holds the not-yet-visited
    // children of a branch, sorted descending by `key()` so that `pop`
    // returns the smallest one first.
    stack: Vec<ArrayVec<&'a Head<KEY_LEN, O, V>, 256>>,
    // Number of keys not yet yielded; reported by `size_hint`.
    remaining: usize,
}
1760
1761impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> PATCHOrderedIterator<'a, KEY_LEN, O, V> {
1762    pub fn new(patch: &'a PATCH<KEY_LEN, O, V>) -> Self {
1763        let mut r = PATCHOrderedIterator {
1764            stack: Vec::with_capacity(KEY_LEN),
1765            remaining: patch.len().min(usize::MAX as u64) as usize,
1766        };
1767        if let Some(root) = &patch.root {
1768            r.stack.push(ArrayVec::new());
1769            match root.body_ref() {
1770                BodyRef::Leaf(_) => {
1771                    r.stack[0].push(root);
1772                }
1773                BodyRef::Branch(branch) => {
1774                    let first_level = &mut r.stack[0];
1775                    first_level.extend(branch.child_table.iter().filter_map(|c| c.as_ref()));
1776                    first_level.sort_unstable_by_key(|&k| Reverse(k.key())); // We need to reverse here because we pop from the vec.
1777                }
1778            }
1779        }
1780        r
1781    }
1782}
1783
// --- Owned consuming iterators ---
/// Iterator that owns the nodes of a PATCH and yields its keys by value.
///
/// The iterator is created by consuming a PATCH through `IntoIterator`.
/// Children are visited in child-table order, so keys are not sorted; use
/// [`PATCHIntoOrderedIterator`] when key order is required.
pub struct PATCHIntoIterator<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    // Nodes still to be visited; used as a stack (last pushed, first visited).
    queue: Vec<Head<KEY_LEN, O, V>>,
    // Number of keys not yet yielded.
    remaining: usize,
}

// No inherent methods at present.
impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> PATCHIntoIterator<KEY_LEN, O, V> {}
1794
1795impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Iterator for PATCHIntoIterator<KEY_LEN, O, V> {
1796    type Item = [u8; KEY_LEN];
1797
1798    fn next(&mut self) -> Option<Self::Item> {
1799        let q = &mut self.queue;
1800        while let Some(mut head) = q.pop() {
1801            // Match on the mutable body directly. For leaves we can return the
1802            // stored key (the array is Copy), for branches we take children out
1803            // of the table and push them onto the stack so they are visited
1804            // depth-first.
1805            match head.body_mut() {
1806                BodyMut::Leaf(leaf) => {
1807                    self.remaining = self.remaining.saturating_sub(1);
1808                    return Some(leaf.key);
1809                }
1810                BodyMut::Branch(branch) => {
1811                    for slot in branch.child_table.iter_mut().rev() {
1812                        if let Some(c) = slot.take() {
1813                            q.push(c);
1814                        }
1815                    }
1816                }
1817            }
1818        }
1819        None
1820    }
1821}
1822
/// Iterator that owns a PATCH and yields keys in key order.
///
/// Created by [`PATCH::into_iter_ordered`]. Keys are returned by value.
pub struct PATCHIntoOrderedIterator<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> {
    // Nodes still to be visited; used as a stack (last pushed, first visited).
    queue: Vec<Head<KEY_LEN, O, V>>,
    // Number of keys not yet yielded.
    remaining: usize,
}
1828
1829impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Iterator
1830    for PATCHIntoOrderedIterator<KEY_LEN, O, V>
1831{
1832    type Item = [u8; KEY_LEN];
1833
1834    fn next(&mut self) -> Option<Self::Item> {
1835        let q = &mut self.queue;
1836        while let Some(mut head) = q.pop() {
1837            // Match the mutable body directly — we own `head` so calling
1838            // `body_mut()` is safe and allows returning the copied leaf key
1839            // or mutating the branch child table in-place.
1840            match head.body_mut() {
1841                BodyMut::Leaf(leaf) => {
1842                    self.remaining = self.remaining.saturating_sub(1);
1843                    return Some(leaf.key);
1844                }
1845                BodyMut::Branch(branch) => {
1846                    let slice: &mut [Option<Head<KEY_LEN, O, V>>] = &mut branch.child_table;
1847                    // Sort children by their byte-key, placing empty slots (None)
1848                    // after all occupied slots. Using `sort_unstable_by_key` with
1849                    // a simple key projection is clearer than a custom
1850                    // comparator; it also avoids allocating temporaries. The
1851                    // old comparator manually handled None/Some cases — we
1852                    // express that intent directly by sorting on the tuple
1853                    // (is_none, key_opt).
1854                    slice
1855                        .sort_unstable_by_key(|opt| (opt.is_none(), opt.as_ref().map(|h| h.key())));
1856                    for slot in slice.iter_mut().rev() {
1857                        if let Some(c) = slot.take() {
1858                            q.push(c);
1859                        }
1860                    }
1861                }
1862            }
1863        }
1864        None
1865    }
1866}
1867
1868impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> IntoIterator for PATCH<KEY_LEN, O, V> {
1869    type Item = [u8; KEY_LEN];
1870    type IntoIter = PATCHIntoIterator<KEY_LEN, O, V>;
1871
1872    fn into_iter(self) -> Self::IntoIter {
1873        let remaining = self.len().min(usize::MAX as u64) as usize;
1874        let mut q = Vec::new();
1875        if let Some(root) = self.root {
1876            q.push(root);
1877        }
1878        PATCHIntoIterator {
1879            queue: q,
1880            remaining,
1881        }
1882    }
1883}
1884
1885impl<const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> PATCH<KEY_LEN, O, V> {
1886    /// Consume and return an iterator that yields keys in key order.
1887    pub fn into_iter_ordered(self) -> PATCHIntoOrderedIterator<KEY_LEN, O, V> {
1888        let remaining = self.len().min(usize::MAX as u64) as usize;
1889        let mut q = Vec::new();
1890        if let Some(root) = self.root {
1891            q.push(root);
1892        }
1893        PATCHIntoOrderedIterator {
1894            queue: q,
1895            remaining,
1896        }
1897    }
1898}
1899
1900impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> Iterator
1901    for PATCHOrderedIterator<'a, KEY_LEN, O, V>
1902{
1903    type Item = &'a [u8; KEY_LEN];
1904
1905    fn next(&mut self) -> Option<Self::Item> {
1906        let mut level = self.stack.last_mut()?;
1907        loop {
1908            if let Some(child) = level.pop() {
1909                match child.body_ref() {
1910                    BodyRef::Leaf(_) => {
1911                        self.remaining = self.remaining.saturating_sub(1);
1912                        return Some(child.childleaf_key());
1913                    }
1914                    BodyRef::Branch(branch) => {
1915                        self.stack.push(ArrayVec::new());
1916                        level = self.stack.last_mut()?;
1917                        level.extend(branch.child_table.iter().filter_map(|c| c.as_ref()));
1918                        level.sort_unstable_by_key(|&k| Reverse(k.key())); // We need to reverse here because we pop from the vec.
1919                    }
1920                }
1921            } else {
1922                self.stack.pop();
1923                level = self.stack.last_mut()?;
1924            }
1925        }
1926    }
1927
1928    fn size_hint(&self) -> (usize, Option<usize>) {
1929        (self.remaining, Some(self.remaining))
1930    }
1931}
1932
// `size_hint` returns `remaining` as both lower and upper bound, so the
// default `len()` of `ExactSizeIterator` can be used as is.
impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> ExactSizeIterator
    for PATCHOrderedIterator<'a, KEY_LEN, O, V>
{
}

// `next` only returns `None` once the stack of levels is empty, and nothing
// refills it afterwards, so every later call returns `None` as well.
impl<'a, const KEY_LEN: usize, O: KeySchema<KEY_LEN>, V> std::iter::FusedIterator
    for PATCHOrderedIterator<'a, KEY_LEN, O, V>
{
}
1942
/// An iterator over all distinct prefixes of length `PREFIX_LEN` in a PATCH.
///
/// Each item is a `([u8; PREFIX_LEN], u64)` pair: the prefix bytes in tree
/// ordering, and the count of elements stored below that prefix. Prefixes
/// are returned in ascending tree order.
pub struct PATCHPrefixIterator<
    'a,
    const KEY_LEN: usize,
    const PREFIX_LEN: usize,
    O: KeySchema<KEY_LEN>,
    V,
> {
    // One entry per level being walked. Each level holds the not-yet-visited
    // nodes, sorted descending by `key()` so that `pop` returns the smallest
    // one first.
    stack: Vec<ArrayVec<&'a Head<KEY_LEN, O, V>, 256>>,
}
1954
1955impl<'a, const KEY_LEN: usize, const PREFIX_LEN: usize, O: KeySchema<KEY_LEN>, V>
1956    PATCHPrefixIterator<'a, KEY_LEN, PREFIX_LEN, O, V>
1957{
1958    fn new(patch: &'a PATCH<KEY_LEN, O, V>) -> Self {
1959        const {
1960            assert!(PREFIX_LEN <= KEY_LEN);
1961        }
1962        let mut r = PATCHPrefixIterator {
1963            stack: Vec::with_capacity(PREFIX_LEN),
1964        };
1965        if let Some(root) = &patch.root {
1966            r.stack.push(ArrayVec::new());
1967            if root.end_depth() >= PREFIX_LEN {
1968                r.stack[0].push(root);
1969            } else {
1970                let BodyRef::Branch(branch) = root.body_ref() else {
1971                    unreachable!();
1972                };
1973                let first_level = &mut r.stack[0];
1974                first_level.extend(branch.child_table.iter().filter_map(|c| c.as_ref()));
1975                first_level.sort_unstable_by_key(|&k| Reverse(k.key())); // We need to reverse here because we pop from the vec.
1976            }
1977        }
1978        r
1979    }
1980}
1981
1982impl<'a, const KEY_LEN: usize, const PREFIX_LEN: usize, O: KeySchema<KEY_LEN>, V> Iterator
1983    for PATCHPrefixIterator<'a, KEY_LEN, PREFIX_LEN, O, V>
1984{
1985    type Item = ([u8; PREFIX_LEN], u64);
1986
1987    fn next(&mut self) -> Option<Self::Item> {
1988        let mut level = self.stack.last_mut()?;
1989        loop {
1990            if let Some(child) = level.pop() {
1991                if child.end_depth() >= PREFIX_LEN {
1992                    let key = O::tree_ordered(child.childleaf_key());
1993                    let suffix_count = child.count();
1994                    return Some((key[0..PREFIX_LEN].try_into().unwrap(), suffix_count));
1995                } else {
1996                    let BodyRef::Branch(branch) = child.body_ref() else {
1997                        unreachable!();
1998                    };
1999                    self.stack.push(ArrayVec::new());
2000                    level = self.stack.last_mut()?;
2001                    level.extend(branch.child_table.iter().filter_map(|c| c.as_ref()));
2002                    level.sort_unstable_by_key(|&k| Reverse(k.key())); // We need to reverse here because we pop from the vec.
2003                }
2004            } else {
2005                self.stack.pop();
2006                level = self.stack.last_mut()?;
2007            }
2008        }
2009    }
2010}
2011
2012#[cfg(test)]
2013mod tests {
2014    use super::*;
2015    use itertools::Itertools;
2016    use proptest::prelude::*;
2017    use std::collections::HashSet;
2018    use std::convert::TryInto;
2019    use std::iter::FromIterator;
2020    use std::mem;
2021
2022    #[test]
2023    fn head_tag() {
2024        let head = Head::<64, IdentitySchema, ()>::new::<Leaf<64, ()>>(0, NonNull::dangling());
2025        assert_eq!(head.tag(), HeadTag::Leaf);
2026        mem::forget(head);
2027    }
2028
2029    #[test]
2030    fn head_key() {
2031        for k in 0..=255 {
2032            let head = Head::<64, IdentitySchema, ()>::new::<Leaf<64, ()>>(k, NonNull::dangling());
2033            assert_eq!(head.key(), k);
2034            mem::forget(head);
2035        }
2036    }
2037
2038    #[test]
2039    fn head_size() {
2040        assert_eq!(mem::size_of::<Head<64, IdentitySchema, ()>>(), 8);
2041    }
2042
2043    #[test]
2044    fn option_head_size() {
2045        assert_eq!(mem::size_of::<Option<Head<64, IdentitySchema, ()>>>(), 8);
2046    }
2047
2048    #[test]
2049    fn empty_tree() {
2050        let _tree = PATCH::<64, IdentitySchema, ()>::new();
2051    }
2052
2053    #[test]
2054    fn tree_put_one() {
2055        const KEY_SIZE: usize = 64;
2056        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, ()>::new();
2057        let entry = Entry::new(&[0; KEY_SIZE]);
2058        tree.insert(&entry);
2059    }
2060
2061    #[test]
2062    fn tree_clone_one() {
2063        const KEY_SIZE: usize = 64;
2064        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, ()>::new();
2065        let entry = Entry::new(&[0; KEY_SIZE]);
2066        tree.insert(&entry);
2067        let _clone = tree.clone();
2068    }
2069
2070    #[test]
2071    fn tree_put_same() {
2072        const KEY_SIZE: usize = 64;
2073        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, ()>::new();
2074        let entry = Entry::new(&[0; KEY_SIZE]);
2075        tree.insert(&entry);
2076        tree.insert(&entry);
2077    }
2078
2079    #[test]
2080    fn tree_replace_existing() {
2081        const KEY_SIZE: usize = 64;
2082        let key = [1u8; KEY_SIZE];
2083        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2084        let entry1 = Entry::with_value(&key, 1);
2085        tree.insert(&entry1);
2086        let entry2 = Entry::with_value(&key, 2);
2087        tree.replace(&entry2);
2088        assert_eq!(tree.get(&key), Some(&2));
2089    }
2090
2091    #[test]
2092    fn tree_replace_childleaf_updates_branch() {
2093        const KEY_SIZE: usize = 64;
2094        let key1 = [0u8; KEY_SIZE];
2095        let key2 = [1u8; KEY_SIZE];
2096        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2097        let entry1 = Entry::with_value(&key1, 1);
2098        let entry2 = Entry::with_value(&key2, 2);
2099        tree.insert(&entry1);
2100        tree.insert(&entry2);
2101        let entry1b = Entry::with_value(&key1, 3);
2102        tree.replace(&entry1b);
2103        assert_eq!(tree.get(&key1), Some(&3));
2104        assert_eq!(tree.get(&key2), Some(&2));
2105    }
2106
2107    #[test]
2108    fn update_child_refreshes_childleaf_on_replace() {
2109        const KEY_SIZE: usize = 4;
2110        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2111
2112        let key1 = [0u8; KEY_SIZE];
2113        let key2 = [1u8; KEY_SIZE];
2114        tree.insert(&Entry::with_value(&key1, 1));
2115        tree.insert(&Entry::with_value(&key2, 2));
2116
2117        // Determine which child currently provides the branch childleaf.
2118        let root_ref = tree.root.as_ref().expect("root exists");
2119        let before_childleaf = *root_ref.childleaf_key();
2120
2121        // Find the slot key (the byte index used in the branch table) for the child
2122        // that currently provides the childleaf.
2123        let slot_key = match root_ref.body_ref() {
2124            BodyRef::Branch(branch) => branch
2125                .child_table
2126                .iter()
2127                .filter_map(|c| c.as_ref())
2128                .find(|c| c.childleaf_key() == &before_childleaf)
2129                .expect("child exists")
2130                .key(),
2131            BodyRef::Leaf(_) => panic!("root should be a branch"),
2132        };
2133
2134        // Replace that child with a new leaf that has a different childleaf key.
2135        let new_key = [2u8; KEY_SIZE];
2136        {
2137            let mut ed = crate::patch::branch::BranchMut::from_slot(&mut tree.root);
2138            ed.modify_child(slot_key, |_| {
2139                Some(Entry::with_value(&new_key, 42).leaf::<IdentitySchema>())
2140            });
2141            // drop(ed) commits
2142        }
2143
2144        let after = tree.root.as_ref().expect("root exists");
2145        assert_eq!(after.childleaf_key(), &new_key);
2146    }
2147
2148    #[test]
2149    fn remove_childleaf_updates_branch() {
2150        const KEY_SIZE: usize = 4;
2151        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2152
2153        let key1 = [0u8; KEY_SIZE];
2154        let key2 = [1u8; KEY_SIZE];
2155        tree.insert(&Entry::with_value(&key1, 1));
2156        tree.insert(&Entry::with_value(&key2, 2));
2157
2158        let childleaf_before = *tree.root.as_ref().unwrap().childleaf_key();
2159        // remove the leaf that currently provides the branch.childleaf
2160        tree.remove(&childleaf_before);
2161
2162        // Ensure the removed key is gone and the other key remains and is now the childleaf.
2163        let other = if childleaf_before == key1 { key2 } else { key1 };
2164        assert_eq!(tree.get(&childleaf_before), None);
2165        assert_eq!(tree.get(&other), Some(&2u32));
2166        let after_childleaf = tree.root.as_ref().unwrap().childleaf_key();
2167        assert_eq!(after_childleaf, &other);
2168    }
2169
2170    #[test]
2171    fn remove_collapses_branch_to_single_child() {
2172        const KEY_SIZE: usize = 4;
2173        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2174
2175        let key1 = [0u8; KEY_SIZE];
2176        let key2 = [1u8; KEY_SIZE];
2177        tree.insert(&Entry::with_value(&key1, 1));
2178        tree.insert(&Entry::with_value(&key2, 2));
2179
2180        // Remove one key and ensure the root collapses to the remaining child.
2181        tree.remove(&key1);
2182        assert_eq!(tree.get(&key1), None);
2183        assert_eq!(tree.get(&key2), Some(&2u32));
2184        let root = tree.root.as_ref().expect("root exists");
2185        match root.body_ref() {
2186            BodyRef::Leaf(_) => {}
2187            BodyRef::Branch(_) => panic!("root should have collapsed to a leaf"),
2188        }
2189    }
2190
2191    #[test]
2192    fn branch_size() {
2193        assert_eq!(
2194            mem::size_of::<Branch<64, IdentitySchema, [Option<Head<64, IdentitySchema, ()>>; 2], ()>>(
2195            ),
2196            64
2197        );
2198        assert_eq!(
2199            mem::size_of::<Branch<64, IdentitySchema, [Option<Head<64, IdentitySchema, ()>>; 4], ()>>(
2200            ),
2201            48 + 16 * 2
2202        );
2203        assert_eq!(
2204            mem::size_of::<Branch<64, IdentitySchema, [Option<Head<64, IdentitySchema, ()>>; 8], ()>>(
2205            ),
2206            48 + 16 * 4
2207        );
2208        assert_eq!(
2209            mem::size_of::<
2210                Branch<64, IdentitySchema, [Option<Head<64, IdentitySchema, ()>>; 16], ()>,
2211            >(),
2212            48 + 16 * 8
2213        );
2214        assert_eq!(
2215            mem::size_of::<
2216                Branch<64, IdentitySchema, [Option<Head<32, IdentitySchema, ()>>; 32], ()>,
2217            >(),
2218            48 + 16 * 16
2219        );
2220        assert_eq!(
2221            mem::size_of::<
2222                Branch<64, IdentitySchema, [Option<Head<64, IdentitySchema, ()>>; 64], ()>,
2223            >(),
2224            48 + 16 * 32
2225        );
2226        assert_eq!(
2227            mem::size_of::<
2228                Branch<64, IdentitySchema, [Option<Head<64, IdentitySchema, ()>>; 128], ()>,
2229            >(),
2230            48 + 16 * 64
2231        );
2232        assert_eq!(
2233            mem::size_of::<
2234                Branch<64, IdentitySchema, [Option<Head<64, IdentitySchema, ()>>; 256], ()>,
2235            >(),
2236            48 + 16 * 128
2237        );
2238    }
2239
2240    /// Checks what happens if we join two PATCHes that
2241    /// only contain a single element each, that differs in the last byte.
2242    #[test]
2243    fn tree_union_single() {
2244        const KEY_SIZE: usize = 8;
2245        let mut left = PATCH::<KEY_SIZE, IdentitySchema, ()>::new();
2246        let mut right = PATCH::<KEY_SIZE, IdentitySchema, ()>::new();
2247        let left_entry = Entry::new(&[0, 0, 0, 0, 0, 0, 0, 0]);
2248        let right_entry = Entry::new(&[0, 0, 0, 0, 0, 0, 0, 1]);
2249        left.insert(&left_entry);
2250        right.insert(&right_entry);
2251        left.union(right);
2252        assert_eq!(left.len(), 2);
2253    }
2254
2255    // Small unit tests that ensure BranchMut-based editing is used by
2256    // the higher-level set operations like intersect/difference. These are
2257    // ordinary unit tests (not proptest) and must appear outside the
2258    // `proptest!` macro below.
2259
2260    proptest! {
2261        #[test]
2262        fn tree_insert(keys in prop::collection::vec(prop::collection::vec(0u8..=255, 64), 1..1024)) {
2263            let mut tree = PATCH::<64, IdentitySchema, ()>::new();
2264            for key in keys {
2265                let key: [u8; 64] = key.try_into().unwrap();
2266                let entry = Entry::new(&key);
2267                tree.insert(&entry);
2268            }
2269        }
2270
2271        #[test]
2272        fn tree_len(keys in prop::collection::vec(prop::collection::vec(0u8..=255, 64), 1..1024)) {
2273            let mut tree = PATCH::<64, IdentitySchema, ()>::new();
2274            let mut set = HashSet::new();
2275            for key in keys {
2276                let key: [u8; 64] = key.try_into().unwrap();
2277                let entry = Entry::new(&key);
2278                tree.insert(&entry);
2279                set.insert(key);
2280            }
2281
2282            prop_assert_eq!(set.len() as u64, tree.len())
2283        }
2284
2285        #[test]
2286        fn tree_infixes(keys in prop::collection::vec(prop::collection::vec(0u8..=255, 64), 1..1024)) {
2287            let mut tree = PATCH::<64, IdentitySchema, ()>::new();
2288            let mut set = HashSet::new();
2289            for key in keys {
2290                let key: [u8; 64] = key.try_into().unwrap();
2291                let entry = Entry::new(&key);
2292                tree.insert(&entry);
2293                set.insert(key);
2294            }
2295            let mut set_vec = Vec::from_iter(set.into_iter());
2296            let mut tree_vec = vec![];
2297            tree.infixes(&[0; 0], &mut |&x: &[u8; 64]| tree_vec.push(x));
2298
2299            set_vec.sort();
2300            tree_vec.sort();
2301
2302            prop_assert_eq!(set_vec, tree_vec);
2303        }
2304
2305        #[test]
2306        fn tree_iter(keys in prop::collection::vec(prop::collection::vec(0u8..=255, 64), 1..1024)) {
2307            let mut tree = PATCH::<64, IdentitySchema, ()>::new();
2308            let mut set = HashSet::new();
2309            for key in keys {
2310                let key: [u8; 64] = key.try_into().unwrap();
2311                let entry = Entry::new(&key);
2312                tree.insert(&entry);
2313                set.insert(key);
2314            }
2315            let mut set_vec = Vec::from_iter(set.into_iter());
2316            let mut tree_vec = vec![];
2317            for key in &tree {
2318                tree_vec.push(*key);
2319            }
2320
2321            set_vec.sort();
2322            tree_vec.sort();
2323
2324            prop_assert_eq!(set_vec, tree_vec);
2325        }
2326
2327        #[test]
2328        fn tree_union(left in prop::collection::vec(prop::collection::vec(0u8..=255, 64), 200),
2329                        right in prop::collection::vec(prop::collection::vec(0u8..=255, 64), 200)) {
2330            let mut set = HashSet::new();
2331
2332            let mut left_tree = PATCH::<64, IdentitySchema, ()>::new();
2333            for entry in left {
2334                let mut key = [0; 64];
2335                key.iter_mut().set_from(entry.iter().cloned());
2336                let entry = Entry::new(&key);
2337                left_tree.insert(&entry);
2338                set.insert(key);
2339            }
2340
2341            let mut right_tree = PATCH::<64, IdentitySchema, ()>::new();
2342            for entry in right {
2343                let mut key = [0; 64];
2344                key.iter_mut().set_from(entry.iter().cloned());
2345                let entry = Entry::new(&key);
2346                right_tree.insert(&entry);
2347                set.insert(key);
2348            }
2349
2350            left_tree.union(right_tree);
2351
2352            let mut set_vec = Vec::from_iter(set.into_iter());
2353            let mut tree_vec = vec![];
2354            left_tree.infixes(&[0; 0], &mut |&x: &[u8;64]| tree_vec.push(x));
2355
2356            set_vec.sort();
2357            tree_vec.sort();
2358
2359            prop_assert_eq!(set_vec, tree_vec);
2360            }
2361
2362        #[test]
2363        fn tree_union_empty(left in prop::collection::vec(prop::collection::vec(0u8..=255, 64), 2)) {
2364            let mut set = HashSet::new();
2365
2366            let mut left_tree = PATCH::<64, IdentitySchema, ()>::new();
2367            for entry in left {
2368                let mut key = [0; 64];
2369                key.iter_mut().set_from(entry.iter().cloned());
2370                let entry = Entry::new(&key);
2371                left_tree.insert(&entry);
2372                set.insert(key);
2373            }
2374
2375            let right_tree = PATCH::<64, IdentitySchema, ()>::new();
2376
2377            left_tree.union(right_tree);
2378
2379            let mut set_vec = Vec::from_iter(set.into_iter());
2380            let mut tree_vec = vec![];
2381            left_tree.infixes(&[0; 0], &mut |&x: &[u8;64]| tree_vec.push(x));
2382
2383            set_vec.sort();
2384            tree_vec.sort();
2385
2386            prop_assert_eq!(set_vec, tree_vec);
2387            }
2388
        // Copy-on-write (COW) needs dedicated coverage: we check that a tree
        // remains the same after a clone of it is modified by inserting new
        // keys.
2392
2393    #[test]
2394    fn cow_on_insert(base_keys in prop::collection::vec(prop::collection::vec(0u8..=255, 8), 1..1024),
2395                         new_keys in prop::collection::vec(prop::collection::vec(0u8..=255, 8), 1..1024)) {
2396            // Note that we can't compare the trees directly, as that uses the hash,
2397            // which might not be affected by nodes in lower levels being changed accidentally.
2398            // Instead we need to iterate over the keys and check if they are the same.
2399
2400            let mut tree = PATCH::<8, IdentitySchema, ()>::new();
2401            for key in base_keys {
2402                let key: [u8; 8] = key[..].try_into().unwrap();
2403                let entry = Entry::new(&key);
2404                tree.insert(&entry);
2405            }
2406            let base_tree_content: Vec<[u8; 8]> = tree.iter().copied().collect();
2407
2408            let mut tree_clone = tree.clone();
2409            for key in new_keys {
2410                let key: [u8; 8] = key[..].try_into().unwrap();
2411                let entry = Entry::new(&key);
2412                tree_clone.insert(&entry);
2413            }
2414
2415            let new_tree_content: Vec<[u8; 8]> = tree.iter().copied().collect();
2416            prop_assert_eq!(base_tree_content, new_tree_content);
2417        }
2418
2419        #[test]
2420    fn cow_on_union(base_keys in prop::collection::vec(prop::collection::vec(0u8..=255, 8), 1..1024),
2421                         new_keys in prop::collection::vec(prop::collection::vec(0u8..=255, 8), 1..1024)) {
2422            // Note that we can't compare the trees directly, as that uses the hash,
2423            // which might not be affected by nodes in lower levels being changed accidentally.
2424            // Instead we need to iterate over the keys and check if they are the same.
2425
2426            let mut tree = PATCH::<8, IdentitySchema, ()>::new();
2427            for key in base_keys {
2428                let key: [u8; 8] = key[..].try_into().unwrap();
2429                let entry = Entry::new(&key);
2430                tree.insert(&entry);
2431            }
2432            let base_tree_content: Vec<[u8; 8]> = tree.iter().copied().collect();
2433
2434            let mut tree_clone = tree.clone();
2435            let mut new_tree = PATCH::<8, IdentitySchema, ()>::new();
2436            for key in new_keys {
2437                let key: [u8; 8] = key[..].try_into().unwrap();
2438                let entry = Entry::new(&key);
2439                new_tree.insert(&entry);
2440            }
2441            tree_clone.union(new_tree);
2442
2443            let new_tree_content: Vec<[u8; 8]> = tree.iter().copied().collect();
2444            prop_assert_eq!(base_tree_content, new_tree_content);
2445        }
2446    }
2447
2448    #[test]
2449    fn intersect_multiple_common_children_commits_branchmut() {
2450        const KEY_SIZE: usize = 4;
2451        let mut left = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2452        let mut right = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2453
2454        let a = [0u8, 0u8, 0u8, 1u8];
2455        let b = [0u8, 0u8, 0u8, 2u8];
2456        let c = [0u8, 0u8, 0u8, 3u8];
2457        let d = [2u8, 0u8, 0u8, 0u8];
2458        let e = [3u8, 0u8, 0u8, 0u8];
2459
2460        left.insert(&Entry::with_value(&a, 1));
2461        left.insert(&Entry::with_value(&b, 2));
2462        left.insert(&Entry::with_value(&c, 3));
2463        left.insert(&Entry::with_value(&d, 4));
2464
2465        right.insert(&Entry::with_value(&a, 10));
2466        right.insert(&Entry::with_value(&b, 11));
2467        right.insert(&Entry::with_value(&c, 12));
2468        right.insert(&Entry::with_value(&e, 13));
2469
2470        let res = left.intersect(&right);
2471        // A, B, C are common
2472        assert_eq!(res.len(), 3);
2473        assert!(res.get(&a).is_some());
2474        assert!(res.get(&b).is_some());
2475        assert!(res.get(&c).is_some());
2476    }
2477
2478    #[test]
2479    fn difference_multiple_children_commits_branchmut() {
2480        const KEY_SIZE: usize = 4;
2481        let mut left = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2482        let mut right = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2483
2484        let a = [0u8, 0u8, 0u8, 1u8];
2485        let b = [0u8, 0u8, 0u8, 2u8];
2486        let c = [0u8, 0u8, 0u8, 3u8];
2487        let d = [2u8, 0u8, 0u8, 0u8];
2488        let e = [3u8, 0u8, 0u8, 0u8];
2489
2490        left.insert(&Entry::with_value(&a, 1));
2491        left.insert(&Entry::with_value(&b, 2));
2492        left.insert(&Entry::with_value(&c, 3));
2493        left.insert(&Entry::with_value(&d, 4));
2494
2495        right.insert(&Entry::with_value(&a, 10));
2496        right.insert(&Entry::with_value(&b, 11));
2497        right.insert(&Entry::with_value(&c, 12));
2498        right.insert(&Entry::with_value(&e, 13));
2499
2500        let res = left.difference(&right);
2501        // left only has d
2502        assert_eq!(res.len(), 1);
2503        assert!(res.get(&d).is_some());
2504    }
2505
2506    #[test]
2507    fn difference_empty_left_is_empty() {
2508        const KEY_SIZE: usize = 4;
2509        let left = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2510        let mut right = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2511        let key = [1u8, 2u8, 3u8, 4u8];
2512        right.insert(&Entry::with_value(&key, 7));
2513
2514        let res = left.difference(&right);
2515        assert_eq!(res.len(), 0);
2516    }
2517
2518    #[test]
2519    fn difference_empty_right_returns_left() {
2520        const KEY_SIZE: usize = 4;
2521        let mut left = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2522        let right = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2523        let key = [1u8, 2u8, 3u8, 4u8];
2524        left.insert(&Entry::with_value(&key, 7));
2525
2526        let res = left.difference(&right);
2527        assert_eq!(res.len(), 1);
2528        assert!(res.get(&key).is_some());
2529    }
2530
2531    #[test]
2532    fn slot_edit_branchmut_insert_update() {
2533        // Small unit test demonstrating the Slot::edit -> BranchMut insert/update pattern.
2534        const KEY_SIZE: usize = 8;
2535        let mut tree = PATCH::<KEY_SIZE, IdentitySchema, u32>::new();
2536
2537        let entry1 = Entry::with_value(&[0u8; KEY_SIZE], 1u32);
2538        let entry2 = Entry::with_value(&[1u8; KEY_SIZE], 2u32);
2539        tree.insert(&entry1);
2540        tree.insert(&entry2);
2541        assert_eq!(tree.len(), 2);
2542
2543        // Edit the root slot in-place using the BranchMut editor.
2544        {
2545            let mut ed = crate::patch::branch::BranchMut::from_slot(&mut tree.root);
2546
2547            // Compute the insertion start depth first to avoid borrowing `ed` inside the closure.
2548            let start_depth = ed.end_depth as usize;
2549            let inserted = Entry::with_value(&[2u8; KEY_SIZE], 3u32)
2550                .leaf::<IdentitySchema>()
2551                .with_start(start_depth);
2552            let key = inserted.key();
2553
2554            ed.modify_child(key, |opt| match opt {
2555                Some(old) => Some(Head::insert_leaf(old, inserted, start_depth)),
2556                None => Some(inserted),
2557            });
2558            // BranchMut is dropped here and commits the updated branch pointer back into the head.
2559        }
2560
2561        assert_eq!(tree.len(), 3);
2562        assert_eq!(tree.get(&[2u8; KEY_SIZE]), Some(&3u32));
2563    }
2564}