use std::collections::HashMap;
use std::path::Path;
use std::time::Instant;
use crate::core::{Language, ParsedFile};
use crate::graph::{CodeGraph, GraphBuilder};
use crate::parsers::ParserRegistry;
use rayon::prelude::*;
use tracing::{info, warn};
use super::diff_analyzer::DiffInfo;
use super::scanner::{FileScanner, SourceFile};
use crate::config::cache::{CacheConfig, CacheStore};
/// Tunable options for a [`Pipeline`] run.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
/// Upper file-size bound (bytes) passed to `FileScanner::with_max_file_size`.
pub max_file_size: u64,
/// Path patterns to exclude.
/// NOTE(review): not read anywhere in this file — presumably consumed by the
/// scanner elsewhere; confirm.
pub exclude_patterns: Vec<String>,
/// Whether test files participate in the run.
/// NOTE(review): not read in this file — confirm against the scanner.
pub include_tests: bool,
/// When true, files are parsed in parallel (rayon chunked iteration).
pub parallel: bool,
}
impl Default for PipelineConfig {
fn default() -> Self {
Self {
max_file_size: 1024 * 1024,
exclude_patterns: Vec::new(),
include_tests: true,
parallel: true,
}
}
}
/// Aggregated output of a pipeline run.
#[derive(Debug)]
pub struct PipelineResult {
/// Project-wide code graph built from the parsed files.
pub graph: CodeGraph,
/// Successfully parsed files. Left empty by `run_streaming`, which drops
/// parsed sources batch-by-batch.
pub parsed_files: Vec<ParsedFile>,
/// Per-file parse failures as `(path, error message)` pairs.
pub errors: Vec<(String, String)>,
/// Number of source files the scanner discovered.
pub files_scanned: usize,
/// Number of files parsed successfully.
pub files_parsed: usize,
/// Total line count summed over successfully parsed sources.
pub total_lines: usize,
/// Wall-clock duration of the run in milliseconds.
pub duration_ms: u64,
}
/// Orchestrates scanning, parsing, and graph building over a project tree.
pub struct Pipeline {
/// Options controlling size limits and parallelism.
config: PipelineConfig,
}
impl Pipeline {
pub fn new(config: PipelineConfig) -> Self {
Self { config }
}
pub fn with_defaults() -> Self {
Self {
config: PipelineConfig::default(),
}
}
pub fn run(&self, root: &Path) -> Result<PipelineResult, crate::core::Error> {
let start = Instant::now();
let scanner = FileScanner::new().with_max_file_size(self.config.max_file_size);
let files = scanner.scan(root)?;
let files_scanned = files.len();
info!("Scanned {} source files", files_scanned);
let files = group_by_directory(files);
let registry = ParserRegistry::with_defaults()?;
let parse_results: Vec<Result<ParsedFile, (String, String)>> = if self.config.parallel {
let chunk_size = compute_chunk_size(files_scanned);
files
.par_chunks(chunk_size)
.flat_map(|chunk| {
chunk
.iter()
.map(|f| parse_file(f, ®istry))
.collect::<Vec<_>>()
})
.collect()
} else {
files.iter().map(|f| parse_file(f, ®istry)).collect()
};
let mut parsed_files = Vec::new();
let mut errors = Vec::new();
let mut total_lines = 0usize;
for result in parse_results {
match result {
Ok(pf) => {
total_lines += pf.source.lines().count();
parsed_files.push(pf);
}
Err((path, err)) => {
warn!("Failed to parse {}: {}", path, err);
errors.push((path, err));
}
}
}
let files_parsed = parsed_files.len();
info!("Parsed {} files ({} errors)", files_parsed, errors.len());
let builder = GraphBuilder::new()?;
let graph = builder.build_project_graph(&parsed_files)?;
info!(
"Built graph: {} nodes, {} edges",
graph.node_count(),
graph.edge_count()
);
let duration_ms = start.elapsed().as_millis() as u64;
Ok(PipelineResult {
graph,
parsed_files,
errors,
files_scanned,
files_parsed,
total_lines,
duration_ms,
})
}
pub fn parse_single_file(
&self,
file_path: &Path,
) -> Result<(ParsedFile, CodeGraph), crate::core::Error> {
let ext = file_path
.extension()
.and_then(|e| e.to_str())
.ok_or_else(|| crate::core::Error::parse("No file extension"))?;
let language = Language::from_extension(ext)
.ok_or_else(|| crate::core::Error::parse(format!("Unsupported extension: {ext}")))?;
let source = std::fs::read_to_string(file_path)
.map_err(|e| crate::core::Error::analysis(format!("IO error: {e}")))?;
let registry = ParserRegistry::with_defaults()?;
let parser = registry
.get_parser_for_extension(ext)
.or_else(|| registry.get_parser(language))
.ok_or_else(|| {
crate::core::Error::parse(format!("No parser for {}", language.name()))
})?;
let parsed = parser.parse_file(file_path.to_str().unwrap_or("unknown"), &source)?;
let builder = GraphBuilder::new()?;
let graph = builder.build_from_parsed_file(&parsed)?;
Ok((parsed, graph))
}
pub fn run_streaming(&self, root: &Path) -> Result<PipelineResult, crate::core::Error> {
let start = Instant::now();
let scanner = FileScanner::new().with_max_file_size(self.config.max_file_size);
let files = scanner.scan(root)?;
let files_scanned = files.len();
info!("Scanned {} source files", files_scanned);
let files = group_by_directory(files);
let batch_size = 500; let registry = ParserRegistry::with_defaults()?;
let builder = GraphBuilder::new()?;
let mut project_graph = CodeGraph::new();
let mut all_errors = Vec::new();
let mut total_lines = 0usize;
let mut files_parsed = 0usize;
for batch in files.chunks(batch_size) {
let parse_results: Vec<Result<ParsedFile, (String, String)>> = if self.config.parallel {
let chunk_size = compute_chunk_size(batch.len());
batch
.par_chunks(chunk_size)
.flat_map(|chunk| {
chunk
.iter()
.map(|f| parse_file(f, ®istry))
.collect::<Vec<_>>()
})
.collect()
} else {
batch.iter().map(|f| parse_file(f, ®istry)).collect()
};
let mut batch_parsed = Vec::new();
for result in parse_results {
match result {
Ok(pf) => {
total_lines += pf.source.lines().count();
files_parsed += 1;
batch_parsed.push(pf);
}
Err((path, err)) => {
warn!("Failed to parse {}: {}", path, err);
all_errors.push((path, err));
}
}
}
if !batch_parsed.is_empty() {
let batch_graph = builder.build_project_graph(&batch_parsed)?;
project_graph.merge(&batch_graph);
}
let _rss = get_rss_mb();
}
let duration_ms = start.elapsed().as_millis() as u64;
Ok(PipelineResult {
graph: project_graph,
parsed_files: Vec::new(), errors: all_errors,
files_scanned,
files_parsed,
total_lines,
duration_ms,
})
}
pub fn run_with_diff(
&self,
root: &Path,
base_branch: &str,
cache_dir: Option<&Path>,
) -> Result<PipelineResult, crate::core::Error> {
let start = Instant::now();
let diff_output = std::process::Command::new("git")
.args(["diff", "--name-status", &format!("{}...HEAD", base_branch)])
.current_dir(root)
.output()
.map_err(|e| crate::core::Error::analysis(format!("Failed to run git diff: {}", e)))?;
if !diff_output.status.success() {
return Err(crate::core::Error::analysis(
"git diff command failed. Ensure you're in a git repository and the base branch exists."
.to_string(),
));
}
let diff_str = String::from_utf8_lossy(&diff_output.stdout);
let diff_info = DiffInfo::from_git_diff(&diff_str).map_err(crate::core::Error::analysis)?;
let changed_files = diff_info.changed_file_strings();
info!("Detected {} changed file(s)", changed_files.len());
let scanner = FileScanner::new().with_max_file_size(self.config.max_file_size);
let all_files = scanner.scan(root)?;
let files_scanned = all_files.len();
info!("Scanned {} total source files", files_scanned);
let cache_store = if let Some(dir) = cache_dir {
let config = CacheConfig {
enabled: true,
cache_dir: Some(dir.to_string_lossy().to_string()),
ttl_hours: 168,
};
Some(CacheStore::new(&config)?)
} else {
None
};
let (changed_source_files, unchanged_source_files): (Vec<_>, Vec<_>) =
all_files.into_iter().partition(|f| {
let path_str = f.path.to_string_lossy().to_string();
changed_files.iter().any(|cf| path_str.ends_with(cf))
});
info!(
"Changed: {} files, Unchanged: {} files",
changed_source_files.len(),
unchanged_source_files.len()
);
let cached_graph = CodeGraph::new();
let _cache_hits = 0;
if let Some(ref _store) = cache_store {
}
let mut all_source_files = unchanged_source_files;
all_source_files.extend(changed_source_files);
let files_to_parse = group_by_directory(all_source_files);
let registry = ParserRegistry::with_defaults()?;
let parse_results: Vec<Result<ParsedFile, (String, String)>> = if self.config.parallel {
let chunk_size = compute_chunk_size(files_to_parse.len());
files_to_parse
.par_chunks(chunk_size)
.flat_map(|chunk| {
chunk
.iter()
.map(|f| parse_file(f, ®istry))
.collect::<Vec<_>>()
})
.collect()
} else {
files_to_parse
.iter()
.map(|f| parse_file(f, ®istry))
.collect()
};
let mut parsed_files = Vec::new();
let mut errors = Vec::new();
let mut total_lines = 0usize;
for result in parse_results {
match result {
Ok(pf) => {
total_lines += pf.source.lines().count();
parsed_files.push(pf);
}
Err((path, err)) => {
warn!("Failed to parse {}: {}", path, err);
errors.push((path, err));
}
}
}
let files_parsed = parsed_files.len();
info!(
"Parsed {} changed files ({} errors)",
files_parsed,
errors.len()
);
let builder = GraphBuilder::new()?;
let mut fresh_graph = if !parsed_files.is_empty() {
builder.build_project_graph(&parsed_files)?
} else {
CodeGraph::new()
};
fresh_graph.merge(&cached_graph);
info!(
"Built graph: {} nodes, {} edges",
fresh_graph.node_count(),
fresh_graph.edge_count()
);
let duration_ms = start.elapsed().as_millis() as u64;
Ok(PipelineResult {
graph: fresh_graph,
parsed_files,
errors,
files_scanned,
files_parsed,
total_lines,
duration_ms,
})
}
}
/// Best-effort resident-set size of the current process in megabytes.
///
/// On Linux this scans `/proc/self/status` for a parseable `VmRSS:` entry
/// (reported in KiB) and converts it to MiB. Returns 0 on any failure and
/// on every non-Linux platform.
pub fn get_rss_mb() -> u64 {
    #[cfg(target_os = "linux")]
    {
        let rss_kb = std::fs::read_to_string("/proc/self/status")
            .ok()
            .and_then(|status| {
                status
                    .lines()
                    .filter(|line| line.starts_with("VmRSS:"))
                    .find_map(|line| line.split_whitespace().nth(1)?.parse::<u64>().ok())
            });
        if let Some(kb) = rss_kb {
            return kb / 1024;
        }
    }
    0
}
/// Picks a parallel work-chunk size: roughly two chunks per rayon worker
/// thread, never less than one file per chunk.
fn compute_chunk_size(num_files: usize) -> usize {
    let workers = rayon::current_num_threads().max(1);
    std::cmp::max(num_files / (workers * 2), 1)
}
fn group_by_directory(files: Vec<SourceFile>) -> Vec<SourceFile> {
let mut buckets: HashMap<Option<std::path::PathBuf>, Vec<SourceFile>> = HashMap::new();
for file in files {
let parent = file.path.parent().map(|p| p.to_path_buf());
buckets.entry(parent).or_default().push(file);
}
let mut grouped = Vec::new();
for (_, mut bucket) in buckets {
grouped.append(&mut bucket);
}
grouped
}
/// Reads and parses one source file, mapping every failure mode to a
/// `(path, message)` pair so callers can aggregate errors without aborting.
fn parse_file(
    file: &SourceFile,
    registry: &ParserRegistry,
) -> Result<ParsedFile, (String, String)> {
    let path_str = file.path.to_string_lossy().to_string();
    // Read the source up front; an unreadable file is reported, not fatal.
    let source = match std::fs::read_to_string(&file.path) {
        Ok(text) => text,
        Err(e) => return Err((path_str, format!("IO error: {e}"))),
    };
    // Prefer an extension-specific parser, then fall back to the
    // language-level parser.
    let ext = file.path.extension().and_then(|e| e.to_str()).unwrap_or("");
    let parser = match registry
        .get_parser_for_extension(ext)
        .or_else(|| registry.get_parser(file.language))
    {
        Some(p) => p,
        None => {
            return Err((path_str, format!("No parser for {}", file.language.name())));
        }
    };
    match parser.parse_file(&path_str, &source) {
        Ok(parsed) => Ok(parsed),
        Err(e) => Err((path_str, e.to_string())),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// A single Python file with two functions parses cleanly and yields a
    /// graph with at least one node per function.
    #[test]
    fn test_pipeline_basic() {
        let workspace = TempDir::new().unwrap();
        fs::write(
            workspace.path().join("main.py"),
            "def main():\n helper()\n\ndef helper():\n pass\n",
        )
        .unwrap();
        let result = Pipeline::with_defaults().run(workspace.path()).unwrap();
        assert_eq!(result.files_scanned, 1);
        assert_eq!(result.files_parsed, 1);
        assert!(result.graph.node_count() >= 2);
        assert!(result.errors.is_empty());
    }

    /// Two sibling files both end up parsed into the same project graph.
    #[test]
    fn test_pipeline_multi_file() {
        let workspace = TempDir::new().unwrap();
        for (name, body) in [("a.py", "def a():\n pass\n"), ("b.py", "def b():\n pass\n")] {
            fs::write(workspace.path().join(name), body).unwrap();
        }
        let result = Pipeline::with_defaults().run(workspace.path()).unwrap();
        assert_eq!(result.files_parsed, 2);
        assert!(result.graph.node_count() >= 2);
    }

    /// The chunk size never drops below one, even for empty input.
    #[test]
    fn test_compute_chunk_size_minimum_one() {
        assert!(compute_chunk_size(0) >= 1);
        assert!(compute_chunk_size(1) >= 1);
    }

    /// The chunk size follows the files / (2 * threads) formula, floored at one.
    #[test]
    fn test_compute_chunk_size_scales() {
        let size = compute_chunk_size(10_000);
        assert!(size >= 1);
        let workers = rayon::current_num_threads().max(1);
        assert_eq!(size, (10_000 / (workers * 2)).max(1));
    }

    /// Grouping loses no files and keeps same-directory files adjacent.
    #[test]
    fn test_group_by_directory_preserves_files() {
        use crate::analysis::scanner::SourceFile;
        use crate::core::Language;
        use std::path::PathBuf;
        let make = |path: &str, size_bytes: u64| SourceFile {
            path: PathBuf::from(path),
            language: Language::Python,
            size_bytes,
        };
        let files = vec![
            make("/project/src/a.py", 100),
            make("/project/lib/b.py", 200),
            make("/project/src/c.py", 150),
        ];
        let grouped = group_by_directory(files);
        assert_eq!(grouped.len(), 3);
        // The two files under src/ must sit next to each other.
        let src_indices: Vec<usize> = grouped
            .iter()
            .enumerate()
            .filter(|(_, f)| f.path.parent().unwrap().ends_with("src"))
            .map(|(i, _)| i)
            .collect();
        if src_indices.len() == 2 {
            assert_eq!(src_indices[1] - src_indices[0], 1);
        }
    }

    /// Files spread across nested package directories are all scanned and parsed.
    #[test]
    fn test_pipeline_with_subdirectories() {
        let workspace = TempDir::new().unwrap();
        let pkg1 = workspace.path().join("pkg1");
        let pkg2 = workspace.path().join("pkg2");
        fs::create_dir_all(&pkg1).unwrap();
        fs::create_dir_all(&pkg2).unwrap();
        fs::write(pkg1.join("mod_a.py"), "def a():\n pass\n").unwrap();
        fs::write(pkg1.join("mod_b.py"), "def b():\n pass\n").unwrap();
        fs::write(pkg2.join("mod_c.py"), "def c():\n pass\n").unwrap();
        let result = Pipeline::with_defaults().run(workspace.path()).unwrap();
        assert_eq!(result.files_scanned, 3);
        assert_eq!(result.files_parsed, 3);
        assert!(result.graph.node_count() >= 3);
        assert!(result.errors.is_empty());
    }
}