Skip to main content

sage_runtime/
tracing.rs

1//! Tracing support for Sage programs.
2//!
3//! Supports multiple backends:
4//! - `ndjson`: Newline-delimited JSON to stderr or file (default, native only)
5//! - `otlp`: OpenTelemetry Protocol HTTP/JSON export (native only)
6//! - `console`: Browser console output (WASM only)
7//! - `none`: Tracing disabled
8
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::{Arc, Mutex, OnceLock};
11
12#[cfg(not(target_arch = "wasm32"))]
13use std::sync::atomic::AtomicU64;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16/// Global tracing state.
17static TRACER: OnceLock<Arc<Tracer>> = OnceLock::new();
18
19/// Check if tracing is enabled.
20#[inline]
21pub fn is_enabled() -> bool {
22    TRACER
23        .get()
24        .map(|t| t.enabled.load(Ordering::Relaxed))
25        .unwrap_or(false)
26}
27
/// Settings that select and parameterize the tracing backend.
#[derive(Debug, Clone)]
pub struct TracingConfig {
    /// Which backend to use: `"ndjson"`, `"otlp"`, `"console"`, or `"none"`.
    pub backend: String,
    /// Collector URL for the `otlp` backend; `None` selects the built-in default endpoint.
    pub otlp_endpoint: Option<String>,
    /// Service name attached to exported OTLP traces.
    pub service_name: String,
}

impl Default for TracingConfig {
    /// `ndjson` on native targets and `console` on WASM, no OTLP endpoint,
    /// and the service name `sage-agent`.
    fn default() -> Self {
        let backend = if cfg!(target_arch = "wasm32") {
            "console"
        } else {
            "ndjson"
        };
        Self {
            backend: backend.to_string(),
            otlp_endpoint: None,
            service_name: "sage-agent".to_string(),
        }
    }
}
51
52/// Initialize tracing with the given configuration.
53pub fn init_with_config(config: TracingConfig) {
54    let tracer = match config.backend.as_str() {
55        "none" => Tracer::disabled(),
56        #[cfg(not(target_arch = "wasm32"))]
57        "otlp" => {
58            let endpoint = config
59                .otlp_endpoint
60                .unwrap_or_else(|| "http://localhost:4318/v1/traces".to_string());
61            Tracer::otlp(endpoint, config.service_name)
62        }
63        #[cfg(target_arch = "wasm32")]
64        "console" => Tracer::console(),
65        #[cfg(not(target_arch = "wasm32"))]
66        _ => {
67            // Check environment variables for NDJSON output
68            if let Ok(path) = std::env::var("SAGE_TRACE_FILE") {
69                match std::fs::OpenOptions::new()
70                    .create(true)
71                    .append(true)
72                    .open(&path)
73                {
74                    Ok(file) => Tracer::ndjson_file(file),
75                    Err(e) => {
76                        eprintln!("Warning: Could not open trace file {}: {}", path, e);
77                        Tracer::ndjson_stderr()
78                    }
79                }
80            } else if std::env::var("SAGE_TRACE").is_ok() {
81                Tracer::ndjson_stderr()
82            } else {
83                Tracer::disabled()
84            }
85        }
86        #[cfg(target_arch = "wasm32")]
87        _ => Tracer::console(),
88    };
89
90    let _ = TRACER.set(Arc::new(tracer));
91}
92
93/// Initialize tracing from environment variables (legacy compatibility).
94pub fn init() {
95    init_with_config(TracingConfig::default());
96}
97
/// Milliseconds since the Unix epoch, or 0 if the system clock reads earlier than the epoch.
fn timestamp_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
105
/// Nanoseconds since the Unix epoch, or 0 if the system clock reads earlier than the epoch.
///
/// The `u128 -> u64` cast truncates, but only for dates after the year 2554.
#[cfg(not(target_arch = "wasm32"))]
fn timestamp_ns() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.map_or(0, |elapsed| elapsed.as_nanos() as u64)
}
114
/// Return 64 unpredictable bits without an external RNG crate.
///
/// `RandomState::new()` is documented to start with random keys, so finishing an empty
/// SipHash from a fresh instance yields a random-looking value. Not for cryptographic use.
#[cfg(not(target_arch = "wasm32"))]
fn random_u64() -> u64 {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};

    RandomState::new().build_hasher().finish()
}

/// Generate a random 128-bit trace ID as 32 lowercase hex digits.
///
/// Both halves are random; the previous implementation always left the high 64 bits zero. The
/// low half is forced non-zero because OTLP treats an all-zero trace ID as invalid.
#[cfg(not(target_arch = "wasm32"))]
fn generate_trace_id() -> String {
    let high = random_u64();
    let low = random_u64().max(1);
    format!("{high:016x}{low:016x}")
}

/// Generate a 64-bit span ID as 16 lowercase hex digits.
///
/// The ID is a process-wide counter XORed with a per-process random salt held in the high
/// 32 bits. XOR with a fixed salt is one-to-one, so IDs never repeat within a process. The
/// counter starts at 1 and the salt's low 32 bits are zero, so at least the first 2^32 IDs are
/// non-zero (OTLP rejects all-zero span IDs).
#[cfg(not(target_arch = "wasm32"))]
fn generate_span_id() -> String {
    static COUNTER: AtomicU64 = AtomicU64::new(1);
    static SALT: OnceLock<u64> = OnceLock::new();

    let salt = *SALT.get_or_init(|| random_u64() & 0xFFFF_FFFF_0000_0000);
    let count = COUNTER.fetch_add(1, Ordering::SeqCst);
    format!("{:016x}", salt ^ count)
}
131
132/// Tracing backend implementation.
133struct Tracer {
134    enabled: AtomicBool,
135    backend: Mutex<TracerBackend>,
136    #[cfg(not(target_arch = "wasm32"))]
137    service_name: String,
138    #[cfg(not(target_arch = "wasm32"))]
139    trace_id: String,
140}
141
142enum TracerBackend {
143    Disabled,
144    #[cfg(not(target_arch = "wasm32"))]
145    Ndjson(NdjsonBackend),
146    #[cfg(not(target_arch = "wasm32"))]
147    Otlp(OtlpBackend),
148    #[cfg(target_arch = "wasm32")]
149    Console,
150}
151
152impl Tracer {
153    fn disabled() -> Self {
154        Self {
155            enabled: AtomicBool::new(false),
156            backend: Mutex::new(TracerBackend::Disabled),
157            #[cfg(not(target_arch = "wasm32"))]
158            service_name: "sage-agent".to_string(),
159            #[cfg(not(target_arch = "wasm32"))]
160            trace_id: generate_trace_id(),
161        }
162    }
163
164    #[cfg(not(target_arch = "wasm32"))]
165    fn ndjson_stderr() -> Self {
166        Self {
167            enabled: AtomicBool::new(true),
168            backend: Mutex::new(TracerBackend::Ndjson(NdjsonBackend::Stderr)),
169            service_name: "sage-agent".to_string(),
170            trace_id: generate_trace_id(),
171        }
172    }
173
174    #[cfg(not(target_arch = "wasm32"))]
175    fn ndjson_file(file: std::fs::File) -> Self {
176        Self {
177            enabled: AtomicBool::new(true),
178            backend: Mutex::new(TracerBackend::Ndjson(NdjsonBackend::File(file))),
179            service_name: "sage-agent".to_string(),
180            trace_id: generate_trace_id(),
181        }
182    }
183
184    #[cfg(not(target_arch = "wasm32"))]
185    fn otlp(endpoint: String, service_name: String) -> Self {
186        Self {
187            enabled: AtomicBool::new(true),
188            backend: Mutex::new(TracerBackend::Otlp(OtlpBackend::new(endpoint))),
189            service_name,
190            trace_id: generate_trace_id(),
191        }
192    }
193
194    #[cfg(target_arch = "wasm32")]
195    fn console() -> Self {
196        Self {
197            enabled: AtomicBool::new(true),
198            backend: Mutex::new(TracerBackend::Console),
199        }
200    }
201
202    fn emit(&self, kind: &str, data: serde_json::Value) {
203        if !self.enabled.load(Ordering::Relaxed) {
204            return;
205        }
206
207        let mut backend = self.backend.lock().unwrap();
208        match &mut *backend {
209            TracerBackend::Disabled => {}
210            #[cfg(not(target_arch = "wasm32"))]
211            TracerBackend::Ndjson(ndjson) => {
212                ndjson.emit(kind, data);
213            }
214            #[cfg(not(target_arch = "wasm32"))]
215            TracerBackend::Otlp(otlp) => {
216                otlp.emit(kind, data, &self.trace_id, &self.service_name);
217            }
218            #[cfg(target_arch = "wasm32")]
219            TracerBackend::Console => {
220                if let Ok(json) = serde_json::to_string(&serde_json::json!({
221                    "t": timestamp_ms(),
222                    "kind": kind,
223                    "data": data,
224                })) {
225                    sage_runtime_web::console_log(&json);
226                }
227            }
228        }
229    }
230}
231
232// ===========================================================================
233// Native-only backends
234// ===========================================================================
235
236#[cfg(not(target_arch = "wasm32"))]
237mod native_backends {
238    use super::*;
239    use serde::Serialize;
240    use std::io::Write;
241
242    /// NDJSON backend for local trace output.
243    pub(super) enum NdjsonBackend {
244        Stderr,
245        File(std::fs::File),
246    }
247
248    impl NdjsonBackend {
249        pub(super) fn emit(&mut self, kind: &str, data: serde_json::Value) {
250            #[derive(Serialize)]
251            struct TraceEvent<'a> {
252                t: u64,
253                kind: &'a str,
254                #[serde(flatten)]
255                data: serde_json::Value,
256            }
257
258            let event = TraceEvent {
259                t: timestamp_ms(),
260                kind,
261                data,
262            };
263
264            if let Ok(json) = serde_json::to_string(&event) {
265                let line = format!("{}\n", json);
266                match self {
267                    NdjsonBackend::Stderr => {
268                        let _ = std::io::stderr().write_all(line.as_bytes());
269                    }
270                    NdjsonBackend::File(f) => {
271                        let _ = f.write_all(line.as_bytes());
272                    }
273                }
274            }
275        }
276    }
277
278    /// OTLP HTTP/JSON backend for OpenTelemetry export.
279    pub(super) struct OtlpBackend {
280        endpoint: String,
281        pending_spans: Vec<OtlpSpan>,
282    }
283
284    impl OtlpBackend {
285        pub(super) fn new(endpoint: String) -> Self {
286            Self {
287                endpoint,
288                pending_spans: Vec::new(),
289            }
290        }
291
292        pub(super) fn emit(
293            &mut self,
294            kind: &str,
295            data: serde_json::Value,
296            trace_id: &str,
297            service_name: &str,
298        ) {
299            let span_id = generate_span_id();
300            let now_ns = timestamp_ns();
301
302            let span = OtlpSpan {
303                trace_id: trace_id.to_string(),
304                span_id,
305                name: kind.to_string(),
306                kind: 1, // INTERNAL
307                start_time_unix_nano: now_ns,
308                end_time_unix_nano: now_ns,
309                attributes: data_to_attributes(&data),
310                status: OtlpStatus { code: 1 }, // OK
311            };
312
313            self.pending_spans.push(span);
314
315            if self.pending_spans.len() >= 10
316                || kind.contains("stop")
317                || kind.contains("error")
318            {
319                self.flush(service_name);
320            }
321        }
322
323        fn flush(&mut self, service_name: &str) {
324            if self.pending_spans.is_empty() {
325                return;
326            }
327
328            let spans = std::mem::take(&mut self.pending_spans);
329            let payload = OtlpExportRequest {
330                resource_spans: vec![OtlpResourceSpans {
331                    resource: OtlpResource {
332                        attributes: vec![OtlpAttribute {
333                            key: "service.name".to_string(),
334                            value: OtlpValue {
335                                string_value: Some(service_name.to_string()),
336                            },
337                        }],
338                    },
339                    scope_spans: vec![OtlpScopeSpans {
340                        scope: OtlpScope {
341                            name: "sage".to_string(),
342                            version: env!("CARGO_PKG_VERSION").to_string(),
343                        },
344                        spans,
345                    }],
346                }],
347            };
348
349            let endpoint = self.endpoint.clone();
350            if let Ok(json) = serde_json::to_string(&payload) {
351                std::thread::spawn(move || {
352                    let _ = ureq_post(&endpoint, &json);
353                });
354            }
355        }
356    }
357
358    /// Simple blocking HTTP POST (used in background thread).
359    fn ureq_post(url: &str, body: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
360        use std::io::Read;
361        use std::net::TcpStream;
362
363        let url = url.trim_start_matches("http://");
364        let (host_port, path) = url.split_once('/').unwrap_or((url, "v1/traces"));
365        let path = format!("/{}", path);
366
367        let mut stream = TcpStream::connect(host_port)?;
368        stream.set_write_timeout(Some(std::time::Duration::from_secs(5)))?;
369        stream.set_read_timeout(Some(std::time::Duration::from_secs(5)))?;
370
371        let request = format!(
372            "POST {} HTTP/1.1\r\n\
373             Host: {}\r\n\
374             Content-Type: application/json\r\n\
375             Content-Length: {}\r\n\
376             Connection: close\r\n\
377             \r\n\
378             {}",
379            path,
380            host_port,
381            body.len(),
382            body
383        );
384
385        stream.write_all(request.as_bytes())?;
386
387        let mut response = Vec::new();
388        let _ = stream.read_to_end(&mut response);
389
390        Ok(())
391    }
392
393    /// Convert JSON data to OTLP attributes.
394    fn data_to_attributes(data: &serde_json::Value) -> Vec<OtlpAttribute> {
395        let mut attrs = Vec::new();
396
397        if let serde_json::Value::Object(map) = data {
398            for (key, value) in map {
399                let attr = match value {
400                    serde_json::Value::String(s) => OtlpAttribute {
401                        key: key.clone(),
402                        value: OtlpValue {
403                            string_value: Some(s.clone()),
404                        },
405                    },
406                    serde_json::Value::Number(n) => OtlpAttribute {
407                        key: key.clone(),
408                        value: OtlpValue {
409                            string_value: Some(n.to_string()),
410                        },
411                    },
412                    serde_json::Value::Bool(b) => OtlpAttribute {
413                        key: key.clone(),
414                        value: OtlpValue {
415                            string_value: Some(b.to_string()),
416                        },
417                    },
418                    _ => OtlpAttribute {
419                        key: key.clone(),
420                        value: OtlpValue {
421                            string_value: Some(value.to_string()),
422                        },
423                    },
424                };
425                attrs.push(attr);
426            }
427        }
428
429        attrs
430    }
431
432    // OTLP JSON structures
433
434    #[derive(Serialize)]
435    pub(super) struct OtlpExportRequest {
436        #[serde(rename = "resourceSpans")]
437        pub resource_spans: Vec<OtlpResourceSpans>,
438    }
439
440    #[derive(Serialize)]
441    pub(super) struct OtlpResourceSpans {
442        pub resource: OtlpResource,
443        #[serde(rename = "scopeSpans")]
444        pub scope_spans: Vec<OtlpScopeSpans>,
445    }
446
447    #[derive(Serialize)]
448    pub(super) struct OtlpResource {
449        pub attributes: Vec<OtlpAttribute>,
450    }
451
452    #[derive(Serialize)]
453    pub(super) struct OtlpScopeSpans {
454        pub scope: OtlpScope,
455        pub spans: Vec<OtlpSpan>,
456    }
457
458    #[derive(Serialize)]
459    pub(super) struct OtlpScope {
460        pub name: String,
461        pub version: String,
462    }
463
464    #[derive(Serialize)]
465    pub(super) struct OtlpSpan {
466        #[serde(rename = "traceId")]
467        pub trace_id: String,
468        #[serde(rename = "spanId")]
469        pub span_id: String,
470        pub name: String,
471        pub kind: i32,
472        #[serde(rename = "startTimeUnixNano")]
473        pub start_time_unix_nano: u64,
474        #[serde(rename = "endTimeUnixNano")]
475        pub end_time_unix_nano: u64,
476        pub attributes: Vec<OtlpAttribute>,
477        pub status: OtlpStatus,
478    }
479
480    #[derive(Serialize)]
481    pub(super) struct OtlpAttribute {
482        pub key: String,
483        pub value: OtlpValue,
484    }
485
486    #[derive(Serialize)]
487    pub(super) struct OtlpValue {
488        #[serde(rename = "stringValue", skip_serializing_if = "Option::is_none")]
489        pub string_value: Option<String>,
490    }
491
492    #[derive(Serialize)]
493    pub(super) struct OtlpStatus {
494        pub code: i32,
495    }
496}
497
498#[cfg(not(target_arch = "wasm32"))]
499use native_backends::{NdjsonBackend, OtlpBackend};
500
501// ===========================================================================
502// Public API functions
503// ===========================================================================
504
505fn emit_event(kind: &str, data: serde_json::Value) {
506    if let Some(tracer) = TRACER.get() {
507        tracer.emit(kind, data);
508    }
509}
510
511/// Trace an agent spawn event.
512pub fn agent_spawn(agent: &str, id: &str) {
513    emit_event(
514        "agent.spawn",
515        serde_json::json!({
516            "agent": agent,
517            "id": id,
518        }),
519    );
520}
521
522/// Trace an agent emit event.
523pub fn agent_emit(agent: &str, id: &str, value_type: &str) {
524    emit_event(
525        "agent.emit",
526        serde_json::json!({
527            "agent": agent,
528            "id": id,
529            "value_type": value_type,
530        }),
531    );
532}
533
534/// Trace an agent stop event.
535pub fn agent_stop(agent: &str, id: &str, duration_ms: u64) {
536    emit_event(
537        "agent.stop",
538        serde_json::json!({
539            "agent": agent,
540            "id": id,
541            "duration_ms": duration_ms,
542        }),
543    );
544}
545
546/// Trace an agent error event.
547pub fn agent_error(agent: &str, id: &str, error_kind: &str, message: &str) {
548    emit_event(
549        "agent.error",
550        serde_json::json!({
551            "agent": agent,
552            "id": id,
553            "error": {
554                "kind": error_kind,
555                "message": message,
556            },
557        }),
558    );
559}
560
561/// Trace an infer start event.
562pub fn infer_start(agent: &str, id: &str, model: &str, prompt_len: usize) {
563    emit_event(
564        "infer.start",
565        serde_json::json!({
566            "agent": agent,
567            "id": id,
568            "model": model,
569            "prompt_len": prompt_len,
570        }),
571    );
572}
573
574/// Trace an infer complete event.
575pub fn infer_complete(agent: &str, id: &str, model: &str, response_len: usize, duration_ms: u64) {
576    emit_event(
577        "infer.complete",
578        serde_json::json!({
579            "agent": agent,
580            "id": id,
581            "model": model,
582            "response_len": response_len,
583            "duration_ms": duration_ms,
584        }),
585    );
586}
587
588/// Trace an infer error event.
589pub fn infer_error(agent: &str, id: &str, error_kind: &str, message: &str) {
590    emit_event(
591        "infer.error",
592        serde_json::json!({
593            "agent": agent,
594            "id": id,
595            "error": {
596                "kind": error_kind,
597                "message": message,
598            },
599        }),
600    );
601}
602
603/// Trace a user-defined event (via the trace() keyword).
604pub fn user(message: &str) {
605    emit_event(
606        "user",
607        serde_json::json!({
608            "message": message,
609        }),
610    );
611}
612
613/// Trace the start of a span block.
614pub fn span_start(name: &str) {
615    emit_event(
616        "span.start",
617        serde_json::json!({
618            "name": name,
619        }),
620    );
621}
622
623/// Trace the end of a span block with duration.
624pub fn span_end(name: &str, duration_ms: u64) {
625    emit_event(
626        "span.end",
627        serde_json::json!({
628            "name": name,
629            "duration_ms": duration_ms,
630        }),
631    );
632}
633
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_timestamp_ms() {
        let ts = timestamp_ms();
        // Should be a reasonable timestamp (after year 2020)
        assert!(ts > 1_577_836_800_000);
    }

    // `timestamp_ns`, `generate_trace_id` and `generate_span_id` exist only on native targets.
    // Their tests are gated the same way so the module still compiles for wasm32.
    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn test_timestamp_ns() {
        let ts = timestamp_ns();
        // Should be a reasonable timestamp in nanoseconds
        assert!(ts > 1_577_836_800_000_000_000);
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn test_generate_trace_id() {
        let id1 = generate_trace_id();
        let id2 = generate_trace_id();
        assert_eq!(id1.len(), 32);
        assert_eq!(id2.len(), 32);
        assert!(id1.chars().chain(id2.chars()).all(|c| c.is_ascii_hexdigit()));
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[test]
    fn test_generate_span_id() {
        let id1 = generate_span_id();
        let id2 = generate_span_id();
        assert_eq!(id1.len(), 16);
        assert_eq!(id2.len(), 16);
        assert!(id1.chars().chain(id2.chars()).all(|c| c.is_ascii_hexdigit()));
        assert_ne!(id1, id2);
    }

    #[test]
    fn test_tracing_config_default() {
        let config = TracingConfig::default();
        // The default backend is target-dependent.
        let expected_backend = if cfg!(target_arch = "wasm32") {
            "console"
        } else {
            "ndjson"
        };
        assert_eq!(config.backend, expected_backend);
        assert!(config.otlp_endpoint.is_none());
        assert_eq!(config.service_name, "sage-agent");
    }
}