Skip to main content

synwire_core/observability/
tracing_bridge.rs

1//! Tracing bridge trait and OpenTelemetry implementation.
2
3use crate::BoxFuture;
4use crate::observability::ObservabilitySpanKind;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::collections::HashMap;
8use uuid::Uuid;
9
10/// Outcome of a traced span.
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub enum SpanOutcome {
13    /// The operation completed successfully.
14    Success,
15    /// The operation failed with an error message.
16    Error(String),
17}
18
19/// Trait for bridging observability events to a tracing backend.
20pub trait TracingBridge: Send + Sync {
21    /// Begins a new span, returning a span identifier.
22    fn begin_span(
23        &self,
24        name: &str,
25        kind: ObservabilitySpanKind,
26        run_id: Uuid,
27        attributes: &HashMap<String, Value>,
28    ) -> BoxFuture<'_, Uuid>;
29
30    /// Ends a span with the given outcome.
31    fn end_span(
32        &self,
33        span_id: Uuid,
34        outcome: SpanOutcome,
35        attributes: &HashMap<String, Value>,
36    ) -> BoxFuture<'_, ()>;
37}
38
39/// RAII guard that ends a span when dropped.
40///
41/// If not explicitly completed via [`SpanGuard::complete`], the span is
42/// ended with an error outcome on drop.
43pub struct SpanGuard<B: TracingBridge + 'static> {
44    bridge: std::sync::Arc<B>,
45    span_id: Uuid,
46    completed: std::sync::atomic::AtomicBool,
47}
48
49impl<B: TracingBridge + 'static> SpanGuard<B> {
50    /// Creates a new span guard.
51    pub const fn new(bridge: std::sync::Arc<B>, span_id: Uuid) -> Self {
52        Self {
53            bridge,
54            span_id,
55            completed: std::sync::atomic::AtomicBool::new(false),
56        }
57    }
58
59    /// Returns the span identifier.
60    pub const fn span_id(&self) -> Uuid {
61        self.span_id
62    }
63
64    /// Completes the span with the given outcome.
65    ///
66    /// After calling this method, the drop implementation will not end the
67    /// span again.
68    pub async fn complete(self, outcome: SpanOutcome, attributes: &HashMap<String, Value>) {
69        self.completed
70            .store(true, std::sync::atomic::Ordering::Release);
71        self.bridge
72            .end_span(self.span_id, outcome, attributes)
73            .await;
74    }
75}
76
77impl<B: TracingBridge + 'static> Drop for SpanGuard<B> {
78    fn drop(&mut self) {
79        if !self.completed.load(std::sync::atomic::Ordering::Acquire) {
80            let bridge = self.bridge.clone();
81            let span_id = self.span_id;
82            // Best-effort: spawn a task to end the span if the runtime is
83            // available.
84            let _ = std::thread::Builder::new()
85                .name("span-guard-drop".into())
86                .spawn(move || {
87                    if let Ok(rt) = tokio::runtime::Handle::try_current() {
88                        rt.block_on(bridge.end_span(
89                            span_id,
90                            SpanOutcome::Error("span dropped without completion".into()),
91                            &HashMap::new(),
92                        ));
93                    }
94                });
95        }
96    }
97}
98
99/// OpenTelemetry-based tracing bridge.
100///
101/// Maps observability events to `tracing` spans with `GenAI` semantic convention
102/// attributes.
103pub struct OTelTracingBridge {
104    _private: (),
105}
106
107impl std::fmt::Debug for OTelTracingBridge {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        f.debug_struct("OTelTracingBridge").finish()
110    }
111}
112
113impl Default for OTelTracingBridge {
114    fn default() -> Self {
115        Self::new()
116    }
117}
118
119impl OTelTracingBridge {
120    /// Creates a new `OTelTracingBridge`.
121    pub const fn new() -> Self {
122        Self { _private: () }
123    }
124}
125
126impl TracingBridge for OTelTracingBridge {
127    fn begin_span(
128        &self,
129        name: &str,
130        kind: ObservabilitySpanKind,
131        _run_id: Uuid,
132        _attributes: &HashMap<String, Value>,
133    ) -> BoxFuture<'_, Uuid> {
134        let span_id = Uuid::new_v4();
135        tracing::info_span!("synwire.span", otel.name = %name, synwire.span_kind = %kind);
136        Box::pin(async move { span_id })
137    }
138
139    fn end_span(
140        &self,
141        _span_id: Uuid,
142        outcome: SpanOutcome,
143        _attributes: &HashMap<String, Value>,
144    ) -> BoxFuture<'_, ()> {
145        Box::pin(async move {
146            match outcome {
147                SpanOutcome::Success => {
148                    tracing::debug!("span completed successfully");
149                }
150                SpanOutcome::Error(ref msg) => {
151                    tracing::warn!(error = %msg, "span completed with error");
152                }
153            }
154        })
155    }
156}
157
158#[cfg(test)]
159#[allow(clippy::unwrap_used)]
160mod tests {
161    use super::*;
162    use std::sync::Arc;
163
164    #[tokio::test]
165    async fn begin_and_end_span() {
166        let bridge = OTelTracingBridge::new();
167        let span_id = bridge
168            .begin_span(
169                "test-span",
170                ObservabilitySpanKind::Llm,
171                Uuid::new_v4(),
172                &HashMap::new(),
173            )
174            .await;
175
176        bridge
177            .end_span(span_id, SpanOutcome::Success, &HashMap::new())
178            .await;
179    }
180
181    #[tokio::test]
182    async fn span_guard_completes() {
183        let bridge = Arc::new(OTelTracingBridge::new());
184        let span_id = bridge
185            .begin_span(
186                "guarded",
187                ObservabilitySpanKind::Tool,
188                Uuid::new_v4(),
189                &HashMap::new(),
190            )
191            .await;
192
193        let guard = SpanGuard::new(Arc::clone(&bridge), span_id);
194        assert_eq!(guard.span_id(), span_id);
195        guard.complete(SpanOutcome::Success, &HashMap::new()).await;
196    }
197
198    #[test]
199    fn span_outcome_serialization() {
200        let success = SpanOutcome::Success;
201        let json = serde_json::to_string(&success).unwrap();
202        assert!(json.contains("Success"));
203
204        let error = SpanOutcome::Error("timeout".into());
205        let json = serde_json::to_string(&error).unwrap();
206        assert!(json.contains("timeout"));
207    }
208}