1use super::{
11 PipelineContext, PipelineData, PipelineConfig, PipelineStage, PipelineStageProcessor,
12 PipelineError, PipelineMetrics, memory::PipelineMemoryManager
13};
14use crate::lsp::{LspManager, LspConfig, QueryIntent};
15use crate::search::SearchEngine;
16use anyhow::Result;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20use tokio::sync::{RwLock, Semaphore};
21use tokio::time::timeout;
22use tracing::{debug, error, info, warn};
23
24pub struct PipelineExecutor {
26 config: PipelineConfig,
27
28 search_engine: Arc<SearchEngine>,
30 lsp_manager: Arc<crate::lsp::LspState>,
31
32 memory_manager: Arc<PipelineMemoryManager>,
34
35 stages: HashMap<PipelineStage, Arc<dyn PipelineStageProcessor>>,
37
38 concurrency_limiter: Arc<Semaphore>,
40
41 metrics: Arc<RwLock<ExecutorMetrics>>,
43
44 stopping_predictor: Arc<RwLock<StoppingPredictor>>,
46}
47
/// Aggregate counters describing how the executor has performed so far.
///
/// All fields start at zero via [`Default`].
#[derive(Debug, Default, Clone)]
pub struct ExecutorMetrics {
    /// Number of executions recorded.
    pub total_executions: u64,
    /// Executions that returned `Ok`.
    pub successful_executions: u64,
    /// Executions that returned `Err`.
    pub failed_executions: u64,
    /// Running mean of the wall-clock execution time, in milliseconds.
    pub avg_execution_time_ms: f64,
    /// Count of stage fusions (not incremented by the executor code in this file).
    pub stage_fusion_count: u64,
    /// Number of runs that ended before the last stage because of early stopping.
    pub early_stopping_count: u64,
    /// Highest memory-manager usage seen when metrics were recorded
    /// (`current_usage()` divided by 1024 twice).
    pub memory_usage_peak_mb: f64,
    /// Executions whose wall-clock time exceeded the configured latency budget.
    pub sla_violations: u64,
}
59
/// Adaptive heuristic that decides whether a pipeline run may stop before
/// every stage has executed.
///
/// The predictor keeps a bounded history of how accurate its past decisions
/// were and nudges its thresholds up or down accordingly (see [`update`]).
///
/// [`update`]: StoppingPredictor::update
#[derive(Debug)]
pub struct StoppingPredictor {
    /// Minimum quality score required to stop early.
    quality_threshold: f64,
    /// Minimum confidence required to stop early on quality alone.
    confidence_threshold: f64,
    /// Most recent decision outcomes, oldest first: `1.0` = accurate, `0.0` = inaccurate.
    /// A deque is used so dropping the oldest entry is O(1).
    historical_accuracies: std::collections::VecDeque<f64>,
    /// Maximum number of outcomes retained in `historical_accuracies`.
    max_history: usize,
}

impl Default for StoppingPredictor {
    /// Starts with a quality threshold of 0.8, a confidence threshold of 0.9,
    /// an empty history and room for 100 outcomes.
    fn default() -> Self {
        Self {
            quality_threshold: 0.8,
            confidence_threshold: 0.9,
            historical_accuracies: std::collections::VecDeque::new(),
            max_history: 100,
        }
    }
}

impl StoppingPredictor {
    /// Returns `true` when the run may stop with the results gathered so far.
    ///
    /// `time_budget_used` is the fraction of the time budget already consumed.
    ///
    /// * Below 30% of the budget the answer is always `false`.
    /// * Otherwise stop when both `confidence` and `current_quality` reach
    ///   their thresholds.
    /// * Otherwise stop when more than 80% of the budget is gone and
    ///   `current_quality` reaches 70% of the quality threshold.
    pub fn should_stop_early(&self, current_quality: f64, confidence: f64, time_budget_used: f64) -> bool {
        if time_budget_used < 0.3 {
            return false;
        }

        let results_good_enough =
            confidence >= self.confidence_threshold && current_quality >= self.quality_threshold;
        let nearly_out_of_time =
            time_budget_used > 0.8 && current_quality >= self.quality_threshold * 0.7;

        results_good_enough || nearly_out_of_time
    }

    /// Records whether a past decision turned out to be right and adapts the thresholds.
    ///
    /// A "stop" prediction counts as accurate when `actual_quality` reached 90%
    /// of the quality threshold; a "continue" prediction counts as accurate
    /// when `actual_quality` was below the threshold. When the rolling accuracy
    /// drops under 0.7 both thresholds are raised (capped at 0.95 / 0.98); when
    /// it exceeds 0.9 both are lowered (floored at 0.6 / 0.8).
    pub fn update(&mut self, predicted_stop: bool, actual_quality: f64) {
        let was_accurate = if predicted_stop {
            actual_quality >= self.quality_threshold * 0.9
        } else {
            actual_quality < self.quality_threshold
        };

        self.historical_accuracies
            .push_back(if was_accurate { 1.0 } else { 0.0 });
        while self.historical_accuracies.len() > self.max_history {
            self.historical_accuracies.pop_front();
        }

        // With `max_history == 0` nothing is retained; skip the average
        // instead of computing 0.0 / 0.0.
        if self.historical_accuracies.is_empty() {
            return;
        }

        let avg_accuracy = self.historical_accuracies.iter().sum::<f64>()
            / self.historical_accuracies.len() as f64;

        if avg_accuracy < 0.7 {
            // Too many wrong calls: become more conservative.
            self.quality_threshold = (self.quality_threshold + 0.05).min(0.95);
            self.confidence_threshold = (self.confidence_threshold + 0.02).min(0.98);
        } else if avg_accuracy > 0.9 {
            // Consistently right: allow stopping a little sooner.
            self.quality_threshold = (self.quality_threshold - 0.02).max(0.6);
            self.confidence_threshold = (self.confidence_threshold - 0.01).max(0.8);
        }
    }
}
133
134impl PipelineExecutor {
135 pub async fn new(config: PipelineConfig) -> Result<Self> {
137 let search_engine = Arc::new(SearchEngine::new(&config.max_concurrent.to_string()).await?);
139
140 let lsp_config = LspConfig {
141 enabled: true,
142 server_timeout_ms: config.max_latency_ms / 2, cache_ttl_hours: 24,
144 max_concurrent_requests: config.max_concurrent / 2,
145 routing_percentage: 0.5, ..Default::default()
147 };
148 let lsp_state = Arc::new(crate::lsp::LspState::new(lsp_config));
149 lsp_state.initialize().await?;
150 let lsp_manager = lsp_state;
151
152 let memory_manager = Arc::new(PipelineMemoryManager::new(256)); let concurrency_limiter = Arc::new(Semaphore::new(config.max_concurrent));
156 let metrics = Arc::new(RwLock::new(ExecutorMetrics::default()));
157 let stopping_predictor = Arc::new(RwLock::new(StoppingPredictor::default()));
158
159 let mut executor = Self {
160 config,
161 search_engine,
162 lsp_manager,
163 memory_manager,
164 stages: HashMap::new(),
165 concurrency_limiter,
166 metrics,
167 stopping_predictor,
168 };
169
170 executor.init_stages().await?;
172
173 info!("Pipeline executor initialized with {}ms SLA", executor.config.max_latency_ms);
174
175 Ok(executor)
176 }
177
178 async fn init_stages(&mut self) -> Result<()> {
179 let query_stage = Arc::new(QueryAnalysisStage::new());
181 self.stages.insert(PipelineStage::QueryAnalysis, query_stage);
182
183 let lsp_routing_stage = Arc::new(LspRoutingStage::new(self.lsp_manager.clone()));
185 self.stages.insert(PipelineStage::LspRouting, lsp_routing_stage);
186
187 let search_stage = Arc::new(ParallelSearchStage::new(
189 self.search_engine.clone(),
190 self.lsp_manager.clone(),
191 ));
192 self.stages.insert(PipelineStage::ParallelSearch, search_stage);
193
194 let fusion_stage = Arc::new(ResultFusionStage::new());
196 self.stages.insert(PipelineStage::ResultFusion, fusion_stage);
197
198 let post_process_stage = Arc::new(PostProcessStage::new());
200 self.stages.insert(PipelineStage::PostProcess, post_process_stage);
201
202 Ok(())
203 }
204
205 pub async fn execute(&self, context: PipelineContext) -> Result<PipelineData, PipelineError> {
207 let start_time = Instant::now();
208 let _permit = self.concurrency_limiter.acquire().await.map_err(|_| {
209 PipelineError::FusionError {
210 stage: PipelineStage::Input,
211 message: "Failed to acquire concurrency permit".to_string(),
212 }
213 })?;
214
215 if context.is_deadline_exceeded() {
217 return Err(PipelineError::DeadlineExceeded {
218 elapsed_ms: context.elapsed().as_millis() as u64,
219 deadline_ms: self.config.max_latency_ms,
220 });
221 }
222
223 debug!("Starting pipeline execution for request: {}", context.request_id);
224
225 let buffer_size = self.estimate_buffer_size(&context);
227 let buffer = self.memory_manager.allocate(buffer_size).await.map_err(|e| {
228 PipelineError::BufferAllocation {
229 requested_size: buffer_size
230 }
231 })?;
232
233 let mut data = PipelineData::new(buffer_size);
234 data.buffer = buffer;
235
236 let result = self.execute_stages(context, data).await;
238
239 let execution_time = start_time.elapsed();
241 self.update_metrics(execution_time, result.is_ok()).await;
242
243 result
244 }
245
246 async fn execute_stages(&self, context: PipelineContext, mut data: PipelineData) -> Result<PipelineData, PipelineError> {
247 let mut current_stage = PipelineStage::QueryAnalysis;
248
249 loop {
250 if context.is_deadline_exceeded() {
252 return Err(PipelineError::DeadlineExceeded {
253 elapsed_ms: context.elapsed().as_millis() as u64,
254 deadline_ms: self.config.max_latency_ms,
255 });
256 }
257
258 let stage_timeout = self.config.stage_timeouts.calculate_timeout(
260 self.config.max_latency_ms,
261 current_stage,
262 );
263
264 let stage_start = Instant::now();
266 let stage_processor = self.stages.get(¤t_stage).ok_or_else(|| {
267 PipelineError::FusionError {
268 stage: current_stage,
269 message: "Stage processor not found".to_string(),
270 }
271 })?;
272
273 debug!("Executing stage: {:?}", current_stage);
274
275 let stage_result = timeout(stage_timeout, stage_processor.process(&context, data)).await;
276
277 match stage_result {
278 Ok(Ok(new_data)) => {
279 data = new_data;
280 let stage_time = stage_start.elapsed();
281
282 debug!(
283 "Stage {:?} completed in {}ms",
284 current_stage,
285 stage_time.as_millis()
286 );
287
288 if self.should_stop_early(&context, &data, current_stage).await {
290 info!("Early stopping after stage {:?}", current_stage);
291
292 let mut metrics = self.metrics.write().await;
293 metrics.early_stopping_count += 1;
294
295 break;
296 }
297
298 if let Some(next_stage) = current_stage.next() {
300 current_stage = next_stage;
301 data.advance_stage(current_stage);
302 } else {
303 break; }
305 }
306 Ok(Err(e)) => {
307 error!("Stage {:?} failed: {:?}", current_stage, e);
308 return Err(e);
309 }
310 Err(_) => {
311 let elapsed = stage_start.elapsed();
312 return Err(PipelineError::StageTimeout {
313 stage: current_stage,
314 elapsed_ms: elapsed.as_millis() as u64,
315 });
316 }
317 }
318 }
319
320 Ok(data)
321 }
322
323 async fn should_stop_early(&self, context: &PipelineContext, data: &PipelineData, current_stage: PipelineStage) -> bool {
324 if !self.config.early_stopping_enabled {
325 return false;
326 }
327
328 if matches!(current_stage, PipelineStage::QueryAnalysis | PipelineStage::LspRouting) {
330 return false;
331 }
332
333 let quality_score = self.estimate_result_quality(data).await;
335 let confidence = self.estimate_result_confidence(data).await;
336 let time_budget_used = context.time_budget_percent();
337
338 let predictor = self.stopping_predictor.read().await;
339 let should_stop = predictor.should_stop_early(quality_score, confidence, time_budget_used);
340
341 if should_stop {
342 debug!(
343 "Early stopping decision: quality={:.3}, confidence={:.3}, time_used={:.1}%",
344 quality_score,
345 confidence,
346 time_budget_used * 100.0
347 );
348 }
349
350 should_stop
351 }
352
353 async fn estimate_result_quality(&self, data: &PipelineData) -> f64 {
354 let result_count = data.metadata.search_results_count;
356 let has_lsp_results = data.metadata.lsp_queries > 0;
357
358 let mut quality: f64 = 0.0;
359
360 if result_count > 0 {
362 quality += 0.3;
363 if result_count >= 10 {
364 quality += 0.3;
365 }
366 if result_count >= 50 {
367 quality += 0.2;
368 }
369 }
370
371 if has_lsp_results {
373 quality += 0.2;
374 }
375
376 quality.min(1.0)
377 }
378
379 async fn estimate_result_confidence(&self, data: &PipelineData) -> f64 {
380 let cache_hit_rate = if data.metadata.lsp_queries > 0 {
382 data.metadata.cache_hits as f64 / data.metadata.lsp_queries as f64
383 } else {
384 0.0
385 };
386
387 let data_completeness = if data.total_size() > 0 { 0.8 } else { 0.2 };
388
389 (cache_hit_rate * 0.3 + data_completeness * 0.7).min(1.0)
390 }
391
392 fn estimate_buffer_size(&self, context: &PipelineContext) -> usize {
393 let base_size = 1024; let query_factor = (context.query.len() / 10).max(1);
396 let results_factor = context.max_results / 10;
397
398 base_size * query_factor * results_factor
399 }
400
401 async fn update_metrics(&self, execution_time: Duration, success: bool) {
402 let mut metrics = self.metrics.write().await;
403
404 metrics.total_executions += 1;
405
406 if success {
407 metrics.successful_executions += 1;
408 } else {
409 metrics.failed_executions += 1;
410 }
411
412 let execution_time_ms = execution_time.as_millis() as f64;
413 let total = metrics.total_executions as f64;
414
415 metrics.avg_execution_time_ms =
417 (metrics.avg_execution_time_ms * (total - 1.0) + execution_time_ms) / total;
418
419 if execution_time.as_millis() as u64 > self.config.max_latency_ms {
421 metrics.sla_violations += 1;
422 }
423
424 let current_usage_mb = self.memory_manager.current_usage() as f64 / 1024.0 / 1024.0;
426 if current_usage_mb > metrics.memory_usage_peak_mb {
427 metrics.memory_usage_peak_mb = current_usage_mb;
428 }
429 }
430
431 pub async fn get_metrics(&self) -> ExecutorMetrics {
433 self.metrics.read().await.clone()
434 }
435
436 pub async fn shutdown(&self) -> Result<()> {
438 info!("Shutting down pipeline executor");
439
440 self.lsp_manager.shutdown().await?;
442
443 self.memory_manager.gc().await?;
445
446 Ok(())
447 }
448}
449
/// Stateless first stage: classifies the query and records its size.
pub struct QueryAnalysisStage;

impl QueryAnalysisStage {
    /// Creates the stage; it carries no state.
    pub fn new() -> Self {
        QueryAnalysisStage
    }
}
458
459#[async_trait::async_trait]
460impl PipelineStageProcessor for QueryAnalysisStage {
461 async fn process(&self, context: &PipelineContext, mut data: PipelineData) -> Result<PipelineData, PipelineError> {
462 let intent = QueryIntent::classify(&context.query);
464
465 data.metadata.bytes_processed += context.query.len();
467
468 let analysis = serde_json::json!({
470 "intent": intent,
471 "query_length": context.query.len(),
472 "estimated_complexity": context.query.split_whitespace().count(),
473 });
474
475 debug!("Query analysis completed: {:?}", intent);
476
477 Ok(data)
478 }
479
480 fn stage_id(&self) -> PipelineStage {
481 PipelineStage::QueryAnalysis
482 }
483
484 fn supports_fusion(&self) -> bool {
485 true }
487}
488
489pub struct LspRoutingStage {
491 lsp_manager: Arc<crate::lsp::LspState>,
492}
493
494impl LspRoutingStage {
495 pub fn new(lsp_manager: Arc<crate::lsp::LspState>) -> Self {
496 Self { lsp_manager }
497 }
498}
499
500#[async_trait::async_trait]
501impl PipelineStageProcessor for LspRoutingStage {
502 async fn process(&self, context: &PipelineContext, mut data: PipelineData) -> Result<PipelineData, PipelineError> {
503 let lsp_response = self.lsp_manager.search(&context.query, context.file_path.as_deref()).await
505 .map_err(|e| PipelineError::LspError { message: e.to_string() })?;
506
507 data.metadata.lsp_queries += 1;
509 if !lsp_response.lsp_results.is_empty() {
510 data.metadata.search_results_count += lsp_response.lsp_results.len();
511 }
512
513 debug!("LSP routing completed: {} results", lsp_response.lsp_results.len());
514
515 Ok(data)
516 }
517
518 fn stage_id(&self) -> PipelineStage {
519 PipelineStage::LspRouting
520 }
521
522 fn supports_fusion(&self) -> bool {
523 true
524 }
525}
526
527pub struct ParallelSearchStage {
529 search_engine: Arc<SearchEngine>,
530 lsp_manager: Arc<crate::lsp::LspState>,
531}
532
533impl ParallelSearchStage {
534 pub fn new(search_engine: Arc<SearchEngine>, lsp_manager: Arc<crate::lsp::LspState>) -> Self {
535 Self {
536 search_engine,
537 lsp_manager,
538 }
539 }
540}
541
542#[async_trait::async_trait]
543impl PipelineStageProcessor for ParallelSearchStage {
544 async fn process(&self, context: &PipelineContext, mut data: PipelineData) -> Result<PipelineData, PipelineError> {
545 let (search_results, _metrics) = self.search_engine.search(&context.query, context.max_results)
547 .await
548 .map_err(|e| PipelineError::SearchError { message: e.to_string() })?;
549
550 data.metadata.search_results_count += search_results.len();
551
552 debug!("Parallel search completed: {} results", search_results.len());
553
554 Ok(data)
555 }
556
557 fn stage_id(&self) -> PipelineStage {
558 PipelineStage::ParallelSearch
559 }
560
561 fn supports_fusion(&self) -> bool {
562 false }
564
565 fn estimate_processing_time(&self, data_size: usize) -> Duration {
566 Duration::from_millis(50 + (data_size / 1000) as u64)
568 }
569}
570
/// Stateless result-fusion stage.
pub struct ResultFusionStage;

impl ResultFusionStage {
    /// Creates the stage; it carries no state.
    pub fn new() -> Self {
        ResultFusionStage
    }
}
579
580#[async_trait::async_trait]
581impl PipelineStageProcessor for ResultFusionStage {
582 async fn process(&self, _context: &PipelineContext, mut data: PipelineData) -> Result<PipelineData, PipelineError> {
583 debug!("Result fusion completed");
587
588 Ok(data)
589 }
590
591 fn stage_id(&self) -> PipelineStage {
592 PipelineStage::ResultFusion
593 }
594
595 fn supports_fusion(&self) -> bool {
596 true
597 }
598}
599
/// Stateless post-processing stage.
pub struct PostProcessStage;

impl PostProcessStage {
    /// Creates the stage; it carries no state.
    pub fn new() -> Self {
        PostProcessStage
    }
}
608
609#[async_trait::async_trait]
610impl PipelineStageProcessor for PostProcessStage {
611 async fn process(&self, _context: &PipelineContext, mut data: PipelineData) -> Result<PipelineData, PipelineError> {
612 debug!("Post-processing completed");
615
616 Ok(data)
617 }
618
619 fn stage_id(&self) -> PipelineStage {
620 PipelineStage::PostProcess
621 }
622
623 fn supports_fusion(&self) -> bool {
624 true
625 }
626}
627
#[cfg(test)]
mod tests {
    use super::*;
    use crate::search::SearchEngine;
    use tempfile::TempDir;

    /// Builds a search engine backed by a temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the engine: dropping it
    /// deletes the directory, so it has to outlive the engine.
    // Not called by any test in this module yet.
    #[allow(dead_code)]
    async fn create_mock_search_engine() -> (Arc<SearchEngine>, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let engine = Arc::new(SearchEngine::new(temp_dir.path()).await.unwrap());
        (engine, temp_dir)
    }

    #[test]
    fn test_executor_metrics_default() {
        let metrics = ExecutorMetrics::default();
        assert_eq!(metrics.total_executions, 0);
        assert_eq!(metrics.successful_executions, 0);
        assert_eq!(metrics.failed_executions, 0);
        assert_eq!(metrics.avg_execution_time_ms, 0.0);
        assert_eq!(metrics.stage_fusion_count, 0);
        assert_eq!(metrics.early_stopping_count, 0);
    }

    #[test]
    fn test_stopping_predictor_default() {
        let predictor = StoppingPredictor::default();
        assert_eq!(predictor.confidence_threshold, 0.9);
        assert_eq!(predictor.quality_threshold, 0.8);
        assert_eq!(predictor.historical_accuracies.len(), 0);
        assert_eq!(predictor.max_history, 100);
    }

    #[test]
    fn test_executor_metrics_update() {
        let mut metrics = ExecutorMetrics::default();
        metrics.total_executions = 10;
        metrics.successful_executions = 8;
        metrics.failed_executions = 2;
        metrics.avg_execution_time_ms = 150.0;

        assert_eq!(metrics.total_executions, 10);
        assert_eq!(metrics.successful_executions, 8);
        assert_eq!(metrics.failed_executions, 2);
        assert_eq!(metrics.avg_execution_time_ms, 150.0);
    }

    #[tokio::test]
    async fn test_pipeline_executor_creation() {
        let config = PipelineConfig::default();

        let executor = PipelineExecutor::new(config)
            .await
            .expect("executor should build from the default config");

        assert_eq!(executor.config.max_latency_ms, 150);
        assert_eq!(executor.config.max_concurrent, 50);
        assert!(executor.config.fusion_enabled);
        assert!(executor.config.early_stopping_enabled);
    }

    #[test]
    fn test_stopping_predictor_should_stop() {
        let predictor = StoppingPredictor::default();

        // Too little of the time budget used: never stop.
        assert!(!predictor.should_stop_early(0.9, 0.95, 0.2));

        // Quality and confidence both above their thresholds.
        assert!(predictor.should_stop_early(0.85, 0.95, 0.5));

        // Nearly out of time with acceptable quality.
        assert!(predictor.should_stop_early(0.6, 0.7, 0.9));

        // Mid-budget with poor quality and confidence.
        assert!(!predictor.should_stop_early(0.4, 0.5, 0.5));
    }

    #[test]
    fn test_stopping_predictor_update() {
        let mut predictor = StoppingPredictor::default();
        let initial_quality_threshold = predictor.quality_threshold;
        let initial_confidence_threshold = predictor.confidence_threshold;

        predictor.update(true, 0.85);
        assert_eq!(predictor.historical_accuracies.len(), 1);

        for _ in 0..10 {
            predictor.update(true, 0.9);
        }

        assert_eq!(predictor.historical_accuracies.len(), 11);
        assert!(predictor.historical_accuracies.len() <= predictor.max_history);

        // A streak of accurate "stop" predictions relaxes both thresholds.
        assert!(predictor.quality_threshold < initial_quality_threshold);
        assert!(predictor.confidence_threshold < initial_confidence_threshold);
    }

    #[test]
    fn test_stopping_predictor_tightens_after_miss() {
        let mut predictor = StoppingPredictor::default();
        let initial_quality_threshold = predictor.quality_threshold;
        let initial_confidence_threshold = predictor.confidence_threshold;

        // Predicted "stop" but the actual quality was far below the threshold.
        predictor.update(true, 0.1);

        assert!(predictor.quality_threshold > initial_quality_threshold);
        assert!(predictor.confidence_threshold > initial_confidence_threshold);
    }

    #[test]
    fn test_stopping_predictor_history_is_capped() {
        let mut predictor = StoppingPredictor::default();

        for _ in 0..(predictor.max_history + 50) {
            predictor.update(true, 0.9);
        }

        assert_eq!(predictor.historical_accuracies.len(), predictor.max_history);
    }

    #[test]
    fn test_query_analysis_stage() {
        let stage = QueryAnalysisStage::new();
        assert_eq!(stage.stage_id(), PipelineStage::QueryAnalysis);
        assert!(stage.supports_fusion());
    }

    #[test]
    fn test_post_process_stage() {
        let stage = PostProcessStage::new();
        assert_eq!(stage.stage_id(), PipelineStage::PostProcess);
        assert!(stage.supports_fusion());
    }

    #[test]
    fn test_result_fusion_stage() {
        let stage = ResultFusionStage::new();
        assert_eq!(stage.stage_id(), PipelineStage::ResultFusion);
        assert!(stage.supports_fusion());
    }
}