use std::sync::Arc;
use chrono::{DateTime, Utc};
use mockforge_plugin_core::PluginId;
use tokio::sync::broadcast::{self, Receiver, Sender};
const DEFAULT_CHANNEL_CAPACITY: usize = 256;
#[derive(Debug, Clone)]
pub struct InvocationMetric {
pub plugin_id: PluginId,
pub function_name: String,
pub started_at: DateTime<Utc>,
pub wall_time_us: u64,
pub memory_peak_bytes: u64,
pub status: InvocationStatus,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InvocationStatus {
Success,
Failure {
error: String,
},
Dropped,
}
#[derive(Debug, Clone)]
pub struct InvocationMetricsBus {
tx: Sender<InvocationMetric>,
}
impl InvocationMetricsBus {
pub fn new() -> Self {
Self::with_capacity(DEFAULT_CHANNEL_CAPACITY)
}
pub fn with_capacity(capacity: usize) -> Self {
let (tx, _rx) = broadcast::channel(capacity);
Self { tx }
}
pub fn subscribe(&self) -> Receiver<InvocationMetric> {
self.tx.subscribe()
}
pub fn record(&self, metric: InvocationMetric) {
let _ = self.tx.send(metric);
}
pub fn subscriber_count(&self) -> usize {
self.tx.receiver_count()
}
}
impl Default for InvocationMetricsBus {
fn default() -> Self {
Self::new()
}
}
pub struct InvocationTimer {
bus: Arc<InvocationMetricsBus>,
plugin_id: PluginId,
function_name: String,
started_at: DateTime<Utc>,
started_instant: std::time::Instant,
finished: bool,
}
impl InvocationTimer {
pub fn start(
bus: Arc<InvocationMetricsBus>,
plugin_id: PluginId,
function_name: impl Into<String>,
) -> Self {
Self {
bus,
plugin_id,
function_name: function_name.into(),
started_at: Utc::now(),
started_instant: std::time::Instant::now(),
finished: false,
}
}
pub fn finish_success(mut self, memory_peak_bytes: u64) {
self.emit(InvocationStatus::Success, memory_peak_bytes);
}
pub fn finish_failure(mut self, error: impl Into<String>, memory_peak_bytes: u64) {
self.emit(
InvocationStatus::Failure {
error: error.into(),
},
memory_peak_bytes,
);
}
fn emit(&mut self, status: InvocationStatus, memory_peak_bytes: u64) {
self.finished = true;
let wall_time_us = self.started_instant.elapsed().as_micros().min(u64::MAX as u128) as u64;
let metric = InvocationMetric {
plugin_id: self.plugin_id.clone(),
function_name: std::mem::take(&mut self.function_name),
started_at: self.started_at,
wall_time_us,
memory_peak_bytes,
status,
};
self.bus.record(metric);
}
}
impl Drop for InvocationTimer {
fn drop(&mut self) {
if !self.finished {
tracing::warn!(
plugin_id = %self.plugin_id,
function_name = %self.function_name,
"InvocationTimer dropped without finish_* — emitting Dropped metric"
);
self.emit(InvocationStatus::Dropped, 0);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
fn test_plugin_id() -> PluginId {
PluginId::new("test-plugin")
}
#[tokio::test]
async fn record_with_no_subscribers_is_silent() {
let bus = InvocationMetricsBus::new();
assert_eq!(bus.subscriber_count(), 0);
bus.record(InvocationMetric {
plugin_id: test_plugin_id(),
function_name: "fn1".into(),
started_at: Utc::now(),
wall_time_us: 100,
memory_peak_bytes: 0,
status: InvocationStatus::Success,
});
}
#[tokio::test]
async fn subscribe_then_receive() {
let bus = InvocationMetricsBus::new();
let mut rx = bus.subscribe();
assert_eq!(bus.subscriber_count(), 1);
bus.record(InvocationMetric {
plugin_id: test_plugin_id(),
function_name: "fn1".into(),
started_at: Utc::now(),
wall_time_us: 42,
memory_peak_bytes: 0,
status: InvocationStatus::Success,
});
let received = rx.recv().await.unwrap();
assert_eq!(received.function_name, "fn1");
assert_eq!(received.wall_time_us, 42);
assert_eq!(received.status, InvocationStatus::Success);
}
#[tokio::test]
async fn multiple_subscribers_each_get_every_event() {
let bus = InvocationMetricsBus::new();
let mut rx1 = bus.subscribe();
let mut rx2 = bus.subscribe();
assert_eq!(bus.subscriber_count(), 2);
bus.record(InvocationMetric {
plugin_id: test_plugin_id(),
function_name: "fn-broadcast".into(),
started_at: Utc::now(),
wall_time_us: 7,
memory_peak_bytes: 0,
status: InvocationStatus::Success,
});
let m1 = rx1.recv().await.unwrap();
let m2 = rx2.recv().await.unwrap();
assert_eq!(m1.function_name, "fn-broadcast");
assert_eq!(m2.function_name, "fn-broadcast");
}
#[tokio::test]
async fn timer_finish_success_emits_metric() {
let bus = Arc::new(InvocationMetricsBus::new());
let mut rx = bus.subscribe();
let timer = InvocationTimer::start(bus.clone(), test_plugin_id(), "do_thing");
tokio::time::sleep(Duration::from_millis(2)).await;
timer.finish_success(1024);
let metric = rx.recv().await.unwrap();
assert_eq!(metric.function_name, "do_thing");
assert_eq!(metric.status, InvocationStatus::Success);
assert_eq!(metric.memory_peak_bytes, 1024);
assert!(metric.wall_time_us >= 1_000, "expected ≥1ms, got {}us", metric.wall_time_us);
}
#[tokio::test]
async fn timer_finish_failure_includes_error() {
let bus = Arc::new(InvocationMetricsBus::new());
let mut rx = bus.subscribe();
let timer = InvocationTimer::start(bus.clone(), test_plugin_id(), "do_thing");
timer.finish_failure("boom", 0);
let metric = rx.recv().await.unwrap();
match metric.status {
InvocationStatus::Failure { error } => assert_eq!(error, "boom"),
other => panic!("expected Failure, got {:?}", other),
}
}
#[tokio::test]
async fn timer_dropped_without_finish_emits_dropped_metric() {
let bus = Arc::new(InvocationMetricsBus::new());
let mut rx = bus.subscribe();
{
let _timer = InvocationTimer::start(bus.clone(), test_plugin_id(), "leaked");
}
let metric = rx.recv().await.unwrap();
assert_eq!(metric.function_name, "leaked");
assert_eq!(metric.status, InvocationStatus::Dropped);
}
#[tokio::test]
async fn started_at_is_set_at_start_not_finish() {
let bus = Arc::new(InvocationMetricsBus::new());
let mut rx = bus.subscribe();
let timer = InvocationTimer::start(bus.clone(), test_plugin_id(), "fn");
let started = timer.started_at;
tokio::time::sleep(Duration::from_millis(5)).await;
timer.finish_success(0);
let metric = rx.recv().await.unwrap();
assert_eq!(metric.started_at, started);
let elapsed_via_metric = Utc::now()
.signed_duration_since(metric.started_at)
.num_microseconds()
.unwrap_or(i64::MAX);
assert!(elapsed_via_metric >= 5_000);
}
}