// lens_core/pipeline/executor.rs

//! Pipeline Executor
//!
//! Orchestrates the fused pipeline execution with:
//! - Async overlap processing
//! - Stage fusion for performance
//! - SLA-bounded execution
//! - Learning-to-stop algorithms
//! - Cross-shard optimization
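//!
//! A minimal usage sketch (a hedged outline, not a verbatim API contract:
//! it assumes a tokio runtime and a `PipelineContext` built elsewhere, and
//! elides error handling):
//!
//! ```ignore
//! let executor = PipelineExecutor::new(PipelineConfig::default()).await?;
//! let data = executor.execute(context).await?;
//! let metrics = executor.get_metrics().await;
//! ```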

use super::{
    memory::PipelineMemoryManager, PipelineConfig, PipelineContext, PipelineData, PipelineError,
    PipelineStage, PipelineStageProcessor,
};
use crate::lsp::{LspConfig, LspState, QueryIntent};
use crate::search::SearchEngine;
use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, Semaphore};
use tokio::time::timeout;
use tracing::{debug, error, info};

/// Fused pipeline executor with stage management
pub struct PipelineExecutor {
    config: PipelineConfig,

    // Core engines
    search_engine: Arc<SearchEngine>,
    lsp_manager: Arc<LspState>,

    // Memory management
    memory_manager: Arc<PipelineMemoryManager>,

    // Stage processors
    stages: HashMap<PipelineStage, Arc<dyn PipelineStageProcessor>>,

    // Execution control
    concurrency_limiter: Arc<Semaphore>,

    // Metrics and monitoring
    metrics: Arc<RwLock<ExecutorMetrics>>,

    // Learning-to-stop state
    stopping_predictor: Arc<RwLock<StoppingPredictor>>,
}

#[derive(Debug, Default, Clone)]
pub struct ExecutorMetrics {
    pub total_executions: u64,
    pub successful_executions: u64,
    pub failed_executions: u64,
    pub avg_execution_time_ms: f64,
    pub stage_fusion_count: u64,
    pub early_stopping_count: u64,
    pub memory_usage_peak_mb: f64,
    pub sla_violations: u64,
}

/// Learning-to-stop predictor for early termination
#[derive(Debug)]
pub struct StoppingPredictor {
    quality_threshold: f64,
    confidence_threshold: f64,
    historical_accuracies: Vec<f64>,
    max_history: usize,
}

impl Default for StoppingPredictor {
    fn default() -> Self {
        Self {
            quality_threshold: 0.8,
            confidence_threshold: 0.9,
            historical_accuracies: Vec::new(),
            max_history: 100,
        }
    }
}

impl StoppingPredictor {
    /// Predict if we should stop early based on current results
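    ///
    /// A minimal doc sketch with the default thresholds (quality 0.8,
    /// confidence 0.9), mirroring the unit tests at the bottom of this file:
    ///
    /// ```ignore
    /// let p = StoppingPredictor::default();
    /// assert!(!p.should_stop_early(0.9, 0.95, 0.2)); // too little budget used
    /// assert!(p.should_stop_early(0.85, 0.95, 0.5)); // confident, high quality
    /// ```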
    pub fn should_stop_early(&self, current_quality: f64, confidence: f64, time_budget_used: f64) -> bool {
        // Don't stop if we haven't used much of the time budget yet
        if time_budget_used < 0.3 {
            return false;
        }

        // Stop if we're confident and quality is good
        if confidence >= self.confidence_threshold && current_quality >= self.quality_threshold {
            return true;
        }

        // Stop if we're running out of time and results are reasonable
        if time_budget_used > 0.8 && current_quality >= self.quality_threshold * 0.7 {
            return true;
        }

        false
    }

    /// Update the predictor with an execution outcome
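    ///
    /// A hedged sketch of the feedback loop (`true` means "we predicted an
    /// early stop"; the thresholds then adapt to the observed quality):
    ///
    /// ```ignore
    /// let mut p = StoppingPredictor::default();
    /// p.update(true, 0.85); // a stop prediction that turned out well
    /// ```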
    pub fn update(&mut self, predicted_stop: bool, actual_quality: f64) {
        // Score the prediction: stopping was accurate if quality held up,
        // while continuing was accurate if quality was still below threshold
        let was_accurate = if predicted_stop {
            actual_quality >= self.quality_threshold * 0.9
        } else {
            actual_quality < self.quality_threshold
        };

        let accuracy = if was_accurate { 1.0 } else { 0.0 };

        self.historical_accuracies.push(accuracy);

        // Keep only recent history
        if self.historical_accuracies.len() > self.max_history {
            self.historical_accuracies.remove(0);
        }

        // Adapt thresholds based on accuracy
        let avg_accuracy =
            self.historical_accuracies.iter().sum::<f64>() / self.historical_accuracies.len() as f64;

        if avg_accuracy < 0.7 {
            // Too many false positives, be more conservative
            self.quality_threshold = (self.quality_threshold + 0.05).min(0.95);
            self.confidence_threshold = (self.confidence_threshold + 0.02).min(0.98);
        } else if avg_accuracy > 0.9 {
            // Very accurate, can be more aggressive
            self.quality_threshold = (self.quality_threshold - 0.02).max(0.6);
            self.confidence_threshold = (self.confidence_threshold - 0.01).max(0.8);
        }
    }
}

impl PipelineExecutor {
    /// Create a new pipeline executor
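    ///
    /// A construction sketch (assumes a tokio runtime; mirrors
    /// `test_pipeline_executor_creation` below):
    ///
    /// ```ignore
    /// let executor = PipelineExecutor::new(PipelineConfig::default()).await?;
    /// ```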
    pub async fn new(config: PipelineConfig) -> Result<Self> {
        // Initialize core engines.
        // NOTE: `SearchEngine::new` takes an index path (see the test helper
        // below); we assume `PipelineConfig` exposes one as `index_path`.
        let search_engine = Arc::new(SearchEngine::new(&config.index_path).await?);

        let lsp_config = LspConfig {
            enabled: true,
            server_timeout_ms: config.max_latency_ms / 2, // Use half of the SLA for LSP
            cache_ttl_hours: 24,
            max_concurrent_requests: config.max_concurrent / 2,
            routing_percentage: 0.5, // 50% routing target
            ..Default::default()
        };
        let lsp_manager = Arc::new(LspState::new(lsp_config));
        lsp_manager.initialize().await?;

        // Initialize the memory manager with a fixed 256 MB pipeline budget
        let memory_manager = Arc::new(PipelineMemoryManager::new(256));

        let concurrency_limiter = Arc::new(Semaphore::new(config.max_concurrent));
        let metrics = Arc::new(RwLock::new(ExecutorMetrics::default()));
        let stopping_predictor = Arc::new(RwLock::new(StoppingPredictor::default()));

        let mut executor = Self {
            config,
            search_engine,
            lsp_manager,
            memory_manager,
            stages: HashMap::new(),
            concurrency_limiter,
            metrics,
            stopping_predictor,
        };

        // Initialize stage processors
        executor.init_stages().await?;

        info!("Pipeline executor initialized with {}ms SLA", executor.config.max_latency_ms);

        Ok(executor)
    }

    async fn init_stages(&mut self) -> Result<()> {
        // Query analysis stage
        let query_stage = Arc::new(QueryAnalysisStage::new());
        self.stages.insert(PipelineStage::QueryAnalysis, query_stage);

        // LSP routing stage
        let lsp_routing_stage = Arc::new(LspRoutingStage::new(self.lsp_manager.clone()));
        self.stages.insert(PipelineStage::LspRouting, lsp_routing_stage);

        // Parallel search stage
        let search_stage = Arc::new(ParallelSearchStage::new(
            self.search_engine.clone(),
            self.lsp_manager.clone(),
        ));
        self.stages.insert(PipelineStage::ParallelSearch, search_stage);

        // Result fusion stage
        let fusion_stage = Arc::new(ResultFusionStage::new());
        self.stages.insert(PipelineStage::ResultFusion, fusion_stage);

        // Post-processing stage
        let post_process_stage = Arc::new(PostProcessStage::new());
        self.stages.insert(PipelineStage::PostProcess, post_process_stage);

        Ok(())
    }

    /// Execute a complete pipeline run
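    ///
    /// A hedged call sketch (construction of the `PipelineContext` lives in
    /// the parent module and is elided here):
    ///
    /// ```ignore
    /// let data = executor.execute(context).await?;
    /// println!("results: {}", data.metadata.search_results_count);
    /// ```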
    pub async fn execute(&self, context: PipelineContext) -> Result<PipelineData, PipelineError> {
        let start_time = Instant::now();
        let _permit = self.concurrency_limiter.acquire().await.map_err(|_| {
            PipelineError::FusionError {
                stage: PipelineStage::Input,
                message: "Failed to acquire concurrency permit".to_string(),
            }
        })?;

        // Check the SLA deadline before starting
        if context.is_deadline_exceeded() {
            return Err(PipelineError::DeadlineExceeded {
                elapsed_ms: context.elapsed().as_millis() as u64,
                deadline_ms: self.config.max_latency_ms,
            });
        }

        debug!("Starting pipeline execution for request: {}", context.request_id);

        // Initialize pipeline data with an appropriately sized buffer
        let buffer_size = self.estimate_buffer_size(&context);
        let buffer = self.memory_manager.allocate(buffer_size).await.map_err(|_| {
            PipelineError::BufferAllocation {
                requested_size: buffer_size,
            }
        })?;

        let mut data = PipelineData::new(buffer_size);
        data.buffer = buffer;

        // Execute the pipeline stages
        let result = self.execute_stages(context, data).await;

        // Update metrics
        let execution_time = start_time.elapsed();
        self.update_metrics(execution_time, result.is_ok()).await;

        result
    }

    async fn execute_stages(&self, context: PipelineContext, mut data: PipelineData) -> Result<PipelineData, PipelineError> {
        let mut current_stage = PipelineStage::QueryAnalysis;

        loop {
            // Check the deadline before each stage
            if context.is_deadline_exceeded() {
                return Err(PipelineError::DeadlineExceeded {
                    elapsed_ms: context.elapsed().as_millis() as u64,
                    deadline_ms: self.config.max_latency_ms,
                });
            }

            // Get the stage timeout
            let stage_timeout = self.config.stage_timeouts.calculate_timeout(
                self.config.max_latency_ms,
                current_stage,
            );

            // Execute the stage with a timeout
            let stage_start = Instant::now();
            let stage_processor = self.stages.get(&current_stage).ok_or_else(|| {
                PipelineError::FusionError {
                    stage: current_stage,
                    message: "Stage processor not found".to_string(),
                }
            })?;

            debug!("Executing stage: {:?}", current_stage);

            let stage_result = timeout(stage_timeout, stage_processor.process(&context, data)).await;

            match stage_result {
                Ok(Ok(new_data)) => {
                    data = new_data;
                    let stage_time = stage_start.elapsed();

                    debug!(
                        "Stage {:?} completed in {}ms",
                        current_stage,
                        stage_time.as_millis()
                    );

                    // Check if we should stop early
                    if self.should_stop_early(&context, &data, current_stage).await {
                        info!("Early stopping after stage {:?}", current_stage);

                        let mut metrics = self.metrics.write().await;
                        metrics.early_stopping_count += 1;

                        break;
                    }

                    // Advance to the next stage
                    if let Some(next_stage) = current_stage.next() {
                        current_stage = next_stage;
                        data.advance_stage(current_stage);
                    } else {
                        break; // Pipeline complete
                    }
                }
                Ok(Err(e)) => {
                    error!("Stage {:?} failed: {:?}", current_stage, e);
                    return Err(e);
                }
                Err(_) => {
                    let elapsed = stage_start.elapsed();
                    return Err(PipelineError::StageTimeout {
                        stage: current_stage,
                        elapsed_ms: elapsed.as_millis() as u64,
                    });
                }
            }
        }

        Ok(data)
    }

    async fn should_stop_early(&self, context: &PipelineContext, data: &PipelineData, current_stage: PipelineStage) -> bool {
        if !self.config.early_stopping_enabled {
            return false;
        }

        // Don't stop too early in the pipeline
        if matches!(current_stage, PipelineStage::QueryAnalysis | PipelineStage::LspRouting) {
            return false;
        }

        // Calculate the current quality estimate (simplified)
        let quality_score = self.estimate_result_quality(data).await;
        let confidence = self.estimate_result_confidence(data).await;
        let time_budget_used = context.time_budget_percent();

        let predictor = self.stopping_predictor.read().await;
        let should_stop = predictor.should_stop_early(quality_score, confidence, time_budget_used);

        if should_stop {
            debug!(
                "Early stopping decision: quality={:.3}, confidence={:.3}, time_used={:.1}%",
                quality_score,
                confidence,
                time_budget_used * 100.0
            );
        }

        should_stop
    }

    async fn estimate_result_quality(&self, data: &PipelineData) -> f64 {
        // Simplified quality estimate based on result count and LSP coverage
        let result_count = data.metadata.search_results_count;
        let has_lsp_results = data.metadata.lsp_queries > 0;

        let mut quality: f64 = 0.0;

        // Base quality from the result count
        if result_count > 0 {
            quality += 0.3;
            if result_count >= 10 {
                quality += 0.3;
            }
            if result_count >= 50 {
                quality += 0.2;
            }
        }

        // Bonus for LSP integration
        if has_lsp_results {
            quality += 0.2;
        }

        quality.min(1.0)
    }

    async fn estimate_result_confidence(&self, data: &PipelineData) -> f64 {
        // Simplified confidence estimate
        let cache_hit_rate = if data.metadata.lsp_queries > 0 {
            data.metadata.cache_hits as f64 / data.metadata.lsp_queries as f64
        } else {
            0.0
        };

        let data_completeness = if data.total_size() > 0 { 0.8 } else { 0.2 };

        (cache_hit_rate * 0.3 + data_completeness * 0.7).min(1.0)
    }

    fn estimate_buffer_size(&self, context: &PipelineContext) -> usize {
        // Estimate the buffer size from query complexity and expected results.
        // Worked example: a 40-char query with max_results = 100 yields
        // 1024 * 4 * 10 = ~40 KB.
        let base_size = 1024; // 1KB base
        let query_factor = (context.query.len() / 10).max(1);
        // Clamp to at least 1 so a small max_results cannot zero out the product
        let results_factor = (context.max_results / 10).max(1);

        base_size * query_factor * results_factor
    }

    async fn update_metrics(&self, execution_time: Duration, success: bool) {
        let mut metrics = self.metrics.write().await;

        metrics.total_executions += 1;

        if success {
            metrics.successful_executions += 1;
        } else {
            metrics.failed_executions += 1;
        }

        let execution_time_ms = execution_time.as_millis() as f64;
        let total = metrics.total_executions as f64;

        // Update the running average execution time
        metrics.avg_execution_time_ms =
            (metrics.avg_execution_time_ms * (total - 1.0) + execution_time_ms) / total;

        // Check for an SLA violation
        if execution_time.as_millis() as u64 > self.config.max_latency_ms {
            metrics.sla_violations += 1;
        }

        // Update the peak memory usage
        let current_usage_mb = self.memory_manager.current_usage() as f64 / 1024.0 / 1024.0;
        if current_usage_mb > metrics.memory_usage_peak_mb {
            metrics.memory_usage_peak_mb = current_usage_mb;
        }
    }

    /// Get executor metrics
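    ///
    /// A small monitoring sketch (`sla_violations` counts executions that
    /// overran `max_latency_ms`):
    ///
    /// ```ignore
    /// let m = executor.get_metrics().await;
    /// if m.sla_violations > 0 {
    ///     eprintln!("{} executions exceeded the SLA", m.sla_violations);
    /// }
    /// ```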
    pub async fn get_metrics(&self) -> ExecutorMetrics {
        self.metrics.read().await.clone()
    }

    /// Shutdown the executor
    pub async fn shutdown(&self) -> Result<()> {
        info!("Shutting down pipeline executor");

        // Shutdown the LSP manager
        self.lsp_manager.shutdown().await?;

        // Run a final garbage-collection pass over pipeline buffers
        self.memory_manager.gc().await?;

        Ok(())
    }
}

/// Query analysis stage processor
pub struct QueryAnalysisStage;

impl QueryAnalysisStage {
    pub fn new() -> Self {
        Self
    }
}

#[async_trait::async_trait]
impl PipelineStageProcessor for QueryAnalysisStage {
    async fn process(&self, context: &PipelineContext, mut data: PipelineData) -> Result<PipelineData, PipelineError> {
        // Analyze query intent and complexity
        let intent = QueryIntent::classify(&context.query);

        // Record the bytes examined in the pipeline metadata
        data.metadata.bytes_processed += context.query.len();

        // Summarize the analysis; it is currently only logged, but a later
        // stage could attach it to the pipeline data
        let analysis = serde_json::json!({
            "intent": intent,
            "query_length": context.query.len(),
            "estimated_complexity": context.query.split_whitespace().count(),
        });

        debug!("Query analysis completed: {}", analysis);

        Ok(data)
    }

    fn stage_id(&self) -> PipelineStage {
        PipelineStage::QueryAnalysis
    }

    fn supports_fusion(&self) -> bool {
        true // Can be fused with LSP routing
    }
}

/// LSP routing stage processor
pub struct LspRoutingStage {
    lsp_manager: Arc<LspState>,
}

impl LspRoutingStage {
    pub fn new(lsp_manager: Arc<LspState>) -> Self {
        Self { lsp_manager }
    }
}

#[async_trait::async_trait]
impl PipelineStageProcessor for LspRoutingStage {
    async fn process(&self, context: &PipelineContext, mut data: PipelineData) -> Result<PipelineData, PipelineError> {
        // Route the query through LSP if appropriate
        let lsp_response = self.lsp_manager.search(&context.query, context.file_path.as_deref()).await
            .map_err(|e| PipelineError::LspError { message: e.to_string() })?;

        // Update metadata (adding an empty result set is a harmless no-op)
        data.metadata.lsp_queries += 1;
        data.metadata.search_results_count += lsp_response.lsp_results.len();

        debug!("LSP routing completed: {} results", lsp_response.lsp_results.len());

        Ok(data)
    }

    fn stage_id(&self) -> PipelineStage {
        PipelineStage::LspRouting
    }

    fn supports_fusion(&self) -> bool {
        true
    }
}

/// Parallel search stage processor
pub struct ParallelSearchStage {
    search_engine: Arc<SearchEngine>,
    // Currently unused here; LSP queries are issued in the routing stage
    lsp_manager: Arc<LspState>,
}

impl ParallelSearchStage {
    pub fn new(search_engine: Arc<SearchEngine>, lsp_manager: Arc<LspState>) -> Self {
        Self {
            search_engine,
            lsp_manager,
        }
    }
}

#[async_trait::async_trait]
impl PipelineStageProcessor for ParallelSearchStage {
    async fn process(&self, context: &PipelineContext, mut data: PipelineData) -> Result<PipelineData, PipelineError> {
        // Run the text search; LSP results are gathered in the routing stage
        let (search_results, _metrics) = self.search_engine.search(&context.query, context.max_results)
            .await
            .map_err(|e| PipelineError::SearchError { message: e.to_string() })?;

        data.metadata.search_results_count += search_results.len();

        debug!("Parallel search completed: {} results", search_results.len());

        Ok(data)
    }

    fn stage_id(&self) -> PipelineStage {
        PipelineStage::ParallelSearch
    }

    fn supports_fusion(&self) -> bool {
        false // Search operations are complex and shouldn't be fused
    }

    fn estimate_processing_time(&self, data_size: usize) -> Duration {
        // Estimate from data size and index complexity
        Duration::from_millis(50 + (data_size / 1000) as u64)
    }
}

/// Result fusion stage processor
pub struct ResultFusionStage;

impl ResultFusionStage {
    pub fn new() -> Self {
        Self
    }
}

#[async_trait::async_trait]
impl PipelineStageProcessor for ResultFusionStage {
    async fn process(&self, _context: &PipelineContext, data: PipelineData) -> Result<PipelineData, PipelineError> {
        // Fuse results from different sources (LSP + text search).
        // This is where zero-copy buffer operations pay off.

        debug!("Result fusion completed");

        Ok(data)
    }

    fn stage_id(&self) -> PipelineStage {
        PipelineStage::ResultFusion
    }

    fn supports_fusion(&self) -> bool {
        true
    }
}

/// Post-processing stage processor
pub struct PostProcessStage;

impl PostProcessStage {
    pub fn new() -> Self {
        Self
    }
}

#[async_trait::async_trait]
impl PipelineStageProcessor for PostProcessStage {
    async fn process(&self, _context: &PipelineContext, data: PipelineData) -> Result<PipelineData, PipelineError> {
        // Final post-processing: deduplication, ranking, formatting

        debug!("Post-processing completed");

        Ok(data)
    }

    fn stage_id(&self) -> PipelineStage {
        PipelineStage::PostProcess
    }

    fn supports_fusion(&self) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::search::SearchEngine;
    use tempfile::TempDir;

    #[allow(dead_code)] // helper for tests that need a real index
    async fn create_mock_search_engine() -> Arc<SearchEngine> {
        let temp_dir = TempDir::new().unwrap();
        Arc::new(SearchEngine::new(temp_dir.path()).await.unwrap())
    }

    #[test]
    fn test_executor_metrics_default() {
        let metrics = ExecutorMetrics::default();
        assert_eq!(metrics.total_executions, 0);
        assert_eq!(metrics.successful_executions, 0);
        assert_eq!(metrics.failed_executions, 0);
        assert_eq!(metrics.avg_execution_time_ms, 0.0);
        assert_eq!(metrics.stage_fusion_count, 0);
        assert_eq!(metrics.early_stopping_count, 0);
    }

    #[test]
    fn test_stopping_predictor_default() {
        let predictor = StoppingPredictor::default();
        assert_eq!(predictor.confidence_threshold, 0.9);
        assert_eq!(predictor.quality_threshold, 0.8);
        assert_eq!(predictor.historical_accuracies.len(), 0);
        assert_eq!(predictor.max_history, 100);
    }

    #[test]
    fn test_executor_metrics_update() {
        let mut metrics = ExecutorMetrics::default();
        metrics.total_executions = 10;
        metrics.successful_executions = 8;
        metrics.failed_executions = 2;
        metrics.avg_execution_time_ms = 150.0;

        assert_eq!(metrics.total_executions, 10);
        assert_eq!(metrics.successful_executions, 8);
        assert_eq!(metrics.failed_executions, 2);
        assert_eq!(metrics.avg_execution_time_ms, 150.0);
    }

    #[tokio::test]
    async fn test_pipeline_executor_creation() {
        let config = PipelineConfig::default();

        let executor = PipelineExecutor::new(config).await;

        assert!(executor.is_ok());
        let executor = executor.unwrap();
        assert_eq!(executor.config.max_latency_ms, 150);
        assert_eq!(executor.config.max_concurrent, 50);
        assert!(executor.config.fusion_enabled);
        assert!(executor.config.early_stopping_enabled);
    }

    #[test]
    fn test_stopping_predictor_should_stop() {
        let predictor = StoppingPredictor::default();

        // Should not stop early with little of the time budget used
        assert!(!predictor.should_stop_early(0.9, 0.95, 0.2));

        // Should stop with high confidence and quality
        assert!(predictor.should_stop_early(0.85, 0.95, 0.5));

        // Should stop when running out of time with reasonable quality
        assert!(predictor.should_stop_early(0.6, 0.7, 0.9));

        // Should not stop with low quality
        assert!(!predictor.should_stop_early(0.4, 0.5, 0.5));
    }

    #[test]
    fn test_stopping_predictor_update() {
        let mut predictor = StoppingPredictor::default();
        let initial_quality_threshold = predictor.quality_threshold;
        let initial_confidence_threshold = predictor.confidence_threshold;

        // Update with a good prediction
        predictor.update(true, 0.85);
        assert_eq!(predictor.historical_accuracies.len(), 1);

        // Add more data points to exercise threshold adaptation
        for _ in 0..10 {
            predictor.update(true, 0.9);
        }

        // Should have collected historical data
        assert_eq!(predictor.historical_accuracies.len(), 11);
        assert!(predictor.historical_accuracies.len() <= predictor.max_history);

        // A run of accurate predictions should make the thresholds more aggressive
        assert!(predictor.quality_threshold < initial_quality_threshold);
        assert!(predictor.confidence_threshold < initial_confidence_threshold);
    }

    #[test]
    fn test_query_analysis_stage() {
        let stage = QueryAnalysisStage::new();
        assert_eq!(stage.stage_id(), PipelineStage::QueryAnalysis);
        assert!(stage.supports_fusion());
    }

    #[test]
    fn test_post_process_stage() {
        let stage = PostProcessStage::new();
        assert_eq!(stage.stage_id(), PipelineStage::PostProcess);
        assert!(stage.supports_fusion());
    }

    #[test]
    fn test_result_fusion_stage() {
        let stage = ResultFusionStage::new();
        assert_eq!(stage.stage_id(), PipelineStage::ResultFusion);
        assert!(stage.supports_fusion()); // Fusion stage supports fusion
    }
}