1use std::fs::{File, OpenOptions};
53use std::io::{self, BufWriter, Write};
54use std::path::{Path, PathBuf};
55use std::sync::Mutex;
56
57use serde::{Deserialize, Serialize};
58use thiserror::Error;
59
/// Errors produced by telemetry sinks.
#[derive(Error, Debug)]
pub enum TelemetryError {
    /// Underlying I/O failure: creating directories, opening, writing, or flushing the
    /// output file.
    #[error("telemetry I/O: {0}")]
    Io(#[from] io::Error),
    /// `serde_json` failure; within this file it can only arise from serializing a record.
    #[error("telemetry serialization: {0}")]
    Serde(#[from] serde_json::Error),
}
67
/// Convenience alias for results whose error type is [`TelemetryError`].
pub type Result<T> = std::result::Result<T, TelemetryError>;
69
/// Structural shape classification recorded in [`PipelineEvent::shape`].
///
/// Serialized in `snake_case` (e.g. `MarkdownTable` <-> `"markdown_table"`), so the variant
/// names are part of the on-disk schema. `Unknown` is the [`Default`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Shape {
    Prose,
    NumberedList,
    BulletList,
    CodeBlock,
    MarkdownTable,
    NestedObject,
    FlatObject,
    ArrayOfObjects,
    ArrayOfPrimitives,
    Empty,
    /// Fallback when no other shape applies; also the default value.
    #[default]
    Unknown,
}
90
/// Pipeline layer recorded in [`PipelineEvent::layer_used`].
///
/// No rename attribute is applied here, so variants serialize by their names as written
/// (e.g. `"L3"`). `L3` is the [`Default`].
/// NOTE(review): what each layer means is defined by the pipeline elsewhere — not visible here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum Layer {
    L0,
    L1,
    L2,
    #[default]
    L3,
}
104
/// One telemetry record, written by [`JsonlSink`] as a single JSON line.
///
/// Schema evolution: fields marked `#[serde(default ...)]` may be absent from older
/// records and fall back to their defaults when deserializing. The enricher-related fields
/// are additionally omitted from the output while they hold their "unset" value
/// (`false`, `0`, `None`), so events that never touched the enricher serialize exactly as
/// they did before those fields existed.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[non_exhaustive]
pub struct PipelineEvent {
    /// Hashed session identifier.
    pub session_hash: String,
    /// Hashed tool-call identifier.
    pub tool_call_id_hash: String,
    /// Anonymised tool name.
    pub tool_name_anon: String,
    /// Endpoint classification label.
    pub endpoint_class: String,
    /// Response length in characters (presumably of the raw tool output — confirm).
    pub response_chars: u64,
    /// Structural shape classification of the response.
    pub shape: Shape,
    /// Additional format tags; empty when missing from the input.
    #[serde(default)]
    pub inner_formats: Vec<String>,
    /// Prefix of the content hash, as a hex string.
    pub content_sha_prefix_hex: String,
    /// Hashed file path, when there is one; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub file_path_hash: Option<String>,
    /// Whether this event was a dedup hit; [`EnrichmentEffectiveness::accumulate`] counts
    /// such events as a saved call.
    pub is_dedup_hit: bool,
    /// Layer that handled the event.
    pub layer_used: Layer,
    /// Template identifier, when one was used; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub template_id: Option<String>,
    /// Baseline token count. `accumulate` treats it as the actual cost of the call and as
    /// the number of tokens saved.
    pub tokens_baseline: u32,
    /// Final token count (presumably after the pipeline's processing — confirm).
    pub tokens_final: u32,
    /// Context partition index.
    pub context_partition: u32,
    /// Whether the event belongs to a side chain.
    pub is_sidechain: bool,
    /// Timestamp in milliseconds (presumably Unix epoch — confirm).
    pub ts_ms: i64,
    /// Sampling rate in effect when the event was recorded; 1.0 when missing from the input.
    #[serde(default = "default_sample_rate")]
    pub sample_rate_applied: f32,

    /// Whether the enricher prefetched for this call; omitted from the output when `false`.
    #[serde(default, skip_serializing_if = "is_false")]
    pub enricher_prefetched: bool,

    /// Enricher's predicted token cost; omitted from the output when 0.
    /// `accumulate` compares it against `tokens_baseline` to detect cost overruns.
    #[serde(default, skip_serializing_if = "is_zero_u32")]
    pub enricher_predicted_cost_tokens: u32,

    /// Reason the enricher declined, if it did; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub enricher_decline_reason: Option<String>,

    /// Whether prefetched content was cited in subsequent turns. `None` presumably means
    /// "not evaluated" — confirm; `accumulate` only counts `Some(true)`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cited_in_next_n_turns: Option<bool>,
}
197
/// `skip_serializing_if` predicate: true when the flag is unset.
fn is_false(flag: &bool) -> bool {
    !flag
}
/// `skip_serializing_if` predicate: true for a zero `u32`.
fn is_zero_u32(value: &u32) -> bool {
    value == &0
}
/// `skip_serializing_if` predicate: true for a zero `u64`.
fn is_zero_u64(value: &u64) -> bool {
    matches!(*value, 0)
}

/// Serde default for a missing `sample_rate_applied`: 1.0.
fn default_sample_rate() -> f32 {
    1.0_f32
}
211
/// Aggregate statistics for one session, recorded through
/// [`TelemetrySink::record_summary`].
///
/// Only `enrichment` has a serde default; every other field is required when
/// deserializing.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionSummary {
    /// Hashed session identifier.
    pub session_hash: String,
    /// Number of events in the session.
    pub total_events: u64,
    /// Dedup hit rate (presumably a fraction in 0..=1 — confirm).
    pub dedup_hit_rate: f32,
    /// L1 hit rate (presumably a fraction — confirm).
    pub l1_hit_rate: f32,
    /// L2 hit rate (presumably a fraction — confirm).
    pub l2_hit_rate: f32,
    /// Mean response length in characters.
    pub avg_response_chars: f32,
    /// Number of compactions during the session.
    pub compaction_count: u32,
    /// Sum of baseline token counts.
    pub total_baseline_tokens: u64,
    /// Sum of final token counts.
    pub total_final_tokens: u64,
    /// Token savings. NOTE(review): despite the `_pct` suffix, test fixtures store a fraction
    /// (e.g. 0.35) — confirm the intended unit.
    pub savings_pct: f32,
    /// Session duration in seconds.
    pub duration_sec: f32,
    /// End timestamp in milliseconds.
    pub ended_at_ms: i64,
    /// Sampling rate in effect. Unlike `PipelineEvent::sample_rate_applied`, this field has
    /// no serde default, and the derived `Default` sets it to 0.0.
    pub sample_rate_applied: f32,
    /// Enricher effectiveness counters; defaults when missing from the input.
    #[serde(default)]
    pub enrichment: EnrichmentEffectiveness,
}
239
/// Running counters describing how well the enricher's prefetch/decline/cost-prediction
/// decisions paid off. Most fields are updated by the methods on this type
/// (`accumulate`, `record_*`); derived rates are exposed as `Option<f32>` accessors.
///
/// Serialization: the first seven fields are always written and required on input. The
/// savings and race counters are omitted while zero and default to zero when missing.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct EnrichmentEffectiveness {
    /// Events with `enricher_prefetched == true` (denominator of `prefetch_hit_rate`).
    pub total_prefetches: u32,
    /// Prefetched events whose `cited_in_next_n_turns` was `Some(true)`.
    pub cited_prefetches: u32,
    /// Events carrying an `enricher_decline_reason` (denominator of `decline_recall_loss`).
    pub total_declines: u32,
    /// Numerator of `decline_recall_loss`. NOTE(review): not updated by any method in this
    /// file — presumably set by the caller.
    pub late_invoked_after_decline: u32,
    /// Prefetches whose actual cost was at least 130% of a non-zero prediction.
    pub cost_overrun_count: u32,
    /// Denominator of `cost_overrun_rate`; incremented once per prefetched event.
    pub total_predictions: u32,
    /// Running sum of `actual - predicted` tokens over prefetched events (signed).
    pub net_prediction_error_tokens: i64,

    /// Calls saved by cited prefetches.
    #[serde(default, skip_serializing_if = "is_zero_u32")]
    pub inference_calls_saved_prefetch: u32,
    /// Calls saved by dedup hits.
    #[serde(default, skip_serializing_if = "is_zero_u32")]
    pub inference_calls_saved_dedup: u32,
    /// Calls saved by fail-fast skips (see `record_fail_fast_skip`).
    #[serde(default, skip_serializing_if = "is_zero_u32")]
    pub inference_calls_saved_fail_fast: u32,
    /// Tokens attributed to all of the saved calls above.
    #[serde(default, skip_serializing_if = "is_zero_u64")]
    pub inference_tokens_saved: u64,

    /// Prefetches actually dispatched (denominator of the race/waste rates).
    #[serde(default, skip_serializing_if = "is_zero_u32")]
    pub prefetch_dispatched: u32,
    /// Dispatched prefetches that won the race.
    #[serde(default, skip_serializing_if = "is_zero_u32")]
    pub prefetch_won_race: u32,
    /// Dispatched prefetches whose result was wasted.
    #[serde(default, skip_serializing_if = "is_zero_u32")]
    pub prefetch_wasted: u32,
}
339
340impl EnrichmentEffectiveness {
341 pub fn prefetch_hit_rate(&self) -> Option<f32> {
345 (self.total_prefetches > 0)
346 .then(|| self.cited_prefetches as f32 / self.total_prefetches as f32)
347 }
348
349 pub fn decline_recall_loss(&self) -> Option<f32> {
351 (self.total_declines > 0)
352 .then(|| self.late_invoked_after_decline as f32 / self.total_declines as f32)
353 }
354
355 pub fn cost_overrun_rate(&self) -> Option<f32> {
358 (self.total_predictions > 0)
359 .then(|| self.cost_overrun_count as f32 / self.total_predictions as f32)
360 }
361
362 pub fn total_calls_saved(&self) -> u32 {
365 self.inference_calls_saved_prefetch
366 .saturating_add(self.inference_calls_saved_dedup)
367 .saturating_add(self.inference_calls_saved_fail_fast)
368 }
369
370 pub fn accumulate(&mut self, ev: &PipelineEvent) {
386 if ev.enricher_prefetched {
387 self.total_prefetches = self.total_prefetches.saturating_add(1);
388 self.total_predictions = self.total_predictions.saturating_add(1);
389 let predicted = ev.enricher_predicted_cost_tokens as i64;
390 let actual = ev.tokens_baseline as i64;
391 self.net_prediction_error_tokens = self
392 .net_prediction_error_tokens
393 .saturating_add(actual - predicted);
394 if predicted > 0 && actual * 10 >= predicted * 13 {
397 self.cost_overrun_count = self.cost_overrun_count.saturating_add(1);
398 }
399 if matches!(ev.cited_in_next_n_turns, Some(true)) {
400 self.cited_prefetches = self.cited_prefetches.saturating_add(1);
401 self.inference_calls_saved_prefetch =
402 self.inference_calls_saved_prefetch.saturating_add(1);
403 self.inference_tokens_saved = self
404 .inference_tokens_saved
405 .saturating_add(ev.tokens_baseline as u64);
406 }
407 }
408 if ev.is_dedup_hit {
409 self.inference_calls_saved_dedup = self.inference_calls_saved_dedup.saturating_add(1);
410 self.inference_tokens_saved = self
415 .inference_tokens_saved
416 .saturating_add(ev.tokens_baseline as u64);
417 }
418 if ev.enricher_decline_reason.is_some() {
419 self.total_declines = self.total_declines.saturating_add(1);
420 }
421 }
422
423 pub fn record_fail_fast_skip(&mut self, predicted_cost_tokens: u32) {
432 self.inference_calls_saved_fail_fast =
433 self.inference_calls_saved_fail_fast.saturating_add(1);
434 self.inference_tokens_saved = self
435 .inference_tokens_saved
436 .saturating_add(predicted_cost_tokens as u64);
437 }
438
439 pub fn record_prefetch_dispatched(&mut self) {
446 self.prefetch_dispatched = self.prefetch_dispatched.saturating_add(1);
447 }
448
449 pub fn record_prefetch_won_race(&mut self) {
455 self.prefetch_won_race = self.prefetch_won_race.saturating_add(1);
456 }
457
458 pub fn record_prefetch_wasted(&mut self) {
463 self.prefetch_wasted = self.prefetch_wasted.saturating_add(1);
464 }
465
466 pub fn prefetch_race_win_rate(&self) -> Option<f32> {
469 (self.prefetch_dispatched > 0)
470 .then(|| self.prefetch_won_race as f32 / self.prefetch_dispatched as f32)
471 }
472
473 pub fn prefetch_waste_rate(&self) -> Option<f32> {
477 (self.prefetch_dispatched > 0)
478 .then(|| self.prefetch_wasted as f32 / self.prefetch_dispatched as f32)
479 }
480
481 pub fn report(&self) -> String {
483 let hit = self
484 .prefetch_hit_rate()
485 .map(|r| format!("{:.1}%", r * 100.0))
486 .unwrap_or_else(|| "n/a".into());
487 let loss = self
488 .decline_recall_loss()
489 .map(|r| format!("{:.1}%", r * 100.0))
490 .unwrap_or_else(|| "n/a".into());
491 let overrun = self
492 .cost_overrun_rate()
493 .map(|r| format!("{:.1}%", r * 100.0))
494 .unwrap_or_else(|| "n/a".into());
495 let race = self
496 .prefetch_race_win_rate()
497 .map(|r| format!("{:.1}%", r * 100.0))
498 .unwrap_or_else(|| "n/a".into());
499 let waste = self
500 .prefetch_waste_rate()
501 .map(|r| format!("{:.1}%", r * 100.0))
502 .unwrap_or_else(|| "n/a".into());
503 format!(
504 "prefetch_hit={hit} decline_recall_loss={loss} cost_overrun={overrun} \
505 race_win={race} waste={waste} \
506 calls_saved={saved} (prefetch={pf}, dedup={dd}, fail_fast={ff}) \
507 tokens_saved={ts} prefetches={p} dispatched={dp} \
508 declines={d} predictions={pr}",
509 saved = self.total_calls_saved(),
510 pf = self.inference_calls_saved_prefetch,
511 dd = self.inference_calls_saved_dedup,
512 ff = self.inference_calls_saved_fail_fast,
513 ts = self.inference_tokens_saved,
514 p = self.total_prefetches,
515 dp = self.prefetch_dispatched,
516 d = self.total_declines,
517 pr = self.total_predictions,
518 )
519 }
520}
521
/// Destination for pipeline telemetry.
///
/// Implementors must be `Send + Sync`, and every method takes `&self`, so one sink can be
/// shared between threads (e.g. behind an `Arc`); implementations synchronize internally.
pub trait TelemetrySink: Send + Sync {
    /// Record a single pipeline event.
    fn record(&self, event: &PipelineEvent) -> Result<()>;

    /// Record a [`SessionSummary`]. The default implementation discards it.
    fn record_summary(&self, _summary: &SessionSummary) -> Result<()> {
        Ok(())
    }

    /// Flush any buffered output. The default implementation does nothing.
    fn flush(&self) -> Result<()> {
        Ok(())
    }
}
541
/// Sink that appends each record as one JSON line to a file.
///
/// Writes go through a [`BufWriter`] guarded by a [`Mutex`], so concurrent callers cannot
/// interleave bytes within a line. Buffered data reaches the file on
/// [`TelemetrySink::flush`] (or when the buffer fills / the writer is dropped).
pub struct JsonlSink {
    /// Path the sink was opened with.
    path: PathBuf,
    /// Buffered writer over the file, opened in append mode.
    writer: Mutex<BufWriter<File>>,
}
550
551impl JsonlSink {
552 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
554 let path = path.as_ref().to_path_buf();
555 if let Some(parent) = path.parent() {
556 std::fs::create_dir_all(parent)?;
557 }
558 let file = OpenOptions::new().create(true).append(true).open(&path)?;
559 Ok(Self {
560 path,
561 writer: Mutex::new(BufWriter::new(file)),
562 })
563 }
564
565 pub fn path(&self) -> &Path {
567 &self.path
568 }
569}
570
571impl TelemetrySink for JsonlSink {
572 fn record(&self, event: &PipelineEvent) -> Result<()> {
573 let line = serde_json::to_string(event)?;
574 let mut w = self.writer.lock().expect("telemetry writer mutex poisoned");
575 w.write_all(line.as_bytes())?;
576 w.write_all(b"\n")?;
577 Ok(())
578 }
579
580 fn record_summary(&self, summary: &SessionSummary) -> Result<()> {
581 let wrapped = serde_json::json!({
584 "type": "session_summary",
585 "data": summary,
586 });
587 let line = serde_json::to_string(&wrapped)?;
588 let mut w = self.writer.lock().expect("telemetry writer mutex poisoned");
589 w.write_all(line.as_bytes())?;
590 w.write_all(b"\n")?;
591 Ok(())
592 }
593
594 fn flush(&self) -> Result<()> {
595 self.writer
596 .lock()
597 .expect("telemetry writer mutex poisoned")
598 .flush()?;
599 Ok(())
600 }
601}
602
603#[derive(Default)]
606pub struct NullSink;
607
608impl TelemetrySink for NullSink {
609 fn record(&self, _event: &PipelineEvent) -> Result<()> {
610 Ok(())
611 }
612}
613
/// In-memory sink that retains every recorded event and summary (presumably intended for
/// tests and diagnostics).
#[derive(Default)]
pub struct MemorySink {
    /// Events in the order they were recorded.
    events: Mutex<Vec<PipelineEvent>>,
    /// Summaries in the order they were recorded.
    summaries: Mutex<Vec<SessionSummary>>,
}
620
621impl MemorySink {
622 pub fn new() -> Self {
623 Self::default()
624 }
625
626 pub fn events(&self) -> Vec<PipelineEvent> {
627 self.events.lock().unwrap().clone()
628 }
629
630 pub fn summaries(&self) -> Vec<SessionSummary> {
631 self.summaries.lock().unwrap().clone()
632 }
633
634 pub fn len(&self) -> usize {
635 self.events.lock().unwrap().len()
636 }
637
638 pub fn is_empty(&self) -> bool {
639 self.len() == 0
640 }
641}
642
643impl TelemetrySink for MemorySink {
644 fn record(&self, event: &PipelineEvent) -> Result<()> {
645 self.events.lock().unwrap().push(event.clone());
646 Ok(())
647 }
648
649 fn record_summary(&self, summary: &SessionSummary) -> Result<()> {
650 self.summaries.lock().unwrap().push(summary.clone());
651 Ok(())
652 }
653}
654
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::thread;

    /// Baseline event shared by most tests; enricher fields are left at their "unset" values.
    fn sample_event() -> PipelineEvent {
        PipelineEvent {
            session_hash: "sess0001".into(),
            tool_call_id_hash: "tc0001".into(),
            tool_name_anon: "Read".into(),
            endpoint_class: "Read".into(),
            response_chars: 1234,
            shape: Shape::NumberedList,
            inner_formats: vec![],
            content_sha_prefix_hex: "0123456789abcdef".into(),
            file_path_hash: Some("fpath001".into()),
            is_dedup_hit: false,
            layer_used: Layer::L3,
            template_id: None,
            tokens_baseline: 308,
            tokens_final: 308,
            context_partition: 0,
            is_sidechain: false,
            ts_ms: 1_700_000_000_000,
            sample_rate_applied: 1.0,
            enricher_prefetched: false,
            enricher_predicted_cost_tokens: 0,
            enricher_decline_reason: None,
            cited_in_next_n_turns: None,
        }
    }

    /// Unique scratch file path (pid + per-process counter) so tests running in parallel
    /// never share a file.
    fn tempfile() -> PathBuf {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        std::env::temp_dir().join(format!("devboy_tele_test_{pid}_{n}.jsonl"))
    }

    // ---------------------------------------------------------------- sinks

    #[test]
    fn memory_sink_captures_events() {
        let sink = MemorySink::new();
        let e = sample_event();
        sink.record(&e).unwrap();
        assert_eq!(sink.len(), 1);
        assert_eq!(sink.events()[0].tool_call_id_hash, "tc0001");
    }

    #[test]
    fn null_sink_is_noop() {
        let sink = NullSink;
        let e = sample_event();
        sink.record(&e).unwrap();
    }

    #[test]
    fn jsonl_sink_appends_line() {
        let tmp = tempfile();
        {
            let sink = JsonlSink::open(&tmp).unwrap();
            sink.record(&sample_event()).unwrap();
            sink.flush().unwrap();
        }
        let body = std::fs::read_to_string(&tmp).unwrap();
        assert_eq!(body.lines().count(), 1);
        let deserialized: PipelineEvent = serde_json::from_str(body.trim()).unwrap();
        assert_eq!(deserialized.tokens_baseline, 308);
        std::fs::remove_file(&tmp).ok();
    }

    #[test]
    fn jsonl_sink_survives_multiple_writes() {
        let tmp = tempfile();
        {
            let sink = JsonlSink::open(&tmp).unwrap();
            for i in 0..10 {
                let mut e = sample_event();
                e.tokens_baseline = i * 10;
                sink.record(&e).unwrap();
            }
            sink.flush().unwrap();
        }
        let body = std::fs::read_to_string(&tmp).unwrap();
        assert_eq!(body.lines().count(), 10);
        std::fs::remove_file(&tmp).ok();
    }

    #[test]
    fn jsonl_sink_appends_across_reopen() {
        let tmp = tempfile();
        for _ in 0..2 {
            let sink = JsonlSink::open(&tmp).unwrap();
            sink.record(&sample_event()).unwrap();
            sink.flush().unwrap();
        }
        let body = std::fs::read_to_string(&tmp).unwrap();
        assert_eq!(
            body.lines().count(),
            2,
            "reopening must append, not truncate"
        );
        std::fs::remove_file(&tmp).ok();
    }

    #[test]
    fn jsonl_sink_supports_summary_tag() {
        let tmp = tempfile();
        {
            let sink = JsonlSink::open(&tmp).unwrap();
            sink.record(&sample_event()).unwrap();
            let summary = SessionSummary {
                session_hash: "sess0001".into(),
                total_events: 10,
                dedup_hit_rate: 0.35,
                savings_pct: 0.35,
                ended_at_ms: 1_700_000_100_000,
                sample_rate_applied: 1.0,
                ..Default::default()
            };
            sink.record_summary(&summary).unwrap();
            sink.flush().unwrap();
        }
        let body = std::fs::read_to_string(&tmp).unwrap();
        let lines: Vec<&str> = body.lines().collect();
        assert_eq!(lines.len(), 2);
        assert!(body.contains("\"session_summary\""));
        // The summary line must carry both the tag and the wrapped payload.
        let tagged: serde_json::Value = serde_json::from_str(lines[1]).unwrap();
        assert_eq!(tagged["type"], "session_summary");
        assert_eq!(tagged["data"]["session_hash"], "sess0001");
        assert_eq!(tagged["data"]["total_events"], 10);
        std::fs::remove_file(&tmp).ok();
    }

    #[test]
    fn concurrent_writes_are_serialized() {
        let tmp = tempfile();
        {
            let sink = Arc::new(JsonlSink::open(&tmp).unwrap());
            let mut handles = vec![];
            for i in 0..8 {
                let sink = Arc::clone(&sink);
                handles.push(thread::spawn(move || {
                    let mut e = sample_event();
                    e.tool_call_id_hash = format!("tc{i:04}");
                    for _ in 0..25 {
                        sink.record(&e).unwrap();
                    }
                }));
            }
            for h in handles {
                h.join().unwrap();
            }
            sink.flush().unwrap();
        }
        let body = std::fs::read_to_string(&tmp).unwrap();
        assert_eq!(body.lines().count(), 200);
        // Every line must parse (no interleaving), and no thread's records may be lost.
        let mut per_thread: HashMap<String, usize> = HashMap::new();
        for line in body.lines() {
            let ev: PipelineEvent = serde_json::from_str(line).unwrap();
            *per_thread.entry(ev.tool_call_id_hash).or_default() += 1;
        }
        assert_eq!(per_thread.len(), 8);
        assert!(per_thread.values().all(|&n| n == 25), "{per_thread:?}");
        std::fs::remove_file(&tmp).ok();
    }

    #[test]
    fn schema_is_forward_compatible() {
        // A record written before the optional fields existed must still deserialize.
        let legacy = r#"{
            "session_hash": "s",
            "tool_call_id_hash": "t",
            "tool_name_anon": "Read",
            "endpoint_class": "Read",
            "response_chars": 0,
            "shape": "prose",
            "content_sha_prefix_hex": "",
            "is_dedup_hit": false,
            "layer_used": "L3",
            "tokens_baseline": 0,
            "tokens_final": 0,
            "context_partition": 0,
            "is_sidechain": false,
            "ts_ms": 0
        }"#;
        let parsed: PipelineEvent = serde_json::from_str(legacy).unwrap();
        assert_eq!(parsed.sample_rate_applied, 1.0);
        assert!(parsed.inner_formats.is_empty());
        assert!(parsed.file_path_hash.is_none());
        assert!(!parsed.enricher_prefetched);
        assert!(parsed.cited_in_next_n_turns.is_none());
    }

    #[test]
    fn memory_sink_accessors() {
        let sink = MemorySink::new();
        assert!(sink.is_empty());
        assert_eq!(sink.len(), 0);
        sink.record(&sample_event()).unwrap();
        assert!(!sink.is_empty());
        assert_eq!(sink.len(), 1);
        sink.flush().unwrap();
    }

    #[test]
    fn memory_sink_captures_summaries() {
        let sink = MemorySink::new();
        let summary = SessionSummary {
            session_hash: "abcd".into(),
            total_events: 7,
            savings_pct: 0.33,
            ..Default::default()
        };
        sink.record_summary(&summary).unwrap();
        assert_eq!(sink.summaries().len(), 1);
        assert_eq!(sink.summaries()[0].total_events, 7);
    }

    #[test]
    fn jsonl_sink_path_getter() {
        let tmp = tempfile();
        let sink = JsonlSink::open(&tmp).unwrap();
        assert_eq!(sink.path(), tmp.as_path());
        std::fs::remove_file(&tmp).ok();
    }

    #[test]
    fn jsonl_sink_creates_parent_dirs() {
        let parent =
            std::env::temp_dir().join(format!("devboy_tele_nested_{}", std::process::id()));
        // A directory left behind by an earlier run that reused this pid would make the
        // precondition below fail spuriously, so start from a clean slate.
        std::fs::remove_dir_all(&parent).ok();
        let tmp = parent.join("deep/sub/events.jsonl");
        assert!(!tmp.parent().unwrap().exists());
        let sink = JsonlSink::open(&tmp).unwrap();
        sink.record(&sample_event()).unwrap();
        sink.flush().unwrap();
        assert!(tmp.exists());
        std::fs::remove_dir_all(&parent).ok();
    }

    #[test]
    fn null_sink_flush_is_noop() {
        let sink = NullSink;
        sink.flush().unwrap();
    }

    // ------------------------------------------------------ schema / types

    #[test]
    fn shape_and_layer_defaults() {
        assert_eq!(Shape::default(), Shape::Unknown);
        assert_eq!(Layer::default(), Layer::L3);
    }

    #[test]
    fn shape_serde_snake_case() {
        let j = serde_json::to_string(&Shape::MarkdownTable).unwrap();
        assert_eq!(j, "\"markdown_table\"");
        let parsed: Shape = serde_json::from_str("\"array_of_objects\"").unwrap();
        assert_eq!(parsed, Shape::ArrayOfObjects);
    }

    #[test]
    fn telemetry_error_display() {
        let io_err = TelemetryError::Io(std::io::Error::other("boom"));
        let msg = format!("{io_err}");
        assert!(msg.contains("telemetry"));
    }

    #[test]
    fn pipeline_event_skips_default_enricher_fields_on_serialise() {
        let evt = sample_event();
        let json = serde_json::to_string(&evt).unwrap();
        assert!(!json.contains("enricher_prefetched"));
        assert!(!json.contains("enricher_predicted_cost_tokens"));
        assert!(!json.contains("enricher_decline_reason"));
        assert!(!json.contains("cited_in_next_n_turns"));
    }

    #[test]
    fn pipeline_event_round_trips_with_enricher_fields_populated() {
        let mut evt = sample_event();
        evt.enricher_prefetched = true;
        evt.enricher_predicted_cost_tokens = 540;
        evt.enricher_decline_reason = Some("budget".into());
        evt.cited_in_next_n_turns = Some(true);
        let json = serde_json::to_string(&evt).unwrap();
        let back: PipelineEvent = serde_json::from_str(&json).unwrap();
        assert!(back.enricher_prefetched);
        assert_eq!(back.enricher_predicted_cost_tokens, 540);
        assert_eq!(back.enricher_decline_reason.as_deref(), Some("budget"));
        assert_eq!(back.cited_in_next_n_turns, Some(true));
    }

    // ------------------------------------------------ enrichment metrics

    #[test]
    fn enrichment_rates_are_none_when_no_activity() {
        let e = EnrichmentEffectiveness::default();
        assert!(e.prefetch_hit_rate().is_none());
        assert!(e.decline_recall_loss().is_none());
        assert!(e.cost_overrun_rate().is_none());
        assert!(e.prefetch_race_win_rate().is_none());
        assert!(e.prefetch_waste_rate().is_none());
        assert!(e.report().contains("n/a"));
    }

    #[test]
    fn prefetch_hit_rate_handles_zero_and_partial_hits() {
        let mut e = EnrichmentEffectiveness {
            total_prefetches: 10,
            cited_prefetches: 7,
            ..Default::default()
        };
        assert_eq!(e.prefetch_hit_rate(), Some(0.7));
        e.cited_prefetches = 0;
        assert_eq!(e.prefetch_hit_rate(), Some(0.0));
    }

    #[test]
    fn decline_recall_loss_metric() {
        let e = EnrichmentEffectiveness {
            total_declines: 20,
            late_invoked_after_decline: 3,
            ..Default::default()
        };
        let rate = e.decline_recall_loss().unwrap();
        assert!((rate - 0.15).abs() < 1e-6);
    }

    #[test]
    fn cost_overrun_rate_metric() {
        let e = EnrichmentEffectiveness {
            total_predictions: 100,
            cost_overrun_count: 12,
            ..Default::default()
        };
        let rate = e.cost_overrun_rate().unwrap();
        assert!((rate - 0.12).abs() < 1e-6);
    }

    #[test]
    fn report_format_is_human_readable() {
        let e = EnrichmentEffectiveness {
            total_prefetches: 10,
            cited_prefetches: 7,
            total_declines: 20,
            late_invoked_after_decline: 2,
            cost_overrun_count: 3,
            total_predictions: 30,
            ..Default::default()
        };
        let r = e.report();
        // Match on the labelled pairs: decline loss and cost overrun are both 10.0% here,
        // so a bare "10.0%" check could not tell them apart.
        assert!(r.contains("prefetch_hit=70.0%"), "got {r}");
        assert!(r.contains("decline_recall_loss=10.0%"), "got {r}");
        assert!(r.contains("cost_overrun=10.0%"), "got {r}");
    }

    #[test]
    fn total_calls_saved_sums_three_buckets() {
        let e = EnrichmentEffectiveness {
            inference_calls_saved_prefetch: 7,
            inference_calls_saved_dedup: 12,
            inference_calls_saved_fail_fast: 3,
            ..Default::default()
        };
        assert_eq!(e.total_calls_saved(), 22);
    }

    #[test]
    fn accumulate_dedup_hit_increments_dedup_bucket_and_tokens() {
        let mut e = EnrichmentEffectiveness::default();
        let mut ev = sample_event();
        ev.is_dedup_hit = true;
        ev.tokens_baseline = 800;
        ev.tokens_final = 9;
        e.accumulate(&ev);
        assert_eq!(e.inference_calls_saved_dedup, 1);
        assert_eq!(e.inference_tokens_saved, 800);
        assert_eq!(e.total_calls_saved(), 1);
        assert_eq!(e.total_prefetches, 0);
        assert_eq!(e.total_predictions, 0);
    }

    #[test]
    fn accumulate_cited_prefetch_increments_prefetch_bucket() {
        let mut e = EnrichmentEffectiveness::default();
        let mut ev = sample_event();
        ev.enricher_prefetched = true;
        ev.enricher_predicted_cost_tokens = 500;
        ev.tokens_baseline = 540;
        ev.cited_in_next_n_turns = Some(true);
        e.accumulate(&ev);
        assert_eq!(e.total_prefetches, 1);
        assert_eq!(e.cited_prefetches, 1);
        assert_eq!(e.inference_calls_saved_prefetch, 1);
        assert_eq!(e.inference_tokens_saved, 540);
        // 540 is only 108% of 500, well under the 130% overrun threshold.
        assert_eq!(e.cost_overrun_count, 0);
    }

    #[test]
    fn accumulate_uncited_prefetch_does_not_count_as_saved() {
        let mut e = EnrichmentEffectiveness::default();
        let mut ev = sample_event();
        ev.enricher_prefetched = true;
        ev.cited_in_next_n_turns = Some(false);
        ev.tokens_baseline = 200;
        e.accumulate(&ev);
        assert_eq!(e.total_prefetches, 1);
        assert_eq!(e.cited_prefetches, 0);
        assert_eq!(e.inference_calls_saved_prefetch, 0);
        assert_eq!(e.inference_tokens_saved, 0);
    }

    #[test]
    fn accumulate_overrun_counts_when_actual_exceeds_130_percent() {
        let mut e = EnrichmentEffectiveness::default();
        let mut ev = sample_event();
        ev.enricher_prefetched = true;
        ev.enricher_predicted_cost_tokens = 100;
        ev.tokens_baseline = 200;
        e.accumulate(&ev);
        assert_eq!(e.cost_overrun_count, 1);
        assert_eq!(e.net_prediction_error_tokens, 100);
    }

    #[test]
    fn accumulate_overrun_threshold_is_inclusive_at_130_percent() {
        let mut at = EnrichmentEffectiveness::default();
        let mut ev = sample_event();
        ev.enricher_prefetched = true;
        ev.enricher_predicted_cost_tokens = 100;
        ev.tokens_baseline = 130;
        at.accumulate(&ev);
        assert_eq!(at.cost_overrun_count, 1);

        let mut below = EnrichmentEffectiveness::default();
        ev.tokens_baseline = 129;
        below.accumulate(&ev);
        assert_eq!(below.cost_overrun_count, 0);
    }

    #[test]
    fn accumulate_decline_reason_increments_declines() {
        let mut e = EnrichmentEffectiveness::default();
        let mut ev = sample_event();
        ev.enricher_decline_reason = Some("budget".into());
        e.accumulate(&ev);
        assert_eq!(e.total_declines, 1);
    }

    #[test]
    fn record_fail_fast_skip_increments_counter_and_tokens() {
        let mut e = EnrichmentEffectiveness::default();
        e.record_fail_fast_skip(75);
        e.record_fail_fast_skip(75);
        assert_eq!(e.inference_calls_saved_fail_fast, 2);
        assert_eq!(e.inference_tokens_saved, 150);
        assert_eq!(e.total_calls_saved(), 2);
    }

    #[test]
    fn report_includes_calls_saved_and_tokens_saved() {
        let e = EnrichmentEffectiveness {
            total_prefetches: 10,
            cited_prefetches: 7,
            inference_calls_saved_prefetch: 7,
            inference_calls_saved_dedup: 12,
            inference_calls_saved_fail_fast: 3,
            inference_tokens_saved: 12_345,
            ..Default::default()
        };
        let r = e.report();
        assert!(r.contains("calls_saved=22"), "report missing total: {r}");
        assert!(
            r.contains("prefetch=7") && r.contains("dedup=12") && r.contains("fail_fast=3"),
            "report missing per-bucket breakdown: {r}"
        );
        assert!(
            r.contains("tokens_saved=12345"),
            "report missing tokens_saved: {r}"
        );
    }

    #[test]
    fn enrichment_skips_zero_savings_fields_on_serialise() {
        let e = EnrichmentEffectiveness::default();
        let json = serde_json::to_string(&e).unwrap();
        assert!(!json.contains("inference_calls_saved_prefetch"));
        assert!(!json.contains("inference_calls_saved_dedup"));
        assert!(!json.contains("inference_calls_saved_fail_fast"));
        assert!(!json.contains("inference_tokens_saved"));
    }

    #[test]
    fn enrichment_round_trips_with_savings_populated() {
        let e = EnrichmentEffectiveness {
            inference_calls_saved_prefetch: 4,
            inference_calls_saved_dedup: 9,
            inference_calls_saved_fail_fast: 2,
            inference_tokens_saved: 8_400,
            ..Default::default()
        };
        let json = serde_json::to_string(&e).unwrap();
        let back: EnrichmentEffectiveness = serde_json::from_str(&json).unwrap();
        assert_eq!(back, e);
    }

    // ------------------------------------------------ prefetch race metrics

    #[test]
    fn record_prefetch_dispatched_increments_counter() {
        let mut e = EnrichmentEffectiveness::default();
        e.record_prefetch_dispatched();
        e.record_prefetch_dispatched();
        e.record_prefetch_dispatched();
        assert_eq!(e.prefetch_dispatched, 3);
    }

    #[test]
    fn race_win_rate_returns_some_only_when_dispatched() {
        let e0 = EnrichmentEffectiveness::default();
        assert!(e0.prefetch_race_win_rate().is_none());
        let e = EnrichmentEffectiveness {
            prefetch_dispatched: 10,
            prefetch_won_race: 7,
            ..Default::default()
        };
        let rate = e.prefetch_race_win_rate().unwrap();
        assert!((rate - 0.7).abs() < 1e-6);
    }

    #[test]
    fn waste_rate_separates_dispatched_from_total_prefetches() {
        // The waste rate's denominator is `prefetch_dispatched`, not `total_prefetches`.
        let e = EnrichmentEffectiveness {
            total_prefetches: 12,
            prefetch_dispatched: 10,
            prefetch_wasted: 4,
            ..Default::default()
        };
        let rate = e.prefetch_waste_rate().unwrap();
        assert!((rate - 0.4).abs() < 1e-6);
        // Nothing won a race, so the win rate is Some(0.0) rather than None.
        assert!(e.prefetch_race_win_rate().unwrap().abs() < 1e-6);
    }

    #[test]
    fn report_includes_race_and_waste_when_dispatched() {
        let e = EnrichmentEffectiveness {
            total_prefetches: 10,
            prefetch_dispatched: 10,
            prefetch_won_race: 6,
            prefetch_wasted: 2,
            ..Default::default()
        };
        let r = e.report();
        assert!(r.contains("race_win=60.0%"), "report missing race_win: {r}");
        assert!(r.contains("waste=20.0%"), "report missing waste: {r}");
        assert!(
            r.contains("dispatched=10"),
            "report missing dispatched: {r}"
        );
    }

    #[test]
    fn race_fields_skip_serialise_when_zero() {
        let e = EnrichmentEffectiveness::default();
        let json = serde_json::to_string(&e).unwrap();
        assert!(!json.contains("prefetch_dispatched"));
        assert!(!json.contains("prefetch_won_race"));
        assert!(!json.contains("prefetch_wasted"));
    }

    #[test]
    fn race_fields_round_trip_when_populated() {
        let e = EnrichmentEffectiveness {
            prefetch_dispatched: 12,
            prefetch_won_race: 8,
            prefetch_wasted: 3,
            ..Default::default()
        };
        let json = serde_json::to_string(&e).unwrap();
        let back: EnrichmentEffectiveness = serde_json::from_str(&json).unwrap();
        assert_eq!(back, e);
    }
}