use super::dependency_builder::DependencyGraphBuilder;
use super::graph::DependencyGraph;
use super::storage::{StorageBackend, StorageError};
use super::types::AnalysisDefFingerprint;
use futures::stream::{self, StreamExt};
use metrics::{counter, gauge, histogram};
use std::path::{Path, PathBuf};
use std::time::Instant;
use thread_utilities::RapidSet;
use tracing::{debug, info, instrument, warn};
/// Errors surfaced by the incremental analyzer.
#[derive(Debug, thiserror::Error)]
pub enum AnalyzerError {
/// A storage-backend operation failed; the backend error is flattened
/// to its string form (see the `From<StorageError>` impl below).
#[error("Storage error: {0}")]
Storage(String),
/// Fingerprint computation or comparison failed.
#[error("Fingerprint error: {0}")]
Fingerprint(String),
/// A dependency-graph operation (e.g. topological sort) failed.
#[error("Graph error: {0}")]
Graph(String),
/// An underlying filesystem I/O error, converted automatically via `?`.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// Dependency extraction failed for a specific file.
#[error("Extraction failed for {file}: {error}")]
ExtractionFailed { file: PathBuf, error: String },
}
impl From<StorageError> for AnalyzerError {
fn from(err: StorageError) -> Self {
AnalyzerError::Storage(err.to_string())
}
}
/// Outcome of one change-analysis pass over a set of files.
#[derive(Debug, Clone)]
pub struct AnalysisResult {
/// Files whose current content fingerprint differs from the stored one
/// (or that had no stored fingerprint at all).
pub changed_files: Vec<PathBuf>,
/// Files affected transitively through the dependency graph.
/// NOTE(review): `analyze_changes` always leaves this empty; presumably
/// callers populate it via `invalidate_dependents` — confirm.
pub affected_files: Vec<PathBuf>,
/// Wall-clock duration of the pass, in microseconds.
pub analysis_time_us: u64,
/// Fraction of inspected files whose fingerprint matched (0.0..=1.0).
pub cache_hit_rate: f64,
}
impl AnalysisResult {
fn empty() -> Self {
Self {
changed_files: Vec::new(),
affected_files: Vec::new(),
analysis_time_us: 0,
cache_hit_rate: 0.0,
}
}
}
/// Detects changed files via content fingerprints and maintains the
/// file-level dependency graph used to propagate invalidations.
pub struct IncrementalAnalyzer {
/// Persistence backend for fingerprints, edges, and the full graph.
storage: Box<dyn StorageBackend>,
/// In-memory dependency graph; persisted explicitly via `persist()`.
dependency_graph: DependencyGraph,
}
impl IncrementalAnalyzer {
#[instrument(skip(storage), fields(storage_type = storage.name()))]
pub fn new(storage: Box<dyn StorageBackend>) -> Self {
Self {
storage,
dependency_graph: DependencyGraph::new(),
}
}
pub async fn from_storage(storage: Box<dyn StorageBackend>) -> Result<Self, AnalyzerError> {
let dependency_graph = storage.load_full_graph().await?;
Ok(Self {
storage,
dependency_graph,
})
}
pub async fn analyze_changes(
&mut self,
paths: &[PathBuf],
) -> Result<AnalysisResult, AnalyzerError> {
let start = Instant::now();
info!("analyzing {} files for changes", paths.len());
if paths.is_empty() {
return Ok(AnalysisResult::empty());
}
let concurrency = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
let paths_owned = paths.to_vec();
let file_data = stream::iter(paths_owned)
.map(|path| async move {
let content = tokio::fs::read(&path).await?;
let fp = AnalysisDefFingerprint::new(&content);
Ok::<(PathBuf, AnalysisDefFingerprint), std::io::Error>((path, fp))
})
.buffer_unordered(concurrency)
.collect::<Vec<Result<_, _>>>()
.await;
let mut changed_files = Vec::new();
let mut cache_hits = 0;
let mut cache_total = 0;
for data in file_data {
let (path, current_fp) = data.map_err(AnalyzerError::Io)?;
debug!(file_path = ?path, "analyzing file");
let stored_fp = self.storage.load_fingerprint(&path).await?;
cache_total += 1;
match stored_fp {
Some(stored) => {
if stored.fingerprint().as_slice() != current_fp.fingerprint().as_slice() {
info!(file = ?path, "cache miss - content changed");
counter!("cache_misses_total").increment(1);
changed_files.push(path.clone());
let _ = self.storage.save_fingerprint(&path, ¤t_fp).await;
} else {
info!(file = ?path, "cache hit");
counter!("cache_hits_total").increment(1);
cache_hits += 1;
}
}
None => {
info!(file = ?path, "cache miss - new file");
counter!("cache_misses_total").increment(1);
changed_files.push(path.clone());
let _ = self.storage.save_fingerprint(&path, ¤t_fp).await;
}
}
}
let cache_hit_rate = if cache_total > 0 {
cache_hits as f64 / cache_total as f64
} else {
0.0
};
let analysis_time_us = start.elapsed().as_micros() as u64;
histogram!("analysis_overhead_ms").record((analysis_time_us as f64) / 1000.0);
gauge!("cache_hit_rate").set(cache_hit_rate);
info!(
changed_files = changed_files.len(),
cache_hit_rate = %format!("{:.1}%", cache_hit_rate * 100.0),
duration_ms = analysis_time_us / 1000,
"analysis complete"
);
Ok(AnalysisResult {
changed_files,
affected_files: Vec::new(), analysis_time_us,
cache_hit_rate,
})
}
pub async fn invalidate_dependents(
&self,
changed: &[PathBuf],
) -> Result<Vec<PathBuf>, AnalyzerError> {
if changed.is_empty() {
return Ok(Vec::new());
}
let changed_set: RapidSet<PathBuf> = changed.iter().cloned().collect();
let affected_set = self.dependency_graph.find_affected_files(&changed_set);
Ok(affected_set.into_iter().collect())
}
pub async fn reanalyze_invalidated(&mut self, files: &[PathBuf]) -> Result<(), AnalyzerError> {
if files.is_empty() {
return Ok(());
}
let file_set: RapidSet<PathBuf> = files.iter().cloned().collect();
let sorted_files = self
.dependency_graph
.topological_sort(&file_set)
.map_err(|e| AnalyzerError::Graph(e.to_string()))?;
let mut builder = DependencyGraphBuilder::new(Box::new(DummyStorage));
for file in &sorted_files {
if !tokio::fs::try_exists(file).await.unwrap_or(false) {
continue;
}
match tokio::fs::read(file).await {
Ok(content) => {
let fingerprint = AnalysisDefFingerprint::new(&content);
if let Err(e) = self.storage.save_fingerprint(file, &fingerprint).await {
eprintln!(
"Warning: Failed to save fingerprint for {}: {}",
file.display(),
e
);
continue;
}
match builder.extract_file(file).await {
Ok(_) => {
}
Err(e) => {
eprintln!(
"Warning: Dependency extraction failed for {}: {}",
file.display(),
e
);
self.dependency_graph.add_node(file);
}
}
}
Err(e) => {
eprintln!("Warning: Failed to read file {}: {}", file.display(), e);
continue;
}
}
}
for file in &sorted_files {
let _ = self.storage.delete_edges_for(file).await;
}
let new_graph = builder.graph();
for edge in &new_graph.edges {
if file_set.contains(&edge.from) || file_set.contains(&edge.to) {
self.dependency_graph.add_edge(edge.clone());
if let Err(e) = self.storage.save_edge(edge).await {
eprintln!("Warning: Failed to save edge: {}", e);
}
}
}
for file in &sorted_files {
if let Some(fp) = new_graph.nodes.get(file) {
self.dependency_graph.nodes.insert(file.clone(), fp.clone());
}
}
Ok(())
}
pub fn graph(&self) -> &DependencyGraph {
&self.dependency_graph
}
pub fn graph_mut(&mut self) -> &mut DependencyGraph {
&mut self.dependency_graph
}
pub async fn persist(&self) -> Result<(), AnalyzerError> {
self.storage.save_full_graph(&self.dependency_graph).await?;
Ok(())
}
}
/// No-op storage backend handed to `DependencyGraphBuilder` when only its
/// extraction side is needed (see `reanalyze_invalidated`); also useful in tests.
#[derive(Debug)]
struct DummyStorage;
// Every method is a no-op stub: saves succeed without storing anything,
// loads return "nothing found", deletes report nothing deleted.
#[async_trait::async_trait]
impl StorageBackend for DummyStorage {
async fn save_fingerprint(
&self,
_file_path: &Path,
_fingerprint: &AnalysisDefFingerprint,
) -> Result<(), StorageError> {
Ok(())
}
async fn load_fingerprint(
&self,
_file_path: &Path,
) -> Result<Option<AnalysisDefFingerprint>, StorageError> {
Ok(None)
}
async fn delete_fingerprint(&self, _file_path: &Path) -> Result<bool, StorageError> {
// `false`: nothing was there to delete.
Ok(false)
}
async fn save_edge(&self, _edge: &super::types::DependencyEdge) -> Result<(), StorageError> {
Ok(())
}
async fn load_edges_from(
&self,
_file_path: &Path,
) -> Result<Vec<super::types::DependencyEdge>, StorageError> {
Ok(Vec::new())
}
async fn load_edges_to(
&self,
_file_path: &Path,
) -> Result<Vec<super::types::DependencyEdge>, StorageError> {
Ok(Vec::new())
}
async fn delete_edges_for(&self, _file_path: &Path) -> Result<usize, StorageError> {
// Zero edges removed.
Ok(0)
}
async fn load_full_graph(&self) -> Result<DependencyGraph, StorageError> {
// An empty graph, never a persisted one.
Ok(DependencyGraph::new())
}
async fn save_full_graph(&self, _graph: &DependencyGraph) -> Result<(), StorageError> {
Ok(())
}
fn name(&self) -> &'static str {
"dummy"
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::incremental::storage::InMemoryStorage;
    use crate::incremental::types::DependencyEdge;

    /// A freshly constructed analyzer starts with no nodes or edges.
    #[tokio::test]
    async fn test_analyzer_new_creates_empty_graph() {
        let backend = Box::new(InMemoryStorage::new());
        let analyzer = IncrementalAnalyzer::new(backend);
        assert_eq!(analyzer.graph().node_count(), 0);
        assert_eq!(analyzer.graph().edge_count(), 0);
    }

    /// `from_storage` restores a previously persisted graph.
    #[tokio::test]
    async fn test_analyzer_from_storage_loads_graph() {
        let backend = Box::new(InMemoryStorage::new());
        let mut seeded = DependencyGraph::new();
        seeded.add_edge(DependencyEdge::new(
            PathBuf::from("a.rs"),
            PathBuf::from("b.rs"),
            super::super::types::DependencyType::Import,
        ));
        backend.save_full_graph(&seeded).await.unwrap();
        let analyzer = IncrementalAnalyzer::from_storage(backend).await.unwrap();
        assert_eq!(analyzer.graph().node_count(), 2);
        assert_eq!(analyzer.graph().edge_count(), 1);
    }

    /// `AnalysisResult::empty` zeroes every field.
    #[tokio::test]
    async fn test_analysis_result_empty() {
        let empty = AnalysisResult::empty();
        assert!(empty.changed_files.is_empty());
        assert!(empty.affected_files.is_empty());
        assert_eq!(empty.analysis_time_us, 0);
        assert_eq!(empty.cache_hit_rate, 0.0);
    }
}