// lens_core/pipeline/mod.rs

//! Zero-copy fused pipeline architecture.
//!
//! Implements the fused Rust pipeline with:
//! - Zero-copy segment views, targeting ≤150ms p95 latency
//! - Asynchronous overlap processing
//! - Cross-shard TA/NRA early stopping
//! - Learning-to-stop for WAND/HNSW
//! - Prefetch and visited-set reuse
//! - SLA-bounded execution with timeouts

pub mod executor;
pub mod fusion;
pub mod learning;
pub mod memory;
pub mod scheduler;
pub mod stages;

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info, warn};

pub use executor::PipelineExecutor;
pub use fusion::ResultFusion;
pub use memory::{SegmentView, ZeroCopyBuffer};
pub use scheduler::PipelineScheduler;
pub use stages::{LspSearchStage, QueryPreprocessingStage, TextSearchStage};

/// Per-request execution context: the query, its result limit, and the SLA
/// deadline derived from the request timeout.
#[derive(Debug, Clone)]
pub struct PipelineContext {
    /// Caller-supplied identifier; echoed in logs and in `PipelineResult`.
    pub request_id: String,
    /// Query text to execute.
    pub query: String,
    /// Optional file path, set via [`PipelineContext::with_file_path`].
    pub file_path: Option<String>,
    /// Maximum number of results requested (defaults to 50).
    pub max_results: usize,
    /// Total time budget for the request.
    pub timeout: Duration,
    /// Creation time; all elapsed-time measurements start here.
    pub started_at: Instant,
    /// `started_at + timeout`, as computed by [`PipelineContext::new`].
    pub sla_deadline: Instant,
}

impl PipelineContext {
    /// Creates a context whose deadline is `timeout_ms` milliseconds from now.
    ///
    /// `file_path` starts as `None` and `max_results` as 50; use the `with_*`
    /// builders to override them.
    pub fn new(request_id: String, query: String, timeout_ms: u64) -> Self {
        let started_at = Instant::now();
        let timeout = Duration::from_millis(timeout_ms);
        let sla_deadline = started_at + timeout;

        Self {
            request_id,
            query,
            file_path: None,
            max_results: 50,
            timeout,
            started_at,
            sla_deadline,
        }
    }

    /// Builder-style setter for [`PipelineContext::file_path`].
    pub fn with_file_path(mut self, file_path: String) -> Self {
        self.file_path = Some(file_path);
        self
    }

    /// Builder-style setter for [`PipelineContext::max_results`].
    pub fn with_max_results(mut self, max_results: usize) -> Self {
        self.max_results = max_results;
        self
    }

    /// Time elapsed since the context was created.
    pub fn elapsed(&self) -> Duration {
        self.started_at.elapsed()
    }

    /// Time left until `sla_deadline`, or `Duration::ZERO` once it has passed.
    pub fn remaining_time(&self) -> Duration {
        self.sla_deadline.saturating_duration_since(Instant::now())
    }

    /// `true` once the current time has reached `sla_deadline`.
    pub fn is_deadline_exceeded(&self) -> bool {
        Instant::now() >= self.sla_deadline
    }

    /// Fraction of the time budget already consumed, in `0.0..=1.0`.
    ///
    /// Despite the name, the value is not scaled to 100. It is computed with
    /// sub-millisecond precision. A zero `timeout` counts as a fully consumed
    /// budget and yields `1.0`.
    pub fn time_budget_percent(&self) -> f64 {
        // Whole-millisecond arithmetic would round a non-zero sub-millisecond
        // timeout down to 0 and wrongly report the budget as exhausted.
        if self.timeout.is_zero() {
            return 1.0;
        }
        (self.elapsed().as_secs_f64() / self.timeout.as_secs_f64()).min(1.0)
    }
}

95/// Zero-copy pipeline data flowing between stages
96#[derive(Debug, Clone)]
97pub struct PipelineData {
98    /// Core data buffer with zero-copy views
99    pub buffer: Arc<ZeroCopyBuffer>,
100    
101    /// Metadata about the data
102    pub metadata: PipelineMetadata,
103    
104    /// Current processing stage
105    pub stage: PipelineStage,
106    
107    /// Segment views for different data types
108    pub segments: Vec<SegmentView>,
109}
110
111impl PipelineData {
112    pub fn new(capacity: usize) -> Self {
113        Self {
114            buffer: Arc::new(ZeroCopyBuffer::new(capacity)),
115            metadata: PipelineMetadata::default(),
116            stage: PipelineStage::Input,
117            segments: Vec::new(),
118        }
119    }
120
121    /// Add a segment view without copying data
122    pub fn add_segment(&mut self, segment: SegmentView) {
123        self.segments.push(segment);
124    }
125
126    /// Create a new view of existing data
127    pub fn create_view(&self, offset: usize, length: usize) -> Result<SegmentView> {
128        self.buffer.create_view(offset, length)
129    }
130
131    /// Get total data size across all segments
132    pub fn total_size(&self) -> usize {
133        self.segments.iter().map(|s| s.len()).sum()
134    }
135
136    /// Advance to next pipeline stage
137    pub fn advance_stage(&mut self, stage: PipelineStage) {
138        self.stage = stage;
139        self.metadata.stage_transitions += 1;
140    }
141}
142
/// Counters carried alongside [`PipelineData`] as it moves between stages.
#[derive(Clone, Debug, Default)]
pub struct PipelineMetadata {
    /// Number of calls to `PipelineData::advance_stage`.
    pub stage_transitions: usize,
    /// Bytes processed so far (presumably filled in by stage implementations).
    pub bytes_processed: usize,
    /// Cache hits observed during execution.
    pub cache_hits: usize,
    /// LSP queries issued during execution.
    pub lsp_queries: usize,
    /// Number of search results produced.
    pub search_results_count: usize,
    /// Per-request timing and allocation counters.
    pub performance_metrics: PerformanceMetrics,
}

/// Timing and allocation counters for a single pipeline execution.
#[derive(Clone, Debug, Default)]
pub struct PerformanceMetrics {
    /// End-to-end time in milliseconds.
    pub total_time_ms: u64,
    /// Time spent on LSP work, in milliseconds.
    pub lsp_time_ms: u64,
    /// Time spent searching, in milliseconds.
    pub search_time_ms: u64,
    /// Time spent fusing results, in milliseconds.
    pub fusion_time_ms: u64,
    /// Number of memory allocations performed.
    pub memory_allocations: usize,
    /// Number of zero-copy operations performed.
    pub zero_copy_operations: usize,
}

/// Stages of a pipeline run, declared in execution order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PipelineStage {
    Input,
    QueryAnalysis,
    LspRouting,
    ParallelSearch,
    ResultFusion,
    PostProcess,
    Output,
}

impl PipelineStage {
    /// All stages in execution order; [`PipelineStage::next`] walks this table.
    const SEQUENCE: [PipelineStage; 7] = [
        PipelineStage::Input,
        PipelineStage::QueryAnalysis,
        PipelineStage::LspRouting,
        PipelineStage::ParallelSearch,
        PipelineStage::ResultFusion,
        PipelineStage::PostProcess,
        PipelineStage::Output,
    ];

    /// Returns the stage's snake_case name.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Input => "input",
            Self::QueryAnalysis => "query_analysis",
            Self::LspRouting => "lsp_routing",
            Self::ParallelSearch => "parallel_search",
            Self::ResultFusion => "result_fusion",
            Self::PostProcess => "post_process",
            Self::Output => "output",
        }
    }

    /// Returns the stage that follows this one, or `None` after `Output`.
    pub fn next(&self) -> Option<PipelineStage> {
        let index = Self::SEQUENCE.iter().position(|candidate| candidate == self)?;
        Self::SEQUENCE.get(index + 1).copied()
    }
}

203/// Pipeline execution result
204#[derive(Debug, Clone)]
205pub struct PipelineResult {
206    pub request_id: String,
207    pub data: PipelineData,
208    pub success: bool,
209    pub error_message: Option<String>,
210    pub metrics: PipelineMetrics,
211}
212
213/// Overall pipeline metrics
214#[derive(Debug, Default, Clone, Serialize, Deserialize)]
215pub struct PipelineMetrics {
216    pub total_requests: u64,
217    pub successful_requests: u64,
218    pub failed_requests: u64,
219    pub avg_latency_ms: f64,
220    pub p95_latency_ms: u64,
221    pub p99_latency_ms: u64,
222    pub zero_copy_ratio: f64,
223    pub fusion_effectiveness: f64,
224    pub sla_compliance_rate: f64,
225    pub stage_breakdown: StageBreakdown,
226}
227
228/// Performance breakdown by pipeline stage
229#[derive(Debug, Default, Clone, Serialize, Deserialize)]
230pub struct StageBreakdown {
231    pub query_analysis_ms: f64,
232    pub lsp_routing_ms: f64,
233    pub parallel_search_ms: f64,
234    pub result_fusion_ms: f64,
235    pub post_process_ms: f64,
236}
237
238/// Configuration for the fused pipeline
239#[derive(Debug, Clone)]
240pub struct PipelineConfig {
241    /// Maximum latency SLA in milliseconds (150ms per TODO.md)
242    pub max_latency_ms: u64,
243    
244    /// Buffer pool size for zero-copy operations
245    pub buffer_pool_size: usize,
246    
247    /// Maximum concurrent pipeline executions
248    pub max_concurrent: usize,
249    
250    /// Stage timeout allocations (percentages of total)
251    pub stage_timeouts: StageTimeouts,
252    
253    /// Fusion and optimization settings
254    pub fusion_enabled: bool,
255    pub prefetch_enabled: bool,
256    pub visited_set_reuse: bool,
257    
258    /// Learning-to-stop thresholds
259    pub learning_to_stop_threshold: f64,
260    pub early_stopping_enabled: bool,
261}
262
263/// Timeout allocation per stage
264#[derive(Debug, Clone)]
265pub struct StageTimeouts {
266    pub query_analysis_percent: f64,
267    pub lsp_routing_percent: f64,
268    pub parallel_search_percent: f64,
269    pub result_fusion_percent: f64,
270    pub post_process_percent: f64,
271}
272
273impl Default for PipelineConfig {
274    fn default() -> Self {
275        Self {
276            max_latency_ms: 150, // ≤150ms p95 per TODO.md
277            buffer_pool_size: 100,
278            max_concurrent: 50,
279            stage_timeouts: StageTimeouts {
280                query_analysis_percent: 0.05, // 5%
281                lsp_routing_percent: 0.10,     // 10%
282                parallel_search_percent: 0.65, // 65%
283                result_fusion_percent: 0.15,   // 15%
284                post_process_percent: 0.05,    // 5%
285            },
286            fusion_enabled: true,
287            prefetch_enabled: true,
288            visited_set_reuse: true,
289            learning_to_stop_threshold: 0.8,
290            early_stopping_enabled: true,
291        }
292    }
293}
294
295impl StageTimeouts {
296    pub fn calculate_timeout(&self, total_timeout_ms: u64, stage: PipelineStage) -> Duration {
297        let percent = match stage {
298            PipelineStage::QueryAnalysis => self.query_analysis_percent,
299            PipelineStage::LspRouting => self.lsp_routing_percent,
300            PipelineStage::ParallelSearch => self.parallel_search_percent,
301            PipelineStage::ResultFusion => self.result_fusion_percent,
302            PipelineStage::PostProcess => self.post_process_percent,
303            _ => 0.0,
304        };
305        
306        Duration::from_millis((total_timeout_ms as f64 * percent) as u64)
307    }
308}
309
310/// Pipeline execution error types
311#[derive(Debug, thiserror::Error)]
312pub enum PipelineError {
313    #[error("SLA deadline exceeded: {elapsed_ms}ms > {deadline_ms}ms")]
314    DeadlineExceeded { elapsed_ms: u64, deadline_ms: u64 },
315    
316    #[error("Stage timeout in {stage:?}: {elapsed_ms}ms")]
317    StageTimeout { stage: PipelineStage, elapsed_ms: u64 },
318    
319    #[error("Buffer allocation failed: {requested_size} bytes")]
320    BufferAllocation { requested_size: usize },
321    
322    #[error("Zero-copy operation failed: {reason}")]
323    ZeroCopyFailed { reason: String },
324    
325    #[error("Pipeline fusion error: {stage:?} - {message}")]
326    FusionError { stage: PipelineStage, message: String },
327    
328    #[error("LSP integration error: {message}")]
329    LspError { message: String },
330    
331    #[error("Search engine error: {message}")]
332    SearchError { message: String },
333}
334
335/// Pipeline stage interface
336#[async_trait::async_trait]
337pub trait PipelineStageProcessor: Send + Sync {
338    /// Process data through this stage
339    async fn process(&self, context: &PipelineContext, data: PipelineData) -> Result<PipelineData, PipelineError>;
340    
341    /// Get stage identifier
342    fn stage_id(&self) -> PipelineStage;
343    
344    /// Check if this stage can be fused with others
345    fn supports_fusion(&self) -> bool {
346        false
347    }
348    
349    /// Estimate processing time for planning
350    fn estimate_processing_time(&self, data_size: usize) -> Duration {
351        Duration::from_millis(10) // Default estimate
352    }
353}
354
355/// Main fused pipeline implementation
356pub struct FusedPipeline {
357    config: PipelineConfig,
358    executor: Arc<PipelineExecutor>,
359    scheduler: Arc<PipelineScheduler>,
360    metrics: Arc<RwLock<PipelineMetrics>>,
361}
362
363impl FusedPipeline {
364    pub async fn new(config: PipelineConfig) -> Result<Self> {
365        let executor = Arc::new(PipelineExecutor::new(config.clone()).await?);
366        let scheduler = Arc::new(PipelineScheduler::new(config.max_concurrent));
367        let metrics = Arc::new(RwLock::new(PipelineMetrics::default()));
368        
369        info!("Initialized fused pipeline with ≤{}ms SLA", config.max_latency_ms);
370        
371        Ok(Self {
372            config,
373            executor,
374            scheduler,
375            metrics,
376        })
377    }
378
379    /// Execute a search query through the fused pipeline
380    pub async fn search(&self, context: PipelineContext) -> Result<PipelineResult, PipelineError> {
381        let start_time = Instant::now();
382        
383        // Check SLA before starting
384        if context.is_deadline_exceeded() {
385            return Err(PipelineError::DeadlineExceeded {
386                elapsed_ms: context.elapsed().as_millis() as u64,
387                deadline_ms: self.config.max_latency_ms,
388            });
389        }
390
391        // Acquire scheduler slot
392        let _permit = self.scheduler.acquire().await;
393        
394        // Execute through fused stages
395        let result = self.executor.execute(context.clone()).await;
396        
397        // Update metrics
398        let latency_ms = start_time.elapsed().as_millis() as u64;
399        self.update_metrics(latency_ms, result.is_ok()).await;
400        
401        match result {
402            Ok(data) => {
403                debug!(
404                    "Pipeline execution completed: request_id={}, latency={}ms, sla_compliance={}",
405                    context.request_id,
406                    latency_ms,
407                    latency_ms <= self.config.max_latency_ms
408                );
409                
410                Ok(PipelineResult {
411                    request_id: context.request_id,
412                    data,
413                    success: true,
414                    error_message: None,
415                    metrics: self.get_current_metrics().await,
416                })
417            }
418            Err(e) => {
419                warn!(
420                    "Pipeline execution failed: request_id={}, error={:?}",
421                    context.request_id, e
422                );
423                
424                Ok(PipelineResult {
425                    request_id: context.request_id,
426                    data: PipelineData::new(0),
427                    success: false,
428                    error_message: Some(e.to_string()),
429                    metrics: self.get_current_metrics().await,
430                })
431            }
432        }
433    }
434
435    async fn update_metrics(&self, latency_ms: u64, success: bool) {
436        let mut metrics = self.metrics.write().await;
437        
438        metrics.total_requests += 1;
439        
440        if success {
441            metrics.successful_requests += 1;
442        } else {
443            metrics.failed_requests += 1;
444        }
445        
446        // Update latency statistics (simplified)
447        let total_requests = metrics.total_requests as f64;
448        metrics.avg_latency_ms = (metrics.avg_latency_ms * (total_requests - 1.0) + latency_ms as f64) / total_requests;
449        
450        // Update percentiles (simplified - would use proper quantile estimation in production)
451        if latency_ms > metrics.p95_latency_ms {
452            metrics.p95_latency_ms = latency_ms;
453        }
454        if latency_ms > metrics.p99_latency_ms {
455            metrics.p99_latency_ms = latency_ms;
456        }
457        
458        // Update SLA compliance
459        let sla_compliant = latency_ms <= self.config.max_latency_ms;
460        let compliant_requests = if sla_compliant { 1.0 } else { 0.0 };
461        metrics.sla_compliance_rate = (metrics.sla_compliance_rate * (total_requests - 1.0) + compliant_requests) / total_requests;
462    }
463
464    async fn get_current_metrics(&self) -> PipelineMetrics {
465        self.metrics.read().await.clone()
466    }
467
468    /// Get comprehensive pipeline statistics
469    pub async fn get_metrics(&self) -> PipelineMetrics {
470        self.get_current_metrics().await
471    }
472
473    /// Shutdown the pipeline gracefully
474    pub async fn shutdown(&self) -> Result<()> {
475        info!("Shutting down fused pipeline");
476        self.executor.shutdown().await?;
477        Ok(())
478    }
479}
480
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pipeline_context() {
        let ctx = PipelineContext::new(String::from("test-123"), String::from("test query"), 150);

        assert_eq!(ctx.request_id, "test-123");
        assert_eq!(ctx.query, "test query");
        assert_eq!(ctx.max_results, 50);
        assert_eq!(ctx.timeout.as_millis(), 150);
        assert!(!ctx.is_deadline_exceeded(), "a fresh 150ms context must not be expired");
    }

    #[test]
    fn test_pipeline_data() {
        let mut data = PipelineData::new(1024);
        assert_eq!(data.stage, PipelineStage::Input);
        assert!(data.segments.is_empty());
        assert_eq!(data.total_size(), 0);

        data.advance_stage(PipelineStage::QueryAnalysis);
        assert_eq!(
            (data.stage, data.metadata.stage_transitions),
            (PipelineStage::QueryAnalysis, 1)
        );
    }

    #[test]
    fn test_stage_timeouts() {
        let timeouts = StageTimeouts {
            query_analysis_percent: 0.1,
            lsp_routing_percent: 0.2,
            parallel_search_percent: 0.5,
            result_fusion_percent: 0.15,
            post_process_percent: 0.05,
        };

        let parallel = timeouts.calculate_timeout(1000, PipelineStage::ParallelSearch);
        assert_eq!(parallel.as_millis(), 500);
    }

    #[test]
    fn test_stage_progression() {
        // The first two hops are checked explicitly...
        let stage = PipelineStage::Input.next().unwrap();
        assert_eq!(stage, PipelineStage::QueryAnalysis);
        let stage = stage.next().unwrap();
        assert_eq!(stage, PipelineStage::LspRouting);

        // ...then the chain is followed to its end.
        let last = std::iter::successors(Some(stage), |s| s.next())
            .last()
            .unwrap();
        assert_eq!(last, PipelineStage::Output);
        assert!(last.next().is_none());
    }

    #[tokio::test]
    async fn test_pipeline_creation() {
        let pipeline = FusedPipeline::new(PipelineConfig::default()).await;
        assert!(pipeline.is_ok());
    }
}