Skip to main content

telltale_runtime/effects/middleware/
metrics.rs

1// Metrics collection middleware for effect handlers
2//
3// Tracks counts of sends, receives, and errors for monitoring and analysis.
4
5use async_trait::async_trait;
6use serde::{de::DeserializeOwned, Serialize};
7use std::time::Duration;
8
9use crate::effects::contract::{DocumentedHandlerContract, HandlerContractProfile};
10use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
11use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};
12
13/// Metrics collection middleware
14#[derive(Clone)]
15pub struct Metrics<H> {
16    inner: H,
17    send_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
18    recv_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
19    error_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
20}
21
22impl<H> Metrics<H> {
23    pub fn new(inner: H) -> Self {
24        Self {
25            inner,
26            send_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
27            recv_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
28            error_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
29        }
30    }
31
32    pub fn send_count(&self) -> u64 {
33        self.send_count.load(std::sync::atomic::Ordering::Relaxed)
34    }
35
36    pub fn recv_count(&self) -> u64 {
37        self.recv_count.load(std::sync::atomic::Ordering::Relaxed)
38    }
39
40    pub fn error_count(&self) -> u64 {
41        self.error_count.load(std::sync::atomic::Ordering::Relaxed)
42    }
43}
44
45impl<H> DocumentedHandlerContract for Metrics<H>
46where
47    H: DocumentedHandlerContract,
48{
49    fn contract_profile() -> HandlerContractProfile {
50        let mut profile = H::contract_profile();
51        profile.handler_name = std::any::type_name::<Self>();
52        profile
53            .notes
54            .push("metrics middleware may count outcomes but must not alter payloads or labels");
55        profile
56    }
57}
58
59#[async_trait]
60impl<H: ChoreoHandler + Send> ChoreoHandler for Metrics<H> {
61    type Role = H::Role;
62    type Endpoint = H::Endpoint;
63
64    async fn send<M: Serialize + Send + Sync>(
65        &mut self,
66        ep: &mut Self::Endpoint,
67        to: Self::Role,
68        msg: &M,
69    ) -> ChoreoResult<()> {
70        let result = self.inner.send(ep, to, msg).await;
71        if result.is_ok() {
72            self.send_count
73                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
74        } else {
75            self.error_count
76                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
77        }
78        result
79    }
80
81    async fn recv<M: DeserializeOwned + Send>(
82        &mut self,
83        ep: &mut Self::Endpoint,
84        from: Self::Role,
85    ) -> ChoreoResult<M> {
86        let result = self.inner.recv(ep, from).await;
87        if result.is_ok() {
88            self.recv_count
89                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
90        } else {
91            self.error_count
92                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
93        }
94        result
95    }
96
97    async fn choose(
98        &mut self,
99        ep: &mut Self::Endpoint,
100        who: Self::Role,
101        label: <Self::Role as RoleId>::Label,
102    ) -> ChoreoResult<()> {
103        self.inner.choose(ep, who, label).await
104    }
105
106    async fn offer(
107        &mut self,
108        ep: &mut Self::Endpoint,
109        from: Self::Role,
110    ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
111        self.inner.offer(ep, from).await
112    }
113
114    async fn with_timeout<F, T>(
115        &mut self,
116        ep: &mut Self::Endpoint,
117        at: Self::Role,
118        dur: Duration,
119        body: F,
120    ) -> ChoreoResult<T>
121    where
122        F: std::future::Future<Output = ChoreoResult<T>> + Send,
123    {
124        self.inner.with_timeout(ep, at, dur, body).await
125    }
126}
127
128impl<H> ExtensibleHandler for Metrics<H>
129where
130    H: ExtensibleHandler + Send,
131{
132    fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
133        self.inner.extension_registry()
134    }
135}