Skip to main content

graphrefly_core/
subgraph.rs

1//! Per-subgraph union-find registry (Slice X5, 2026-05-08).
2//!
3//! Direct port of [`graphrefly-py`'s
4//! `subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
5//! (locked design via [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)
6//! — Q1 = (c-uf split-eager); see decision log D086).
7//!
8//! # Concept
9//!
10//! Each registered node is a member of exactly one connected
11//! component (a "subgraph") at any moment. Two nodes are in the same
12//! subgraph iff they're connected via dep edges (transitively).
13//!
14//! Connected-component membership is tracked via union-find with
15//! union-by-rank + path compression. Each component's root carries
16//! an [`Arc<SubgraphLockBox>`] that owns the partition's
17//! `wave_owner` re-entrant mutex — nodes in the same component share
18//! one lock, disjoint components run truly parallel.
19//!
20//! # Lifecycle hooks
21//!
22//! - [`SubgraphRegistry::ensure_registered`] — called when a node is
23//!   first registered via [`crate::Core::register`]. Allocates a
24//!   fresh singleton component for the new node.
25//! - [`SubgraphRegistry::union_nodes`] — called for each new dep
26//!   edge (in `register` and `set_deps`'s add-edge path). Merges
27//!   the two endpoints' components.
28//! - [`SubgraphRegistry::cleanup_node`] — wired but **not yet called
29//!   in X5**. In Y1 the wave engine will invoke it from sites that
30//!   actually remove a `NodeRecord` from `CoreState.nodes` (today
31//!   `terminate_node` only marks the node terminal — `NodeRecord`s
32//!   persist for the life of `CoreState`). The registry's HashMap
33//!   entries drop together with `CoreState` when the last `Core`
34//!   clone goes, so X5 doesn't leak partitions in practice; the
35//!   per-node cleanup hook becomes load-bearing once Y1 lands a
36//!   `Drop`-via-removal lifecycle for `NodeRecord` (post-Y1 work).
37//! - [`SubgraphRegistry::on_edge_removed`] — called for each removed
38//!   dep edge in `set_deps`'s remove path. Slice X5 commit-1 just
39//!   notes the removal; **Y1 (split-eager)** adds the reachability
40//!   walk that splits disconnected components.
41//!
42//! # Lock-acquisition discipline (Y1)
43//!
44//! [`SubgraphRegistry::lock_for`] returns the component's
45//! [`Arc<SubgraphLockBox>`] — caller acquires `box.wave_owner` and
46//! re-validates that the resolved root hasn't been redirected by a
47//! concurrent `union` (mirrors py's `MAX_LOCK_RETRIES` retry loop).
48//!
49//! # Slice X5 scope
50//!
51//! Substrate types + registry tracking are wired to `Core::register`
52//! and `Core::set_deps`'s edge add path so the union-find state is
53//! maintained as nodes register and topology changes. The wave engine
54//! itself still uses the legacy Core-level `wave_owner` — Y1 (the
55//! wave-engine migration to per-partition `wave_owner`) is carried
56//! forward as an explicit follow-on slice given its scope (every
57//! `begin_batch` / `run_wave` call site + `Core::subscribe` lock
58//! acquisition + `BatchGuard` lock retention + retry-validate
59//! semantics for held-Arc-vs-current-root divergence on union).
60//!
61//! Several substrate methods (`cleanup_node`, `lock_for`,
62//! `lock_for_validate`) and the `SubgraphLockBox::wave_owner` field
63//! are wired but unused in X5; they're annotated per-item with
64//! `#[allow(dead_code)]` until Y1 activates the wave engine through
65//! them. (Per-item rather than module-level shotgun so any genuinely
66//! unused new item still flags as dead code during X5 follow-up
67//! review.)
68
69use std::collections::{HashMap, HashSet};
70use std::sync::Arc;
71
72use parking_lot::ReentrantMutex;
73
74use crate::handle::NodeId;
75
76/// Newtype identifier for a connected-component partition.
77///
78/// Internally a `u64` (the union-find root's `NodeId.raw()`). Distinct
79/// from [`NodeId`] at the type system level — partitions and nodes are
80/// not interchangeable.
81#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
82pub struct SubgraphId(pub(crate) u64);
83
84impl SubgraphId {
85    /// Construct from a [`NodeId`] root. Internal use — partition
86    /// identity is the union-find root's NodeId.
87    #[must_use]
88    pub(crate) fn from_node(node: NodeId) -> Self {
89        Self(node.raw())
90    }
91
92    /// Raw u64 view. Used for total-ordering across multi-partition
93    /// wave acquisitions per Q4=(a) (deadlock-free via ascending
94    /// SubgraphId order).
95    #[must_use]
96    pub fn raw(self) -> u64 {
97        self.0
98    }
99}
100
101impl std::fmt::Display for SubgraphId {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        write!(f, "subgraph#{}", self.0)
104    }
105}
106
107// Q3 (2026-05-09) introduced `SubgraphState { tier3_emitted_this_wave }`
108// + a `state: parking_lot::Mutex<SubgraphState>` field on
109// `SubgraphLockBox`. The D1 patch (2026-05-09) reverted this placement
110// after QA surfaced a mid-wave cross-thread `set_deps` partition-split
111// hazard: thread B's split could move a node X from the wave's
112// partition to a fresh orphan-side partition with empty
113// `tier3_emitted_this_wave`, and thread A's subsequent emit at X would
114// then mis-detect "first emit" and violate R1.3.3.a coalescing. Slice G
115// tier3 tracking now lives in a per-thread thread-local in
116// `crate::batch::TIER3_EMITTED_THIS_WAVE` — robust to cross-thread
117// splits because thread B's split doesn't touch thread A's
118// thread-local at all. The `SubgraphState` struct is GONE for now;
119// Q-beyond will reintroduce a per-partition state placement when the
120// CoreState shard layout actually needs it (with a different field
121// shape that's robust to mid-wave splits).
122
123/// Per-component lock box. Holds the partition's `wave_owner`
124/// re-entrant mutex. On `union`, the box of the smaller-rank root is
125/// replaced by the larger-rank root's box — the lock identity is
126/// preserved across merges via the [`Arc`] reference (mirrors py's
127/// `_LockBox.lock` redirect on union).
128///
129/// **Slice Y1 (D3 / Phase E, 2026-05-08):** the wave engine acquires
130/// this per-partition lock via [`crate::Core::partition_wave_owner_lock_arc`]
131/// (with retry-validate against concurrent union/split). Closure-form
132/// `Core::batch` acquires every partition's lock in ascending
133/// [`SubgraphId`] order; per-seed entry points (`emit`, `subscribe`,
134/// etc.) acquire only the seed's transitively-touched partitions.
135pub struct SubgraphLockBox {
136    /// Per-partition wave ownership. Cross-thread emits to the same
137    /// partition serialize here; same-thread re-entry passes through.
138    /// Wrapped in `Arc` at the registry level so two roots' boxes
139    /// can share the same mutex identity after `union`.
140    pub(crate) wave_owner: Arc<ReentrantMutex<()>>,
141}
142
143impl SubgraphLockBox {
144    fn new() -> Arc<Self> {
145        Arc::new(Self {
146            wave_owner: Arc::new(ReentrantMutex::new(())),
147        })
148    }
149}
150
/// Upper bound on iterations of the root-validation retry loop around
/// [`SubgraphRegistry::lock_for`]. Same role as graphrefly-py's
/// `_MAX_LOCK_RETRIES` (`subgraph_locks.py` line 39). The cap is only
/// hit when `union` keeps racing with `lock_for` without pause — not
/// expected in practice, but the loop stays bounded for safety.
///
/// Read by [`crate::Core::partition_wave_owner_lock_arc`] inside its
/// retry-validate loop (Slice Y1 / Phase E, 2026-05-08).
pub(crate) const MAX_LOCK_RETRIES: u32 = 100;
160
161/// Union-find registry tracking each node's connected-component
162/// membership. Wrapped by [`crate::Core`] in `Arc<Mutex<...>>` so
163/// the wave engine can resolve a node's partition before acquiring
164/// that partition's `wave_owner`.
165pub struct SubgraphRegistry {
166    /// `node_id → parent_id` union-find array. A root has
167    /// `parent[root] == root`.
168    parent: HashMap<NodeId, NodeId>,
169    /// `node_id → rank` for union-by-rank. Only meaningful for roots.
170    rank: HashMap<NodeId, u32>,
171    /// Reverse map for cleanup-time re-rooting: `node_id → set of
172    /// direct children whose `parent` field points at `node_id``.
173    /// On root cleanup, we promote one child to root and re-attach
174    /// the others.
175    children: HashMap<NodeId, HashSet<NodeId>>,
176    /// `root_node_id → Arc<SubgraphLockBox>`. Only roots have entries.
177    /// On `union`, the loser's entry is removed and any future
178    /// `lock_for` calls on its members find the winner's box via
179    /// `find`.
180    boxes: HashMap<NodeId, Arc<SubgraphLockBox>>,
181    /// Monotonic counter bumped on every operation that changes the
182    /// SET of partitions or any partition's box-identity:
183    /// [`Self::ensure_registered`] (creates a new partition),
184    /// [`Self::union_nodes`] (merges two partitions — actual merge
185    /// only, idempotent calls don't bump), and
186    /// [`Self::split_partition`] (splits a partition into two with a
187    /// fresh box for the orphan side).
188    ///
189    /// Consumed by [`crate::Core::begin_batch`] and
190    /// [`crate::Core::begin_batch_for`] for retry-validate at the
191    /// all-partitions / touched-partitions level (QA-fix #2,
192    /// 2026-05-09): if the epoch changes between snapshot and
193    /// post-acquire re-read, a concurrent registry mutation may
194    /// have added/removed/redirected a partition the batch should
195    /// have held. The batch drops its acquired guards and retries.
196    ///
197    /// Counter is `u64`; wrap-around is unreachable in practice
198    /// (bumping 1B times per second still takes 584 years).
199    epoch: u64,
200}
201
202impl SubgraphRegistry {
203    /// Construct an empty registry. `pub(crate)` because all useful
204    /// operations on the registry are crate-internal — only `Core::new`
205    /// needs to construct one. External callers receive
206    /// [`SubgraphId`]s via [`Core::partition_of`] but cannot construct
207    /// a registry from outside the crate.
208    #[must_use]
209    pub(crate) fn new() -> Self {
210        Self {
211            parent: HashMap::new(),
212            rank: HashMap::new(),
213            children: HashMap::new(),
214            boxes: HashMap::new(),
215            epoch: 0,
216        }
217    }
218
219    /// Read the current registry epoch. Used by
220    /// [`crate::Core::begin_batch`] / [`crate::Core::begin_batch_for`]
221    /// for the snapshot-and-acquire retry-validate loop (QA-fix #2,
222    /// 2026-05-09).
223    #[must_use]
224    pub(crate) fn epoch(&self) -> u64 {
225        self.epoch
226    }
227
228    /// Register `node` as the root of a fresh singleton component.
229    /// Idempotent — calling twice on the same node is a no-op.
230    ///
231    /// Called from [`crate::Core::register`] right after the new
232    /// `NodeId` is allocated, BEFORE any `union_nodes` calls for the
233    /// node's deps.
234    pub(crate) fn ensure_registered(&mut self, node: NodeId) {
235        if self.parent.contains_key(&node) {
236            return;
237        }
238        self.parent.insert(node, node);
239        self.rank.insert(node, 0);
240        self.children.insert(node, HashSet::new());
241        self.boxes.insert(node, SubgraphLockBox::new());
242        // QA-fix #2: new partition exists. Closure-form `Core::begin_batch`
243        // that snapshotted before now must retry to acquire it.
244        self.epoch = self.epoch.wrapping_add(1);
245    }
246
247    /// Find the root of `node`'s component, with path compression.
248    /// Panics if `node` is not registered.
249    ///
250    /// **Iterative two-pass implementation** (Slice X5 /qa):
251    /// pass 1 walks up the parent chain to the root; pass 2 re-links
252    /// every node on the path directly to the root + maintains the
253    /// `children` reverse-map. Iterative form mirrors the existing
254    /// `terminate_node` cascade discipline (and graphrefly-py's
255    /// `_find_locked` while-loop) — keeps stack depth O(1) regardless
256    /// of pre-compression chain length, avoiding stack overflow on
257    /// pathological un-compressed trees that can briefly form before
258    /// path compression settles (especially relevant once Y1 puts
259    /// `find` on the hot path under `lock_for`).
260    #[must_use]
261    pub(crate) fn find(&mut self, node: NodeId) -> NodeId {
262        // Pass 1: walk up to root (no mutation).
263        let mut cur = node;
264        loop {
265            let parent = *self
266                .parent
267                .get(&cur)
268                .expect("subgraph_registry::find: node not registered");
269            if parent == cur {
270                break; // cur is root
271            }
272            cur = parent;
273        }
274        let root = cur;
275
276        // Pass 2: path compression — re-link every node on the original
277        // walk directly to root, maintaining the `children` reverse-map.
278        // Skip when already pointing at root (no-op for root itself, and
279        // for already-compressed nodes).
280        let mut walker = node;
281        while walker != root {
282            let parent = *self
283                .parent
284                .get(&walker)
285                .expect("walker on path-to-root must be registered");
286            if parent != root {
287                self.parent.insert(walker, root);
288                if let Some(old_kids) = self.children.get_mut(&parent) {
289                    old_kids.remove(&walker);
290                }
291                self.children.entry(root).or_default().insert(walker);
292            }
293            walker = parent;
294        }
295        root
296    }
297
298    /// Merge the components containing `a` and `b`. Both nodes must
299    /// already be registered. After this call, `find(a) == find(b)`.
300    ///
301    /// Union-by-rank: the smaller-rank root becomes a child of the
302    /// larger. Equal-rank breaks the tie by promoting `a`'s root and
303    /// bumping its rank.
304    ///
305    /// On merge, the loser's [`SubgraphLockBox`] entry is removed
306    /// from `boxes`. Future `lock_for` calls on the loser's members
307    /// resolve to the winner's root → winner's box.
308    pub(crate) fn union_nodes(&mut self, a: NodeId, b: NodeId) {
309        debug_assert!(
310            self.parent.contains_key(&a) && self.parent.contains_key(&b),
311            "union_nodes: both nodes must be registered first"
312        );
313        // Defense-in-depth against bypassed cycle detection: a self-edge
314        // `set_deps(n, &[n])` would reach here as `union_nodes(n, n)`.
315        // Cycle rejection in `Core::set_deps` should catch this BEFORE
316        // we get called, but the registry has no other defense.
317        debug_assert!(
318            a != b,
319            "union_nodes called with self-edge — \
320             Core's cycle detection bypassed?"
321        );
322        let mut root_a = self.find(a);
323        let mut root_b = self.find(b);
324        if root_a == root_b {
325            return;
326        }
327        let rank_a = *self.rank.get(&root_a).unwrap_or(&0);
328        let rank_b = *self.rank.get(&root_b).unwrap_or(&0);
329        if rank_a < rank_b {
330            std::mem::swap(&mut root_a, &mut root_b);
331        }
332        // root_a is the winner (kept). root_b becomes a child.
333        self.parent.insert(root_b, root_a);
334        self.children.entry(root_a).or_default().insert(root_b);
335        if rank_a == rank_b {
336            self.rank.insert(root_a, rank_a + 1);
337        }
338        // Drop the loser's box. Any in-flight readers holding an Arc
339        // clone keep it alive; the registry no longer references it.
340        self.boxes.remove(&root_b);
341        // QA-fix #2: real merge happened (early-return above handled the
342        // idempotent same-root case). The set of partitions changed;
343        // bump epoch so closure-form / per-seed retry-validate detects.
344        self.epoch = self.epoch.wrapping_add(1);
345    }
346
347    /// Remove `node` from the registry. If `node` was a root, promote
348    /// one of its direct children as the new root and re-link the
349    /// others. Mirrors py's `_on_gc` (`subgraph_locks.py` lines 53–92).
350    ///
351    /// **Slice X5 status:** wired but NOT called from Core. Today
352    /// `Core::terminate_node` marks a node terminal but does NOT remove
353    /// it from `CoreState.nodes` (NodeRecords persist for the life of
354    /// `CoreState`); the registry's HashMap entries drop together with
355    /// `CoreState` when the last `Core` clone goes, so the partition
356    /// state is reclaimed without an explicit per-node cleanup. Y1
357    /// will revisit this — when the wave engine activates per-partition
358    /// `wave_owner`, terminated nodes' partition entries should be
359    /// purged so a stale partition doesn't keep its `Arc<SubgraphLockBox>`
360    /// alive in the registry. Idempotent on unregistered nodes.
361    #[allow(dead_code)]
362    pub(crate) fn cleanup_node(&mut self, node: NodeId) {
363        let Some(parent) = self.parent.get(&node).copied() else {
364            return; // already cleaned up — idempotent.
365        };
366
367        let direct_children: Vec<NodeId> = self
368            .children
369            .get(&node)
370            .map(|s| s.iter().copied().collect())
371            .unwrap_or_default();
372
373        if parent == node {
374            // `node` was the root.
375            if let Some(&new_root) = direct_children.first() {
376                self.parent.insert(new_root, new_root);
377                // Detach `node` from each grandchild's children-set first
378                // (before borrowing children mut for the new_root entry).
379                for child in &direct_children {
380                    if let Some(kids) = self.children.get_mut(child) {
381                        kids.remove(&node);
382                    }
383                }
384                // Re-attach all children except `new_root` to `new_root`.
385                let new_root_kids = self.children.entry(new_root).or_default();
386                for child in direct_children.iter().skip(1).copied() {
387                    self.parent.insert(child, new_root);
388                    new_root_kids.insert(child);
389                }
390                // Box ownership transfers to the new root — same Arc, so
391                // any in-flight `lock_for` holders keep their guard alive.
392                if let Some(box_arc) = self.boxes.remove(&node) {
393                    self.boxes.insert(new_root, box_arc);
394                }
395                let old_rank = self.rank.get(&node).copied().unwrap_or(0);
396                let new_rank = self.rank.entry(new_root).or_insert(0);
397                if old_rank > *new_rank {
398                    *new_rank = old_rank;
399                }
400            } else {
401                // Singleton root — just remove the box.
402                self.boxes.remove(&node);
403            }
404        } else {
405            // `node` was a non-root. Detach from its parent's children;
406            // re-attach its own children to its parent.
407            if let Some(parent_kids) = self.children.get_mut(&parent) {
408                parent_kids.remove(&node);
409                for child in &direct_children {
410                    parent_kids.insert(*child);
411                }
412            }
413            for child in &direct_children {
414                self.parent.insert(*child, parent);
415            }
416        }
417
418        self.children.remove(&node);
419        self.parent.remove(&node);
420        self.rank.remove(&node);
421        // boxes already removed above on root path; non-root path never
422        // had a box entry.
423    }
424
425    /// Hook for an edge removal. Slice X5 commit-1: was a no-op
426    /// (monotonic-merge stepping stone). Slice Y1 / Phase F (D3
427    /// split-eager, 2026-05-09): the actual reachability walk +
428    /// split decision lives in `Core::set_deps` (it has access to
429    /// `s.children` and `s.nodes`'s `dep_records` for the undirected
430    /// dep-edge graph). When `Core` detects disconnection it calls
431    /// [`Self::split_partition`] directly. This `on_edge_removed`
432    /// hook remains as a no-op marker — kept for API symmetry with
433    /// [`Self::union_nodes`] and for future use if the registry
434    /// gains its own edge-graph view.
435    pub(crate) fn on_edge_removed(&mut self, _from: NodeId, _to: NodeId) {
436        // No-op. See `Core::set_deps` Phase F split-eager block which
437        // calls `split_partition` directly when an edge removal causes
438        // disconnection in the undirected dep-edge graph.
439    }
440
441    /// Split an existing component into two, given the keep-side and
442    /// orphan-side membership. Slice Y1 / Phase F (D3 split-eager,
443    /// 2026-05-09).
444    ///
445    /// **Invariants assumed by caller:**
446    /// 1. `component_nodes` lists every node currently in some single
447    ///    component C of this registry.
448    /// 2. `keep_side` ⊆ `component_nodes` is the post-removal
449    ///    connected subset that should retain C's existing
450    ///    [`SubgraphLockBox`] (Arc identity preserved). `keep_side`
451    ///    must be non-empty.
452    /// 3. The orphan side (`component_nodes - keep_side`) must be
453    ///    non-empty. (If it were empty, no split is needed; caller
454    ///    should not invoke this method.)
455    /// 4. `edges_in_component` lists the dep edges (as `(parent,
456    ///    child)` data-flow pairs) currently present in
457    ///    `Core::s.children` / `s.nodes[*].dep_records`,
458    ///    POST-removal of the triggering edge. The edges connect
459    ///    nodes within `component_nodes`. The caller is responsible
460    ///    for filtering to in-component edges.
461    ///
462    /// **Algorithm (mirrors graphrefly-py [`subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
463    /// `_on_split` plus a Rust-idiomatic Arc redirect):**
464    /// 1. Capture the original [`Arc<SubgraphLockBox>`] for the
465    ///    component's pre-split root.
466    /// 2. Reset every component node to a singleton (parent[n] = n,
467    ///    rank = 0, children = ∅).
468    /// 3. Re-union via `edges_in_component` — each edge calls
469    ///    [`Self::union_nodes`], which restores connectivity within
470    ///    each side independently (since `edges_in_component`
471    ///    contains only the post-removal edges, the two sides stay
472    ///    disconnected from each other in the union-find tree).
473    /// 4. Resolve the new keep-side root and orphan-side root.
474    /// 5. Assign the original lock box to the keep-side root (so
475    ///    in-flight waves holding the original `Arc` succeed
476    ///    `lock_for_validate` against the keep-side); allocate a
477    ///    fresh box for the orphan-side root (in-flight waves on
478    ///    the orphan side fail validate, retry with the new box).
479    pub(crate) fn split_partition(
480        &mut self,
481        component_nodes: &[NodeId],
482        keep_side_nodes: &[NodeId],
483        edges_in_component: &[(NodeId, NodeId)],
484    ) {
485        debug_assert!(
486            !component_nodes.is_empty(),
487            "component_nodes must be non-empty"
488        );
489        debug_assert!(
490            !keep_side_nodes.is_empty(),
491            "keep_side_nodes must be non-empty"
492        );
493        // Build a contains-lookup set for the keep side. The caller
494        // passes a slice (avoids cross-crate-internal HashSet flavor
495        // mismatch — `std::collections::HashSet` vs `ahash::AHashSet`).
496        let keep_side: HashSet<NodeId> = keep_side_nodes.iter().copied().collect();
497        debug_assert!(
498            component_nodes.iter().any(|n| !keep_side.contains(n)),
499            "orphan side must be non-empty (no-op caller)"
500        );
501
502        // Step 1: capture original box.
503        let original_root = self.find(component_nodes[0]);
504        let original_box = self
505            .boxes
506            .remove(&original_root)
507            .expect("original_root must have a registered box");
508
509        // Step 2: reset every component node to a singleton.
510        for &n in component_nodes {
511            self.parent.insert(n, n);
512            self.rank.insert(n, 0);
513            self.children.insert(n, HashSet::new());
514        }
515
516        // Step 3: re-union via post-removal edges. Both sides become
517        // internally connected; the two sides remain disconnected from
518        // each other (since the triggering edge was removed and the
519        // caller's BFS verified disconnection).
520        for &(a, b) in edges_in_component {
521            // `union_nodes` is idempotent on already-merged components,
522            // so multiple edges into the same connected subset are fine.
523            // Self-edges shouldn't appear here (Core's cycle detection
524            // rejects them at edge-mutation time) but guard defensively.
525            if a != b {
526                self.union_nodes(a, b);
527            }
528        }
529
530        // Step 4: resolve roots. Pick any keep-side / orphan-side
531        // representative and find its post-re-union root.
532        let keep_repr = keep_side_nodes[0];
533        let keep_root = self.find(keep_repr);
534        let orphan_repr = *component_nodes
535            .iter()
536            .find(|n| !keep_side.contains(n))
537            .expect("non-empty orphan side");
538        let orphan_root = self.find(orphan_repr);
539        debug_assert!(
540            keep_root != orphan_root,
541            "split_partition: keep_root {keep_root:?} and orphan_root {orphan_root:?} \
542             must be distinct after re-union — caller's BFS must have asserted \
543             disconnection"
544        );
545
546        // Step 5: assign boxes. Original box → keep-side root (so
547        // in-flight waves' `lock_for_validate` against the held Arc
548        // succeed for keep-side nodes). Fresh box → orphan-side root.
549        self.boxes.insert(keep_root, original_box);
550        self.boxes.insert(orphan_root, SubgraphLockBox::new());
551
552        // QA-fix #2: split changed the partition set + introduced a
553        // fresh box. Bump epoch so any closure-form / per-seed batch
554        // that snapshotted before the split detects + retries.
555        self.epoch = self.epoch.wrapping_add(1);
556    }
557
558    /// Resolve `node`'s partition lock box. Caller acquires the
559    /// box's `wave_owner`, then SHOULD re-validate via
560    /// [`Self::lock_for_validate`] that the resolved root hasn't
561    /// shifted under a concurrent `union` (lock-validation retry
562    /// loop, mirroring py `lock_for` lines 154–178).
563    ///
564    /// Returns `None` if `node` is not registered (defensive — should
565    /// not happen in correct call paths).
566    ///
567    /// Consumed by [`crate::Core::partition_wave_owner_lock_arc`]
568    /// under the retry-validate loop (Slice Y1 / Phase E, 2026-05-08).
569    #[must_use]
570    pub(crate) fn lock_for(&mut self, node: NodeId) -> Option<(SubgraphId, Arc<SubgraphLockBox>)> {
571        if !self.parent.contains_key(&node) {
572            return None;
573        }
574        let root = self.find(node);
575        let box_arc = self.boxes.get(&root).cloned()?;
576        Some((SubgraphId::from_node(root), box_arc))
577    }
578
579    /// Re-validate that `node`'s root resolves to `expected_box`. Used
580    /// in the retry loop after acquiring the box's `wave_owner`: if a
581    /// concurrent `union` redirected the root mid-acquire, the held
582    /// lock is for the wrong partition and the caller must release +
583    /// retry.
584    ///
585    /// Consumed by [`crate::Core::partition_wave_owner_lock_arc`]
586    /// under the retry-validate loop (Slice Y1 / Phase E, 2026-05-08).
587    #[must_use]
588    pub(crate) fn lock_for_validate(
589        &mut self,
590        node: NodeId,
591        expected_box: &Arc<SubgraphLockBox>,
592    ) -> bool {
593        let Some(root) = self.parent.get(&node).copied() else {
594            return false;
595        };
596        let actual_root = self.find(root);
597        match self.boxes.get(&actual_root) {
598            Some(actual) => Arc::ptr_eq(actual, expected_box),
599            None => false,
600        }
601    }
602
603    /// Snapshot of every currently-existing partition, sorted in
604    /// ascending [`SubgraphId`] order. Returns `(partition_id, lock_box)`
605    /// pairs — the caller acquires `box.wave_owner.lock_arc()` on each
606    /// in ascending order to enter a closure-form batch (which doesn't
607    /// have a known seed and must serialize against ALL partitions per
608    /// session-doc Q7 + decision D092).
609    ///
610    /// Consumed by `Core::all_partitions_lock_boxes` (Slice Y1 /
611    /// Phase E, 2026-05-08).
612    #[must_use]
613    pub(crate) fn all_partitions(&self) -> Vec<(SubgraphId, Arc<SubgraphLockBox>)> {
614        let mut out: Vec<(SubgraphId, Arc<SubgraphLockBox>)> = self
615            .boxes
616            .iter()
617            .map(|(root, box_arc)| (SubgraphId::from_node(*root), Arc::clone(box_arc)))
618            .collect();
619        out.sort_unstable_by_key(|(sid, _)| *sid);
620        out
621    }
622
623    /// Number of registered nodes. Useful for debugging + acceptance
624    /// tests that verify the registry stays in sync with `Core::nodes`.
625    #[must_use]
626    pub fn node_count(&self) -> usize {
627        self.parent.len()
628    }
629
630    /// Snapshot of every currently-registered node. Used by
631    /// `Core::set_deps`'s split-eager block (Slice Y1 / Phase F,
632    /// 2026-05-09) to enumerate candidate nodes for a component-
633    /// membership filter; iterating + calling `find` would alias-
634    /// borrow `&mut self`, so we snapshot first.
635    #[must_use]
636    pub(crate) fn registered_nodes(&self) -> Vec<NodeId> {
637        self.parent.keys().copied().collect()
638    }
639
640    /// Number of distinct connected components. Two threads emitting
641    /// into nodes with distinct partitions can run truly parallel
642    /// (Y1+); X5 substrate does not yet exercise that property.
643    #[must_use]
644    pub fn component_count(&self) -> usize {
645        self.boxes.len()
646    }
647
648    /// Resolve `node`'s partition. Returns `None` for unregistered
649    /// nodes. Mutating because path compression may relink under
650    /// `find`.
651    #[must_use]
652    pub fn partition_of(&mut self, node: NodeId) -> Option<SubgraphId> {
653        if !self.parent.contains_key(&node) {
654            return None;
655        }
656        Some(SubgraphId::from_node(self.find(node)))
657    }
658}
659
660// No `Default` impl — the registry is crate-internal infrastructure;
661// `Core::new` is the only construction site. Adding a public `Default`
662// would let external callers build a registry that can do nothing
663// useful (every operation method is `pub(crate)`).
664
665#[cfg(test)]
666mod tests {
667    use super::*;
668
669    fn n(raw: u64) -> NodeId {
670        NodeId::new(raw)
671    }
672
673    #[test]
674    fn singleton_register_creates_one_partition() {
675        let mut r = SubgraphRegistry::new();
676        r.ensure_registered(n(1));
677        assert_eq!(r.node_count(), 1);
678        assert_eq!(r.component_count(), 1);
679        assert_eq!(r.find(n(1)), n(1));
680    }
681
682    #[test]
683    fn union_merges_two_singletons() {
684        let mut r = SubgraphRegistry::new();
685        r.ensure_registered(n(1));
686        r.ensure_registered(n(2));
687        assert_eq!(r.component_count(), 2);
688        r.union_nodes(n(1), n(2));
689        assert_eq!(r.component_count(), 1);
690        assert_eq!(r.find(n(1)), r.find(n(2)));
691    }
692
693    #[test]
694    fn union_idempotent_within_same_component() {
695        let mut r = SubgraphRegistry::new();
696        r.ensure_registered(n(1));
697        r.ensure_registered(n(2));
698        r.union_nodes(n(1), n(2));
699        let comp_before = r.component_count();
700        r.union_nodes(n(1), n(2));
701        assert_eq!(r.component_count(), comp_before);
702    }
703
704    #[test]
705    fn cleanup_singleton_removes_partition() {
706        let mut r = SubgraphRegistry::new();
707        r.ensure_registered(n(1));
708        r.cleanup_node(n(1));
709        assert_eq!(r.node_count(), 0);
710        assert_eq!(r.component_count(), 0);
711    }
712
713    #[test]
714    fn cleanup_root_promotes_child() {
715        let mut r = SubgraphRegistry::new();
716        r.ensure_registered(n(1));
717        r.ensure_registered(n(2));
718        r.union_nodes(n(1), n(2));
719        // After union: one of {1, 2} is root.
720        let root_before = r.find(n(1));
721        let child = if root_before == n(1) { n(2) } else { n(1) };
722        r.cleanup_node(root_before);
723        // The remaining node should be its own root.
724        assert_eq!(r.find(child), child);
725        assert_eq!(r.component_count(), 1);
726    }
727
728    #[test]
729    fn cleanup_non_root_re_links_grandchildren_to_parent() {
730        let mut r = SubgraphRegistry::new();
731        for i in 1..=3 {
732            r.ensure_registered(n(i));
733        }
734        r.union_nodes(n(1), n(2));
735        r.union_nodes(n(2), n(3));
736        let root_before = r.find(n(1));
737        // Pick a non-root node to clean up.
738        let non_root = if root_before == n(1) {
739            n(2)
740        } else if root_before == n(2) {
741            n(1)
742        } else {
743            n(2)
744        };
745        r.cleanup_node(non_root);
746        // Remaining nodes should still find a single root.
747        let other = (1..=3u64)
748            .map(n)
749            .find(|x| *x != root_before && *x != non_root)
750            .expect("third node");
751        assert_eq!(r.find(root_before), r.find(other));
752    }
753
754    #[test]
755    fn lock_for_returns_same_box_for_same_component() {
756        let mut r = SubgraphRegistry::new();
757        r.ensure_registered(n(1));
758        r.ensure_registered(n(2));
759        r.union_nodes(n(1), n(2));
760        let (_sid_a, box_a) = r.lock_for(n(1)).expect("registered");
761        let (_sid_b, box_b) = r.lock_for(n(2)).expect("registered");
762        assert!(Arc::ptr_eq(&box_a, &box_b));
763    }
764
765    #[test]
766    fn lock_for_validate_detects_redirect_after_union() {
767        // Slice X5 /qa P6: deterministic redirect via forced rank
768        // skew. Pre-fix this test asserted conditionally based on
769        // which node union promoted, masking a hypothetical regression
770        // where `lock_for_validate` always returned `true`. Now the
771        // setup forces n(1)'s root to be displaced — the assertion
772        // is unconditionally `false`.
773        let mut r = SubgraphRegistry::new();
774        for i in 1..=4 {
775            r.ensure_registered(n(i));
776        }
777        // Build a higher-rank tree under n(2)'s root: union n(2)+n(3)
778        // and n(2)+n(4), which under union-by-rank with equal initial
779        // ranks promotes n(2) and bumps its rank by the second union.
780        // After this, find(n(2)) == n(2) and rank[n(2)] >= 1.
781        r.union_nodes(n(2), n(3));
782        r.union_nodes(n(2), n(4));
783        let n2_root = r.find(n(2));
784        // n(1) stays its own singleton (rank 0). Resolve its box BEFORE
785        // the cross-tree union.
786        let (_sid_before, box_1_alone) = r.lock_for(n(1)).expect("registered");
787        let n1_root_before = r.find(n(1));
788        assert_eq!(n1_root_before, n(1), "n(1) is still its own root");
789
790        // Cross-tree union: union-by-rank promotes the higher-rank tree
791        // (n(2)'s) — n(1)'s root MUST become n(2)'s root.
792        r.union_nodes(n(1), n(2));
793        let n1_root_after = r.find(n(1));
794        assert_eq!(
795            n1_root_after, n2_root,
796            "union-by-rank promoted n(2)'s tree; n(1)'s root displaced"
797        );
798
799        // The previously-resolved box is now stale: lock_for_validate
800        // must detect the redirect unconditionally.
801        let still_valid = r.lock_for_validate(n(1), &box_1_alone);
802        assert!(
803            !still_valid,
804            "lock_for_validate must detect the box-redirect after union promotes a different root"
805        );
806        // And lock_for now returns a different box.
807        let (_sid_after, box_after) = r.lock_for(n(1)).expect("registered");
808        assert!(
809            !Arc::ptr_eq(&box_1_alone, &box_after),
810            "stale box and active box must be distinct Arc identities"
811        );
812    }
813
814    #[test]
815    fn partition_of_distinct_singletons_differ() {
816        let mut r = SubgraphRegistry::new();
817        r.ensure_registered(n(1));
818        r.ensure_registered(n(2));
819        let p1 = r.partition_of(n(1)).expect("registered");
820        let p2 = r.partition_of(n(2)).expect("registered");
821        assert_ne!(p1, p2);
822    }
823
824    #[test]
825    fn partition_of_unioned_nodes_match() {
826        let mut r = SubgraphRegistry::new();
827        r.ensure_registered(n(1));
828        r.ensure_registered(n(2));
829        r.union_nodes(n(1), n(2));
830        let p1 = r.partition_of(n(1)).expect("registered");
831        let p2 = r.partition_of(n(2)).expect("registered");
832        assert_eq!(p1, p2);
833    }
834}