graphrefly_core/subgraph.rs
1//! Per-subgraph union-find registry (Slice X5, 2026-05-08).
2//!
3//! Direct port of [`graphrefly-py`'s
4//! `subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
5//! (locked design via [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)
6//! — Q1 = (c-uf split-eager); see decision log D086).
7//!
8//! # Concept
9//!
10//! Each registered node is a member of exactly one connected
11//! component (a "subgraph") at any moment. Two nodes are in the same
12//! subgraph iff they're connected via dep edges (transitively).
13//!
14//! Connected-component membership is tracked via union-find with
15//! union-by-rank + path compression. Each component's root carries
16//! an [`Arc<SubgraphLockBox>`] that owns the partition's
17//! `wave_owner` re-entrant mutex — nodes in the same component share
18//! one lock, disjoint components run truly parallel.
19//!
20//! # Lifecycle hooks
21//!
22//! - [`SubgraphRegistry::ensure_registered`] — called when a node is
23//! first registered via [`crate::Core::register`]. Allocates a
24//! fresh singleton component for the new node.
25//! - [`SubgraphRegistry::union_nodes`] — called for each new dep
26//! edge (in `register` and `set_deps`'s add-edge path). Merges
27//! the two endpoints' components.
28//! - [`SubgraphRegistry::cleanup_node`] — wired but **not yet called
29//! in X5**. In Y1 the wave engine will invoke it from sites that
30//! actually remove a `NodeRecord` from `CoreState.nodes` (today
31//! `terminate_node` only marks the node terminal — `NodeRecord`s
32//! persist for the life of `CoreState`). The registry's HashMap
33//! entries drop together with `CoreState` when the last `Core`
34//! clone goes, so X5 doesn't leak partitions in practice; the
35//! per-node cleanup hook becomes load-bearing once Y1 lands a
36//! `Drop`-via-removal lifecycle for `NodeRecord` (post-Y1 work).
37//! - [`SubgraphRegistry::on_edge_removed`] — called for each removed
38//! dep edge in `set_deps`'s remove path. Slice X5 commit-1 just
39//! notes the removal; **Y1 (split-eager)** adds the reachability
40//! walk that splits disconnected components.
41//!
42//! # Lock-acquisition discipline (Y1)
43//!
44//! [`SubgraphRegistry::lock_for`] returns the component's
45//! [`Arc<SubgraphLockBox>`] — caller acquires `box.wave_owner` and
46//! re-validates that the resolved root hasn't been redirected by a
47//! concurrent `union` (mirrors py's `MAX_LOCK_RETRIES` retry loop).
48//!
49//! # Slice X5 scope
50//!
51//! Substrate types + registry tracking are wired to `Core::register`
52//! and `Core::set_deps`'s edge add path so the union-find state is
53//! maintained as nodes register and topology changes. The wave engine
54//! itself still uses the legacy Core-level `wave_owner` — Y1 (the
55//! wave-engine migration to per-partition `wave_owner`) is carried
56//! forward as an explicit follow-on slice given its scope (every
57//! `begin_batch` / `run_wave` call site + `Core::subscribe` lock
58//! acquisition + `BatchGuard` lock retention + retry-validate
59//! semantics for held-Arc-vs-current-root divergence on union).
60//!
//! Of the substrate methods, only `cleanup_node` is still wired but
//! unused; it is annotated per-item with `#[allow(dead_code)]` until a
//! caller removes nodes through it. `lock_for`, `lock_for_validate`
//! and the `SubgraphLockBox::wave_owner` field were activated by
//! Slice Y1 (see their item docs) and carry no such annotation.
//! (Per-item rather than module-level shotgun so any genuinely
//! unused new item still flags as dead code during follow-up review.)
68
69use std::collections::{HashMap, HashSet};
70use std::sync::Arc;
71
72use parking_lot::ReentrantMutex;
73
74use crate::handle::NodeId;
75
76/// Newtype identifier for a connected-component partition.
77///
78/// Internally a `u64` (the union-find root's `NodeId.raw()`). Distinct
79/// from [`NodeId`] at the type system level — partitions and nodes are
80/// not interchangeable.
81#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
82pub struct SubgraphId(pub(crate) u64);
83
84impl SubgraphId {
85 /// Construct from a [`NodeId`] root. Internal use — partition
86 /// identity is the union-find root's NodeId.
87 #[must_use]
88 pub(crate) fn from_node(node: NodeId) -> Self {
89 Self(node.raw())
90 }
91
92 /// Raw u64 view. Used for total-ordering across multi-partition
93 /// wave acquisitions per Q4=(a) (deadlock-free via ascending
94 /// SubgraphId order).
95 #[must_use]
96 pub fn raw(self) -> u64 {
97 self.0
98 }
99}
100
101impl std::fmt::Display for SubgraphId {
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 write!(f, "subgraph#{}", self.0)
104 }
105}
106
107/// Per-component lock box. Holds the partition's `wave_owner`
108/// re-entrant mutex. On `union`, the box of the smaller-rank root is
109/// replaced by the larger-rank root's box — the lock identity is
110/// preserved across merges via the [`Arc`] reference (mirrors py's
111/// `_LockBox.lock` redirect on union).
112///
113/// **Slice Y1 (D3 / Phase E, 2026-05-08):** the wave engine acquires
114/// this per-partition lock via [`crate::Core::partition_wave_owner_lock_arc`]
115/// (with retry-validate against concurrent union/split). Closure-form
116/// `Core::batch` acquires every partition's lock in ascending
117/// [`SubgraphId`] order; per-seed entry points (`emit`, `subscribe`,
118/// etc.) acquire only the seed's transitively-touched partitions.
119pub struct SubgraphLockBox {
120 /// Per-partition wave ownership. Cross-thread emits to the same
121 /// partition serialize here; same-thread re-entry passes through.
122 /// Wrapped in `Arc` at the registry level so two roots' boxes
123 /// can share the same mutex identity after `union`.
124 pub(crate) wave_owner: Arc<ReentrantMutex<()>>,
125}
126
127impl SubgraphLockBox {
128 fn new() -> Arc<Self> {
129 Arc::new(Self {
130 wave_owner: Arc::new(ReentrantMutex::new(())),
131 })
132 }
133}
134
/// Default cap for the root-validation retry loop that callers run
/// around [`SubgraphRegistry::lock_for`] and
/// [`SubgraphRegistry::lock_for_validate`] (resolve the box, acquire
/// its lock, re-validate, retry on mismatch). Mirrors graphrefly-py's
/// `_MAX_LOCK_RETRIES` constant (`subgraph_locks.py` line 39). Reached
/// only under continuous `union` activity racing with lock
/// acquisition — pathological in practice but bounded for safety.
///
/// Consumed by [`crate::Core::partition_wave_owner_lock_arc`] under
/// the retry-validate loop (Slice Y1 / Phase E, 2026-05-08).
143pub(crate) const MAX_LOCK_RETRIES: u32 = 100;
144
145/// Union-find registry tracking each node's connected-component
146/// membership. Wrapped by [`crate::Core`] in `Arc<Mutex<...>>` so
147/// the wave engine can resolve a node's partition before acquiring
148/// that partition's `wave_owner`.
149pub struct SubgraphRegistry {
150 /// `node_id → parent_id` union-find array. A root has
151 /// `parent[root] == root`.
152 parent: HashMap<NodeId, NodeId>,
153 /// `node_id → rank` for union-by-rank. Only meaningful for roots.
154 rank: HashMap<NodeId, u32>,
155 /// Reverse map for cleanup-time re-rooting: `node_id → set of
156 /// direct children whose `parent` field points at `node_id``.
157 /// On root cleanup, we promote one child to root and re-attach
158 /// the others.
159 children: HashMap<NodeId, HashSet<NodeId>>,
160 /// `root_node_id → Arc<SubgraphLockBox>`. Only roots have entries.
161 /// On `union`, the loser's entry is removed and any future
162 /// `lock_for` calls on its members find the winner's box via
163 /// `find`.
164 boxes: HashMap<NodeId, Arc<SubgraphLockBox>>,
165}
166
167impl SubgraphRegistry {
168 /// Construct an empty registry. `pub(crate)` because all useful
169 /// operations on the registry are crate-internal — only `Core::new`
170 /// needs to construct one. External callers receive
171 /// [`SubgraphId`]s via [`Core::partition_of`] but cannot construct
172 /// a registry from outside the crate.
173 #[must_use]
174 pub(crate) fn new() -> Self {
175 Self {
176 parent: HashMap::new(),
177 rank: HashMap::new(),
178 children: HashMap::new(),
179 boxes: HashMap::new(),
180 }
181 }
182
183 /// Register `node` as the root of a fresh singleton component.
184 /// Idempotent — calling twice on the same node is a no-op.
185 ///
186 /// Called from [`crate::Core::register`] right after the new
187 /// `NodeId` is allocated, BEFORE any `union_nodes` calls for the
188 /// node's deps.
189 pub(crate) fn ensure_registered(&mut self, node: NodeId) {
190 if self.parent.contains_key(&node) {
191 return;
192 }
193 self.parent.insert(node, node);
194 self.rank.insert(node, 0);
195 self.children.insert(node, HashSet::new());
196 self.boxes.insert(node, SubgraphLockBox::new());
197 }
198
199 /// Find the root of `node`'s component, with path compression.
200 /// Panics if `node` is not registered.
201 ///
202 /// **Iterative two-pass implementation** (Slice X5 /qa):
203 /// pass 1 walks up the parent chain to the root; pass 2 re-links
204 /// every node on the path directly to the root + maintains the
205 /// `children` reverse-map. Iterative form mirrors the existing
206 /// `terminate_node` cascade discipline (and graphrefly-py's
207 /// `_find_locked` while-loop) — keeps stack depth O(1) regardless
208 /// of pre-compression chain length, avoiding stack overflow on
209 /// pathological un-compressed trees that can briefly form before
210 /// path compression settles (especially relevant once Y1 puts
211 /// `find` on the hot path under `lock_for`).
212 #[must_use]
213 pub(crate) fn find(&mut self, node: NodeId) -> NodeId {
214 // Pass 1: walk up to root (no mutation).
215 let mut cur = node;
216 loop {
217 let parent = *self
218 .parent
219 .get(&cur)
220 .expect("subgraph_registry::find: node not registered");
221 if parent == cur {
222 break; // cur is root
223 }
224 cur = parent;
225 }
226 let root = cur;
227
228 // Pass 2: path compression — re-link every node on the original
229 // walk directly to root, maintaining the `children` reverse-map.
230 // Skip when already pointing at root (no-op for root itself, and
231 // for already-compressed nodes).
232 let mut walker = node;
233 while walker != root {
234 let parent = *self
235 .parent
236 .get(&walker)
237 .expect("walker on path-to-root must be registered");
238 if parent != root {
239 self.parent.insert(walker, root);
240 if let Some(old_kids) = self.children.get_mut(&parent) {
241 old_kids.remove(&walker);
242 }
243 self.children.entry(root).or_default().insert(walker);
244 }
245 walker = parent;
246 }
247 root
248 }
249
250 /// Merge the components containing `a` and `b`. Both nodes must
251 /// already be registered. After this call, `find(a) == find(b)`.
252 ///
253 /// Union-by-rank: the smaller-rank root becomes a child of the
254 /// larger. Equal-rank breaks the tie by promoting `a`'s root and
255 /// bumping its rank.
256 ///
257 /// On merge, the loser's [`SubgraphLockBox`] entry is removed
258 /// from `boxes`. Future `lock_for` calls on the loser's members
259 /// resolve to the winner's root → winner's box.
260 pub(crate) fn union_nodes(&mut self, a: NodeId, b: NodeId) {
261 debug_assert!(
262 self.parent.contains_key(&a) && self.parent.contains_key(&b),
263 "union_nodes: both nodes must be registered first"
264 );
265 // Defense-in-depth against bypassed cycle detection: a self-edge
266 // `set_deps(n, &[n])` would reach here as `union_nodes(n, n)`.
267 // Cycle rejection in `Core::set_deps` should catch this BEFORE
268 // we get called, but the registry has no other defense.
269 debug_assert!(
270 a != b,
271 "union_nodes called with self-edge — \
272 Core's cycle detection bypassed?"
273 );
274 let mut root_a = self.find(a);
275 let mut root_b = self.find(b);
276 if root_a == root_b {
277 return;
278 }
279 let rank_a = *self.rank.get(&root_a).unwrap_or(&0);
280 let rank_b = *self.rank.get(&root_b).unwrap_or(&0);
281 if rank_a < rank_b {
282 std::mem::swap(&mut root_a, &mut root_b);
283 }
284 // root_a is the winner (kept). root_b becomes a child.
285 self.parent.insert(root_b, root_a);
286 self.children.entry(root_a).or_default().insert(root_b);
287 if rank_a == rank_b {
288 self.rank.insert(root_a, rank_a + 1);
289 }
290 // Drop the loser's box. Any in-flight readers holding an Arc
291 // clone keep it alive; the registry no longer references it.
292 self.boxes.remove(&root_b);
293 }
294
295 /// Remove `node` from the registry. If `node` was a root, promote
296 /// one of its direct children as the new root and re-link the
297 /// others. Mirrors py's `_on_gc` (`subgraph_locks.py` lines 53–92).
298 ///
299 /// **Slice X5 status:** wired but NOT called from Core. Today
300 /// `Core::terminate_node` marks a node terminal but does NOT remove
301 /// it from `CoreState.nodes` (NodeRecords persist for the life of
302 /// `CoreState`); the registry's HashMap entries drop together with
303 /// `CoreState` when the last `Core` clone goes, so the partition
304 /// state is reclaimed without an explicit per-node cleanup. Y1
305 /// will revisit this — when the wave engine activates per-partition
306 /// `wave_owner`, terminated nodes' partition entries should be
307 /// purged so a stale partition doesn't keep its `Arc<SubgraphLockBox>`
308 /// alive in the registry. Idempotent on unregistered nodes.
309 #[allow(dead_code)]
310 pub(crate) fn cleanup_node(&mut self, node: NodeId) {
311 let Some(parent) = self.parent.get(&node).copied() else {
312 return; // already cleaned up — idempotent.
313 };
314
315 let direct_children: Vec<NodeId> = self
316 .children
317 .get(&node)
318 .map(|s| s.iter().copied().collect())
319 .unwrap_or_default();
320
321 if parent == node {
322 // `node` was the root.
323 if let Some(&new_root) = direct_children.first() {
324 self.parent.insert(new_root, new_root);
325 // Detach `node` from each grandchild's children-set first
326 // (before borrowing children mut for the new_root entry).
327 for child in &direct_children {
328 if let Some(kids) = self.children.get_mut(child) {
329 kids.remove(&node);
330 }
331 }
332 // Re-attach all children except `new_root` to `new_root`.
333 let new_root_kids = self.children.entry(new_root).or_default();
334 for child in direct_children.iter().skip(1).copied() {
335 self.parent.insert(child, new_root);
336 new_root_kids.insert(child);
337 }
338 // Box ownership transfers to the new root — same Arc, so
339 // any in-flight `lock_for` holders keep their guard alive.
340 if let Some(box_arc) = self.boxes.remove(&node) {
341 self.boxes.insert(new_root, box_arc);
342 }
343 let old_rank = self.rank.get(&node).copied().unwrap_or(0);
344 let new_rank = self.rank.entry(new_root).or_insert(0);
345 if old_rank > *new_rank {
346 *new_rank = old_rank;
347 }
348 } else {
349 // Singleton root — just remove the box.
350 self.boxes.remove(&node);
351 }
352 } else {
353 // `node` was a non-root. Detach from its parent's children;
354 // re-attach its own children to its parent.
355 if let Some(parent_kids) = self.children.get_mut(&parent) {
356 parent_kids.remove(&node);
357 for child in &direct_children {
358 parent_kids.insert(*child);
359 }
360 }
361 for child in &direct_children {
362 self.parent.insert(*child, parent);
363 }
364 }
365
366 self.children.remove(&node);
367 self.parent.remove(&node);
368 self.rank.remove(&node);
369 // boxes already removed above on root path; non-root path never
370 // had a box entry.
371 }
372
373 /// Hook for an edge removal. Slice X5 commit-1: was a no-op
374 /// (monotonic-merge stepping stone). Slice Y1 / Phase F (D3
375 /// split-eager, 2026-05-09): the actual reachability walk +
376 /// split decision lives in `Core::set_deps` (it has access to
377 /// `s.children` and `s.nodes`'s `dep_records` for the undirected
378 /// dep-edge graph). When `Core` detects disconnection it calls
379 /// [`Self::split_partition`] directly. This `on_edge_removed`
380 /// hook remains as a no-op marker — kept for API symmetry with
381 /// [`Self::union_nodes`] and for future use if the registry
382 /// gains its own edge-graph view.
383 pub(crate) fn on_edge_removed(&mut self, _from: NodeId, _to: NodeId) {
384 // No-op. See `Core::set_deps` Phase F split-eager block which
385 // calls `split_partition` directly when an edge removal causes
386 // disconnection in the undirected dep-edge graph.
387 }
388
389 /// Split an existing component into two, given the keep-side and
390 /// orphan-side membership. Slice Y1 / Phase F (D3 split-eager,
391 /// 2026-05-09).
392 ///
393 /// **Invariants assumed by caller:**
394 /// 1. `component_nodes` lists every node currently in some single
395 /// component C of this registry.
396 /// 2. `keep_side` ⊆ `component_nodes` is the post-removal
397 /// connected subset that should retain C's existing
398 /// [`SubgraphLockBox`] (Arc identity preserved). `keep_side`
399 /// must be non-empty.
400 /// 3. The orphan side (`component_nodes - keep_side`) must be
401 /// non-empty. (If it were empty, no split is needed; caller
402 /// should not invoke this method.)
403 /// 4. `edges_in_component` lists the dep edges (as `(parent,
404 /// child)` data-flow pairs) currently present in
405 /// `Core::s.children` / `s.nodes[*].dep_records`,
406 /// POST-removal of the triggering edge. The edges connect
407 /// nodes within `component_nodes`. The caller is responsible
408 /// for filtering to in-component edges.
409 ///
410 /// **Algorithm (mirrors graphrefly-py [`subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
411 /// `_on_split` plus a Rust-idiomatic Arc redirect):**
412 /// 1. Capture the original [`Arc<SubgraphLockBox>`] for the
413 /// component's pre-split root.
414 /// 2. Reset every component node to a singleton (parent[n] = n,
415 /// rank = 0, children = ∅).
416 /// 3. Re-union via `edges_in_component` — each edge calls
417 /// [`Self::union_nodes`], which restores connectivity within
418 /// each side independently (since `edges_in_component`
419 /// contains only the post-removal edges, the two sides stay
420 /// disconnected from each other in the union-find tree).
421 /// 4. Resolve the new keep-side root and orphan-side root.
422 /// 5. Assign the original lock box to the keep-side root (so
423 /// in-flight waves holding the original `Arc` succeed
424 /// `lock_for_validate` against the keep-side); allocate a
425 /// fresh box for the orphan-side root (in-flight waves on
426 /// the orphan side fail validate, retry with the new box).
427 pub(crate) fn split_partition(
428 &mut self,
429 component_nodes: &[NodeId],
430 keep_side_nodes: &[NodeId],
431 edges_in_component: &[(NodeId, NodeId)],
432 ) {
433 debug_assert!(
434 !component_nodes.is_empty(),
435 "component_nodes must be non-empty"
436 );
437 debug_assert!(
438 !keep_side_nodes.is_empty(),
439 "keep_side_nodes must be non-empty"
440 );
441 // Build a contains-lookup set for the keep side. The caller
442 // passes a slice (avoids cross-crate-internal HashSet flavor
443 // mismatch — `std::collections::HashSet` vs `ahash::AHashSet`).
444 let keep_side: HashSet<NodeId> = keep_side_nodes.iter().copied().collect();
445 debug_assert!(
446 component_nodes.iter().any(|n| !keep_side.contains(n)),
447 "orphan side must be non-empty (no-op caller)"
448 );
449
450 // Step 1: capture original box.
451 let original_root = self.find(component_nodes[0]);
452 let original_box = self
453 .boxes
454 .remove(&original_root)
455 .expect("original_root must have a registered box");
456
457 // Step 2: reset every component node to a singleton.
458 for &n in component_nodes {
459 self.parent.insert(n, n);
460 self.rank.insert(n, 0);
461 self.children.insert(n, HashSet::new());
462 }
463
464 // Step 3: re-union via post-removal edges. Both sides become
465 // internally connected; the two sides remain disconnected from
466 // each other (since the triggering edge was removed and the
467 // caller's BFS verified disconnection).
468 for &(a, b) in edges_in_component {
469 // `union_nodes` is idempotent on already-merged components,
470 // so multiple edges into the same connected subset are fine.
471 // Self-edges shouldn't appear here (Core's cycle detection
472 // rejects them at edge-mutation time) but guard defensively.
473 if a != b {
474 self.union_nodes(a, b);
475 }
476 }
477
478 // Step 4: resolve roots. Pick any keep-side / orphan-side
479 // representative and find its post-re-union root.
480 let keep_repr = keep_side_nodes[0];
481 let keep_root = self.find(keep_repr);
482 let orphan_repr = *component_nodes
483 .iter()
484 .find(|n| !keep_side.contains(n))
485 .expect("non-empty orphan side");
486 let orphan_root = self.find(orphan_repr);
487 debug_assert!(
488 keep_root != orphan_root,
489 "split_partition: keep_root {keep_root:?} and orphan_root {orphan_root:?} \
490 must be distinct after re-union — caller's BFS must have asserted \
491 disconnection"
492 );
493
494 // Step 5: assign boxes. Original box → keep-side root (so
495 // in-flight waves' `lock_for_validate` against the held Arc
496 // succeed for keep-side nodes). Fresh box → orphan-side root.
497 self.boxes.insert(keep_root, original_box);
498 self.boxes.insert(orphan_root, SubgraphLockBox::new());
499 }
500
501 /// Resolve `node`'s partition lock box. Caller acquires the
502 /// box's `wave_owner`, then SHOULD re-validate via
503 /// [`Self::lock_for_validate`] that the resolved root hasn't
504 /// shifted under a concurrent `union` (lock-validation retry
505 /// loop, mirroring py `lock_for` lines 154–178).
506 ///
507 /// Returns `None` if `node` is not registered (defensive — should
508 /// not happen in correct call paths).
509 ///
510 /// Consumed by [`crate::Core::partition_wave_owner_lock_arc`]
511 /// under the retry-validate loop (Slice Y1 / Phase E, 2026-05-08).
512 #[must_use]
513 pub(crate) fn lock_for(&mut self, node: NodeId) -> Option<(SubgraphId, Arc<SubgraphLockBox>)> {
514 if !self.parent.contains_key(&node) {
515 return None;
516 }
517 let root = self.find(node);
518 let box_arc = self.boxes.get(&root).cloned()?;
519 Some((SubgraphId::from_node(root), box_arc))
520 }
521
522 /// Re-validate that `node`'s root resolves to `expected_box`. Used
523 /// in the retry loop after acquiring the box's `wave_owner`: if a
524 /// concurrent `union` redirected the root mid-acquire, the held
525 /// lock is for the wrong partition and the caller must release +
526 /// retry.
527 ///
528 /// Consumed by [`crate::Core::partition_wave_owner_lock_arc`]
529 /// under the retry-validate loop (Slice Y1 / Phase E, 2026-05-08).
530 #[must_use]
531 pub(crate) fn lock_for_validate(
532 &mut self,
533 node: NodeId,
534 expected_box: &Arc<SubgraphLockBox>,
535 ) -> bool {
536 let Some(root) = self.parent.get(&node).copied() else {
537 return false;
538 };
539 let actual_root = self.find(root);
540 match self.boxes.get(&actual_root) {
541 Some(actual) => Arc::ptr_eq(actual, expected_box),
542 None => false,
543 }
544 }
545
546 /// Snapshot of every currently-existing partition, sorted in
547 /// ascending [`SubgraphId`] order. Returns `(partition_id, lock_box)`
548 /// pairs — the caller acquires `box.wave_owner.lock_arc()` on each
549 /// in ascending order to enter a closure-form batch (which doesn't
550 /// have a known seed and must serialize against ALL partitions per
551 /// session-doc Q7 + decision D092).
552 ///
553 /// Consumed by `Core::all_partitions_lock_boxes` (Slice Y1 /
554 /// Phase E, 2026-05-08).
555 #[must_use]
556 pub(crate) fn all_partitions(&self) -> Vec<(SubgraphId, Arc<SubgraphLockBox>)> {
557 let mut out: Vec<(SubgraphId, Arc<SubgraphLockBox>)> = self
558 .boxes
559 .iter()
560 .map(|(root, box_arc)| (SubgraphId::from_node(*root), Arc::clone(box_arc)))
561 .collect();
562 out.sort_unstable_by_key(|(sid, _)| *sid);
563 out
564 }
565
566 /// Number of registered nodes. Useful for debugging + acceptance
567 /// tests that verify the registry stays in sync with `Core::nodes`.
568 #[must_use]
569 pub fn node_count(&self) -> usize {
570 self.parent.len()
571 }
572
573 /// Snapshot of every currently-registered node. Used by
574 /// `Core::set_deps`'s split-eager block (Slice Y1 / Phase F,
575 /// 2026-05-09) to enumerate candidate nodes for a component-
576 /// membership filter; iterating + calling `find` would alias-
577 /// borrow `&mut self`, so we snapshot first.
578 #[must_use]
579 pub(crate) fn registered_nodes(&self) -> Vec<NodeId> {
580 self.parent.keys().copied().collect()
581 }
582
583 /// Number of distinct connected components. Two threads emitting
584 /// into nodes with distinct partitions can run truly parallel
585 /// (Y1+); X5 substrate does not yet exercise that property.
586 #[must_use]
587 pub fn component_count(&self) -> usize {
588 self.boxes.len()
589 }
590
591 /// Resolve `node`'s partition. Returns `None` for unregistered
592 /// nodes. Mutating because path compression may relink under
593 /// `find`.
594 #[must_use]
595 pub fn partition_of(&mut self, node: NodeId) -> Option<SubgraphId> {
596 if !self.parent.contains_key(&node) {
597 return None;
598 }
599 Some(SubgraphId::from_node(self.find(node)))
600 }
601}
602
603// No `Default` impl — the registry is crate-internal infrastructure;
604// `Core::new` is the only construction site. Adding a public `Default`
605// would let external callers build a registry that can do nothing
606// useful (every operation method is `pub(crate)`).
607
#[cfg(test)]
mod tests {
    //! Unit tests for the union-find registry: registration, union,
    //! cleanup re-rooting, lock-box resolution/validation, partition
    //! ids, and component splitting.

    use super::*;

    /// Shorthand for building a [`NodeId`] from a raw value.
    fn n(raw: u64) -> NodeId {
        NodeId::new(raw)
    }

    #[test]
    fn singleton_register_creates_one_partition() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        assert_eq!(r.node_count(), 1);
        assert_eq!(r.component_count(), 1);
        assert_eq!(r.find(n(1)), n(1));
    }

    #[test]
    fn union_merges_two_singletons() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        assert_eq!(r.component_count(), 2);
        r.union_nodes(n(1), n(2));
        assert_eq!(r.component_count(), 1);
        assert_eq!(r.find(n(1)), r.find(n(2)));
    }

    #[test]
    fn union_idempotent_within_same_component() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        r.union_nodes(n(1), n(2));
        let comp_before = r.component_count();
        r.union_nodes(n(1), n(2));
        assert_eq!(r.component_count(), comp_before);
    }

    #[test]
    fn cleanup_singleton_removes_partition() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.cleanup_node(n(1));
        assert_eq!(r.node_count(), 0);
        assert_eq!(r.component_count(), 0);
    }

    #[test]
    fn cleanup_unregistered_node_is_noop() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.cleanup_node(n(2));
        assert_eq!(r.node_count(), 1);
        assert_eq!(r.component_count(), 1);
    }

    #[test]
    fn cleanup_root_promotes_child() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        r.union_nodes(n(1), n(2));
        // After union: one of {1, 2} is root.
        let root_before = r.find(n(1));
        let child = if root_before == n(1) { n(2) } else { n(1) };
        let (_sid, box_before) = r.lock_for(child).expect("registered");
        r.cleanup_node(root_before);
        // The remaining node should be its own root.
        assert_eq!(r.find(child), child);
        assert_eq!(r.component_count(), 1);
        // The component's lock box moves to the promoted root unchanged.
        let (_sid, box_after) = r.lock_for(child).expect("registered");
        assert!(Arc::ptr_eq(&box_before, &box_after));
    }

    #[test]
    fn cleanup_non_root_re_links_grandchildren_to_parent() {
        let mut r = SubgraphRegistry::new();
        for i in 1..=3 {
            r.ensure_registered(n(i));
        }
        r.union_nodes(n(1), n(2));
        r.union_nodes(n(2), n(3));
        let root_before = r.find(n(1));
        // Pick a non-root node to clean up: n(2) unless n(2) is the root.
        let non_root = if root_before == n(2) { n(1) } else { n(2) };
        r.cleanup_node(non_root);
        // Remaining nodes should still find a single root.
        let other = (1..=3u64)
            .map(n)
            .find(|x| *x != root_before && *x != non_root)
            .expect("third node");
        assert_eq!(r.find(root_before), r.find(other));
        assert_eq!(r.node_count(), 2);
        assert_eq!(r.component_count(), 1);
    }

    #[test]
    fn lock_for_returns_same_box_for_same_component() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        r.union_nodes(n(1), n(2));
        let (_sid_a, box_a) = r.lock_for(n(1)).expect("registered");
        let (_sid_b, box_b) = r.lock_for(n(2)).expect("registered");
        assert!(Arc::ptr_eq(&box_a, &box_b));
    }

    #[test]
    fn lock_for_unregistered_node_is_none() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        assert!(r.lock_for(n(2)).is_none());
        assert!(r.partition_of(n(2)).is_none());
    }

    #[test]
    fn lock_for_validate_detects_redirect_after_union() {
        // Slice X5 /qa P6: deterministic redirect via forced rank
        // skew. Pre-fix this test asserted conditionally based on
        // which node union promoted, masking a hypothetical regression
        // where `lock_for_validate` always returned `true`. Now the
        // setup forces n(1)'s root to be displaced — the assertion
        // is unconditionally `false`.
        let mut r = SubgraphRegistry::new();
        for i in 1..=4 {
            r.ensure_registered(n(i));
        }
        // Build a higher-rank tree under n(2): the first union
        // (n(2)+n(3), equal ranks) promotes n(2) and bumps its rank to
        // 1; the second (n(2)+n(4)) attaches n(4) beneath it without a
        // further bump. After this, find(n(2)) == n(2) and
        // rank[n(2)] >= 1.
        r.union_nodes(n(2), n(3));
        r.union_nodes(n(2), n(4));
        let n2_root = r.find(n(2));
        // n(1) stays its own singleton (rank 0). Resolve its box BEFORE
        // the cross-tree union.
        let (_sid_before, box_1_alone) = r.lock_for(n(1)).expect("registered");
        let n1_root_before = r.find(n(1));
        assert_eq!(n1_root_before, n(1), "n(1) is still its own root");

        // Cross-tree union: union-by-rank promotes the higher-rank tree
        // (n(2)'s) — n(1)'s root MUST become n(2)'s root.
        r.union_nodes(n(1), n(2));
        let n1_root_after = r.find(n(1));
        assert_eq!(
            n1_root_after, n2_root,
            "union-by-rank promoted n(2)'s tree; n(1)'s root displaced"
        );

        // The previously-resolved box is now stale: lock_for_validate
        // must detect the redirect unconditionally.
        let still_valid = r.lock_for_validate(n(1), &box_1_alone);
        assert!(
            !still_valid,
            "lock_for_validate must detect the box-redirect after union promotes a different root"
        );
        // And lock_for now returns a different box.
        let (_sid_after, box_after) = r.lock_for(n(1)).expect("registered");
        assert!(
            !Arc::ptr_eq(&box_1_alone, &box_after),
            "stale box and active box must be distinct Arc identities"
        );
    }

    #[test]
    fn partition_of_distinct_singletons_differ() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        let p1 = r.partition_of(n(1)).expect("registered");
        let p2 = r.partition_of(n(2)).expect("registered");
        assert_ne!(p1, p2);
    }

    #[test]
    fn partition_of_unioned_nodes_match() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        r.union_nodes(n(1), n(2));
        let p1 = r.partition_of(n(1)).expect("registered");
        let p2 = r.partition_of(n(2)).expect("registered");
        assert_eq!(p1, p2);
    }

    #[test]
    fn all_partitions_is_sorted_ascending() {
        let mut r = SubgraphRegistry::new();
        for i in [5u64, 1, 3] {
            r.ensure_registered(n(i));
        }
        let ids: Vec<SubgraphId> = r.all_partitions().into_iter().map(|(sid, _)| sid).collect();
        assert_eq!(ids.len(), 3);
        assert!(ids.windows(2).all(|pair| pair[0] < pair[1]));
    }

    #[test]
    fn split_partition_separates_sides_and_preserves_keep_box() {
        let mut r = SubgraphRegistry::new();
        for i in 1..=4 {
            r.ensure_registered(n(i));
        }
        // Chain 1 - 2 - 3 - 4 forms a single component.
        r.union_nodes(n(1), n(2));
        r.union_nodes(n(2), n(3));
        r.union_nodes(n(3), n(4));
        assert_eq!(r.component_count(), 1);
        let (_sid, original_box) = r.lock_for(n(1)).expect("registered");

        // Edge 2 - 3 removed: keep {1, 2}, orphan {3, 4}.
        r.split_partition(
            &[n(1), n(2), n(3), n(4)],
            &[n(1), n(2)],
            &[(n(1), n(2)), (n(3), n(4))],
        );

        assert_eq!(r.component_count(), 2);
        assert_eq!(r.find(n(1)), r.find(n(2)));
        assert_eq!(r.find(n(3)), r.find(n(4)));
        assert_ne!(r.find(n(1)), r.find(n(3)));
        // Keep side retains the original lock identity; orphan side
        // resolves to a different box.
        assert!(r.lock_for_validate(n(1), &original_box));
        assert!(r.lock_for_validate(n(2), &original_box));
        assert!(!r.lock_for_validate(n(3), &original_box));
        assert!(!r.lock_for_validate(n(4), &original_box));
    }
}