mcpr_core/timing.rs
1//! Per-stage wall-clock timing instrumentation.
2//!
//! Gated on the `MCPR_STAGE_TIMING` env var, checked **once** per
//! process and cached via `OnceLock`. When disabled,
//! [`StageTimer::mark`] is a no-op and [`StageTimer::finish`] returns
//! `None`; the only overhead is reading the cached flag behind a
//! well-predicted branch (~1 ns).
7//!
8//! When enabled, timings populate [`crate::event::StageTimings`] which
9//! handlers attach to each `RequestEvent`. Events flow through the
10//! existing event bus, so sinks (stderr JSON, sqlite, cloud) pick
11//! them up without extra wiring.
12//!
13//! ## Enabling
14//!
15//! ```text
16//! MCPR_STAGE_TIMING=1 mcpr proxy run -c ./mcpr.toml
17//! # or
18//! MCPR_STAGE_TIMING=true mcpr proxy run ...
19//! ```
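//!
//! Only the exact values `1`, `true`, and `yes` enable timing. Matching is
//! case-sensitive, and any other value (or an unset variable) leaves it
//! disabled.
//!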
21//! ## Reading the data
22//!
23//! The JSON stderr sink (default log format) writes each event —
24//! including `stage_timings` — as one JSON line to the proxy's log
//! file at `~/.mcpr/proxies/<name>/proxy.log`. To aggregate (replace `N`
//! with the log line where the run you want to inspect begins):
26//!
27//! ```text
28//! tail -n +N ~/.mcpr/proxies/bench/proxy.log \
29//! | jq -c 'select(.stage_timings)' \
30//! | jq -s '[.[].stage_timings.schema_us // empty] | sort'
31//! ```
32//!
33//! The `benches/scripts/scenarios/where-time-goes.sh` harness does
34//! this aggregation automatically and prints a per-stage summary.
35
36use std::sync::OnceLock;
37use std::time::Instant;
38
39use crate::event::StageTimings;
40
/// Name of the environment variable that switches stage timing on.
const MCPR_STAGE_TIMING_ENV: &str = "MCPR_STAGE_TIMING";

/// Report whether stage timing is enabled for this process.
///
/// The environment is consulted only on the first call. The answer is then
/// cached in a `OnceLock`, so every later call just reads the stored `bool`.
/// Only the exact values `"1"`, `"true"` and `"yes"` enable timing. Matching
/// is deliberately case-sensitive, so a typo cannot switch instrumentation
/// on in production. An unset or non-UTF-8 value leaves timing disabled.
fn timing_enabled() -> bool {
    static CACHED: OnceLock<bool> = OnceLock::new();
    *CACHED.get_or_init(|| match std::env::var(MCPR_STAGE_TIMING_ENV) {
        Ok(value) => matches!(value.as_str(), "1" | "true" | "yes"),
        Err(_) => false,
    })
}
56
/// Named stages the pipeline reports timings for.
///
/// Each variant is recorded into exactly one `Option<u32>` microsecond field
/// on [`crate::event::StageTimings`]. Keep this list and those fields
/// aligned.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Stage {
    /// Buffering the upstream response body (`read_body_capped`).
    Buffer,
    /// SSE frame extraction on the response body.
    SseUnwrap,
    /// JSON parse of the buffered/unwrapped body.
    JsonParse,
    /// `SchemaManager::ingest` + stale-flag check.
    Schema,
    /// Widget overlay substitution (`resources/read` with `ui://widget/*`).
    WidgetOverlay,
    /// Marker scan (`rewrite::has_markers`).
    MarkerScan,
    /// Structured CSP rewrite (`rewrite::rewrite_in_place`).
    Rewrite,
    /// `serde_json::to_vec` when reserialization was needed.
    Reserialize,
    /// Passthrough URL substitution (non-MCP JSON path).
    UrlMap,
    /// Post-response side-effects (session start, health, client info).
    SideEffects,
}
83
84/// Lightweight per-request stopwatch. Creation and every `mark()` are
85/// no-ops when [`MCPR_STAGE_TIMING`](const@MCPR_STAGE_TIMING_ENV) is
86/// not set — safe to sprinkle liberally through hot paths.
87pub struct StageTimer {
88 state: State,
89}
90
91enum State {
92 /// Env var unset — all ops are no-ops.
93 Disabled,
94 /// Env var set — track the clock.
95 Enabled {
96 last: Instant,
97 timings: StageTimings,
98 },
99}
100
101impl Default for StageTimer {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107impl StageTimer {
108 /// Construct. Cheap either way — ~1 ns disabled, one `Instant::now()`
109 /// (~10 ns) when enabled.
110 pub fn new() -> Self {
111 let state = if timing_enabled() {
112 State::Enabled {
113 last: Instant::now(),
114 timings: StageTimings::default(),
115 }
116 } else {
117 State::Disabled
118 };
119 Self { state }
120 }
121
122 /// Record the microseconds elapsed since the previous mark (or
123 /// construction) into `stage`'s slot, then reset the clock.
124 /// No-op when disabled.
125 pub fn mark(&mut self, stage: Stage) {
126 let State::Enabled { last, timings } = &mut self.state else {
127 return;
128 };
129 let us = last.elapsed().as_micros() as u32;
130 match stage {
131 Stage::Buffer => timings.buffer_us = Some(us),
132 Stage::SseUnwrap => timings.sse_unwrap_us = Some(us),
133 Stage::JsonParse => timings.json_parse_us = Some(us),
134 Stage::Schema => timings.schema_us = Some(us),
135 Stage::WidgetOverlay => timings.widget_overlay_us = Some(us),
136 Stage::MarkerScan => timings.marker_scan_us = Some(us),
137 Stage::Rewrite => timings.rewrite_us = Some(us),
138 Stage::Reserialize => timings.reserialize_us = Some(us),
139 Stage::UrlMap => timings.url_map_us = Some(us),
140 Stage::SideEffects => timings.side_effects_us = Some(us),
141 }
142 *last = Instant::now();
143 }
144
145 /// Consume the timer and return the accumulated timings. Returns
146 /// `None` when disabled — callers assign directly into
147 /// `ResponseSummary::stage_timings` so `None` means "don't emit
148 /// this field."
149 pub fn finish(self) -> Option<StageTimings> {
150 match self.state {
151 State::Enabled { timings, .. } => Some(timings),
152 State::Disabled => None,
153 }
154 }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    // Note: `timing_enabled()` reads the env var ONCE via `OnceLock`, so
    // these tests cannot toggle it mid-process. They build the timer's state
    // directly and check the structural invariants instead: a disabled timer
    // ignores marks and yields `None`; an enabled timer records exactly the
    // stages that were marked.

    /// Build an enabled timer whose clock started `ago` in the past, so tests
    /// can assert on elapsed time without sleeping. Returns `None` if the
    /// platform clock cannot represent that instant.
    fn enabled_timer_started(ago: Duration) -> Option<StageTimer> {
        let last = Instant::now().checked_sub(ago)?;
        Some(StageTimer {
            state: State::Enabled {
                last,
                timings: StageTimings::default(),
            },
        })
    }

    #[test]
    fn disabled_timer_is_noop() {
        let mut t = StageTimer {
            state: State::Disabled,
        };
        t.mark(Stage::Schema); // must not panic
        assert!(t.finish().is_none());
    }

    #[test]
    fn enabled_timer_records_marks() {
        let Some(mut t) = enabled_timer_started(Duration::from_millis(50)) else {
            return;
        };
        t.mark(Stage::Schema);
        let out = t.finish().expect("enabled timer should yield Some");
        let schema_us = out.schema_us.expect("schema_us should be recorded");
        assert!(
            schema_us >= 50_000,
            "schema_us should cover the backdated start, got {schema_us}"
        );
        assert!(
            out.json_parse_us.is_none(),
            "only marked stages should be populated"
        );
    }

    #[test]
    fn every_stage_fills_a_distinct_slot() {
        let mut t = StageTimer {
            state: State::Enabled {
                last: Instant::now(),
                timings: StageTimings::default(),
            },
        };
        let stages = [
            Stage::Buffer,
            Stage::SseUnwrap,
            Stage::JsonParse,
            Stage::Schema,
            Stage::WidgetOverlay,
            Stage::MarkerScan,
            Stage::Rewrite,
            Stage::Reserialize,
            Stage::UrlMap,
            Stage::SideEffects,
        ];
        for stage in stages {
            t.mark(stage);
        }
        let out = t.finish().expect("enabled timer should yield Some");
        // Ten stages, ten slots: every slot is filled only if no two stages
        // write to the same field.
        let slots = [
            out.buffer_us,
            out.sse_unwrap_us,
            out.json_parse_us,
            out.schema_us,
            out.widget_overlay_us,
            out.marker_scan_us,
            out.rewrite_us,
            out.reserialize_us,
            out.url_map_us,
            out.side_effects_us,
        ];
        assert!(
            slots.iter().all(Option::is_some),
            "every stage should populate its own slot: {slots:?}"
        );
    }
}