Skip to main content

sage_runtime/
tracing.rs

1//! Tracing support for Sage programs.
2//!
3//! Supports multiple backends:
4//! - `ndjson`: Newline-delimited JSON to stderr or file (default)
5//! - `otlp`: OpenTelemetry Protocol HTTP/JSON export
6//! - `none`: Tracing disabled
7
8use serde::Serialize;
9use std::fs::OpenOptions;
10use std::io::Write;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::{SystemTime, UNIX_EPOCH};
14
15/// Global tracing state.
16static TRACER: OnceLock<Arc<Tracer>> = OnceLock::new();
17
18/// Check if tracing is enabled.
19#[inline]
20pub fn is_enabled() -> bool {
21    TRACER
22        .get()
23        .map(|t| t.enabled.load(Ordering::Relaxed))
24        .unwrap_or(false)
25}
26
/// Settings consumed by [`init_with_config`] to choose and configure a tracing backend.
#[derive(Debug, Clone)]
pub struct TracingConfig {
    /// Which backend to use: `"ndjson"`, `"otlp"`, or `"none"`.
    pub backend: String,
    /// Collector URL for the `otlp` backend; when `None`, `init_with_config`
    /// uses `http://localhost:4318/v1/traces`.
    pub otlp_endpoint: Option<String>,
    /// Service name; the `otlp` backend reports it as the `service.name` resource attribute.
    pub service_name: String,
}

impl Default for TracingConfig {
    /// NDJSON backend, no explicit OTLP endpoint, service name `sage-agent`.
    fn default() -> Self {
        let backend = String::from("ndjson");
        let service_name = String::from("sage-agent");
        TracingConfig {
            backend,
            otlp_endpoint: None,
            service_name,
        }
    }
}
47
48/// Initialize tracing with the given configuration.
49pub fn init_with_config(config: TracingConfig) {
50    let tracer = match config.backend.as_str() {
51        "none" => Tracer::disabled(),
52        "otlp" => {
53            let endpoint = config
54                .otlp_endpoint
55                .unwrap_or_else(|| "http://localhost:4318/v1/traces".to_string());
56            Tracer::otlp(endpoint, config.service_name)
57        }
58        "ndjson" | _ => {
59            // Check environment variables for NDJSON output
60            if let Ok(path) = std::env::var("SAGE_TRACE_FILE") {
61                match OpenOptions::new().create(true).append(true).open(&path) {
62                    Ok(file) => Tracer::ndjson_file(file),
63                    Err(e) => {
64                        eprintln!("Warning: Could not open trace file {}: {}", path, e);
65                        Tracer::ndjson_stderr()
66                    }
67                }
68            } else if std::env::var("SAGE_TRACE").is_ok() {
69                Tracer::ndjson_stderr()
70            } else {
71                Tracer::disabled()
72            }
73        }
74    };
75
76    let _ = TRACER.set(Arc::new(tracer));
77}
78
79/// Initialize tracing from environment variables (legacy compatibility).
80pub fn init() {
81    init_with_config(TracingConfig::default());
82}
83
/// Wall-clock milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch.
fn timestamp_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
91
/// Wall-clock nanoseconds since the Unix epoch (truncated to 64 bits).
///
/// Returns `0` if the system clock reads earlier than the epoch.
fn timestamp_ns() -> u64 {
    let now = SystemTime::now();
    now.duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_nanos() as u64)
}
99
/// Generate a fresh trace ID: 128 random bits as 32 lowercase hex characters.
///
/// Randomness comes from std's `RandomState`. Its SipHash keys are randomly
/// seeded, and hashers from different `RandomState` instances produce different
/// outputs, so no RNG crate is needed. The result is never all zeros, which
/// OTLP / W3C Trace Context treat as an invalid trace ID.
fn generate_trace_id() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};

    // One 64-bit draw: hash a salt with a freshly keyed SipHash instance.
    let draw = |salt: u64| -> u64 {
        let mut hasher = RandomState::new().build_hasher();
        hasher.write_u64(salt);
        hasher.write_u32(std::process::id());
        hasher.finish()
    };

    let high = draw(0);
    let mut low = draw(1);
    if high == 0 && low == 0 {
        low = 1;
    }
    format!("{:016x}{:016x}", high, low)
}
107
/// Generate a span ID: 64 bits as 16 lowercase hex characters.
///
/// Each ID is a per-process random base plus a monotonically increasing counter.
/// IDs therefore never repeat within a process (short of wrapping the 64-bit
/// counter) and start from a random offset in each process. The all-zero value,
/// which OTLP treats as an invalid span ID, is skipped.
fn generate_span_id() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};

    static BASE: OnceLock<u64> = OnceLock::new();
    static COUNTER: AtomicU64 = AtomicU64::new(1);

    let base = *BASE.get_or_init(|| {
        let mut hasher = RandomState::new().build_hasher();
        hasher.write_u32(std::process::id());
        hasher.finish()
    });

    loop {
        // Adding distinct counters to a fixed base yields distinct IDs, unlike the
        // XOR-with-timestamp scheme, which could collide between calls.
        let id = base.wrapping_add(COUNTER.fetch_add(1, Ordering::SeqCst));
        if id != 0 {
            return format!("{:016x}", id);
        }
    }
}
114
115/// Tracing backend implementation.
116struct Tracer {
117    enabled: AtomicBool,
118    backend: Mutex<TracerBackend>,
119    service_name: String,
120    trace_id: String,
121}
122
123enum TracerBackend {
124    Disabled,
125    Ndjson(NdjsonBackend),
126    Otlp(OtlpBackend),
127}
128
129impl Tracer {
130    fn disabled() -> Self {
131        Self {
132            enabled: AtomicBool::new(false),
133            backend: Mutex::new(TracerBackend::Disabled),
134            service_name: "sage-agent".to_string(),
135            trace_id: generate_trace_id(),
136        }
137    }
138
139    fn ndjson_stderr() -> Self {
140        Self {
141            enabled: AtomicBool::new(true),
142            backend: Mutex::new(TracerBackend::Ndjson(NdjsonBackend::Stderr)),
143            service_name: "sage-agent".to_string(),
144            trace_id: generate_trace_id(),
145        }
146    }
147
148    fn ndjson_file(file: std::fs::File) -> Self {
149        Self {
150            enabled: AtomicBool::new(true),
151            backend: Mutex::new(TracerBackend::Ndjson(NdjsonBackend::File(file))),
152            service_name: "sage-agent".to_string(),
153            trace_id: generate_trace_id(),
154        }
155    }
156
157    fn otlp(endpoint: String, service_name: String) -> Self {
158        Self {
159            enabled: AtomicBool::new(true),
160            backend: Mutex::new(TracerBackend::Otlp(OtlpBackend::new(endpoint))),
161            service_name,
162            trace_id: generate_trace_id(),
163        }
164    }
165
166    fn emit(&self, kind: &str, data: serde_json::Value) {
167        if !self.enabled.load(Ordering::Relaxed) {
168            return;
169        }
170
171        let mut backend = self.backend.lock().unwrap();
172        match &mut *backend {
173            TracerBackend::Disabled => {}
174            TracerBackend::Ndjson(ndjson) => {
175                ndjson.emit(kind, data);
176            }
177            TracerBackend::Otlp(otlp) => {
178                otlp.emit(kind, data, &self.trace_id, &self.service_name);
179            }
180        }
181    }
182}
183
184/// NDJSON backend for local trace output.
185enum NdjsonBackend {
186    Stderr,
187    File(std::fs::File),
188}
189
190impl NdjsonBackend {
191    fn emit(&mut self, kind: &str, data: serde_json::Value) {
192        #[derive(Serialize)]
193        struct TraceEvent<'a> {
194            t: u64,
195            kind: &'a str,
196            #[serde(flatten)]
197            data: serde_json::Value,
198        }
199
200        let event = TraceEvent {
201            t: timestamp_ms(),
202            kind,
203            data,
204        };
205
206        if let Ok(json) = serde_json::to_string(&event) {
207            let line = format!("{}\n", json);
208            match self {
209                NdjsonBackend::Stderr => {
210                    let _ = std::io::stderr().write_all(line.as_bytes());
211                }
212                NdjsonBackend::File(f) => {
213                    let _ = f.write_all(line.as_bytes());
214                }
215            }
216        }
217    }
218}
219
220/// OTLP HTTP/JSON backend for OpenTelemetry export.
221struct OtlpBackend {
222    endpoint: String,
223    pending_spans: Vec<OtlpSpan>,
224}
225
226impl OtlpBackend {
227    fn new(endpoint: String) -> Self {
228        Self {
229            endpoint,
230            pending_spans: Vec::new(),
231        }
232    }
233
234    fn emit(&mut self, kind: &str, data: serde_json::Value, trace_id: &str, service_name: &str) {
235        let span_id = generate_span_id();
236        let now_ns = timestamp_ns();
237
238        // Convert our event to an OTLP span
239        let span = OtlpSpan {
240            trace_id: trace_id.to_string(),
241            span_id,
242            name: kind.to_string(),
243            kind: 1, // INTERNAL
244            start_time_unix_nano: now_ns,
245            end_time_unix_nano: now_ns,
246            attributes: data_to_attributes(&data),
247            status: OtlpStatus { code: 1 }, // OK
248        };
249
250        self.pending_spans.push(span);
251
252        // Flush periodically (every 10 spans or on terminal events)
253        if self.pending_spans.len() >= 10 || kind.contains("stop") || kind.contains("error") {
254            self.flush(service_name);
255        }
256    }
257
258    fn flush(&mut self, service_name: &str) {
259        if self.pending_spans.is_empty() {
260            return;
261        }
262
263        let spans = std::mem::take(&mut self.pending_spans);
264        let payload = OtlpExportRequest {
265            resource_spans: vec![OtlpResourceSpans {
266                resource: OtlpResource {
267                    attributes: vec![OtlpAttribute {
268                        key: "service.name".to_string(),
269                        value: OtlpValue {
270                            string_value: Some(service_name.to_string()),
271                        },
272                    }],
273                },
274                scope_spans: vec![OtlpScopeSpans {
275                    scope: OtlpScope {
276                        name: "sage".to_string(),
277                        version: env!("CARGO_PKG_VERSION").to_string(),
278                    },
279                    spans,
280                }],
281            }],
282        };
283
284        // Fire-and-forget async export
285        let endpoint = self.endpoint.clone();
286        if let Ok(json) = serde_json::to_string(&payload) {
287            std::thread::spawn(move || {
288                let _ = ureq_post(&endpoint, &json);
289            });
290        }
291    }
292}
293
/// Minimal blocking HTTP/1.1 POST of a JSON `body` to `url`, used from the OTLP
/// exporter's background thread.
///
/// Despite its name, this does not use the `ureq` crate. It writes the request over
/// a plain `TcpStream`, so only `http://` (or scheme-less) URLs are supported. A URL
/// without a path posts to `/v1/traces`, and a host without a port uses port 80.
/// Connecting, writing, and reading each time out after 5 seconds.
///
/// # Errors
///
/// Fails if:
/// - the URL has a non-`http` scheme (e.g. `https://`; there is no TLS support),
/// - the host cannot be resolved or connected to,
/// - the request cannot be written, or
/// - the response status line is missing or not 2xx.
fn ureq_post(url: &str, body: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    use std::io::Read;
    use std::net::{TcpStream, ToSocketAddrs};
    use std::time::Duration;

    const TIMEOUT: Duration = Duration::from_secs(5);

    // Strip an `http://` scheme and reject any other one up front. Blindly trimming
    // only "http://" would turn "https://host/..." into a connect to "https:".
    let rest = match url.split_once("://") {
        Some((scheme, rest)) if scheme.eq_ignore_ascii_case("http") => rest,
        Some((scheme, _)) if !scheme.contains('/') => {
            return Err(format!(
                "unsupported URL scheme '{}' (only http:// is supported)",
                scheme
            )
            .into());
        }
        _ => url,
    };
    let (host_port, path) = rest.split_once('/').unwrap_or((rest, "v1/traces"));
    let path = format!("/{}", path);

    // Socket addresses need an explicit port; default to HTTP's 80 when none is given.
    // (`[::1]` has no trailing `:<u16>`, so bracketed IPv6 hosts are handled too.)
    let has_port = host_port
        .rsplit_once(':')
        .map_or(false, |(_, port)| port.parse::<u16>().is_ok());
    let addr = if has_port {
        host_port.to_string()
    } else {
        format!("{}:80", host_port)
    };

    // `TcpStream::connect` has no timeout of its own. Try each resolved address
    // with `connect_timeout` so an unreachable collector cannot stall the thread.
    let mut stream = None;
    let mut last_err = None;
    for sock_addr in addr.to_socket_addrs()? {
        match TcpStream::connect_timeout(&sock_addr, TIMEOUT) {
            Ok(connected) => {
                stream = Some(connected);
                break;
            }
            Err(e) => last_err = Some(e),
        }
    }
    let mut stream = match (stream, last_err) {
        (Some(connected), _) => connected,
        (None, Some(e)) => return Err(e.into()),
        (None, None) => return Err(format!("no addresses resolved for {}", addr).into()),
    };
    stream.set_write_timeout(Some(TIMEOUT))?;
    stream.set_read_timeout(Some(TIMEOUT))?;

    let request = format!(
        "POST {} HTTP/1.1\r\n\
         Host: {}\r\n\
         Content-Type: application/json\r\n\
         Content-Length: {}\r\n\
         Connection: close\r\n\
         \r\n\
         {}",
        path,
        host_port,
        body.len(),
        body
    );

    stream.write_all(request.as_bytes())?;

    // Read until the server closes the connection (we sent `Connection: close`) or
    // the read timeout fires. Whatever arrived is kept and is enough to inspect the
    // status line.
    let mut response = Vec::new();
    let _ = stream.read_to_end(&mut response);

    let status_line = response.split(|&b| b == b'\n').next().unwrap_or(&[]);
    let status = String::from_utf8_lossy(status_line)
        .split_whitespace()
        .nth(1)
        .and_then(|code| code.parse::<u16>().ok());

    match status {
        Some(code) if (200..300).contains(&code) => Ok(()),
        Some(code) => Err(format!("{} responded with HTTP status {}", url, code).into()),
        None => Err(format!("malformed or missing HTTP response from {}", url).into()),
    }
}
331
332/// Convert JSON data to OTLP attributes.
333fn data_to_attributes(data: &serde_json::Value) -> Vec<OtlpAttribute> {
334    let mut attrs = Vec::new();
335
336    if let serde_json::Value::Object(map) = data {
337        for (key, value) in map {
338            let attr = match value {
339                serde_json::Value::String(s) => OtlpAttribute {
340                    key: key.clone(),
341                    value: OtlpValue {
342                        string_value: Some(s.clone()),
343                    },
344                },
345                serde_json::Value::Number(n) => OtlpAttribute {
346                    key: key.clone(),
347                    value: OtlpValue {
348                        string_value: Some(n.to_string()),
349                    },
350                },
351                serde_json::Value::Bool(b) => OtlpAttribute {
352                    key: key.clone(),
353                    value: OtlpValue {
354                        string_value: Some(b.to_string()),
355                    },
356                },
357                _ => OtlpAttribute {
358                    key: key.clone(),
359                    value: OtlpValue {
360                        string_value: Some(value.to_string()),
361                    },
362                },
363            };
364            attrs.push(attr);
365        }
366    }
367
368    attrs
369}
370
371// OTLP JSON structures (simplified for HTTP/JSON export)
372
373#[derive(Serialize)]
374struct OtlpExportRequest {
375    #[serde(rename = "resourceSpans")]
376    resource_spans: Vec<OtlpResourceSpans>,
377}
378
379#[derive(Serialize)]
380struct OtlpResourceSpans {
381    resource: OtlpResource,
382    #[serde(rename = "scopeSpans")]
383    scope_spans: Vec<OtlpScopeSpans>,
384}
385
386#[derive(Serialize)]
387struct OtlpResource {
388    attributes: Vec<OtlpAttribute>,
389}
390
391#[derive(Serialize)]
392struct OtlpScopeSpans {
393    scope: OtlpScope,
394    spans: Vec<OtlpSpan>,
395}
396
397#[derive(Serialize)]
398struct OtlpScope {
399    name: String,
400    version: String,
401}
402
403#[derive(Serialize)]
404struct OtlpSpan {
405    #[serde(rename = "traceId")]
406    trace_id: String,
407    #[serde(rename = "spanId")]
408    span_id: String,
409    name: String,
410    kind: i32,
411    #[serde(rename = "startTimeUnixNano")]
412    start_time_unix_nano: u64,
413    #[serde(rename = "endTimeUnixNano")]
414    end_time_unix_nano: u64,
415    attributes: Vec<OtlpAttribute>,
416    status: OtlpStatus,
417}
418
419#[derive(Serialize)]
420struct OtlpAttribute {
421    key: String,
422    value: OtlpValue,
423}
424
425#[derive(Serialize)]
426struct OtlpValue {
427    #[serde(rename = "stringValue", skip_serializing_if = "Option::is_none")]
428    string_value: Option<String>,
429}
430
431#[derive(Serialize)]
432struct OtlpStatus {
433    code: i32,
434}
435
436// Public API functions
437
438fn emit_event(kind: &str, data: serde_json::Value) {
439    if let Some(tracer) = TRACER.get() {
440        tracer.emit(kind, data);
441    }
442}
443
444/// Trace an agent spawn event.
445pub fn agent_spawn(agent: &str, id: &str) {
446    emit_event(
447        "agent.spawn",
448        serde_json::json!({
449            "agent": agent,
450            "id": id,
451        }),
452    );
453}
454
455/// Trace an agent emit event.
456pub fn agent_emit(agent: &str, id: &str, value_type: &str) {
457    emit_event(
458        "agent.emit",
459        serde_json::json!({
460            "agent": agent,
461            "id": id,
462            "value_type": value_type,
463        }),
464    );
465}
466
467/// Trace an agent stop event.
468pub fn agent_stop(agent: &str, id: &str, duration_ms: u64) {
469    emit_event(
470        "agent.stop",
471        serde_json::json!({
472            "agent": agent,
473            "id": id,
474            "duration_ms": duration_ms,
475        }),
476    );
477}
478
479/// Trace an agent error event.
480pub fn agent_error(agent: &str, id: &str, error_kind: &str, message: &str) {
481    emit_event(
482        "agent.error",
483        serde_json::json!({
484            "agent": agent,
485            "id": id,
486            "error": {
487                "kind": error_kind,
488                "message": message,
489            },
490        }),
491    );
492}
493
494/// Trace an infer start event.
495pub fn infer_start(agent: &str, id: &str, model: &str, prompt_len: usize) {
496    emit_event(
497        "infer.start",
498        serde_json::json!({
499            "agent": agent,
500            "id": id,
501            "model": model,
502            "prompt_len": prompt_len,
503        }),
504    );
505}
506
507/// Trace an infer complete event.
508pub fn infer_complete(agent: &str, id: &str, model: &str, response_len: usize, duration_ms: u64) {
509    emit_event(
510        "infer.complete",
511        serde_json::json!({
512            "agent": agent,
513            "id": id,
514            "model": model,
515            "response_len": response_len,
516            "duration_ms": duration_ms,
517        }),
518    );
519}
520
521/// Trace an infer error event.
522pub fn infer_error(agent: &str, id: &str, error_kind: &str, message: &str) {
523    emit_event(
524        "infer.error",
525        serde_json::json!({
526            "agent": agent,
527            "id": id,
528            "error": {
529                "kind": error_kind,
530                "message": message,
531            },
532        }),
533    );
534}
535
536/// Trace a user-defined event (via the trace() keyword).
537pub fn user(message: &str) {
538    emit_event(
539        "user",
540        serde_json::json!({
541            "message": message,
542        }),
543    );
544}
545
546/// Trace the start of a span block.
547pub fn span_start(name: &str) {
548    emit_event(
549        "span.start",
550        serde_json::json!({
551            "name": name,
552        }),
553    );
554}
555
556/// Trace the end of a span block with duration.
557pub fn span_end(name: &str, duration_ms: u64) {
558    emit_event(
559        "span.end",
560        serde_json::json!({
561            "name": name,
562            "duration_ms": duration_ms,
563        }),
564    );
565}
566
#[cfg(test)]
mod tests {
    use super::*;

    /// True when `s` is non-empty and made only of ASCII hex digits.
    fn is_hex(s: &str) -> bool {
        !s.is_empty() && s.chars().all(|c| c.is_ascii_hexdigit())
    }

    #[test]
    fn test_timestamp_ms() {
        let ts = timestamp_ms();
        // Should be a reasonable timestamp (after year 2020)
        assert!(ts > 1_577_836_800_000);
    }

    #[test]
    fn test_timestamp_ns() {
        let ts = timestamp_ns();
        // Should be a reasonable timestamp in nanoseconds
        assert!(ts > 1_577_836_800_000_000_000);
    }

    #[test]
    fn test_generate_trace_id() {
        let id1 = generate_trace_id();
        let id2 = generate_trace_id();
        assert_eq!(id1.len(), 32);
        assert_eq!(id2.len(), 32);
        assert!(is_hex(&id1) && is_hex(&id2));
        // An all-zero trace ID is invalid in OTLP / W3C Trace Context.
        assert_ne!(id1, "0".repeat(32));
    }

    #[test]
    fn test_generate_span_id() {
        let id1 = generate_span_id();
        let id2 = generate_span_id();
        assert_eq!(id1.len(), 16);
        assert_eq!(id2.len(), 16);
        assert!(is_hex(&id1) && is_hex(&id2));
        assert_ne!(id1, id2);
    }

    #[test]
    fn test_data_to_attributes() {
        let data = serde_json::json!({
            "agent": "TestAgent",
            "id": "123",
            "count": 42,
            "active": true,
        });
        let attrs = data_to_attributes(&data);
        assert_eq!(attrs.len(), 4);

        let value_of = |key: &str| {
            attrs
                .iter()
                .find(|attr| attr.key == key)
                .and_then(|attr| attr.value.string_value.clone())
        };
        assert_eq!(value_of("agent").as_deref(), Some("TestAgent"));
        assert_eq!(value_of("id").as_deref(), Some("123"));
        assert_eq!(value_of("count").as_deref(), Some("42"));
        assert_eq!(value_of("active").as_deref(), Some("true"));
    }

    #[test]
    fn test_data_to_attributes_non_object() {
        assert!(data_to_attributes(&serde_json::json!("just a string")).is_empty());
        assert!(data_to_attributes(&serde_json::Value::Null).is_empty());
    }

    #[test]
    fn test_tracing_config_default() {
        let config = TracingConfig::default();
        assert_eq!(config.backend, "ndjson");
        assert!(config.otlp_endpoint.is_none());
        assert_eq!(config.service_name, "sage-agent");
    }
}