// telltale_runtime/effects/middleware/metrics.rs

// Metrics collection middleware for effect handlers
//
// Counts successful sends, successful receives, and failed send/receive
// operations for monitoring and analysis.

use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::time::Duration;

use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};

/// Metrics collection middleware.
///
/// Wraps an inner handler `H` and keeps three counters alongside it. Each
/// counter sits behind an `Arc`, so the derived `Clone` (available when
/// `H: Clone`) gives every clone a handle to the *same* counters rather than
/// an independent copy of their values.
#[derive(Clone)]
pub struct Metrics<H> {
    /// The wrapped handler that operations are forwarded to.
    inner: H,
    /// Incremented when a forwarded `send` returns `Ok`.
    send_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
    /// Incremented when a forwarded `recv` returns `Ok`.
    recv_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
    /// Incremented when a forwarded `send` or `recv` returns an error.
    error_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
}

20impl<H> Metrics<H> {
21    pub fn new(inner: H) -> Self {
22        Self {
23            inner,
24            send_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
25            recv_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
26            error_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
27        }
28    }
29
30    pub fn send_count(&self) -> u64 {
31        self.send_count.load(std::sync::atomic::Ordering::Relaxed)
32    }
33
34    pub fn recv_count(&self) -> u64 {
35        self.recv_count.load(std::sync::atomic::Ordering::Relaxed)
36    }
37
38    pub fn error_count(&self) -> u64 {
39        self.error_count.load(std::sync::atomic::Ordering::Relaxed)
40    }
41}
42
43#[async_trait]
44impl<H: ChoreoHandler + Send> ChoreoHandler for Metrics<H> {
45    type Role = H::Role;
46    type Endpoint = H::Endpoint;
47
48    async fn send<M: Serialize + Send + Sync>(
49        &mut self,
50        ep: &mut Self::Endpoint,
51        to: Self::Role,
52        msg: &M,
53    ) -> ChoreoResult<()> {
54        let result = self.inner.send(ep, to, msg).await;
55        if result.is_ok() {
56            self.send_count
57                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
58        } else {
59            self.error_count
60                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
61        }
62        result
63    }
64
65    async fn recv<M: DeserializeOwned + Send>(
66        &mut self,
67        ep: &mut Self::Endpoint,
68        from: Self::Role,
69    ) -> ChoreoResult<M> {
70        let result = self.inner.recv(ep, from).await;
71        if result.is_ok() {
72            self.recv_count
73                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
74        } else {
75            self.error_count
76                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
77        }
78        result
79    }
80
81    async fn choose(
82        &mut self,
83        ep: &mut Self::Endpoint,
84        who: Self::Role,
85        label: <Self::Role as RoleId>::Label,
86    ) -> ChoreoResult<()> {
87        self.inner.choose(ep, who, label).await
88    }
89
90    async fn offer(
91        &mut self,
92        ep: &mut Self::Endpoint,
93        from: Self::Role,
94    ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
95        self.inner.offer(ep, from).await
96    }
97
98    async fn with_timeout<F, T>(
99        &mut self,
100        ep: &mut Self::Endpoint,
101        at: Self::Role,
102        dur: Duration,
103        body: F,
104    ) -> ChoreoResult<T>
105    where
106        F: std::future::Future<Output = ChoreoResult<T>> + Send,
107    {
108        self.inner.with_timeout(ep, at, dur, body).await
109    }
110}