telltale_runtime/effects/middleware/
metrics.rs1use async_trait::async_trait;
6use serde::{de::DeserializeOwned, Serialize};
7use std::time::Duration;
8
9use crate::effects::contract::{DocumentedHandlerContract, HandlerContractProfile};
10use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
11use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};
12
13#[derive(Clone)]
15pub struct Metrics<H> {
16 inner: H,
17 send_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
18 recv_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
19 error_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
20}
21
22impl<H> Metrics<H> {
23 pub fn new(inner: H) -> Self {
24 Self {
25 inner,
26 send_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
27 recv_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
28 error_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
29 }
30 }
31
32 pub fn send_count(&self) -> u64 {
33 self.send_count.load(std::sync::atomic::Ordering::Relaxed)
34 }
35
36 pub fn recv_count(&self) -> u64 {
37 self.recv_count.load(std::sync::atomic::Ordering::Relaxed)
38 }
39
40 pub fn error_count(&self) -> u64 {
41 self.error_count.load(std::sync::atomic::Ordering::Relaxed)
42 }
43}
44
45impl<H> DocumentedHandlerContract for Metrics<H>
46where
47 H: DocumentedHandlerContract,
48{
49 fn contract_profile() -> HandlerContractProfile {
50 let mut profile = H::contract_profile();
51 profile.handler_name = std::any::type_name::<Self>();
52 profile
53 .notes
54 .push("metrics middleware may count outcomes but must not alter payloads or labels");
55 profile
56 }
57}
58
59#[async_trait]
60impl<H: ChoreoHandler + Send> ChoreoHandler for Metrics<H> {
61 type Role = H::Role;
62 type Endpoint = H::Endpoint;
63
64 async fn send<M: Serialize + Send + Sync>(
65 &mut self,
66 ep: &mut Self::Endpoint,
67 to: Self::Role,
68 msg: &M,
69 ) -> ChoreoResult<()> {
70 let result = self.inner.send(ep, to, msg).await;
71 if result.is_ok() {
72 self.send_count
73 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
74 } else {
75 self.error_count
76 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
77 }
78 result
79 }
80
81 async fn recv<M: DeserializeOwned + Send>(
82 &mut self,
83 ep: &mut Self::Endpoint,
84 from: Self::Role,
85 ) -> ChoreoResult<M> {
86 let result = self.inner.recv(ep, from).await;
87 if result.is_ok() {
88 self.recv_count
89 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
90 } else {
91 self.error_count
92 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
93 }
94 result
95 }
96
97 async fn choose(
98 &mut self,
99 ep: &mut Self::Endpoint,
100 who: Self::Role,
101 label: <Self::Role as RoleId>::Label,
102 ) -> ChoreoResult<()> {
103 self.inner.choose(ep, who, label).await
104 }
105
106 async fn offer(
107 &mut self,
108 ep: &mut Self::Endpoint,
109 from: Self::Role,
110 ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
111 self.inner.offer(ep, from).await
112 }
113
114 async fn with_timeout<F, T>(
115 &mut self,
116 ep: &mut Self::Endpoint,
117 at: Self::Role,
118 dur: Duration,
119 body: F,
120 ) -> ChoreoResult<T>
121 where
122 F: std::future::Future<Output = ChoreoResult<T>> + Send,
123 {
124 self.inner.with_timeout(ep, at, dur, body).await
125 }
126}
127
128impl<H> ExtensibleHandler for Metrics<H>
129where
130 H: ExtensibleHandler + Send,
131{
132 fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
133 self.inner.extension_registry()
134 }
135}