Skip to main content

embeddenator_obs/obs/
opentelemetry.rs

1//! OpenTelemetry Integration
2//!
3//! Provides OpenTelemetry-compatible tracing and metrics export for
4//! distributed observability and integration with OTLP collectors.
5//!
6//! # Features
7//!
8//! - Span context propagation (W3C Trace Context)
9//! - OTLP-compatible span export
//! - Trace IDs for distributed tracing (64-bit, drawn from a process-local counter, so they are not unique across processes)
11//! - Parent-child span relationships
12//! - Span attributes and events
13//!
14//! # Usage
15//!
16//! ```rust,ignore
17//! use embeddenator_obs::opentelemetry::{OtelSpan, OtelExporter};
18//!
19//! let mut span = OtelSpan::new("operation");
20//! span.set_attribute("key", "value");
21//! span.add_event("checkpoint");
22//! span.end();
23//!
24//! let exporter = OtelExporter::new();
25//! let json = exporter.export_spans(&[span]);
26//! ```
27
28use std::collections::HashMap;
29use std::sync::atomic::{AtomicU64, Ordering};
30use std::time::{Duration, SystemTime, UNIX_EPOCH};
31
/// First value handed out by the ID counters below.
///
/// Starting at 1 keeps zero free, which `OtelSpan` uses as its
/// "no parent" marker for `parent_span_id`.
const FIRST_ID: u64 = 1;

/// Source of trace IDs for new root spans.
///
/// The counter lives in process memory, so IDs are unique only within one
/// process; separate processes will hand out the same sequence.
static TRACE_ID_COUNTER: AtomicU64 = AtomicU64::new(FIRST_ID);

/// Source of span IDs for every span created in this process.
static SPAN_ID_COUNTER: AtomicU64 = AtomicU64::new(FIRST_ID);
35
/// OpenTelemetry span status.
///
/// The explicit discriminants equal the OTLP `Status.StatusCode` wire values
/// (UNSET = 0, OK = 1, ERROR = 2), so `status as u32` yields the code an
/// OTLP collector expects.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanStatus {
    /// Span completed successfully (OTLP `STATUS_CODE_OK`).
    Ok = 1,
    /// Span encountered an error (OTLP `STATUS_CODE_ERROR`).
    Error = 2,
    /// Status not set (OTLP `STATUS_CODE_UNSET`).
    Unset = 0,
}
46
/// OpenTelemetry span kind.
///
/// The explicit discriminants equal the OTLP `Span.SpanKind` wire values.
/// OTLP reserves 0 for `SPAN_KIND_UNSPECIFIED`, which has no variant here,
/// so `kind as u32` yields a value in `1..=5`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SpanKind {
    /// Internal operation (OTLP `SPAN_KIND_INTERNAL`).
    Internal = 1,
    /// Server-side operation (OTLP `SPAN_KIND_SERVER`).
    Server = 2,
    /// Client-side operation (OTLP `SPAN_KIND_CLIENT`).
    Client = 3,
    /// Producer, e.g. a message-queue publisher (OTLP `SPAN_KIND_PRODUCER`).
    Producer = 4,
    /// Consumer, e.g. a message-queue subscriber (OTLP `SPAN_KIND_CONSUMER`).
    Consumer = 5,
}
61
62/// OpenTelemetry span with full tracing context.
63#[derive(Debug, Clone)]
64pub struct OtelSpan {
65    /// Unique trace ID (128-bit in production, 64-bit here for simplicity)
66    pub trace_id: u64,
67    /// Unique span ID
68    pub span_id: u64,
69    /// Parent span ID (0 if root)
70    pub parent_span_id: u64,
71    /// Operation name
72    pub name: String,
73    /// Span kind
74    pub kind: SpanKind,
75    /// Start timestamp (nanoseconds since epoch)
76    pub start_time_ns: u64,
77    /// End timestamp (0 if still active)
78    pub end_time_ns: u64,
79    /// Span status
80    pub status: SpanStatus,
81    /// Span attributes (key-value pairs)
82    pub attributes: HashMap<String, String>,
83    /// Span events
84    pub events: Vec<SpanEvent>,
85}
86
/// A named, timestamped checkpoint recorded inside a span.
#[derive(Debug, Clone)]
pub struct SpanEvent {
    /// Name describing what happened.
    pub name: String,
    /// Wall-clock time of the event, in nanoseconds since the UNIX epoch.
    pub timestamp_ns: u64,
    /// Extra string key/value metadata attached to the event.
    pub attributes: HashMap<String, String>,
}
97
98impl OtelSpan {
99    /// Create new root span.
100    pub fn new(name: impl Into<String>) -> Self {
101        let trace_id = TRACE_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
102        let span_id = SPAN_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
103
104        Self {
105            trace_id,
106            span_id,
107            parent_span_id: 0,
108            name: name.into(),
109            kind: SpanKind::Internal,
110            start_time_ns: system_time_nanos(),
111            end_time_ns: 0,
112            status: SpanStatus::Unset,
113            attributes: HashMap::new(),
114            events: Vec::new(),
115        }
116    }
117
118    /// Create child span with parent context.
119    pub fn new_child(name: impl Into<String>, parent: &OtelSpan) -> Self {
120        let span_id = SPAN_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
121
122        Self {
123            trace_id: parent.trace_id,
124            span_id,
125            parent_span_id: parent.span_id,
126            name: name.into(),
127            kind: SpanKind::Internal,
128            start_time_ns: system_time_nanos(),
129            end_time_ns: 0,
130            status: SpanStatus::Unset,
131            attributes: HashMap::new(),
132            events: Vec::new(),
133        }
134    }
135
136    /// Set span kind.
137    pub fn set_kind(&mut self, kind: SpanKind) {
138        self.kind = kind;
139    }
140
141    /// Set span attribute.
142    pub fn set_attribute(&mut self, key: impl Into<String>, value: impl Into<String>) {
143        self.attributes.insert(key.into(), value.into());
144    }
145
146    /// Add span event.
147    pub fn add_event(&mut self, name: impl Into<String>) {
148        self.events.push(SpanEvent {
149            name: name.into(),
150            timestamp_ns: system_time_nanos(),
151            attributes: HashMap::new(),
152        });
153    }
154
155    /// Add span event with attributes.
156    pub fn add_event_with_attributes(
157        &mut self,
158        name: impl Into<String>,
159        attributes: HashMap<String, String>,
160    ) {
161        self.events.push(SpanEvent {
162            name: name.into(),
163            timestamp_ns: system_time_nanos(),
164            attributes,
165        });
166    }
167
168    /// Mark span as completed successfully.
169    pub fn end(&mut self) {
170        self.end_time_ns = system_time_nanos();
171        if self.status == SpanStatus::Unset {
172            self.status = SpanStatus::Ok;
173        }
174    }
175
176    /// Mark span as failed.
177    pub fn end_with_error(&mut self, error: impl Into<String>) {
178        self.end_time_ns = system_time_nanos();
179        self.status = SpanStatus::Error;
180        self.set_attribute("error.message", error);
181    }
182
183    /// Get span duration in nanoseconds.
184    pub fn duration_ns(&self) -> u64 {
185        if self.end_time_ns > 0 {
186            self.end_time_ns.saturating_sub(self.start_time_ns)
187        } else {
188            0
189        }
190    }
191
192    /// Check if span is root (no parent).
193    pub fn is_root(&self) -> bool {
194        self.parent_span_id == 0
195    }
196
197    /// Export as W3C Trace Context header (traceparent).
198    pub fn to_traceparent(&self) -> String {
199        format!("00-{:032x}-{:016x}-01", self.trace_id, self.span_id)
200    }
201
202    /// Parse W3C Trace Context header.
203    pub fn from_traceparent(traceparent: &str, name: impl Into<String>) -> Option<Self> {
204        let parts: Vec<&str> = traceparent.split('-').collect();
205        if parts.len() != 4 || parts[0] != "00" {
206            return None;
207        }
208
209        let trace_id = u64::from_str_radix(&parts[1][16..32], 16).ok()?;
210        let parent_span_id = u64::from_str_radix(parts[2], 16).ok()?;
211        let span_id = SPAN_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
212
213        Some(Self {
214            trace_id,
215            span_id,
216            parent_span_id,
217            name: name.into(),
218            kind: SpanKind::Internal,
219            start_time_ns: system_time_nanos(),
220            end_time_ns: 0,
221            status: SpanStatus::Unset,
222            attributes: HashMap::new(),
223            events: Vec::new(),
224        })
225    }
226}
227
/// OpenTelemetry exporter for OTLP-compatible output.
///
/// Holds the service name that is reported as the `service.name` resource
/// attribute on every export.
#[derive(Debug, Clone)]
pub struct OtelExporter {
    /// Value reported as the `service.name` resource attribute.
    service_name: String,
}
233
234impl OtelExporter {
235    /// Create new OTLP exporter.
236    pub fn new() -> Self {
237        Self {
238            service_name: "embeddenator".to_string(),
239        }
240    }
241
242    /// Set service name.
243    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
244        self.service_name = name.into();
245        self
246    }
247
248    /// Export spans as JSON (simplified OTLP format).
249    pub fn export_spans(&self, spans: &[OtelSpan]) -> String {
250        let mut output = String::from("{\n  \"resourceSpans\": [\n    {\n");
251        output.push_str(&format!("      \"resource\": {{\"attributes\": [{{\"key\": \"service.name\", \"value\": \"{}\"}}]}},\n", self.service_name));
252        output.push_str("      \"scopeSpans\": [\n        {\n          \"spans\": [\n");
253
254        for (i, span) in spans.iter().enumerate() {
255            if i > 0 {
256                output.push_str(",\n");
257            }
258            output.push_str(&self.span_to_json(span));
259        }
260
261        output.push_str("\n          ]\n        }\n      ]\n    }\n  ]\n}");
262        output
263    }
264
265    fn span_to_json(&self, span: &OtelSpan) -> String {
266        let mut json = String::new();
267        json.push_str("            {\n");
268        json.push_str(&format!(
269            "              \"traceId\": \"{:032x}\",\n",
270            span.trace_id
271        ));
272        json.push_str(&format!(
273            "              \"spanId\": \"{:016x}\",\n",
274            span.span_id
275        ));
276        if span.parent_span_id != 0 {
277            json.push_str(&format!(
278                "              \"parentSpanId\": \"{:016x}\",\n",
279                span.parent_span_id
280            ));
281        }
282        json.push_str(&format!("              \"name\": \"{}\",\n", span.name));
283        json.push_str(&format!(
284            "              \"kind\": {:?},\n",
285            span.kind as u32
286        ));
287        json.push_str(&format!(
288            "              \"startTimeUnixNano\": {},\n",
289            span.start_time_ns
290        ));
291        json.push_str(&format!(
292            "              \"endTimeUnixNano\": {},\n",
293            span.end_time_ns
294        ));
295        json.push_str(&format!(
296            "              \"status\": {{\"code\": {}}}\n",
297            span.status as u32
298        ));
299        json.push_str("            }");
300        json
301    }
302}
303
304impl Default for OtelExporter {
305    fn default() -> Self {
306        Self::new()
307    }
308}
309
310/// Get current system time in nanoseconds since UNIX epoch.
311fn system_time_nanos() -> u64 {
312    SystemTime::now()
313        .duration_since(UNIX_EPOCH)
314        .unwrap_or(Duration::ZERO)
315        .as_nanos() as u64
316}
317
318#[cfg(test)]
319mod tests {
320    use super::*;
321
322    #[test]
323    fn test_span_creation() {
324        let span = OtelSpan::new("test_operation");
325        assert!(span.trace_id > 0);
326        assert!(span.span_id > 0);
327        assert_eq!(span.parent_span_id, 0);
328        assert!(span.is_root());
329        assert_eq!(span.status, SpanStatus::Unset);
330    }
331
332    #[test]
333    fn test_child_span() {
334        let parent = OtelSpan::new("parent");
335        let child = OtelSpan::new_child("child", &parent);
336
337        assert_eq!(child.trace_id, parent.trace_id);
338        assert_ne!(child.span_id, parent.span_id);
339        assert_eq!(child.parent_span_id, parent.span_id);
340        assert!(!child.is_root());
341    }
342
343    #[test]
344    fn test_span_attributes() {
345        let mut span = OtelSpan::new("test");
346        span.set_attribute("key1", "value1");
347        span.set_attribute("key2", "value2");
348
349        assert_eq!(span.attributes.get("key1"), Some(&"value1".to_string()));
350        assert_eq!(span.attributes.get("key2"), Some(&"value2".to_string()));
351    }
352
353    #[test]
354    fn test_span_events() {
355        let mut span = OtelSpan::new("test");
356        span.add_event("event1");
357        span.add_event("event2");
358
359        assert_eq!(span.events.len(), 2);
360        assert_eq!(span.events[0].name, "event1");
361        assert_eq!(span.events[1].name, "event2");
362    }
363
364    #[test]
365    fn test_span_end() {
366        let mut span = OtelSpan::new("test");
367        std::thread::sleep(Duration::from_millis(10));
368        span.end();
369
370        assert!(span.end_time_ns > span.start_time_ns);
371        assert_eq!(span.status, SpanStatus::Ok);
372        assert!(span.duration_ns() > 0);
373    }
374
375    #[test]
376    fn test_span_error() {
377        let mut span = OtelSpan::new("test");
378        span.end_with_error("Something went wrong");
379
380        assert_eq!(span.status, SpanStatus::Error);
381        assert!(span.attributes.contains_key("error.message"));
382    }
383
384    #[test]
385    fn test_traceparent_export() {
386        let span = OtelSpan::new("test");
387        let traceparent = span.to_traceparent();
388
389        assert!(traceparent.starts_with("00-"));
390        assert!(traceparent.ends_with("-01"));
391    }
392
393    #[test]
394    fn test_traceparent_parse() {
395        let parent = OtelSpan::new("parent");
396        let traceparent = parent.to_traceparent();
397
398        let child = OtelSpan::from_traceparent(&traceparent, "child").unwrap();
399        assert_eq!(child.trace_id, parent.trace_id);
400        assert_eq!(child.parent_span_id, parent.span_id);
401    }
402
403    #[test]
404    fn test_exporter() {
405        let mut span = OtelSpan::new("test");
406        span.set_attribute("key", "value");
407        span.end();
408
409        let exporter = OtelExporter::new().with_service_name("test_service");
410        let json = exporter.export_spans(&[span]);
411
412        assert!(json.contains("test_service"));
413        assert!(json.contains("test"));
414        assert!(json.contains("traceId"));
415    }
416}