// sqry_core/graph/unified/build/entrypoint.rs
//! Build entrypoint for unified graph.
//!
//! This module provides the top-level API for building a unified graph from source files.
//! It orchestrates file discovery and delegates to the 5-pass build pipeline.

6use std::fs;
7use std::path::{Path, PathBuf};
8
9use anyhow::{Context, Result};
10use ignore::WalkBuilder;
11use rayon::prelude::*;
12
13use crate::graph::GraphBuilderError;
14use crate::graph::unified::analysis::LabelBudgetConfig;
15use crate::graph::unified::analysis::ReachabilityStrategy;
16use crate::graph::unified::build::StagingGraph;
17use crate::graph::unified::build::parallel_commit::{
18    GlobalOffsets, pending_edges_to_delta, phase2_assign_ranges, phase3_parallel_commit,
19    phase4_apply_global_remap,
20};
21use crate::graph::unified::build::pass3_intra::PendingEdge;
22use crate::graph::unified::build::progress::GraphBuildProgressTracker;
23use crate::graph::unified::concurrent::CodeGraph;
24use crate::plugin::PluginManager;
25use crate::plugin::error::ParseError;
26use crate::progress::{SharedReporter, no_op_reporter};
27use crate::project::path_utils::normalize_path_components;
28
/// Result of a successful build-and-persist operation.
///
/// Contains all metadata about the completed graph build, including
/// canonical (deduplicated) edge counts, file counts by language, and
/// provenance information.
#[derive(Debug, Clone)]
pub struct BuildResult {
    /// Number of nodes in the graph.
    pub node_count: usize,
    /// Number of deduplicated edges (from analysis CSR, after merge/compaction).
    /// This is the canonical edge count.
    pub edge_count: usize,
    /// Number of raw edges in the graph (CSR + delta buffer, before dedup).
    /// Available for diagnostics; NOT the canonical count.
    pub raw_edge_count: usize,
    /// Number of indexed files, by language (e.g., `{"rust": 150, "python": 30}`).
    ///
    /// Counts files that entered the graph indexing pipeline and were
    /// successfully parsed by a plugin. Not the same as "scanned files"
    /// (all files walked by the directory scanner).
    pub file_count: std::collections::HashMap<String, usize>,
    /// Total number of indexed files.
    ///
    /// Equals the sum of the per-language counts in `file_count`.
    pub total_files: usize,
    /// ISO 8601 timestamp when the build completed.
    pub built_at: String,
    /// Root path that was indexed.
    pub root_path: String,
    /// Number of threads used for parallel file processing.
    ///
    /// Reflects the effective thread count from the rayon pool, not the
    /// CLI-requested value. Useful for build diagnostics.
    pub thread_count: usize,

    /// Reachability strategy used by each persisted analysis kind.
    /// One entry per edge kind (`calls`, `imports`, `references`, `inherits`).
    pub analysis_strategies: Vec<AnalysisStrategySummary>,
}
65
/// Persisted analysis strategy summary for one edge kind.
///
/// Produced once per edge kind at the end of the analysis pipeline and
/// surfaced to callers via [`BuildResult::analysis_strategies`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnalysisStrategySummary {
    /// Stable edge-kind label (`calls`, `imports`, `references`, `inherits`).
    pub edge_kind: &'static str,
    /// Reachability strategy persisted for the edge kind.
    pub strategy: ReachabilityStrategy,
}
74
/// Default staging memory limit per batch: 512 MB.
///
/// When the accumulated `StagingGraph` memory exceeds this threshold, the
/// current batch is committed before parsing the next chunk. Override via
/// `SQRY_STAGING_MEMORY_LIMIT_MB` or [`BuildConfig::staging_memory_limit`].
///
/// Expressed in bytes (512 MiB).
const DEFAULT_STAGING_MEMORY_LIMIT: usize = 512 * 1024 * 1024;
81
/// Configuration for building the unified graph.
#[derive(Debug, Clone)]
pub struct BuildConfig {
    /// Maximum directory depth to traverse (None = unlimited).
    pub max_depth: Option<usize>,

    /// Follow symbolic links.
    pub follow_links: bool,

    /// Include hidden files and directories.
    pub include_hidden: bool,

    /// Number of threads for parallel building (None = use default based on CPU count).
    ///
    /// Also used to size the dedicated rayon pool for the post-build
    /// analysis phase when set.
    pub num_threads: Option<usize>,

    /// Maximum staging memory (bytes) to accumulate before committing a batch.
    ///
    /// Controls the parse-commit chunking watermark. When the sum of all
    /// in-flight `StagingGraph` buffers exceeds this limit, the batch is
    /// committed to the graph before the next chunk of files is parsed.
    ///
    /// Defaults to 512 MB. Override via
    /// `SQRY_STAGING_MEMORY_LIMIT_MB` environment variable.
    pub staging_memory_limit: usize,

    /// Configuration for the 2-hop label budget used during analysis.
    ///
    /// Controls the maximum number of intervals per edge kind and what
    /// to do when the budget is exceeded (fail or degrade to BFS).
    pub label_budget: LabelBudgetConfig,
}
113
114impl Default for BuildConfig {
115    fn default() -> Self {
116        let limit = std::env::var("SQRY_STAGING_MEMORY_LIMIT_MB")
117            .ok()
118            .and_then(|v| v.parse::<usize>().ok())
119            .map_or(DEFAULT_STAGING_MEMORY_LIMIT, |mb| mb * 1024 * 1024);
120
121        let label_budget = LabelBudgetConfig {
122            budget_per_kind: 15_000_000,
123            on_exceeded: crate::graph::unified::analysis::BudgetExceededPolicy::Degrade,
124            density_gate_threshold: 64,
125            skip_labels: false,
126        };
127
128        Self {
129            max_depth: None,
130            follow_links: false,
131            include_hidden: false,
132            num_threads: None,
133            staging_memory_limit: limit,
134            label_budget,
135        }
136    }
137}
138
139/// Create a rayon thread pool sized by `BuildConfig::num_threads`.
140fn create_thread_pool(config: &BuildConfig) -> Result<rayon::ThreadPool> {
141    let mut builder = rayon::ThreadPoolBuilder::new();
142    if let Some(n) = config.num_threads {
143        builder = builder.num_threads(n);
144    }
145    builder
146        .build()
147        .context("Failed to create rayon thread pool for parallel indexing")
148}
149
150/// Compute chunk boundaries for memory-bounded parallel parse batches.
151///
152/// Splits `files` into non-overlapping ranges where each chunk's estimated
153/// staging memory stays within `memory_limit`. Uses source file size as a
154/// proxy for staging buffer size (multiplied by an expansion factor to
155/// account for AST node/edge/string overhead).
156///
157/// Returns at least one chunk even if the first file alone exceeds the limit.
158fn compute_parse_chunks(
159    files: &[PathBuf],
160    _pool: &rayon::ThreadPool,
161    _plugins: &PluginManager,
162    memory_limit: usize,
163) -> Vec<std::ops::Range<usize>> {
164    // Expansion factor: staging buffers are typically 2-8x the source file
165    // size due to AST nodes, edges, and interned strings. Use 4x as a
166    // conservative middle ground.
167    const EXPANSION_FACTOR: usize = 4;
168
169    let mut chunks = Vec::new();
170    let mut chunk_start = 0;
171    let mut chunk_estimate = 0usize;
172
173    for (i, path) in files.iter().enumerate() {
174        #[allow(clippy::cast_possible_truncation)] // File sizes always fit usize on 32/64-bit.
175        let file_size = std::fs::metadata(path)
176            .map(|m| m.len() as usize)
177            .unwrap_or(0);
178        let estimated_staging = file_size * EXPANSION_FACTOR;
179
180        // If adding this file would exceed the limit and we already have
181        // files in the chunk, finalize the current chunk first.
182        if chunk_estimate + estimated_staging > memory_limit && i > chunk_start {
183            chunks.push(chunk_start..i);
184            chunk_start = i;
185            chunk_estimate = 0;
186        }
187        chunk_estimate += estimated_staging;
188    }
189
190    // Final chunk (always push — handles single-chunk and trailing files)
191    if chunk_start < files.len() {
192        chunks.push(chunk_start..files.len());
193    }
194
195    if chunks.len() > 1 {
196        log::info!(
197            "Memory-bounded chunking: {} batches for {} files (limit: {} MB)",
198            chunks.len(),
199            files.len(),
200            memory_limit / (1024 * 1024),
201        );
202    }
203
204    chunks
205}
206
/// Phase name for file processing during graph build.
///
/// NOTE(review): not referenced elsewhere in this part of the file —
/// presumably consumed by progress-reporting consumers; confirm before
/// removing.
pub const GRAPH_FILE_PROCESSING_PHASE: &str = "File processing";
209
210/// Build a unified graph from source files.
211///
212/// This function:
213/// 1. Walks the file tree starting at `root`
214/// 2. For each file, extracts symbols using the appropriate language plugin
215/// 3. Runs the 5-pass build pipeline to populate the graph
216/// 4. Returns the completed `CodeGraph`
217///
218/// # Arguments
219///
220/// * `root` - Root directory to scan for source files
221/// * `plugins` - Plugin manager for language-specific extraction
222/// * `config` - Build configuration
223///
224/// # Returns
225///
226/// A `CodeGraph` containing the populated graph.
227///
228/// # Errors
229///
230/// Returns an error if:
231/// - The root path does not exist
232/// - No graph builders are registered
233/// - All eligible files fail to build (per-file failures are logged and skipped)
234///
235/// # Example
236///
237/// ```ignore
238/// use sqry_core::graph::unified::build::{build_unified_graph, BuildConfig};
239/// use sqry_core::plugin::PluginManager;
240/// use std::path::Path;
241///
242/// let plugins = sqry_plugin_registry::create_plugin_manager();
243/// let config = BuildConfig::default();
244/// let graph = build_unified_graph(Path::new("src"), &plugins, &config)?;
245/// println!("Created graph with {} nodes", graph.node_count());
246/// ```
247pub fn build_unified_graph(
248    root: &Path,
249    plugins: &PluginManager,
250    config: &BuildConfig,
251) -> Result<CodeGraph> {
252    let (graph, _effective_threads) =
253        build_unified_graph_inner(root, plugins, config, no_op_reporter())?;
254    Ok(graph)
255}
256
257/// Build a unified graph from source files with progress reporting.
258///
259/// This is the same as [`build_unified_graph`] but accepts a progress reporter
260/// for tracking build progress.
261///
262/// # Arguments
263///
264/// * `root` - Root directory to scan for source files
265/// * `plugins` - Plugin manager for language-specific extraction
266/// * `config` - Build configuration
267/// * `progress` - Progress reporter for build status updates
268///
269/// # Returns
270///
271/// A `CodeGraph` containing the populated graph.
272///
273/// # Errors
274///
275/// Returns an error if the path is missing, no graph builders are registered,
276/// or all eligible files fail to build.
277pub fn build_unified_graph_with_progress(
278    root: &Path,
279    plugins: &PluginManager,
280    config: &BuildConfig,
281    progress: SharedReporter,
282) -> Result<CodeGraph> {
283    let (graph, _effective_threads) = build_unified_graph_inner(root, plugins, config, progress)?;
284    Ok(graph)
285}
286
/// Internal implementation that returns the effective thread count alongside the graph.
///
/// Used by [`build_and_persist_graph_with_progress`] to propagate the thread count
/// into `BuildResult` without exposing it in the public API.
///
/// # Errors
///
/// Returns an error if the root path does not exist, no graph builders are
/// registered, any phase-3 count validation fails, or every attempted file
/// fails to build.
#[allow(clippy::too_many_lines)] // Complex 5-pass build pipeline requires sequential flow
fn build_unified_graph_inner(
    root: &Path,
    plugins: &PluginManager,
    config: &BuildConfig,
    progress: SharedReporter,
) -> Result<(CodeGraph, usize)> {
    if !root.exists() {
        anyhow::bail!("Path {} does not exist", root.display());
    }

    log::info!(
        "Building unified graph from source files in {}",
        root.display()
    );

    // Fail fast when no plugin can contribute graph data at all.
    let has_graph_builders = plugins
        .plugins()
        .iter()
        .any(|plugin| plugin.graph_builder().is_some());
    if !has_graph_builders {
        anyhow::bail!("No graph builders registered – cannot build code graph");
    }

    // Create progress tracker for this build
    let tracker = GraphBuildProgressTracker::new(progress);

    // 1. Find source files (sorted for deterministic chunking/commit order).
    let mut files = find_source_files(root, config);
    sort_files_for_build(root, &mut files);

    // 2. Create the unified graph
    let mut graph = CodeGraph::new();

    // 3. Create scoped thread pool for parallel parse
    let pool = create_thread_pool(config)?;
    let effective_threads = pool.current_num_threads();
    log::info!("Parallel indexing: using {effective_threads} threads");

    // Chunked parallel-parse / parallel-commit pipeline.
    //
    // Files are processed in memory-bounded batches (chunks). Each chunk:
    //   Phase 1: Parse files in parallel (rayon thread pool)
    //   Phase 2: Count + prefix-sum range assignment
    //   Phase 3: Parallel commit into disjoint pre-allocated arena/interner ranges
    //   Phase 4: After ALL chunks — string dedup, global remap, index build, edge bulk insert
    //
    // The batch boundary is determined by `staging_memory_limit`: once the
    // accumulated staging buffer size exceeds the watermark, the current
    // batch is committed before more files are parsed. This prevents OOM
    // on large repositories where holding all StagingGraphs simultaneously
    // would exhaust available RAM.
    let total_files = files.len();
    tracker.start_phase(
        1,
        "Chunked structural indexing (parse -> range-plan -> semantic commit)",
        total_files,
    );

    // Per-build outcome counters and staging-memory telemetry.
    let (mut succeeded, mut parse_errors, mut skipped) = (0usize, 0usize, 0usize);
    let mut total_staging_bytes = 0usize;
    let mut peak_chunk_staging_bytes = 0usize;
    let mut max_file_staging_bytes = 0usize;

    // Global offsets track running positions across chunks.
    // For a fresh graph: node arena starts at 0 slots, string interner at 1 (sentinel).
    // A zero-length alloc_range probes the interner's current offset without
    // reserving anything; on failure, fall back to 1 (past the sentinel slot).
    let initial_string_offset = graph.strings_mut().alloc_range(0).unwrap_or(1);
    let mut offsets = GlobalOffsets {
        // NOTE(review): a slot_count above u32::MAX silently maps to offset 0
        // here — presumably unreachable for a freshly created graph; confirm
        // if this path ever runs against a pre-populated graph.
        node_offset: u32::try_from(graph.nodes().slot_count()).unwrap_or(0),
        string_offset: initial_string_offset,
    };
    // Collect all edges across chunks for Phase 4 bulk insert.
    let mut all_edges: Vec<Vec<PendingEdge>> = Vec::new();

    let chunks = compute_parse_chunks(&files, &pool, plugins, config.staging_memory_limit);
    for chunk_range in chunks {
        let chunk_files = &files[chunk_range];

        // Phase 1: Parallel parse this chunk.
        // `Ok(None)` means the plugin declined the file (counted as skipped).
        let staged_results: Vec<(PathBuf, Result<Option<ParsedFile>>)> = pool.install(|| {
            chunk_files
                .par_iter()
                .map(|path| {
                    let result = parse_file(path.as_path(), plugins);
                    tracker.increment_progress();
                    (path.clone(), result)
                })
                .collect()
        });

        // Separate successful parses from errors/skips, accumulating the
        // staging-memory telemetry as we go.
        let mut chunk_parsed: Vec<(PathBuf, ParsedFile)> = Vec::new();
        let mut chunk_staging_bytes = 0usize;
        for (path, result) in staged_results {
            match result {
                Ok(Some(parsed)) => {
                    let file_bytes = parsed.staging.estimated_byte_size();
                    total_staging_bytes += file_bytes;
                    chunk_staging_bytes += file_bytes;
                    if file_bytes > max_file_staging_bytes {
                        max_file_staging_bytes = file_bytes;
                    }
                    chunk_parsed.push((path, parsed));
                }
                Ok(None) => skipped += 1,
                Err(e) => {
                    // Per-file parse failures are logged and skipped; only a
                    // total failure (succeeded == 0 below) aborts the build.
                    parse_errors += 1;
                    log::warn!("Failed to parse {}: {e}", path.display());
                }
            }
        }
        if chunk_staging_bytes > peak_chunk_staging_bytes {
            peak_chunk_staging_bytes = chunk_staging_bytes;
        }

        if chunk_parsed.is_empty() {
            continue;
        }

        // Register files in batch
        let file_info: Vec<_> = chunk_parsed
            .iter()
            .map(|(path, parsed)| (path.clone(), Some(parsed.language)))
            .collect();
        let file_ids = graph
            .files_mut()
            .register_batch(&file_info)
            .map_err(|e| anyhow::anyhow!("Failed to register files: {e}"))?;

        // Phase 2: Count + range assignment (fast, no progress needed)
        let staging_refs: Vec<_> = chunk_parsed.iter().map(|(_, p)| &p.staging).collect();
        let plan = phase2_assign_ranges(&staging_refs, &file_ids, &offsets);

        // Pre-allocate arena and interner ranges for Phase 3.
        // The placeholder entry fills slots until Phase 3 overwrites them.
        let placeholder = crate::graph::unified::storage::NodeEntry::new(
            crate::graph::unified::node::NodeKind::Other,
            crate::graph::unified::string::StringId::new(0),
            crate::graph::unified::file::FileId::new(0),
        );
        graph
            .nodes_mut()
            .alloc_range(plan.total_nodes, &placeholder)
            .map_err(|e| anyhow::anyhow!("Failed to alloc node range: {e:?}"))?;
        graph
            .strings_mut()
            .alloc_range(plan.total_strings)
            .map_err(|e| anyhow::anyhow!("Failed to alloc string range: {e}"))?;

        // Phase 3: Parallel commit into disjoint pre-allocated ranges.
        // Use pool.install to respect BuildConfig::num_threads for rayon par_iter.
        let (arena, interner) = graph.nodes_and_strings_mut();
        let phase3 = pool.install(|| phase3_parallel_commit(&plan, &staging_refs, arena, interner));

        // Validate written counts match plan. A mismatch indicates a bug in
        // StagingGraph counting — abort the build to prevent phantom entries
        // and inconsistent file registry state.
        let expected_nodes = plan.total_nodes as usize;
        let expected_strings = plan.total_strings as usize;
        let expected_edges = usize::try_from(plan.total_edges)
            .unwrap_or_else(|_| unreachable!("edge count does not fit usize"));
        if phase3.total_nodes_written != expected_nodes
            || phase3.total_strings_written != expected_strings
            || phase3.total_edges_collected != expected_edges
        {
            anyhow::bail!(
                "Phase 3 count mismatch: nodes {}/{expected_nodes}, strings {}/{expected_strings}, \
                 edges {}/{expected_edges}. This indicates a bug in StagingGraph counting.",
                phase3.total_nodes_written,
                phase3.total_strings_written,
                phase3.total_edges_collected,
            );
        }

        // Only files whose staging data was fully committed count as successes.
        succeeded += chunk_parsed.len();

        // Merge confidence metadata from parsed files
        for (_path, parsed) in &mut chunk_parsed {
            if let Some(confidence) = parsed.staging.take_confidence() {
                let language_name = parsed.language.to_string();
                graph.merge_confidence(&language_name, confidence);
            }
        }

        // Update global offsets for next chunk
        offsets.node_offset += plan.total_nodes;
        offsets.string_offset += plan.total_strings;

        // Accumulate edges for Phase 4
        all_edges.extend(phase3.per_file_edges);
    }
    tracker.complete_phase();

    // Phase 4: Post-chunk finalization
    tracker.start_phase(4, "Finalizing graph", 4);

    // Phase 4a: Global string dedup
    let string_remap = graph.strings_mut().build_dedup_table();
    if !string_remap.is_empty() {
        log::debug!(
            "Phase 4a: dedup removed {} duplicate string(s)",
            string_remap.len()
        );

        // Phase 4b: Apply dedup remap to all nodes and pending edges
        phase4_apply_global_remap(graph.nodes_mut(), &mut all_edges, &string_remap);
    }
    tracker.increment_progress(); // 4a+4b done

    // Phase 4c: Build indices from finalized arena.
    // Uses build_from_arena() which is O(n log n) — no per-element duplicate check.
    graph.rebuild_indices();
    tracker.increment_progress(); // 4c done

    // Phase 4d: Bulk insert edges via deterministic DeltaEdge conversion.
    // Start seq numbering from the edge store's current counter to support non-empty graphs.
    let edge_seq_start = graph.edges().forward().seq_counter();
    let (delta_edge_vecs, _final_seq) = pending_edges_to_delta(&all_edges, edge_seq_start);
    let total_edge_count: u64 = delta_edge_vecs.iter().map(|v| v.len() as u64).sum();
    if total_edge_count > 0 {
        graph
            .edges()
            .add_edges_bulk_ordered(&delta_edge_vecs, total_edge_count);
    }
    tracker.increment_progress(); // 4d done
    tracker.complete_phase();

    log::info!(
        "Parallel indexing complete: {succeeded} committed, {skipped} skipped, \
         {parse_errors} parse errors, \
         ~{} MB total staged, ~{} MB peak chunk (max single file: ~{} KB)",
        total_staging_bytes / (1024 * 1024),
        peak_chunk_staging_bytes / (1024 * 1024),
        max_file_staging_bytes / 1024,
    );

    // "Attempted" excludes plugin-declined (skipped) files: an empty-but-valid
    // tree only warns, while a tree where every attempted file failed aborts.
    let attempted = succeeded + parse_errors;

    if attempted == 0 {
        log::warn!(
            "No eligible source files found for graph build in {}",
            root.display()
        );
    }

    if attempted > 0 && succeeded == 0 {
        anyhow::bail!("All graph builds failed");
    }

    // Pass 5: Cross-language linking (FFI declarations → C/C++ functions, HTTP requests → endpoints)
    tracker.start_phase(5, "Cross-language linking", 1);
    let pass5_stats = super::pass5_cross_language::link_cross_language_edges(&mut graph);
    if pass5_stats.total_edges_created > 0 {
        log::info!(
            "Pass 5: {} cross-language edges created ({} FFI, {} HTTP)",
            pass5_stats.total_edges_created,
            pass5_stats.ffi_edges_created,
            pass5_stats.http_endpoints_matched,
        );
    }
    tracker.increment_progress(); // pass 5 done
    tracker.complete_phase();

    log::info!("Built unified graph with {} nodes", graph.node_count());
    Ok((graph, effective_threads))
}
556
557/// Build unified graph, persist snapshot + manifest, and run analysis pipeline.
558///
559/// Convenience wrapper that uses a no-op progress reporter.
560/// See [`build_and_persist_graph_with_progress`] for full documentation.
561///
562/// # Errors
563///
564/// Returns an error if graph building, persistence, or analysis fails.
565pub fn build_and_persist_graph(
566    root: &Path,
567    plugins: &PluginManager,
568    config: &BuildConfig,
569    build_command: &str,
570) -> Result<(CodeGraph, BuildResult)> {
571    build_and_persist_graph_with_progress(root, plugins, config, build_command, no_op_reporter())
572}
573
574/// Build unified graph with progress, persist snapshot + manifest, and run analysis.
575///
576/// This is the single entry point for building a complete graph index. It combines:
577/// 1. Graph building from source files (with progress reporting)
578/// 2. Snapshot persistence (binary format)
579/// 3. Analysis pipeline (CSR + SCC + Condensation DAG + labels/fallback) — strict, fails on error
580/// 4. Manifest creation with deduplicated edge count (JSON metadata, written LAST)
581///
582/// The manifest is the "commit point" — written last, only after all other artifacts
583/// succeed. Consumers check `storage.exists()` (manifest-based) for index readiness.
584///
585/// # Arguments
586///
587/// * `root` - Root directory to scan for source files
588/// * `plugins` - Plugin manager for language-specific extraction
589/// * `config` - Build configuration
590/// * `build_command` - Provenance string (e.g., `"cli:index"`, `"mcp:rebuild_index"`)
591/// * `progress` - Progress reporter for build status updates
592///
593/// # Errors
594///
595/// Returns an error if graph building, persistence, or analysis fails.
596/// Analysis failure is strict — no fallback to raw edge counts.
597#[allow(clippy::too_many_lines, clippy::needless_pass_by_value)]
598pub fn build_and_persist_graph_with_progress(
599    root: &Path,
600    plugins: &PluginManager,
601    config: &BuildConfig,
602    build_command: &str,
603    progress: SharedReporter,
604) -> Result<(CodeGraph, BuildResult)> {
605    use crate::graph::unified::analysis::csr::CsrAdjacency;
606    use crate::graph::unified::analysis::{AnalysisIdentity, GraphAnalyses, compute_node_id_hash};
607    use crate::graph::unified::compaction::{Direction, build_compacted_csr, snapshot_edges};
608    use crate::graph::unified::persistence::manifest::write_manifest_bytes_atomic;
609    use crate::graph::unified::persistence::{
610        BuildProvenance, GraphStorage, MANIFEST_SCHEMA_VERSION, Manifest, SNAPSHOT_FORMAT_VERSION,
611        save_to_path,
612    };
613    use crate::progress::IndexProgress;
614    use chrono::Utc;
615    use sha2::{Digest, Sha256};
616
617    // Step 1: Build the unified graph
618    let (graph, effective_threads) =
619        build_unified_graph_inner(root, plugins, config, progress.clone())?;
620
621    // Step 2: Ensure storage directories exist and remove old manifest
622    // Removing the manifest BEFORE writing the new snapshot ensures that
623    // readers see `storage.exists() == false` during the rebuild window.
624    // Without this, an interrupted rebuild (crash after snapshot write but
625    // before manifest write) would leave the old manifest paired with a
626    // new, potentially incompatible snapshot — violating the commit-point
627    // contract.
628    let storage = GraphStorage::new(root);
629    fs::create_dir_all(storage.graph_dir())
630        .with_context(|| format!("Failed to create {}", storage.graph_dir().display()))?;
631
632    if storage.exists() {
633        // Remove old manifest so readers don't see stale readiness.
634        // This MUST succeed before we overwrite the snapshot — otherwise a
635        // crash between snapshot write and manifest write leaves stale
636        // readiness (old manifest + new snapshot).  NotFound is harmless
637        // (race or already cleaned up); any other error is fatal.
638        match fs::remove_file(storage.manifest_path()) {
639            Ok(()) => {}
640            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
641            Err(e) => {
642                return Err(e).with_context(|| {
643                    format!(
644                        "Failed to remove old manifest at {} — rebuild cannot proceed safely",
645                        storage.manifest_path().display()
646                    )
647                });
648            }
649        }
650    }
651
652    // Step 3: Capture raw edge count before compaction changes it
653    let raw_edge_count = graph.edge_count();
654    let node_count = graph.node_count();
655
656    // Step 4: Compact edge stores into CSR before persistence
657    //
658    // The build pipeline inserts all edges into the DeltaBuffer (write-optimized).
659    // Without compaction, the persisted snapshot stores edges in delta, causing
660    // O(N) scans for every edges_from()/edges_to() call on load. Compacting to
661    // CSR gives O(degree) lookups — critical for kernel-scale graphs (22M edges).
662    progress.report(IndexProgress::StageStarted {
663        stage_name: "Compacting edge stores for persistence",
664    });
665    let compaction_start = std::time::Instant::now();
666
667    // Snapshot both edge stores (sequential — holds read locks briefly)
668    let forward_compaction_snapshot = {
669        let forward_store = graph.edges().forward();
670        snapshot_edges(&forward_store, node_count)
671    };
672    let reverse_compaction_snapshot = {
673        let reverse_store = graph.edges().reverse();
674        snapshot_edges(&reverse_store, node_count)
675    };
676
677    // Build both CSRs in parallel (CPU-intensive, no locks held)
678    let (forward_result, reverse_result) = rayon::join(
679        || build_compacted_csr(&forward_compaction_snapshot, Direction::Forward),
680        || build_compacted_csr(&reverse_compaction_snapshot, Direction::Reverse),
681    );
682
683    let (forward_csr, _forward_build_stats) =
684        forward_result.context("Failed to build forward CSR for persistence compaction")?;
685    let (reverse_csr, _reverse_build_stats) =
686        reverse_result.context("Failed to build reverse CSR for persistence compaction")?;
687
688    // Drop snapshots — no longer needed
689    drop(forward_compaction_snapshot);
690    drop(reverse_compaction_snapshot);
691
692    // Build analysis adjacency from forward CSR before it's consumed by swap.
693    // This replaces the expensive build_from_snapshot merge+sort (~11s on kernel).
694    let adjacency = CsrAdjacency::from_csr_graph(&forward_csr);
695
696    // Atomic mutation phase: swap both CSRs and clear both deltas
697    graph
698        .edges()
699        .swap_csrs_and_clear_deltas(forward_csr, reverse_csr);
700
701    progress.report(IndexProgress::StageCompleted {
702        stage_name: "Compacting edge stores for persistence",
703        stage_duration: compaction_start.elapsed(),
704    });
705
706    // Step 5: Save CSR-backed binary snapshot
707    progress.report(IndexProgress::SavingStarted {
708        component_name: "unified graph",
709    });
710    let save_start = std::time::Instant::now();
711
712    save_to_path(&graph, storage.snapshot_path()).with_context(|| {
713        format!(
714            "Failed to save snapshot to {}",
715            storage.snapshot_path().display()
716        )
717    })?;
718
719    progress.report(IndexProgress::SavingCompleted {
720        component_name: "unified graph",
721        save_duration: save_start.elapsed(),
722    });
723
724    // Step 6: Compute snapshot checksum
725    let snapshot_content =
726        fs::read(storage.snapshot_path()).context("Failed to read snapshot for checksum")?;
727    let snapshot_sha256 = hex::encode(Sha256::digest(&snapshot_content));
728
729    // Step 7: Build full analyses from the prebuilt adjacency.
730    // CsrAdjacency was already derived from the forward CsrGraph in Step 4,
731    // eliminating the expensive re-merge from CompactionSnapshot.
732    progress.report(IndexProgress::StageStarted {
733        stage_name: "Computing graph analyses",
734    });
735    let analysis_start = std::time::Instant::now();
736
737    let analyses = if let Some(thread_count) = config.num_threads {
738        rayon::ThreadPoolBuilder::new()
739            .num_threads(thread_count)
740            .build()
741            .context("Failed to create rayon thread pool for graph analysis")?
742            .install(|| {
743                GraphAnalyses::build_all_from_adjacency_with_budget(adjacency, &config.label_budget)
744            })
745    } else {
746        GraphAnalyses::build_all_from_adjacency_with_budget(adjacency, &config.label_budget)
747    }
748    .context("Failed to build graph analyses")?;
749
750    progress.report(IndexProgress::StageCompleted {
751        stage_name: "Computing graph analyses",
752        stage_duration: analysis_start.elapsed(),
753    });
754
755    let dedup_edge_count = analyses.adjacency.edge_count as usize;
756
757    let analysis_strategies = vec![
758        AnalysisStrategySummary {
759            edge_kind: "calls",
760            strategy: analyses.cond_calls.strategy,
761        },
762        AnalysisStrategySummary {
763            edge_kind: "imports",
764            strategy: analyses.cond_imports.strategy,
765        },
766        AnalysisStrategySummary {
767            edge_kind: "references",
768            strategy: analyses.cond_references.strategy,
769        },
770        AnalysisStrategySummary {
771            edge_kind: "inherits",
772            strategy: analyses.cond_inherits.strategy,
773        },
774    ];
775
    // Count files by language using plugin detection.
    // (NOTE: the analysis stage above is already labeled "Step 7"; numbering
    // resumes at Step 8 below.)
777    let mut file_counts: std::collections::HashMap<String, usize> =
778        std::collections::HashMap::new();
779    for (_file_id, file_path) in graph.indexed_files() {
780        let language = plugins
781            .plugin_for_path(file_path)
782            .map_or_else(|| "unknown".to_string(), |p| p.metadata().id.to_string());
783        *file_counts.entry(language).or_insert(0) += 1;
784    }
785    let total_files: usize = file_counts.values().sum();
786
787    // Step 8: Construct Manifest in memory (with dedup edge count from analysis)
788    let built_at = Utc::now().to_rfc3339();
789
790    let manifest = Manifest {
791        schema_version: MANIFEST_SCHEMA_VERSION,
792        snapshot_format_version: SNAPSHOT_FORMAT_VERSION,
793        built_at: built_at.clone(),
794        root_path: root.to_string_lossy().to_string(),
795        node_count,
796        edge_count: dedup_edge_count,
797        raw_edge_count: Some(raw_edge_count),
798        snapshot_sha256,
799        build_provenance: BuildProvenance {
800            sqry_version: env!("CARGO_PKG_VERSION").to_string(),
801            build_timestamp: built_at.clone(),
802            build_command: build_command.to_string(),
803            plugin_hashes: std::collections::HashMap::default(),
804        },
805        file_count: file_counts.clone(),
806        languages: Vec::default(),
807        config: std::collections::HashMap::default(),
808        confidence: graph.confidence().clone(),
809        last_indexed_commit: get_git_head_commit(root),
810    };
811
812    // Step 9: Serialize manifest to bytes and compute hash
813    let manifest_bytes =
814        serde_json::to_vec_pretty(&manifest).context("Failed to serialize manifest")?;
815
816    let manifest_hash = {
817        let mut hasher = Sha256::new();
818        hasher.update(&manifest_bytes);
819        hex::encode(hasher.finalize())
820    };
821
822    // Step 10: Construct AnalysisIdentity and persist all analyses
823    let snapshot = graph.snapshot();
824    let node_id_hash = compute_node_id_hash(&snapshot);
825    let identity = AnalysisIdentity::new(manifest_hash, node_id_hash);
826
827    fs::create_dir_all(storage.analysis_dir()).with_context(|| {
828        format!(
829            "Failed to create analysis directory at {}",
830            storage.analysis_dir().display()
831        )
832    })?;
833
834    progress.report(IndexProgress::SavingStarted {
835        component_name: "graph analyses",
836    });
837
838    analyses
839        .persist_all(&storage, &identity)
840        .context("Failed to persist graph analyses")?;
841
842    log::info!(
843        "Graph analyses persisted to {}",
844        storage.analysis_dir().display()
845    );
846
847    progress.report(IndexProgress::SavingCompleted {
848        component_name: "graph analyses",
849        save_duration: analysis_start.elapsed(),
850    });
851
852    // Step 11: Write manifest bytes to disk LAST (commit point)
853    write_manifest_bytes_atomic(storage.manifest_path(), &manifest_bytes).with_context(|| {
854        format!(
855            "Failed to save manifest to {}",
856            storage.manifest_path().display()
857        )
858    })?;
859
860    log::info!(
861        "Manifest saved to {} (dedup edges: {}, raw edges: {})",
862        storage.manifest_path().display(),
863        dedup_edge_count,
864        raw_edge_count
865    );
866
867    let build_result = BuildResult {
868        node_count,
869        edge_count: dedup_edge_count,
870        raw_edge_count,
871        file_count: file_counts,
872        total_files,
873        built_at,
874        root_path: root.to_string_lossy().to_string(),
875        thread_count: effective_threads,
876        analysis_strategies,
877    };
878
879    Ok((graph, build_result))
880}
881
/// Get the current HEAD commit SHA from a git repository.
///
/// Runs `git -C <path> rev-parse HEAD` and returns the SHA only when the
/// command succeeds and prints a full 40-character hex string. Any failure
/// (git missing, not a repository, unexpected output) yields `None`.
fn get_git_head_commit(path: &Path) -> Option<String> {
    let output = std::process::Command::new("git")
        .arg("-C")
        .arg(path)
        .arg("rev-parse")
        .arg("HEAD")
        .output()
        .ok()?;

    if !output.status.success() {
        return None;
    }

    let sha = String::from_utf8_lossy(&output.stdout).trim().to_owned();
    // Guard against truncated or non-SHA output (e.g. error text on stdout).
    let is_full_sha = sha.len() == 40 && sha.bytes().all(|b| b.is_ascii_hexdigit());
    is_full_sha.then_some(sha)
}
899
900/// Find source files in the given directory.
901///
902/// Uses the `ignore` crate to respect `.gitignore` files and standard ignore patterns.
903fn find_source_files(root: &Path, config: &BuildConfig) -> Vec<std::path::PathBuf> {
904    let mut builder = WalkBuilder::new(root);
905
906    builder
907        .follow_links(config.follow_links)
908        .hidden(!config.include_hidden)
909        .git_ignore(true)
910        .git_global(true)
911        .git_exclude(true);
912
913    if let Some(depth) = config.max_depth {
914        builder.max_depth(Some(depth));
915    }
916
917    if let Some(threads) = config.num_threads {
918        builder.threads(threads);
919    }
920
921    let mut files = Vec::new();
922
923    for entry in builder.build() {
924        let entry = match entry {
925            Ok(entry) => entry,
926            Err(err) => {
927                log::warn!("Failed to read directory entry: {err}");
928                continue;
929            }
930        };
931
932        if entry.file_type().is_some_and(|ft| ft.is_file()) {
933            files.push(entry.into_path());
934        }
935    }
936
937    files
938}
939
940fn sort_files_for_build(root: &Path, files: &mut [PathBuf]) {
941    let normalized_root = normalize_path_components(root);
942    files.sort_by(|left, right| {
943        let left_key = file_sort_key(&normalized_root, left);
944        let right_key = file_sort_key(&normalized_root, right);
945        left_key.cmp(&right_key).then_with(|| left.cmp(right))
946    });
947}
948
949fn file_sort_key(root: &Path, path: &Path) -> String {
950    let normalized_path = normalize_path_components(path);
951    let relative = normalized_path
952        .strip_prefix(root)
953        .unwrap_or(normalized_path.as_path());
954    let mut key = relative.to_string_lossy().replace('\\', "/");
955    if cfg!(windows) {
956        key = key.to_ascii_lowercase();
957    }
958    key
959}
960
/// Result of successfully parsing a single file (parallel-safe, no shared state).
///
/// Produced by [`parse_file`] on worker threads; holds everything needed for a
/// later serial commit into the shared graph.
struct ParsedFile {
    /// Language identifier for file counting and confidence merging.
    language: crate::graph::Language,
    /// Staged graph operations ready for serial commit.
    staging: StagingGraph,
}
968
969/// Parse a single file into a `StagingGraph` without touching the shared graph.
970///
971/// This function is safe to call from multiple threads — it creates its own
972/// parser, reads the file, and builds a self-contained staging graph.
973///
974/// Returns `Ok(None)` if the file has no matching plugin or graph builder.
975fn parse_file(path: &Path, plugins: &PluginManager) -> Result<Option<ParsedFile>> {
976    let plugin = plugins.plugin_for_path(path);
977    let Some(plugin) = plugin else {
978        return Ok(None);
979    };
980
981    let Some(builder) = plugin.graph_builder() else {
982        return Ok(None);
983    };
984
985    let content = fs::read(path).with_context(|| format!("failed to read {}", path.display()))?;
986
987    let tree = plugin
988        .parse_ast(&content)
989        .map_err(|err| map_parse_error(path, err))?;
990
991    let mut staging = StagingGraph::new();
992    builder
993        .build_graph(&tree, &content, path, &mut staging)
994        .map_err(|err| map_builder_error(path, &err))?;
995
996    staging.attach_body_hashes(&content);
997
998    Ok(Some(ParsedFile {
999        language: builder.language(),
1000        staging,
1001    }))
1002}
1003
1004fn map_parse_error(path: &Path, err: ParseError) -> anyhow::Error {
1005    match err {
1006        ParseError::TreeSitterFailed => {
1007            anyhow::anyhow!("tree-sitter failed to parse {}", path.display())
1008        }
1009        ParseError::LanguageSetFailed(reason) => anyhow::anyhow!(
1010            "failed to configure tree-sitter for {}: {}",
1011            path.display(),
1012            reason
1013        ),
1014        _ => anyhow::anyhow!("parse error in {}: {:?}", path.display(), err),
1015    }
1016}
1017
1018fn map_builder_error(path: &Path, err: &GraphBuilderError) -> anyhow::Error {
1019    anyhow::anyhow!("graph builder error in {}: {}", path.display(), err)
1020}
1021
1022#[cfg(test)]
1023mod tests {
1024    use super::*;
1025    use crate::ast::Scope;
1026    use crate::graph::{GraphBuilder, GraphBuilderError, GraphResult, Language};
1027    use crate::plugin::error::{ParseError, ScopeError};
1028    use crate::plugin::{LanguageMetadata, LanguagePlugin};
1029    use std::fs;
1030    use std::path::Path;
1031    use tempfile::TempDir;
1032    use tree_sitter::{Parser, Tree};
1033
    // Extensions routed to the Rust test plugin in most tests below.
    const RUST_TEST_EXTENSIONS: &[&str] = &["rs"];
    // Extensions used to exercise extension/filename matching edge cases.
    const FILENAME_MATCH_EXTENSIONS: &[&str] = &["rmd", "bash_profile"];
1036
    /// Test helper: commit a single parsed file to a graph using the serial path.
    ///
    /// This is only used in tests to verify parse-and-commit without running the
    /// full parallel pipeline. It replicates the old `commit_staged_file` logic.
    ///
    /// NOTE: the commit order matters — strings must be committed (and the
    /// staging remapped) before nodes, and nodes before edges, because each
    /// later stage consumes ids produced by the previous one.
    fn commit_parsed_file_for_test(path: &Path, mut parsed: ParsedFile, graph: &mut CodeGraph) {
        // Register the file (with its detected language) to obtain a file id.
        let file_id = graph
            .files_mut()
            .register_with_language(path, Some(parsed.language))
            .expect("register file");
        parsed.staging.apply_file_id(file_id);
        // Commit interned strings; the remap translates staging-local string
        // ids into graph-global ones.
        let string_remap = parsed
            .staging
            .commit_strings(graph.strings_mut())
            .expect("commit strings");
        parsed
            .staging
            .apply_string_remap(&string_remap)
            .expect("apply string remap");
        // Commit nodes, then rewrite staged edges through the node-id mapping.
        let node_id_mapping = parsed
            .staging
            .commit_nodes(graph.nodes_mut())
            .expect("commit nodes");
        let edges = parsed.staging.get_remapped_edges(&node_id_mapping);
        for edge in edges {
            graph.edges_mut().add_edge_with_spans(
                edge.source,
                edge.target,
                edge.kind.clone(),
                file_id,
                edge.spans.clone(),
            );
        }
    }
1070
1071    fn parse_rust_ast(content: &[u8]) -> Result<Tree, ParseError> {
1072        let mut parser = Parser::new();
1073        let language = tree_sitter_rust::LANGUAGE.into();
1074        parser
1075            .set_language(&language)
1076            .map_err(|err| ParseError::LanguageSetFailed(err.to_string()))?;
1077        parser
1078            .parse(content, None)
1079            .ok_or(ParseError::TreeSitterFailed)
1080    }
1081
1082    struct TestPlugin {
1083        metadata: LanguageMetadata,
1084        extensions: &'static [&'static str],
1085        builder: Option<Box<dyn GraphBuilder>>,
1086    }
1087
1088    impl TestPlugin {
1089        fn new(
1090            id: &'static str,
1091            extensions: &'static [&'static str],
1092            builder: Option<Box<dyn GraphBuilder>>,
1093        ) -> Self {
1094            Self {
1095                metadata: LanguageMetadata {
1096                    id,
1097                    name: "Rust",
1098                    version: "test",
1099                    author: "sqry-core tests",
1100                    description: "Test-only Rust plugin for unified graph entrypoint tests",
1101                    tree_sitter_version: "0.25",
1102                },
1103                extensions,
1104                builder,
1105            }
1106        }
1107    }
1108
    impl LanguagePlugin for TestPlugin {
        // Simple delegations to the fields configured in `TestPlugin::new`;
        // only parsing does real work (via the tree-sitter Rust grammar).
        fn metadata(&self) -> LanguageMetadata {
            self.metadata.clone()
        }

        fn extensions(&self) -> &'static [&'static str] {
            self.extensions
        }

        fn language(&self) -> tree_sitter::Language {
            tree_sitter_rust::LANGUAGE.into()
        }

        fn parse_ast(&self, content: &[u8]) -> Result<Tree, ParseError> {
            parse_rust_ast(content)
        }

        // Scope extraction is irrelevant to these tests; report no scopes.
        fn extract_scopes(
            &self,
            _tree: &Tree,
            _content: &[u8],
            _file_path: &Path,
        ) -> Result<Vec<Scope>, ScopeError> {
            Ok(Vec::new())
        }

        fn graph_builder(&self) -> Option<&dyn crate::graph::GraphBuilder> {
            self.builder.as_deref()
        }
    }
1139
1140    struct FailingGraphBuilder;
1141
1142    impl GraphBuilder for FailingGraphBuilder {
1143        fn build_graph(
1144            &self,
1145            _tree: &Tree,
1146            _content: &[u8],
1147            _file: &Path,
1148            _staging: &mut StagingGraph,
1149        ) -> GraphResult<()> {
1150            Err(GraphBuilderError::CrossLanguageError {
1151                reason: "forced failure".to_string(),
1152            })
1153        }
1154
1155        fn language(&self) -> Language {
1156            Language::Rust
1157        }
1158    }
1159
1160    struct NoopGraphBuilder;
1161
1162    impl GraphBuilder for NoopGraphBuilder {
1163        fn build_graph(
1164            &self,
1165            _tree: &Tree,
1166            _content: &[u8],
1167            _file: &Path,
1168            _staging: &mut StagingGraph,
1169        ) -> GraphResult<()> {
1170            Ok(())
1171        }
1172
1173        fn language(&self) -> Language {
1174            Language::Rust
1175        }
1176    }
1177
1178    #[test]
1179    fn test_build_config_default() {
1180        let config = BuildConfig::default();
1181        assert_eq!(config.max_depth, None);
1182        assert!(!config.follow_links);
1183        assert!(!config.include_hidden);
1184        assert_eq!(config.num_threads, None);
1185    }
1186
1187    #[test]
1188    fn test_build_unified_graph_empty_registry_error() {
1189        let plugins = PluginManager::new();
1190        let config = BuildConfig::default();
1191        let root = std::path::Path::new(".");
1192
1193        let result = build_unified_graph(root, &plugins, &config);
1194        assert!(result.is_err());
1195        assert_eq!(
1196            result.unwrap_err().to_string(),
1197            "No graph builders registered – cannot build code graph"
1198        );
1199    }
1200
1201    #[test]
1202    fn test_build_unified_graph_no_graph_builders_error() {
1203        let mut plugins = PluginManager::new();
1204        plugins.register_builtin(Box::new(TestPlugin::new(
1205            "rust-no-graph-builder",
1206            RUST_TEST_EXTENSIONS,
1207            None,
1208        )));
1209        let config = BuildConfig::default();
1210        let root = std::path::Path::new(".");
1211
1212        let result = build_unified_graph(root, &plugins, &config);
1213        assert!(result.is_err());
1214        assert_eq!(
1215            result.unwrap_err().to_string(),
1216            "No graph builders registered – cannot build code graph"
1217        );
1218    }
1219
1220    #[test]
1221    fn test_build_unified_graph_all_failures_error() {
1222        let temp_dir = TempDir::new().expect("temp dir");
1223        let file_path = temp_dir.path().join("fail.rs");
1224        fs::write(&file_path, "fn main() {}").expect("write test file");
1225
1226        let mut plugins = PluginManager::new();
1227        plugins.register_builtin(Box::new(TestPlugin::new(
1228            "rust-failing-graph-builder",
1229            RUST_TEST_EXTENSIONS,
1230            Some(Box::new(FailingGraphBuilder)),
1231        )));
1232        let config = BuildConfig::default();
1233
1234        let result = build_unified_graph(temp_dir.path(), &plugins, &config);
1235        assert!(result.is_err());
1236        assert_eq!(result.unwrap_err().to_string(), "All graph builds failed");
1237    }
1238
1239    #[test]
1240    fn test_parse_file_matches_uppercase_extension() {
1241        let temp_dir = TempDir::new().expect("temp dir");
1242        let file_path = temp_dir.path().join("report.Rmd");
1243        fs::write(&file_path, "fn main() {}").expect("write test file");
1244
1245        let mut plugins = PluginManager::new();
1246        plugins.register_builtin(Box::new(TestPlugin::new(
1247            "rust-filename-match",
1248            FILENAME_MATCH_EXTENSIONS,
1249            Some(Box::new(NoopGraphBuilder)),
1250        )));
1251        let mut graph = CodeGraph::new();
1252
1253        let parsed = parse_file(&file_path, &plugins)
1254            .expect("parse file")
1255            .expect("should not be skipped");
1256        commit_parsed_file_for_test(&file_path, parsed, &mut graph);
1257    }
1258
1259    #[test]
1260    fn test_parse_file_matches_dotless_filename() {
1261        let temp_dir = TempDir::new().expect("temp dir");
1262        let file_path = temp_dir.path().join("bash_profile");
1263        fs::write(&file_path, "fn main() {}").expect("write test file");
1264
1265        let mut plugins = PluginManager::new();
1266        plugins.register_builtin(Box::new(TestPlugin::new(
1267            "rust-filename-match",
1268            FILENAME_MATCH_EXTENSIONS,
1269            Some(Box::new(NoopGraphBuilder)),
1270        )));
1271        let mut graph = CodeGraph::new();
1272
1273        let parsed = parse_file(&file_path, &plugins)
1274            .expect("parse file")
1275            .expect("should not be skipped");
1276        commit_parsed_file_for_test(&file_path, parsed, &mut graph);
1277    }
1278
1279    #[test]
1280    fn test_parse_file_matches_pulumi_stack_filename() {
1281        let temp_dir = TempDir::new().expect("temp dir");
1282        let file_path = temp_dir.path().join("Pulumi.dev.yaml");
1283        fs::write(&file_path, "fn main() {}").expect("write test file");
1284
1285        let mut plugins = PluginManager::new();
1286        plugins.register_builtin(Box::new(TestPlugin::new(
1287            "pulumi",
1288            &["pulumi.yaml"],
1289            Some(Box::new(NoopGraphBuilder)),
1290        )));
1291        let mut graph = CodeGraph::new();
1292
1293        let parsed = parse_file(&file_path, &plugins)
1294            .expect("parse file")
1295            .expect("should not be skipped");
1296        commit_parsed_file_for_test(&file_path, parsed, &mut graph);
1297    }
1298
1299    // ========================================================================
1300    // Build pipeline consolidation regression tests
1301    // ========================================================================
1302
1303    /// A graph builder that creates a few nodes and edges for testing.
1304    struct SimpleGraphBuilder;
1305
1306    impl GraphBuilder for SimpleGraphBuilder {
1307        fn build_graph(
1308            &self,
1309            _tree: &Tree,
1310            _content: &[u8],
1311            file: &Path,
1312            staging: &mut StagingGraph,
1313        ) -> GraphResult<()> {
1314            use crate::graph::unified::build::helper::GraphBuildHelper;
1315
1316            let mut helper = GraphBuildHelper::new(staging, file, Language::Rust);
1317
1318            // Create two function nodes
1319            let fn1 = helper.add_function("main", None, false, false);
1320            let fn2 = helper.add_function("helper", None, false, false);
1321
1322            // Add a Calls edge from main -> helper
1323            helper.add_call_edge(fn1, fn2);
1324
1325            Ok(())
1326        }
1327
1328        fn language(&self) -> Language {
1329            Language::Rust
1330        }
1331    }
1332
1333    /// `build_and_persist_graph` returns a populated `BuildResult`.
1334    #[test]
1335    fn test_build_and_persist_graph_returns_build_result() {
1336        let temp_dir = TempDir::new().expect("temp dir");
1337        let file_path = temp_dir.path().join("test.rs");
1338        fs::write(&file_path, "fn main() {} fn helper() {}").expect("write test file");
1339
1340        let mut plugins = PluginManager::new();
1341        plugins.register_builtin(Box::new(TestPlugin::new(
1342            "rust-simple",
1343            RUST_TEST_EXTENSIONS,
1344            Some(Box::new(SimpleGraphBuilder)),
1345        )));
1346        let config = BuildConfig::default();
1347
1348        let result =
1349            build_and_persist_graph(temp_dir.path(), &plugins, &config, "test:build_result");
1350        assert!(result.is_ok(), "build_and_persist_graph should succeed");
1351
1352        let (_graph, build_result) = result.unwrap();
1353        assert!(build_result.node_count > 0, "Should have nodes");
1354        assert!(build_result.total_files > 0, "Should have indexed files");
1355        assert!(!build_result.built_at.is_empty(), "Should have timestamp");
1356        assert!(!build_result.root_path.is_empty(), "Should have root path");
1357    }
1358
1359    /// Deduplicated `edge_count` is always <= `raw_edge_count`.
1360    #[test]
1361    fn test_build_result_edge_count_le_raw() {
1362        let temp_dir = TempDir::new().expect("temp dir");
1363        let file_path = temp_dir.path().join("test.rs");
1364        fs::write(&file_path, "fn main() {} fn helper() {}").expect("write test file");
1365
1366        let mut plugins = PluginManager::new();
1367        plugins.register_builtin(Box::new(TestPlugin::new(
1368            "rust-simple",
1369            RUST_TEST_EXTENSIONS,
1370            Some(Box::new(SimpleGraphBuilder)),
1371        )));
1372        let config = BuildConfig::default();
1373
1374        let (_graph, build_result) =
1375            build_and_persist_graph(temp_dir.path(), &plugins, &config, "test:edge_count").unwrap();
1376
1377        assert!(
1378            build_result.edge_count <= build_result.raw_edge_count,
1379            "Deduplicated edge count ({}) should be <= raw edge count ({})",
1380            build_result.edge_count,
1381            build_result.raw_edge_count
1382        );
1383    }
1384
1385    /// File counts use plugin detection (keyed by plugin ID).
1386    #[test]
1387    fn test_build_and_persist_graph_file_counts_use_plugins() {
1388        let temp_dir = TempDir::new().expect("temp dir");
1389        let file_path = temp_dir.path().join("test.rs");
1390        fs::write(&file_path, "fn main() {}").expect("write test file");
1391
1392        let mut plugins = PluginManager::new();
1393        plugins.register_builtin(Box::new(TestPlugin::new(
1394            "rust-simple",
1395            RUST_TEST_EXTENSIONS,
1396            Some(Box::new(SimpleGraphBuilder)),
1397        )));
1398        let config = BuildConfig::default();
1399
1400        let (_graph, build_result) =
1401            build_and_persist_graph(temp_dir.path(), &plugins, &config, "test:file_counts")
1402                .unwrap();
1403
1404        // File counts should include the plugin's ID as the language key
1405        assert!(
1406            !build_result.file_count.is_empty(),
1407            "File counts should not be empty"
1408        );
1409        assert!(
1410            build_result.file_count.contains_key("rust-simple"),
1411            "File counts should use plugin ID. Got: {:?}",
1412            build_result.file_count
1413        );
1414    }
1415
1416    /// Manifest `edge_count` matches `BuildResult` (deduplicated).
1417    #[test]
1418    fn test_manifest_edge_count_is_deduplicated() {
1419        use crate::graph::unified::persistence::GraphStorage;
1420
1421        let temp_dir = TempDir::new().expect("temp dir");
1422        let file_path = temp_dir.path().join("test.rs");
1423        fs::write(&file_path, "fn main() {} fn helper() {}").expect("write test file");
1424
1425        let mut plugins = PluginManager::new();
1426        plugins.register_builtin(Box::new(TestPlugin::new(
1427            "rust-simple",
1428            RUST_TEST_EXTENSIONS,
1429            Some(Box::new(SimpleGraphBuilder)),
1430        )));
1431        let config = BuildConfig::default();
1432
1433        let (_graph, build_result) =
1434            build_and_persist_graph(temp_dir.path(), &plugins, &config, "test:manifest_dedup")
1435                .unwrap();
1436
1437        // Load manifest and verify edge counts match BuildResult
1438        let storage = GraphStorage::new(temp_dir.path());
1439        assert!(storage.exists(), "Manifest should exist after build");
1440
1441        let manifest = storage.load_manifest().unwrap();
1442        assert_eq!(
1443            manifest.edge_count, build_result.edge_count,
1444            "Manifest edge_count should match BuildResult (deduplicated)"
1445        );
1446        assert_eq!(
1447            manifest.raw_edge_count,
1448            Some(build_result.raw_edge_count),
1449            "Manifest raw_edge_count should match BuildResult"
1450        );
1451    }
1452
1453    /// Build command provenance is recorded in the manifest.
1454    #[test]
1455    fn test_build_command_provenance() {
1456        use crate::graph::unified::persistence::GraphStorage;
1457
1458        let temp_dir = TempDir::new().expect("temp dir");
1459        let file_path = temp_dir.path().join("test.rs");
1460        fs::write(&file_path, "fn main() {}").expect("write test file");
1461
1462        let mut plugins = PluginManager::new();
1463        plugins.register_builtin(Box::new(TestPlugin::new(
1464            "rust-simple",
1465            RUST_TEST_EXTENSIONS,
1466            Some(Box::new(SimpleGraphBuilder)),
1467        )));
1468        let config = BuildConfig::default();
1469
1470        build_and_persist_graph(temp_dir.path(), &plugins, &config, "cli:index").unwrap();
1471
1472        let storage = GraphStorage::new(temp_dir.path());
1473        let manifest = storage.load_manifest().unwrap();
1474        assert_eq!(
1475            manifest.build_provenance.build_command, "cli:index",
1476            "Build command provenance should match"
1477        );
1478    }
1479
1480    /// Analysis identity hash matches the on-disk manifest bytes hash.
1481    #[test]
1482    fn test_analysis_identity_matches_manifest_hash() {
1483        use crate::graph::unified::analysis::persistence::load_csr;
1484        use crate::graph::unified::persistence::GraphStorage;
1485        use sha2::{Digest, Sha256};
1486
1487        let temp_dir = TempDir::new().expect("temp dir");
1488        let file_path = temp_dir.path().join("test.rs");
1489        fs::write(&file_path, "fn main() {} fn helper() {}").expect("write test file");
1490
1491        let mut plugins = PluginManager::new();
1492        plugins.register_builtin(Box::new(TestPlugin::new(
1493            "rust-simple",
1494            RUST_TEST_EXTENSIONS,
1495            Some(Box::new(SimpleGraphBuilder)),
1496        )));
1497        let config = BuildConfig::default();
1498
1499        build_and_persist_graph(temp_dir.path(), &plugins, &config, "test:identity").unwrap();
1500
1501        let storage = GraphStorage::new(temp_dir.path());
1502
1503        // Compute manifest hash from on-disk manifest bytes
1504        let manifest_bytes = std::fs::read(storage.manifest_path()).unwrap();
1505        let expected_hash = hex::encode(Sha256::digest(&manifest_bytes));
1506
1507        // Load analysis identity from the CSR file (identity is embedded in each analysis file)
1508        let (_csr, identity) = load_csr(&storage.analysis_csr_path()).unwrap();
1509
1510        assert_eq!(
1511            identity.manifest_hash, expected_hash,
1512            "On-disk manifest hash should equal analysis identity hash"
1513        );
1514    }
1515
1516    /// Regression test: old manifest is removed at start of rebuild.
1517    ///
1518    /// Verifies that `build_and_persist_graph_with_progress()` removes any
1519    /// existing manifest before writing the new snapshot. This prevents the
1520    /// inconsistent state where an old manifest pairs with a new snapshot
1521    /// after an interrupted rebuild.
1522    #[test]
1523    fn test_old_manifest_removed_during_rebuild() {
1524        use crate::graph::unified::persistence::GraphStorage;
1525
1526        let temp_dir = tempfile::TempDir::new().unwrap();
1527        let src = temp_dir.path().join("lib.rs");
1528        std::fs::write(&src, "fn main() {}").unwrap();
1529
1530        // Build an initial index
1531        let mut plugins = PluginManager::new();
1532        plugins.register_builtin(Box::new(TestPlugin::new(
1533            "rust-simple",
1534            RUST_TEST_EXTENSIONS,
1535            Some(Box::new(SimpleGraphBuilder)),
1536        )));
1537        let config = BuildConfig::default();
1538        build_and_persist_graph(temp_dir.path(), &plugins, &config, "test:initial").unwrap();
1539
1540        let storage = GraphStorage::new(temp_dir.path());
1541        assert!(
1542            storage.exists(),
1543            "Manifest should exist after initial build"
1544        );
1545
1546        // Record the original manifest's built_at timestamp
1547        let original_manifest = storage.load_manifest().unwrap();
1548        let original_built_at = original_manifest.built_at.clone();
1549
1550        // Rebuild — during the build, the old manifest should be removed first
1551        build_and_persist_graph(temp_dir.path(), &plugins, &config, "test:rebuild").unwrap();
1552
1553        // Verify the manifest was replaced (different built_at timestamp)
1554        let new_manifest = storage.load_manifest().unwrap();
1555        assert_ne!(
1556            original_built_at, new_manifest.built_at,
1557            "Manifest should have been replaced with new timestamp"
1558        );
1559        assert_eq!(
1560            new_manifest.build_provenance.build_command, "test:rebuild",
1561            "Manifest should reflect the rebuild provenance"
1562        );
1563    }
1564
1565    /// Regression test: failed rebuild leaves index in non-ready state.
1566    ///
1567    /// Exercises the real pipeline by making the analysis directory
1568    /// non-writable after an initial build, then attempting a rebuild.
1569    /// The pipeline should:
1570    ///   1. Remove the old manifest (Step 2) — making `exists()` false.
1571    ///   2. Write the new snapshot (Step 3).
1572    ///   3. Fail at analysis persistence (Step 9) because the directory
1573    ///      is not writable.
1574    ///   4. Return an error — manifest is NEVER written.
1575    ///
1576    /// After the failed rebuild, `storage.exists()` must be false (old
1577    /// manifest removed), even though the snapshot file was updated.
1578    #[test]
1579    fn test_failed_rebuild_leaves_index_not_ready() {
1580        use crate::graph::unified::persistence::GraphStorage;
1581
1582        let temp_dir = tempfile::TempDir::new().unwrap();
1583        let src = temp_dir.path().join("lib.rs");
1584        std::fs::write(&src, "fn main() {}").unwrap();
1585
1586        // Build an initial index (success)
1587        let mut plugins = PluginManager::new();
1588        plugins.register_builtin(Box::new(TestPlugin::new(
1589            "rust-simple",
1590            RUST_TEST_EXTENSIONS,
1591            Some(Box::new(SimpleGraphBuilder)),
1592        )));
1593        let config = BuildConfig::default();
1594        build_and_persist_graph(temp_dir.path(), &plugins, &config, "test:initial").unwrap();
1595
1596        let storage = GraphStorage::new(temp_dir.path());
1597        assert!(
1598            storage.exists(),
1599            "Manifest should exist after initial build"
1600        );
1601
1602        // Replace the analysis directory with a regular file to force a
1603        // failure at Step 9 (analysis persistence). `create_dir_all` will
1604        // fail because a regular file exists where a directory is expected.
1605        // This simulates the real failure window between snapshot write
1606        // (Step 3) and manifest write (Step 10).
1607        let analysis_dir = storage.analysis_dir().to_path_buf();
1608        std::fs::remove_dir_all(&analysis_dir).unwrap();
1609        std::fs::write(&analysis_dir, b"blocker").unwrap();
1610
1611        // Attempt rebuild — should fail at analysis persistence
1612        let result =
1613            build_and_persist_graph(temp_dir.path(), &plugins, &config, "test:failed_rebuild");
1614
1615        // Restore analysis dir so TempDir cleanup succeeds
1616        std::fs::remove_file(&analysis_dir).unwrap();
1617        std::fs::create_dir_all(&analysis_dir).unwrap();
1618
1619        // The build should have failed
1620        assert!(
1621            result.is_err(),
1622            "Rebuild should fail when analysis dir is read-only"
1623        );
1624
1625        // The old manifest should have been removed (Step 2 ran before failure)
1626        assert!(
1627            !storage.exists(),
1628            "After failed rebuild, manifest should have been removed — index is NOT ready"
1629        );
1630
1631        // The snapshot was updated (Step 3 succeeded before failure)
1632        assert!(
1633            storage.snapshot_exists(),
1634            "Snapshot should still exist on disk (written before failure)"
1635        );
1636    }
1637
1638    // ===== CSR Compaction Persistence Regression Tests =====
1639
1640    /// Graph builder that creates duplicate edges to exercise raw_edge_count > edge_count.
1641    struct DuplicateCallsGraphBuilder;
1642
1643    impl GraphBuilder for DuplicateCallsGraphBuilder {
1644        fn build_graph(
1645            &self,
1646            _tree: &Tree,
1647            _content: &[u8],
1648            file: &Path,
1649            staging: &mut StagingGraph,
1650        ) -> GraphResult<()> {
1651            use crate::graph::unified::build::helper::GraphBuildHelper;
1652
1653            let mut helper = GraphBuildHelper::new(staging, file, Language::Rust);
1654            let fn1 = helper.add_function("main", None, false, false);
1655            let fn2 = helper.add_function("helper", None, false, false);
1656
1657            // Add the same Calls edge twice to create a duplicate
1658            helper.add_call_edge(fn1, fn2);
1659            helper.add_call_edge(fn1, fn2);
1660
1661            Ok(())
1662        }
1663
1664        fn language(&self) -> Language {
1665            Language::Rust
1666        }
1667    }
1668
1669    /// Persisted snapshot has CSR on both stores and empty deltas.
1670    #[test]
1671    fn test_persisted_snapshot_compacts_both_edge_stores_before_save() {
1672        use crate::graph::unified::persistence::{GraphStorage, load_from_path};
1673
1674        let temp_dir = TempDir::new().expect("temp dir");
1675        let file_path = temp_dir.path().join("test.rs");
1676        fs::write(&file_path, "fn main() {} fn helper() {}").expect("write test file");
1677
1678        let mut plugins = PluginManager::new();
1679        plugins.register_builtin(Box::new(TestPlugin::new(
1680            "rust-simple",
1681            RUST_TEST_EXTENSIONS,
1682            Some(Box::new(SimpleGraphBuilder)),
1683        )));
1684        let config = BuildConfig::default();
1685
1686        let _result =
1687            build_and_persist_graph(temp_dir.path(), &plugins, &config, "test:csr_compact")
1688                .expect("build should succeed");
1689
1690        // Load the persisted snapshot and verify CSR state
1691        let storage = GraphStorage::new(temp_dir.path());
1692        let loaded = load_from_path(storage.snapshot_path(), None).expect("load should succeed");
1693
1694        assert!(
1695            loaded.edges().forward().csr().is_some(),
1696            "Forward store must have CSR after persistence"
1697        );
1698        assert!(
1699            loaded.edges().reverse().csr().is_some(),
1700            "Reverse store must have CSR after persistence"
1701        );
1702
1703        let stats = loaded.edges().stats();
1704        assert_eq!(
1705            stats.forward.delta_edge_count, 0,
1706            "Forward delta must be empty after persistence"
1707        );
1708        assert_eq!(
1709            stats.reverse.delta_edge_count, 0,
1710            "Reverse delta must be empty after persistence"
1711        );
1712    }
1713
1714    /// Loaded snapshot supports reverse traversal (direct-callers / edges_to).
1715    #[test]
1716    fn test_loaded_snapshot_edges_to_works_after_round_trip() {
1717        use crate::graph::unified::edge::EdgeKind;
1718        use crate::graph::unified::persistence::{GraphStorage, load_from_path};
1719
1720        let temp_dir = TempDir::new().expect("temp dir");
1721        let file_path = temp_dir.path().join("test.rs");
1722        fs::write(&file_path, "fn main() {} fn helper() {}").expect("write test file");
1723
1724        let mut plugins = PluginManager::new();
1725        plugins.register_builtin(Box::new(TestPlugin::new(
1726            "rust-simple",
1727            RUST_TEST_EXTENSIONS,
1728            Some(Box::new(SimpleGraphBuilder)),
1729        )));
1730        let config = BuildConfig::default();
1731
1732        build_and_persist_graph(temp_dir.path(), &plugins, &config, "test:round_trip")
1733            .expect("build should succeed");
1734
1735        let storage = GraphStorage::new(temp_dir.path());
1736        let loaded = load_from_path(storage.snapshot_path(), None).expect("load should succeed");
1737
1738        // Find main and helper node IDs through symbol resolution
1739        use crate::graph::unified::{
1740            FileScope, ResolutionMode, SymbolCandidateOutcome, SymbolQuery,
1741        };
1742        let snapshot = loaded.snapshot();
1743
1744        let main_id = match snapshot.find_symbol_candidates(&SymbolQuery {
1745            symbol: "main",
1746            file_scope: FileScope::Any,
1747            mode: ResolutionMode::AllowSuffixCandidates,
1748        }) {
1749            SymbolCandidateOutcome::Candidates(ids) => ids[0],
1750            _ => panic!("main node must exist"),
1751        };
1752
1753        let helper_id = match snapshot.find_symbol_candidates(&SymbolQuery {
1754            symbol: "helper",
1755            file_scope: FileScope::Any,
1756            mode: ResolutionMode::AllowSuffixCandidates,
1757        }) {
1758            SymbolCandidateOutcome::Candidates(ids) => ids[0],
1759            _ => panic!("helper node must exist"),
1760        };
1761
1762        // Forward: main -> helper
1763        let forward_edges = loaded.edges().edges_from(main_id);
1764        let has_call = forward_edges
1765            .iter()
1766            .any(|e| e.target == helper_id && matches!(e.kind, EdgeKind::Calls { .. }));
1767        assert!(has_call, "Forward traversal: main should call helper");
1768
1769        // Reverse: helper <- main (the critical regression check)
1770        let reverse_edges = loaded.edges().edges_to(helper_id);
1771        let has_caller = reverse_edges
1772            .iter()
1773            .any(|e| e.source == main_id && matches!(e.kind, EdgeKind::Calls { .. }));
1774        assert!(
1775            has_caller,
1776            "Reverse traversal: helper should have main as caller"
1777        );
1778    }
1779
1780    /// raw_edge_count >= edge_count still holds after pre-save compaction.
1781    #[test]
1782    fn test_raw_edge_count_preserved_across_pre_save_compaction() {
1783        use crate::graph::unified::persistence::GraphStorage;
1784
1785        let temp_dir = TempDir::new().expect("temp dir");
1786        let file_path = temp_dir.path().join("test.rs");
1787        fs::write(&file_path, "fn main() {} fn helper() {}").expect("write test file");
1788
1789        let mut plugins = PluginManager::new();
1790        plugins.register_builtin(Box::new(TestPlugin::new(
1791            "rust-dup",
1792            RUST_TEST_EXTENSIONS,
1793            Some(Box::new(DuplicateCallsGraphBuilder)),
1794        )));
1795        let config = BuildConfig::default();
1796
1797        let (_graph, build_result) =
1798            build_and_persist_graph(temp_dir.path(), &plugins, &config, "test:raw_edge_count")
1799                .expect("build should succeed");
1800
1801        assert!(
1802            build_result.raw_edge_count > build_result.edge_count,
1803            "raw_edge_count ({}) must be > edge_count ({}) for duplicate builder",
1804            build_result.raw_edge_count,
1805            build_result.edge_count
1806        );
1807
1808        // Verify manifest matches
1809        let storage = GraphStorage::new(temp_dir.path());
1810        let manifest = storage.load_manifest().expect("manifest should load");
1811
1812        assert_eq!(
1813            manifest.raw_edge_count,
1814            Some(build_result.raw_edge_count),
1815            "Manifest raw_edge_count must match build result"
1816        );
1817        assert_eq!(
1818            manifest.edge_count, build_result.edge_count,
1819            "Manifest edge_count must match build result"
1820        );
1821    }
1822
1823    /// Full round-trip: build -> save -> load -> query produces correct results.
1824    #[test]
1825    fn test_build_save_load_query_round_trip_preserves_edge_queries() {
1826        use crate::graph::unified::persistence::{GraphStorage, load_from_path};
1827
1828        let temp_dir = TempDir::new().expect("temp dir");
1829        let file_path = temp_dir.path().join("test.rs");
1830        fs::write(&file_path, "fn main() {} fn helper() {}").expect("write test file");
1831
1832        let mut plugins = PluginManager::new();
1833        plugins.register_builtin(Box::new(TestPlugin::new(
1834            "rust-simple",
1835            RUST_TEST_EXTENSIONS,
1836            Some(Box::new(SimpleGraphBuilder)),
1837        )));
1838        let config = BuildConfig::default();
1839
1840        let (_original_graph, build_result) =
1841            build_and_persist_graph(temp_dir.path(), &plugins, &config, "test:full_round_trip")
1842                .expect("build should succeed");
1843
1844        // Load from disk
1845        let storage = GraphStorage::new(temp_dir.path());
1846        let loaded = load_from_path(storage.snapshot_path(), None).expect("load should succeed");
1847
1848        // Edge count on loaded graph should match dedup count
1849        assert_eq!(
1850            loaded.edge_count(),
1851            build_result.edge_count,
1852            "Loaded graph edge count must match build result dedup count"
1853        );
1854
1855        // Node count should match
1856        assert_eq!(
1857            loaded.node_count(),
1858            build_result.node_count,
1859            "Loaded graph node count must match build result"
1860        );
1861
1862        // Verify edge queries work on loaded graph
1863        use crate::graph::unified::edge::EdgeKind;
1864        use crate::graph::unified::{
1865            FileScope, ResolutionMode, SymbolCandidateOutcome, SymbolQuery,
1866        };
1867        let snapshot = loaded.snapshot();
1868
1869        let main_id = match snapshot.find_symbol_candidates(&SymbolQuery {
1870            symbol: "main",
1871            file_scope: FileScope::Any,
1872            mode: ResolutionMode::AllowSuffixCandidates,
1873        }) {
1874            SymbolCandidateOutcome::Candidates(ids) => {
1875                assert!(!ids.is_empty(), "main must exist");
1876                ids[0]
1877            }
1878            _ => panic!("main node must exist"),
1879        };
1880
1881        let helper_id = match snapshot.find_symbol_candidates(&SymbolQuery {
1882            symbol: "helper",
1883            file_scope: FileScope::Any,
1884            mode: ResolutionMode::AllowSuffixCandidates,
1885        }) {
1886            SymbolCandidateOutcome::Candidates(ids) => {
1887                assert!(!ids.is_empty(), "helper must exist");
1888                ids[0]
1889            }
1890            _ => panic!("helper node must exist"),
1891        };
1892
1893        // Forward query: main calls helper
1894        let fwd = loaded.edges().edges_from(main_id);
1895        let has_fwd_call = fwd
1896            .iter()
1897            .any(|e| e.target == helper_id && matches!(e.kind, EdgeKind::Calls { .. }));
1898        assert!(has_fwd_call, "edges_from(main) must include call to helper");
1899
1900        // Reverse query: helper called by main
1901        let rev = loaded.edges().edges_to(helper_id);
1902        let has_rev_call = rev
1903            .iter()
1904            .any(|e| e.source == main_id && matches!(e.kind, EdgeKind::Calls { .. }));
1905        assert!(has_rev_call, "edges_to(helper) must include caller main");
1906    }
1907}