coordinode-lsm-tree 5.2.0

Embedded LSM-tree storage engine: BuRR filters, zstd dictionary compression, MVCC, range tombstones, merge operators, K/V separation, AES-256-GCM at rest.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2026-present, Structured World Foundation

//! Knuth tournament (loser) tree for `k`-way merge.
//!
//! A loser tree solves the same problem as a binary heap for merging
//! sorted inputs, but with one fewer comparison per `replace_min` and
//! a cache-friendlier memory layout. The winner sits at the root
//! (`tree[0]`); each internal node stores the **loser** of the game
//! played at that node. To replace the winner, only the path from its
//! leaf back up to the root needs to be replayed — `log₂(n)` comparisons
//! instead of the `2 log₂(n)` a binary heap would need (one per level,
//! comparing against the loser already stored there rather than against
//! both children).
//!
//! This module's `LoserTree<E, F>` is generic over the element type and
//! the comparator function (no `Arc<dyn Trait>` indirection — the
//! comparator is monomorphised per use site for zero-cost dispatch).
//!
//! # Layout
//!
//! `leaves[0..cap]` holds the current value for each input slot. `cap`
//! is `n.next_power_of_two().max(2)`, so the tournament always has a
//! complete binary tree above it. Slots beyond `n` are permanently
//! `None`, acting as `+∞` sentinels that never win.
//!
//! `tree[0]` stores the index (into `leaves`) of the overall winner.
//! `tree[1..cap]` stores the loser index at each internal node. The
//! conventional `2cap`-sized embedded-tree layout maps to our `cap`-sized
//! `tree` like this:
//!
//! - leaf `L`'s first internal-node parent lives at index `(cap + L) / 2`
//! - each parent's parent is `idx >> 1`
//! - the loop terminates at `idx == 1` (root); the final winner is
//!   stored at `tree[0]`
//!
//! # Sentinel handling
//!
//! Each leaf is logically `Option<E>` — present or sentinel. The
//! physical representation uses two parallel arrays:
//! `leaves: Vec<MaybeUninit<E>>` for the values and
//! `present: Vec<u8>` for the discriminants (one byte per slot,
//! `1` = present, `0` = absent). `Vec<u8>` is used over `Vec<bool>`
//! to make the byte-per-element layout self-evident from the type
//! and avoid any ambiguity about discriminant packing — the array
//! is a flat byte map by design.
//!
//! This eliminates the `Option` discriminant branch that LLVM
//! emits inside `cmp_indices` on every game (called O(log cap)
//! times per merger step); the present-byte fetch is a single
//! contiguous load + zero-compare that branch-predicts perfectly
//! under steady-state merging (where leaves stay present until
//! exhaustion). `cmp_indices` treats absent leaves (`present[i]
//! == 0`) as strictly greater than any present value, so
//! exhausted sources naturally lose every game.
//!
//! All `MaybeUninit::assume_init_*` calls are guarded by a check
//! on the corresponding `present[i]` byte and never widen the
//! unsafe contract beyond what the byte-map already guarantees.
//! `Drop` walks the byte-map and drops in place.

use alloc::vec::Vec;
use core::cmp::Ordering;
use core::mem::MaybeUninit;

/// Comparator strategy used by the tournament.
///
/// This trait exists instead of a plain `Fn(&E, &E) -> Ordering`
/// bound so callers can pass a concrete struct (e.g.
/// `SeekingMerger`'s `MinCmp<C>` / `MaxCmp<C>`) that
/// monomorphises through the type system. A blanket impl
/// forwards every `Fn` closure to this trait, so test code and
/// internal helpers that already use `|a, b| ...` closures
/// keep working without change.
///
/// **Why not just `Fn`:** wrapping the closure in `Box<dyn Fn>`
/// to satisfy a non-generic field type adds one indirect call per
/// `cmp_indices` invocation — and `cmp_indices` runs O(log cap)
/// times per `replace_min` / `pop_min` step, on the merger's
/// hottest path. With this trait, the comparator's concrete
/// type stays visible to LLVM, so the call inlines flat.
#[diagnostic::on_unimplemented(
    message = "`{Self}` is not a comparator for `LoserTree<{E}, _>`",
    label = "missing `EntryComparator<{E}>` impl",
    note = "implement `EntryComparator<{E}>` directly, or pass a closure of \
            type `Fn(&{E}, &{E}) -> core::cmp::Ordering` — a blanket impl \
            forwards every such closure to this trait automatically"
)]
pub trait EntryComparator<E> {
    /// Compare two leaf values. Smaller wins in the tournament;
    /// to get max-semantics pass a reversed comparator.
    fn compare(&self, a: &E, b: &E) -> Ordering;
}

/// Blanket impl so existing `Fn(&E, &E) -> Ordering` closures
/// (tests, ad-hoc callers) satisfy the trait without rewriting.
/// `#[inline(always)]` propagates the closure's body into the
/// call site at the same cost as a direct call — the indirection
/// only existed when the closure was hidden behind `Box<dyn Fn>`,
/// which this trait specifically avoids by keeping the comparator
/// type concrete at every monomorphisation.
#[diagnostic::do_not_recommend]
impl<E, F> EntryComparator<E> for F
where
    F: Fn(&E, &E) -> Ordering,
{
    #[expect(
        clippy::inline_always,
        reason = "blanket forwarder must inline or the indirection this trait \
                  eliminates comes back; verified flat in disassembly"
    )]
    #[inline(always)]
    fn compare(&self, a: &E, b: &E) -> Ordering {
        (self)(a, b)
    }
}

/// A min-tournament tree over `n` input slots.
///
/// The "min" comes from how `cmp` is interpreted: the leaf whose value
/// is `Ordering::Less` than every other leaf wins. To get max-semantics,
/// pass a reversed comparator (`|a, b| cmp(a, b).reverse()`).
pub struct LoserTree<E, F> {
    /// Current value per source slot, untagged. Only meaningful
    /// when the matching `present[i]` byte is non-zero. Length is
    /// `cap` (padded to the next power of two ≥ 2); the trailing
    /// `cap - n_sources` slots are permanent sentinels with
    /// `present[i] == 0`.
    leaves: Vec<MaybeUninit<E>>,
    /// Discriminant byte for `leaves[i]`. `1` ⇒ initialised; `0` ⇒
    /// uninit (exhausted or padding sentinel). Walking this
    /// contiguous byte-map alongside `leaves` replaces the
    /// `Option` discriminant the previous layout carried inline.
    /// `u8` (not `bool`) is chosen so the byte-per-slot layout
    /// is unambiguous from the type itself.
    present: Vec<u8>,
    /// Tree of size `cap`. `tree[0]` = winner leaf index; `tree[1..cap]`
    /// = loser leaf index at each internal node.
    tree: Vec<usize>,
    /// Number of source slots originally supplied. Less than or equal
    /// to `leaves.len()` (= `cap`). Exposed via [`Self::slots`].
    n_sources: usize,
    /// Count of present leaves. When zero the tree is empty.
    active: usize,
    /// Comparator. Captures source-order tie-break and any other
    /// merge-precedence rules the caller wants.
    cmp: F,
}

impl<E: core::fmt::Debug, F> core::fmt::Debug for LoserTree<E, F> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // Print only the structural shape — never expose leaf
        // bytes through Debug, since they sit behind MaybeUninit
        // and may be uninitialised at any given moment. The
        // `E: Debug` bound stays on the impl so this type can be
        // nested inside other `#[derive(Debug)]` structs without
        // breaking their derive expansion; we just don't use it.
        f.debug_struct("LoserTree")
            .field("n_sources", &self.n_sources)
            .field("active", &self.active)
            .field("cap", &self.leaves.len())
            .finish_non_exhaustive()
    }
}

impl<E, F> Drop for LoserTree<E, F> {
    fn drop(&mut self) {
        // Drop each present leaf in place. The MaybeUninit storage
        // doesn't drop on its own; without this loop, owned values
        // (Strings, Boxes, Arcs, ...) inside present leaves would
        // leak when the tree itself is dropped.
        #[expect(
            clippy::indexing_slicing,
            reason = "leaves.len() == present.len() by construction (set in build())"
        )]
        for i in 0..self.leaves.len() {
            if self.present[i] != 0 {
                // SAFETY: `present[i] != 0` is the LoserTree
                // invariant for `leaves[i]` being initialised.
                unsafe { self.leaves[i].assume_init_drop() };
            }
        }
    }
}

impl<E, F> LoserTree<E, F>
where
    F: EntryComparator<E>,
{
    /// Build a tournament over `initial` (one entry per source slot).
    ///
    /// `None` entries are treated as exhausted/sentinel. After
    /// construction, `peek_min()` returns the smallest `Some` value
    /// across all slots.
    ///
    /// O(n) work: each internal node is visited exactly once during
    /// the bottom-up build.
    pub fn build(initial: Vec<Option<E>>, cmp: F) -> Self {
        let n = initial.len();
        let cap = n.next_power_of_two().max(2);

        // Split the Option<E> input into parallel (MaybeUninit, bool)
        // arrays. Trailing padding past `n` is uninit + absent.
        let mut leaves: Vec<MaybeUninit<E>> = Vec::with_capacity(cap);
        let mut present: Vec<u8> = Vec::with_capacity(cap);
        let mut active = 0_usize;
        for item in initial {
            if let Some(v) = item {
                leaves.push(MaybeUninit::new(v));
                present.push(1);
                active += 1;
            } else {
                leaves.push(MaybeUninit::uninit());
                present.push(0);
            }
        }
        while leaves.len() < cap {
            leaves.push(MaybeUninit::uninit());
            present.push(0);
        }
        let tree = alloc::vec![0; cap];

        let mut t = Self {
            leaves,
            present,
            tree,
            n_sources: n,
            active,
            cmp,
        };
        t.build_subtree(1, 0, cap);
        t
    }

    /// Number of source slots `n` originally supplied (not `cap`).
    ///
    /// Reflects the *capacity* of the tournament; some slots may be
    /// exhausted (`None`) at any given moment.
    #[inline]
    #[expect(
        dead_code,
        reason = "part of the slot-set protocol, used by future callers"
    )]
    pub fn slots(&self) -> usize {
        self.n_sources
    }

    /// Whether every slot is exhausted (no present leaves).
    #[expect(
        clippy::inline_always,
        reason = "called from winner_slot() on every merger step; forcing cross-crate inlining \
                  for the bench compilation unit measurably tightens the hot loop"
    )]
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.active == 0
    }

    /// Number of slots still holding a value.
    #[inline]
    #[cfg_attr(
        not(test),
        expect(
            dead_code,
            reason = "diagnostic accessor used by unit tests and future callers"
        )
    )]
    pub fn active_count(&self) -> usize {
        self.active
    }

    /// Slot index of the current overall winner, or `None` if empty.
    ///
    /// Useful for callers (like the LSM merger) that need to know
    /// *which* source produced the current minimum, so the next item
    /// from that source can be pulled in.
    #[expect(
        clippy::inline_always,
        reason = "hot per-step routine; cross-crate inlining for benches"
    )]
    #[inline(always)]
    #[expect(
        clippy::indexing_slicing,
        reason = "tree[0] always exists: cap >= 2 by construction"
    )]
    pub fn winner_slot(&self) -> Option<usize> {
        if self.is_empty() {
            None
        } else {
            Some(self.tree[0])
        }
    }

    /// Borrow the current overall minimum, or `None` if empty.
    #[inline]
    #[expect(
        clippy::indexing_slicing,
        reason = "idx from winner_slot() is always < leaves.len()"
    )]
    #[cfg_attr(
        not(test),
        expect(
            dead_code,
            reason = "exposed for callers that want to inspect the winner without popping; \
                      seeking_merger reads the winner via winner_slot() since the slot index \
                      equals the source index by construction"
        )
    )]
    pub fn peek_min(&self) -> Option<&E> {
        let idx = self.winner_slot()?;
        if self.present[idx] == 0 {
            return None;
        }
        // SAFETY: `present[idx] != 0` is the LoserTree invariant
        // for `leaves[idx]` being initialised.
        Some(unsafe { self.leaves[idx].assume_init_ref() })
    }

    /// Replace the value at the winning slot with `new` and return the
    /// previous winner. Replays `log₂(cap)` games up the path from the
    /// winning leaf to the root.
    ///
    /// Caller-side contract: `new` must come from the **same source
    /// iterator** that produced the previous winner. Mixing slots
    /// breaks the merger's invariant that each slot reflects its own
    /// source's current item.
    ///
    /// # Panics
    ///
    /// Panics if the tree is empty (`peek_min().is_none()`). Callers
    /// must check `is_empty()` first or use [`Self::pop_min`] semantics.
    #[expect(
        clippy::expect_used,
        reason = "empty-tree panic is the documented contract"
    )]
    #[expect(
        clippy::indexing_slicing,
        reason = "slot from winner_slot() is always < leaves.len()"
    )]
    pub fn replace_min(&mut self, new: E) -> E {
        let slot = self
            .winner_slot()
            .expect("replace_min called on empty LoserTree");
        debug_assert!(
            self.present[slot] != 0,
            "LoserTree winner slot must be present when winner_slot() returns Some"
        );
        // SAFETY: `present[slot] != 0` (asserted by winner_slot
        // returning Some plus the LoserTree invariant). The old
        // value is read out and the new is written in its place;
        // `present[slot]` stays non-zero so no byte-map update
        // needed.
        let old = unsafe {
            core::mem::replace(&mut self.leaves[slot], MaybeUninit::new(new)).assume_init()
        };
        self.replay(slot);
        old
    }

    /// Pop the winning value and mark its slot exhausted. The next
    /// `peek_min()` will reflect the new winner (or `None` if this
    /// drained the last slot). O(log cap).
    #[expect(
        clippy::indexing_slicing,
        reason = "slot from winner_slot() is always < leaves.len()"
    )]
    pub fn pop_min(&mut self) -> Option<E> {
        let slot = self.winner_slot()?;
        debug_assert!(
            self.present[slot] != 0,
            "LoserTree winner slot must be present when winner_slot() returns Some"
        );
        // SAFETY: present[slot] != 0 (LoserTree invariant for the
        // winner slot). Reading via `replace(_, uninit)` returns
        // the initialised value and leaves the slot as
        // MaybeUninit::uninit; we then flip the byte-map so
        // subsequent code (cmp_indices, peek_min, Drop) sees it
        // as absent.
        let old = unsafe {
            core::mem::replace(&mut self.leaves[slot], MaybeUninit::uninit()).assume_init()
        };
        self.present[slot] = 0;
        self.active -= 1;
        self.replay(slot);
        Some(old)
    }

    /// Take the value at a specific slot (regardless of winner status)
    /// and mark the slot exhausted. The next `peek_min` reflects the
    /// updated tournament. Returns the taken value, or `None` if the
    /// slot was already exhausted (or out of range).
    ///
    /// This exists for the seeking-merger's cross-direction migration:
    /// when one tournament's source has run out but the OPPOSITE
    /// tournament still holds a buffered leaf for that source, the
    /// active direction takes it via this API rather than dropping
    /// it on the floor. O(log cap).
    #[expect(
        clippy::indexing_slicing,
        reason = "slot is checked < self.leaves.len() before indexing"
    )]
    pub fn take_slot(&mut self, slot: usize) -> Option<E> {
        if slot >= self.leaves.len() || self.present[slot] == 0 {
            return None;
        }
        // SAFETY: present[slot] != 0 (just checked above).
        let taken = unsafe {
            core::mem::replace(&mut self.leaves[slot], MaybeUninit::uninit()).assume_init()
        };
        self.present[slot] = 0;
        self.active -= 1;
        self.replay(slot);
        Some(taken)
    }

    // ---------- internals ----------

    /// Recursive bottom-up build. Each internal node at array index
    /// `node` covers leaves `[leaf_lo, leaf_hi)`. The function returns
    /// the slot index of the subtree's winner; the loser of the final
    /// game at `node` is stored in `self.tree[node]`.
    #[expect(
        clippy::indexing_slicing,
        reason = "node is always in 1..cap by recursive construction; tree[0] always in bounds"
    )]
    fn build_subtree(&mut self, node: usize, leaf_lo: usize, leaf_hi: usize) -> usize {
        if leaf_hi - leaf_lo == 1 {
            // Base case: single leaf, no internal node game here.
            // (`node` corresponds to a leaf "position" in the conceptual
            // 2*cap-array; we don't write to tree[] for leaves.)
            return leaf_lo;
        }
        let mid = usize::midpoint(leaf_lo, leaf_hi);
        let left_winner = self.build_subtree(2 * node, leaf_lo, mid);
        let right_winner = self.build_subtree(2 * node + 1, mid, leaf_hi);

        let (winner, loser) = match self.cmp_indices(left_winner, right_winner) {
            Ordering::Less | Ordering::Equal => (left_winner, right_winner),
            Ordering::Greater => (right_winner, left_winner),
        };
        self.tree[node] = loser;
        if node == 1 {
            // Root subtree finished — store the overall winner.
            self.tree[0] = winner;
        }
        winner
    }

    /// Replay the tournament path from `leaf`'s position back up to the
    /// root, updating `tree[0]` with the new overall winner. O(log cap).
    ///
    /// `#[inline(always)]` — this is the single hot per-step routine
    /// called from `replace_min` / `pop_min` / `take_slot` on every
    /// merger step. Forcing inlining lets the caller hoist
    /// `self.tree.as_ptr()` / `self.cmp` out of the loop and avoids
    /// the call-site branch-predictor cost; default `#[inline]` is
    /// only a hint and stops at crate boundaries (benches live in a
    /// separate compilation unit).
    #[expect(
        clippy::inline_always,
        reason = "the single hot per-merger-step routine; cross-crate inlining for benches"
    )]
    #[inline(always)]
    #[expect(
        clippy::indexing_slicing,
        reason = "node traverses 1..cap by construction; tree[0] always exists"
    )]
    fn replay(&mut self, leaf: usize) {
        let mut winner = leaf;
        // Parent index in the conceptual 2*cap array: (cap + leaf) / 2.
        // Use midpoint to avoid potential overflow on 32-bit targets.
        let mut node = usize::midpoint(self.leaves.len(), leaf);
        while node >= 1 {
            let other = self.tree[node];
            // Smaller wins. If the stored loser is actually smaller
            // than our running winner, swap them: the running winner
            // becomes the new loser stored at this node.
            if self.cmp_indices(other, winner) == Ordering::Less {
                self.tree[node] = winner;
                winner = other;
            }
            node >>= 1;
        }
        self.tree[0] = winner;
    }

    /// Compare leaves by slot index, treating absent slots as `+∞`.
    /// Absent always loses to present; absent vs absent returns
    /// `Equal`.
    ///
    /// `#[inline(always)]` — called O(log cap) times per `replay()`
    /// step, which is itself the hot per-merger-step routine. The
    /// present-bit branch is a single byte load + predictable
    /// compare; inlining lets LLVM fuse consecutive checks across
    /// the replay loop.
    #[expect(
        clippy::inline_always,
        reason = "called O(log cap) per replay; cross-crate inlining for benches"
    )]
    #[inline(always)]
    #[expect(
        clippy::indexing_slicing,
        reason = "callers (build_subtree, replay) only pass slot indices < cap; \
                  present.len() == leaves.len() == cap by construction"
    )]
    fn cmp_indices(&self, a: usize, b: usize) -> Ordering {
        match (self.present[a] != 0, self.present[b] != 0) {
            (true, true) => {
                // SAFETY: both bytes non-zero ⇒ both leaves
                // initialised (LoserTree invariant maintained by
                // build/replace/pop/take_slot).
                let va = unsafe { self.leaves[a].assume_init_ref() };
                let vb = unsafe { self.leaves[b].assume_init_ref() };
                self.cmp.compare(va, vb)
            }
            (true, false) => Ordering::Less,
            (false, true) => Ordering::Greater,
            (false, false) => Ordering::Equal,
        }
    }
}

#[cfg(test)]
#[expect(clippy::unwrap_used, reason = "test assertions")]
#[expect(
    clippy::trivially_copy_pass_by_ref,
    reason = "cmp_u32 signature mirrors the Fn(&E, &E) -> Ordering trait"
)]
#[expect(
    clippy::cast_possible_truncation,
    reason = "test sizes are small (n <= 33)"
)]
mod tests {
    // Test names below are short forms (e.g. `empty_tree`,
    // `drain_in_order`) — the project convention requires
    // `<what>_<condition>_<expected>` for top-level tests, OR
    // short names inside a descriptively-named submodule. The
    // descriptively-named submodule path is what's exercised
    // here: `loser_tree::tests::<short_name>` reads as "loser
    // tree's <short_name> test", which gives the short forms the
    // missing `<what>` context.
    use super::*;
    use test_log::test;

    fn cmp_u32(a: &u32, b: &u32) -> Ordering {
        a.cmp(b)
    }

    fn collect<F: Fn(&u32, &u32) -> Ordering>(mut t: LoserTree<u32, F>) -> Vec<u32> {
        let mut out = Vec::new();
        while let Some(v) = t.pop_min() {
            out.push(v);
        }
        out
    }

    #[test]
    fn empty_tree() {
        let t: LoserTree<u32, fn(&u32, &u32) -> Ordering> =
            LoserTree::build(alloc::vec![None, None, None], cmp_u32);
        assert!(t.is_empty());
        assert_eq!(t.active_count(), 0);
        assert_eq!(t.peek_min(), None);
        assert_eq!(t.winner_slot(), None);
    }

    #[test]
    fn single_slot() {
        let mut t = LoserTree::build(alloc::vec![Some(42_u32)], cmp_u32);
        assert!(!t.is_empty());
        assert_eq!(t.peek_min(), Some(&42));
        assert_eq!(t.pop_min(), Some(42));
        assert!(t.is_empty());
        assert_eq!(t.pop_min(), None);
    }

    #[test]
    fn drain_in_order() {
        // 4 sources, each with a distinct value.
        let t = LoserTree::build(alloc::vec![Some(3_u32), Some(1), Some(4), Some(2)], cmp_u32);
        assert_eq!(collect(t), [1, 2, 3, 4]);
    }

    #[test]
    fn non_pow2_padding() {
        // 5 slots → cap = 8. Sentinels must not affect winner.
        let t = LoserTree::build(
            alloc::vec![Some(50_u32), Some(10), Some(40), Some(20), Some(30)],
            cmp_u32,
        );
        assert_eq!(collect(t), [10, 20, 30, 40, 50]);
    }

    #[test]
    fn replace_min_stays_winner_when_still_smallest() {
        // Slot 0 keeps yielding monotonically increasing values that
        // are still below everyone else.
        let mut t = LoserTree::build(
            alloc::vec![Some(1_u32), Some(100), Some(200), Some(300)],
            cmp_u32,
        );
        assert_eq!(t.replace_min(2), 1);
        assert_eq!(t.peek_min(), Some(&2));
        assert_eq!(t.winner_slot(), Some(0));
        assert_eq!(t.replace_min(3), 2);
        assert_eq!(t.peek_min(), Some(&3));
        assert_eq!(t.winner_slot(), Some(0));
    }

    #[test]
    fn replace_min_changes_winner() {
        let mut t = LoserTree::build(alloc::vec![Some(1_u32), Some(5), Some(3), Some(7)], cmp_u32);
        assert_eq!(t.winner_slot(), Some(0));
        // Replace slot 0's value with something larger than slot 2 (3)
        // but smaller than slot 1 (5).
        assert_eq!(t.replace_min(4), 1);
        assert_eq!(t.peek_min(), Some(&3)); // slot 2 wins now
        assert_eq!(t.winner_slot(), Some(2));
        assert_eq!(t.replace_min(6), 3);
        assert_eq!(t.peek_min(), Some(&4)); // slot 0 wins again with 4
        assert_eq!(t.winner_slot(), Some(0));
    }

    #[test]
    fn pop_min_then_drain() {
        let mut t = LoserTree::build(
            alloc::vec![Some(10_u32), Some(20), Some(5), Some(15)],
            cmp_u32,
        );
        assert_eq!(t.pop_min(), Some(5));
        assert_eq!(t.active_count(), 3);
        assert_eq!(t.peek_min(), Some(&10));
        assert_eq!(collect(t), [10, 15, 20]);
    }

    #[test]
    fn mixed_replace_and_pop() {
        // Drain interleaved: simulates a real merge where some sources
        // get exhausted partway through.
        let mut t = LoserTree::build(alloc::vec![Some(1_u32), Some(2), Some(3), Some(4)], cmp_u32);
        assert_eq!(t.replace_min(5), 1); // slot 0 now 5
        assert_eq!(t.replace_min(6), 2); // slot 1 now 6
        // Order should now be 3, 4, 5, 6.
        assert_eq!(t.pop_min(), Some(3));
        assert_eq!(t.pop_min(), Some(4));
        assert_eq!(t.pop_min(), Some(5));
        assert_eq!(t.pop_min(), Some(6));
        assert!(t.is_empty());
    }

    #[test]
    fn reverse_comparator_gives_max_tree() {
        // Same data, max-tree semantics via reversed cmp.
        let cmp = |a: &u32, b: &u32| b.cmp(a);
        let mut t = LoserTree::build(alloc::vec![Some(1_u32), Some(4), Some(2), Some(3)], cmp);
        assert_eq!(t.peek_min(), Some(&4)); // "min" under reversed cmp = max
        assert_eq!(t.pop_min(), Some(4));
        assert_eq!(t.pop_min(), Some(3));
        assert_eq!(t.pop_min(), Some(2));
        assert_eq!(t.pop_min(), Some(1));
    }

    #[test]
    fn deterministic_tiebreak_by_cmp() {
        // Ties go to whichever the comparator picks; we encode source
        // index via tuple so equal values resolve deterministically.
        let cmp = |a: &(u32, usize), b: &(u32, usize)| (a.0, a.1).cmp(&(b.0, b.1));
        let mut t = LoserTree::build(
            alloc::vec![Some((5_u32, 0)), Some((5, 1)), Some((5, 2)), Some((5, 3)),],
            cmp,
        );
        // All same key → slot 0 wins on tiebreak.
        assert_eq!(t.winner_slot(), Some(0));
        let mut order = Vec::new();
        while let Some((_, idx)) = t.pop_min() {
            order.push(idx);
        }
        assert_eq!(order, [0, 1, 2, 3]);
    }

    #[test]
    fn random_inputs_match_sorted_reference() {
        // Property-style: random multi-source data must drain in the
        // same order a sorted concatenation produces.
        use rand::SeedableRng;
        use rand::seq::SliceRandom;
        let mut rng = rand::rngs::StdRng::seed_from_u64(0xC0DE_F00D);
        for n in [1_usize, 2, 3, 7, 8, 9, 31, 32, 33] {
            for trial in 0..32 {
                let mut all: Vec<u32> = (0..(n as u32 * 4)).collect();
                all.shuffle(&mut rng);
                // Round-robin distribute into n buckets.
                let mut buckets: Vec<Vec<u32>> = (0..n).map(|_| Vec::new()).collect();
                for (i, v) in all.iter().enumerate() {
                    #[expect(clippy::indexing_slicing, reason = "i % n always < n")]
                    buckets[i % n].push(*v);
                }
                // Each bucket must be individually sorted (loser tree
                // assumes per-source sortedness, just like a merger).
                for b in &mut buckets {
                    b.sort_unstable();
                }
                // Snapshot the sorted reference before consuming buckets.
                let mut reference = all.clone();
                reference.sort_unstable();
                // Build tree from the first item of each bucket;
                // simulate refilling via replace_min until all drained.
                let mut iters: Vec<std::vec::IntoIter<u32>> =
                    buckets.into_iter().map(IntoIterator::into_iter).collect();
                let initial: Vec<Option<u32>> = iters.iter_mut().map(Iterator::next).collect();
                let mut t = LoserTree::build(initial, cmp_u32);
                let mut out = Vec::with_capacity(reference.len());
                while let Some(slot) = t.winner_slot() {
                    #[expect(clippy::indexing_slicing, reason = "slot < n by construction")]
                    if let Some(next_val) = iters[slot].next() {
                        out.push(t.replace_min(next_val));
                    } else {
                        out.push(t.pop_min().unwrap());
                    }
                }
                assert_eq!(out, reference, "n={n} trial={trial}");
            }
        }
    }
}