//! Real-time monitoring pipeline that connects file watching to incremental
//! parsing and AST patch generation.

use crate::error::{Error, Result};
use crate::parser::{ParseContext, ParserEngine};
use crate::patch::{AstPatch, PatchBuilder};
use codeprism_utils::{ChangeEvent, ChangeKind, FileWatcher};
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::oneshot;
use tokio::time::sleep;

/// An event emitted by the pipeline after a file change has been processed.
#[derive(Debug, Clone)]
pub struct PipelineEvent {
    /// Repository this event belongs to.
    pub repo_id: String,
    /// The file system change that triggered processing.
    pub change_event: ChangeEvent,
    /// Patch generated from the change, if any.
    pub patch: Option<AstPatch>,
    /// When processing completed.
    pub processed_at: Instant,
    /// How long processing took, in milliseconds.
    pub processing_duration_ms: u64,
}

/// Aggregate statistics for pipeline processing.
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    /// Total number of events processed.
    pub events_processed: usize,
    /// Number of events processed successfully.
    pub events_success: usize,
    /// Number of events that failed processing.
    pub events_failed: usize,
    /// Number of events filtered out before processing.
    pub events_filtered: usize,
    /// Running average processing time in milliseconds.
    pub avg_processing_ms: f64,
    /// Number of patches generated.
    pub patches_generated: usize,
    /// Total nodes added across all patches.
    pub nodes_added: usize,
    /// Total edges added across all patches.
    pub edges_added: usize,
    /// Total nodes removed across all patches.
    pub nodes_removed: usize,
    /// Total edges removed across all patches.
    pub edges_removed: usize,
}

impl PipelineStats {
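    /// Records a processed event and folds its duration into the running
    /// average. A small worked sketch (the event variables are hypothetical):
    ///
    /// ```ignore
    /// let mut stats = PipelineStats::default();
    /// // Two successful events taking 100 ms and 200 ms...
    /// stats.update(&event_taking_100ms, true);
    /// stats.update(&event_taking_200ms, true);
    /// // ...yield a running average of (100 + 200) / 2 = 150.0 ms.
    /// assert_eq!(stats.avg_processing_ms, 150.0);
    /// ```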
    pub fn update(&mut self, event: &PipelineEvent, success: bool) {
        self.events_processed += 1;

        if success {
            self.events_success += 1;
            if let Some(ref patch) = event.patch {
                self.patches_generated += 1;
                self.nodes_added += patch.nodes_add.len();
                self.edges_added += patch.edges_add.len();
                self.nodes_removed += patch.nodes_delete.len();
                self.edges_removed += patch.edges_delete.len();
            }
        } else {
            self.events_failed += 1;
        }

        // Incremental mean: reconstruct the previous total from the old
        // average, add the new sample, and divide by the new count.
        let total_time = self.avg_processing_ms * (self.events_processed - 1) as f64
            + event.processing_duration_ms as f64;
        self.avg_processing_ms = total_time / self.events_processed as f64;
    }

    /// Success rate as a percentage of all processed events (0.0 when no
    /// events have been processed).
    pub fn success_rate(&self) -> f64 {
        if self.events_processed == 0 {
            0.0
        } else {
            (self.events_success as f64 / self.events_processed as f64) * 100.0
        }
    }

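    /// Throughput over a measured wall-clock window, in events per second
    /// (0.0 for a non-positive window). A small sketch: 50 processed events
    /// over a 10-second window is a rate of 5.0.
    ///
    /// ```ignore
    /// let stats = PipelineStats {
    ///     events_processed: 50,
    ///     ..Default::default()
    /// };
    /// assert_eq!(stats.events_per_second(10.0), 5.0);
    /// ```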
    pub fn events_per_second(&self, duration_secs: f64) -> f64 {
        if duration_secs <= 0.0 {
            0.0
        } else {
            self.events_processed as f64 / duration_secs
        }
    }
}

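/// Callback interface for consumers of pipeline events.
///
/// A minimal custom handler, mirroring the counting handler used in the
/// tests below (a sketch; assumes this module's types are in scope):
///
/// ```ignore
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// struct CountingHandler(AtomicUsize);
///
/// impl PipelineEventHandler for CountingHandler {
///     fn handle_event(&self, _event: &PipelineEvent) -> Result<()> {
///         self.0.fetch_add(1, Ordering::Relaxed);
///         Ok(())
///     }
///
///     fn handle_error(&self, _error: &Error, _change_event: &ChangeEvent) {}
/// }
/// ```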
pub trait PipelineEventHandler: Send + Sync {
    /// Called after a change event has been processed successfully.
    fn handle_event(&self, event: &PipelineEvent) -> Result<()>;

    /// Called when processing a change event fails.
    fn handle_error(&self, error: &Error, change_event: &ChangeEvent);
}

/// An event handler that does nothing; useful as a default or in tests.
#[derive(Debug, Default)]
pub struct NoOpEventHandler;

impl PipelineEventHandler for NoOpEventHandler {
    fn handle_event(&self, _event: &PipelineEvent) -> Result<()> {
        Ok(())
    }

    fn handle_error(&self, _error: &Error, _change_event: &ChangeEvent) {}
}

/// An event handler that logs events to stdout and errors to stderr.
#[derive(Debug)]
pub struct LoggingEventHandler {
    verbose: bool,
}

impl LoggingEventHandler {
    /// Creates a logging handler; `verbose` enables per-event output.
    pub fn new(verbose: bool) -> Self {
        Self { verbose }
    }
}

impl PipelineEventHandler for LoggingEventHandler {
    fn handle_event(&self, event: &PipelineEvent) -> Result<()> {
        if self.verbose {
            println!(
                "Pipeline event: {:?} processed in {}ms",
                event.change_event.kind, event.processing_duration_ms
            );

            if let Some(ref patch) = event.patch {
                println!(
                    " Generated patch: +{} nodes, +{} edges, -{} nodes, -{} edges",
                    patch.nodes_add.len(),
                    patch.edges_add.len(),
                    patch.nodes_delete.len(),
                    patch.edges_delete.len()
                );
            }
        }
        Ok(())
    }

    fn handle_error(&self, error: &Error, change_event: &ChangeEvent) {
        eprintln!(
            "Pipeline error processing {:?}: {}",
            change_event.path, error
        );
    }
}

/// Configuration for the monitoring pipeline.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Repository identifier used in generated patches.
    pub repo_id: String,
    /// Commit SHA recorded in generated patches.
    pub commit_sha: String,
    /// Debounce window for file system events.
    pub debounce_duration: Duration,
    /// Maximum number of events held in the queue.
    pub max_queue_size: usize,
    /// Number of events processed per batch.
    pub batch_size: usize,
    /// Whether events are grouped into batches before processing.
    pub enable_batching: bool,
    /// Timeout applied to event processing.
    pub processing_timeout: Duration,
}

impl PipelineConfig {
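    /// Creates a configuration with defaults: a 100 ms debounce window, a
    /// queue cap of 1000, batches of 10 with batching enabled, and a 30 s
    /// processing timeout.
    ///
    /// The fields are public, so callers can override defaults after
    /// construction (a usage sketch):
    ///
    /// ```ignore
    /// let mut config = PipelineConfig::new("my_repo".to_string(), "abc123".to_string());
    /// config.debounce_duration = Duration::from_millis(250);
    /// config.enable_batching = false;
    /// ```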
    pub fn new(repo_id: String, commit_sha: String) -> Self {
        Self {
            repo_id,
            commit_sha,
            debounce_duration: Duration::from_millis(100),
            max_queue_size: 1000,
            batch_size: 10,
            enable_batching: true,
            processing_timeout: Duration::from_secs(30),
        }
    }
}

/// Orchestrates file watching, incremental parsing, and patch generation
/// for a single repository.
pub struct MonitoringPipeline {
    config: PipelineConfig,
    parser_engine: Arc<ParserEngine>,
    file_watcher: FileWatcher,
    event_handler: Arc<dyn PipelineEventHandler>,
    stats: PipelineStats,
    shutdown_tx: Option<oneshot::Sender<()>>,
}

impl MonitoringPipeline {
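    /// Creates a pipeline from a config, a parser engine, and an event
    /// handler. A wiring sketch (assumes a `LanguageRegistry` populated with
    /// parsers for the languages in the repository; the path is a
    /// placeholder):
    ///
    /// ```ignore
    /// let config = PipelineConfig::new("my_repo".to_string(), "abc123".to_string());
    /// let registry = Arc::new(LanguageRegistry::new());
    /// let engine = Arc::new(ParserEngine::new(registry));
    /// let handler = Arc::new(NoOpEventHandler) as Arc<dyn PipelineEventHandler>;
    ///
    /// let mut pipeline = MonitoringPipeline::new(config, engine, handler)?;
    /// pipeline.start_monitoring("/path/to/repo").await?;
    /// ```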
    pub fn new(
        config: PipelineConfig,
        parser_engine: Arc<ParserEngine>,
        event_handler: Arc<dyn PipelineEventHandler>,
    ) -> Result<Self> {
        let file_watcher = FileWatcher::with_debounce(config.debounce_duration)
            .map_err(|e| Error::watcher(format!("Failed to create file watcher: {e}")))?;

        Ok(Self {
            config,
            parser_engine,
            file_watcher,
            event_handler,
            stats: PipelineStats::default(),
            shutdown_tx: None,
        })
    }

    /// Starts watching `repo_path` and processing change events until
    /// `stop_monitoring` is called.
    pub async fn start_monitoring<P: AsRef<Path>>(&mut self, repo_path: P) -> Result<()> {
        let repo_path = repo_path.as_ref();

        self.file_watcher
            .watch_dir(repo_path, repo_path.to_path_buf())
            .map_err(|e| Error::watcher(format!("Failed to watch directory: {e}")))?;

        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        self.shutdown_tx = Some(shutdown_tx);

        let mut event_queue = Vec::new();
        let mut last_batch_time = Instant::now();

        // Run the event loop until a shutdown signal arrives.
        tokio::select! {
            _ = self.process_events(&mut event_queue, &mut last_batch_time) => {}
            _ = shutdown_rx => {
                tracing::info!("Pipeline shutdown requested");
            }
        }

        Ok(())
    }

    /// Signals the event loop started by `start_monitoring` to shut down.
    pub fn stop_monitoring(&mut self) {
        if let Some(shutdown_tx) = self.shutdown_tx.take() {
            let _ = shutdown_tx.send(());
        }
    }

    async fn process_events(
        &mut self,
        event_queue: &mut Vec<ChangeEvent>,
        last_batch_time: &mut Instant,
    ) -> Result<()> {
        loop {
            if let Some(change_event) = self.file_watcher.next_change().await {
                event_queue.push(change_event);

                // Flush when the batch is full, when batching is disabled,
                // or when the debounce window has elapsed with events pending.
                let should_process_batch = event_queue.len() >= self.config.batch_size
                    || (!self.config.enable_batching && !event_queue.is_empty())
                    || (last_batch_time.elapsed() > self.config.debounce_duration
                        && !event_queue.is_empty());

                if should_process_batch {
                    self.process_event_batch(event_queue).await?;
                    *last_batch_time = Instant::now();
                }
            } else {
                // No new event was ready; flush anything pending, then back
                // off briefly before polling again.
                if !event_queue.is_empty() {
                    self.process_event_batch(event_queue).await?;
                    *last_batch_time = Instant::now();
                }

                sleep(Duration::from_millis(10)).await;
            }
        }
    }

    async fn process_event_batch(&mut self, event_queue: &mut Vec<ChangeEvent>) -> Result<()> {
        let events_to_process = std::mem::take(event_queue);

        for change_event in events_to_process {
            match self.process_single_event(change_event.clone()).await {
                Ok(pipeline_event) => {
                    self.stats.update(&pipeline_event, true);
                    if let Err(e) = self.event_handler.handle_event(&pipeline_event) {
                        self.event_handler.handle_error(&e, &change_event);
                    }
                }
                Err(e) => {
                    self.stats.events_failed += 1;
                    self.event_handler.handle_error(&e, &change_event);
                }
            }
        }

        Ok(())
    }

    async fn process_single_event(&self, change_event: ChangeEvent) -> Result<PipelineEvent> {
        let start_time = Instant::now();

        // Dispatch on the kind of change: creations and modifications re-parse
        // the file, while deletions and renames have dedicated handling.
        let patch = match change_event.kind {
            ChangeKind::Created | ChangeKind::Modified => {
                self.process_file_change(&change_event.path).await?
            }
            ChangeKind::Deleted => self.process_file_deletion(&change_event.path).await?,
            ChangeKind::Renamed { ref old, ref new } => self.process_file_rename(old, new).await?,
        };

        let processing_duration = start_time.elapsed();

        Ok(PipelineEvent {
            repo_id: self.config.repo_id.clone(),
            change_event,
            patch,
            processed_at: Instant::now(),
            processing_duration_ms: processing_duration.as_millis() as u64,
        })
    }

    async fn process_file_change(&self, file_path: &Path) -> Result<Option<AstPatch>> {
        // The file may already be gone by the time the event is handled.
        if !file_path.exists() {
            return Ok(None);
        }

        let content = tokio::fs::read_to_string(file_path).await.map_err(|e| {
            Error::io(format!(
                "Failed to read file {}: {}",
                file_path.display(),
                e
            ))
        })?;

        // Nothing to parse in an empty file.
        if content.trim().is_empty() {
            return Ok(None);
        }

        let context = ParseContext::new(
            self.config.repo_id.clone(),
            file_path.to_path_buf(),
            content,
        );

        let parse_result = self.parser_engine.parse_incremental(context)?;

        let patch = PatchBuilder::new(self.config.repo_id.clone(), self.config.commit_sha.clone())
            .add_nodes(parse_result.nodes)
            .add_edges(parse_result.edges)
            .build();

        Ok(Some(patch))
    }

    async fn process_file_deletion(&self, _file_path: &Path) -> Result<Option<AstPatch>> {
        // Removing the file's existing nodes and edges would require querying
        // the current graph; for now this emits an empty patch that records
        // the change without deleting anything.
        let patch =
            PatchBuilder::new(self.config.repo_id.clone(), self.config.commit_sha.clone()).build();

        Ok(Some(patch))
    }

    async fn process_file_rename(
        &self,
        _old_path: &Path,
        new_path: &Path,
    ) -> Result<Option<AstPatch>> {
        // Treat a rename as a fresh parse of the new path; cleaning up nodes
        // recorded under the old path is not handled here.
        self.process_file_change(new_path).await
    }

    /// Returns the statistics accumulated so far.
    pub fn get_stats(&self) -> &PipelineStats {
        &self.stats
    }

    /// Resets all statistics to their defaults.
    pub fn reset_stats(&mut self) {
        self.stats = PipelineStats::default();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::LanguageRegistry;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::TempDir;
    use tokio::fs;

    struct TestEventHandler {
        event_count: Arc<AtomicUsize>,
        error_count: Arc<AtomicUsize>,
    }

    impl TestEventHandler {
        fn new() -> Self {
            Self {
                event_count: Arc::new(AtomicUsize::new(0)),
                error_count: Arc::new(AtomicUsize::new(0)),
            }
        }
    }

    impl PipelineEventHandler for TestEventHandler {
        fn handle_event(&self, _event: &PipelineEvent) -> Result<()> {
            self.event_count.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        fn handle_error(&self, _error: &Error, _change_event: &ChangeEvent) {
            self.error_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    fn create_test_pipeline() -> (MonitoringPipeline, TempDir, Arc<TestEventHandler>) {
        let temp_dir = TempDir::new().unwrap();
        let config = PipelineConfig::new("test_repo".to_string(), "abc123".to_string());
        let registry = Arc::new(LanguageRegistry::new());
        let parser_engine = Arc::new(ParserEngine::new(registry));
        let handler = Arc::new(TestEventHandler::new());

        let pipeline = MonitoringPipeline::new(
            config,
            parser_engine,
            handler.clone() as Arc<dyn PipelineEventHandler>,
        )
        .unwrap();

        (pipeline, temp_dir, handler)
    }

    #[test]
    fn test_pipeline_config() {
        let config = PipelineConfig::new("test".to_string(), "sha".to_string());
        assert_eq!(config.repo_id, "test");
        assert_eq!(config.commit_sha, "sha");
        assert!(config.enable_batching);
    }

    #[test]
    fn test_pipeline_stats() {
        let mut stats = PipelineStats::default();

        let event = PipelineEvent {
            repo_id: "test".to_string(),
            change_event: ChangeEvent {
                repo_root: PathBuf::from("/repo"),
                path: PathBuf::from("/repo/file.js"),
                kind: ChangeKind::Modified,
                timestamp: Instant::now(),
            },
            patch: None,
            processed_at: Instant::now(),
            processing_duration_ms: 100,
        };

        stats.update(&event, true);
        assert_eq!(stats.events_processed, 1);
        assert_eq!(stats.events_success, 1);
        assert_eq!(stats.avg_processing_ms, 100.0);
        assert_eq!(stats.success_rate(), 100.0);
    }

    #[tokio::test]
    async fn test_pipeline_creation() {
        let (pipeline, _temp_dir, _handler) = create_test_pipeline();
        assert_eq!(pipeline.config.repo_id, "test_repo");
        assert_eq!(pipeline.stats.events_processed, 0);
    }

    #[tokio::test]
    async fn test_process_file_change() {
        let (pipeline, temp_dir, _handler) = create_test_pipeline();

        let test_file = temp_dir.path().join("test.js");
        fs::write(&test_file, "console.log('hello');")
            .await
            .unwrap();

        let result = pipeline.process_file_change(&test_file).await;
        // The test registry has no JavaScript parser registered, so parsing
        // is expected to fail.
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_process_empty_file() {
        let (pipeline, temp_dir, _handler) = create_test_pipeline();

        let test_file = temp_dir.path().join("empty.js");
        fs::write(&test_file, "").await.unwrap();

        let result = pipeline.process_file_change(&test_file).await.unwrap();
        assert!(result.is_none(), "empty file should produce no patch");
    }

    #[tokio::test]
    async fn test_process_nonexistent_file() {
        let (pipeline, temp_dir, _handler) = create_test_pipeline();

        let test_file = temp_dir.path().join("nonexistent.js");

        let result = pipeline.process_file_change(&test_file).await.unwrap();
        assert!(result.is_none(), "missing file should produce no patch");
    }

    #[test]
    fn test_event_handlers() {
        let handler = LoggingEventHandler::new(true);

        let event = PipelineEvent {
            repo_id: "test".to_string(),
            change_event: ChangeEvent {
                repo_root: PathBuf::from("/repo"),
                path: PathBuf::from("/repo/file.js"),
                kind: ChangeKind::Modified,
                timestamp: Instant::now(),
            },
            patch: None,
            processed_at: Instant::now(),
            processing_duration_ms: 100,
        };

        let _ = handler.handle_event(&event);

        let error = Error::other("test error");
        handler.handle_error(&error, &event.change_event);
    }
}