1use serde::Serialize;
9use std::fs::OpenOptions;
10use std::io::Write;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::{Arc, Mutex, OnceLock};
13use std::time::{SystemTime, UNIX_EPOCH};
14
/// Process-wide tracer slot. It starts empty and is filled by
/// `init_with_config`; a second `set` on an already-filled cell is ignored.
static TRACER: OnceLock<Arc<Tracer>> = OnceLock::new();
17
18#[inline]
20pub fn is_enabled() -> bool {
21 TRACER
22 .get()
23 .map(|t| t.enabled.load(Ordering::Relaxed))
24 .unwrap_or(false)
25}
26
/// Settings consumed by `init_with_config` to choose and configure a backend.
#[derive(Clone, Debug)]
pub struct TracingConfig {
    /// Backend selector: `"none"` disables tracing, `"otlp"` exports over
    /// HTTP, and any other value (including `"ndjson"`) selects NDJSON output.
    pub backend: String,
    /// Collector URL for the OTLP backend; when `None`,
    /// `http://localhost:4318/v1/traces` is used.
    pub otlp_endpoint: Option<String>,
    /// Service name handed to the OTLP backend.
    pub service_name: String,
}
37
38impl Default for TracingConfig {
39 fn default() -> Self {
40 Self {
41 backend: "ndjson".to_string(),
42 otlp_endpoint: None,
43 service_name: "sage-agent".to_string(),
44 }
45 }
46}
47
48pub fn init_with_config(config: TracingConfig) {
50 let tracer = match config.backend.as_str() {
51 "none" => Tracer::disabled(),
52 "otlp" => {
53 let endpoint = config
54 .otlp_endpoint
55 .unwrap_or_else(|| "http://localhost:4318/v1/traces".to_string());
56 Tracer::otlp(endpoint, config.service_name)
57 }
58 "ndjson" | _ => {
59 if let Ok(path) = std::env::var("SAGE_TRACE_FILE") {
61 match OpenOptions::new().create(true).append(true).open(&path) {
62 Ok(file) => Tracer::ndjson_file(file),
63 Err(e) => {
64 eprintln!("Warning: Could not open trace file {}: {}", path, e);
65 Tracer::ndjson_stderr()
66 }
67 }
68 } else if std::env::var("SAGE_TRACE").is_ok() {
69 Tracer::ndjson_stderr()
70 } else {
71 Tracer::disabled()
72 }
73 }
74 };
75
76 let _ = TRACER.set(Arc::new(tracer));
77}
78
79pub fn init() {
81 init_with_config(TracingConfig::default());
82}
83
/// Milliseconds elapsed since the Unix epoch, or `0` if the system clock
/// reports a time before the epoch.
fn timestamp_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
91
/// Nanoseconds elapsed since the Unix epoch, or `0` if the system clock
/// reports a time before the epoch.
fn timestamp_ns() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
99
100fn generate_trace_id() -> String {
102 use std::time::Instant;
103 let now = Instant::now();
104 let seed = now.elapsed().as_nanos() as u64;
105 format!("{:032x}", seed ^ timestamp_ns())
106}
107
108fn generate_span_id() -> String {
110 static COUNTER: AtomicU64 = AtomicU64::new(1);
111 let count = COUNTER.fetch_add(1, Ordering::SeqCst);
112 format!("{:016x}", count ^ (timestamp_ns() & 0xFFFF_FFFF))
113}
114
/// Tracer state stored in the global `TRACER` cell.
struct Tracer {
    /// Switch checked by `is_enabled` and at the top of `emit`.
    enabled: AtomicBool,
    /// Output sink; locked for the duration of each `emit`.
    backend: Mutex<TracerBackend>,
    /// Service name forwarded to the OTLP backend on every emit.
    service_name: String,
    /// Trace id generated once at construction and forwarded to the OTLP backend.
    trace_id: String,
}
122
/// Where emitted events are sent.
enum TracerBackend {
    /// Events are dropped.
    Disabled,
    /// Events are written as newline-delimited JSON.
    Ndjson(NdjsonBackend),
    /// Events are buffered as spans and posted to an OTLP endpoint.
    Otlp(OtlpBackend),
}
128
129impl Tracer {
130 fn disabled() -> Self {
131 Self {
132 enabled: AtomicBool::new(false),
133 backend: Mutex::new(TracerBackend::Disabled),
134 service_name: "sage-agent".to_string(),
135 trace_id: generate_trace_id(),
136 }
137 }
138
139 fn ndjson_stderr() -> Self {
140 Self {
141 enabled: AtomicBool::new(true),
142 backend: Mutex::new(TracerBackend::Ndjson(NdjsonBackend::Stderr)),
143 service_name: "sage-agent".to_string(),
144 trace_id: generate_trace_id(),
145 }
146 }
147
148 fn ndjson_file(file: std::fs::File) -> Self {
149 Self {
150 enabled: AtomicBool::new(true),
151 backend: Mutex::new(TracerBackend::Ndjson(NdjsonBackend::File(file))),
152 service_name: "sage-agent".to_string(),
153 trace_id: generate_trace_id(),
154 }
155 }
156
157 fn otlp(endpoint: String, service_name: String) -> Self {
158 Self {
159 enabled: AtomicBool::new(true),
160 backend: Mutex::new(TracerBackend::Otlp(OtlpBackend::new(endpoint))),
161 service_name,
162 trace_id: generate_trace_id(),
163 }
164 }
165
166 fn emit(&self, kind: &str, data: serde_json::Value) {
167 if !self.enabled.load(Ordering::Relaxed) {
168 return;
169 }
170
171 let mut backend = self.backend.lock().unwrap();
172 match &mut *backend {
173 TracerBackend::Disabled => {}
174 TracerBackend::Ndjson(ndjson) => {
175 ndjson.emit(kind, data);
176 }
177 TracerBackend::Otlp(otlp) => {
178 otlp.emit(kind, data, &self.trace_id, &self.service_name);
179 }
180 }
181 }
182}
183
/// Destination for newline-delimited JSON trace output.
enum NdjsonBackend {
    /// Write each line to the process's standard error stream.
    Stderr,
    /// Write each line to the given file handle.
    File(std::fs::File),
}
189
190impl NdjsonBackend {
191 fn emit(&mut self, kind: &str, data: serde_json::Value) {
192 #[derive(Serialize)]
193 struct TraceEvent<'a> {
194 t: u64,
195 kind: &'a str,
196 #[serde(flatten)]
197 data: serde_json::Value,
198 }
199
200 let event = TraceEvent {
201 t: timestamp_ms(),
202 kind,
203 data,
204 };
205
206 if let Ok(json) = serde_json::to_string(&event) {
207 let line = format!("{}\n", json);
208 match self {
209 NdjsonBackend::Stderr => {
210 let _ = std::io::stderr().write_all(line.as_bytes());
211 }
212 NdjsonBackend::File(f) => {
213 let _ = f.write_all(line.as_bytes());
214 }
215 }
216 }
217 }
218}
219
/// Buffers spans and posts them to an OTLP endpoint.
struct OtlpBackend {
    /// URL the serialized export request is posted to.
    endpoint: String,
    /// Spans accumulated since the last flush.
    pending_spans: Vec<OtlpSpan>,
}
225
226impl OtlpBackend {
227 fn new(endpoint: String) -> Self {
228 Self {
229 endpoint,
230 pending_spans: Vec::new(),
231 }
232 }
233
234 fn emit(&mut self, kind: &str, data: serde_json::Value, trace_id: &str, service_name: &str) {
235 let span_id = generate_span_id();
236 let now_ns = timestamp_ns();
237
238 let span = OtlpSpan {
240 trace_id: trace_id.to_string(),
241 span_id,
242 name: kind.to_string(),
243 kind: 1, start_time_unix_nano: now_ns,
245 end_time_unix_nano: now_ns,
246 attributes: data_to_attributes(&data),
247 status: OtlpStatus { code: 1 }, };
249
250 self.pending_spans.push(span);
251
252 if self.pending_spans.len() >= 10 || kind.contains("stop") || kind.contains("error") {
254 self.flush(service_name);
255 }
256 }
257
258 fn flush(&mut self, service_name: &str) {
259 if self.pending_spans.is_empty() {
260 return;
261 }
262
263 let spans = std::mem::take(&mut self.pending_spans);
264 let payload = OtlpExportRequest {
265 resource_spans: vec![OtlpResourceSpans {
266 resource: OtlpResource {
267 attributes: vec![OtlpAttribute {
268 key: "service.name".to_string(),
269 value: OtlpValue {
270 string_value: Some(service_name.to_string()),
271 },
272 }],
273 },
274 scope_spans: vec![OtlpScopeSpans {
275 scope: OtlpScope {
276 name: "sage".to_string(),
277 version: env!("CARGO_PKG_VERSION").to_string(),
278 },
279 spans,
280 }],
281 }],
282 };
283
284 let endpoint = self.endpoint.clone();
286 if let Ok(json) = serde_json::to_string(&payload) {
287 std::thread::spawn(move || {
288 let _ = ureq_post(&endpoint, &json);
289 });
290 }
291 }
292}
293
/// Sends `body` as an `application/json` HTTP/1.1 POST to `url` over a plain
/// TCP connection and waits for the server to close the connection.
///
/// Only unencrypted HTTP is supported. A URL with any other scheme (for
/// example `https://`) is rejected up front instead of being mis-parsed into
/// a bogus host name. A URL without a scheme is treated as HTTP, a missing
/// port defaults to 80, and a missing path defaults to `/v1/traces`.
///
/// # Errors
///
/// Returns an error for an unsupported scheme or empty host, and for any
/// failure while connecting, configuring timeouts, or writing the request.
/// The response is read and discarded; its status is not inspected.
fn ureq_post(url: &str, body: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    use std::io::Read;
    use std::net::TcpStream;
    use std::time::Duration;

    const IO_TIMEOUT: Duration = Duration::from_secs(5);

    let (authority, path) = split_http_url(url)?;

    // `TcpStream::connect` needs an explicit port; fall back to HTTP's default.
    let has_port = authority.rsplit_once(':').map_or(false, |(_, port)| {
        !port.is_empty() && port.bytes().all(|b| b.is_ascii_digit())
    });
    let connect_addr = if has_port {
        authority.to_string()
    } else {
        format!("{}:80", authority)
    };

    let mut stream = TcpStream::connect(connect_addr.as_str())?;
    stream.set_write_timeout(Some(IO_TIMEOUT))?;
    stream.set_read_timeout(Some(IO_TIMEOUT))?;

    let request = format!(
        "POST {} HTTP/1.1\r\n\
         Host: {}\r\n\
         Content-Type: application/json\r\n\
         Content-Length: {}\r\n\
         Connection: close\r\n\
         \r\n\
         {}",
        path,
        authority,
        body.len(),
        body
    );

    stream.write_all(request.as_bytes())?;

    // Drain the response so the server can finish cleanly; the content and
    // any read error are deliberately ignored.
    let mut response = Vec::new();
    let _ = stream.read_to_end(&mut response);

    Ok(())
}

/// Splits a plain-HTTP URL into `(authority, absolute_path)`.
fn split_http_url(url: &str) -> Result<(&str, String), Box<dyn std::error::Error + Send + Sync>> {
    let rest = match url.split_once("://") {
        // Only treat the prefix as a scheme when it cannot be part of a path.
        Some((scheme, rest)) if !scheme.contains('/') => {
            if !scheme.eq_ignore_ascii_case("http") {
                return Err(format!(
                    "unsupported URL scheme `{}`: only plain http is supported",
                    scheme
                )
                .into());
            }
            rest
        }
        _ => url,
    };

    let (authority, path) = rest.split_once('/').unwrap_or((rest, "v1/traces"));
    if authority.is_empty() {
        return Err("URL has no host".into());
    }
    Ok((authority, format!("/{}", path)))
}
331
332fn data_to_attributes(data: &serde_json::Value) -> Vec<OtlpAttribute> {
334 let mut attrs = Vec::new();
335
336 if let serde_json::Value::Object(map) = data {
337 for (key, value) in map {
338 let attr = match value {
339 serde_json::Value::String(s) => OtlpAttribute {
340 key: key.clone(),
341 value: OtlpValue {
342 string_value: Some(s.clone()),
343 },
344 },
345 serde_json::Value::Number(n) => OtlpAttribute {
346 key: key.clone(),
347 value: OtlpValue {
348 string_value: Some(n.to_string()),
349 },
350 },
351 serde_json::Value::Bool(b) => OtlpAttribute {
352 key: key.clone(),
353 value: OtlpValue {
354 string_value: Some(b.to_string()),
355 },
356 },
357 _ => OtlpAttribute {
358 key: key.clone(),
359 value: OtlpValue {
360 string_value: Some(value.to_string()),
361 },
362 },
363 };
364 attrs.push(attr);
365 }
366 }
367
368 attrs
369}
370
371#[derive(Serialize)]
374struct OtlpExportRequest {
375 #[serde(rename = "resourceSpans")]
376 resource_spans: Vec<OtlpResourceSpans>,
377}
378
379#[derive(Serialize)]
380struct OtlpResourceSpans {
381 resource: OtlpResource,
382 #[serde(rename = "scopeSpans")]
383 scope_spans: Vec<OtlpScopeSpans>,
384}
385
/// Resource description attached to exported spans.
#[derive(Serialize)]
struct OtlpResource {
    /// Resource attributes; `flush` fills in a single `service.name` entry.
    attributes: Vec<OtlpAttribute>,
}
390
/// Spans grouped under one instrumentation scope.
#[derive(Serialize)]
struct OtlpScopeSpans {
    /// Scope that produced the spans.
    scope: OtlpScope,
    /// The spans themselves.
    spans: Vec<OtlpSpan>,
}
396
/// Name and version of the instrumentation scope.
#[derive(Serialize)]
struct OtlpScope {
    /// Scope name; `flush` sets it to `"sage"`.
    name: String,
    /// Scope version; `flush` sets it to the crate's `CARGO_PKG_VERSION`.
    version: String,
}
402
403#[derive(Serialize)]
404struct OtlpSpan {
405 #[serde(rename = "traceId")]
406 trace_id: String,
407 #[serde(rename = "spanId")]
408 span_id: String,
409 name: String,
410 kind: i32,
411 #[serde(rename = "startTimeUnixNano")]
412 start_time_unix_nano: u64,
413 #[serde(rename = "endTimeUnixNano")]
414 end_time_unix_nano: u64,
415 attributes: Vec<OtlpAttribute>,
416 status: OtlpStatus,
417}
418
/// Key/value attribute attached to a span or resource.
#[derive(Serialize)]
struct OtlpAttribute {
    /// Attribute name.
    key: String,
    /// Attribute value.
    value: OtlpValue,
}
424
425#[derive(Serialize)]
426struct OtlpValue {
427 #[serde(rename = "stringValue", skip_serializing_if = "Option::is_none")]
428 string_value: Option<String>,
429}
430
/// Status attached to an exported span.
#[derive(Serialize)]
struct OtlpStatus {
    /// Numeric status code.
    code: i32,
}
435
436fn emit_event(kind: &str, data: serde_json::Value) {
439 if let Some(tracer) = TRACER.get() {
440 tracer.emit(kind, data);
441 }
442}
443
444pub fn agent_spawn(agent: &str, id: &str) {
446 emit_event(
447 "agent.spawn",
448 serde_json::json!({
449 "agent": agent,
450 "id": id,
451 }),
452 );
453}
454
455pub fn agent_emit(agent: &str, id: &str, value_type: &str) {
457 emit_event(
458 "agent.emit",
459 serde_json::json!({
460 "agent": agent,
461 "id": id,
462 "value_type": value_type,
463 }),
464 );
465}
466
467pub fn agent_stop(agent: &str, id: &str, duration_ms: u64) {
469 emit_event(
470 "agent.stop",
471 serde_json::json!({
472 "agent": agent,
473 "id": id,
474 "duration_ms": duration_ms,
475 }),
476 );
477}
478
479pub fn agent_error(agent: &str, id: &str, error_kind: &str, message: &str) {
481 emit_event(
482 "agent.error",
483 serde_json::json!({
484 "agent": agent,
485 "id": id,
486 "error": {
487 "kind": error_kind,
488 "message": message,
489 },
490 }),
491 );
492}
493
494pub fn infer_start(agent: &str, id: &str, model: &str, prompt_len: usize) {
496 emit_event(
497 "infer.start",
498 serde_json::json!({
499 "agent": agent,
500 "id": id,
501 "model": model,
502 "prompt_len": prompt_len,
503 }),
504 );
505}
506
507pub fn infer_complete(agent: &str, id: &str, model: &str, response_len: usize, duration_ms: u64) {
509 emit_event(
510 "infer.complete",
511 serde_json::json!({
512 "agent": agent,
513 "id": id,
514 "model": model,
515 "response_len": response_len,
516 "duration_ms": duration_ms,
517 }),
518 );
519}
520
521pub fn infer_error(agent: &str, id: &str, error_kind: &str, message: &str) {
523 emit_event(
524 "infer.error",
525 serde_json::json!({
526 "agent": agent,
527 "id": id,
528 "error": {
529 "kind": error_kind,
530 "message": message,
531 },
532 }),
533 );
534}
535
536pub fn user(message: &str) {
538 emit_event(
539 "user",
540 serde_json::json!({
541 "message": message,
542 }),
543 );
544}
545
546pub fn span_start(name: &str) {
548 emit_event(
549 "span.start",
550 serde_json::json!({
551 "name": name,
552 }),
553 );
554}
555
556pub fn span_end(name: &str, duration_ms: u64) {
558 emit_event(
559 "span.end",
560 serde_json::json!({
561 "name": name,
562 "duration_ms": duration_ms,
563 }),
564 );
565}
566
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_timestamp_ms() {
        // 2020-01-01T00:00:00Z in milliseconds; any sane clock is past it.
        let now_ms = timestamp_ms();
        assert!(now_ms > 1_577_836_800_000);
    }

    #[test]
    fn test_timestamp_ns() {
        // 2020-01-01T00:00:00Z in nanoseconds.
        let now_ns = timestamp_ns();
        assert!(now_ns > 1_577_836_800_000_000_000);
    }

    #[test]
    fn test_generate_trace_id() {
        // Trace ids are 128 bits rendered as 32 hex digits.
        let first = generate_trace_id();
        let second = generate_trace_id();
        assert_eq!(first.len(), 32);
        assert_eq!(second.len(), 32);
    }

    #[test]
    fn test_generate_span_id() {
        // Span ids are 64 bits rendered as 16 hex digits, and consecutive
        // ids must differ.
        let first = generate_span_id();
        let second = generate_span_id();
        assert_eq!(first.len(), 16);
        assert_eq!(second.len(), 16);
        assert_ne!(first, second);
    }

    #[test]
    fn test_data_to_attributes() {
        // One attribute per top-level field, whatever its JSON type.
        let payload = serde_json::json!({
            "agent": "TestAgent",
            "id": "123",
            "count": 42,
            "active": true,
        });
        let attributes = data_to_attributes(&payload);
        assert_eq!(attributes.len(), 4);
    }

    #[test]
    fn test_tracing_config_default() {
        let defaults = TracingConfig::default();
        assert_eq!(defaults.backend, "ndjson");
        assert!(defaults.otlp_endpoint.is_none());
        assert_eq!(defaults.service_name, "sage-agent");
    }
}