telltale_runtime/effects/middleware/
metrics.rs1use async_trait::async_trait;
6use serde::{de::DeserializeOwned, Serialize};
7use std::time::Duration;
8
9use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};
10
/// Middleware that wraps a handler and tallies the outcome of its operations.
///
/// Every counter sits behind an `Arc`, so a value produced by the derived
/// `Clone` impl reads and updates the same tallies as the value it was cloned
/// from. `Debug` is derived so the wrapper can be logged or embedded in other
/// `Debug` types whenever the wrapped handler itself implements `Debug`.
#[derive(Debug, Clone)]
pub struct Metrics<H> {
    /// The wrapped handler every operation is delegated to.
    inner: H,
    /// Number of `send` calls on the wrapped handler that returned `Ok`.
    send_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
    /// Number of `recv` calls on the wrapped handler that returned `Ok`.
    recv_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
    /// Number of `send` or `recv` calls on the wrapped handler that returned `Err`.
    error_count: std::sync::Arc<std::sync::atomic::AtomicU64>,
}
19
20impl<H> Metrics<H> {
21 pub fn new(inner: H) -> Self {
22 Self {
23 inner,
24 send_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
25 recv_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
26 error_count: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)),
27 }
28 }
29
30 pub fn send_count(&self) -> u64 {
31 self.send_count.load(std::sync::atomic::Ordering::Relaxed)
32 }
33
34 pub fn recv_count(&self) -> u64 {
35 self.recv_count.load(std::sync::atomic::Ordering::Relaxed)
36 }
37
38 pub fn error_count(&self) -> u64 {
39 self.error_count.load(std::sync::atomic::Ordering::Relaxed)
40 }
41}
42
43#[async_trait]
44impl<H: ChoreoHandler + Send> ChoreoHandler for Metrics<H> {
45 type Role = H::Role;
46 type Endpoint = H::Endpoint;
47
48 async fn send<M: Serialize + Send + Sync>(
49 &mut self,
50 ep: &mut Self::Endpoint,
51 to: Self::Role,
52 msg: &M,
53 ) -> ChoreoResult<()> {
54 let result = self.inner.send(ep, to, msg).await;
55 if result.is_ok() {
56 self.send_count
57 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
58 } else {
59 self.error_count
60 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
61 }
62 result
63 }
64
65 async fn recv<M: DeserializeOwned + Send>(
66 &mut self,
67 ep: &mut Self::Endpoint,
68 from: Self::Role,
69 ) -> ChoreoResult<M> {
70 let result = self.inner.recv(ep, from).await;
71 if result.is_ok() {
72 self.recv_count
73 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
74 } else {
75 self.error_count
76 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
77 }
78 result
79 }
80
81 async fn choose(
82 &mut self,
83 ep: &mut Self::Endpoint,
84 who: Self::Role,
85 label: <Self::Role as RoleId>::Label,
86 ) -> ChoreoResult<()> {
87 self.inner.choose(ep, who, label).await
88 }
89
90 async fn offer(
91 &mut self,
92 ep: &mut Self::Endpoint,
93 from: Self::Role,
94 ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
95 self.inner.offer(ep, from).await
96 }
97
98 async fn with_timeout<F, T>(
99 &mut self,
100 ep: &mut Self::Endpoint,
101 at: Self::Role,
102 dur: Duration,
103 body: F,
104 ) -> ChoreoResult<T>
105 where
106 F: std::future::Future<Output = ChoreoResult<T>> + Send,
107 {
108 self.inner.with_timeout(ep, at, dur, body).await
109 }
110}