1use std::collections::HashMap;
23
/// Identifies one stage of the processing pipeline.
///
/// Variants are declared in the same order in which [`LayerKind::all`] lists them.
/// The type is `Copy`, hashable and (de)serializable via serde.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum LayerKind {
    /// Always reported as enabled by `is_layer_enabled`.
    Input,
    /// Always reported as enabled by `is_layer_enabled`.
    Autonomy,
    /// Enabled only when the pipeline config's `intent_effective()` says so.
    Intent,
    /// Enabled only when the pipeline config's `relevance_effective()` says so.
    Relevance,
    /// Enabled only when the pipeline config's `compression_effective()` says so.
    Compression,
    /// Enabled only when the pipeline config's `translation_effective()` says so.
    Translation,
    /// Always reported as enabled by `is_layer_enabled`.
    Delivery,
}
38
39impl LayerKind {
40 pub fn as_str(&self) -> &'static str {
42 match self {
43 Self::Input => "input",
44 Self::Autonomy => "autonomy",
45 Self::Intent => "intent",
46 Self::Relevance => "relevance",
47 Self::Compression => "compression",
48 Self::Translation => "translation",
49 Self::Delivery => "delivery",
50 }
51 }
52
53 pub fn all() -> &'static [LayerKind] {
55 &[
56 Self::Input,
57 Self::Autonomy,
58 Self::Intent,
59 Self::Relevance,
60 Self::Compression,
61 Self::Translation,
62 Self::Delivery,
63 ]
64 }
65}
66
67impl std::fmt::Display for LayerKind {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 write!(f, "{}", self.as_str())
70 }
71}
72
73impl std::str::FromStr for LayerKind {
74 type Err = String;
75
76 fn from_str(s: &str) -> Result<Self, Self::Err> {
77 match s.to_ascii_lowercase().as_str() {
78 "input" => Ok(Self::Input),
79 "autonomy" => Ok(Self::Autonomy),
80 "intent" => Ok(Self::Intent),
81 "relevance" => Ok(Self::Relevance),
82 "compression" => Ok(Self::Compression),
83 "translation" => Ok(Self::Translation),
84 "delivery" => Ok(Self::Delivery),
85 _ => Err(format!(
86 "unknown pipeline layer '{s}'; expected one of: input, autonomy, intent, relevance, compression, translation, delivery"
87 )),
88 }
89 }
90}
91
/// Data handed to a layer's `process` method.
#[derive(Debug, Clone)]
pub struct LayerInput {
    /// Text payload to be processed.
    pub content: String,
    /// Token count reported for `content`; `Pipeline::execute` records it as the
    /// layer's input token count.
    pub tokens: usize,
    /// Free-form string key/value pairs carried alongside the content.
    pub metadata: HashMap<String, String>,
}
99
/// Result returned from a layer's `process` method.
///
/// It has the same shape as `LayerInput`; `Pipeline::execute` copies the three
/// fields over to form the next layer's input.
#[derive(Debug, Clone)]
pub struct LayerOutput {
    /// Text payload produced by the layer.
    pub content: String,
    /// Token count reported by the layer for `content`.
    pub tokens: usize,
    /// Free-form string key/value pairs carried alongside the content.
    pub metadata: HashMap<String, String>,
}
107
/// Measurements recorded for a single layer invocation.
#[derive(Debug, Clone)]
pub struct LayerMetrics {
    /// Layer that produced this record.
    pub layer: LayerKind,
    /// Token count going into the layer.
    pub input_tokens: usize,
    /// Token count coming out of the layer.
    pub output_tokens: usize,
    /// Time spent in the layer, in microseconds.
    pub duration_us: u64,
    /// `output_tokens / input_tokens` as computed by `LayerMetrics::new`
    /// (`1.0` when `input_tokens` is zero). Values below `1.0` mean fewer tokens came out.
    pub compression_ratio: f64,
}
117
118impl LayerMetrics {
119 pub fn new(
120 layer: LayerKind,
121 input_tokens: usize,
122 output_tokens: usize,
123 duration_us: u64,
124 ) -> Self {
125 let ratio = if input_tokens == 0 {
126 1.0
127 } else {
128 output_tokens as f64 / input_tokens as f64
129 };
130 Self {
131 layer,
132 input_tokens,
133 output_tokens,
134 duration_us,
135 compression_ratio: ratio,
136 }
137 }
138}
139
/// A single processing stage that can be chained inside a `Pipeline`.
pub trait Layer {
    /// Reports which `LayerKind` this stage implements.
    fn kind(&self) -> LayerKind;
    /// Consumes `input` and returns the layer's result.
    fn process(&self, input: LayerInput) -> LayerOutput;
}
145
146pub fn is_layer_enabled(kind: LayerKind, cfg: &crate::core::profiles::PipelineConfig) -> bool {
148 match kind {
149 LayerKind::Input | LayerKind::Autonomy | LayerKind::Delivery => true,
150 LayerKind::Intent => cfg.intent_effective(),
151 LayerKind::Relevance => cfg.relevance_effective(),
152 LayerKind::Compression => cfg.compression_effective(),
153 LayerKind::Translation => cfg.translation_effective(),
154 }
155}
156
/// An ordered chain of layers that `Pipeline::execute` runs one after another.
pub struct Pipeline {
    /// Layers in execution order, i.e. the order in which they were added.
    layers: Vec<Box<dyn Layer>>,
}
161
162impl Pipeline {
163 pub fn new() -> Self {
165 Self { layers: Vec::new() }
166 }
167
168 pub fn add_layer(mut self, layer: Box<dyn Layer>) -> Self {
170 self.layers.push(layer);
171 self
172 }
173
174 pub fn add_layer_if_enabled(
176 self,
177 layer: Box<dyn Layer>,
178 cfg: &crate::core::profiles::PipelineConfig,
179 ) -> Self {
180 if is_layer_enabled(layer.kind(), cfg) {
181 self.add_layer(layer)
182 } else {
183 self
184 }
185 }
186
187 pub fn execute(&self, input: LayerInput) -> (LayerOutput, Vec<LayerMetrics>) {
189 let mut current = input;
190 let mut metrics = Vec::new();
191
192 for layer in &self.layers {
193 let start = std::time::Instant::now();
194 let input_tokens = current.tokens;
195 let output = layer.process(current);
196 let duration = start.elapsed().as_micros() as u64;
197
198 metrics.push(LayerMetrics::new(
199 layer.kind(),
200 input_tokens,
201 output.tokens,
202 duration,
203 ));
204
205 current = LayerInput {
206 content: output.content,
207 tokens: output.tokens,
208 metadata: output.metadata,
209 };
210 }
211
212 let final_output = LayerOutput {
213 content: current.content,
214 tokens: current.tokens,
215 metadata: current.metadata,
216 };
217
218 (final_output, metrics)
219 }
220
221 pub fn format_metrics(metrics: &[LayerMetrics]) -> String {
223 let mut out = String::from("Pipeline Metrics:\n");
224 let mut total_saved = 0usize;
225 for m in metrics {
226 let saved = m.input_tokens.saturating_sub(m.output_tokens);
227 total_saved += saved;
228 out.push_str(&format!(
229 " {} : {} -> {} tok ({:.0}%, {:.1}ms)\n",
230 m.layer,
231 m.input_tokens,
232 m.output_tokens,
233 m.compression_ratio * 100.0,
234 m.duration_us as f64 / 1000.0,
235 ));
236 }
237 if let (Some(first), Some(last)) = (metrics.first(), metrics.last()) {
238 let total_ratio = if first.input_tokens == 0 {
239 1.0
240 } else {
241 last.output_tokens as f64 / first.input_tokens as f64
242 };
243 out.push_str(&format!(
244 " TOTAL: {} -> {} tok ({:.0}%, saved {})\n",
245 first.input_tokens,
246 last.output_tokens,
247 total_ratio * 100.0,
248 total_saved,
249 ));
250 }
251 out
252 }
253}
254
/// Cumulative statistics gathered over pipeline runs; (de)serializable via serde.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct PipelineStats {
    /// Number of recordings made: `record` and `record_single` each add one per call.
    pub runs: usize,
    /// Running totals for each layer that has been recorded at least once.
    pub per_layer: HashMap<LayerKind, AggregatedMetrics>,
}
261
/// Running totals for one layer, summed over all of its recorded invocations.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct AggregatedMetrics {
    /// Sum of input token counts.
    pub total_input_tokens: usize,
    /// Sum of output token counts.
    pub total_output_tokens: usize,
    /// Sum of durations, in microseconds.
    pub total_duration_us: u64,
    /// Number of invocations that were added to the totals.
    pub count: usize,
}
270
271impl AggregatedMetrics {
272 pub fn avg_ratio(&self) -> f64 {
274 if self.total_input_tokens == 0 {
275 return 1.0;
276 }
277 self.total_output_tokens as f64 / self.total_input_tokens as f64
278 }
279
280 pub fn avg_duration_ms(&self) -> f64 {
282 if self.count == 0 {
283 return 0.0;
284 }
285 self.total_duration_us as f64 / self.count as f64 / 1000.0
286 }
287}
288
289impl PipelineStats {
290 pub fn new() -> Self {
292 Self {
293 runs: 0,
294 per_layer: HashMap::new(),
295 }
296 }
297
298 pub fn record(&mut self, metrics: &[LayerMetrics]) {
300 self.runs += 1;
301 for m in metrics {
302 let agg = self.per_layer.entry(m.layer).or_default();
303 agg.total_input_tokens += m.input_tokens;
304 agg.total_output_tokens += m.output_tokens;
305 agg.total_duration_us += m.duration_us;
306 agg.count += 1;
307 }
308 }
309
310 pub fn record_single(
312 &mut self,
313 layer: LayerKind,
314 input_tokens: usize,
315 output_tokens: usize,
316 duration: std::time::Duration,
317 ) {
318 self.runs += 1;
319 let agg = self.per_layer.entry(layer).or_default();
320 agg.total_input_tokens += input_tokens;
321 agg.total_output_tokens += output_tokens;
322 agg.total_duration_us += duration.as_micros() as u64;
323 agg.count += 1;
324 }
325
326 pub fn total_tokens_saved(&self) -> usize {
328 self.per_layer
329 .values()
330 .map(|a| a.total_input_tokens.saturating_sub(a.total_output_tokens))
331 .sum()
332 }
333
334 pub fn save(&self) {
336 if let Ok(dir) = crate::core::data_dir::lean_ctx_data_dir() {
337 let path = dir.join("pipeline_stats.json");
338 if let Ok(json) = serde_json::to_string(self) {
339 let _ = std::fs::write(path, json);
340 }
341 }
342 }
343
344 pub fn load() -> Self {
346 crate::core::data_dir::lean_ctx_data_dir()
347 .ok()
348 .map(|d| d.join("pipeline_stats.json"))
349 .and_then(|p| std::fs::read_to_string(p).ok())
350 .and_then(|s| serde_json::from_str(&s).ok())
351 .unwrap_or_default()
352 }
353
354 pub fn format_summary(&self) -> String {
356 let mut out = format!("Pipeline Stats ({} runs):\n", self.runs);
357 for kind in LayerKind::all() {
358 if let Some(agg) = self.per_layer.get(kind) {
359 out.push_str(&format!(
360 " {}: avg {:.0}% ratio, {:.1}ms, {} invocations\n",
361 kind,
362 agg.avg_ratio() * 100.0,
363 agg.avg_duration_ms(),
364 agg.count,
365 ));
366 }
367 }
368 out.push_str(&format!(" SAVED: {} tokens\n", self.total_tokens_saved()));
369 out
370 }
371}
372
373impl Default for Pipeline {
374 fn default() -> Self {
375 Self::new()
376 }
377}
378
// Unit tests for the pipeline types, built around two stub layers.
#[cfg(test)]
mod tests {
    use super::*;

    // Stub layer that returns its input unchanged and reports a configurable kind.
    struct PassthroughLayer {
        kind: LayerKind,
    }

    impl Layer for PassthroughLayer {
        fn kind(&self) -> LayerKind {
            self.kind
        }

        fn process(&self, input: LayerInput) -> LayerOutput {
            LayerOutput {
                content: input.content,
                tokens: input.tokens,
                metadata: input.metadata,
            }
        }
    }

    // Stub compression layer: scales the token count by `ratio` and truncates the
    // content to four bytes per remaining token.
    struct CompressionLayer {
        ratio: f64,
    }

    impl Layer for CompressionLayer {
        fn kind(&self) -> LayerKind {
            LayerKind::Compression
        }

        fn process(&self, input: LayerInput) -> LayerOutput {
            // The float-to-usize cast truncates toward zero.
            let new_tokens = (input.tokens as f64 * self.ratio) as usize;
            // Byte-indexed slice: fine for the ASCII fixtures used below, but it
            // would panic if the cut landed inside a multi-byte character.
            let truncated = if input.content.len() > new_tokens * 4 {
                input.content[..new_tokens * 4].to_string()
            } else {
                input.content
            };
            LayerOutput {
                content: truncated,
                tokens: new_tokens,
                metadata: input.metadata,
            }
        }
    }

    // `all()` lists seven kinds, starting with Input and ending with Delivery.
    #[test]
    fn layer_kind_all_ordered() {
        let all = LayerKind::all();
        assert_eq!(all.len(), 7);
        assert_eq!(all[0], LayerKind::Input);
        assert_eq!(all[1], LayerKind::Autonomy);
        assert_eq!(all[6], LayerKind::Delivery);
    }

    // The passthrough stub leaves content and token count untouched.
    #[test]
    fn passthrough_preserves_content() {
        let layer = PassthroughLayer {
            kind: LayerKind::Input,
        };
        let input = LayerInput {
            content: "hello world".to_string(),
            tokens: 2,
            metadata: HashMap::new(),
        };
        let output = layer.process(input);
        assert_eq!(output.content, "hello world");
        assert_eq!(output.tokens, 2);
    }

    // A 0.5 ratio halves the reported token count.
    #[test]
    fn compression_layer_reduces() {
        let layer = CompressionLayer { ratio: 0.5 };
        let input = LayerInput {
            content: "a ".repeat(100),
            tokens: 100,
            metadata: HashMap::new(),
        };
        let output = layer.process(input);
        assert_eq!(output.tokens, 50);
    }

    // Layers run in insertion order and each one yields exactly one metrics entry.
    #[test]
    fn pipeline_chains_layers() {
        let pipeline = Pipeline::new()
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Input,
            }))
            .add_layer(Box::new(CompressionLayer { ratio: 0.5 }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Delivery,
            }));

        let input = LayerInput {
            content: "a ".repeat(100),
            tokens: 100,
            metadata: HashMap::new(),
        };
        let (output, metrics) = pipeline.execute(input);
        assert_eq!(output.tokens, 50);
        assert_eq!(metrics.len(), 3);
        assert_eq!(metrics[0].layer, LayerKind::Input);
        assert_eq!(metrics[1].layer, LayerKind::Compression);
        assert_eq!(metrics[2].layer, LayerKind::Delivery);
    }

    // The ratio is output tokens divided by input tokens.
    #[test]
    fn metrics_new_calculates_ratio() {
        let m = LayerMetrics::new(LayerKind::Compression, 100, 50, 1000);
        assert!((m.compression_ratio - 0.5).abs() < f64::EPSILON);
    }

    // The report names every layer and ends with a TOTAL line.
    #[test]
    fn metrics_format_readable() {
        let metrics = vec![
            LayerMetrics::new(LayerKind::Input, 1000, 1000, 100),
            LayerMetrics::new(LayerKind::Compression, 1000, 300, 5000),
            LayerMetrics::new(LayerKind::Delivery, 300, 300, 50),
        ];
        let formatted = Pipeline::format_metrics(&metrics);
        assert!(formatted.contains("input"));
        assert!(formatted.contains("compression"));
        assert!(formatted.contains("delivery"));
        assert!(formatted.contains("TOTAL"));
    }

    // With no layers the input comes back unchanged and no metrics are produced.
    #[test]
    fn empty_pipeline_passes_through() {
        let pipeline = Pipeline::new();
        let input = LayerInput {
            content: "test".to_string(),
            tokens: 1,
            metadata: HashMap::new(),
        };
        let (output, metrics) = pipeline.execute(input);
        assert_eq!(output.content, "test");
        assert!(metrics.is_empty());
    }

    // Two identical recordings double the per-layer totals.
    #[test]
    fn pipeline_stats_record_and_summarize() {
        let mut stats = PipelineStats::default();
        let metrics = vec![
            LayerMetrics::new(LayerKind::Input, 1000, 1000, 100),
            LayerMetrics::new(LayerKind::Compression, 1000, 300, 5000),
            LayerMetrics::new(LayerKind::Delivery, 300, 300, 50),
        ];
        stats.record(&metrics);
        stats.record(&metrics);

        assert_eq!(stats.runs, 2);
        // Only the compression layer removes tokens: 2 runs x (1000 - 300).
        assert_eq!(stats.total_tokens_saved(), 1400);

        let agg = stats.per_layer.get(&LayerKind::Compression).unwrap();
        assert_eq!(agg.count, 2);
        assert_eq!(agg.total_input_tokens, 2000);
        assert_eq!(agg.total_output_tokens, 600);

        let summary = stats.format_summary();
        assert!(summary.contains("2 runs"));
        assert!(summary.contains("SAVED: 1400"));
    }

    // 500/1000 tokens gives a 0.5 ratio; 10000us over 2 calls gives 5ms each.
    #[test]
    fn aggregated_metrics_avg() {
        let agg = AggregatedMetrics {
            total_input_tokens: 1000,
            total_output_tokens: 500,
            total_duration_us: 10000,
            count: 2,
        };
        assert!((agg.avg_ratio() - 0.5).abs() < f64::EPSILON);
        assert!((agg.avg_duration_ms() - 5.0).abs() < f64::EPSILON);
    }

    // Parsing accepts any ASCII casing of a layer name.
    #[test]
    fn layer_kind_from_str_valid() {
        assert_eq!("input".parse::<LayerKind>().unwrap(), LayerKind::Input);
        assert_eq!("Intent".parse::<LayerKind>().unwrap(), LayerKind::Intent);
        assert_eq!(
            "COMPRESSION".parse::<LayerKind>().unwrap(),
            LayerKind::Compression
        );
        assert_eq!(
            "delivery".parse::<LayerKind>().unwrap(),
            LayerKind::Delivery
        );
    }

    // An unknown name yields an error message that lists the accepted names.
    #[test]
    fn layer_kind_from_str_invalid() {
        let err = "unknown".parse::<LayerKind>().unwrap_err();
        assert!(err.contains("unknown pipeline layer"));
        assert!(err.contains("input, autonomy, intent"));
    }

    // `as_str` and `parse` are inverses for every kind.
    #[test]
    fn layer_kind_roundtrip_str() {
        for kind in LayerKind::all() {
            let s = kind.as_str();
            let parsed: LayerKind = s.parse().unwrap();
            assert_eq!(*kind, parsed);
        }
    }

    // A single recording counts as one run and one invocation of that layer.
    #[test]
    fn pipeline_stats_record_single() {
        let mut stats = PipelineStats::new();
        stats.record_single(
            LayerKind::Compression,
            1000,
            300,
            std::time::Duration::from_millis(5),
        );
        assert_eq!(stats.runs, 1);
        let agg = stats.per_layer.get(&LayerKind::Compression).unwrap();
        assert_eq!(agg.total_input_tokens, 1000);
        assert_eq!(agg.total_output_tokens, 300);
        assert_eq!(agg.count, 1);
    }

    // End to end: all seven kinds in order, then stats and report formatting.
    #[test]
    fn pipeline_full_flow_integration() {
        let pipeline = Pipeline::new()
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Input,
            }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Autonomy,
            }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Intent,
            }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Relevance,
            }))
            .add_layer(Box::new(CompressionLayer { ratio: 0.3 }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Translation,
            }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Delivery,
            }));

        let input = LayerInput {
            content: "x ".repeat(500),
            tokens: 500,
            metadata: HashMap::new(),
        };
        let (output, metrics) = pipeline.execute(input);

        assert_eq!(metrics.len(), 7, "all layers should produce metrics");
        assert_eq!(output.tokens, 150, "compression at 0.3 ratio");

        for (i, kind) in LayerKind::all().iter().enumerate() {
            assert_eq!(metrics[i].layer, *kind, "layer order must match");
        }

        let mut stats = PipelineStats::new();
        stats.record(&metrics);
        assert_eq!(stats.runs, 1);
        // 500 tokens in, 150 out of the compression layer; the rest pass through.
        assert_eq!(stats.total_tokens_saved(), 350);

        let formatted = Pipeline::format_metrics(&metrics);
        assert!(formatted.contains("TOTAL"));
        assert!(formatted.contains("500"));
    }

    // Config flags switch the optional layers; Input and Delivery stay enabled.
    #[test]
    fn is_layer_enabled_respects_config() {
        let cfg = crate::core::profiles::PipelineConfig {
            intent: Some(false),
            relevance: Some(false),
            compression: Some(true),
            translation: Some(true),
        };

        assert!(is_layer_enabled(LayerKind::Input, &cfg));
        assert!(!is_layer_enabled(LayerKind::Intent, &cfg));
        assert!(!is_layer_enabled(LayerKind::Relevance, &cfg));
        assert!(is_layer_enabled(LayerKind::Compression, &cfg));
        assert!(is_layer_enabled(LayerKind::Translation, &cfg));
        assert!(is_layer_enabled(LayerKind::Delivery, &cfg));
    }

    // A layer whose kind is disabled in the config never enters the pipeline.
    #[test]
    fn add_layer_if_enabled_skips_disabled() {
        let cfg = crate::core::profiles::PipelineConfig {
            intent: Some(false),
            relevance: Some(true),
            compression: Some(true),
            translation: Some(true),
        };

        let pipeline = Pipeline::new()
            .add_layer_if_enabled(
                Box::new(PassthroughLayer {
                    kind: LayerKind::Input,
                }),
                &cfg,
            )
            .add_layer_if_enabled(
                Box::new(PassthroughLayer {
                    kind: LayerKind::Intent,
                }),
                &cfg,
            )
            .add_layer_if_enabled(Box::new(CompressionLayer { ratio: 0.5 }), &cfg)
            .add_layer_if_enabled(
                Box::new(PassthroughLayer {
                    kind: LayerKind::Delivery,
                }),
                &cfg,
            );

        let input = LayerInput {
            content: "x ".repeat(100),
            tokens: 100,
            metadata: HashMap::new(),
        };
        let (output, metrics) = pipeline.execute(input);

        assert_eq!(
            metrics.len(),
            3,
            "Intent layer should be skipped, leaving Input + Compression + Delivery"
        );
        assert_eq!(metrics[0].layer, LayerKind::Input);
        assert_eq!(metrics[1].layer, LayerKind::Compression);
        assert_eq!(metrics[2].layer, LayerKind::Delivery);
        assert_eq!(output.tokens, 50);
    }
}