Skip to main content

lean_ctx/core/
pipeline.rs

//! # Context Pipeline
//!
//! The pipeline defines the processing stages that content flows through
//! between raw input and the compressed output delivered to the LLM.
//!
//! ## Pipeline Flow
//!
//! ```text
//! Input → Autonomy → Intent → Relevance → Compression → Translation → Delivery
//! ```
//!
//! - **Input**: Raw file content / shell output enters the pipeline
//! - **Autonomy**: Always-enabled stage that runs directly after Input (it has no per-profile toggle)
//! - **Intent**: Task-conditioned filtering — what is relevant to the current goal?
//! - **Relevance**: Graph/heatmap-based prioritization of content sections
//! - **Compression**: AST signatures, entropy filtering, delta encoding
//! - **Translation**: Token shorthand (TDD), symbol replacement
//! - **Delivery**: LITM positioning, CRP formatting, final output assembly
//!
//! The Intent, Relevance, Compression, and Translation layers can be enabled/disabled
//! per profile (see `core::profiles`); Input, Autonomy, and Delivery are always enabled.
//! `PipelineStats` aggregates per-layer metrics across all runs for observability.
21
22use std::collections::HashMap;
23
/// Identifies a stage in the compression pipeline.
///
/// Layers execute in the order defined by [`LayerKind::all`]:
/// Input → Autonomy → Intent → Relevance → Compression → Translation → Delivery.
///
/// The type is `Copy` and hashable so it can be used directly as a map key,
/// and it derives the serde traits so it can be serialized and deserialized.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum LayerKind {
    /// First stage: raw content enters the pipeline.
    Input,
    /// Stage between `Input` and `Intent`.
    /// NOTE(review): its responsibilities are not described in this module — document them here.
    Autonomy,
    /// Task-conditioned filtering stage.
    Intent,
    /// Prioritization stage for content sections.
    Relevance,
    /// Stage that reduces the size of the content.
    Compression,
    /// Token shorthand / symbol replacement stage.
    Translation,
    /// Last stage: final output assembly.
    Delivery,
}
38
39impl LayerKind {
40    /// Returns the canonical string label for this layer.
41    pub fn as_str(&self) -> &'static str {
42        match self {
43            Self::Input => "input",
44            Self::Autonomy => "autonomy",
45            Self::Intent => "intent",
46            Self::Relevance => "relevance",
47            Self::Compression => "compression",
48            Self::Translation => "translation",
49            Self::Delivery => "delivery",
50        }
51    }
52
53    /// Returns all layer kinds in pipeline execution order.
54    pub fn all() -> &'static [LayerKind] {
55        &[
56            Self::Input,
57            Self::Autonomy,
58            Self::Intent,
59            Self::Relevance,
60            Self::Compression,
61            Self::Translation,
62            Self::Delivery,
63        ]
64    }
65}
66
67impl std::fmt::Display for LayerKind {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        write!(f, "{}", self.as_str())
70    }
71}
72
73impl std::str::FromStr for LayerKind {
74    type Err = String;
75
76    fn from_str(s: &str) -> Result<Self, Self::Err> {
77        match s.to_ascii_lowercase().as_str() {
78            "input" => Ok(Self::Input),
79            "autonomy" => Ok(Self::Autonomy),
80            "intent" => Ok(Self::Intent),
81            "relevance" => Ok(Self::Relevance),
82            "compression" => Ok(Self::Compression),
83            "translation" => Ok(Self::Translation),
84            "delivery" => Ok(Self::Delivery),
85            _ => Err(format!(
86                "unknown pipeline layer '{s}'; expected one of: input, autonomy, intent, relevance, compression, translation, delivery"
87            )),
88        }
89    }
90}
91
/// Content and metadata passed into a pipeline layer for processing.
///
/// Has the same three fields as [`LayerOutput`]; the pipeline turns one
/// layer's output into the next layer's input field by field.
#[derive(Debug, Clone)]
pub struct LayerInput {
    /// Text the layer should process.
    pub content: String,
    /// Token count associated with `content`. It is supplied by the caller or
    /// the previous layer; this module never recomputes it from `content`.
    pub tokens: usize,
    /// Free-form string key/value pairs travelling alongside the content.
    pub metadata: HashMap<String, String>,
}
99
/// Result produced by a pipeline layer after processing.
///
/// Has the same three fields as [`LayerInput`], so the output of one layer
/// can be handed to the next layer unchanged.
#[derive(Debug, Clone)]
pub struct LayerOutput {
    /// Text produced by the layer.
    pub content: String,
    /// Token count the layer reports for `content`; used as the
    /// output-token figure in the layer's metrics.
    pub tokens: usize,
    /// Free-form string key/value pairs returned by the layer.
    pub metadata: HashMap<String, String>,
}
107
/// Performance metrics for a single layer execution: tokens in/out, timing, ratio.
#[derive(Debug, Clone)]
pub struct LayerMetrics {
    /// Which layer these numbers belong to.
    pub layer: LayerKind,
    /// Token count handed to the layer.
    pub input_tokens: usize,
    /// Token count reported by the layer's output.
    pub output_tokens: usize,
    /// Time spent in the layer, in microseconds.
    pub duration_us: u64,
    /// `output_tokens / input_tokens` as computed by [`LayerMetrics::new`]
    /// (`1.0` when `input_tokens` is zero); values below `1.0` mean the
    /// layer shrank the content.
    pub compression_ratio: f64,
}
117
118impl LayerMetrics {
119    pub fn new(
120        layer: LayerKind,
121        input_tokens: usize,
122        output_tokens: usize,
123        duration_us: u64,
124    ) -> Self {
125        let ratio = if input_tokens == 0 {
126            1.0
127        } else {
128            output_tokens as f64 / input_tokens as f64
129        };
130        Self {
131            layer,
132            input_tokens,
133            output_tokens,
134            duration_us,
135            compression_ratio: ratio,
136        }
137    }
138}
139
/// A single processing stage in the compression pipeline.
///
/// Implementors are stored as `Box<dyn Layer>` inside [`Pipeline`] and are
/// invoked through a shared reference.
pub trait Layer {
    /// Reports which [`LayerKind`] this layer implements; used for the
    /// enabled-check and for labelling metrics.
    fn kind(&self) -> LayerKind;
    /// Consumes `input` and returns the transformed content, its token
    /// count, and the metadata to forward to the next layer.
    fn process(&self, input: LayerInput) -> LayerOutput;
}
145
146/// Returns whether a given layer is enabled according to a profile's pipeline config.
147pub fn is_layer_enabled(kind: LayerKind, cfg: &crate::core::profiles::PipelineConfig) -> bool {
148    match kind {
149        LayerKind::Input | LayerKind::Autonomy | LayerKind::Delivery => true,
150        LayerKind::Intent => cfg.intent_effective(),
151        LayerKind::Relevance => cfg.relevance_effective(),
152        LayerKind::Compression => cfg.compression_effective(),
153        LayerKind::Translation => cfg.translation_effective(),
154    }
155}
156
/// A chain of processing layers that content flows through sequentially.
///
/// Layers run in the order they were added; the pipeline itself does not
/// reorder them.
pub struct Pipeline {
    /// Layers in execution order.
    layers: Vec<Box<dyn Layer>>,
}
161
162impl Pipeline {
163    /// Creates an empty pipeline with no layers.
164    pub fn new() -> Self {
165        Self { layers: Vec::new() }
166    }
167
168    /// Appends a processing layer to the pipeline (builder pattern).
169    pub fn add_layer(mut self, layer: Box<dyn Layer>) -> Self {
170        self.layers.push(layer);
171        self
172    }
173
174    /// Appends a layer only if the profile's pipeline config allows it.
175    pub fn add_layer_if_enabled(
176        self,
177        layer: Box<dyn Layer>,
178        cfg: &crate::core::profiles::PipelineConfig,
179    ) -> Self {
180        if is_layer_enabled(layer.kind(), cfg) {
181            self.add_layer(layer)
182        } else {
183            self
184        }
185    }
186
187    /// Runs all layers in sequence, collecting per-layer metrics.
188    pub fn execute(&self, input: LayerInput) -> (LayerOutput, Vec<LayerMetrics>) {
189        let mut current = input;
190        let mut metrics = Vec::new();
191
192        for layer in &self.layers {
193            let start = std::time::Instant::now();
194            let input_tokens = current.tokens;
195            let output = layer.process(current);
196            let duration = start.elapsed().as_micros() as u64;
197
198            metrics.push(LayerMetrics::new(
199                layer.kind(),
200                input_tokens,
201                output.tokens,
202                duration,
203            ));
204
205            current = LayerInput {
206                content: output.content,
207                tokens: output.tokens,
208                metadata: output.metadata,
209            };
210        }
211
212        let final_output = LayerOutput {
213            content: current.content,
214            tokens: current.tokens,
215            metadata: current.metadata,
216        };
217
218        (final_output, metrics)
219    }
220
221    /// Formats pipeline metrics as a human-readable summary with per-layer and total stats.
222    pub fn format_metrics(metrics: &[LayerMetrics]) -> String {
223        let mut out = String::from("Pipeline Metrics:\n");
224        let mut total_saved = 0usize;
225        for m in metrics {
226            let saved = m.input_tokens.saturating_sub(m.output_tokens);
227            total_saved += saved;
228            out.push_str(&format!(
229                "  {} : {} -> {} tok ({:.0}%, {:.1}ms)\n",
230                m.layer,
231                m.input_tokens,
232                m.output_tokens,
233                m.compression_ratio * 100.0,
234                m.duration_us as f64 / 1000.0,
235            ));
236        }
237        if let (Some(first), Some(last)) = (metrics.first(), metrics.last()) {
238            let total_ratio = if first.input_tokens == 0 {
239                1.0
240            } else {
241                last.output_tokens as f64 / first.input_tokens as f64
242            };
243            out.push_str(&format!(
244                "  TOTAL: {} -> {} tok ({:.0}%, saved {})\n",
245                first.input_tokens,
246                last.output_tokens,
247                total_ratio * 100.0,
248                total_saved,
249            ));
250        }
251        out
252    }
253}
254
/// Persistent aggregated statistics across all pipeline runs.
///
/// Serializable with serde so it can be written to and read back from disk.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct PipelineStats {
    /// Number of recording calls so far: incremented once per `record` call
    /// and once per `record_single` call.
    pub runs: usize,
    /// Cumulative metrics keyed by layer kind; a layer has an entry only
    /// after at least one measurement was recorded for it.
    pub per_layer: HashMap<LayerKind, AggregatedMetrics>,
}
261
/// Cumulative token counts and timing for a single pipeline layer across all runs.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct AggregatedMetrics {
    /// Sum of input tokens over all recorded invocations.
    pub total_input_tokens: usize,
    /// Sum of output tokens over all recorded invocations.
    pub total_output_tokens: usize,
    /// Sum of execution time over all recorded invocations, in microseconds.
    pub total_duration_us: u64,
    /// Number of recorded invocations.
    pub count: usize,
}
270
271impl AggregatedMetrics {
272    /// Returns the average compression ratio (output/input) across all runs.
273    pub fn avg_ratio(&self) -> f64 {
274        if self.total_input_tokens == 0 {
275            return 1.0;
276        }
277        self.total_output_tokens as f64 / self.total_input_tokens as f64
278    }
279
280    /// Returns the average duration per invocation in milliseconds.
281    pub fn avg_duration_ms(&self) -> f64 {
282        if self.count == 0 {
283            return 0.0;
284        }
285        self.total_duration_us as f64 / self.count as f64 / 1000.0
286    }
287}
288
289impl PipelineStats {
290    /// Creates empty pipeline stats with zero runs.
291    pub fn new() -> Self {
292        Self {
293            runs: 0,
294            per_layer: HashMap::new(),
295        }
296    }
297
298    /// Records a batch of layer metrics from a single pipeline execution.
299    pub fn record(&mut self, metrics: &[LayerMetrics]) {
300        self.runs += 1;
301        for m in metrics {
302            let agg = self.per_layer.entry(m.layer).or_default();
303            agg.total_input_tokens += m.input_tokens;
304            agg.total_output_tokens += m.output_tokens;
305            agg.total_duration_us += m.duration_us;
306            agg.count += 1;
307        }
308    }
309
310    /// Records metrics for a single layer execution.
311    pub fn record_single(
312        &mut self,
313        layer: LayerKind,
314        input_tokens: usize,
315        output_tokens: usize,
316        duration: std::time::Duration,
317    ) {
318        self.runs += 1;
319        let agg = self.per_layer.entry(layer).or_default();
320        agg.total_input_tokens += input_tokens;
321        agg.total_output_tokens += output_tokens;
322        agg.total_duration_us += duration.as_micros() as u64;
323        agg.count += 1;
324    }
325
326    /// Returns the total tokens saved across all pipeline layers.
327    pub fn total_tokens_saved(&self) -> usize {
328        self.per_layer
329            .values()
330            .map(|a| a.total_input_tokens.saturating_sub(a.total_output_tokens))
331            .sum()
332    }
333
334    /// Persists pipeline stats to `~/.lean-ctx/pipeline_stats.json`.
335    pub fn save(&self) {
336        if let Ok(dir) = crate::core::data_dir::lean_ctx_data_dir() {
337            let path = dir.join("pipeline_stats.json");
338            if let Ok(json) = serde_json::to_string(self) {
339                let _ = std::fs::write(path, json);
340            }
341        }
342    }
343
344    /// Loads pipeline stats from disk, returning defaults if absent.
345    pub fn load() -> Self {
346        crate::core::data_dir::lean_ctx_data_dir()
347            .ok()
348            .map(|d| d.join("pipeline_stats.json"))
349            .and_then(|p| std::fs::read_to_string(p).ok())
350            .and_then(|s| serde_json::from_str(&s).ok())
351            .unwrap_or_default()
352    }
353
354    /// Formats a human-readable summary of per-layer stats and total savings.
355    pub fn format_summary(&self) -> String {
356        let mut out = format!("Pipeline Stats ({} runs):\n", self.runs);
357        for kind in LayerKind::all() {
358            if let Some(agg) = self.per_layer.get(kind) {
359                out.push_str(&format!(
360                    "  {}: avg {:.0}% ratio, {:.1}ms, {} invocations\n",
361                    kind,
362                    agg.avg_ratio() * 100.0,
363                    agg.avg_duration_ms(),
364                    agg.count,
365                ));
366            }
367        }
368        out.push_str(&format!("  SAVED: {} tokens\n", self.total_tokens_saved()));
369        out
370    }
371}
372
373impl Default for Pipeline {
374    fn default() -> Self {
375        Self::new()
376    }
377}
378
// Unit tests for the layer types, the pipeline runner, and the aggregated stats.
#[cfg(test)]
mod tests {
    use super::*;

    /// Test layer that reports a configurable kind and returns its input unchanged.
    struct PassthroughLayer {
        kind: LayerKind,
    }

    impl Layer for PassthroughLayer {
        fn kind(&self) -> LayerKind {
            self.kind
        }

        fn process(&self, input: LayerInput) -> LayerOutput {
            LayerOutput {
                content: input.content,
                tokens: input.tokens,
                metadata: input.metadata,
            }
        }
    }

    /// Test layer that scales the token count by `ratio` and cuts the content
    /// to at most four bytes per remaining token.
    struct CompressionLayer {
        ratio: f64,
    }

    impl Layer for CompressionLayer {
        fn kind(&self) -> LayerKind {
            LayerKind::Compression
        }

        fn process(&self, input: LayerInput) -> LayerOutput {
            // The `as usize` cast truncates toward zero.
            let new_tokens = (input.tokens as f64 * self.ratio) as usize;
            // Byte-indexed slice: fine for the ASCII fixtures used below, but it
            // would panic if the cut landed inside a multi-byte character.
            let truncated = if input.content.len() > new_tokens * 4 {
                input.content[..new_tokens * 4].to_string()
            } else {
                input.content
            };
            LayerOutput {
                content: truncated,
                tokens: new_tokens,
                metadata: input.metadata,
            }
        }
    }

    // `LayerKind::all` lists seven kinds, with Input/Autonomy first and Delivery last.
    #[test]
    fn layer_kind_all_ordered() {
        let all = LayerKind::all();
        assert_eq!(all.len(), 7);
        assert_eq!(all[0], LayerKind::Input);
        assert_eq!(all[1], LayerKind::Autonomy);
        assert_eq!(all[6], LayerKind::Delivery);
    }

    #[test]
    fn passthrough_preserves_content() {
        let layer = PassthroughLayer {
            kind: LayerKind::Input,
        };
        let input = LayerInput {
            content: "hello world".to_string(),
            tokens: 2,
            metadata: HashMap::new(),
        };
        let output = layer.process(input);
        assert_eq!(output.content, "hello world");
        assert_eq!(output.tokens, 2);
    }

    #[test]
    fn compression_layer_reduces() {
        let layer = CompressionLayer { ratio: 0.5 };
        let input = LayerInput {
            content: "a ".repeat(100),
            tokens: 100,
            metadata: HashMap::new(),
        };
        let output = layer.process(input);
        assert_eq!(output.tokens, 50);
    }

    // Metrics come back one per layer, in the order the layers were added.
    #[test]
    fn pipeline_chains_layers() {
        let pipeline = Pipeline::new()
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Input,
            }))
            .add_layer(Box::new(CompressionLayer { ratio: 0.5 }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Delivery,
            }));

        let input = LayerInput {
            content: "a ".repeat(100),
            tokens: 100,
            metadata: HashMap::new(),
        };
        let (output, metrics) = pipeline.execute(input);
        assert_eq!(output.tokens, 50);
        assert_eq!(metrics.len(), 3);
        assert_eq!(metrics[0].layer, LayerKind::Input);
        assert_eq!(metrics[1].layer, LayerKind::Compression);
        assert_eq!(metrics[2].layer, LayerKind::Delivery);
    }

    #[test]
    fn metrics_new_calculates_ratio() {
        let m = LayerMetrics::new(LayerKind::Compression, 100, 50, 1000);
        assert!((m.compression_ratio - 0.5).abs() < f64::EPSILON);
    }

    // Only checks that layer labels and the TOTAL line appear, not exact formatting.
    #[test]
    fn metrics_format_readable() {
        let metrics = vec![
            LayerMetrics::new(LayerKind::Input, 1000, 1000, 100),
            LayerMetrics::new(LayerKind::Compression, 1000, 300, 5000),
            LayerMetrics::new(LayerKind::Delivery, 300, 300, 50),
        ];
        let formatted = Pipeline::format_metrics(&metrics);
        assert!(formatted.contains("input"));
        assert!(formatted.contains("compression"));
        assert!(formatted.contains("delivery"));
        assert!(formatted.contains("TOTAL"));
    }

    #[test]
    fn empty_pipeline_passes_through() {
        let pipeline = Pipeline::new();
        let input = LayerInput {
            content: "test".to_string(),
            tokens: 1,
            metadata: HashMap::new(),
        };
        let (output, metrics) = pipeline.execute(input);
        assert_eq!(output.content, "test");
        assert!(metrics.is_empty());
    }

    #[test]
    fn pipeline_stats_record_and_summarize() {
        let mut stats = PipelineStats::default();
        let metrics = vec![
            LayerMetrics::new(LayerKind::Input, 1000, 1000, 100),
            LayerMetrics::new(LayerKind::Compression, 1000, 300, 5000),
            LayerMetrics::new(LayerKind::Delivery, 300, 300, 50),
        ];
        stats.record(&metrics);
        stats.record(&metrics);

        assert_eq!(stats.runs, 2);
        // Only the compression layer removes tokens: 2 runs x (1000 - 300).
        assert_eq!(stats.total_tokens_saved(), 1400);

        let agg = stats.per_layer.get(&LayerKind::Compression).unwrap();
        assert_eq!(agg.count, 2);
        assert_eq!(agg.total_input_tokens, 2000);
        assert_eq!(agg.total_output_tokens, 600);

        let summary = stats.format_summary();
        assert!(summary.contains("2 runs"));
        assert!(summary.contains("SAVED: 1400"));
    }

    #[test]
    fn aggregated_metrics_avg() {
        let agg = AggregatedMetrics {
            total_input_tokens: 1000,
            total_output_tokens: 500,
            total_duration_us: 10000,
            count: 2,
        };
        assert!((agg.avg_ratio() - 0.5).abs() < f64::EPSILON);
        // 10000 us over 2 invocations = 5 ms each.
        assert!((agg.avg_duration_ms() - 5.0).abs() < f64::EPSILON);
    }

    // Parsing accepts any ASCII casing of the canonical labels.
    #[test]
    fn layer_kind_from_str_valid() {
        assert_eq!("input".parse::<LayerKind>().unwrap(), LayerKind::Input);
        assert_eq!("Intent".parse::<LayerKind>().unwrap(), LayerKind::Intent);
        assert_eq!(
            "COMPRESSION".parse::<LayerKind>().unwrap(),
            LayerKind::Compression
        );
        assert_eq!(
            "delivery".parse::<LayerKind>().unwrap(),
            LayerKind::Delivery
        );
    }

    #[test]
    fn layer_kind_from_str_invalid() {
        let err = "unknown".parse::<LayerKind>().unwrap_err();
        assert!(err.contains("unknown pipeline layer"));
        assert!(err.contains("input, autonomy, intent"));
    }

    // Every kind must survive an `as_str` -> `parse` round trip.
    #[test]
    fn layer_kind_roundtrip_str() {
        for kind in LayerKind::all() {
            let s = kind.as_str();
            let parsed: LayerKind = s.parse().unwrap();
            assert_eq!(*kind, parsed);
        }
    }

    #[test]
    fn pipeline_stats_record_single() {
        let mut stats = PipelineStats::new();
        stats.record_single(
            LayerKind::Compression,
            1000,
            300,
            std::time::Duration::from_millis(5),
        );
        assert_eq!(stats.runs, 1);
        let agg = stats.per_layer.get(&LayerKind::Compression).unwrap();
        assert_eq!(agg.total_input_tokens, 1000);
        assert_eq!(agg.total_output_tokens, 300);
        assert_eq!(agg.count, 1);
    }

    // End-to-end: one layer per kind, added in `LayerKind::all` order.
    #[test]
    fn pipeline_full_flow_integration() {
        let pipeline = Pipeline::new()
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Input,
            }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Autonomy,
            }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Intent,
            }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Relevance,
            }))
            .add_layer(Box::new(CompressionLayer { ratio: 0.3 }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Translation,
            }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Delivery,
            }));

        let input = LayerInput {
            content: "x ".repeat(500),
            tokens: 500,
            metadata: HashMap::new(),
        };
        let (output, metrics) = pipeline.execute(input);

        assert_eq!(metrics.len(), 7, "all layers should produce metrics");
        assert_eq!(output.tokens, 150, "compression at 0.3 ratio");

        for (i, kind) in LayerKind::all().iter().enumerate() {
            assert_eq!(metrics[i].layer, *kind, "layer order must match");
        }

        let mut stats = PipelineStats::new();
        stats.record(&metrics);
        assert_eq!(stats.runs, 1);
        assert_eq!(stats.total_tokens_saved(), 350);

        let formatted = Pipeline::format_metrics(&metrics);
        assert!(formatted.contains("TOTAL"));
        assert!(formatted.contains("500"));
    }

    // Input and Delivery stay enabled regardless of the config flags.
    #[test]
    fn is_layer_enabled_respects_config() {
        let cfg = crate::core::profiles::PipelineConfig {
            intent: Some(false),
            relevance: Some(false),
            compression: Some(true),
            translation: Some(true),
        };

        assert!(is_layer_enabled(LayerKind::Input, &cfg));
        assert!(!is_layer_enabled(LayerKind::Intent, &cfg));
        assert!(!is_layer_enabled(LayerKind::Relevance, &cfg));
        assert!(is_layer_enabled(LayerKind::Compression, &cfg));
        assert!(is_layer_enabled(LayerKind::Translation, &cfg));
        assert!(is_layer_enabled(LayerKind::Delivery, &cfg));
    }

    #[test]
    fn add_layer_if_enabled_skips_disabled() {
        let cfg = crate::core::profiles::PipelineConfig {
            intent: Some(false),
            relevance: Some(true),
            compression: Some(true),
            translation: Some(true),
        };

        let pipeline = Pipeline::new()
            .add_layer_if_enabled(
                Box::new(PassthroughLayer {
                    kind: LayerKind::Input,
                }),
                &cfg,
            )
            .add_layer_if_enabled(
                Box::new(PassthroughLayer {
                    kind: LayerKind::Intent,
                }),
                &cfg,
            )
            .add_layer_if_enabled(Box::new(CompressionLayer { ratio: 0.5 }), &cfg)
            .add_layer_if_enabled(
                Box::new(PassthroughLayer {
                    kind: LayerKind::Delivery,
                }),
                &cfg,
            );

        let input = LayerInput {
            content: "x ".repeat(100),
            tokens: 100,
            metadata: HashMap::new(),
        };
        let (output, metrics) = pipeline.execute(input);

        assert_eq!(
            metrics.len(),
            3,
            "Intent layer should be skipped, leaving Input + Compression + Delivery"
        );
        assert_eq!(metrics[0].layer, LayerKind::Input);
        assert_eq!(metrics[1].layer, LayerKind::Compression);
        assert_eq!(metrics[2].layer, LayerKind::Delivery);
        assert_eq!(output.tokens, 50);
    }
}
711}