Skip to main content

mockforge_plugin_loader/
invocation_metrics.rs

1//! Per-invocation metrics for plugin execution.
2//!
3//! Each call into a plugin emits an [`InvocationMetric`] on a
4//! broadcast channel ([`InvocationMetricsBus`]). Observers — the admin
5//! UI's plugin status page, an OTLP exporter for cloud metering, a
6//! billing aggregator — subscribe and consume.
7//!
8//! This is the foundation for cloud-plugins Phase 2 metering: the bus
9//! shape is the same whether we're emitting locally for the OSS admin
10//! UI or shipping events to a SaaS billing pipeline. Local first,
11//! cloud second.
12//!
13//! # Why a broadcast channel
14//!
15//! Multiple subscribers each get every event without coordinating.
16//! Slow consumers don't slow down the producer — `tokio::sync::broadcast`
17//! drops messages for laggers and surfaces it as a `RecvError::Lagged`,
18//! which is the right semantics for telemetry (better to miss a sample
19//! than to backpressure the request path).
20//!
21//! # Why RAII for the timer
22//!
23//! The producer side is wrapped in [`InvocationTimer`], which records
24//! the start instant on construction and emits the metric on `finish_*`.
25//! Forgetting to finish a timer is a (warned) bug; the `Drop` impl emits
26//! a "dropped" metric so we don't silently lose calls.
27
28use std::sync::Arc;
29
30use chrono::{DateTime, Utc};
31use mockforge_plugin_core::PluginId;
32use tokio::sync::broadcast::{self, Receiver, Sender};
33
/// Default number of events buffered by the broadcast channel.
///
/// 256 is generous for a single mockforge process: a subscriber can
/// fall roughly 256 events behind and still receive every one of them;
/// past that point its next `recv` reports `RecvError::Lagged`.
const DEFAULT_CHANNEL_CAPACITY: usize = 256;
38
39/// One plugin invocation's measured cost + outcome.
40#[derive(Debug, Clone)]
41pub struct InvocationMetric {
42    /// The plugin whose function was invoked.
43    pub plugin_id: PluginId,
44    /// The exported function the host called. For most plugins this is
45    /// `"on_request"` / `"on_response"` / similar.
46    pub function_name: String,
47    /// Wall-clock start, useful for correlation with request traces.
48    pub started_at: DateTime<Utc>,
49    /// Wall-time spent inside the plugin call. Microseconds for fine
50    /// resolution; sub-millisecond invocations are common for transform
51    /// plugins.
52    pub wall_time_us: u64,
53    /// Peak memory usage observed during this invocation, in bytes.
54    /// Reported as 0 when the underlying tracker isn't wired up
55    /// (current state — see `memory_tracking.rs` for the limiter that
56    /// will populate this in cloud Phase 2).
57    pub memory_peak_bytes: u64,
58    /// Outcome.
59    pub status: InvocationStatus,
60}
61
/// How a plugin invocation ended; carried by every [`InvocationMetric`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InvocationStatus {
    /// The plugin function ran to completion and returned normally.
    Success,
    /// The plugin function returned an error or trapped.
    Failure {
        /// Message captured from the failed invocation.
        error: String,
    },
    /// The timer went out of scope without `finish_*` being called.
    /// This points at a host-side bug: the call path never saw the
    /// work through to a recorded outcome.
    Dropped,
}
76
77/// Broadcast bus that fans an [`InvocationMetric`] out to subscribers.
78///
79/// Construct one per [`PluginSandbox`] and clone the `Arc` to pass it
80/// into each [`SandboxInstance`]. External code calls [`subscribe`] to
81/// get a [`Receiver`] for live events.
82///
83/// [`PluginSandbox`]: crate::sandbox::PluginSandbox
84/// [`SandboxInstance`]: crate::sandbox::SandboxInstance
85/// [`subscribe`]: InvocationMetricsBus::subscribe
86#[derive(Debug, Clone)]
87pub struct InvocationMetricsBus {
88    tx: Sender<InvocationMetric>,
89}
90
91impl InvocationMetricsBus {
92    /// Construct a bus with the default channel capacity
93    /// ([`DEFAULT_CHANNEL_CAPACITY`]).
94    pub fn new() -> Self {
95        Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
96    }
97
98    /// Construct a bus with a custom channel capacity. Larger values
99    /// tolerate slower subscribers without dropping events; smaller
100    /// values use less memory per bus.
101    pub fn with_capacity(capacity: usize) -> Self {
102        let (tx, _rx) = broadcast::channel(capacity);
103        Self { tx }
104    }
105
106    /// Subscribe to live metrics. Each subscriber sees every event sent
107    /// after the call to `subscribe`; events emitted before are not
108    /// replayed.
109    pub fn subscribe(&self) -> Receiver<InvocationMetric> {
110        self.tx.subscribe()
111    }
112
113    /// Send a metric. Non-blocking. If the channel is full, the oldest
114    /// undelivered event is dropped (broadcast semantics) — that
115    /// subscriber gets `RecvError::Lagged` on the next `recv`. If
116    /// nobody is subscribed, the metric is silently dropped.
117    pub fn record(&self, metric: InvocationMetric) {
118        // `send` returns `Err` if there are no subscribers. That's
119        // expected (e.g. self-hosted user without admin UI open) — not
120        // an error, so we ignore it.
121        let _ = self.tx.send(metric);
122    }
123
124    /// Current number of subscribers. Useful for tests and for skipping
125    /// metric construction when nobody's listening.
126    pub fn subscriber_count(&self) -> usize {
127        self.tx.receiver_count()
128    }
129}
130
131impl Default for InvocationMetricsBus {
132    fn default() -> Self {
133        Self::new()
134    }
135}
136
137/// RAII guard that times a plugin invocation and emits one
138/// [`InvocationMetric`] when finished.
139///
140/// Construct with [`start`], then call exactly one of
141/// [`finish_success`] or [`finish_failure`]. Dropping the timer
142/// without calling `finish_*` emits a [`InvocationStatus::Dropped`]
143/// metric — this is a bug, but a visible one rather than a silent
144/// omission.
145///
146/// [`start`]: InvocationTimer::start
147/// [`finish_success`]: InvocationTimer::finish_success
148/// [`finish_failure`]: InvocationTimer::finish_failure
149pub struct InvocationTimer {
150    bus: Arc<InvocationMetricsBus>,
151    plugin_id: PluginId,
152    function_name: String,
153    started_at: DateTime<Utc>,
154    started_instant: std::time::Instant,
155    /// Set to `true` when `finish_*` consumes the timer, so the `Drop`
156    /// impl knows to skip the dropped-metric emit.
157    finished: bool,
158}
159
160impl InvocationTimer {
161    /// Start timing an invocation. The wall-clock and `Instant` are
162    /// captured here; nothing is sent on the bus until `finish_*`.
163    pub fn start(
164        bus: Arc<InvocationMetricsBus>,
165        plugin_id: PluginId,
166        function_name: impl Into<String>,
167    ) -> Self {
168        Self {
169            bus,
170            plugin_id,
171            function_name: function_name.into(),
172            started_at: Utc::now(),
173            started_instant: std::time::Instant::now(),
174            finished: false,
175        }
176    }
177
178    /// Emit a successful invocation metric.
179    pub fn finish_success(mut self, memory_peak_bytes: u64) {
180        self.emit(InvocationStatus::Success, memory_peak_bytes);
181    }
182
183    /// Emit a failed invocation metric.
184    pub fn finish_failure(mut self, error: impl Into<String>, memory_peak_bytes: u64) {
185        self.emit(
186            InvocationStatus::Failure {
187                error: error.into(),
188            },
189            memory_peak_bytes,
190        );
191    }
192
193    fn emit(&mut self, status: InvocationStatus, memory_peak_bytes: u64) {
194        self.finished = true;
195        let wall_time_us = self.started_instant.elapsed().as_micros().min(u64::MAX as u128) as u64;
196        let metric = InvocationMetric {
197            plugin_id: self.plugin_id.clone(),
198            function_name: std::mem::take(&mut self.function_name),
199            started_at: self.started_at,
200            wall_time_us,
201            memory_peak_bytes,
202            status,
203        };
204        self.bus.record(metric);
205    }
206}
207
208impl Drop for InvocationTimer {
209    fn drop(&mut self) {
210        if !self.finished {
211            tracing::warn!(
212                plugin_id = %self.plugin_id,
213                function_name = %self.function_name,
214                "InvocationTimer dropped without finish_* — emitting Dropped metric"
215            );
216            // Re-use `emit`. We have to provide a memory_peak; we don't
217            // know it at drop time, so 0 is the honest value.
218            self.emit(InvocationStatus::Dropped, 0);
219        }
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Plugin id shared by every test in this module.
    fn test_plugin_id() -> PluginId {
        PluginId::new("test-plugin")
    }

    /// Build a successful metric with zero recorded memory for the
    /// given function name and wall time.
    fn success_metric(function_name: &str, wall_time_us: u64) -> InvocationMetric {
        InvocationMetric {
            plugin_id: test_plugin_id(),
            function_name: function_name.into(),
            started_at: Utc::now(),
            wall_time_us,
            memory_peak_bytes: 0,
            status: InvocationStatus::Success,
        }
    }

    #[tokio::test]
    async fn record_with_no_subscribers_is_silent() {
        let bus = InvocationMetricsBus::new();
        assert_eq!(bus.subscriber_count(), 0);

        // Must neither panic nor surface an error.
        bus.record(success_metric("fn1", 100));
    }

    #[tokio::test]
    async fn subscribe_then_receive() {
        let bus = InvocationMetricsBus::new();
        let mut rx = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 1);

        bus.record(success_metric("fn1", 42));

        let received = rx.recv().await.unwrap();
        assert_eq!(received.function_name, "fn1");
        assert_eq!(received.wall_time_us, 42);
        assert_eq!(received.status, InvocationStatus::Success);
    }

    #[tokio::test]
    async fn multiple_subscribers_each_get_every_event() {
        let bus = InvocationMetricsBus::new();
        let mut rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 2);

        bus.record(success_metric("fn-broadcast", 7));

        let m1 = rx1.recv().await.unwrap();
        let m2 = rx2.recv().await.unwrap();
        assert_eq!(m1.function_name, "fn-broadcast");
        assert_eq!(m2.function_name, "fn-broadcast");
    }

    #[tokio::test]
    async fn timer_finish_success_emits_metric() {
        let bus = Arc::new(InvocationMetricsBus::new());
        let mut rx = bus.subscribe();

        let timer = InvocationTimer::start(Arc::clone(&bus), test_plugin_id(), "do_thing");
        // A short sleep makes wall_time_us meaningfully greater than 0.
        tokio::time::sleep(Duration::from_millis(2)).await;
        timer.finish_success(1024);

        let metric = rx.recv().await.unwrap();
        assert_eq!(metric.function_name, "do_thing");
        assert_eq!(metric.status, InvocationStatus::Success);
        assert_eq!(metric.memory_peak_bytes, 1024);
        assert!(metric.wall_time_us >= 1_000, "expected ≥1ms, got {}us", metric.wall_time_us);
    }

    #[tokio::test]
    async fn timer_finish_failure_includes_error() {
        let bus = Arc::new(InvocationMetricsBus::new());
        let mut rx = bus.subscribe();

        InvocationTimer::start(Arc::clone(&bus), test_plugin_id(), "do_thing")
            .finish_failure("boom", 0);

        let metric = rx.recv().await.unwrap();
        match metric.status {
            InvocationStatus::Failure { error } => assert_eq!(error, "boom"),
            other => panic!("expected Failure, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn timer_dropped_without_finish_emits_dropped_metric() {
        let bus = Arc::new(InvocationMetricsBus::new());
        let mut rx = bus.subscribe();

        {
            let _timer = InvocationTimer::start(Arc::clone(&bus), test_plugin_id(), "leaked");
            // Leaves scope here without any finish_* call.
        }

        let metric = rx.recv().await.unwrap();
        assert_eq!(metric.function_name, "leaked");
        assert_eq!(metric.status, InvocationStatus::Dropped);
    }

    #[tokio::test]
    async fn started_at_is_set_at_start_not_finish() {
        let bus = Arc::new(InvocationMetricsBus::new());
        let mut rx = bus.subscribe();

        let timer = InvocationTimer::start(Arc::clone(&bus), test_plugin_id(), "fn");
        let started = timer.started_at;

        tokio::time::sleep(Duration::from_millis(5)).await;
        timer.finish_success(0);

        let metric = rx.recv().await.unwrap();
        assert_eq!(metric.started_at, started);

        let since_start = Utc::now().signed_duration_since(metric.started_at);
        let elapsed_via_metric = since_start.num_microseconds().unwrap_or(i64::MAX);
        assert!(elapsed_via_metric >= 5_000);
    }
}