spg-storage 7.9.28

In-memory storage primitives for SPG: values, rows, table schema, catalog with foreign-key constraints.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
//! Persistent (structural-sharing) vector — the v4.38 building block for the
//! v4.39 cheap-`Catalog::clone` migration.
//!
//! `PersistentVec<T>` is a Bitmapped Vector Trie (Clojure persistent vector
//! shape): 32-way branching trie with a `tail` buffer at the open end. Every
//! mutating operation produces a new handle that shares interior nodes with
//! the old handle via `Arc`. `Clone` is `O(1)`; `push` and `get` are
//! `O(log₃₂ N)`; a `CoW` path touches only the spine of the affected leaf.
//!
//! Hard rules (do not relax in later milestones):
//! - `no_std` compatible (`alloc::sync::Arc`, `alloc::vec::Vec`).
//! - Zero `unsafe`. Workspace lint `unsafe_code = "deny"` stays in force here.
//! - Zero external deps. Pure `core` + `alloc`.
//!
//! Layout:
//! - `root: Arc<Node<T>>` — the persistent trie. `Node::Internal(Vec<Arc<Node>>)`
//!   for non-leaf levels, `Node::Leaf(Vec<T>)` for the bottom.
//! - `tail: Arc<Vec<T>>` — the open-end buffer (≤ 32 elements). Lives outside
//!   the trie so `push` to a non-full tail avoids walking the spine.
//! - `len: usize` — total element count (`trie_size + tail.len()`).
//! - `shift: u32` — distance from the root to the leaf level, in bits, in
//!   multiples of `SHIFT`. An empty PV has `shift = SHIFT` and an empty root
//!   so the first incorporate doesn't have to special-case the root type.
//!
//! Invariants (debug-asserted in hot paths):
//! - `tail.len() ≤ BRANCH`.
//! - A full tail (`tail.len() == BRANCH`) is incorporated lazily: it stays in
//!   place until the next push, which first moves it into the trie as a new
//!   leaf and then starts a fresh one-element tail.
//! - `shift` is always a multiple of `SHIFT` and ≥ `SHIFT`.
//! - `trie_size = len - tail.len()` always fits in `1 << (shift + SHIFT)`.

use alloc::sync::Arc;
use alloc::vec::Vec;
use core::ops::Index;

const SHIFT: u32 = 5;
const BRANCH: usize = 1 << SHIFT; // 32
const MASK: usize = BRANCH - 1; // 0x1F

// `Clone` (v5.5.0) backs `Arc::make_mut` in `get_mut_in_trie`: cloning an
// `Internal` only bumps its children's `Arc`s (shallow), cloning a `Leaf`
// copies its ≤ BRANCH elements — exactly the path-copy a shared spine needs,
// matching what `set_in_trie` does by hand.
/// A trie node: either an interior fan-out of child pointers or a
/// bottom-level run of elements.
#[derive(Debug, Clone)]
enum Node<T> {
    /// Interior level. Children are selected with `(index >> shift) & MASK`.
    Internal(Vec<Arc<Node<T>>>),
    /// Bottom level. Elements are selected with `index & MASK`.
    Leaf(Vec<T>),
}

/// A persistent vector with structural sharing. `Clone` is O(1) (bumps the
/// root `Arc`); `push` is amortised O(log₃₂ N) and only allocates fresh nodes
/// along the spine from the root to the affected leaf.
#[derive(Debug)]
pub struct PersistentVec<T> {
    /// Trie holding every element that has already been moved out of `tail`.
    root: Arc<Node<T>>,
    /// Open-end buffer holding the most recently pushed elements.
    tail: Arc<Vec<T>>,
    /// Total element count: elements in the trie plus elements in `tail`.
    len: usize,
    /// Bit shift applied to an index at the root level of the trie.
    shift: u32,
}

impl<T> Default for PersistentVec<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T> Clone for PersistentVec<T> {
    /// O(1) — only `Arc` bumps, no element copy. This is the whole reason PV
    /// exists in v4.38; `Catalog::clone` in v4.39 inherits the property.
    fn clone(&self) -> Self {
        Self {
            root: self.root.clone(),
            tail: self.tail.clone(),
            len: self.len,
            shift: self.shift,
        }
    }
}

/// Element-wise equality. Two vectors compare equal when they have the same
/// length and yield pairwise-equal elements in the same order; the internal
/// trie shape plays no part in the comparison.
impl<T: PartialEq> PartialEq for PersistentVec<T> {
    fn eq(&self, other: &Self) -> bool {
        // Cheap rejection first: differing lengths can never be equal.
        if self.len != other.len {
            return false;
        }
        // Lengths match, so a pairwise walk covers every element of both.
        self.iter().zip(other.iter()).all(|(lhs, rhs)| lhs == rhs)
    }
}

// Marker impl: equality is element-wise, so it is total whenever `T: Eq`.
impl<T: Eq> Eq for PersistentVec<T> {}

impl<T> PersistentVec<T> {
    /// Empty vector. Allocates a fresh, empty `Internal` root and a fresh,
    /// empty `tail` buffer for this handle; clones of the handle then share
    /// both through `Arc`. The root is shaped like a `shift = SHIFT` trie so
    /// the incorporate path never has to change the root's node type.
    #[must_use]
    pub fn new() -> Self {
        let root = Arc::new(Node::Internal(Vec::new()));
        let tail = Arc::new(Vec::new());
        Self {
            root,
            tail,
            len: 0,
            shift: SHIFT,
        }
    }

    /// Total number of elements (those in the trie plus those in the tail).
    #[must_use]
    pub const fn len(&self) -> usize {
        self.len
    }

    /// `true` when the vector holds no elements.
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// O(log₃₂ N). `None` for out-of-bounds. Returned reference is valid for
    /// the lifetime of `&self`; structural sharing means the borrow is
    /// independent of any other handle that shares the same spine.
    pub fn get(&self, i: usize) -> Option<&T> {
        if i >= self.len {
            return None;
        }
        let trie_size = self.len - self.tail.len();
        if i >= trie_size {
            return self.tail.get(i - trie_size);
        }
        let mut node: &Arc<Node<T>> = &self.root;
        let mut shift = self.shift;
        loop {
            match &**node {
                Node::Leaf(elems) => {
                    return elems.get(i & MASK);
                }
                Node::Internal(children) => {
                    let sub_idx = (i >> shift) & MASK;
                    node = children.get(sub_idx)?;
                    shift = shift.saturating_sub(SHIFT);
                }
            }
        }
    }

    /// Sequential iterator over `&T`, starting at index 0. Falls back to
    /// repeated `get` (O(N log N) total); callers that need a tight inner loop
    /// should switch to a hand-rolled leaf-batched walk. v4.38 keeps this
    /// simple — v4.39 / v4.40 will profile and upgrade if iter shows up as
    /// the bottleneck.
    pub fn iter(&self) -> Iter<'_, T> {
        Iter { pv: self, pos: 0 }
    }
}

impl<'a, T> IntoIterator for &'a PersistentVec<T> {
    type Item = &'a T;
    type IntoIter = Iter<'a, T>;
    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}

/// `pv[i]` indexing with the same contract as `Vec<T>`: an out-of-bounds
/// index panics. Each lookup is one `O(log₃₂ N)` walk via `get`, so callers
/// in a hot loop should prefer a single `pv.get(i)` and reuse the reference
/// rather than re-indexing.
///
/// # Panics
///
/// Panics when `i >= len()`.
impl<T> Index<usize> for PersistentVec<T> {
    type Output = T;

    fn index(&self, i: usize) -> &T {
        let Some(elem) = self.get(i) else {
            panic!("PersistentVec index out of bounds");
        };
        elem
    }
}

impl<T: Clone> PersistentVec<T> {
    /// `O(log₃₂ N)` path-copy push. Returns a new handle; `self` is untouched
    /// (structural sharing means the old handle and the new one share every
    /// internal node except the spine to the newly written tail / leaf).
    #[must_use]
    pub fn push(&self, x: T) -> Self {
        // Fast path: tail still has room.
        if self.tail.len() < BRANCH {
            let mut new_tail = (*self.tail).clone();
            new_tail.push(x);
            return Self {
                root: self.root.clone(),
                tail: Arc::new(new_tail),
                len: self.len + 1,
                shift: self.shift,
            };
        }
        // Slow path: tail is full → incorporate it into the trie as a new
        // Leaf, then start a fresh tail with `x`.
        let leaf: Arc<Node<T>> = Arc::new(Node::Leaf((*self.tail).clone()));
        let old_trie_size = self.len - BRANCH; // tail.len() == BRANCH here
        let trie_capacity: usize = 1usize << (self.shift + SHIFT);
        let needs_grow = old_trie_size + BRANCH > trie_capacity;
        let (new_root, new_shift) = if needs_grow {
            // Root overflow: wrap the old root and a brand-new branch (carrying
            // the new leaf) under a fresh top-level Internal. The new branch
            // sits at the same depth the old root sat at, so it needs
            // `old_shift / SHIFT` layers of `Internal` above the leaf.
            let internal_levels_above_leaf = self.shift / SHIFT;
            let new_branch = new_path(internal_levels_above_leaf, leaf);
            let new_root = Arc::new(Node::Internal(alloc::vec![self.root.clone(), new_branch]));
            (new_root, self.shift + SHIFT)
        } else {
            (
                push_leaf_into_node(&self.root, self.shift, old_trie_size, leaf),
                self.shift,
            )
        };
        Self {
            root: new_root,
            tail: Arc::new(alloc::vec![x]),
            len: self.len + 1,
            shift: new_shift,
        }
    }

    /// `O(1)` amortized — transient in-place push. v4.39.1 perf path for the
    /// `Table::insert` hot loop (and any other streaming caller that holds a
    /// `&mut PersistentVec`). Uses `Arc::make_mut` on the tail buffer: when
    /// the tail's `Arc` is uniquely owned (the common case), this mutates
    /// in place — same cost as `Vec::push`. If a cloned handle is outstanding
    /// (e.g. inside a TX wrap holding a Catalog snapshot), the tail is path-
    /// copied just like `push` and the snapshot is unaffected. Either way,
    /// callers observe the same end state as `self = self.push(x)`.
    pub fn push_mut(&mut self, x: T) {
        if self.tail.len() < BRANCH {
            // Fast path: room in tail, mutate in place when uniquely owned.
            let tail = Arc::make_mut(&mut self.tail);
            tail.push(x);
            self.len += 1;
            return;
        }
        // Slow path: tail full → incorporate into trie, then start a fresh
        // tail with [x]. Take ownership of the tail Arc to reuse its Vec
        // when uniquely owned; the placeholder replacement makes self.tail
        // the fresh `[x]` buffer.
        let old_tail_arc = core::mem::replace(&mut self.tail, Arc::new(alloc::vec![x]));
        let old_tail_vec: Vec<T> =
            Arc::try_unwrap(old_tail_arc).unwrap_or_else(|arc| (*arc).clone());
        let leaf: Arc<Node<T>> = Arc::new(Node::Leaf(old_tail_vec));
        let old_trie_size = self.len - BRANCH;
        let trie_capacity: usize = 1usize << (self.shift + SHIFT);
        let needs_grow = old_trie_size + BRANCH > trie_capacity;
        if needs_grow {
            let internal_levels = self.shift / SHIFT;
            let new_branch = new_path(internal_levels, leaf);
            self.root = Arc::new(Node::Internal(alloc::vec![self.root.clone(), new_branch]));
            self.shift += SHIFT;
        } else {
            self.root = push_leaf_into_node(&self.root, self.shift, old_trie_size, leaf);
        }
        self.len += 1;
    }

    /// `O(log₃₂ N)` path-copy set. Returns `None` when `i` is out of bounds
    /// (matching `get`); otherwise returns a new handle in which element `i`
    /// is `x`. `self` is left untouched. A write into the tail copies only
    /// the tail; a write into the trie copies only the nodes on the path to
    /// the rewritten cell.
    #[must_use]
    pub fn set(&self, i: usize, x: T) -> Option<Self> {
        if i >= self.len {
            return None;
        }
        let trie_len = self.len - self.tail.len();
        // `checked_sub` yields the tail offset exactly when `i >= trie_len`.
        let (root, tail) = match i.checked_sub(trie_len) {
            Some(offset) => {
                let mut patched: Vec<T> = self.tail.to_vec();
                patched[offset] = x;
                (self.root.clone(), Arc::new(patched))
            }
            None => (
                set_in_trie(&self.root, self.shift, i, x),
                self.tail.clone(),
            ),
        };
        Some(Self {
            root,
            tail,
            len: self.len,
            shift: self.shift,
        })
    }

    /// `O(log₃₂ N)` transient-mut access — the read-side analogue of
    /// `push_mut` (v5.5.0). Walks the spine with `Arc::make_mut`: when every
    /// node along the path is uniquely owned (the common streaming case) the
    /// walk mutates in place at the same cost as `Vec::get_mut`. If a cloned
    /// handle shares the spine (e.g. a `Catalog` snapshot held by an open TX),
    /// the touched nodes are path-copied — the snapshot keeps its old value
    /// and only this handle observes the mutation, exactly like `set`. `None`
    /// for out-of-bounds (matches `get` / `set`).
    ///
    /// Introduced for the v5.5 HNSW `NswGraph` switch to PV-backed layers: the
    /// insert path needs in-place edits to a node's neighbour list
    /// (`layers[l].get_mut(node)`) without the `set`-then-write-back round trip
    /// and its extra path-copy.
    pub fn get_mut(&mut self, i: usize) -> Option<&mut T> {
        if i >= self.len {
            return None;
        }
        let trie_size = self.len - self.tail.len();
        if i >= trie_size {
            let tail = Arc::make_mut(&mut self.tail);
            return tail.get_mut(i - trie_size);
        }
        get_mut_in_trie(&mut self.root, self.shift, i)
    }
}

/// Insert a freshly built `Leaf` at trie position `trie_index`, returning a
/// new copy of `node` with the leaf in place. The caller must already have
/// checked that the trie has room (`needs_grow == false`). `shift` is the
/// shift that applies at `node`; each level of recursion lowers it by `SHIFT`.
fn push_leaf_into_node<T: Clone>(
    node: &Arc<Node<T>>,
    shift: u32,
    trie_index: usize,
    leaf: Arc<Node<T>>,
) -> Arc<Node<T>> {
    let slot = (trie_index >> shift) & MASK;
    let Node::Internal(children) = &**node else {
        // Recursion stops at `shift == SHIFT`, one level above the leaves,
        // so arriving at a `Leaf` means the shift bookkeeping is wrong.
        debug_assert!(false, "push_leaf_into_node hit a Leaf — shift bug");
        return node.clone();
    };
    // Copying the child list only bumps the children's `Arc`s.
    let mut kids: Vec<Arc<Node<T>>> = children.clone();
    let occupied = kids.len();
    if shift == SHIFT {
        // The children of this node are leaves. Leaves arrive in index
        // order, so the new one always goes in the next free slot.
        debug_assert!(
            slot == occupied,
            "leaves are pushed sequentially; sub_idx {} != next slot {}",
            slot,
            occupied
        );
        kids.push(leaf);
    } else if slot < occupied {
        // The target subtree already exists: rebuild it with the leaf added.
        let rebuilt = push_leaf_into_node(&kids[slot], shift - SHIFT, trie_index, leaf);
        kids[slot] = rebuilt;
    } else {
        // No subtree yet: wrap the leaf in enough `Internal` layers for it
        // to sit at leaf depth beneath this node's new child.
        kids.push(new_path((shift / SHIFT) - 1, leaf));
    }
    Arc::new(Node::Internal(kids))
}

/// Wrap `leaf` in `internal_levels` single-child `Internal` nodes, innermost
/// first, and return the outermost node. With `internal_levels == 0` the
/// leaf itself is returned.
fn new_path<T>(internal_levels: u32, leaf: Arc<Node<T>>) -> Arc<Node<T>> {
    (0..internal_levels).fold(leaf, |child, _| {
        Arc::new(Node::Internal(alloc::vec![child]))
    })
}

/// Path-copy walk behind `set`. Rebuilds every node on the way from `node`
/// down to element `i` and writes `x` into the rebuilt leaf; all children off
/// that path are reused through `Arc` clones.
fn set_in_trie<T: Clone>(node: &Arc<Node<T>>, shift: u32, i: usize, x: T) -> Arc<Node<T>> {
    let rebuilt = match &**node {
        Node::Internal(children) => {
            let slot = (i >> shift) & MASK;
            // Rebuild the affected subtree first, then splice it into a
            // copy of this node's child list.
            let child = set_in_trie(&children[slot], shift - SHIFT, i, x);
            let mut copy = children.clone();
            copy[slot] = child;
            Node::Internal(copy)
        }
        Node::Leaf(elems) => {
            let mut copy = elems.clone();
            copy[i & MASK] = x;
            Node::Leaf(copy)
        }
    };
    Arc::new(rebuilt)
}

/// Walk behind `get_mut` (v5.5.0). Each node on the way to element `i` is
/// reached through `Arc::make_mut`, which clones the node only when it is
/// shared; a uniquely owned path is walked in place. Returns a `&mut` to the
/// located element, or `None` if a child slot or leaf cell is missing.
fn get_mut_in_trie<T: Clone>(node: &mut Arc<Node<T>>, shift: u32, i: usize) -> Option<&mut T> {
    let children = match Arc::make_mut(node) {
        // Bottom of the trie: hand out the cell itself.
        Node::Leaf(elems) => return elems.get_mut(i & MASK),
        Node::Internal(children) => children,
    };
    let child = children.get_mut((i >> shift) & MASK)?;
    get_mut_in_trie(child, shift - SHIFT, i)
}

/// Sequential `&T` iterator. v4.38 implementation is `get(i)`-driven — simple
/// and correct, but O(N log N) over the whole vector. Profile in v4.39 /
/// v4.40 and upgrade if it shows up in flamegraphs.
#[derive(Debug)]
pub struct Iter<'a, T> {
    /// The vector being walked.
    pv: &'a PersistentVec<T>,
    /// Index of the next element to yield.
    pos: usize,
}

/// Front-to-back iteration by index. `next` costs one `get` (an O(log₃₂ N)
/// walk for trie elements); `size_hint`, `count` and the skipping part of
/// `nth` are pure arithmetic on the cursor.
impl<'a, T> Iterator for Iter<'a, T> {
    type Item = &'a T;

    /// Yield the element at the cursor and advance, or `None` once the
    /// cursor has reached the vector's length.
    fn next(&mut self) -> Option<&'a T> {
        if self.pos >= self.pv.len {
            return None;
        }
        let v = self.pv.get(self.pos)?;
        self.pos += 1;
        Some(v)
    }

    /// Exact: the number of elements between the cursor and the end.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.pv.len.saturating_sub(self.pos);
        (remaining, Some(remaining))
    }

    /// O(1) override: the remaining count is known from the cursor, so there
    /// is no need to walk the trie once per remaining element.
    fn count(self) -> usize {
        self.pv.len.saturating_sub(self.pos)
    }

    /// O(log₃₂ N) override: jump the cursor forward by `n` (clamped to the
    /// end so an overshoot leaves the iterator exhausted) and yield from
    /// there, instead of calling `next` `n + 1` times.
    fn nth(&mut self, n: usize) -> Option<&'a T> {
        self.pos = self.pos.saturating_add(n).min(self.pv.len);
        self.next()
    }
}

// `size_hint` returns identical lower and upper bounds, so the default
// `len()` provided by `ExactSizeIterator` is exact.
impl<T> ExactSizeIterator for Iter<'_, T> {}

#[cfg(test)]
impl<T> PersistentVec<T> {
    /// Test-only probe: `true` when both handles point at the very same root
    /// allocation and the very same tail allocation, i.e. one is a pointer-
    /// bumping `clone` of the other rather than an element copy. Used by
    /// v5.5.0's `nsw_clone_is_o1` to show `NswGraph::clone` shares structure.
    pub(crate) fn shares_storage_with(&self, other: &Self) -> bool {
        let same_root = Arc::ptr_eq(&self.root, &other.root);
        let same_tail = Arc::ptr_eq(&self.tail, &other.tail);
        same_root && same_tail
    }
}

#[cfg(test)]
#[allow(
    clippy::cast_possible_truncation,
    clippy::cast_possible_wrap,
    clippy::cast_sign_loss,
    clippy::cast_lossless,
    clippy::needless_range_loop,
    clippy::items_after_statements,
    clippy::manual_range_patterns,
    clippy::unreadable_literal,
    clippy::similar_names
)]
mod tests {
    use super::*;

    /// A freshly built vector reports zero length and has no element 0.
    #[test]
    fn empty_vec_is_empty() {
        let fresh = PersistentVec::<u64>::new();
        assert_eq!(fresh.len(), 0);
        assert!(fresh.is_empty());
        assert!(fresh.get(0).is_none());
    }

    /// One push lands in the tail and is readable at index 0 only.
    #[test]
    fn push_single_fits_in_tail() {
        let single = PersistentVec::<u64>::new().push(42);
        assert_eq!(single.len(), 1);
        assert_eq!(single.get(0), Some(&42));
        assert!(single.get(1).is_none());
    }

    /// The first 32 elements all sit in the tail; pushing the 33rd moves the
    /// full tail into the trie. Every element must stay readable across that
    /// hand-over.
    #[test]
    fn push_fills_tail_then_incorporates() {
        let pv = (0..40_u64).fold(PersistentVec::<u64>::new(), |acc, v| acc.push(v));
        for (idx, want) in (0..40_u64).enumerate() {
            assert_eq!(pv.get(idx), Some(&want), "mismatch at {want}");
        }
        assert!(pv.get(40).is_none());
    }

    /// A `shift = 5` trie holds 32 leaves (1024 elements). Pushing index 1056
    /// has to incorporate a 33rd leaf, which forces the first root grow
    /// (`shift` 5 → 10); 1100 pushes go past that point.
    #[test]
    fn push_crosses_root_overflow_boundary() {
        let pv = (0..1100_u64).fold(PersistentVec::<u64>::new(), |acc, v| acc.push(v));
        for (idx, want) in (0..1100_u64).enumerate() {
            assert_eq!(pv.get(idx), Some(&want), "mismatch at {want}");
        }
    }

    /// A `shift = 10` trie holds 32 768 elements, so pushing index 32 800
    /// forces the second root grow (`shift` 10 → 15). 33 000 pushes go past
    /// that point and exercise the recursion in `push_leaf_into_node` on a
    /// 3-deep trie.
    #[test]
    fn push_crosses_second_grow_boundary() {
        let mut pv = PersistentVec::<u64>::new();
        (0..33_000_u64).for_each(|v| pv = pv.push(v));
        // Only a handful of boundary-adjacent indices are probed here; the
        // 100K fuzz oracle provides the exhaustive sweep.
        for p in [0_usize, 1, 31, 32, 1023, 1024, 1056, 32_767, 32_768, 32_999] {
            assert_eq!(pv.get(p), Some(&(p as u64)), "mismatch at {p}");
        }
        assert!(pv.get(33_000).is_none());
    }

    /// Pushing onto a clone must leave the handle it was cloned from exactly
    /// as it was — the core persistence guarantee.
    #[test]
    fn clone_then_push_preserves_original() {
        let original = (0..50_u64).fold(PersistentVec::<u64>::new(), |acc, v| acc.push(v));
        let extended = original.clone().push(999);
        assert_eq!(original.len(), 50);
        assert_eq!(extended.len(), 51);
        assert_eq!(original.get(50), None);
        assert_eq!(extended.get(50), Some(&999));
        // The shared prefix reads back identically through both handles.
        for (idx, want) in (0..50_u64).enumerate() {
            assert_eq!(original.get(idx), Some(&want));
            assert_eq!(extended.get(idx), Some(&want));
        }
    }

    /// With only three elements everything lives in the tail, so this `set`
    /// goes through the tail-copy branch.
    #[test]
    fn set_rewrites_element_in_tail() {
        let base = PersistentVec::<u64>::new().push(10).push(20).push(30);
        let pv = base.set(1, 200).unwrap();
        for (idx, want) in [10_u64, 200, 30].iter().enumerate() {
            assert_eq!(pv.get(idx), Some(want));
        }
    }

    /// With 40 elements the first 32 have been moved into the trie, so
    /// writing index 0 goes through the trie path-copy branch.
    #[test]
    fn set_rewrites_element_in_trie() {
        let before = (0..40_u64).fold(PersistentVec::<u64>::new(), |acc, v| acc.push(v));
        let after = before.set(0, 9999).unwrap();
        assert_eq!(after.get(0), Some(&9999));
        assert_eq!(before.get(0), Some(&0), "set must not mutate original");
        assert_eq!(after.get(39), Some(&39));
    }

    /// `set` past the end reports `None` instead of panicking.
    #[test]
    fn set_out_of_bounds_is_none() {
        let one = PersistentVec::<u64>::new().push(1);
        let outcome = one.set(5, 99);
        assert!(outcome.is_none());
    }

    /// `iter()` and an index-by-index `get` walk must produce the same
    /// sequence.
    #[test]
    fn iter_matches_get_for_full_walk() {
        let pv = (0..200_u64)
            .map(|i| i * 7)
            .fold(PersistentVec::<u64>::new(), |acc, v| acc.push(v));
        let via_iter: Vec<u64> = pv.iter().copied().collect();
        let mut via_get: Vec<u64> = Vec::with_capacity(pv.len());
        for i in 0..pv.len() {
            via_get.push(*pv.get(i).unwrap());
        }
        assert_eq!(via_iter, via_get);
        assert_eq!(via_iter.len(), 200);
        assert_eq!(via_iter[199], 199 * 7);
    }

    /// `size_hint` reports the exact remaining count, and `count` agrees.
    #[test]
    fn iter_size_hint_exact() {
        let pv = (0..15_u64).fold(PersistentVec::<u64>::new(), |acc, v| acc.push(v));
        let walker = pv.iter();
        assert_eq!(walker.size_hint(), (15, Some(15)));
        assert_eq!(walker.count(), 15);
    }

    /// SplitMix-style PRNG: a deterministic, dependency-free generator so the
    /// fuzz oracles are reproducible without pulling `rand` in. Same mixer
    /// the NSW level assignment uses upstream.
    struct Splitmix(u64);
    impl Splitmix {
        /// Start the sequence from `seed`.
        fn new(seed: u64) -> Self {
            Self(seed)
        }

        /// Advance the state by the fixed increment, then scramble a copy of
        /// it through two xor-shift-multiply rounds and a final xor-shift.
        fn next(&mut self) -> u64 {
            const INCREMENT: u64 = 0x9E37_79B9_7F4A_7C15;
            const MIX_A: u64 = 0xBF58_476D_1CE4_E5B9;
            const MIX_B: u64 = 0x94D0_49BB_1331_11EB;
            self.0 = self.0.wrapping_add(INCREMENT);
            let z = self.0;
            let z = (z ^ (z >> 30)).wrapping_mul(MIX_A);
            let z = (z ^ (z >> 27)).wrapping_mul(MIX_B);
            z ^ (z >> 31)
        }
    }

    /// 100K random `push` / `set` steps applied to a `PersistentVec<u64>` and
    /// mirrored onto a plain `Vec<u64>`. After every step the length, first
    /// element, last element and one random interior element are compared;
    /// at the end every element and the `iter` order are compared.
    #[test]
    fn fuzz_oracle_against_vec_u64() {
        const STEPS: usize = 100_000;
        let mut rng = Splitmix::new(0xC0FFEE_u64);
        let mut oracle: Vec<u64> = Vec::new();
        let mut pv: PersistentVec<u64> = PersistentVec::new();
        for step in 0..STEPS {
            // Three of the four opcodes push, so the vector keeps growing
            // across the trie's grow boundaries; opcode 3 overwrites a cell.
            let op = rng.next() % 4;
            if op != 3 || oracle.is_empty() {
                let val = rng.next();
                pv = pv.push(val);
                oracle.push(val);
            } else {
                let idx = (rng.next() as usize) % oracle.len();
                let val = rng.next();
                pv = pv.set(idx, val).expect("in-bounds set");
                oracle[idx] = val;
            }
            // Cheap per-step check: length, both ends, one sampled cell.
            assert_eq!(pv.len(), oracle.len(), "len drift @ step {step}");
            if oracle.is_empty() {
                continue;
            }
            assert_eq!(pv.get(0), oracle.first(), "head drift @ step {step}");
            assert_eq!(
                pv.get(oracle.len() - 1),
                oracle.last(),
                "tail drift @ step {step}"
            );
            let probe = (rng.next() as usize) % oracle.len();
            assert_eq!(
                pv.get(probe),
                Some(&oracle[probe]),
                "interior drift @ step {step}, probe {probe}"
            );
        }
        // Exhaustive comparison once all steps have run.
        for (i, want) in oracle.iter().enumerate() {
            assert_eq!(pv.get(i), Some(want), "final drift at {i}");
        }
        // `iter` must yield the same elements in the same order.
        let via_iter: Vec<u64> = pv.iter().copied().collect();
        assert_eq!(via_iter, oracle, "iter drift");
    }

    /// Clone isolation: build A, clone it into B and C, push onto B and
    /// overwrite cells of C, then check that each of the three handles still
    /// matches its own mirror `Vec` and nothing leaked between them.
    #[test]
    fn fuzz_oracle_clone_isolation() {
        let mut rng = Splitmix::new(0xDECAFBAD_u64);
        let mut oracle_a: Vec<u64> = Vec::new();
        let mut a: PersistentVec<u64> = PersistentVec::new();
        for _ in 0..2_000 {
            let v = rng.next();
            a = a.push(v);
            oracle_a.push(v);
        }
        // Fork two independent handles (and mirrors) from A.
        let (mut b, mut oracle_b) = (a.clone(), oracle_a.clone());
        let (mut c, mut oracle_c) = (a.clone(), oracle_a.clone());
        // B only grows.
        for _ in 0..500 {
            let v = rng.next();
            b = b.push(v);
            oracle_b.push(v);
        }
        // C only overwrites existing cells.
        for _ in 0..300 {
            let idx = (rng.next() as usize) % oracle_c.len();
            let v = rng.next();
            c = c.set(idx, v).expect("in-bounds");
            oracle_c[idx] = v;
        }
        // Every handle must agree with its own mirror, element by element.
        for (i, want) in oracle_a.iter().enumerate() {
            assert_eq!(a.get(i), Some(want), "A drift at {i}");
        }
        for (i, want) in oracle_b.iter().enumerate() {
            assert_eq!(b.get(i), Some(want), "B drift at {i}");
        }
        for (i, want) in oracle_c.iter().enumerate() {
            assert_eq!(c.get(i), Some(want), "C drift at {i}");
        }
        assert_eq!(a.len(), oracle_a.len());
        assert_eq!(b.len(), oracle_b.len());
        assert_eq!(c.len(), oracle_c.len());
    }

    /// v4.39.1: `push_mut` oracle. 100K in-place pushes mirrored onto a plain
    /// `Vec<u64>`, with a length check every step, a sampled element check
    /// every 1024 steps and a full element-by-element comparison at the end.
    #[test]
    fn fuzz_oracle_push_mut_against_vec_u64() {
        const STEPS: usize = 100_000;
        let mut rng = Splitmix::new(0xFEEDFACE_u64);
        let mut oracle: Vec<u64> = Vec::new();
        let mut pv: PersistentVec<u64> = PersistentVec::new();
        for step in 0..STEPS {
            let val = rng.next();
            pv.push_mut(val);
            oracle.push(val);
            assert_eq!(pv.len(), oracle.len(), "len drift @ step {step}");
            if step % 1024 != 0 {
                continue;
            }
            // Periodic spot-check; the exhaustive sweep follows the loop.
            let probe = (rng.next() as usize) % oracle.len();
            assert_eq!(
                pv.get(probe),
                Some(&oracle[probe]),
                "interior drift @ step {step}, probe {probe}"
            );
        }
        for (i, want) in oracle.iter().enumerate() {
            assert_eq!(pv.get(i), Some(want), "final drift at {i}");
        }
    }

    /// v4.39.1 invariant: once B has been cloned from A, further
    /// `push_mut` calls on A must not change what B sees. Without this a
    /// held snapshot (such as the Catalog snapshot behind the v4.34
    /// BEGIN..COMMIT wrap) would observe writes made after it was taken.
    #[test]
    fn push_mut_does_not_disturb_cloned_handle() {
        let mut a: PersistentVec<u64> = PersistentVec::new();
        (0..200_u64).for_each(|v| a.push_mut(v));
        let b = a.clone();
        // 300 more in-place pushes on A cross the full-tail boundary
        // several times.
        (200_u64..500).for_each(|v| a.push_mut(v));
        assert_eq!(b.len(), 200);
        for (idx, want) in (0..200_u64).enumerate() {
            assert_eq!(b.get(idx), Some(&want), "B drift at {want}");
        }
        assert!(b.get(200).is_none());
        assert_eq!(a.len(), 500);
        for (idx, want) in (0..500_u64).enumerate() {
            assert_eq!(a.get(idx), Some(&want), "A drift at {want}");
        }
    }

    /// Smoke check around cloning: build a 200-element vector, take five
    /// clones, drop them all, and confirm the original still reads back
    /// correctly. `Arc` strong counts are not inspected here — only the
    /// observable contents of the surviving handle.
    #[test]
    fn push_clone_arc_count_stays_constant_in_old_handle() {
        let a = (0..200_u64).fold(PersistentVec::<u64>::new(), |acc, v| acc.push(v));
        let snapshots: Vec<PersistentVec<u64>> =
            core::iter::repeat_with(|| a.clone()).take(5).collect();
        drop(snapshots);
        for (idx, want) in (0..200_u64).enumerate() {
            assert_eq!(a.get(idx), Some(&want));
        }
        assert_eq!(a.len(), 200);
    }
}