graphrefly-core 0.0.2

GraphReFly handle-protocol core dispatcher
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
//! Per-subgraph union-find registry (Slice X5, 2026-05-08).
//!
//! Direct port of [`graphrefly-py`'s
//! `subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
//! (locked design via [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)
//! — Q1 = (c-uf split-eager); see decision log D086).
//!
//! # Concept
//!
//! Each registered node is a member of exactly one connected
//! component (a "subgraph") at any moment. Two nodes are in the same
//! subgraph iff they're connected via dep edges (transitively).
//!
//! Connected-component membership is tracked via union-find with
//! union-by-rank + path compression. Each component's root carries
//! an [`Arc<SubgraphLockBox>`] that owns the partition's
//! `wave_owner` re-entrant mutex — nodes in the same component share
//! one lock, disjoint components run truly parallel.
//!
//! # Lifecycle hooks
//!
//! - [`SubgraphRegistry::ensure_registered`] — called when a node is
//!   first registered via [`crate::Core::register`]. Allocates a
//!   fresh singleton component for the new node.
//! - [`SubgraphRegistry::union_nodes`] — called for each new dep
//!   edge (in `register` and `set_deps`'s add-edge path). Merges
//!   the two endpoints' components.
//! - [`SubgraphRegistry::cleanup_node`] — wired but **not yet called
//!   in X5**. In Y1 the wave engine will invoke it from sites that
//!   actually remove a `NodeRecord` from `CoreState.nodes` (today
//!   `terminate_node` only marks the node terminal — `NodeRecord`s
//!   persist for the life of `CoreState`). The registry's HashMap
//!   entries drop together with `CoreState` when the last `Core`
//!   clone goes, so X5 doesn't leak partitions in practice; the
//!   per-node cleanup hook becomes load-bearing once Y1 lands a
//!   `Drop`-via-removal lifecycle for `NodeRecord` (post-Y1 work).
//! - [`SubgraphRegistry::on_edge_removed`] — called for each removed
//!   dep edge in `set_deps`'s remove path. Slice X5 commit-1 just
//!   notes the removal; **Y1 (split-eager)** adds the reachability
//!   walk that splits disconnected components.
//!
//! # Lock-acquisition discipline (Y1)
//!
//! [`SubgraphRegistry::lock_for`] returns the component's
//! [`Arc<SubgraphLockBox>`] — caller acquires `box.wave_owner` and
//! re-validates that the resolved root hasn't been redirected by a
//! concurrent `union` (mirrors py's `MAX_LOCK_RETRIES` retry loop).
//!
//! # Slice X5 scope
//!
//! Substrate types + registry tracking are wired to `Core::register`
//! and `Core::set_deps`'s edge add path so the union-find state is
//! maintained as nodes register and topology changes. The wave engine
//! itself still uses the legacy Core-level `wave_owner` — Y1 (the
//! wave-engine migration to per-partition `wave_owner`) is carried
//! forward as an explicit follow-on slice given its scope (every
//! `begin_batch` / `run_wave` call site + `Core::subscribe` lock
//! acquisition + `BatchGuard` lock retention + retry-validate
//! semantics for held-Arc-vs-current-root divergence on union).
//!
//! Several substrate methods (`cleanup_node`, `lock_for`,
//! `lock_for_validate`) and the `SubgraphLockBox::wave_owner` field
//! are wired but unused in X5; they're annotated per-item with
//! `#[allow(dead_code)]` until Y1 activates the wave engine through
//! them. (Per-item rather than module-level shotgun so any genuinely
//! unused new item still flags as dead code during X5 follow-up
//! review.)

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use parking_lot::ReentrantMutex;

use crate::handle::NodeId;

/// Identity of one connected-component partition.
///
/// Wraps the `u64` obtained from the union-find root's `NodeId::raw()`
/// (see `SubgraphId::from_node`). Kept as its own type so that a
/// partition id can never be passed where a [`NodeId`] is expected,
/// or vice versa.
///
/// The derived `Ord` compares the wrapped `u64`, which gives
/// partitions a total order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SubgraphId(pub(crate) u64);

impl SubgraphId {
    /// Construct from a [`NodeId`] root. Internal use — partition
    /// identity is the union-find root's NodeId.
    #[must_use]
    pub(crate) fn from_node(node: NodeId) -> Self {
        Self(node.raw())
    }

    /// Raw u64 view. Used for total-ordering across multi-partition
    /// wave acquisitions per Q4=(a) (deadlock-free via ascending
    /// SubgraphId order).
    #[must_use]
    pub fn raw(self) -> u64 {
        self.0
    }
}

impl std::fmt::Display for SubgraphId {
    /// Renders as `subgraph#<raw id>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("subgraph#{}", self.0))
    }
}

/// Per-component lock box. Holds the partition's `wave_owner`
/// re-entrant mutex.
///
/// The registry stores one `Arc<SubgraphLockBox>` per union-find root.
/// When [`SubgraphRegistry::union_nodes`] merges two components, the
/// losing root's entry is removed from the registry and members of
/// that component resolve to the winning root's box from then on; any
/// caller still holding a clone of the old [`Arc`] keeps that box
/// alive.
///
/// NOTE(review): the port notes describe this as mirroring py's
/// `_LockBox.lock` redirect on union. In this file the loser's entry
/// is dropped rather than redirected — confirm the intended merge
/// semantics when Y1 starts acquiring `wave_owner`.
///
/// **Slice X5 commit-1:** the `wave_owner` field is allocated but
/// not yet used by the wave engine. Y1 / commit-2 migrates
/// `Core::subscribe` + `Core::begin_batch` to acquire this per-
/// partition lock instead of the legacy `Core::wave_owner`.
pub struct SubgraphLockBox {
    /// Per-partition wave ownership. Cross-thread emits to the same
    /// partition serialize here; same-thread re-entry passes through
    /// (the mutex is re-entrant).
    /// Held behind its own `Arc` so the mutex identity can be shared
    /// independently of the box that carries it.
    ///
    /// **Slice X5 status:** allocated but unused — Y1 wires the wave
    /// engine through it. `dead_code` is silenced on this item only,
    /// so other unused items still get flagged.
    #[allow(dead_code)]
    pub(crate) wave_owner: Arc<ReentrantMutex<()>>,
}

impl SubgraphLockBox {
    /// Allocate a lock box with a brand-new `wave_owner` mutex that
    /// is not shared with any other box.
    fn new() -> Arc<Self> {
        let wave_owner = Arc::new(ReentrantMutex::new(()));
        Arc::new(Self { wave_owner })
    }
}

/// Default cap for the caller-side retry loop built around
/// [`SubgraphRegistry::lock_for`] and
/// [`SubgraphRegistry::lock_for_validate`]: resolve the box, acquire
/// its `wave_owner`, re-validate the root, and retry on mismatch.
/// `lock_for` itself does not loop. Mirrors graphrefly-py's
/// `_MAX_LOCK_RETRIES` constant (`subgraph_locks.py` line 39).
/// Reached only under continuous `union` activity racing with
/// `lock_for` — pathological in practice but bounded for safety.
///
/// **Slice X5 status:** unused — Y1 wires the wave engine retry loop
/// against this cap. `dead_code` is silenced on this item only.
#[allow(dead_code)]
pub(crate) const MAX_LOCK_RETRIES: u32 = 100;

/// Union-find registry tracking each node's connected-component
/// membership. Wrapped by [`crate::Core`] in `Arc<Mutex<...>>` so
/// the wave engine can resolve a node's partition before acquiring
/// that partition's `wave_owner`.
///
/// All four maps are keyed by [`NodeId`]. `parent`, `rank` and
/// `children` gain an entry when a node is registered and lose it in
/// `cleanup_node`; `boxes` only ever holds entries for current roots.
pub struct SubgraphRegistry {
    /// `node_id → parent_id` union-find array. A root has
    /// `parent[root] == root`.
    parent: HashMap<NodeId, NodeId>,
    /// `node_id → rank` for union-by-rank. Only meaningful for roots.
    rank: HashMap<NodeId, u32>,
    /// Reverse map for cleanup-time re-rooting: `node_id → set of
    /// direct children whose `parent` entry points at `node_id`.
    /// On root cleanup, we promote one child to root and re-attach
    /// the others. Kept in sync by `find` (path compression),
    /// `union_nodes` and `cleanup_node`.
    children: HashMap<NodeId, HashSet<NodeId>>,
    /// `root_node_id → Arc<SubgraphLockBox>`. Only roots have entries.
    /// On `union`, the loser's entry is removed and any future
    /// `lock_for` calls on its members find the winner's box via
    /// `find`.
    boxes: HashMap<NodeId, Arc<SubgraphLockBox>>,
}

impl SubgraphRegistry {
    /// Construct an empty registry. `pub(crate)` because the
    /// operations that populate a registry are crate-internal — only
    /// `Core::new` needs to construct one. External callers receive
    /// [`SubgraphId`]s via [`crate::Core::partition_of`] but cannot
    /// construct a registry from outside the crate.
    #[must_use]
    pub(crate) fn new() -> Self {
        Self {
            parent: HashMap::new(),
            rank: HashMap::new(),
            children: HashMap::new(),
            boxes: HashMap::new(),
        }
    }

    /// Register `node` as the root of a fresh singleton component.
    /// Idempotent — calling twice on the same node is a no-op, even
    /// if the node has since been merged into a larger component.
    ///
    /// Called from [`crate::Core::register`] right after the new
    /// `NodeId` is allocated, BEFORE any `union_nodes` calls for the
    /// node's deps.
    pub(crate) fn ensure_registered(&mut self, node: NodeId) {
        // Single lookup: a vacant `parent` slot is the "not yet
        // registered" signal for all four maps.
        if let std::collections::hash_map::Entry::Vacant(slot) = self.parent.entry(node) {
            slot.insert(node);
            self.rank.insert(node, 0);
            self.children.insert(node, HashSet::new());
            self.boxes.insert(node, SubgraphLockBox::new());
        }
    }

    /// Find the root of `node`'s component, with path compression.
    ///
    /// **Iterative two-pass implementation** (Slice X5 /qa):
    /// pass 1 walks up the parent chain to the root; pass 2 re-links
    /// every node on the path directly to the root + maintains the
    /// `children` reverse-map. The iterative form keeps stack depth
    /// O(1) regardless of pre-compression chain length, so a long
    /// un-compressed chain cannot overflow the stack.
    ///
    /// # Panics
    ///
    /// Panics if `node` is not registered.
    #[must_use]
    pub(crate) fn find(&mut self, node: NodeId) -> NodeId {
        // Pass 1: walk up to root (no mutation).
        let mut root = node;
        loop {
            let parent = *self
                .parent
                .get(&root)
                .expect("subgraph_registry::find: node not registered");
            if parent == root {
                break;
            }
            root = parent;
        }

        // Pass 2: path compression — re-link every node on the original
        // walk directly to root, maintaining the `children` reverse-map.
        // Nodes already pointing at root are left untouched.
        let mut walker = node;
        while walker != root {
            let parent = *self
                .parent
                .get(&walker)
                .expect("walker on path-to-root must be registered");
            if parent != root {
                self.parent.insert(walker, root);
                if let Some(old_kids) = self.children.get_mut(&parent) {
                    old_kids.remove(&walker);
                }
                self.children.entry(root).or_default().insert(walker);
            }
            // Continue along the ORIGINAL chain, not the rewritten one.
            walker = parent;
        }
        root
    }

    /// Merge the components containing `a` and `b`. After this call,
    /// `find(a) == find(b)`. A no-op when both are already in the
    /// same component.
    ///
    /// Union-by-rank: the smaller-rank root becomes a child of the
    /// larger. Equal-rank breaks the tie by promoting `a`'s root and
    /// bumping its rank.
    ///
    /// On merge, the loser's [`SubgraphLockBox`] entry is removed
    /// from `boxes`. Future `lock_for` calls on the loser's members
    /// resolve to the winner's root → winner's box.
    ///
    /// # Panics
    ///
    /// Panics if either node is unregistered (via a debug assertion
    /// in debug builds, via `find` otherwise). Debug builds also
    /// assert `a != b`.
    pub(crate) fn union_nodes(&mut self, a: NodeId, b: NodeId) {
        debug_assert!(
            self.parent.contains_key(&a) && self.parent.contains_key(&b),
            "union_nodes: both nodes must be registered first"
        );
        // Defense-in-depth against bypassed cycle detection: a self-edge
        // `set_deps(n, &[n])` would reach here as `union_nodes(n, n)`.
        // Cycle rejection in `Core::set_deps` should catch this BEFORE
        // we get called, but the registry has no other defense.
        debug_assert!(
            a != b,
            "union_nodes called with self-edge — \
             Core's cycle detection bypassed?"
        );
        let mut root_a = self.find(a);
        let mut root_b = self.find(b);
        if root_a == root_b {
            return;
        }
        let rank_a = self.rank.get(&root_a).copied().unwrap_or(0);
        let rank_b = self.rank.get(&root_b).copied().unwrap_or(0);
        if rank_a < rank_b {
            std::mem::swap(&mut root_a, &mut root_b);
        }
        // root_a is the winner (kept). root_b becomes a child.
        self.parent.insert(root_b, root_a);
        self.children.entry(root_a).or_default().insert(root_b);
        // The ranks themselves were not swapped, but the equality test
        // is symmetric and the bump only happens on a tie.
        if rank_a == rank_b {
            self.rank.insert(root_a, rank_a + 1);
        }
        // Drop the loser's box. Any in-flight readers holding an Arc
        // clone keep it alive; the registry no longer references it.
        self.boxes.remove(&root_b);
    }

    /// Remove `node` from the registry. Idempotent on unregistered
    /// nodes. Mirrors py's `_on_gc` (`subgraph_locks.py` lines 53–92).
    ///
    /// - **Non-root:** its direct children are re-attached to its
    ///   parent.
    /// - **Root with children:** the direct child with the smallest
    ///   raw id is promoted to root, the remaining children are
    ///   re-attached under it, and the component's
    ///   [`SubgraphLockBox`] moves to the new root (same `Arc`). The
    ///   choice is deterministic so the surviving [`SubgraphId`] does
    ///   not depend on `HashSet` iteration order.
    /// - **Singleton root:** the component and its box are dropped.
    ///
    /// **Slice X5 status:** wired but NOT called from Core. Today
    /// `Core::terminate_node` marks a node terminal but does NOT remove
    /// it from `CoreState.nodes`; the registry's entries drop together
    /// with `CoreState`. Y1 will revisit this so terminated nodes do
    /// not keep a stale partition's `Arc<SubgraphLockBox>` alive.
    #[allow(dead_code)]
    pub(crate) fn cleanup_node(&mut self, node: NodeId) {
        let Some(parent) = self.parent.remove(&node) else {
            return; // already cleaned up — idempotent.
        };
        let orphans: Vec<NodeId> = self
            .children
            .remove(&node)
            .map(|kids| kids.into_iter().collect())
            .unwrap_or_default();
        let old_rank = self.rank.remove(&node).unwrap_or(0);

        if parent != node {
            // `node` was a non-root: splice it out of the tree. It
            // never had a `boxes` entry, so there is nothing to drop.
            if let Some(parent_kids) = self.children.get_mut(&parent) {
                parent_kids.remove(&node);
                parent_kids.extend(orphans.iter().copied());
            }
            for child in orphans {
                self.parent.insert(child, parent);
            }
            return;
        }

        // `node` was the root.
        let Some(new_root) = orphans.iter().copied().min_by_key(|child| child.raw()) else {
            // Singleton root — the whole component disappears.
            self.boxes.remove(&node);
            return;
        };
        self.parent.insert(new_root, new_root);
        let new_root_kids = self.children.entry(new_root).or_default();
        for child in orphans.into_iter().filter(|child| *child != new_root) {
            self.parent.insert(child, new_root);
            new_root_kids.insert(child);
        }
        // Box ownership transfers to the new root — same Arc, so a
        // box previously handed out by `lock_for` stays the
        // component's current box.
        if let Some(box_arc) = self.boxes.remove(&node) {
            self.boxes.insert(new_root, box_arc);
        }
        // The promoted node now sits above its former siblings, so it
        // must carry at least the old root's rank.
        let new_rank = self.rank.entry(new_root).or_insert(0);
        *new_rank = (*new_rank).max(old_rank);
    }

    /// Hook for an edge removal. Slice X5 commit-1: currently a no-op
    /// that does NOT split (monotonic-merge stepping stone). Y1
    /// commit-2 adds the reachability walk to detect disconnected
    /// components and split them into fresh `SubgraphId`s.
    pub(crate) fn on_edge_removed(&mut self, _from: NodeId, _to: NodeId) {
        // X5 commit-1: no-op. Monotonic merge — components only grow.
        // Planned for Y1 commit-2: if the endpoints are no longer
        // connected once the edge is gone, split the component.
    }

    /// Resolve `node`'s partition id and lock box. Caller acquires
    /// the box's `wave_owner`, then SHOULD re-validate via
    /// [`Self::lock_for_validate`] that the resolved root hasn't
    /// shifted under a concurrent `union` (lock-validation retry
    /// loop, mirroring py `lock_for` lines 154–178).
    ///
    /// Returns `None` if `node` is not registered (defensive — should
    /// not happen in correct call paths) or if its root has no box.
    ///
    /// **Slice X5 status:** unused — Y1 wires the wave engine through it.
    #[allow(dead_code)]
    #[must_use]
    pub(crate) fn lock_for(&mut self, node: NodeId) -> Option<(SubgraphId, Arc<SubgraphLockBox>)> {
        if !self.parent.contains_key(&node) {
            return None;
        }
        let root = self.find(node);
        let box_arc = Arc::clone(self.boxes.get(&root)?);
        Some((SubgraphId::from_node(root), box_arc))
    }

    /// Re-validate that `node`'s root still owns `expected_box`
    /// (compared by `Arc` pointer identity). Used in the retry loop
    /// after acquiring the box's `wave_owner`: if a concurrent
    /// `union` redirected the root mid-acquire, the held lock is for
    /// the wrong partition and the caller must release + retry.
    ///
    /// Returns `false` for unregistered nodes.
    ///
    /// **Slice X5 status:** unused — Y1 wires the wave engine retry
    /// loop through it.
    #[allow(dead_code)]
    #[must_use]
    pub(crate) fn lock_for_validate(
        &mut self,
        node: NodeId,
        expected_box: &Arc<SubgraphLockBox>,
    ) -> bool {
        if !self.parent.contains_key(&node) {
            return false;
        }
        let root = self.find(node);
        matches!(
            self.boxes.get(&root),
            Some(actual) if Arc::ptr_eq(actual, expected_box)
        )
    }

    /// Number of registered nodes. Useful for debugging + acceptance
    /// tests that verify the registry stays in sync with `Core::nodes`.
    #[must_use]
    pub fn node_count(&self) -> usize {
        self.parent.len()
    }

    /// Number of distinct connected components (one lock box per
    /// root). Two threads emitting into nodes with distinct
    /// partitions can run truly parallel (Y1+); X5 substrate does
    /// not yet exercise that property.
    #[must_use]
    pub fn component_count(&self) -> usize {
        self.boxes.len()
    }

    /// Resolve `node`'s partition. Returns `None` for unregistered
    /// nodes. Mutating because path compression may relink under
    /// `find`.
    #[must_use]
    pub fn partition_of(&mut self, node: NodeId) -> Option<SubgraphId> {
        if !self.parent.contains_key(&node) {
            return None;
        }
        Some(SubgraphId::from_node(self.find(node)))
    }
}

// No `Default` impl — the registry is crate-internal infrastructure;
// `Core::new` is the only construction site. Adding a public `Default`
// would let external callers build a registry that can do nothing
// useful: the methods that populate it (`ensure_registered`,
// `union_nodes`) are `pub(crate)`, so it would stay empty forever.

#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a `NodeId` from a raw integer.
    fn n(raw: u64) -> NodeId {
        NodeId::new(raw)
    }

    /// Registry pre-populated with singleton nodes `1..=count`.
    fn registry_with(count: u64) -> SubgraphRegistry {
        let mut r = SubgraphRegistry::new();
        for i in 1..=count {
            r.ensure_registered(n(i));
        }
        r
    }

    #[test]
    fn singleton_register_creates_one_partition() {
        let mut r = registry_with(1);
        assert_eq!(r.node_count(), 1);
        assert_eq!(r.component_count(), 1);
        assert_eq!(r.find(n(1)), n(1));
    }

    #[test]
    fn ensure_registered_is_idempotent() {
        let mut r = registry_with(2);
        r.union_nodes(n(1), n(2));
        let root = r.find(n(2));
        // Re-registering a merged node must not reset it to a singleton.
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        assert_eq!(r.node_count(), 2);
        assert_eq!(r.component_count(), 1);
        assert_eq!(r.find(n(1)), root);
        assert_eq!(r.find(n(2)), root);
    }

    #[test]
    fn union_merges_two_singletons() {
        let mut r = registry_with(2);
        assert_eq!(r.component_count(), 2);
        r.union_nodes(n(1), n(2));
        assert_eq!(r.component_count(), 1);
        assert_eq!(r.find(n(1)), r.find(n(2)));
    }

    #[test]
    fn union_idempotent_within_same_component() {
        let mut r = registry_with(2);
        r.union_nodes(n(1), n(2));
        let comp_before = r.component_count();
        r.union_nodes(n(1), n(2));
        assert_eq!(r.component_count(), comp_before);
    }

    #[test]
    fn cleanup_singleton_removes_partition() {
        let mut r = registry_with(1);
        r.cleanup_node(n(1));
        assert_eq!(r.node_count(), 0);
        assert_eq!(r.component_count(), 0);
    }

    #[test]
    fn cleanup_unregistered_node_is_noop() {
        let mut r = registry_with(2);
        r.cleanup_node(n(99));
        assert_eq!(r.node_count(), 2);
        assert_eq!(r.component_count(), 2);
        // Cleaning the same node twice is equally harmless.
        r.cleanup_node(n(1));
        r.cleanup_node(n(1));
        assert_eq!(r.node_count(), 1);
        assert_eq!(r.component_count(), 1);
    }

    #[test]
    fn cleanup_root_promotes_child() {
        let mut r = registry_with(2);
        r.union_nodes(n(1), n(2));
        // After union: one of {1, 2} is root.
        let root_before = r.find(n(1));
        let child = if root_before == n(1) { n(2) } else { n(1) };
        let (_sid, box_before) = r.lock_for(root_before).expect("registered");
        r.cleanup_node(root_before);
        // The remaining node should be its own root.
        assert_eq!(r.find(child), child);
        assert_eq!(r.node_count(), 1);
        assert_eq!(r.component_count(), 1);
        // The component's lock box survives the re-rooting unchanged.
        let (_sid, box_after) = r.lock_for(child).expect("registered");
        assert!(Arc::ptr_eq(&box_before, &box_after));
    }

    #[test]
    fn cleanup_non_root_leaf_keeps_component_connected() {
        let mut r = registry_with(3);
        r.union_nodes(n(1), n(2));
        r.union_nodes(n(2), n(3));
        let root = r.find(n(1));
        // Pick a non-root node to clean up.
        let leaf = if root == n(2) { n(1) } else { n(2) };
        r.cleanup_node(leaf);
        // Remaining nodes should still find a single root.
        let other = (1..=3u64)
            .map(n)
            .find(|x| *x != root && *x != leaf)
            .expect("third node");
        assert_eq!(r.find(root), r.find(other));
        assert_eq!(r.node_count(), 2);
        assert_eq!(r.component_count(), 1);
    }

    #[test]
    fn cleanup_non_root_re_links_grandchildren_to_parent() {
        // Two rank-1 trees merged by rank tie (a's root wins) give the
        // depth-2 shape 1 → {2, 3}, 3 → {4}. No `find` is issued on
        // n(4) before the cleanup, so the chain stays un-compressed.
        let mut r = registry_with(4);
        r.union_nodes(n(1), n(2));
        r.union_nodes(n(3), n(4));
        r.union_nodes(n(1), n(3));
        assert_eq!(
            r.parent.get(&n(4)),
            Some(&n(3)),
            "precondition: n(4) hangs off n(3), not off the root"
        );

        r.cleanup_node(n(3));

        // The grandchild is re-attached to the removed node's parent.
        assert_eq!(r.parent.get(&n(4)), Some(&n(1)));
        assert_eq!(r.find(n(4)), n(1));
        assert_eq!(r.find(n(2)), n(1));
        assert_eq!(r.node_count(), 3);
        assert_eq!(r.component_count(), 1);
        assert!(r.partition_of(n(3)).is_none());
    }

    #[test]
    fn lock_for_returns_same_box_for_same_component() {
        let mut r = registry_with(2);
        r.union_nodes(n(1), n(2));
        let (_sid_a, box_a) = r.lock_for(n(1)).expect("registered");
        let (_sid_b, box_b) = r.lock_for(n(2)).expect("registered");
        assert!(Arc::ptr_eq(&box_a, &box_b));
    }

    #[test]
    fn lock_for_validate_accepts_current_box() {
        let mut r = registry_with(2);
        r.union_nodes(n(1), n(2));
        let (_sid, current) = r.lock_for(n(1)).expect("registered");
        assert!(r.lock_for_validate(n(1), &current));
        assert!(r.lock_for_validate(n(2), &current));
    }

    #[test]
    fn lock_for_validate_detects_redirect_after_union() {
        // Slice X5 /qa P6: deterministic redirect via forced rank
        // skew. Pre-fix this test asserted conditionally based on
        // which node union promoted, masking a hypothetical regression
        // where `lock_for_validate` always returned `true`. Now the
        // setup forces n(1)'s root to be displaced — the assertion
        // is unconditionally `false`.
        let mut r = registry_with(4);
        // Build a higher-rank tree under n(2): the first union (equal
        // ranks) promotes n(2) and bumps its rank to 1; the second
        // attaches n(4) under it without a further bump.
        // After this, find(n(2)) == n(2) and rank[n(2)] >= 1.
        r.union_nodes(n(2), n(3));
        r.union_nodes(n(2), n(4));
        let n2_root = r.find(n(2));
        // n(1) stays its own singleton (rank 0). Resolve its box BEFORE
        // the cross-tree union.
        let (_sid_before, box_1_alone) = r.lock_for(n(1)).expect("registered");
        let n1_root_before = r.find(n(1));
        assert_eq!(n1_root_before, n(1), "n(1) is still its own root");

        // Cross-tree union: union-by-rank promotes the higher-rank tree
        // (n(2)'s) — n(1)'s root MUST become n(2)'s root.
        r.union_nodes(n(1), n(2));
        let n1_root_after = r.find(n(1));
        assert_eq!(
            n1_root_after, n2_root,
            "union-by-rank promoted n(2)'s tree; n(1)'s root displaced"
        );

        // The previously-resolved box is now stale: lock_for_validate
        // must detect the redirect unconditionally.
        let still_valid = r.lock_for_validate(n(1), &box_1_alone);
        assert!(
            !still_valid,
            "lock_for_validate must detect the box-redirect after union promotes a different root"
        );
        // And lock_for now returns a different box.
        let (_sid_after, box_after) = r.lock_for(n(1)).expect("registered");
        assert!(
            !Arc::ptr_eq(&box_1_alone, &box_after),
            "stale box and active box must be distinct Arc identities"
        );
    }

    #[test]
    fn unregistered_node_lookups_return_none() {
        let mut r = registry_with(1);
        let (_sid, some_box) = r.lock_for(n(1)).expect("registered");
        assert!(r.lock_for(n(42)).is_none());
        assert!(r.partition_of(n(42)).is_none());
        assert!(!r.lock_for_validate(n(42), &some_box));
    }

    #[test]
    fn partition_of_distinct_singletons_differ() {
        let mut r = registry_with(2);
        let p1 = r.partition_of(n(1)).expect("registered");
        let p2 = r.partition_of(n(2)).expect("registered");
        assert_ne!(p1, p2);
    }

    #[test]
    fn partition_of_unioned_nodes_match() {
        let mut r = registry_with(2);
        r.union_nodes(n(1), n(2));
        let p1 = r.partition_of(n(1)).expect("registered");
        let p2 = r.partition_of(n(2)).expect("registered");
        assert_eq!(p1, p2);
    }
}