Skip to main content

devboy_format_pipeline/
telemetry.rs

1//! Pipeline telemetry — per-response event capture for adaptive tuning.
2//!
3//! The pipeline emits one [`PipelineEvent`] per tool-result it handles.
4//! Events are anonymized by construction: no raw response text, no tool
5//! arguments, no user-facing strings leave this module. The schema carries
6//! just enough signal to drive the tuning rules described in
7//! `docs/research/paper-2-mckp-format-adaptive.md` §Adaptive Configuration.
8//!
9//! # Design
10//!
11//! - **Sink trait** — [`TelemetrySink`] abstracts persistence. Default impl
12//!   [`JsonlSink`] appends one JSON line per event to a per-session file;
13//!   alternate impls can stream to stdout, discard for tests, or forward
14//!   to an in-process aggregator.
15//! - **Zero-cost when disabled** — sinks are dyn-dispatched via an
16//!   `Option<Arc<dyn TelemetrySink>>`; setting `None` eliminates all
17//!   per-call allocation.
18//! - **Append-only** — JsonlSink opens with `O_APPEND`. Writes pass through
19//!   a user-space `BufWriter`; call `flush` to hand buffered events to the OS.
20//! - **Schema additions only** — `PipelineEvent` is `non_exhaustive`;
21//!   downstream analyzers project specific fields.
22//!
23//! # Example
24//!
25//! ```no_run
26//! use std::sync::Arc;
27//! use devboy_format_pipeline::telemetry::{
28//!     JsonlSink, PipelineEvent, Shape, Layer, TelemetrySink,
29//! };
30//!
31//! let sink: Arc<dyn TelemetrySink> =
32//!     Arc::new(JsonlSink::open("/tmp/devboy-telemetry/sess_abcd.jsonl").unwrap());
33//!
34//! // PipelineEvent is #[non_exhaustive]; construct via Default + mutation.
35//! let mut evt = PipelineEvent::default();
36//! evt.session_hash = "abcdef01".into();
37//! evt.tool_call_id_hash = "feed1234".into();
38//! evt.tool_name_anon = "Read".into();
39//! evt.endpoint_class = "Read".into();
40//! evt.response_chars = 1234;
41//! evt.shape = Shape::NumberedList;
42//! evt.content_sha_prefix_hex = "0123456789abcdef".into();
43//! evt.file_path_hash = Some("abc12345".into());
44//! evt.layer_used = Layer::L3;
45//! evt.tokens_baseline = 308;
46//! evt.tokens_final = 308;
47//! evt.ts_ms = 1_700_000_000_000;
48//!
49//! sink.record(&evt).unwrap();
50//! ```
51
52use std::fs::{File, OpenOptions};
53use std::io::{self, BufWriter, Write};
54use std::path::{Path, PathBuf};
55use std::sync::Mutex;
56
57use serde::{Deserialize, Serialize};
58use thiserror::Error;
59
60#[derive(Error, Debug)]
61pub enum TelemetryError {
62    #[error("telemetry I/O: {0}")]
63    Io(#[from] io::Error),
64    #[error("telemetry serialization: {0}")]
65    Serde(#[from] serde_json::Error),
66}
67
68pub type Result<T> = std::result::Result<T, TelemetryError>;
69
70/// Structural classification of a tool response.
71///
72/// Must be kept in sync with `docs/research/scripts/extract_paper2_format_events.py`
73/// so offline analyses and online collection share the same taxonomy.
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
75#[serde(rename_all = "snake_case")]
76pub enum Shape {
77    Prose,
78    NumberedList,
79    BulletList,
80    CodeBlock,
81    MarkdownTable,
82    NestedObject,
83    FlatObject,
84    ArrayOfObjects,
85    ArrayOfPrimitives,
86    Empty,
87    #[default]
88    Unknown,
89}
90
91/// Which pipeline layer produced the terminal decision for this response.
92#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
93pub enum Layer {
94    /// Content-hash dedup emitted a reference hint.
95    L0,
96    /// A per-endpoint template was applied.
97    L1,
98    /// Generic MCKP reformatted the response.
99    L2,
100    /// Passed through unchanged (text shape, below threshold, or no gain).
101    #[default]
102    L3,
103}
104
105/// Single pipeline decision — emitted once per tool-result.
106///
107/// See `docs/research/paper-2-mckp-format-adaptive.md` §Telemetry &
108/// Observability for field-level documentation.
109#[derive(Debug, Clone, Default, Serialize, Deserialize)]
110#[non_exhaustive]
111pub struct PipelineEvent {
112    /// SHA-256 prefix of the session UUID (not the raw UUID).
113    pub session_hash: String,
114    /// SHA-256 prefix of this tool_use_id — identifies the response for
115    /// dedup references without exposing the raw id.
116    pub tool_call_id_hash: String,
117    /// Anonymized tool name. MCP slugs are hashed (`mcp__p<hash>__verb`).
118    pub tool_name_anon: String,
119    /// Coarse endpoint classification (e.g. `git_log`, `curl`, or the full
120    /// tool_name for single-endpoint tools).
121    pub endpoint_class: String,
122    /// Raw byte count of the response.
123    pub response_chars: u64,
124    /// Structural shape of the response.
125    pub shape: Shape,
126    /// Names of embedded formats detected inside the response (e.g. `diff`,
127    /// `log`, `url`, `hash`). Empty when no embedded formats were seen.
128    #[serde(default)]
129    pub inner_formats: Vec<String>,
130    /// Hex-encoded prefix of SHA-256 over the response bytes — 32 hex chars
131    /// representing the first 16 bytes (128 bits), matching the paper's
132    /// stated fingerprint width and the Python extractor's output.
133    pub content_sha_prefix_hex: String,
134    /// Anonymized file-path hash for `Read`/`Edit`/`Write`-family tools;
135    /// `None` for tools that don't operate on a file path.
136    #[serde(default, skip_serializing_if = "Option::is_none")]
137    pub file_path_hash: Option<String>,
138    /// Did the pipeline emit a dedup hint in lieu of full content?
139    pub is_dedup_hit: bool,
140    /// Terminal layer in the pipeline decision tree.
141    pub layer_used: Layer,
142    /// L1 template identifier if `layer_used == L1`.
143    #[serde(default, skip_serializing_if = "Option::is_none")]
144    pub template_id: Option<String>,
145    /// Token count before pipeline encoding (baseline).
146    pub tokens_baseline: u32,
147    /// Token count of what the pipeline emitted.
148    pub tokens_final: u32,
149    /// Monotonic partition counter; increments on each compaction boundary.
150    pub context_partition: u32,
151    /// True for subagent (sidechain) tool-results; false for main session.
152    pub is_sidechain: bool,
153    /// Unix milliseconds at which the response was produced.
154    pub ts_ms: i64,
155    /// Fraction of events kept when sampling is enabled. `1.0` when every
156    /// event is recorded. Consumers of telemetry must scale counts by
157    /// `1 / sample_rate_applied`.
158    #[serde(default = "default_sample_rate")]
159    pub sample_rate_applied: f32,
160
161    // ─── Paper 3 — enricher effectiveness signals ────────────────────
162    //
163    // The four metrics surface as derived rates in `SessionSummary`
164    // and `tune analyze` (P-3-08). Recorded per-event so a tuner
165    // session can rebuild the rates without re-running the planner.
166    /// True when the planner pre-fetched this tool call (rather than
167    /// the LLM emitting it directly). Drives the `prefetch_hit_rate`
168    /// metric — paired with `cited_in_next_n_turns` once the post-pass
169    /// scanner runs.
170    #[serde(default, skip_serializing_if = "is_false")]
171    pub enricher_prefetched: bool,
172
173    /// `cost_model.typical_kb`-derived prediction (in tokens) at the
174    /// moment the planner admitted the call. Compared with
175    /// `tokens_baseline` to compute `cost_overrun_rate`.
176    #[serde(default, skip_serializing_if = "is_zero_u32")]
177    pub enricher_predicted_cost_tokens: u32,
178
179    /// Set on declined candidates that the host emitted anyway as a
180    /// telemetry-only event (so `tune analyze` can study what the
181    /// planner skipped). One of `"budget"` / `"low_probability"` /
182    /// `"preempted"` / `"prereq_missing"`.
183    #[serde(default, skip_serializing_if = "Option::is_none")]
184    pub enricher_decline_reason: Option<String>,
185
186    /// Reserved for an offline citation-enrichment post-pass that
187    /// re-reads the JSONL log and sets this to `true` when the next
188    /// 1–3 LLM messages textually reference any of the response's
189    /// `content_sha_prefix_hex` bytes. The live pipeline never sets
190    /// this; the post-pass is **not** shipped yet (the existing
191    /// `tune from-claude-logs --tools` only seeds `[tools.*]`
192    /// defaults, it does not populate citation fields). Stays `None`
193    /// until that pass lands.
194    #[serde(default, skip_serializing_if = "Option::is_none")]
195    pub cited_in_next_n_turns: Option<bool>,
196}
197
/// `skip_serializing_if` predicate: true when the flag is unset.
/// Takes a reference because serde passes the field by reference.
fn is_false(b: &bool) -> bool {
    matches!(*b, false)
}
/// `skip_serializing_if` predicate: true when a `u32` counter is zero.
fn is_zero_u32(n: &u32) -> bool {
    matches!(*n, 0)
}
/// `skip_serializing_if` predicate: true when a `u64` counter is zero.
fn is_zero_u64(n: &u64) -> bool {
    matches!(*n, 0)
}
207
/// Serde fallback for `sample_rate_applied`: `1.0`, i.e. every event kept.
fn default_sample_rate() -> f32 {
    1.0_f32
}
211
212/// Session-level roll-up written on session close.
213///
214/// Separate from per-event sink for two reasons: (1) the summary requires
215/// all events to be complete, (2) the summary is a natural unit for the
216/// tuner to read without re-scanning JSONL.
217#[derive(Debug, Clone, Default, Serialize, Deserialize)]
218pub struct SessionSummary {
219    pub session_hash: String,
220    pub total_events: u64,
221    /// Fraction of events where L0 emitted a reference hint.
222    pub dedup_hit_rate: f32,
223    pub l1_hit_rate: f32,
224    pub l2_hit_rate: f32,
225    pub avg_response_chars: f32,
226    pub compaction_count: u32,
227    pub total_baseline_tokens: u64,
228    pub total_final_tokens: u64,
229    pub savings_pct: f32,
230    pub duration_sec: f32,
231    pub ended_at_ms: i64,
232    /// Fraction of events that were sampled (for scaling counts).
233    pub sample_rate_applied: f32,
234    /// Paper 3 enricher-effectiveness aggregates. Defaults to all-zero
235    /// when no enrichment activity was observed in the session.
236    #[serde(default)]
237    pub enrichment: EnrichmentEffectiveness,
238}
239
240/// Aggregate scoring of how well the Paper 3 enrichment planner served
241/// the agent during a session. Populated by the live pipeline (counters)
242/// plus the offline post-pass (`cited_*` numbers, see P-3-08).
243///
244/// Three primary rates the operator reads:
245///
246/// - **Prefetch hit rate** — fraction of planner-prefetched calls whose
247///   content was textually cited by the LLM in the next 1–3 turns. The
248///   north-star efficiency number; target ≥ 60%.
249/// - **Decline recall loss** — fraction of declined candidates the LLM
250///   ended up calling itself within the next 5 turns. Higher means the
251///   planner is too greedy. Target ≤ 10%.
252/// - **Cost overrun rate** — fraction of admitted calls whose actual
253///   `tokens_baseline` exceeded the predicted cost by ≥ 30%. Drives
254///   refresh of `cost_model.typical_kb` priors. Target ≤ 15%.
255///
256/// And the operator-facing ROI counters:
257///
258/// - **`inference_calls_saved_*`** — number of LLM round-trips the
259///   planner short-circuited, broken into three buckets so the
260///   contribution of each mechanism stays visible:
261///   `prefetch` (cited speculative calls), `dedup` (Paper 2 L0 hits —
262///   tool body replaced with a near-ref hint so the LLM never sees the
263///   full payload), and `fail_fast` (e.g. ToolSearch self-loop blocked
264///   after `fail_fast_after_n`).
265/// - **`inference_tokens_saved`** — sum of `tokens_baseline` from those
266///   short-circuited calls. The headline "we saved this much context"
267///   number for `tune analyze`.
268///
269/// Token savings vs a no-planner baseline is the roll-up "did the
270/// enricher pay for itself" answer; it lives in the corpus-replay
271/// validation harness (Paper 3 §Validation strategy), not on this
272/// summary, because it requires running the same session both with
273/// and without the planner. This struct carries only the per-session
274/// counters that drive the three rates above.
275#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
276pub struct EnrichmentEffectiveness {
277    /// Number of calls the planner pre-fetched.
278    pub total_prefetches: u32,
279    /// Of `total_prefetches`, the count whose content was cited by the
280    /// LLM in the next 1–3 turns. Filled in by the offline post-pass;
281    /// stays `0` until the post-pass has run.
282    pub cited_prefetches: u32,
283    /// Number of candidates the planner declined for any reason.
284    pub total_declines: u32,
285    /// Of `total_declines`, the count where the LLM later issued the
286    /// declined tool itself within the next 5 turns. Lower-is-better.
287    pub late_invoked_after_decline: u32,
288    /// Number of admitted calls whose actual `tokens_baseline` exceeded
289    /// the planner's prediction by ≥ 30%.
290    pub cost_overrun_count: u32,
291    /// Total admitted calls (denominator for `cost_overrun_rate`).
292    pub total_predictions: u32,
293    /// Sum of predicted-vs-actual prediction error in tokens — useful
294    /// for diagnosing systematic under- or over-estimation.
295    pub net_prediction_error_tokens: i64,
296
297    // ─── Inference round-trip savings ────────────────────────────────
298    /// LLM tool-uses avoided because the planner pre-fetched the
299    /// content and the model cited it in the next 1–3 turns. Counted
300    /// only when [`PipelineEvent::cited_in_next_n_turns`] is `Some(true)`.
301    #[serde(default, skip_serializing_if = "is_zero_u32")]
302    pub inference_calls_saved_prefetch: u32,
303    /// LLM tool-uses avoided because L0 dedup replaced the response
304    /// with a near-ref hint. Counted on every event with
305    /// `is_dedup_hit = true`.
306    #[serde(default, skip_serializing_if = "is_zero_u32")]
307    pub inference_calls_saved_dedup: u32,
308    /// LLM tool-uses avoided because [`crate::enrichment`] short-
309    /// circuited a `fail_fast_after_n` loop (e.g. ToolSearch returning
310    /// 0 bytes twice in a row). Incremented from the planner side via
311    /// [`Self::record_fail_fast_skip`].
312    #[serde(default, skip_serializing_if = "is_zero_u32")]
313    pub inference_calls_saved_fail_fast: u32,
314    /// Sum of baseline tokens from all three saved-call buckets. The
315    /// "we saved this much context" headline for `tune analyze`.
316    #[serde(default, skip_serializing_if = "is_zero_u64")]
317    pub inference_tokens_saved: u64,
318
319    // ─── Speculative-execution race instrumentation ─────────────────
320    /// Number of speculative tool-calls the host actually dispatched
321    /// out-of-band (a subset of `total_prefetches`: the fraction the
322    /// host *successfully scheduled*, not just plans the planner
323    /// produced).
324    #[serde(default, skip_serializing_if = "is_zero_u32")]
325    pub prefetch_dispatched: u32,
326    /// Of `prefetch_dispatched`, the count where the prefetch result
327    /// landed in the dedup cache *before* the LLM asked for the same
328    /// tool, so the LLM's call collapsed to an L0 hit. The other axis
329    /// of "did the speculation pay off" — independent of textual
330    /// citation.
331    #[serde(default, skip_serializing_if = "is_zero_u32")]
332    pub prefetch_won_race: u32,
333    /// Prefetches the LLM never asked for in the same session. Wasted
334    /// API quota / dollars; high values trigger R7's per-tool
335    /// auto-disable in `tune analyze`.
336    #[serde(default, skip_serializing_if = "is_zero_u32")]
337    pub prefetch_wasted: u32,
338}
339
340impl EnrichmentEffectiveness {
341    /// Fraction of prefetches that paid off (cited by the LLM).
342    /// Returns `None` when no prefetches happened — distinct from a
343    /// 0% hit rate.
344    pub fn prefetch_hit_rate(&self) -> Option<f32> {
345        (self.total_prefetches > 0)
346            .then(|| self.cited_prefetches as f32 / self.total_prefetches as f32)
347    }
348
349    /// Fraction of declined candidates the LLM later called anyway.
350    pub fn decline_recall_loss(&self) -> Option<f32> {
351        (self.total_declines > 0)
352            .then(|| self.late_invoked_after_decline as f32 / self.total_declines as f32)
353    }
354
355    /// Fraction of admitted calls whose actual baseline exceeded the
356    /// prediction by ≥ 30%.
357    pub fn cost_overrun_rate(&self) -> Option<f32> {
358        (self.total_predictions > 0)
359            .then(|| self.cost_overrun_count as f32 / self.total_predictions as f32)
360    }
361
362    /// Total LLM tool-uses the planner short-circuited across all three
363    /// buckets. The headline "round-trips avoided" number.
364    pub fn total_calls_saved(&self) -> u32 {
365        self.inference_calls_saved_prefetch
366            .saturating_add(self.inference_calls_saved_dedup)
367            .saturating_add(self.inference_calls_saved_fail_fast)
368    }
369
370    /// Fold one [`PipelineEvent`] into the per-session counters.
371    ///
372    /// Inspects the four enricher-specific fields plus `is_dedup_hit`
373    /// and `tokens_baseline`/`tokens_final` to maintain:
374    ///
375    /// 1. `total_prefetches` / `total_predictions` / `cost_overrun_*`
376    ///    when `enricher_prefetched = true`.
377    /// 2. `cited_prefetches` and `inference_calls_saved_prefetch` when
378    ///    the offline post-pass has set `cited_in_next_n_turns = Some(true)`.
379    /// 3. `total_declines` when `enricher_decline_reason` is set.
380    /// 4. `inference_calls_saved_dedup` (and the corresponding
381    ///    `inference_tokens_saved`) on every L0 dedup hit.
382    ///
383    /// Use it to drive `SessionSummary.enrichment` from the live
384    /// pipeline or from a JSONL post-pass — same accumulator either way.
385    pub fn accumulate(&mut self, ev: &PipelineEvent) {
386        if ev.enricher_prefetched {
387            self.total_prefetches = self.total_prefetches.saturating_add(1);
388            self.total_predictions = self.total_predictions.saturating_add(1);
389            let predicted = ev.enricher_predicted_cost_tokens as i64;
390            let actual = ev.tokens_baseline as i64;
391            self.net_prediction_error_tokens = self
392                .net_prediction_error_tokens
393                .saturating_add(actual - predicted);
394            // Overrun threshold: actual ≥ 130% of predicted, with a
395            // non-zero predicted to avoid trivial true on tiny calls.
396            if predicted > 0 && actual * 10 >= predicted * 13 {
397                self.cost_overrun_count = self.cost_overrun_count.saturating_add(1);
398            }
399            if matches!(ev.cited_in_next_n_turns, Some(true)) {
400                self.cited_prefetches = self.cited_prefetches.saturating_add(1);
401                self.inference_calls_saved_prefetch =
402                    self.inference_calls_saved_prefetch.saturating_add(1);
403                self.inference_tokens_saved = self
404                    .inference_tokens_saved
405                    .saturating_add(ev.tokens_baseline as u64);
406            }
407        }
408        if ev.is_dedup_hit {
409            self.inference_calls_saved_dedup = self.inference_calls_saved_dedup.saturating_add(1);
410            // Save the body's *baseline* tokens — the L0 hint replaces
411            // the full payload, so the LLM never gets billed for it.
412            // `tokens_final` is the hint itself (~9 tokens) and is
413            // trivial; the meaningful saving is `tokens_baseline`.
414            self.inference_tokens_saved = self
415                .inference_tokens_saved
416                .saturating_add(ev.tokens_baseline as u64);
417        }
418        if ev.enricher_decline_reason.is_some() {
419            self.total_declines = self.total_declines.saturating_add(1);
420        }
421    }
422
423    /// Record a `fail_fast_after_n` short-circuit — the planner refused
424    /// to issue a tool call (e.g. a third empty `ToolSearch`), so no
425    /// `PipelineEvent` is ever emitted for it. Call this from the
426    /// planner side to keep `inference_calls_saved_fail_fast` honest.
427    ///
428    /// `predicted_cost_tokens` is the per-call estimate from the
429    /// tool's `cost_model` — added to `inference_tokens_saved` so the
430    /// fail-fast contribution shows up in the headline number.
431    pub fn record_fail_fast_skip(&mut self, predicted_cost_tokens: u32) {
432        self.inference_calls_saved_fail_fast =
433            self.inference_calls_saved_fail_fast.saturating_add(1);
434        self.inference_tokens_saved = self
435            .inference_tokens_saved
436            .saturating_add(predicted_cost_tokens as u64);
437    }
438
439    /// Record that the host actually dispatched a speculative tool
440    /// call (a subset of `total_prefetches`: planner produced a plan
441    /// *and* the dispatcher succeeded in scheduling it). Increment
442    /// alongside `total_prefetches` from the host side; mismatches
443    /// between the two surface as "planner produced more than
444    /// dispatcher could schedule" — concurrency cap saturated.
445    pub fn record_prefetch_dispatched(&mut self) {
446        self.prefetch_dispatched = self.prefetch_dispatched.saturating_add(1);
447    }
448
449    /// Record that a dispatched prefetch landed in the dedup cache
450    /// before the LLM asked for the same tool, so the LLM's call
451    /// collapsed to an L0 hit. Independent of textual citation — the
452    /// LLM still issued the tool, but our prefetched body served the
453    /// answer at zero added latency.
454    pub fn record_prefetch_won_race(&mut self) {
455        self.prefetch_won_race = self.prefetch_won_race.saturating_add(1);
456    }
457
458    /// Record that a dispatched prefetch was never claimed by the
459    /// LLM during the rest of the session (offline post-pass tally).
460    /// High `prefetch_wasted / prefetch_dispatched` ratio is the
461    /// signal `tune analyze` watches for R7's per-tool auto-disable.
462    pub fn record_prefetch_wasted(&mut self) {
463        self.prefetch_wasted = self.prefetch_wasted.saturating_add(1);
464    }
465
466    /// Fraction of dispatched prefetches that beat the LLM to the
467    /// dedup cache. `None` when nothing was dispatched.
468    pub fn prefetch_race_win_rate(&self) -> Option<f32> {
469        (self.prefetch_dispatched > 0)
470            .then(|| self.prefetch_won_race as f32 / self.prefetch_dispatched as f32)
471    }
472
473    /// Fraction of dispatched prefetches that were never claimed by
474    /// the LLM. `None` when nothing was dispatched. Higher means the
475    /// planner's speculation was wasted — drive R7's auto-disable.
476    pub fn prefetch_waste_rate(&self) -> Option<f32> {
477        (self.prefetch_dispatched > 0)
478            .then(|| self.prefetch_wasted as f32 / self.prefetch_dispatched as f32)
479    }
480
481    /// Compact one-line summary suitable for `tune analyze` output.
482    pub fn report(&self) -> String {
483        let hit = self
484            .prefetch_hit_rate()
485            .map(|r| format!("{:.1}%", r * 100.0))
486            .unwrap_or_else(|| "n/a".into());
487        let loss = self
488            .decline_recall_loss()
489            .map(|r| format!("{:.1}%", r * 100.0))
490            .unwrap_or_else(|| "n/a".into());
491        let overrun = self
492            .cost_overrun_rate()
493            .map(|r| format!("{:.1}%", r * 100.0))
494            .unwrap_or_else(|| "n/a".into());
495        let race = self
496            .prefetch_race_win_rate()
497            .map(|r| format!("{:.1}%", r * 100.0))
498            .unwrap_or_else(|| "n/a".into());
499        let waste = self
500            .prefetch_waste_rate()
501            .map(|r| format!("{:.1}%", r * 100.0))
502            .unwrap_or_else(|| "n/a".into());
503        format!(
504            "prefetch_hit={hit} decline_recall_loss={loss} cost_overrun={overrun} \
505             race_win={race} waste={waste} \
506             calls_saved={saved} (prefetch={pf}, dedup={dd}, fail_fast={ff}) \
507             tokens_saved={ts} prefetches={p} dispatched={dp} \
508             declines={d} predictions={pr}",
509            saved = self.total_calls_saved(),
510            pf = self.inference_calls_saved_prefetch,
511            dd = self.inference_calls_saved_dedup,
512            ff = self.inference_calls_saved_fail_fast,
513            ts = self.inference_tokens_saved,
514            p = self.total_prefetches,
515            dp = self.prefetch_dispatched,
516            d = self.total_declines,
517            pr = self.total_predictions,
518        )
519    }
520}
521
522/// Persistence backend for telemetry events and summaries.
523///
524/// Implementations must be safe to call from multiple threads via shared
525/// ownership (`Arc<dyn TelemetrySink>`).
526pub trait TelemetrySink: Send + Sync {
527    /// Append a single per-response event.
528    fn record(&self, event: &PipelineEvent) -> Result<()>;
529
530    /// Append the session summary at session close. Default no-op for
531    /// sinks that don't distinguish rollups.
532    fn record_summary(&self, _summary: &SessionSummary) -> Result<()> {
533        Ok(())
534    }
535
536    /// Flush any buffered writes to durable storage.
537    fn flush(&self) -> Result<()> {
538        Ok(())
539    }
540}
541
542/// Append-only JSONL sink backed by a single file.
543///
544/// Thread-safe via an interior `Mutex<BufWriter>`. Writes one JSON line
545/// per event terminated by `'\n'`.
546pub struct JsonlSink {
547    path: PathBuf,
548    writer: Mutex<BufWriter<File>>,
549}
550
551impl JsonlSink {
552    /// Open (creating parent dirs as needed) the target path in append mode.
553    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
554        let path = path.as_ref().to_path_buf();
555        if let Some(parent) = path.parent() {
556            std::fs::create_dir_all(parent)?;
557        }
558        let file = OpenOptions::new().create(true).append(true).open(&path)?;
559        Ok(Self {
560            path,
561            writer: Mutex::new(BufWriter::new(file)),
562        })
563    }
564
565    /// Returns the file path this sink writes to.
566    pub fn path(&self) -> &Path {
567        &self.path
568    }
569}
570
571impl TelemetrySink for JsonlSink {
572    fn record(&self, event: &PipelineEvent) -> Result<()> {
573        let line = serde_json::to_string(event)?;
574        let mut w = self.writer.lock().expect("telemetry writer mutex poisoned");
575        w.write_all(line.as_bytes())?;
576        w.write_all(b"\n")?;
577        Ok(())
578    }
579
580    fn record_summary(&self, summary: &SessionSummary) -> Result<()> {
581        // Summaries share the same stream but with an explicit type marker
582        // so analyzers can demultiplex.
583        let wrapped = serde_json::json!({
584            "type": "session_summary",
585            "data": summary,
586        });
587        let line = serde_json::to_string(&wrapped)?;
588        let mut w = self.writer.lock().expect("telemetry writer mutex poisoned");
589        w.write_all(line.as_bytes())?;
590        w.write_all(b"\n")?;
591        Ok(())
592    }
593
594    fn flush(&self) -> Result<()> {
595        self.writer
596            .lock()
597            .expect("telemetry writer mutex poisoned")
598            .flush()?;
599        Ok(())
600    }
601}
602
603/// No-op sink for tests and for code paths where telemetry is explicitly
604/// disabled. Always returns `Ok`; records nothing.
605#[derive(Default)]
606pub struct NullSink;
607
608impl TelemetrySink for NullSink {
609    fn record(&self, _event: &PipelineEvent) -> Result<()> {
610        Ok(())
611    }
612}
613
614/// In-memory sink for unit tests — retains events for assertion.
615#[derive(Default)]
616pub struct MemorySink {
617    events: Mutex<Vec<PipelineEvent>>,
618    summaries: Mutex<Vec<SessionSummary>>,
619}
620
621impl MemorySink {
622    pub fn new() -> Self {
623        Self::default()
624    }
625
626    pub fn events(&self) -> Vec<PipelineEvent> {
627        self.events.lock().unwrap().clone()
628    }
629
630    pub fn summaries(&self) -> Vec<SessionSummary> {
631        self.summaries.lock().unwrap().clone()
632    }
633
634    pub fn len(&self) -> usize {
635        self.events.lock().unwrap().len()
636    }
637
638    pub fn is_empty(&self) -> bool {
639        self.len() == 0
640    }
641}
642
643impl TelemetrySink for MemorySink {
644    fn record(&self, event: &PipelineEvent) -> Result<()> {
645        self.events.lock().unwrap().push(event.clone());
646        Ok(())
647    }
648
649    fn record_summary(&self, summary: &SessionSummary) -> Result<()> {
650        self.summaries.lock().unwrap().push(summary.clone());
651        Ok(())
652    }
653}
654
655#[cfg(test)]
656mod tests {
657    use super::*;
658    use std::sync::Arc;
659    use std::thread;
660
661    fn sample_event() -> PipelineEvent {
662        PipelineEvent {
663            session_hash: "sess0001".into(),
664            tool_call_id_hash: "tc0001".into(),
665            tool_name_anon: "Read".into(),
666            endpoint_class: "Read".into(),
667            response_chars: 1234,
668            shape: Shape::NumberedList,
669            inner_formats: vec![],
670            content_sha_prefix_hex: "0123456789abcdef".into(),
671            file_path_hash: Some("fpath001".into()),
672            is_dedup_hit: false,
673            layer_used: Layer::L3,
674            template_id: None,
675            tokens_baseline: 308,
676            tokens_final: 308,
677            context_partition: 0,
678            is_sidechain: false,
679            ts_ms: 1_700_000_000_000,
680            sample_rate_applied: 1.0,
681            enricher_prefetched: false,
682            enricher_predicted_cost_tokens: 0,
683            enricher_decline_reason: None,
684            cited_in_next_n_turns: None,
685        }
686    }
687
688    #[test]
689    fn memory_sink_captures_events() {
690        let sink = MemorySink::new();
691        let e = sample_event();
692        sink.record(&e).unwrap();
693        assert_eq!(sink.len(), 1);
694        assert_eq!(sink.events()[0].tool_call_id_hash, "tc0001");
695    }
696
697    #[test]
698    fn null_sink_is_noop() {
699        let sink = NullSink;
700        let e = sample_event();
701        sink.record(&e).unwrap();
702        // No API to verify, but must not panic.
703    }
704
705    #[test]
706    fn jsonl_sink_appends_line() {
707        let tmp = tempfile();
708        {
709            let sink = JsonlSink::open(&tmp).unwrap();
710            sink.record(&sample_event()).unwrap();
711            sink.flush().unwrap();
712        }
713        let body = std::fs::read_to_string(&tmp).unwrap();
714        assert_eq!(body.lines().count(), 1);
715        let deserialized: PipelineEvent = serde_json::from_str(body.trim()).unwrap();
716        assert_eq!(deserialized.tokens_baseline, 308);
717        std::fs::remove_file(&tmp).ok();
718    }
719
720    #[test]
721    fn jsonl_sink_survives_multiple_writes() {
722        let tmp = tempfile();
723        {
724            let sink = JsonlSink::open(&tmp).unwrap();
725            for i in 0..10 {
726                let mut e = sample_event();
727                e.tokens_baseline = i * 10;
728                sink.record(&e).unwrap();
729            }
730            sink.flush().unwrap();
731        }
732        let body = std::fs::read_to_string(&tmp).unwrap();
733        assert_eq!(body.lines().count(), 10);
734        std::fs::remove_file(&tmp).ok();
735    }
736
737    #[test]
738    fn jsonl_sink_supports_summary_tag() {
739        let tmp = tempfile();
740        {
741            let sink = JsonlSink::open(&tmp).unwrap();
742            sink.record(&sample_event()).unwrap();
743            let summary = SessionSummary {
744                session_hash: "sess0001".into(),
745                total_events: 10,
746                dedup_hit_rate: 0.35,
747                savings_pct: 0.35,
748                ended_at_ms: 1_700_000_100_000,
749                sample_rate_applied: 1.0,
750                ..Default::default()
751            };
752            sink.record_summary(&summary).unwrap();
753            sink.flush().unwrap();
754        }
755        let body = std::fs::read_to_string(&tmp).unwrap();
756        assert_eq!(body.lines().count(), 2);
757        assert!(body.contains("\"session_summary\""));
758        std::fs::remove_file(&tmp).ok();
759    }
760
761    #[test]
762    fn concurrent_writes_are_serialized() {
763        let tmp = tempfile();
764        {
765            let sink = Arc::new(JsonlSink::open(&tmp).unwrap());
766            let mut handles = vec![];
767            for i in 0..8 {
768                let sink = Arc::clone(&sink);
769                handles.push(thread::spawn(move || {
770                    let mut e = sample_event();
771                    e.tool_call_id_hash = format!("tc{i:04}");
772                    for _ in 0..25 {
773                        sink.record(&e).unwrap();
774                    }
775                }));
776            }
777            for h in handles {
778                h.join().unwrap();
779            }
780            sink.flush().unwrap();
781        }
782        let body = std::fs::read_to_string(&tmp).unwrap();
783        // 8 threads × 25 events = 200 lines, each a valid JSON object.
784        assert_eq!(body.lines().count(), 200);
785        for line in body.lines() {
786            let _: PipelineEvent = serde_json::from_str(line).unwrap();
787        }
788        std::fs::remove_file(&tmp).ok();
789    }
790
791    #[test]
792    fn schema_is_forward_compatible() {
793        // Verify that future-addition of fields (via #[non_exhaustive] +
794        // Default + serde defaults) doesn't break parsing.
795        let legacy = r#"{
796            "session_hash": "s",
797            "tool_call_id_hash": "t",
798            "tool_name_anon": "Read",
799            "endpoint_class": "Read",
800            "response_chars": 0,
801            "shape": "prose",
802            "content_sha_prefix_hex": "",
803            "is_dedup_hit": false,
804            "layer_used": "L3",
805            "tokens_baseline": 0,
806            "tokens_final": 0,
807            "context_partition": 0,
808            "is_sidechain": false,
809            "ts_ms": 0
810        }"#;
811        let parsed: PipelineEvent = serde_json::from_str(legacy).unwrap();
812        assert_eq!(parsed.sample_rate_applied, 1.0); // default applied
813        assert!(parsed.inner_formats.is_empty());
814        assert!(parsed.file_path_hash.is_none());
815    }
816
817    /// Cheap per-test unique path without pulling in the `tempfile` crate.
818    fn tempfile() -> PathBuf {
819        use std::sync::atomic::{AtomicU64, Ordering};
820        static COUNTER: AtomicU64 = AtomicU64::new(0);
821        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
822        let pid = std::process::id();
823        std::env::temp_dir().join(format!("devboy_tele_test_{pid}_{n}.jsonl"))
824    }
825
826    #[test]
827    fn memory_sink_accessors() {
828        let sink = MemorySink::new();
829        assert!(sink.is_empty());
830        assert_eq!(sink.len(), 0);
831        sink.record(&sample_event()).unwrap();
832        assert!(!sink.is_empty());
833        assert_eq!(sink.len(), 1);
834        // flush() is a no-op on MemorySink
835        sink.flush().unwrap();
836    }
837
838    #[test]
839    fn memory_sink_captures_summaries() {
840        let sink = MemorySink::new();
841        let summary = SessionSummary {
842            session_hash: "abcd".into(),
843            total_events: 7,
844            savings_pct: 0.33,
845            ..Default::default()
846        };
847        sink.record_summary(&summary).unwrap();
848        assert_eq!(sink.summaries().len(), 1);
849        assert_eq!(sink.summaries()[0].total_events, 7);
850    }
851
852    #[test]
853    fn jsonl_sink_path_getter() {
854        let tmp = tempfile();
855        let sink = JsonlSink::open(&tmp).unwrap();
856        assert_eq!(sink.path(), tmp.as_path());
857        std::fs::remove_file(&tmp).ok();
858    }
859
860    #[test]
861    fn jsonl_sink_creates_parent_dirs() {
862        let parent =
863            std::env::temp_dir().join(format!("devboy_tele_nested_{}", std::process::id()));
864        let tmp = parent.join("deep/sub/events.jsonl");
865        assert!(!tmp.parent().unwrap().exists());
866        let sink = JsonlSink::open(&tmp).unwrap();
867        sink.record(&sample_event()).unwrap();
868        sink.flush().unwrap();
869        assert!(tmp.exists());
870        std::fs::remove_dir_all(&parent).ok();
871    }
872
873    #[test]
874    fn shape_and_layer_defaults() {
875        assert_eq!(Shape::default(), Shape::Unknown);
876        assert_eq!(Layer::default(), Layer::L3);
877    }
878
879    #[test]
880    fn shape_serde_snake_case() {
881        let j = serde_json::to_string(&Shape::MarkdownTable).unwrap();
882        assert_eq!(j, "\"markdown_table\"");
883        let parsed: Shape = serde_json::from_str("\"array_of_objects\"").unwrap();
884        assert_eq!(parsed, Shape::ArrayOfObjects);
885    }
886
887    #[test]
888    fn null_sink_flush_is_noop() {
889        let sink = NullSink;
890        sink.flush().unwrap();
891    }
892
893    #[test]
894    fn telemetry_error_display() {
895        let io_err = TelemetryError::Io(std::io::Error::other("boom"));
896        let msg = format!("{io_err}");
897        assert!(msg.contains("telemetry"));
898    }
899
900    // ─── Paper 3 EnrichmentEffectiveness ────────────────────────────
901
902    #[test]
903    fn enrichment_rates_are_none_when_no_activity() {
904        let e = EnrichmentEffectiveness::default();
905        assert!(e.prefetch_hit_rate().is_none());
906        assert!(e.decline_recall_loss().is_none());
907        assert!(e.cost_overrun_rate().is_none());
908        assert!(e.report().contains("n/a"));
909    }
910
911    #[test]
912    fn prefetch_hit_rate_handles_zero_and_partial_hits() {
913        let mut e = EnrichmentEffectiveness {
914            total_prefetches: 10,
915            cited_prefetches: 7,
916            ..Default::default()
917        };
918        assert_eq!(e.prefetch_hit_rate(), Some(0.7));
919        e.cited_prefetches = 0;
920        assert_eq!(e.prefetch_hit_rate(), Some(0.0));
921    }
922
923    #[test]
924    fn decline_recall_loss_metric() {
925        let e = EnrichmentEffectiveness {
926            total_declines: 20,
927            late_invoked_after_decline: 3,
928            ..Default::default()
929        };
930        let rate = e.decline_recall_loss().unwrap();
931        assert!((rate - 0.15).abs() < 1e-6);
932    }
933
934    #[test]
935    fn cost_overrun_rate_metric() {
936        let e = EnrichmentEffectiveness {
937            total_predictions: 100,
938            cost_overrun_count: 12,
939            ..Default::default()
940        };
941        let rate = e.cost_overrun_rate().unwrap();
942        assert!((rate - 0.12).abs() < 1e-6);
943    }
944
945    #[test]
946    fn report_format_is_human_readable() {
947        let e = EnrichmentEffectiveness {
948            total_prefetches: 10,
949            cited_prefetches: 7,
950            total_declines: 20,
951            late_invoked_after_decline: 2,
952            cost_overrun_count: 3,
953            total_predictions: 30,
954            ..Default::default()
955        };
956        let r = e.report();
957        assert!(r.contains("70.0%"), "expected prefetch_hit=70.0%, got {r}");
958        assert!(
959            r.contains("10.0%"),
960            "expected decline_recall_loss=10.0%, got {r}"
961        );
962        assert!(r.contains("10.0%"), "expected cost_overrun=10.0%, got {r}");
963    }
964
965    #[test]
966    fn pipeline_event_skips_default_enricher_fields_on_serialise() {
967        let evt = sample_event();
968        let json = serde_json::to_string(&evt).unwrap();
969        // Default values for enricher fields must be skip_serializing_if'd
970        // so older log files stay compact and parse cleanly.
971        assert!(!json.contains("enricher_prefetched"));
972        assert!(!json.contains("enricher_predicted_cost_tokens"));
973        assert!(!json.contains("enricher_decline_reason"));
974        assert!(!json.contains("cited_in_next_n_turns"));
975    }
976
977    #[test]
978    fn pipeline_event_round_trips_with_enricher_fields_populated() {
979        let mut evt = sample_event();
980        evt.enricher_prefetched = true;
981        evt.enricher_predicted_cost_tokens = 540;
982        evt.enricher_decline_reason = Some("budget".into());
983        evt.cited_in_next_n_turns = Some(true);
984        let json = serde_json::to_string(&evt).unwrap();
985        let back: PipelineEvent = serde_json::from_str(&json).unwrap();
986        assert!(back.enricher_prefetched);
987        assert_eq!(back.enricher_predicted_cost_tokens, 540);
988        assert_eq!(back.enricher_decline_reason.as_deref(), Some("budget"));
989        assert_eq!(back.cited_in_next_n_turns, Some(true));
990    }
991
992    // ─── Inference tool-call savings ─────────────────────────────────
993
994    #[test]
995    fn total_calls_saved_sums_three_buckets() {
996        let e = EnrichmentEffectiveness {
997            inference_calls_saved_prefetch: 7,
998            inference_calls_saved_dedup: 12,
999            inference_calls_saved_fail_fast: 3,
1000            ..Default::default()
1001        };
1002        assert_eq!(e.total_calls_saved(), 22);
1003    }
1004
1005    #[test]
1006    fn accumulate_dedup_hit_increments_dedup_bucket_and_tokens() {
1007        let mut e = EnrichmentEffectiveness::default();
1008        let mut ev = sample_event();
1009        ev.is_dedup_hit = true;
1010        ev.tokens_baseline = 800;
1011        ev.tokens_final = 9;
1012        e.accumulate(&ev);
1013        assert_eq!(e.inference_calls_saved_dedup, 1);
1014        assert_eq!(e.inference_tokens_saved, 800);
1015        assert_eq!(e.total_calls_saved(), 1);
1016        // dedup-only path must not move prefetch counters.
1017        assert_eq!(e.total_prefetches, 0);
1018        assert_eq!(e.total_predictions, 0);
1019    }
1020
1021    #[test]
1022    fn accumulate_cited_prefetch_increments_prefetch_bucket() {
1023        let mut e = EnrichmentEffectiveness::default();
1024        let mut ev = sample_event();
1025        ev.enricher_prefetched = true;
1026        ev.enricher_predicted_cost_tokens = 500;
1027        ev.tokens_baseline = 540;
1028        ev.cited_in_next_n_turns = Some(true);
1029        e.accumulate(&ev);
1030        assert_eq!(e.total_prefetches, 1);
1031        assert_eq!(e.cited_prefetches, 1);
1032        assert_eq!(e.inference_calls_saved_prefetch, 1);
1033        assert_eq!(e.inference_tokens_saved, 540);
1034        assert_eq!(e.cost_overrun_count, 0); // 540 < 500 * 1.3
1035    }
1036
1037    #[test]
1038    fn accumulate_uncited_prefetch_does_not_count_as_saved() {
1039        let mut e = EnrichmentEffectiveness::default();
1040        let mut ev = sample_event();
1041        ev.enricher_prefetched = true;
1042        ev.cited_in_next_n_turns = Some(false);
1043        ev.tokens_baseline = 200;
1044        e.accumulate(&ev);
1045        assert_eq!(e.total_prefetches, 1);
1046        assert_eq!(e.cited_prefetches, 0);
1047        assert_eq!(e.inference_calls_saved_prefetch, 0);
1048        assert_eq!(e.inference_tokens_saved, 0);
1049    }
1050
1051    #[test]
1052    fn accumulate_overrun_counts_when_actual_exceeds_130_percent() {
1053        let mut e = EnrichmentEffectiveness::default();
1054        let mut ev = sample_event();
1055        ev.enricher_prefetched = true;
1056        ev.enricher_predicted_cost_tokens = 100;
1057        ev.tokens_baseline = 200; // 200 ≥ 100 * 1.3 → overrun
1058        e.accumulate(&ev);
1059        assert_eq!(e.cost_overrun_count, 1);
1060        assert_eq!(e.net_prediction_error_tokens, 100);
1061    }
1062
1063    #[test]
1064    fn accumulate_decline_reason_increments_declines() {
1065        let mut e = EnrichmentEffectiveness::default();
1066        let mut ev = sample_event();
1067        ev.enricher_decline_reason = Some("budget".into());
1068        e.accumulate(&ev);
1069        assert_eq!(e.total_declines, 1);
1070    }
1071
1072    #[test]
1073    fn record_fail_fast_skip_increments_counter_and_tokens() {
1074        let mut e = EnrichmentEffectiveness::default();
1075        e.record_fail_fast_skip(75);
1076        e.record_fail_fast_skip(75);
1077        assert_eq!(e.inference_calls_saved_fail_fast, 2);
1078        assert_eq!(e.inference_tokens_saved, 150);
1079        assert_eq!(e.total_calls_saved(), 2);
1080    }
1081
1082    #[test]
1083    fn report_includes_calls_saved_and_tokens_saved() {
1084        let e = EnrichmentEffectiveness {
1085            total_prefetches: 10,
1086            cited_prefetches: 7,
1087            inference_calls_saved_prefetch: 7,
1088            inference_calls_saved_dedup: 12,
1089            inference_calls_saved_fail_fast: 3,
1090            inference_tokens_saved: 12_345,
1091            ..Default::default()
1092        };
1093        let r = e.report();
1094        assert!(r.contains("calls_saved=22"), "report missing total: {r}");
1095        assert!(
1096            r.contains("prefetch=7") && r.contains("dedup=12") && r.contains("fail_fast=3"),
1097            "report missing per-bucket breakdown: {r}"
1098        );
1099        assert!(
1100            r.contains("tokens_saved=12345"),
1101            "report missing tokens_saved: {r}"
1102        );
1103    }
1104
1105    #[test]
1106    fn enrichment_skips_zero_savings_fields_on_serialise() {
1107        let e = EnrichmentEffectiveness::default();
1108        let json = serde_json::to_string(&e).unwrap();
1109        assert!(!json.contains("inference_calls_saved_prefetch"));
1110        assert!(!json.contains("inference_calls_saved_dedup"));
1111        assert!(!json.contains("inference_calls_saved_fail_fast"));
1112        assert!(!json.contains("inference_tokens_saved"));
1113    }
1114
1115    #[test]
1116    fn enrichment_round_trips_with_savings_populated() {
1117        let e = EnrichmentEffectiveness {
1118            inference_calls_saved_prefetch: 4,
1119            inference_calls_saved_dedup: 9,
1120            inference_calls_saved_fail_fast: 2,
1121            inference_tokens_saved: 8_400,
1122            ..Default::default()
1123        };
1124        let json = serde_json::to_string(&e).unwrap();
1125        let back: EnrichmentEffectiveness = serde_json::from_str(&json).unwrap();
1126        assert_eq!(back, e);
1127    }
1128
1129    // ─── Speculative-execution race instrumentation ─────────────────
1130
1131    #[test]
1132    fn record_prefetch_dispatched_increments_counter() {
1133        let mut e = EnrichmentEffectiveness::default();
1134        e.record_prefetch_dispatched();
1135        e.record_prefetch_dispatched();
1136        e.record_prefetch_dispatched();
1137        assert_eq!(e.prefetch_dispatched, 3);
1138    }
1139
1140    #[test]
1141    fn race_win_rate_returns_some_only_when_dispatched() {
1142        let e0 = EnrichmentEffectiveness::default();
1143        assert!(e0.prefetch_race_win_rate().is_none());
1144        let e = EnrichmentEffectiveness {
1145            prefetch_dispatched: 10,
1146            prefetch_won_race: 7,
1147            ..Default::default()
1148        };
1149        let rate = e.prefetch_race_win_rate().unwrap();
1150        assert!((rate - 0.7).abs() < 1e-6);
1151    }
1152
1153    #[test]
1154    fn waste_rate_separates_dispatched_from_total_prefetches() {
1155        // Distinct from `prefetch_hit_rate`: hit_rate is "did the LLM
1156        // text-cite the prefetched body", waste_rate is "did the LLM
1157        // never call the same tool at all". Different denominators.
1158        let e = EnrichmentEffectiveness {
1159            total_prefetches: 12,
1160            prefetch_dispatched: 10, // 2 plans the dispatcher dropped
1161            prefetch_wasted: 4,      // 4 of 10 dispatched never claimed
1162            ..Default::default()
1163        };
1164        let rate = e.prefetch_waste_rate().unwrap();
1165        assert!((rate - 0.4).abs() < 1e-6);
1166        assert!(e.prefetch_race_win_rate().unwrap().abs() < 1e-6); // 0/10 = 0
1167    }
1168
1169    #[test]
1170    fn report_includes_race_and_waste_when_dispatched() {
1171        let e = EnrichmentEffectiveness {
1172            total_prefetches: 10,
1173            prefetch_dispatched: 10,
1174            prefetch_won_race: 6,
1175            prefetch_wasted: 2,
1176            ..Default::default()
1177        };
1178        let r = e.report();
1179        assert!(r.contains("race_win=60.0%"), "report missing race_win: {r}");
1180        assert!(r.contains("waste=20.0%"), "report missing waste: {r}");
1181        assert!(
1182            r.contains("dispatched=10"),
1183            "report missing dispatched: {r}"
1184        );
1185    }
1186
1187    #[test]
1188    fn race_fields_skip_serialise_when_zero() {
1189        let e = EnrichmentEffectiveness::default();
1190        let json = serde_json::to_string(&e).unwrap();
1191        assert!(!json.contains("prefetch_dispatched"));
1192        assert!(!json.contains("prefetch_won_race"));
1193        assert!(!json.contains("prefetch_wasted"));
1194    }
1195
1196    #[test]
1197    fn race_fields_round_trip_when_populated() {
1198        let e = EnrichmentEffectiveness {
1199            prefetch_dispatched: 12,
1200            prefetch_won_race: 8,
1201            prefetch_wasted: 3,
1202            ..Default::default()
1203        };
1204        let json = serde_json::to_string(&e).unwrap();
1205        let back: EnrichmentEffectiveness = serde_json::from_str(&json).unwrap();
1206        assert_eq!(back, e);
1207    }
1208}