codeprism_core/pipeline/mod.rs

//! File monitoring pipeline for real-time graph updates
//!
//! This module provides functionality to monitor file changes and automatically
//! update the code graph through incremental parsing and patch generation.
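//!
//! # Example
//!
//! A minimal wiring sketch (illustrative only; it assumes a `LanguageRegistry`
//! populated with parsers for the languages present in the repository):
//!
//! ```ignore
//! let registry = Arc::new(LanguageRegistry::new());
//! let config = PipelineConfig::new("my_repo".to_string(), "abc123".to_string());
//! let parser_engine = Arc::new(ParserEngine::new(registry));
//! let handler = Arc::new(LoggingEventHandler::new(true));
//!
//! let mut pipeline = MonitoringPipeline::new(config, parser_engine, handler)?;
//! pipeline.start_monitoring("/path/to/repo").await?;
//! ```
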
use crate::error::{Error, Result};
use crate::parser::{ParseContext, ParserEngine};
use crate::patch::{AstPatch, PatchBuilder};
use crate::watcher::{ChangeEvent, ChangeKind, FileWatcher};
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::oneshot;
use tokio::time::sleep;

/// Pipeline event representing a processed file change
#[derive(Debug, Clone)]
pub struct PipelineEvent {
    /// Repository ID
    pub repo_id: String,
    /// Original change event
    pub change_event: ChangeEvent,
    /// Generated patch (if any)
    pub patch: Option<AstPatch>,
    /// Processing timestamp
    pub processed_at: Instant,
    /// Processing duration in milliseconds
    pub processing_duration_ms: u64,
}

/// Pipeline statistics
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    /// Total events processed
    pub events_processed: usize,
    /// Events processed successfully
    pub events_success: usize,
    /// Events that failed processing
    pub events_failed: usize,
    /// Events that were filtered out
    pub events_filtered: usize,
    /// Average processing time in milliseconds
    pub avg_processing_ms: f64,
    /// Total patches generated
    pub patches_generated: usize,
    /// Total nodes added
    pub nodes_added: usize,
    /// Total edges added
    pub edges_added: usize,
    /// Total nodes removed
    pub nodes_removed: usize,
    /// Total edges removed
    pub edges_removed: usize,
}

impl PipelineStats {
    /// Update statistics with a new event
    pub fn update(&mut self, event: &PipelineEvent, success: bool) {
        self.events_processed += 1;

        if success {
            self.events_success += 1;
            if let Some(ref patch) = event.patch {
                self.patches_generated += 1;
                self.nodes_added += patch.nodes_add.len();
                self.edges_added += patch.edges_add.len();
                self.nodes_removed += patch.nodes_delete.len();
                self.edges_removed += patch.edges_delete.len();
            }
        } else {
            self.events_failed += 1;
        }

        // Update the running average processing time (incremental mean)
        let total_time = self.avg_processing_ms * (self.events_processed - 1) as f64
            + event.processing_duration_ms as f64;
        self.avg_processing_ms = total_time / self.events_processed as f64;
    }

    /// Calculate the success rate as a percentage
    pub fn success_rate(&self) -> f64 {
        if self.events_processed == 0 {
            0.0
        } else {
            (self.events_success as f64 / self.events_processed as f64) * 100.0
        }
    }

    /// Calculate events per second
    pub fn events_per_second(&self, duration_secs: f64) -> f64 {
        if duration_secs <= 0.0 {
            0.0
        } else {
            self.events_processed as f64 / duration_secs
        }
    }
}

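// Illustrative read-out of the counters above (a sketch; `stats` would
// normally come from `MonitoringPipeline::get_stats`, and `elapsed_secs`
// is a hypothetical wall-clock measurement):
//
//     let elapsed_secs = 60.0;
//     println!(
//         "{} events, {:.1}% ok, {:.2} ms avg, {:.2} events/s",
//         stats.events_processed,
//         stats.success_rate(),
//         stats.avg_processing_ms,
//         stats.events_per_second(elapsed_secs),
//     );
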
/// Event handler trait for processing pipeline events
pub trait PipelineEventHandler: Send + Sync {
    /// Handle a processed pipeline event
    fn handle_event(&self, event: &PipelineEvent) -> Result<()>;

    /// Handle pipeline errors
    fn handle_error(&self, error: &Error, change_event: &ChangeEvent);
}

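// Any `Send + Sync` type can serve as a handler. As an illustrative sketch
// (not part of this module's API), events could be forwarded to a tokio
// channel for downstream consumers:
//
//     struct ChannelEventHandler {
//         tx: tokio::sync::mpsc::UnboundedSender<PipelineEvent>,
//     }
//
//     impl PipelineEventHandler for ChannelEventHandler {
//         fn handle_event(&self, event: &PipelineEvent) -> Result<()> {
//             self.tx
//                 .send(event.clone())
//                 .map_err(|_| Error::other("event channel closed"))
//         }
//
//         fn handle_error(&self, _error: &Error, _change_event: &ChangeEvent) {}
//     }
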
/// No-op event handler
#[derive(Debug, Default)]
pub struct NoOpEventHandler;

impl PipelineEventHandler for NoOpEventHandler {
    fn handle_event(&self, _event: &PipelineEvent) -> Result<()> {
        Ok(())
    }

    fn handle_error(&self, _error: &Error, _change_event: &ChangeEvent) {}
}

/// Logging event handler
#[derive(Debug)]
pub struct LoggingEventHandler {
    verbose: bool,
}

impl LoggingEventHandler {
    /// Create a new logging event handler
    pub fn new(verbose: bool) -> Self {
        Self { verbose }
    }
}

impl PipelineEventHandler for LoggingEventHandler {
    fn handle_event(&self, event: &PipelineEvent) -> Result<()> {
        if self.verbose {
            println!(
                "Pipeline event: {:?} processed in {}ms",
                event.change_event.kind, event.processing_duration_ms
            );

            if let Some(ref patch) = event.patch {
                println!(
                    "  Generated patch: +{} nodes, +{} edges, -{} nodes, -{} edges",
                    patch.nodes_add.len(),
                    patch.edges_add.len(),
                    patch.nodes_delete.len(),
                    patch.edges_delete.len()
                );
            }
        }
        Ok(())
    }

    fn handle_error(&self, error: &Error, change_event: &ChangeEvent) {
        eprintln!(
            "Pipeline error processing {:?}: {}",
            change_event.path, error
        );
    }
}

/// Configuration for the monitoring pipeline
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Repository ID
    pub repo_id: String,
    /// Commit SHA for generated patches
    pub commit_sha: String,
    /// Debounce duration for file changes
    pub debounce_duration: Duration,
    /// Maximum queue size for pending events
    pub max_queue_size: usize,
    /// Batch size for processing multiple events
    pub batch_size: usize,
    /// Whether to process events in batches
    pub enable_batching: bool,
    /// Timeout for processing individual events
    pub processing_timeout: Duration,
}

impl PipelineConfig {
    /// Create a new pipeline config
    pub fn new(repo_id: String, commit_sha: String) -> Self {
        Self {
            repo_id,
            commit_sha,
            debounce_duration: Duration::from_millis(100),
            max_queue_size: 1000,
            batch_size: 10,
            enable_batching: true,
            processing_timeout: Duration::from_secs(30),
        }
    }
}

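// The defaults above favor interactive editing sessions (100ms debounce,
// batches of 10). All fields are public, so callers can tune them after
// construction, e.g.:
//
//     let mut config = PipelineConfig::new("repo".to_string(), "sha".to_string());
//     config.enable_batching = false; // flush every change as it arrives
//     config.debounce_duration = Duration::from_millis(250);
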
/// File monitoring pipeline that connects FileWatcher to ParserEngine
pub struct MonitoringPipeline {
    config: PipelineConfig,
    parser_engine: Arc<ParserEngine>,
    file_watcher: FileWatcher,
    event_handler: Arc<dyn PipelineEventHandler>,
    stats: PipelineStats,
    shutdown_tx: Option<oneshot::Sender<()>>,
}

impl MonitoringPipeline {
    /// Create a new monitoring pipeline
    pub fn new(
        config: PipelineConfig,
        parser_engine: Arc<ParserEngine>,
        event_handler: Arc<dyn PipelineEventHandler>,
    ) -> Result<Self> {
        let file_watcher = FileWatcher::with_debounce(config.debounce_duration)?;

        Ok(Self {
            config,
            parser_engine,
            file_watcher,
            event_handler,
            stats: PipelineStats::default(),
            shutdown_tx: None,
        })
    }

    /// Start monitoring a repository path
    pub async fn start_monitoring<P: AsRef<Path>>(&mut self, repo_path: P) -> Result<()> {
        let repo_path = repo_path.as_ref();

        // Start watching the repository
        self.file_watcher
            .watch_dir(repo_path, repo_path.to_path_buf())?;

        // Start the processing loop
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        self.shutdown_tx = Some(shutdown_tx);

        let mut event_queue = Vec::new();
        let mut last_batch_time = Instant::now();

        tokio::select! {
            _ = self.process_events(&mut event_queue, &mut last_batch_time) => {
                // Processing loop ended
            }
            _ = shutdown_rx => {
                // Shutdown requested
                tracing::info!("Pipeline shutdown requested");
            }
        }

        Ok(())
    }

    /// Stop monitoring and shut down the pipeline
    pub fn stop_monitoring(&mut self) {
        if let Some(shutdown_tx) = self.shutdown_tx.take() {
            let _ = shutdown_tx.send(());
        }
    }

    /// Process file change events
    async fn process_events(
        &mut self,
        event_queue: &mut Vec<ChangeEvent>,
        last_batch_time: &mut Instant,
    ) -> Result<()> {
        loop {
            // Try to get the next change event
            if let Some(change_event) = self.file_watcher.next_change().await {
                event_queue.push(change_event);

                // Process the batch if any flush condition is met
                let should_process_batch = event_queue.len() >= self.config.batch_size
                    || (!self.config.enable_batching && !event_queue.is_empty())
                    || (last_batch_time.elapsed() > self.config.debounce_duration
                        && !event_queue.is_empty());

                if should_process_batch {
                    self.process_event_batch(event_queue).await?;
                    *last_batch_time = Instant::now();
                }
            } else {
                // No more events; process any remaining in the queue
                if !event_queue.is_empty() {
                    self.process_event_batch(event_queue).await?;
                    *last_batch_time = Instant::now();
                }

                // Brief pause to avoid busy-waiting
                sleep(Duration::from_millis(10)).await;
            }
        }
    }

    /// Process a batch of change events
    async fn process_event_batch(&mut self, event_queue: &mut Vec<ChangeEvent>) -> Result<()> {
        let events_to_process = std::mem::take(event_queue);

        for change_event in events_to_process {
            match self.process_single_event(change_event.clone()).await {
                Ok(pipeline_event) => {
                    self.stats.update(&pipeline_event, true);
                    if let Err(e) = self.event_handler.handle_event(&pipeline_event) {
                        self.event_handler.handle_error(&e, &change_event);
                    }
                }
                Err(e) => {
                    self.stats.events_failed += 1;
                    self.event_handler.handle_error(&e, &change_event);
                }
            }
        }

        Ok(())
    }

    /// Process a single change event
    async fn process_single_event(&self, change_event: ChangeEvent) -> Result<PipelineEvent> {
        let start_time = Instant::now();

        let patch = match change_event.kind {
            ChangeKind::Created | ChangeKind::Modified => {
                self.process_file_change(&change_event.path).await?
            }
            ChangeKind::Deleted => self.process_file_deletion(&change_event.path).await?,
            ChangeKind::Renamed { ref old, ref new } => self.process_file_rename(old, new).await?,
        };

        let processing_duration = start_time.elapsed();

        Ok(PipelineEvent {
            repo_id: self.config.repo_id.clone(),
            change_event,
            patch,
            processed_at: Instant::now(),
            processing_duration_ms: processing_duration.as_millis() as u64,
        })
    }

    /// Process a file creation or modification
    async fn process_file_change(&self, file_path: &Path) -> Result<Option<AstPatch>> {
        // Check whether the file still exists (it may have been deleted
        // after the change event was queued)
        if !file_path.exists() {
            return Ok(None);
        }

        // Read file content
        let content = tokio::fs::read_to_string(file_path).await.map_err(|e| {
            Error::io(format!(
                "Failed to read file {}: {}",
                file_path.display(),
                e
            ))
        })?;

        // Skip empty files
        if content.trim().is_empty() {
            return Ok(None);
        }

        // Create parse context
        let context = ParseContext::new(
            self.config.repo_id.clone(),
            file_path.to_path_buf(),
            content,
        );

        // Parse the file
        let parse_result = self.parser_engine.parse_incremental(context)?;

        // Create a patch with the new nodes and edges
        let patch = PatchBuilder::new(self.config.repo_id.clone(), self.config.commit_sha.clone())
            .add_nodes(parse_result.nodes)
            .add_edges(parse_result.edges)
            .build();

        Ok(Some(patch))
    }

    /// Process a file deletion
    async fn process_file_deletion(&self, _file_path: &Path) -> Result<Option<AstPatch>> {
        // Deletion would require tracking which nodes belong to which files
        // so that deletion patches can be generated. For now, emit an empty
        // patch that represents the deletion event.

        let patch =
            PatchBuilder::new(self.config.repo_id.clone(), self.config.commit_sha.clone()).build();

        Ok(Some(patch))
    }

    /// Process a file rename
    async fn process_file_rename(
        &self,
        _old_path: &Path,
        new_path: &Path,
    ) -> Result<Option<AstPatch>> {
        // A full rename would delete the nodes from the old file, then parse
        // and add the nodes from the new file. For now, treat the new path
        // as a fresh file.
        self.process_file_change(new_path).await
    }

    /// Get pipeline statistics
    pub fn get_stats(&self) -> &PipelineStats {
        &self.stats
    }

    /// Reset pipeline statistics
    pub fn reset_stats(&mut self) {
        self.stats = PipelineStats::default();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::LanguageRegistry;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::TempDir;
    use tokio::fs;

    struct TestEventHandler {
        event_count: Arc<AtomicUsize>,
        error_count: Arc<AtomicUsize>,
    }

    impl TestEventHandler {
        fn new() -> Self {
            Self {
                event_count: Arc::new(AtomicUsize::new(0)),
                error_count: Arc::new(AtomicUsize::new(0)),
            }
        }
    }

    impl PipelineEventHandler for TestEventHandler {
        fn handle_event(&self, _event: &PipelineEvent) -> Result<()> {
            self.event_count.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        fn handle_error(&self, _error: &Error, _change_event: &ChangeEvent) {
            self.error_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    fn create_test_pipeline() -> (MonitoringPipeline, TempDir, Arc<TestEventHandler>) {
        let temp_dir = TempDir::new().unwrap();
        let config = PipelineConfig::new("test_repo".to_string(), "abc123".to_string());
        let registry = Arc::new(LanguageRegistry::new());
        let parser_engine = Arc::new(ParserEngine::new(registry));
        let handler = Arc::new(TestEventHandler::new());

        let pipeline = MonitoringPipeline::new(
            config,
            parser_engine,
            handler.clone() as Arc<dyn PipelineEventHandler>,
        )
        .unwrap();

        (pipeline, temp_dir, handler)
    }

    #[test]
    fn test_pipeline_config() {
        let config = PipelineConfig::new("test".to_string(), "sha".to_string());
        assert_eq!(config.repo_id, "test");
        assert_eq!(config.commit_sha, "sha");
        assert!(config.enable_batching);
    }

    #[test]
    fn test_pipeline_stats() {
        let mut stats = PipelineStats::default();

        let event = PipelineEvent {
            repo_id: "test".to_string(),
            change_event: ChangeEvent {
                repo_root: PathBuf::from("/repo"),
                path: PathBuf::from("/repo/file.js"),
                kind: ChangeKind::Modified,
                timestamp: Instant::now(),
            },
            patch: None,
            processed_at: Instant::now(),
            processing_duration_ms: 100,
        };

        stats.update(&event, true);
        assert_eq!(stats.events_processed, 1);
        assert_eq!(stats.events_success, 1);
        assert_eq!(stats.avg_processing_ms, 100.0);
        assert_eq!(stats.success_rate(), 100.0);
    }

    #[tokio::test]
    async fn test_pipeline_creation() {
        let (pipeline, _temp_dir, _handler) = create_test_pipeline();
        assert_eq!(pipeline.config.repo_id, "test_repo");
        assert_eq!(pipeline.stats.events_processed, 0);
    }

    #[tokio::test]
    async fn test_process_file_change() {
        let (pipeline, temp_dir, _handler) = create_test_pipeline();

        // Create a test file
        let test_file = temp_dir.path().join("test.js");
        fs::write(&test_file, "console.log('hello');")
            .await
            .unwrap();

        // Parsing fails because no JS parser is registered, but this still
        // exercises the read-and-parse path
        let result = pipeline.process_file_change(&test_file).await;
        assert!(result.is_err()); // Expected: no parser registered
    }

    #[tokio::test]
    async fn test_process_empty_file() {
        let (pipeline, temp_dir, _handler) = create_test_pipeline();

        // Create an empty test file
        let test_file = temp_dir.path().join("empty.js");
        fs::write(&test_file, "").await.unwrap();

        let result = pipeline.process_file_change(&test_file).await.unwrap();
        assert!(result.is_none()); // Should be None for empty files
    }

    #[tokio::test]
    async fn test_process_nonexistent_file() {
        let (pipeline, temp_dir, _handler) = create_test_pipeline();

        let test_file = temp_dir.path().join("nonexistent.js");

        let result = pipeline.process_file_change(&test_file).await.unwrap();
        assert!(result.is_none()); // Should be None for nonexistent files
    }

    #[test]
    fn test_event_handlers() {
        let handler = LoggingEventHandler::new(true);

        let event = PipelineEvent {
            repo_id: "test".to_string(),
            change_event: ChangeEvent {
                repo_root: PathBuf::from("/repo"),
                path: PathBuf::from("/repo/file.js"),
                kind: ChangeKind::Modified,
                timestamp: Instant::now(),
            },
            patch: None,
            processed_at: Instant::now(),
            processing_duration_ms: 100,
        };

        // Should not panic
        let _ = handler.handle_event(&event);

        let error = Error::other("test error");
        handler.handle_error(&error, &event.change_event);
    }
}
565}