Skip to main content

sqry_core/graph/unified/build/
parallel_commit.rs

1//! Parallel commit pipeline for pre-allocated ID ranges.
2//!
3//! Replaces the serial commit loop with a four-phase pipeline:
4//! Phase 2: Count + range assignment via prefix sums
5//! Phase 3: Parallel commit into disjoint pre-allocated ranges
6//! Phase 4: String dedup, remap, index build, edge bulk insert
7//!
8//! # Phase 3 Architecture
9//!
10//! Phase 3 uses `split_at_mut` to carve disjoint sub-slices from pre-allocated
11//! arena and interner ranges, then uses `rayon` to commit each file's staging
12//! graph in parallel without locks:
13//!
14//! ```text
15//! NodeArena slots:   [   file0   |   file1   |   file2   ]
16//! StringInterner:    [   file0   |   file1   |   file2   ]
17//!                         ↑            ↑            ↑
18//!                    split_at_mut  split_at_mut  remainder
19//! ```
20//!
21//! Each file's `commit_single_file` receives its own disjoint slices and
22//! operates independently without contention.
23
24use std::collections::HashMap;
25use std::ops::Range;
26use std::sync::Arc;
27
28use rayon::prelude::*;
29
30use crate::graph::unified::edge::delta::{DeltaEdge, DeltaOp};
31#[cfg(test)]
32use crate::graph::unified::edge::kind::ResolvedVia;
33use crate::graph::unified::edge::kind::{EdgeKind, MqProtocol};
34use crate::graph::unified::file::FileId;
35use crate::graph::unified::node::NodeId;
36use crate::graph::unified::storage::NodeArena;
37use crate::graph::unified::storage::arena::{NodeEntry, Slot};
38use crate::graph::unified::storage::c_indirect::LocalScopeIndex;
39use crate::graph::unified::string::StringId;
40
41use super::pass3_intra::PendingEdge;
42use super::staging::{
43    GoEmbeddingHint, GoFunctionSignatureHint, GoMethodReceiverHint, GoMethodSignatureHint,
44    GoNamedTypeConversionHint, GoReceiverCallHint, GoReceiverHintKind, PendingBinding,
45    PendingIndirectCallsite, StagingGraph, StagingOp,
46};
47
/// Cursor pair threaded from one chunk to the next so that ID assignment
/// stays deterministic.
///
/// A chunk's ranges start exactly where the preceding chunk's ranges
/// stopped, which keeps the ID spaces contiguous and globally unique.
#[derive(Clone, Debug, Default)]
pub struct GlobalOffsets {
    /// Index of the next node slot that is still available.
    pub node_offset: u32,
    /// Index of the next string slot that is still available.
    pub string_offset: u32,
}
59
60/// Per-file commit plan with pre-assigned ID ranges.
61#[derive(Debug, Clone)]
62pub struct FilePlan {
63    /// Index into the chunk's `ParsedFile` vec.
64    pub parsed_index: usize,
65    /// Pre-assigned `FileId` from batch registration.
66    pub file_id: FileId,
67    /// Node slot range [start..end) in `NodeArena`.
68    pub node_range: Range<u32>,
69    /// String slot range [start..end) in `StringInterner`.
70    pub string_range: Range<u32>,
71}
72
73/// Plan for parallel commit of a single chunk.
74#[derive(Debug, Clone)]
75pub struct ChunkCommitPlan {
76    /// Per-file plans in deterministic file order.
77    pub file_plans: Vec<FilePlan>,
78    /// Total nodes across all files in this chunk.
79    pub total_nodes: u32,
80    /// Total strings across all files in this chunk.
81    pub total_strings: u32,
82    /// Total edges across all files in this chunk.
83    pub total_edges: u64,
84}
85
86/// Compute commit plan from parsed files using prefix-sum range assignment.
87///
88/// Each file gets contiguous, non-overlapping ranges for nodes and strings.
89/// Ranges start from the given global offsets, which carry forward across
90/// chunks.
91///
92/// # Arguments
93///
94/// * `node_counts` - Per-file node counts (from `StagingGraph::node_count_u32()`)
95/// * `string_counts` - Per-file string counts
96/// * `edge_counts` - Per-file edge counts (used for `total_edges` only)
97/// * `file_ids` - Pre-assigned `FileId`s from batch registration
98/// * `node_offset` - Running global node offset across chunks
99/// * `string_offset` - Running global string offset across chunks
100///
101/// # Panics
102///
103/// Panics in debug builds if the per-chunk accounting arrays do not have
104/// identical lengths.
105#[must_use]
106pub fn compute_commit_plan(
107    node_counts: &[u32],
108    string_counts: &[u32],
109    edge_counts: &[u32],
110    file_ids: &[FileId],
111    node_offset: u32,
112    string_offset: u32,
113) -> ChunkCommitPlan {
114    debug_assert_eq!(node_counts.len(), string_counts.len());
115    debug_assert_eq!(node_counts.len(), edge_counts.len());
116    debug_assert_eq!(node_counts.len(), file_ids.len());
117
118    let mut plans = Vec::with_capacity(node_counts.len());
119    let mut node_cursor = node_offset;
120    let mut string_cursor = string_offset;
121    let mut total_edges: u64 = 0;
122
123    for i in 0..node_counts.len() {
124        let nc = node_counts[i];
125        let sc = string_counts[i];
126
127        let node_end = node_cursor
128            .checked_add(nc)
129            .expect("node ID space overflow in commit plan");
130        let string_end = string_cursor
131            .checked_add(sc)
132            .expect("string ID space overflow in commit plan");
133
134        plans.push(FilePlan {
135            parsed_index: i,
136            file_id: file_ids[i],
137            node_range: node_cursor..node_end,
138            string_range: string_cursor..string_end,
139        });
140
141        node_cursor = node_end;
142        string_cursor = string_end;
143        total_edges += u64::from(edge_counts[i]);
144    }
145
146    ChunkCommitPlan {
147        file_plans: plans,
148        total_nodes: node_cursor - node_offset,
149        total_strings: string_cursor - string_offset,
150        total_edges,
151    }
152}
153
154/// Execute Phase 2: count + range assignment for a parsed chunk.
155///
156/// Extracts per-file counts from staging graphs and delegates to
157/// [`compute_commit_plan`] for prefix-sum range assignment.
158#[must_use]
159pub fn phase2_assign_ranges(
160    staging_graphs: &[&StagingGraph],
161    file_ids: &[FileId],
162    offsets: &GlobalOffsets,
163) -> ChunkCommitPlan {
164    let node_counts: Vec<u32> = staging_graphs
165        .iter()
166        .map(|sg| sg.node_count_u32())
167        .collect();
168    let string_counts: Vec<u32> = staging_graphs
169        .iter()
170        .map(|sg| sg.string_count_u32())
171        .collect();
172    let edge_counts: Vec<u32> = staging_graphs
173        .iter()
174        .map(|sg| sg.edge_count_u32())
175        .collect();
176
177    compute_commit_plan(
178        &node_counts,
179        &string_counts,
180        &edge_counts,
181        file_ids,
182        offsets.node_offset,
183        offsets.string_offset,
184    )
185}
186
187/// Phase 3 result: per-file edges, per-file node IDs, and total written
188/// counts for validation.
189pub struct Phase3Result {
190    /// Per-file edge collections for Phase 4 bulk insert.
191    pub per_file_edges: Vec<Vec<PendingEdge>>,
192    /// Per-file node IDs actually committed. Indexed identically to
193    /// `per_file_edges` — element `i` is the Vec of `NodeIds` committed
194    /// for `plan.file_plans[i]`. Empty Vec when that file wrote no
195    /// nodes (slot overflow skip, or a staging graph with only strings).
196    ///
197    /// Used by the caller to populate
198    /// [`crate::graph::unified::storage::registry::FileRegistry::record_node`],
199    /// which feeds the Gate 0c bucket-bijection debug invariant.
200    pub per_file_node_ids: Vec<Vec<NodeId>>,
201    /// Total nodes actually written (for validation against planned totals).
202    pub total_nodes_written: usize,
203    /// Total strings actually written (for validation against planned totals).
204    pub total_strings_written: usize,
205    /// Total edges collected across all files.
206    pub total_edges_collected: usize,
207    /// Per-chunk drained C indirect-call staging payloads (DESIGN §8.2).
208    ///
209    /// Populated when any file in the chunk staged a
210    /// [`super::staging::CIndirectStagingPayload`] (C plugin Phase 1, U10).
211    /// `None` for chunks containing no C files, keeping the wire-shape
212    /// budget unchanged for non-C workspaces.
213    ///
214    /// Consumed by [`apply_c_indirect_drain`] from `entrypoint.rs` after
215    /// Phase 4c-prime cross-file unification rebuilds the qualified-name
216    /// index — see U11 plumbing for the full Phase 3 → Phase 4 hand-off.
217    pub c_indirect_drain: Option<PhaseCIndirectDrain>,
218}
219
220/// Drained C indirect-call staging payload, resolved to owned `String`s.
221///
222/// The per-file
223/// [`super::staging::CIndirectStagingPayload`] contains:
224///   * `pending_address_taken_names: Vec<StringId>` — staging-local string
225///     ids that we resolve to owned `String`s via `staging.resolve_local_string`
226///     here so the post-4c-prime applier can re-intern through the canonical
227///     interner without holding any staging-graph reference;
228///   * `pending_struct_field_signatures: Vec<(String, String, String)>` —
229///     already owned;
230///   * `pending_bindings: Vec<PendingBinding>` — already owned;
231///   * `pending_indirect_callsites: Vec<PendingIndirectCallsite>` — already
232///     owned (carrier-side stamping of `FileId` happens here so the applier
233///     does not need per-file context);
234///   * `local_scope_index: Option<LocalScopeIndex>` — moved verbatim.
235///
236/// The applier ([`apply_c_indirect_drain`]) interns the owned strings into
237/// the **post-Phase-4a-dedup** graph interner, resolves names to canonical
238/// `NodeId`s via [`crate::graph::unified::storage::indices::AuxiliaryIndices::by_qualified_name`]
239/// (with a `by_name` fallback for languages whose canonical qualified name
240/// equals the semantic name and therefore leaves
241/// [`NodeEntry::qualified_name`] unset — e.g. C, where `cb_alpha` is its
242/// own qualified name), and writes them into
243/// [`CodeGraph::c_indirect_tables_mut`].
244///
245/// Per DESIGN §8.2, this drain bridges the parallel-parse-and-commit
246/// boundary (Phase 3) to the post-unification application step (Phase 4
247/// finalisation, just after Phase 4c-prime returns).
248#[derive(Debug, Default)]
249pub struct PhaseCIndirectDrain {
250    /// Address-taken function qualified-name entries to mark post-unification.
251    ///
252    /// Each entry pairs the bare/qualified function name the C plugin
253    /// captured in `helper.mark_function_address_taken_by_name(...)` with
254    /// the source `FileId` (always a C-language file by construction —
255    /// only the C plugin populates `CIndirectStagingPayload`).
256    ///
257    /// Per DESIGN §8.2 lines 1239-1241: "A pending list of
258    /// `(function_qualified_name, file_id)` for address-taken marks". The
259    /// `file_id` is the *origin* file (where the address-take site lives),
260    /// not the file of the resolved callable target. It is carried so the
261    /// applier can constrain the workspace-global `by_name` fallback in
262    /// [`crate::graph::unified::build::entrypoint::apply_deferred_address_taken_marks`]
263    /// to candidate nodes whose own owning file's language is `C` — a
264    /// non-C namesake (e.g. a Rust `fn cb_alpha`) must NOT be marked by
265    /// the C-scoped contract of SPEC §3.1.2.
266    ///
267    /// Duplicates on `function_qualified_name` are tolerated —
268    /// [`crate::graph::unified::storage::metadata::NodeMetadataStore::mark_address_taken`]
269    /// is idempotent.
270    pub address_taken_names: Vec<DeferredAddressTakenEntry>,
271    /// `(struct_tag, field_name, signature)` triples — DESIGN §3.2.2.
272    ///
273    /// Drained verbatim from the staging payload. The applier interns each
274    /// leg via `graph.strings_mut().intern(...)` and inserts into
275    /// `CIndirectSideTables::struct_field_fnptr`.
276    pub struct_field_signatures: Vec<(String, String, String)>,
277    /// Binding-plane entries (DESIGN §7.1) paired with their origin `FileId`.
278    ///
279    /// The applier resolves `instance_name` and `target_fn_name` to
280    /// canonical `NodeId`s and inserts a [`BindingEntry`] under the
281    /// interned `(struct_tag, field_name)` key in
282    /// `CIndirectSideTables::bindings_by_field`. The `FileId` is the
283    /// origin file (the C TU that staged the binding), retained for the
284    /// same C-language-scoped fallback rationale as
285    /// [`Self::address_taken_names`].
286    pub bindings: Vec<(FileId, PendingBinding)>,
287    /// Indirect callsites paired with their owning `FileId`. The applier
288    /// resolves `caller_qualified_name` to a `NodeId` and pushes an
289    /// [`IndirectCallsite`] onto `CIndirectSideTables::pending_callsites`.
290    /// `FileId` is stamped here from the per-file `FilePlan` so the applier
291    /// does not need per-file context.
292    pub indirect_callsites: Vec<(FileId, PendingIndirectCallsite)>,
293    /// Per-file block-scope arenas (DESIGN §4.1). Moved verbatim into
294    /// `CIndirectSideTables::local_scope_indices` keyed by `FileId`.
295    pub local_scope_indices: Vec<(FileId, LocalScopeIndex)>,
296}
297
298/// One deferred address-taken mark, carrying the origin `FileId`
299/// alongside the qualified function name (DESIGN §8.2 lines 1239-1241).
300///
301/// The origin `FileId` is always a C-language file by construction (only
302/// the C plugin populates `CIndirectStagingPayload`). It is retained on
303/// the drain so the post-unification applier can constrain the
304/// workspace-global `by_name` fallback to candidate nodes whose own
305/// owning file's language is `C` — defending against the SPEC §3.1.2
306/// "Every C `NodeKind::Function`" contract being widened to mark
307/// same-named non-C nodes (e.g. Rust `fn cb_alpha`, Python `def
308/// cb_alpha`) that happen to share a bare name with a C symbol.
309#[derive(Debug, Clone, PartialEq, Eq)]
310pub struct DeferredAddressTakenEntry {
311    /// Qualified function name as captured by
312    /// `helper.mark_function_address_taken_by_name(...)`.
313    pub function_qualified_name: String,
314    /// Origin C file that staged this address-taken site. Used only as
315    /// metadata for DESIGN §8.2 conformance and provenance — the
316    /// candidate-language filter in the applier compares each
317    /// candidate's owning-file language to `Language::C`, not to this
318    /// `file_id` directly (cross-TU address-takes are legal: a
319    /// `cb_alpha` declared in `a.c` may have its address taken in
320    /// `b.c`).
321    pub file_id: FileId,
322}
323
324impl PhaseCIndirectDrain {
325    /// Returns `true` when every drained vec/map is empty.
326    ///
327    /// Used by the chunk-accumulator in `entrypoint.rs` to skip Phase 4
328    /// application entirely for non-C workspaces, keeping the
329    /// `CodeGraph.c_indirect_tables` slot at its default `None`.
330    #[must_use]
331    pub fn is_empty(&self) -> bool {
332        self.address_taken_names.is_empty()
333            && self.struct_field_signatures.is_empty()
334            && self.bindings.is_empty()
335            && self.indirect_callsites.is_empty()
336            && self.local_scope_indices.is_empty()
337    }
338
339    /// Merge another drain into this one, taking ownership of its contents.
340    ///
341    /// Used by the chunk-loop in `entrypoint.rs` to accumulate per-chunk
342    /// drains into a single workspace-global drain before invoking
343    /// [`apply_c_indirect_drain`].
344    pub fn merge(&mut self, mut other: PhaseCIndirectDrain) {
345        self.address_taken_names
346            .append(&mut other.address_taken_names);
347        self.struct_field_signatures
348            .append(&mut other.struct_field_signatures);
349        self.bindings.append(&mut other.bindings);
350        self.indirect_callsites
351            .append(&mut other.indirect_callsites);
352        self.local_scope_indices
353            .append(&mut other.local_scope_indices);
354    }
355}
356
357/// Execute Phase 3: parallel commit into disjoint pre-allocated ranges.
358///
359/// Pre-splits arena and interner slices into per-file disjoint sub-slices
360/// using `split_at_mut`, then uses `rayon` `par_iter` for lock-free parallel
361/// writes. Each file's staging graph is committed independently.
362///
363/// Returns [`Phase3Result`] with per-file edges and written counts so the
364/// caller can validate against plan totals and truncate allocations on
365/// mismatch.
366///
367/// # Parameterisation over the mutation target
368///
369/// As of Task 4 Step 4 Phase 1, this function is generic over
370/// `G: GraphMutationTarget`. At the full-build call site in
371/// `build_unified_graph_inner` the target is `CodeGraph`; at the
372/// Task 4 Step 4 Phase 2+ incremental rebuild call site the target
373/// will be `RebuildGraph`. Both impls live in
374/// [`crate::graph::unified::mutation_target`]; see that module's
375/// docs for the field-coverage contract.
376///
377/// The function accesses exactly two fields via the trait —
378/// [`GraphMutationTarget::nodes_and_strings_mut`] — and pre-splits
379/// those two slices for the per-file parallel commit. Every other
380/// piece of the pipeline (the CSR/delta edge store, auxiliary
381/// indices, file registry, etc.) is untouched by this helper: the
382/// `PendingEdge` vectors in the returned [`Phase3Result`] are
383/// threaded through to Phase 4d (`pending_edges_to_delta` +
384/// `BidirectionalEdgeStore::add_edges_bulk_ordered`) by the caller.
385///
386/// # Panics
387///
388/// Panics if `plan.total_nodes` or `plan.total_strings` exceeds the
389/// pre-allocated range in the arena or interner.
390#[must_use]
391pub(crate) fn phase3_parallel_commit<
392    G: crate::graph::unified::mutation_target::GraphMutationTarget,
393>(
394    plan: &ChunkCommitPlan,
395    staging_graphs: &[&StagingGraph],
396    graph: &mut G,
397) -> Phase3Result {
398    if plan.file_plans.is_empty() {
399        return Phase3Result {
400            per_file_edges: Vec::new(),
401            per_file_node_ids: Vec::new(),
402            total_nodes_written: 0,
403            total_strings_written: 0,
404            total_edges_collected: 0,
405            c_indirect_drain: None,
406        };
407    }
408
409    // Determine the start of the pre-allocated ranges.
410    let node_start = plan.file_plans[0].node_range.start;
411    let string_start = plan.file_plans[0].string_range.start;
412
413    // Borrow the arena and interner disjointly via the mutation-plane
414    // trait. This is the one-and-only field access this helper makes
415    // on the graph; every downstream step operates on the resulting
416    // slices without revisiting `graph`.
417    let (arena, interner) = graph.nodes_and_strings_mut();
418
419    // Get mutable slices covering the entire pre-allocated region.
420    let node_slice = arena.bulk_slice_mut(node_start, plan.total_nodes);
421    let (str_slice, rc_slice) = interner.bulk_slices_mut(string_start, plan.total_strings);
422
423    // Pre-split into per-file disjoint sub-slices using split_at_mut.
424    let mut node_remaining = &mut *node_slice;
425    let mut str_remaining = &mut *str_slice;
426    let mut rc_remaining = &mut *rc_slice;
427
428    #[allow(clippy::type_complexity)]
429    let mut file_work: Vec<(
430        &mut [Slot<NodeEntry>],
431        &mut [Option<Arc<str>>],
432        &mut [u32],
433        &FilePlan,
434        usize,
435    )> = Vec::with_capacity(plan.file_plans.len());
436
437    for (i, file_plan) in plan.file_plans.iter().enumerate() {
438        let nc = (file_plan.node_range.end - file_plan.node_range.start) as usize;
439        let sc = (file_plan.string_range.end - file_plan.string_range.start) as usize;
440
441        let (n, nr) = node_remaining.split_at_mut(nc);
442        let (s, sr) = str_remaining.split_at_mut(sc);
443        let (r, rr) = rc_remaining.split_at_mut(sc);
444
445        file_work.push((n, s, r, file_plan, i));
446        node_remaining = nr;
447        str_remaining = sr;
448        rc_remaining = rr;
449    }
450
451    // Parallel commit — each closure owns disjoint slices, no contention.
452    let results: Vec<FileCommitResult> = file_work
453        .into_par_iter()
454        .map(|(node_slots, str_slots, rc_slots, file_plan, idx)| {
455            commit_single_file(
456                staging_graphs[idx],
457                file_plan,
458                node_slots,
459                str_slots,
460                rc_slots,
461            )
462        })
463        .collect();
464
465    let total_nodes_written: usize = results.iter().map(|r| r.nodes_written).sum();
466    let total_strings_written: usize = results.iter().map(|r| r.strings_written).sum();
467    let total_edges_collected: usize = results.iter().map(|r| r.edges.len()).sum();
468    let mut per_file_edges = Vec::with_capacity(results.len());
469    let mut per_file_node_ids = Vec::with_capacity(results.len());
470
471    // Cluster B3 (Go T1): aggregate per-file remapped Go hints. Each
472    // file's hints have already been remapped through that file's
473    // local→global NodeId / StringId tables inside `commit_single_file`,
474    // so the merge into the live target is a straightforward extend.
475    let mut all_embedding_hints: Vec<GoEmbeddingHint> = Vec::new();
476    let mut all_named_type_conversion_hints: Vec<GoNamedTypeConversionHint> = Vec::new();
477    let mut all_receiver_call_hints: Vec<GoReceiverCallHint> = Vec::new();
478    // Cluster D2.1: receiver-pointerness per Go method declaration. Same
479    // commit-time NodeId / StringId remap discipline as the other three
480    // vectors above; the consumer is Cluster D2's T1.1 pass and D2's
481    // tightening of D1's bucket classifier.
482    let mut all_method_receiver_hints: Vec<GoMethodReceiverHint> = Vec::new();
483    // Cluster D3 (Go T1): canonical signatures per Go method / function
484    // / named function-type declaration. Same remap discipline as the
485    // four hint vectors above. Consumer is the tightened T1.1
486    // satisfaction predicate and the new T1.3 function-signature
487    // implementation pass.
488    let mut all_method_signature_hints: Vec<GoMethodSignatureHint> = Vec::new();
489    let mut all_function_signature_hints: Vec<GoFunctionSignatureHint> = Vec::new();
490
491    for r in results {
492        per_file_edges.push(r.edges);
493        per_file_node_ids.push(r.node_ids);
494        all_embedding_hints.extend(r.embedding_hints);
495        all_named_type_conversion_hints.extend(r.named_type_conversion_hints);
496        all_receiver_call_hints.extend(r.receiver_call_hints);
497        all_method_receiver_hints.extend(r.method_receiver_hints);
498        all_method_signature_hints.extend(r.method_signature_hints);
499        all_function_signature_hints.extend(r.function_signature_hints);
500    }
501
502    // Cluster B3 / D2.1 / D3: merge aggregated hints into the live
503    // target. This closes the deferred wire-through noted in Cluster A:
504    // every per-file `StagingGraph::go_hints` now lands in the live
505    // target's `GoHints` buffer during Phase 3, with all NodeId /
506    // StringId references remapped to global identities. The
507    // post-Phase-4e `pass_go_method_set_satisfaction` will drain this
508    // buffer.
509    if !all_embedding_hints.is_empty()
510        || !all_named_type_conversion_hints.is_empty()
511        || !all_receiver_call_hints.is_empty()
512        || !all_method_receiver_hints.is_empty()
513        || !all_method_signature_hints.is_empty()
514        || !all_function_signature_hints.is_empty()
515    {
516        let go_hints = graph.go_hints_mut();
517        go_hints.embeddings.extend(all_embedding_hints);
518        go_hints
519            .named_type_conversions
520            .extend(all_named_type_conversion_hints);
521        go_hints.receiver_calls.extend(all_receiver_call_hints);
522        go_hints.method_receivers.extend(all_method_receiver_hints);
523        go_hints
524            .method_signatures
525            .extend(all_method_signature_hints);
526        go_hints
527            .function_signatures
528            .extend(all_function_signature_hints);
529    }
530
531    // --- C indirect-call drain (DESIGN §8.2 / U11) ---
532    //
533    // Sequentially walk the per-file staging graphs and drain each
534    // `CIndirectStagingPayload` into a single per-chunk
535    // [`PhaseCIndirectDrain`]. Sequential (not parallel) because: (a) the
536    // payloads are typically tiny — even a large C TU stages ~tens of
537    // bindings + ~dozens of callsites — and (b) the address-taken names
538    // need their staging-local `StringId`s resolved to owned strings while
539    // we still hold the source `StagingGraph` reference; once Phase 3
540    // returns, the chunk-local `ParsedFile`s drop and the staged strings
541    // become unrecoverable.
542    //
543    // Local-string resolution: `pending_address_taken_names` contains
544    // staging-local `StringId`s interned by `helper.intern(name)` (see
545    // `helper::mark_function_address_taken_by_name`). The applier needs
546    // the underlying `&str` to re-intern through the **post-dedup** graph
547    // interner, so we resolve here via `staging.resolve_local_string`.
548    // The string already had its `intern` ref-count bumped on stage —
549    // the post-4c-prime applier's re-intern produces the canonical global
550    // `StringId` independent of the staging-local id.
551    let c_indirect_drain = collect_c_indirect_drain(plan, staging_graphs);
552
553    Phase3Result {
554        per_file_edges,
555        per_file_node_ids,
556        total_nodes_written,
557        total_strings_written,
558        total_edges_collected,
559        c_indirect_drain,
560    }
561}
562
563/// Drain per-file C indirect-call staging payloads from the chunk.
564///
565/// Returns `Some(PhaseCIndirectDrain)` when at least one file in the chunk
566/// staged a `CIndirectStagingPayload`; otherwise `None` (non-C workspaces).
567/// Sequential rather than parallel — see commentary in
568/// [`phase3_parallel_commit`] for the rationale.
569fn collect_c_indirect_drain(
570    plan: &ChunkCommitPlan,
571    staging_graphs: &[&StagingGraph],
572) -> Option<PhaseCIndirectDrain> {
573    debug_assert_eq!(plan.file_plans.len(), staging_graphs.len());
574
575    let mut drain = PhaseCIndirectDrain::default();
576
577    for (file_plan, staging) in plan.file_plans.iter().zip(staging_graphs.iter()) {
578        let Some(payload) = staging.c_indirect() else {
579            continue;
580        };
581
582        // Resolve local string ids → owned Strings for address-taken names.
583        // A `None` from `resolve_local_string` would indicate a staging-API
584        // misuse (a non-local id was pushed). Skip with a warn rather than
585        // panic so a buggy plugin can't take down the build pipeline.
586        //
587        // Each captured entry pairs the resolved name with the origin
588        // `file_plan.file_id` (DESIGN §8.2 lines 1239-1241), allowing the
589        // post-unification applier to scope the workspace-global by_name
590        // fallback to C-language nodes.
591        for &local_id in &payload.pending_address_taken_names {
592            match staging.resolve_local_string(local_id) {
593                Some(name) => drain.address_taken_names.push(DeferredAddressTakenEntry {
594                    function_qualified_name: name.to_owned(),
595                    file_id: file_plan.file_id,
596                }),
597                None => log::warn!(
598                    "Phase 3 C-indirect drain: address-taken name local string id \
599                     {:?} did not resolve in staging graph for file {:?} — skipping. \
600                     This indicates the C plugin staged a non-local StringId via \
601                     helper.mark_function_address_taken_by_name.",
602                    local_id,
603                    file_plan.file_id,
604                ),
605            }
606        }
607
608        // `pending_struct_field_signatures` is already `Vec<(String, String, String)>`
609        // — clone the triple set into the drain. We clone (rather than
610        // mutate-take) because `staging` is a `&StagingGraph` shared borrow.
611        drain
612            .struct_field_signatures
613            .extend(payload.pending_struct_field_signatures.iter().cloned());
614
615        // `pending_bindings` is `Vec<PendingBinding>` (owned Strings).
616        // Stamp each binding with its origin `file_plan.file_id` so the
617        // post-unification applier can scope the by_name fallback for
618        // `instance_name` / `target_fn_name` resolution to C-language
619        // nodes (same rationale as `address_taken_names` above).
620        drain.bindings.extend(
621            payload
622                .pending_bindings
623                .iter()
624                .cloned()
625                .map(|b| (file_plan.file_id, b)),
626        );
627
628        // Stamp each indirect callsite with its FileId from the plan, then
629        // append. The applier needs the FileId to construct the persisted
630        // `IndirectCallsite` (which carries `caller: NodeId` + `file_id:
631        // FileId` rather than staging's `caller_qualified_name: String`).
632        drain.indirect_callsites.extend(
633            payload
634                .pending_indirect_callsites
635                .iter()
636                .cloned()
637                .map(|cs| (file_plan.file_id, cs)),
638        );
639
640        // Move the per-file scope index by clone (we hold `&StagingGraph`,
641        // so cannot take). `LocalScopeIndex: Clone` — see
642        // `c_indirect/scope_index.rs:21` documentation header.
643        if let Some(scope_index) = payload.local_scope_index.as_ref() {
644            drain
645                .local_scope_indices
646                .push((file_plan.file_id, scope_index.clone()));
647        }
648    }
649
650    if drain.is_empty() { None } else { Some(drain) }
651}
652
653/// Commit a single file's staging graph into pre-allocated disjoint ranges.
654///
655/// This function operates on slices that belong exclusively to this file,
656/// so it requires no locks or synchronization.
657///
658/// # Steps
659///
660/// 1. **Strings**: Extract `InternString` ops, write `Arc<str>` values into
661///    pre-allocated string slots, build local→global `StringId` remap.
662/// 2. **Nodes**: Extract `AddNode` ops, apply string remap to each `NodeEntry`,
663///    set `file_id`, write into pre-allocated node slots, build expected→actual
664///    `NodeId` remap.
665/// 3. **Edges**: Extract `AddEdge` ops, apply node ID remap to source/target,
666///    assign pre-computed sequence numbers, return as `PendingEdge` vec.
667// Result of committing a single file: edges + committed NodeIds + actual written counts.
668struct FileCommitResult {
669    edges: Vec<PendingEdge>,
670    /// Every `NodeId` committed into the arena for this file, in
671    /// commit order. Used by the sequential post-commit step that
672    /// populates `FileRegistry::per_file_nodes`.
673    node_ids: Vec<NodeId>,
674    nodes_written: usize,
675    strings_written: usize,
676    /// Cluster B3 (Go T1 implements-and-promotion): per-file
677    /// [`GoEmbeddingHint`] / [`GoNamedTypeConversionHint`] /
678    /// [`GoReceiverCallHint`] entries, with their staging-local
679    /// `NodeId` / `StringId` fields remapped to global identities via
680    /// the same tables that drive node + edge commit. The sequential
681    /// post-rayon step in [`phase3_parallel_commit`] aggregates these
682    /// across files and flushes the result into
683    /// [`crate::graph::unified::mutation_target::GraphMutationTarget::go_hints_mut`].
684    ///
685    /// Non-Go staging graphs leave all four vectors empty — no work
686    /// is performed for them.
687    embedding_hints: Vec<GoEmbeddingHint>,
688    named_type_conversion_hints: Vec<GoNamedTypeConversionHint>,
689    receiver_call_hints: Vec<GoReceiverCallHint>,
690    /// Cluster D2.1: per-method receiver-pointerness hints recovered from
691    /// the Go plugin's Phase-1 method emission sites. `method_node` and
692    /// `receiver_type_qualified_name` are remapped via the per-file
693    /// remap tables in the same `commit_single_file` step that drives
694    /// the other three hint vectors.
695    method_receiver_hints: Vec<GoMethodReceiverHint>,
696    /// Cluster D3: canonical-signature hints for Go method declarations
697    /// (top-level methods and interface methods). `method_node` is
698    /// remapped via the per-file node table. `canonical_signature` is a
699    /// plain `String` so no `StringId` remap is needed.
700    method_signature_hints: Vec<GoMethodSignatureHint>,
701    /// Cluster D3: canonical-signature hints for Go function
702    /// declarations and named function-type declarations. `function_node`
703    /// is remapped via the per-file node table; `canonical_signature` is
704    /// plain text.
705    function_signature_hints: Vec<GoFunctionSignatureHint>,
706}
707
708fn commit_single_file(
709    staging: &StagingGraph,
710    plan: &FilePlan,
711    node_slots: &mut [Slot<NodeEntry>],
712    str_slots: &mut [Option<Arc<str>>],
713    rc_slots: &mut [u32],
714) -> FileCommitResult {
715    let ops = staging.operations();
716
717    // --- Step 1: Write strings, build local→global remap ---
718    let (string_remap, strings_written) = write_strings(ops, plan, str_slots, rc_slots);
719
720    // --- Step 2: Write nodes, build expected→actual node ID remap ---
721    let (node_remap, nodes_written, node_ids) = write_nodes(ops, plan, node_slots, &string_remap);
722
723    // --- Step 3: Collect remapped edges with pre-assigned sequence numbers ---
724    let edges = collect_edges(ops, plan, &node_remap, &string_remap);
725
726    // --- Step 4 (Cluster B3 / D2.1 / D3): Remap Go side-channel hints ---
727    //
728    // The Go plugin captures staging-local NodeId / StringId values in
729    // GoHints during Phase-1 parse. Both ID spaces are file-local until
730    // Phase 3 writes the file's nodes and strings into the globally
731    // assigned ranges; once node_remap / string_remap exist, every hint
732    // gets the same identity rewrite as a PendingEdge.
733    let RemappedGoHints {
734        embeddings: embedding_hints,
735        named_type_conversions: named_type_conversion_hints,
736        receiver_calls: receiver_call_hints,
737        method_receivers: method_receiver_hints,
738        method_signatures: method_signature_hints,
739        function_signatures: function_signature_hints,
740    } = remap_go_hints(staging, &node_remap, &string_remap, plan);
741
742    FileCommitResult {
743        edges,
744        node_ids,
745        nodes_written,
746        strings_written,
747        embedding_hints,
748        named_type_conversion_hints,
749        receiver_call_hints,
750        method_receiver_hints,
751        method_signature_hints,
752        function_signature_hints,
753    }
754}
755
/// Bundle returned by [`remap_go_hints`] so the per-file commit step can
/// destructure without juggling a tuple of six vectors. Each field
/// matches its sibling on [`FileCommitResult`].
struct RemappedGoHints {
    /// Becomes `FileCommitResult::embedding_hints`.
    embeddings: Vec<GoEmbeddingHint>,
    /// Becomes `FileCommitResult::named_type_conversion_hints`.
    named_type_conversions: Vec<GoNamedTypeConversionHint>,
    /// Becomes `FileCommitResult::receiver_call_hints`.
    receiver_calls: Vec<GoReceiverCallHint>,
    /// Becomes `FileCommitResult::method_receiver_hints`.
    method_receivers: Vec<GoMethodReceiverHint>,
    /// Becomes `FileCommitResult::method_signature_hints`.
    method_signatures: Vec<GoMethodSignatureHint>,
    /// Becomes `FileCommitResult::function_signature_hints`.
    function_signatures: Vec<GoFunctionSignatureHint>,
}
767
768/// Remap a `NodeId` through the per-file node-remap table.
769///
770/// Hint construction in the plugin uses staging-local `NodeIds` (assigned
771/// by `StagingGraph::add_node` / equivalents). After Phase 3 commit the
772/// canonical `NodeId` for each staged node lives in `node_remap`; the
773/// remap is identity for already-global IDs.
774fn remap_node_id_hint(id: NodeId, node_remap: &HashMap<NodeId, NodeId>) -> NodeId {
775    node_remap.get(&id).copied().unwrap_or(id)
776}
777
778/// Remap a `StringId` through the per-file string-remap table.
779///
780/// Local-tagged staging `StringIds` are mapped to their global slot ID;
781/// already-global IDs are passed through unchanged.
782fn remap_string_id_hint(id: StringId, string_remap: &HashMap<StringId, StringId>) -> StringId {
783    if id.is_local() {
784        string_remap.get(&id).copied().unwrap_or(id)
785    } else {
786        id
787    }
788}
789
790fn remap_receiver_hint(
791    receiver: &GoReceiverHintKind,
792    node_remap: &HashMap<NodeId, NodeId>,
793) -> GoReceiverHintKind {
794    match receiver {
795        GoReceiverHintKind::LocalIdent { binding_local } => GoReceiverHintKind::LocalIdent {
796            binding_local: remap_node_id_hint(*binding_local, node_remap),
797        },
798        // The Type-/Pointer-Prefixed and CallReturn variants carry plain
799        // `String` text, so no ID remap is required.
800        GoReceiverHintKind::TypePrefixed { type_text } => GoReceiverHintKind::TypePrefixed {
801            type_text: type_text.clone(),
802        },
803        GoReceiverHintKind::PointerPrefixed { type_text } => GoReceiverHintKind::PointerPrefixed {
804            type_text: type_text.clone(),
805        },
806        GoReceiverHintKind::CallReturn { callee_qn } => GoReceiverHintKind::CallReturn {
807            callee_qn: callee_qn.clone(),
808        },
809    }
810}
811
812/// Drain the staging graph's Go hint vectors, remap each entry's
813/// staging-local `NodeId` / `StringId` fields through the per-file
814/// remap tables built by Phase 3 commit, and return four globally-
815/// addressable vectors ready to be merged into the live target.
816///
817/// Non-Go staging graphs return empty vectors with no allocations
818/// beyond the empty `Vec::new()` headers.
819fn remap_go_hints(
820    staging: &StagingGraph,
821    node_remap: &HashMap<NodeId, NodeId>,
822    string_remap: &HashMap<StringId, StringId>,
823    plan: &FilePlan,
824) -> RemappedGoHints {
825    let hints = staging.go_hints();
826    if hints.embeddings.is_empty()
827        && hints.named_type_conversions.is_empty()
828        && hints.receiver_calls.is_empty()
829        && hints.method_receivers.is_empty()
830        && hints.method_signatures.is_empty()
831        && hints.function_signatures.is_empty()
832    {
833        return RemappedGoHints {
834            embeddings: Vec::new(),
835            named_type_conversions: Vec::new(),
836            receiver_calls: Vec::new(),
837            method_receivers: Vec::new(),
838            method_signatures: Vec::new(),
839            function_signatures: Vec::new(),
840        };
841    }
842
843    let embeddings: Vec<GoEmbeddingHint> = hints
844        .embeddings
845        .iter()
846        .map(|h| GoEmbeddingHint {
847            outer: remap_node_id_hint(h.outer, node_remap),
848            inner_qualified_name: remap_string_id_hint(h.inner_qualified_name, string_remap),
849            pointerness: h.pointerness,
850            file: plan.file_id,
851        })
852        .collect();
853
854    let named_type_conversions: Vec<GoNamedTypeConversionHint> = hints
855        .named_type_conversions
856        .iter()
857        .map(|h| GoNamedTypeConversionHint {
858            call_site: remap_node_id_hint(h.call_site, node_remap),
859            target_type_qualified_name: remap_string_id_hint(
860                h.target_type_qualified_name,
861                string_remap,
862            ),
863            argument_node: remap_node_id_hint(h.argument_node, node_remap),
864            file: plan.file_id,
865        })
866        .collect();
867
868    let receiver_calls: Vec<GoReceiverCallHint> = hints
869        .receiver_calls
870        .iter()
871        .map(|h| GoReceiverCallHint {
872            call_site: remap_node_id_hint(h.call_site, node_remap),
873            callee_method: remap_node_id_hint(h.callee_method, node_remap),
874            method_name: remap_string_id_hint(h.method_name, string_remap),
875            receiver: remap_receiver_hint(&h.receiver, node_remap),
876            argument_count: h.argument_count,
877            is_async: h.is_async,
878            file: plan.file_id,
879        })
880        .collect();
881
882    let method_receivers: Vec<GoMethodReceiverHint> = hints
883        .method_receivers
884        .iter()
885        .map(|h| GoMethodReceiverHint {
886            method_node: remap_node_id_hint(h.method_node, node_remap),
887            receiver_type_qualified_name: remap_string_id_hint(
888                h.receiver_type_qualified_name,
889                string_remap,
890            ),
891            receiver_pointerness: h.receiver_pointerness,
892            file: plan.file_id,
893        })
894        .collect();
895
896    // Cluster D3: method-signature and function-signature hints. Only
897    // the NodeId field requires remap; `canonical_signature` is a plain
898    // `String` produced by `canonicalise_go_signature` and is identity
899    // across the commit boundary.
900    let method_signatures: Vec<GoMethodSignatureHint> = hints
901        .method_signatures
902        .iter()
903        .map(|h| GoMethodSignatureHint {
904            method_node: remap_node_id_hint(h.method_node, node_remap),
905            canonical_signature: h.canonical_signature.clone(),
906            file: plan.file_id,
907        })
908        .collect();
909
910    let function_signatures: Vec<GoFunctionSignatureHint> = hints
911        .function_signatures
912        .iter()
913        .map(|h| GoFunctionSignatureHint {
914            function_node: remap_node_id_hint(h.function_node, node_remap),
915            canonical_signature: h.canonical_signature.clone(),
916            file: plan.file_id,
917        })
918        .collect();
919
920    RemappedGoHints {
921        embeddings,
922        named_type_conversions,
923        receiver_calls,
924        method_receivers,
925        method_signatures,
926        function_signatures,
927    }
928}
929
930/// Write staged strings into pre-allocated interner slots.
931///
932/// Validates that each `InternString` op has a local `StringId` and that
933/// no duplicate local IDs exist (matching the serial `commit_strings` checks).
934///
935/// Returns `(remap, strings_written)`.
936fn write_strings(
937    ops: &[StagingOp],
938    plan: &FilePlan,
939    str_slots: &mut [Option<Arc<str>>],
940    rc_slots: &mut [u32],
941) -> (HashMap<StringId, StringId>, usize) {
942    let mut remap = HashMap::new();
943    let mut string_cursor = 0usize;
944
945    for op in ops {
946        if let StagingOp::InternString { local_id, value } = op {
947            // Validate: only local IDs are allowed in staging (matching serial commit_strings)
948            assert!(
949                local_id.is_local(),
950                "non-local StringId {:?} in InternString op for file {:?}",
951                local_id,
952                plan.file_id,
953            );
954            // Validate: no duplicate local IDs (matching serial commit_strings)
955            assert!(
956                !remap.contains_key(local_id),
957                "duplicate local StringId {:?} in InternString op for file {:?}",
958                local_id,
959                plan.file_id,
960            );
961
962            if string_cursor >= str_slots.len() {
963                log::warn!(
964                    "string slot overflow in file {:?}: cursor={string_cursor}, slots={}, skipping remaining strings",
965                    plan.file_id,
966                    str_slots.len()
967                );
968                break;
969            }
970
971            // The global StringId for this string is the pre-allocated slot index.
972            #[allow(clippy::cast_possible_truncation)] // cursor is bounded by allocated slot count
973            let global_id = StringId::new(plan.string_range.start + string_cursor as u32);
974
975            // Write the string into the pre-allocated slot.
976            str_slots[string_cursor] = Some(Arc::from(value.as_str()));
977            rc_slots[string_cursor] = 1;
978
979            remap.insert(*local_id, global_id);
980            string_cursor += 1;
981        }
982    }
983
984    (remap, string_cursor)
985}
986
987/// Remap all `StringId` fields in a `NodeEntry` using a local→global table.
988///
989/// Required field (`name`) is always remapped if local.
990/// Optional fields (`signature`, `doc`, `qualified_name`, `visibility`)
991/// are remapped if present and local.
992fn remap_node_entry_string_ids(entry: &mut NodeEntry, remap: &HashMap<StringId, StringId>) {
993    remap_required_local(&mut entry.name, remap);
994    remap_option_local(&mut entry.signature, remap);
995    remap_option_local(&mut entry.doc, remap);
996    remap_option_local(&mut entry.qualified_name, remap);
997    remap_option_local(&mut entry.visibility, remap);
998}
999
1000/// Remap all local `StringId` fields in an `EdgeKind`.
1001///
1002/// Uses the same exhaustive match as `remap_edge_kind_string_ids`, but
1003/// only remaps local IDs (those with `LOCAL_TAG_BIT` set).
1004#[allow(clippy::match_same_arms)]
1005fn remap_edge_kind_local_string_ids(kind: &mut EdgeKind, remap: &HashMap<StringId, StringId>) {
1006    match kind {
1007        EdgeKind::Imports { alias, .. } => remap_option_local(alias, remap),
1008        EdgeKind::Exports { alias, .. } => remap_option_local(alias, remap),
1009        EdgeKind::TypeOf { name, .. } => remap_option_local(name, remap),
1010        EdgeKind::TraitMethodBinding {
1011            trait_name,
1012            impl_type,
1013            ..
1014        } => {
1015            remap_required_local(trait_name, remap);
1016            remap_required_local(impl_type, remap);
1017        }
1018        EdgeKind::HttpRequest { url, .. } => remap_option_local(url, remap),
1019        EdgeKind::GrpcCall { service, method } => {
1020            remap_required_local(service, remap);
1021            remap_required_local(method, remap);
1022        }
1023        EdgeKind::DbQuery { table, .. } => remap_option_local(table, remap),
1024        EdgeKind::TableRead { table_name, schema } => {
1025            remap_required_local(table_name, remap);
1026            remap_option_local(schema, remap);
1027        }
1028        EdgeKind::TableWrite {
1029            table_name, schema, ..
1030        } => {
1031            remap_required_local(table_name, remap);
1032            remap_option_local(schema, remap);
1033        }
1034        EdgeKind::TriggeredBy {
1035            trigger_name,
1036            schema,
1037        } => {
1038            remap_required_local(trigger_name, remap);
1039            remap_option_local(schema, remap);
1040        }
1041        EdgeKind::MessageQueue { protocol, topic } => {
1042            if let MqProtocol::Other(s) = protocol {
1043                remap_required_local(s, remap);
1044            }
1045            remap_option_local(topic, remap);
1046        }
1047        EdgeKind::WebSocket { event } => remap_option_local(event, remap),
1048        EdgeKind::GraphQLOperation { operation } => remap_required_local(operation, remap),
1049        EdgeKind::ProcessExec { command } => remap_required_local(command, remap),
1050        EdgeKind::FileIpc { path_pattern } => remap_option_local(path_pattern, remap),
1051        EdgeKind::ProtocolCall { protocol, metadata } => {
1052            remap_required_local(protocol, remap);
1053            remap_option_local(metadata, remap);
1054        }
1055        // T2.5: remap each TypeArg.name local StringId.
1056        EdgeKind::Instantiates { type_args, .. } => {
1057            for ta in type_args.iter_mut() {
1058                remap_required_local(&mut ta.name, remap);
1059            }
1060        }
1061        // Variants without StringId fields — exhaustive, no wildcard.
1062        EdgeKind::Defines
1063        | EdgeKind::Contains
1064        | EdgeKind::Calls { .. }
1065        | EdgeKind::References
1066        | EdgeKind::Inherits
1067        | EdgeKind::Implements
1068        | EdgeKind::LifetimeConstraint { .. }
1069        | EdgeKind::MacroExpansion { .. }
1070        | EdgeKind::FfiCall { .. }
1071        | EdgeKind::WebAssemblyCall
1072        | EdgeKind::GenericBound
1073        | EdgeKind::AnnotatedWith
1074        | EdgeKind::AnnotationParam
1075        | EdgeKind::LambdaCaptures
1076        | EdgeKind::ModuleExports
1077        | EdgeKind::ModuleRequires
1078        | EdgeKind::ModuleOpens
1079        | EdgeKind::ModuleProvides
1080        | EdgeKind::TypeArgument
1081        | EdgeKind::ExtensionReceiver
1082        | EdgeKind::CompanionOf
1083        | EdgeKind::SealedPermit
1084        // T3 Wraps carries WrapKind (Copy) + Option<u16>; no StringId fields.
1085        | EdgeKind::Wraps { .. }
1086        // T2.4 ChannelPeer carries only Copy enums; no StringId fields.
1087        | EdgeKind::ChannelPeer { .. } => {}
1088    }
1089}
1090
1091/// Remap a required local `StringId` in place.
1092///
1093/// Panics if a local ID has no mapping, matching the serial
1094/// `apply_string_remap` behavior that returned `UnmappedLocalStringId`.
1095fn remap_required_local(id: &mut StringId, remap: &HashMap<StringId, StringId>) {
1096    if id.is_local() {
1097        let global = remap.get(id).unwrap_or_else(|| {
1098            panic!("unmapped local StringId {id:?} — missing intern_string op?")
1099        });
1100        *id = *global;
1101    }
1102}
1103
1104/// Remap an optional local `StringId` in place.
1105fn remap_option_local(opt: &mut Option<StringId>, remap: &HashMap<StringId, StringId>) {
1106    if let Some(id) = opt
1107        && id.is_local()
1108    {
1109        let global = remap.get(id).unwrap_or_else(|| {
1110            panic!("unmapped local StringId {id:?} — missing intern_string op?")
1111        });
1112        *id = *global;
1113    }
1114}
1115
1116/// Write staged nodes into pre-allocated arena slots.
1117///
1118/// Returns `(remap, nodes_written, node_ids)`. `node_ids` is the Vec of
1119/// every `NodeId` committed for this file, in commit order, for use by
1120/// the sequential bucket-population post-step.
1121fn write_nodes(
1122    ops: &[StagingOp],
1123    plan: &FilePlan,
1124    node_slots: &mut [Slot<NodeEntry>],
1125    string_remap: &HashMap<StringId, StringId>,
1126) -> (HashMap<NodeId, NodeId>, usize, Vec<NodeId>) {
1127    let mut node_remap = HashMap::new();
1128    let mut node_cursor = 0usize;
1129    let mut node_ids: Vec<NodeId> = Vec::with_capacity(node_slots.len());
1130
1131    for op in ops {
1132        if let StagingOp::AddNode {
1133            entry, expected_id, ..
1134        } = op
1135        {
1136            if node_cursor >= node_slots.len() {
1137                log::warn!(
1138                    "node slot overflow in file {:?}: cursor={node_cursor}, slots={}, skipping remaining nodes",
1139                    plan.file_id,
1140                    node_slots.len()
1141                );
1142                break;
1143            }
1144
1145            let mut entry = entry.clone();
1146
1147            // Apply string remap to all StringId fields in the entry.
1148            remap_node_entry_string_ids(&mut entry, string_remap);
1149
1150            // Set the file ID from the plan.
1151            entry.file = plan.file_id;
1152
1153            // The actual NodeId is the pre-allocated slot index with generation 1.
1154            #[allow(clippy::cast_possible_truncation)] // cursor is bounded by allocated slot count
1155            let actual_index = plan.node_range.start + node_cursor as u32;
1156            let actual_id = NodeId::new(actual_index, 1);
1157
1158            // Write into the pre-allocated slot.
1159            node_slots[node_cursor] = Slot::new_occupied(1, entry);
1160
1161            if let Some(expected) = expected_id {
1162                node_remap.insert(*expected, actual_id);
1163            }
1164
1165            node_ids.push(actual_id);
1166            node_cursor += 1;
1167        }
1168    }
1169
1170    (node_remap, node_cursor, node_ids)
1171}
1172
1173/// Collect staged edges with remapped node IDs, string IDs, and pre-assigned
1174/// sequence numbers.
1175fn collect_edges(
1176    ops: &[StagingOp],
1177    plan: &FilePlan,
1178    node_remap: &HashMap<NodeId, NodeId>,
1179    string_remap: &HashMap<StringId, StringId>,
1180) -> Vec<PendingEdge> {
1181    let mut edges = Vec::new();
1182
1183    for op in ops {
1184        if let StagingOp::AddEdge {
1185            source,
1186            target,
1187            kind,
1188            spans,
1189            ..
1190        } = op
1191        {
1192            let actual_source = node_remap.get(source).copied().unwrap_or(*source);
1193            let actual_target = node_remap.get(target).copied().unwrap_or(*target);
1194
1195            // Clone and remap any local StringIds in the EdgeKind.
1196            let mut remapped_kind = kind.clone();
1197            remap_edge_kind_local_string_ids(&mut remapped_kind, string_remap);
1198
1199            edges.push(PendingEdge {
1200                source: actual_source,
1201                target: actual_target,
1202                kind: remapped_kind,
1203                file: plan.file_id,
1204                spans: spans.clone(),
1205            });
1206        }
1207    }
1208
1209    edges
1210}
1211
1212/// Remap a required `StringId` using the dedup remap table.
1213///
1214/// If the ID is in the remap table, it is replaced with the canonical ID.
1215/// Otherwise, it is left unchanged (identity mapping).
1216#[allow(clippy::implicit_hasher)]
1217pub fn remap_string_id(id: &mut StringId, remap: &HashMap<StringId, StringId>) {
1218    if let Some(&canonical) = remap.get(id) {
1219        *id = canonical;
1220    }
1221}
1222
1223/// Remap an optional `StringId` using the dedup remap table.
1224#[allow(clippy::implicit_hasher)]
1225pub fn remap_option_string_id(id: &mut Option<StringId>, remap: &HashMap<StringId, StringId>) {
1226    if let Some(inner) = id {
1227        remap_string_id(inner, remap);
1228    }
1229}
1230
1231/// Exhaustive remap of all `StringId` fields in an `EdgeKind`.
1232///
1233/// No wildcard arm — the compiler ensures completeness when new variants
1234/// are added to `EdgeKind`.
1235#[allow(clippy::match_same_arms, clippy::implicit_hasher)] // Arms are separated by category for documentation clarity
1236pub fn remap_edge_kind_string_ids(kind: &mut EdgeKind, remap: &HashMap<StringId, StringId>) {
1237    match kind {
1238        // === Variants WITH StringId fields ===
1239        EdgeKind::Imports { alias, .. } => remap_option_string_id(alias, remap),
1240        EdgeKind::Exports { alias, .. } => remap_option_string_id(alias, remap),
1241        EdgeKind::TypeOf { name, .. } => remap_option_string_id(name, remap),
1242        EdgeKind::TraitMethodBinding {
1243            trait_name,
1244            impl_type,
1245            ..
1246        } => {
1247            remap_string_id(trait_name, remap);
1248            remap_string_id(impl_type, remap);
1249        }
1250        EdgeKind::HttpRequest { url, .. } => remap_option_string_id(url, remap),
1251        EdgeKind::GrpcCall { service, method } => {
1252            remap_string_id(service, remap);
1253            remap_string_id(method, remap);
1254        }
1255        EdgeKind::DbQuery { table, .. } => remap_option_string_id(table, remap),
1256        EdgeKind::TableRead { table_name, schema } => {
1257            remap_string_id(table_name, remap);
1258            remap_option_string_id(schema, remap);
1259        }
1260        EdgeKind::TableWrite {
1261            table_name, schema, ..
1262        } => {
1263            remap_string_id(table_name, remap);
1264            remap_option_string_id(schema, remap);
1265        }
1266        EdgeKind::TriggeredBy {
1267            trigger_name,
1268            schema,
1269        } => {
1270            remap_string_id(trigger_name, remap);
1271            remap_option_string_id(schema, remap);
1272        }
1273        EdgeKind::MessageQueue { protocol, topic } => {
1274            if let MqProtocol::Other(s) = protocol {
1275                remap_string_id(s, remap);
1276            }
1277            remap_option_string_id(topic, remap);
1278        }
1279        EdgeKind::WebSocket { event } => remap_option_string_id(event, remap),
1280        EdgeKind::GraphQLOperation { operation } => remap_string_id(operation, remap),
1281        EdgeKind::ProcessExec { command } => remap_string_id(command, remap),
1282        EdgeKind::FileIpc { path_pattern } => remap_option_string_id(path_pattern, remap),
1283        EdgeKind::ProtocolCall { protocol, metadata } => {
1284            remap_string_id(protocol, remap);
1285            remap_option_string_id(metadata, remap);
1286        }
1287        // T2.5: remap each TypeArg.name StringId. This is the site actually
1288        // called during the Phase 4d edge-bulk-insert pipeline; without it
1289        // every TypeArg.name dangles after global string dedup.
1290        EdgeKind::Instantiates { type_args, .. } => {
1291            for ta in type_args.iter_mut() {
1292                remap_string_id(&mut ta.name, remap);
1293            }
1294        }
1295        // === Variants WITHOUT StringId fields — exhaustive, no wildcard ===
1296        EdgeKind::Defines
1297        | EdgeKind::Contains
1298        | EdgeKind::Calls { .. }
1299        | EdgeKind::References
1300        | EdgeKind::Inherits
1301        | EdgeKind::Implements
1302        | EdgeKind::LifetimeConstraint { .. }
1303        | EdgeKind::MacroExpansion { .. }
1304        | EdgeKind::FfiCall { .. }
1305        | EdgeKind::WebAssemblyCall
1306        | EdgeKind::GenericBound
1307        | EdgeKind::AnnotatedWith
1308        | EdgeKind::AnnotationParam
1309        | EdgeKind::LambdaCaptures
1310        | EdgeKind::ModuleExports
1311        | EdgeKind::ModuleRequires
1312        | EdgeKind::ModuleOpens
1313        | EdgeKind::ModuleProvides
1314        | EdgeKind::TypeArgument
1315        | EdgeKind::ExtensionReceiver
1316        | EdgeKind::CompanionOf
1317        | EdgeKind::SealedPermit
1318        // T3 Wraps carries WrapKind (Copy) + Option<u16>; no StringId fields.
1319        | EdgeKind::Wraps { .. }
1320        // T2.4 ChannelPeer carries only Copy enums; no StringId fields.
1321        | EdgeKind::ChannelPeer { .. } => {}
1322    }
1323}
1324
1325// === Phase 4: Post-chunk Finalization ===
1326
1327/// Apply global string dedup remap to all `StringId` fields in a `NodeEntry`.
1328///
1329/// This is the Phase 4 counterpart to `remap_node_entry_string_ids` (Phase 3).
1330/// Phase 3 remaps local→global; Phase 4 remaps duplicate global→canonical global.
1331#[allow(clippy::implicit_hasher)]
1332pub fn remap_node_entry_global(entry: &mut NodeEntry, remap: &HashMap<StringId, StringId>) {
1333    remap_string_id(&mut entry.name, remap);
1334    remap_option_string_id(&mut entry.signature, remap);
1335    remap_option_string_id(&mut entry.doc, remap);
1336    remap_option_string_id(&mut entry.qualified_name, remap);
1337    remap_option_string_id(&mut entry.visibility, remap);
1338}
1339
1340/// Apply global string dedup remap to all nodes in the arena and all pending edges.
1341///
1342/// This is Phase 4b of the parallel commit pipeline. After `build_dedup_table()`
1343/// produces a remap table, this function applies it to every `StringId` in:
1344/// - All `NodeEntry` fields in the arena
1345/// - All `EdgeKind` fields in the pending edges
1346#[allow(clippy::implicit_hasher)]
1347pub fn phase4_apply_global_remap(
1348    arena: &mut NodeArena,
1349    all_edges: &mut [Vec<PendingEdge>],
1350    remap: &HashMap<StringId, StringId>,
1351) {
1352    if remap.is_empty() {
1353        return;
1354    }
1355
1356    // Remap all nodes
1357    for (_id, entry) in arena.iter_mut() {
1358        remap_node_entry_global(entry, remap);
1359    }
1360
1361    // Remap all edges
1362    for file_edges in all_edges.iter_mut() {
1363        for edge in file_edges.iter_mut() {
1364            remap_edge_kind_string_ids(&mut edge.kind, remap);
1365        }
1366    }
1367}
1368
/// Statistics from Phase 4c-prime cross-file node unification.
///
/// All counters start at zero via the derived `Default`.
#[derive(Debug, Default)]
pub struct UnificationStats {
    /// Total (`qualified_name`, kind) groups of size >= 2 examined.
    pub candidate_pairs_examined: usize,
    /// Number of loser nodes merged into winners.
    pub nodes_merged: usize,
    /// Number of `PendingEdge` fields rewritten.
    pub edges_rewritten: usize,
    /// Number of loser nodes (metadata merged into winners, slot kept inert).
    pub nodes_inert: usize,
    /// Time spent in the unification pass (milliseconds).
    pub elapsed_ms: u64,
}
1383
1384fn collect_unification_path_keys<G>(
1385    graph: &G,
1386    groups_to_unify: &[Vec<NodeId>],
1387) -> HashMap<NodeId, String>
1388where
1389    G: crate::graph::unified::mutation_target::GraphMutationTarget,
1390{
1391    use crate::graph::unified::mutation_target::GraphMutationTarget;
1392
1393    let arena = GraphMutationTarget::nodes(graph);
1394    let files = GraphMutationTarget::files(graph);
1395    let mut out = HashMap::with_capacity(groups_to_unify.iter().map(Vec::len).sum());
1396    for group in groups_to_unify {
1397        for &node_id in group {
1398            if out.contains_key(&node_id) {
1399                continue;
1400            }
1401            let key = arena
1402                .get(node_id)
1403                .and_then(|entry| files.resolve(entry.file))
1404                .map_or_else(String::new, |path| path.to_string_lossy().into_owned());
1405            out.insert(node_id, key);
1406        }
1407    }
1408    out
1409}
1410
1411fn select_unification_winner<G>(
1412    graph: &G,
1413    group: &[NodeId],
1414    path_keys: &HashMap<NodeId, String>,
1415    empty_path_key: &String,
1416) -> NodeId
1417where
1418    G: crate::graph::unified::mutation_target::GraphMutationTarget,
1419{
1420    use crate::graph::unified::mutation_target::GraphMutationTarget;
1421
1422    *group
1423        .iter()
1424        .max_by(|&&a, &&b| {
1425            let ea = GraphMutationTarget::nodes(graph).get(a);
1426            let eb = GraphMutationTarget::nodes(graph).get(b);
1427            match (ea, eb) {
1428                (Some(ea), Some(eb)) => {
1429                    let a_real = ea.start_line > 0;
1430                    let b_real = eb.start_line > 0;
1431                    match (a_real, b_real) {
1432                        (true, false) => std::cmp::Ordering::Greater,
1433                        (false, true) => std::cmp::Ordering::Less,
1434                        _ => {
1435                            let a_range = ea.end_line.saturating_sub(ea.start_line);
1436                            let b_range = eb.end_line.saturating_sub(eb.start_line);
1437                            a_range
1438                                .cmp(&b_range)
1439                                .then_with(|| {
1440                                    let pa = path_keys.get(&a).unwrap_or(empty_path_key);
1441                                    let pb = path_keys.get(&b).unwrap_or(empty_path_key);
1442                                    pb.cmp(pa)
1443                                })
1444                                .then_with(|| b.index().cmp(&a.index()))
1445                        }
1446                    }
1447                }
1448                (Some(_), None) => std::cmp::Ordering::Greater,
1449                (None, Some(_)) => std::cmp::Ordering::Less,
1450                (None, None) => std::cmp::Ordering::Equal,
1451            }
1452        })
1453        .expect("group is non-empty")
1454}
1455
1456/// Phase 4c-prime: Unify cross-file duplicate nodes sharing the same
1457/// canonical qualified name and a call-compatible kind.
1458///
1459/// Runs after `rebuild_indices` (Phase 4c) which populates `by_qualified_name`,
1460/// and before `pending_edges_to_delta` (Phase 4d) so the remap operates on
1461/// `PendingEdge` targets, not committed `DeltaEdge`s.
1462///
1463/// **Winner selection**: Among nodes sharing a qualified name and call-compatible
1464/// kinds, the node with `start_line > 0` wins. Tie-break in order:
1465///   1. Wider `end_line - start_line` span.
1466///   2. **Lexicographically smallest file path** (resolved via the rebuild
1467///      plane's [`FileRegistry`]). Phase 3e correctness requires the
1468///      path-based tie-break rather than the previous `FileId` comparison,
1469///      because `FileId` slot assignment differs between a fresh full
1470///      rebuild and an incremental rebuild — the incremental path clones
1471///      the existing `FileRegistry` and appends new paths, while the full
1472///      path assigns `FileIds` in filesystem-walk order from an empty
1473///      registry. Two builds of the same logical workspace therefore
1474///      disagree on which `FileId` is smaller when duplicate definitions
1475///      tie on span width, flipping the unification winner and stranding
1476///      `qualified_name` on the wrong side of the merge. Tie-breaking on
1477///      the (stable-across-builds) path makes winner selection
1478///      representation-independent.
1479///   3. Final fallback: smaller `NodeId::index()` when paths also tie
1480///      (e.g. two definitions in the same file — rare but possible via
1481///      duplicate declarations). `NodeId` is deterministic within a
1482///      single build so this keeps the fallback stable for any individual
1483///      build even if it isn't invariant across representations.
1484///
1485/// **Safety**: Caller must hold an exclusive write lock on the graph.
1486pub(crate) fn phase4c_prime_unify_cross_file_nodes<
1487    G: crate::graph::unified::mutation_target::GraphMutationTarget,
1488>(
1489    graph: &mut G,
1490    all_edges: &mut [Vec<PendingEdge>],
1491) -> (UnificationStats, super::unification::NodeRemapTable) {
1492    use crate::graph::unified::mutation_target::GraphMutationTarget;
1493
1494    use super::helper::CALL_COMPATIBLE_KINDS;
1495    use super::unification::{NodeRemapTable, merge_node_into};
1496    use std::time::Instant;
1497
1498    let start = Instant::now();
1499    let mut stats = UnificationStats::default();
1500
1501    // Collect candidates: walk arena, group by qualified_name for nodes
1502    // with call-compatible kinds. Only groups of size >= 2 need unification.
1503    let mut qn_groups: HashMap<crate::graph::unified::string::StringId, Vec<NodeId>> =
1504        HashMap::new();
1505
1506    for (node_id, entry) in GraphMutationTarget::nodes(graph).iter() {
1507        if !CALL_COMPATIBLE_KINDS.contains(&entry.kind) {
1508            continue;
1509        }
1510        if let Some(qn_id) = entry.qualified_name {
1511            qn_groups.entry(qn_id).or_default().push(node_id);
1512        }
1513    }
1514
1515    // Filter to groups with 2+ members
1516    let groups_to_unify: Vec<Vec<NodeId>> = qn_groups
1517        .into_values()
1518        .filter(|group| {
1519            if group.len() >= 2 {
1520                stats.candidate_pairs_examined += 1;
1521                true
1522            } else {
1523                false
1524            }
1525        })
1526        .collect();
1527
1528    // Now perform merges
1529    let mut remap = NodeRemapTable::with_capacity(groups_to_unify.len());
1530
1531    // Pre-resolve every candidate node's canonical path-based tie-break
1532    // key into an owned `String` keyed by `NodeId`. Lifting the resolution
1533    // here instead of inside the `max_by` comparator avoids re-borrowing
1534    // `graph` immutably from a closure that lives across the
1535    // `merge_node_into(&mut graph, …)` call below. Without this
1536    // precomputation the borrow checker rejects the mutation loop because
1537    // the comparator closure captures the immutable borrow of `graph`
1538    // required by `path_key`.
1539    //
1540    // Path conversion goes through `Arc<Path>::to_string_lossy()` because
1541    // `Path` does not implement `Ord` lexicographically across platforms
1542    // consistently; forcing a canonical string form keeps the tie-break
1543    // deterministic on any host filesystem. When the registry can't
1544    // resolve a `FileId` (shouldn't happen in practice — every live
1545    // node's `FileId` was registered before the node was allocated) we
1546    // fall back to an empty string so the comparison still produces a
1547    // total order. Empty resolves tie-break each other stably (then fall
1548    // through to the `NodeId` index tie-break).
1549    let path_keys = collect_unification_path_keys(graph, &groups_to_unify);
1550    let empty_path_key = String::new();
1551
1552    for group in &groups_to_unify {
1553        // Pick winner: prefer start_line > 0, tie-break by wider span,
1554        // then smaller path (stable across rebuild representations),
1555        // then smaller NodeId index.
1556        let winner_id = select_unification_winner(graph, group, &path_keys, &empty_path_key);
1557
1558        // Merge all losers into winner
1559        for &node_id in group {
1560            if node_id == winner_id {
1561                continue;
1562            }
1563            match merge_node_into(GraphMutationTarget::nodes_mut(graph), node_id, winner_id) {
1564                Ok(()) => {
1565                    remap.insert(node_id, winner_id);
1566                    stats.nodes_merged += 1;
1567                    stats.nodes_inert += 1;
1568                }
1569                Err(e) => {
1570                    log::debug!("Phase 4c-prime: skipping merge ({e})");
1571                }
1572            }
1573        }
1574    }
1575
1576    // Apply remap table to all pending edges AND to every committed
1577    // edge already in the graph's edge store.
1578    //
1579    // The `apply_to_edges` call keeps PendingEdges (the output of this
1580    // chunk's parallel commit) pointing at canonical winners before
1581    // Phase 4d converts them into `DeltaEdge`s. On a full build that is
1582    // sufficient — no committed edges exist yet.
1583    //
1584    // The `apply_to_committed_edges` call closes the Phase 3e incremental
1585    // gap: the rebuild plane clones the pre-edit graph's committed edges
1586    // via `clone_for_rebuild`, so a newly-reparsed file whose definition
1587    // becomes the unification winner can leave surviving cross-file
1588    // edges pointing at what is now an inert loser slot. Retargeting the
1589    // committed edges through `remap` is the only way those edges
1590    // observe the canonical winner after finalize. On a full build the
1591    // second call is a no-op (edge store is empty).
1592    if !remap.is_empty() {
1593        let pre_count: usize = all_edges.iter().map(std::vec::Vec::len).sum();
1594        remap.apply_to_edges(all_edges);
1595        remap.apply_to_committed_edges(GraphMutationTarget::edges(graph));
1596        stats.edges_rewritten = pre_count; // conservative: all edges walked
1597
1598        // Keep FileRegistry::per_file_nodes consistent with the arena.
1599        //
1600        // [`merge_node_into`] (see `unification.rs`) intentionally does
1601        // **not** vacate the loser slot — the slot stays `Occupied` but
1602        // inert so `NodeArena::slot_count()` (which CSR row_ptr sizing
1603        // depends on) is preserved. Because the slot is still live per
1604        // `NodeArena::iter()`, the §F.1 bucket bijection would panic
1605        // with "live node absent from all buckets" if we purged losers
1606        // from their home bucket.
1607        //
1608        // Therefore: losers stay in whichever per-file bucket Phase 3
1609        // first committed them to. That bucket's `FileId` matches the
1610        // loser's `NodeEntry.file`, so (c) passes. Each loser is in
1611        // exactly one bucket, so (b) passes. Every live arena slot is
1612        // accounted for by some bucket, so (d) passes. The §K master
1613        // matrix already admits this semantics — inert merged-losers
1614        // are semantically equivalent to any other live `NodeArena`
1615        // entry for bucket-membership purposes.
1616        //
1617        // Name-resolution containment (Gate 0d iter-1 blocker).
1618        //
1619        // `merge_node_into` now ALSO clears the loser's `name` and
1620        // `qualified_name` fields (to `StringId::INVALID` / `None`), and
1621        // `AuxiliaryIndices::build_from_arena` skips any arena entry
1622        // whose `name == StringId::INVALID` when rebuilding the name,
1623        // qualified-name, kind, and file buckets. The second
1624        // `rebuild_indices()` call in `build_unified_graph_inner`
1625        // (entrypoint.rs:571, right below this function) runs AFTER
1626        // unification, so the buckets surfaced by `indices.by_name` /
1627        // `by_qualified_name` / `by_kind` / `by_file` contain only
1628        // winners — every public name-resolution surface
1629        // (`resolution::exact_qualified_bucket`,
1630        // `graph::find_by_pattern`, etc.) is therefore free of
1631        // unified-away duplicates. The only publish-visible bucket that
1632        // still references losers is `FileRegistry::per_file_nodes`,
1633        // which preserves the §F.1 bucket bijection without surfacing
1634        // them through name resolution.
1635        //
1636        // Historical note: an earlier iteration of this pass called
1637        // `retain_nodes_in_buckets` to purge losers; that matched a
1638        // stale understanding where `merge_node_into` was expected to
1639        // vacate the slot. Gate 0d's bucket-bijection invariant
1640        // surfaced the mismatch (every full rebuild produced a live
1641        // slot with no bucket entry). The fix is to align with the
1642        // unification contract: inert slots remain in their home
1643        // bucket, but `AuxiliaryIndices` treats them as name-invisible.
1644    }
1645
1646    stats.elapsed_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
1647    // Return the remap alongside the stats so the new Phase 4d-prime
1648    // (`phase4d_prime_propagate_staging_metadata`, 02_DESIGN §4.3.e
1649    // Changes 2 + 4) can drop loser-keyed metadata before merging the
1650    // per-file staging stores into `CodeGraph::macro_metadata`. The
1651    // `apply_to_edges` / `apply_to_committed_edges` calls above have
1652    // already consumed `remap` for edge retargeting; the returned table
1653    // is the same authoritative map used downstream.
1654    (stats, remap)
1655}
1656
1657/// Rekey a per-file staging `NodeMetadataStore` from staging-local
1658/// `NodeId`s to canonical arena `NodeId`s using the per-file commit
1659/// order.
1660///
1661/// `02_DESIGN` §4.3.e Change 1 assumes staging metadata reaches Phase
1662/// 4d-prime under the arena `NodeIds` Phase 3 assigned. In practice
1663/// `StagingGraph::add_node` returns `NodeId::new(i, 1)` where `i` is the
1664/// staging-local sequential index (see `staging.rs:355`), and plugins
1665/// key their `NodeMetadataStore` entries under those staging-local IDs
1666/// (see e.g. the Rust plugin's `metadata_store.get_or_insert_default(func_id)`
1667/// at `sqry-lang-rust/src/macro_boundaries/proc_macro_classify.rs:84`).
1668/// Phase 3 then renumbers those into arena slots; `per_file_node_ids[i]`
1669/// is the arena `NodeId` for staging `NodeId(i, 1)`.
1670///
1671/// This helper rekeys each metadata entry by index: an entry under
1672/// staging `NodeId(i, 1)` is moved to `per_file_node_ids[i]`. Entries
1673/// whose staging index is out of bounds or whose generation is not the
1674/// staging-canonical `1` are dropped (defensive — should never happen
1675/// under the documented `StagingGraph::add_node` contract).
1676///
1677/// Returns a fresh arena-keyed [`NodeMetadataStore`]. The input is borrowed
1678/// because this helper only reads staging entries and clones the values that
1679/// survive rekeying into the returned store.
1680#[must_use]
1681pub(crate) fn rekey_staging_metadata_to_arena(
1682    staging_metadata: &crate::graph::unified::storage::metadata::NodeMetadataStore,
1683    per_file_node_ids: &[crate::graph::unified::node::id::NodeId],
1684) -> crate::graph::unified::storage::metadata::NodeMetadataStore {
1685    use crate::graph::unified::node::id::NodeId;
1686    use crate::graph::unified::storage::metadata::NodeMetadataStore;
1687
1688    let mut rekeyed = NodeMetadataStore::new();
1689    for ((index, generation), entry) in staging_metadata.iter_entries() {
1690        // Defensive: staging.add_node always emits generation 1. Drop
1691        // any entry that does not match that contract; it cannot
1692        // correspond to a Phase 3 commit slot.
1693        if generation != 1 {
1694            continue;
1695        }
1696        let idx_usize = index as usize;
1697        let Some(&arena_id) = per_file_node_ids.get(idx_usize) else {
1698            // Stale key beyond the file's committed range — drop silently.
1699            continue;
1700        };
1701        let _ = NodeId::new(index, generation); // documentation: this was the staging-local id
1702        // Re-insert the whole `StoredEntry` (typed payload + flags) so both
1703        // `cfg_condition`/macro metadata AND synthetic markers survive the
1704        // staging-to-arena rekey.
1705        rekeyed.insert_entry(arena_id, entry.clone());
1706    }
1707    // Rekey shape descriptors the same way: staging keys them under
1708    // `NodeId(i, 1)`, the same staging-local index `i` that Phase 3 renumbered
1709    // into `per_file_node_ids[i]`. Without this, descriptors computed in the
1710    // staging store never reach the arena and the feature silently no-ops.
1711    for (&staging_id, descriptor) in staging_metadata.shape_descriptors() {
1712        if staging_id.generation() != 1 {
1713            continue;
1714        }
1715        let Some(&arena_id) = per_file_node_ids.get(staging_id.index() as usize) else {
1716            continue;
1717        };
1718        rekeyed.insert_shape_descriptor(arena_id, descriptor.clone());
1719    }
1720    rekeyed
1721}
1722
1723/// Phase 4d-prime — propagate per-file staging `NodeMetadataStore` into
1724/// the live graph's `macro_metadata` after Phase 4d (bulk edge insert)
1725/// and before Phase 4e (binding-plane derivation).
1726///
1727/// `02_DESIGN` §4.3.e (Changes 4 + 7): the active Phase 3 commit path does
1728/// not read `staging.macro_metadata`, and `StagingGraph::take_macro_metadata`
1729/// was previously defined but never called — staging metadata never reached
1730/// `CodeGraph::macro_metadata`. T3.8's `cfg_condition` cannot ride the Go
1731/// plugin's parallel synthetic-flag channel (per-symbol metadata, not a
1732/// boolean bit on a known set of placeholders), so this sub-phase wires
1733/// the missing path.
1734///
1735/// For each `(file_id, store)` entry:
1736/// 1. Apply the Phase 4c-prime `NodeRemapTable` via
1737///    [`NodeRemapTable::apply_to_metadata_store`] so loser-keyed entries
1738///    are dropped (per `01_SPEC` §5.3.f the spec contract is "losers'
1739///    constraints are lost"; the winner's own per-file store carries the
1740///    authoritative metadata).
1741/// 2. If the store still has entries after the remap, call
1742///    [`NodeMetadataStore::merge`] into the graph's authoritative metadata
1743///    store.
1744///
1745/// Returns `true` when at least one entry was merged, `false` when every
1746/// staged store was empty or fully consumed by loser-drops. The boolean
1747/// is observed by the Phase 3d post-Pass-4d hook on the incremental
1748/// rebuild plane; production callers ignore it.
1749///
1750/// Generic over [`GraphMutationTarget`] so both the full-build
1751/// (`build_unified_graph_inner`) and incremental
1752/// (`incremental_rebuild` → `phase3d_insert_cross_file_edges`) planes
1753/// can call it against `CodeGraph` and `RebuildGraph` respectively.
1754///
1755/// Runs after Phase 4d (`NodeRemapTable` produced by 4c is final) and
1756/// before Phase 4e (binding-plane synthesis can observe `cfg_condition`
1757/// if it later needs to). The Rust plugin's existing `merge_macro_metadata`
1758/// call automatically benefits: Rust-side `#[cfg(...)]` strings start
1759/// flowing into the live snapshot for the first time as an incidental
1760/// fix of an existing latent gap.
1761#[must_use]
1762pub(crate) fn phase4d_prime_propagate_staging_metadata<G>(
1763    graph: &mut G,
1764    staged_metadata: Vec<(
1765        crate::graph::unified::file::id::FileId,
1766        crate::graph::unified::storage::metadata::NodeMetadataStore,
1767    )>,
1768    remap: &super::unification::NodeRemapTable,
1769) -> bool
1770where
1771    G: crate::graph::unified::mutation_target::GraphMutationTarget,
1772{
1773    let target = graph.macro_metadata_mut();
1774    let mut any_inserted = false;
1775    for (_file_id, mut metadata) in staged_metadata {
1776        remap.apply_to_metadata_store(&mut metadata);
1777        if !metadata.is_empty() {
1778            target.merge(&metadata);
1779            any_inserted = true;
1780        }
1781    }
1782    any_inserted
1783}
1784
1785/// Convert per-file `PendingEdge` collections to per-file `DeltaEdge` collections
1786/// with monotonically increasing sequence numbers.
1787///
1788/// The sequence numbers are assigned file-by-file, edge-by-edge, starting from
1789/// `seq_start`. This produces the deterministic ordering required by
1790/// `BidirectionalEdgeStore::add_edges_bulk_ordered()`.
1791#[must_use]
1792pub fn pending_edges_to_delta(
1793    per_file_edges: &[Vec<PendingEdge>],
1794    seq_start: u64,
1795) -> (Vec<Vec<DeltaEdge>>, u64) {
1796    let mut seq = seq_start;
1797    let mut result = Vec::with_capacity(per_file_edges.len());
1798
1799    for file_edges in per_file_edges {
1800        let mut delta_vec = Vec::with_capacity(file_edges.len());
1801        for edge in file_edges {
1802            delta_vec.push(DeltaEdge::with_spans(
1803                edge.source,
1804                edge.target,
1805                edge.kind.clone(),
1806                seq,
1807                DeltaOp::Add,
1808                edge.file,
1809                edge.spans.clone(),
1810            ));
1811            seq += 1;
1812        }
1813        result.push(delta_vec);
1814    }
1815
1816    (result, seq)
1817}
1818
1819/// Rebuild the auxiliary indices on `graph` from its current node arena.
1820///
1821/// Generic counterpart to the inherent [`CodeGraph::rebuild_indices`].
1822/// Takes a [`GraphMutationTarget`] so both the full-build
1823/// (`build_unified_graph_inner`) and incremental-rebuild
1824/// (`incremental_rebuild` on `RebuildGraph`) pipelines can share the
1825/// same helper. The inherent method now delegates here so the
1826/// implementation lives in exactly one place.
1827///
1828/// Internally uses [`GraphMutationTarget::nodes_and_indices_mut`] to
1829/// acquire a disjoint `(&NodeArena, &mut AuxiliaryIndices)` pair, then
1830/// hands them to [`AuxiliaryIndices::build_from_arena`] which clears
1831/// the existing indices and rebuilds in a single pass without
1832/// per-element duplicate checking.
1833///
1834/// [`CodeGraph::rebuild_indices`]: crate::graph::unified::concurrent::CodeGraph::rebuild_indices
1835/// [`AuxiliaryIndices::build_from_arena`]: crate::graph::unified::storage::indices::AuxiliaryIndices::build_from_arena
1836pub(crate) fn rebuild_indices<G: crate::graph::unified::mutation_target::GraphMutationTarget>(
1837    graph: &mut G,
1838) {
1839    let (nodes, indices) = graph.nodes_and_indices_mut();
1840    indices.build_from_arena(nodes);
1841}
1842
1843/// Phase 4d — bulk-insert every pending edge into the graph via the
1844/// deterministic `DeltaEdge` conversion path.
1845///
1846/// Wraps the pure [`pending_edges_to_delta`] conversion + the
1847/// [`BidirectionalEdgeStore::add_edges_bulk_ordered`] call that
1848/// `build_unified_graph_inner` ran inline between Phase 4c-prime and
1849/// Phase 4e. The wrapper is generic over [`GraphMutationTarget`] so
1850/// the Task 4 Step 4 Phase 3 `incremental_rebuild` body can call it
1851/// against a [`RebuildGraph`] without duplicating the seq-counter +
1852/// flatten logic.
1853///
1854/// Returns the final edge sequence counter (for callers that need to
1855/// continue allocating deterministic sequence numbers downstream).
1856/// The counter flows from
1857/// [`BidirectionalEdgeStore::forward().seq_counter()`] on the way in
1858/// and advances by one per inserted edge.
1859///
1860/// # Semantics
1861///
1862/// * `per_file_edges` is consumed by-reference; the function does not
1863///   mutate the caller's buffer. Callers who no longer need the
1864///   vectors may drop them after the call.
1865/// * If `per_file_edges` is empty (or every inner vector is empty),
1866///   the edge store is left untouched.
1867/// * The helper does not `bump_epoch()` on the graph — Phase 4d is
1868///   edge-level only; the full pipeline bumps epoch separately.
1869///
1870/// # Edge-source-identity invariant (`C_EDGE_MIGRATE`)
1871///
1872/// Phase 4d does NOT dedup edges by `(source, target, kind)`. Every
1873/// `PendingEdge` from every file becomes one `DeltaEdge` with a unique
1874/// monotonically increasing `seq` number; the
1875/// [`BidirectionalEdgeStore::add_edges_bulk_ordered`] insertion contract
1876/// preserves that 1:1 mapping. This is what lets the Cluster C
1877/// `C_EDGE_MIGRATE` DAG unit (2026-04-29 `BadLiveware` Go batch) move the
1878/// `TypeOf{Field}` edge source from the struct node to the per-field
1879/// `Property` node without touching this helper: the new
1880/// Property-sourced edge addresses a distinct `(source, target)` pair
1881/// from the legacy struct-sourced edge, and Phase 4d emits both shapes
1882/// with no collapsing. Plugins that only emit the new shape (Go after
1883/// `C_EDGE_MIGRATE`) therefore produce a clean Property-sourced
1884/// `TypeOf{Field}` edge set with no struct-sourced shadows. Plugins
1885/// outside Cluster C's scope (`C_OTHER_PLUGINS`) keep emitting the
1886/// legacy shape until they migrate; the bulk-insert path treats both
1887/// shapes identically.
1888///
1889/// Determinism: per-file `PendingEdge` order is fixed by the parser
1890/// pass, and `pending_edges_to_delta` walks the per-file vectors in
1891/// the input order. So `phase4d_bulk_insert_edges` produces a
1892/// byte-identical `DeltaEdge` sequence on every fresh rebuild of the
1893/// same source tree, which is what guarantees the
1894/// `SnapshotReader → SnapshotWriter` round-trip identity required by
1895/// the `C_EDGE_MIGRATE` acceptance criteria.
1896///
1897/// [`BidirectionalEdgeStore::add_edges_bulk_ordered`]: crate::graph::unified::edge::bidirectional::BidirectionalEdgeStore::add_edges_bulk_ordered
1898/// [`RebuildGraph`]: crate::graph::unified::rebuild::rebuild_graph::RebuildGraph
1899pub(crate) fn phase4d_bulk_insert_edges<
1900    G: crate::graph::unified::mutation_target::GraphMutationTarget,
1901>(
1902    graph: &mut G,
1903    per_file_edges: &[Vec<PendingEdge>],
1904) -> u64 {
1905    // Start seq numbering from the edge store's current counter to
1906    // support non-empty graphs (incremental rebuild carries forward
1907    // the prior build's counter).
1908    let edge_seq_start = graph.edges().forward().seq_counter();
1909    let (delta_edge_vecs, final_seq) = pending_edges_to_delta(per_file_edges, edge_seq_start);
1910    let total_edge_count: u64 = delta_edge_vecs.iter().map(|v| v.len() as u64).sum();
1911    if total_edge_count > 0 {
1912        graph
1913            .edges_mut()
1914            .add_edges_bulk_ordered(&delta_edge_vecs, total_edge_count);
1915    }
1916    final_seq
1917}
1918
1919#[cfg(test)]
1920mod tests {
1921    use super::*;
1922
1923    #[test]
1924    fn test_compute_commit_plan_basic() {
1925        let file_ids = vec![FileId::new(0), FileId::new(1), FileId::new(2)];
1926        let node_counts = vec![3, 0, 5];
1927        let string_counts = vec![2, 1, 3];
1928        let edge_counts = vec![4, 0, 6];
1929
1930        let plan = compute_commit_plan(
1931            &node_counts,
1932            &string_counts,
1933            &edge_counts,
1934            &file_ids,
1935            0,
1936            1, // string_offset=1 for sentinel
1937        );
1938
1939        assert_eq!(plan.total_nodes, 8);
1940        assert_eq!(plan.total_strings, 6);
1941        assert_eq!(plan.total_edges, 10);
1942
1943        // File 0: nodes [0..3), strings [1..3)
1944        assert_eq!(plan.file_plans[0].node_range, 0..3);
1945        assert_eq!(plan.file_plans[0].string_range, 1..3);
1946
1947        // File 1: nodes [3..3), strings [3..4) — empty nodes
1948        assert_eq!(plan.file_plans[1].node_range, 3..3);
1949        assert_eq!(plan.file_plans[1].string_range, 3..4);
1950
1951        // File 2: nodes [3..8), strings [4..7)
1952        assert_eq!(plan.file_plans[2].node_range, 3..8);
1953        assert_eq!(plan.file_plans[2].string_range, 4..7);
1954    }
1955
1956    #[test]
1957    fn test_compute_commit_plan_with_offsets() {
1958        let file_ids = vec![FileId::new(5)];
1959        let plan = compute_commit_plan(&[10], &[5], &[7], &file_ids, 100, 50);
1960        assert_eq!(plan.file_plans[0].node_range, 100..110);
1961        assert_eq!(plan.file_plans[0].string_range, 50..55);
1962        assert_eq!(plan.total_nodes, 10);
1963        assert_eq!(plan.total_strings, 5);
1964        assert_eq!(plan.total_edges, 7);
1965    }
1966
1967    #[test]
1968    fn test_compute_commit_plan_empty() {
1969        let plan = compute_commit_plan(&[], &[], &[], &[], 0, 1);
1970        assert_eq!(plan.total_nodes, 0);
1971        assert_eq!(plan.total_strings, 0);
1972        assert_eq!(plan.total_edges, 0);
1973        assert!(plan.file_plans.is_empty());
1974    }
1975
1976    #[test]
1977    fn test_remap_string_id_basic() {
1978        let mut remap = HashMap::new();
1979        remap.insert(StringId::new(1), StringId::new(100));
1980
1981        let mut id = StringId::new(1);
1982        remap_string_id(&mut id, &remap);
1983        assert_eq!(id, StringId::new(100));
1984    }
1985
1986    #[test]
1987    fn test_remap_string_id_not_in_remap() {
1988        let remap = HashMap::new();
1989        let mut id = StringId::new(42);
1990        remap_string_id(&mut id, &remap);
1991        assert_eq!(id, StringId::new(42)); // unchanged
1992    }
1993
1994    #[test]
1995    fn test_remap_option_string_id() {
1996        let mut remap = HashMap::new();
1997        remap.insert(StringId::new(5), StringId::new(50));
1998
1999        let mut some_id = Some(StringId::new(5));
2000        remap_option_string_id(&mut some_id, &remap);
2001        assert_eq!(some_id, Some(StringId::new(50)));
2002
2003        let mut none_id: Option<StringId> = None;
2004        remap_option_string_id(&mut none_id, &remap);
2005        assert_eq!(none_id, None);
2006    }
2007
2008    #[test]
2009    fn test_remap_edge_kind_imports() {
2010        let mut remap = HashMap::new();
2011        remap.insert(StringId::new(1), StringId::new(100));
2012
2013        let mut kind = EdgeKind::Imports {
2014            alias: Some(StringId::new(1)),
2015            is_wildcard: false,
2016        };
2017        remap_edge_kind_string_ids(&mut kind, &remap);
2018        assert!(
2019            matches!(kind, EdgeKind::Imports { alias: Some(id), .. } if id == StringId::new(100))
2020        );
2021    }
2022
2023    #[test]
2024    fn test_remap_edge_kind_trait_method_binding() {
2025        let mut remap = HashMap::new();
2026        remap.insert(StringId::new(1), StringId::new(100));
2027        remap.insert(StringId::new(2), StringId::new(200));
2028
2029        let mut kind = EdgeKind::TraitMethodBinding {
2030            trait_name: StringId::new(1),
2031            impl_type: StringId::new(2),
2032            is_ambiguous: false,
2033        };
2034        remap_edge_kind_string_ids(&mut kind, &remap);
2035        assert!(
2036            matches!(kind, EdgeKind::TraitMethodBinding { trait_name, impl_type, .. }
2037                if trait_name == StringId::new(100) && impl_type == StringId::new(200))
2038        );
2039    }
2040
2041    #[test]
2042    fn test_remap_edge_kind_no_op_variants() {
2043        let remap = HashMap::new();
2044
2045        // Defines — no StringId fields
2046        let mut kind = EdgeKind::Defines;
2047        remap_edge_kind_string_ids(&mut kind, &remap);
2048        assert!(matches!(kind, EdgeKind::Defines));
2049
2050        // Calls — no StringId fields
2051        let mut kind = EdgeKind::Calls {
2052            argument_count: 3,
2053            is_async: true,
2054            resolved_via: ResolvedVia::Direct,
2055        };
2056        remap_edge_kind_string_ids(&mut kind, &remap);
2057        assert!(matches!(
2058            kind,
2059            EdgeKind::Calls {
2060                argument_count: 3,
2061                is_async: true,
2062                resolved_via: ResolvedVia::Direct,
2063            }
2064        ));
2065    }
2066
2067    fn placeholder_entry() -> NodeEntry {
2068        use crate::graph::unified::node::NodeKind;
2069        NodeEntry::new(NodeKind::Function, StringId::new(0), FileId::new(0))
2070    }
2071
2072    #[test]
2073    fn test_phase2_assign_ranges_basic() {
2074        use super::super::staging::StagingGraph;
2075
2076        // Create 2 staging graphs with known counts
2077        let mut sg0 = StagingGraph::new();
2078        let mut sg1 = StagingGraph::new();
2079
2080        // sg0: 2 nodes, 1 string, 1 edge
2081        let entry0 = placeholder_entry();
2082        let n0 = sg0.add_node(entry0.clone());
2083        let n1 = sg0.add_node(entry0.clone());
2084        sg0.intern_string(StringId::new_local(0), "hello".into());
2085        sg0.add_edge(
2086            n0,
2087            n1,
2088            EdgeKind::Calls {
2089                argument_count: 0,
2090                is_async: false,
2091                resolved_via: ResolvedVia::Direct,
2092            },
2093            FileId::new(0),
2094        );
2095
2096        // sg1: 1 node, 2 strings, 0 edges
2097        sg1.add_node(entry0);
2098        sg1.intern_string(StringId::new_local(0), "world".into());
2099        sg1.intern_string(StringId::new_local(1), "foo".into());
2100
2101        let file_ids = vec![FileId::new(10), FileId::new(11)];
2102        let offsets = GlobalOffsets {
2103            node_offset: 5,
2104            string_offset: 3,
2105        };
2106
2107        let plan = phase2_assign_ranges(&[&sg0, &sg1], &file_ids, &offsets);
2108
2109        // sg0: 2 nodes, 1 string, 1 edge
2110        assert_eq!(plan.file_plans[0].node_range, 5..7);
2111        assert_eq!(plan.file_plans[0].string_range, 3..4);
2112
2113        // sg1: 1 node, 2 strings, 0 edges
2114        assert_eq!(plan.file_plans[1].node_range, 7..8);
2115        assert_eq!(plan.file_plans[1].string_range, 4..6);
2116
2117        assert_eq!(plan.total_nodes, 3);
2118        assert_eq!(plan.total_strings, 3);
2119        assert_eq!(plan.total_edges, 1);
2120    }
2121
/// End-to-end Phase 2 + Phase 3 run for a single file against a `CodeGraph`,
/// using non-zero offsets so that the local-to-global remap is observable.
#[test]
fn test_phase3_parallel_commit_basic() {
    use super::super::staging::StagingGraph;
    use crate::graph::unified::concurrent::CodeGraph;
    use crate::graph::unified::node::NodeKind;
    // `GraphMutationTarget` is deliberately not imported: on `CodeGraph`
    // the `nodes_mut` / `strings_mut` calls resolve to inherent methods.
    // The trait offers the same surface for `RebuildGraph`, which is
    // covered by `phase3_parallel_commit_runs_against_rebuild_graph`.

    let name_local = StringId::new_local(0);
    let staging_file = FileId::new(0);

    // Stage one string, two nodes that share it as their name, and one
    // call edge from the first node to the second.
    let mut staged = StagingGraph::new();
    staged.intern_string(name_local, "my_func".into());
    let func_node = staged.add_node(NodeEntry::new(
        NodeKind::Function,
        name_local,
        staging_file,
    ));
    let var_node = staged.add_node(NodeEntry::new(
        NodeKind::Variable,
        name_local,
        staging_file,
    ));
    let call_kind = EdgeKind::Calls {
        argument_count: 0,
        is_async: false,
        resolved_via: ResolvedVia::Direct,
    };
    staged.add_edge(func_node, var_node, call_kind, staging_file);

    // Occupy leading arena / interner slots so the file's ranges start at
    // a non-zero offset. A full `CodeGraph` is used so the generic
    // signature is exercised through `GraphMutationTarget`.
    let filler = placeholder_entry();
    let mut graph = CodeGraph::new();
    graph.nodes_mut().alloc_range(10, &filler).unwrap();
    let string_start = graph.strings_mut().alloc_range(1).unwrap();
    assert_eq!(string_start, 1); // past sentinel

    let file_ids = vec![FileId::new(5)];
    let offsets = GlobalOffsets {
        node_offset: 10, // the file's nodes begin at arena index 10
        string_offset: string_start,
    };
    let plan = phase2_assign_ranges(&[&staged], &file_ids, &offsets);
    assert_eq!(plan.file_plans[0].node_range, 10..12);

    // Reserve the ranges Phase 3 is going to fill.
    graph
        .nodes_mut()
        .alloc_range(plan.total_nodes, &filler)
        .unwrap();
    graph.strings_mut().alloc_range(plan.total_strings).unwrap();

    // `&mut graph` makes the generic target resolve to `CodeGraph`.
    let result = phase3_parallel_commit(&plan, &[&staged], &mut graph);

    // Written counts.
    assert_eq!(result.total_nodes_written, 2);
    assert_eq!(result.total_strings_written, 1);

    // The staged string is resolvable at its global slot.
    let resolved = graph
        .strings()
        .resolve(StringId::new(string_start))
        .unwrap();
    assert_eq!(&*resolved, "my_func");

    // One file carrying one edge.
    assert_eq!(result.per_file_edges.len(), 1);
    assert_eq!(result.per_file_edges[0].len(), 1);

    // The edge endpoints were remapped to global IDs (node_offset = 10).
    let first_id = NodeId::new(10, 1);
    let second_id = NodeId::new(11, 1);
    let edge = &result.per_file_edges[0][0];
    assert_eq!(edge.file, FileId::new(5));
    assert_eq!(edge.source, first_id);
    assert_eq!(edge.target, second_id);

    // Gate 0c (iter-2 B2): node IDs are recorded per file, in commit
    // order, so the caller can fill FileRegistry::per_file_nodes
    // deterministically.
    assert_eq!(result.per_file_node_ids.len(), 1);
    assert_eq!(result.per_file_node_ids[0], vec![first_id, second_id]);
}
2214
/// Phase 3 with an empty plan and no staging graphs must report empty
/// per-file results and zero written counts.
#[test]
fn test_phase3_parallel_commit_empty() {
    use crate::graph::unified::concurrent::CodeGraph;

    let empty_plan = ChunkCommitPlan {
        file_plans: Vec::new(),
        total_nodes: 0,
        total_strings: 0,
        total_edges: 0,
    };
    let mut graph = CodeGraph::new();

    let outcome = phase3_parallel_commit(&empty_plan, &[], &mut graph);

    assert!(outcome.per_file_edges.is_empty());
    assert!(outcome.per_file_node_ids.is_empty());
    assert_eq!(outcome.total_nodes_written, 0);
    assert_eq!(outcome.total_strings_written, 0);
}
2234
/// Task 4 Step 4 Phase 1 — run Phase 3 against `RebuildGraph`, the second
/// `GraphMutationTarget` implementor.
///
/// A small staging graph is committed into a fresh `RebuildGraph`. The
/// test then reads the committed node and its name back through the
/// rebuild's own arena and interner, and checks that the returned
/// per-file edges / node IDs have the same shape as on the `CodeGraph`
/// path.
///
/// Should a later refactor route Phase 3 back to a `CodeGraph` (for
/// example through a hidden static `Arc::make_mut`), the rebuild arena
/// would stay empty and this test would fail.
#[test]
#[cfg(feature = "rebuild-internals")]
fn phase3_parallel_commit_runs_against_rebuild_graph() {
    use super::super::staging::StagingGraph;
    use crate::graph::unified::concurrent::CodeGraph;
    use crate::graph::unified::mutation_target::GraphMutationTarget;
    use crate::graph::unified::node::NodeKind;

    // Same staged shape as the CodeGraph test (2 nodes, 1 string, 1 Calls
    // edge), so behavioural drift between the two paths shows up as
    // differing assertions.
    let name_local = StringId::new_local(0);
    let staging_file = FileId::new(0);
    let mut staged = StagingGraph::new();
    staged.intern_string(name_local, "rebuild_target".into());
    let func_node = staged.add_node(NodeEntry::new(
        NodeKind::Function,
        name_local,
        staging_file,
    ));
    let var_node = staged.add_node(NodeEntry::new(
        NodeKind::Variable,
        name_local,
        staging_file,
    ));
    let call_kind = EdgeKind::Calls {
        argument_count: 0,
        is_async: false,
        resolved_via: ResolvedVia::Direct,
    };
    staged.add_edge(func_node, var_node, call_kind, staging_file);

    // The source CodeGraph is a temporary that is gone once this
    // statement ends, so nothing observed later can be a write leaking
    // back into a shared Arc.
    let mut rebuild = CodeGraph::new().clone_for_rebuild();

    // Occupy leading slots on the rebuild-local arena and interner so the
    // file's ranges start at a non-zero offset, mirroring the CodeGraph
    // test.
    let filler = placeholder_entry();
    rebuild.nodes_mut().alloc_range(10, &filler).unwrap();
    let string_start = rebuild.strings_mut().alloc_range(1).unwrap();
    assert_eq!(string_start, 1);

    let file_ids = vec![FileId::new(5)];
    let offsets = GlobalOffsets {
        node_offset: 10,
        string_offset: string_start,
    };
    let plan = phase2_assign_ranges(&[&staged], &file_ids, &offsets);

    // Reserve the ranges Phase 3 will write into.
    rebuild
        .nodes_mut()
        .alloc_range(plan.total_nodes, &filler)
        .unwrap();
    rebuild
        .strings_mut()
        .alloc_range(plan.total_strings)
        .unwrap();

    // Passing `&mut rebuild` makes the generic target `RebuildGraph`.
    let result = phase3_parallel_commit(&plan, &[&staged], &mut rebuild);

    // === Invariant: the data was written to the rebuild-local arena ===
    //
    // Slots 0..10 hold the pre-fill placeholders (clones of the template
    // entry); slots 10..12 hold the two nodes committed from `staged`.
    let first_id = NodeId::new(10, 1);
    let second_id = NodeId::new(11, 1);
    assert_eq!(
        &result.per_file_node_ids[0],
        &vec![first_id, second_id],
        "Phase 3 must commit into slots 10..12 on the rebuild-local arena"
    );

    // The committed node's name is a global StringId (Phase 3 remaps
    // local → global). Resolving it through the rebuild-local interner
    // must give back the staged text.
    let committed_name = rebuild
        .nodes_mut()
        .get(first_id)
        .map(|entry| entry.name)
        .expect("committed node must exist in rebuild arena");
    let resolved = rebuild
        .strings_mut()
        .resolve(committed_name)
        .expect("name must resolve in rebuild-local interner");
    assert_eq!(&*resolved, "rebuild_target");

    // === Result shape matches the CodeGraph path ===
    assert_eq!(result.total_nodes_written, 2);
    assert_eq!(result.total_strings_written, 1);
    assert_eq!(result.per_file_edges.len(), 1);
    assert_eq!(result.per_file_edges[0].len(), 1);
    let edge = &result.per_file_edges[0][0];
    assert_eq!(edge.file, FileId::new(5));
    assert_eq!(edge.source, first_id);
    assert_eq!(edge.target, second_id);
}
2359
/// `commit_single_file` must write the staged strings into the supplied
/// slots and rewrite the node's local `StringId`s and file to their global
/// values from the `FilePlan`.
#[test]
fn test_commit_single_file_string_remap() {
    use super::super::staging::StagingGraph;
    use crate::graph::unified::node::NodeKind;
    use crate::graph::unified::storage::SlotState;

    let name_local = StringId::new_local(0);
    let signature_local = StringId::new_local(1);

    // Stage two strings and one node that references both of them.
    let mut staged = StagingGraph::new();
    staged.intern_string(name_local, "alpha".into());
    staged.intern_string(signature_local, "beta".into());

    let mut staged_entry = NodeEntry::new(NodeKind::Function, name_local, FileId::new(0));
    staged_entry.signature = Some(signature_local);
    staged.add_node(staged_entry);

    let file_plan = FilePlan {
        parsed_index: 0,
        file_id: FileId::new(42),
        node_range: 10..11,
        string_range: 20..22,
    };

    // Destination slices sized to the plan: one node slot, two string
    // slots with their reference counters.
    let mut node_slots = vec![Slot::new_occupied(1, placeholder_entry())];
    let mut string_slots: Vec<Option<Arc<str>>> = vec![None; 2];
    let mut ref_counts: Vec<u32> = vec![0; 2];

    let result = commit_single_file(
        &staged,
        &file_plan,
        &mut node_slots,
        &mut string_slots,
        &mut ref_counts,
    );

    // Both strings landed, each with a reference count of one.
    assert_eq!(string_slots[0].as_deref(), Some("alpha"));
    assert_eq!(string_slots[1].as_deref(), Some("beta"));
    assert_eq!(ref_counts[0], 1);
    assert_eq!(ref_counts[1], 1);
    assert_eq!(result.strings_written, 2);

    // The committed entry carries global StringIds and the plan's FileId.
    match node_slots[0].state() {
        SlotState::Occupied(committed) => {
            assert_eq!(committed.name, StringId::new(20)); // global slot 20
            assert_eq!(committed.signature, Some(StringId::new(21))); // global slot 21
            assert_eq!(committed.file, FileId::new(42));
        }
        _ => panic!("Expected occupied slot"),
    }
    assert_eq!(result.nodes_written, 1);

    // Node IDs are reported in commit order (Gate 0c bucket contract).
    assert_eq!(result.node_ids, vec![NodeId::new(10, 1)]);

    // Nothing was staged as an edge.
    assert!(result.edges.is_empty());
}
2411
/// `remap_edge_kind_string_ids` must rewrite both the custom protocol id
/// and the topic id of a `MessageQueue` edge.
#[test]
fn test_remap_edge_kind_message_queue_other() {
    let protocol_before = StringId::new(10);
    let topic_before = StringId::new(20);
    let protocol_after = StringId::new(110);
    let topic_after = StringId::new(220);

    let remap = HashMap::from([
        (protocol_before, protocol_after),
        (topic_before, topic_after),
    ]);

    let mut kind = EdgeKind::MessageQueue {
        protocol: MqProtocol::Other(protocol_before),
        topic: Some(topic_before),
    };
    remap_edge_kind_string_ids(&mut kind, &remap);

    assert!(matches!(
        kind,
        EdgeKind::MessageQueue {
            protocol: MqProtocol::Other(proto),
            topic: Some(topic),
        } if proto == protocol_after && topic == topic_after
    ));
}
2431
2432    // === Phase 4 tests ===
2433
/// `phase4_apply_global_remap` must rewrite duplicate string IDs to their
/// canonical ID both in arena node entries and in pending edge kinds.
#[test]
fn test_phase4_apply_global_remap_basic() {
    use crate::graph::unified::node::NodeKind;
    use crate::graph::unified::storage::NodeArena;

    // IDs 2 and 3 are treated as duplicates of the canonical ID 1.
    let canonical = StringId::new(1);
    let dup_name = StringId::new(2);
    let dup_other = StringId::new(3);
    let file = FileId::new(0);

    let mut arena = NodeArena::new();
    arena
        .alloc(NodeEntry::new(NodeKind::Function, canonical, file))
        .unwrap();
    let mut second_entry = NodeEntry::new(NodeKind::Variable, dup_name, file);
    second_entry.signature = Some(dup_other);
    arena.alloc(second_entry).unwrap();

    // One pending edge whose alias uses a duplicate ID.
    let import_edge = PendingEdge {
        source: NodeId::new(0, 1),
        target: NodeId::new(1, 1),
        kind: EdgeKind::Imports {
            alias: Some(dup_other),
            is_wildcard: false,
        },
        file,
        spans: vec![],
    };
    let mut all_edges = vec![vec![import_edge]];

    // Dedup table: 2→1, 3→1.
    let remap = HashMap::from([(dup_name, canonical), (dup_other, canonical)]);

    phase4_apply_global_remap(&mut arena, &mut all_edges, &remap);

    // The second node's name (2→1) and signature (3→1) were rewritten.
    let (_, remapped) = arena.iter().nth(1).unwrap();
    assert_eq!(remapped.name, canonical);
    assert_eq!(remapped.signature, Some(canonical));

    // The edge alias (3→1) was rewritten as well.
    match &all_edges[0][0].kind {
        EdgeKind::Imports { alias, .. } => assert_eq!(*alias, Some(canonical)),
        _ => panic!("Expected Imports edge"),
    }
}
2480
/// Calling `phase4_apply_global_remap` with an empty arena, no edges and
/// an empty remap table must complete without panicking.
#[test]
fn test_phase4_apply_global_remap_empty() {
    use crate::graph::unified::storage::NodeArena;

    let remap = HashMap::new();
    let mut no_edges: Vec<Vec<PendingEdge>> = Vec::new();
    let mut empty_arena = NodeArena::new();

    // Expected to be a no-op.
    phase4_apply_global_remap(&mut empty_arena, &mut no_edges, &remap);
}
2492
/// `pending_edges_to_delta` must keep the per-file grouping, number the
/// deltas consecutively starting at the given sequence, return the next
/// free sequence number, and mark every delta as an `Add`.
#[test]
fn test_pending_edges_to_delta_basic() {
    // Two files: the first contributes two edges, the second one edge.
    let edges = vec![
        vec![
            PendingEdge {
                source: NodeId::new(0, 1),
                target: NodeId::new(1, 1),
                kind: EdgeKind::Calls {
                    argument_count: 0,
                    is_async: false,
                    resolved_via: ResolvedVia::Direct,
                },
                file: FileId::new(0),
                spans: vec![],
            },
            PendingEdge {
                source: NodeId::new(1, 1),
                target: NodeId::new(2, 1),
                kind: EdgeKind::References,
                file: FileId::new(0),
                spans: vec![],
            },
        ],
        vec![PendingEdge {
            source: NodeId::new(3, 1),
            target: NodeId::new(4, 1),
            kind: EdgeKind::Defines,
            file: FileId::new(1),
            spans: vec![],
        }],
    ];

    let (deltas, final_seq) = pending_edges_to_delta(&edges, 100);

    // Grouping mirrors the input; three sequence numbers were consumed.
    assert_eq!(deltas.len(), 2);
    assert_eq!(deltas[0].len(), 2);
    assert_eq!(deltas[1].len(), 1);
    assert_eq!(final_seq, 103);

    // Sequence numbers are monotonic across file boundaries.
    assert_eq!(deltas[0][0].seq, 100);
    assert_eq!(deltas[0][1].seq, 101);
    assert_eq!(deltas[1][0].seq, 102);

    // Every delta is an Add. Walk all of them instead of sampling one
    // per file, so the second edge of file 0 is covered too.
    for (file_idx, file_deltas) in deltas.iter().enumerate() {
        for (edge_idx, delta) in file_deltas.iter().enumerate() {
            assert!(
                matches!(delta.op, DeltaOp::Add),
                "delta [{}][{}] must be an Add operation",
                file_idx,
                edge_idx
            );
        }
    }
}
2541
/// With no pending edges, `pending_edges_to_delta` yields no deltas and
/// returns the starting sequence number unchanged.
#[test]
fn test_pending_edges_to_delta_empty() {
    let no_edges: Vec<Vec<PendingEdge>> = Vec::new();

    let (deltas, next_seq) = pending_edges_to_delta(&no_edges, 0);

    assert!(deltas.is_empty());
    assert_eq!(next_seq, 0);
}
2549
2550    // ==================================================================
2551    // Task 4 Step 4 Phase 2: rebuild-plane coverage for migrated helpers.
2552    //
2553    // Each test below proves that the migrated helper runs against a
2554    // `RebuildGraph` (not just a `CodeGraph`) and that the mutation
2555    // lands on the rebuild-local state. Together with the CodeGraph
2556    // tests that still exercise the same helpers on the full-build
2557    // path, they form the "runs on both implementors" coverage
2558    // contract for `GraphMutationTarget` consumers.
2559    // ==================================================================
2560
/// Runs [`phase4c_prime_unify_cross_file_nodes`] against a
/// [`RebuildGraph`] holding two `NodeKind::Function` nodes that share one
/// qualified-name `StringId` but carry different file ids.
///
/// Asserts the reported stats, that the winner keeps its
/// `qualified_name`, that the loser's `qualified_name` is cleared while
/// its slot stays reachable, and that a pending edge targeting the loser
/// is rewritten to target the winner.
#[test]
#[cfg(feature = "rebuild-internals")]
fn phase4c_prime_unify_cross_file_nodes_runs_against_rebuild_graph() {
    use crate::graph::unified::concurrent::CodeGraph;
    use crate::graph::unified::mutation_target::GraphMutationTarget;
    use crate::graph::unified::node::NodeKind;

    let mut rebuild = CodeGraph::new().clone_for_rebuild();

    // The shared qualified name lives in the rebuild-local interner.
    let qname = rebuild.strings_mut().intern("my_mod::my_func").unwrap();

    // Two distinct file ids host the duplicate Function nodes.
    let winner_file = FileId::new(7);
    let loser_file = FileId::new(8);

    // Expected winner: real start line and the wider span (10..30).
    let mut wide = NodeEntry::new(NodeKind::Function, qname, winner_file);
    wide.qualified_name = Some(qname);
    wide.start_line = 10;
    wide.end_line = 30;
    let winner_id = rebuild.nodes_mut().alloc(wide).unwrap();

    // Expected loser: narrower span (5..6), so it loses the tie-break.
    let mut narrow = NodeEntry::new(NodeKind::Function, qname, loser_file);
    narrow.qualified_name = Some(qname);
    narrow.start_line = 5;
    narrow.end_line = 6;
    let loser_id = rebuild.nodes_mut().alloc(narrow).unwrap();

    // One pending edge aimed at the loser. Any valid source works here;
    // the assertion below only looks at the target.
    let edge_into_loser = PendingEdge {
        source: winner_id,
        target: loser_id,
        kind: EdgeKind::Calls {
            argument_count: 0,
            is_async: false,
            resolved_via: ResolvedVia::Direct,
        },
        file: loser_file,
        spans: vec![],
    };
    let mut all_edges = vec![vec![edge_into_loser]];

    let (stats, _remap) = phase4c_prime_unify_cross_file_nodes(&mut rebuild, &mut all_edges);

    // Stats shape.
    assert_eq!(stats.nodes_merged, 1, "exactly one loser was tombstoned");
    assert_eq!(stats.candidate_pairs_examined, 1);
    assert_eq!(stats.edges_rewritten, 1);

    // The winner is still present and keeps its qualified name.
    let winner_qname = GraphMutationTarget::nodes(&rebuild)
        .get(winner_id)
        .expect("winner must remain live")
        .qualified_name;
    assert_eq!(winner_qname, Some(qname), "winner keeps its qualified_name");

    // The loser went through `merge_node_into`, which clears its naming
    // fields; the slot itself is still retrievable.
    let loser_qname = GraphMutationTarget::nodes(&rebuild)
        .get(loser_id)
        .expect("loser slot remains live (inert) per §F.1 bijection")
        .qualified_name;
    assert_eq!(
        loser_qname, None,
        "loser qualified_name cleared by merge_node_into"
    );

    // The pending edge now points at the winner.
    assert_eq!(
        all_edges[0][0].target, winner_id,
        "PendingEdge.target rewritten from loser → winner"
    );
}
2653
/// Pins the Phase 4c-prime winner ordering blessed in iter-1: primary =
/// `start_line > 0`, tie-break 1 = wider span, tie-break 2 =
/// lexicographically smaller **file path** (stable across rebuild
/// representations), final fallback = smaller `NodeId::index()`.
///
/// This test targets tie-break 2. Both candidates have identical real
/// spans and live in two files whose paths sort in the opposite order to
/// their allocation order. The node in the earlier-sorting path must win
/// regardless of NodeId allocation order.
#[test]
#[cfg(feature = "rebuild-internals")]
fn phase4c_prime_tie_break_prefers_lex_smaller_path_over_node_id() {
    use crate::graph::unified::concurrent::CodeGraph;
    use crate::graph::unified::node::NodeKind;
    use std::path::Path;

    let mut graph = CodeGraph::new();
    let qname = graph.strings_mut().intern("shared_qname").unwrap();

    // Register the lexically later path first, so path order is the
    // reverse of registration order. A NodeId-based fallback would then
    // pick the "wrong" node.
    let high_path_file = graph
        .files_mut()
        .register(Path::new("zzz_late.rs"))
        .unwrap();
    let low_path_file = graph
        .files_mut()
        .register(Path::new("aaa_early.rs"))
        .unwrap();

    // Both candidates share kind, name and span (10..20), so the primary
    // criterion and the span-width tie-break cannot separate them.
    let duplicate_in = |file: FileId| {
        let mut entry = NodeEntry::new(NodeKind::Function, qname, file);
        entry.qualified_name = Some(qname);
        entry.start_line = 10;
        entry.end_line = 20;
        entry
    };

    // The `zzz_late.rs` node is allocated first and therefore gets the
    // numerically smaller NodeId::index().
    let high_node = graph
        .nodes_mut()
        .alloc(duplicate_in(high_path_file))
        .unwrap();
    let low_node = graph
        .nodes_mut()
        .alloc(duplicate_in(low_path_file))
        .unwrap();

    graph.rebuild_indices();

    let mut all_edges: Vec<Vec<PendingEdge>> = Vec::new();
    let (stats, _remap) = phase4c_prime_unify_cross_file_nodes(&mut graph, &mut all_edges);

    assert_eq!(
        stats.nodes_merged, 1,
        "one of the duplicate nodes must be merged into the other"
    );

    // The `aaa_early.rs` node wins and keeps its qualified name.
    let low_qname = graph
        .nodes()
        .get(low_node)
        .expect("winner slot remains live")
        .qualified_name;
    assert_eq!(
        low_qname,
        Some(qname),
        "path-earlier node keeps qualified_name as the unification winner"
    );

    // The `zzz_late.rs` node was merged away despite its smaller index.
    let high_qname = graph
        .nodes()
        .get(high_node)
        .expect("loser slot remains inert (Gate 0d bijection contract)")
        .qualified_name;
    assert_eq!(
        high_qname, None,
        "path-later node loses even when its NodeId::index() is smaller"
    );
}
2738
/// Covers the last tie-break: when the path comparison also ties (two
/// duplicate nodes in the same file — rare, but possible with duplicate
/// definitions), the deterministic `b.index().cmp(&a.index())` fallback
/// selects the node with the **smaller** NodeId index. Pinning this keeps
/// a later refactor from silently flipping the fallback direction.
#[test]
#[cfg(feature = "rebuild-internals")]
fn phase4c_prime_tie_break_falls_back_to_smaller_node_id_on_identical_path() {
    use crate::graph::unified::concurrent::CodeGraph;
    use crate::graph::unified::node::NodeKind;
    use std::path::Path;

    let mut graph = CodeGraph::new();
    let qname = graph.strings_mut().intern("shared_qname").unwrap();
    let file = graph.files_mut().register(Path::new("shared.rs")).unwrap();

    // Same file, same span (1..5): span width and path both compare
    // Equal, leaving allocation order (NodeId index) as the only
    // difference between the two nodes.
    let make_duplicate = || {
        let mut entry = NodeEntry::new(NodeKind::Function, qname, file);
        entry.qualified_name = Some(qname);
        entry.start_line = 1;
        entry.end_line = 5;
        entry
    };
    let first_node = graph.nodes_mut().alloc(make_duplicate()).unwrap();
    let second_node = graph.nodes_mut().alloc(make_duplicate()).unwrap();

    assert!(
        first_node.index() < second_node.index(),
        "precondition: first_node's arena slot precedes second_node's"
    );

    graph.rebuild_indices();

    let mut all_edges: Vec<Vec<PendingEdge>> = Vec::new();
    let (stats, _remap) = phase4c_prime_unify_cross_file_nodes(&mut graph, &mut all_edges);

    assert_eq!(stats.nodes_merged, 1);

    // The smaller NodeId::index() wins.
    let winner_qname = graph
        .nodes()
        .get(first_node)
        .expect("winner live")
        .qualified_name;
    assert_eq!(
        winner_qname,
        Some(qname),
        "smaller-index node wins the same-path / same-span tie-break"
    );
    let loser_qname = graph
        .nodes()
        .get(second_node)
        .expect("loser inert")
        .qualified_name;
    assert_eq!(
        loser_qname, None,
        "larger-index node loses the same-path / same-span tie-break"
    );
}
2798
/// Calls the free [`rebuild_indices`] function on a `CodeGraph` and on a
/// `RebuildGraph` seeded with the same single node, then checks that both
/// end up with the same `Function` kind bucket and that the rebuild side
/// also has the name bucket.
#[test]
#[cfg(feature = "rebuild-internals")]
fn rebuild_indices_runs_against_rebuild_graph() {
    use crate::graph::unified::concurrent::CodeGraph;
    use crate::graph::unified::mutation_target::GraphMutationTarget;
    use crate::graph::unified::node::NodeKind;

    // Identical seed entry for both graphs, parameterised only by the
    // StringId each graph's interner handed out for "alpha".
    let seed_entry = |name: StringId| {
        let mut entry = NodeEntry::new(NodeKind::Function, name, FileId::new(1));
        entry.qualified_name = Some(name);
        entry
    };

    // === CodeGraph baseline ===
    let mut code_graph = CodeGraph::new();
    let code_name = code_graph.strings_mut().intern("alpha").unwrap();
    let code_node = code_graph.nodes_mut().alloc(seed_entry(code_name)).unwrap();
    rebuild_indices(&mut code_graph);
    let code_functions: Vec<NodeId> = code_graph.indices().by_kind(NodeKind::Function).to_vec();

    // === RebuildGraph path ===
    let mut rebuild = CodeGraph::new().clone_for_rebuild();
    let rebuild_name = rebuild.strings_mut().intern("alpha").unwrap();
    let rebuild_node = rebuild
        .nodes_mut()
        .alloc(seed_entry(rebuild_name))
        .unwrap();
    rebuild_indices(&mut rebuild);

    // Each node is the first allocation on its arena, so slot index and
    // generation agree.
    assert_eq!(code_node, rebuild_node);

    // Read through the trait accessor so the indices come from the
    // `RebuildGraph` impl, i.e. the rebuild-local `AuxiliaryIndices`.
    let rebuild_functions: Vec<NodeId> = GraphMutationTarget::indices(&rebuild)
        .by_kind(NodeKind::Function)
        .to_vec();

    assert_eq!(
        code_functions, rebuild_functions,
        "rebuild_indices must produce equivalent Function buckets on both paths"
    );

    // The name bucket is populated on the rebuild side as well.
    let named: Vec<NodeId> = GraphMutationTarget::indices(&rebuild)
        .by_name(rebuild_name)
        .to_vec();
    assert_eq!(named, vec![rebuild_node]);
}
2854
2855    /// Drive [`phase4d_bulk_insert_edges`] against a `RebuildGraph`.
2856    /// Seed two nodes, construct a per-file `PendingEdge` vector, and
2857    /// prove the edges land on the rebuild-local edge store with the
2858    /// expected monotonically-advancing sequence counter.
2859    #[test]
2860    #[cfg(feature = "rebuild-internals")]
2861    fn phase4d_bulk_insert_edges_runs_against_rebuild_graph() {
2862        use crate::graph::unified::concurrent::CodeGraph;
2863        use crate::graph::unified::mutation_target::GraphMutationTarget;
2864        use crate::graph::unified::node::NodeKind;
2865
2866        let mut rebuild = {
2867            let graph = CodeGraph::new();
2868            graph.clone_for_rebuild()
2869        };
2870
2871        let name_sid = rebuild.strings_mut().intern("edge_target").unwrap();
2872        let file = FileId::new(3);
2873
2874        let n_source = rebuild
2875            .nodes_mut()
2876            .alloc(NodeEntry::new(NodeKind::Function, name_sid, file))
2877            .unwrap();
2878        let n_target = rebuild
2879            .nodes_mut()
2880            .alloc(NodeEntry::new(NodeKind::Variable, name_sid, file))
2881            .unwrap();
2882
2883        // Pre-condition: no edges in the rebuild-local forward store.
2884        let pre_counter = GraphMutationTarget::edges(&rebuild).forward().seq_counter();
2885
2886        let per_file_edges = vec![vec![
2887            PendingEdge {
2888                source: n_source,
2889                target: n_target,
2890                kind: EdgeKind::Calls {
2891                    argument_count: 0,
2892                    is_async: false,
2893                    resolved_via: ResolvedVia::Direct,
2894                },
2895                file,
2896                spans: vec![],
2897            },
2898            PendingEdge {
2899                source: n_source,
2900                target: n_target,
2901                kind: EdgeKind::Calls {
2902                    argument_count: 1,
2903                    is_async: false,
2904                    resolved_via: ResolvedVia::Direct,
2905                },
2906                file,
2907                spans: vec![],
2908            },
2909        ]];
2910
2911        let final_seq = phase4d_bulk_insert_edges(&mut rebuild, &per_file_edges);
2912
2913        // Seq counter advanced by exactly two edges.
2914        assert_eq!(
2915            final_seq,
2916            pre_counter + 2,
2917            "phase4d_bulk_insert_edges must advance seq by edge count"
2918        );
2919
2920        // Rebuild-local forward store now contains both edges.
2921        let forward = GraphMutationTarget::edges(&rebuild).forward();
2922        let after_counter = forward.seq_counter();
2923        assert_eq!(after_counter, pre_counter + 2);
2924        // Forward delta must carry the two new edges.
2925        assert!(
2926            forward.delta().iter().filter(|e| e.is_add()).count() >= 2,
2927            "expected at least two Add edges in the rebuild-local forward delta"
2928        );
2929        drop(forward);
2930
2931        // Empty input is a no-op on the edge store.
2932        let empty_final = phase4d_bulk_insert_edges(&mut rebuild, &[]);
2933        assert_eq!(empty_final, pre_counter + 2, "empty input is a no-op");
2934    }
2935
2936    /// `C_EDGE_MIGRATE` regression: when a Cluster C plugin migrates a
2937    /// `TypeOf{Field}` edge's source from a struct node to the per-field
2938    /// `Property` node, Phase 4d must NOT collapse the new shape onto
2939    /// any sibling edge. Both Property-sourced and struct-sourced
2940    /// edges - including a struct-sourced edge over the same target /
2941    /// kind tuple - must round-trip into the bulk-insert path with
2942    /// distinct `(source, target)` identities and stable seq ordering.
2943    ///
2944    /// This locks the property the
2945    /// `phase4d_bulk_insert_edges` doc-comment promises to plugin
2946    /// authors: per-file `PendingEdge` order is preserved 1:1 by
2947    /// `pending_edges_to_delta`, and no `(source, target, kind)` dedup
2948    /// fires inside Phase 4d. Without this guarantee the migration
2949    /// would silently drop the new Property-sourced edges whenever an
2950    /// older legacy snapshot mixed both shapes during a partial
2951    /// rebuild.
2952    #[test]
2953    fn phase4d_preserves_property_sourced_typeof_field_edges() {
2954        use crate::graph::unified::edge::kind::TypeOfContext;
2955
2956        // Synthetic NodeIds standing in for `main.SelectorSource` (struct),
2957        // `main.SelectorSource.NeedTags` (Property), and `bool` (target type).
2958        let struct_id = NodeId::new(10, 1);
2959        let property_id = NodeId::new(11, 1);
2960        let bool_id = NodeId::new(12, 1);
2961
2962        let typeof_field_kind = EdgeKind::TypeOf {
2963            context: Some(TypeOfContext::Field),
2964            index: Some(0),
2965            name: None,
2966        };
2967
2968        // Two PendingEdges over the same (target, kind) discriminator
2969        // but different sources - the post-migration Property-sourced
2970        // shape and a hypothetical legacy struct-sourced shadow that
2971        // could appear during a partial rebuild. Phase 4d must keep
2972        // both.
2973        let per_file_edges = vec![vec![
2974            PendingEdge {
2975                source: property_id,
2976                target: bool_id,
2977                kind: typeof_field_kind.clone(),
2978                file: FileId::new(0),
2979                spans: vec![],
2980            },
2981            PendingEdge {
2982                source: struct_id,
2983                target: bool_id,
2984                kind: typeof_field_kind.clone(),
2985                file: FileId::new(0),
2986                spans: vec![],
2987            },
2988        ]];
2989
2990        let (deltas, final_seq) = pending_edges_to_delta(&per_file_edges, 500);
2991
2992        // No dedup: both edges land in the per-file delta vector with
2993        // distinct seq numbers, in input order.
2994        assert_eq!(deltas.len(), 1);
2995        assert_eq!(deltas[0].len(), 2);
2996        assert_eq!(final_seq, 502);
2997
2998        assert_eq!(deltas[0][0].source, property_id);
2999        assert_eq!(deltas[0][0].target, bool_id);
3000        assert_eq!(deltas[0][0].seq, 500);
3001        assert!(matches!(
3002            deltas[0][0].kind,
3003            EdgeKind::TypeOf {
3004                context: Some(TypeOfContext::Field),
3005                ..
3006            }
3007        ));
3008
3009        assert_eq!(deltas[0][1].source, struct_id);
3010        assert_eq!(deltas[0][1].target, bool_id);
3011        assert_eq!(deltas[0][1].seq, 501);
3012
3013        // Determinism re-check: re-running the conversion against the
3014        // same input produces an identical DeltaEdge sequence (same
3015        // sources, same targets, same kinds, same seq numbers when
3016        // re-anchored to the same `seq_start`). This is the property
3017        // the SnapshotReader → SnapshotWriter byte-identity round-trip
3018        // assertion relies on for fresh-rebuild reproducibility.
3019        let (deltas_again, final_seq_again) = pending_edges_to_delta(&per_file_edges, 500);
3020        assert_eq!(final_seq_again, final_seq);
3021        assert_eq!(deltas_again.len(), deltas.len());
3022        assert_eq!(deltas_again[0].len(), deltas[0].len());
3023        for (a, b) in deltas[0].iter().zip(deltas_again[0].iter()) {
3024            assert_eq!(a.source, b.source);
3025            assert_eq!(a.target, b.target);
3026            assert_eq!(a.seq, b.seq);
3027        }
3028    }
3029
3030    // ----------------------------------------------------------------------
3031    // T3 Cluster B (02_DESIGN §4.3.e Change 4): Phase 4d-prime propagation
3032    // ----------------------------------------------------------------------
3033
3034    /// Build a per-file `NodeMetadataStore` carrying one Macro entry with
3035    /// a `cfg_condition` so the merge step is non-vacuous.
3036    fn macro_store_with(
3037        node_id: NodeId,
3038        cfg: &str,
3039    ) -> crate::graph::unified::storage::metadata::NodeMetadataStore {
3040        use crate::graph::unified::storage::metadata::{MacroNodeMetadata, NodeMetadataStore};
3041        let mut store = NodeMetadataStore::new();
3042        let m = MacroNodeMetadata {
3043            cfg_condition: Some(cfg.to_string()),
3044            ..Default::default()
3045        };
3046        store.insert(node_id, m);
3047        store
3048    }
3049
3050    #[test]
3051    fn phase4d_prime_merges_per_file_metadata_into_graph_macro_metadata() {
3052        use super::super::unification::NodeRemapTable;
3053        use crate::graph::unified::concurrent::CodeGraph;
3054        use crate::graph::unified::mutation_target::GraphMutationTarget;
3055
3056        let mut graph = CodeGraph::new();
3057        let nid_a = NodeId::new(101, 1);
3058        let nid_b = NodeId::new(202, 1);
3059        let file_a = FileId::new(7);
3060        let file_b = FileId::new(8);
3061
3062        let staged = vec![
3063            (file_a, macro_store_with(nid_a, "linux")),
3064            (file_b, macro_store_with(nid_b, "darwin")),
3065        ];
3066
3067        let remap = NodeRemapTable::default();
3068        let merged = phase4d_prime_propagate_staging_metadata(&mut graph, staged, &remap);
3069
3070        assert!(
3071            merged,
3072            "non-empty staged stores must report metadata_changed=true"
3073        );
3074        assert_eq!(
3075            GraphMutationTarget::macro_metadata_mut(&mut graph)
3076                .get_macro(nid_a)
3077                .and_then(|m| m.cfg_condition.clone()),
3078            Some("linux".to_string())
3079        );
3080        assert_eq!(
3081            GraphMutationTarget::macro_metadata_mut(&mut graph)
3082                .get_macro(nid_b)
3083                .and_then(|m| m.cfg_condition.clone()),
3084            Some("darwin".to_string())
3085        );
3086    }
3087
3088    #[test]
3089    fn phase4d_prime_drops_loser_metadata_before_merge() {
3090        // Pins 02_DESIGN §4.3.e Change 3 contract: when the unifier
3091        // tombstones a loser, its staged metadata must NOT survive into
3092        // the graph (the winner's own per-file store carries the
3093        // authoritative cfg_condition; 01_SPEC §5.3.f spec text).
3094        use super::super::unification::NodeRemapTable;
3095        use crate::graph::unified::concurrent::CodeGraph;
3096        use crate::graph::unified::mutation_target::GraphMutationTarget;
3097
3098        let mut graph = CodeGraph::new();
3099        let loser = NodeId::new(101, 1);
3100        let winner = NodeId::new(202, 1);
3101        let file_loser = FileId::new(7);
3102        let file_winner = FileId::new(8);
3103
3104        // Loser file stages `linux`, winner file stages `darwin`.
3105        let staged = vec![
3106            (file_loser, macro_store_with(loser, "linux")),
3107            (file_winner, macro_store_with(winner, "darwin")),
3108        ];
3109
3110        // Unifier marks `loser → winner`.
3111        let mut remap = NodeRemapTable::default();
3112        remap.insert(loser, winner);
3113
3114        let merged = phase4d_prime_propagate_staging_metadata(&mut graph, staged, &remap);
3115        assert!(
3116            merged,
3117            "winner's store still merges so metadata_changed=true"
3118        );
3119
3120        // The winner gets `darwin` from its own file's store. The loser
3121        // entry is dropped before merge — it never reaches the graph
3122        // under the winner key.
3123        assert_eq!(
3124            GraphMutationTarget::macro_metadata_mut(&mut graph)
3125                .get_macro(winner)
3126                .and_then(|m| m.cfg_condition.clone()),
3127            Some("darwin".to_string()),
3128            "winner's authoritative cfg_condition wins; loser's `linux` is dropped"
3129        );
3130        assert!(
3131            GraphMutationTarget::macro_metadata_mut(&mut graph)
3132                .get_macro(loser)
3133                .is_none(),
3134            "loser key has no metadata in the graph after Phase 4d-prime"
3135        );
3136    }
3137
3138    #[test]
3139    fn rekey_staging_metadata_to_arena_maps_local_to_arena() {
3140        // Stage metadata under staging-local NodeIds (i, 1) for i ∈ {0, 1, 2}
3141        // and confirm the rekeyed store carries the same payload under
3142        // the corresponding arena NodeIds drawn from per_file_node_ids.
3143        use crate::graph::unified::storage::metadata::{MacroNodeMetadata, NodeMetadataStore};
3144
3145        let mut staging = NodeMetadataStore::new();
3146        for (i, cond) in ["linux", "darwin", "windows"].iter().enumerate() {
3147            let m = MacroNodeMetadata {
3148                cfg_condition: Some((*cond).to_string()),
3149                ..Default::default()
3150            };
3151            staging.insert(NodeId::new(i as u32, 1), m);
3152        }
3153
3154        // Arena NodeIds — note generation 1 (the standard staging.add_node
3155        // contract) and arbitrary non-sequential arena slots.
3156        let arena_ids = vec![
3157            NodeId::new(100, 1),
3158            NodeId::new(101, 1),
3159            NodeId::new(102, 1),
3160        ];
3161
3162        let rekeyed = rekey_staging_metadata_to_arena(&staging, &arena_ids);
3163
3164        assert_eq!(rekeyed.len(), 3);
3165        for (i, cond) in ["linux", "darwin", "windows"].iter().enumerate() {
3166            let m = rekeyed
3167                .get_macro(arena_ids[i])
3168                .expect("arena NodeId carries the remapped entry");
3169            assert_eq!(m.cfg_condition.as_deref(), Some(*cond));
3170        }
3171        // Original staging keys are gone (no longer in the rekeyed store).
3172        assert!(rekeyed.get_macro(NodeId::new(0, 1)).is_none());
3173    }
3174
3175    #[test]
3176    fn rekey_staging_metadata_maps_shape_descriptors_local_to_arena() {
3177        // The shape-only path: descriptors keyed under staging-local NodeIds
3178        // must land under the corresponding arena NodeIds, with no entry
3179        // metadata present at all (the common case for ordinary functions).
3180        use crate::graph::unified::build::shape::CfBucket;
3181        use crate::graph::unified::storage::metadata::NodeMetadataStore;
3182        use crate::graph::unified::storage::shape::ShapeDescriptor;
3183
3184        let mut staging = NodeMetadataStore::new();
3185        for i in 0..3u32 {
3186            let mut d = ShapeDescriptor::default();
3187            // Stamp a distinguishable histogram per node so we can confirm the
3188            // exact descriptor rode the rekey, not just some descriptor.
3189            d.cf_histogram[CfBucket::Branch.index()] = (i + 1) as u16;
3190            staging.insert_shape_descriptor(NodeId::new(i, 1), d);
3191        }
3192        assert!(staging.get_macro(NodeId::new(0, 1)).is_none());
3193        assert!(
3194            !staging.is_empty(),
3195            "a shape-only staging store is non-empty"
3196        );
3197
3198        let arena_ids = vec![
3199            NodeId::new(100, 1),
3200            NodeId::new(101, 1),
3201            NodeId::new(102, 1),
3202        ];
3203        let rekeyed = rekey_staging_metadata_to_arena(&staging, &arena_ids);
3204
3205        assert_eq!(rekeyed.shape_descriptors().len(), 3);
3206        for i in 0..3u32 {
3207            let d = rekeyed
3208                .shape_descriptor(arena_ids[i as usize])
3209                .expect("arena NodeId carries the remapped descriptor");
3210            assert_eq!(d.cf_histogram[CfBucket::Branch.index()], (i + 1) as u16);
3211        }
3212        // Original staging key carries nothing in the rekeyed store.
3213        assert!(rekeyed.shape_descriptor(NodeId::new(0, 1)).is_none());
3214    }
3215
3216    #[test]
3217    fn rekey_staging_metadata_drops_out_of_range_keys() {
3218        // Staging metadata keyed at index 5 but per_file_node_ids only has
3219        // 3 entries: the helper drops the stale key rather than panicking.
3220        use crate::graph::unified::storage::metadata::{MacroNodeMetadata, NodeMetadataStore};
3221
3222        let mut staging = NodeMetadataStore::new();
3223        let in_range = MacroNodeMetadata {
3224            cfg_condition: Some("good".to_string()),
3225            ..Default::default()
3226        };
3227        staging.insert(NodeId::new(0, 1), in_range);
3228
3229        let stale = MacroNodeMetadata {
3230            cfg_condition: Some("bad".to_string()),
3231            ..Default::default()
3232        };
3233        staging.insert(NodeId::new(5, 1), stale);
3234
3235        let arena_ids = vec![NodeId::new(100, 1)];
3236        let rekeyed = rekey_staging_metadata_to_arena(&staging, &arena_ids);
3237
3238        assert_eq!(rekeyed.len(), 1, "stale out-of-range key dropped");
3239        assert_eq!(
3240            rekeyed
3241                .get_macro(NodeId::new(100, 1))
3242                .and_then(|m| m.cfg_condition.clone()),
3243            Some("good".to_string())
3244        );
3245    }
3246
3247    #[test]
3248    fn phase4d_prime_empty_staged_metadata_returns_false() {
3249        use super::super::unification::NodeRemapTable;
3250        use crate::graph::unified::concurrent::CodeGraph;
3251
3252        let mut graph = CodeGraph::new();
3253        let remap = NodeRemapTable::default();
3254        let merged = phase4d_prime_propagate_staging_metadata(&mut graph, Vec::new(), &remap);
3255        assert!(!merged, "no staged stores → metadata_changed=false");
3256    }
3257
3258    #[test]
3259    fn phase4d_prime_empty_store_after_loser_drop_returns_false() {
3260        // Single staged store that is ENTIRELY losers — after
3261        // `apply_to_metadata_store` drops them all, the store is empty
3262        // and `merge` should not be called.
3263        use super::super::unification::NodeRemapTable;
3264        use crate::graph::unified::concurrent::CodeGraph;
3265        use crate::graph::unified::mutation_target::GraphMutationTarget;
3266
3267        let mut graph = CodeGraph::new();
3268        let loser = NodeId::new(101, 1);
3269        let winner = NodeId::new(202, 1);
3270        let file_loser = FileId::new(7);
3271
3272        let staged = vec![(file_loser, macro_store_with(loser, "linux"))];
3273
3274        let mut remap = NodeRemapTable::default();
3275        remap.insert(loser, winner);
3276
3277        let merged = phase4d_prime_propagate_staging_metadata(&mut graph, staged, &remap);
3278
3279        assert!(
3280            !merged,
3281            "store collapsed to empty by loser-drop → no merge → metadata_changed=false"
3282        );
3283        assert!(
3284            GraphMutationTarget::macro_metadata_mut(&mut graph).is_empty(),
3285            "graph metadata store stays empty"
3286        );
3287    }
3288}