Skip to main content

sqry_core/graph/unified/build/
parallel_commit.rs

1//! Parallel commit pipeline for pre-allocated ID ranges.
2//!
//! Replaces the serial commit loop with a four-phase pipeline; the phases
//! implemented in this module are 2–4 (Phase 1 — per-file parsing into
//! staging graphs — runs upstream):
//! Phase 2: Count + range assignment via prefix sums
//! Phase 3: Parallel commit into disjoint pre-allocated ranges
//! Phase 4: String dedup, remap, index build, edge bulk insert
7//!
8//! # Phase 3 Architecture
9//!
10//! Phase 3 uses `split_at_mut` to carve disjoint sub-slices from pre-allocated
11//! arena and interner ranges, then uses `rayon` to commit each file's staging
12//! graph in parallel without locks:
13//!
14//! ```text
15//! NodeArena slots:   [   file0   |   file1   |   file2   ]
16//! StringInterner:    [   file0   |   file1   |   file2   ]
17//!                         ↑            ↑            ↑
18//!                    split_at_mut  split_at_mut  remainder
19//! ```
20//!
21//! Each file's `commit_single_file` receives its own disjoint slices and
22//! operates independently without contention.
23
24use std::collections::HashMap;
25use std::ops::Range;
26use std::sync::Arc;
27
28use rayon::prelude::*;
29
30use crate::graph::unified::edge::delta::{DeltaEdge, DeltaOp};
31use crate::graph::unified::edge::kind::{EdgeKind, MqProtocol};
32use crate::graph::unified::file::FileId;
33use crate::graph::unified::node::NodeId;
34use crate::graph::unified::storage::NodeArena;
35use crate::graph::unified::storage::arena::{NodeEntry, Slot};
36use crate::graph::unified::string::StringId;
37
38use super::pass3_intra::PendingEdge;
39use super::staging::{StagingGraph, StagingOp};
40
/// Running offsets carried across chunks for deterministic ID assignment.
///
/// Each chunk's ranges begin where the previous chunk ended, ensuring
/// globally unique, contiguous ID spaces. The derived `Default` yields
/// zeroed offsets — the state before any chunk has been committed.
#[derive(Debug, Clone, Default)]
pub struct GlobalOffsets {
    /// Next available node slot index.
    pub node_offset: u32,
    /// Next available string slot index.
    pub string_offset: u32,
}
52
/// Per-file commit plan with pre-assigned ID ranges.
///
/// Produced by [`compute_commit_plan`]; the ranges of successive plans
/// are contiguous and non-overlapping, so each file can later be
/// committed into its own disjoint slice of the arena/interner.
#[derive(Debug, Clone)]
pub struct FilePlan {
    /// Index into the chunk's `ParsedFile` vec.
    pub parsed_index: usize,
    /// Pre-assigned `FileId` from batch registration.
    pub file_id: FileId,
    /// Node slot range [start..end) in `NodeArena`.
    pub node_range: Range<u32>,
    /// String slot range [start..end) in `StringInterner`.
    pub string_range: Range<u32>,
}
65
/// Plan for parallel commit of a single chunk.
///
/// The totals equal the sums of the corresponding per-file ranges/counts
/// in `file_plans` (see [`compute_commit_plan`]).
#[derive(Debug, Clone)]
pub struct ChunkCommitPlan {
    /// Per-file plans in deterministic file order.
    pub file_plans: Vec<FilePlan>,
    /// Total nodes across all files in this chunk.
    pub total_nodes: u32,
    /// Total strings across all files in this chunk.
    pub total_strings: u32,
    /// Total edges across all files in this chunk.
    pub total_edges: u64,
}
78
79/// Compute commit plan from parsed files using prefix-sum range assignment.
80///
81/// Each file gets contiguous, non-overlapping ranges for nodes and strings.
82/// Ranges start from the given global offsets, which carry forward across
83/// chunks.
84///
85/// # Arguments
86///
87/// * `node_counts` - Per-file node counts (from `StagingGraph::node_count_u32()`)
88/// * `string_counts` - Per-file string counts
89/// * `edge_counts` - Per-file edge counts (used for `total_edges` only)
90/// * `file_ids` - Pre-assigned `FileId`s from batch registration
91/// * `node_offset` - Running global node offset across chunks
92/// * `string_offset` - Running global string offset across chunks
93///
94/// # Panics
95///
96/// Panics in debug builds if the per-chunk accounting arrays do not have
97/// identical lengths.
98#[must_use]
99pub fn compute_commit_plan(
100    node_counts: &[u32],
101    string_counts: &[u32],
102    edge_counts: &[u32],
103    file_ids: &[FileId],
104    node_offset: u32,
105    string_offset: u32,
106) -> ChunkCommitPlan {
107    debug_assert_eq!(node_counts.len(), string_counts.len());
108    debug_assert_eq!(node_counts.len(), edge_counts.len());
109    debug_assert_eq!(node_counts.len(), file_ids.len());
110
111    let mut plans = Vec::with_capacity(node_counts.len());
112    let mut node_cursor = node_offset;
113    let mut string_cursor = string_offset;
114    let mut total_edges: u64 = 0;
115
116    for i in 0..node_counts.len() {
117        let nc = node_counts[i];
118        let sc = string_counts[i];
119
120        let node_end = node_cursor
121            .checked_add(nc)
122            .expect("node ID space overflow in commit plan");
123        let string_end = string_cursor
124            .checked_add(sc)
125            .expect("string ID space overflow in commit plan");
126
127        plans.push(FilePlan {
128            parsed_index: i,
129            file_id: file_ids[i],
130            node_range: node_cursor..node_end,
131            string_range: string_cursor..string_end,
132        });
133
134        node_cursor = node_end;
135        string_cursor = string_end;
136        total_edges += u64::from(edge_counts[i]);
137    }
138
139    ChunkCommitPlan {
140        file_plans: plans,
141        total_nodes: node_cursor - node_offset,
142        total_strings: string_cursor - string_offset,
143        total_edges,
144    }
145}
146
147/// Execute Phase 2: count + range assignment for a parsed chunk.
148///
149/// Extracts per-file counts from staging graphs and delegates to
150/// [`compute_commit_plan`] for prefix-sum range assignment.
151#[must_use]
152pub fn phase2_assign_ranges(
153    staging_graphs: &[&StagingGraph],
154    file_ids: &[FileId],
155    offsets: &GlobalOffsets,
156) -> ChunkCommitPlan {
157    let node_counts: Vec<u32> = staging_graphs
158        .iter()
159        .map(|sg| sg.node_count_u32())
160        .collect();
161    let string_counts: Vec<u32> = staging_graphs
162        .iter()
163        .map(|sg| sg.string_count_u32())
164        .collect();
165    let edge_counts: Vec<u32> = staging_graphs
166        .iter()
167        .map(|sg| sg.edge_count_u32())
168        .collect();
169
170    compute_commit_plan(
171        &node_counts,
172        &string_counts,
173        &edge_counts,
174        file_ids,
175        offsets.node_offset,
176        offsets.string_offset,
177    )
178}
179
/// Phase 3 result: per-file edges, per-file node IDs, and total written
/// counts for validation.
///
/// The `total_*_written` counts can be smaller than the planned totals:
/// `write_strings`/`write_nodes` log a warning and skip the remainder of
/// a file on slot overflow, and the caller is expected to reconcile the
/// difference against the plan.
pub struct Phase3Result {
    /// Per-file edge collections for Phase 4 bulk insert.
    pub per_file_edges: Vec<Vec<PendingEdge>>,
    /// Per-file node IDs actually committed. Indexed identically to
    /// `per_file_edges` — element `i` is the Vec of NodeIds committed
    /// for `plan.file_plans[i]`. Empty Vec when that file wrote no
    /// nodes (slot overflow skip, or a staging graph with only strings).
    ///
    /// Used by the caller to populate
    /// [`crate::graph::unified::storage::registry::FileRegistry::record_node`],
    /// which feeds the Gate 0c bucket-bijection debug invariant.
    pub per_file_node_ids: Vec<Vec<NodeId>>,
    /// Total nodes actually written (for validation against planned totals).
    pub total_nodes_written: usize,
    /// Total strings actually written (for validation against planned totals).
    pub total_strings_written: usize,
    /// Total edges collected across all files.
    pub total_edges_collected: usize,
}
201
202/// Execute Phase 3: parallel commit into disjoint pre-allocated ranges.
203///
204/// Pre-splits arena and interner slices into per-file disjoint sub-slices
205/// using `split_at_mut`, then uses `rayon` `par_iter` for lock-free parallel
206/// writes. Each file's staging graph is committed independently.
207///
208/// Returns [`Phase3Result`] with per-file edges and written counts so the
209/// caller can validate against plan totals and truncate allocations on
210/// mismatch.
211///
212/// # Parameterisation over the mutation target
213///
214/// As of Task 4 Step 4 Phase 1, this function is generic over
215/// `G: GraphMutationTarget`. At the full-build call site in
216/// `build_unified_graph_inner` the target is `CodeGraph`; at the
217/// Task 4 Step 4 Phase 2+ incremental rebuild call site the target
218/// will be `RebuildGraph`. Both impls live in
219/// [`crate::graph::unified::mutation_target`]; see that module's
220/// docs for the field-coverage contract.
221///
222/// The function accesses exactly two fields via the trait —
223/// [`GraphMutationTarget::nodes_and_strings_mut`] — and pre-splits
224/// those two slices for the per-file parallel commit. Every other
225/// piece of the pipeline (the CSR/delta edge store, auxiliary
226/// indices, file registry, etc.) is untouched by this helper: the
227/// `PendingEdge` vectors in the returned [`Phase3Result`] are
228/// threaded through to Phase 4d (`pending_edges_to_delta` +
229/// `BidirectionalEdgeStore::add_edges_bulk_ordered`) by the caller.
230///
231/// # Panics
232///
233/// Panics if `plan.total_nodes` or `plan.total_strings` exceeds the
234/// pre-allocated range in the arena or interner.
235#[must_use]
236pub(crate) fn phase3_parallel_commit<
237    G: crate::graph::unified::mutation_target::GraphMutationTarget,
238>(
239    plan: &ChunkCommitPlan,
240    staging_graphs: &[&StagingGraph],
241    graph: &mut G,
242) -> Phase3Result {
243    if plan.file_plans.is_empty() {
244        return Phase3Result {
245            per_file_edges: Vec::new(),
246            per_file_node_ids: Vec::new(),
247            total_nodes_written: 0,
248            total_strings_written: 0,
249            total_edges_collected: 0,
250        };
251    }
252
253    // Determine the start of the pre-allocated ranges.
254    let node_start = plan.file_plans[0].node_range.start;
255    let string_start = plan.file_plans[0].string_range.start;
256
257    // Borrow the arena and interner disjointly via the mutation-plane
258    // trait. This is the one-and-only field access this helper makes
259    // on the graph; every downstream step operates on the resulting
260    // slices without revisiting `graph`.
261    let (arena, interner) = graph.nodes_and_strings_mut();
262
263    // Get mutable slices covering the entire pre-allocated region.
264    let node_slice = arena.bulk_slice_mut(node_start, plan.total_nodes);
265    let (str_slice, rc_slice) = interner.bulk_slices_mut(string_start, plan.total_strings);
266
267    // Pre-split into per-file disjoint sub-slices using split_at_mut.
268    let mut node_remaining = &mut *node_slice;
269    let mut str_remaining = &mut *str_slice;
270    let mut rc_remaining = &mut *rc_slice;
271
272    #[allow(clippy::type_complexity)]
273    let mut file_work: Vec<(
274        &mut [Slot<NodeEntry>],
275        &mut [Option<Arc<str>>],
276        &mut [u32],
277        &FilePlan,
278        usize,
279    )> = Vec::with_capacity(plan.file_plans.len());
280
281    for (i, file_plan) in plan.file_plans.iter().enumerate() {
282        let nc = (file_plan.node_range.end - file_plan.node_range.start) as usize;
283        let sc = (file_plan.string_range.end - file_plan.string_range.start) as usize;
284
285        let (n, nr) = node_remaining.split_at_mut(nc);
286        let (s, sr) = str_remaining.split_at_mut(sc);
287        let (r, rr) = rc_remaining.split_at_mut(sc);
288
289        file_work.push((n, s, r, file_plan, i));
290        node_remaining = nr;
291        str_remaining = sr;
292        rc_remaining = rr;
293    }
294
295    // Parallel commit — each closure owns disjoint slices, no contention.
296    let results: Vec<FileCommitResult> = file_work
297        .into_par_iter()
298        .map(|(node_slots, str_slots, rc_slots, file_plan, idx)| {
299            commit_single_file(
300                staging_graphs[idx],
301                file_plan,
302                node_slots,
303                str_slots,
304                rc_slots,
305            )
306        })
307        .collect();
308
309    let total_nodes_written: usize = results.iter().map(|r| r.nodes_written).sum();
310    let total_strings_written: usize = results.iter().map(|r| r.strings_written).sum();
311    let total_edges_collected: usize = results.iter().map(|r| r.edges.len()).sum();
312    let mut per_file_edges = Vec::with_capacity(results.len());
313    let mut per_file_node_ids = Vec::with_capacity(results.len());
314    for r in results {
315        per_file_edges.push(r.edges);
316        per_file_node_ids.push(r.node_ids);
317    }
318
319    Phase3Result {
320        per_file_edges,
321        per_file_node_ids,
322        total_nodes_written,
323        total_strings_written,
324        total_edges_collected,
325    }
326}
327
328/// Commit a single file's staging graph into pre-allocated disjoint ranges.
329///
330/// This function operates on slices that belong exclusively to this file,
331/// so it requires no locks or synchronization.
332///
333/// # Steps
334///
335/// 1. **Strings**: Extract `InternString` ops, write `Arc<str>` values into
336///    pre-allocated string slots, build local→global `StringId` remap.
337/// 2. **Nodes**: Extract `AddNode` ops, apply string remap to each `NodeEntry`,
338///    set `file_id`, write into pre-allocated node slots, build expected→actual
339///    `NodeId` remap.
340/// 3. **Edges**: Extract `AddEdge` ops, apply node ID remap to source/target,
341///    assign pre-computed sequence numbers, return as `PendingEdge` vec.
342// Result of committing a single file: edges + committed NodeIds + actual written counts.
343struct FileCommitResult {
344    edges: Vec<PendingEdge>,
345    /// Every `NodeId` committed into the arena for this file, in
346    /// commit order. Used by the sequential post-commit step that
347    /// populates `FileRegistry::per_file_nodes`.
348    node_ids: Vec<NodeId>,
349    nodes_written: usize,
350    strings_written: usize,
351}
352
353fn commit_single_file(
354    staging: &StagingGraph,
355    plan: &FilePlan,
356    node_slots: &mut [Slot<NodeEntry>],
357    str_slots: &mut [Option<Arc<str>>],
358    rc_slots: &mut [u32],
359) -> FileCommitResult {
360    let ops = staging.operations();
361
362    // --- Step 1: Write strings, build local→global remap ---
363    let (string_remap, strings_written) = write_strings(ops, plan, str_slots, rc_slots);
364
365    // --- Step 2: Write nodes, build expected→actual node ID remap ---
366    let (node_remap, nodes_written, node_ids) = write_nodes(ops, plan, node_slots, &string_remap);
367
368    // --- Step 3: Collect remapped edges with pre-assigned sequence numbers ---
369    let edges = collect_edges(ops, plan, &node_remap, &string_remap);
370
371    FileCommitResult {
372        edges,
373        node_ids,
374        nodes_written,
375        strings_written,
376    }
377}
378
379/// Write staged strings into pre-allocated interner slots.
380///
381/// Validates that each `InternString` op has a local `StringId` and that
382/// no duplicate local IDs exist (matching the serial `commit_strings` checks).
383///
384/// Returns `(remap, strings_written)`.
385fn write_strings(
386    ops: &[StagingOp],
387    plan: &FilePlan,
388    str_slots: &mut [Option<Arc<str>>],
389    rc_slots: &mut [u32],
390) -> (HashMap<StringId, StringId>, usize) {
391    let mut remap = HashMap::new();
392    let mut string_cursor = 0usize;
393
394    for op in ops {
395        if let StagingOp::InternString { local_id, value } = op {
396            // Validate: only local IDs are allowed in staging (matching serial commit_strings)
397            assert!(
398                local_id.is_local(),
399                "non-local StringId {:?} in InternString op for file {:?}",
400                local_id,
401                plan.file_id,
402            );
403            // Validate: no duplicate local IDs (matching serial commit_strings)
404            assert!(
405                !remap.contains_key(local_id),
406                "duplicate local StringId {:?} in InternString op for file {:?}",
407                local_id,
408                plan.file_id,
409            );
410
411            if string_cursor >= str_slots.len() {
412                log::warn!(
413                    "string slot overflow in file {:?}: cursor={string_cursor}, slots={}, skipping remaining strings",
414                    plan.file_id,
415                    str_slots.len()
416                );
417                break;
418            }
419
420            // The global StringId for this string is the pre-allocated slot index.
421            #[allow(clippy::cast_possible_truncation)] // cursor is bounded by allocated slot count
422            let global_id = StringId::new(plan.string_range.start + string_cursor as u32);
423
424            // Write the string into the pre-allocated slot.
425            str_slots[string_cursor] = Some(Arc::from(value.as_str()));
426            rc_slots[string_cursor] = 1;
427
428            remap.insert(*local_id, global_id);
429            string_cursor += 1;
430        }
431    }
432
433    (remap, string_cursor)
434}
435
/// Remap all `StringId` fields in a `NodeEntry` using a local→global table.
///
/// Required field (`name`) is always remapped if local.
/// Optional fields (`signature`, `doc`, `qualified_name`, `visibility`)
/// are remapped if present and local.
///
/// # Panics
///
/// Panics (via the `remap_*_local` helpers) when a local ID has no entry
/// in `remap` — i.e. the staging graph referenced a string it never interned.
fn remap_node_entry_string_ids(entry: &mut NodeEntry, remap: &HashMap<StringId, StringId>) {
    remap_required_local(&mut entry.name, remap);
    remap_option_local(&mut entry.signature, remap);
    remap_option_local(&mut entry.doc, remap);
    remap_option_local(&mut entry.qualified_name, remap);
    remap_option_local(&mut entry.visibility, remap);
}
448
/// Remap all local `StringId` fields in an `EdgeKind`.
///
/// Uses the same exhaustive match as `remap_edge_kind_string_ids`, but
/// only remaps local IDs (those with `LOCAL_TAG_BIT` set).
///
/// # Panics
///
/// Panics (via `remap_required_local` / `remap_option_local`) when a
/// local ID has no entry in `remap` — i.e. the staging graph referenced
/// a string it never interned.
#[allow(clippy::match_same_arms)]
fn remap_edge_kind_local_string_ids(kind: &mut EdgeKind, remap: &HashMap<StringId, StringId>) {
    match kind {
        EdgeKind::Imports { alias, .. } => remap_option_local(alias, remap),
        EdgeKind::Exports { alias, .. } => remap_option_local(alias, remap),
        EdgeKind::TypeOf { name, .. } => remap_option_local(name, remap),
        EdgeKind::TraitMethodBinding {
            trait_name,
            impl_type,
            ..
        } => {
            remap_required_local(trait_name, remap);
            remap_required_local(impl_type, remap);
        }
        EdgeKind::HttpRequest { url, .. } => remap_option_local(url, remap),
        EdgeKind::GrpcCall { service, method } => {
            remap_required_local(service, remap);
            remap_required_local(method, remap);
        }
        EdgeKind::DbQuery { table, .. } => remap_option_local(table, remap),
        EdgeKind::TableRead { table_name, schema } => {
            remap_required_local(table_name, remap);
            remap_option_local(schema, remap);
        }
        EdgeKind::TableWrite {
            table_name, schema, ..
        } => {
            remap_required_local(table_name, remap);
            remap_option_local(schema, remap);
        }
        EdgeKind::TriggeredBy {
            trigger_name,
            schema,
        } => {
            remap_required_local(trigger_name, remap);
            remap_option_local(schema, remap);
        }
        EdgeKind::MessageQueue { protocol, topic } => {
            // `MqProtocol::Other` carries a free-form protocol string.
            if let MqProtocol::Other(s) = protocol {
                remap_required_local(s, remap);
            }
            remap_option_local(topic, remap);
        }
        EdgeKind::WebSocket { event } => remap_option_local(event, remap),
        EdgeKind::GraphQLOperation { operation } => remap_required_local(operation, remap),
        EdgeKind::ProcessExec { command } => remap_required_local(command, remap),
        EdgeKind::FileIpc { path_pattern } => remap_option_local(path_pattern, remap),
        EdgeKind::ProtocolCall { protocol, metadata } => {
            remap_required_local(protocol, remap);
            remap_option_local(metadata, remap);
        }
        // Variants without StringId fields — exhaustive, no wildcard.
        EdgeKind::Defines
        | EdgeKind::Contains
        | EdgeKind::Calls { .. }
        | EdgeKind::References
        | EdgeKind::Inherits
        | EdgeKind::Implements
        | EdgeKind::LifetimeConstraint { .. }
        | EdgeKind::MacroExpansion { .. }
        | EdgeKind::FfiCall { .. }
        | EdgeKind::WebAssemblyCall
        | EdgeKind::GenericBound
        | EdgeKind::AnnotatedWith
        | EdgeKind::AnnotationParam
        | EdgeKind::LambdaCaptures
        | EdgeKind::ModuleExports
        | EdgeKind::ModuleRequires
        | EdgeKind::ModuleOpens
        | EdgeKind::ModuleProvides
        | EdgeKind::TypeArgument
        | EdgeKind::ExtensionReceiver
        | EdgeKind::CompanionOf
        | EdgeKind::SealedPermit => {}
    }
}
529
530/// Remap a required local `StringId` in place.
531///
532/// Panics if a local ID has no mapping, matching the serial
533/// `apply_string_remap` behavior that returned `UnmappedLocalStringId`.
534fn remap_required_local(id: &mut StringId, remap: &HashMap<StringId, StringId>) {
535    if id.is_local() {
536        let global = remap.get(id).unwrap_or_else(|| {
537            panic!("unmapped local StringId {id:?} — missing intern_string op?")
538        });
539        *id = *global;
540    }
541}
542
543/// Remap an optional local `StringId` in place.
544fn remap_option_local(opt: &mut Option<StringId>, remap: &HashMap<StringId, StringId>) {
545    if let Some(id) = opt
546        && id.is_local()
547    {
548        let global = remap.get(id).unwrap_or_else(|| {
549            panic!("unmapped local StringId {id:?} — missing intern_string op?")
550        });
551        *id = *global;
552    }
553}
554
555/// Write staged nodes into pre-allocated arena slots.
556///
557/// Returns `(remap, nodes_written, node_ids)`. `node_ids` is the Vec of
558/// every `NodeId` committed for this file, in commit order, for use by
559/// the sequential bucket-population post-step.
560fn write_nodes(
561    ops: &[StagingOp],
562    plan: &FilePlan,
563    node_slots: &mut [Slot<NodeEntry>],
564    string_remap: &HashMap<StringId, StringId>,
565) -> (HashMap<NodeId, NodeId>, usize, Vec<NodeId>) {
566    let mut node_remap = HashMap::new();
567    let mut node_cursor = 0usize;
568    let mut node_ids: Vec<NodeId> = Vec::with_capacity(node_slots.len());
569
570    for op in ops {
571        if let StagingOp::AddNode {
572            entry, expected_id, ..
573        } = op
574        {
575            if node_cursor >= node_slots.len() {
576                log::warn!(
577                    "node slot overflow in file {:?}: cursor={node_cursor}, slots={}, skipping remaining nodes",
578                    plan.file_id,
579                    node_slots.len()
580                );
581                break;
582            }
583
584            let mut entry = entry.clone();
585
586            // Apply string remap to all StringId fields in the entry.
587            remap_node_entry_string_ids(&mut entry, string_remap);
588
589            // Set the file ID from the plan.
590            entry.file = plan.file_id;
591
592            // The actual NodeId is the pre-allocated slot index with generation 1.
593            #[allow(clippy::cast_possible_truncation)] // cursor is bounded by allocated slot count
594            let actual_index = plan.node_range.start + node_cursor as u32;
595            let actual_id = NodeId::new(actual_index, 1);
596
597            // Write into the pre-allocated slot.
598            node_slots[node_cursor] = Slot::new_occupied(1, entry);
599
600            if let Some(expected) = expected_id {
601                node_remap.insert(*expected, actual_id);
602            }
603
604            node_ids.push(actual_id);
605            node_cursor += 1;
606        }
607    }
608
609    (node_remap, node_cursor, node_ids)
610}
611
612/// Collect staged edges with remapped node IDs, string IDs, and pre-assigned
613/// sequence numbers.
614fn collect_edges(
615    ops: &[StagingOp],
616    plan: &FilePlan,
617    node_remap: &HashMap<NodeId, NodeId>,
618    string_remap: &HashMap<StringId, StringId>,
619) -> Vec<PendingEdge> {
620    let mut edges = Vec::new();
621
622    for op in ops {
623        if let StagingOp::AddEdge {
624            source,
625            target,
626            kind,
627            spans,
628            ..
629        } = op
630        {
631            let actual_source = node_remap.get(source).copied().unwrap_or(*source);
632            let actual_target = node_remap.get(target).copied().unwrap_or(*target);
633
634            // Clone and remap any local StringIds in the EdgeKind.
635            let mut remapped_kind = kind.clone();
636            remap_edge_kind_local_string_ids(&mut remapped_kind, string_remap);
637
638            edges.push(PendingEdge {
639                source: actual_source,
640                target: actual_target,
641                kind: remapped_kind,
642                file: plan.file_id,
643                spans: spans.clone(),
644            });
645        }
646    }
647
648    edges
649}
650
651/// Remap a required `StringId` using the dedup remap table.
652///
653/// If the ID is in the remap table, it is replaced with the canonical ID.
654/// Otherwise, it is left unchanged (identity mapping).
655#[allow(clippy::implicit_hasher)]
656pub fn remap_string_id(id: &mut StringId, remap: &HashMap<StringId, StringId>) {
657    if let Some(&canonical) = remap.get(id) {
658        *id = canonical;
659    }
660}
661
662/// Remap an optional `StringId` using the dedup remap table.
663#[allow(clippy::implicit_hasher)]
664pub fn remap_option_string_id(id: &mut Option<StringId>, remap: &HashMap<StringId, StringId>) {
665    if let Some(inner) = id {
666        remap_string_id(inner, remap);
667    }
668}
669
/// Exhaustive remap of all `StringId` fields in an `EdgeKind`.
///
/// No wildcard arm — the compiler ensures completeness when new variants
/// are added to `EdgeKind`.
///
/// Unlike the Phase 3 local-only helper, this uses the identity-mapping
/// `remap_string_id`/`remap_option_string_id`: IDs absent from `remap`
/// are left unchanged and nothing panics.
#[allow(clippy::match_same_arms, clippy::implicit_hasher)] // Arms are separated by category for documentation clarity
pub fn remap_edge_kind_string_ids(kind: &mut EdgeKind, remap: &HashMap<StringId, StringId>) {
    match kind {
        // === Variants WITH StringId fields ===
        EdgeKind::Imports { alias, .. } => remap_option_string_id(alias, remap),
        EdgeKind::Exports { alias, .. } => remap_option_string_id(alias, remap),
        EdgeKind::TypeOf { name, .. } => remap_option_string_id(name, remap),
        EdgeKind::TraitMethodBinding {
            trait_name,
            impl_type,
            ..
        } => {
            remap_string_id(trait_name, remap);
            remap_string_id(impl_type, remap);
        }
        EdgeKind::HttpRequest { url, .. } => remap_option_string_id(url, remap),
        EdgeKind::GrpcCall { service, method } => {
            remap_string_id(service, remap);
            remap_string_id(method, remap);
        }
        EdgeKind::DbQuery { table, .. } => remap_option_string_id(table, remap),
        EdgeKind::TableRead { table_name, schema } => {
            remap_string_id(table_name, remap);
            remap_option_string_id(schema, remap);
        }
        EdgeKind::TableWrite {
            table_name, schema, ..
        } => {
            remap_string_id(table_name, remap);
            remap_option_string_id(schema, remap);
        }
        EdgeKind::TriggeredBy {
            trigger_name,
            schema,
        } => {
            remap_string_id(trigger_name, remap);
            remap_option_string_id(schema, remap);
        }
        EdgeKind::MessageQueue { protocol, topic } => {
            // `MqProtocol::Other` carries a free-form protocol string.
            if let MqProtocol::Other(s) = protocol {
                remap_string_id(s, remap);
            }
            remap_option_string_id(topic, remap);
        }
        EdgeKind::WebSocket { event } => remap_option_string_id(event, remap),
        EdgeKind::GraphQLOperation { operation } => remap_string_id(operation, remap),
        EdgeKind::ProcessExec { command } => remap_string_id(command, remap),
        EdgeKind::FileIpc { path_pattern } => remap_option_string_id(path_pattern, remap),
        EdgeKind::ProtocolCall { protocol, metadata } => {
            remap_string_id(protocol, remap);
            remap_option_string_id(metadata, remap);
        }
        // === Variants WITHOUT StringId fields — exhaustive, no wildcard ===
        EdgeKind::Defines
        | EdgeKind::Contains
        | EdgeKind::Calls { .. }
        | EdgeKind::References
        | EdgeKind::Inherits
        | EdgeKind::Implements
        | EdgeKind::LifetimeConstraint { .. }
        | EdgeKind::MacroExpansion { .. }
        | EdgeKind::FfiCall { .. }
        | EdgeKind::WebAssemblyCall
        | EdgeKind::GenericBound
        | EdgeKind::AnnotatedWith
        | EdgeKind::AnnotationParam
        | EdgeKind::LambdaCaptures
        | EdgeKind::ModuleExports
        | EdgeKind::ModuleRequires
        | EdgeKind::ModuleOpens
        | EdgeKind::ModuleProvides
        | EdgeKind::TypeArgument
        | EdgeKind::ExtensionReceiver
        | EdgeKind::CompanionOf
        | EdgeKind::SealedPermit => {}
    }
}
751
752// === Phase 4: Post-chunk Finalization ===
753
/// Apply global string dedup remap to all `StringId` fields in a `NodeEntry`.
///
/// This is the Phase 4 counterpart to `remap_node_entry_string_ids` (Phase 3).
/// Phase 3 remaps local→global; Phase 4 remaps duplicate global→canonical global.
/// IDs absent from `remap` are left unchanged, so the table only needs to
/// cover the duplicated strings.
#[allow(clippy::implicit_hasher)]
pub fn remap_node_entry_global(entry: &mut NodeEntry, remap: &HashMap<StringId, StringId>) {
    remap_string_id(&mut entry.name, remap);
    remap_option_string_id(&mut entry.signature, remap);
    remap_option_string_id(&mut entry.doc, remap);
    remap_option_string_id(&mut entry.qualified_name, remap);
    remap_option_string_id(&mut entry.visibility, remap);
}
766
767/// Apply global string dedup remap to all nodes in the arena and all pending edges.
768///
769/// This is Phase 4b of the parallel commit pipeline. After `build_dedup_table()`
770/// produces a remap table, this function applies it to every `StringId` in:
771/// - All `NodeEntry` fields in the arena
772/// - All `EdgeKind` fields in the pending edges
773#[allow(clippy::implicit_hasher)]
774pub fn phase4_apply_global_remap(
775    arena: &mut NodeArena,
776    all_edges: &mut [Vec<PendingEdge>],
777    remap: &HashMap<StringId, StringId>,
778) {
779    if remap.is_empty() {
780        return;
781    }
782
783    // Remap all nodes
784    for (_id, entry) in arena.iter_mut() {
785        remap_node_entry_global(entry, remap);
786    }
787
788    // Remap all edges
789    for file_edges in all_edges.iter_mut() {
790        for edge in file_edges.iter_mut() {
791            remap_edge_kind_string_ids(&mut edge.kind, remap);
792        }
793    }
794}
795
/// Statistics from Phase 4c-prime cross-file node unification.
///
/// All counters start at zero via the derived `Default`.
#[derive(Debug, Default)]
pub struct UnificationStats {
    /// Total (qualified_name, kind) groups of size >= 2 examined.
    pub candidate_pairs_examined: usize,
    /// Number of loser nodes merged into winners.
    pub nodes_merged: usize,
    /// Number of PendingEdge fields rewritten.
    pub edges_rewritten: usize,
    /// Number of loser nodes (metadata merged into winners, slot kept inert).
    pub nodes_inert: usize,
    /// Time spent in the unification pass (milliseconds).
    pub elapsed_ms: u64,
}
810
/// Phase 4c-prime: Unify cross-file duplicate nodes sharing the same
/// canonical qualified name and a call-compatible kind.
///
/// Runs after `rebuild_indices` (Phase 4c) which populates `by_qualified_name`,
/// and before `pending_edges_to_delta` (Phase 4d) so the remap operates on
/// `PendingEdge` targets, not committed `DeltaEdge`s.
///
/// **Winner selection**: Among nodes sharing a qualified name and call-compatible
/// kinds, the node with `start_line > 0` wins. Tie-break in order:
///   1. Wider `end_line - start_line` span.
///   2. **Lexicographically smallest file path** (resolved via the rebuild
///      plane's [`FileRegistry`]). Phase 3e correctness requires the
///      path-based tie-break rather than the previous `FileId` comparison,
///      because `FileId` slot assignment differs between a fresh full
///      rebuild and an incremental rebuild — the incremental path clones
///      the existing `FileRegistry` and appends new paths, while the full
///      path assigns FileIds in filesystem-walk order from an empty
///      registry. Two builds of the same logical workspace therefore
///      disagree on which `FileId` is smaller when duplicate definitions
///      tie on span width, flipping the unification winner and stranding
///      `qualified_name` on the wrong side of the merge. Tie-breaking on
///      the (stable-across-builds) path makes winner selection
///      representation-independent.
///   3. Final fallback: smaller `NodeId::index()` when paths also tie
///      (e.g. two definitions in the same file — rare but possible via
///      duplicate declarations). `NodeId` is deterministic within a
///      single build so this keeps the fallback stable for any individual
///      build even if it isn't invariant across representations.
///
/// **Safety**: Caller must hold an exclusive write lock on the graph.
pub(crate) fn phase4c_prime_unify_cross_file_nodes<
    G: crate::graph::unified::mutation_target::GraphMutationTarget,
>(
    graph: &mut G,
    all_edges: &mut [Vec<PendingEdge>],
) -> UnificationStats {
    use crate::graph::unified::mutation_target::GraphMutationTarget;

    use super::helper::CALL_COMPATIBLE_KINDS;
    use super::unification::{NodeRemapTable, merge_node_into};
    use std::time::Instant;

    let start = Instant::now();
    let mut stats = UnificationStats::default();

    // Collect candidates: walk arena, group by qualified_name for nodes
    // with call-compatible kinds. Only groups of size >= 2 need unification.
    let mut qn_groups: HashMap<crate::graph::unified::string::StringId, Vec<NodeId>> =
        HashMap::new();

    for (node_id, entry) in GraphMutationTarget::nodes(graph).iter() {
        if !CALL_COMPATIBLE_KINDS.contains(&entry.kind) {
            continue;
        }
        if let Some(qn_id) = entry.qualified_name {
            qn_groups.entry(qn_id).or_default().push(node_id);
        }
    }

    // Filter to groups with 2+ members
    let groups_to_unify: Vec<Vec<NodeId>> = qn_groups
        .into_values()
        .filter(|group| {
            if group.len() >= 2 {
                // NOTE(review): this counts one per *group*, not per pair;
                // the stat name overstates for groups larger than two.
                stats.candidate_pairs_examined += 1;
                true
            } else {
                false
            }
        })
        .collect();

    // Now perform merges
    let mut remap = NodeRemapTable::with_capacity(groups_to_unify.len());

    // Pre-resolve every candidate node's canonical path-based tie-break
    // key into an owned `String` keyed by `NodeId`. Lifting the resolution
    // here instead of inside the `max_by` comparator avoids re-borrowing
    // `graph` immutably from a closure that lives across the
    // `merge_node_into(&mut graph, …)` call below. Without this
    // precomputation the borrow checker rejects the mutation loop because
    // the comparator closure captures the immutable borrow of `graph`
    // required by `path_key`.
    //
    // Path conversion goes through `Arc<Path>::to_string_lossy()` because
    // `Path` does not implement `Ord` lexicographically across platforms
    // consistently; forcing a canonical string form keeps the tie-break
    // deterministic on any host filesystem. When the registry can't
    // resolve a `FileId` (shouldn't happen in practice — every live
    // node's `FileId` was registered before the node was allocated) we
    // fall back to an empty string so the comparison still produces a
    // total order. Empty resolves tie-break each other stably (then fall
    // through to the `NodeId` index tie-break).
    let path_keys: HashMap<NodeId, String> = {
        let arena = GraphMutationTarget::nodes(graph);
        let files = GraphMutationTarget::files(graph);
        let mut out: HashMap<NodeId, String> =
            HashMap::with_capacity(groups_to_unify.iter().map(Vec::len).sum());
        for group in &groups_to_unify {
            for &nid in group {
                if out.contains_key(&nid) {
                    continue;
                }
                let key = arena
                    .get(nid)
                    .and_then(|entry| files.resolve(entry.file))
                    .map_or_else(String::new, |path| path.to_string_lossy().into_owned());
                out.insert(nid, key);
            }
        }
        out
    };
    let empty_path_key = String::new();

    for group in &groups_to_unify {
        // Pick winner: prefer start_line > 0, tie-break by wider span,
        // then smaller path (stable across rebuild representations),
        // then smaller NodeId index.
        let winner_id = *group
            .iter()
            .max_by(|&&a, &&b| {
                let ea = GraphMutationTarget::nodes(graph).get(a);
                let eb = GraphMutationTarget::nodes(graph).get(b);
                match (ea, eb) {
                    (Some(ea), Some(eb)) => {
                        // Primary: prefer non-zero start_line
                        let a_real = ea.start_line > 0;
                        let b_real = eb.start_line > 0;
                        match (a_real, b_real) {
                            (true, false) => std::cmp::Ordering::Greater,
                            (false, true) => std::cmp::Ordering::Less,
                            _ => {
                                // Tie-break 1: prefer wider span
                                let a_range = ea.end_line.saturating_sub(ea.start_line);
                                let b_range = eb.end_line.saturating_sub(eb.start_line);
                                a_range
                                    .cmp(&b_range)
                                    .then_with(|| {
                                        // Tie-break 2: prefer smaller path
                                        // (reversed because `max_by` picks the
                                        // greater side — we want smaller path
                                        // to win, so invert the direct compare).
                                        let pa = path_keys.get(&a).unwrap_or(&empty_path_key);
                                        let pb = path_keys.get(&b).unwrap_or(&empty_path_key);
                                        pb.cmp(pa)
                                    })
                                    .then_with(|| {
                                        // Tie-break 3: smaller NodeId index
                                        // (stable within a single build;
                                        // deterministic fallback for co-located
                                        // duplicate definitions).
                                        b.index().cmp(&a.index())
                                    })
                            }
                        }
                    }
                    (Some(_), None) => std::cmp::Ordering::Greater,
                    (None, Some(_)) => std::cmp::Ordering::Less,
                    (None, None) => std::cmp::Ordering::Equal,
                }
            })
            // Invariant: `groups_to_unify` only contains groups filtered
            // to len >= 2 above, so `max_by` always yields a value.
            .expect("group is non-empty");

        // Merge all losers into winner
        for &node_id in group {
            if node_id == winner_id {
                continue;
            }
            match merge_node_into(GraphMutationTarget::nodes_mut(graph), node_id, winner_id) {
                Ok(()) => {
                    remap.insert(node_id, winner_id);
                    stats.nodes_merged += 1;
                    stats.nodes_inert += 1;
                }
                Err(e) => {
                    // Non-fatal: the duplicate simply stays unmerged and
                    // the pass moves on to the next loser.
                    log::debug!("Phase 4c-prime: skipping merge ({e})");
                }
            }
        }
    }

    // Apply remap table to all pending edges AND to every committed
    // edge already in the graph's edge store.
    //
    // The `apply_to_edges` call keeps PendingEdges (the output of this
    // chunk's parallel commit) pointing at canonical winners before
    // Phase 4d converts them into `DeltaEdge`s. On a full build that is
    // sufficient — no committed edges exist yet.
    //
    // The `apply_to_committed_edges` call closes the Phase 3e incremental
    // gap: the rebuild plane clones the pre-edit graph's committed edges
    // via `clone_for_rebuild`, so a newly-reparsed file whose definition
    // becomes the unification winner can leave surviving cross-file
    // edges pointing at what is now an inert loser slot. Retargeting the
    // committed edges through `remap` is the only way those edges
    // observe the canonical winner after finalize. On a full build the
    // second call is a no-op (edge store is empty).
    if !remap.is_empty() {
        let pre_count: usize = all_edges.iter().map(|v| v.len()).sum();
        remap.apply_to_edges(all_edges);
        remap.apply_to_committed_edges(GraphMutationTarget::edges(graph));
        stats.edges_rewritten = pre_count; // conservative: all edges walked

        // Keep FileRegistry::per_file_nodes consistent with the arena.
        //
        // [`merge_node_into`] (see `unification.rs`) intentionally does
        // **not** vacate the loser slot — the slot stays `Occupied` but
        // inert so `NodeArena::slot_count()` (which CSR row_ptr sizing
        // depends on) is preserved. Because the slot is still live per
        // `NodeArena::iter()`, the §F.1 bucket bijection would panic
        // with "live node absent from all buckets" if we purged losers
        // from their home bucket.
        //
        // Therefore: losers stay in whichever per-file bucket Phase 3
        // first committed them to. That bucket's `FileId` matches the
        // loser's `NodeEntry.file`, so (c) passes. Each loser is in
        // exactly one bucket, so (b) passes. Every live arena slot is
        // accounted for by some bucket, so (d) passes. The §K master
        // matrix already admits this semantics — inert merged-losers
        // are semantically equivalent to any other live `NodeArena`
        // entry for bucket-membership purposes.
        //
        // Name-resolution containment (Gate 0d iter-1 blocker).
        //
        // `merge_node_into` now ALSO clears the loser's `name` and
        // `qualified_name` fields (to `StringId::INVALID` / `None`), and
        // `AuxiliaryIndices::build_from_arena` skips any arena entry
        // whose `name == StringId::INVALID` when rebuilding the name,
        // qualified-name, kind, and file buckets. The second
        // `rebuild_indices()` call in `build_unified_graph_inner`
        // (entrypoint.rs:571, right below this function) runs AFTER
        // unification, so the buckets surfaced by `indices.by_name` /
        // `by_qualified_name` / `by_kind` / `by_file` contain only
        // winners — every public name-resolution surface
        // (`resolution::exact_qualified_bucket`,
        // `graph::find_by_pattern`, etc.) is therefore free of
        // unified-away duplicates. The only publish-visible bucket that
        // still references losers is `FileRegistry::per_file_nodes`,
        // which preserves the §F.1 bucket bijection without surfacing
        // them through name resolution.
        //
        // Historical note: an earlier iteration of this pass called
        // `retain_nodes_in_buckets` to purge losers; that matched a
        // stale understanding where `merge_node_into` was expected to
        // vacate the slot. Gate 0d's bucket-bijection invariant
        // surfaced the mismatch (every full rebuild produced a live
        // slot with no bucket entry). The fix is to align with the
        // unification contract: inert slots remain in their home
        // bucket, but `AuxiliaryIndices` treats them as name-invisible.
    }

    stats.elapsed_ms = start.elapsed().as_millis() as u64;
    stats
}
1065
1066/// Convert per-file `PendingEdge` collections to per-file `DeltaEdge` collections
1067/// with monotonically increasing sequence numbers.
1068///
1069/// The sequence numbers are assigned file-by-file, edge-by-edge, starting from
1070/// `seq_start`. This produces the deterministic ordering required by
1071/// `BidirectionalEdgeStore::add_edges_bulk_ordered()`.
1072#[must_use]
1073pub fn pending_edges_to_delta(
1074    per_file_edges: &[Vec<PendingEdge>],
1075    seq_start: u64,
1076) -> (Vec<Vec<DeltaEdge>>, u64) {
1077    let mut seq = seq_start;
1078    let mut result = Vec::with_capacity(per_file_edges.len());
1079
1080    for file_edges in per_file_edges {
1081        let mut delta_vec = Vec::with_capacity(file_edges.len());
1082        for edge in file_edges {
1083            delta_vec.push(DeltaEdge::with_spans(
1084                edge.source,
1085                edge.target,
1086                edge.kind.clone(),
1087                seq,
1088                DeltaOp::Add,
1089                edge.file,
1090                edge.spans.clone(),
1091            ));
1092            seq += 1;
1093        }
1094        result.push(delta_vec);
1095    }
1096
1097    (result, seq)
1098}
1099
1100/// Rebuild the auxiliary indices on `graph` from its current node arena.
1101///
1102/// Generic counterpart to the inherent [`CodeGraph::rebuild_indices`].
1103/// Takes a [`GraphMutationTarget`] so both the full-build
1104/// (`build_unified_graph_inner`) and incremental-rebuild
1105/// (`incremental_rebuild` on `RebuildGraph`) pipelines can share the
1106/// same helper. The inherent method now delegates here so the
1107/// implementation lives in exactly one place.
1108///
1109/// Internally uses [`GraphMutationTarget::nodes_and_indices_mut`] to
1110/// acquire a disjoint `(&NodeArena, &mut AuxiliaryIndices)` pair, then
1111/// hands them to [`AuxiliaryIndices::build_from_arena`] which clears
1112/// the existing indices and rebuilds in a single pass without
1113/// per-element duplicate checking.
1114///
1115/// [`CodeGraph::rebuild_indices`]: crate::graph::unified::concurrent::CodeGraph::rebuild_indices
1116/// [`AuxiliaryIndices::build_from_arena`]: crate::graph::unified::storage::indices::AuxiliaryIndices::build_from_arena
1117pub(crate) fn rebuild_indices<G: crate::graph::unified::mutation_target::GraphMutationTarget>(
1118    graph: &mut G,
1119) {
1120    let (nodes, indices) = graph.nodes_and_indices_mut();
1121    indices.build_from_arena(nodes);
1122}
1123
1124/// Phase 4d — bulk-insert every pending edge into the graph via the
1125/// deterministic `DeltaEdge` conversion path.
1126///
1127/// Wraps the pure [`pending_edges_to_delta`] conversion + the
1128/// [`BidirectionalEdgeStore::add_edges_bulk_ordered`] call that
1129/// `build_unified_graph_inner` ran inline between Phase 4c-prime and
1130/// Phase 4e. The wrapper is generic over [`GraphMutationTarget`] so
1131/// the Task 4 Step 4 Phase 3 `incremental_rebuild` body can call it
1132/// against a [`RebuildGraph`] without duplicating the seq-counter +
1133/// flatten logic.
1134///
1135/// Returns the final edge sequence counter (for callers that need to
1136/// continue allocating deterministic sequence numbers downstream).
1137/// The counter flows from
1138/// [`BidirectionalEdgeStore::forward().seq_counter()`] on the way in
1139/// and advances by one per inserted edge.
1140///
1141/// # Semantics
1142///
1143/// * `per_file_edges` is consumed by-reference; the function does not
1144///   mutate the caller's buffer. Callers who no longer need the
1145///   vectors may drop them after the call.
1146/// * If `per_file_edges` is empty (or every inner vector is empty),
1147///   the edge store is left untouched.
1148/// * The helper does not `bump_epoch()` on the graph — Phase 4d is
1149///   edge-level only; the full pipeline bumps epoch separately.
1150///
1151/// [`BidirectionalEdgeStore::add_edges_bulk_ordered`]: crate::graph::unified::edge::bidirectional::BidirectionalEdgeStore::add_edges_bulk_ordered
1152/// [`RebuildGraph`]: crate::graph::unified::rebuild::rebuild_graph::RebuildGraph
1153pub(crate) fn phase4d_bulk_insert_edges<
1154    G: crate::graph::unified::mutation_target::GraphMutationTarget,
1155>(
1156    graph: &mut G,
1157    per_file_edges: &[Vec<PendingEdge>],
1158) -> u64 {
1159    // Start seq numbering from the edge store's current counter to
1160    // support non-empty graphs (incremental rebuild carries forward
1161    // the prior build's counter).
1162    let edge_seq_start = graph.edges().forward().seq_counter();
1163    let (delta_edge_vecs, final_seq) = pending_edges_to_delta(per_file_edges, edge_seq_start);
1164    let total_edge_count: u64 = delta_edge_vecs.iter().map(|v| v.len() as u64).sum();
1165    if total_edge_count > 0 {
1166        graph
1167            .edges_mut()
1168            .add_edges_bulk_ordered(&delta_edge_vecs, total_edge_count);
1169    }
1170    final_seq
1171}
1172
1173#[cfg(test)]
1174mod tests {
1175    use super::*;
1176
    #[test]
    fn test_compute_commit_plan_basic() {
        // Three files with mixed counts (including a zero-node file):
        // prefix sums must yield contiguous, disjoint ranges, with
        // strings starting at offset 1 to skip the interner sentinel.
        let file_ids = vec![FileId::new(0), FileId::new(1), FileId::new(2)];
        let node_counts = vec![3, 0, 5];
        let string_counts = vec![2, 1, 3];
        let edge_counts = vec![4, 0, 6];

        let plan = compute_commit_plan(
            &node_counts,
            &string_counts,
            &edge_counts,
            &file_ids,
            0,
            1, // string_offset=1 for sentinel
        );

        // Totals are plain sums of the per-file counts.
        assert_eq!(plan.total_nodes, 8);
        assert_eq!(plan.total_strings, 6);
        assert_eq!(plan.total_edges, 10);

        // File 0: nodes [0..3), strings [1..3)
        assert_eq!(plan.file_plans[0].node_range, 0..3);
        assert_eq!(plan.file_plans[0].string_range, 1..3);

        // File 1: nodes [3..3), strings [3..4) — empty nodes
        assert_eq!(plan.file_plans[1].node_range, 3..3);
        assert_eq!(plan.file_plans[1].string_range, 3..4);

        // File 2: nodes [3..8), strings [4..7)
        assert_eq!(plan.file_plans[2].node_range, 3..8);
        assert_eq!(plan.file_plans[2].string_range, 4..7);
    }
1209
1210    #[test]
1211    fn test_compute_commit_plan_with_offsets() {
1212        let file_ids = vec![FileId::new(5)];
1213        let plan = compute_commit_plan(&[10], &[5], &[7], &file_ids, 100, 50);
1214        assert_eq!(plan.file_plans[0].node_range, 100..110);
1215        assert_eq!(plan.file_plans[0].string_range, 50..55);
1216        assert_eq!(plan.total_nodes, 10);
1217        assert_eq!(plan.total_strings, 5);
1218        assert_eq!(plan.total_edges, 7);
1219    }
1220
1221    #[test]
1222    fn test_compute_commit_plan_empty() {
1223        let plan = compute_commit_plan(&[], &[], &[], &[], 0, 1);
1224        assert_eq!(plan.total_nodes, 0);
1225        assert_eq!(plan.total_strings, 0);
1226        assert_eq!(plan.total_edges, 0);
1227        assert!(plan.file_plans.is_empty());
1228    }
1229
1230    #[test]
1231    fn test_remap_string_id_basic() {
1232        let mut remap = HashMap::new();
1233        remap.insert(StringId::new(1), StringId::new(100));
1234
1235        let mut id = StringId::new(1);
1236        remap_string_id(&mut id, &remap);
1237        assert_eq!(id, StringId::new(100));
1238    }
1239
1240    #[test]
1241    fn test_remap_string_id_not_in_remap() {
1242        let remap = HashMap::new();
1243        let mut id = StringId::new(42);
1244        remap_string_id(&mut id, &remap);
1245        assert_eq!(id, StringId::new(42)); // unchanged
1246    }
1247
1248    #[test]
1249    fn test_remap_option_string_id() {
1250        let mut remap = HashMap::new();
1251        remap.insert(StringId::new(5), StringId::new(50));
1252
1253        let mut some_id = Some(StringId::new(5));
1254        remap_option_string_id(&mut some_id, &remap);
1255        assert_eq!(some_id, Some(StringId::new(50)));
1256
1257        let mut none_id: Option<StringId> = None;
1258        remap_option_string_id(&mut none_id, &remap);
1259        assert_eq!(none_id, None);
1260    }
1261
1262    #[test]
1263    fn test_remap_edge_kind_imports() {
1264        let mut remap = HashMap::new();
1265        remap.insert(StringId::new(1), StringId::new(100));
1266
1267        let mut kind = EdgeKind::Imports {
1268            alias: Some(StringId::new(1)),
1269            is_wildcard: false,
1270        };
1271        remap_edge_kind_string_ids(&mut kind, &remap);
1272        assert!(
1273            matches!(kind, EdgeKind::Imports { alias: Some(id), .. } if id == StringId::new(100))
1274        );
1275    }
1276
1277    #[test]
1278    fn test_remap_edge_kind_trait_method_binding() {
1279        let mut remap = HashMap::new();
1280        remap.insert(StringId::new(1), StringId::new(100));
1281        remap.insert(StringId::new(2), StringId::new(200));
1282
1283        let mut kind = EdgeKind::TraitMethodBinding {
1284            trait_name: StringId::new(1),
1285            impl_type: StringId::new(2),
1286            is_ambiguous: false,
1287        };
1288        remap_edge_kind_string_ids(&mut kind, &remap);
1289        assert!(
1290            matches!(kind, EdgeKind::TraitMethodBinding { trait_name, impl_type, .. }
1291                if trait_name == StringId::new(100) && impl_type == StringId::new(200))
1292        );
1293    }
1294
    #[test]
    fn test_remap_edge_kind_no_op_variants() {
        // Variants without StringId payloads must pass through unchanged
        // (the table is empty here anyway, so no rewrite could occur).
        let remap = HashMap::new();

        // Defines — no StringId fields
        let mut kind = EdgeKind::Defines;
        remap_edge_kind_string_ids(&mut kind, &remap);
        assert!(matches!(kind, EdgeKind::Defines));

        // Calls — no StringId fields
        let mut kind = EdgeKind::Calls {
            argument_count: 3,
            is_async: true,
        };
        remap_edge_kind_string_ids(&mut kind, &remap);
        assert!(matches!(
            kind,
            EdgeKind::Calls {
                argument_count: 3,
                is_async: true,
            }
        ));
    }
1318
    /// Minimal occupied `NodeEntry` used to pre-fill arena ranges in tests.
    ///
    /// The contents (Function kind, string id 0, file id 0) are arbitrary —
    /// callers only pass it to `alloc_range` as the fill value.
    fn placeholder_entry() -> NodeEntry {
        use crate::graph::unified::node::NodeKind;
        NodeEntry::new(NodeKind::Function, StringId::new(0), FileId::new(0))
    }
1323
    #[test]
    fn test_phase2_assign_ranges_basic() {
        use super::super::staging::StagingGraph;

        // Create 2 staging graphs with known counts
        let mut sg0 = StagingGraph::new();
        let mut sg1 = StagingGraph::new();

        // sg0: 2 nodes, 1 string, 1 edge
        let entry0 = placeholder_entry();
        let n0 = sg0.add_node(entry0.clone());
        let n1 = sg0.add_node(entry0.clone());
        sg0.intern_string(StringId::new_local(0), "hello".into());
        sg0.add_edge(
            n0,
            n1,
            EdgeKind::Calls {
                argument_count: 0,
                is_async: false,
            },
            FileId::new(0),
        );

        // sg1: 1 node, 2 strings, 0 edges
        sg1.add_node(entry0);
        sg1.intern_string(StringId::new_local(0), "world".into());
        sg1.intern_string(StringId::new_local(1), "foo".into());

        let file_ids = vec![FileId::new(10), FileId::new(11)];
        // Non-zero starting offsets: the assigned ranges must begin
        // exactly at these values.
        let offsets = GlobalOffsets {
            node_offset: 5,
            string_offset: 3,
        };

        let plan = phase2_assign_ranges(&[&sg0, &sg1], &file_ids, &offsets);

        // sg0: 2 nodes, 1 string, 1 edge
        assert_eq!(plan.file_plans[0].node_range, 5..7);
        assert_eq!(plan.file_plans[0].string_range, 3..4);

        // sg1: 1 node, 2 strings, 0 edges
        assert_eq!(plan.file_plans[1].node_range, 7..8);
        assert_eq!(plan.file_plans[1].string_range, 4..6);

        assert_eq!(plan.total_nodes, 3);
        assert_eq!(plan.total_strings, 3);
        assert_eq!(plan.total_edges, 1);
    }
1372
    #[test]
    fn test_phase3_parallel_commit_basic() {
        use super::super::staging::StagingGraph;
        use crate::graph::unified::concurrent::CodeGraph;
        use crate::graph::unified::node::NodeKind;
        // The `nodes_mut` / `strings_mut` method calls below resolve
        // to inherent methods on `CodeGraph`; the `GraphMutationTarget`
        // trait impl provides the same surface for `RebuildGraph`
        // (see `phase3_parallel_commit_runs_against_rebuild_graph`).
        // No trait import is needed here because inherent-method
        // resolution wins for `CodeGraph`.

        // Create a staging graph with 2 nodes, 1 string, 1 edge
        let mut sg = StagingGraph::new();
        let local_name = StringId::new_local(0);
        sg.intern_string(local_name, "my_func".into());

        let entry = NodeEntry::new(NodeKind::Function, local_name, FileId::new(0));
        let n0 = sg.add_node(entry.clone());

        let entry2 = NodeEntry::new(NodeKind::Variable, local_name, FileId::new(0));
        let n1 = sg.add_node(entry2);

        // The edge is recorded against staging-local node ids; Phase 3
        // must remap both endpoints to global ids (asserted below).
        sg.add_edge(
            n0,
            n1,
            EdgeKind::Calls {
                argument_count: 0,
                is_async: false,
            },
            FileId::new(0),
        );

        let file_ids = vec![FileId::new(5)];

        // Pre-allocate with non-zero offsets to verify remap works,
        // against a full `CodeGraph` so the new generic signature is
        // exercised end-to-end via `GraphMutationTarget`.
        let mut graph = CodeGraph::new();
        graph
            .nodes_mut()
            .alloc_range(10, &placeholder_entry())
            .unwrap();
        let string_start = graph.strings_mut().alloc_range(1).unwrap();
        assert_eq!(string_start, 1); // past sentinel

        let offsets = GlobalOffsets {
            node_offset: 10, // file's nodes start at index 10
            string_offset: string_start,
        };
        let plan = phase2_assign_ranges(&[&sg], &file_ids, &offsets);
        assert_eq!(plan.file_plans[0].node_range, 10..12);

        // Pre-allocate the actual ranges for Phase 3.
        graph
            .nodes_mut()
            .alloc_range(plan.total_nodes, &placeholder_entry())
            .unwrap();
        graph.strings_mut().alloc_range(plan.total_strings).unwrap();

        // Phase 3 — generic over `G: GraphMutationTarget`. Passing
        // `&mut graph` infers `G = CodeGraph`.
        let result = phase3_parallel_commit(&plan, &[&sg], &mut graph);

        // Verify written counts
        assert_eq!(result.total_nodes_written, 2);
        assert_eq!(result.total_strings_written, 1);

        // Verify strings were written
        let global_name = StringId::new(string_start);
        assert_eq!(&*graph.strings().resolve(global_name).unwrap(), "my_func");

        // Verify 1 file, 1 edge
        assert_eq!(result.per_file_edges.len(), 1);
        assert_eq!(result.per_file_edges[0].len(), 1);

        // Verify edge was remapped to global IDs (node_offset=10)
        let edge = &result.per_file_edges[0][0];
        assert_eq!(edge.file, FileId::new(5));
        assert_eq!(edge.source, NodeId::new(10, 1)); // first node at slot 10
        assert_eq!(edge.target, NodeId::new(11, 1)); // second node at slot 11

        // Gate 0c (iter-2 B2): per-file node IDs must be recorded in
        // commit order, one Vec per FilePlan, so the caller can
        // populate FileRegistry::per_file_nodes deterministically.
        assert_eq!(result.per_file_node_ids.len(), 1);
        assert_eq!(
            result.per_file_node_ids[0],
            vec![NodeId::new(10, 1), NodeId::new(11, 1)]
        );
    }
1464
1465    #[test]
1466    fn test_phase3_parallel_commit_empty() {
1467        use crate::graph::unified::concurrent::CodeGraph;
1468
1469        let mut graph = CodeGraph::new();
1470
1471        let plan = ChunkCommitPlan {
1472            file_plans: vec![],
1473            total_nodes: 0,
1474            total_strings: 0,
1475            total_edges: 0,
1476        };
1477
1478        let result = phase3_parallel_commit(&plan, &[], &mut graph);
1479        assert!(result.per_file_edges.is_empty());
1480        assert!(result.per_file_node_ids.is_empty());
1481        assert_eq!(result.total_nodes_written, 0);
1482        assert_eq!(result.total_strings_written, 0);
1483    }
1484
    /// Task 4 Step 4 Phase 1 — exercise the `GraphMutationTarget`
    /// trait's second implementor.
    ///
    /// Builds a tiny staging graph, hosts it in a fresh `RebuildGraph`,
    /// and asserts the committed nodes land in the **rebuild-local**
    /// arena — not in a `CodeGraph`. The test also confirms the
    /// per-file edges / node-id vectors the helper returns agree with
    /// the `CodeGraph` call-path result shape.
    ///
    /// If a future refactor accidentally routed Phase 3 back to a
    /// `CodeGraph` (e.g. through a hidden static `Arc::make_mut`), this
    /// test would observe an empty rebuild arena and fail.
    #[test]
    #[cfg(feature = "rebuild-internals")]
    fn phase3_parallel_commit_runs_against_rebuild_graph() {
        use super::super::staging::StagingGraph;
        use crate::graph::unified::concurrent::CodeGraph;
        use crate::graph::unified::mutation_target::GraphMutationTarget;
        use crate::graph::unified::node::NodeKind;

        // Staging graph: 2 nodes + 1 string + 1 Calls edge (identical
        // shape to the CodeGraph test above, so any behavioural drift
        // between the two paths surfaces as different assertions).
        let mut sg = StagingGraph::new();
        let local_name = StringId::new_local(0);
        sg.intern_string(local_name, "rebuild_target".into());
        let entry = NodeEntry::new(NodeKind::Function, local_name, FileId::new(0));
        let n0 = sg.add_node(entry.clone());
        let entry2 = NodeEntry::new(NodeKind::Variable, local_name, FileId::new(0));
        let n1 = sg.add_node(entry2);
        sg.add_edge(
            n0,
            n1,
            EdgeKind::Calls {
                argument_count: 0,
                is_async: false,
            },
            FileId::new(0),
        );

        // Produce a RebuildGraph from an empty CodeGraph; drop the
        // CodeGraph immediately so any subsequent mutation observed in
        // the rebuild cannot possibly be leaking back to a shared Arc.
        let mut rebuild = {
            let graph = CodeGraph::new();
            graph.clone_for_rebuild()
        };

        // Pre-allocate leading slots on the rebuild-local arena +
        // interner so the file's ranges begin at a non-zero offset —
        // this is the same pattern the CodeGraph test uses, verifying
        // the trait's disjoint-borrow combinator threads through
        // identically.
        rebuild
            .nodes_mut()
            .alloc_range(10, &placeholder_entry())
            .unwrap();
        let string_start = rebuild.strings_mut().alloc_range(1).unwrap();
        assert_eq!(string_start, 1);

        // Phase 2 assigns this file's node/string ranges starting at the
        // running offsets above (nodes at slot 10, strings at slot 1).
        let file_ids = vec![FileId::new(5)];
        let offsets = GlobalOffsets {
            node_offset: 10,
            string_offset: string_start,
        };
        let plan = phase2_assign_ranges(&[&sg], &file_ids, &offsets);

        // Back the plan's totals with real slots so Phase 3 has
        // pre-allocated, disjoint storage to carve up and write into.
        rebuild
            .nodes_mut()
            .alloc_range(plan.total_nodes, &placeholder_entry())
            .unwrap();
        rebuild
            .strings_mut()
            .alloc_range(plan.total_strings)
            .unwrap();

        // Phase 3 against the RebuildGraph. Inferred `G = RebuildGraph`.
        let result = phase3_parallel_commit(&plan, &[&sg], &mut rebuild);

        // === Invariant: the written data lives in the rebuild-local
        // arena, not in any CodeGraph field. ===
        //
        // Two slot ranges exist on the rebuild's arena now:
        //   * slots 0..10 = pre-fill placeholders (each `Function` /
        //     `StringId::new(0)` — note every alloc_range writes a
        //     clone of the template entry).
        //   * slots 10..12 = the two committed nodes from `sg`.
        //
        // Fetch the two committed NodeIds and resolve their names
        // through the rebuild-local interner; the string must match
        // the staged value "rebuild_target", proving the commit ran
        // on the rebuild's own fields.
        let committed_ids = &result.per_file_node_ids[0];
        assert_eq!(
            committed_ids,
            &vec![NodeId::new(10, 1), NodeId::new(11, 1)],
            "Phase 3 must commit into slots 10..12 on the rebuild-local arena"
        );

        let resolved_name = rebuild
            .nodes_mut()
            .get(NodeId::new(10, 1))
            .map(|entry| entry.name)
            .expect("committed node must exist in rebuild arena");
        // The name StringId on the committed node is a global ID
        // (Phase 3 remaps local → global); resolving it through the
        // rebuild-local interner must produce the staged value.
        let resolved_str = rebuild
            .strings_mut()
            .resolve(resolved_name)
            .expect("name must resolve in rebuild-local interner");
        assert_eq!(&*resolved_str, "rebuild_target");

        // === Shape invariants match the CodeGraph path ===
        assert_eq!(result.total_nodes_written, 2);
        assert_eq!(result.total_strings_written, 1);
        assert_eq!(result.per_file_edges.len(), 1);
        assert_eq!(result.per_file_edges[0].len(), 1);
        let edge = &result.per_file_edges[0][0];
        assert_eq!(edge.file, FileId::new(5));
        assert_eq!(edge.source, NodeId::new(10, 1));
        assert_eq!(edge.target, NodeId::new(11, 1));
    }
1608
1609    #[test]
1610    fn test_commit_single_file_string_remap() {
1611        use super::super::staging::StagingGraph;
1612        use crate::graph::unified::node::NodeKind;
1613
1614        let mut sg = StagingGraph::new();
1615        let local_0 = StringId::new_local(0);
1616        let local_1 = StringId::new_local(1);
1617        sg.intern_string(local_0, "alpha".into());
1618        sg.intern_string(local_1, "beta".into());
1619
1620        let mut entry = NodeEntry::new(NodeKind::Function, local_0, FileId::new(0));
1621        entry.signature = Some(local_1);
1622        sg.add_node(entry);
1623
1624        let plan = FilePlan {
1625            parsed_index: 0,
1626            file_id: FileId::new(42),
1627            node_range: 10..11,
1628            string_range: 20..22,
1629        };
1630
1631        let mut node_slots = vec![Slot::new_occupied(1, placeholder_entry())];
1632        let mut str_slots: Vec<Option<Arc<str>>> = vec![None, None];
1633        let mut rc_slots: Vec<u32> = vec![0, 0];
1634
1635        let result = commit_single_file(&sg, &plan, &mut node_slots, &mut str_slots, &mut rc_slots);
1636
1637        // Strings written
1638        assert_eq!(str_slots[0].as_deref(), Some("alpha"));
1639        assert_eq!(str_slots[1].as_deref(), Some("beta"));
1640        assert_eq!(rc_slots[0], 1);
1641        assert_eq!(rc_slots[1], 1);
1642        assert_eq!(result.strings_written, 2);
1643
1644        // Node entry has remapped StringIds
1645        if let crate::graph::unified::storage::SlotState::Occupied(entry) = node_slots[0].state() {
1646            assert_eq!(entry.name, StringId::new(20)); // global slot 20
1647            assert_eq!(entry.signature, Some(StringId::new(21))); // global slot 21
1648            assert_eq!(entry.file, FileId::new(42));
1649        } else {
1650            panic!("Expected occupied slot");
1651        }
1652        assert_eq!(result.nodes_written, 1);
1653
1654        // Per-file node IDs are recorded in commit order (Gate 0c bucket contract).
1655        assert_eq!(result.node_ids, vec![NodeId::new(10, 1)]);
1656
1657        // No edges
1658        assert!(result.edges.is_empty());
1659    }
1660
1661    #[test]
1662    fn test_remap_edge_kind_message_queue_other() {
1663        let mut remap = HashMap::new();
1664        remap.insert(StringId::new(10), StringId::new(110));
1665        remap.insert(StringId::new(20), StringId::new(220));
1666
1667        let mut kind = EdgeKind::MessageQueue {
1668            protocol: MqProtocol::Other(StringId::new(10)),
1669            topic: Some(StringId::new(20)),
1670        };
1671        remap_edge_kind_string_ids(&mut kind, &remap);
1672        assert!(matches!(
1673            kind,
1674            EdgeKind::MessageQueue {
1675                protocol: MqProtocol::Other(proto),
1676                topic: Some(topic),
1677            } if proto == StringId::new(110) && topic == StringId::new(220)
1678        ));
1679    }
1680
1681    // === Phase 4 tests ===
1682
1683    #[test]
1684    fn test_phase4_apply_global_remap_basic() {
1685        use crate::graph::unified::node::NodeKind;
1686        use crate::graph::unified::storage::NodeArena;
1687
1688        let mut arena = NodeArena::new();
1689
1690        // Allocate two nodes with duplicate string IDs (2 and 3 are dupes of 1)
1691        let entry1 = NodeEntry::new(NodeKind::Function, StringId::new(1), FileId::new(0));
1692        let mut entry2 = NodeEntry::new(NodeKind::Variable, StringId::new(2), FileId::new(0));
1693        entry2.signature = Some(StringId::new(3));
1694
1695        arena.alloc(entry1).unwrap();
1696        arena.alloc(entry2).unwrap();
1697
1698        // Edges with string IDs that need remapping
1699        let mut all_edges = vec![vec![PendingEdge {
1700            source: NodeId::new(0, 1),
1701            target: NodeId::new(1, 1),
1702            kind: EdgeKind::Imports {
1703                alias: Some(StringId::new(3)),
1704                is_wildcard: false,
1705            },
1706            file: FileId::new(0),
1707            spans: vec![],
1708        }]];
1709
1710        // Dedup remap: 2→1, 3→1
1711        let mut remap = HashMap::new();
1712        remap.insert(StringId::new(2), StringId::new(1));
1713        remap.insert(StringId::new(3), StringId::new(1));
1714
1715        phase4_apply_global_remap(&mut arena, &mut all_edges, &remap);
1716
1717        // Check that node 1's name was remapped from 2→1
1718        let (_, entry) = arena.iter().nth(1).unwrap();
1719        assert_eq!(entry.name, StringId::new(1));
1720        assert_eq!(entry.signature, Some(StringId::new(1)));
1721
1722        // Check that edge's alias was remapped from 3→1
1723        if let EdgeKind::Imports { alias, .. } = &all_edges[0][0].kind {
1724            assert_eq!(*alias, Some(StringId::new(1)));
1725        } else {
1726            panic!("Expected Imports edge");
1727        }
1728    }
1729
1730    #[test]
1731    fn test_phase4_apply_global_remap_empty() {
1732        use crate::graph::unified::storage::NodeArena;
1733
1734        let mut arena = NodeArena::new();
1735        let mut edges: Vec<Vec<PendingEdge>> = vec![];
1736        let remap = HashMap::new();
1737
1738        // Should be a no-op
1739        phase4_apply_global_remap(&mut arena, &mut edges, &remap);
1740    }
1741
1742    #[test]
1743    fn test_pending_edges_to_delta_basic() {
1744        let edges = vec![
1745            vec![
1746                PendingEdge {
1747                    source: NodeId::new(0, 1),
1748                    target: NodeId::new(1, 1),
1749                    kind: EdgeKind::Calls {
1750                        argument_count: 0,
1751                        is_async: false,
1752                    },
1753                    file: FileId::new(0),
1754                    spans: vec![],
1755                },
1756                PendingEdge {
1757                    source: NodeId::new(1, 1),
1758                    target: NodeId::new(2, 1),
1759                    kind: EdgeKind::References,
1760                    file: FileId::new(0),
1761                    spans: vec![],
1762                },
1763            ],
1764            vec![PendingEdge {
1765                source: NodeId::new(3, 1),
1766                target: NodeId::new(4, 1),
1767                kind: EdgeKind::Defines,
1768                file: FileId::new(1),
1769                spans: vec![],
1770            }],
1771        ];
1772
1773        let (deltas, final_seq) = pending_edges_to_delta(&edges, 100);
1774
1775        assert_eq!(deltas.len(), 2);
1776        assert_eq!(deltas[0].len(), 2);
1777        assert_eq!(deltas[1].len(), 1);
1778        assert_eq!(final_seq, 103);
1779
1780        // Check sequence numbers are monotonic
1781        assert_eq!(deltas[0][0].seq, 100);
1782        assert_eq!(deltas[0][1].seq, 101);
1783        assert_eq!(deltas[1][0].seq, 102);
1784
1785        // Check all are Add operations
1786        assert!(matches!(deltas[0][0].op, DeltaOp::Add));
1787        assert!(matches!(deltas[1][0].op, DeltaOp::Add));
1788    }
1789
1790    #[test]
1791    fn test_pending_edges_to_delta_empty() {
1792        let edges: Vec<Vec<PendingEdge>> = vec![];
1793        let (deltas, final_seq) = pending_edges_to_delta(&edges, 0);
1794        assert!(deltas.is_empty());
1795        assert_eq!(final_seq, 0);
1796    }
1797
1798    // ==================================================================
1799    // Task 4 Step 4 Phase 2: rebuild-plane coverage for migrated helpers.
1800    //
1801    // Each test below proves that the migrated helper runs against a
1802    // `RebuildGraph` (not just a `CodeGraph`) and that the mutation
1803    // lands on the rebuild-local state. Together with the CodeGraph
1804    // tests that still exercise the same helpers on the full-build
1805    // path, they form the "runs on both implementors" coverage
1806    // contract for `GraphMutationTarget` consumers.
1807    // ==================================================================
1808
    /// Seed two call-compatible nodes (both `NodeKind::Function`) under
    /// the same qualified-name StringId across two distinct files, then
    /// run [`phase4c_prime_unify_cross_file_nodes`] against a
    /// [`RebuildGraph`]. Verify the loser node is tombstoned
    /// (name + qualified_name cleared per `merge_node_into`'s contract)
    /// and that pending edges pointing at the loser are rewritten to
    /// the winner.
    #[test]
    #[cfg(feature = "rebuild-internals")]
    fn phase4c_prime_unify_cross_file_nodes_runs_against_rebuild_graph() {
        use crate::graph::unified::concurrent::CodeGraph;
        use crate::graph::unified::mutation_target::GraphMutationTarget;
        use crate::graph::unified::node::NodeKind;

        // Fresh rebuild plane; the source CodeGraph is dropped at the end
        // of the block so mutations cannot leak back to a shared graph.
        let mut rebuild = {
            let graph = CodeGraph::new();
            graph.clone_for_rebuild()
        };

        // Intern a shared qualified name. On the rebuild-local
        // interner; strings() resolves it for later assertions.
        let qname_sid = rebuild.strings_mut().intern("my_mod::my_func").unwrap();

        // Register two files that host the duplicate Function nodes.
        let file_a = FileId::new(7);
        let file_b = FileId::new(8);

        // Build two `NodeKind::Function` entries sharing the same
        // qualified_name. Winner has a wider span (start_line > 0 and
        // end_line > start_line) to exercise the winner-selection
        // tie-break.
        let mut winner_entry = NodeEntry::new(NodeKind::Function, qname_sid, file_a);
        winner_entry.qualified_name = Some(qname_sid);
        winner_entry.start_line = 10;
        winner_entry.end_line = 30;

        let mut loser_entry = NodeEntry::new(NodeKind::Function, qname_sid, file_b);
        loser_entry.qualified_name = Some(qname_sid);
        // Narrower span → loses the tie-break.
        loser_entry.start_line = 5;
        loser_entry.end_line = 6;

        let winner_id = rebuild.nodes_mut().alloc(winner_entry).unwrap();
        let loser_id = rebuild.nodes_mut().alloc(loser_entry).unwrap();

        // A pending edge whose target is the loser — the remap table
        // should rewrite it to point at the winner.
        let mut all_edges = vec![vec![PendingEdge {
            source: winner_id, // any valid source — the helper only rewrites targets here
            target: loser_id,
            kind: EdgeKind::Calls {
                argument_count: 0,
                is_async: false,
            },
            file: file_b,
            spans: vec![],
        }]];

        // Run the unification pass with `G = RebuildGraph` inferred.
        let stats = phase4c_prime_unify_cross_file_nodes(&mut rebuild, &mut all_edges);

        // Stats shape
        assert_eq!(stats.nodes_merged, 1, "exactly one loser was tombstoned");
        assert_eq!(stats.candidate_pairs_examined, 1);
        assert_eq!(stats.edges_rewritten, 1);

        // Winner node survived with qualified_name intact.
        let winner_entry_after = GraphMutationTarget::nodes(&rebuild)
            .get(winner_id)
            .expect("winner must remain live");
        assert_eq!(
            winner_entry_after.qualified_name,
            Some(qname_sid),
            "winner keeps its qualified_name"
        );

        // Loser entry was merged via `merge_node_into`, which clears
        // `name` and `qualified_name` to make the slot name-invisible.
        let loser_entry_after = GraphMutationTarget::nodes(&rebuild)
            .get(loser_id)
            .expect("loser slot remains live (inert) per §F.1 bijection");
        assert_eq!(
            loser_entry_after.qualified_name, None,
            "loser qualified_name cleared by merge_node_into"
        );

        // Pending edge target rewritten winner-ward.
        assert_eq!(
            all_edges[0][0].target, winner_id,
            "PendingEdge.target rewritten from loser → winner"
        );
    }
1900
    /// Lock in the Phase 4c-prime tie-break ordering Codex blessed in iter-1:
    /// primary = `start_line > 0`, tie-break 1 = wider span, tie-break 2 =
    /// lexicographically smaller **file path** (stable across rebuild
    /// representations), final fallback = smaller `NodeId::index()`.
    ///
    /// This test exercises the tie-break 2 path: two candidates with real
    /// spans of identical width, hosted in two different files that differ
    /// only in filename ordering. The winner must be the node whose file
    /// path sorts earlier, regardless of NodeId allocation order.
    #[test]
    #[cfg(feature = "rebuild-internals")]
    fn phase4c_prime_tie_break_prefers_lex_smaller_path_over_node_id() {
        use crate::graph::unified::concurrent::CodeGraph;
        use crate::graph::unified::node::NodeKind;
        use std::path::Path;

        let mut graph = CodeGraph::new();
        let qname = graph.strings_mut().intern("shared_qname").unwrap();
        // Register two paths whose lexical ordering is the reverse of
        // the registration (and hence NodeId) order. This isolates the
        // path-based tie-break from any accidental NodeId-ordering
        // coincidence: if the helper fell back to NodeId the "wrong"
        // node would win.
        let high_path_file = graph
            .files_mut()
            .register(Path::new("zzz_late.rs"))
            .unwrap();
        let low_path_file = graph
            .files_mut()
            .register(Path::new("aaa_early.rs"))
            .unwrap();

        // Allocate the `zzz_late.rs` node first so its NodeId::index() is
        // numerically smaller than the `aaa_early.rs` node's. With
        // identical spans, NodeId-only tie-break would incorrectly pick
        // the `zzz_late.rs` node. The correct behaviour is that the
        // path-based tie-break picks the `aaa_early.rs` node.
        let mut high_entry = NodeEntry::new(NodeKind::Function, qname, high_path_file);
        high_entry.qualified_name = Some(qname);
        high_entry.start_line = 10;
        high_entry.end_line = 20;
        let high_node = graph.nodes_mut().alloc(high_entry).unwrap();

        let mut low_entry = NodeEntry::new(NodeKind::Function, qname, low_path_file);
        low_entry.qualified_name = Some(qname);
        // Identical span width — forces the tie-break to ignore primary
        // + tie-break 1 (span width) and reach tie-break 2 (path).
        low_entry.start_line = 10;
        low_entry.end_line = 20;
        let low_node = graph.nodes_mut().alloc(low_entry).unwrap();

        // Refresh auxiliary indices so the unification pass can see both
        // nodes under the shared qualified name.
        graph.rebuild_indices();

        let mut all_edges: Vec<Vec<PendingEdge>> = Vec::new();
        let stats = phase4c_prime_unify_cross_file_nodes(&mut graph, &mut all_edges);

        assert_eq!(
            stats.nodes_merged, 1,
            "one of the duplicate nodes must be merged into the other"
        );

        // The `aaa_early.rs` node wins because its path sorts lexically
        // smaller. Verify its qualified_name is intact.
        let low_after = graph
            .nodes()
            .get(low_node)
            .expect("winner slot remains live");
        assert_eq!(
            low_after.qualified_name,
            Some(qname),
            "path-earlier node keeps qualified_name as the unification winner"
        );

        // And the `zzz_late.rs` node — despite a numerically smaller
        // NodeId::index() — was merged away.
        let high_after = graph
            .nodes()
            .get(high_node)
            .expect("loser slot remains inert (Gate 0d bijection contract)");
        assert_eq!(
            high_after.qualified_name, None,
            "path-later node loses even when its NodeId::index() is smaller"
        );
    }
1985
    /// When the path-based tie-break ALSO ties (two duplicate nodes in the
    /// same file — rare but possible via duplicate definitions), the
    /// deterministic fallback is `b.index().cmp(&a.index())` which picks
    /// the node with the **smaller** NodeId index. Lock that in so future
    /// refactors of the tie-break don't accidentally flip the fallback
    /// direction.
    #[test]
    #[cfg(feature = "rebuild-internals")]
    fn phase4c_prime_tie_break_falls_back_to_smaller_node_id_on_identical_path() {
        use crate::graph::unified::concurrent::CodeGraph;
        use crate::graph::unified::node::NodeKind;
        use std::path::Path;

        let mut graph = CodeGraph::new();
        let qname = graph.strings_mut().intern("shared_qname").unwrap();
        let file = graph.files_mut().register(Path::new("shared.rs")).unwrap();

        // Allocate two duplicate nodes in the SAME file with identical
        // spans. The only thing that differs between them is their
        // NodeId index (allocation order). Tie-breaks 1 (span width)
        // and 2 (path) both return Equal; the final `b.index().cmp(&a.index())`
        // fallback picks the smaller index as the winner.
        let mut first_entry = NodeEntry::new(NodeKind::Function, qname, file);
        first_entry.qualified_name = Some(qname);
        first_entry.start_line = 1;
        first_entry.end_line = 5;
        let first_node = graph.nodes_mut().alloc(first_entry).unwrap();

        let mut second_entry = NodeEntry::new(NodeKind::Function, qname, file);
        second_entry.qualified_name = Some(qname);
        second_entry.start_line = 1;
        second_entry.end_line = 5;
        let second_node = graph.nodes_mut().alloc(second_entry).unwrap();

        // Guard the premise the whole test rests on: allocation order
        // determines index order.
        assert!(
            first_node.index() < second_node.index(),
            "precondition: first_node's arena slot precedes second_node's"
        );

        graph.rebuild_indices();

        let mut all_edges: Vec<Vec<PendingEdge>> = Vec::new();
        let stats = phase4c_prime_unify_cross_file_nodes(&mut graph, &mut all_edges);

        assert_eq!(stats.nodes_merged, 1);

        // Smaller NodeId::index() wins.
        let winner_after = graph.nodes().get(first_node).expect("winner live");
        assert_eq!(
            winner_after.qualified_name,
            Some(qname),
            "smaller-index node wins the same-path / same-span tie-break"
        );
        let loser_after = graph.nodes().get(second_node).expect("loser inert");
        assert_eq!(
            loser_after.qualified_name, None,
            "larger-index node loses the same-path / same-span tie-break"
        );
    }
2045
    /// Drive the free [`rebuild_indices`] function against both a
    /// `RebuildGraph` and a `CodeGraph` seeded with identical data,
    /// and verify the resulting `AuxiliaryIndices` are structurally
    /// equivalent (same name buckets, same kind buckets).
    #[test]
    #[cfg(feature = "rebuild-internals")]
    fn rebuild_indices_runs_against_rebuild_graph() {
        use crate::graph::unified::concurrent::CodeGraph;
        use crate::graph::unified::mutation_target::GraphMutationTarget;
        use crate::graph::unified::node::NodeKind;

        // === CodeGraph baseline ===
        let mut code_graph = CodeGraph::new();
        let alpha_id_code = code_graph.strings_mut().intern("alpha").unwrap();
        let mut code_entry = NodeEntry::new(NodeKind::Function, alpha_id_code, FileId::new(1));
        code_entry.qualified_name = Some(alpha_id_code);
        let code_node_id = code_graph.nodes_mut().alloc(code_entry).unwrap();
        rebuild_indices(&mut code_graph);
        let code_buckets_function: Vec<NodeId> =
            code_graph.indices().by_kind(NodeKind::Function).to_vec();

        // === RebuildGraph path ===
        // Seed the rebuild with the exact same single node so bucket
        // contents are directly comparable across the two implementors.
        let mut rebuild = {
            let graph = CodeGraph::new();
            graph.clone_for_rebuild()
        };
        let alpha_id_rebuild = rebuild.strings_mut().intern("alpha").unwrap();
        let mut rebuild_entry =
            NodeEntry::new(NodeKind::Function, alpha_id_rebuild, FileId::new(1));
        rebuild_entry.qualified_name = Some(alpha_id_rebuild);
        let rebuild_node_id = rebuild.nodes_mut().alloc(rebuild_entry).unwrap();
        rebuild_indices(&mut rebuild);

        // The node ids are both the first allocation on their
        // respective arenas, so they share slot indices and
        // generations.
        assert_eq!(code_node_id, rebuild_node_id);

        // The trait-method accessor routes through the impl on
        // `RebuildGraph`; the returned indices came from the
        // rebuild-local `AuxiliaryIndices` (not a CodeGraph's).
        let rebuild_buckets_function: Vec<NodeId> = GraphMutationTarget::indices(&rebuild)
            .by_kind(NodeKind::Function)
            .to_vec();

        assert_eq!(
            code_buckets_function, rebuild_buckets_function,
            "rebuild_indices must produce equivalent Function buckets on both paths"
        );
        // Name bucket also present on the rebuild side.
        let by_name: Vec<NodeId> = GraphMutationTarget::indices(&rebuild)
            .by_name(alpha_id_rebuild)
            .to_vec();
        assert_eq!(by_name, vec![rebuild_node_id]);
    }
2101
    /// Drive [`phase4d_bulk_insert_edges`] against a `RebuildGraph`.
    /// Seed two nodes, construct a per-file `PendingEdge` vector, and
    /// prove the edges land on the rebuild-local edge store with the
    /// expected monotonically-advancing sequence counter.
    #[test]
    #[cfg(feature = "rebuild-internals")]
    fn phase4d_bulk_insert_edges_runs_against_rebuild_graph() {
        use crate::graph::unified::concurrent::CodeGraph;
        use crate::graph::unified::mutation_target::GraphMutationTarget;
        use crate::graph::unified::node::NodeKind;

        let mut rebuild = {
            let graph = CodeGraph::new();
            graph.clone_for_rebuild()
        };

        let name_sid = rebuild.strings_mut().intern("edge_target").unwrap();
        let file = FileId::new(3);

        // Two live nodes to connect; both share the same name StringId.
        let n_source = rebuild
            .nodes_mut()
            .alloc(NodeEntry::new(NodeKind::Function, name_sid, file))
            .unwrap();
        let n_target = rebuild
            .nodes_mut()
            .alloc(NodeEntry::new(NodeKind::Variable, name_sid, file))
            .unwrap();

        // Pre-condition: no edges in the rebuild-local forward store.
        let pre_counter = GraphMutationTarget::edges(&rebuild).forward().seq_counter();

        // Two Calls edges in a single per-file bucket; they differ only
        // in argument_count so both inserts are distinct.
        let per_file_edges = vec![vec![
            PendingEdge {
                source: n_source,
                target: n_target,
                kind: EdgeKind::Calls {
                    argument_count: 0,
                    is_async: false,
                },
                file,
                spans: vec![],
            },
            PendingEdge {
                source: n_source,
                target: n_target,
                kind: EdgeKind::Calls {
                    argument_count: 1,
                    is_async: false,
                },
                file,
                spans: vec![],
            },
        ]];

        let final_seq = phase4d_bulk_insert_edges(&mut rebuild, &per_file_edges);

        // Seq counter advanced by exactly two edges.
        assert_eq!(
            final_seq,
            pre_counter + 2,
            "phase4d_bulk_insert_edges must advance seq by edge count"
        );

        // Rebuild-local forward store now contains both edges.
        let forward = GraphMutationTarget::edges(&rebuild).forward();
        let after_counter = forward.seq_counter();
        assert_eq!(after_counter, pre_counter + 2);
        // Forward delta must carry the two new edges.
        assert!(
            forward.delta().iter().filter(|e| e.is_add()).count() >= 2,
            "expected at least two Add edges in the rebuild-local forward delta"
        );
        // Release the borrow on the edge store before mutating `rebuild` again.
        drop(forward);

        // Empty input is a no-op on the edge store.
        let empty_final = phase4d_bulk_insert_edges(&mut rebuild, &[]);
        assert_eq!(empty_final, pre_counter + 2, "empty input is a no-op");
    }
2180}