// sqry_core/graph/unified/build/parallel_commit.rs
//! Parallel commit pipeline for pre-allocated ID ranges.
//!
//! Replaces the serial commit loop with a four-phase pipeline (Phase 1,
//! which produces the per-file staging graphs consumed here, runs upstream):
//! Phase 2: Count + range assignment via prefix sums
//! Phase 3: Parallel commit into disjoint pre-allocated ranges
//! Phase 4: String dedup, remap, index build, edge bulk insert
//!
8//! # Phase 3 Architecture
9//!
10//! Phase 3 uses `split_at_mut` to carve disjoint sub-slices from pre-allocated
11//! arena and interner ranges, then uses `rayon` to commit each file's staging
12//! graph in parallel without locks:
13//!
14//! ```text
15//! NodeArena slots:   [   file0   |   file1   |   file2   ]
16//! StringInterner:    [   file0   |   file1   |   file2   ]
17//!                         ↑            ↑            ↑
18//!                    split_at_mut  split_at_mut  remainder
19//! ```
20//!
21//! Each file's `commit_single_file` receives its own disjoint slices and
22//! operates independently without contention.
23
24use std::collections::HashMap;
25use std::ops::Range;
26use std::sync::Arc;
27
28use rayon::prelude::*;
29
30use crate::graph::unified::edge::delta::{DeltaEdge, DeltaOp};
31use crate::graph::unified::edge::kind::{EdgeKind, MqProtocol};
32use crate::graph::unified::file::FileId;
33use crate::graph::unified::node::NodeId;
34use crate::graph::unified::storage::NodeArena;
35use crate::graph::unified::storage::arena::{NodeEntry, Slot};
36use crate::graph::unified::string::StringId;
37
38use super::pass3_intra::PendingEdge;
39use super::staging::{StagingGraph, StagingOp};
40
/// Running offsets carried across chunks for deterministic ID assignment.
///
/// Each chunk's ranges begin where the previous chunk ended, ensuring
/// globally unique, contiguous ID spaces.
///
/// `Default` yields zeroed offsets, i.e. the state before the first chunk.
#[derive(Debug, Clone, Default)]
pub struct GlobalOffsets {
    /// Next available node slot index.
    pub node_offset: u32,
    /// Next available string slot index.
    pub string_offset: u32,
}
52
/// Per-file commit plan with pre-assigned ID ranges.
///
/// Ranges are half-open and, by construction in [`compute_commit_plan`],
/// disjoint across files within a chunk and across chunks.
#[derive(Debug, Clone)]
pub struct FilePlan {
    /// Index into the chunk's `ParsedFile` vec.
    pub parsed_index: usize,
    /// Pre-assigned `FileId` from batch registration.
    pub file_id: FileId,
    /// Node slot range [start..end) in `NodeArena`.
    pub node_range: Range<u32>,
    /// String slot range [start..end) in `StringInterner`.
    pub string_range: Range<u32>,
}
65
/// Plan for parallel commit of a single chunk.
///
/// Totals are planned (not yet written) counts; Phase 3 validates its
/// actual written counts against these.
#[derive(Debug, Clone)]
pub struct ChunkCommitPlan {
    /// Per-file plans in deterministic file order.
    pub file_plans: Vec<FilePlan>,
    /// Total nodes across all files in this chunk.
    pub total_nodes: u32,
    /// Total strings across all files in this chunk.
    pub total_strings: u32,
    /// Total edges across all files in this chunk (u64 so a chunk's sum
    /// cannot overflow even when per-file counts are u32).
    pub total_edges: u64,
}
78
79/// Compute commit plan from parsed files using prefix-sum range assignment.
80///
81/// Each file gets contiguous, non-overlapping ranges for nodes and strings.
82/// Ranges start from the given global offsets, which carry forward across
83/// chunks.
84///
85/// # Arguments
86///
87/// * `node_counts` - Per-file node counts (from `StagingGraph::node_count_u32()`)
88/// * `string_counts` - Per-file string counts
89/// * `edge_counts` - Per-file edge counts (used for `total_edges` only)
90/// * `file_ids` - Pre-assigned `FileId`s from batch registration
91/// * `node_offset` - Running global node offset across chunks
92/// * `string_offset` - Running global string offset across chunks
93///
94/// # Panics
95///
96/// Panics in debug builds if the per-chunk accounting arrays do not have
97/// identical lengths.
98#[must_use]
99pub fn compute_commit_plan(
100    node_counts: &[u32],
101    string_counts: &[u32],
102    edge_counts: &[u32],
103    file_ids: &[FileId],
104    node_offset: u32,
105    string_offset: u32,
106) -> ChunkCommitPlan {
107    debug_assert_eq!(node_counts.len(), string_counts.len());
108    debug_assert_eq!(node_counts.len(), edge_counts.len());
109    debug_assert_eq!(node_counts.len(), file_ids.len());
110
111    let mut plans = Vec::with_capacity(node_counts.len());
112    let mut node_cursor = node_offset;
113    let mut string_cursor = string_offset;
114    let mut total_edges: u64 = 0;
115
116    for i in 0..node_counts.len() {
117        let nc = node_counts[i];
118        let sc = string_counts[i];
119
120        let node_end = node_cursor
121            .checked_add(nc)
122            .expect("node ID space overflow in commit plan");
123        let string_end = string_cursor
124            .checked_add(sc)
125            .expect("string ID space overflow in commit plan");
126
127        plans.push(FilePlan {
128            parsed_index: i,
129            file_id: file_ids[i],
130            node_range: node_cursor..node_end,
131            string_range: string_cursor..string_end,
132        });
133
134        node_cursor = node_end;
135        string_cursor = string_end;
136        total_edges += u64::from(edge_counts[i]);
137    }
138
139    ChunkCommitPlan {
140        file_plans: plans,
141        total_nodes: node_cursor - node_offset,
142        total_strings: string_cursor - string_offset,
143        total_edges,
144    }
145}
146
147/// Execute Phase 2: count + range assignment for a parsed chunk.
148///
149/// Extracts per-file counts from staging graphs and delegates to
150/// [`compute_commit_plan`] for prefix-sum range assignment.
151#[must_use]
152pub fn phase2_assign_ranges(
153    staging_graphs: &[&StagingGraph],
154    file_ids: &[FileId],
155    offsets: &GlobalOffsets,
156) -> ChunkCommitPlan {
157    let node_counts: Vec<u32> = staging_graphs
158        .iter()
159        .map(|sg| sg.node_count_u32())
160        .collect();
161    let string_counts: Vec<u32> = staging_graphs
162        .iter()
163        .map(|sg| sg.string_count_u32())
164        .collect();
165    let edge_counts: Vec<u32> = staging_graphs
166        .iter()
167        .map(|sg| sg.edge_count_u32())
168        .collect();
169
170    compute_commit_plan(
171        &node_counts,
172        &string_counts,
173        &edge_counts,
174        file_ids,
175        offsets.node_offset,
176        offsets.string_offset,
177    )
178}
179
/// Phase 3 result: per-file edges, per-file node IDs, and total written
/// counts for validation.
pub struct Phase3Result {
    /// Per-file edge collections for Phase 4 bulk insert.
    pub per_file_edges: Vec<Vec<PendingEdge>>,
    /// Per-file node IDs actually committed. Indexed identically to
    /// `per_file_edges` — element `i` is the Vec of NodeIds committed
    /// for `plan.file_plans[i]`. Empty Vec when that file wrote no
    /// nodes (slot overflow skip, or a staging graph with only strings).
    ///
    /// Used by the caller to populate
    /// [`crate::graph::unified::storage::registry::FileRegistry::record_node`],
    /// which feeds the Gate 0c bucket-bijection debug invariant.
    pub per_file_node_ids: Vec<Vec<NodeId>>,
    /// Total nodes actually written (for validation against planned totals).
    /// May be lower than `ChunkCommitPlan::total_nodes` when a file's commit
    /// stopped early on slot overflow.
    pub total_nodes_written: usize,
    /// Total strings actually written (for validation against planned totals).
    pub total_strings_written: usize,
    /// Total edges collected across all files.
    pub total_edges_collected: usize,
}
201
202/// Execute Phase 3: parallel commit into disjoint pre-allocated ranges.
203///
204/// Pre-splits arena and interner slices into per-file disjoint sub-slices
205/// using `split_at_mut`, then uses `rayon` `par_iter` for lock-free parallel
206/// writes. Each file's staging graph is committed independently.
207///
208/// Returns [`Phase3Result`] with per-file edges and written counts so the
209/// caller can validate against plan totals and truncate allocations on
210/// mismatch.
211///
212/// # Parameterisation over the mutation target
213///
214/// As of Task 4 Step 4 Phase 1, this function is generic over
215/// `G: GraphMutationTarget`. At the full-build call site in
216/// `build_unified_graph_inner` the target is `CodeGraph`; at the
217/// Task 4 Step 4 Phase 2+ incremental rebuild call site the target
218/// will be `RebuildGraph`. Both impls live in
219/// [`crate::graph::unified::mutation_target`]; see that module's
220/// docs for the field-coverage contract.
221///
222/// The function accesses exactly two fields via the trait —
223/// [`GraphMutationTarget::nodes_and_strings_mut`] — and pre-splits
224/// those two slices for the per-file parallel commit. Every other
225/// piece of the pipeline (the CSR/delta edge store, auxiliary
226/// indices, file registry, etc.) is untouched by this helper: the
227/// `PendingEdge` vectors in the returned [`Phase3Result`] are
228/// threaded through to Phase 4d (`pending_edges_to_delta` +
229/// `BidirectionalEdgeStore::add_edges_bulk_ordered`) by the caller.
230///
231/// # Panics
232///
233/// Panics if `plan.total_nodes` or `plan.total_strings` exceeds the
234/// pre-allocated range in the arena or interner.
235#[must_use]
236pub(crate) fn phase3_parallel_commit<
237    G: crate::graph::unified::mutation_target::GraphMutationTarget,
238>(
239    plan: &ChunkCommitPlan,
240    staging_graphs: &[&StagingGraph],
241    graph: &mut G,
242) -> Phase3Result {
243    if plan.file_plans.is_empty() {
244        return Phase3Result {
245            per_file_edges: Vec::new(),
246            per_file_node_ids: Vec::new(),
247            total_nodes_written: 0,
248            total_strings_written: 0,
249            total_edges_collected: 0,
250        };
251    }
252
253    // Determine the start of the pre-allocated ranges.
254    let node_start = plan.file_plans[0].node_range.start;
255    let string_start = plan.file_plans[0].string_range.start;
256
257    // Borrow the arena and interner disjointly via the mutation-plane
258    // trait. This is the one-and-only field access this helper makes
259    // on the graph; every downstream step operates on the resulting
260    // slices without revisiting `graph`.
261    let (arena, interner) = graph.nodes_and_strings_mut();
262
263    // Get mutable slices covering the entire pre-allocated region.
264    let node_slice = arena.bulk_slice_mut(node_start, plan.total_nodes);
265    let (str_slice, rc_slice) = interner.bulk_slices_mut(string_start, plan.total_strings);
266
267    // Pre-split into per-file disjoint sub-slices using split_at_mut.
268    let mut node_remaining = &mut *node_slice;
269    let mut str_remaining = &mut *str_slice;
270    let mut rc_remaining = &mut *rc_slice;
271
272    #[allow(clippy::type_complexity)]
273    let mut file_work: Vec<(
274        &mut [Slot<NodeEntry>],
275        &mut [Option<Arc<str>>],
276        &mut [u32],
277        &FilePlan,
278        usize,
279    )> = Vec::with_capacity(plan.file_plans.len());
280
281    for (i, file_plan) in plan.file_plans.iter().enumerate() {
282        let nc = (file_plan.node_range.end - file_plan.node_range.start) as usize;
283        let sc = (file_plan.string_range.end - file_plan.string_range.start) as usize;
284
285        let (n, nr) = node_remaining.split_at_mut(nc);
286        let (s, sr) = str_remaining.split_at_mut(sc);
287        let (r, rr) = rc_remaining.split_at_mut(sc);
288
289        file_work.push((n, s, r, file_plan, i));
290        node_remaining = nr;
291        str_remaining = sr;
292        rc_remaining = rr;
293    }
294
295    // Parallel commit — each closure owns disjoint slices, no contention.
296    let results: Vec<FileCommitResult> = file_work
297        .into_par_iter()
298        .map(|(node_slots, str_slots, rc_slots, file_plan, idx)| {
299            commit_single_file(
300                staging_graphs[idx],
301                file_plan,
302                node_slots,
303                str_slots,
304                rc_slots,
305            )
306        })
307        .collect();
308
309    let total_nodes_written: usize = results.iter().map(|r| r.nodes_written).sum();
310    let total_strings_written: usize = results.iter().map(|r| r.strings_written).sum();
311    let total_edges_collected: usize = results.iter().map(|r| r.edges.len()).sum();
312    let mut per_file_edges = Vec::with_capacity(results.len());
313    let mut per_file_node_ids = Vec::with_capacity(results.len());
314    for r in results {
315        per_file_edges.push(r.edges);
316        per_file_node_ids.push(r.node_ids);
317    }
318
319    Phase3Result {
320        per_file_edges,
321        per_file_node_ids,
322        total_nodes_written,
323        total_strings_written,
324        total_edges_collected,
325    }
326}
327
328/// Commit a single file's staging graph into pre-allocated disjoint ranges.
329///
330/// This function operates on slices that belong exclusively to this file,
331/// so it requires no locks or synchronization.
332///
333/// # Steps
334///
335/// 1. **Strings**: Extract `InternString` ops, write `Arc<str>` values into
336///    pre-allocated string slots, build local→global `StringId` remap.
337/// 2. **Nodes**: Extract `AddNode` ops, apply string remap to each `NodeEntry`,
338///    set `file_id`, write into pre-allocated node slots, build expected→actual
339///    `NodeId` remap.
340/// 3. **Edges**: Extract `AddEdge` ops, apply node ID remap to source/target,
341///    assign pre-computed sequence numbers, return as `PendingEdge` vec.
342// Result of committing a single file: edges + committed NodeIds + actual written counts.
343struct FileCommitResult {
344    edges: Vec<PendingEdge>,
345    /// Every `NodeId` committed into the arena for this file, in
346    /// commit order. Used by the sequential post-commit step that
347    /// populates `FileRegistry::per_file_nodes`.
348    node_ids: Vec<NodeId>,
349    nodes_written: usize,
350    strings_written: usize,
351}
352
353fn commit_single_file(
354    staging: &StagingGraph,
355    plan: &FilePlan,
356    node_slots: &mut [Slot<NodeEntry>],
357    str_slots: &mut [Option<Arc<str>>],
358    rc_slots: &mut [u32],
359) -> FileCommitResult {
360    let ops = staging.operations();
361
362    // --- Step 1: Write strings, build local→global remap ---
363    let (string_remap, strings_written) = write_strings(ops, plan, str_slots, rc_slots);
364
365    // --- Step 2: Write nodes, build expected→actual node ID remap ---
366    let (node_remap, nodes_written, node_ids) = write_nodes(ops, plan, node_slots, &string_remap);
367
368    // --- Step 3: Collect remapped edges with pre-assigned sequence numbers ---
369    let edges = collect_edges(ops, plan, &node_remap, &string_remap);
370
371    FileCommitResult {
372        edges,
373        node_ids,
374        nodes_written,
375        strings_written,
376    }
377}
378
379/// Write staged strings into pre-allocated interner slots.
380///
381/// Validates that each `InternString` op has a local `StringId` and that
382/// no duplicate local IDs exist (matching the serial `commit_strings` checks).
383///
384/// Returns `(remap, strings_written)`.
385fn write_strings(
386    ops: &[StagingOp],
387    plan: &FilePlan,
388    str_slots: &mut [Option<Arc<str>>],
389    rc_slots: &mut [u32],
390) -> (HashMap<StringId, StringId>, usize) {
391    let mut remap = HashMap::new();
392    let mut string_cursor = 0usize;
393
394    for op in ops {
395        if let StagingOp::InternString { local_id, value } = op {
396            // Validate: only local IDs are allowed in staging (matching serial commit_strings)
397            assert!(
398                local_id.is_local(),
399                "non-local StringId {:?} in InternString op for file {:?}",
400                local_id,
401                plan.file_id,
402            );
403            // Validate: no duplicate local IDs (matching serial commit_strings)
404            assert!(
405                !remap.contains_key(local_id),
406                "duplicate local StringId {:?} in InternString op for file {:?}",
407                local_id,
408                plan.file_id,
409            );
410
411            if string_cursor >= str_slots.len() {
412                log::warn!(
413                    "string slot overflow in file {:?}: cursor={string_cursor}, slots={}, skipping remaining strings",
414                    plan.file_id,
415                    str_slots.len()
416                );
417                break;
418            }
419
420            // The global StringId for this string is the pre-allocated slot index.
421            #[allow(clippy::cast_possible_truncation)] // cursor is bounded by allocated slot count
422            let global_id = StringId::new(plan.string_range.start + string_cursor as u32);
423
424            // Write the string into the pre-allocated slot.
425            str_slots[string_cursor] = Some(Arc::from(value.as_str()));
426            rc_slots[string_cursor] = 1;
427
428            remap.insert(*local_id, global_id);
429            string_cursor += 1;
430        }
431    }
432
433    (remap, string_cursor)
434}
435
436/// Remap all `StringId` fields in a `NodeEntry` using a local→global table.
437///
438/// Required field (`name`) is always remapped if local.
439/// Optional fields (`signature`, `doc`, `qualified_name`, `visibility`)
440/// are remapped if present and local.
441fn remap_node_entry_string_ids(entry: &mut NodeEntry, remap: &HashMap<StringId, StringId>) {
442    remap_required_local(&mut entry.name, remap);
443    remap_option_local(&mut entry.signature, remap);
444    remap_option_local(&mut entry.doc, remap);
445    remap_option_local(&mut entry.qualified_name, remap);
446    remap_option_local(&mut entry.visibility, remap);
447}
448
/// Remap all local `StringId` fields in an `EdgeKind`.
///
/// Uses the same exhaustive match as `remap_edge_kind_string_ids`, but
/// only remaps local IDs (those with `LOCAL_TAG_BIT` set).
///
/// Keep this match in sync with `remap_edge_kind_string_ids` below: both
/// enumerate every `EdgeKind` variant without a wildcard so the compiler
/// flags new variants in both places.
#[allow(clippy::match_same_arms)]
fn remap_edge_kind_local_string_ids(kind: &mut EdgeKind, remap: &HashMap<StringId, StringId>) {
    match kind {
        EdgeKind::Imports { alias, .. } => remap_option_local(alias, remap),
        EdgeKind::Exports { alias, .. } => remap_option_local(alias, remap),
        EdgeKind::TypeOf { name, .. } => remap_option_local(name, remap),
        EdgeKind::TraitMethodBinding {
            trait_name,
            impl_type,
            ..
        } => {
            remap_required_local(trait_name, remap);
            remap_required_local(impl_type, remap);
        }
        EdgeKind::HttpRequest { url, .. } => remap_option_local(url, remap),
        EdgeKind::GrpcCall { service, method } => {
            remap_required_local(service, remap);
            remap_required_local(method, remap);
        }
        EdgeKind::DbQuery { table, .. } => remap_option_local(table, remap),
        EdgeKind::TableRead { table_name, schema } => {
            remap_required_local(table_name, remap);
            remap_option_local(schema, remap);
        }
        EdgeKind::TableWrite {
            table_name, schema, ..
        } => {
            remap_required_local(table_name, remap);
            remap_option_local(schema, remap);
        }
        EdgeKind::TriggeredBy {
            trigger_name,
            schema,
        } => {
            remap_required_local(trigger_name, remap);
            remap_option_local(schema, remap);
        }
        EdgeKind::MessageQueue { protocol, topic } => {
            // Only the `Other` protocol variant carries a StringId.
            if let MqProtocol::Other(s) = protocol {
                remap_required_local(s, remap);
            }
            remap_option_local(topic, remap);
        }
        EdgeKind::WebSocket { event } => remap_option_local(event, remap),
        EdgeKind::GraphQLOperation { operation } => remap_required_local(operation, remap),
        EdgeKind::ProcessExec { command } => remap_required_local(command, remap),
        EdgeKind::FileIpc { path_pattern } => remap_option_local(path_pattern, remap),
        EdgeKind::ProtocolCall { protocol, metadata } => {
            remap_required_local(protocol, remap);
            remap_option_local(metadata, remap);
        }
        // Variants without StringId fields — exhaustive, no wildcard.
        EdgeKind::Defines
        | EdgeKind::Contains
        | EdgeKind::Calls { .. }
        | EdgeKind::References
        | EdgeKind::Inherits
        | EdgeKind::Implements
        | EdgeKind::LifetimeConstraint { .. }
        | EdgeKind::MacroExpansion { .. }
        | EdgeKind::FfiCall { .. }
        | EdgeKind::WebAssemblyCall
        | EdgeKind::GenericBound
        | EdgeKind::AnnotatedWith
        | EdgeKind::AnnotationParam
        | EdgeKind::LambdaCaptures
        | EdgeKind::ModuleExports
        | EdgeKind::ModuleRequires
        | EdgeKind::ModuleOpens
        | EdgeKind::ModuleProvides
        | EdgeKind::TypeArgument
        | EdgeKind::ExtensionReceiver
        | EdgeKind::CompanionOf
        | EdgeKind::SealedPermit => {}
    }
}
529
530/// Remap a required local `StringId` in place.
531///
532/// Panics if a local ID has no mapping, matching the serial
533/// `apply_string_remap` behavior that returned `UnmappedLocalStringId`.
534fn remap_required_local(id: &mut StringId, remap: &HashMap<StringId, StringId>) {
535    if id.is_local() {
536        let global = remap.get(id).unwrap_or_else(|| {
537            panic!("unmapped local StringId {id:?} — missing intern_string op?")
538        });
539        *id = *global;
540    }
541}
542
543/// Remap an optional local `StringId` in place.
544fn remap_option_local(opt: &mut Option<StringId>, remap: &HashMap<StringId, StringId>) {
545    if let Some(id) = opt
546        && id.is_local()
547    {
548        let global = remap.get(id).unwrap_or_else(|| {
549            panic!("unmapped local StringId {id:?} — missing intern_string op?")
550        });
551        *id = *global;
552    }
553}
554
555/// Write staged nodes into pre-allocated arena slots.
556///
557/// Returns `(remap, nodes_written, node_ids)`. `node_ids` is the Vec of
558/// every `NodeId` committed for this file, in commit order, for use by
559/// the sequential bucket-population post-step.
560fn write_nodes(
561    ops: &[StagingOp],
562    plan: &FilePlan,
563    node_slots: &mut [Slot<NodeEntry>],
564    string_remap: &HashMap<StringId, StringId>,
565) -> (HashMap<NodeId, NodeId>, usize, Vec<NodeId>) {
566    let mut node_remap = HashMap::new();
567    let mut node_cursor = 0usize;
568    let mut node_ids: Vec<NodeId> = Vec::with_capacity(node_slots.len());
569
570    for op in ops {
571        if let StagingOp::AddNode {
572            entry, expected_id, ..
573        } = op
574        {
575            if node_cursor >= node_slots.len() {
576                log::warn!(
577                    "node slot overflow in file {:?}: cursor={node_cursor}, slots={}, skipping remaining nodes",
578                    plan.file_id,
579                    node_slots.len()
580                );
581                break;
582            }
583
584            let mut entry = entry.clone();
585
586            // Apply string remap to all StringId fields in the entry.
587            remap_node_entry_string_ids(&mut entry, string_remap);
588
589            // Set the file ID from the plan.
590            entry.file = plan.file_id;
591
592            // The actual NodeId is the pre-allocated slot index with generation 1.
593            #[allow(clippy::cast_possible_truncation)] // cursor is bounded by allocated slot count
594            let actual_index = plan.node_range.start + node_cursor as u32;
595            let actual_id = NodeId::new(actual_index, 1);
596
597            // Write into the pre-allocated slot.
598            node_slots[node_cursor] = Slot::new_occupied(1, entry);
599
600            if let Some(expected) = expected_id {
601                node_remap.insert(*expected, actual_id);
602            }
603
604            node_ids.push(actual_id);
605            node_cursor += 1;
606        }
607    }
608
609    (node_remap, node_cursor, node_ids)
610}
611
612/// Collect staged edges with remapped node IDs, string IDs, and pre-assigned
613/// sequence numbers.
614fn collect_edges(
615    ops: &[StagingOp],
616    plan: &FilePlan,
617    node_remap: &HashMap<NodeId, NodeId>,
618    string_remap: &HashMap<StringId, StringId>,
619) -> Vec<PendingEdge> {
620    let mut edges = Vec::new();
621
622    for op in ops {
623        if let StagingOp::AddEdge {
624            source,
625            target,
626            kind,
627            spans,
628            ..
629        } = op
630        {
631            let actual_source = node_remap.get(source).copied().unwrap_or(*source);
632            let actual_target = node_remap.get(target).copied().unwrap_or(*target);
633
634            // Clone and remap any local StringIds in the EdgeKind.
635            let mut remapped_kind = kind.clone();
636            remap_edge_kind_local_string_ids(&mut remapped_kind, string_remap);
637
638            edges.push(PendingEdge {
639                source: actual_source,
640                target: actual_target,
641                kind: remapped_kind,
642                file: plan.file_id,
643                spans: spans.clone(),
644            });
645        }
646    }
647
648    edges
649}
650
651/// Remap a required `StringId` using the dedup remap table.
652///
653/// If the ID is in the remap table, it is replaced with the canonical ID.
654/// Otherwise, it is left unchanged (identity mapping).
655#[allow(clippy::implicit_hasher)]
656pub fn remap_string_id(id: &mut StringId, remap: &HashMap<StringId, StringId>) {
657    if let Some(&canonical) = remap.get(id) {
658        *id = canonical;
659    }
660}
661
662/// Remap an optional `StringId` using the dedup remap table.
663#[allow(clippy::implicit_hasher)]
664pub fn remap_option_string_id(id: &mut Option<StringId>, remap: &HashMap<StringId, StringId>) {
665    if let Some(inner) = id {
666        remap_string_id(inner, remap);
667    }
668}
669
/// Exhaustive remap of all `StringId` fields in an `EdgeKind`.
///
/// No wildcard arm — the compiler ensures completeness when new variants
/// are added to `EdgeKind`.
///
/// Unlike `remap_edge_kind_local_string_ids` (Phase 3, local→global), this
/// applies the Phase 4 dedup table and rewrites any ID present in `remap`,
/// local or not. Keep the two matches structurally in sync.
#[allow(clippy::match_same_arms, clippy::implicit_hasher)] // Arms are separated by category for documentation clarity
pub fn remap_edge_kind_string_ids(kind: &mut EdgeKind, remap: &HashMap<StringId, StringId>) {
    match kind {
        // === Variants WITH StringId fields ===
        EdgeKind::Imports { alias, .. } => remap_option_string_id(alias, remap),
        EdgeKind::Exports { alias, .. } => remap_option_string_id(alias, remap),
        EdgeKind::TypeOf { name, .. } => remap_option_string_id(name, remap),
        EdgeKind::TraitMethodBinding {
            trait_name,
            impl_type,
            ..
        } => {
            remap_string_id(trait_name, remap);
            remap_string_id(impl_type, remap);
        }
        EdgeKind::HttpRequest { url, .. } => remap_option_string_id(url, remap),
        EdgeKind::GrpcCall { service, method } => {
            remap_string_id(service, remap);
            remap_string_id(method, remap);
        }
        EdgeKind::DbQuery { table, .. } => remap_option_string_id(table, remap),
        EdgeKind::TableRead { table_name, schema } => {
            remap_string_id(table_name, remap);
            remap_option_string_id(schema, remap);
        }
        EdgeKind::TableWrite {
            table_name, schema, ..
        } => {
            remap_string_id(table_name, remap);
            remap_option_string_id(schema, remap);
        }
        EdgeKind::TriggeredBy {
            trigger_name,
            schema,
        } => {
            remap_string_id(trigger_name, remap);
            remap_option_string_id(schema, remap);
        }
        EdgeKind::MessageQueue { protocol, topic } => {
            // Only the `Other` protocol variant carries a StringId.
            if let MqProtocol::Other(s) = protocol {
                remap_string_id(s, remap);
            }
            remap_option_string_id(topic, remap);
        }
        EdgeKind::WebSocket { event } => remap_option_string_id(event, remap),
        EdgeKind::GraphQLOperation { operation } => remap_string_id(operation, remap),
        EdgeKind::ProcessExec { command } => remap_string_id(command, remap),
        EdgeKind::FileIpc { path_pattern } => remap_option_string_id(path_pattern, remap),
        EdgeKind::ProtocolCall { protocol, metadata } => {
            remap_string_id(protocol, remap);
            remap_option_string_id(metadata, remap);
        }
        // === Variants WITHOUT StringId fields — exhaustive, no wildcard ===
        EdgeKind::Defines
        | EdgeKind::Contains
        | EdgeKind::Calls { .. }
        | EdgeKind::References
        | EdgeKind::Inherits
        | EdgeKind::Implements
        | EdgeKind::LifetimeConstraint { .. }
        | EdgeKind::MacroExpansion { .. }
        | EdgeKind::FfiCall { .. }
        | EdgeKind::WebAssemblyCall
        | EdgeKind::GenericBound
        | EdgeKind::AnnotatedWith
        | EdgeKind::AnnotationParam
        | EdgeKind::LambdaCaptures
        | EdgeKind::ModuleExports
        | EdgeKind::ModuleRequires
        | EdgeKind::ModuleOpens
        | EdgeKind::ModuleProvides
        | EdgeKind::TypeArgument
        | EdgeKind::ExtensionReceiver
        | EdgeKind::CompanionOf
        | EdgeKind::SealedPermit => {}
    }
}
751
752// === Phase 4: Post-chunk Finalization ===
753
754/// Apply global string dedup remap to all `StringId` fields in a `NodeEntry`.
755///
756/// This is the Phase 4 counterpart to `remap_node_entry_string_ids` (Phase 3).
757/// Phase 3 remaps local→global; Phase 4 remaps duplicate global→canonical global.
758#[allow(clippy::implicit_hasher)]
759pub fn remap_node_entry_global(entry: &mut NodeEntry, remap: &HashMap<StringId, StringId>) {
760    remap_string_id(&mut entry.name, remap);
761    remap_option_string_id(&mut entry.signature, remap);
762    remap_option_string_id(&mut entry.doc, remap);
763    remap_option_string_id(&mut entry.qualified_name, remap);
764    remap_option_string_id(&mut entry.visibility, remap);
765}
766
767/// Apply global string dedup remap to all nodes in the arena and all pending edges.
768///
769/// This is Phase 4b of the parallel commit pipeline. After `build_dedup_table()`
770/// produces a remap table, this function applies it to every `StringId` in:
771/// - All `NodeEntry` fields in the arena
772/// - All `EdgeKind` fields in the pending edges
773#[allow(clippy::implicit_hasher)]
774pub fn phase4_apply_global_remap(
775    arena: &mut NodeArena,
776    all_edges: &mut [Vec<PendingEdge>],
777    remap: &HashMap<StringId, StringId>,
778) {
779    if remap.is_empty() {
780        return;
781    }
782
783    // Remap all nodes
784    for (_id, entry) in arena.iter_mut() {
785        remap_node_entry_global(entry, remap);
786    }
787
788    // Remap all edges
789    for file_edges in all_edges.iter_mut() {
790        for edge in file_edges.iter_mut() {
791            remap_edge_kind_string_ids(&mut edge.kind, remap);
792        }
793    }
794}
795
/// Statistics from Phase 4c-prime cross-file node unification.
///
/// Purely informational counters; `Default` starts all counters at zero.
#[derive(Debug, Default)]
pub struct UnificationStats {
    /// Total (qualified_name, kind) groups of size >= 2 examined.
    pub candidate_pairs_examined: usize,
    /// Number of loser nodes merged into winners.
    pub nodes_merged: usize,
    /// Number of PendingEdge fields rewritten.
    pub edges_rewritten: usize,
    /// Number of loser nodes (metadata merged into winners, slot kept inert).
    pub nodes_inert: usize,
    /// Time spent in the unification pass (milliseconds).
    pub elapsed_ms: u64,
}
810
/// Phase 4c-prime: Unify cross-file duplicate nodes sharing the same
/// canonical qualified name and a call-compatible kind.
///
/// Runs after `rebuild_indices` (Phase 4c) which populates `by_qualified_name`,
/// and before `pending_edges_to_delta` (Phase 4d) so the remap operates on
/// `PendingEdge` targets, not committed `DeltaEdge`s.
///
/// **Winner selection**: Among nodes sharing a qualified name and call-compatible
/// kinds, the node with `start_line > 0` wins. Tie-break in order:
///   1. Wider `end_line - start_line` span.
///   2. **Lexicographically smallest file path** (resolved via the rebuild
///      plane's [`FileRegistry`]). Phase 3e correctness requires the
///      path-based tie-break rather than the previous `FileId` comparison,
///      because `FileId` slot assignment differs between a fresh full
///      rebuild and an incremental rebuild — the incremental path clones
///      the existing `FileRegistry` and appends new paths, while the full
///      path assigns FileIds in filesystem-walk order from an empty
///      registry. Two builds of the same logical workspace therefore
///      disagree on which `FileId` is smaller when duplicate definitions
///      tie on span width, flipping the unification winner and stranding
///      `qualified_name` on the wrong side of the merge. Tie-breaking on
///      the (stable-across-builds) path makes winner selection
///      representation-independent.
///   3. Final fallback: smaller `NodeId::index()` when paths also tie
///      (e.g. two definitions in the same file — rare but possible via
///      duplicate declarations). `NodeId` is deterministic within a
///      single build so this keeps the fallback stable for any individual
///      build even if it isn't invariant across representations.
///
/// **Safety**: Caller must hold an exclusive write lock on the graph.
pub(crate) fn phase4c_prime_unify_cross_file_nodes<
    G: crate::graph::unified::mutation_target::GraphMutationTarget,
>(
    graph: &mut G,
    all_edges: &mut [Vec<PendingEdge>],
) -> UnificationStats {
    use crate::graph::unified::mutation_target::GraphMutationTarget;

    use super::helper::CALL_COMPATIBLE_KINDS;
    use super::unification::{NodeRemapTable, merge_node_into};
    use std::time::Instant;

    let start = Instant::now();
    let mut stats = UnificationStats::default();

    // Collect candidates: walk arena, group by qualified_name for nodes
    // with call-compatible kinds. Only groups of size >= 2 need unification.
    let mut qn_groups: HashMap<crate::graph::unified::string::StringId, Vec<NodeId>> =
        HashMap::new();

    for (node_id, entry) in GraphMutationTarget::nodes(graph).iter() {
        // Nodes whose kind can't appear on a call edge never need unifying.
        if !CALL_COMPATIBLE_KINDS.contains(&entry.kind) {
            continue;
        }
        if let Some(qn_id) = entry.qualified_name {
            qn_groups.entry(qn_id).or_default().push(node_id);
        }
    }

    // Filter to groups with 2+ members
    let groups_to_unify: Vec<Vec<NodeId>> = qn_groups
        .into_values()
        .filter(|group| {
            if group.len() >= 2 {
                stats.candidate_pairs_examined += 1;
                true
            } else {
                false
            }
        })
        .collect();

    // Now perform merges
    let mut remap = NodeRemapTable::with_capacity(groups_to_unify.len());

    // Pre-resolve every candidate node's canonical path-based tie-break
    // key into an owned `String` keyed by `NodeId`. Lifting the resolution
    // here instead of inside the `max_by` comparator avoids re-borrowing
    // `graph` immutably from a closure that lives across the
    // `merge_node_into(&mut graph, …)` call below. Without this
    // precomputation the borrow checker rejects the mutation loop because
    // the comparator closure captures the immutable borrow of `graph`
    // required by `path_key`.
    //
    // Path conversion goes through `Arc<Path>::to_string_lossy()` because
    // `Path` does not implement `Ord` lexicographically across platforms
    // consistently; forcing a canonical string form keeps the tie-break
    // deterministic on any host filesystem. When the registry can't
    // resolve a `FileId` (shouldn't happen in practice — every live
    // node's `FileId` was registered before the node was allocated) we
    // fall back to an empty string so the comparison still produces a
    // total order. Empty resolves tie-break each other stably (then fall
    // through to the `NodeId` index tie-break).
    let path_keys: HashMap<NodeId, String> = {
        let arena = GraphMutationTarget::nodes(graph);
        let files = GraphMutationTarget::files(graph);
        let mut out: HashMap<NodeId, String> =
            HashMap::with_capacity(groups_to_unify.iter().map(Vec::len).sum());
        for group in &groups_to_unify {
            for &nid in group {
                if out.contains_key(&nid) {
                    continue;
                }
                let key = arena
                    .get(nid)
                    .and_then(|entry| files.resolve(entry.file))
                    .map_or_else(String::new, |path| path.to_string_lossy().into_owned());
                out.insert(nid, key);
            }
        }
        out
    };
    // Borrowed when a node id is somehow missing from `path_keys`
    // (see the resolution-fallback note above).
    let empty_path_key = String::new();

    for group in &groups_to_unify {
        // Pick winner: prefer start_line > 0, tie-break by wider span,
        // then smaller path (stable across rebuild representations),
        // then smaller NodeId index.
        let winner_id = *group
            .iter()
            .max_by(|&&a, &&b| {
                let ea = GraphMutationTarget::nodes(graph).get(a);
                let eb = GraphMutationTarget::nodes(graph).get(b);
                match (ea, eb) {
                    (Some(ea), Some(eb)) => {
                        // Primary: prefer non-zero start_line
                        let a_real = ea.start_line > 0;
                        let b_real = eb.start_line > 0;
                        match (a_real, b_real) {
                            (true, false) => std::cmp::Ordering::Greater,
                            (false, true) => std::cmp::Ordering::Less,
                            _ => {
                                // Tie-break 1: prefer wider span
                                let a_range = ea.end_line.saturating_sub(ea.start_line);
                                let b_range = eb.end_line.saturating_sub(eb.start_line);
                                a_range
                                    .cmp(&b_range)
                                    .then_with(|| {
                                        // Tie-break 2: prefer smaller path
                                        // (reversed because `max_by` picks the
                                        // greater side — we want smaller path
                                        // to win, so invert the direct compare).
                                        let pa = path_keys.get(&a).unwrap_or(&empty_path_key);
                                        let pb = path_keys.get(&b).unwrap_or(&empty_path_key);
                                        pb.cmp(pa)
                                    })
                                    .then_with(|| {
                                        // Tie-break 3: smaller NodeId index
                                        // (stable within a single build;
                                        // deterministic fallback for co-located
                                        // duplicate definitions).
                                        b.index().cmp(&a.index())
                                    })
                            }
                        }
                    }
                    (Some(_), None) => std::cmp::Ordering::Greater,
                    (None, Some(_)) => std::cmp::Ordering::Less,
                    (None, None) => std::cmp::Ordering::Equal,
                }
            })
            .expect("group is non-empty");

        // Merge all losers into winner
        for &node_id in group {
            if node_id == winner_id {
                continue;
            }
            match merge_node_into(GraphMutationTarget::nodes_mut(graph), node_id, winner_id) {
                Ok(()) => {
                    remap.insert(node_id, winner_id);
                    stats.nodes_merged += 1;
                    stats.nodes_inert += 1;
                }
                Err(e) => {
                    // Non-fatal: the loser simply stays un-unified.
                    log::debug!("Phase 4c-prime: skipping merge ({e})");
                }
            }
        }
    }

    // Apply remap table to all pending edges AND to every committed
    // edge already in the graph's edge store.
    //
    // The `apply_to_edges` call keeps PendingEdges (the output of this
    // chunk's parallel commit) pointing at canonical winners before
    // Phase 4d converts them into `DeltaEdge`s. On a full build that is
    // sufficient — no committed edges exist yet.
    //
    // The `apply_to_committed_edges` call closes the Phase 3e incremental
    // gap: the rebuild plane clones the pre-edit graph's committed edges
    // via `clone_for_rebuild`, so a newly-reparsed file whose definition
    // becomes the unification winner can leave surviving cross-file
    // edges pointing at what is now an inert loser slot. Retargeting the
    // committed edges through `remap` is the only way those edges
    // observe the canonical winner after finalize. On a full build the
    // second call is a no-op (edge store is empty).
    if !remap.is_empty() {
        let pre_count: usize = all_edges.iter().map(|v| v.len()).sum();
        remap.apply_to_edges(all_edges);
        remap.apply_to_committed_edges(GraphMutationTarget::edges(graph));
        stats.edges_rewritten = pre_count; // conservative: all edges walked

        // Keep FileRegistry::per_file_nodes consistent with the arena.
        //
        // [`merge_node_into`] (see `unification.rs`) intentionally does
        // **not** vacate the loser slot — the slot stays `Occupied` but
        // inert so `NodeArena::slot_count()` (which CSR row_ptr sizing
        // depends on) is preserved. Because the slot is still live per
        // `NodeArena::iter()`, the §F.1 bucket bijection would panic
        // with "live node absent from all buckets" if we purged losers
        // from their home bucket.
        //
        // Therefore: losers stay in whichever per-file bucket Phase 3
        // first committed them to. That bucket's `FileId` matches the
        // loser's `NodeEntry.file`, so (c) passes. Each loser is in
        // exactly one bucket, so (b) passes. Every live arena slot is
        // accounted for by some bucket, so (d) passes. The §K master
        // matrix already admits this semantics — inert merged-losers
        // are semantically equivalent to any other live `NodeArena`
        // entry for bucket-membership purposes.
        //
        // Name-resolution containment (Gate 0d iter-1 blocker).
        //
        // `merge_node_into` now ALSO clears the loser's `name` and
        // `qualified_name` fields (to `StringId::INVALID` / `None`), and
        // `AuxiliaryIndices::build_from_arena` skips any arena entry
        // whose `name == StringId::INVALID` when rebuilding the name,
        // qualified-name, kind, and file buckets. The second
        // `rebuild_indices()` call in `build_unified_graph_inner`
        // (entrypoint.rs:571, right below this function) runs AFTER
        // unification, so the buckets surfaced by `indices.by_name` /
        // `by_qualified_name` / `by_kind` / `by_file` contain only
        // winners — every public name-resolution surface
        // (`resolution::exact_qualified_bucket`,
        // `graph::find_by_pattern`, etc.) is therefore free of
        // unified-away duplicates. The only publish-visible bucket that
        // still references losers is `FileRegistry::per_file_nodes`,
        // which preserves the §F.1 bucket bijection without surfacing
        // them through name resolution.
        //
        // Historical note: an earlier iteration of this pass called
        // `retain_nodes_in_buckets` to purge losers; that matched a
        // stale understanding where `merge_node_into` was expected to
        // vacate the slot. Gate 0d's bucket-bijection invariant
        // surfaced the mismatch (every full rebuild produced a live
        // slot with no bucket entry). The fix is to align with the
        // unification contract: inert slots remain in their home
        // bucket, but `AuxiliaryIndices` treats them as name-invisible.
    }

    stats.elapsed_ms = start.elapsed().as_millis() as u64;
    stats
}
1065
1066/// Convert per-file `PendingEdge` collections to per-file `DeltaEdge` collections
1067/// with monotonically increasing sequence numbers.
1068///
1069/// The sequence numbers are assigned file-by-file, edge-by-edge, starting from
1070/// `seq_start`. This produces the deterministic ordering required by
1071/// `BidirectionalEdgeStore::add_edges_bulk_ordered()`.
1072#[must_use]
1073pub fn pending_edges_to_delta(
1074    per_file_edges: &[Vec<PendingEdge>],
1075    seq_start: u64,
1076) -> (Vec<Vec<DeltaEdge>>, u64) {
1077    let mut seq = seq_start;
1078    let mut result = Vec::with_capacity(per_file_edges.len());
1079
1080    for file_edges in per_file_edges {
1081        let mut delta_vec = Vec::with_capacity(file_edges.len());
1082        for edge in file_edges {
1083            delta_vec.push(DeltaEdge::with_spans(
1084                edge.source,
1085                edge.target,
1086                edge.kind.clone(),
1087                seq,
1088                DeltaOp::Add,
1089                edge.file,
1090                edge.spans.clone(),
1091            ));
1092            seq += 1;
1093        }
1094        result.push(delta_vec);
1095    }
1096
1097    (result, seq)
1098}
1099
1100/// Rebuild the auxiliary indices on `graph` from its current node arena.
1101///
1102/// Generic counterpart to the inherent [`CodeGraph::rebuild_indices`].
1103/// Takes a [`GraphMutationTarget`] so both the full-build
1104/// (`build_unified_graph_inner`) and incremental-rebuild
1105/// (`incremental_rebuild` on `RebuildGraph`) pipelines can share the
1106/// same helper. The inherent method now delegates here so the
1107/// implementation lives in exactly one place.
1108///
1109/// Internally uses [`GraphMutationTarget::nodes_and_indices_mut`] to
1110/// acquire a disjoint `(&NodeArena, &mut AuxiliaryIndices)` pair, then
1111/// hands them to [`AuxiliaryIndices::build_from_arena`] which clears
1112/// the existing indices and rebuilds in a single pass without
1113/// per-element duplicate checking.
1114///
1115/// [`CodeGraph::rebuild_indices`]: crate::graph::unified::concurrent::CodeGraph::rebuild_indices
1116/// [`AuxiliaryIndices::build_from_arena`]: crate::graph::unified::storage::indices::AuxiliaryIndices::build_from_arena
1117pub(crate) fn rebuild_indices<G: crate::graph::unified::mutation_target::GraphMutationTarget>(
1118    graph: &mut G,
1119) {
1120    let (nodes, indices) = graph.nodes_and_indices_mut();
1121    indices.build_from_arena(nodes);
1122}
1123
1124/// Phase 4d — bulk-insert every pending edge into the graph via the
1125/// deterministic `DeltaEdge` conversion path.
1126///
1127/// Wraps the pure [`pending_edges_to_delta`] conversion + the
1128/// [`BidirectionalEdgeStore::add_edges_bulk_ordered`] call that
1129/// `build_unified_graph_inner` ran inline between Phase 4c-prime and
1130/// Phase 4e. The wrapper is generic over [`GraphMutationTarget`] so
1131/// the Task 4 Step 4 Phase 3 `incremental_rebuild` body can call it
1132/// against a [`RebuildGraph`] without duplicating the seq-counter +
1133/// flatten logic.
1134///
1135/// Returns the final edge sequence counter (for callers that need to
1136/// continue allocating deterministic sequence numbers downstream).
1137/// The counter flows from
1138/// [`BidirectionalEdgeStore::forward().seq_counter()`] on the way in
1139/// and advances by one per inserted edge.
1140///
1141/// # Semantics
1142///
1143/// * `per_file_edges` is consumed by-reference; the function does not
1144///   mutate the caller's buffer. Callers who no longer need the
1145///   vectors may drop them after the call.
1146/// * If `per_file_edges` is empty (or every inner vector is empty),
1147///   the edge store is left untouched.
1148/// * The helper does not `bump_epoch()` on the graph — Phase 4d is
1149///   edge-level only; the full pipeline bumps epoch separately.
1150///
1151/// # Edge-source-identity invariant (`C_EDGE_MIGRATE`)
1152///
1153/// Phase 4d does NOT dedup edges by `(source, target, kind)`. Every
1154/// `PendingEdge` from every file becomes one `DeltaEdge` with a unique
1155/// monotonically increasing `seq` number; the
1156/// [`BidirectionalEdgeStore::add_edges_bulk_ordered`] insertion contract
1157/// preserves that 1:1 mapping. This is what lets the Cluster C
1158/// `C_EDGE_MIGRATE` DAG unit (2026-04-29 BadLiveware Go batch) move the
1159/// `TypeOf{Field}` edge source from the struct node to the per-field
1160/// `Property` node without touching this helper: the new
1161/// Property-sourced edge addresses a distinct `(source, target)` pair
1162/// from the legacy struct-sourced edge, and Phase 4d emits both shapes
1163/// with no collapsing. Plugins that only emit the new shape (Go after
1164/// `C_EDGE_MIGRATE`) therefore produce a clean Property-sourced
1165/// `TypeOf{Field}` edge set with no struct-sourced shadows. Plugins
1166/// outside Cluster C's scope (`C_OTHER_PLUGINS`) keep emitting the
1167/// legacy shape until they migrate; the bulk-insert path treats both
1168/// shapes identically.
1169///
1170/// Determinism: per-file `PendingEdge` order is fixed by the parser
1171/// pass, and `pending_edges_to_delta` walks the per-file vectors in
1172/// the input order. So `phase4d_bulk_insert_edges` produces a
1173/// byte-identical `DeltaEdge` sequence on every fresh rebuild of the
1174/// same source tree, which is what guarantees the
1175/// `SnapshotReader → SnapshotWriter` round-trip identity required by
1176/// the `C_EDGE_MIGRATE` acceptance criteria.
1177///
1178/// [`BidirectionalEdgeStore::add_edges_bulk_ordered`]: crate::graph::unified::edge::bidirectional::BidirectionalEdgeStore::add_edges_bulk_ordered
1179/// [`RebuildGraph`]: crate::graph::unified::rebuild::rebuild_graph::RebuildGraph
1180pub(crate) fn phase4d_bulk_insert_edges<
1181    G: crate::graph::unified::mutation_target::GraphMutationTarget,
1182>(
1183    graph: &mut G,
1184    per_file_edges: &[Vec<PendingEdge>],
1185) -> u64 {
1186    // Start seq numbering from the edge store's current counter to
1187    // support non-empty graphs (incremental rebuild carries forward
1188    // the prior build's counter).
1189    let edge_seq_start = graph.edges().forward().seq_counter();
1190    let (delta_edge_vecs, final_seq) = pending_edges_to_delta(per_file_edges, edge_seq_start);
1191    let total_edge_count: u64 = delta_edge_vecs.iter().map(|v| v.len() as u64).sum();
1192    if total_edge_count > 0 {
1193        graph
1194            .edges_mut()
1195            .add_edges_bulk_ordered(&delta_edge_vecs, total_edge_count);
1196    }
1197    final_seq
1198}
1199
1200#[cfg(test)]
1201mod tests {
1202    use super::*;
1203
1204    #[test]
1205    fn test_compute_commit_plan_basic() {
1206        let file_ids = vec![FileId::new(0), FileId::new(1), FileId::new(2)];
1207        let node_counts = vec![3, 0, 5];
1208        let string_counts = vec![2, 1, 3];
1209        let edge_counts = vec![4, 0, 6];
1210
1211        let plan = compute_commit_plan(
1212            &node_counts,
1213            &string_counts,
1214            &edge_counts,
1215            &file_ids,
1216            0,
1217            1, // string_offset=1 for sentinel
1218        );
1219
1220        assert_eq!(plan.total_nodes, 8);
1221        assert_eq!(plan.total_strings, 6);
1222        assert_eq!(plan.total_edges, 10);
1223
1224        // File 0: nodes [0..3), strings [1..3)
1225        assert_eq!(plan.file_plans[0].node_range, 0..3);
1226        assert_eq!(plan.file_plans[0].string_range, 1..3);
1227
1228        // File 1: nodes [3..3), strings [3..4) — empty nodes
1229        assert_eq!(plan.file_plans[1].node_range, 3..3);
1230        assert_eq!(plan.file_plans[1].string_range, 3..4);
1231
1232        // File 2: nodes [3..8), strings [4..7)
1233        assert_eq!(plan.file_plans[2].node_range, 3..8);
1234        assert_eq!(plan.file_plans[2].string_range, 4..7);
1235    }
1236
1237    #[test]
1238    fn test_compute_commit_plan_with_offsets() {
1239        let file_ids = vec![FileId::new(5)];
1240        let plan = compute_commit_plan(&[10], &[5], &[7], &file_ids, 100, 50);
1241        assert_eq!(plan.file_plans[0].node_range, 100..110);
1242        assert_eq!(plan.file_plans[0].string_range, 50..55);
1243        assert_eq!(plan.total_nodes, 10);
1244        assert_eq!(plan.total_strings, 5);
1245        assert_eq!(plan.total_edges, 7);
1246    }
1247
1248    #[test]
1249    fn test_compute_commit_plan_empty() {
1250        let plan = compute_commit_plan(&[], &[], &[], &[], 0, 1);
1251        assert_eq!(plan.total_nodes, 0);
1252        assert_eq!(plan.total_strings, 0);
1253        assert_eq!(plan.total_edges, 0);
1254        assert!(plan.file_plans.is_empty());
1255    }
1256
1257    #[test]
1258    fn test_remap_string_id_basic() {
1259        let mut remap = HashMap::new();
1260        remap.insert(StringId::new(1), StringId::new(100));
1261
1262        let mut id = StringId::new(1);
1263        remap_string_id(&mut id, &remap);
1264        assert_eq!(id, StringId::new(100));
1265    }
1266
1267    #[test]
1268    fn test_remap_string_id_not_in_remap() {
1269        let remap = HashMap::new();
1270        let mut id = StringId::new(42);
1271        remap_string_id(&mut id, &remap);
1272        assert_eq!(id, StringId::new(42)); // unchanged
1273    }
1274
1275    #[test]
1276    fn test_remap_option_string_id() {
1277        let mut remap = HashMap::new();
1278        remap.insert(StringId::new(5), StringId::new(50));
1279
1280        let mut some_id = Some(StringId::new(5));
1281        remap_option_string_id(&mut some_id, &remap);
1282        assert_eq!(some_id, Some(StringId::new(50)));
1283
1284        let mut none_id: Option<StringId> = None;
1285        remap_option_string_id(&mut none_id, &remap);
1286        assert_eq!(none_id, None);
1287    }
1288
1289    #[test]
1290    fn test_remap_edge_kind_imports() {
1291        let mut remap = HashMap::new();
1292        remap.insert(StringId::new(1), StringId::new(100));
1293
1294        let mut kind = EdgeKind::Imports {
1295            alias: Some(StringId::new(1)),
1296            is_wildcard: false,
1297        };
1298        remap_edge_kind_string_ids(&mut kind, &remap);
1299        assert!(
1300            matches!(kind, EdgeKind::Imports { alias: Some(id), .. } if id == StringId::new(100))
1301        );
1302    }
1303
1304    #[test]
1305    fn test_remap_edge_kind_trait_method_binding() {
1306        let mut remap = HashMap::new();
1307        remap.insert(StringId::new(1), StringId::new(100));
1308        remap.insert(StringId::new(2), StringId::new(200));
1309
1310        let mut kind = EdgeKind::TraitMethodBinding {
1311            trait_name: StringId::new(1),
1312            impl_type: StringId::new(2),
1313            is_ambiguous: false,
1314        };
1315        remap_edge_kind_string_ids(&mut kind, &remap);
1316        assert!(
1317            matches!(kind, EdgeKind::TraitMethodBinding { trait_name, impl_type, .. }
1318                if trait_name == StringId::new(100) && impl_type == StringId::new(200))
1319        );
1320    }
1321
1322    #[test]
1323    fn test_remap_edge_kind_no_op_variants() {
1324        let remap = HashMap::new();
1325
1326        // Defines — no StringId fields
1327        let mut kind = EdgeKind::Defines;
1328        remap_edge_kind_string_ids(&mut kind, &remap);
1329        assert!(matches!(kind, EdgeKind::Defines));
1330
1331        // Calls — no StringId fields
1332        let mut kind = EdgeKind::Calls {
1333            argument_count: 3,
1334            is_async: true,
1335        };
1336        remap_edge_kind_string_ids(&mut kind, &remap);
1337        assert!(matches!(
1338            kind,
1339            EdgeKind::Calls {
1340                argument_count: 3,
1341                is_async: true,
1342            }
1343        ));
1344    }
1345
    /// Minimal `NodeEntry` used to fill pre-allocated arena slots in tests.
    fn placeholder_entry() -> NodeEntry {
        use crate::graph::unified::node::NodeKind;
        NodeEntry::new(NodeKind::Function, StringId::new(0), FileId::new(0))
    }
1350
1351    #[test]
1352    fn test_phase2_assign_ranges_basic() {
1353        use super::super::staging::StagingGraph;
1354
1355        // Create 2 staging graphs with known counts
1356        let mut sg0 = StagingGraph::new();
1357        let mut sg1 = StagingGraph::new();
1358
1359        // sg0: 2 nodes, 1 string, 1 edge
1360        let entry0 = placeholder_entry();
1361        let n0 = sg0.add_node(entry0.clone());
1362        let n1 = sg0.add_node(entry0.clone());
1363        sg0.intern_string(StringId::new_local(0), "hello".into());
1364        sg0.add_edge(
1365            n0,
1366            n1,
1367            EdgeKind::Calls {
1368                argument_count: 0,
1369                is_async: false,
1370            },
1371            FileId::new(0),
1372        );
1373
1374        // sg1: 1 node, 2 strings, 0 edges
1375        sg1.add_node(entry0);
1376        sg1.intern_string(StringId::new_local(0), "world".into());
1377        sg1.intern_string(StringId::new_local(1), "foo".into());
1378
1379        let file_ids = vec![FileId::new(10), FileId::new(11)];
1380        let offsets = GlobalOffsets {
1381            node_offset: 5,
1382            string_offset: 3,
1383        };
1384
1385        let plan = phase2_assign_ranges(&[&sg0, &sg1], &file_ids, &offsets);
1386
1387        // sg0: 2 nodes, 1 string, 1 edge
1388        assert_eq!(plan.file_plans[0].node_range, 5..7);
1389        assert_eq!(plan.file_plans[0].string_range, 3..4);
1390
1391        // sg1: 1 node, 2 strings, 0 edges
1392        assert_eq!(plan.file_plans[1].node_range, 7..8);
1393        assert_eq!(plan.file_plans[1].string_range, 4..6);
1394
1395        assert_eq!(plan.total_nodes, 3);
1396        assert_eq!(plan.total_strings, 3);
1397        assert_eq!(plan.total_edges, 1);
1398    }
1399
    #[test]
    fn test_phase3_parallel_commit_basic() {
        use super::super::staging::StagingGraph;
        use crate::graph::unified::concurrent::CodeGraph;
        use crate::graph::unified::node::NodeKind;
        // The `nodes_mut` / `strings_mut` method calls below resolve
        // to inherent methods on `CodeGraph`; the `GraphMutationTarget`
        // trait impl provides the same surface for `RebuildGraph`
        // (see `phase3_parallel_commit_runs_against_rebuild_graph`).
        // No trait import is needed here because inherent-method
        // resolution wins for `CodeGraph`.

        // Create a staging graph with 2 nodes, 1 string, 1 edge
        let mut sg = StagingGraph::new();
        let local_name = StringId::new_local(0);
        sg.intern_string(local_name, "my_func".into());

        let entry = NodeEntry::new(NodeKind::Function, local_name, FileId::new(0));
        let n0 = sg.add_node(entry.clone());

        let entry2 = NodeEntry::new(NodeKind::Variable, local_name, FileId::new(0));
        let n1 = sg.add_node(entry2);

        sg.add_edge(
            n0,
            n1,
            EdgeKind::Calls {
                argument_count: 0,
                is_async: false,
            },
            FileId::new(0),
        );

        // The chunk commits a single file under logical FileId 5.
        let file_ids = vec![FileId::new(5)];

        // Pre-allocate with non-zero offsets to verify remap works,
        // against a full `CodeGraph` so the new generic signature is
        // exercised end-to-end via `GraphMutationTarget`.
        // NOTE: this first alloc_range simulates 10 pre-existing node
        // slots so the file's nodes start at index 10, not 0.
        let mut graph = CodeGraph::new();
        graph
            .nodes_mut()
            .alloc_range(10, &placeholder_entry())
            .unwrap();
        let string_start = graph.strings_mut().alloc_range(1).unwrap();
        assert_eq!(string_start, 1); // past sentinel

        let offsets = GlobalOffsets {
            node_offset: 10, // file's nodes start at index 10
            string_offset: string_start,
        };
        let plan = phase2_assign_ranges(&[&sg], &file_ids, &offsets);
        assert_eq!(plan.file_plans[0].node_range, 10..12);

        // Pre-allocate the actual ranges for Phase 3.
        graph
            .nodes_mut()
            .alloc_range(plan.total_nodes, &placeholder_entry())
            .unwrap();
        graph.strings_mut().alloc_range(plan.total_strings).unwrap();

        // Phase 3 — generic over `G: GraphMutationTarget`. Passing
        // `&mut graph` infers `G = CodeGraph`.
        let result = phase3_parallel_commit(&plan, &[&sg], &mut graph);

        // Verify written counts
        assert_eq!(result.total_nodes_written, 2);
        assert_eq!(result.total_strings_written, 1);

        // Verify strings were written
        let global_name = StringId::new(string_start);
        assert_eq!(&*graph.strings().resolve(global_name).unwrap(), "my_func");

        // Verify 1 file, 1 edge
        assert_eq!(result.per_file_edges.len(), 1);
        assert_eq!(result.per_file_edges[0].len(), 1);

        // Verify edge was remapped to global IDs (node_offset=10)
        let edge = &result.per_file_edges[0][0];
        assert_eq!(edge.file, FileId::new(5));
        assert_eq!(edge.source, NodeId::new(10, 1)); // first node at slot 10
        assert_eq!(edge.target, NodeId::new(11, 1)); // second node at slot 11

        // Gate 0c (iter-2 B2): per-file node IDs must be recorded in
        // commit order, one Vec per FilePlan, so the caller can
        // populate FileRegistry::per_file_nodes deterministically.
        assert_eq!(result.per_file_node_ids.len(), 1);
        assert_eq!(
            result.per_file_node_ids[0],
            vec![NodeId::new(10, 1), NodeId::new(11, 1)]
        );
    }
1491
1492    #[test]
1493    fn test_phase3_parallel_commit_empty() {
1494        use crate::graph::unified::concurrent::CodeGraph;
1495
1496        let mut graph = CodeGraph::new();
1497
1498        let plan = ChunkCommitPlan {
1499            file_plans: vec![],
1500            total_nodes: 0,
1501            total_strings: 0,
1502            total_edges: 0,
1503        };
1504
1505        let result = phase3_parallel_commit(&plan, &[], &mut graph);
1506        assert!(result.per_file_edges.is_empty());
1507        assert!(result.per_file_node_ids.is_empty());
1508        assert_eq!(result.total_nodes_written, 0);
1509        assert_eq!(result.total_strings_written, 0);
1510    }
1511
1512    /// Task 4 Step 4 Phase 1 — exercise the `GraphMutationTarget`
1513    /// trait's second implementor.
1514    ///
1515    /// Builds a tiny staging graph, hosts it in a fresh `RebuildGraph`,
1516    /// and asserts the committed nodes land in the **rebuild-local**
1517    /// arena — not in a `CodeGraph`. The test also confirms the
1518    /// per-file edges / node-id vectors the helper returns agree with
1519    /// the `CodeGraph` call-path result shape.
1520    ///
1521    /// If a future refactor accidentally routed Phase 3 back to a
1522    /// `CodeGraph` (e.g. through a hidden static `Arc::make_mut`), this
1523    /// test would observe an empty rebuild arena and fail.
1524    #[test]
1525    #[cfg(feature = "rebuild-internals")]
1526    fn phase3_parallel_commit_runs_against_rebuild_graph() {
1527        use super::super::staging::StagingGraph;
1528        use crate::graph::unified::concurrent::CodeGraph;
1529        use crate::graph::unified::mutation_target::GraphMutationTarget;
1530        use crate::graph::unified::node::NodeKind;
1531
1532        // Staging graph: 2 nodes + 1 string + 1 Calls edge (identical
1533        // shape to the CodeGraph test above, so any behavioural drift
1534        // between the two paths surfaces as different assertions).
1535        let mut sg = StagingGraph::new();
1536        let local_name = StringId::new_local(0);
1537        sg.intern_string(local_name, "rebuild_target".into());
1538        let entry = NodeEntry::new(NodeKind::Function, local_name, FileId::new(0));
1539        let n0 = sg.add_node(entry.clone());
1540        let entry2 = NodeEntry::new(NodeKind::Variable, local_name, FileId::new(0));
1541        let n1 = sg.add_node(entry2);
1542        sg.add_edge(
1543            n0,
1544            n1,
1545            EdgeKind::Calls {
1546                argument_count: 0,
1547                is_async: false,
1548            },
1549            FileId::new(0),
1550        );
1551
1552        // Produce a RebuildGraph from an empty CodeGraph; drop the
1553        // CodeGraph immediately so any subsequent mutation observed in
1554        // the rebuild cannot possibly be leaking back to a shared Arc.
1555        let mut rebuild = {
1556            let graph = CodeGraph::new();
1557            graph.clone_for_rebuild()
1558        };
1559
1560        // Pre-allocate leading slots on the rebuild-local arena +
1561        // interner so the file's ranges begin at a non-zero offset —
1562        // this is the same pattern the CodeGraph test uses, verifying
1563        // the trait's disjoint-borrow combinator threads through
1564        // identically.
1565        rebuild
1566            .nodes_mut()
1567            .alloc_range(10, &placeholder_entry())
1568            .unwrap();
1569        let string_start = rebuild.strings_mut().alloc_range(1).unwrap();
1570        assert_eq!(string_start, 1);
1571
1572        let file_ids = vec![FileId::new(5)];
1573        let offsets = GlobalOffsets {
1574            node_offset: 10,
1575            string_offset: string_start,
1576        };
1577        let plan = phase2_assign_ranges(&[&sg], &file_ids, &offsets);
1578
1579        rebuild
1580            .nodes_mut()
1581            .alloc_range(plan.total_nodes, &placeholder_entry())
1582            .unwrap();
1583        rebuild
1584            .strings_mut()
1585            .alloc_range(plan.total_strings)
1586            .unwrap();
1587
1588        // Phase 3 against the RebuildGraph. Inferred `G = RebuildGraph`.
1589        let result = phase3_parallel_commit(&plan, &[&sg], &mut rebuild);
1590
1591        // === Invariant: the written data lives in the rebuild-local
1592        // arena, not in any CodeGraph field. ===
1593        //
1594        // Two slot ranges exist on the rebuild's arena now:
1595        //   * slots 0..10 = pre-fill placeholders (each `Function` /
1596        //     `StringId::new(0)` — note every alloc_range writes a
1597        //     clone of the template entry).
1598        //   * slots 10..12 = the two committed nodes from `sg`.
1599        //
1600        // Fetch the two committed NodeIds and resolve their names
1601        // through the rebuild-local interner; the string must match
1602        // the staged value "rebuild_target", proving the commit ran
1603        // on the rebuild's own fields.
1604        let committed_ids = &result.per_file_node_ids[0];
1605        assert_eq!(
1606            committed_ids,
1607            &vec![NodeId::new(10, 1), NodeId::new(11, 1)],
1608            "Phase 3 must commit into slots 10..12 on the rebuild-local arena"
1609        );
1610
1611        let resolved_name = rebuild
1612            .nodes_mut()
1613            .get(NodeId::new(10, 1))
1614            .map(|entry| entry.name)
1615            .expect("committed node must exist in rebuild arena");
1616        // The name StringId on the committed node is a global ID
1617        // (Phase 3 remaps local → global); resolving it through the
1618        // rebuild-local interner must produce the staged value.
1619        let resolved_str = rebuild
1620            .strings_mut()
1621            .resolve(resolved_name)
1622            .expect("name must resolve in rebuild-local interner");
1623        assert_eq!(&*resolved_str, "rebuild_target");
1624
1625        // === Shape invariants match the CodeGraph path ===
1626        assert_eq!(result.total_nodes_written, 2);
1627        assert_eq!(result.total_strings_written, 1);
1628        assert_eq!(result.per_file_edges.len(), 1);
1629        assert_eq!(result.per_file_edges[0].len(), 1);
1630        let edge = &result.per_file_edges[0][0];
1631        assert_eq!(edge.file, FileId::new(5));
1632        assert_eq!(edge.source, NodeId::new(10, 1));
1633        assert_eq!(edge.target, NodeId::new(11, 1));
1634    }
1635
1636    #[test]
1637    fn test_commit_single_file_string_remap() {
1638        use super::super::staging::StagingGraph;
1639        use crate::graph::unified::node::NodeKind;
1640
1641        let mut sg = StagingGraph::new();
1642        let local_0 = StringId::new_local(0);
1643        let local_1 = StringId::new_local(1);
1644        sg.intern_string(local_0, "alpha".into());
1645        sg.intern_string(local_1, "beta".into());
1646
1647        let mut entry = NodeEntry::new(NodeKind::Function, local_0, FileId::new(0));
1648        entry.signature = Some(local_1);
1649        sg.add_node(entry);
1650
1651        let plan = FilePlan {
1652            parsed_index: 0,
1653            file_id: FileId::new(42),
1654            node_range: 10..11,
1655            string_range: 20..22,
1656        };
1657
1658        let mut node_slots = vec![Slot::new_occupied(1, placeholder_entry())];
1659        let mut str_slots: Vec<Option<Arc<str>>> = vec![None, None];
1660        let mut rc_slots: Vec<u32> = vec![0, 0];
1661
1662        let result = commit_single_file(&sg, &plan, &mut node_slots, &mut str_slots, &mut rc_slots);
1663
1664        // Strings written
1665        assert_eq!(str_slots[0].as_deref(), Some("alpha"));
1666        assert_eq!(str_slots[1].as_deref(), Some("beta"));
1667        assert_eq!(rc_slots[0], 1);
1668        assert_eq!(rc_slots[1], 1);
1669        assert_eq!(result.strings_written, 2);
1670
1671        // Node entry has remapped StringIds
1672        if let crate::graph::unified::storage::SlotState::Occupied(entry) = node_slots[0].state() {
1673            assert_eq!(entry.name, StringId::new(20)); // global slot 20
1674            assert_eq!(entry.signature, Some(StringId::new(21))); // global slot 21
1675            assert_eq!(entry.file, FileId::new(42));
1676        } else {
1677            panic!("Expected occupied slot");
1678        }
1679        assert_eq!(result.nodes_written, 1);
1680
1681        // Per-file node IDs are recorded in commit order (Gate 0c bucket contract).
1682        assert_eq!(result.node_ids, vec![NodeId::new(10, 1)]);
1683
1684        // No edges
1685        assert!(result.edges.is_empty());
1686    }
1687
1688    #[test]
1689    fn test_remap_edge_kind_message_queue_other() {
1690        let mut remap = HashMap::new();
1691        remap.insert(StringId::new(10), StringId::new(110));
1692        remap.insert(StringId::new(20), StringId::new(220));
1693
1694        let mut kind = EdgeKind::MessageQueue {
1695            protocol: MqProtocol::Other(StringId::new(10)),
1696            topic: Some(StringId::new(20)),
1697        };
1698        remap_edge_kind_string_ids(&mut kind, &remap);
1699        assert!(matches!(
1700            kind,
1701            EdgeKind::MessageQueue {
1702                protocol: MqProtocol::Other(proto),
1703                topic: Some(topic),
1704            } if proto == StringId::new(110) && topic == StringId::new(220)
1705        ));
1706    }
1707
1708    // === Phase 4 tests ===
1709
1710    #[test]
1711    fn test_phase4_apply_global_remap_basic() {
1712        use crate::graph::unified::node::NodeKind;
1713        use crate::graph::unified::storage::NodeArena;
1714
1715        let mut arena = NodeArena::new();
1716
1717        // Allocate two nodes with duplicate string IDs (2 and 3 are dupes of 1)
1718        let entry1 = NodeEntry::new(NodeKind::Function, StringId::new(1), FileId::new(0));
1719        let mut entry2 = NodeEntry::new(NodeKind::Variable, StringId::new(2), FileId::new(0));
1720        entry2.signature = Some(StringId::new(3));
1721
1722        arena.alloc(entry1).unwrap();
1723        arena.alloc(entry2).unwrap();
1724
1725        // Edges with string IDs that need remapping
1726        let mut all_edges = vec![vec![PendingEdge {
1727            source: NodeId::new(0, 1),
1728            target: NodeId::new(1, 1),
1729            kind: EdgeKind::Imports {
1730                alias: Some(StringId::new(3)),
1731                is_wildcard: false,
1732            },
1733            file: FileId::new(0),
1734            spans: vec![],
1735        }]];
1736
1737        // Dedup remap: 2→1, 3→1
1738        let mut remap = HashMap::new();
1739        remap.insert(StringId::new(2), StringId::new(1));
1740        remap.insert(StringId::new(3), StringId::new(1));
1741
1742        phase4_apply_global_remap(&mut arena, &mut all_edges, &remap);
1743
1744        // Check that node 1's name was remapped from 2→1
1745        let (_, entry) = arena.iter().nth(1).unwrap();
1746        assert_eq!(entry.name, StringId::new(1));
1747        assert_eq!(entry.signature, Some(StringId::new(1)));
1748
1749        // Check that edge's alias was remapped from 3→1
1750        if let EdgeKind::Imports { alias, .. } = &all_edges[0][0].kind {
1751            assert_eq!(*alias, Some(StringId::new(1)));
1752        } else {
1753            panic!("Expected Imports edge");
1754        }
1755    }
1756
1757    #[test]
1758    fn test_phase4_apply_global_remap_empty() {
1759        use crate::graph::unified::storage::NodeArena;
1760
1761        let mut arena = NodeArena::new();
1762        let mut edges: Vec<Vec<PendingEdge>> = vec![];
1763        let remap = HashMap::new();
1764
1765        // Should be a no-op
1766        phase4_apply_global_remap(&mut arena, &mut edges, &remap);
1767    }
1768
1769    #[test]
1770    fn test_pending_edges_to_delta_basic() {
1771        let edges = vec![
1772            vec![
1773                PendingEdge {
1774                    source: NodeId::new(0, 1),
1775                    target: NodeId::new(1, 1),
1776                    kind: EdgeKind::Calls {
1777                        argument_count: 0,
1778                        is_async: false,
1779                    },
1780                    file: FileId::new(0),
1781                    spans: vec![],
1782                },
1783                PendingEdge {
1784                    source: NodeId::new(1, 1),
1785                    target: NodeId::new(2, 1),
1786                    kind: EdgeKind::References,
1787                    file: FileId::new(0),
1788                    spans: vec![],
1789                },
1790            ],
1791            vec![PendingEdge {
1792                source: NodeId::new(3, 1),
1793                target: NodeId::new(4, 1),
1794                kind: EdgeKind::Defines,
1795                file: FileId::new(1),
1796                spans: vec![],
1797            }],
1798        ];
1799
1800        let (deltas, final_seq) = pending_edges_to_delta(&edges, 100);
1801
1802        assert_eq!(deltas.len(), 2);
1803        assert_eq!(deltas[0].len(), 2);
1804        assert_eq!(deltas[1].len(), 1);
1805        assert_eq!(final_seq, 103);
1806
1807        // Check sequence numbers are monotonic
1808        assert_eq!(deltas[0][0].seq, 100);
1809        assert_eq!(deltas[0][1].seq, 101);
1810        assert_eq!(deltas[1][0].seq, 102);
1811
1812        // Check all are Add operations
1813        assert!(matches!(deltas[0][0].op, DeltaOp::Add));
1814        assert!(matches!(deltas[1][0].op, DeltaOp::Add));
1815    }
1816
1817    #[test]
1818    fn test_pending_edges_to_delta_empty() {
1819        let edges: Vec<Vec<PendingEdge>> = vec![];
1820        let (deltas, final_seq) = pending_edges_to_delta(&edges, 0);
1821        assert!(deltas.is_empty());
1822        assert_eq!(final_seq, 0);
1823    }
1824
1825    // ==================================================================
1826    // Task 4 Step 4 Phase 2: rebuild-plane coverage for migrated helpers.
1827    //
1828    // Each test below proves that the migrated helper runs against a
1829    // `RebuildGraph` (not just a `CodeGraph`) and that the mutation
1830    // lands on the rebuild-local state. Together with the CodeGraph
1831    // tests that still exercise the same helpers on the full-build
1832    // path, they form the "runs on both implementors" coverage
1833    // contract for `GraphMutationTarget` consumers.
1834    // ==================================================================
1835
1836    /// Seed two call-compatible nodes (both `NodeKind::Function`) under
1837    /// the same qualified-name StringId across two distinct files, then
1838    /// run [`phase4c_prime_unify_cross_file_nodes`] against a
1839    /// [`RebuildGraph`]. Verify the loser node is tombstoned
1840    /// (name + qualified_name cleared per `merge_node_into`'s contract)
1841    /// and that pending edges pointing at the loser are rewritten to
1842    /// the winner.
1843    #[test]
1844    #[cfg(feature = "rebuild-internals")]
1845    fn phase4c_prime_unify_cross_file_nodes_runs_against_rebuild_graph() {
1846        use crate::graph::unified::concurrent::CodeGraph;
1847        use crate::graph::unified::mutation_target::GraphMutationTarget;
1848        use crate::graph::unified::node::NodeKind;
1849
1850        let mut rebuild = {
1851            let graph = CodeGraph::new();
1852            graph.clone_for_rebuild()
1853        };
1854
1855        // Intern a shared qualified name. On the rebuild-local
1856        // interner; strings() resolves it for later assertions.
1857        let qname_sid = rebuild.strings_mut().intern("my_mod::my_func").unwrap();
1858
1859        // Register two files that host the duplicate Function nodes.
1860        let file_a = FileId::new(7);
1861        let file_b = FileId::new(8);
1862
1863        // Build two `NodeKind::Function` entries sharing the same
1864        // qualified_name. Winner has a wider span (start_line > 0 and
1865        // end_line > start_line) to exercise the winner-selection
1866        // tie-break.
1867        let mut winner_entry = NodeEntry::new(NodeKind::Function, qname_sid, file_a);
1868        winner_entry.qualified_name = Some(qname_sid);
1869        winner_entry.start_line = 10;
1870        winner_entry.end_line = 30;
1871
1872        let mut loser_entry = NodeEntry::new(NodeKind::Function, qname_sid, file_b);
1873        loser_entry.qualified_name = Some(qname_sid);
1874        // Narrower span → loses the tie-break.
1875        loser_entry.start_line = 5;
1876        loser_entry.end_line = 6;
1877
1878        let winner_id = rebuild.nodes_mut().alloc(winner_entry).unwrap();
1879        let loser_id = rebuild.nodes_mut().alloc(loser_entry).unwrap();
1880
1881        // A pending edge whose target is the loser — the remap table
1882        // should rewrite it to point at the winner.
1883        let mut all_edges = vec![vec![PendingEdge {
1884            source: winner_id, // any valid source — the helper only rewrites targets here
1885            target: loser_id,
1886            kind: EdgeKind::Calls {
1887                argument_count: 0,
1888                is_async: false,
1889            },
1890            file: file_b,
1891            spans: vec![],
1892        }]];
1893
1894        let stats = phase4c_prime_unify_cross_file_nodes(&mut rebuild, &mut all_edges);
1895
1896        // Stats shape
1897        assert_eq!(stats.nodes_merged, 1, "exactly one loser was tombstoned");
1898        assert_eq!(stats.candidate_pairs_examined, 1);
1899        assert_eq!(stats.edges_rewritten, 1);
1900
1901        // Winner node survived with qualified_name intact.
1902        let winner_entry_after = GraphMutationTarget::nodes(&rebuild)
1903            .get(winner_id)
1904            .expect("winner must remain live");
1905        assert_eq!(
1906            winner_entry_after.qualified_name,
1907            Some(qname_sid),
1908            "winner keeps its qualified_name"
1909        );
1910
1911        // Loser entry was merged via `merge_node_into`, which clears
1912        // `name` and `qualified_name` to make the slot name-invisible.
1913        let loser_entry_after = GraphMutationTarget::nodes(&rebuild)
1914            .get(loser_id)
1915            .expect("loser slot remains live (inert) per §F.1 bijection");
1916        assert_eq!(
1917            loser_entry_after.qualified_name, None,
1918            "loser qualified_name cleared by merge_node_into"
1919        );
1920
1921        // Pending edge target rewritten winner-ward.
1922        assert_eq!(
1923            all_edges[0][0].target, winner_id,
1924            "PendingEdge.target rewritten from loser → winner"
1925        );
1926    }
1927
1928    /// Lock in the Phase 4c-prime tie-break ordering Codex blessed in iter-1:
1929    /// primary = `start_line > 0`, tie-break 1 = wider span, tie-break 2 =
1930    /// lexicographically smaller **file path** (stable across rebuild
1931    /// representations), final fallback = smaller `NodeId::index()`.
1932    ///
1933    /// This test exercises the tie-break 2 path: two candidates with real
1934    /// spans of identical width, hosted in two different files that differ
1935    /// only in filename ordering. The winner must be the node whose file
1936    /// path sorts earlier, regardless of NodeId allocation order.
1937    #[test]
1938    #[cfg(feature = "rebuild-internals")]
1939    fn phase4c_prime_tie_break_prefers_lex_smaller_path_over_node_id() {
1940        use crate::graph::unified::concurrent::CodeGraph;
1941        use crate::graph::unified::node::NodeKind;
1942        use std::path::Path;
1943
1944        let mut graph = CodeGraph::new();
1945        let qname = graph.strings_mut().intern("shared_qname").unwrap();
1946        // Register two paths whose lexical ordering is the reverse of
1947        // the registration (and hence NodeId) order. This isolates the
1948        // path-based tie-break from any accidental NodeId-ordering
1949        // coincidence: if the helper fell back to NodeId the "wrong"
1950        // node would win.
1951        let high_path_file = graph
1952            .files_mut()
1953            .register(Path::new("zzz_late.rs"))
1954            .unwrap();
1955        let low_path_file = graph
1956            .files_mut()
1957            .register(Path::new("aaa_early.rs"))
1958            .unwrap();
1959
1960        // Allocate the `zzz_late.rs` node first so its NodeId::index() is
1961        // numerically smaller than the `aaa_early.rs` node's. With
1962        // identical spans, NodeId-only tie-break would incorrectly pick
1963        // the `zzz_late.rs` node. The correct behaviour is that the
1964        // path-based tie-break picks the `aaa_early.rs` node.
1965        let mut high_entry = NodeEntry::new(NodeKind::Function, qname, high_path_file);
1966        high_entry.qualified_name = Some(qname);
1967        high_entry.start_line = 10;
1968        high_entry.end_line = 20;
1969        let high_node = graph.nodes_mut().alloc(high_entry).unwrap();
1970
1971        let mut low_entry = NodeEntry::new(NodeKind::Function, qname, low_path_file);
1972        low_entry.qualified_name = Some(qname);
1973        // Identical span width — forces the tie-break to ignore primary
1974        // + tie-break 1 (span width) and reach tie-break 2 (path).
1975        low_entry.start_line = 10;
1976        low_entry.end_line = 20;
1977        let low_node = graph.nodes_mut().alloc(low_entry).unwrap();
1978
1979        graph.rebuild_indices();
1980
1981        let mut all_edges: Vec<Vec<PendingEdge>> = Vec::new();
1982        let stats = phase4c_prime_unify_cross_file_nodes(&mut graph, &mut all_edges);
1983
1984        assert_eq!(
1985            stats.nodes_merged, 1,
1986            "one of the duplicate nodes must be merged into the other"
1987        );
1988
1989        // The `aaa_early.rs` node wins because its path sorts lexically
1990        // smaller. Verify its qualified_name is intact.
1991        let low_after = graph
1992            .nodes()
1993            .get(low_node)
1994            .expect("winner slot remains live");
1995        assert_eq!(
1996            low_after.qualified_name,
1997            Some(qname),
1998            "path-earlier node keeps qualified_name as the unification winner"
1999        );
2000
2001        // And the `zzz_late.rs` node — despite a numerically smaller
2002        // NodeId::index() — was merged away.
2003        let high_after = graph
2004            .nodes()
2005            .get(high_node)
2006            .expect("loser slot remains inert (Gate 0d bijection contract)");
2007        assert_eq!(
2008            high_after.qualified_name, None,
2009            "path-later node loses even when its NodeId::index() is smaller"
2010        );
2011    }
2012
2013    /// When the path-based tie-break ALSO ties (two duplicate nodes in the
2014    /// same file — rare but possible via duplicate definitions), the
2015    /// deterministic fallback is `b.index().cmp(&a.index())` which picks
2016    /// the node with the **smaller** NodeId index. Lock that in so future
2017    /// refactors of the tie-break don't accidentally flip the fallback
2018    /// direction.
2019    #[test]
2020    #[cfg(feature = "rebuild-internals")]
2021    fn phase4c_prime_tie_break_falls_back_to_smaller_node_id_on_identical_path() {
2022        use crate::graph::unified::concurrent::CodeGraph;
2023        use crate::graph::unified::node::NodeKind;
2024        use std::path::Path;
2025
2026        let mut graph = CodeGraph::new();
2027        let qname = graph.strings_mut().intern("shared_qname").unwrap();
2028        let file = graph.files_mut().register(Path::new("shared.rs")).unwrap();
2029
2030        // Allocate two duplicate nodes in the SAME file with identical
2031        // spans. The only thing that differs between them is their
2032        // NodeId index (allocation order). Tie-breaks 1 (span width)
2033        // and 2 (path) both return Equal; the final `b.index().cmp(&a.index())`
2034        // fallback picks the smaller index as the winner.
2035        let mut first_entry = NodeEntry::new(NodeKind::Function, qname, file);
2036        first_entry.qualified_name = Some(qname);
2037        first_entry.start_line = 1;
2038        first_entry.end_line = 5;
2039        let first_node = graph.nodes_mut().alloc(first_entry).unwrap();
2040
2041        let mut second_entry = NodeEntry::new(NodeKind::Function, qname, file);
2042        second_entry.qualified_name = Some(qname);
2043        second_entry.start_line = 1;
2044        second_entry.end_line = 5;
2045        let second_node = graph.nodes_mut().alloc(second_entry).unwrap();
2046
2047        assert!(
2048            first_node.index() < second_node.index(),
2049            "precondition: first_node's arena slot precedes second_node's"
2050        );
2051
2052        graph.rebuild_indices();
2053
2054        let mut all_edges: Vec<Vec<PendingEdge>> = Vec::new();
2055        let stats = phase4c_prime_unify_cross_file_nodes(&mut graph, &mut all_edges);
2056
2057        assert_eq!(stats.nodes_merged, 1);
2058
2059        // Smaller NodeId::index() wins.
2060        let winner_after = graph.nodes().get(first_node).expect("winner live");
2061        assert_eq!(
2062            winner_after.qualified_name,
2063            Some(qname),
2064            "smaller-index node wins the same-path / same-span tie-break"
2065        );
2066        let loser_after = graph.nodes().get(second_node).expect("loser inert");
2067        assert_eq!(
2068            loser_after.qualified_name, None,
2069            "larger-index node loses the same-path / same-span tie-break"
2070        );
2071    }
2072
2073    /// Drive the free [`rebuild_indices`] function against both a
2074    /// `RebuildGraph` and a `CodeGraph` seeded with identical data,
2075    /// and verify the resulting `AuxiliaryIndices` are structurally
2076    /// equivalent (same name buckets, same kind buckets).
2077    #[test]
2078    #[cfg(feature = "rebuild-internals")]
2079    fn rebuild_indices_runs_against_rebuild_graph() {
2080        use crate::graph::unified::concurrent::CodeGraph;
2081        use crate::graph::unified::mutation_target::GraphMutationTarget;
2082        use crate::graph::unified::node::NodeKind;
2083
2084        // === CodeGraph baseline ===
2085        let mut code_graph = CodeGraph::new();
2086        let alpha_id_code = code_graph.strings_mut().intern("alpha").unwrap();
2087        let mut code_entry = NodeEntry::new(NodeKind::Function, alpha_id_code, FileId::new(1));
2088        code_entry.qualified_name = Some(alpha_id_code);
2089        let code_node_id = code_graph.nodes_mut().alloc(code_entry).unwrap();
2090        rebuild_indices(&mut code_graph);
2091        let code_buckets_function: Vec<NodeId> =
2092            code_graph.indices().by_kind(NodeKind::Function).to_vec();
2093
2094        // === RebuildGraph path ===
2095        let mut rebuild = {
2096            let graph = CodeGraph::new();
2097            graph.clone_for_rebuild()
2098        };
2099        let alpha_id_rebuild = rebuild.strings_mut().intern("alpha").unwrap();
2100        let mut rebuild_entry =
2101            NodeEntry::new(NodeKind::Function, alpha_id_rebuild, FileId::new(1));
2102        rebuild_entry.qualified_name = Some(alpha_id_rebuild);
2103        let rebuild_node_id = rebuild.nodes_mut().alloc(rebuild_entry).unwrap();
2104        rebuild_indices(&mut rebuild);
2105
2106        // The node ids are both the first allocation on their
2107        // respective arenas, so they share slot indices and
2108        // generations.
2109        assert_eq!(code_node_id, rebuild_node_id);
2110
2111        // The trait-method accessor routes through the impl on
2112        // `RebuildGraph`; the returned indices came from the
2113        // rebuild-local `AuxiliaryIndices` (not a CodeGraph's).
2114        let rebuild_buckets_function: Vec<NodeId> = GraphMutationTarget::indices(&rebuild)
2115            .by_kind(NodeKind::Function)
2116            .to_vec();
2117
2118        assert_eq!(
2119            code_buckets_function, rebuild_buckets_function,
2120            "rebuild_indices must produce equivalent Function buckets on both paths"
2121        );
2122        // Name bucket also present on the rebuild side.
2123        let by_name: Vec<NodeId> = GraphMutationTarget::indices(&rebuild)
2124            .by_name(alpha_id_rebuild)
2125            .to_vec();
2126        assert_eq!(by_name, vec![rebuild_node_id]);
2127    }
2128
    /// Drive [`phase4d_bulk_insert_edges`] against a `RebuildGraph`.
    /// Seed two nodes, construct a per-file `PendingEdge` vector, and
    /// prove the edges land on the rebuild-local edge store with the
    /// expected monotonically-advancing sequence counter.
    #[test]
    #[cfg(feature = "rebuild-internals")]
    fn phase4d_bulk_insert_edges_runs_against_rebuild_graph() {
        use crate::graph::unified::concurrent::CodeGraph;
        use crate::graph::unified::mutation_target::GraphMutationTarget;
        use crate::graph::unified::node::NodeKind;

        // Fresh rebuild plane; the source CodeGraph is dropped at the end
        // of the block so the test cannot observe leakage back to it.
        let mut rebuild = {
            let graph = CodeGraph::new();
            graph.clone_for_rebuild()
        };

        let name_sid = rebuild.strings_mut().intern("edge_target").unwrap();
        let file = FileId::new(3);

        // Two live endpoint nodes for the edges below.
        let n_source = rebuild
            .nodes_mut()
            .alloc(NodeEntry::new(NodeKind::Function, name_sid, file))
            .unwrap();
        let n_target = rebuild
            .nodes_mut()
            .alloc(NodeEntry::new(NodeKind::Variable, name_sid, file))
            .unwrap();

        // Pre-condition: no edges in the rebuild-local forward store.
        let pre_counter = GraphMutationTarget::edges(&rebuild).forward().seq_counter();

        // Two Calls edges over the same (source, target) pair, differing
        // only in `argument_count` — both must be inserted (no dedup).
        let per_file_edges = vec![vec![
            PendingEdge {
                source: n_source,
                target: n_target,
                kind: EdgeKind::Calls {
                    argument_count: 0,
                    is_async: false,
                },
                file,
                spans: vec![],
            },
            PendingEdge {
                source: n_source,
                target: n_target,
                kind: EdgeKind::Calls {
                    argument_count: 1,
                    is_async: false,
                },
                file,
                spans: vec![],
            },
        ]];

        let final_seq = phase4d_bulk_insert_edges(&mut rebuild, &per_file_edges);

        // Seq counter advanced by exactly two edges.
        assert_eq!(
            final_seq,
            pre_counter + 2,
            "phase4d_bulk_insert_edges must advance seq by edge count"
        );

        // Rebuild-local forward store now contains both edges.
        let forward = GraphMutationTarget::edges(&rebuild).forward();
        let after_counter = forward.seq_counter();
        assert_eq!(after_counter, pre_counter + 2);
        // Forward delta must carry the two new edges.
        assert!(
            forward.delta().iter().filter(|e| e.is_add()).count() >= 2,
            "expected at least two Add edges in the rebuild-local forward delta"
        );
        // Release the forward-store borrow of `rebuild` before calling the
        // helper again with `&mut rebuild` below.
        drop(forward);

        // Empty input is a no-op on the edge store.
        let empty_final = phase4d_bulk_insert_edges(&mut rebuild, &[]);
        assert_eq!(empty_final, pre_counter + 2, "empty input is a no-op");
    }
2207
2208    /// `C_EDGE_MIGRATE` regression: when a Cluster C plugin migrates a
2209    /// `TypeOf{Field}` edge's source from a struct node to the per-field
2210    /// `Property` node, Phase 4d must NOT collapse the new shape onto
2211    /// any sibling edge. Both Property-sourced and struct-sourced
2212    /// edges - including a struct-sourced edge over the same target /
2213    /// kind tuple - must round-trip into the bulk-insert path with
2214    /// distinct `(source, target)` identities and stable seq ordering.
2215    ///
2216    /// This locks the property the
2217    /// `phase4d_bulk_insert_edges` doc-comment promises to plugin
2218    /// authors: per-file `PendingEdge` order is preserved 1:1 by
2219    /// `pending_edges_to_delta`, and no `(source, target, kind)` dedup
2220    /// fires inside Phase 4d. Without this guarantee the migration
2221    /// would silently drop the new Property-sourced edges whenever an
2222    /// older legacy snapshot mixed both shapes during a partial
2223    /// rebuild.
2224    #[test]
2225    fn phase4d_preserves_property_sourced_typeof_field_edges() {
2226        use crate::graph::unified::edge::kind::TypeOfContext;
2227
2228        // Synthetic NodeIds standing in for `main.SelectorSource` (struct),
2229        // `main.SelectorSource.NeedTags` (Property), and `bool` (target type).
2230        let struct_id = NodeId::new(10, 1);
2231        let property_id = NodeId::new(11, 1);
2232        let bool_id = NodeId::new(12, 1);
2233
2234        let typeof_field_kind = EdgeKind::TypeOf {
2235            context: Some(TypeOfContext::Field),
2236            index: Some(0),
2237            name: None,
2238        };
2239
2240        // Two PendingEdges over the same (target, kind) discriminator
2241        // but different sources - the post-migration Property-sourced
2242        // shape and a hypothetical legacy struct-sourced shadow that
2243        // could appear during a partial rebuild. Phase 4d must keep
2244        // both.
2245        let per_file_edges = vec![vec![
2246            PendingEdge {
2247                source: property_id,
2248                target: bool_id,
2249                kind: typeof_field_kind.clone(),
2250                file: FileId::new(0),
2251                spans: vec![],
2252            },
2253            PendingEdge {
2254                source: struct_id,
2255                target: bool_id,
2256                kind: typeof_field_kind.clone(),
2257                file: FileId::new(0),
2258                spans: vec![],
2259            },
2260        ]];
2261
2262        let (deltas, final_seq) = pending_edges_to_delta(&per_file_edges, 500);
2263
2264        // No dedup: both edges land in the per-file delta vector with
2265        // distinct seq numbers, in input order.
2266        assert_eq!(deltas.len(), 1);
2267        assert_eq!(deltas[0].len(), 2);
2268        assert_eq!(final_seq, 502);
2269
2270        assert_eq!(deltas[0][0].source, property_id);
2271        assert_eq!(deltas[0][0].target, bool_id);
2272        assert_eq!(deltas[0][0].seq, 500);
2273        assert!(matches!(
2274            deltas[0][0].kind,
2275            EdgeKind::TypeOf {
2276                context: Some(TypeOfContext::Field),
2277                ..
2278            }
2279        ));
2280
2281        assert_eq!(deltas[0][1].source, struct_id);
2282        assert_eq!(deltas[0][1].target, bool_id);
2283        assert_eq!(deltas[0][1].seq, 501);
2284
2285        // Determinism re-check: re-running the conversion against the
2286        // same input produces an identical DeltaEdge sequence (same
2287        // sources, same targets, same kinds, same seq numbers when
2288        // re-anchored to the same `seq_start`). This is the property
2289        // the SnapshotReader → SnapshotWriter byte-identity round-trip
2290        // assertion relies on for fresh-rebuild reproducibility.
2291        let (deltas_again, final_seq_again) = pending_edges_to_delta(&per_file_edges, 500);
2292        assert_eq!(final_seq_again, final_seq);
2293        assert_eq!(deltas_again.len(), deltas.len());
2294        assert_eq!(deltas_again[0].len(), deltas[0].len());
2295        for (a, b) in deltas[0].iter().zip(deltas_again[0].iter()) {
2296            assert_eq!(a.source, b.source);
2297            assert_eq!(a.target, b.target);
2298            assert_eq!(a.seq, b.seq);
2299        }
2300    }
2301}