1use std::collections::VecDeque;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use serde::{Deserialize, Serialize};
17
/// A single recorded flow execution as held by `TraceStore`.
///
/// `id` and `timestamp` are overwritten by the store when the entry is
/// recorded, so callers can leave them at zero.
#[derive(Debug, Clone, Serialize)]
pub struct TraceEntry {
    /// Identifier assigned by the store at record time.
    pub id: u64,
    /// Wall-clock recording time in whole seconds since the Unix epoch.
    pub timestamp: u64,
    /// Flow name; used for filtering, search and per-flow statistics.
    pub flow_name: String,
    /// Outcome of the execution.
    pub status: TraceStatus,
    /// Number of steps executed.
    pub steps_executed: usize,
    /// Latency in milliseconds.
    pub latency_ms: u64,
    /// Input token count.
    pub tokens_input: u64,
    /// Output token count.
    pub tokens_output: u64,
    /// Number of anchor checks.
    pub anchor_checks: usize,
    /// Number of anchor breaches.
    pub anchor_breaches: usize,
    /// Error count; a value above zero marks the trace as "has errors" in
    /// filters and aggregates.
    pub errors: usize,
    /// Retry count.
    pub retries: usize,
    /// Source file associated with the flow.
    pub source_file: String,
    /// Backend name.
    pub backend: String,
    /// Client key; matched exactly by `TraceFilter::client_key`.
    pub client_key: String,
    /// Events attached to the trace; truncated to
    /// `TraceStoreConfig::max_events_per_trace` when recorded.
    pub events: Vec<TraceEvent>,
    /// NOTE(review): presumably the id of the trace this entry replays —
    /// confirm with callers. Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub replay_of: Option<u64>,
    /// Annotations added after recording. Omitted from serialized output
    /// when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub annotations: Vec<TraceAnnotation>,
    /// Correlation identifier set via `TraceStore::set_correlation`.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
}
65
/// A note attached to a recorded trace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceAnnotation {
    /// Who wrote the annotation.
    pub author: String,
    /// Annotation body; included in `TraceStore::search`.
    pub text: String,
    /// Tags; matched exactly by `TraceFilter::tag` and as substrings by
    /// `TraceStore::search`.
    pub tags: Vec<String>,
    /// Annotation time. NOTE(review): presumably seconds since the Unix
    /// epoch like `TraceEntry::timestamp` — confirm with callers.
    pub timestamp: u64,
}
78
/// Outcome of a traced flow execution.
///
/// Serialized in lowercase (`"success"`, `"failed"`, `"partial"`,
/// `"timeout"`), matching the strings returned by `as_str`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TraceStatus {
    Success,
    Failed,
    Partial,
    Timeout,
}
88
89impl TraceStatus {
90 pub fn as_str(&self) -> &'static str {
91 match self {
92 TraceStatus::Success => "success",
93 TraceStatus::Failed => "failed",
94 TraceStatus::Partial => "partial",
95 TraceStatus::Timeout => "timeout",
96 }
97 }
98}
99
/// One event inside a trace.
#[derive(Debug, Clone, Serialize)]
pub struct TraceEvent {
    /// Event kind; becomes the span event name in `entry_to_span`.
    pub event_type: String,
    /// Offset in milliseconds. NOTE(review): presumably relative to the
    /// start of the trace — confirm with the producer.
    pub offset_ms: u64,
    /// Step name; exported as the `step` attribute when non-empty.
    pub step_name: String,
    /// Free-form detail; exported as the `detail` attribute when non-empty.
    pub detail: String,
}
112
/// Settings that govern a `TraceStore`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceStoreConfig {
    /// Maximum number of buffered entries. When the buffer is full the
    /// oldest entry is dropped; with `0` nothing is buffered.
    pub capacity: usize,
    /// When `false`, recording and id reservation are no-ops returning `0`.
    pub enabled: bool,
    /// Events beyond this count are truncated from a trace when recorded.
    pub max_events_per_trace: usize,
    /// Age limit in seconds used by `TraceStore::evict_expired`; `0`
    /// disables age-based eviction.
    pub max_age_secs: u64,
}
127
128impl Default for TraceStoreConfig {
129 fn default() -> Self {
130 TraceStoreConfig {
131 capacity: 500,
132 enabled: true,
133 max_events_per_trace: 200,
134 max_age_secs: 0,
135 }
136 }
137}
138
139impl TraceStoreConfig {
140 pub fn disabled() -> Self {
141 TraceStoreConfig {
142 capacity: 0,
143 enabled: false,
144 max_events_per_trace: 0,
145 max_age_secs: 0,
146 }
147 }
148}
149
/// Bounded in-memory buffer of recent traces.
pub struct TraceStore {
    /// Limits and switches applied when recording and evicting.
    config: TraceStoreConfig,
    /// Buffered entries; new entries are pushed at the back and the front
    /// entry is dropped when the buffer is full.
    entries: VecDeque<TraceEntry>,
    /// Next identifier to hand out; starts at 1.
    next_id: u64,
    /// Number of traces accepted while enabled; not reduced by eviction,
    /// deletion or `clear`.
    total_recorded: u64,
}
159
160impl TraceStore {
161 pub fn new(config: TraceStoreConfig) -> Self {
163 TraceStore {
164 entries: VecDeque::with_capacity(config.capacity.min(512)),
165 config,
166 next_id: 1,
167 total_recorded: 0,
168 }
169 }
170
171 pub fn record(&mut self, mut trace: TraceEntry) -> u64 {
173 if !self.config.enabled {
174 return 0;
175 }
176
177 let id = self.next_id;
178 self.next_id += 1;
179 self.total_recorded += 1;
180
181 trace.id = id;
182 trace.timestamp = wall_clock_secs();
183
184 if trace.events.len() > self.config.max_events_per_trace {
186 trace.events.truncate(self.config.max_events_per_trace);
187 }
188
189 if self.entries.len() >= self.config.capacity && self.config.capacity > 0 {
191 self.entries.pop_front();
192 }
193 if self.config.capacity > 0 {
194 self.entries.push_back(trace);
195 }
196
197 id
198 }
199
200 pub fn reserve_id(&mut self) -> u64 {
211 if !self.config.enabled {
212 return 0;
213 }
214 let id = self.next_id;
215 self.next_id += 1;
216 id
217 }
218
219 pub fn record_with_id(&mut self, mut trace: TraceEntry, id: u64) {
226 if !self.config.enabled {
227 return;
228 }
229 self.total_recorded += 1;
230 trace.id = id;
231 trace.timestamp = wall_clock_secs();
232
233 if trace.events.len() > self.config.max_events_per_trace {
234 trace.events.truncate(self.config.max_events_per_trace);
235 }
236
237 if self.entries.len() >= self.config.capacity && self.config.capacity > 0 {
238 self.entries.pop_front();
239 }
240 if self.config.capacity > 0 {
241 self.entries.push_back(trace);
242 }
243 }
244
245 pub fn get(&self, id: u64) -> Option<&TraceEntry> {
247 self.entries.iter().find(|e| e.id == id)
248 }
249
250 pub fn recent(&self, limit: usize, filter: Option<&TraceFilter>) -> Vec<&TraceEntry> {
252 self.entries
253 .iter()
254 .rev()
255 .filter(|e| match filter {
256 Some(f) => f.matches(e),
257 None => true,
258 })
259 .take(limit)
260 .collect()
261 }
262
    /// Number of entries currently buffered.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
267
    /// Returns `true` when no entries are buffered.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
272
    /// Number of traces accepted so far, including ones that have since
    /// been evicted, deleted or cleared.
    pub fn total_recorded(&self) -> u64 {
        self.total_recorded
    }
277
278 pub fn get_mut(&mut self, id: u64) -> Option<&mut TraceEntry> {
280 self.entries.iter_mut().find(|e| e.id == id)
281 }
282
283 pub fn annotate(&mut self, id: u64, annotation: TraceAnnotation) -> bool {
285 match self.get_mut(id) {
286 Some(entry) => {
287 entry.annotations.push(annotation);
288 true
289 }
290 None => false,
291 }
292 }
293
294 pub fn bulk_delete(&mut self, ids: &[u64]) -> usize {
296 let before = self.entries.len();
297 self.entries.retain(|e| !ids.contains(&e.id));
298 before - self.entries.len()
299 }
300
301 pub fn bulk_annotate(&mut self, ids: &[u64], annotation: TraceAnnotation) -> usize {
303 let mut count = 0;
304 for entry in self.entries.iter_mut() {
305 if ids.contains(&entry.id) {
306 entry.annotations.push(annotation.clone());
307 count += 1;
308 }
309 }
310 count
311 }
312
313 pub fn search(&self, query: &str, limit: usize) -> Vec<&TraceEntry> {
319 let q = query.to_lowercase();
320 self.entries
321 .iter()
322 .rev()
323 .filter(|e| {
324 e.flow_name.to_lowercase().contains(&q)
325 || e.source_file.to_lowercase().contains(&q)
326 || e.backend.to_lowercase().contains(&q)
327 || e.client_key.to_lowercase().contains(&q)
328 || e.events.iter().any(|ev| {
329 ev.step_name.to_lowercase().contains(&q)
330 || ev.detail.to_lowercase().contains(&q)
331 })
332 || e.annotations.iter().any(|a| {
333 a.text.to_lowercase().contains(&q)
334 || a.tags.iter().any(|t| t.to_lowercase().contains(&q))
335 })
336 })
337 .take(limit)
338 .collect()
339 }
340
341 pub fn evict_expired(&mut self) -> usize {
344 if self.config.max_age_secs == 0 {
345 return 0;
346 }
347 let now = wall_clock_secs();
348 let cutoff = now.saturating_sub(self.config.max_age_secs);
349 let before = self.entries.len();
350 self.entries.retain(|e| e.timestamp >= cutoff);
351 before - self.entries.len()
352 }
353
354 pub fn set_max_age_secs(&mut self, max_age_secs: u64) -> u64 {
356 let prev = self.config.max_age_secs;
357 self.config.max_age_secs = max_age_secs;
358 prev
359 }
360
    /// Borrows the configuration currently in effect.
    pub fn config(&self) -> &TraceStoreConfig {
        &self.config
    }
365
366 pub fn stats(&self) -> TraceStoreStats {
368 let mut total_latency_ms: u64 = 0;
369 let mut max_latency_ms: u64 = 0;
370 let mut total_tokens_input: u64 = 0;
371 let mut total_tokens_output: u64 = 0;
372 let mut total_steps: usize = 0;
373 let mut total_anchor_checks: usize = 0;
374 let mut total_anchor_breaches: usize = 0;
375 let mut total_errors: usize = 0;
376 let mut total_retries: usize = 0;
377 let mut status_counts = std::collections::HashMap::new();
378 let mut flow_counts = std::collections::HashMap::new();
379
380 for e in &self.entries {
381 total_latency_ms += e.latency_ms;
382 if e.latency_ms > max_latency_ms {
383 max_latency_ms = e.latency_ms;
384 }
385 total_tokens_input += e.tokens_input;
386 total_tokens_output += e.tokens_output;
387 total_steps += e.steps_executed;
388 total_anchor_checks += e.anchor_checks;
389 total_anchor_breaches += e.anchor_breaches;
390 total_errors += e.errors;
391 total_retries += e.retries;
392 *status_counts.entry(e.status.as_str().to_string()).or_insert(0u64) += 1;
393 *flow_counts.entry(e.flow_name.clone()).or_insert(0u64) += 1;
394 }
395
396 let count = self.entries.len() as u64;
397 let avg_latency_ms = if count > 0 { total_latency_ms / count } else { 0 };
398
399 let mut top_flows: Vec<(String, u64)> = flow_counts.into_iter().collect();
400 top_flows.sort_by(|a, b| b.1.cmp(&a.1));
401 top_flows.truncate(10);
402
403 let mut status_breakdown: Vec<(String, u64)> = status_counts.into_iter().collect();
404 status_breakdown.sort_by_key(|(k, _)| k.clone());
405
406 TraceStoreStats {
407 total_recorded: self.total_recorded,
408 buffered: self.entries.len(),
409 avg_latency_ms,
410 max_latency_ms,
411 total_tokens_input,
412 total_tokens_output,
413 total_steps,
414 total_anchor_checks,
415 total_anchor_breaches,
416 total_errors,
417 total_retries,
418 top_flows,
419 status_breakdown,
420 }
421 }
422
423 pub fn aggregate(&self, window_secs: u64) -> TraceAggregate {
427 let now = wall_clock_secs();
428 let cutoff = if window_secs > 0 { now.saturating_sub(window_secs) } else { 0 };
429
430 let window_entries: Vec<&TraceEntry> = self.entries
431 .iter()
432 .filter(|e| e.timestamp >= cutoff)
433 .collect();
434
435 let count = window_entries.len();
436 if count == 0 {
437 return TraceAggregate {
438 window_secs,
439 count: 0,
440 error_rate: 0.0,
441 avg_latency_ms: 0,
442 p50_latency_ms: 0,
443 p95_latency_ms: 0,
444 p99_latency_ms: 0,
445 min_latency_ms: 0,
446 max_latency_ms: 0,
447 total_tokens: 0,
448 avg_steps: 0.0,
449 flows: Vec::new(),
450 };
451 }
452
453 let mut latencies: Vec<u64> = window_entries.iter().map(|e| e.latency_ms).collect();
454 latencies.sort();
455
456 let error_count = window_entries.iter().filter(|e| e.errors > 0).count();
457 let total_latency: u64 = latencies.iter().sum();
458 let total_tokens: u64 = window_entries.iter().map(|e| e.tokens_input + e.tokens_output).sum();
459 let total_steps: f64 = window_entries.iter().map(|e| e.steps_executed as f64).sum();
460
461 let mut flow_map: std::collections::HashMap<String, (u64, u64, usize)> = std::collections::HashMap::new();
463 for e in &window_entries {
464 let entry = flow_map.entry(e.flow_name.clone()).or_insert((0, 0, 0));
465 entry.0 += 1; entry.1 += e.latency_ms; if e.errors > 0 { entry.2 += 1; } }
469 let mut flows: Vec<FlowAggregate> = flow_map.into_iter().map(|(name, (cnt, lat, errs))| {
470 FlowAggregate {
471 flow_name: name,
472 count: cnt,
473 avg_latency_ms: if cnt > 0 { lat / cnt } else { 0 },
474 errors: errs as u64,
475 }
476 }).collect();
477 flows.sort_by(|a, b| b.count.cmp(&a.count));
478
479 TraceAggregate {
480 window_secs,
481 count: count as u64,
482 error_rate: error_count as f64 / count as f64,
483 avg_latency_ms: total_latency / count as u64,
484 p50_latency_ms: percentile(&latencies, 50),
485 p95_latency_ms: percentile(&latencies, 95),
486 p99_latency_ms: percentile(&latencies, 99),
487 min_latency_ms: latencies[0],
488 max_latency_ms: latencies[latencies.len() - 1],
489 total_tokens,
490 avg_steps: total_steps / count as f64,
491 flows,
492 }
493 }
494
495 pub fn set_correlation(&mut self, id: u64, correlation_id: &str) -> bool {
497 match self.get_mut(id) {
498 Some(entry) => {
499 entry.correlation_id = Some(correlation_id.to_string());
500 true
501 }
502 None => false,
503 }
504 }
505
506 pub fn by_correlation(&self, correlation_id: &str) -> Vec<&TraceEntry> {
508 self.entries.iter()
509 .filter(|e| e.correlation_id.as_deref() == Some(correlation_id))
510 .collect()
511 }
512
    /// Drops all buffered entries. The id counter and `total_recorded` are
    /// left untouched.
    pub fn clear(&mut self) {
        self.entries.clear();
    }
517}
518
/// Returns the `pct`-th percentile of an ascending-sorted slice.
///
/// The rank is `ceil(pct * len / 100)`, clamped to `1..=len`, so `pct == 0`
/// yields the smallest element and any `pct >= 100` yields the largest.
/// Returns `0` for an empty slice.
fn percentile(sorted: &[u64], pct: usize) -> u64 {
    if sorted.is_empty() {
        return 0;
    }
    let len = sorted.len();
    let rank = pct.saturating_mul(len).saturating_add(99) / 100;
    // Without the lower clamp a rank of 0 (pct == 0) would underflow below.
    let rank = rank.max(1).min(len);
    sorted[rank - 1]
}
525
/// Windowed summary produced by `TraceStore::aggregate`.
#[derive(Debug, Clone, Serialize)]
pub struct TraceAggregate {
    /// Window length that was requested, in seconds (`0` = all buffered).
    pub window_secs: u64,
    /// Number of traces inside the window.
    pub count: u64,
    /// Fraction of traces in the window with at least one error.
    pub error_rate: f64,
    /// Integer mean latency in milliseconds.
    pub avg_latency_ms: u64,
    /// 50th-percentile latency in milliseconds.
    pub p50_latency_ms: u64,
    /// 95th-percentile latency in milliseconds.
    pub p95_latency_ms: u64,
    /// 99th-percentile latency in milliseconds.
    pub p99_latency_ms: u64,
    /// Smallest latency in the window, in milliseconds.
    pub min_latency_ms: u64,
    /// Largest latency in the window, in milliseconds.
    pub max_latency_ms: u64,
    /// Sum of input and output tokens over the window.
    pub total_tokens: u64,
    /// Mean number of executed steps per trace.
    pub avg_steps: f64,
    /// Per-flow breakdown, most frequent flow first.
    pub flows: Vec<FlowAggregate>,
}
542
/// Per-flow slice of a `TraceAggregate`.
#[derive(Debug, Clone, Serialize)]
pub struct FlowAggregate {
    /// Flow the numbers belong to.
    pub flow_name: String,
    /// Number of traces of this flow inside the window.
    pub count: u64,
    /// Integer mean latency of those traces, in milliseconds.
    pub avg_latency_ms: u64,
    /// Number of those traces with at least one error (not the sum of their
    /// error counts).
    pub errors: u64,
}
551
/// Optional criteria for selecting traces; every criterion that is set must
/// hold for an entry to match, and unset criteria are ignored.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct TraceFilter {
    /// Exact flow name.
    pub flow_name: Option<String>,
    /// Exact status string as returned by `TraceStatus::as_str`
    /// (case-sensitive, e.g. `"failed"`).
    pub status: Option<String>,
    /// Exact client key.
    pub client_key: Option<String>,
    /// Minimum latency in milliseconds (inclusive).
    pub min_latency_ms: Option<u64>,
    /// `true` keeps only traces with errors, `false` only traces without.
    pub has_errors: Option<bool>,
    /// Tag that must appear on at least one annotation (exact match).
    pub tag: Option<String>,
}
570
571impl TraceFilter {
572 pub fn matches(&self, entry: &TraceEntry) -> bool {
573 if let Some(ref name) = self.flow_name {
574 if entry.flow_name != *name {
575 return false;
576 }
577 }
578 if let Some(ref status) = self.status {
579 if entry.status.as_str() != status.as_str() {
580 return false;
581 }
582 }
583 if let Some(ref key) = self.client_key {
584 if entry.client_key != *key {
585 return false;
586 }
587 }
588 if let Some(min_lat) = self.min_latency_ms {
589 if entry.latency_ms < min_lat {
590 return false;
591 }
592 }
593 if let Some(has_err) = self.has_errors {
594 if has_err && entry.errors == 0 {
595 return false;
596 }
597 if !has_err && entry.errors > 0 {
598 return false;
599 }
600 }
601 if let Some(ref tag) = self.tag {
602 let has_tag = entry.annotations.iter().any(|a| a.tags.contains(tag));
603 if !has_tag {
604 return false;
605 }
606 }
607 true
608 }
609}
610
/// Snapshot produced by `TraceStore::stats`.
///
/// Except for `total_recorded`, every figure covers only the entries that
/// were buffered when the snapshot was taken.
#[derive(Debug, Clone, Serialize)]
pub struct TraceStoreStats {
    /// Lifetime count of accepted traces.
    pub total_recorded: u64,
    /// Number of entries buffered at snapshot time.
    pub buffered: usize,
    /// Integer mean latency in milliseconds (`0` when nothing is buffered).
    pub avg_latency_ms: u64,
    /// Largest latency in milliseconds.
    pub max_latency_ms: u64,
    /// Sum of input tokens.
    pub total_tokens_input: u64,
    /// Sum of output tokens.
    pub total_tokens_output: u64,
    /// Sum of executed steps.
    pub total_steps: usize,
    /// Sum of anchor checks.
    pub total_anchor_checks: usize,
    /// Sum of anchor breaches.
    pub total_anchor_breaches: usize,
    /// Sum of error counts.
    pub total_errors: usize,
    /// Sum of retry counts.
    pub total_retries: usize,
    /// Up to ten `(flow name, trace count)` pairs, most frequent first.
    pub top_flows: Vec<(String, u64)>,
    /// `(status, trace count)` pairs ordered by status name.
    pub status_breakdown: Vec<(String, u64)>,
}
630
/// Output formats supported by the trace exporters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportFormat {
    JsonLines,
    Csv,
    Prometheus,
}

impl ExportFormat {
    /// Parses a format name case-insensitively.
    ///
    /// `"csv"` selects CSV, `"prometheus"` or `"prom"` selects Prometheus,
    /// and anything else falls back to JSON Lines.
    pub fn from_str(s: &str) -> Self {
        let lowered = s.to_lowercase();
        if lowered == "csv" {
            Self::Csv
        } else if lowered == "prometheus" || lowered == "prom" {
            Self::Prometheus
        } else {
            Self::JsonLines
        }
    }

    /// Returns the content-type string associated with this format.
    pub fn content_type(&self) -> &'static str {
        match *self {
            Self::JsonLines => "application/x-ndjson",
            Self::Csv => "text/csv",
            Self::Prometheus => "text/plain; version=0.0.4; charset=utf-8",
        }
    }
}
663
/// Span-shaped view of a `TraceEntry`, built by `entry_to_span` and
/// serialized by `export_jsonl`.
#[derive(Debug, Clone, Serialize)]
pub struct TraceSpan {
    /// Trace identifier in the form `axt-<id>`.
    pub trace_id: String,
    /// Span name; the entry's flow name.
    pub name: String,
    /// The entry's timestamp, in seconds since the Unix epoch.
    pub start_time_unix_secs: u64,
    /// The entry's latency in milliseconds.
    pub duration_ms: u64,
    /// Lowercase status string.
    pub status: String,
    /// Identification of the producing service and request origin.
    pub resource: TraceSpanResource,
    /// Numeric counters copied from the entry.
    pub attributes: TraceSpanAttributes,
    /// Events copied from the entry.
    pub events: Vec<TraceSpanEvent>,
}
684
/// Resource section of a `TraceSpan`.
#[derive(Debug, Clone, Serialize)]
pub struct TraceSpanResource {
    /// Service name; `entry_to_span` always writes `"axon-server"`.
    pub service_name: String,
    /// Service version; `entry_to_span` fills it from the crate's version
    /// constant.
    pub service_version: String,
    /// The entry's source file.
    pub source_file: String,
    /// The entry's backend.
    pub backend: String,
    /// The entry's client key.
    pub client_key: String,
}
694
/// Numeric attributes of a `TraceSpan`, copied from the originating entry.
#[derive(Debug, Clone, Serialize)]
pub struct TraceSpanAttributes {
    pub steps_executed: usize,
    pub tokens_input: u64,
    pub tokens_output: u64,
    /// `tokens_input + tokens_output`.
    pub tokens_total: u64,
    pub anchor_checks: usize,
    pub anchor_breaches: usize,
    pub errors: usize,
    pub retries: usize,
}
707
/// One event inside a `TraceSpan`.
#[derive(Debug, Clone, Serialize)]
pub struct TraceSpanEvent {
    /// Event name; the originating event's `event_type`.
    pub name: String,
    /// The originating event's offset in milliseconds.
    pub offset_ms: u64,
    /// Optional `step` and `detail` attributes; each key is present only
    /// when the corresponding source field was non-empty.
    pub attributes: std::collections::HashMap<String, String>,
}
715
716pub fn entry_to_span(entry: &TraceEntry) -> TraceSpan {
718 let events = entry
719 .events
720 .iter()
721 .map(|e| {
722 let mut attrs = std::collections::HashMap::new();
723 if !e.step_name.is_empty() {
724 attrs.insert("step".to_string(), e.step_name.clone());
725 }
726 if !e.detail.is_empty() {
727 attrs.insert("detail".to_string(), e.detail.clone());
728 }
729 TraceSpanEvent {
730 name: e.event_type.clone(),
731 offset_ms: e.offset_ms,
732 attributes: attrs,
733 }
734 })
735 .collect();
736
737 TraceSpan {
738 trace_id: format!("axt-{}", entry.id),
739 name: entry.flow_name.clone(),
740 start_time_unix_secs: entry.timestamp,
741 duration_ms: entry.latency_ms,
742 status: entry.status.as_str().to_string(),
743 resource: TraceSpanResource {
744 service_name: "axon-server".to_string(),
745 service_version: crate::runner::AXON_VERSION.to_string(),
746 source_file: entry.source_file.clone(),
747 backend: entry.backend.clone(),
748 client_key: entry.client_key.clone(),
749 },
750 attributes: TraceSpanAttributes {
751 steps_executed: entry.steps_executed,
752 tokens_input: entry.tokens_input,
753 tokens_output: entry.tokens_output,
754 tokens_total: entry.tokens_input + entry.tokens_output,
755 anchor_checks: entry.anchor_checks,
756 anchor_breaches: entry.anchor_breaches,
757 errors: entry.errors,
758 retries: entry.retries,
759 },
760 events,
761 }
762}
763
764pub fn export_jsonl(entries: &[&TraceEntry]) -> String {
766 let mut out = String::new();
767 for entry in entries {
768 let span = entry_to_span(entry);
769 if let Ok(line) = serde_json::to_string(&span) {
770 out.push_str(&line);
771 out.push('\n');
772 }
773 }
774 out
775}
776
777pub fn export_csv(entries: &[&TraceEntry]) -> String {
779 let mut out = String::new();
780 out.push_str("trace_id,timestamp,flow_name,status,steps_executed,latency_ms,tokens_input,tokens_output,anchor_checks,anchor_breaches,errors,retries,source_file,backend,client_key,event_count\n");
781 for entry in entries {
782 out.push_str(&format!(
783 "axt-{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}\n",
784 entry.id,
785 entry.timestamp,
786 entry.flow_name,
787 entry.status.as_str(),
788 entry.steps_executed,
789 entry.latency_ms,
790 entry.tokens_input,
791 entry.tokens_output,
792 entry.anchor_checks,
793 entry.anchor_breaches,
794 entry.errors,
795 entry.retries,
796 entry.source_file,
797 entry.backend,
798 entry.client_key,
799 entry.events.len(),
800 ));
801 }
802 out
803}
804
805pub fn export_prometheus(entries: &[&TraceEntry]) -> String {
807 let count = entries.len() as u64;
808 let mut total_latency: u64 = 0;
809 let mut max_latency: u64 = 0;
810 let mut total_tokens_in: u64 = 0;
811 let mut total_tokens_out: u64 = 0;
812 let mut total_steps: u64 = 0;
813 let mut total_errors: u64 = 0;
814 let mut total_retries: u64 = 0;
815 let mut total_anchor_checks: u64 = 0;
816 let mut total_anchor_breaches: u64 = 0;
817 let mut status_counts: std::collections::HashMap<String, u64> = std::collections::HashMap::new();
818
819 for e in entries {
820 total_latency += e.latency_ms;
821 if e.latency_ms > max_latency {
822 max_latency = e.latency_ms;
823 }
824 total_tokens_in += e.tokens_input;
825 total_tokens_out += e.tokens_output;
826 total_steps += e.steps_executed as u64;
827 total_errors += e.errors as u64;
828 total_retries += e.retries as u64;
829 total_anchor_checks += e.anchor_checks as u64;
830 total_anchor_breaches += e.anchor_breaches as u64;
831 *status_counts.entry(e.status.as_str().to_string()).or_insert(0) += 1;
832 }
833
834 let avg_latency = if count > 0 { total_latency / count } else { 0 };
835
836 let mut out = String::new();
837
838 out.push_str("# HELP axon_trace_export_count Number of traces in this export.\n");
839 out.push_str("# TYPE axon_trace_export_count gauge\n");
840 out.push_str(&format!("axon_trace_export_count {}\n\n", count));
841
842 out.push_str("# HELP axon_trace_export_latency_avg_ms Average latency across exported traces.\n");
843 out.push_str("# TYPE axon_trace_export_latency_avg_ms gauge\n");
844 out.push_str(&format!("axon_trace_export_latency_avg_ms {}\n\n", avg_latency));
845
846 out.push_str("# HELP axon_trace_export_latency_max_ms Maximum latency across exported traces.\n");
847 out.push_str("# TYPE axon_trace_export_latency_max_ms gauge\n");
848 out.push_str(&format!("axon_trace_export_latency_max_ms {}\n\n", max_latency));
849
850 out.push_str("# HELP axon_trace_export_tokens_total Total tokens in exported traces.\n");
851 out.push_str("# TYPE axon_trace_export_tokens_total counter\n");
852 out.push_str(&format!("axon_trace_export_tokens_total{{type=\"input\"}} {}\n", total_tokens_in));
853 out.push_str(&format!("axon_trace_export_tokens_total{{type=\"output\"}} {}\n\n", total_tokens_out));
854
855 out.push_str("# HELP axon_trace_export_steps_total Total steps executed in exported traces.\n");
856 out.push_str("# TYPE axon_trace_export_steps_total counter\n");
857 out.push_str(&format!("axon_trace_export_steps_total {}\n\n", total_steps));
858
859 out.push_str("# HELP axon_trace_export_errors_total Total errors in exported traces.\n");
860 out.push_str("# TYPE axon_trace_export_errors_total counter\n");
861 out.push_str(&format!("axon_trace_export_errors_total {}\n\n", total_errors));
862
863 out.push_str("# HELP axon_trace_export_retries_total Total retries in exported traces.\n");
864 out.push_str("# TYPE axon_trace_export_retries_total counter\n");
865 out.push_str(&format!("axon_trace_export_retries_total {}\n\n", total_retries));
866
867 out.push_str("# HELP axon_trace_export_anchor_checks_total Total anchor checks in exported traces.\n");
868 out.push_str("# TYPE axon_trace_export_anchor_checks_total counter\n");
869 out.push_str(&format!("axon_trace_export_anchor_checks_total {}\n\n", total_anchor_checks));
870
871 out.push_str("# HELP axon_trace_export_anchor_breaches_total Total anchor breaches in exported traces.\n");
872 out.push_str("# TYPE axon_trace_export_anchor_breaches_total counter\n");
873 out.push_str(&format!("axon_trace_export_anchor_breaches_total {}\n\n", total_anchor_breaches));
874
875 if !status_counts.is_empty() {
876 out.push_str("# HELP axon_trace_export_by_status Count of exported traces by status.\n");
877 out.push_str("# TYPE axon_trace_export_by_status gauge\n");
878 let mut sorted: Vec<_> = status_counts.into_iter().collect();
879 sorted.sort_by_key(|(k, _)| k.clone());
880 for (status, n) in sorted {
881 out.push_str(&format!("axon_trace_export_by_status{{status=\"{}\"}} {}\n", status, n));
882 }
883 out.push('\n');
884 }
885
886 out
887}
888
/// Current wall-clock time in whole seconds since the Unix epoch, or `0`
/// when the system clock reads earlier than the epoch.
fn wall_clock_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
897
898pub fn build_trace(
900 flow_name: &str,
901 source_file: &str,
902 backend: &str,
903 client_key: &str,
904 status: TraceStatus,
905 steps: usize,
906 latency_ms: u64,
907) -> TraceEntry {
908 TraceEntry {
909 id: 0, timestamp: 0, flow_name: flow_name.to_string(),
912 status,
913 steps_executed: steps,
914 latency_ms,
915 tokens_input: 0,
916 tokens_output: 0,
917 anchor_checks: 0,
918 anchor_breaches: 0,
919 errors: 0,
920 retries: 0,
921 source_file: source_file.to_string(),
922 backend: backend.to_string(),
923 client_key: client_key.to_string(),
924 events: Vec::new(),
925 replay_of: None,
926 annotations: Vec::new(),
927 correlation_id: None,
928 }
929}
930
931#[cfg(test)]
934mod tests {
935 use super::*;
936
937 fn sample_trace(name: &str, status: TraceStatus) -> TraceEntry {
938 let mut t = build_trace(name, "test.axon", "anthropic", "token_a", status, 3, 150);
939 t.tokens_input = 100;
940 t.tokens_output = 50;
941 t.anchor_checks = 2;
942 t.events.push(TraceEvent {
943 event_type: "step_start".into(),
944 offset_ms: 0,
945 step_name: "step1".into(),
946 detail: "starting".into(),
947 });
948 t
949 }
950
951 #[test]
952 fn record_and_retrieve() {
953 let mut store = TraceStore::new(TraceStoreConfig::default());
954 let id = store.record(sample_trace("FlowA", TraceStatus::Success));
955 assert_eq!(id, 1);
956 assert_eq!(store.len(), 1);
957
958 let entry = store.get(id).unwrap();
959 assert_eq!(entry.flow_name, "FlowA");
960 assert_eq!(entry.status, TraceStatus::Success);
961 assert_eq!(entry.steps_executed, 3);
962 assert!(entry.timestamp > 0);
963 }
964
965 #[test]
966 fn ring_buffer_eviction() {
967 let config = TraceStoreConfig { capacity: 3, enabled: true, max_events_per_trace: 100, max_age_secs: 0 };
968 let mut store = TraceStore::new(config);
969
970 for i in 0..5 {
971 store.record(sample_trace(&format!("Flow{}", i), TraceStatus::Success));
972 }
973
974 assert_eq!(store.len(), 3);
975 assert_eq!(store.total_recorded(), 5);
976
977 let recent = store.recent(10, None);
978 assert_eq!(recent[0].flow_name, "Flow4");
979 assert_eq!(recent[2].flow_name, "Flow2");
980 }
981
982 #[test]
983 fn disabled_store() {
984 let mut store = TraceStore::new(TraceStoreConfig::disabled());
985 let id = store.record(sample_trace("X", TraceStatus::Success));
986 assert_eq!(id, 0);
987 assert_eq!(store.len(), 0);
988 assert_eq!(store.total_recorded(), 0);
989 }
990
991 #[test]
994 fn reserve_id_monotonic_and_consumes_next_id() {
995 let mut store = TraceStore::new(TraceStoreConfig::default());
996 let id1 = store.reserve_id();
997 let id2 = store.reserve_id();
998 let id3 = store.reserve_id();
999 assert_eq!(id1, 1);
1000 assert_eq!(id2, 2);
1001 assert_eq!(id3, 3);
1002 assert_eq!(store.len(), 0);
1005 assert_eq!(store.total_recorded(), 0);
1006 }
1007
1008 #[test]
1009 fn reserve_id_disabled_store_returns_zero() {
1010 let mut store = TraceStore::new(TraceStoreConfig::disabled());
1011 let id = store.reserve_id();
1012 assert_eq!(id, 0);
1013 }
1014
1015 #[test]
1016 fn record_with_id_persists_under_reserved_id() {
1017 let mut store = TraceStore::new(TraceStoreConfig::default());
1018 let id = store.reserve_id();
1019 store.record_with_id(sample_trace("ReservedFlow", TraceStatus::Success), id);
1020 let entry = store.get(id).expect("entry must exist under reserved id");
1021 assert_eq!(entry.id, id);
1022 assert_eq!(entry.flow_name, "ReservedFlow");
1023 assert_eq!(store.len(), 1);
1024 assert_eq!(store.total_recorded(), 1);
1025 }
1026
1027 #[test]
1028 fn record_with_id_does_not_advance_next_id() {
1029 let mut store = TraceStore::new(TraceStoreConfig::default());
1030 let reserved = store.reserve_id();
1031 store.record_with_id(sample_trace("X", TraceStatus::Success), reserved);
1032 let next = store.record(sample_trace("Y", TraceStatus::Success));
1035 assert!(
1036 next > reserved,
1037 "record after record_with_id must produce a strictly greater id"
1038 );
1039 }
1040
1041 #[test]
1042 fn record_with_id_disabled_store_is_noop() {
1043 let mut store = TraceStore::new(TraceStoreConfig::disabled());
1044 let id = store.reserve_id();
1045 store.record_with_id(sample_trace("X", TraceStatus::Success), id);
1046 assert_eq!(store.len(), 0);
1047 assert_eq!(store.total_recorded(), 0);
1048 }
1049
1050 #[test]
1051 fn reserve_then_record_preserves_audit_correlation() {
1052 let mut store = TraceStore::new(TraceStoreConfig::default());
1056 let wire_trace_id = store.reserve_id();
1057
1058 let mut entry = sample_trace("LiveStreaming", TraceStatus::Success);
1062 entry.steps_executed = 5;
1063 entry.tokens_output = 42;
1064 store.record_with_id(entry, wire_trace_id);
1065
1066 let recovered = store.get(wire_trace_id).expect("audit lookup must succeed");
1067 assert_eq!(recovered.id, wire_trace_id);
1068 assert_eq!(recovered.steps_executed, 5);
1069 assert_eq!(recovered.tokens_output, 42);
1070 }
1071
1072 #[test]
1073 fn filter_by_flow_name() {
1074 let mut store = TraceStore::new(TraceStoreConfig::default());
1075 store.record(sample_trace("Alpha", TraceStatus::Success));
1076 store.record(sample_trace("Beta", TraceStatus::Success));
1077 store.record(sample_trace("Alpha", TraceStatus::Failed));
1078
1079 let filter = TraceFilter { flow_name: Some("Alpha".into()), ..Default::default() };
1080 let result = store.recent(10, Some(&filter));
1081 assert_eq!(result.len(), 2);
1082 }
1083
1084 #[test]
1085 fn filter_by_status() {
1086 let mut store = TraceStore::new(TraceStoreConfig::default());
1087 store.record(sample_trace("A", TraceStatus::Success));
1088 store.record(sample_trace("B", TraceStatus::Failed));
1089 store.record(sample_trace("C", TraceStatus::Success));
1090
1091 let filter = TraceFilter { status: Some("failed".into()), ..Default::default() };
1092 let result = store.recent(10, Some(&filter));
1093 assert_eq!(result.len(), 1);
1094 assert_eq!(result[0].flow_name, "B");
1095 }
1096
1097 #[test]
1098 fn filter_by_has_errors() {
1099 let mut store = TraceStore::new(TraceStoreConfig::default());
1100
1101 let mut t1 = sample_trace("A", TraceStatus::Success);
1102 t1.errors = 0;
1103 store.record(t1);
1104
1105 let mut t2 = sample_trace("B", TraceStatus::Failed);
1106 t2.errors = 2;
1107 store.record(t2);
1108
1109 let filter = TraceFilter { has_errors: Some(true), ..Default::default() };
1110 let result = store.recent(10, Some(&filter));
1111 assert_eq!(result.len(), 1);
1112 assert_eq!(result[0].flow_name, "B");
1113 }
1114
1115 #[test]
1116 fn stats_computation() {
1117 let mut store = TraceStore::new(TraceStoreConfig::default());
1118
1119 let mut t1 = sample_trace("Alpha", TraceStatus::Success);
1120 t1.latency_ms = 100;
1121 t1.tokens_input = 200;
1122 t1.tokens_output = 100;
1123 store.record(t1);
1124
1125 let mut t2 = sample_trace("Alpha", TraceStatus::Success);
1126 t2.latency_ms = 300;
1127 t2.tokens_input = 400;
1128 t2.tokens_output = 200;
1129 t2.errors = 1;
1130 store.record(t2);
1131
1132 let mut t3 = sample_trace("Beta", TraceStatus::Failed);
1133 t3.latency_ms = 500;
1134 t3.errors = 2;
1135 store.record(t3);
1136
1137 let stats = store.stats();
1138 assert_eq!(stats.total_recorded, 3);
1139 assert_eq!(stats.buffered, 3);
1140 assert_eq!(stats.avg_latency_ms, 300); assert_eq!(stats.max_latency_ms, 500);
1142 assert_eq!(stats.total_tokens_input, 700); assert_eq!(stats.total_errors, 3);
1144 assert_eq!(stats.top_flows[0].0, "Alpha");
1145 assert_eq!(stats.top_flows[0].1, 2);
1146 }
1147
1148 #[test]
1149 fn stats_empty_store() {
1150 let store = TraceStore::new(TraceStoreConfig::default());
1151 let stats = store.stats();
1152 assert_eq!(stats.total_recorded, 0);
1153 assert_eq!(stats.avg_latency_ms, 0);
1154 assert_eq!(stats.max_latency_ms, 0);
1155 }
1156
1157 #[test]
1158 fn trace_status_serde() {
1159 assert_eq!(TraceStatus::Success.as_str(), "success");
1160 assert_eq!(TraceStatus::Failed.as_str(), "failed");
1161 assert_eq!(TraceStatus::Partial.as_str(), "partial");
1162 assert_eq!(TraceStatus::Timeout.as_str(), "timeout");
1163
1164 let json = serde_json::to_value(TraceStatus::Success).unwrap();
1165 assert_eq!(json, "success");
1166 }
1167
1168 #[test]
1169 fn trace_entry_serializable() {
1170 let t = sample_trace("TestFlow", TraceStatus::Success);
1171 let json = serde_json::to_value(&t).unwrap();
1172 assert_eq!(json["flow_name"], "TestFlow");
1173 assert_eq!(json["status"], "success");
1174 assert_eq!(json["steps_executed"], 3);
1175 assert!(json["events"].is_array());
1176 }
1177
1178 #[test]
1179 fn stats_serializable() {
1180 let store = TraceStore::new(TraceStoreConfig::default());
1181 let stats = store.stats();
1182 let json = serde_json::to_value(&stats).unwrap();
1183 assert_eq!(json["total_recorded"], 0);
1184 assert_eq!(json["buffered"], 0);
1185 assert!(json["top_flows"].is_array());
1186 }
1187
1188 #[test]
1189 fn config_serializable() {
1190 let cfg = TraceStoreConfig::default();
1191 let json = serde_json::to_value(&cfg).unwrap();
1192 assert_eq!(json["capacity"], 500);
1193 assert_eq!(json["enabled"], true);
1194 assert_eq!(json["max_events_per_trace"], 200);
1195 }
1196
1197 #[test]
1198 fn event_truncation() {
1199 let config = TraceStoreConfig { capacity: 10, enabled: true, max_events_per_trace: 3, max_age_secs: 0 };
1200 let mut store = TraceStore::new(config);
1201
1202 let mut t = sample_trace("X", TraceStatus::Success);
1203 for i in 0..10 {
1204 t.events.push(TraceEvent {
1205 event_type: "test".into(),
1206 offset_ms: i,
1207 step_name: "s".into(),
1208 detail: "d".into(),
1209 });
1210 }
1211 let id = store.record(t);
1212 let entry = store.get(id).unwrap();
1213 assert_eq!(entry.events.len(), 3);
1214 }
1215
/// `clear` empties the buffered traces but leaves `total_recorded` untouched.
#[test]
fn clear_preserves_total() {
    let mut store = TraceStore::new(TraceStoreConfig::default());
    for flow in ["A", "B"].iter().copied() {
        store.record(sample_trace(flow, TraceStatus::Success));
    }
    assert_eq!(store.len(), 2);

    store.clear();

    // The buffer is empty afterwards ...
    assert_eq!(store.len(), 0);
    assert!(store.is_empty());
    // ... while the running total still counts both recorded traces.
    assert_eq!(store.total_recorded(), 2);
}
1228
/// `ExportFormat::from_str` accepts the aliases below in either case shown and
/// yields `JsonLines` for a name it does not recognise.
#[test]
fn export_format_parsing() {
    let parse = |raw: &str| ExportFormat::from_str(raw);

    // JSON Lines aliases.
    assert_eq!(parse("jsonl"), ExportFormat::JsonLines);
    assert_eq!(parse("JSONL"), ExportFormat::JsonLines);
    assert_eq!(parse("json"), ExportFormat::JsonLines);

    // CSV aliases.
    assert_eq!(parse("csv"), ExportFormat::Csv);
    assert_eq!(parse("CSV"), ExportFormat::Csv);

    // Prometheus aliases.
    assert_eq!(parse("prometheus"), ExportFormat::Prometheus);
    assert_eq!(parse("prom"), ExportFormat::Prometheus);

    // An unrecognised name resolves to JSON Lines.
    assert_eq!(parse("unknown"), ExportFormat::JsonLines);
}
1242
/// Each export format reports the content type expected for its payload.
#[test]
fn export_format_content_type() {
    let ndjson = ExportFormat::JsonLines.content_type();
    assert_eq!(ndjson, "application/x-ndjson");

    let csv = ExportFormat::Csv.content_type();
    assert_eq!(csv, "text/csv");

    // Only the prefix is pinned for the Prometheus content type.
    let prometheus = ExportFormat::Prometheus.content_type();
    assert!(prometheus.starts_with("text/plain"));
}
1249
/// `entry_to_span` carries a stored entry's identity, status, timing, resource
/// and attribute data over to the span.
///
/// The concrete values asserted here come from the `sample_trace` fixture,
/// which is defined outside this test.
#[test]
fn entry_to_span_conversion() {
    let mut store = TraceStore::new(TraceStoreConfig::default());
    let id = store.record(sample_trace("FlowX", TraceStatus::Success));
    let span = entry_to_span(store.get(id).unwrap());

    // Identity and outcome.
    let expected_trace_id = format!("axt-{}", id);
    assert_eq!(span.trace_id, expected_trace_id);
    assert_eq!(span.name, "FlowX");
    assert_eq!(span.status, "success");
    assert_eq!(span.duration_ms, 150);

    // Resource block.
    let resource = &span.resource;
    assert_eq!(resource.service_name, "axon-server");
    assert_eq!(resource.backend, "anthropic");
    assert_eq!(resource.client_key, "token_a");

    // Attribute block.
    let attributes = &span.attributes;
    assert_eq!(attributes.steps_executed, 3);
    assert_eq!(attributes.tokens_input, 100);
    assert_eq!(attributes.tokens_output, 50);
    assert_eq!(attributes.tokens_total, 150);
    assert_eq!(attributes.anchor_checks, 2);

    // Events.
    assert_eq!(span.events.len(), 1);
    assert_eq!(span.events[0].name, "step_start");
}
1272
/// `export_jsonl` emits one JSON document per line for the entries it is given.
///
/// "A" is recorded before "B"; the assertions expect "B" on the first output
/// line and "A" on the second.
#[test]
fn export_jsonl_format() {
    let mut store = TraceStore::new(TraceStoreConfig::default());
    store.record(sample_trace("A", TraceStatus::Success));
    store.record(sample_trace("B", TraceStatus::Failed));
    let entries = store.recent(10, None);

    let jsonl = export_jsonl(&entries);
    let lines: Vec<&str> = jsonl.lines().collect();
    assert_eq!(lines.len(), 2);

    // Every line must parse as a standalone JSON document.
    let first: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
    assert_eq!(first["name"], "B");
    assert_eq!(first["status"], "failed");
    assert!(first["trace_id"].as_str().unwrap().starts_with("axt-"));
    // Compare the JSON value directly so a mismatch reports the actual value
    // rather than a bare `false`.
    assert_eq!(first["resource"]["service_name"], "axon-server");

    let second: serde_json::Value = serde_json::from_str(lines[1]).unwrap();
    assert_eq!(second["name"], "A");
    assert_eq!(second["status"], "success");
}
1295
/// `export_csv` produces a header line followed by one line per entry.
///
/// "FlowA" is recorded before "FlowB"; the assertions expect "FlowB" on the
/// first data line and "FlowA" on the second.
#[test]
fn export_csv_format() {
    let mut store = TraceStore::new(TraceStoreConfig::default());
    store.record(sample_trace("FlowA", TraceStatus::Success));
    store.record(sample_trace("FlowB", TraceStatus::Failed));
    let entries = store.recent(10, None);

    let csv = export_csv(&entries);
    let rows: Vec<&str> = csv.lines().collect();
    assert_eq!(rows.len(), 3);

    // Header line.
    let header = rows[0];
    assert!(header.starts_with("trace_id,"));
    assert!(header.contains("flow_name"));
    assert!(header.contains("latency_ms"));
    assert!(header.contains("event_count"));

    // First data line.
    let first_row = rows[1];
    assert!(first_row.contains("FlowB"));
    assert!(first_row.contains("failed"));

    // Second data line.
    let second_row = rows[2];
    assert!(second_row.contains("FlowA"));
    assert!(second_row.contains("success"));
}
1319
/// `export_prometheus` renders count, latency, error and per-status metrics,
/// along with `# HELP` / `# TYPE` lines, for the exported entries.
#[test]
fn export_prometheus_format() {
    let mut store = TraceStore::new(TraceStoreConfig::default());

    // One successful trace at 200 ms with no errors ...
    store.record(TraceEntry {
        latency_ms: 200,
        errors: 0,
        ..sample_trace("A", TraceStatus::Success)
    });
    // ... and one failed trace at 400 ms with two errors.
    store.record(TraceEntry {
        latency_ms: 400,
        errors: 2,
        ..sample_trace("B", TraceStatus::Failed)
    });
    let entries = store.recent(10, None);

    let prom = export_prometheus(&entries);
    let has = |needle: &str| prom.contains(needle);

    // Aggregates over the two traces above.
    assert!(has("axon_trace_export_count 2"));
    assert!(has("axon_trace_export_latency_avg_ms 300"));
    assert!(has("axon_trace_export_latency_max_ms 400"));
    assert!(has("axon_trace_export_errors_total 2"));

    // Per-status breakdown.
    assert!(has("axon_trace_export_by_status{status=\"success\"} 1"));
    assert!(has("axon_trace_export_by_status{status=\"failed\"} 1"));

    // Metric metadata lines.
    assert!(has("# HELP axon_trace_export_count"));
    assert!(has("# TYPE axon_trace_export_count gauge"));
}
1343
/// With no entries, the JSONL export is empty, the CSV export is a single
/// line, and the Prometheus export reports zero count and zero average latency.
#[test]
fn export_empty_traces() {
    let no_traces: Vec<&TraceEntry> = Vec::new();

    let jsonl = export_jsonl(&no_traces);
    assert!(jsonl.is_empty());

    // Exactly one line is expected in the CSV output.
    let csv = export_csv(&no_traces);
    assert_eq!(csv.lines().count(), 1);

    let prom = export_prometheus(&no_traces);
    assert!(prom.contains("axon_trace_export_count 0"));
    assert!(prom.contains("axon_trace_export_latency_avg_ms 0"));
}
1358
/// A span produced by `entry_to_span` serializes to JSON with a string
/// `trace_id`, object-valued `resource` and `attributes`, and an `events` array.
#[test]
fn span_serializable() {
    let mut store = TraceStore::new(TraceStoreConfig::default());
    // Look the entry up by the id `record` returns instead of a hard-coded
    // value, so the test does not depend on how ids are allocated.
    let id = store.record(sample_trace("Test", TraceStatus::Success));
    let entry = store.get(id).unwrap();

    let span = entry_to_span(entry);
    let json = serde_json::to_value(&span).unwrap();
    assert!(json["trace_id"].is_string());
    assert!(json["resource"].is_object());
    assert!(json["attributes"].is_object());
    assert!(json["events"].is_array());
}
1371}