1pub mod executor;
12pub mod fusion;
13pub mod learning;
14pub mod memory;
15pub mod scheduler;
16pub mod stages;
17
18use anyhow::Result;
19use bytes::Bytes;
20use serde::{Deserialize, Serialize};
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use tokio::sync::{mpsc, RwLock};
24use tracing::{debug, info, warn};
25
26pub use executor::PipelineExecutor;
27pub use fusion::ResultFusion;
28pub use memory::{ZeroCopyBuffer, SegmentView};
29pub use scheduler::PipelineScheduler;
30pub use stages::{QueryPreprocessingStage, LspSearchStage, TextSearchStage};
31
/// Per-request state carried through the pipeline: what was asked, how many
/// results are wanted, and how much time the request is allowed to take.
#[derive(Clone, Debug)]
pub struct PipelineContext {
    /// Caller-supplied identifier for this request.
    pub request_id: String,
    /// The query text.
    pub query: String,
    /// Optional file path attached to the request; `None` unless set through
    /// [`PipelineContext::with_file_path`].
    pub file_path: Option<String>,
    /// Result limit; [`PipelineContext::new`] sets it to 50.
    pub max_results: usize,
    /// Total time budget for the request.
    pub timeout: Duration,
    /// Moment the context was created.
    pub started_at: Instant,
    /// `started_at + timeout`.
    pub sla_deadline: Instant,
}

impl PipelineContext {
    /// Creates a context whose clock starts now and whose deadline lies
    /// `timeout_ms` milliseconds in the future.
    ///
    /// `file_path` starts as `None` and `max_results` as 50.
    pub fn new(request_id: String, query: String, timeout_ms: u64) -> Self {
        let started_at = Instant::now();
        let timeout = Duration::from_millis(timeout_ms);

        Self {
            request_id,
            query,
            file_path: None,
            max_results: 50,
            timeout,
            started_at,
            sla_deadline: started_at + timeout,
        }
    }

    /// Returns the context with `file_path` set to `Some(file_path)`.
    pub fn with_file_path(self, file_path: String) -> Self {
        Self {
            file_path: Some(file_path),
            ..self
        }
    }

    /// Returns the context with `max_results` replaced.
    pub fn with_max_results(self, max_results: usize) -> Self {
        Self {
            max_results,
            ..self
        }
    }

    /// Time elapsed since the context was created.
    pub fn elapsed(&self) -> Duration {
        self.started_at.elapsed()
    }

    /// Time left until the deadline; zero once the deadline has passed.
    pub fn remaining_time(&self) -> Duration {
        let now = Instant::now();
        self.sla_deadline.saturating_duration_since(now)
    }

    /// `true` once the current time has reached or passed the deadline.
    pub fn is_deadline_exceeded(&self) -> bool {
        self.sla_deadline <= Instant::now()
    }

    /// Fraction of the time budget already spent, capped at `1.0`.
    ///
    /// Despite the name the value is a ratio in `0.0..=1.0`, not a percentage.
    /// Both durations are truncated to whole milliseconds before dividing, so
    /// a budget below one millisecond is reported as fully spent (`1.0`).
    pub fn time_budget_percent(&self) -> f64 {
        let budget_ms = self.timeout.as_millis() as f64;
        if budget_ms <= 0.0 {
            return 1.0;
        }

        let spent_ms = self.elapsed().as_millis() as f64;
        (spent_ms / budget_ms).min(1.0)
    }
}
94
95#[derive(Debug, Clone)]
97pub struct PipelineData {
98 pub buffer: Arc<ZeroCopyBuffer>,
100
101 pub metadata: PipelineMetadata,
103
104 pub stage: PipelineStage,
106
107 pub segments: Vec<SegmentView>,
109}
110
111impl PipelineData {
112 pub fn new(capacity: usize) -> Self {
113 Self {
114 buffer: Arc::new(ZeroCopyBuffer::new(capacity)),
115 metadata: PipelineMetadata::default(),
116 stage: PipelineStage::Input,
117 segments: Vec::new(),
118 }
119 }
120
121 pub fn add_segment(&mut self, segment: SegmentView) {
123 self.segments.push(segment);
124 }
125
126 pub fn create_view(&self, offset: usize, length: usize) -> Result<SegmentView> {
128 self.buffer.create_view(offset, length)
129 }
130
131 pub fn total_size(&self) -> usize {
133 self.segments.iter().map(|s| s.len()).sum()
134 }
135
136 pub fn advance_stage(&mut self, stage: PipelineStage) {
138 self.stage = stage;
139 self.metadata.stage_transitions += 1;
140 }
141}
142
143#[derive(Debug, Default, Clone)]
145pub struct PipelineMetadata {
146 pub stage_transitions: usize,
147 pub bytes_processed: usize,
148 pub cache_hits: usize,
149 pub lsp_queries: usize,
150 pub search_results_count: usize,
151 pub performance_metrics: PerformanceMetrics,
152}
153
/// Timing and allocation figures for one request. Field names carry the
/// unit (`_ms` = milliseconds); every field defaults to zero.
#[derive(Clone, Debug, Default)]
pub struct PerformanceMetrics {
    /// Overall time, in milliseconds.
    pub total_time_ms: u64,
    /// Time attributed to LSP work, in milliseconds.
    pub lsp_time_ms: u64,
    /// Time attributed to search, in milliseconds.
    pub search_time_ms: u64,
    /// Time attributed to result fusion, in milliseconds.
    pub fusion_time_ms: u64,
    /// Count of memory allocations.
    pub memory_allocations: usize,
    /// Count of zero-copy operations.
    pub zero_copy_operations: usize,
}
164
/// The stages a request passes through, listed in execution order.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum PipelineStage {
    /// Starting point; data has not been processed yet.
    Input,
    /// Query analysis.
    QueryAnalysis,
    /// LSP routing.
    LspRouting,
    /// Parallel search.
    ParallelSearch,
    /// Result fusion.
    ResultFusion,
    /// Post-processing.
    PostProcess,
    /// Terminal stage; has no successor.
    Output,
}

impl PipelineStage {
    /// Stable snake_case name of the stage.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Input => "input",
            Self::QueryAnalysis => "query_analysis",
            Self::LspRouting => "lsp_routing",
            Self::ParallelSearch => "parallel_search",
            Self::ResultFusion => "result_fusion",
            Self::PostProcess => "post_process",
            Self::Output => "output",
        }
    }

    /// The stage that follows this one, or `None` for [`PipelineStage::Output`].
    pub fn next(&self) -> Option<PipelineStage> {
        let successor = match *self {
            Self::Input => Self::QueryAnalysis,
            Self::QueryAnalysis => Self::LspRouting,
            Self::LspRouting => Self::ParallelSearch,
            Self::ParallelSearch => Self::ResultFusion,
            Self::ResultFusion => Self::PostProcess,
            Self::PostProcess => Self::Output,
            // The final stage has nothing after it.
            Self::Output => return None,
        };

        Some(successor)
    }
}
202
203#[derive(Debug, Clone)]
205pub struct PipelineResult {
206 pub request_id: String,
207 pub data: PipelineData,
208 pub success: bool,
209 pub error_message: Option<String>,
210 pub metrics: PipelineMetrics,
211}
212
213#[derive(Debug, Default, Clone, Serialize, Deserialize)]
215pub struct PipelineMetrics {
216 pub total_requests: u64,
217 pub successful_requests: u64,
218 pub failed_requests: u64,
219 pub avg_latency_ms: f64,
220 pub p95_latency_ms: u64,
221 pub p99_latency_ms: u64,
222 pub zero_copy_ratio: f64,
223 pub fusion_effectiveness: f64,
224 pub sla_compliance_rate: f64,
225 pub stage_breakdown: StageBreakdown,
226}
227
228#[derive(Debug, Default, Clone, Serialize, Deserialize)]
230pub struct StageBreakdown {
231 pub query_analysis_ms: f64,
232 pub lsp_routing_ms: f64,
233 pub parallel_search_ms: f64,
234 pub result_fusion_ms: f64,
235 pub post_process_ms: f64,
236}
237
238#[derive(Debug, Clone)]
240pub struct PipelineConfig {
241 pub max_latency_ms: u64,
243
244 pub buffer_pool_size: usize,
246
247 pub max_concurrent: usize,
249
250 pub stage_timeouts: StageTimeouts,
252
253 pub fusion_enabled: bool,
255 pub prefetch_enabled: bool,
256 pub visited_set_reuse: bool,
257
258 pub learning_to_stop_threshold: f64,
260 pub early_stopping_enabled: bool,
261}
262
/// Share of the total time budget given to each processing stage.
///
/// Each value is a fraction (e.g. `0.65` for 65 %) that
/// `StageTimeouts::calculate_timeout` multiplies by the total timeout.
#[derive(Clone, Debug)]
pub struct StageTimeouts {
    /// Share for the query-analysis stage.
    pub query_analysis_percent: f64,
    /// Share for the LSP-routing stage.
    pub lsp_routing_percent: f64,
    /// Share for the parallel-search stage.
    pub parallel_search_percent: f64,
    /// Share for the result-fusion stage.
    pub result_fusion_percent: f64,
    /// Share for the post-processing stage.
    pub post_process_percent: f64,
}
272
273impl Default for PipelineConfig {
274 fn default() -> Self {
275 Self {
276 max_latency_ms: 150, buffer_pool_size: 100,
278 max_concurrent: 50,
279 stage_timeouts: StageTimeouts {
280 query_analysis_percent: 0.05, lsp_routing_percent: 0.10, parallel_search_percent: 0.65, result_fusion_percent: 0.15, post_process_percent: 0.05, },
286 fusion_enabled: true,
287 prefetch_enabled: true,
288 visited_set_reuse: true,
289 learning_to_stop_threshold: 0.8,
290 early_stopping_enabled: true,
291 }
292 }
293}
294
295impl StageTimeouts {
296 pub fn calculate_timeout(&self, total_timeout_ms: u64, stage: PipelineStage) -> Duration {
297 let percent = match stage {
298 PipelineStage::QueryAnalysis => self.query_analysis_percent,
299 PipelineStage::LspRouting => self.lsp_routing_percent,
300 PipelineStage::ParallelSearch => self.parallel_search_percent,
301 PipelineStage::ResultFusion => self.result_fusion_percent,
302 PipelineStage::PostProcess => self.post_process_percent,
303 _ => 0.0,
304 };
305
306 Duration::from_millis((total_timeout_ms as f64 * percent) as u64)
307 }
308}
309
310#[derive(Debug, thiserror::Error)]
312pub enum PipelineError {
313 #[error("SLA deadline exceeded: {elapsed_ms}ms > {deadline_ms}ms")]
314 DeadlineExceeded { elapsed_ms: u64, deadline_ms: u64 },
315
316 #[error("Stage timeout in {stage:?}: {elapsed_ms}ms")]
317 StageTimeout { stage: PipelineStage, elapsed_ms: u64 },
318
319 #[error("Buffer allocation failed: {requested_size} bytes")]
320 BufferAllocation { requested_size: usize },
321
322 #[error("Zero-copy operation failed: {reason}")]
323 ZeroCopyFailed { reason: String },
324
325 #[error("Pipeline fusion error: {stage:?} - {message}")]
326 FusionError { stage: PipelineStage, message: String },
327
328 #[error("LSP integration error: {message}")]
329 LspError { message: String },
330
331 #[error("Search engine error: {message}")]
332 SearchError { message: String },
333}
334
335#[async_trait::async_trait]
337pub trait PipelineStageProcessor: Send + Sync {
338 async fn process(&self, context: &PipelineContext, data: PipelineData) -> Result<PipelineData, PipelineError>;
340
341 fn stage_id(&self) -> PipelineStage;
343
344 fn supports_fusion(&self) -> bool {
346 false
347 }
348
349 fn estimate_processing_time(&self, data_size: usize) -> Duration {
351 Duration::from_millis(10) }
353}
354
355pub struct FusedPipeline {
357 config: PipelineConfig,
358 executor: Arc<PipelineExecutor>,
359 scheduler: Arc<PipelineScheduler>,
360 metrics: Arc<RwLock<PipelineMetrics>>,
361}
362
363impl FusedPipeline {
364 pub async fn new(config: PipelineConfig) -> Result<Self> {
365 let executor = Arc::new(PipelineExecutor::new(config.clone()).await?);
366 let scheduler = Arc::new(PipelineScheduler::new(config.max_concurrent));
367 let metrics = Arc::new(RwLock::new(PipelineMetrics::default()));
368
369 info!("Initialized fused pipeline with ≤{}ms SLA", config.max_latency_ms);
370
371 Ok(Self {
372 config,
373 executor,
374 scheduler,
375 metrics,
376 })
377 }
378
379 pub async fn search(&self, context: PipelineContext) -> Result<PipelineResult, PipelineError> {
381 let start_time = Instant::now();
382
383 if context.is_deadline_exceeded() {
385 return Err(PipelineError::DeadlineExceeded {
386 elapsed_ms: context.elapsed().as_millis() as u64,
387 deadline_ms: self.config.max_latency_ms,
388 });
389 }
390
391 let _permit = self.scheduler.acquire().await;
393
394 let result = self.executor.execute(context.clone()).await;
396
397 let latency_ms = start_time.elapsed().as_millis() as u64;
399 self.update_metrics(latency_ms, result.is_ok()).await;
400
401 match result {
402 Ok(data) => {
403 debug!(
404 "Pipeline execution completed: request_id={}, latency={}ms, sla_compliance={}",
405 context.request_id,
406 latency_ms,
407 latency_ms <= self.config.max_latency_ms
408 );
409
410 Ok(PipelineResult {
411 request_id: context.request_id,
412 data,
413 success: true,
414 error_message: None,
415 metrics: self.get_current_metrics().await,
416 })
417 }
418 Err(e) => {
419 warn!(
420 "Pipeline execution failed: request_id={}, error={:?}",
421 context.request_id, e
422 );
423
424 Ok(PipelineResult {
425 request_id: context.request_id,
426 data: PipelineData::new(0),
427 success: false,
428 error_message: Some(e.to_string()),
429 metrics: self.get_current_metrics().await,
430 })
431 }
432 }
433 }
434
435 async fn update_metrics(&self, latency_ms: u64, success: bool) {
436 let mut metrics = self.metrics.write().await;
437
438 metrics.total_requests += 1;
439
440 if success {
441 metrics.successful_requests += 1;
442 } else {
443 metrics.failed_requests += 1;
444 }
445
446 let total_requests = metrics.total_requests as f64;
448 metrics.avg_latency_ms = (metrics.avg_latency_ms * (total_requests - 1.0) + latency_ms as f64) / total_requests;
449
450 if latency_ms > metrics.p95_latency_ms {
452 metrics.p95_latency_ms = latency_ms;
453 }
454 if latency_ms > metrics.p99_latency_ms {
455 metrics.p99_latency_ms = latency_ms;
456 }
457
458 let sla_compliant = latency_ms <= self.config.max_latency_ms;
460 let compliant_requests = if sla_compliant { 1.0 } else { 0.0 };
461 metrics.sla_compliance_rate = (metrics.sla_compliance_rate * (total_requests - 1.0) + compliant_requests) / total_requests;
462 }
463
464 async fn get_current_metrics(&self) -> PipelineMetrics {
465 self.metrics.read().await.clone()
466 }
467
468 pub async fn get_metrics(&self) -> PipelineMetrics {
470 self.get_current_metrics().await
471 }
472
473 pub async fn shutdown(&self) -> Result<()> {
475 info!("Shutting down fused pipeline");
476 self.executor.shutdown().await?;
477 Ok(())
478 }
479}
480
#[cfg(test)]
mod tests {
    use super::*;

    /// A new context carries its inputs, the default result limit, and a
    /// deadline that has not yet passed.
    #[test]
    fn test_pipeline_context() {
        let ctx = PipelineContext::new(String::from("test-123"), String::from("test query"), 150);

        assert_eq!(ctx.request_id, "test-123");
        assert_eq!(ctx.query, "test query");
        assert_eq!(ctx.max_results, 50);
        assert_eq!(ctx.timeout.as_millis(), 150);
        assert!(!ctx.is_deadline_exceeded());
    }

    /// Fresh data is empty at the input stage; advancing records the new
    /// stage and counts the transition.
    #[test]
    fn test_pipeline_data() {
        let mut payload = PipelineData::new(1024);
        assert_eq!(payload.stage, PipelineStage::Input);
        assert_eq!(payload.segments.len(), 0);
        assert_eq!(payload.total_size(), 0);

        payload.advance_stage(PipelineStage::QueryAnalysis);
        assert_eq!(payload.stage, PipelineStage::QueryAnalysis);
        assert_eq!(payload.metadata.stage_transitions, 1);
    }

    /// A 50 % share of a 1000 ms budget is 500 ms.
    #[test]
    fn test_stage_timeouts() {
        let shares = StageTimeouts {
            query_analysis_percent: 0.1,
            lsp_routing_percent: 0.2,
            parallel_search_percent: 0.5,
            result_fusion_percent: 0.15,
            post_process_percent: 0.05,
        };

        let budget = shares.calculate_timeout(1000, PipelineStage::ParallelSearch);
        assert_eq!(budget.as_millis(), 500);
    }

    /// Stages advance in declaration order and stop at `Output`.
    #[test]
    fn test_stage_progression() {
        let first = PipelineStage::Input.next().unwrap();
        assert_eq!(first, PipelineStage::QueryAnalysis);

        let second = first.next().unwrap();
        assert_eq!(second, PipelineStage::LspRouting);

        // Follow the chain to its end.
        let last = std::iter::successors(Some(second), |stage| stage.next())
            .last()
            .unwrap();

        assert_eq!(last, PipelineStage::Output);
        assert!(last.next().is_none());
    }

    /// The default configuration builds a pipeline successfully.
    #[tokio::test]
    async fn test_pipeline_creation() {
        let built = FusedPipeline::new(PipelineConfig::default()).await;
        assert!(built.is_ok());
    }
}