1use async_trait::async_trait;
25use serde::{de::DeserializeOwned, Serialize};
26use std::time::{Duration, Instant};
27use tracing::{debug, error, info, info_span, instrument, Instrument, Span};
28
29use crate::effects::{ChoreoHandler, ChoreoResult, LabelId, RoleId};
30use crate::identifiers::RoleName;
31
/// String keys for the structured fields recorded by the spans and events
/// defined in this file.
///
/// Each constant spells out the same name that the corresponding span or
/// event macro uses for its field.
pub mod fields {
    /// Key of the field holding the protocol name.
    pub const PROTOCOL: &str = "protocol";

    /// Key of the field holding the name of the executing role.
    pub const ROLE: &str = "role";

    /// Key of the field holding the role's optional index.
    pub const ROLE_INDEX: &str = "role_index";

    /// Key of the field holding the phase name.
    pub const PHASE: &str = "phase";

    /// Key of the field holding the message type name.
    pub const MESSAGE_TYPE: &str = "message_type";

    /// Key of the field holding the message size.
    pub const MESSAGE_SIZE: &str = "message_size";

    /// Key of the field holding the role a message or choice is sent to.
    pub const TARGET_ROLE: &str = "target_role";

    /// Key of the field holding the role a message or choice came from.
    pub const SOURCE_ROLE: &str = "source_role";

    /// Key of the field holding the label of a choice.
    pub const CHOICE_LABEL: &str = "choice_label";

    /// Key of the field holding a duration expressed in milliseconds.
    pub const DURATION_MS: &str = "duration_ms";

    /// Key of the field holding an error message.
    pub const ERROR: &str = "error";
}
57
/// Event targets used by the `trace_*` helpers in this file.
///
/// Every helper passes one of these constants as the `target:` of the event
/// it emits, so subscribers can filter protocol events by target.
pub mod events {
    /// Target of the event emitted when a message is sent.
    pub const SEND: &str = "protocol.send";

    /// Target of the event emitted when a message is received.
    pub const RECV: &str = "protocol.recv";

    /// Target of the event emitted when a choice is made.
    pub const CHOOSE: &str = "protocol.choose";

    /// Target of the event emitted when a choice is received.
    pub const OFFER: &str = "protocol.offer";

    /// Target of the event emitted when a phase starts.
    pub const PHASE_START: &str = "protocol.phase.start";

    /// Target of the event emitted when a phase completes.
    pub const PHASE_END: &str = "protocol.phase.end";

    /// Target of the event emitted for a protocol error.
    pub const ERROR: &str = "protocol.error";
}
75
76pub fn protocol_span(protocol: &str, role: &RoleName, role_index: Option<u32>) -> Span {
78 match role_index {
79 Some(idx) => info_span!(
80 "protocol.execute",
81 protocol = protocol,
82 role = role.as_str(),
83 role_index = idx
84 ),
85 None => info_span!(
86 "protocol.execute",
87 protocol = protocol,
88 role = role.as_str()
89 ),
90 }
91}
92
93pub fn phase_span(protocol: &str, role: &RoleName, phase: &str) -> Span {
95 info_span!(
96 "protocol.phase",
97 protocol = protocol,
98 role = role.as_str(),
99 phase = phase
100 )
101}
102
103pub fn trace_send(target_role: &str, message_type: &str, message_size: usize) {
105 info!(
106 target: events::SEND,
107 target_role = target_role,
108 message_type = message_type,
109 message_size = message_size,
110 "sending message"
111 );
112}
113
114pub fn trace_recv(source_role: &str, message_type: &str, message_size: usize) {
116 info!(
117 target: events::RECV,
118 source_role = source_role,
119 message_type = message_type,
120 message_size = message_size,
121 "received message"
122 );
123}
124
125pub fn trace_choose(target_role: &str, label: &str) {
127 info!(
128 target: events::CHOOSE,
129 target_role = target_role,
130 choice_label = label,
131 "made choice"
132 );
133}
134
135pub fn trace_offer(source_role: &str, label: &str) {
137 info!(
138 target: events::OFFER,
139 source_role = source_role,
140 choice_label = label,
141 "received choice"
142 );
143}
144
145pub fn trace_phase_start(phase: &str) {
147 debug!(target: events::PHASE_START, phase = phase, "phase started");
148}
149
150pub fn trace_phase_end(phase: &str, duration_ms: u64) {
152 debug!(
153 target: events::PHASE_END,
154 phase = phase,
155 duration_ms = duration_ms,
156 "phase completed"
157 );
158}
159
160pub fn trace_error(error_message: &str) {
162 error!(
163 target: events::ERROR,
164 error = error_message,
165 "protocol error"
166 );
167}
168
169fn format_role<R: RoleId>(role: R) -> String {
170 match role.role_index() {
171 Some(index) => format!("{}[{}]", role.role_name(), index),
172 None => role.role_name().to_string(),
173 }
174}
175
176pub struct TracingHandler<H> {
178 inner: H,
179 protocol: &'static str,
180 role: RoleName,
181 role_index: Option<u32>,
182 span: Span,
183}
184
185impl<H> TracingHandler<H> {
186 pub fn new(inner: H, protocol: &'static str, role: H::Role) -> Self
188 where
189 H: ChoreoHandler,
190 {
191 let role_name = role.role_name();
192 let role_index = role.role_index();
193 let span = protocol_span(protocol, &role_name, role_index);
194 Self {
195 inner,
196 protocol,
197 role: role_name,
198 role_index,
199 span,
200 }
201 }
202
203 pub fn indexed(inner: H, protocol: &'static str, role: RoleName, index: u32) -> Self {
205 let span = protocol_span(protocol, &role, Some(index));
206 Self {
207 inner,
208 protocol,
209 role,
210 role_index: Some(index),
211 span,
212 }
213 }
214
215 pub fn protocol(&self) -> &'static str {
217 self.protocol
218 }
219
220 pub fn role(&self) -> &RoleName {
222 &self.role
223 }
224
225 pub fn role_index(&self) -> Option<u32> {
227 self.role_index
228 }
229
230 pub fn inner(&self) -> &H {
232 &self.inner
233 }
234
235 pub fn inner_mut(&mut self) -> &mut H {
237 &mut self.inner
238 }
239
240 pub fn into_inner(self) -> H {
242 self.inner
243 }
244}
245
246#[async_trait]
247impl<H: ChoreoHandler> ChoreoHandler for TracingHandler<H> {
248 type Role = H::Role;
249 type Endpoint = H::Endpoint;
250
251 #[instrument(
252 skip(self, ep, msg),
253 fields(
254 protocol = self.protocol,
255 role = self.role.as_str(),
256 target_role = ?to,
257 message_type = std::any::type_name::<M>()
258 )
259 )]
260 async fn send<M: Serialize + Send + Sync>(
261 &mut self,
262 ep: &mut Self::Endpoint,
263 to: Self::Role,
264 msg: &M,
265 ) -> ChoreoResult<()> {
266 trace_send(&format_role(to), std::any::type_name::<M>(), 0);
267 self.inner
268 .send(ep, to, msg)
269 .instrument(self.span.clone())
270 .await
271 }
272
273 #[instrument(
274 skip(self, ep),
275 fields(
276 protocol = self.protocol,
277 role = self.role.as_str(),
278 source_role = ?from,
279 message_type = std::any::type_name::<M>()
280 )
281 )]
282 async fn recv<M: DeserializeOwned + Send>(
283 &mut self,
284 ep: &mut Self::Endpoint,
285 from: Self::Role,
286 ) -> ChoreoResult<M> {
287 let result = self
288 .inner
289 .recv::<M>(ep, from)
290 .instrument(self.span.clone())
291 .await;
292 if result.is_ok() {
293 trace_recv(&format_role(from), std::any::type_name::<M>(), 0);
294 }
295 result
296 }
297
298 #[instrument(
299 skip(self, ep),
300 fields(
301 protocol = self.protocol,
302 role = self.role.as_str(),
303 target_role = ?to,
304 choice_label = label.as_str()
305 )
306 )]
307 async fn choose(
308 &mut self,
309 ep: &mut Self::Endpoint,
310 to: Self::Role,
311 label: <Self::Role as RoleId>::Label,
312 ) -> ChoreoResult<()> {
313 trace_choose(&format_role(to), label.as_str());
314 self.inner
315 .choose(ep, to, label)
316 .instrument(self.span.clone())
317 .await
318 }
319
320 #[instrument(
321 skip(self, ep),
322 fields(
323 protocol = self.protocol,
324 role = self.role.as_str(),
325 source_role = ?from
326 )
327 )]
328 async fn offer(
329 &mut self,
330 ep: &mut Self::Endpoint,
331 from: Self::Role,
332 ) -> ChoreoResult<<Self::Role as RoleId>::Label> {
333 let result = self
334 .inner
335 .offer(ep, from)
336 .instrument(self.span.clone())
337 .await;
338 if let Ok(ref label) = result {
339 trace_offer(&format_role(from), label.as_str());
340 }
341 result
342 }
343
344 async fn with_timeout<F, T>(
345 &mut self,
346 ep: &mut Self::Endpoint,
347 at: Self::Role,
348 dur: Duration,
349 body: F,
350 ) -> ChoreoResult<T>
351 where
352 F: std::future::Future<Output = ChoreoResult<T>> + Send,
353 {
354 self.inner
355 .with_timeout(ep, at, dur, body)
356 .instrument(self.span.clone())
357 .await
358 }
359}
360
361pub struct PhaseGuard {
363 phase: &'static str,
364 start: Instant,
365 span: Span,
366}
367
368impl PhaseGuard {
369 pub fn new(protocol: &'static str, role: &RoleName, phase: &'static str) -> Self {
371 let span = phase_span(protocol, role, phase);
372 {
373 let _enter = span.enter();
374 trace_phase_start(phase);
375 }
376
377 Self {
378 phase,
379 start: Instant::now(),
380 span,
381 }
382 }
383
384 pub fn span(&self) -> &Span {
386 &self.span
387 }
388}
389
390impl Drop for PhaseGuard {
391 fn drop(&mut self) {
392 let _enter = self.span.enter();
393 let duration_ms = u64::try_from(self.start.elapsed().as_millis()).unwrap_or(u64::MAX);
394 trace_phase_end(self.phase, duration_ms);
395 }
396}
397
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks a span's name and declared fields against expectations.
    ///
    /// `Span::metadata` returns an `Option`; when it is `None` there is
    /// nothing to inspect and the test only confirms that building the span
    /// did not panic.
    fn assert_span_shape(span: &Span, name: &str, present: &[&str], absent: &[&str]) {
        if let Some(meta) = span.metadata() {
            assert_eq!(meta.name(), name);
            let declared = meta.fields();
            for field in present {
                assert!(
                    declared.field(*field).is_some(),
                    "span `{}` is missing field `{}`",
                    name,
                    field
                );
            }
            for field in absent {
                assert!(
                    declared.field(*field).is_none(),
                    "span `{}` unexpectedly declares field `{}`",
                    name,
                    field
                );
            }
        }
    }

    #[test]
    fn test_protocol_span() {
        let span = protocol_span("TestProtocol", &RoleName::from_static("Client"), None);
        // Without an index the span must not declare `role_index`.
        assert_span_shape(
            &span,
            "protocol.execute",
            &["protocol", "role"],
            &["role_index"],
        );
    }

    #[test]
    fn test_protocol_span_indexed() {
        let span = protocol_span("TestProtocol", &RoleName::from_static("Worker"), Some(3));
        assert_span_shape(
            &span,
            "protocol.execute",
            &["protocol", "role", "role_index"],
            &[],
        );
    }

    #[test]
    fn test_phase_span() {
        let span = phase_span(
            "TestProtocol",
            &RoleName::from_static("Client"),
            "handshake",
        );
        assert_span_shape(&span, "protocol.phase", &["protocol", "role", "phase"], &[]);
    }
}