Skip to main content

lean_ctx/core/
pipeline.rs

1//! # Context Pipeline
2//!
3//! The pipeline defines the processing stages that content flows through
4//! between raw input and the compressed output delivered to the LLM.
5//!
6//! ## Pipeline Flow
7//!
8//! ```text
9//! Input → Intent → Relevance → Compression → Translation → Delivery
10//! ```
11//!
12//! - **Input**: Raw file content / shell output enters the pipeline
13//! - **Intent**: Task-conditioned filtering — what is relevant to the current goal?
14//! - **Relevance**: Graph/heatmap-based prioritization of content sections
15//! - **Compression**: AST signatures, entropy filtering, delta encoding
16//! - **Translation**: Token shorthand (TDD), symbol replacement
17//! - **Delivery**: LITM positioning, CRP formatting, final output assembly
18//!
//! Intent, Relevance, Compression and Translation can each be enabled/disabled per profile
//! (see `core::profiles`); Input and Delivery always run.
20//! `PipelineStats` aggregates per-layer metrics across all runs for observability.
21
22use std::collections::HashMap;
23
24/// Identifies a stage in the compression pipeline.
25///
26/// Layers execute in the order defined by [`LayerKind::all`]:
27/// Input → Intent → Relevance → Compression → Translation → Delivery.
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
29pub enum LayerKind {
30    Input,
31    Intent,
32    Relevance,
33    Compression,
34    Translation,
35    Delivery,
36}
37
38impl LayerKind {
39    /// Returns the canonical string label for this layer.
40    pub fn as_str(&self) -> &'static str {
41        match self {
42            Self::Input => "input",
43            Self::Intent => "intent",
44            Self::Relevance => "relevance",
45            Self::Compression => "compression",
46            Self::Translation => "translation",
47            Self::Delivery => "delivery",
48        }
49    }
50
51    /// Returns all layer kinds in pipeline execution order.
52    pub fn all() -> &'static [LayerKind] {
53        &[
54            Self::Input,
55            Self::Intent,
56            Self::Relevance,
57            Self::Compression,
58            Self::Translation,
59            Self::Delivery,
60        ]
61    }
62}
63
64impl std::fmt::Display for LayerKind {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        write!(f, "{}", self.as_str())
67    }
68}
69
70impl std::str::FromStr for LayerKind {
71    type Err = String;
72
73    fn from_str(s: &str) -> Result<Self, Self::Err> {
74        match s.to_ascii_lowercase().as_str() {
75            "input" => Ok(Self::Input),
76            "intent" => Ok(Self::Intent),
77            "relevance" => Ok(Self::Relevance),
78            "compression" => Ok(Self::Compression),
79            "translation" => Ok(Self::Translation),
80            "delivery" => Ok(Self::Delivery),
81            _ => Err(format!(
82                "unknown pipeline layer '{s}'; expected one of: input, intent, relevance, compression, translation, delivery"
83            )),
84        }
85    }
86}
87
/// Payload handed to a pipeline layer: the content plus its token count and metadata.
#[derive(Clone, Debug)]
pub struct LayerInput {
    /// Text to be processed.
    pub content: String,
    /// Token count of `content`, as reported by whoever built this input.
    pub tokens: usize,
    /// Free-form string annotations; `Pipeline::execute` carries them from layer to layer.
    pub metadata: HashMap<String, String>,
}
95
/// What a pipeline layer hands back after processing its input.
#[derive(Clone, Debug)]
pub struct LayerOutput {
    /// Processed text.
    pub content: String,
    /// Token count of the processed text, as reported by the layer.
    pub tokens: usize,
    /// Free-form string annotations forwarded to the next layer.
    pub metadata: HashMap<String, String>,
}
103
104/// Performance metrics for a single layer execution: tokens in/out, timing, ratio.
105#[derive(Debug, Clone)]
106pub struct LayerMetrics {
107    pub layer: LayerKind,
108    pub input_tokens: usize,
109    pub output_tokens: usize,
110    pub duration_us: u64,
111    pub compression_ratio: f64,
112}
113
114impl LayerMetrics {
115    pub fn new(
116        layer: LayerKind,
117        input_tokens: usize,
118        output_tokens: usize,
119        duration_us: u64,
120    ) -> Self {
121        let ratio = if input_tokens == 0 {
122            1.0
123        } else {
124            output_tokens as f64 / input_tokens as f64
125        };
126        Self {
127            layer,
128            input_tokens,
129            output_tokens,
130            duration_us,
131            compression_ratio: ratio,
132        }
133    }
134}
135
136/// A single processing stage in the compression pipeline.
137pub trait Layer {
138    fn kind(&self) -> LayerKind;
139    fn process(&self, input: LayerInput) -> LayerOutput;
140}
141
142/// Returns whether a given layer is enabled according to a profile's pipeline config.
143pub fn is_layer_enabled(kind: LayerKind, cfg: &crate::core::profiles::PipelineConfig) -> bool {
144    match kind {
145        LayerKind::Input | LayerKind::Delivery => true,
146        LayerKind::Intent => cfg.intent,
147        LayerKind::Relevance => cfg.relevance,
148        LayerKind::Compression => cfg.compression,
149        LayerKind::Translation => cfg.translation,
150    }
151}
152
153/// A chain of processing layers that content flows through sequentially.
154pub struct Pipeline {
155    layers: Vec<Box<dyn Layer>>,
156}
157
158impl Pipeline {
159    /// Creates an empty pipeline with no layers.
160    pub fn new() -> Self {
161        Self { layers: Vec::new() }
162    }
163
164    /// Appends a processing layer to the pipeline (builder pattern).
165    pub fn add_layer(mut self, layer: Box<dyn Layer>) -> Self {
166        self.layers.push(layer);
167        self
168    }
169
170    /// Appends a layer only if the profile's pipeline config allows it.
171    pub fn add_layer_if_enabled(
172        self,
173        layer: Box<dyn Layer>,
174        cfg: &crate::core::profiles::PipelineConfig,
175    ) -> Self {
176        if is_layer_enabled(layer.kind(), cfg) {
177            self.add_layer(layer)
178        } else {
179            self
180        }
181    }
182
183    /// Runs all layers in sequence, collecting per-layer metrics.
184    pub fn execute(&self, input: LayerInput) -> (LayerOutput, Vec<LayerMetrics>) {
185        let mut current = input;
186        let mut metrics = Vec::new();
187
188        for layer in &self.layers {
189            let start = std::time::Instant::now();
190            let input_tokens = current.tokens;
191            let output = layer.process(current);
192            let duration = start.elapsed().as_micros() as u64;
193
194            metrics.push(LayerMetrics::new(
195                layer.kind(),
196                input_tokens,
197                output.tokens,
198                duration,
199            ));
200
201            current = LayerInput {
202                content: output.content,
203                tokens: output.tokens,
204                metadata: output.metadata,
205            };
206        }
207
208        let final_output = LayerOutput {
209            content: current.content,
210            tokens: current.tokens,
211            metadata: current.metadata,
212        };
213
214        (final_output, metrics)
215    }
216
217    /// Formats pipeline metrics as a human-readable summary with per-layer and total stats.
218    pub fn format_metrics(metrics: &[LayerMetrics]) -> String {
219        let mut out = String::from("Pipeline Metrics:\n");
220        let mut total_saved = 0usize;
221        for m in metrics {
222            let saved = m.input_tokens.saturating_sub(m.output_tokens);
223            total_saved += saved;
224            out.push_str(&format!(
225                "  {} : {} -> {} tok ({:.0}%, {:.1}ms)\n",
226                m.layer,
227                m.input_tokens,
228                m.output_tokens,
229                m.compression_ratio * 100.0,
230                m.duration_us as f64 / 1000.0,
231            ));
232        }
233        if let (Some(first), Some(last)) = (metrics.first(), metrics.last()) {
234            let total_ratio = if first.input_tokens == 0 {
235                1.0
236            } else {
237                last.output_tokens as f64 / first.input_tokens as f64
238            };
239            out.push_str(&format!(
240                "  TOTAL: {} -> {} tok ({:.0}%, saved {})\n",
241                first.input_tokens,
242                last.output_tokens,
243                total_ratio * 100.0,
244                total_saved,
245            ));
246        }
247        out
248    }
249}
250
251/// Persistent aggregated statistics across all pipeline runs.
252#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
253pub struct PipelineStats {
254    pub runs: usize,
255    pub per_layer: HashMap<LayerKind, AggregatedMetrics>,
256}
257
258/// Cumulative token counts and timing for a single pipeline layer across all runs.
259#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
260pub struct AggregatedMetrics {
261    pub total_input_tokens: usize,
262    pub total_output_tokens: usize,
263    pub total_duration_us: u64,
264    pub count: usize,
265}
266
267impl AggregatedMetrics {
268    /// Returns the average compression ratio (output/input) across all runs.
269    pub fn avg_ratio(&self) -> f64 {
270        if self.total_input_tokens == 0 {
271            return 1.0;
272        }
273        self.total_output_tokens as f64 / self.total_input_tokens as f64
274    }
275
276    /// Returns the average duration per invocation in milliseconds.
277    pub fn avg_duration_ms(&self) -> f64 {
278        if self.count == 0 {
279            return 0.0;
280        }
281        self.total_duration_us as f64 / self.count as f64 / 1000.0
282    }
283}
284
285impl PipelineStats {
286    /// Creates empty pipeline stats with zero runs.
287    pub fn new() -> Self {
288        Self {
289            runs: 0,
290            per_layer: HashMap::new(),
291        }
292    }
293
294    /// Records a batch of layer metrics from a single pipeline execution.
295    pub fn record(&mut self, metrics: &[LayerMetrics]) {
296        self.runs += 1;
297        for m in metrics {
298            let agg = self.per_layer.entry(m.layer).or_default();
299            agg.total_input_tokens += m.input_tokens;
300            agg.total_output_tokens += m.output_tokens;
301            agg.total_duration_us += m.duration_us;
302            agg.count += 1;
303        }
304    }
305
306    /// Records metrics for a single layer execution.
307    pub fn record_single(
308        &mut self,
309        layer: LayerKind,
310        input_tokens: usize,
311        output_tokens: usize,
312        duration: std::time::Duration,
313    ) {
314        self.runs += 1;
315        let agg = self.per_layer.entry(layer).or_default();
316        agg.total_input_tokens += input_tokens;
317        agg.total_output_tokens += output_tokens;
318        agg.total_duration_us += duration.as_micros() as u64;
319        agg.count += 1;
320    }
321
322    /// Returns the total tokens saved across all pipeline layers.
323    pub fn total_tokens_saved(&self) -> usize {
324        self.per_layer
325            .values()
326            .map(|a| a.total_input_tokens.saturating_sub(a.total_output_tokens))
327            .sum()
328    }
329
330    /// Persists pipeline stats to `~/.lean-ctx/pipeline_stats.json`.
331    pub fn save(&self) {
332        if let Ok(dir) = crate::core::data_dir::lean_ctx_data_dir() {
333            let path = dir.join("pipeline_stats.json");
334            if let Ok(json) = serde_json::to_string(self) {
335                let _ = std::fs::write(path, json);
336            }
337        }
338    }
339
340    /// Loads pipeline stats from disk, returning defaults if absent.
341    pub fn load() -> Self {
342        crate::core::data_dir::lean_ctx_data_dir()
343            .ok()
344            .map(|d| d.join("pipeline_stats.json"))
345            .and_then(|p| std::fs::read_to_string(p).ok())
346            .and_then(|s| serde_json::from_str(&s).ok())
347            .unwrap_or_default()
348    }
349
350    /// Formats a human-readable summary of per-layer stats and total savings.
351    pub fn format_summary(&self) -> String {
352        let mut out = format!("Pipeline Stats ({} runs):\n", self.runs);
353        for kind in LayerKind::all() {
354            if let Some(agg) = self.per_layer.get(kind) {
355                out.push_str(&format!(
356                    "  {}: avg {:.0}% ratio, {:.1}ms, {} invocations\n",
357                    kind,
358                    agg.avg_ratio() * 100.0,
359                    agg.avg_duration_ms(),
360                    agg.count,
361                ));
362            }
363        }
364        out.push_str(&format!("  SAVED: {} tokens\n", self.total_tokens_saved()));
365        out
366    }
367}
368
369impl Default for Pipeline {
370    fn default() -> Self {
371        Self::new()
372    }
373}
374
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `LayerInput` with empty metadata.
    fn make_input(content: impl Into<String>, tokens: usize) -> LayerInput {
        LayerInput {
            content: content.into(),
            tokens,
            metadata: HashMap::new(),
        }
    }

    /// Boxes a `PassthroughLayer` of the given kind.
    fn passthrough(kind: LayerKind) -> Box<dyn Layer> {
        Box::new(PassthroughLayer { kind })
    }

    /// Test layer that returns its input unchanged.
    struct PassthroughLayer {
        kind: LayerKind,
    }

    impl Layer for PassthroughLayer {
        fn kind(&self) -> LayerKind {
            self.kind
        }

        fn process(&self, input: LayerInput) -> LayerOutput {
            LayerOutput {
                content: input.content,
                tokens: input.tokens,
                metadata: input.metadata,
            }
        }
    }

    /// Test layer that scales the token count by `ratio` and truncates the content to at
    /// most four bytes per remaining token.
    struct CompressionLayer {
        ratio: f64,
    }

    impl Layer for CompressionLayer {
        fn kind(&self) -> LayerKind {
            LayerKind::Compression
        }

        fn process(&self, input: LayerInput) -> LayerOutput {
            let new_tokens = (input.tokens as f64 * self.ratio) as usize;
            let max_bytes = new_tokens * 4;
            let mut content = input.content;
            if content.len() > max_bytes {
                // Back off to a char boundary; slicing `[..max_bytes]` panics mid-codepoint.
                let mut cut = max_bytes;
                while !content.is_char_boundary(cut) {
                    cut -= 1;
                }
                content.truncate(cut);
            }
            LayerOutput {
                content,
                tokens: new_tokens,
                metadata: input.metadata,
            }
        }
    }

    #[test]
    fn layer_kind_all_ordered() {
        let all = LayerKind::all();
        assert_eq!(all.len(), 6);
        assert_eq!(all[0], LayerKind::Input);
        assert_eq!(all[5], LayerKind::Delivery);
    }

    #[test]
    fn passthrough_preserves_content() {
        let layer = PassthroughLayer {
            kind: LayerKind::Input,
        };
        let output = layer.process(make_input("hello world", 2));
        assert_eq!(output.content, "hello world");
        assert_eq!(output.tokens, 2);
    }

    #[test]
    fn compression_layer_reduces() {
        let layer = CompressionLayer { ratio: 0.5 };
        let output = layer.process(make_input("a ".repeat(100), 100));
        assert_eq!(output.tokens, 50);
    }

    #[test]
    fn compression_layer_truncates_on_char_boundary() {
        // 10 × '€' is 30 bytes; 5 tokens allow 20 bytes, which falls inside the 7th '€'.
        let layer = CompressionLayer { ratio: 0.5 };
        let output = layer.process(make_input("€".repeat(10), 10));
        assert_eq!(output.tokens, 5);
        assert_eq!(output.content, "€".repeat(6));
    }

    #[test]
    fn pipeline_chains_layers() {
        let pipeline = Pipeline::new()
            .add_layer(passthrough(LayerKind::Input))
            .add_layer(Box::new(CompressionLayer { ratio: 0.5 }))
            .add_layer(passthrough(LayerKind::Delivery));

        let (output, metrics) = pipeline.execute(make_input("a ".repeat(100), 100));
        assert_eq!(output.tokens, 50);
        assert_eq!(metrics.len(), 3);
        assert_eq!(metrics[0].layer, LayerKind::Input);
        assert_eq!(metrics[1].layer, LayerKind::Compression);
        assert_eq!(metrics[2].layer, LayerKind::Delivery);
    }

    #[test]
    fn metrics_new_calculates_ratio() {
        let m = LayerMetrics::new(LayerKind::Compression, 100, 50, 1000);
        assert!((m.compression_ratio - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn metrics_format_readable() {
        let metrics = vec![
            LayerMetrics::new(LayerKind::Input, 1000, 1000, 100),
            LayerMetrics::new(LayerKind::Compression, 1000, 300, 5000),
            LayerMetrics::new(LayerKind::Delivery, 300, 300, 50),
        ];
        let formatted = Pipeline::format_metrics(&metrics);
        assert!(formatted.contains("input"));
        assert!(formatted.contains("compression"));
        assert!(formatted.contains("delivery"));
        assert!(formatted.contains("TOTAL"));
    }

    #[test]
    fn empty_pipeline_passes_through() {
        let (output, metrics) = Pipeline::new().execute(make_input("test", 1));
        assert_eq!(output.content, "test");
        assert!(metrics.is_empty());
    }

    #[test]
    fn pipeline_stats_record_and_summarize() {
        let mut stats = PipelineStats::default();
        let metrics = vec![
            LayerMetrics::new(LayerKind::Input, 1000, 1000, 100),
            LayerMetrics::new(LayerKind::Compression, 1000, 300, 5000),
            LayerMetrics::new(LayerKind::Delivery, 300, 300, 50),
        ];
        stats.record(&metrics);
        stats.record(&metrics);

        assert_eq!(stats.runs, 2);
        assert_eq!(stats.total_tokens_saved(), 1400);

        let agg = stats.per_layer.get(&LayerKind::Compression).unwrap();
        assert_eq!(agg.count, 2);
        assert_eq!(agg.total_input_tokens, 2000);
        assert_eq!(agg.total_output_tokens, 600);

        let summary = stats.format_summary();
        assert!(summary.contains("2 runs"));
        assert!(summary.contains("SAVED: 1400"));
    }

    #[test]
    fn aggregated_metrics_avg() {
        let agg = AggregatedMetrics {
            total_input_tokens: 1000,
            total_output_tokens: 500,
            total_duration_us: 10000,
            count: 2,
        };
        assert!((agg.avg_ratio() - 0.5).abs() < f64::EPSILON);
        assert!((agg.avg_duration_ms() - 5.0).abs() < f64::EPSILON);
    }

    #[test]
    fn layer_kind_from_str_valid() {
        assert_eq!("input".parse::<LayerKind>().unwrap(), LayerKind::Input);
        assert_eq!("Intent".parse::<LayerKind>().unwrap(), LayerKind::Intent);
        assert_eq!(
            "COMPRESSION".parse::<LayerKind>().unwrap(),
            LayerKind::Compression
        );
        assert_eq!(
            "delivery".parse::<LayerKind>().unwrap(),
            LayerKind::Delivery
        );
    }

    #[test]
    fn layer_kind_from_str_invalid() {
        let err = "unknown".parse::<LayerKind>().unwrap_err();
        assert!(err.contains("unknown pipeline layer"));
        assert!(err.contains("input, intent, relevance"));
    }

    #[test]
    fn layer_kind_roundtrip_str() {
        for kind in LayerKind::all() {
            let parsed: LayerKind = kind.as_str().parse().unwrap();
            assert_eq!(*kind, parsed);
        }
    }

    #[test]
    fn pipeline_stats_record_single() {
        let mut stats = PipelineStats::new();
        stats.record_single(
            LayerKind::Compression,
            1000,
            300,
            std::time::Duration::from_millis(5),
        );
        assert_eq!(stats.runs, 1);
        let agg = stats.per_layer.get(&LayerKind::Compression).unwrap();
        assert_eq!(agg.total_input_tokens, 1000);
        assert_eq!(agg.total_output_tokens, 300);
        assert_eq!(agg.count, 1);
    }

    #[test]
    fn pipeline_full_flow_integration() {
        let pipeline = Pipeline::new()
            .add_layer(passthrough(LayerKind::Input))
            .add_layer(passthrough(LayerKind::Intent))
            .add_layer(passthrough(LayerKind::Relevance))
            .add_layer(Box::new(CompressionLayer { ratio: 0.3 }))
            .add_layer(passthrough(LayerKind::Translation))
            .add_layer(passthrough(LayerKind::Delivery));

        let (output, metrics) = pipeline.execute(make_input("x ".repeat(500), 500));

        assert_eq!(metrics.len(), 6, "all 6 layers should produce metrics");
        assert_eq!(output.tokens, 150, "compression at 0.3 ratio");

        for (i, kind) in LayerKind::all().iter().enumerate() {
            assert_eq!(metrics[i].layer, *kind, "layer order must match");
        }

        let mut stats = PipelineStats::new();
        stats.record(&metrics);
        assert_eq!(stats.runs, 1);
        assert_eq!(stats.total_tokens_saved(), 350);

        let formatted = Pipeline::format_metrics(&metrics);
        assert!(formatted.contains("TOTAL"));
        assert!(formatted.contains("500"));
    }

    #[test]
    fn is_layer_enabled_respects_config() {
        let cfg = crate::core::profiles::PipelineConfig {
            intent: false,
            relevance: false,
            compression: true,
            translation: true,
        };

        assert!(is_layer_enabled(LayerKind::Input, &cfg));
        assert!(!is_layer_enabled(LayerKind::Intent, &cfg));
        assert!(!is_layer_enabled(LayerKind::Relevance, &cfg));
        assert!(is_layer_enabled(LayerKind::Compression, &cfg));
        assert!(is_layer_enabled(LayerKind::Translation, &cfg));
        assert!(is_layer_enabled(LayerKind::Delivery, &cfg));
    }

    #[test]
    fn add_layer_if_enabled_skips_disabled() {
        let cfg = crate::core::profiles::PipelineConfig {
            intent: false,
            relevance: true,
            compression: true,
            translation: true,
        };

        let pipeline = Pipeline::new()
            .add_layer_if_enabled(passthrough(LayerKind::Input), &cfg)
            .add_layer_if_enabled(passthrough(LayerKind::Intent), &cfg)
            .add_layer_if_enabled(Box::new(CompressionLayer { ratio: 0.5 }), &cfg)
            .add_layer_if_enabled(passthrough(LayerKind::Delivery), &cfg);

        let (output, metrics) = pipeline.execute(make_input("x ".repeat(100), 100));

        assert_eq!(
            metrics.len(),
            3,
            "Intent layer should be skipped, leaving Input + Compression + Delivery"
        );
        assert_eq!(metrics[0].layer, LayerKind::Input);
        assert_eq!(metrics[1].layer, LayerKind::Compression);
        assert_eq!(metrics[2].layer, LayerKind::Delivery);
        assert_eq!(output.tokens, 50);
    }
}