codeprism_core/pipeline/mod.rs

//! File monitoring pipeline for real-time graph updates
//!
//! This module provides functionality to monitor file changes and automatically
//! update the code graph through incremental parsing and patch generation.

use crate::error::{Error, Result};
use crate::parser::{ParseContext, ParserEngine};
use crate::patch::{AstPatch, PatchBuilder};
use codeprism_utils::{ChangeEvent, ChangeKind, FileWatcher};
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::oneshot;
use tokio::time::sleep;

/// Pipeline event representing a processed file change
#[derive(Debug, Clone)]
pub struct PipelineEvent {
    /// Repository ID
    pub repo_id: String,
    /// Original change event
    pub change_event: ChangeEvent,
    /// Generated patch (if any)
    pub patch: Option<AstPatch>,
    /// Processing timestamp
    pub processed_at: Instant,
    /// Processing duration in milliseconds
    pub processing_duration_ms: u64,
}

/// Pipeline statistics
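///
/// # Example
///
/// An illustrative sketch of the bookkeeping (assumes this module is exported
/// as `codeprism_core::pipeline`; adjust the path to your crate layout):
///
/// ```ignore
/// use codeprism_core::pipeline::PipelineStats;
///
/// let stats = PipelineStats::default();
/// assert_eq!(stats.events_processed, 0);
/// assert_eq!(stats.success_rate(), 0.0); // no events yet; the rate is defined as 0
/// ```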
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    /// Total events processed
    pub events_processed: usize,
    /// Events processed successfully
    pub events_success: usize,
    /// Events that failed processing
    pub events_failed: usize,
    /// Events that were filtered out
    pub events_filtered: usize,
    /// Average processing time in milliseconds
    pub avg_processing_ms: f64,
    /// Total patches generated
    pub patches_generated: usize,
    /// Total nodes added
    pub nodes_added: usize,
    /// Total edges added
    pub edges_added: usize,
    /// Total nodes removed
    pub nodes_removed: usize,
    /// Total edges removed
    pub edges_removed: usize,
}

impl PipelineStats {
    /// Update statistics with a new event
    pub fn update(&mut self, event: &PipelineEvent, success: bool) {
        self.events_processed += 1;

        if success {
            self.events_success += 1;
            if let Some(ref patch) = event.patch {
                self.patches_generated += 1;
                self.nodes_added += patch.nodes_add.len();
                self.edges_added += patch.edges_add.len();
                self.nodes_removed += patch.nodes_delete.len();
                self.edges_removed += patch.edges_delete.len();
            }
        } else {
            self.events_failed += 1;
        }

        // Update the running average processing time (incremental mean)
        let total_time = self.avg_processing_ms * (self.events_processed - 1) as f64
            + event.processing_duration_ms as f64;
        self.avg_processing_ms = total_time / self.events_processed as f64;
    }

    /// Calculate the success rate as a percentage
    pub fn success_rate(&self) -> f64 {
        if self.events_processed == 0 {
            0.0
        } else {
            (self.events_success as f64 / self.events_processed as f64) * 100.0
        }
    }

    /// Calculate events per second
    pub fn events_per_second(&self, duration_secs: f64) -> f64 {
        if duration_secs <= 0.0 {
            0.0
        } else {
            self.events_processed as f64 / duration_secs
        }
    }
}

/// Event handler trait for processing pipeline events
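///
/// # Example
///
/// A minimal custom handler, sketched under the assumption that the crate is
/// exposed as `codeprism_core` (paths may differ in your workspace):
///
/// ```ignore
/// use codeprism_core::error::{Error, Result};
/// use codeprism_core::pipeline::{PipelineEvent, PipelineEventHandler};
/// use codeprism_utils::ChangeEvent;
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// struct CountingHandler {
///     count: AtomicUsize,
/// }
///
/// impl PipelineEventHandler for CountingHandler {
///     fn handle_event(&self, _event: &PipelineEvent) -> Result<()> {
///         self.count.fetch_add(1, Ordering::Relaxed);
///         Ok(())
///     }
///
///     fn handle_error(&self, _error: &Error, _change_event: &ChangeEvent) {}
/// }
/// ```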
pub trait PipelineEventHandler: Send + Sync {
    /// Handle a processed pipeline event
    fn handle_event(&self, event: &PipelineEvent) -> Result<()>;

    /// Handle pipeline errors
    fn handle_error(&self, error: &Error, change_event: &ChangeEvent);
}

/// No-op event handler
#[derive(Debug, Default)]
pub struct NoOpEventHandler;

impl PipelineEventHandler for NoOpEventHandler {
    fn handle_event(&self, _event: &PipelineEvent) -> Result<()> {
        Ok(())
    }

    fn handle_error(&self, _error: &Error, _change_event: &ChangeEvent) {}
}

/// Event handler that logs processed events to stdout/stderr
#[derive(Debug)]
pub struct LoggingEventHandler {
    verbose: bool,
}

impl LoggingEventHandler {
    /// Create a new logging event handler
    pub fn new(verbose: bool) -> Self {
        Self { verbose }
    }
}

impl PipelineEventHandler for LoggingEventHandler {
    fn handle_event(&self, event: &PipelineEvent) -> Result<()> {
        if self.verbose {
            println!(
                "Pipeline event: {:?} processed in {}ms",
                event.change_event.kind, event.processing_duration_ms
            );

            if let Some(ref patch) = event.patch {
                println!(
                    "  Generated patch: +{} nodes, +{} edges, -{} nodes, -{} edges",
                    patch.nodes_add.len(),
                    patch.edges_add.len(),
                    patch.nodes_delete.len(),
                    patch.edges_delete.len()
                );
            }
        }
        Ok(())
    }

    fn handle_error(&self, error: &Error, change_event: &ChangeEvent) {
        eprintln!(
            "Pipeline error processing {:?}: {}",
            change_event.path, error
        );
    }
}

/// Configuration for the monitoring pipeline
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Repository ID
    pub repo_id: String,
    /// Commit SHA for generated patches
    pub commit_sha: String,
    /// Debounce duration for file changes
    pub debounce_duration: Duration,
    /// Maximum queue size for pending events
    pub max_queue_size: usize,
    /// Batch size for processing multiple events
    pub batch_size: usize,
    /// Whether to process events in batches
    pub enable_batching: bool,
    /// Timeout for processing individual events
    pub processing_timeout: Duration,
}

impl PipelineConfig {
    /// Create a new pipeline config
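    ///
    /// Defaults: 100 ms debounce, a queue of 1000 pending events, batches of
    /// 10 with batching enabled, and a 30 s per-event processing timeout. All
    /// fields are public, so they can be tuned after construction:
    ///
    /// ```ignore
    /// use std::time::Duration;
    ///
    /// let mut config = PipelineConfig::new("my_repo".to_string(), "abc123".to_string());
    /// config.debounce_duration = Duration::from_millis(250);
    /// config.batch_size = 25;
    /// ```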
    pub fn new(repo_id: String, commit_sha: String) -> Self {
        Self {
            repo_id,
            commit_sha,
            debounce_duration: Duration::from_millis(100),
            max_queue_size: 1000,
            batch_size: 10,
            enable_batching: true,
            processing_timeout: Duration::from_secs(30),
        }
    }
}

/// File monitoring pipeline that connects FileWatcher to ParserEngine
pub struct MonitoringPipeline {
    config: PipelineConfig,
    parser_engine: Arc<ParserEngine>,
    file_watcher: FileWatcher,
    event_handler: Arc<dyn PipelineEventHandler>,
    stats: PipelineStats,
    shutdown_tx: Option<oneshot::Sender<()>>,
}

impl MonitoringPipeline {
    /// Create a new monitoring pipeline
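    ///
    /// # Example
    ///
    /// A wiring sketch mirroring the test setup below (assumes the same crate
    /// paths; `LanguageRegistry` starts empty, so register language parsers
    /// before monitoring real files):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let config = PipelineConfig::new("repo".to_string(), "abc123".to_string());
    /// let registry = Arc::new(LanguageRegistry::new());
    /// let parser_engine = Arc::new(ParserEngine::new(registry));
    /// let handler = Arc::new(LoggingEventHandler::new(true));
    ///
    /// let mut pipeline = MonitoringPipeline::new(config, parser_engine, handler)?;
    /// // pipeline.start_monitoring("/path/to/repo").await?;
    /// ```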
    pub fn new(
        config: PipelineConfig,
        parser_engine: Arc<ParserEngine>,
        event_handler: Arc<dyn PipelineEventHandler>,
    ) -> Result<Self> {
        let file_watcher = FileWatcher::with_debounce(config.debounce_duration)
            .map_err(|e| Error::watcher(format!("Failed to create file watcher: {e}")))?;

        Ok(Self {
            config,
            parser_engine,
            file_watcher,
            event_handler,
            stats: PipelineStats::default(),
            shutdown_tx: None,
        })
    }

    /// Start monitoring a repository path
    pub async fn start_monitoring<P: AsRef<Path>>(&mut self, repo_path: P) -> Result<()> {
        let repo_path = repo_path.as_ref();

        // Start watching the repository
        self.file_watcher
            .watch_dir(repo_path, repo_path.to_path_buf())
            .map_err(|e| Error::watcher(format!("Failed to watch directory: {e}")))?;

        // Start the processing loop
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        self.shutdown_tx = Some(shutdown_tx);

        let mut event_queue = Vec::new();
        let mut last_batch_time = Instant::now();

        tokio::select! {
            _ = self.process_events(&mut event_queue, &mut last_batch_time) => {
                // Processing loop ended
            }
            _ = shutdown_rx => {
                // Shutdown requested
                tracing::info!("Pipeline shutdown requested");
            }
        }

        Ok(())
    }

    /// Stop monitoring and shut down the pipeline
    pub fn stop_monitoring(&mut self) {
        if let Some(shutdown_tx) = self.shutdown_tx.take() {
            let _ = shutdown_tx.send(());
        }
    }

    /// Process file change events
    async fn process_events(
        &mut self,
        event_queue: &mut Vec<ChangeEvent>,
        last_batch_time: &mut Instant,
    ) -> Result<()> {
        loop {
            // Try to get the next change event
            if let Some(change_event) = self.file_watcher.next_change().await {
                event_queue.push(change_event);

                // Process the batch if any flush condition is met
                let should_process_batch = event_queue.len() >= self.config.batch_size
                    || (!self.config.enable_batching && !event_queue.is_empty())
                    || (last_batch_time.elapsed() > self.config.debounce_duration
                        && !event_queue.is_empty());

                if should_process_batch {
                    self.process_event_batch(event_queue).await?;
                    *last_batch_time = Instant::now();
                }
            } else {
                // No more events; process any remaining in the queue
                if !event_queue.is_empty() {
                    self.process_event_batch(event_queue).await?;
                    *last_batch_time = Instant::now();
                }

                // Brief pause to avoid busy-waiting
                sleep(Duration::from_millis(10)).await;
            }
        }
    }

    /// Process a batch of change events
    async fn process_event_batch(&mut self, event_queue: &mut Vec<ChangeEvent>) -> Result<()> {
        let events_to_process = std::mem::take(event_queue);

        for change_event in events_to_process {
            match self.process_single_event(change_event.clone()).await {
                Ok(pipeline_event) => {
                    self.stats.update(&pipeline_event, true);
                    if let Err(e) = self.event_handler.handle_event(&pipeline_event) {
                        self.event_handler.handle_error(&e, &change_event);
                    }
                }
                Err(e) => {
                    // Count the failure against the totals as well so that
                    // success_rate() reflects it (no duration sample is
                    // available for failed events).
                    self.stats.events_processed += 1;
                    self.stats.events_failed += 1;
                    self.event_handler.handle_error(&e, &change_event);
                }
            }
        }

        Ok(())
    }

    /// Process a single change event
    async fn process_single_event(&self, change_event: ChangeEvent) -> Result<PipelineEvent> {
        let start_time = Instant::now();

        let patch = match change_event.kind {
            ChangeKind::Created | ChangeKind::Modified => {
                self.process_file_change(&change_event.path).await?
            }
            ChangeKind::Deleted => self.process_file_deletion(&change_event.path).await?,
            ChangeKind::Renamed { ref old, ref new } => self.process_file_rename(old, new).await?,
        };

        let processing_duration = start_time.elapsed();

        Ok(PipelineEvent {
            repo_id: self.config.repo_id.clone(),
            change_event,
            patch,
            processed_at: Instant::now(),
            processing_duration_ms: processing_duration.as_millis() as u64,
        })
    }

    /// Process a file creation or modification
    async fn process_file_change(&self, file_path: &Path) -> Result<Option<AstPatch>> {
        // Skip files that no longer exist (the change may have been
        // superseded by a deletion)
        if !file_path.exists() {
            return Ok(None);
        }

        // Read file content
        let content = tokio::fs::read_to_string(file_path).await.map_err(|e| {
            Error::io(format!(
                "Failed to read file {}: {}",
                file_path.display(),
                e
            ))
        })?;

        // Skip empty files
        if content.trim().is_empty() {
            return Ok(None);
        }

        // Create parse context
        let context = ParseContext::new(
            self.config.repo_id.clone(),
            file_path.to_path_buf(),
            content,
        );

        // Parse the file
        let parse_result = self.parser_engine.parse_incremental(context)?;

        // Create a patch with the new nodes and edges
        let patch = PatchBuilder::new(self.config.repo_id.clone(), self.config.commit_sha.clone())
            .add_nodes(parse_result.nodes)
            .add_edges(parse_result.edges)
            .build();

        Ok(Some(patch))
    }

    /// Process a file deletion
    async fn process_file_deletion(&self, _file_path: &Path) -> Result<Option<AstPatch>> {
        // Proper deletion support would require tracking which nodes belong
        // to which files so that deletion patches can be generated. For now,
        // we create an empty patch that represents the deletion event.
        let patch =
            PatchBuilder::new(self.config.repo_id.clone(), self.config.commit_sha.clone()).build();

        Ok(Some(patch))
    }

    /// Process a file rename
    async fn process_file_rename(
        &self,
        _old_path: &Path,
        new_path: &Path,
    ) -> Result<Option<AstPatch>> {
        // A full rename implementation would:
        // 1. Delete nodes from the old file
        // 2. Parse and add nodes from the new file
        // For now, we process the rename as a new file.
        self.process_file_change(new_path).await
    }

    /// Get pipeline statistics
    pub fn get_stats(&self) -> &PipelineStats {
        &self.stats
    }

    /// Reset pipeline statistics
    pub fn reset_stats(&mut self) {
        self.stats = PipelineStats::default();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::LanguageRegistry;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::TempDir;
    use tokio::fs;

    struct TestEventHandler {
        event_count: Arc<AtomicUsize>,
        error_count: Arc<AtomicUsize>,
    }

    impl TestEventHandler {
        fn new() -> Self {
            Self {
                event_count: Arc::new(AtomicUsize::new(0)),
                error_count: Arc::new(AtomicUsize::new(0)),
            }
        }
    }

    impl PipelineEventHandler for TestEventHandler {
        fn handle_event(&self, _event: &PipelineEvent) -> Result<()> {
            self.event_count.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        fn handle_error(&self, _error: &Error, _change_event: &ChangeEvent) {
            self.error_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    fn create_test_pipeline() -> (MonitoringPipeline, TempDir, Arc<TestEventHandler>) {
        let temp_dir = TempDir::new().unwrap();
        let config = PipelineConfig::new("test_repo".to_string(), "abc123".to_string());
        let registry = Arc::new(LanguageRegistry::new());
        let parser_engine = Arc::new(ParserEngine::new(registry));
        let handler = Arc::new(TestEventHandler::new());

        let pipeline = MonitoringPipeline::new(
            config,
            parser_engine,
            handler.clone() as Arc<dyn PipelineEventHandler>,
        )
        .unwrap();

        (pipeline, temp_dir, handler)
    }

    #[test]
    fn test_pipeline_config() {
        let config = PipelineConfig::new("test".to_string(), "sha".to_string());
        assert_eq!(config.repo_id, "test");
        assert_eq!(config.commit_sha, "sha");
        assert!(config.enable_batching);
    }

    #[test]
    fn test_pipeline_stats() {
        let mut stats = PipelineStats::default();

        let event = PipelineEvent {
            repo_id: "test".to_string(),
            change_event: ChangeEvent {
                repo_root: PathBuf::from("/repo"),
                path: PathBuf::from("/repo/file.js"),
                kind: ChangeKind::Modified,
                timestamp: Instant::now(),
            },
            patch: None,
            processed_at: Instant::now(),
            processing_duration_ms: 100,
        };

        stats.update(&event, true);
        assert_eq!(stats.events_processed, 1);
        assert_eq!(stats.events_success, 1);
        assert_eq!(stats.avg_processing_ms, 100.0);
        assert_eq!(stats.success_rate(), 100.0);
    }

    #[tokio::test]
    async fn test_pipeline_creation() {
        let (pipeline, _temp_dir, _handler) = create_test_pipeline();
        assert_eq!(pipeline.config.repo_id, "test_repo");
        assert_eq!(pipeline.stats.events_processed, 0);
    }

    #[tokio::test]
    async fn test_process_file_change() {
        let (pipeline, temp_dir, _handler) = create_test_pipeline();

        // Create a test file
        let test_file = temp_dir.path().join("test.js");
        fs::write(&test_file, "console.log('hello');")
            .await
            .unwrap();

        // This fails because no JS parser is registered, but it exercises
        // the file-change handling logic
        let result = pipeline.process_file_change(&test_file).await;
        assert!(result.is_err(), "expected an error with no parser registered");
    }

    #[tokio::test]
    async fn test_process_empty_file() {
        let (pipeline, temp_dir, _handler) = create_test_pipeline();

        // Create an empty test file
        let test_file = temp_dir.path().join("empty.js");
        fs::write(&test_file, "").await.unwrap();

        let result = pipeline.process_file_change(&test_file).await.unwrap();
        assert!(result.is_none(), "expected None for an empty file");
    }

    #[tokio::test]
    async fn test_process_nonexistent_file() {
        let (pipeline, temp_dir, _handler) = create_test_pipeline();

        let test_file = temp_dir.path().join("nonexistent.js");

        let result = pipeline.process_file_change(&test_file).await.unwrap();
        assert!(result.is_none(), "expected None for a nonexistent file");
    }

    #[test]
    fn test_event_handlers() {
        let handler = LoggingEventHandler::new(true);

        let event = PipelineEvent {
            repo_id: "test".to_string(),
            change_event: ChangeEvent {
                repo_root: PathBuf::from("/repo"),
                path: PathBuf::from("/repo/file.js"),
                kind: ChangeKind::Modified,
                timestamp: Instant::now(),
            },
            patch: None,
            processed_at: Instant::now(),
            processing_duration_ms: 100,
        };

        // Should not panic
        let _ = handler.handle_event(&event);

        let error = Error::other("test error");
        handler.handle_error(&error, &event.change_event);
    }
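
    // Illustrative check of the running-average bookkeeping in
    // `PipelineStats::update`, using only items defined in this module.
    #[test]
    fn test_pipeline_stats_running_average() {
        let mut stats = PipelineStats::default();

        for duration_ms in [100u64, 300u64] {
            let event = PipelineEvent {
                repo_id: "test".to_string(),
                change_event: ChangeEvent {
                    repo_root: PathBuf::from("/repo"),
                    path: PathBuf::from("/repo/file.js"),
                    kind: ChangeKind::Modified,
                    timestamp: Instant::now(),
                },
                patch: None,
                processed_at: Instant::now(),
                processing_duration_ms: duration_ms,
            };
            stats.update(&event, true);
        }

        assert_eq!(stats.events_processed, 2);
        assert_eq!(stats.avg_processing_ms, 200.0); // mean of 100 and 300
    }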
}