Skip to main content

lean_ctx/core/
pipeline.rs

1use std::collections::HashMap;
2
3/// Identifies a stage in the compression pipeline (input → intent → compression → delivery).
4#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
5pub enum LayerKind {
6    Input,
7    Intent,
8    Relevance,
9    Compression,
10    Translation,
11    Delivery,
12}
13
14impl LayerKind {
15    /// Returns the string label for this pipeline layer kind.
16    pub fn as_str(&self) -> &'static str {
17        match self {
18            Self::Input => "input",
19            Self::Intent => "intent",
20            Self::Relevance => "relevance",
21            Self::Compression => "compression",
22            Self::Translation => "translation",
23            Self::Delivery => "delivery",
24        }
25    }
26
27    /// Returns all layer kinds in pipeline execution order.
28    pub fn all() -> &'static [LayerKind] {
29        &[
30            Self::Input,
31            Self::Intent,
32            Self::Relevance,
33            Self::Compression,
34            Self::Translation,
35            Self::Delivery,
36        ]
37    }
38}
39
40impl std::fmt::Display for LayerKind {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        write!(f, "{}", self.as_str())
43    }
44}
45
/// Payload handed to a [`Layer`] for processing.
#[derive(Debug, Clone)]
pub struct LayerInput {
    /// Text the layer operates on.
    pub content: String,
    /// Token count for `content`, as supplied by the caller or previous
    /// layer; the pipeline never recomputes it.
    pub tokens: usize,
    /// Free-form string metadata carried from layer to layer.
    pub metadata: HashMap<String, String>,
}
53
/// What a [`Layer`] returns after processing a [`LayerInput`].
#[derive(Debug, Clone)]
pub struct LayerOutput {
    /// Processed text.
    pub content: String,
    /// Token count the layer reports for `content`.
    pub tokens: usize,
    /// Metadata to forward to the next layer (or to the caller).
    pub metadata: HashMap<String, String>,
}
61
62/// Performance metrics for a single layer execution: tokens in/out, timing, ratio.
63#[derive(Debug, Clone)]
64pub struct LayerMetrics {
65    pub layer: LayerKind,
66    pub input_tokens: usize,
67    pub output_tokens: usize,
68    pub duration_us: u64,
69    pub compression_ratio: f64,
70}
71
72impl LayerMetrics {
73    pub fn new(
74        layer: LayerKind,
75        input_tokens: usize,
76        output_tokens: usize,
77        duration_us: u64,
78    ) -> Self {
79        let ratio = if input_tokens == 0 {
80            1.0
81        } else {
82            output_tokens as f64 / input_tokens as f64
83        };
84        Self {
85            layer,
86            input_tokens,
87            output_tokens,
88            duration_us,
89            compression_ratio: ratio,
90        }
91    }
92}
93
94/// A single processing stage in the compression pipeline.
95pub trait Layer {
96    fn kind(&self) -> LayerKind;
97    fn process(&self, input: LayerInput) -> LayerOutput;
98}
99
100/// A chain of processing layers that content flows through sequentially.
101pub struct Pipeline {
102    layers: Vec<Box<dyn Layer>>,
103}
104
105impl Pipeline {
106    /// Creates an empty pipeline with no layers.
107    pub fn new() -> Self {
108        Self { layers: Vec::new() }
109    }
110
111    /// Appends a processing layer to the pipeline (builder pattern).
112    pub fn add_layer(mut self, layer: Box<dyn Layer>) -> Self {
113        self.layers.push(layer);
114        self
115    }
116
117    /// Runs all layers in sequence, collecting per-layer metrics.
118    pub fn execute(&self, input: LayerInput) -> (LayerOutput, Vec<LayerMetrics>) {
119        let mut current = input;
120        let mut metrics = Vec::new();
121
122        for layer in &self.layers {
123            let start = std::time::Instant::now();
124            let input_tokens = current.tokens;
125            let output = layer.process(current);
126            let duration = start.elapsed().as_micros() as u64;
127
128            metrics.push(LayerMetrics::new(
129                layer.kind(),
130                input_tokens,
131                output.tokens,
132                duration,
133            ));
134
135            current = LayerInput {
136                content: output.content,
137                tokens: output.tokens,
138                metadata: output.metadata,
139            };
140        }
141
142        let final_output = LayerOutput {
143            content: current.content,
144            tokens: current.tokens,
145            metadata: current.metadata,
146        };
147
148        (final_output, metrics)
149    }
150
151    /// Formats pipeline metrics as a human-readable summary with per-layer and total stats.
152    pub fn format_metrics(metrics: &[LayerMetrics]) -> String {
153        let mut out = String::from("Pipeline Metrics:\n");
154        let mut total_saved = 0usize;
155        for m in metrics {
156            let saved = m.input_tokens.saturating_sub(m.output_tokens);
157            total_saved += saved;
158            out.push_str(&format!(
159                "  {} : {} -> {} tok ({:.0}%, {:.1}ms)\n",
160                m.layer,
161                m.input_tokens,
162                m.output_tokens,
163                m.compression_ratio * 100.0,
164                m.duration_us as f64 / 1000.0,
165            ));
166        }
167        if let (Some(first), Some(last)) = (metrics.first(), metrics.last()) {
168            let total_ratio = if first.input_tokens == 0 {
169                1.0
170            } else {
171                last.output_tokens as f64 / first.input_tokens as f64
172            };
173            out.push_str(&format!(
174                "  TOTAL: {} -> {} tok ({:.0}%, saved {})\n",
175                first.input_tokens,
176                last.output_tokens,
177                total_ratio * 100.0,
178                total_saved,
179            ));
180        }
181        out
182    }
183}
184
185/// Persistent aggregated statistics across all pipeline runs.
186#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
187pub struct PipelineStats {
188    pub runs: usize,
189    pub per_layer: HashMap<LayerKind, AggregatedMetrics>,
190}
191
192/// Cumulative token counts and timing for a single pipeline layer across all runs.
193#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
194pub struct AggregatedMetrics {
195    pub total_input_tokens: usize,
196    pub total_output_tokens: usize,
197    pub total_duration_us: u64,
198    pub count: usize,
199}
200
201impl AggregatedMetrics {
202    /// Returns the average compression ratio (output/input) across all runs.
203    pub fn avg_ratio(&self) -> f64 {
204        if self.total_input_tokens == 0 {
205            return 1.0;
206        }
207        self.total_output_tokens as f64 / self.total_input_tokens as f64
208    }
209
210    /// Returns the average duration per invocation in milliseconds.
211    pub fn avg_duration_ms(&self) -> f64 {
212        if self.count == 0 {
213            return 0.0;
214        }
215        self.total_duration_us as f64 / self.count as f64 / 1000.0
216    }
217}
218
219impl PipelineStats {
220    /// Creates empty pipeline stats with zero runs.
221    pub fn new() -> Self {
222        Self {
223            runs: 0,
224            per_layer: HashMap::new(),
225        }
226    }
227
228    /// Records a batch of layer metrics from a single pipeline execution.
229    pub fn record(&mut self, metrics: &[LayerMetrics]) {
230        self.runs += 1;
231        for m in metrics {
232            let agg = self.per_layer.entry(m.layer).or_default();
233            agg.total_input_tokens += m.input_tokens;
234            agg.total_output_tokens += m.output_tokens;
235            agg.total_duration_us += m.duration_us;
236            agg.count += 1;
237        }
238    }
239
240    /// Records metrics for a single layer execution.
241    pub fn record_single(
242        &mut self,
243        layer: LayerKind,
244        input_tokens: usize,
245        output_tokens: usize,
246        duration: std::time::Duration,
247    ) {
248        self.runs += 1;
249        let agg = self.per_layer.entry(layer).or_default();
250        agg.total_input_tokens += input_tokens;
251        agg.total_output_tokens += output_tokens;
252        agg.total_duration_us += duration.as_micros() as u64;
253        agg.count += 1;
254    }
255
256    /// Returns the total tokens saved across all pipeline layers.
257    pub fn total_tokens_saved(&self) -> usize {
258        self.per_layer
259            .values()
260            .map(|a| a.total_input_tokens.saturating_sub(a.total_output_tokens))
261            .sum()
262    }
263
264    /// Persists pipeline stats to `~/.lean-ctx/pipeline_stats.json`.
265    pub fn save(&self) {
266        if let Ok(dir) = crate::core::data_dir::lean_ctx_data_dir() {
267            let path = dir.join("pipeline_stats.json");
268            if let Ok(json) = serde_json::to_string(self) {
269                let _ = std::fs::write(path, json);
270            }
271        }
272    }
273
274    /// Loads pipeline stats from disk, returning defaults if absent.
275    pub fn load() -> Self {
276        crate::core::data_dir::lean_ctx_data_dir()
277            .ok()
278            .map(|d| d.join("pipeline_stats.json"))
279            .and_then(|p| std::fs::read_to_string(p).ok())
280            .and_then(|s| serde_json::from_str(&s).ok())
281            .unwrap_or_default()
282    }
283
284    /// Formats a human-readable summary of per-layer stats and total savings.
285    pub fn format_summary(&self) -> String {
286        let mut out = format!("Pipeline Stats ({} runs):\n", self.runs);
287        for kind in LayerKind::all() {
288            if let Some(agg) = self.per_layer.get(kind) {
289                out.push_str(&format!(
290                    "  {}: avg {:.0}% ratio, {:.1}ms, {} invocations\n",
291                    kind,
292                    agg.avg_ratio() * 100.0,
293                    agg.avg_duration_ms(),
294                    agg.count,
295                ));
296            }
297        }
298        out.push_str(&format!("  SAVED: {} tokens\n", self.total_tokens_saved()));
299        out
300    }
301}
302
303impl Default for Pipeline {
304    fn default() -> Self {
305        Self::new()
306    }
307}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// Test layer that forwards its input unchanged under a configurable kind.
    struct PassthroughLayer {
        kind: LayerKind,
    }

    impl Layer for PassthroughLayer {
        fn kind(&self) -> LayerKind {
            self.kind
        }

        fn process(&self, input: LayerInput) -> LayerOutput {
            LayerOutput {
                content: input.content,
                tokens: input.tokens,
                metadata: input.metadata,
            }
        }
    }

    /// Test layer that scales the token count by `ratio` and truncates the
    /// content to roughly 4 bytes per remaining token.
    struct CompressionLayer {
        ratio: f64,
    }

    impl Layer for CompressionLayer {
        fn kind(&self) -> LayerKind {
            LayerKind::Compression
        }

        fn process(&self, input: LayerInput) -> LayerOutput {
            let new_tokens = (input.tokens as f64 * self.ratio) as usize;
            // Back off to a char boundary: byte-slicing at an arbitrary
            // offset panics on multi-byte UTF-8 content.
            let mut cut = new_tokens.saturating_mul(4).min(input.content.len());
            while !input.content.is_char_boundary(cut) {
                cut -= 1;
            }
            let mut content = input.content;
            content.truncate(cut);
            LayerOutput {
                content,
                tokens: new_tokens,
                metadata: input.metadata,
            }
        }
    }

    #[test]
    fn layer_kind_all_ordered() {
        let all = LayerKind::all();
        assert_eq!(all.len(), 6);
        assert_eq!(all[0], LayerKind::Input);
        assert_eq!(all[5], LayerKind::Delivery);
    }

    #[test]
    fn passthrough_preserves_content() {
        let layer = PassthroughLayer {
            kind: LayerKind::Input,
        };
        let input = LayerInput {
            content: "hello world".to_string(),
            tokens: 2,
            metadata: HashMap::new(),
        };
        let output = layer.process(input);
        assert_eq!(output.content, "hello world");
        assert_eq!(output.tokens, 2);
    }

    #[test]
    fn compression_layer_reduces() {
        let layer = CompressionLayer { ratio: 0.5 };
        let input = LayerInput {
            content: "a ".repeat(100),
            tokens: 100,
            metadata: HashMap::new(),
        };
        let output = layer.process(input);
        assert_eq!(output.tokens, 50);
    }

    #[test]
    fn compression_layer_respects_char_boundaries() {
        // '€' is 3 bytes in UTF-8, so a 4-byte cut lands mid-character.
        let layer = CompressionLayer { ratio: 0.5 };
        let input = LayerInput {
            content: "€".repeat(10),
            tokens: 2,
            metadata: HashMap::new(),
        };
        let output = layer.process(input);
        assert_eq!(output.tokens, 1);
        assert_eq!(output.content, "€");
    }

    #[test]
    fn pipeline_chains_layers() {
        let pipeline = Pipeline::new()
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Input,
            }))
            .add_layer(Box::new(CompressionLayer { ratio: 0.5 }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Delivery,
            }));

        let input = LayerInput {
            content: "a ".repeat(100),
            tokens: 100,
            metadata: HashMap::new(),
        };
        let (output, metrics) = pipeline.execute(input);
        assert_eq!(output.tokens, 50);
        assert_eq!(metrics.len(), 3);
        assert_eq!(metrics[0].layer, LayerKind::Input);
        assert_eq!(metrics[1].layer, LayerKind::Compression);
        assert_eq!(metrics[2].layer, LayerKind::Delivery);
    }

    #[test]
    fn metrics_new_calculates_ratio() {
        let m = LayerMetrics::new(LayerKind::Compression, 100, 50, 1000);
        assert!((m.compression_ratio - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn metrics_zero_input_ratio_is_one() {
        let m = LayerMetrics::new(LayerKind::Input, 0, 5, 0);
        assert!((m.compression_ratio - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn metrics_format_readable() {
        let metrics = vec![
            LayerMetrics::new(LayerKind::Input, 1000, 1000, 100),
            LayerMetrics::new(LayerKind::Compression, 1000, 300, 5000),
            LayerMetrics::new(LayerKind::Delivery, 300, 300, 50),
        ];
        let formatted = Pipeline::format_metrics(&metrics);
        assert!(formatted.contains("input"));
        assert!(formatted.contains("compression"));
        assert!(formatted.contains("delivery"));
        assert!(formatted.contains("TOTAL"));
    }

    #[test]
    fn metrics_format_exact_lines() {
        let metrics = vec![
            LayerMetrics::new(LayerKind::Input, 1000, 1000, 100),
            LayerMetrics::new(LayerKind::Compression, 1000, 300, 5000),
            LayerMetrics::new(LayerKind::Delivery, 300, 300, 50),
        ];
        let formatted = Pipeline::format_metrics(&metrics);
        assert!(formatted.starts_with("Pipeline Metrics:\n"));
        assert!(formatted.contains("  compression : 1000 -> 300 tok (30%, 5.0ms)\n"));
        assert!(formatted.ends_with("  TOTAL: 1000 -> 300 tok (30%, saved 700)\n"));
    }

    #[test]
    fn metrics_format_empty_has_no_total() {
        assert_eq!(Pipeline::format_metrics(&[]), "Pipeline Metrics:\n");
    }

    #[test]
    fn empty_pipeline_passes_through() {
        let pipeline = Pipeline::new();
        let input = LayerInput {
            content: "test".to_string(),
            tokens: 1,
            metadata: HashMap::new(),
        };
        let (output, metrics) = pipeline.execute(input);
        assert_eq!(output.content, "test");
        assert!(metrics.is_empty());
    }

    #[test]
    fn pipeline_stats_record_and_summarize() {
        let mut stats = PipelineStats::default();
        let metrics = vec![
            LayerMetrics::new(LayerKind::Input, 1000, 1000, 100),
            LayerMetrics::new(LayerKind::Compression, 1000, 300, 5000),
            LayerMetrics::new(LayerKind::Delivery, 300, 300, 50),
        ];
        stats.record(&metrics);
        stats.record(&metrics);

        assert_eq!(stats.runs, 2);
        assert_eq!(stats.total_tokens_saved(), 1400);

        let agg = stats.per_layer.get(&LayerKind::Compression).unwrap();
        assert_eq!(agg.count, 2);
        assert_eq!(agg.total_input_tokens, 2000);
        assert_eq!(agg.total_output_tokens, 600);

        let summary = stats.format_summary();
        assert!(summary.contains("2 runs"));
        assert!(summary.contains("SAVED: 1400"));
    }

    #[test]
    fn pipeline_stats_record_single_counts_as_run() {
        let mut stats = PipelineStats::new();
        stats.record_single(
            LayerKind::Compression,
            100,
            40,
            std::time::Duration::from_micros(2500),
        );
        assert_eq!(stats.runs, 1);
        let agg = &stats.per_layer[&LayerKind::Compression];
        assert_eq!(agg.count, 1);
        assert_eq!(agg.total_duration_us, 2500);
        assert_eq!(stats.total_tokens_saved(), 60);
    }

    #[test]
    fn pipeline_stats_summary_uses_canonical_order() {
        let mut stats = PipelineStats::new();
        stats.record(&[
            LayerMetrics::new(LayerKind::Delivery, 10, 10, 0),
            LayerMetrics::new(LayerKind::Input, 10, 10, 0),
        ]);
        let summary = stats.format_summary();
        let input_pos = summary.find("input:").unwrap();
        let delivery_pos = summary.find("delivery:").unwrap();
        assert!(input_pos < delivery_pos);
    }

    #[test]
    fn aggregated_metrics_avg() {
        let agg = AggregatedMetrics {
            total_input_tokens: 1000,
            total_output_tokens: 500,
            total_duration_us: 10000,
            count: 2,
        };
        assert!((agg.avg_ratio() - 0.5).abs() < f64::EPSILON);
        assert!((agg.avg_duration_ms() - 5.0).abs() < f64::EPSILON);
    }
}