1use std::collections::HashMap;
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
29pub enum LayerKind {
30 Input,
31 Intent,
32 Relevance,
33 Compression,
34 Translation,
35 Delivery,
36}
37
38impl LayerKind {
39 pub fn as_str(&self) -> &'static str {
41 match self {
42 Self::Input => "input",
43 Self::Intent => "intent",
44 Self::Relevance => "relevance",
45 Self::Compression => "compression",
46 Self::Translation => "translation",
47 Self::Delivery => "delivery",
48 }
49 }
50
51 pub fn all() -> &'static [LayerKind] {
53 &[
54 Self::Input,
55 Self::Intent,
56 Self::Relevance,
57 Self::Compression,
58 Self::Translation,
59 Self::Delivery,
60 ]
61 }
62}
63
64impl std::fmt::Display for LayerKind {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 write!(f, "{}", self.as_str())
67 }
68}
69
70impl std::str::FromStr for LayerKind {
71 type Err = String;
72
73 fn from_str(s: &str) -> Result<Self, Self::Err> {
74 match s.to_ascii_lowercase().as_str() {
75 "input" => Ok(Self::Input),
76 "intent" => Ok(Self::Intent),
77 "relevance" => Ok(Self::Relevance),
78 "compression" => Ok(Self::Compression),
79 "translation" => Ok(Self::Translation),
80 "delivery" => Ok(Self::Delivery),
81 _ => Err(format!(
82 "unknown pipeline layer '{s}'; expected one of: input, intent, relevance, compression, translation, delivery"
83 )),
84 }
85 }
86}
87
/// Data handed to a layer's `process` call.
///
/// `Pipeline::execute` builds each layer's input from the previous layer's
/// output, moving all three fields across unchanged.
#[derive(Clone, Debug)]
pub struct LayerInput {
    /// Text to be processed.
    pub content: String,
    /// Token count associated with `content`. The pipeline never recomputes it;
    /// the caller or previous layer supplies it.
    pub tokens: usize,
    /// Free-form string annotations forwarded from layer to layer.
    pub metadata: HashMap<String, String>,
}
95
/// Result returned by a layer's `process` call.
///
/// It has the same shape as `LayerInput`. `Pipeline::execute` feeds it into the
/// next layer, or returns it after the last layer.
#[derive(Clone, Debug)]
pub struct LayerOutput {
    /// Text produced by the layer.
    pub content: String,
    /// Token count the layer reports for `content`. It is used verbatim for metrics.
    pub tokens: usize,
    /// Free-form string annotations carried to the next layer.
    pub metadata: HashMap<String, String>,
}
103
104#[derive(Debug, Clone)]
106pub struct LayerMetrics {
107 pub layer: LayerKind,
108 pub input_tokens: usize,
109 pub output_tokens: usize,
110 pub duration_us: u64,
111 pub compression_ratio: f64,
112}
113
114impl LayerMetrics {
115 pub fn new(
116 layer: LayerKind,
117 input_tokens: usize,
118 output_tokens: usize,
119 duration_us: u64,
120 ) -> Self {
121 let ratio = if input_tokens == 0 {
122 1.0
123 } else {
124 output_tokens as f64 / input_tokens as f64
125 };
126 Self {
127 layer,
128 input_tokens,
129 output_tokens,
130 duration_us,
131 compression_ratio: ratio,
132 }
133 }
134}
135
136pub trait Layer {
138 fn kind(&self) -> LayerKind;
139 fn process(&self, input: LayerInput) -> LayerOutput;
140}
141
142pub fn is_layer_enabled(kind: LayerKind, cfg: &crate::core::profiles::PipelineConfig) -> bool {
144 match kind {
145 LayerKind::Input | LayerKind::Delivery => true,
146 LayerKind::Intent => cfg.intent,
147 LayerKind::Relevance => cfg.relevance,
148 LayerKind::Compression => cfg.compression,
149 LayerKind::Translation => cfg.translation,
150 }
151}
152
153pub struct Pipeline {
155 layers: Vec<Box<dyn Layer>>,
156}
157
158impl Pipeline {
159 pub fn new() -> Self {
161 Self { layers: Vec::new() }
162 }
163
164 pub fn add_layer(mut self, layer: Box<dyn Layer>) -> Self {
166 self.layers.push(layer);
167 self
168 }
169
170 pub fn add_layer_if_enabled(
172 self,
173 layer: Box<dyn Layer>,
174 cfg: &crate::core::profiles::PipelineConfig,
175 ) -> Self {
176 if is_layer_enabled(layer.kind(), cfg) {
177 self.add_layer(layer)
178 } else {
179 self
180 }
181 }
182
183 pub fn execute(&self, input: LayerInput) -> (LayerOutput, Vec<LayerMetrics>) {
185 let mut current = input;
186 let mut metrics = Vec::new();
187
188 for layer in &self.layers {
189 let start = std::time::Instant::now();
190 let input_tokens = current.tokens;
191 let output = layer.process(current);
192 let duration = start.elapsed().as_micros() as u64;
193
194 metrics.push(LayerMetrics::new(
195 layer.kind(),
196 input_tokens,
197 output.tokens,
198 duration,
199 ));
200
201 current = LayerInput {
202 content: output.content,
203 tokens: output.tokens,
204 metadata: output.metadata,
205 };
206 }
207
208 let final_output = LayerOutput {
209 content: current.content,
210 tokens: current.tokens,
211 metadata: current.metadata,
212 };
213
214 (final_output, metrics)
215 }
216
217 pub fn format_metrics(metrics: &[LayerMetrics]) -> String {
219 let mut out = String::from("Pipeline Metrics:\n");
220 let mut total_saved = 0usize;
221 for m in metrics {
222 let saved = m.input_tokens.saturating_sub(m.output_tokens);
223 total_saved += saved;
224 out.push_str(&format!(
225 " {} : {} -> {} tok ({:.0}%, {:.1}ms)\n",
226 m.layer,
227 m.input_tokens,
228 m.output_tokens,
229 m.compression_ratio * 100.0,
230 m.duration_us as f64 / 1000.0,
231 ));
232 }
233 if let (Some(first), Some(last)) = (metrics.first(), metrics.last()) {
234 let total_ratio = if first.input_tokens == 0 {
235 1.0
236 } else {
237 last.output_tokens as f64 / first.input_tokens as f64
238 };
239 out.push_str(&format!(
240 " TOTAL: {} -> {} tok ({:.0}%, saved {})\n",
241 first.input_tokens,
242 last.output_tokens,
243 total_ratio * 100.0,
244 total_saved,
245 ));
246 }
247 out
248 }
249}
250
251#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
253pub struct PipelineStats {
254 pub runs: usize,
255 pub per_layer: HashMap<LayerKind, AggregatedMetrics>,
256}
257
258#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
260pub struct AggregatedMetrics {
261 pub total_input_tokens: usize,
262 pub total_output_tokens: usize,
263 pub total_duration_us: u64,
264 pub count: usize,
265}
266
267impl AggregatedMetrics {
268 pub fn avg_ratio(&self) -> f64 {
270 if self.total_input_tokens == 0 {
271 return 1.0;
272 }
273 self.total_output_tokens as f64 / self.total_input_tokens as f64
274 }
275
276 pub fn avg_duration_ms(&self) -> f64 {
278 if self.count == 0 {
279 return 0.0;
280 }
281 self.total_duration_us as f64 / self.count as f64 / 1000.0
282 }
283}
284
285impl PipelineStats {
286 pub fn new() -> Self {
288 Self {
289 runs: 0,
290 per_layer: HashMap::new(),
291 }
292 }
293
294 pub fn record(&mut self, metrics: &[LayerMetrics]) {
296 self.runs += 1;
297 for m in metrics {
298 let agg = self.per_layer.entry(m.layer).or_default();
299 agg.total_input_tokens += m.input_tokens;
300 agg.total_output_tokens += m.output_tokens;
301 agg.total_duration_us += m.duration_us;
302 agg.count += 1;
303 }
304 }
305
306 pub fn record_single(
308 &mut self,
309 layer: LayerKind,
310 input_tokens: usize,
311 output_tokens: usize,
312 duration: std::time::Duration,
313 ) {
314 self.runs += 1;
315 let agg = self.per_layer.entry(layer).or_default();
316 agg.total_input_tokens += input_tokens;
317 agg.total_output_tokens += output_tokens;
318 agg.total_duration_us += duration.as_micros() as u64;
319 agg.count += 1;
320 }
321
322 pub fn total_tokens_saved(&self) -> usize {
324 self.per_layer
325 .values()
326 .map(|a| a.total_input_tokens.saturating_sub(a.total_output_tokens))
327 .sum()
328 }
329
330 pub fn save(&self) {
332 if let Ok(dir) = crate::core::data_dir::lean_ctx_data_dir() {
333 let path = dir.join("pipeline_stats.json");
334 if let Ok(json) = serde_json::to_string(self) {
335 let _ = std::fs::write(path, json);
336 }
337 }
338 }
339
340 pub fn load() -> Self {
342 crate::core::data_dir::lean_ctx_data_dir()
343 .ok()
344 .map(|d| d.join("pipeline_stats.json"))
345 .and_then(|p| std::fs::read_to_string(p).ok())
346 .and_then(|s| serde_json::from_str(&s).ok())
347 .unwrap_or_default()
348 }
349
350 pub fn format_summary(&self) -> String {
352 let mut out = format!("Pipeline Stats ({} runs):\n", self.runs);
353 for kind in LayerKind::all() {
354 if let Some(agg) = self.per_layer.get(kind) {
355 out.push_str(&format!(
356 " {}: avg {:.0}% ratio, {:.1}ms, {} invocations\n",
357 kind,
358 agg.avg_ratio() * 100.0,
359 agg.avg_duration_ms(),
360 agg.count,
361 ));
362 }
363 }
364 out.push_str(&format!(" SAVED: {} tokens\n", self.total_tokens_saved()));
365 out
366 }
367}
368
369impl Default for Pipeline {
370 fn default() -> Self {
371 Self::new()
372 }
373}
374
// Unit tests: layer-kind naming/parsing, layer chaining, metrics formatting,
// stats aggregation, and config-driven layer selection.
#[cfg(test)]
mod tests {
    use super::*;

    // Test double: forwards its input unchanged while reporting a configurable kind.
    struct PassthroughLayer {
        kind: LayerKind,
    }

    impl Layer for PassthroughLayer {
        fn kind(&self) -> LayerKind {
            self.kind
        }

        fn process(&self, input: LayerInput) -> LayerOutput {
            LayerOutput {
                content: input.content,
                tokens: input.tokens,
                metadata: input.metadata,
            }
        }
    }

    // Test double: scales the token count by `ratio`, truncating toward zero. It
    // also cuts content down to at most 4 bytes per remaining token.
    struct CompressionLayer {
        ratio: f64,
    }

    impl Layer for CompressionLayer {
        fn kind(&self) -> LayerKind {
            LayerKind::Compression
        }

        fn process(&self, input: LayerInput) -> LayerOutput {
            let new_tokens = (input.tokens as f64 * self.ratio) as usize;
            // NOTE: byte-index slicing. This is only safe for ASCII content, which
            // is all the tests use. A multi-byte char straddling the cut would panic.
            let truncated = if input.content.len() > new_tokens * 4 {
                input.content[..new_tokens * 4].to_string()
            } else {
                input.content
            };
            LayerOutput {
                content: truncated,
                tokens: new_tokens,
                metadata: input.metadata,
            }
        }
    }

    #[test]
    fn layer_kind_all_ordered() {
        let all = LayerKind::all();
        assert_eq!(all.len(), 6);
        assert_eq!(all[0], LayerKind::Input);
        assert_eq!(all[5], LayerKind::Delivery);
    }

    #[test]
    fn passthrough_preserves_content() {
        let layer = PassthroughLayer {
            kind: LayerKind::Input,
        };
        let input = LayerInput {
            content: "hello world".to_string(),
            tokens: 2,
            metadata: HashMap::new(),
        };
        let output = layer.process(input);
        assert_eq!(output.content, "hello world");
        assert_eq!(output.tokens, 2);
    }

    #[test]
    fn compression_layer_reduces() {
        let layer = CompressionLayer { ratio: 0.5 };
        let input = LayerInput {
            content: "a ".repeat(100),
            tokens: 100,
            metadata: HashMap::new(),
        };
        let output = layer.process(input);
        assert_eq!(output.tokens, 50);
    }

    // Output of each layer must feed the next, and metrics must follow insertion order.
    #[test]
    fn pipeline_chains_layers() {
        let pipeline = Pipeline::new()
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Input,
            }))
            .add_layer(Box::new(CompressionLayer { ratio: 0.5 }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Delivery,
            }));

        let input = LayerInput {
            content: "a ".repeat(100),
            tokens: 100,
            metadata: HashMap::new(),
        };
        let (output, metrics) = pipeline.execute(input);
        assert_eq!(output.tokens, 50);
        assert_eq!(metrics.len(), 3);
        assert_eq!(metrics[0].layer, LayerKind::Input);
        assert_eq!(metrics[1].layer, LayerKind::Compression);
        assert_eq!(metrics[2].layer, LayerKind::Delivery);
    }

    #[test]
    fn metrics_new_calculates_ratio() {
        let m = LayerMetrics::new(LayerKind::Compression, 100, 50, 1000);
        assert!((m.compression_ratio - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn metrics_format_readable() {
        let metrics = vec![
            LayerMetrics::new(LayerKind::Input, 1000, 1000, 100),
            LayerMetrics::new(LayerKind::Compression, 1000, 300, 5000),
            LayerMetrics::new(LayerKind::Delivery, 300, 300, 50),
        ];
        let formatted = Pipeline::format_metrics(&metrics);
        assert!(formatted.contains("input"));
        assert!(formatted.contains("compression"));
        assert!(formatted.contains("delivery"));
        assert!(formatted.contains("TOTAL"));
    }

    // With no layers, execute returns the input untouched and no metrics.
    #[test]
    fn empty_pipeline_passes_through() {
        let pipeline = Pipeline::new();
        let input = LayerInput {
            content: "test".to_string(),
            tokens: 1,
            metadata: HashMap::new(),
        };
        let (output, metrics) = pipeline.execute(input);
        assert_eq!(output.content, "test");
        assert!(metrics.is_empty());
    }

    // Two identical runs: only Compression saves tokens (700 per run), giving 1400.
    #[test]
    fn pipeline_stats_record_and_summarize() {
        let mut stats = PipelineStats::default();
        let metrics = vec![
            LayerMetrics::new(LayerKind::Input, 1000, 1000, 100),
            LayerMetrics::new(LayerKind::Compression, 1000, 300, 5000),
            LayerMetrics::new(LayerKind::Delivery, 300, 300, 50),
        ];
        stats.record(&metrics);
        stats.record(&metrics);

        assert_eq!(stats.runs, 2);
        assert_eq!(stats.total_tokens_saved(), 1400);

        let agg = stats.per_layer.get(&LayerKind::Compression).unwrap();
        assert_eq!(agg.count, 2);
        assert_eq!(agg.total_input_tokens, 2000);
        assert_eq!(agg.total_output_tokens, 600);

        let summary = stats.format_summary();
        assert!(summary.contains("2 runs"));
        assert!(summary.contains("SAVED: 1400"));
    }

    #[test]
    fn aggregated_metrics_avg() {
        let agg = AggregatedMetrics {
            total_input_tokens: 1000,
            total_output_tokens: 500,
            total_duration_us: 10000,
            count: 2,
        };
        assert!((agg.avg_ratio() - 0.5).abs() < f64::EPSILON);
        assert!((agg.avg_duration_ms() - 5.0).abs() < f64::EPSILON);
    }

    // Parsing is ASCII case-insensitive.
    #[test]
    fn layer_kind_from_str_valid() {
        assert_eq!("input".parse::<LayerKind>().unwrap(), LayerKind::Input);
        assert_eq!("Intent".parse::<LayerKind>().unwrap(), LayerKind::Intent);
        assert_eq!(
            "COMPRESSION".parse::<LayerKind>().unwrap(),
            LayerKind::Compression
        );
        assert_eq!(
            "delivery".parse::<LayerKind>().unwrap(),
            LayerKind::Delivery
        );
    }

    #[test]
    fn layer_kind_from_str_invalid() {
        let err = "unknown".parse::<LayerKind>().unwrap_err();
        assert!(err.contains("unknown pipeline layer"));
        assert!(err.contains("input, intent, relevance"));
    }

    // as_str() output must parse back to the same kind for every variant.
    #[test]
    fn layer_kind_roundtrip_str() {
        for kind in LayerKind::all() {
            let s = kind.as_str();
            let parsed: LayerKind = s.parse().unwrap();
            assert_eq!(*kind, parsed);
        }
    }

    // record_single counts as a run on its own.
    #[test]
    fn pipeline_stats_record_single() {
        let mut stats = PipelineStats::new();
        stats.record_single(
            LayerKind::Compression,
            1000,
            300,
            std::time::Duration::from_millis(5),
        );
        assert_eq!(stats.runs, 1);
        let agg = stats.per_layer.get(&LayerKind::Compression).unwrap();
        assert_eq!(agg.total_input_tokens, 1000);
        assert_eq!(agg.total_output_tokens, 300);
        assert_eq!(agg.count, 1);
    }

    // All six stages in LayerKind::all() order. Compression at 0.3 takes 500 down
    // to 150, so 350 tokens are saved.
    #[test]
    fn pipeline_full_flow_integration() {
        let pipeline = Pipeline::new()
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Input,
            }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Intent,
            }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Relevance,
            }))
            .add_layer(Box::new(CompressionLayer { ratio: 0.3 }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Translation,
            }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Delivery,
            }));

        let input = LayerInput {
            content: "x ".repeat(500),
            tokens: 500,
            metadata: HashMap::new(),
        };
        let (output, metrics) = pipeline.execute(input);

        assert_eq!(metrics.len(), 6, "all 6 layers should produce metrics");
        assert_eq!(output.tokens, 150, "compression at 0.3 ratio");

        for (i, kind) in LayerKind::all().iter().enumerate() {
            assert_eq!(metrics[i].layer, *kind, "layer order must match");
        }

        let mut stats = PipelineStats::new();
        stats.record(&metrics);
        assert_eq!(stats.runs, 1);
        assert_eq!(stats.total_tokens_saved(), 350);

        let formatted = Pipeline::format_metrics(&metrics);
        assert!(formatted.contains("TOTAL"));
        assert!(formatted.contains("500"));
    }

    // Input and Delivery are always enabled regardless of the config flags.
    #[test]
    fn is_layer_enabled_respects_config() {
        let cfg = crate::core::profiles::PipelineConfig {
            intent: false,
            relevance: false,
            compression: true,
            translation: true,
        };

        assert!(is_layer_enabled(LayerKind::Input, &cfg));
        assert!(!is_layer_enabled(LayerKind::Intent, &cfg));
        assert!(!is_layer_enabled(LayerKind::Relevance, &cfg));
        assert!(is_layer_enabled(LayerKind::Compression, &cfg));
        assert!(is_layer_enabled(LayerKind::Translation, &cfg));
        assert!(is_layer_enabled(LayerKind::Delivery, &cfg));
    }

    // A disabled layer must be dropped at build time, so it produces no metrics entry.
    #[test]
    fn add_layer_if_enabled_skips_disabled() {
        let cfg = crate::core::profiles::PipelineConfig {
            intent: false,
            relevance: true,
            compression: true,
            translation: true,
        };

        let pipeline = Pipeline::new()
            .add_layer_if_enabled(
                Box::new(PassthroughLayer {
                    kind: LayerKind::Input,
                }),
                &cfg,
            )
            .add_layer_if_enabled(
                Box::new(PassthroughLayer {
                    kind: LayerKind::Intent,
                }),
                &cfg,
            )
            .add_layer_if_enabled(Box::new(CompressionLayer { ratio: 0.5 }), &cfg)
            .add_layer_if_enabled(
                Box::new(PassthroughLayer {
                    kind: LayerKind::Delivery,
                }),
                &cfg,
            );

        let input = LayerInput {
            content: "x ".repeat(100),
            tokens: 100,
            metadata: HashMap::new(),
        };
        let (output, metrics) = pipeline.execute(input);

        assert_eq!(
            metrics.len(),
            3,
            "Intent layer should be skipped, leaving Input + Compression + Delivery"
        );
        assert_eq!(metrics[0].layer, LayerKind::Input);
        assert_eq!(metrics[1].layer, LayerKind::Compression);
        assert_eq!(metrics[2].layer, LayerKind::Delivery);
        assert_eq!(output.tokens, 50);
    }
}
703}