Skip to main content

telltale_runtime/effects/middleware/
trace.rs

1// Tracing middleware for effect handlers
2//
3// Logs all choreographic operations with timing information for debugging and monitoring.
4
5use async_trait::async_trait;
6use serde::{de::DeserializeOwned, Serialize};
7use std::time::{Duration, Instant};
8use tracing::{debug, trace, warn};
9
10use crate::effects::{ChoreoHandler, ChoreoResult, RoleId};
11
12/// Tracing middleware that logs all choreographic operations
13#[derive(Clone)]
14pub struct Trace<H> {
15    inner: H,
16    prefix: String,
17}
18
19impl<H> Trace<H> {
20    pub fn new(inner: H) -> Self {
21        Self::with_prefix(inner, "choreo")
22    }
23
24    pub fn with_prefix(inner: H, prefix: impl Into<String>) -> Self {
25        Self {
26            inner,
27            prefix: prefix.into(),
28        }
29    }
30}
31
32#[async_trait]
33impl<H: ChoreoHandler + Send> ChoreoHandler for Trace<H> {
34    type Role = H::Role;
35    type Endpoint = H::Endpoint;
36
37    async fn send<M: Serialize + Send + Sync>(
38        &mut self,
39        ep: &mut Self::Endpoint,
40        to: Self::Role,
41        msg: &M,
42    ) -> ChoreoResult<()> {
43        let start = Instant::now();
44        trace!(prefix = %self.prefix, ?to, "send: start");
45        let result = self.inner.send(ep, to, msg).await;
46        let duration = start.elapsed();
47        match &result {
48            Ok(()) => debug!(prefix = %self.prefix, ?to, ?duration, "send: success"),
49            Err(e) => warn!(prefix = %self.prefix, ?to, ?duration, error = %e, "send: failed"),
50        }
51        result
52    }
53
54    async fn recv<M: DeserializeOwned + Send>(
55        &mut self,
56        ep: &mut Self::Endpoint,
57        from: Self::Role,
58    ) -> ChoreoResult<M> {
59        let start = Instant::now();
60        trace!(prefix = %self.prefix, ?from, "recv: start");
61        let result = self.inner.recv(ep, from).await;
62        let duration = start.elapsed();
63        match &result {
64            Ok(_) => debug!(prefix = %self.prefix, ?from, ?duration, "recv: success"),
65            Err(e) => warn!(prefix = %self.prefix, ?from, ?duration, error = %e, "recv: failed"),
66        }
67        result
68    }
69
70    async fn choose(
71        &mut self,
72        ep: &mut Self::Endpoint,
73        who: Self::Role,
74        label: <Self::Role as RoleId>::Label,
75    ) -> ChoreoResult<()> {
76        debug!(prefix = %self.prefix, ?who, ?label, "choose");
77        self.inner.choose(ep, who, label).await
78    }
79
80    async fn offer(
81        &mut self,
82        ep: &mut Self::Endpoint,
83        from: Self::Role,
84    ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
85        trace!(prefix = %self.prefix, ?from, "offer: waiting");
86        let label = self.inner.offer(ep, from).await?;
87        debug!(prefix = %self.prefix, ?from, ?label, "offer: received");
88        Ok(label)
89    }
90
91    async fn with_timeout<F, T>(
92        &mut self,
93        ep: &mut Self::Endpoint,
94        at: Self::Role,
95        dur: Duration,
96        body: F,
97    ) -> ChoreoResult<T>
98    where
99        F: std::future::Future<Output = ChoreoResult<T>> + Send,
100    {
101        debug!(prefix = %self.prefix, ?at, ?dur, "timeout: start");
102        let start = Instant::now();
103        let result = self.inner.with_timeout(ep, at, dur, body).await;
104        let elapsed = start.elapsed();
105        match &result {
106            Ok(_) => debug!(prefix = %self.prefix, ?at, ?elapsed, "timeout: completed"),
107            Err(e) => warn!(prefix = %self.prefix, ?at, ?elapsed, error = %e, "timeout: failed"),
108        }
109        result
110    }
111}