Skip to main content

telltale_runtime/effects/middleware/
trace.rs

1// Tracing middleware for effect handlers
2//
3// Logs all choreographic operations with timing information for debugging and monitoring.
4
5use async_trait::async_trait;
6use serde::{de::DeserializeOwned, Serialize};
7use std::time::{Duration, Instant};
8use tracing::{debug, trace, warn};
9
10use crate::effects::registry::{ExtensibleHandler, ExtensionRegistry};
11use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};
12
/// Tracing middleware that logs all choreographic operations.
///
/// Wraps an inner handler `H`, delegates every operation to it, and emits
/// `tracing` events around each call. Every event carries the configured
/// `prefix` field so output from different wrapped handlers can be told apart.
///
/// `Debug` is derived alongside `Clone` so that types embedding a `Trace<H>`
/// can themselves derive `Debug` whenever `H: Debug`.
#[derive(Clone, Debug)]
pub struct Trace<H> {
    /// The wrapped handler that operations are forwarded to.
    inner: H,
    /// Value attached as the `prefix` field of each emitted log event.
    prefix: String,
}
19
20impl<H> Trace<H> {
21    pub fn new(inner: H) -> Self {
22        Self::with_prefix(inner, "choreo")
23    }
24
25    pub fn with_prefix(inner: H, prefix: impl Into<String>) -> Self {
26        Self {
27            inner,
28            prefix: prefix.into(),
29        }
30    }
31}
32
33#[async_trait]
34impl<H: ChoreoHandler + Send> ChoreoHandler for Trace<H> {
35    type Role = H::Role;
36    type Endpoint = H::Endpoint;
37
38    async fn send<M: Serialize + Send + Sync>(
39        &mut self,
40        ep: &mut Self::Endpoint,
41        to: Self::Role,
42        msg: &M,
43    ) -> ChoreoResult<()> {
44        let start = Instant::now();
45        trace!(prefix = %self.prefix, ?to, "send: start");
46        let result = self.inner.send(ep, to, msg).await;
47        let duration = start.elapsed();
48        match &result {
49            Ok(()) => debug!(prefix = %self.prefix, ?to, ?duration, "send: success"),
50            Err(e) => warn!(prefix = %self.prefix, ?to, ?duration, error = %e, "send: failed"),
51        }
52        result
53    }
54
55    async fn recv<M: DeserializeOwned + Send>(
56        &mut self,
57        ep: &mut Self::Endpoint,
58        from: Self::Role,
59    ) -> ChoreoResult<M> {
60        let start = Instant::now();
61        trace!(prefix = %self.prefix, ?from, "recv: start");
62        let result = self.inner.recv(ep, from).await;
63        let duration = start.elapsed();
64        match &result {
65            Ok(_) => debug!(prefix = %self.prefix, ?from, ?duration, "recv: success"),
66            Err(e) => warn!(prefix = %self.prefix, ?from, ?duration, error = %e, "recv: failed"),
67        }
68        result
69    }
70
71    async fn choose(
72        &mut self,
73        ep: &mut Self::Endpoint,
74        who: Self::Role,
75        label: <Self::Role as RoleId>::Label,
76    ) -> ChoreoResult<()> {
77        debug!(prefix = %self.prefix, ?who, ?label, "choose");
78        self.inner.choose(ep, who, label).await
79    }
80
81    async fn offer(
82        &mut self,
83        ep: &mut Self::Endpoint,
84        from: Self::Role,
85    ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
86        trace!(prefix = %self.prefix, ?from, "offer: waiting");
87        let label = self.inner.offer(ep, from).await?;
88        debug!(prefix = %self.prefix, ?from, ?label, "offer: received");
89        Ok(label)
90    }
91
92    async fn with_timeout<F, T>(
93        &mut self,
94        ep: &mut Self::Endpoint,
95        at: Self::Role,
96        dur: Duration,
97        body: F,
98    ) -> ChoreoResult<T>
99    where
100        F: std::future::Future<Output = ChoreoResult<T>> + Send,
101    {
102        debug!(prefix = %self.prefix, ?at, ?dur, "timeout: start");
103        let start = Instant::now();
104        let result = self.inner.with_timeout(ep, at, dur, body).await;
105        let elapsed = start.elapsed();
106        match &result {
107            Ok(_) => debug!(prefix = %self.prefix, ?at, ?elapsed, "timeout: completed"),
108            Err(e) => warn!(prefix = %self.prefix, ?at, ?elapsed, error = %e, "timeout: failed"),
109        }
110        result
111    }
112}
113
114impl<H> ExtensibleHandler for Trace<H>
115where
116    H: ExtensibleHandler + Send,
117{
118    fn extension_registry(&self) -> &ExtensionRegistry<Self::Endpoint, Self::Role> {
119        self.inner.extension_registry()
120    }
121}