Skip to main content

telltale_runtime/effects/middleware/
metrics.rs

1// Metrics collection middleware for effect handlers
2//
3// Tracks counts of sends, receives, and errors for monitoring and analysis.
4
5use async_trait::async_trait;
6use serde::{de::DeserializeOwned, Serialize};
7use std::time::Duration;
8
9use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
10use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};
11
12/// Metrics collection middleware
13#[derive(Clone)]
14pub struct Metrics<H> {
15    inner: H,
16    send_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
17    recv_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
18    error_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
19}
20
21impl<H> Metrics<H> {
22    pub fn new(inner: H) -> Self {
23        Self {
24            inner,
25            send_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
26            recv_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
27            error_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
28        }
29    }
30
31    pub fn send_count(&self) -> u64 {
32        self.send_count.load(std::sync::atomic::Ordering::Relaxed)
33    }
34
35    pub fn recv_count(&self) -> u64 {
36        self.recv_count.load(std::sync::atomic::Ordering::Relaxed)
37    }
38
39    pub fn error_count(&self) -> u64 {
40        self.error_count.load(std::sync::atomic::Ordering::Relaxed)
41    }
42}
43
44#[async_trait]
45impl<H: ChoreoHandler + Send> ChoreoHandler for Metrics<H> {
46    type Role = H::Role;
47    type Endpoint = H::Endpoint;
48
49    async fn send<M: Serialize + Send + Sync>(
50        &mut self,
51        ep: &mut Self::Endpoint,
52        to: Self::Role,
53        msg: &M,
54    ) -> ChoreoResult<()> {
55        let result = self.inner.send(ep, to, msg).await;
56        if result.is_ok() {
57            self.send_count
58                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
59        } else {
60            self.error_count
61                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
62        }
63        result
64    }
65
66    async fn recv<M: DeserializeOwned + Send>(
67        &mut self,
68        ep: &mut Self::Endpoint,
69        from: Self::Role,
70    ) -> ChoreoResult<M> {
71        let result = self.inner.recv(ep, from).await;
72        if result.is_ok() {
73            self.recv_count
74                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
75        } else {
76            self.error_count
77                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
78        }
79        result
80    }
81
82    async fn choose(
83        &mut self,
84        ep: &mut Self::Endpoint,
85        who: Self::Role,
86        label: <Self::Role as RoleId>::Label,
87    ) -> ChoreoResult<()> {
88        self.inner.choose(ep, who, label).await
89    }
90
91    async fn offer(
92        &mut self,
93        ep: &mut Self::Endpoint,
94        from: Self::Role,
95    ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
96        self.inner.offer(ep, from).await
97    }
98
99    async fn with_timeout<F, T>(
100        &mut self,
101        ep: &mut Self::Endpoint,
102        at: Self::Role,
103        dur: Duration,
104        body: F,
105    ) -> ChoreoResult<T>
106    where
107        F: std::future::Future<Output = ChoreoResult<T>> + Send,
108    {
109        self.inner.with_timeout(ep, at, dur, body).await
110    }
111}
112
113impl<H> ExtensibleHandler for Metrics<H>
114where
115    H: ExtensibleHandler + Send,
116{
117    fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
118        self.inner.extension_registry()
119    }
120}