telltale_runtime/effects/middleware/
metrics.rs1use async_trait::async_trait;
6use serde::{de::DeserializeOwned, Serialize};
7use std::time::Duration;
8
9use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
10use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};
11
/// Middleware that wraps a handler `H` and tallies the outcome of its
/// message operations.
///
/// Every counter sits behind an [`Arc`](std::sync::Arc), so the derived
/// `Clone` produces a handle that shares its counters with the value it was
/// cloned from: an increment made through one clone is visible through all
/// of them.
///
/// `Debug` is derived so the wrapper can be logged or used in assertion
/// messages; the derived impl is only available when `H: Debug`.
#[derive(Clone, Debug)]
pub struct Metrics<H> {
    /// The wrapped handler that every operation is forwarded to.
    inner: H,
    /// Number of `send` calls for which the wrapped handler returned `Ok`.
    send_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
    /// Number of `recv` calls for which the wrapped handler returned `Ok`.
    recv_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
    /// Number of `send` or `recv` calls for which the wrapped handler returned `Err`.
    error_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
}
20
21impl<H> Metrics<H> {
22 pub fn new(inner: H) -> Self {
23 Self {
24 inner,
25 send_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
26 recv_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
27 error_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
28 }
29 }
30
31 pub fn send_count(&self) -> u64 {
32 self.send_count.load(std::sync::atomic::Ordering::Relaxed)
33 }
34
35 pub fn recv_count(&self) -> u64 {
36 self.recv_count.load(std::sync::atomic::Ordering::Relaxed)
37 }
38
39 pub fn error_count(&self) -> u64 {
40 self.error_count.load(std::sync::atomic::Ordering::Relaxed)
41 }
42}
43
44#[async_trait]
45impl<H: ChoreoHandler + Send> ChoreoHandler for Metrics<H> {
46 type Role = H::Role;
47 type Endpoint = H::Endpoint;
48
49 async fn send<M: Serialize + Send + Sync>(
50 &mut self,
51 ep: &mut Self::Endpoint,
52 to: Self::Role,
53 msg: &M,
54 ) -> ChoreoResult<()> {
55 let result = self.inner.send(ep, to, msg).await;
56 if result.is_ok() {
57 self.send_count
58 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
59 } else {
60 self.error_count
61 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
62 }
63 result
64 }
65
66 async fn recv<M: DeserializeOwned + Send>(
67 &mut self,
68 ep: &mut Self::Endpoint,
69 from: Self::Role,
70 ) -> ChoreoResult<M> {
71 let result = self.inner.recv(ep, from).await;
72 if result.is_ok() {
73 self.recv_count
74 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
75 } else {
76 self.error_count
77 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
78 }
79 result
80 }
81
82 async fn choose(
83 &mut self,
84 ep: &mut Self::Endpoint,
85 who: Self::Role,
86 label: <Self::Role as RoleId>::Label,
87 ) -> ChoreoResult<()> {
88 self.inner.choose(ep, who, label).await
89 }
90
91 async fn offer(
92 &mut self,
93 ep: &mut Self::Endpoint,
94 from: Self::Role,
95 ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
96 self.inner.offer(ep, from).await
97 }
98
99 async fn with_timeout<F, T>(
100 &mut self,
101 ep: &mut Self::Endpoint,
102 at: Self::Role,
103 dur: Duration,
104 body: F,
105 ) -> ChoreoResult<T>
106 where
107 F: std::future::Future<Output = ChoreoResult<T>> + Send,
108 {
109 self.inner.with_timeout(ep, at, dur, body).await
110 }
111}
112
113impl<H> ExtensibleHandler for Metrics<H>
114where
115 H: ExtensibleHandler + Send,
116{
117 fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
118 self.inner.extension_registry()
119 }
120}