Skip to main content

sqry_core/graph/unified/build/
parallel_commit.rs

1//! Parallel commit pipeline for pre-allocated ID ranges.
2//!
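//! Replaces the serial commit loop with a four-phase pipeline. Phase 1, which
//! parses each file into its own staging graph, runs before this module; this
//! module implements Phases 2 and 3 and provides helpers for Phase 4:
//! - Phase 2: Count + range assignment via prefix sums
//! - Phase 3: Parallel commit into disjoint pre-allocated ranges
//! - Phase 4: String dedup, remap, index build, edge bulk insert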
7//!
8//! # Phase 3 Architecture
9//!
10//! Phase 3 uses `split_at_mut` to carve disjoint sub-slices from pre-allocated
11//! arena and interner ranges, then uses `rayon` to commit each file's staging
12//! graph in parallel without locks:
13//!
14//! ```text
15//! NodeArena slots:   [   file0   |   file1   |   file2   ]
16//! StringInterner:    [   file0   |   file1   |   file2   ]
17//!                         ↑            ↑            ↑
18//!                    split_at_mut  split_at_mut  remainder
19//! ```
20//!
21//! Each file's `commit_single_file` receives its own disjoint slices and
22//! operates independently without contention.
23
24use std::collections::HashMap;
25use std::ops::Range;
26use std::sync::Arc;
27
28use rayon::prelude::*;
29
30use crate::graph::unified::edge::delta::{DeltaEdge, DeltaOp};
31use crate::graph::unified::edge::kind::{EdgeKind, MqProtocol};
32use crate::graph::unified::file::FileId;
33use crate::graph::unified::node::NodeId;
34use crate::graph::unified::storage::NodeArena;
35use crate::graph::unified::storage::arena::{NodeEntry, Slot};
36use crate::graph::unified::storage::interner::StringInterner;
37use crate::graph::unified::string::StringId;
38
39use super::pass3_intra::PendingEdge;
40use super::staging::{StagingGraph, StagingOp};
41
/// Running offsets carried across chunks for deterministic ID assignment.
///
/// Each chunk's ranges begin where the previous chunk ended, ensuring
/// globally unique, contiguous ID spaces. `phase2_assign_ranges` uses these
/// two values as the starting points of a chunk's node and string ranges.
#[derive(Debug, Clone, Default)]
pub struct GlobalOffsets {
    /// Next available node slot index in the `NodeArena`.
    pub node_offset: u32,
    /// Next available string slot index in the `StringInterner`.
    pub string_offset: u32,
}
53
/// Per-file commit plan with pre-assigned ID ranges.
///
/// `compute_commit_plan` builds these so that consecutive files in a chunk
/// get adjacent ranges, which together tile the chunk's pre-allocated region.
#[derive(Debug, Clone)]
pub struct FilePlan {
    /// Index into the chunk's `ParsedFile` vec. `compute_commit_plan` sets it
    /// to the file's position in its input slices.
    pub parsed_index: usize,
    /// Pre-assigned `FileId` from batch registration. Phase 3 stamps it onto
    /// every node and edge this file commits.
    pub file_id: FileId,
    /// Node slot range [start..end) in `NodeArena`. Its length equals the
    /// file's staged node count.
    pub node_range: Range<u32>,
    /// String slot range [start..end) in `StringInterner`. Its length equals
    /// the file's staged string count.
    pub string_range: Range<u32>,
}
66
/// Plan for parallel commit of a single chunk.
///
/// Built by `compute_commit_plan` (via `phase2_assign_ranges`) and consumed
/// by `phase3_parallel_commit`, which sizes its bulk arena/interner slices
/// from `total_nodes` / `total_strings`.
#[derive(Debug, Clone)]
pub struct ChunkCommitPlan {
    /// Per-file plans in deterministic file order.
    pub file_plans: Vec<FilePlan>,
    /// Total nodes across all files in this chunk (sum of node range lengths).
    pub total_nodes: u32,
    /// Total strings across all files in this chunk (sum of string range lengths).
    pub total_strings: u32,
    /// Total edges across all files in this chunk. Informational only: edges
    /// get no pre-allocated range.
    pub total_edges: u64,
}
79
80/// Compute commit plan from parsed files using prefix-sum range assignment.
81///
82/// Each file gets contiguous, non-overlapping ranges for nodes and strings.
83/// Ranges start from the given global offsets, which carry forward across
84/// chunks.
85///
86/// # Arguments
87///
88/// * `node_counts` - Per-file node counts (from `StagingGraph::node_count_u32()`)
89/// * `string_counts` - Per-file string counts
90/// * `edge_counts` - Per-file edge counts (used for `total_edges` only)
91/// * `file_ids` - Pre-assigned `FileId`s from batch registration
92/// * `node_offset` - Running global node offset across chunks
93/// * `string_offset` - Running global string offset across chunks
94///
95/// # Panics
96///
97/// Panics in debug builds if the per-chunk accounting arrays do not have
98/// identical lengths.
99#[must_use]
100pub fn compute_commit_plan(
101    node_counts: &[u32],
102    string_counts: &[u32],
103    edge_counts: &[u32],
104    file_ids: &[FileId],
105    node_offset: u32,
106    string_offset: u32,
107) -> ChunkCommitPlan {
108    debug_assert_eq!(node_counts.len(), string_counts.len());
109    debug_assert_eq!(node_counts.len(), edge_counts.len());
110    debug_assert_eq!(node_counts.len(), file_ids.len());
111
112    let mut plans = Vec::with_capacity(node_counts.len());
113    let mut node_cursor = node_offset;
114    let mut string_cursor = string_offset;
115    let mut total_edges: u64 = 0;
116
117    for i in 0..node_counts.len() {
118        let nc = node_counts[i];
119        let sc = string_counts[i];
120
121        let node_end = node_cursor
122            .checked_add(nc)
123            .expect("node ID space overflow in commit plan");
124        let string_end = string_cursor
125            .checked_add(sc)
126            .expect("string ID space overflow in commit plan");
127
128        plans.push(FilePlan {
129            parsed_index: i,
130            file_id: file_ids[i],
131            node_range: node_cursor..node_end,
132            string_range: string_cursor..string_end,
133        });
134
135        node_cursor = node_end;
136        string_cursor = string_end;
137        total_edges += u64::from(edge_counts[i]);
138    }
139
140    ChunkCommitPlan {
141        file_plans: plans,
142        total_nodes: node_cursor - node_offset,
143        total_strings: string_cursor - string_offset,
144        total_edges,
145    }
146}
147
148/// Execute Phase 2: count + range assignment for a parsed chunk.
149///
150/// Extracts per-file counts from staging graphs and delegates to
151/// [`compute_commit_plan`] for prefix-sum range assignment.
152#[must_use]
153pub fn phase2_assign_ranges(
154    staging_graphs: &[&StagingGraph],
155    file_ids: &[FileId],
156    offsets: &GlobalOffsets,
157) -> ChunkCommitPlan {
158    let node_counts: Vec<u32> = staging_graphs
159        .iter()
160        .map(|sg| sg.node_count_u32())
161        .collect();
162    let string_counts: Vec<u32> = staging_graphs
163        .iter()
164        .map(|sg| sg.string_count_u32())
165        .collect();
166    let edge_counts: Vec<u32> = staging_graphs
167        .iter()
168        .map(|sg| sg.edge_count_u32())
169        .collect();
170
171    compute_commit_plan(
172        &node_counts,
173        &string_counts,
174        &edge_counts,
175        file_ids,
176        offsets.node_offset,
177        offsets.string_offset,
178    )
179}
180
/// Phase 3 result: per-file edges and total written counts for validation.
///
/// Per-file writes stop at the end of each file's pre-allocated range. The
/// written totals can therefore fall short of the plan's totals but never
/// exceed them.
pub struct Phase3Result {
    /// Per-file edge collections for Phase 4 bulk insert, in plan file order.
    pub per_file_edges: Vec<Vec<PendingEdge>>,
    /// Total nodes actually written (for validation against planned totals).
    pub total_nodes_written: usize,
    /// Total strings actually written (for validation against planned totals).
    pub total_strings_written: usize,
    /// Total edges collected across all files.
    pub total_edges_collected: usize,
}
192
/// Execute Phase 3: parallel commit into disjoint pre-allocated ranges.
///
/// Pre-splits arena and interner slices into per-file disjoint sub-slices
/// using `split_at_mut`, then uses `rayon` `par_iter` for lock-free parallel
/// writes. Each file's staging graph is committed independently.
///
/// The bulk region begins at the first file plan's range starts and spans
/// `plan.total_nodes` / `plan.total_strings` slots. Per-file sub-slices are
/// carved off in `plan.file_plans` order, so the plan's ranges are assumed to
/// be contiguous and ascending, as [`compute_commit_plan`] produces them.
///
/// `staging_graphs` is indexed by each file plan's position in
/// `plan.file_plans`, not by `FilePlan::parsed_index`, so it must be ordered
/// like the plan.
///
/// Returns [`Phase3Result`] with per-file edges and written counts so the
/// caller can validate against plan totals and truncate allocations on
/// mismatch. An empty plan returns an all-zero result without touching the
/// arena or interner.
///
/// # Panics
///
/// Panics if `plan.total_nodes` or `plan.total_strings` exceeds the
/// pre-allocated range in the arena or interner. Also panics in these cases:
///
/// - the per-file ranges add up to more than those totals (`split_at_mut`
///   out of bounds);
/// - `staging_graphs` is shorter than `plan.file_plans`;
/// - a per-file commit panics (non-local/duplicate staged string IDs, or
///   local string IDs with no `InternString` op). rayon re-raises such a
///   panic on the calling thread.
#[must_use]
pub fn phase3_parallel_commit(
    plan: &ChunkCommitPlan,
    staging_graphs: &[&StagingGraph],
    arena: &mut NodeArena,
    interner: &mut StringInterner,
) -> Phase3Result {
    // Nothing to commit; also guards the `file_plans[0]` accesses below.
    if plan.file_plans.is_empty() {
        return Phase3Result {
            per_file_edges: Vec::new(),
            total_nodes_written: 0,
            total_strings_written: 0,
            total_edges_collected: 0,
        };
    }

    // Determine the start of the pre-allocated ranges.
    let node_start = plan.file_plans[0].node_range.start;
    let string_start = plan.file_plans[0].string_range.start;

    // Get mutable slices covering the entire pre-allocated region.
    // NOTE(review): the string-side call returns two parallel slices, values
    // and `u32` counts (write_strings stores 1 per string, presumably an
    // initial refcount); both are split with the same per-file length.
    let node_slice = arena.bulk_slice_mut(node_start, plan.total_nodes);
    let (str_slice, rc_slice) = interner.bulk_slices_mut(string_start, plan.total_strings);

    // Pre-split into per-file disjoint sub-slices using split_at_mut.
    // Each `*_remaining` cursor is re-pointed at the tail after every split,
    // so the head slices handed to `file_work` never alias each other.
    let mut node_remaining = &mut *node_slice;
    let mut str_remaining = &mut *str_slice;
    let mut rc_remaining = &mut *rc_slice;

    // (node slots, string slots, count slots, file plan, index into staging_graphs)
    #[allow(clippy::type_complexity)]
    let mut file_work: Vec<(
        &mut [Slot<NodeEntry>],
        &mut [Option<Arc<str>>],
        &mut [u32],
        &FilePlan,
        usize,
    )> = Vec::with_capacity(plan.file_plans.len());

    for (i, file_plan) in plan.file_plans.iter().enumerate() {
        let nc = (file_plan.node_range.end - file_plan.node_range.start) as usize;
        let sc = (file_plan.string_range.end - file_plan.string_range.start) as usize;

        let (n, nr) = node_remaining.split_at_mut(nc);
        let (s, sr) = str_remaining.split_at_mut(sc);
        let (r, rr) = rc_remaining.split_at_mut(sc);

        file_work.push((n, s, r, file_plan, i));
        node_remaining = nr;
        str_remaining = sr;
        rc_remaining = rr;
    }

    // Parallel commit — each closure owns disjoint slices, no contention.
    // `collect` on this indexed parallel iterator keeps results in file order,
    // which keeps `per_file_edges` deterministic.
    let results: Vec<FileCommitResult> = file_work
        .into_par_iter()
        .map(|(node_slots, str_slots, rc_slots, file_plan, idx)| {
            commit_single_file(
                staging_graphs[idx],
                file_plan,
                node_slots,
                str_slots,
                rc_slots,
            )
        })
        .collect();

    // Aggregate counts before consuming `results` for the edges.
    let total_nodes_written: usize = results.iter().map(|r| r.nodes_written).sum();
    let total_strings_written: usize = results.iter().map(|r| r.strings_written).sum();
    let total_edges_collected: usize = results.iter().map(|r| r.edges.len()).sum();
    let per_file_edges = results.into_iter().map(|r| r.edges).collect();

    Phase3Result {
        per_file_edges,
        total_nodes_written,
        total_strings_written,
        total_edges_collected,
    }
}
285
286/// Commit a single file's staging graph into pre-allocated disjoint ranges.
287///
288/// This function operates on slices that belong exclusively to this file,
289/// so it requires no locks or synchronization.
290///
291/// # Steps
292///
293/// 1. **Strings**: Extract `InternString` ops, write `Arc<str>` values into
294///    pre-allocated string slots, build local→global `StringId` remap.
295/// 2. **Nodes**: Extract `AddNode` ops, apply string remap to each `NodeEntry`,
296///    set `file_id`, write into pre-allocated node slots, build expected→actual
297///    `NodeId` remap.
298/// 3. **Edges**: Extract `AddEdge` ops, apply node ID remap to source/target,
299///    assign pre-computed sequence numbers, return as `PendingEdge` vec.
300// Result of committing a single file: edges + actual written counts.
301struct FileCommitResult {
302    edges: Vec<PendingEdge>,
303    nodes_written: usize,
304    strings_written: usize,
305}
306
307fn commit_single_file(
308    staging: &StagingGraph,
309    plan: &FilePlan,
310    node_slots: &mut [Slot<NodeEntry>],
311    str_slots: &mut [Option<Arc<str>>],
312    rc_slots: &mut [u32],
313) -> FileCommitResult {
314    let ops = staging.operations();
315
316    // --- Step 1: Write strings, build local→global remap ---
317    let (string_remap, strings_written) = write_strings(ops, plan, str_slots, rc_slots);
318
319    // --- Step 2: Write nodes, build expected→actual node ID remap ---
320    let (node_remap, nodes_written) = write_nodes(ops, plan, node_slots, &string_remap);
321
322    // --- Step 3: Collect remapped edges with pre-assigned sequence numbers ---
323    let edges = collect_edges(ops, plan, &node_remap, &string_remap);
324
325    FileCommitResult {
326        edges,
327        nodes_written,
328        strings_written,
329    }
330}
331
332/// Write staged strings into pre-allocated interner slots.
333///
334/// Validates that each `InternString` op has a local `StringId` and that
335/// no duplicate local IDs exist (matching the serial `commit_strings` checks).
336///
337/// Returns `(remap, strings_written)`.
338fn write_strings(
339    ops: &[StagingOp],
340    plan: &FilePlan,
341    str_slots: &mut [Option<Arc<str>>],
342    rc_slots: &mut [u32],
343) -> (HashMap<StringId, StringId>, usize) {
344    let mut remap = HashMap::new();
345    let mut string_cursor = 0usize;
346
347    for op in ops {
348        if let StagingOp::InternString { local_id, value } = op {
349            // Validate: only local IDs are allowed in staging (matching serial commit_strings)
350            assert!(
351                local_id.is_local(),
352                "non-local StringId {:?} in InternString op for file {:?}",
353                local_id,
354                plan.file_id,
355            );
356            // Validate: no duplicate local IDs (matching serial commit_strings)
357            assert!(
358                !remap.contains_key(local_id),
359                "duplicate local StringId {:?} in InternString op for file {:?}",
360                local_id,
361                plan.file_id,
362            );
363
364            if string_cursor >= str_slots.len() {
365                log::warn!(
366                    "string slot overflow in file {:?}: cursor={string_cursor}, slots={}, skipping remaining strings",
367                    plan.file_id,
368                    str_slots.len()
369                );
370                break;
371            }
372
373            // The global StringId for this string is the pre-allocated slot index.
374            #[allow(clippy::cast_possible_truncation)] // cursor is bounded by allocated slot count
375            let global_id = StringId::new(plan.string_range.start + string_cursor as u32);
376
377            // Write the string into the pre-allocated slot.
378            str_slots[string_cursor] = Some(Arc::from(value.as_str()));
379            rc_slots[string_cursor] = 1;
380
381            remap.insert(*local_id, global_id);
382            string_cursor += 1;
383        }
384    }
385
386    (remap, string_cursor)
387}
388
389/// Remap all `StringId` fields in a `NodeEntry` using a local→global table.
390///
391/// Required field (`name`) is always remapped if local.
392/// Optional fields (`signature`, `doc`, `qualified_name`, `visibility`)
393/// are remapped if present and local.
394fn remap_node_entry_string_ids(entry: &mut NodeEntry, remap: &HashMap<StringId, StringId>) {
395    remap_required_local(&mut entry.name, remap);
396    remap_option_local(&mut entry.signature, remap);
397    remap_option_local(&mut entry.doc, remap);
398    remap_option_local(&mut entry.qualified_name, remap);
399    remap_option_local(&mut entry.visibility, remap);
400}
401
402/// Remap all local `StringId` fields in an `EdgeKind`.
403///
404/// Uses the same exhaustive match as `remap_edge_kind_string_ids`, but
405/// only remaps local IDs (those with `LOCAL_TAG_BIT` set).
406#[allow(clippy::match_same_arms)]
407fn remap_edge_kind_local_string_ids(kind: &mut EdgeKind, remap: &HashMap<StringId, StringId>) {
408    match kind {
409        EdgeKind::Imports { alias, .. } => remap_option_local(alias, remap),
410        EdgeKind::Exports { alias, .. } => remap_option_local(alias, remap),
411        EdgeKind::TypeOf { name, .. } => remap_option_local(name, remap),
412        EdgeKind::TraitMethodBinding {
413            trait_name,
414            impl_type,
415            ..
416        } => {
417            remap_required_local(trait_name, remap);
418            remap_required_local(impl_type, remap);
419        }
420        EdgeKind::HttpRequest { url, .. } => remap_option_local(url, remap),
421        EdgeKind::GrpcCall { service, method } => {
422            remap_required_local(service, remap);
423            remap_required_local(method, remap);
424        }
425        EdgeKind::DbQuery { table, .. } => remap_option_local(table, remap),
426        EdgeKind::TableRead { table_name, schema } => {
427            remap_required_local(table_name, remap);
428            remap_option_local(schema, remap);
429        }
430        EdgeKind::TableWrite {
431            table_name, schema, ..
432        } => {
433            remap_required_local(table_name, remap);
434            remap_option_local(schema, remap);
435        }
436        EdgeKind::TriggeredBy {
437            trigger_name,
438            schema,
439        } => {
440            remap_required_local(trigger_name, remap);
441            remap_option_local(schema, remap);
442        }
443        EdgeKind::MessageQueue { protocol, topic } => {
444            if let MqProtocol::Other(s) = protocol {
445                remap_required_local(s, remap);
446            }
447            remap_option_local(topic, remap);
448        }
449        EdgeKind::WebSocket { event } => remap_option_local(event, remap),
450        EdgeKind::GraphQLOperation { operation } => remap_required_local(operation, remap),
451        EdgeKind::ProcessExec { command } => remap_required_local(command, remap),
452        EdgeKind::FileIpc { path_pattern } => remap_option_local(path_pattern, remap),
453        EdgeKind::ProtocolCall { protocol, metadata } => {
454            remap_required_local(protocol, remap);
455            remap_option_local(metadata, remap);
456        }
457        // Variants without StringId fields — exhaustive, no wildcard.
458        EdgeKind::Defines
459        | EdgeKind::Contains
460        | EdgeKind::Calls { .. }
461        | EdgeKind::References
462        | EdgeKind::Inherits
463        | EdgeKind::Implements
464        | EdgeKind::LifetimeConstraint { .. }
465        | EdgeKind::MacroExpansion { .. }
466        | EdgeKind::FfiCall { .. }
467        | EdgeKind::WebAssemblyCall
468        | EdgeKind::GenericBound
469        | EdgeKind::AnnotatedWith
470        | EdgeKind::AnnotationParam
471        | EdgeKind::LambdaCaptures
472        | EdgeKind::ModuleExports
473        | EdgeKind::ModuleRequires
474        | EdgeKind::ModuleOpens
475        | EdgeKind::ModuleProvides
476        | EdgeKind::TypeArgument
477        | EdgeKind::ExtensionReceiver
478        | EdgeKind::CompanionOf
479        | EdgeKind::SealedPermit => {}
480    }
481}
482
483/// Remap a required local `StringId` in place.
484///
485/// Panics if a local ID has no mapping, matching the serial
486/// `apply_string_remap` behavior that returned `UnmappedLocalStringId`.
487fn remap_required_local(id: &mut StringId, remap: &HashMap<StringId, StringId>) {
488    if id.is_local() {
489        let global = remap.get(id).unwrap_or_else(|| {
490            panic!("unmapped local StringId {id:?} — missing intern_string op?")
491        });
492        *id = *global;
493    }
494}
495
496/// Remap an optional local `StringId` in place.
497fn remap_option_local(opt: &mut Option<StringId>, remap: &HashMap<StringId, StringId>) {
498    if let Some(id) = opt
499        && id.is_local()
500    {
501        let global = remap.get(id).unwrap_or_else(|| {
502            panic!("unmapped local StringId {id:?} — missing intern_string op?")
503        });
504        *id = *global;
505    }
506}
507
508/// Write staged nodes into pre-allocated arena slots.
509///
510/// Returns `(remap, nodes_written)`.
511fn write_nodes(
512    ops: &[StagingOp],
513    plan: &FilePlan,
514    node_slots: &mut [Slot<NodeEntry>],
515    string_remap: &HashMap<StringId, StringId>,
516) -> (HashMap<NodeId, NodeId>, usize) {
517    let mut node_remap = HashMap::new();
518    let mut node_cursor = 0usize;
519
520    for op in ops {
521        if let StagingOp::AddNode {
522            entry, expected_id, ..
523        } = op
524        {
525            if node_cursor >= node_slots.len() {
526                log::warn!(
527                    "node slot overflow in file {:?}: cursor={node_cursor}, slots={}, skipping remaining nodes",
528                    plan.file_id,
529                    node_slots.len()
530                );
531                break;
532            }
533
534            let mut entry = entry.clone();
535
536            // Apply string remap to all StringId fields in the entry.
537            remap_node_entry_string_ids(&mut entry, string_remap);
538
539            // Set the file ID from the plan.
540            entry.file = plan.file_id;
541
542            // The actual NodeId is the pre-allocated slot index with generation 1.
543            #[allow(clippy::cast_possible_truncation)] // cursor is bounded by allocated slot count
544            let actual_index = plan.node_range.start + node_cursor as u32;
545            let actual_id = NodeId::new(actual_index, 1);
546
547            // Write into the pre-allocated slot.
548            node_slots[node_cursor] = Slot::new_occupied(1, entry);
549
550            if let Some(expected) = expected_id {
551                node_remap.insert(*expected, actual_id);
552            }
553
554            node_cursor += 1;
555        }
556    }
557
558    (node_remap, node_cursor)
559}
560
561/// Collect staged edges with remapped node IDs, string IDs, and pre-assigned
562/// sequence numbers.
563fn collect_edges(
564    ops: &[StagingOp],
565    plan: &FilePlan,
566    node_remap: &HashMap<NodeId, NodeId>,
567    string_remap: &HashMap<StringId, StringId>,
568) -> Vec<PendingEdge> {
569    let mut edges = Vec::new();
570
571    for op in ops {
572        if let StagingOp::AddEdge {
573            source,
574            target,
575            kind,
576            spans,
577            ..
578        } = op
579        {
580            let actual_source = node_remap.get(source).copied().unwrap_or(*source);
581            let actual_target = node_remap.get(target).copied().unwrap_or(*target);
582
583            // Clone and remap any local StringIds in the EdgeKind.
584            let mut remapped_kind = kind.clone();
585            remap_edge_kind_local_string_ids(&mut remapped_kind, string_remap);
586
587            edges.push(PendingEdge {
588                source: actual_source,
589                target: actual_target,
590                kind: remapped_kind,
591                file: plan.file_id,
592                spans: spans.clone(),
593            });
594        }
595    }
596
597    edges
598}
599
600/// Remap a required `StringId` using the dedup remap table.
601///
602/// If the ID is in the remap table, it is replaced with the canonical ID.
603/// Otherwise, it is left unchanged (identity mapping).
604#[allow(clippy::implicit_hasher)]
605pub fn remap_string_id(id: &mut StringId, remap: &HashMap<StringId, StringId>) {
606    if let Some(&canonical) = remap.get(id) {
607        *id = canonical;
608    }
609}
610
611/// Remap an optional `StringId` using the dedup remap table.
612#[allow(clippy::implicit_hasher)]
613pub fn remap_option_string_id(id: &mut Option<StringId>, remap: &HashMap<StringId, StringId>) {
614    if let Some(inner) = id {
615        remap_string_id(inner, remap);
616    }
617}
618
619/// Exhaustive remap of all `StringId` fields in an `EdgeKind`.
620///
621/// No wildcard arm — the compiler ensures completeness when new variants
622/// are added to `EdgeKind`.
623#[allow(clippy::match_same_arms, clippy::implicit_hasher)] // Arms are separated by category for documentation clarity
624pub fn remap_edge_kind_string_ids(kind: &mut EdgeKind, remap: &HashMap<StringId, StringId>) {
625    match kind {
626        // === Variants WITH StringId fields ===
627        EdgeKind::Imports { alias, .. } => remap_option_string_id(alias, remap),
628        EdgeKind::Exports { alias, .. } => remap_option_string_id(alias, remap),
629        EdgeKind::TypeOf { name, .. } => remap_option_string_id(name, remap),
630        EdgeKind::TraitMethodBinding {
631            trait_name,
632            impl_type,
633            ..
634        } => {
635            remap_string_id(trait_name, remap);
636            remap_string_id(impl_type, remap);
637        }
638        EdgeKind::HttpRequest { url, .. } => remap_option_string_id(url, remap),
639        EdgeKind::GrpcCall { service, method } => {
640            remap_string_id(service, remap);
641            remap_string_id(method, remap);
642        }
643        EdgeKind::DbQuery { table, .. } => remap_option_string_id(table, remap),
644        EdgeKind::TableRead { table_name, schema } => {
645            remap_string_id(table_name, remap);
646            remap_option_string_id(schema, remap);
647        }
648        EdgeKind::TableWrite {
649            table_name, schema, ..
650        } => {
651            remap_string_id(table_name, remap);
652            remap_option_string_id(schema, remap);
653        }
654        EdgeKind::TriggeredBy {
655            trigger_name,
656            schema,
657        } => {
658            remap_string_id(trigger_name, remap);
659            remap_option_string_id(schema, remap);
660        }
661        EdgeKind::MessageQueue { protocol, topic } => {
662            if let MqProtocol::Other(s) = protocol {
663                remap_string_id(s, remap);
664            }
665            remap_option_string_id(topic, remap);
666        }
667        EdgeKind::WebSocket { event } => remap_option_string_id(event, remap),
668        EdgeKind::GraphQLOperation { operation } => remap_string_id(operation, remap),
669        EdgeKind::ProcessExec { command } => remap_string_id(command, remap),
670        EdgeKind::FileIpc { path_pattern } => remap_option_string_id(path_pattern, remap),
671        EdgeKind::ProtocolCall { protocol, metadata } => {
672            remap_string_id(protocol, remap);
673            remap_option_string_id(metadata, remap);
674        }
675        // === Variants WITHOUT StringId fields — exhaustive, no wildcard ===
676        EdgeKind::Defines
677        | EdgeKind::Contains
678        | EdgeKind::Calls { .. }
679        | EdgeKind::References
680        | EdgeKind::Inherits
681        | EdgeKind::Implements
682        | EdgeKind::LifetimeConstraint { .. }
683        | EdgeKind::MacroExpansion { .. }
684        | EdgeKind::FfiCall { .. }
685        | EdgeKind::WebAssemblyCall
686        | EdgeKind::GenericBound
687        | EdgeKind::AnnotatedWith
688        | EdgeKind::AnnotationParam
689        | EdgeKind::LambdaCaptures
690        | EdgeKind::ModuleExports
691        | EdgeKind::ModuleRequires
692        | EdgeKind::ModuleOpens
693        | EdgeKind::ModuleProvides
694        | EdgeKind::TypeArgument
695        | EdgeKind::ExtensionReceiver
696        | EdgeKind::CompanionOf
697        | EdgeKind::SealedPermit => {}
698    }
699}
700
701// === Phase 4: Post-chunk Finalization ===
702
703/// Apply global string dedup remap to all `StringId` fields in a `NodeEntry`.
704///
705/// This is the Phase 4 counterpart to `remap_node_entry_string_ids` (Phase 3).
706/// Phase 3 remaps local→global; Phase 4 remaps duplicate global→canonical global.
707#[allow(clippy::implicit_hasher)]
708pub fn remap_node_entry_global(entry: &mut NodeEntry, remap: &HashMap<StringId, StringId>) {
709    remap_string_id(&mut entry.name, remap);
710    remap_option_string_id(&mut entry.signature, remap);
711    remap_option_string_id(&mut entry.doc, remap);
712    remap_option_string_id(&mut entry.qualified_name, remap);
713    remap_option_string_id(&mut entry.visibility, remap);
714}
715
716/// Apply global string dedup remap to all nodes in the arena and all pending edges.
717///
718/// This is Phase 4b of the parallel commit pipeline. After `build_dedup_table()`
719/// produces a remap table, this function applies it to every `StringId` in:
720/// - All `NodeEntry` fields in the arena
721/// - All `EdgeKind` fields in the pending edges
722#[allow(clippy::implicit_hasher)]
723pub fn phase4_apply_global_remap(
724    arena: &mut NodeArena,
725    all_edges: &mut [Vec<PendingEdge>],
726    remap: &HashMap<StringId, StringId>,
727) {
728    if remap.is_empty() {
729        return;
730    }
731
732    // Remap all nodes
733    for (_id, entry) in arena.iter_mut() {
734        remap_node_entry_global(entry, remap);
735    }
736
737    // Remap all edges
738    for file_edges in all_edges.iter_mut() {
739        for edge in file_edges.iter_mut() {
740            remap_edge_kind_string_ids(&mut edge.kind, remap);
741        }
742    }
743}
744
745/// Convert per-file `PendingEdge` collections to per-file `DeltaEdge` collections
746/// with monotonically increasing sequence numbers.
747///
748/// The sequence numbers are assigned file-by-file, edge-by-edge, starting from
749/// `seq_start`. This produces the deterministic ordering required by
750/// `BidirectionalEdgeStore::add_edges_bulk_ordered()`.
751#[must_use]
752pub fn pending_edges_to_delta(
753    per_file_edges: &[Vec<PendingEdge>],
754    seq_start: u64,
755) -> (Vec<Vec<DeltaEdge>>, u64) {
756    let mut seq = seq_start;
757    let mut result = Vec::with_capacity(per_file_edges.len());
758
759    for file_edges in per_file_edges {
760        let mut delta_vec = Vec::with_capacity(file_edges.len());
761        for edge in file_edges {
762            delta_vec.push(DeltaEdge::with_spans(
763                edge.source,
764                edge.target,
765                edge.kind.clone(),
766                seq,
767                DeltaOp::Add,
768                edge.file,
769                edge.spans.clone(),
770            ));
771            seq += 1;
772        }
773        result.push(delta_vec);
774    }
775
776    (result, seq)
777}
778
#[cfg(test)]
mod tests {
    //! Unit tests for the commit pipeline helpers (plan computation, Phase 2
    //! range assignment, Phase 3 parallel commit, Phase 4 remap and delta
    //! conversion).
    //!
    //! Inputs are built by hand so every expected ID range and remapped ID
    //! can be stated exactly.

    use super::*;
    use crate::graph::unified::node::NodeKind;
    use crate::graph::unified::storage::SlotState;

    /// Prefix sums over mixed counts (including a file with zero nodes)
    /// produce contiguous, non-overlapping ranges in input order.
    #[test]
    fn test_compute_commit_plan_basic() {
        let file_ids = vec![FileId::new(0), FileId::new(1), FileId::new(2)];
        let node_counts = vec![3, 0, 5];
        let string_counts = vec![2, 1, 3];
        let edge_counts = vec![4, 0, 6];

        let plan = compute_commit_plan(
            &node_counts,
            &string_counts,
            &edge_counts,
            &file_ids,
            0,
            1, // string_offset=1 for sentinel
        );

        assert_eq!(plan.total_nodes, 8);
        assert_eq!(plan.total_strings, 6);
        assert_eq!(plan.total_edges, 10);
        assert_eq!(plan.file_plans.len(), 3);

        // Each per-file plan carries the FileId it was built for, in order.
        for (file_plan, file_id) in plan.file_plans.iter().zip(&file_ids) {
            assert_eq!(&file_plan.file_id, file_id);
        }

        // File 0: nodes [0..3), strings [1..3)
        assert_eq!(plan.file_plans[0].node_range, 0..3);
        assert_eq!(plan.file_plans[0].string_range, 1..3);

        // File 1: nodes [3..3), strings [3..4) — empty nodes
        assert_eq!(plan.file_plans[1].node_range, 3..3);
        assert_eq!(plan.file_plans[1].string_range, 3..4);

        // File 2: nodes [3..8), strings [4..7)
        assert_eq!(plan.file_plans[2].node_range, 3..8);
        assert_eq!(plan.file_plans[2].string_range, 4..7);
    }

    /// Non-zero starting offsets shift every range by the offset.
    #[test]
    fn test_compute_commit_plan_with_offsets() {
        let file_ids = vec![FileId::new(5)];
        let plan = compute_commit_plan(&[10], &[5], &[7], &file_ids, 100, 50);
        assert_eq!(plan.file_plans[0].node_range, 100..110);
        assert_eq!(plan.file_plans[0].string_range, 50..55);
        assert_eq!(plan.total_nodes, 10);
        assert_eq!(plan.total_strings, 5);
        assert_eq!(plan.total_edges, 7);
    }

    /// An empty chunk yields an empty plan with zero totals.
    #[test]
    fn test_compute_commit_plan_empty() {
        let plan = compute_commit_plan(&[], &[], &[], &[], 0, 1);
        assert_eq!(plan.total_nodes, 0);
        assert_eq!(plan.total_strings, 0);
        assert_eq!(plan.total_edges, 0);
        assert!(plan.file_plans.is_empty());
    }

    #[test]
    fn test_remap_string_id_basic() {
        let mut remap = HashMap::new();
        remap.insert(StringId::new(1), StringId::new(100));

        let mut id = StringId::new(1);
        remap_string_id(&mut id, &remap);
        assert_eq!(id, StringId::new(100));
    }

    /// IDs absent from the remap table are left untouched.
    #[test]
    fn test_remap_string_id_not_in_remap() {
        let remap = HashMap::new();
        let mut id = StringId::new(42);
        remap_string_id(&mut id, &remap);
        assert_eq!(id, StringId::new(42)); // unchanged
    }

    #[test]
    fn test_remap_option_string_id() {
        let mut remap = HashMap::new();
        remap.insert(StringId::new(5), StringId::new(50));

        let mut some_id = Some(StringId::new(5));
        remap_option_string_id(&mut some_id, &remap);
        assert_eq!(some_id, Some(StringId::new(50)));

        let mut none_id: Option<StringId> = None;
        remap_option_string_id(&mut none_id, &remap);
        assert_eq!(none_id, None);
    }

    #[test]
    fn test_remap_edge_kind_imports() {
        let mut remap = HashMap::new();
        remap.insert(StringId::new(1), StringId::new(100));

        let mut kind = EdgeKind::Imports {
            alias: Some(StringId::new(1)),
            is_wildcard: false,
        };
        remap_edge_kind_string_ids(&mut kind, &remap);
        assert!(
            matches!(kind, EdgeKind::Imports { alias: Some(id), .. } if id == StringId::new(100))
        );
    }

    #[test]
    fn test_remap_edge_kind_trait_method_binding() {
        let mut remap = HashMap::new();
        remap.insert(StringId::new(1), StringId::new(100));
        remap.insert(StringId::new(2), StringId::new(200));

        let mut kind = EdgeKind::TraitMethodBinding {
            trait_name: StringId::new(1),
            impl_type: StringId::new(2),
            is_ambiguous: false,
        };
        remap_edge_kind_string_ids(&mut kind, &remap);
        assert!(
            matches!(kind, EdgeKind::TraitMethodBinding { trait_name, impl_type, .. }
                if trait_name == StringId::new(100) && impl_type == StringId::new(200))
        );
    }

    /// Variants without StringId fields pass through unchanged.
    #[test]
    fn test_remap_edge_kind_no_op_variants() {
        let remap = HashMap::new();

        // Defines — no StringId fields
        let mut kind = EdgeKind::Defines;
        remap_edge_kind_string_ids(&mut kind, &remap);
        assert!(matches!(kind, EdgeKind::Defines));

        // Calls — no StringId fields
        let mut kind = EdgeKind::Calls {
            argument_count: 3,
            is_async: true,
        };
        remap_edge_kind_string_ids(&mut kind, &remap);
        assert!(matches!(
            kind,
            EdgeKind::Calls {
                argument_count: 3,
                is_async: true,
            }
        ));
    }

    /// Minimal node payload used to pre-fill arenas and node slots in tests.
    fn placeholder_entry() -> NodeEntry {
        NodeEntry::new(NodeKind::Function, StringId::new(0), FileId::new(0))
    }

    /// Phase 2 derives counts from the staging graphs and lays the files out
    /// back-to-back starting at the running global offsets.
    #[test]
    fn test_phase2_assign_ranges_basic() {
        // Create 2 staging graphs with known counts
        let mut sg0 = StagingGraph::new();
        let mut sg1 = StagingGraph::new();

        // sg0: 2 nodes, 1 string, 1 edge
        let entry0 = placeholder_entry();
        let n0 = sg0.add_node(entry0.clone());
        let n1 = sg0.add_node(entry0.clone());
        sg0.intern_string(StringId::new_local(0), "hello".into());
        sg0.add_edge(
            n0,
            n1,
            EdgeKind::Calls {
                argument_count: 0,
                is_async: false,
            },
            FileId::new(0),
        );

        // sg1: 1 node, 2 strings, 0 edges
        sg1.add_node(entry0);
        sg1.intern_string(StringId::new_local(0), "world".into());
        sg1.intern_string(StringId::new_local(1), "foo".into());

        let file_ids = vec![FileId::new(10), FileId::new(11)];
        let offsets = GlobalOffsets {
            node_offset: 5,
            string_offset: 3,
        };

        let plan = phase2_assign_ranges(&[&sg0, &sg1], &file_ids, &offsets);

        // sg0: 2 nodes, 1 string, 1 edge
        assert_eq!(plan.file_plans[0].node_range, 5..7);
        assert_eq!(plan.file_plans[0].string_range, 3..4);

        // sg1: 1 node, 2 strings, 0 edges
        assert_eq!(plan.file_plans[1].node_range, 7..8);
        assert_eq!(plan.file_plans[1].string_range, 4..6);

        assert_eq!(plan.total_nodes, 3);
        assert_eq!(plan.total_strings, 3);
        assert_eq!(plan.total_edges, 1);
    }

    /// End-to-end Phase 2 → allocation → Phase 3 for one file at non-zero
    /// offsets: the planned ranges must coincide with what the arena and
    /// interner actually reserve, and local IDs must be remapped into them.
    #[test]
    fn test_phase3_parallel_commit_basic() {
        // Create a staging graph with 2 nodes, 1 string, 1 edge
        let mut sg = StagingGraph::new();
        let local_name = StringId::new_local(0);
        sg.intern_string(local_name, "my_func".into());

        let entry = NodeEntry::new(NodeKind::Function, local_name, FileId::new(0));
        let n0 = sg.add_node(entry);

        let entry2 = NodeEntry::new(NodeKind::Variable, local_name, FileId::new(0));
        let n1 = sg.add_node(entry2);

        sg.add_edge(
            n0,
            n1,
            EdgeKind::Calls {
                argument_count: 0,
                is_async: false,
            },
            FileId::new(0),
        );

        let file_ids = vec![FileId::new(5)];

        let mut arena = NodeArena::new();
        let mut interner = StringInterner::new();

        // Occupy slots ahead of this chunk so its IDs start at non-zero
        // offsets and the local→global remap is actually exercised.
        arena.alloc_range(10, &placeholder_entry()).unwrap();
        let prefill_string_start = interner.alloc_range(1).unwrap();
        assert_eq!(prefill_string_start, 1); // first slot past the sentinel

        // The chunk begins *after* the pre-filled slots in both ID spaces.
        let offsets = GlobalOffsets {
            node_offset: 10, // file's nodes start at index 10
            string_offset: prefill_string_start + 1,
        };
        let plan = phase2_assign_ranges(&[&sg], &file_ids, &offsets);
        assert_eq!(plan.file_plans[0].node_range, 10..12);

        // Reserve the planned ranges. The interner must hand back exactly the
        // range Phase 2 assigned; otherwise Phase 3 would write into slots
        // that this chunk does not own.
        arena
            .alloc_range(plan.total_nodes, &placeholder_entry())
            .unwrap();
        let chunk_string_start = interner.alloc_range(plan.total_strings).unwrap();
        assert_eq!(chunk_string_start, plan.file_plans[0].string_range.start);

        // Phase 3
        let result = phase3_parallel_commit(&plan, &[&sg], &mut arena, &mut interner);

        // Verify written counts
        assert_eq!(result.total_nodes_written, 2);
        assert_eq!(result.total_strings_written, 1);

        // The file's only string lands in the slot reserved for it.
        let global_name = StringId::new(chunk_string_start);
        assert_eq!(&*interner.resolve(global_name).unwrap(), "my_func");

        // Verify 1 file, 1 edge
        assert_eq!(result.per_file_edges.len(), 1);
        assert_eq!(result.per_file_edges[0].len(), 1);

        // Verify edge was remapped to global IDs (node_offset=10)
        let edge = &result.per_file_edges[0][0];
        assert_eq!(edge.file, FileId::new(5));
        assert_eq!(edge.source, NodeId::new(10, 1)); // first node at slot 10
        assert_eq!(edge.target, NodeId::new(11, 1)); // second node at slot 11
    }

    /// An empty plan commits nothing and produces no edges.
    #[test]
    fn test_phase3_parallel_commit_empty() {
        let mut arena = NodeArena::new();
        let mut interner = StringInterner::new();

        let plan = ChunkCommitPlan {
            file_plans: vec![],
            total_nodes: 0,
            total_strings: 0,
            total_edges: 0,
        };

        let result = phase3_parallel_commit(&plan, &[], &mut arena, &mut interner);
        assert!(result.per_file_edges.is_empty());
        assert_eq!(result.total_nodes_written, 0);
        assert_eq!(result.total_strings_written, 0);
    }

    /// `commit_single_file` writes strings into its slice with refcount 1 and
    /// rewrites node StringIds/FileId from local to global values.
    #[test]
    fn test_commit_single_file_string_remap() {
        let mut sg = StagingGraph::new();
        let local_0 = StringId::new_local(0);
        let local_1 = StringId::new_local(1);
        sg.intern_string(local_0, "alpha".into());
        sg.intern_string(local_1, "beta".into());

        let mut entry = NodeEntry::new(NodeKind::Function, local_0, FileId::new(0));
        entry.signature = Some(local_1);
        sg.add_node(entry);

        let plan = FilePlan {
            parsed_index: 0,
            file_id: FileId::new(42),
            node_range: 10..11,
            string_range: 20..22,
        };

        let mut node_slots = vec![Slot::new_occupied(1, placeholder_entry())];
        let mut str_slots: Vec<Option<Arc<str>>> = vec![None, None];
        let mut rc_slots: Vec<u32> = vec![0, 0];

        let result = commit_single_file(&sg, &plan, &mut node_slots, &mut str_slots, &mut rc_slots);

        // Strings written
        assert_eq!(str_slots[0].as_deref(), Some("alpha"));
        assert_eq!(str_slots[1].as_deref(), Some("beta"));
        assert_eq!(rc_slots[0], 1);
        assert_eq!(rc_slots[1], 1);
        assert_eq!(result.strings_written, 2);

        // Node entry has remapped StringIds
        if let SlotState::Occupied(entry) = node_slots[0].state() {
            assert_eq!(entry.name, StringId::new(20)); // global slot 20
            assert_eq!(entry.signature, Some(StringId::new(21))); // global slot 21
            assert_eq!(entry.file, FileId::new(42));
        } else {
            panic!("Expected occupied slot");
        }
        assert_eq!(result.nodes_written, 1);

        // No edges
        assert!(result.edges.is_empty());
    }

    /// Both the `Other` protocol name and the topic of a MessageQueue edge
    /// are remapped.
    #[test]
    fn test_remap_edge_kind_message_queue_other() {
        let mut remap = HashMap::new();
        remap.insert(StringId::new(10), StringId::new(110));
        remap.insert(StringId::new(20), StringId::new(220));

        let mut kind = EdgeKind::MessageQueue {
            protocol: MqProtocol::Other(StringId::new(10)),
            topic: Some(StringId::new(20)),
        };
        remap_edge_kind_string_ids(&mut kind, &remap);
        assert!(matches!(
            kind,
            EdgeKind::MessageQueue {
                protocol: MqProtocol::Other(proto),
                topic: Some(topic),
            } if proto == StringId::new(110) && topic == StringId::new(220)
        ));
    }

    // === Phase 4 tests ===

    /// The dedup remap is applied to node entries and edge kinds alike, and
    /// IDs that are already canonical stay as they are.
    #[test]
    fn test_phase4_apply_global_remap_basic() {
        let mut arena = NodeArena::new();

        // Allocate two nodes with duplicate string IDs (2 and 3 are dupes of 1)
        let entry1 = NodeEntry::new(NodeKind::Function, StringId::new(1), FileId::new(0));
        let mut entry2 = NodeEntry::new(NodeKind::Variable, StringId::new(2), FileId::new(0));
        entry2.signature = Some(StringId::new(3));

        arena.alloc(entry1).unwrap();
        arena.alloc(entry2).unwrap();

        // Edges with string IDs that need remapping
        let mut all_edges = vec![vec![PendingEdge {
            source: NodeId::new(0, 1),
            target: NodeId::new(1, 1),
            kind: EdgeKind::Imports {
                alias: Some(StringId::new(3)),
                is_wildcard: false,
            },
            file: FileId::new(0),
            spans: vec![],
        }]];

        // Dedup remap: 2→1, 3→1
        let mut remap = HashMap::new();
        remap.insert(StringId::new(2), StringId::new(1));
        remap.insert(StringId::new(3), StringId::new(1));

        phase4_apply_global_remap(&mut arena, &mut all_edges, &remap);

        // Node 0 already used the canonical ID and must be unchanged.
        let (_, first) = arena.iter().next().unwrap();
        assert_eq!(first.name, StringId::new(1));

        // Check that node 1's name was remapped from 2→1
        let (_, entry) = arena.iter().nth(1).unwrap();
        assert_eq!(entry.name, StringId::new(1));
        assert_eq!(entry.signature, Some(StringId::new(1)));

        // Check that edge's alias was remapped from 3→1
        if let EdgeKind::Imports { alias, .. } = &all_edges[0][0].kind {
            assert_eq!(*alias, Some(StringId::new(1)));
        } else {
            panic!("Expected Imports edge");
        }
    }

    /// With nothing to remap, Phase 4 leaves an empty arena and edge list
    /// empty.
    #[test]
    fn test_phase4_apply_global_remap_empty() {
        let mut arena = NodeArena::new();
        let mut edges: Vec<Vec<PendingEdge>> = vec![];
        let remap = HashMap::new();

        phase4_apply_global_remap(&mut arena, &mut edges, &remap);

        assert!(edges.is_empty());
        assert_eq!(arena.iter().count(), 0);
    }

    /// Pending edges become `Add` deltas whose sequence numbers start at the
    /// given seed and increase by one per edge, in file-then-edge order.
    #[test]
    fn test_pending_edges_to_delta_basic() {
        let edges = vec![
            vec![
                PendingEdge {
                    source: NodeId::new(0, 1),
                    target: NodeId::new(1, 1),
                    kind: EdgeKind::Calls {
                        argument_count: 0,
                        is_async: false,
                    },
                    file: FileId::new(0),
                    spans: vec![],
                },
                PendingEdge {
                    source: NodeId::new(1, 1),
                    target: NodeId::new(2, 1),
                    kind: EdgeKind::References,
                    file: FileId::new(0),
                    spans: vec![],
                },
            ],
            vec![PendingEdge {
                source: NodeId::new(3, 1),
                target: NodeId::new(4, 1),
                kind: EdgeKind::Defines,
                file: FileId::new(1),
                spans: vec![],
            }],
        ];

        let (deltas, final_seq) = pending_edges_to_delta(&edges, 100);

        assert_eq!(deltas.len(), 2);
        assert_eq!(deltas[0].len(), 2);
        assert_eq!(deltas[1].len(), 1);
        assert_eq!(final_seq, 103);

        // Sequence numbers are contiguous and monotonic across files.
        let seqs: Vec<_> = deltas.iter().flatten().map(|d| d.seq).collect();
        assert_eq!(seqs, vec![100, 101, 102]);

        // Every emitted delta is an Add operation.
        assert!(deltas.iter().flatten().all(|d| matches!(d.op, DeltaOp::Add)));
    }

    /// No input edges: no deltas, and the sequence counter is returned as-is.
    #[test]
    fn test_pending_edges_to_delta_empty() {
        let edges: Vec<Vec<PendingEdge>> = vec![];
        let (deltas, final_seq) = pending_edges_to_delta(&edges, 0);
        assert!(deltas.is_empty());
        assert_eq!(final_seq, 0);
    }
}