use crate::error::{Error, Result};
use crate::parser::{ParseContext, ParserEngine};
use crate::patch::{AstPatch, PatchBuilder};
use crate::watcher::{ChangeEvent, ChangeKind, FileWatcher};
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::oneshot;
use tokio::time::sleep;

/// A single processed change flowing through the pipeline.
#[derive(Debug, Clone)]
pub struct PipelineEvent {
    /// Identifier of the repository the change belongs to.
    pub repo_id: String,
    /// The file-system change that triggered processing.
    pub change_event: ChangeEvent,
    /// Patch generated from the change, if any.
    pub patch: Option<AstPatch>,
    /// When processing finished.
    pub processed_at: Instant,
    /// Wall-clock processing time in milliseconds.
    pub processing_duration_ms: u64,
}

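/// Aggregated counters describing pipeline throughput and patch sizes.
///
/// `avg_processing_ms` is maintained as an incremental mean, so no per-event
/// durations need to be stored: `avg_n = (avg_{n-1} * (n - 1) + x_n) / n`.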
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
    pub events_processed: usize,
    pub events_success: usize,
    pub events_failed: usize,
    pub events_filtered: usize,
    pub avg_processing_ms: f64,
    pub patches_generated: usize,
    pub nodes_added: usize,
    pub edges_added: usize,
    pub nodes_removed: usize,
    pub edges_removed: usize,
}

impl PipelineStats {
    /// Folds one processed event into the counters.
    pub fn update(&mut self, event: &PipelineEvent, success: bool) {
        self.events_processed += 1;

        if success {
            self.events_success += 1;
            if let Some(ref patch) = event.patch {
                self.patches_generated += 1;
                self.nodes_added += patch.nodes_add.len();
                self.edges_added += patch.edges_add.len();
                self.nodes_removed += patch.nodes_delete.len();
                self.edges_removed += patch.edges_delete.len();
            }
        } else {
            self.events_failed += 1;
        }

        // Incremental mean: recover the previous total from the old average,
        // add this event's duration, and divide by the new count.
        let total_time = self.avg_processing_ms * (self.events_processed - 1) as f64
            + event.processing_duration_ms as f64;
        self.avg_processing_ms = total_time / self.events_processed as f64;
    }

    /// Percentage of processed events that succeeded.
    pub fn success_rate(&self) -> f64 {
        if self.events_processed == 0 {
            0.0
        } else {
            (self.events_success as f64 / self.events_processed as f64) * 100.0
        }
    }

    /// Throughput over a measurement window of `duration_secs` seconds.
    pub fn events_per_second(&self, duration_secs: f64) -> f64 {
        if duration_secs <= 0.0 {
            0.0
        } else {
            self.events_processed as f64 / duration_secs
        }
    }
}

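/// Callbacks invoked by the pipeline after each event is processed or fails.
///
/// Implementations must be `Send + Sync` because the pipeline may call them
/// from async tasks. A minimal custom-handler sketch (`MetricsHandler` is a
/// hypothetical example, not part of this crate):
///
/// ```ignore
/// struct MetricsHandler;
///
/// impl PipelineEventHandler for MetricsHandler {
///     fn handle_event(&self, event: &PipelineEvent) -> Result<()> {
///         // e.g. export event.processing_duration_ms to a metrics sink
///         Ok(())
///     }
///
///     fn handle_error(&self, error: &Error, change_event: &ChangeEvent) {
///         eprintln!("failed on {:?}: {}", change_event.path, error);
///     }
/// }
/// ```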
pub trait PipelineEventHandler: Send + Sync {
    /// Called after an event has been processed successfully.
    fn handle_event(&self, event: &PipelineEvent) -> Result<()>;

    /// Called when processing an event fails.
    fn handle_error(&self, error: &Error, change_event: &ChangeEvent);
}

/// Event handler that discards all events and errors.
#[derive(Debug, Default)]
pub struct NoOpEventHandler;

impl PipelineEventHandler for NoOpEventHandler {
    fn handle_event(&self, _event: &PipelineEvent) -> Result<()> {
        Ok(())
    }

    fn handle_error(&self, _error: &Error, _change_event: &ChangeEvent) {}
}

/// Event handler that logs events to stdout and errors to stderr.
#[derive(Debug)]
pub struct LoggingEventHandler {
    verbose: bool,
}

impl LoggingEventHandler {
    pub fn new(verbose: bool) -> Self {
        Self { verbose }
    }
}

impl PipelineEventHandler for LoggingEventHandler {
    fn handle_event(&self, event: &PipelineEvent) -> Result<()> {
        if self.verbose {
            println!(
                "Pipeline event: {:?} processed in {}ms",
                event.change_event.kind, event.processing_duration_ms
            );

            if let Some(ref patch) = event.patch {
                println!(
                    "  Generated patch: +{} nodes, +{} edges, -{} nodes, -{} edges",
                    patch.nodes_add.len(),
                    patch.edges_add.len(),
                    patch.nodes_delete.len(),
                    patch.edges_delete.len()
                );
            }
        }
        Ok(())
    }

    fn handle_error(&self, error: &Error, change_event: &ChangeEvent) {
        eprintln!(
            "Pipeline error processing {:?}: {}",
            change_event.path, error
        );
    }
}

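/// Configuration for a [`MonitoringPipeline`].
///
/// [`PipelineConfig::new`] picks conservative defaults (100 ms debounce,
/// batches of 10, batching enabled). All fields are public, so a sketch for
/// overriding them directly:
///
/// ```ignore
/// let mut config = PipelineConfig::new("repo".into(), "abc123".into());
/// config.debounce_duration = std::time::Duration::from_millis(250);
/// config.enable_batching = false; // handle every event as it arrives
/// ```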
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    pub repo_id: String,
    pub commit_sha: String,
    pub debounce_duration: Duration,
    pub max_queue_size: usize,
    pub batch_size: usize,
    pub enable_batching: bool,
    pub processing_timeout: Duration,
}

impl PipelineConfig {
    pub fn new(repo_id: String, commit_sha: String) -> Self {
        Self {
            repo_id,
            commit_sha,
            debounce_duration: Duration::from_millis(100),
            max_queue_size: 1000,
            batch_size: 10,
            enable_batching: true,
            processing_timeout: Duration::from_secs(30),
        }
    }
}

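/// Watches a repository, parses changed files incrementally, and delivers the
/// resulting AST patches to a [`PipelineEventHandler`].
///
/// A minimal usage sketch (assumes an async context; module paths are
/// illustrative and should be adjusted to the crate layout):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let config = PipelineConfig::new("my_repo".into(), "abc123".into());
/// let engine = Arc::new(ParserEngine::new(Arc::new(LanguageRegistry::new())));
/// let handler = Arc::new(LoggingEventHandler::new(true));
///
/// let mut pipeline = MonitoringPipeline::new(config, engine, handler)?;
/// pipeline.start_monitoring("/path/to/repo").await?;
/// ```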
pub struct MonitoringPipeline {
    config: PipelineConfig,
    parser_engine: Arc<ParserEngine>,
    file_watcher: FileWatcher,
    event_handler: Arc<dyn PipelineEventHandler>,
    stats: PipelineStats,
    shutdown_tx: Option<oneshot::Sender<()>>,
}

impl MonitoringPipeline {
    pub fn new(
        config: PipelineConfig,
        parser_engine: Arc<ParserEngine>,
        event_handler: Arc<dyn PipelineEventHandler>,
    ) -> Result<Self> {
        let file_watcher = FileWatcher::with_debounce(config.debounce_duration)?;

        Ok(Self {
            config,
            parser_engine,
            file_watcher,
            event_handler,
            stats: PipelineStats::default(),
            shutdown_tx: None,
        })
    }

    /// Begins watching `repo_path` and runs the processing loop until a
    /// shutdown signal is received.
    pub async fn start_monitoring<P: AsRef<Path>>(&mut self, repo_path: P) -> Result<()> {
        let repo_path = repo_path.as_ref();

        self.file_watcher
            .watch_dir(repo_path, repo_path.to_path_buf())?;

        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        self.shutdown_tx = Some(shutdown_tx);

        let mut event_queue = Vec::new();
        let mut last_batch_time = Instant::now();

        // Run the event loop until a shutdown signal arrives.
        tokio::select! {
            _ = self.process_events(&mut event_queue, &mut last_batch_time) => {}
            _ = shutdown_rx => {
                tracing::info!("Pipeline shutdown requested");
            }
        }

        Ok(())
    }

    /// Signals the event loop started by `start_monitoring` to exit.
    pub fn stop_monitoring(&mut self) {
        if let Some(shutdown_tx) = self.shutdown_tx.take() {
            let _ = shutdown_tx.send(());
        }
    }

    async fn process_events(
        &mut self,
        event_queue: &mut Vec<ChangeEvent>,
        last_batch_time: &mut Instant,
    ) -> Result<()> {
        loop {
            if let Some(change_event) = self.file_watcher.next_change().await {
                event_queue.push(change_event);

                // Flush when the batch is full, when batching is disabled, or
                // when the queue has sat past the debounce window.
                let should_process_batch = event_queue.len() >= self.config.batch_size
                    || (!self.config.enable_batching && !event_queue.is_empty())
                    || (last_batch_time.elapsed() > self.config.debounce_duration
                        && !event_queue.is_empty());

                if should_process_batch {
                    self.process_event_batch(event_queue).await?;
                    *last_batch_time = Instant::now();
                }
            } else {
                // No change pending: drain anything left in the queue, then
                // back off briefly before polling the watcher again.
                if !event_queue.is_empty() {
                    self.process_event_batch(event_queue).await?;
                    *last_batch_time = Instant::now();
                }

                sleep(Duration::from_millis(10)).await;
            }
        }
    }

    async fn process_event_batch(&mut self, event_queue: &mut Vec<ChangeEvent>) -> Result<()> {
        let events_to_process = std::mem::take(event_queue);

        for change_event in events_to_process {
            match self.process_single_event(change_event.clone()).await {
                Ok(pipeline_event) => {
                    self.stats.update(&pipeline_event, true);
                    if let Err(e) = self.event_handler.handle_event(&pipeline_event) {
                        self.event_handler.handle_error(&e, &change_event);
                    }
                }
                Err(e) => {
                    // No PipelineEvent exists on failure, so only the failure
                    // counter is updated; failed events are not folded into
                    // `events_processed` or the running average.
                    self.stats.events_failed += 1;
                    self.event_handler.handle_error(&e, &change_event);
                }
            }
        }

        Ok(())
    }

    async fn process_single_event(&self, change_event: ChangeEvent) -> Result<PipelineEvent> {
        let start_time = Instant::now();

        let patch = match change_event.kind {
            ChangeKind::Created | ChangeKind::Modified => {
                self.process_file_change(&change_event.path).await?
            }
            ChangeKind::Deleted => self.process_file_deletion(&change_event.path).await?,
            ChangeKind::Renamed { ref old, ref new } => self.process_file_rename(old, new).await?,
        };

        let processing_duration = start_time.elapsed();

        Ok(PipelineEvent {
            repo_id: self.config.repo_id.clone(),
            change_event,
            patch,
            processed_at: Instant::now(),
            processing_duration_ms: processing_duration.as_millis() as u64,
        })
    }

    async fn process_file_change(&self, file_path: &Path) -> Result<Option<AstPatch>> {
        // The file may already be gone by the time the event is handled.
        if !file_path.exists() {
            return Ok(None);
        }

        let content = tokio::fs::read_to_string(file_path).await.map_err(|e| {
            Error::io(format!(
                "Failed to read file {}: {}",
                file_path.display(),
                e
            ))
        })?;

        // Empty files produce no nodes, so skip parsing entirely.
        if content.trim().is_empty() {
            return Ok(None);
        }

        let context = ParseContext::new(
            self.config.repo_id.clone(),
            file_path.to_path_buf(),
            content,
        );

        let parse_result = self.parser_engine.parse_incremental(context)?;

        let patch = PatchBuilder::new(self.config.repo_id.clone(), self.config.commit_sha.clone())
            .add_nodes(parse_result.nodes)
            .add_edges(parse_result.edges)
            .build();

        Ok(Some(patch))
    }

    async fn process_file_deletion(&self, _file_path: &Path) -> Result<Option<AstPatch>> {
        // Removing the file's previous nodes would require a mapping from
        // path back to emitted node IDs, which is not tracked here; an empty
        // patch is emitted instead.
        let patch =
            PatchBuilder::new(self.config.repo_id.clone(), self.config.commit_sha.clone()).build();

        Ok(Some(patch))
    }

    async fn process_file_rename(
        &self,
        _old_path: &Path,
        new_path: &Path,
    ) -> Result<Option<AstPatch>> {
        // A rename is treated as a fresh parse of the new path; cleanup of
        // the old path's nodes has the same limitation as deletion above.
        self.process_file_change(new_path).await
    }

    /// Returns the statistics accumulated since the last reset.
    pub fn get_stats(&self) -> &PipelineStats {
        &self.stats
    }

    /// Clears all accumulated statistics.
    pub fn reset_stats(&mut self) {
        self.stats = PipelineStats::default();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::LanguageRegistry;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tempfile::TempDir;
    use tokio::fs;

    /// Handler that counts events and errors for assertions.
    struct TestEventHandler {
        event_count: Arc<AtomicUsize>,
        error_count: Arc<AtomicUsize>,
    }

    impl TestEventHandler {
        fn new() -> Self {
            Self {
                event_count: Arc::new(AtomicUsize::new(0)),
                error_count: Arc::new(AtomicUsize::new(0)),
            }
        }
    }

    impl PipelineEventHandler for TestEventHandler {
        fn handle_event(&self, _event: &PipelineEvent) -> Result<()> {
            self.event_count.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }

        fn handle_error(&self, _error: &Error, _change_event: &ChangeEvent) {
            self.error_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    fn create_test_pipeline() -> (MonitoringPipeline, TempDir, Arc<TestEventHandler>) {
        let temp_dir = TempDir::new().unwrap();
        let config = PipelineConfig::new("test_repo".to_string(), "abc123".to_string());
        let registry = Arc::new(LanguageRegistry::new());
        let parser_engine = Arc::new(ParserEngine::new(registry));
        let handler = Arc::new(TestEventHandler::new());

        let pipeline = MonitoringPipeline::new(
            config,
            parser_engine,
            handler.clone() as Arc<dyn PipelineEventHandler>,
        )
        .unwrap();

        (pipeline, temp_dir, handler)
    }

    #[test]
    fn test_pipeline_config() {
        let config = PipelineConfig::new("test".to_string(), "sha".to_string());
        assert_eq!(config.repo_id, "test");
        assert_eq!(config.commit_sha, "sha");
        assert!(config.enable_batching);
    }

    #[test]
    fn test_pipeline_stats() {
        let mut stats = PipelineStats::default();

        let event = PipelineEvent {
            repo_id: "test".to_string(),
            change_event: ChangeEvent {
                repo_root: PathBuf::from("/repo"),
                path: PathBuf::from("/repo/file.js"),
                kind: ChangeKind::Modified,
                timestamp: Instant::now(),
            },
            patch: None,
            processed_at: Instant::now(),
            processing_duration_ms: 100,
        };

        stats.update(&event, true);
        assert_eq!(stats.events_processed, 1);
        assert_eq!(stats.events_success, 1);
        assert_eq!(stats.avg_processing_ms, 100.0);
        assert_eq!(stats.success_rate(), 100.0);
    }

    #[tokio::test]
    async fn test_pipeline_creation() {
        let (pipeline, _temp_dir, _handler) = create_test_pipeline();
        assert_eq!(pipeline.config.repo_id, "test_repo");
        assert_eq!(pipeline.stats.events_processed, 0);
    }

    #[tokio::test]
    async fn test_process_file_change() {
        let (pipeline, temp_dir, _handler) = create_test_pipeline();

        let test_file = temp_dir.path().join("test.js");
        fs::write(&test_file, "console.log('hello');")
            .await
            .unwrap();

        // The test registry has no languages registered, so parsing is
        // expected to fail.
        let result = pipeline.process_file_change(&test_file).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_process_empty_file() {
        let (pipeline, temp_dir, _handler) = create_test_pipeline();

        let test_file = temp_dir.path().join("empty.js");
        fs::write(&test_file, "").await.unwrap();

        // Empty files are filtered out before parsing.
        let result = pipeline.process_file_change(&test_file).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_process_nonexistent_file() {
        let (pipeline, temp_dir, _handler) = create_test_pipeline();

        let test_file = temp_dir.path().join("nonexistent.js");

        // Missing files resolve to `None` rather than an error.
        let result = pipeline.process_file_change(&test_file).await.unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_event_handlers() {
        let handler = LoggingEventHandler::new(true);

        let event = PipelineEvent {
            repo_id: "test".to_string(),
            change_event: ChangeEvent {
                repo_root: PathBuf::from("/repo"),
                path: PathBuf::from("/repo/file.js"),
                kind: ChangeKind::Modified,
                timestamp: Instant::now(),
            },
            patch: None,
            processed_at: Instant::now(),
            processing_duration_ms: 100,
        };

        let _ = handler.handle_event(&event);

        let error = Error::other("test error");
        handler.handle_error(&error, &event.change_event);
    }
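
    // Exercises the incremental mean in `PipelineStats::update` and the rate
    // helpers; durations are chosen so the expected values are exact in f64.
    #[test]
    fn test_stats_running_average_and_rates() {
        let mut stats = PipelineStats::default();
        assert_eq!(stats.success_rate(), 0.0);
        assert_eq!(stats.events_per_second(0.0), 0.0);

        let make_event = |duration_ms: u64| PipelineEvent {
            repo_id: "test".to_string(),
            change_event: ChangeEvent {
                repo_root: PathBuf::from("/repo"),
                path: PathBuf::from("/repo/file.js"),
                kind: ChangeKind::Modified,
                timestamp: Instant::now(),
            },
            patch: None,
            processed_at: Instant::now(),
            processing_duration_ms: duration_ms,
        };

        stats.update(&make_event(100), true);
        stats.update(&make_event(200), false);

        assert_eq!(stats.events_processed, 2);
        assert_eq!(stats.events_success, 1);
        assert_eq!(stats.events_failed, 1);
        // (100 * 1 + 200) / 2 = 150
        assert_eq!(stats.avg_processing_ms, 150.0);
        assert_eq!(stats.success_rate(), 50.0);
        assert_eq!(stats.events_per_second(2.0), 1.0);
    }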
}