1use std::sync::Arc;
29
30use chrono::{DateTime, Utc};
31use mockforge_plugin_core::PluginId;
32use tokio::sync::broadcast::{self, Receiver, Sender};
33
// Channel capacity used by `InvocationMetricsBus::new`; callers that need a
// different size go through `InvocationMetricsBus::with_capacity` instead.
const DEFAULT_CHANNEL_CAPACITY: usize = 256;
38
39#[derive(Debug, Clone)]
41pub struct InvocationMetric {
42 pub plugin_id: PluginId,
44 pub function_name: String,
47 pub started_at: DateTime<Utc>,
49 pub wall_time_us: u64,
53 pub memory_peak_bytes: u64,
58 pub status: InvocationStatus,
60}
61
/// Final outcome attached to an [`InvocationMetric`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum InvocationStatus {
    /// The invocation finished and was reported as successful.
    Success,
    /// The invocation was reported as failed; `error` carries the
    /// caller-supplied description of what went wrong.
    Failure { error: String },
    /// The timer guarding the invocation was dropped without either of its
    /// `finish_*` methods being called.
    Dropped,
}
76
77#[derive(Debug, Clone)]
87pub struct InvocationMetricsBus {
88 tx: Sender<InvocationMetric>,
89}
90
91impl InvocationMetricsBus {
92 pub fn new() -> Self {
95 Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
96 }
97
98 pub fn with_capacity(capacity: usize) -> Self {
102 let (tx, _rx) = broadcast::channel(capacity);
103 Self { tx }
104 }
105
106 pub fn subscribe(&self) -> Receiver<InvocationMetric> {
110 self.tx.subscribe()
111 }
112
113 pub fn record(&self, metric: InvocationMetric) {
118 let _ = self.tx.send(metric);
122 }
123
124 pub fn subscriber_count(&self) -> usize {
127 self.tx.receiver_count()
128 }
129}
130
131impl Default for InvocationMetricsBus {
132 fn default() -> Self {
133 Self::new()
134 }
135}
136
137pub struct InvocationTimer {
150 bus: Arc<InvocationMetricsBus>,
151 plugin_id: PluginId,
152 function_name: String,
153 started_at: DateTime<Utc>,
154 started_instant: std::time::Instant,
155 finished: bool,
158}
159
160impl InvocationTimer {
161 pub fn start(
164 bus: Arc<InvocationMetricsBus>,
165 plugin_id: PluginId,
166 function_name: impl Into<String>,
167 ) -> Self {
168 Self {
169 bus,
170 plugin_id,
171 function_name: function_name.into(),
172 started_at: Utc::now(),
173 started_instant: std::time::Instant::now(),
174 finished: false,
175 }
176 }
177
178 pub fn finish_success(mut self, memory_peak_bytes: u64) {
180 self.emit(InvocationStatus::Success, memory_peak_bytes);
181 }
182
183 pub fn finish_failure(mut self, error: impl Into<String>, memory_peak_bytes: u64) {
185 self.emit(
186 InvocationStatus::Failure {
187 error: error.into(),
188 },
189 memory_peak_bytes,
190 );
191 }
192
193 fn emit(&mut self, status: InvocationStatus, memory_peak_bytes: u64) {
194 self.finished = true;
195 let wall_time_us = self.started_instant.elapsed().as_micros().min(u64::MAX as u128) as u64;
196 let metric = InvocationMetric {
197 plugin_id: self.plugin_id.clone(),
198 function_name: std::mem::take(&mut self.function_name),
199 started_at: self.started_at,
200 wall_time_us,
201 memory_peak_bytes,
202 status,
203 };
204 self.bus.record(metric);
205 }
206}
207
208impl Drop for InvocationTimer {
209 fn drop(&mut self) {
210 if !self.finished {
211 tracing::warn!(
212 plugin_id = %self.plugin_id,
213 function_name = %self.function_name,
214 "InvocationTimer dropped without finish_* — emitting Dropped metric"
215 );
216 self.emit(InvocationStatus::Dropped, 0);
219 }
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Plugin id shared by every test in this module.
    fn test_plugin_id() -> PluginId {
        PluginId::new("test-plugin")
    }

    /// Builds a `Success` metric with no memory peak for the test plugin.
    fn success_metric(function_name: &str, wall_time_us: u64) -> InvocationMetric {
        InvocationMetric {
            plugin_id: test_plugin_id(),
            function_name: function_name.into(),
            started_at: Utc::now(),
            wall_time_us,
            memory_peak_bytes: 0,
            status: InvocationStatus::Success,
        }
    }

    /// Creates a shared bus together with a receiver subscribed to it.
    fn bus_with_subscriber() -> (Arc<InvocationMetricsBus>, Receiver<InvocationMetric>) {
        let bus = Arc::new(InvocationMetricsBus::new());
        let rx = bus.subscribe();
        (bus, rx)
    }

    /// Starts a timer for the test plugin on `bus`.
    fn start_timer(bus: &Arc<InvocationMetricsBus>, function_name: &str) -> InvocationTimer {
        InvocationTimer::start(bus.clone(), test_plugin_id(), function_name)
    }

    // Recording without any subscriber must neither panic nor report an error.
    #[tokio::test]
    async fn record_with_no_subscribers_is_silent() {
        let bus = InvocationMetricsBus::new();
        assert_eq!(bus.subscriber_count(), 0);

        bus.record(success_metric("fn1", 100));
    }

    // A subscriber sees the metric that was recorded after it subscribed.
    #[tokio::test]
    async fn subscribe_then_receive() {
        let bus = InvocationMetricsBus::new();
        let mut rx = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 1);

        bus.record(success_metric("fn1", 42));

        let received = rx.recv().await.unwrap();
        assert_eq!(received.function_name, "fn1");
        assert_eq!(received.wall_time_us, 42);
        assert_eq!(received.status, InvocationStatus::Success);
    }

    // Every subscriber gets its own copy of each recorded metric.
    #[tokio::test]
    async fn multiple_subscribers_each_get_every_event() {
        let bus = InvocationMetricsBus::new();
        let mut rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 2);

        bus.record(success_metric("fn-broadcast", 7));

        let m1 = rx1.recv().await.unwrap();
        let m2 = rx2.recv().await.unwrap();
        assert_eq!(m1.function_name, "fn-broadcast");
        assert_eq!(m2.function_name, "fn-broadcast");
    }

    // finish_success publishes name, status, memory peak and a plausible duration.
    #[tokio::test]
    async fn timer_finish_success_emits_metric() {
        let (bus, mut rx) = bus_with_subscriber();

        let timer = start_timer(&bus, "do_thing");
        tokio::time::sleep(Duration::from_millis(2)).await;
        timer.finish_success(1024);

        let metric = rx.recv().await.unwrap();
        assert_eq!(metric.function_name, "do_thing");
        assert_eq!(metric.status, InvocationStatus::Success);
        assert_eq!(metric.memory_peak_bytes, 1024);
        assert!(metric.wall_time_us >= 1_000, "expected ≥1ms, got {}us", metric.wall_time_us);
    }

    // finish_failure carries the error text through to the metric.
    #[tokio::test]
    async fn timer_finish_failure_includes_error() {
        let (bus, mut rx) = bus_with_subscriber();

        start_timer(&bus, "do_thing").finish_failure("boom", 0);

        let metric = rx.recv().await.unwrap();
        match metric.status {
            InvocationStatus::Failure { error } => assert_eq!(error, "boom"),
            other => panic!("expected Failure, got {:?}", other),
        }
    }

    // A timer that goes out of scope unfinished still reports, as Dropped.
    #[tokio::test]
    async fn timer_dropped_without_finish_emits_dropped_metric() {
        let (bus, mut rx) = bus_with_subscriber();

        {
            let _timer = start_timer(&bus, "leaked");
        }

        let metric = rx.recv().await.unwrap();
        assert_eq!(metric.function_name, "leaked");
        assert_eq!(metric.status, InvocationStatus::Dropped);
    }

    // started_at reflects when the timer was created, not when it finished.
    #[tokio::test]
    async fn started_at_is_set_at_start_not_finish() {
        let (bus, mut rx) = bus_with_subscriber();

        let timer = start_timer(&bus, "fn");
        let started = timer.started_at;

        tokio::time::sleep(Duration::from_millis(5)).await;
        timer.finish_success(0);

        let metric = rx.recv().await.unwrap();
        assert_eq!(metric.started_at, started);

        let since_start = Utc::now().signed_duration_since(metric.started_at);
        let elapsed_via_metric = since_start.num_microseconds().unwrap_or(i64::MAX);
        assert!(elapsed_via_metric >= 5_000);
    }
}