// thread_flow/incremental/analyzer.rs

1// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4//! Core incremental analysis coordinator (Phase 4.1).
5//!
6//! This module implements the [`IncrementalAnalyzer`], the main entry point for
7//! incremental code analysis. It coordinates:
8//!
9//! - **Change detection** via content-addressed fingerprinting (Blake3)
10//! - **Dependency invalidation** using BFS graph traversal
11//! - **Reanalysis orchestration** with topological sorting
12//! - **Storage persistence** for session continuity
13//!
14//! ## Performance Target
15//!
16//! <10ms incremental update overhead (Constitutional Principle VI)
17//! achieved through content-addressed caching with >90% hit rate.
18//!
19//! ## Usage Example
20//!
21//! ```rust,ignore
22//! use thread_flow::incremental::analyzer::IncrementalAnalyzer;
23//! use thread_flow::incremental::storage::InMemoryStorage;
24//!
25//! #[tokio::main]
26//! async fn main() {
27//!     let storage = Box::new(InMemoryStorage::new());
28//!     let mut analyzer = IncrementalAnalyzer::new(storage);
29//!
30//!     // Analyze changes
31//!     let result = analyzer.analyze_changes(&[
32//!         PathBuf::from("src/main.rs"),
33//!         PathBuf::from("src/utils.rs"),
34//!     ]).await.unwrap();
35//!
36//!     // Invalidate affected files
37//!     let affected = analyzer.invalidate_dependents(&result.changed_files).await.unwrap();
38//!
39//!     // Reanalyze invalidated files
40//!     analyzer.reanalyze_invalidated(&affected).await.unwrap();
41//! }
42//! ```
43
44use super::dependency_builder::DependencyGraphBuilder;
45use super::graph::DependencyGraph;
46use super::storage::{StorageBackend, StorageError};
47use super::types::AnalysisDefFingerprint;
48use futures::stream::{self, StreamExt};
49use metrics::{counter, gauge, histogram};
50use std::path::{Path, PathBuf};
51use std::time::Instant;
52use thread_utilities::RapidSet;
53use tracing::{debug, info, instrument, warn};
54
55// ─── Error Types ─────────────────────────────────────────────────────────────
56
/// Errors that can occur during incremental analysis.
///
/// Storage and graph errors are stringified at the boundary (see the
/// `From<StorageError>` impl below) so this enum stays independent of
/// backend-specific error types.
#[derive(Debug, thiserror::Error)]
pub enum AnalyzerError {
    /// Storage backend operation failed.
    #[error("Storage error: {0}")]
    Storage(String),

    /// Fingerprint computation failed.
    #[error("Fingerprint error: {0}")]
    Fingerprint(String),

    /// Graph operation failed (e.g. topological sort on a cyclic graph).
    #[error("Graph error: {0}")]
    Graph(String),

    /// File I/O error.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Dependency extraction failed.
    ///
    /// NOTE(review): not constructed in this file — presumably produced by
    /// other modules in `incremental`; confirm before removing.
    #[error("Extraction failed for {file}: {error}")]
    ExtractionFailed { file: PathBuf, error: String },
}
80
81impl From<StorageError> for AnalyzerError {
82    fn from(err: StorageError) -> Self {
83        AnalyzerError::Storage(err.to_string())
84    }
85}
86
87// ─── Analysis Result ─────────────────────────────────────────────────────────
88
/// Result of an incremental analysis operation.
///
/// Contains the set of changed files, affected files, and performance metrics.
#[derive(Debug, Clone)]
pub struct AnalysisResult {
    /// Files that have changed (new or modified content).
    pub changed_files: Vec<PathBuf>,

    /// Files that are affected by changes (via strong dependencies).
    ///
    /// Left empty by [`IncrementalAnalyzer::analyze_changes`]; populated by
    /// the caller via `invalidate_dependents`.
    pub affected_files: Vec<PathBuf>,

    /// Total analysis time in microseconds (wall-clock, measured with
    /// `Instant` from entry to completion of `analyze_changes`).
    pub analysis_time_us: u64,

    /// Cache hit rate (0.0 to 1.0).
    ///
    /// Represents the fraction of files whose fingerprints matched
    /// cached values, avoiding expensive re-parsing. Zero when no files
    /// were examined.
    pub cache_hit_rate: f64,
}
109
110impl AnalysisResult {
111    /// Creates a new empty analysis result.
112    fn empty() -> Self {
113        Self {
114            changed_files: Vec::new(),
115            affected_files: Vec::new(),
116            analysis_time_us: 0,
117            cache_hit_rate: 0.0,
118        }
119    }
120}
121
122// ─── IncrementalAnalyzer ─────────────────────────────────────────────────────
123
/// Core incremental analysis coordinator.
///
/// Manages the dependency graph, storage backend, and coordinates change
/// detection, invalidation, and reanalysis workflows.
///
/// The graph is held entirely in memory; call [`IncrementalAnalyzer::persist`]
/// to write it back to the storage backend.
///
/// # Examples
///
/// ```rust,ignore
/// use thread_flow::incremental::analyzer::IncrementalAnalyzer;
/// use thread_flow::incremental::storage::InMemoryStorage;
///
/// let storage = Box::new(InMemoryStorage::new());
/// let mut analyzer = IncrementalAnalyzer::new(storage);
/// ```
pub struct IncrementalAnalyzer {
    /// Storage backend for persistence (fingerprints, edges, full graph).
    storage: Box<dyn StorageBackend>,

    /// The dependency graph tracking file relationships.
    dependency_graph: DependencyGraph,
}
145
146impl IncrementalAnalyzer {
147    /// Creates a new incremental analyzer with the given storage backend.
148    #[instrument(skip(storage), fields(storage_type = storage.name()))]
149    ///
150    /// Initializes with an empty dependency graph. To restore a previous
151    /// session, use [`IncrementalAnalyzer::from_storage`] instead.
152    ///
153    /// # Arguments
154    ///
155    /// * `storage` - The storage backend to use for persistence.
156    ///
157    /// # Examples
158    ///
159    /// ```rust,ignore
160    /// let storage = Box::new(InMemoryStorage::new());
161    /// let analyzer = IncrementalAnalyzer::new(storage);
162    /// ```
163    pub fn new(storage: Box<dyn StorageBackend>) -> Self {
164        Self {
165            storage,
166            dependency_graph: DependencyGraph::new(),
167        }
168    }
169
170    /// Creates a new incremental analyzer and loads the dependency graph from storage.
171    ///
172    /// This is the recommended way to initialize an analyzer for session continuity,
173    /// as it restores the previous dependency graph state.
174    ///
175    /// # Arguments
176    ///
177    /// * `storage` - The storage backend containing the previous session's graph.
178    ///
179    /// # Errors
180    ///
181    /// Returns [`AnalyzerError::Storage`] if loading the graph fails.
182    ///
183    /// # Examples
184    ///
185    /// ```rust,ignore
186    /// let storage = Box::new(PostgresStorage::new(config).await?);
187    /// let analyzer = IncrementalAnalyzer::from_storage(storage).await?;
188    /// ```
189    pub async fn from_storage(storage: Box<dyn StorageBackend>) -> Result<Self, AnalyzerError> {
190        let dependency_graph = storage.load_full_graph().await?;
191
192        Ok(Self {
193            storage,
194            dependency_graph,
195        })
196    }
197
198    /// Analyzes a set of files to detect changes.
199    ///
200    /// Compares current file fingerprints with stored fingerprints to identify
201    /// which files have been added or modified. Uses Blake3-based content hashing
202    /// for fast change detection.
203    ///
204    /// **Performance**: Achieves <10ms overhead for 100 files with >90% cache hit rate.
205    ///
206    /// # Arguments
207    ///
208    /// * `paths` - Slice of file paths to analyze for changes.
209    ///
210    /// # Returns
211    ///
212    /// An [`AnalysisResult`] containing changed files and performance metrics.
213    ///
214    /// # Errors
215    ///
216    /// - [`AnalyzerError::Io`] if file reading fails
217    /// - [`AnalyzerError::Storage`] if fingerprint loading fails
218    ///
219    /// # Examples
220    ///
221    /// ```rust,ignore
222    /// let result = analyzer.analyze_changes(&[
223    ///     PathBuf::from("src/main.rs"),
224    ///     PathBuf::from("src/utils.rs"),
225    /// ]).await?;
226    ///
227    /// println!("Changed: {} files", result.changed_files.len());
228    /// println!("Cache hit rate: {:.1}%", result.cache_hit_rate * 100.0);
229    /// ```
230    pub async fn analyze_changes(
231        &mut self,
232        paths: &[PathBuf],
233    ) -> Result<AnalysisResult, AnalyzerError> {
234        let start = Instant::now();
235        info!("analyzing {} files for changes", paths.len());
236
237        if paths.is_empty() {
238            return Ok(AnalysisResult::empty());
239        }
240
241        let concurrency = std::thread::available_parallelism()
242            .map(|n| n.get())
243            .unwrap_or(4);
244
245        let paths_owned = paths.to_vec();
246        let file_data = stream::iter(paths_owned)
247            .map(|path| async move {
248                let content = tokio::fs::read(&path).await?;
249                let fp = AnalysisDefFingerprint::new(&content);
250                Ok::<(PathBuf, AnalysisDefFingerprint), std::io::Error>((path, fp))
251            })
252            .buffer_unordered(concurrency)
253            .collect::<Vec<Result<_, _>>>()
254            .await;
255
256        let mut changed_files = Vec::new();
257        let mut cache_hits = 0;
258        let mut cache_total = 0;
259
260        for data in file_data {
261            let (path, current_fp) = data.map_err(AnalyzerError::Io)?;
262            debug!(file_path = ?path, "analyzing file");
263
264            // Load stored fingerprint
265            let stored_fp = self.storage.load_fingerprint(&path).await?;
266
267            cache_total += 1;
268
269            match stored_fp {
270                Some(stored) => {
271                    // Compare fingerprints
272                    if stored.fingerprint().as_slice() != current_fp.fingerprint().as_slice() {
273                        // Content changed - save new fingerprint
274                        info!(file = ?path, "cache miss - content changed");
275                        counter!("cache_misses_total").increment(1);
276                        changed_files.push(path.clone());
277                        let _ = self.storage.save_fingerprint(&path, &current_fp).await;
278                    } else {
279                        // Cache hit - no change
280                        info!(file = ?path, "cache hit");
281                        counter!("cache_hits_total").increment(1);
282                        cache_hits += 1;
283                    }
284                }
285                None => {
286                    // New file - no cached fingerprint, save it
287                    info!(file = ?path, "cache miss - new file");
288                    counter!("cache_misses_total").increment(1);
289                    changed_files.push(path.clone());
290                    let _ = self.storage.save_fingerprint(&path, &current_fp).await;
291                }
292            }
293        }
294
295        let cache_hit_rate = if cache_total > 0 {
296            cache_hits as f64 / cache_total as f64
297        } else {
298            0.0
299        };
300
301        let analysis_time_us = start.elapsed().as_micros() as u64;
302
303        // Record metrics
304        histogram!("analysis_overhead_ms").record((analysis_time_us as f64) / 1000.0);
305        gauge!("cache_hit_rate").set(cache_hit_rate);
306
307        info!(
308            changed_files = changed_files.len(),
309            cache_hit_rate = %format!("{:.1}%", cache_hit_rate * 100.0),
310            duration_ms = analysis_time_us / 1000,
311            "analysis complete"
312        );
313
314        Ok(AnalysisResult {
315            changed_files,
316            affected_files: Vec::new(), // Populated by invalidate_dependents
317            analysis_time_us,
318            cache_hit_rate,
319        })
320    }
321
322    /// Finds all files affected by changes to the given files.
323    ///
324    /// Uses BFS traversal of the dependency graph to identify all files that
325    /// transitively depend on the changed files. Only follows strong dependency
326    /// edges (Import, Trait, Macro) for cascading invalidation.
327    ///
328    /// **Performance**: O(V + E) where V = files, E = dependency edges.
329    /// Achieves <5ms for 1000-node graphs.
330    ///
331    /// # Arguments
332    ///
333    /// * `changed` - Slice of file paths that have changed.
334    ///
335    /// # Returns
336    ///
337    /// A vector of all affected file paths (including the changed files themselves).
338    ///
339    /// # Errors
340    ///
341    /// Returns [`AnalyzerError::Graph`] if graph traversal fails.
342    ///
343    /// # Examples
344    ///
345    /// ```rust,ignore
346    /// let changed = vec![PathBuf::from("src/utils.rs")];
347    /// let affected = analyzer.invalidate_dependents(&changed).await?;
348    ///
349    /// println!("Files requiring reanalysis: {}", affected.len());
350    /// ```
351    pub async fn invalidate_dependents(
352        &self,
353        changed: &[PathBuf],
354    ) -> Result<Vec<PathBuf>, AnalyzerError> {
355        if changed.is_empty() {
356            return Ok(Vec::new());
357        }
358
359        // Convert to RapidSet for efficient lookup
360        let changed_set: RapidSet<PathBuf> = changed.iter().cloned().collect();
361
362        // Use graph's BFS traversal to find affected files
363        let affected_set = self.dependency_graph.find_affected_files(&changed_set);
364
365        // Convert back to Vec
366        Ok(affected_set.into_iter().collect())
367    }
368
369    /// Reanalyzes invalidated files and updates the dependency graph.
370    ///
371    /// Performs dependency extraction for all affected files, updates their
372    /// fingerprints, and saves the new state to storage. Files are processed
373    /// in topological order (dependencies before dependents) to ensure correctness.
374    ///
375    /// **Error Recovery**: Skips files that fail extraction but continues processing
376    /// other files. Extraction errors are logged but do not abort the entire batch.
377    ///
378    /// # Arguments
379    ///
380    /// * `files` - Slice of file paths requiring reanalysis.
381    ///
382    /// # Errors
383    ///
384    /// - [`AnalyzerError::Storage`] if persistence fails
385    /// - [`AnalyzerError::Graph`] if topological sort fails (cyclic dependency)
386    ///
387    /// # Examples
388    ///
389    /// ```rust,ignore
390    /// let affected = analyzer.invalidate_dependents(&changed_files).await?;
391    /// analyzer.reanalyze_invalidated(&affected).await?;
392    /// ```
393    pub async fn reanalyze_invalidated(&mut self, files: &[PathBuf]) -> Result<(), AnalyzerError> {
394        if files.is_empty() {
395            return Ok(());
396        }
397
398        // Convert to RapidSet for topological sort
399        let file_set: RapidSet<PathBuf> = files.iter().cloned().collect();
400
401        // Sort files in dependency order (dependencies before dependents)
402        let sorted_files = self
403            .dependency_graph
404            .topological_sort(&file_set)
405            .map_err(|e| AnalyzerError::Graph(e.to_string()))?;
406
407        // Create a new builder for re-extraction
408        let mut builder = DependencyGraphBuilder::new(Box::new(DummyStorage));
409
410        // Process files in dependency order
411        for file in &sorted_files {
412            // Skip files that don't exist
413            if !tokio::fs::try_exists(file).await.unwrap_or(false) {
414                continue;
415            }
416
417            // Read content and compute fingerprint
418            match tokio::fs::read(file).await {
419                Ok(content) => {
420                    let fingerprint = AnalysisDefFingerprint::new(&content);
421
422                    // Save updated fingerprint
423                    if let Err(e) = self.storage.save_fingerprint(file, &fingerprint).await {
424                        eprintln!(
425                            "Warning: Failed to save fingerprint for {}: {}",
426                            file.display(),
427                            e
428                        );
429                        continue;
430                    }
431
432                    // Attempt to extract dependencies
433                    match builder.extract_file(file).await {
434                        Ok(_) => {
435                            // Successfully extracted - edges added to builder's graph
436                        }
437                        Err(e) => {
438                            // Log extraction error but continue with other files
439                            eprintln!(
440                                "Warning: Dependency extraction failed for {}: {}",
441                                file.display(),
442                                e
443                            );
444                            // Still update the graph node without edges
445                            self.dependency_graph.add_node(file);
446                        }
447                    }
448                }
449                Err(e) => {
450                    eprintln!("Warning: Failed to read file {}: {}", file.display(), e);
451                    continue;
452                }
453            }
454        }
455
456        // Update dependency graph with newly extracted edges
457        // First, remove old edges for reanalyzed files
458        for file in &sorted_files {
459            let _ = self.storage.delete_edges_for(file).await;
460        }
461
462        // Merge new edges from builder into our graph
463        let new_graph = builder.graph();
464        for edge in &new_graph.edges {
465            // Only add edges that involve files we're reanalyzing
466            if file_set.contains(&edge.from) || file_set.contains(&edge.to) {
467                self.dependency_graph.add_edge(edge.clone());
468                // Save edge to storage
469                if let Err(e) = self.storage.save_edge(edge).await {
470                    eprintln!("Warning: Failed to save edge: {}", e);
471                }
472            }
473        }
474
475        // Update nodes in the graph
476        for file in &sorted_files {
477            if let Some(fp) = new_graph.nodes.get(file) {
478                self.dependency_graph.nodes.insert(file.clone(), fp.clone());
479            }
480        }
481
482        Ok(())
483    }
484
485    /// Returns a reference to the internal dependency graph.
486    ///
487    /// # Examples
488    ///
489    /// ```rust,ignore
490    /// let graph = analyzer.graph();
491    /// println!("Graph has {} nodes and {} edges",
492    ///     graph.node_count(), graph.edge_count());
493    /// ```
494    pub fn graph(&self) -> &DependencyGraph {
495        &self.dependency_graph
496    }
497
498    /// Returns a mutable reference to the internal dependency graph.
499    ///
500    /// # Examples
501    ///
502    /// ```rust,ignore
503    /// let graph = analyzer.graph_mut();
504    /// graph.add_edge(edge);
505    /// ```
506    pub fn graph_mut(&mut self) -> &mut DependencyGraph {
507        &mut self.dependency_graph
508    }
509
510    /// Persists the current dependency graph to storage.
511    ///
512    /// # Errors
513    ///
514    /// Returns [`AnalyzerError::Storage`] if persistence fails.
515    ///
516    /// # Examples
517    ///
518    /// ```rust,ignore
519    /// analyzer.persist().await?;
520    /// ```
521    pub async fn persist(&self) -> Result<(), AnalyzerError> {
522        self.storage.save_full_graph(&self.dependency_graph).await?;
523        Ok(())
524    }
525}
526
527// ─── Dummy Storage for Builder ───────────────────────────────────────────────
528
/// Dummy storage backend that discards all operations.
///
/// Used internally by the analyzer when creating a temporary builder
/// for re-extraction during reanalysis. The builder needs a storage
/// backend but we don't want to persist its intermediate state.
///
/// Writes succeed without storing anything; reads return empty results.
#[derive(Debug)]
struct DummyStorage;
536
#[async_trait::async_trait]
impl StorageBackend for DummyStorage {
    /// No-op: the fingerprint is discarded.
    async fn save_fingerprint(
        &self,
        _file_path: &Path,
        _fingerprint: &AnalysisDefFingerprint,
    ) -> Result<(), StorageError> {
        Ok(())
    }

    /// Always reports no cached fingerprint.
    async fn load_fingerprint(
        &self,
        _file_path: &Path,
    ) -> Result<Option<AnalysisDefFingerprint>, StorageError> {
        Ok(None)
    }

    /// Always reports that nothing was deleted.
    async fn delete_fingerprint(&self, _file_path: &Path) -> Result<bool, StorageError> {
        Ok(false)
    }

    /// No-op: the edge is discarded.
    async fn save_edge(&self, _edge: &super::types::DependencyEdge) -> Result<(), StorageError> {
        Ok(())
    }

    /// Always returns an empty edge list.
    async fn load_edges_from(
        &self,
        _file_path: &Path,
    ) -> Result<Vec<super::types::DependencyEdge>, StorageError> {
        Ok(Vec::new())
    }

    /// Always returns an empty edge list.
    async fn load_edges_to(
        &self,
        _file_path: &Path,
    ) -> Result<Vec<super::types::DependencyEdge>, StorageError> {
        Ok(Vec::new())
    }

    /// Always reports zero edges deleted.
    async fn delete_edges_for(&self, _file_path: &Path) -> Result<usize, StorageError> {
        Ok(0)
    }

    /// Always returns a fresh, empty graph.
    async fn load_full_graph(&self) -> Result<DependencyGraph, StorageError> {
        Ok(DependencyGraph::new())
    }

    /// No-op: the graph is discarded.
    async fn save_full_graph(&self, _graph: &DependencyGraph) -> Result<(), StorageError> {
        Ok(())
    }

    fn name(&self) -> &'static str {
        "dummy"
    }
}
592
593// ─── Tests ───────────────────────────────────────────────────────────────────
594
#[cfg(test)]
mod tests {
    use super::*;
    use crate::incremental::storage::InMemoryStorage;
    use crate::incremental::types::DependencyEdge;

    /// A freshly constructed analyzer must start with an empty dependency graph.
    #[tokio::test]
    async fn test_analyzer_new_creates_empty_graph() {
        let storage = Box::new(InMemoryStorage::new());
        let analyzer = IncrementalAnalyzer::new(storage);

        assert_eq!(analyzer.graph().node_count(), 0);
        assert_eq!(analyzer.graph().edge_count(), 0);
    }

    /// `from_storage` must restore a previously persisted graph, including
    /// nodes implied by saved edges.
    #[tokio::test]
    async fn test_analyzer_from_storage_loads_graph() {
        let storage = Box::new(InMemoryStorage::new());

        // Create and save a graph with a single a.rs -> b.rs import edge.
        let mut graph = DependencyGraph::new();
        graph.add_edge(DependencyEdge::new(
            PathBuf::from("a.rs"),
            PathBuf::from("b.rs"),
            super::super::types::DependencyType::Import,
        ));
        storage.save_full_graph(&graph).await.unwrap();

        // Load analyzer from storage.
        let analyzer = IncrementalAnalyzer::from_storage(storage).await.unwrap();

        // One edge implies both endpoint nodes were restored.
        assert_eq!(analyzer.graph().node_count(), 2);
        assert_eq!(analyzer.graph().edge_count(), 1);
    }

    /// `AnalysisResult::empty` must zero every field.
    #[tokio::test]
    async fn test_analysis_result_empty() {
        let result = AnalysisResult::empty();

        assert_eq!(result.changed_files.len(), 0);
        assert_eq!(result.affected_files.len(), 0);
        assert_eq!(result.analysis_time_us, 0);
        assert_eq!(result.cache_hit_rate, 0.0);
    }
}
639}