1use std::collections::HashMap;
2
3#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
5pub enum LayerKind {
6 Input,
7 Intent,
8 Relevance,
9 Compression,
10 Translation,
11 Delivery,
12}
13
14impl LayerKind {
15 pub fn as_str(&self) -> &'static str {
17 match self {
18 Self::Input => "input",
19 Self::Intent => "intent",
20 Self::Relevance => "relevance",
21 Self::Compression => "compression",
22 Self::Translation => "translation",
23 Self::Delivery => "delivery",
24 }
25 }
26
27 pub fn all() -> &'static [LayerKind] {
29 &[
30 Self::Input,
31 Self::Intent,
32 Self::Relevance,
33 Self::Compression,
34 Self::Translation,
35 Self::Delivery,
36 ]
37 }
38}
39
40impl std::fmt::Display for LayerKind {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 write!(f, "{}", self.as_str())
43 }
44}
45
/// Payload handed to [`Layer::process`].
#[derive(Debug, Clone)]
pub struct LayerInput {
    /// Text content passed to the layer.
    pub content: String,
    /// Token count accompanying `content`. [`Pipeline::execute`] records this
    /// value as the layer's input token count.
    pub tokens: usize,
    /// String key/value pairs carried alongside the content.
    pub metadata: HashMap<String, String>,
}
53
/// Result returned by [`Layer::process`].
///
/// [`Pipeline::execute`] copies the three fields one-to-one into the
/// [`LayerInput`] given to the next layer.
#[derive(Debug, Clone)]
pub struct LayerOutput {
    /// Text content produced by the layer.
    pub content: String,
    /// Token count reported by the layer for `content`. [`Pipeline::execute`]
    /// records this value as the layer's output token count.
    pub tokens: usize,
    /// String key/value pairs carried alongside the content.
    pub metadata: HashMap<String, String>,
}
61
62#[derive(Debug, Clone)]
64pub struct LayerMetrics {
65 pub layer: LayerKind,
66 pub input_tokens: usize,
67 pub output_tokens: usize,
68 pub duration_us: u64,
69 pub compression_ratio: f64,
70}
71
72impl LayerMetrics {
73 pub fn new(
74 layer: LayerKind,
75 input_tokens: usize,
76 output_tokens: usize,
77 duration_us: u64,
78 ) -> Self {
79 let ratio = if input_tokens == 0 {
80 1.0
81 } else {
82 output_tokens as f64 / input_tokens as f64
83 };
84 Self {
85 layer,
86 input_tokens,
87 output_tokens,
88 duration_us,
89 compression_ratio: ratio,
90 }
91 }
92}
93
94pub trait Layer {
96 fn kind(&self) -> LayerKind;
97 fn process(&self, input: LayerInput) -> LayerOutput;
98}
99
100pub struct Pipeline {
102 layers: Vec<Box<dyn Layer>>,
103}
104
105impl Pipeline {
106 pub fn new() -> Self {
108 Self { layers: Vec::new() }
109 }
110
111 pub fn add_layer(mut self, layer: Box<dyn Layer>) -> Self {
113 self.layers.push(layer);
114 self
115 }
116
117 pub fn execute(&self, input: LayerInput) -> (LayerOutput, Vec<LayerMetrics>) {
119 let mut current = input;
120 let mut metrics = Vec::new();
121
122 for layer in &self.layers {
123 let start = std::time::Instant::now();
124 let input_tokens = current.tokens;
125 let output = layer.process(current);
126 let duration = start.elapsed().as_micros() as u64;
127
128 metrics.push(LayerMetrics::new(
129 layer.kind(),
130 input_tokens,
131 output.tokens,
132 duration,
133 ));
134
135 current = LayerInput {
136 content: output.content,
137 tokens: output.tokens,
138 metadata: output.metadata,
139 };
140 }
141
142 let final_output = LayerOutput {
143 content: current.content,
144 tokens: current.tokens,
145 metadata: current.metadata,
146 };
147
148 (final_output, metrics)
149 }
150
151 pub fn format_metrics(metrics: &[LayerMetrics]) -> String {
153 let mut out = String::from("Pipeline Metrics:\n");
154 let mut total_saved = 0usize;
155 for m in metrics {
156 let saved = m.input_tokens.saturating_sub(m.output_tokens);
157 total_saved += saved;
158 out.push_str(&format!(
159 " {} : {} -> {} tok ({:.0}%, {:.1}ms)\n",
160 m.layer,
161 m.input_tokens,
162 m.output_tokens,
163 m.compression_ratio * 100.0,
164 m.duration_us as f64 / 1000.0,
165 ));
166 }
167 if let (Some(first), Some(last)) = (metrics.first(), metrics.last()) {
168 let total_ratio = if first.input_tokens == 0 {
169 1.0
170 } else {
171 last.output_tokens as f64 / first.input_tokens as f64
172 };
173 out.push_str(&format!(
174 " TOTAL: {} -> {} tok ({:.0}%, saved {})\n",
175 first.input_tokens,
176 last.output_tokens,
177 total_ratio * 100.0,
178 total_saved,
179 ));
180 }
181 out
182 }
183}
184
185#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
187pub struct PipelineStats {
188 pub runs: usize,
189 pub per_layer: HashMap<LayerKind, AggregatedMetrics>,
190}
191
192#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
194pub struct AggregatedMetrics {
195 pub total_input_tokens: usize,
196 pub total_output_tokens: usize,
197 pub total_duration_us: u64,
198 pub count: usize,
199}
200
201impl AggregatedMetrics {
202 pub fn avg_ratio(&self) -> f64 {
204 if self.total_input_tokens == 0 {
205 return 1.0;
206 }
207 self.total_output_tokens as f64 / self.total_input_tokens as f64
208 }
209
210 pub fn avg_duration_ms(&self) -> f64 {
212 if self.count == 0 {
213 return 0.0;
214 }
215 self.total_duration_us as f64 / self.count as f64 / 1000.0
216 }
217}
218
219impl PipelineStats {
220 pub fn new() -> Self {
222 Self {
223 runs: 0,
224 per_layer: HashMap::new(),
225 }
226 }
227
228 pub fn record(&mut self, metrics: &[LayerMetrics]) {
230 self.runs += 1;
231 for m in metrics {
232 let agg = self.per_layer.entry(m.layer).or_default();
233 agg.total_input_tokens += m.input_tokens;
234 agg.total_output_tokens += m.output_tokens;
235 agg.total_duration_us += m.duration_us;
236 agg.count += 1;
237 }
238 }
239
240 pub fn record_single(
242 &mut self,
243 layer: LayerKind,
244 input_tokens: usize,
245 output_tokens: usize,
246 duration: std::time::Duration,
247 ) {
248 self.runs += 1;
249 let agg = self.per_layer.entry(layer).or_default();
250 agg.total_input_tokens += input_tokens;
251 agg.total_output_tokens += output_tokens;
252 agg.total_duration_us += duration.as_micros() as u64;
253 agg.count += 1;
254 }
255
256 pub fn total_tokens_saved(&self) -> usize {
258 self.per_layer
259 .values()
260 .map(|a| a.total_input_tokens.saturating_sub(a.total_output_tokens))
261 .sum()
262 }
263
264 pub fn save(&self) {
266 if let Ok(dir) = crate::core::data_dir::lean_ctx_data_dir() {
267 let path = dir.join("pipeline_stats.json");
268 if let Ok(json) = serde_json::to_string(self) {
269 let _ = std::fs::write(path, json);
270 }
271 }
272 }
273
274 pub fn load() -> Self {
276 crate::core::data_dir::lean_ctx_data_dir()
277 .ok()
278 .map(|d| d.join("pipeline_stats.json"))
279 .and_then(|p| std::fs::read_to_string(p).ok())
280 .and_then(|s| serde_json::from_str(&s).ok())
281 .unwrap_or_default()
282 }
283
284 pub fn format_summary(&self) -> String {
286 let mut out = format!("Pipeline Stats ({} runs):\n", self.runs);
287 for kind in LayerKind::all() {
288 if let Some(agg) = self.per_layer.get(kind) {
289 out.push_str(&format!(
290 " {}: avg {:.0}% ratio, {:.1}ms, {} invocations\n",
291 kind,
292 agg.avg_ratio() * 100.0,
293 agg.avg_duration_ms(),
294 agg.count,
295 ));
296 }
297 }
298 out.push_str(&format!(" SAVED: {} tokens\n", self.total_tokens_saved()));
299 out
300 }
301}
302
303impl Default for Pipeline {
304 fn default() -> Self {
305 Self::new()
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// Test layer that returns its input unchanged under a configurable kind.
    struct PassthroughLayer {
        kind: LayerKind,
    }

    impl Layer for PassthroughLayer {
        fn kind(&self) -> LayerKind {
            self.kind
        }

        fn process(&self, input: LayerInput) -> LayerOutput {
            LayerOutput {
                content: input.content,
                tokens: input.tokens,
                metadata: input.metadata,
            }
        }
    }

    /// Test layer that scales the token count by `ratio` and trims `content`
    /// to at most four bytes per remaining token.
    struct CompressionLayer {
        ratio: f64,
    }

    impl Layer for CompressionLayer {
        fn kind(&self) -> LayerKind {
            LayerKind::Compression
        }

        fn process(&self, input: LayerInput) -> LayerOutput {
            let new_tokens = (input.tokens as f64 * self.ratio) as usize;
            let byte_budget = new_tokens * 4;
            let mut content = input.content;
            if content.len() > byte_budget {
                // Slicing a `String` at an arbitrary byte offset panics when the
                // offset falls inside a multi-byte character, so back up to the
                // nearest char boundary first. Offset 0 is always a boundary,
                // which bounds the loop.
                let mut cut = byte_budget;
                while !content.is_char_boundary(cut) {
                    cut -= 1;
                }
                content.truncate(cut);
            }
            LayerOutput {
                content,
                tokens: new_tokens,
                metadata: input.metadata,
            }
        }
    }

    /// Builds a `LayerInput` with the given content and token count and empty
    /// metadata.
    fn input_of(content: &str, tokens: usize) -> LayerInput {
        LayerInput {
            content: content.to_string(),
            tokens,
            metadata: HashMap::new(),
        }
    }

    /// Three-layer metrics fixture shared by the formatting and stats tests.
    fn sample_metrics() -> Vec<LayerMetrics> {
        vec![
            LayerMetrics::new(LayerKind::Input, 1000, 1000, 100),
            LayerMetrics::new(LayerKind::Compression, 1000, 300, 5000),
            LayerMetrics::new(LayerKind::Delivery, 300, 300, 50),
        ]
    }

    #[test]
    fn layer_kind_all_ordered() {
        let all = LayerKind::all();
        assert_eq!(all.len(), 6);
        assert_eq!(all[0], LayerKind::Input);
        assert_eq!(all[5], LayerKind::Delivery);
    }

    #[test]
    fn passthrough_preserves_content() {
        let layer = PassthroughLayer {
            kind: LayerKind::Input,
        };
        let output = layer.process(input_of("hello world", 2));
        assert_eq!(output.content, "hello world");
        assert_eq!(output.tokens, 2);
    }

    #[test]
    fn compression_layer_reduces() {
        let layer = CompressionLayer { ratio: 0.5 };
        let output = layer.process(input_of(&"a ".repeat(100), 100));
        assert_eq!(output.tokens, 50);
    }

    #[test]
    fn compression_layer_truncates_on_char_boundary() {
        // Byte budget is 4, which lands inside the second 'é' (bytes 3..5).
        let layer = CompressionLayer { ratio: 0.5 };
        let output = layer.process(input_of("aéééé", 2));
        assert_eq!(output.tokens, 1);
        assert_eq!(output.content, "aé");
    }

    #[test]
    fn pipeline_chains_layers() {
        let pipeline = Pipeline::new()
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Input,
            }))
            .add_layer(Box::new(CompressionLayer { ratio: 0.5 }))
            .add_layer(Box::new(PassthroughLayer {
                kind: LayerKind::Delivery,
            }));

        let (output, metrics) = pipeline.execute(input_of(&"a ".repeat(100), 100));
        assert_eq!(output.tokens, 50);
        assert_eq!(metrics.len(), 3);
        assert_eq!(metrics[0].layer, LayerKind::Input);
        assert_eq!(metrics[1].layer, LayerKind::Compression);
        assert_eq!(metrics[2].layer, LayerKind::Delivery);
    }

    #[test]
    fn metrics_new_calculates_ratio() {
        let m = LayerMetrics::new(LayerKind::Compression, 100, 50, 1000);
        assert!((m.compression_ratio - 0.5).abs() < f64::EPSILON);
    }

    #[test]
    fn metrics_new_zero_input_ratio_is_one() {
        let m = LayerMetrics::new(LayerKind::Input, 0, 5, 10);
        assert!((m.compression_ratio - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn metrics_format_readable() {
        let formatted = Pipeline::format_metrics(&sample_metrics());
        assert!(formatted.contains("input"));
        assert!(formatted.contains("compression"));
        assert!(formatted.contains("delivery"));
        assert!(formatted.contains("TOTAL"));
    }

    #[test]
    fn empty_pipeline_passes_through() {
        let pipeline = Pipeline::new();
        let (output, metrics) = pipeline.execute(input_of("test", 1));
        assert_eq!(output.content, "test");
        assert!(metrics.is_empty());
    }

    #[test]
    fn pipeline_stats_record_and_summarize() {
        let mut stats = PipelineStats::default();
        let metrics = sample_metrics();
        stats.record(&metrics);
        stats.record(&metrics);

        assert_eq!(stats.runs, 2);
        assert_eq!(stats.total_tokens_saved(), 1400);

        let agg = stats.per_layer.get(&LayerKind::Compression).unwrap();
        assert_eq!(agg.count, 2);
        assert_eq!(agg.total_input_tokens, 2000);
        assert_eq!(agg.total_output_tokens, 600);

        let summary = stats.format_summary();
        assert!(summary.contains("2 runs"));
        assert!(summary.contains("SAVED: 1400"));
    }

    #[test]
    fn aggregated_metrics_avg() {
        let agg = AggregatedMetrics {
            total_input_tokens: 1000,
            total_output_tokens: 500,
            total_duration_us: 10000,
            count: 2,
        };
        assert!((agg.avg_ratio() - 0.5).abs() < f64::EPSILON);
        assert!((agg.avg_duration_ms() - 5.0).abs() < f64::EPSILON);
    }
}