1use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::{Arc, Mutex, OnceLock};
11
12#[cfg(not(target_arch = "wasm32"))]
13use std::sync::atomic::AtomicU64;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16static TRACER: OnceLock<Arc<Tracer>> = OnceLock::new();
18
19#[inline]
21pub fn is_enabled() -> bool {
22 TRACER
23 .get()
24 .map(|t| t.enabled.load(Ordering::Relaxed))
25 .unwrap_or(false)
26}
27
28#[derive(Debug, Clone)]
30pub struct TracingConfig {
31 pub backend: String,
33 pub otlp_endpoint: Option<String>,
35 pub service_name: String,
37}
38
39impl Default for TracingConfig {
40 fn default() -> Self {
41 Self {
42 #[cfg(not(target_arch = "wasm32"))]
43 backend: "ndjson".to_string(),
44 #[cfg(target_arch = "wasm32")]
45 backend: "console".to_string(),
46 otlp_endpoint: None,
47 service_name: "sage-agent".to_string(),
48 }
49 }
50}
51
52pub fn init_with_config(config: TracingConfig) {
54 let tracer = match config.backend.as_str() {
55 "none" => Tracer::disabled(),
56 #[cfg(not(target_arch = "wasm32"))]
57 "otlp" => {
58 let endpoint = config
59 .otlp_endpoint
60 .unwrap_or_else(|| "http://localhost:4318/v1/traces".to_string());
61 Tracer::otlp(endpoint, config.service_name)
62 }
63 #[cfg(target_arch = "wasm32")]
64 "console" => Tracer::console(),
65 #[cfg(not(target_arch = "wasm32"))]
66 _ => {
67 if let Ok(path) = std::env::var("SAGE_TRACE_FILE") {
69 match std::fs::OpenOptions::new()
70 .create(true)
71 .append(true)
72 .open(&path)
73 {
74 Ok(file) => Tracer::ndjson_file(file),
75 Err(e) => {
76 eprintln!("Warning: Could not open trace file {}: {}", path, e);
77 Tracer::ndjson_stderr()
78 }
79 }
80 } else if std::env::var("SAGE_TRACE").is_ok() {
81 Tracer::ndjson_stderr()
82 } else {
83 Tracer::disabled()
84 }
85 }
86 #[cfg(target_arch = "wasm32")]
87 _ => Tracer::console(),
88 };
89
90 let _ = TRACER.set(Arc::new(tracer));
91}
92
93pub fn init() {
95 init_with_config(TracingConfig::default());
96}
97
98fn timestamp_ms() -> u64 {
100 SystemTime::now()
101 .duration_since(UNIX_EPOCH)
102 .map(|d| d.as_millis() as u64)
103 .unwrap_or(0)
104}
105
106#[cfg(not(target_arch = "wasm32"))]
108fn timestamp_ns() -> u64 {
109 SystemTime::now()
110 .duration_since(UNIX_EPOCH)
111 .map(|d| d.as_nanos() as u64)
112 .unwrap_or(0)
113}
114
115#[cfg(not(target_arch = "wasm32"))]
117fn generate_trace_id() -> String {
118 use std::time::Instant;
119 let now = Instant::now();
120 let seed = now.elapsed().as_nanos() as u64;
121 format!("{:032x}", seed ^ timestamp_ns())
122}
123
124#[cfg(not(target_arch = "wasm32"))]
126fn generate_span_id() -> String {
127 static COUNTER: AtomicU64 = AtomicU64::new(1);
128 let count = COUNTER.fetch_add(1, Ordering::SeqCst);
129 format!("{:016x}", count ^ (timestamp_ns() & 0xFFFF_FFFF))
130}
131
132struct Tracer {
134 enabled: AtomicBool,
135 backend: Mutex<TracerBackend>,
136 #[cfg(not(target_arch = "wasm32"))]
137 service_name: String,
138 #[cfg(not(target_arch = "wasm32"))]
139 trace_id: String,
140}
141
142enum TracerBackend {
143 Disabled,
144 #[cfg(not(target_arch = "wasm32"))]
145 Ndjson(NdjsonBackend),
146 #[cfg(not(target_arch = "wasm32"))]
147 Otlp(OtlpBackend),
148 #[cfg(target_arch = "wasm32")]
149 Console,
150}
151
152impl Tracer {
153 fn disabled() -> Self {
154 Self {
155 enabled: AtomicBool::new(false),
156 backend: Mutex::new(TracerBackend::Disabled),
157 #[cfg(not(target_arch = "wasm32"))]
158 service_name: "sage-agent".to_string(),
159 #[cfg(not(target_arch = "wasm32"))]
160 trace_id: generate_trace_id(),
161 }
162 }
163
164 #[cfg(not(target_arch = "wasm32"))]
165 fn ndjson_stderr() -> Self {
166 Self {
167 enabled: AtomicBool::new(true),
168 backend: Mutex::new(TracerBackend::Ndjson(NdjsonBackend::Stderr)),
169 service_name: "sage-agent".to_string(),
170 trace_id: generate_trace_id(),
171 }
172 }
173
174 #[cfg(not(target_arch = "wasm32"))]
175 fn ndjson_file(file: std::fs::File) -> Self {
176 Self {
177 enabled: AtomicBool::new(true),
178 backend: Mutex::new(TracerBackend::Ndjson(NdjsonBackend::File(file))),
179 service_name: "sage-agent".to_string(),
180 trace_id: generate_trace_id(),
181 }
182 }
183
184 #[cfg(not(target_arch = "wasm32"))]
185 fn otlp(endpoint: String, service_name: String) -> Self {
186 Self {
187 enabled: AtomicBool::new(true),
188 backend: Mutex::new(TracerBackend::Otlp(OtlpBackend::new(endpoint))),
189 service_name,
190 trace_id: generate_trace_id(),
191 }
192 }
193
194 #[cfg(target_arch = "wasm32")]
195 fn console() -> Self {
196 Self {
197 enabled: AtomicBool::new(true),
198 backend: Mutex::new(TracerBackend::Console),
199 }
200 }
201
202 fn emit(&self, kind: &str, data: serde_json::Value) {
203 if !self.enabled.load(Ordering::Relaxed) {
204 return;
205 }
206
207 let mut backend = self.backend.lock().unwrap();
208 match &mut *backend {
209 TracerBackend::Disabled => {}
210 #[cfg(not(target_arch = "wasm32"))]
211 TracerBackend::Ndjson(ndjson) => {
212 ndjson.emit(kind, data);
213 }
214 #[cfg(not(target_arch = "wasm32"))]
215 TracerBackend::Otlp(otlp) => {
216 otlp.emit(kind, data, &self.trace_id, &self.service_name);
217 }
218 #[cfg(target_arch = "wasm32")]
219 TracerBackend::Console => {
220 if let Ok(json) = serde_json::to_string(&serde_json::json!({
221 "t": timestamp_ms(),
222 "kind": kind,
223 "data": data,
224 })) {
225 sage_runtime_web::console_log(&json);
226 }
227 }
228 }
229 }
230}
231
232#[cfg(not(target_arch = "wasm32"))]
237mod native_backends {
238 use super::*;
239 use serde::Serialize;
240 use std::io::Write;
241
242 pub(super) enum NdjsonBackend {
244 Stderr,
245 File(std::fs::File),
246 }
247
248 impl NdjsonBackend {
249 pub(super) fn emit(&mut self, kind: &str, data: serde_json::Value) {
250 #[derive(Serialize)]
251 struct TraceEvent<'a> {
252 t: u64,
253 kind: &'a str,
254 #[serde(flatten)]
255 data: serde_json::Value,
256 }
257
258 let event = TraceEvent {
259 t: timestamp_ms(),
260 kind,
261 data,
262 };
263
264 if let Ok(json) = serde_json::to_string(&event) {
265 let line = format!("{}\n", json);
266 match self {
267 NdjsonBackend::Stderr => {
268 let _ = std::io::stderr().write_all(line.as_bytes());
269 }
270 NdjsonBackend::File(f) => {
271 let _ = f.write_all(line.as_bytes());
272 }
273 }
274 }
275 }
276 }
277
278 pub(super) struct OtlpBackend {
280 endpoint: String,
281 pending_spans: Vec<OtlpSpan>,
282 }
283
284 impl OtlpBackend {
285 pub(super) fn new(endpoint: String) -> Self {
286 Self {
287 endpoint,
288 pending_spans: Vec::new(),
289 }
290 }
291
292 pub(super) fn emit(
293 &mut self,
294 kind: &str,
295 data: serde_json::Value,
296 trace_id: &str,
297 service_name: &str,
298 ) {
299 let span_id = generate_span_id();
300 let now_ns = timestamp_ns();
301
302 let span = OtlpSpan {
303 trace_id: trace_id.to_string(),
304 span_id,
305 name: kind.to_string(),
306 kind: 1, start_time_unix_nano: now_ns,
308 end_time_unix_nano: now_ns,
309 attributes: data_to_attributes(&data),
310 status: OtlpStatus { code: 1 }, };
312
313 self.pending_spans.push(span);
314
315 if self.pending_spans.len() >= 10
316 || kind.contains("stop")
317 || kind.contains("error")
318 {
319 self.flush(service_name);
320 }
321 }
322
323 fn flush(&mut self, service_name: &str) {
324 if self.pending_spans.is_empty() {
325 return;
326 }
327
328 let spans = std::mem::take(&mut self.pending_spans);
329 let payload = OtlpExportRequest {
330 resource_spans: vec![OtlpResourceSpans {
331 resource: OtlpResource {
332 attributes: vec![OtlpAttribute {
333 key: "service.name".to_string(),
334 value: OtlpValue {
335 string_value: Some(service_name.to_string()),
336 },
337 }],
338 },
339 scope_spans: vec![OtlpScopeSpans {
340 scope: OtlpScope {
341 name: "sage".to_string(),
342 version: env!("CARGO_PKG_VERSION").to_string(),
343 },
344 spans,
345 }],
346 }],
347 };
348
349 let endpoint = self.endpoint.clone();
350 if let Ok(json) = serde_json::to_string(&payload) {
351 std::thread::spawn(move || {
352 let _ = ureq_post(&endpoint, &json);
353 });
354 }
355 }
356 }
357
358 fn ureq_post(url: &str, body: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
360 use std::io::Read;
361 use std::net::TcpStream;
362
363 let url = url.trim_start_matches("http://");
364 let (host_port, path) = url.split_once('/').unwrap_or((url, "v1/traces"));
365 let path = format!("/{}", path);
366
367 let mut stream = TcpStream::connect(host_port)?;
368 stream.set_write_timeout(Some(std::time::Duration::from_secs(5)))?;
369 stream.set_read_timeout(Some(std::time::Duration::from_secs(5)))?;
370
371 let request = format!(
372 "POST {} HTTP/1.1\r\n\
373 Host: {}\r\n\
374 Content-Type: application/json\r\n\
375 Content-Length: {}\r\n\
376 Connection: close\r\n\
377 \r\n\
378 {}",
379 path,
380 host_port,
381 body.len(),
382 body
383 );
384
385 stream.write_all(request.as_bytes())?;
386
387 let mut response = Vec::new();
388 let _ = stream.read_to_end(&mut response);
389
390 Ok(())
391 }
392
393 fn data_to_attributes(data: &serde_json::Value) -> Vec<OtlpAttribute> {
395 let mut attrs = Vec::new();
396
397 if let serde_json::Value::Object(map) = data {
398 for (key, value) in map {
399 let attr = match value {
400 serde_json::Value::String(s) => OtlpAttribute {
401 key: key.clone(),
402 value: OtlpValue {
403 string_value: Some(s.clone()),
404 },
405 },
406 serde_json::Value::Number(n) => OtlpAttribute {
407 key: key.clone(),
408 value: OtlpValue {
409 string_value: Some(n.to_string()),
410 },
411 },
412 serde_json::Value::Bool(b) => OtlpAttribute {
413 key: key.clone(),
414 value: OtlpValue {
415 string_value: Some(b.to_string()),
416 },
417 },
418 _ => OtlpAttribute {
419 key: key.clone(),
420 value: OtlpValue {
421 string_value: Some(value.to_string()),
422 },
423 },
424 };
425 attrs.push(attr);
426 }
427 }
428
429 attrs
430 }
431
432 #[derive(Serialize)]
435 pub(super) struct OtlpExportRequest {
436 #[serde(rename = "resourceSpans")]
437 pub resource_spans: Vec<OtlpResourceSpans>,
438 }
439
440 #[derive(Serialize)]
441 pub(super) struct OtlpResourceSpans {
442 pub resource: OtlpResource,
443 #[serde(rename = "scopeSpans")]
444 pub scope_spans: Vec<OtlpScopeSpans>,
445 }
446
447 #[derive(Serialize)]
448 pub(super) struct OtlpResource {
449 pub attributes: Vec<OtlpAttribute>,
450 }
451
452 #[derive(Serialize)]
453 pub(super) struct OtlpScopeSpans {
454 pub scope: OtlpScope,
455 pub spans: Vec<OtlpSpan>,
456 }
457
458 #[derive(Serialize)]
459 pub(super) struct OtlpScope {
460 pub name: String,
461 pub version: String,
462 }
463
464 #[derive(Serialize)]
465 pub(super) struct OtlpSpan {
466 #[serde(rename = "traceId")]
467 pub trace_id: String,
468 #[serde(rename = "spanId")]
469 pub span_id: String,
470 pub name: String,
471 pub kind: i32,
472 #[serde(rename = "startTimeUnixNano")]
473 pub start_time_unix_nano: u64,
474 #[serde(rename = "endTimeUnixNano")]
475 pub end_time_unix_nano: u64,
476 pub attributes: Vec<OtlpAttribute>,
477 pub status: OtlpStatus,
478 }
479
480 #[derive(Serialize)]
481 pub(super) struct OtlpAttribute {
482 pub key: String,
483 pub value: OtlpValue,
484 }
485
486 #[derive(Serialize)]
487 pub(super) struct OtlpValue {
488 #[serde(rename = "stringValue", skip_serializing_if = "Option::is_none")]
489 pub string_value: Option<String>,
490 }
491
492 #[derive(Serialize)]
493 pub(super) struct OtlpStatus {
494 pub code: i32,
495 }
496}
497
498#[cfg(not(target_arch = "wasm32"))]
499use native_backends::{NdjsonBackend, OtlpBackend};
500
501fn emit_event(kind: &str, data: serde_json::Value) {
506 if let Some(tracer) = TRACER.get() {
507 tracer.emit(kind, data);
508 }
509}
510
511pub fn agent_spawn(agent: &str, id: &str) {
513 emit_event(
514 "agent.spawn",
515 serde_json::json!({
516 "agent": agent,
517 "id": id,
518 }),
519 );
520}
521
522pub fn agent_emit(agent: &str, id: &str, value_type: &str) {
524 emit_event(
525 "agent.emit",
526 serde_json::json!({
527 "agent": agent,
528 "id": id,
529 "value_type": value_type,
530 }),
531 );
532}
533
534pub fn agent_stop(agent: &str, id: &str, duration_ms: u64) {
536 emit_event(
537 "agent.stop",
538 serde_json::json!({
539 "agent": agent,
540 "id": id,
541 "duration_ms": duration_ms,
542 }),
543 );
544}
545
546pub fn agent_error(agent: &str, id: &str, error_kind: &str, message: &str) {
548 emit_event(
549 "agent.error",
550 serde_json::json!({
551 "agent": agent,
552 "id": id,
553 "error": {
554 "kind": error_kind,
555 "message": message,
556 },
557 }),
558 );
559}
560
561pub fn infer_start(agent: &str, id: &str, model: &str, prompt_len: usize) {
563 emit_event(
564 "infer.start",
565 serde_json::json!({
566 "agent": agent,
567 "id": id,
568 "model": model,
569 "prompt_len": prompt_len,
570 }),
571 );
572}
573
574pub fn infer_complete(agent: &str, id: &str, model: &str, response_len: usize, duration_ms: u64) {
576 emit_event(
577 "infer.complete",
578 serde_json::json!({
579 "agent": agent,
580 "id": id,
581 "model": model,
582 "response_len": response_len,
583 "duration_ms": duration_ms,
584 }),
585 );
586}
587
588pub fn infer_error(agent: &str, id: &str, error_kind: &str, message: &str) {
590 emit_event(
591 "infer.error",
592 serde_json::json!({
593 "agent": agent,
594 "id": id,
595 "error": {
596 "kind": error_kind,
597 "message": message,
598 },
599 }),
600 );
601}
602
603pub fn user(message: &str) {
605 emit_event(
606 "user",
607 serde_json::json!({
608 "message": message,
609 }),
610 );
611}
612
613pub fn span_start(name: &str) {
615 emit_event(
616 "span.start",
617 serde_json::json!({
618 "name": name,
619 }),
620 );
621}
622
623pub fn span_end(name: &str, duration_ms: u64) {
625 emit_event(
626 "span.end",
627 serde_json::json!({
628 "name": name,
629 "duration_ms": duration_ms,
630 }),
631 );
632}
633
634#[cfg(test)]
635mod tests {
636 use super::*;
637
638 #[test]
639 fn test_timestamp_ms() {
640 let ts = timestamp_ms();
641 assert!(ts > 1_577_836_800_000);
643 }
644
645 #[test]
646 fn test_timestamp_ns() {
647 let ts = timestamp_ns();
648 assert!(ts > 1_577_836_800_000_000_000);
650 }
651
652 #[test]
653 fn test_generate_trace_id() {
654 let id1 = generate_trace_id();
655 let id2 = generate_trace_id();
656 assert_eq!(id1.len(), 32);
657 assert_eq!(id2.len(), 32);
658 }
659
660 #[test]
661 fn test_generate_span_id() {
662 let id1 = generate_span_id();
663 let id2 = generate_span_id();
664 assert_eq!(id1.len(), 16);
665 assert_eq!(id2.len(), 16);
666 assert_ne!(id1, id2);
667 }
668
669 #[test]
670 fn test_tracing_config_default() {
671 let config = TracingConfig::default();
672 assert_eq!(config.backend, "ndjson");
673 assert!(config.otlp_endpoint.is_none());
674 assert_eq!(config.service_name, "sage-agent");
675 }
676}