use crate::{
analysis::call_graph::RustCallGraphBuilder,
analyzers::rust_call_graph::extract_call_graph_multi_file,
config,
core::Language,
io,
priority::{
call_graph::{CallGraph, FunctionId},
parallel_call_graph::{ParallelCallGraph, ParallelConfig},
},
};
use anyhow::{Context, Result};
use rayon::prelude::*;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CallGraphPhase {
DiscoveringFiles,
ParsingASTs,
ExtractingCalls,
LinkingModules,
}
#[derive(Debug, Clone)]
pub struct CallGraphProgress {
pub phase: CallGraphPhase,
pub current: usize,
pub total: usize,
}
pub struct ParallelCallGraphBuilder;
impl Default for ParallelCallGraphBuilder {
fn default() -> Self {
Self::new()
}
}
impl ParallelCallGraphBuilder {
pub fn new() -> Self {
Self
}
pub fn with_config(_config: ParallelConfig) -> Self {
Self
}
pub fn build_parallel<F>(
&self,
project_path: &Path,
base_graph: CallGraph,
progress_callback: F,
) -> Result<(CallGraph, HashSet<FunctionId>, HashSet<FunctionId>)>
where
F: FnMut(CallGraphProgress) + Send + Sync,
{
self.build_parallel_with_files(project_path, base_graph, None, progress_callback)
}
pub fn build_parallel_with_files<F>(
&self,
project_path: &Path,
base_graph: CallGraph,
rust_files: Option<&[PathBuf]>,
mut progress_callback: F,
) -> Result<(CallGraph, HashSet<FunctionId>, HashSet<FunctionId>)>
where
F: FnMut(CallGraphProgress) + Send + Sync,
{
let discovered_files: Vec<PathBuf>;
let rust_files = match rust_files {
Some(files) => {
log::info!("Using {} pre-discovered Rust files", files.len());
files
}
None => {
progress_callback(CallGraphProgress {
phase: CallGraphPhase::DiscoveringFiles,
current: 0,
total: 0,
});
let config = config::get_config();
discovered_files = io::walker::find_project_files_with_config(
project_path,
vec![Language::Rust],
config,
)
.context("Failed to find Rust files for call graph")?;
log::info!("Discovered {} Rust files", discovered_files.len());
progress_callback(CallGraphProgress {
phase: CallGraphPhase::DiscoveringFiles,
current: discovered_files.len(),
total: discovered_files.len(),
});
&discovered_files
}
};
let total_files = rust_files.len();
log::info!("Processing {} Rust files in parallel", total_files);
let parallel_graph = Arc::new(ParallelCallGraph::new(total_files));
parallel_graph.merge_concurrent(base_graph);
const BATCH_SIZE: usize = 200;
let mut all_framework_exclusions = HashSet::new();
let mut all_function_pointer_used = HashSet::new();
let mut files_processed = 0;
std::thread::sleep(std::time::Duration::from_millis(150));
for batch in rust_files.chunks(BATCH_SIZE) {
let batch_start = files_processed;
let batch_end = batch_start + batch.len();
progress_callback(CallGraphProgress {
phase: CallGraphPhase::ParsingASTs,
current: batch_start,
total: total_files,
});
let parsed_files = self.parallel_parse_files_batch(batch, ¶llel_graph)?;
progress_callback(CallGraphProgress {
phase: CallGraphPhase::ExtractingCalls,
current: batch_start,
total: total_files,
});
self.parallel_multi_file_extraction(&parsed_files, ¶llel_graph)?;
let (batch_framework_exclusions, batch_function_pointer_used) =
self.parallel_enhanced_analysis(&parsed_files, ¶llel_graph)?;
all_framework_exclusions.extend(batch_framework_exclusions);
all_function_pointer_used.extend(batch_function_pointer_used);
files_processed = batch_end;
crate::core::parsing::reset_span_locations();
log::debug!(
"Processed batch {}/{} ({} files)",
batch_end,
total_files,
batch.len()
);
}
progress_callback(CallGraphProgress {
phase: CallGraphPhase::LinkingModules,
current: 0,
total: 0,
});
let mut final_graph = parallel_graph.to_call_graph();
final_graph.resolve_cross_file_calls();
let stats = parallel_graph.stats();
log::info!(
"Parallel call graph complete: {} nodes, {} edges, {} files processed in {} batches",
stats.total_nodes.load(std::sync::atomic::Ordering::Relaxed),
stats.total_edges.load(std::sync::atomic::Ordering::Relaxed),
stats
.files_processed
.load(std::sync::atomic::Ordering::Relaxed),
total_files.div_ceil(BATCH_SIZE),
);
Ok((
final_graph,
all_framework_exclusions,
all_function_pointer_used,
))
}
fn parallel_parse_files_batch(
&self,
batch: &[PathBuf],
parallel_graph: &Arc<ParallelCallGraph>,
) -> Result<Vec<(PathBuf, syn::File)>> {
let file_contents: Vec<_> = batch
.par_iter()
.filter_map(|file_path| {
io::read_file(file_path)
.map_err(|e| {
log::warn!("Failed to read file {}: {}", file_path.display(), e);
e
})
.ok()
.map(|content| (file_path.clone(), content))
})
.collect();
let parsed_files: Vec<_> = file_contents
.iter()
.filter_map(|(file_path, content)| {
let parsed = syn::parse_file(content).ok()?;
parallel_graph.stats().increment_files();
Some((file_path.clone(), parsed))
})
.collect();
Ok(parsed_files)
}
#[allow(dead_code)]
fn parallel_parse_files_with_progress<F>(
&self,
rust_files: &[PathBuf],
parallel_graph: &Arc<ParallelCallGraph>,
progress_callback: &mut F,
) -> Result<Vec<(PathBuf, syn::File)>>
where
F: FnMut(CallGraphProgress) + Send + Sync,
{
use std::sync::atomic::{AtomicUsize, Ordering};
let file_contents: Vec<_> = rust_files
.par_iter()
.filter_map(|file_path| {
let content = io::read_file(file_path)
.map_err(|e| {
eprintln!(
"Warning: Failed to read file {}: {}",
file_path.display(),
e
);
e
})
.ok()?;
Some((file_path.clone(), content))
})
.collect();
let total_files = file_contents.len();
let parsed_count = Arc::new(AtomicUsize::new(0));
let parsed_files: Vec<_> = file_contents
.iter()
.enumerate()
.filter_map(|(idx, (file_path, content))| {
let parsed = syn::parse_file(content).ok()?;
parallel_graph.stats().increment_files();
let count = parsed_count.fetch_add(1, Ordering::Relaxed) + 1;
if count % 10 == 0 || count == total_files {
progress_callback(CallGraphProgress {
phase: CallGraphPhase::ParsingASTs,
current: count,
total: total_files,
});
}
crate::io::progress::AnalysisProgress::with_global(|p| {
p.update_progress(crate::io::progress::PhaseProgress::Progress {
current: idx + 1,
total: total_files,
});
});
Some((file_path.clone(), parsed))
})
.collect();
Ok(parsed_files)
}
fn parallel_multi_file_extraction(
&self,
parsed_files: &[(PathBuf, syn::File)],
parallel_graph: &Arc<ParallelCallGraph>,
) -> Result<()> {
let files_for_extraction: Vec<_> = parsed_files
.iter()
.map(|(path, parsed)| (parsed.clone(), path.clone()))
.collect();
let graph = extract_call_graph_multi_file(&files_for_extraction);
parallel_graph.merge_concurrent(graph);
Ok(())
}
fn parallel_enhanced_analysis(
&self,
parsed_files: &[(PathBuf, syn::File)],
parallel_graph: &Arc<ParallelCallGraph>,
) -> Result<(HashSet<FunctionId>, HashSet<FunctionId>)> {
let workspace_files: Vec<(PathBuf, syn::File)> = parsed_files
.iter()
.map(|(path, parsed)| (path.clone(), parsed.clone()))
.collect();
let base_graph = parallel_graph.to_call_graph();
let mut enhanced_builder = RustCallGraphBuilder::from_base_graph(base_graph);
for (file_path, parsed) in &workspace_files {
enhanced_builder
.analyze_basic_calls(file_path, parsed)?
.analyze_trait_dispatch(file_path, parsed)?
.analyze_function_pointers(file_path, parsed)?
.analyze_framework_patterns(file_path, parsed)?;
}
enhanced_builder.analyze_cross_module(&workspace_files)?;
enhanced_builder.finalize_trait_analysis()?;
let enhanced_graph = enhanced_builder.build();
let framework_exclusions: HashSet<FunctionId> = enhanced_graph
.framework_patterns
.get_exclusions()
.into_iter()
.collect();
let function_pointer_used: HashSet<FunctionId> = enhanced_graph
.function_pointer_tracker
.get_definitely_used_functions()
.into_iter()
.collect();
parallel_graph.merge_concurrent(enhanced_graph.base_graph);
Ok((framework_exclusions, function_pointer_used))
}
}
pub fn build_call_graph_parallel<F>(
project_path: &Path,
base_graph: CallGraph,
num_threads: Option<usize>,
progress_callback: F,
) -> Result<(CallGraph, HashSet<FunctionId>, HashSet<FunctionId>)>
where
F: FnMut(CallGraphProgress) + Send + Sync,
{
build_call_graph_parallel_with_files(
project_path,
base_graph,
num_threads,
None,
progress_callback,
)
}
pub fn build_call_graph_parallel_with_files<F>(
project_path: &Path,
base_graph: CallGraph,
num_threads: Option<usize>,
rust_files: Option<&[PathBuf]>,
progress_callback: F,
) -> Result<(CallGraph, HashSet<FunctionId>, HashSet<FunctionId>)>
where
F: FnMut(CallGraphProgress) + Send + Sync,
{
let mut config = ParallelConfig::default();
if let Some(threads) = num_threads {
config = config.with_threads(threads);
}
let builder = ParallelCallGraphBuilder::with_config(config);
builder.build_parallel_with_files(project_path, base_graph, rust_files, progress_callback)
}
use crate::extraction::ExtractedFileData;
use std::collections::HashMap;
pub fn build_call_graph_from_extracted(
base_graph: CallGraph,
extracted: &HashMap<PathBuf, ExtractedFileData>,
) -> (CallGraph, HashSet<FunctionId>, HashSet<FunctionId>) {
use crate::priority::call_graph::CallType as GraphCallType;
let parallel_graph =
Arc::new(crate::priority::parallel_call_graph::ParallelCallGraph::new(extracted.len()));
parallel_graph.merge_concurrent(base_graph);
let mut sorted_extracted: Vec<_> = extracted.iter().collect();
sorted_extracted.sort_by(|a, b| a.0.cmp(b.0));
for (path, file_data) in sorted_extracted {
for func in &file_data.functions {
let func_id = FunctionId::new(path.clone(), func.qualified_name.clone(), func.line);
parallel_graph.add_function(
func_id.clone(),
false, func.is_test,
func.cyclomatic,
func.length,
);
for call_site in &func.calls {
let callee_id = resolve_callee_from_extracted(
&call_site.callee_name,
&call_site.call_type,
path,
extracted,
);
if let Some(callee) = callee_id {
parallel_graph.add_call(func_id.clone(), callee, GraphCallType::Direct);
}
}
}
parallel_graph.stats().increment_files();
}
let mut final_graph = parallel_graph.to_call_graph();
final_graph.resolve_cross_file_calls();
let framework_exclusions = HashSet::new();
let function_pointer_used = HashSet::new();
log::info!(
"Call graph from extracted data: {} nodes in {} files",
parallel_graph
.stats()
.total_nodes
.load(std::sync::atomic::Ordering::Relaxed),
extracted.len()
);
(final_graph, framework_exclusions, function_pointer_used)
}
fn resolve_callee_from_extracted(
callee_name: &str,
call_type: &crate::extraction::CallType,
caller_file: &Path,
extracted: &HashMap<PathBuf, ExtractedFileData>,
) -> Option<FunctionId> {
use crate::extraction::CallType;
match call_type {
CallType::Direct | CallType::StaticMethod | CallType::TraitMethod => {
if let Some(file_data) = extracted.get(caller_file) {
for func in &file_data.functions {
if func.qualified_name == callee_name || func.name == callee_name {
return Some(FunctionId::new(
caller_file.to_path_buf(),
func.qualified_name.clone(),
func.line,
));
}
}
}
let mut sorted_files: Vec<_> = extracted.iter().collect();
sorted_files.sort_by(|a, b| a.0.cmp(b.0));
for (path, file_data) in sorted_files {
for func in &file_data.functions {
if func.qualified_name == callee_name {
return Some(FunctionId::new(
path.clone(),
func.qualified_name.clone(),
func.line,
));
}
}
}
None
}
CallType::Method => {
let mut sorted_files: Vec<_> = extracted.iter().collect();
sorted_files.sort_by(|a, b| a.0.cmp(b.0));
for (path, file_data) in sorted_files {
for func in &file_data.functions {
if func.name == callee_name {
return Some(FunctionId::new(
path.clone(),
func.qualified_name.clone(),
func.line,
));
}
}
}
None
}
CallType::Closure | CallType::FunctionPointer => {
None
}
}
}