graphrefly_core/subgraph.rs
1//! Per-subgraph union-find registry (Slice X5, 2026-05-08).
2//!
3//! Direct port of [`graphrefly-py`'s
4//! `subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
5//! (locked design via [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)
6//! — Q1 = (c-uf split-eager); see decision log D086).
7//!
8//! # Concept
9//!
10//! Each registered node is a member of exactly one connected
11//! component (a "subgraph") at any moment. Two nodes are in the same
12//! subgraph iff they're connected via dep edges (transitively).
13//!
14//! Connected-component membership is tracked via union-find with
15//! union-by-rank + path compression. Each component's root carries
16//! an [`Arc<SubgraphLockBox>`] that owns the partition's
17//! `wave_owner` re-entrant mutex — nodes in the same component share
18//! one lock, disjoint components run truly parallel.
19//!
20//! # Lifecycle hooks
21//!
22//! - [`SubgraphRegistry::ensure_registered`] — called when a node is
23//! first registered via [`crate::Core::register`]. Allocates a
24//! fresh singleton component for the new node.
25//! - [`SubgraphRegistry::union_nodes`] — called for each new dep
26//! edge (in `register` and `set_deps`'s add-edge path). Merges
27//! the two endpoints' components.
28//! - [`SubgraphRegistry::cleanup_node`] — wired but **not yet called
29//! in X5**. In Y1 the wave engine will invoke it from sites that
30//! actually remove a `NodeRecord` from `CoreState.nodes` (today
31//! `terminate_node` only marks the node terminal — `NodeRecord`s
32//! persist for the life of `CoreState`). The registry's HashMap
33//! entries drop together with `CoreState` when the last `Core`
34//! clone goes, so X5 doesn't leak partitions in practice; the
35//! per-node cleanup hook becomes load-bearing once Y1 lands a
36//! `Drop`-via-removal lifecycle for `NodeRecord` (post-Y1 work).
//! - [`SubgraphRegistry::on_edge_removed`] — called for each removed
//!   dep edge in `set_deps`'s remove path. It is a no-op marker: since
//!   **Y1 (split-eager)** the reachability walk lives in `Core::set_deps`,
//!   which calls [`SubgraphRegistry::split_partition`] directly when an
//!   edge removal disconnects a component.
41//!
42//! # Lock-acquisition discipline (Y1)
43//!
44//! [`SubgraphRegistry::lock_for`] returns the component's
45//! [`Arc<SubgraphLockBox>`] — caller acquires `box.wave_owner` and
46//! re-validates that the resolved root hasn't been redirected by a
47//! concurrent `union` (mirrors py's `MAX_LOCK_RETRIES` retry loop).
48//!
49//! # Slice X5 scope
50//!
//! Substrate types + registry tracking are wired to `Core::register`
//! and `Core::set_deps`'s edge add path so the union-find state is
//! maintained as nodes register and topology changes. At X5 the wave
//! engine itself still used the legacy Core-level `wave_owner`; Y1 (the
//! wave-engine migration to per-partition `wave_owner`) was carried
//! forward as an explicit follow-on slice given its scope (every
//! `begin_batch` / `run_wave` call site + `Core::subscribe` lock
//! acquisition + `BatchGuard` lock retention + retry-validate
//! semantics for held-Arc-vs-current-root divergence on union). Y1 has
//! since landed — see the [`SubgraphLockBox`] docs for how the wave
//! engine now acquires per-partition locks.
60//!
//! At X5, several substrate methods (`cleanup_node`, `lock_for`,
//! `lock_for_validate`) and the `SubgraphLockBox::wave_owner` field
//! were wired but unused and were annotated per-item with
//! `#[allow(dead_code)]`. Per-item rather than a module-level blanket
//! allow, so any genuinely unused new item still flags as dead code
//! during review. Since Y1, `lock_for`, `lock_for_validate` and
//! `wave_owner` are consumed by the wave engine; only `cleanup_node`
//! still carries `#[allow(dead_code)]`.
68
69use std::collections::{HashMap, HashSet};
70use std::sync::Arc;
71
72use parking_lot::ReentrantMutex;
73
74use crate::handle::NodeId;
75
/// Opaque identifier of one connected-component partition.
///
/// The payload is the raw `u64` of the partition's union-find root
/// (`NodeId.raw()`). It is deliberately its own type rather than a
/// [`NodeId`] alias, so a partition id can never be passed where a node
/// id is expected (or vice versa).
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct SubgraphId(pub(crate) u64);
83
84impl SubgraphId {
85 /// Construct from a [`NodeId`] root. Internal use — partition
86 /// identity is the union-find root's NodeId.
87 #[must_use]
88 pub(crate) fn from_node(node: NodeId) -> Self {
89 Self(node.raw())
90 }
91
92 /// Raw u64 view. Used for total-ordering across multi-partition
93 /// wave acquisitions per Q4=(a) (deadlock-free via ascending
94 /// SubgraphId order).
95 #[must_use]
96 pub fn raw(self) -> u64 {
97 self.0
98 }
99}
100
101impl std::fmt::Display for SubgraphId {
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 write!(f, "subgraph#{}", self.0)
104 }
105}
106
107// Q3 (2026-05-09) introduced `SubgraphState { tier3_emitted_this_wave }`
108// + a `state: parking_lot::Mutex<SubgraphState>` field on
109// `SubgraphLockBox`. The D1 patch (2026-05-09) reverted this placement
110// after QA surfaced a mid-wave cross-thread `set_deps` partition-split
111// hazard: thread B's split could move a node X from the wave's
112// partition to a fresh orphan-side partition with empty
113// `tier3_emitted_this_wave`, and thread A's subsequent emit at X would
114// then mis-detect "first emit" and violate R1.3.3.a coalescing. Slice G
115// tier3 tracking now lives in a per-thread thread-local in
116// `crate::batch::TIER3_EMITTED_THIS_WAVE` — robust to cross-thread
117// splits because thread B's split doesn't touch thread A's
118// thread-local at all. The `SubgraphState` struct is GONE for now;
119// Q-beyond will reintroduce a per-partition state placement when the
120// CoreState shard layout actually needs it (with a different field
121// shape that's robust to mid-wave splits).
122
123/// Per-component lock box. Holds the partition's `wave_owner`
124/// re-entrant mutex. On `union`, the box of the smaller-rank root is
125/// replaced by the larger-rank root's box — the lock identity is
126/// preserved across merges via the [`Arc`] reference (mirrors py's
127/// `_LockBox.lock` redirect on union).
128///
129/// **Slice Y1 (D3 / Phase E, 2026-05-08):** the wave engine acquires
130/// this per-partition lock via [`crate::Core::partition_wave_owner_lock_arc`]
131/// (with retry-validate against concurrent union/split). Closure-form
132/// `Core::batch` acquires every partition's lock in ascending
133/// [`SubgraphId`] order; per-seed entry points (`emit`, `subscribe`,
134/// etc.) acquire only the seed's transitively-touched partitions.
135pub struct SubgraphLockBox {
136 /// Per-partition wave ownership. Cross-thread emits to the same
137 /// partition serialize here; same-thread re-entry passes through.
138 /// Wrapped in `Arc` at the registry level so two roots' boxes
139 /// can share the same mutex identity after `union`.
140 pub(crate) wave_owner: Arc<ReentrantMutex<()>>,
141}
142
143impl SubgraphLockBox {
144 fn new() -> Arc<Self> {
145 Arc::new(Self {
146 wave_owner: Arc::new(ReentrantMutex::new(())),
147 })
148 }
149}
150
/// Default cap for the root-validation retry loop built on
/// [`SubgraphRegistry::lock_for`] + [`SubgraphRegistry::lock_for_validate`].
/// `lock_for` itself performs a single resolution; the loop lives in
/// [`crate::Core::partition_wave_owner_lock_arc`].
///
/// Mirrors graphrefly-py's `_MAX_LOCK_RETRIES` constant
/// (`subgraph_locks.py` line 39). The cap is reached only when `union`
/// or split activity races with the acquire continuously — pathological
/// in practice, but bounded for safety.
///
/// Consumed under that retry-validate loop (Slice Y1 / Phase E,
/// 2026-05-08).
pub(crate) const MAX_LOCK_RETRIES: u32 = 100;
160
161/// Union-find registry tracking each node's connected-component
162/// membership. Wrapped by [`crate::Core`] in `Arc<Mutex<...>>` so
163/// the wave engine can resolve a node's partition before acquiring
164/// that partition's `wave_owner`.
165pub struct SubgraphRegistry {
166 /// `node_id → parent_id` union-find array. A root has
167 /// `parent[root] == root`.
168 parent: HashMap<NodeId, NodeId>,
169 /// `node_id → rank` for union-by-rank. Only meaningful for roots.
170 rank: HashMap<NodeId, u32>,
171 /// Reverse map for cleanup-time re-rooting: `node_id → set of
172 /// direct children whose `parent` field points at `node_id``.
173 /// On root cleanup, we promote one child to root and re-attach
174 /// the others.
175 children: HashMap<NodeId, HashSet<NodeId>>,
176 /// `root_node_id → Arc<SubgraphLockBox>`. Only roots have entries.
177 /// On `union`, the loser's entry is removed and any future
178 /// `lock_for` calls on its members find the winner's box via
179 /// `find`.
180 boxes: HashMap<NodeId, Arc<SubgraphLockBox>>,
181 /// Monotonic counter bumped on every operation that changes the
182 /// SET of partitions or any partition's box-identity:
183 /// [`Self::ensure_registered`] (creates a new partition),
184 /// [`Self::union_nodes`] (merges two partitions — actual merge
185 /// only, idempotent calls don't bump), and
186 /// [`Self::split_partition`] (splits a partition into two with a
187 /// fresh box for the orphan side).
188 ///
189 /// Consumed by [`crate::Core::begin_batch`] and
190 /// [`crate::Core::begin_batch_for`] for retry-validate at the
191 /// all-partitions / touched-partitions level (QA-fix #2,
192 /// 2026-05-09): if the epoch changes between snapshot and
193 /// post-acquire re-read, a concurrent registry mutation may
194 /// have added/removed/redirected a partition the batch should
195 /// have held. The batch drops its acquired guards and retries.
196 ///
197 /// Counter is `u64`; wrap-around is unreachable in practice
198 /// (bumping 1B times per second still takes 584 years).
199 epoch: u64,
200}
201
202impl SubgraphRegistry {
203 /// Construct an empty registry. `pub(crate)` because all useful
204 /// operations on the registry are crate-internal — only `Core::new`
205 /// needs to construct one. External callers receive
206 /// [`SubgraphId`]s via [`Core::partition_of`] but cannot construct
207 /// a registry from outside the crate.
208 #[must_use]
209 pub(crate) fn new() -> Self {
210 Self {
211 parent: HashMap::new(),
212 rank: HashMap::new(),
213 children: HashMap::new(),
214 boxes: HashMap::new(),
215 epoch: 0,
216 }
217 }
218
219 /// Read the current registry epoch. Used by
220 /// [`crate::Core::begin_batch`] / [`crate::Core::begin_batch_for`]
221 /// for the snapshot-and-acquire retry-validate loop (QA-fix #2,
222 /// 2026-05-09).
223 #[must_use]
224 pub(crate) fn epoch(&self) -> u64 {
225 self.epoch
226 }
227
228 /// Register `node` as the root of a fresh singleton component.
229 /// Idempotent — calling twice on the same node is a no-op.
230 ///
231 /// Called from [`crate::Core::register`] right after the new
232 /// `NodeId` is allocated, BEFORE any `union_nodes` calls for the
233 /// node's deps.
234 pub(crate) fn ensure_registered(&mut self, node: NodeId) {
235 if self.parent.contains_key(&node) {
236 return;
237 }
238 self.parent.insert(node, node);
239 self.rank.insert(node, 0);
240 self.children.insert(node, HashSet::new());
241 self.boxes.insert(node, SubgraphLockBox::new());
242 // QA-fix #2: new partition exists. Closure-form `Core::begin_batch`
243 // that snapshotted before now must retry to acquire it.
244 self.epoch = self.epoch.wrapping_add(1);
245 }
246
247 /// Find the root of `node`'s component, with path compression.
248 /// Panics if `node` is not registered.
249 ///
250 /// **Iterative two-pass implementation** (Slice X5 /qa):
251 /// pass 1 walks up the parent chain to the root; pass 2 re-links
252 /// every node on the path directly to the root + maintains the
253 /// `children` reverse-map. Iterative form mirrors the existing
254 /// `terminate_node` cascade discipline (and graphrefly-py's
255 /// `_find_locked` while-loop) — keeps stack depth O(1) regardless
256 /// of pre-compression chain length, avoiding stack overflow on
257 /// pathological un-compressed trees that can briefly form before
258 /// path compression settles (especially relevant once Y1 puts
259 /// `find` on the hot path under `lock_for`).
260 #[must_use]
261 pub(crate) fn find(&mut self, node: NodeId) -> NodeId {
262 // Pass 1: walk up to root (no mutation).
263 let mut cur = node;
264 loop {
265 let parent = *self
266 .parent
267 .get(&cur)
268 .expect("subgraph_registry::find: node not registered");
269 if parent == cur {
270 break; // cur is root
271 }
272 cur = parent;
273 }
274 let root = cur;
275
276 // Pass 2: path compression — re-link every node on the original
277 // walk directly to root, maintaining the `children` reverse-map.
278 // Skip when already pointing at root (no-op for root itself, and
279 // for already-compressed nodes).
280 let mut walker = node;
281 while walker != root {
282 let parent = *self
283 .parent
284 .get(&walker)
285 .expect("walker on path-to-root must be registered");
286 if parent != root {
287 self.parent.insert(walker, root);
288 if let Some(old_kids) = self.children.get_mut(&parent) {
289 old_kids.remove(&walker);
290 }
291 self.children.entry(root).or_default().insert(walker);
292 }
293 walker = parent;
294 }
295 root
296 }
297
298 /// Merge the components containing `a` and `b`. Both nodes must
299 /// already be registered. After this call, `find(a) == find(b)`.
300 ///
301 /// Union-by-rank: the smaller-rank root becomes a child of the
302 /// larger. Equal-rank breaks the tie by promoting `a`'s root and
303 /// bumping its rank.
304 ///
305 /// On merge, the loser's [`SubgraphLockBox`] entry is removed
306 /// from `boxes`. Future `lock_for` calls on the loser's members
307 /// resolve to the winner's root → winner's box.
308 pub(crate) fn union_nodes(&mut self, a: NodeId, b: NodeId) {
309 debug_assert!(
310 self.parent.contains_key(&a) && self.parent.contains_key(&b),
311 "union_nodes: both nodes must be registered first"
312 );
313 // Defense-in-depth against bypassed cycle detection: a self-edge
314 // `set_deps(n, &[n])` would reach here as `union_nodes(n, n)`.
315 // Cycle rejection in `Core::set_deps` should catch this BEFORE
316 // we get called, but the registry has no other defense.
317 debug_assert!(
318 a != b,
319 "union_nodes called with self-edge — \
320 Core's cycle detection bypassed?"
321 );
322 let mut root_a = self.find(a);
323 let mut root_b = self.find(b);
324 if root_a == root_b {
325 return;
326 }
327 let rank_a = *self.rank.get(&root_a).unwrap_or(&0);
328 let rank_b = *self.rank.get(&root_b).unwrap_or(&0);
329 if rank_a < rank_b {
330 std::mem::swap(&mut root_a, &mut root_b);
331 }
332 // root_a is the winner (kept). root_b becomes a child.
333 self.parent.insert(root_b, root_a);
334 self.children.entry(root_a).or_default().insert(root_b);
335 if rank_a == rank_b {
336 self.rank.insert(root_a, rank_a + 1);
337 }
338 // Drop the loser's box. Any in-flight readers holding an Arc
339 // clone keep it alive; the registry no longer references it.
340 self.boxes.remove(&root_b);
341 // QA-fix #2: real merge happened (early-return above handled the
342 // idempotent same-root case). The set of partitions changed;
343 // bump epoch so closure-form / per-seed retry-validate detects.
344 self.epoch = self.epoch.wrapping_add(1);
345 }
346
347 /// Remove `node` from the registry. If `node` was a root, promote
348 /// one of its direct children as the new root and re-link the
349 /// others. Mirrors py's `_on_gc` (`subgraph_locks.py` lines 53–92).
350 ///
351 /// **Slice X5 status:** wired but NOT called from Core. Today
352 /// `Core::terminate_node` marks a node terminal but does NOT remove
353 /// it from `CoreState.nodes` (NodeRecords persist for the life of
354 /// `CoreState`); the registry's HashMap entries drop together with
355 /// `CoreState` when the last `Core` clone goes, so the partition
356 /// state is reclaimed without an explicit per-node cleanup. Y1
357 /// will revisit this — when the wave engine activates per-partition
358 /// `wave_owner`, terminated nodes' partition entries should be
359 /// purged so a stale partition doesn't keep its `Arc<SubgraphLockBox>`
360 /// alive in the registry. Idempotent on unregistered nodes.
361 #[allow(dead_code)]
362 pub(crate) fn cleanup_node(&mut self, node: NodeId) {
363 let Some(parent) = self.parent.get(&node).copied() else {
364 return; // already cleaned up — idempotent.
365 };
366
367 let direct_children: Vec<NodeId> = self
368 .children
369 .get(&node)
370 .map(|s| s.iter().copied().collect())
371 .unwrap_or_default();
372
373 if parent == node {
374 // `node` was the root.
375 if let Some(&new_root) = direct_children.first() {
376 self.parent.insert(new_root, new_root);
377 // Detach `node` from each grandchild's children-set first
378 // (before borrowing children mut for the new_root entry).
379 for child in &direct_children {
380 if let Some(kids) = self.children.get_mut(child) {
381 kids.remove(&node);
382 }
383 }
384 // Re-attach all children except `new_root` to `new_root`.
385 let new_root_kids = self.children.entry(new_root).or_default();
386 for child in direct_children.iter().skip(1).copied() {
387 self.parent.insert(child, new_root);
388 new_root_kids.insert(child);
389 }
390 // Box ownership transfers to the new root — same Arc, so
391 // any in-flight `lock_for` holders keep their guard alive.
392 if let Some(box_arc) = self.boxes.remove(&node) {
393 self.boxes.insert(new_root, box_arc);
394 }
395 let old_rank = self.rank.get(&node).copied().unwrap_or(0);
396 let new_rank = self.rank.entry(new_root).or_insert(0);
397 if old_rank > *new_rank {
398 *new_rank = old_rank;
399 }
400 } else {
401 // Singleton root — just remove the box.
402 self.boxes.remove(&node);
403 }
404 } else {
405 // `node` was a non-root. Detach from its parent's children;
406 // re-attach its own children to its parent.
407 if let Some(parent_kids) = self.children.get_mut(&parent) {
408 parent_kids.remove(&node);
409 for child in &direct_children {
410 parent_kids.insert(*child);
411 }
412 }
413 for child in &direct_children {
414 self.parent.insert(*child, parent);
415 }
416 }
417
418 self.children.remove(&node);
419 self.parent.remove(&node);
420 self.rank.remove(&node);
421 // boxes already removed above on root path; non-root path never
422 // had a box entry.
423 }
424
425 /// Hook for an edge removal. Slice X5 commit-1: was a no-op
426 /// (monotonic-merge stepping stone). Slice Y1 / Phase F (D3
427 /// split-eager, 2026-05-09): the actual reachability walk +
428 /// split decision lives in `Core::set_deps` (it has access to
429 /// `s.children` and `s.nodes`'s `dep_records` for the undirected
430 /// dep-edge graph). When `Core` detects disconnection it calls
431 /// [`Self::split_partition`] directly. This `on_edge_removed`
432 /// hook remains as a no-op marker — kept for API symmetry with
433 /// [`Self::union_nodes`] and for future use if the registry
434 /// gains its own edge-graph view.
435 pub(crate) fn on_edge_removed(&mut self, _from: NodeId, _to: NodeId) {
436 // No-op. See `Core::set_deps` Phase F split-eager block which
437 // calls `split_partition` directly when an edge removal causes
438 // disconnection in the undirected dep-edge graph.
439 }
440
441 /// Split an existing component into two, given the keep-side and
442 /// orphan-side membership. Slice Y1 / Phase F (D3 split-eager,
443 /// 2026-05-09).
444 ///
445 /// **Invariants assumed by caller:**
446 /// 1. `component_nodes` lists every node currently in some single
447 /// component C of this registry.
448 /// 2. `keep_side` ⊆ `component_nodes` is the post-removal
449 /// connected subset that should retain C's existing
450 /// [`SubgraphLockBox`] (Arc identity preserved). `keep_side`
451 /// must be non-empty.
452 /// 3. The orphan side (`component_nodes - keep_side`) must be
453 /// non-empty. (If it were empty, no split is needed; caller
454 /// should not invoke this method.)
455 /// 4. `edges_in_component` lists the dep edges (as `(parent,
456 /// child)` data-flow pairs) currently present in
457 /// `Core::s.children` / `s.nodes[*].dep_records`,
458 /// POST-removal of the triggering edge. The edges connect
459 /// nodes within `component_nodes`. The caller is responsible
460 /// for filtering to in-component edges.
461 ///
462 /// **Algorithm (mirrors graphrefly-py [`subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
463 /// `_on_split` plus a Rust-idiomatic Arc redirect):**
464 /// 1. Capture the original [`Arc<SubgraphLockBox>`] for the
465 /// component's pre-split root.
466 /// 2. Reset every component node to a singleton (parent[n] = n,
467 /// rank = 0, children = ∅).
468 /// 3. Re-union via `edges_in_component` — each edge calls
469 /// [`Self::union_nodes`], which restores connectivity within
470 /// each side independently (since `edges_in_component`
471 /// contains only the post-removal edges, the two sides stay
472 /// disconnected from each other in the union-find tree).
473 /// 4. Resolve the new keep-side root and orphan-side root.
474 /// 5. Assign the original lock box to the keep-side root (so
475 /// in-flight waves holding the original `Arc` succeed
476 /// `lock_for_validate` against the keep-side); allocate a
477 /// fresh box for the orphan-side root (in-flight waves on
478 /// the orphan side fail validate, retry with the new box).
479 pub(crate) fn split_partition(
480 &mut self,
481 component_nodes: &[NodeId],
482 keep_side_nodes: &[NodeId],
483 edges_in_component: &[(NodeId, NodeId)],
484 ) {
485 debug_assert!(
486 !component_nodes.is_empty(),
487 "component_nodes must be non-empty"
488 );
489 debug_assert!(
490 !keep_side_nodes.is_empty(),
491 "keep_side_nodes must be non-empty"
492 );
493 // Build a contains-lookup set for the keep side. The caller
494 // passes a slice (avoids cross-crate-internal HashSet flavor
495 // mismatch — `std::collections::HashSet` vs `ahash::AHashSet`).
496 let keep_side: HashSet<NodeId> = keep_side_nodes.iter().copied().collect();
497 debug_assert!(
498 component_nodes.iter().any(|n| !keep_side.contains(n)),
499 "orphan side must be non-empty (no-op caller)"
500 );
501
502 // Step 1: capture original box.
503 let original_root = self.find(component_nodes[0]);
504 let original_box = self
505 .boxes
506 .remove(&original_root)
507 .expect("original_root must have a registered box");
508
509 // Step 2: reset every component node to a singleton.
510 for &n in component_nodes {
511 self.parent.insert(n, n);
512 self.rank.insert(n, 0);
513 self.children.insert(n, HashSet::new());
514 }
515
516 // Step 3: re-union via post-removal edges. Both sides become
517 // internally connected; the two sides remain disconnected from
518 // each other (since the triggering edge was removed and the
519 // caller's BFS verified disconnection).
520 for &(a, b) in edges_in_component {
521 // `union_nodes` is idempotent on already-merged components,
522 // so multiple edges into the same connected subset are fine.
523 // Self-edges shouldn't appear here (Core's cycle detection
524 // rejects them at edge-mutation time) but guard defensively.
525 if a != b {
526 self.union_nodes(a, b);
527 }
528 }
529
530 // Step 4: resolve roots. Pick any keep-side / orphan-side
531 // representative and find its post-re-union root.
532 let keep_repr = keep_side_nodes[0];
533 let keep_root = self.find(keep_repr);
534 let orphan_repr = *component_nodes
535 .iter()
536 .find(|n| !keep_side.contains(n))
537 .expect("non-empty orphan side");
538 let orphan_root = self.find(orphan_repr);
539 debug_assert!(
540 keep_root != orphan_root,
541 "split_partition: keep_root {keep_root:?} and orphan_root {orphan_root:?} \
542 must be distinct after re-union — caller's BFS must have asserted \
543 disconnection"
544 );
545
546 // Step 5: assign boxes. Original box → keep-side root (so
547 // in-flight waves' `lock_for_validate` against the held Arc
548 // succeed for keep-side nodes). Fresh box → orphan-side root.
549 self.boxes.insert(keep_root, original_box);
550 self.boxes.insert(orphan_root, SubgraphLockBox::new());
551
552 // QA-fix #2: split changed the partition set + introduced a
553 // fresh box. Bump epoch so any closure-form / per-seed batch
554 // that snapshotted before the split detects + retries.
555 self.epoch = self.epoch.wrapping_add(1);
556 }
557
558 /// Resolve `node`'s partition lock box. Caller acquires the
559 /// box's `wave_owner`, then SHOULD re-validate via
560 /// [`Self::lock_for_validate`] that the resolved root hasn't
561 /// shifted under a concurrent `union` (lock-validation retry
562 /// loop, mirroring py `lock_for` lines 154–178).
563 ///
564 /// Returns `None` if `node` is not registered (defensive — should
565 /// not happen in correct call paths).
566 ///
567 /// Consumed by [`crate::Core::partition_wave_owner_lock_arc`]
568 /// under the retry-validate loop (Slice Y1 / Phase E, 2026-05-08).
569 #[must_use]
570 pub(crate) fn lock_for(&mut self, node: NodeId) -> Option<(SubgraphId, Arc<SubgraphLockBox>)> {
571 if !self.parent.contains_key(&node) {
572 return None;
573 }
574 let root = self.find(node);
575 let box_arc = self.boxes.get(&root).cloned()?;
576 Some((SubgraphId::from_node(root), box_arc))
577 }
578
579 /// Re-validate that `node`'s root resolves to `expected_box`. Used
580 /// in the retry loop after acquiring the box's `wave_owner`: if a
581 /// concurrent `union` redirected the root mid-acquire, the held
582 /// lock is for the wrong partition and the caller must release +
583 /// retry.
584 ///
585 /// Consumed by [`crate::Core::partition_wave_owner_lock_arc`]
586 /// under the retry-validate loop (Slice Y1 / Phase E, 2026-05-08).
587 #[must_use]
588 pub(crate) fn lock_for_validate(
589 &mut self,
590 node: NodeId,
591 expected_box: &Arc<SubgraphLockBox>,
592 ) -> bool {
593 let Some(root) = self.parent.get(&node).copied() else {
594 return false;
595 };
596 let actual_root = self.find(root);
597 match self.boxes.get(&actual_root) {
598 Some(actual) => Arc::ptr_eq(actual, expected_box),
599 None => false,
600 }
601 }
602
603 /// Snapshot of every currently-existing partition, sorted in
604 /// ascending [`SubgraphId`] order. Returns `(partition_id, lock_box)`
605 /// pairs — the caller acquires `box.wave_owner.lock_arc()` on each
606 /// in ascending order to enter a closure-form batch (which doesn't
607 /// have a known seed and must serialize against ALL partitions per
608 /// session-doc Q7 + decision D092).
609 ///
610 /// Consumed by `Core::all_partitions_lock_boxes` (Slice Y1 /
611 /// Phase E, 2026-05-08).
612 #[must_use]
613 pub(crate) fn all_partitions(&self) -> Vec<(SubgraphId, Arc<SubgraphLockBox>)> {
614 let mut out: Vec<(SubgraphId, Arc<SubgraphLockBox>)> = self
615 .boxes
616 .iter()
617 .map(|(root, box_arc)| (SubgraphId::from_node(*root), Arc::clone(box_arc)))
618 .collect();
619 out.sort_unstable_by_key(|(sid, _)| *sid);
620 out
621 }
622
623 /// Number of registered nodes. Useful for debugging + acceptance
624 /// tests that verify the registry stays in sync with `Core::nodes`.
625 #[must_use]
626 pub fn node_count(&self) -> usize {
627 self.parent.len()
628 }
629
630 /// Snapshot of every currently-registered node. Used by
631 /// `Core::set_deps`'s split-eager block (Slice Y1 / Phase F,
632 /// 2026-05-09) to enumerate candidate nodes for a component-
633 /// membership filter; iterating + calling `find` would alias-
634 /// borrow `&mut self`, so we snapshot first.
635 #[must_use]
636 pub(crate) fn registered_nodes(&self) -> Vec<NodeId> {
637 self.parent.keys().copied().collect()
638 }
639
640 /// Number of distinct connected components. Two threads emitting
641 /// into nodes with distinct partitions can run truly parallel
642 /// (Y1+); X5 substrate does not yet exercise that property.
643 #[must_use]
644 pub fn component_count(&self) -> usize {
645 self.boxes.len()
646 }
647
648 /// Resolve `node`'s partition. Returns `None` for unregistered
649 /// nodes. Mutating because path compression may relink under
650 /// `find`.
651 #[must_use]
652 pub fn partition_of(&mut self, node: NodeId) -> Option<SubgraphId> {
653 if !self.parent.contains_key(&node) {
654 return None;
655 }
656 Some(SubgraphId::from_node(self.find(node)))
657 }
658}
659
660// No `Default` impl — the registry is crate-internal infrastructure;
661// `Core::new` is the only construction site. Adding a public `Default`
662// would let external callers build a registry that can do nothing
663// useful (every operation method is `pub(crate)`).
664
#[cfg(test)]
mod tests {
    use super::*;

    fn n(raw: u64) -> NodeId {
        NodeId::new(raw)
    }

    #[test]
    fn singleton_register_creates_one_partition() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        assert_eq!(r.node_count(), 1);
        assert_eq!(r.component_count(), 1);
        assert_eq!(r.find(n(1)), n(1));
    }

    #[test]
    fn union_merges_two_singletons() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        assert_eq!(r.component_count(), 2);
        r.union_nodes(n(1), n(2));
        assert_eq!(r.component_count(), 1);
        assert_eq!(r.find(n(1)), r.find(n(2)));
    }

    #[test]
    fn union_idempotent_within_same_component() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        r.union_nodes(n(1), n(2));
        let comp_before = r.component_count();
        r.union_nodes(n(1), n(2));
        assert_eq!(r.component_count(), comp_before);
    }

    #[test]
    fn epoch_bumps_only_on_partition_set_changes() {
        let mut r = SubgraphRegistry::new();
        let e0 = r.epoch();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        let e1 = r.epoch();
        assert!(e1 > e0, "new partitions must bump the epoch");
        r.ensure_registered(n(1)); // idempotent re-registration
        assert_eq!(r.epoch(), e1);
        r.union_nodes(n(1), n(2));
        let e2 = r.epoch();
        assert!(e2 > e1, "a real merge must bump the epoch");
        r.union_nodes(n(1), n(2)); // already merged
        assert_eq!(r.epoch(), e2);
    }

    #[test]
    fn cleanup_singleton_removes_partition() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.cleanup_node(n(1));
        assert_eq!(r.node_count(), 0);
        assert_eq!(r.component_count(), 0);
    }

    #[test]
    fn cleanup_root_promotes_child() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        r.union_nodes(n(1), n(2));
        // After union: one of {1, 2} is root.
        let root_before = r.find(n(1));
        let child = if root_before == n(1) { n(2) } else { n(1) };
        r.cleanup_node(root_before);
        // The remaining node should be its own root.
        assert_eq!(r.find(child), child);
        assert_eq!(r.component_count(), 1);
    }

    #[test]
    fn cleanup_non_root_re_links_grandchildren_to_parent() {
        let mut r = SubgraphRegistry::new();
        for i in 1..=3 {
            r.ensure_registered(n(i));
        }
        r.union_nodes(n(1), n(2));
        r.union_nodes(n(2), n(3));
        let root_before = r.find(n(1));
        // Pick a non-root node to clean up.
        let non_root = if root_before == n(1) {
            n(2)
        } else if root_before == n(2) {
            n(1)
        } else {
            n(2)
        };
        r.cleanup_node(non_root);
        // Remaining nodes should still find a single root.
        let other = (1..=3u64)
            .map(n)
            .find(|x| *x != root_before && *x != non_root)
            .expect("third node");
        assert_eq!(r.find(root_before), r.find(other));
    }

    #[test]
    fn cleanup_non_root_with_children_relinks_them_to_parent() {
        let mut r = SubgraphRegistry::new();
        for i in 1..=4 {
            r.ensure_registered(n(i));
        }
        // Equal-rank unions promote the left-hand root, so this builds
        // 1 ← 2 ← 3 and 1 ← 4: n(2) is a non-root WITH a child. Don't
        // call `find(n(3))` before cleanup — path compression would
        // relink n(3) straight to n(1) and defeat the test.
        r.union_nodes(n(2), n(3));
        r.union_nodes(n(1), n(4));
        r.union_nodes(n(1), n(2));
        r.cleanup_node(n(2));
        assert_eq!(r.node_count(), 3);
        assert_eq!(r.component_count(), 1);
        assert_eq!(r.find(n(3)), n(1));
        assert_eq!(r.find(n(4)), n(1));
    }

    #[test]
    fn lock_for_returns_same_box_for_same_component() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        r.union_nodes(n(1), n(2));
        let (_sid_a, box_a) = r.lock_for(n(1)).expect("registered");
        let (_sid_b, box_b) = r.lock_for(n(2)).expect("registered");
        assert!(Arc::ptr_eq(&box_a, &box_b));
    }

    #[test]
    fn lock_for_validate_detects_redirect_after_union() {
        // Slice X5 /qa P6: deterministic redirect via forced rank skew.
        // Before the fix, this test asserted conditionally based on
        // which node union promoted, which masked a hypothetical
        // regression where `lock_for_validate` always returned `true`.
        // Now the setup forces n(1)'s root to be displaced, so the
        // assertion is unconditionally `false`.
        let mut r = SubgraphRegistry::new();
        for i in 1..=4 {
            r.ensure_registered(n(i));
        }
        // Build a higher-rank tree under n(2):
        // - the first union (n(2)+n(3), equal ranks) promotes n(2) and
        //   bumps its rank to 1;
        // - the second (n(2)+n(4)) attaches n(4) without changing that
        //   rank.
        // After this, find(n(2)) == n(2) and rank[n(2)] >= 1.
        r.union_nodes(n(2), n(3));
        r.union_nodes(n(2), n(4));
        let n2_root = r.find(n(2));
        // n(1) stays its own singleton (rank 0). Resolve its box BEFORE
        // the cross-tree union.
        let (_sid_before, box_1_alone) = r.lock_for(n(1)).expect("registered");
        let n1_root_before = r.find(n(1));
        assert_eq!(n1_root_before, n(1), "n(1) is still its own root");

        // Cross-tree union: union-by-rank promotes the higher-rank tree
        // (n(2)'s) — n(1)'s root MUST become n(2)'s root.
        r.union_nodes(n(1), n(2));
        let n1_root_after = r.find(n(1));
        assert_eq!(
            n1_root_after, n2_root,
            "union-by-rank promoted n(2)'s tree; n(1)'s root displaced"
        );

        // The previously-resolved box is now stale: lock_for_validate
        // must detect the redirect unconditionally.
        let still_valid = r.lock_for_validate(n(1), &box_1_alone);
        assert!(
            !still_valid,
            "lock_for_validate must detect the box-redirect after union promotes a different root"
        );
        // And lock_for now returns a different box.
        let (_sid_after, box_after) = r.lock_for(n(1)).expect("registered");
        assert!(
            !Arc::ptr_eq(&box_1_alone, &box_after),
            "stale box and active box must be distinct Arc identities"
        );
    }

    #[test]
    fn split_partition_keeps_original_box_on_keep_side() {
        let mut r = SubgraphRegistry::new();
        for i in 1..=3 {
            r.ensure_registered(n(i));
        }
        r.union_nodes(n(1), n(2));
        r.union_nodes(n(2), n(3));
        let (_, original) = r.lock_for(n(1)).expect("registered");
        let epoch_before = r.epoch();

        // Dropping edge 2–3 leaves {1, 2} connected and {3} alone.
        let component = [n(1), n(2), n(3)];
        r.split_partition(&component, &[n(1), n(2)], &[(n(1), n(2))]);

        assert!(r.epoch() > epoch_before, "split must bump the epoch");
        assert_eq!(r.component_count(), 2);
        assert_eq!(r.find(n(1)), r.find(n(2)));
        assert_ne!(r.find(n(1)), r.find(n(3)));
        let (_, keep) = r.lock_for(n(2)).expect("registered");
        let (_, orphan) = r.lock_for(n(3)).expect("registered");
        assert!(Arc::ptr_eq(&keep, &original));
        assert!(!Arc::ptr_eq(&orphan, &original));
        assert!(r.lock_for_validate(n(1), &original));
        assert!(!r.lock_for_validate(n(3), &original));
    }

    #[test]
    fn all_partitions_is_sorted_and_complete() {
        let mut r = SubgraphRegistry::new();
        for i in [5, 1, 3, 2] {
            r.ensure_registered(n(i));
        }
        r.union_nodes(n(3), n(2));
        let parts = r.all_partitions();
        assert_eq!(parts.len(), r.component_count());
        assert!(parts.windows(2).all(|w| w[0].0 < w[1].0));
    }

    #[test]
    fn partition_of_distinct_singletons_differ() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        let p1 = r.partition_of(n(1)).expect("registered");
        let p2 = r.partition_of(n(2)).expect("registered");
        assert_ne!(p1, p2);
    }

    #[test]
    fn partition_of_unioned_nodes_match() {
        let mut r = SubgraphRegistry::new();
        r.ensure_registered(n(1));
        r.ensure_registered(n(2));
        r.union_nodes(n(1), n(2));
        let p1 = r.partition_of(n(1)).expect("registered");
        let p2 = r.partition_of(n(2)).expect("registered");
        assert_eq!(p1, p2);
    }
}