Skip to main content

mcpr_core/
timing.rs

//! Per-stage wall-clock timing instrumentation.
//!
//! Gated on the `MCPR_STAGE_TIMING` env var, checked **once** per
//! process and cached via `OnceLock`. When disabled,
//! [`StageTimer::mark`] is a no-op and [`StageTimer::finish`] returns
//! `None` — roughly 1 ns of overhead from a cached, well-predicted branch.
//!
//! When enabled, timings populate [`crate::event::StageTimings`], which
//! handlers attach to each `RequestEvent`. Events flow through the
//! existing event bus, so sinks (stderr JSON, sqlite, cloud) pick
//! them up without extra wiring.
//!
//! ## Enabling
//!
//! Accepted values are `1`, `true`, and `yes` (case-sensitive); any other
//! value, or leaving the variable unset, keeps timing disabled.
//!
//! ```text
//! MCPR_STAGE_TIMING=1 mcpr proxy run ./mcpr.toml
//! # or
//! MCPR_STAGE_TIMING=true mcpr proxy run ...
//! ```
//!
//! ## Reading the data
//!
//! The JSON stderr sink (the default log format) writes each event —
//! including `stage_timings` — as one JSON line to the proxy's log
//! file at `~/.mcpr/proxies/<name>/proxy.log`. To aggregate:
//!
//! ```text
//! tail -n +N ~/.mcpr/proxies/bench/proxy.log \
//!     | jq -c 'select(.stage_timings)' \
//!     | jq -s '[.[].stage_timings.schema_us // empty] | sort'
//! ```
//!
//! The `benches/scripts/scenarios/where-time-goes.sh` harness does
//! this aggregation automatically and prints a per-stage summary.

36use std::sync::OnceLock;
37use std::time::Instant;
38
39use crate::event::StageTimings;
40
/// Environment variable that switches per-stage timing on.
const MCPR_STAGE_TIMING_ENV: &str = "MCPR_STAGE_TIMING";

/// Report whether stage timing is enabled for this process.
///
/// The environment is consulted exactly once. The answer is memoized in a
/// `OnceLock`, so every later call is only a read of the cached flag.
///
/// Only the exact strings `"1"`, `"true"` and `"yes"` enable timing.
/// Matching is deliberately case-sensitive so that a typo never silently
/// turns instrumentation on in production. An unset or non-UTF-8 value
/// counts as disabled.
fn timing_enabled() -> bool {
    static CACHED: OnceLock<bool> = OnceLock::new();
    let flag = CACHED.get_or_init(|| match std::env::var(MCPR_STAGE_TIMING_ENV) {
        Ok(value) => ["1", "true", "yes"].contains(&value.as_str()),
        Err(_) => false,
    });
    *flag
}

57/// Named stages the pipeline reports timings for. Keep this list
58/// aligned with the `Option<u32>` fields on
59/// [`crate::event::StageTimings`].
60#[derive(Clone, Copy, Debug)]
61pub enum Stage {
62    /// Buffering the upstream response body (`read_body_capped`).
63    Buffer,
64    /// SSE frame extraction on the response body.
65    SseUnwrap,
66    /// JSON parse of the buffered/unwrapped body.
67    JsonParse,
68    /// `SchemaManager::ingest` + stale-flag check.
69    Schema,
70    /// Widget overlay substitution (`resources/read` with `ui://widget/*`).
71    WidgetOverlay,
72    /// Marker scan (`rewrite::has_markers`).
73    MarkerScan,
74    /// Structured CSP rewrite (`rewrite::rewrite_in_place`).
75    Rewrite,
76    /// `serde_json::to_vec` when reserialization was needed.
77    Reserialize,
78    /// Passthrough URL substitution (non-MCP JSON path).
79    UrlMap,
80    /// Post-response side-effects (session start, health, client info).
81    SideEffects,
82}
83
84/// Lightweight per-request stopwatch. Creation and every `mark()` are
85/// no-ops when [`MCPR_STAGE_TIMING`](const@MCPR_STAGE_TIMING_ENV) is
86/// not set — safe to sprinkle liberally through hot paths.
87pub struct StageTimer {
88    state: State,
89}
90
91enum State {
92    /// Env var unset — all ops are no-ops.
93    Disabled,
94    /// Env var set — track the clock.
95    Enabled {
96        last: Instant,
97        timings: StageTimings,
98    },
99}
100
101impl Default for StageTimer {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107impl StageTimer {
108    /// Construct. Cheap either way — ~1 ns disabled, one `Instant::now()`
109    /// (~10 ns) when enabled.
110    pub fn new() -> Self {
111        let state = if timing_enabled() {
112            State::Enabled {
113                last: Instant::now(),
114                timings: StageTimings::default(),
115            }
116        } else {
117            State::Disabled
118        };
119        Self { state }
120    }
121
122    /// Record the microseconds elapsed since the previous mark (or
123    /// construction) into `stage`'s slot, then reset the clock.
124    /// No-op when disabled.
125    pub fn mark(&mut self, stage: Stage) {
126        let State::Enabled { last, timings } = &mut self.state else {
127            return;
128        };
129        let us = last.elapsed().as_micros() as u32;
130        match stage {
131            Stage::Buffer => timings.buffer_us = Some(us),
132            Stage::SseUnwrap => timings.sse_unwrap_us = Some(us),
133            Stage::JsonParse => timings.json_parse_us = Some(us),
134            Stage::Schema => timings.schema_us = Some(us),
135            Stage::WidgetOverlay => timings.widget_overlay_us = Some(us),
136            Stage::MarkerScan => timings.marker_scan_us = Some(us),
137            Stage::Rewrite => timings.rewrite_us = Some(us),
138            Stage::Reserialize => timings.reserialize_us = Some(us),
139            Stage::UrlMap => timings.url_map_us = Some(us),
140            Stage::SideEffects => timings.side_effects_us = Some(us),
141        }
142        *last = Instant::now();
143    }
144
145    /// Consume the timer and return the accumulated timings. Returns
146    /// `None` when disabled — callers assign directly into
147    /// `ResponseSummary::stage_timings` so `None` means "don't emit
148    /// this field."
149    pub fn finish(self) -> Option<StageTimings> {
150        match self.state {
151            State::Enabled { timings, .. } => Some(timings),
152            State::Disabled => None,
153        }
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;

    // `timing_enabled()` reads the env var ONCE via `OnceLock`, so these
    // tests can't toggle it mid-process. Instead they construct
    // `StageTimer` directly in the state they need and check the
    // structural invariants:
    // - a disabled timer ignores marks and yields `None`;
    // - an enabled timer records each marked stage into its own slot.

    /// Every stage paired with the `StageTimings` field it must populate.
    const STAGE_FIELDS: [(Stage, &str); 10] = [
        (Stage::Buffer, "buffer_us"),
        (Stage::SseUnwrap, "sse_unwrap_us"),
        (Stage::JsonParse, "json_parse_us"),
        (Stage::Schema, "schema_us"),
        (Stage::WidgetOverlay, "widget_overlay_us"),
        (Stage::MarkerScan, "marker_scan_us"),
        (Stage::Rewrite, "rewrite_us"),
        (Stage::Reserialize, "reserialize_us"),
        (Stage::UrlMap, "url_map_us"),
        (Stage::SideEffects, "side_effects_us"),
    ];

    /// Build an enabled timer regardless of the process environment.
    fn enabled_timer() -> StageTimer {
        StageTimer {
            state: State::Enabled {
                last: Instant::now(),
                timings: StageTimings::default(),
            },
        }
    }

    /// Names of the fields of `t` that hold a value.
    fn populated_fields(t: &StageTimings) -> Vec<&'static str> {
        let fields: [(&'static str, Option<u32>); 10] = [
            ("buffer_us", t.buffer_us),
            ("sse_unwrap_us", t.sse_unwrap_us),
            ("json_parse_us", t.json_parse_us),
            ("schema_us", t.schema_us),
            ("widget_overlay_us", t.widget_overlay_us),
            ("marker_scan_us", t.marker_scan_us),
            ("rewrite_us", t.rewrite_us),
            ("reserialize_us", t.reserialize_us),
            ("url_map_us", t.url_map_us),
            ("side_effects_us", t.side_effects_us),
        ];
        fields
            .iter()
            .filter(|(_, value)| value.is_some())
            .map(|(name, _)| *name)
            .collect()
    }

    #[test]
    fn disabled_timer_is_noop() {
        let mut t = StageTimer {
            state: State::Disabled,
        };
        for (stage, _) in STAGE_FIELDS {
            t.mark(stage); // must not panic / allocate
        }
        assert!(t.finish().is_none());
    }

    #[test]
    fn enabled_timer_records_marks() {
        let mut t = enabled_timer();
        // `thread::sleep` never sleeps less than requested, so the recorded
        // duration is guaranteed to be at least 50 µs.
        std::thread::sleep(std::time::Duration::from_micros(50));
        t.mark(Stage::Schema);
        let out = t.finish().expect("enabled timer should yield Some");
        let schema_us = out.schema_us.expect("schema_us should be recorded");
        assert!(schema_us >= 50, "expected >= 50 µs, got {schema_us}");
        assert!(
            out.json_parse_us.is_none(),
            "only marked stages should be populated"
        );
    }

    #[test]
    fn every_stage_maps_to_its_own_field() {
        for (stage, field) in STAGE_FIELDS {
            let mut t = enabled_timer();
            t.mark(stage);
            let out = t.finish().expect("enabled timer should yield Some");
            assert_eq!(
                populated_fields(&out),
                vec![field],
                "{stage:?} should populate only `{field}`"
            );
        }
    }

    #[test]
    fn enabled_timer_without_marks_is_empty() {
        let out = enabled_timer()
            .finish()
            .expect("enabled timer should yield Some");
        assert!(populated_fields(&out).is_empty());
    }
}
193}