Skip to main content

graphrefly_core/
subgraph.rs

1//! Per-subgraph union-find registry (Slice X5, 2026-05-08).
2//!
3//! Direct port of [`graphrefly-py`'s
4//! `subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
5//! (locked design via [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)
6//! — Q1 = (c-uf split-eager); see decision log D086).
7//!
8//! # Concept
9//!
10//! Each registered node is a member of exactly one connected
11//! component (a "subgraph") at any moment. Two nodes are in the same
12//! subgraph iff they're connected via dep edges (transitively).
13//!
14//! Connected-component membership is tracked via union-find with
15//! union-by-rank + path compression. Each component's root carries
16//! an [`Arc<SubgraphLockBox>`] that owns the partition's
17//! `wave_owner` re-entrant mutex — nodes in the same component share
18//! one lock, disjoint components run truly parallel.
19//!
20//! # Lifecycle hooks
21//!
22//! - [`SubgraphRegistry::ensure_registered`] — called when a node is
23//!   first registered via [`crate::Core::register`]. Allocates a
24//!   fresh singleton component for the new node.
25//! - [`SubgraphRegistry::union_nodes`] — called for each new dep
26//!   edge (in `register` and `set_deps`'s add-edge path). Merges
27//!   the two endpoints' components.
28//! - [`SubgraphRegistry::cleanup_node`] — wired but **not yet called
29//!   in X5**. In Y1 the wave engine will invoke it from sites that
30//!   actually remove a `NodeRecord` from `CoreState.nodes` (today
31//!   `terminate_node` only marks the node terminal — `NodeRecord`s
32//!   persist for the life of `CoreState`). The registry's HashMap
33//!   entries drop together with `CoreState` when the last `Core`
34//!   clone goes, so X5 doesn't leak partitions in practice; the
35//!   per-node cleanup hook becomes load-bearing once Y1 lands a
36//!   `Drop`-via-removal lifecycle for `NodeRecord` (post-Y1 work).
37//! - [`SubgraphRegistry::on_edge_removed`] — called for each removed
38//!   dep edge in `set_deps`'s remove path. Slice X5 commit-1 just
39//!   notes the removal; **Y1 (split-eager)** adds the reachability
40//!   walk that splits disconnected components.
41//!
42//! # Lock-acquisition discipline (Y1)
43//!
44//! [`SubgraphRegistry::lock_for`] returns the component's
45//! [`Arc<SubgraphLockBox>`] — caller acquires `box.wave_owner` and
46//! re-validates that the resolved root hasn't been redirected by a
47//! concurrent `union` (mirrors py's `MAX_LOCK_RETRIES` retry loop).
48//!
49//! # Slice X5 scope
50//!
51//! Substrate types + registry tracking are wired to `Core::register`
52//! and `Core::set_deps`'s edge add path so the union-find state is
53//! maintained as nodes register and topology changes. The wave engine
54//! itself still uses the legacy Core-level `wave_owner` — Y1 (the
55//! wave-engine migration to per-partition `wave_owner`) is carried
56//! forward as an explicit follow-on slice given its scope (every
57//! `begin_batch` / `run_wave` call site + `Core::subscribe` lock
58//! acquisition + `BatchGuard` lock retention + retry-validate
59//! semantics for held-Arc-vs-current-root divergence on union).
60//!
//! Several substrate items (`cleanup_node`, `lock_for`,
//! `lock_for_validate`, and the `SubgraphLockBox::wave_owner` field)
//! were wired but unused when X5 landed. Of these, only `cleanup_node`
//! still carries a per-item `#[allow(dead_code)]`; the item docs of
//! `lock_for` / `lock_for_validate` record that Y1 now consumes them,
//! so they no longer need the annotation. (Per-item rather than
//! module-level shotgun so any genuinely unused new item still flags
//! as dead code during follow-up review.)
68
69use std::collections::{HashMap, HashSet};
70use std::sync::Arc;
71
72use parking_lot::ReentrantMutex;
73
74use crate::handle::NodeId;
75
/// Newtype identifier for a connected-component partition.
///
/// Internally a `u64` (the union-find root's `NodeId.raw()`, see
/// [`SubgraphId::from_node`]). Distinct from [`NodeId`] at the type
/// system level — partitions and nodes are not interchangeable.
///
/// The derived `PartialOrd` / `Ord` compare the wrapped `u64`, which
/// gives partitions a total order.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct SubgraphId(pub(crate) u64);
83
84impl SubgraphId {
85    /// Construct from a [`NodeId`] root. Internal use — partition
86    /// identity is the union-find root's NodeId.
87    #[must_use]
88    pub(crate) fn from_node(node: NodeId) -> Self {
89        Self(node.raw())
90    }
91
92    /// Raw u64 view. Used for total-ordering across multi-partition
93    /// wave acquisitions per Q4=(a) (deadlock-free via ascending
94    /// SubgraphId order).
95    #[must_use]
96    pub fn raw(self) -> u64 {
97        self.0
98    }
99}
100
101impl std::fmt::Display for SubgraphId {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        write!(f, "subgraph#{}", self.0)
104    }
105}
106
/// Per-component lock box. Holds the partition's `wave_owner`
/// re-entrant mutex.
///
/// The registry keeps one `Arc<SubgraphLockBox>` per component root.
/// On `union`, the losing root's registry entry is removed, so every
/// member of the merged component resolves to the winning root's box
/// from then on; anyone still holding an [`Arc`] clone of the dropped
/// box keeps that allocation alive, but the registry no longer refers
/// to it (the original design note compares this to py's
/// `_LockBox.lock` redirect on union).
///
/// **Slice Y1 (D3 / Phase E, 2026-05-08):** the wave engine acquires
/// this per-partition lock via [`crate::Core::partition_wave_owner_lock_arc`]
/// (with retry-validate against concurrent union/split). Closure-form
/// `Core::batch` acquires every partition's lock in ascending
/// [`SubgraphId`] order; per-seed entry points (`emit`, `subscribe`,
/// etc.) acquire only the seed's transitively-touched partitions.
pub struct SubgraphLockBox {
    /// Per-partition wave ownership. Cross-thread emits to the same
    /// partition serialize here; same-thread re-entry passes through
    /// (the mutex is re-entrant).
    /// Wrapped in `Arc` at the registry level so two roots' boxes
    /// can share the same mutex identity after `union`.
    pub(crate) wave_owner: Arc<ReentrantMutex<()>>,
}
126
127impl SubgraphLockBox {
128    fn new() -> Arc<Self> {
129        Arc::new(Self {
130            wave_owner: Arc::new(ReentrantMutex::new(())),
131        })
132    }
133}
134
/// Default cap for [`SubgraphRegistry::lock_for`]'s root-validation
/// retry loop. Mirrors graphrefly-py's `_MAX_LOCK_RETRIES` constant
/// (`subgraph_locks.py` line 39). Reached only under continuous
/// `union` activity racing with `lock_for` — pathological in
/// practice but bounded for safety.
///
/// Not read by any code in this module: the retry loop itself lives
/// with the caller. Consumed by
/// [`crate::Core::partition_wave_owner_lock_arc`] under the
/// retry-validate loop (Slice Y1 / Phase E, 2026-05-08).
pub(crate) const MAX_LOCK_RETRIES: u32 = 100;
144
/// Union-find registry tracking each node's connected-component
/// membership. Wrapped by [`crate::Core`] in `Arc<Mutex<...>>` so
/// the wave engine can resolve a node's partition before acquiring
/// that partition's `wave_owner`.
///
/// All four maps are keyed by [`NodeId`]; `parent`, `rank` and
/// `children` get an entry for every registered node, `boxes` only
/// for current roots.
pub struct SubgraphRegistry {
    /// `node_id → parent_id` union-find map. A root has
    /// `parent[root] == root`.
    parent: HashMap<NodeId, NodeId>,
    /// `node_id → rank` for union-by-rank. Only meaningful for roots.
    rank: HashMap<NodeId, u32>,
    /// Reverse map for cleanup-time re-rooting: `node_id → set of
    /// direct children whose `parent` field points at `node_id`.
    /// On root cleanup, we promote one child to root and re-attach
    /// the others.
    children: HashMap<NodeId, HashSet<NodeId>>,
    /// `root_node_id → Arc<SubgraphLockBox>`. Only roots have entries.
    /// On `union`, the loser's entry is removed and any future
    /// `lock_for` calls on its members find the winner's box via
    /// `find`.
    boxes: HashMap<NodeId, Arc<SubgraphLockBox>>,
}
166
167impl SubgraphRegistry {
168    /// Construct an empty registry. `pub(crate)` because all useful
169    /// operations on the registry are crate-internal — only `Core::new`
170    /// needs to construct one. External callers receive
171    /// [`SubgraphId`]s via [`Core::partition_of`] but cannot construct
172    /// a registry from outside the crate.
173    #[must_use]
174    pub(crate) fn new() -> Self {
175        Self {
176            parent: HashMap::new(),
177            rank: HashMap::new(),
178            children: HashMap::new(),
179            boxes: HashMap::new(),
180        }
181    }
182
183    /// Register `node` as the root of a fresh singleton component.
184    /// Idempotent — calling twice on the same node is a no-op.
185    ///
186    /// Called from [`crate::Core::register`] right after the new
187    /// `NodeId` is allocated, BEFORE any `union_nodes` calls for the
188    /// node's deps.
189    pub(crate) fn ensure_registered(&mut self, node: NodeId) {
190        if self.parent.contains_key(&node) {
191            return;
192        }
193        self.parent.insert(node, node);
194        self.rank.insert(node, 0);
195        self.children.insert(node, HashSet::new());
196        self.boxes.insert(node, SubgraphLockBox::new());
197    }
198
199    /// Find the root of `node`'s component, with path compression.
200    /// Panics if `node` is not registered.
201    ///
202    /// **Iterative two-pass implementation** (Slice X5 /qa):
203    /// pass 1 walks up the parent chain to the root; pass 2 re-links
204    /// every node on the path directly to the root + maintains the
205    /// `children` reverse-map. Iterative form mirrors the existing
206    /// `terminate_node` cascade discipline (and graphrefly-py's
207    /// `_find_locked` while-loop) — keeps stack depth O(1) regardless
208    /// of pre-compression chain length, avoiding stack overflow on
209    /// pathological un-compressed trees that can briefly form before
210    /// path compression settles (especially relevant once Y1 puts
211    /// `find` on the hot path under `lock_for`).
212    #[must_use]
213    pub(crate) fn find(&mut self, node: NodeId) -> NodeId {
214        // Pass 1: walk up to root (no mutation).
215        let mut cur = node;
216        loop {
217            let parent = *self
218                .parent
219                .get(&cur)
220                .expect("subgraph_registry::find: node not registered");
221            if parent == cur {
222                break; // cur is root
223            }
224            cur = parent;
225        }
226        let root = cur;
227
228        // Pass 2: path compression — re-link every node on the original
229        // walk directly to root, maintaining the `children` reverse-map.
230        // Skip when already pointing at root (no-op for root itself, and
231        // for already-compressed nodes).
232        let mut walker = node;
233        while walker != root {
234            let parent = *self
235                .parent
236                .get(&walker)
237                .expect("walker on path-to-root must be registered");
238            if parent != root {
239                self.parent.insert(walker, root);
240                if let Some(old_kids) = self.children.get_mut(&parent) {
241                    old_kids.remove(&walker);
242                }
243                self.children.entry(root).or_default().insert(walker);
244            }
245            walker = parent;
246        }
247        root
248    }
249
250    /// Merge the components containing `a` and `b`. Both nodes must
251    /// already be registered. After this call, `find(a) == find(b)`.
252    ///
253    /// Union-by-rank: the smaller-rank root becomes a child of the
254    /// larger. Equal-rank breaks the tie by promoting `a`'s root and
255    /// bumping its rank.
256    ///
257    /// On merge, the loser's [`SubgraphLockBox`] entry is removed
258    /// from `boxes`. Future `lock_for` calls on the loser's members
259    /// resolve to the winner's root → winner's box.
260    pub(crate) fn union_nodes(&mut self, a: NodeId, b: NodeId) {
261        debug_assert!(
262            self.parent.contains_key(&a) && self.parent.contains_key(&b),
263            "union_nodes: both nodes must be registered first"
264        );
265        // Defense-in-depth against bypassed cycle detection: a self-edge
266        // `set_deps(n, &[n])` would reach here as `union_nodes(n, n)`.
267        // Cycle rejection in `Core::set_deps` should catch this BEFORE
268        // we get called, but the registry has no other defense.
269        debug_assert!(
270            a != b,
271            "union_nodes called with self-edge — \
272             Core's cycle detection bypassed?"
273        );
274        let mut root_a = self.find(a);
275        let mut root_b = self.find(b);
276        if root_a == root_b {
277            return;
278        }
279        let rank_a = *self.rank.get(&root_a).unwrap_or(&0);
280        let rank_b = *self.rank.get(&root_b).unwrap_or(&0);
281        if rank_a < rank_b {
282            std::mem::swap(&mut root_a, &mut root_b);
283        }
284        // root_a is the winner (kept). root_b becomes a child.
285        self.parent.insert(root_b, root_a);
286        self.children.entry(root_a).or_default().insert(root_b);
287        if rank_a == rank_b {
288            self.rank.insert(root_a, rank_a + 1);
289        }
290        // Drop the loser's box. Any in-flight readers holding an Arc
291        // clone keep it alive; the registry no longer references it.
292        self.boxes.remove(&root_b);
293    }
294
295    /// Remove `node` from the registry. If `node` was a root, promote
296    /// one of its direct children as the new root and re-link the
297    /// others. Mirrors py's `_on_gc` (`subgraph_locks.py` lines 53–92).
298    ///
299    /// **Slice X5 status:** wired but NOT called from Core. Today
300    /// `Core::terminate_node` marks a node terminal but does NOT remove
301    /// it from `CoreState.nodes` (NodeRecords persist for the life of
302    /// `CoreState`); the registry's HashMap entries drop together with
303    /// `CoreState` when the last `Core` clone goes, so the partition
304    /// state is reclaimed without an explicit per-node cleanup. Y1
305    /// will revisit this — when the wave engine activates per-partition
306    /// `wave_owner`, terminated nodes' partition entries should be
307    /// purged so a stale partition doesn't keep its `Arc<SubgraphLockBox>`
308    /// alive in the registry. Idempotent on unregistered nodes.
309    #[allow(dead_code)]
310    pub(crate) fn cleanup_node(&mut self, node: NodeId) {
311        let Some(parent) = self.parent.get(&node).copied() else {
312            return; // already cleaned up — idempotent.
313        };
314
315        let direct_children: Vec<NodeId> = self
316            .children
317            .get(&node)
318            .map(|s| s.iter().copied().collect())
319            .unwrap_or_default();
320
321        if parent == node {
322            // `node` was the root.
323            if let Some(&new_root) = direct_children.first() {
324                self.parent.insert(new_root, new_root);
325                // Detach `node` from each grandchild's children-set first
326                // (before borrowing children mut for the new_root entry).
327                for child in &direct_children {
328                    if let Some(kids) = self.children.get_mut(child) {
329                        kids.remove(&node);
330                    }
331                }
332                // Re-attach all children except `new_root` to `new_root`.
333                let new_root_kids = self.children.entry(new_root).or_default();
334                for child in direct_children.iter().skip(1).copied() {
335                    self.parent.insert(child, new_root);
336                    new_root_kids.insert(child);
337                }
338                // Box ownership transfers to the new root — same Arc, so
339                // any in-flight `lock_for` holders keep their guard alive.
340                if let Some(box_arc) = self.boxes.remove(&node) {
341                    self.boxes.insert(new_root, box_arc);
342                }
343                let old_rank = self.rank.get(&node).copied().unwrap_or(0);
344                let new_rank = self.rank.entry(new_root).or_insert(0);
345                if old_rank > *new_rank {
346                    *new_rank = old_rank;
347                }
348            } else {
349                // Singleton root — just remove the box.
350                self.boxes.remove(&node);
351            }
352        } else {
353            // `node` was a non-root. Detach from its parent's children;
354            // re-attach its own children to its parent.
355            if let Some(parent_kids) = self.children.get_mut(&parent) {
356                parent_kids.remove(&node);
357                for child in &direct_children {
358                    parent_kids.insert(*child);
359                }
360            }
361            for child in &direct_children {
362                self.parent.insert(*child, parent);
363            }
364        }
365
366        self.children.remove(&node);
367        self.parent.remove(&node);
368        self.rank.remove(&node);
369        // boxes already removed above on root path; non-root path never
370        // had a box entry.
371    }
372
373    /// Hook for an edge removal. Slice X5 commit-1: was a no-op
374    /// (monotonic-merge stepping stone). Slice Y1 / Phase F (D3
375    /// split-eager, 2026-05-09): the actual reachability walk +
376    /// split decision lives in `Core::set_deps` (it has access to
377    /// `s.children` and `s.nodes`'s `dep_records` for the undirected
378    /// dep-edge graph). When `Core` detects disconnection it calls
379    /// [`Self::split_partition`] directly. This `on_edge_removed`
380    /// hook remains as a no-op marker — kept for API symmetry with
381    /// [`Self::union_nodes`] and for future use if the registry
382    /// gains its own edge-graph view.
383    pub(crate) fn on_edge_removed(&mut self, _from: NodeId, _to: NodeId) {
384        // No-op. See `Core::set_deps` Phase F split-eager block which
385        // calls `split_partition` directly when an edge removal causes
386        // disconnection in the undirected dep-edge graph.
387    }
388
389    /// Split an existing component into two, given the keep-side and
390    /// orphan-side membership. Slice Y1 / Phase F (D3 split-eager,
391    /// 2026-05-09).
392    ///
393    /// **Invariants assumed by caller:**
394    /// 1. `component_nodes` lists every node currently in some single
395    ///    component C of this registry.
396    /// 2. `keep_side` ⊆ `component_nodes` is the post-removal
397    ///    connected subset that should retain C's existing
398    ///    [`SubgraphLockBox`] (Arc identity preserved). `keep_side`
399    ///    must be non-empty.
400    /// 3. The orphan side (`component_nodes - keep_side`) must be
401    ///    non-empty. (If it were empty, no split is needed; caller
402    ///    should not invoke this method.)
403    /// 4. `edges_in_component` lists the dep edges (as `(parent,
404    ///    child)` data-flow pairs) currently present in
405    ///    `Core::s.children` / `s.nodes[*].dep_records`,
406    ///    POST-removal of the triggering edge. The edges connect
407    ///    nodes within `component_nodes`. The caller is responsible
408    ///    for filtering to in-component edges.
409    ///
410    /// **Algorithm (mirrors graphrefly-py [`subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
411    /// `_on_split` plus a Rust-idiomatic Arc redirect):**
412    /// 1. Capture the original [`Arc<SubgraphLockBox>`] for the
413    ///    component's pre-split root.
414    /// 2. Reset every component node to a singleton (parent[n] = n,
415    ///    rank = 0, children = ∅).
416    /// 3. Re-union via `edges_in_component` — each edge calls
417    ///    [`Self::union_nodes`], which restores connectivity within
418    ///    each side independently (since `edges_in_component`
419    ///    contains only the post-removal edges, the two sides stay
420    ///    disconnected from each other in the union-find tree).
421    /// 4. Resolve the new keep-side root and orphan-side root.
422    /// 5. Assign the original lock box to the keep-side root (so
423    ///    in-flight waves holding the original `Arc` succeed
424    ///    `lock_for_validate` against the keep-side); allocate a
425    ///    fresh box for the orphan-side root (in-flight waves on
426    ///    the orphan side fail validate, retry with the new box).
427    pub(crate) fn split_partition(
428        &mut self,
429        component_nodes: &[NodeId],
430        keep_side_nodes: &[NodeId],
431        edges_in_component: &[(NodeId, NodeId)],
432    ) {
433        debug_assert!(
434            !component_nodes.is_empty(),
435            "component_nodes must be non-empty"
436        );
437        debug_assert!(
438            !keep_side_nodes.is_empty(),
439            "keep_side_nodes must be non-empty"
440        );
441        // Build a contains-lookup set for the keep side. The caller
442        // passes a slice (avoids cross-crate-internal HashSet flavor
443        // mismatch — `std::collections::HashSet` vs `ahash::AHashSet`).
444        let keep_side: HashSet<NodeId> = keep_side_nodes.iter().copied().collect();
445        debug_assert!(
446            component_nodes.iter().any(|n| !keep_side.contains(n)),
447            "orphan side must be non-empty (no-op caller)"
448        );
449
450        // Step 1: capture original box.
451        let original_root = self.find(component_nodes[0]);
452        let original_box = self
453            .boxes
454            .remove(&original_root)
455            .expect("original_root must have a registered box");
456
457        // Step 2: reset every component node to a singleton.
458        for &n in component_nodes {
459            self.parent.insert(n, n);
460            self.rank.insert(n, 0);
461            self.children.insert(n, HashSet::new());
462        }
463
464        // Step 3: re-union via post-removal edges. Both sides become
465        // internally connected; the two sides remain disconnected from
466        // each other (since the triggering edge was removed and the
467        // caller's BFS verified disconnection).
468        for &(a, b) in edges_in_component {
469            // `union_nodes` is idempotent on already-merged components,
470            // so multiple edges into the same connected subset are fine.
471            // Self-edges shouldn't appear here (Core's cycle detection
472            // rejects them at edge-mutation time) but guard defensively.
473            if a != b {
474                self.union_nodes(a, b);
475            }
476        }
477
478        // Step 4: resolve roots. Pick any keep-side / orphan-side
479        // representative and find its post-re-union root.
480        let keep_repr = keep_side_nodes[0];
481        let keep_root = self.find(keep_repr);
482        let orphan_repr = *component_nodes
483            .iter()
484            .find(|n| !keep_side.contains(n))
485            .expect("non-empty orphan side");
486        let orphan_root = self.find(orphan_repr);
487        debug_assert!(
488            keep_root != orphan_root,
489            "split_partition: keep_root {keep_root:?} and orphan_root {orphan_root:?} \
490             must be distinct after re-union — caller's BFS must have asserted \
491             disconnection"
492        );
493
494        // Step 5: assign boxes. Original box → keep-side root (so
495        // in-flight waves' `lock_for_validate` against the held Arc
496        // succeed for keep-side nodes). Fresh box → orphan-side root.
497        self.boxes.insert(keep_root, original_box);
498        self.boxes.insert(orphan_root, SubgraphLockBox::new());
499    }
500
501    /// Resolve `node`'s partition lock box. Caller acquires the
502    /// box's `wave_owner`, then SHOULD re-validate via
503    /// [`Self::lock_for_validate`] that the resolved root hasn't
504    /// shifted under a concurrent `union` (lock-validation retry
505    /// loop, mirroring py `lock_for` lines 154–178).
506    ///
507    /// Returns `None` if `node` is not registered (defensive — should
508    /// not happen in correct call paths).
509    ///
510    /// Consumed by [`crate::Core::partition_wave_owner_lock_arc`]
511    /// under the retry-validate loop (Slice Y1 / Phase E, 2026-05-08).
512    #[must_use]
513    pub(crate) fn lock_for(&mut self, node: NodeId) -> Option<(SubgraphId, Arc<SubgraphLockBox>)> {
514        if !self.parent.contains_key(&node) {
515            return None;
516        }
517        let root = self.find(node);
518        let box_arc = self.boxes.get(&root).cloned()?;
519        Some((SubgraphId::from_node(root), box_arc))
520    }
521
522    /// Re-validate that `node`'s root resolves to `expected_box`. Used
523    /// in the retry loop after acquiring the box's `wave_owner`: if a
524    /// concurrent `union` redirected the root mid-acquire, the held
525    /// lock is for the wrong partition and the caller must release +
526    /// retry.
527    ///
528    /// Consumed by [`crate::Core::partition_wave_owner_lock_arc`]
529    /// under the retry-validate loop (Slice Y1 / Phase E, 2026-05-08).
530    #[must_use]
531    pub(crate) fn lock_for_validate(
532        &mut self,
533        node: NodeId,
534        expected_box: &Arc<SubgraphLockBox>,
535    ) -> bool {
536        let Some(root) = self.parent.get(&node).copied() else {
537            return false;
538        };
539        let actual_root = self.find(root);
540        match self.boxes.get(&actual_root) {
541            Some(actual) => Arc::ptr_eq(actual, expected_box),
542            None => false,
543        }
544    }
545
546    /// Snapshot of every currently-existing partition, sorted in
547    /// ascending [`SubgraphId`] order. Returns `(partition_id, lock_box)`
548    /// pairs — the caller acquires `box.wave_owner.lock_arc()` on each
549    /// in ascending order to enter a closure-form batch (which doesn't
550    /// have a known seed and must serialize against ALL partitions per
551    /// session-doc Q7 + decision D092).
552    ///
553    /// Consumed by `Core::all_partitions_lock_boxes` (Slice Y1 /
554    /// Phase E, 2026-05-08).
555    #[must_use]
556    pub(crate) fn all_partitions(&self) -> Vec<(SubgraphId, Arc<SubgraphLockBox>)> {
557        let mut out: Vec<(SubgraphId, Arc<SubgraphLockBox>)> = self
558            .boxes
559            .iter()
560            .map(|(root, box_arc)| (SubgraphId::from_node(*root), Arc::clone(box_arc)))
561            .collect();
562        out.sort_unstable_by_key(|(sid, _)| *sid);
563        out
564    }
565
566    /// Number of registered nodes. Useful for debugging + acceptance
567    /// tests that verify the registry stays in sync with `Core::nodes`.
568    #[must_use]
569    pub fn node_count(&self) -> usize {
570        self.parent.len()
571    }
572
573    /// Snapshot of every currently-registered node. Used by
574    /// `Core::set_deps`'s split-eager block (Slice Y1 / Phase F,
575    /// 2026-05-09) to enumerate candidate nodes for a component-
576    /// membership filter; iterating + calling `find` would alias-
577    /// borrow `&mut self`, so we snapshot first.
578    #[must_use]
579    pub(crate) fn registered_nodes(&self) -> Vec<NodeId> {
580        self.parent.keys().copied().collect()
581    }
582
583    /// Number of distinct connected components. Two threads emitting
584    /// into nodes with distinct partitions can run truly parallel
585    /// (Y1+); X5 substrate does not yet exercise that property.
586    #[must_use]
587    pub fn component_count(&self) -> usize {
588        self.boxes.len()
589    }
590
591    /// Resolve `node`'s partition. Returns `None` for unregistered
592    /// nodes. Mutating because path compression may relink under
593    /// `find`.
594    #[must_use]
595    pub fn partition_of(&mut self, node: NodeId) -> Option<SubgraphId> {
596        if !self.parent.contains_key(&node) {
597            return None;
598        }
599        Some(SubgraphId::from_node(self.find(node)))
600    }
601}
602
603// No `Default` impl — the registry is crate-internal infrastructure;
604// `Core::new` is the only construction site. Adding a public `Default`
605// would let external callers build a registry that can do nothing
606// useful (every operation method is `pub(crate)`).
607
#[cfg(test)]
mod tests {
    //! Unit tests for the union-find registry: registration, union,
    //! cleanup re-rooting, lock-box resolution / validation, partition
    //! ids, and the split-eager path.

    use super::*;

    /// Shorthand for building a [`NodeId`] from a raw value.
    fn n(raw: u64) -> NodeId {
        NodeId::new(raw)
    }

    #[test]
    fn singleton_register_creates_one_partition() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        assert_eq!(r.node_count(), 1);
        assert_eq!(r.component_count(), 1);
        assert_eq!(r.find(n(1)), n(1));
    }

    #[test]
    fn union_merges_two_singletons() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        assert_eq!(r.component_count(), 2);
        r.union_nodes(n(1), n(2));
        assert_eq!(r.component_count(), 1);
        assert_eq!(r.find(n(1)), r.find(n(2)));
    }

    #[test]
    fn union_idempotent_within_same_component() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        r.union_nodes(n(1), n(2));
        let comp_before = r.component_count();
        r.union_nodes(n(1), n(2));
        assert_eq!(r.component_count(), comp_before);
    }

    #[test]
    fn cleanup_singleton_removes_partition() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.cleanup_node(n(1));
        assert_eq!(r.node_count(), 0);
        assert_eq!(r.component_count(), 0);
    }

    #[test]
    fn cleanup_is_idempotent_on_unregistered_node() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.cleanup_node(n(1));
        // Second call (and a never-registered node) must be no-ops.
        r.cleanup_node(n(1));
        r.cleanup_node(n(42));
        assert_eq!(r.node_count(), 0);
        assert_eq!(r.component_count(), 0);
    }

    #[test]
    fn cleanup_root_promotes_child() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        r.union_nodes(n(1), n(2));
        // After union: one of {1, 2} is root.
        let root_before = r.find(n(1));
        let child = if root_before == n(1) { n(2) } else { n(1) };
        r.cleanup_node(root_before);
        // The remaining node should be its own root.
        assert_eq!(r.find(child), child);
        assert_eq!(r.component_count(), 1);
    }

    #[test]
    fn cleanup_non_root_re_links_grandchildren_to_parent() {
        let mut r = SubgraphRegistry::new();
        for i in 1..=3 {
            r.ensure_registered(n(i));
        }
        r.union_nodes(n(1), n(2));
        r.union_nodes(n(2), n(3));
        let root_before = r.find(n(1));
        // Pick a non-root node to clean up: n(2) unless n(2) is the
        // root, in which case n(1).
        let non_root = if root_before == n(2) { n(1) } else { n(2) };
        r.cleanup_node(non_root);
        // Remaining nodes should still find a single root.
        let other = (1..=3u64)
            .map(n)
            .find(|x| *x != root_before && *x != non_root)
            .expect("third node");
        assert_eq!(r.find(root_before), r.find(other));
    }

    #[test]
    fn lock_for_returns_same_box_for_same_component() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        r.union_nodes(n(1), n(2));
        let (_sid_a, box_a) = r.lock_for(n(1)).expect("registered");
        let (_sid_b, box_b) = r.lock_for(n(2)).expect("registered");
        assert!(Arc::ptr_eq(&box_a, &box_b));
    }

    #[test]
    fn unregistered_node_resolves_to_nothing() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        let (_sid, box_1) = r.lock_for(n(1)).expect("registered");
        assert!(r.lock_for(n(9)).is_none());
        assert!(r.partition_of(n(9)).is_none());
        assert!(!r.lock_for_validate(n(9), &box_1));
    }

    #[test]
    fn lock_for_validate_detects_redirect_after_union() {
        // Slice X5 /qa P6: deterministic redirect via forced rank
        // skew. Pre-fix this test asserted conditionally based on
        // which node union promoted, masking a hypothetical regression
        // where `lock_for_validate` always returned `true`. Now the
        // setup forces n(1)'s root to be displaced — the assertion
        // is unconditionally `false`.
        let mut r = SubgraphRegistry::new();
        for i in 1..=4 {
            r.ensure_registered(n(i));
        }
        // Build a higher-rank tree under n(2)'s root: union n(2)+n(3)
        // and n(2)+n(4), which under union-by-rank with equal initial
        // ranks promotes n(2) and bumps its rank on the first union.
        // After this, find(n(2)) == n(2) and rank[n(2)] >= 1.
        r.union_nodes(n(2), n(3));
        r.union_nodes(n(2), n(4));
        let n2_root = r.find(n(2));
        // n(1) stays its own singleton (rank 0). Resolve its box BEFORE
        // the cross-tree union.
        let (_sid_before, box_1_alone) = r.lock_for(n(1)).expect("registered");
        let n1_root_before = r.find(n(1));
        assert_eq!(n1_root_before, n(1), "n(1) is still its own root");

        // Cross-tree union: union-by-rank promotes the higher-rank tree
        // (n(2)'s) — n(1)'s root MUST become n(2)'s root.
        r.union_nodes(n(1), n(2));
        let n1_root_after = r.find(n(1));
        assert_eq!(
            n1_root_after, n2_root,
            "union-by-rank promoted n(2)'s tree; n(1)'s root displaced"
        );

        // The previously-resolved box is now stale: lock_for_validate
        // must detect the redirect unconditionally.
        let still_valid = r.lock_for_validate(n(1), &box_1_alone);
        assert!(
            !still_valid,
            "lock_for_validate must detect the box-redirect after union promotes a different root"
        );
        // And lock_for now returns a different box.
        let (_sid_after, box_after) = r.lock_for(n(1)).expect("registered");
        assert!(
            !Arc::ptr_eq(&box_1_alone, &box_after),
            "stale box and active box must be distinct Arc identities"
        );
    }

    #[test]
    fn partition_of_distinct_singletons_differ() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        let p1 = r.partition_of(n(1)).expect("registered");
        let p2 = r.partition_of(n(2)).expect("registered");
        assert_ne!(p1, p2);
    }

    #[test]
    fn partition_of_unioned_nodes_match() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        r.union_nodes(n(1), n(2));
        let p1 = r.partition_of(n(1)).expect("registered");
        let p2 = r.partition_of(n(2)).expect("registered");
        assert_eq!(p1, p2);
    }

    #[test]
    fn all_partitions_sorted_by_ascending_id() {
        let mut r = SubgraphRegistry::new();
        for raw in [3u64, 1, 2] {
            r.ensure_registered(n(raw));
        }
        let ids: Vec<u64> = r
            .all_partitions()
            .iter()
            .map(|(sid, _)| sid.raw())
            .collect();
        assert_eq!(ids, vec![1, 2, 3]);
    }

    #[test]
    fn split_partition_keeps_original_box_on_keep_side() {
        // Chain 1 — 2 — 3; the 2 — 3 edge is removed, leaving {1, 2}
        // (keep side) and {3} (orphan side).
        let mut r = SubgraphRegistry::new();
        let nodes = [n(1), n(2), n(3)];
        for &node in &nodes {
            r.ensure_registered(node);
        }
        r.union_nodes(n(1), n(2));
        r.union_nodes(n(2), n(3));
        let (_sid, original) = r.lock_for(n(1)).expect("registered");

        r.split_partition(&nodes, &[n(1), n(2)], &[(n(1), n(2))]);

        assert_eq!(r.component_count(), 2);
        assert_eq!(r.find(n(1)), r.find(n(2)));
        assert_ne!(r.find(n(1)), r.find(n(3)));
        assert!(
            r.lock_for_validate(n(1), &original) && r.lock_for_validate(n(2), &original),
            "keep side must retain the pre-split Arc identity"
        );
        assert!(
            !r.lock_for_validate(n(3), &original),
            "orphan side must resolve to a fresh box"
        );
    }
}
777}