Skip to main content

axon/
trace_store.rs

1//! Trace Store — in-memory execution trace buffer for AxonServer.
2//!
3//! Collects execution traces from deployed flow runs and provides
4//! queryable access via the server API:
5//!   - `GET /v1/traces`       — list/query recent traces
6//!   - `GET /v1/traces/:id`   — get a specific trace by ID
7//!   - `GET /v1/traces/stats` — aggregate analytics across buffered traces
8//!
9//! The store is a ring buffer with configurable capacity.
10//! Each trace records: flow name, status, steps executed, latency,
11//! token usage, anchor results, and a timestamped event log.
12
13use std::collections::VecDeque;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use serde::{Deserialize, Serialize};
17
18// ── Trace entry ─────────────────────────────────────────────────────────
19
20/// A single execution trace.
21#[derive(Debug, Clone, Serialize)]
22pub struct TraceEntry {
23    /// Unique trace ID (sequential).
24    pub id: u64,
25    /// Wall-clock timestamp (Unix seconds).
26    pub timestamp: u64,
27    /// Flow name that was executed.
28    pub flow_name: String,
29    /// Execution status.
30    pub status: TraceStatus,
31    /// Number of steps executed.
32    pub steps_executed: usize,
33    /// Total latency in milliseconds.
34    pub latency_ms: u64,
35    /// Input tokens used (if known).
36    pub tokens_input: u64,
37    /// Output tokens used (if known).
38    pub tokens_output: u64,
39    /// Anchor checks performed.
40    pub anchor_checks: usize,
41    /// Anchor breaches detected.
42    pub anchor_breaches: usize,
43    /// Error count.
44    pub errors: usize,
45    /// Retry count.
46    pub retries: usize,
47    /// Source file.
48    pub source_file: String,
49    /// Backend used (e.g., "anthropic").
50    pub backend: String,
51    /// Client identifier.
52    pub client_key: String,
53    /// Ordered event log.
54    pub events: Vec<TraceEvent>,
55    /// If this trace is a replay, the ID of the original trace.
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub replay_of: Option<u64>,
58    /// User-added annotations for debugging and collaboration.
59    #[serde(skip_serializing_if = "Vec::is_empty")]
60    pub annotations: Vec<TraceAnnotation>,
61    /// Correlation ID for linking related traces.
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub correlation_id: Option<String>,
64}
65
66/// A user-added annotation on a trace.
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct TraceAnnotation {
69    /// Who added the annotation.
70    pub author: String,
71    /// Free-form note text.
72    pub text: String,
73    /// Tags for categorization/filtering.
74    pub tags: Vec<String>,
75    /// Unix timestamp when the annotation was added.
76    pub timestamp: u64,
77}
78
79/// Execution status for a trace.
80#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
81#[serde(rename_all = "lowercase")]
82pub enum TraceStatus {
83    Success,
84    Failed,
85    Partial,
86    Timeout,
87}
88
89impl TraceStatus {
90    pub fn as_str(&self) -> &'static str {
91        match self {
92            TraceStatus::Success => "success",
93            TraceStatus::Failed => "failed",
94            TraceStatus::Partial => "partial",
95            TraceStatus::Timeout => "timeout",
96        }
97    }
98}
99
100/// A single event within a trace.
101#[derive(Debug, Clone, Serialize)]
102pub struct TraceEvent {
103    /// Event type (step_start, step_end, anchor_check, model_call, error, etc.).
104    pub event_type: String,
105    /// Relative timestamp in milliseconds from trace start.
106    pub offset_ms: u64,
107    /// Step name (if applicable).
108    pub step_name: String,
109    /// Event detail (free-form).
110    pub detail: String,
111}
112
113// ── Config ──────────────────────────────────────────────────────────────
114
115/// Trace store configuration.
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct TraceStoreConfig {
118    /// Maximum traces in the ring buffer.
119    pub capacity: usize,
120    /// Whether trace recording is enabled.
121    pub enabled: bool,
122    /// Maximum events per trace (to prevent memory bloat).
123    pub max_events_per_trace: usize,
124    /// Maximum age of a trace in seconds (0 = no TTL).
125    pub max_age_secs: u64,
126}
127
128impl Default for TraceStoreConfig {
129    fn default() -> Self {
130        TraceStoreConfig {
131            capacity: 500,
132            enabled: true,
133            max_events_per_trace: 200,
134            max_age_secs: 0,
135        }
136    }
137}
138
139impl TraceStoreConfig {
140    pub fn disabled() -> Self {
141        TraceStoreConfig {
142            capacity: 0,
143            enabled: false,
144            max_events_per_trace: 0,
145            max_age_secs: 0,
146        }
147    }
148}
149
150// ── Store ───────────────────────────────────────────────────────────────
151
152/// In-memory ring buffer for execution traces.
153pub struct TraceStore {
154    config: TraceStoreConfig,
155    entries: VecDeque<TraceEntry>,
156    next_id: u64,
157    total_recorded: u64,
158}
159
160impl TraceStore {
161    /// Create a new trace store.
162    pub fn new(config: TraceStoreConfig) -> Self {
163        TraceStore {
164            entries: VecDeque::with_capacity(config.capacity.min(512)),
165            config,
166            next_id: 1,
167            total_recorded: 0,
168        }
169    }
170
171    /// Record a new trace. Returns the assigned trace ID.
172    pub fn record(&mut self, mut trace: TraceEntry) -> u64 {
173        if !self.config.enabled {
174            return 0;
175        }
176
177        let id = self.next_id;
178        self.next_id += 1;
179        self.total_recorded += 1;
180
181        trace.id = id;
182        trace.timestamp = wall_clock_secs();
183
184        // Truncate events if over limit
185        if trace.events.len() > self.config.max_events_per_trace {
186            trace.events.truncate(self.config.max_events_per_trace);
187        }
188
189        // Evict oldest if at capacity
190        if self.entries.len() >= self.config.capacity && self.config.capacity > 0 {
191            self.entries.pop_front();
192        }
193        if self.config.capacity > 0 {
194            self.entries.push_back(trace);
195        }
196
197        id
198    }
199
200    /// §Fase 33.c — Reserve a trace ID without recording the entry yet.
201    ///
202    /// Wire-streaming sites (`execute_sse_handler`) need a trace ID up
203    /// front so the first `axon.token` event can carry it on the wire
204    /// — adopters bind the live stream to the eventual replay surface
205    /// via this id. The entry is finalized via `record_with_id` once
206    /// the executor closes the FlowExecutionEvent channel.
207    ///
208    /// When the store is disabled the call still returns 0 to match
209    /// `record`'s disabled-path semantics.
210    pub fn reserve_id(&mut self) -> u64 {
211        if !self.config.enabled {
212            return 0;
213        }
214        let id = self.next_id;
215        self.next_id += 1;
216        id
217    }
218
219    /// §Fase 33.c — Persist a trace entry under a previously reserved id.
220    ///
221    /// Pairs with `reserve_id`: callers reserve early to plumb the id
222    /// into wire events, then call here once execution has completed
223    /// with final stats. The capacity / eviction / timestamp semantics
224    /// match `record` verbatim — only the id-allocation timing differs.
225    pub fn record_with_id(&mut self, mut trace: TraceEntry, id: u64) {
226        if !self.config.enabled {
227            return;
228        }
229        self.total_recorded += 1;
230        trace.id = id;
231        trace.timestamp = wall_clock_secs();
232
233        if trace.events.len() > self.config.max_events_per_trace {
234            trace.events.truncate(self.config.max_events_per_trace);
235        }
236
237        if self.entries.len() >= self.config.capacity && self.config.capacity > 0 {
238            self.entries.pop_front();
239        }
240        if self.config.capacity > 0 {
241            self.entries.push_back(trace);
242        }
243    }
244
245    /// Get a trace by ID.
246    pub fn get(&self, id: u64) -> Option<&TraceEntry> {
247        self.entries.iter().find(|e| e.id == id)
248    }
249
250    /// Query recent traces (newest first), optionally filtered.
251    pub fn recent(&self, limit: usize, filter: Option<&TraceFilter>) -> Vec<&TraceEntry> {
252        self.entries
253            .iter()
254            .rev()
255            .filter(|e| match filter {
256                Some(f) => f.matches(e),
257                None => true,
258            })
259            .take(limit)
260            .collect()
261    }
262
263    /// Number of buffered traces.
264    pub fn len(&self) -> usize {
265        self.entries.len()
266    }
267
268    /// Whether the buffer is empty.
269    pub fn is_empty(&self) -> bool {
270        self.entries.is_empty()
271    }
272
273    /// Total traces recorded (including evicted).
274    pub fn total_recorded(&self) -> u64 {
275        self.total_recorded
276    }
277
278    /// Get a mutable trace by ID (for annotations).
279    pub fn get_mut(&mut self, id: u64) -> Option<&mut TraceEntry> {
280        self.entries.iter_mut().find(|e| e.id == id)
281    }
282
283    /// Annotate a trace by ID. Returns true if the trace was found.
284    pub fn annotate(&mut self, id: u64, annotation: TraceAnnotation) -> bool {
285        match self.get_mut(id) {
286            Some(entry) => {
287                entry.annotations.push(annotation);
288                true
289            }
290            None => false,
291        }
292    }
293
294    /// Bulk delete traces by IDs. Returns number actually deleted.
295    pub fn bulk_delete(&mut self, ids: &[u64]) -> usize {
296        let before = self.entries.len();
297        self.entries.retain(|e| !ids.contains(&e.id));
298        before - self.entries.len()
299    }
300
301    /// Bulk annotate traces by IDs. Returns number of traces annotated.
302    pub fn bulk_annotate(&mut self, ids: &[u64], annotation: TraceAnnotation) -> usize {
303        let mut count = 0;
304        for entry in self.entries.iter_mut() {
305            if ids.contains(&entry.id) {
306                entry.annotations.push(annotation.clone());
307                count += 1;
308            }
309        }
310        count
311    }
312
313    /// Full-text search across buffered traces.
314    ///
315    /// Matches the query (case-insensitive substring) against:
316    /// flow_name, source_file, backend, client_key, event step_name,
317    /// event detail, annotation text, and annotation tags.
318    pub fn search(&self, query: &str, limit: usize) -> Vec<&TraceEntry> {
319        let q = query.to_lowercase();
320        self.entries
321            .iter()
322            .rev()
323            .filter(|e| {
324                e.flow_name.to_lowercase().contains(&q)
325                    || e.source_file.to_lowercase().contains(&q)
326                    || e.backend.to_lowercase().contains(&q)
327                    || e.client_key.to_lowercase().contains(&q)
328                    || e.events.iter().any(|ev| {
329                        ev.step_name.to_lowercase().contains(&q)
330                            || ev.detail.to_lowercase().contains(&q)
331                    })
332                    || e.annotations.iter().any(|a| {
333                        a.text.to_lowercase().contains(&q)
334                            || a.tags.iter().any(|t| t.to_lowercase().contains(&q))
335                    })
336            })
337            .take(limit)
338            .collect()
339    }
340
341    /// Evict traces older than `max_age_secs`. Returns number evicted.
342    /// No-op if max_age_secs is 0.
343    pub fn evict_expired(&mut self) -> usize {
344        if self.config.max_age_secs == 0 {
345            return 0;
346        }
347        let now = wall_clock_secs();
348        let cutoff = now.saturating_sub(self.config.max_age_secs);
349        let before = self.entries.len();
350        self.entries.retain(|e| e.timestamp >= cutoff);
351        before - self.entries.len()
352    }
353
354    /// Update retention policy. Returns previous max_age_secs.
355    pub fn set_max_age_secs(&mut self, max_age_secs: u64) -> u64 {
356        let prev = self.config.max_age_secs;
357        self.config.max_age_secs = max_age_secs;
358        prev
359    }
360
361    /// Get configuration.
362    pub fn config(&self) -> &TraceStoreConfig {
363        &self.config
364    }
365
366    /// Compute aggregate statistics across buffered traces.
367    pub fn stats(&self) -> TraceStoreStats {
368        let mut total_latency_ms: u64 = 0;
369        let mut max_latency_ms: u64 = 0;
370        let mut total_tokens_input: u64 = 0;
371        let mut total_tokens_output: u64 = 0;
372        let mut total_steps: usize = 0;
373        let mut total_anchor_checks: usize = 0;
374        let mut total_anchor_breaches: usize = 0;
375        let mut total_errors: usize = 0;
376        let mut total_retries: usize = 0;
377        let mut status_counts = std::collections::HashMap::new();
378        let mut flow_counts = std::collections::HashMap::new();
379
380        for e in &self.entries {
381            total_latency_ms += e.latency_ms;
382            if e.latency_ms > max_latency_ms {
383                max_latency_ms = e.latency_ms;
384            }
385            total_tokens_input += e.tokens_input;
386            total_tokens_output += e.tokens_output;
387            total_steps += e.steps_executed;
388            total_anchor_checks += e.anchor_checks;
389            total_anchor_breaches += e.anchor_breaches;
390            total_errors += e.errors;
391            total_retries += e.retries;
392            *status_counts.entry(e.status.as_str().to_string()).or_insert(0u64) += 1;
393            *flow_counts.entry(e.flow_name.clone()).or_insert(0u64) += 1;
394        }
395
396        let count = self.entries.len() as u64;
397        let avg_latency_ms = if count > 0 { total_latency_ms / count } else { 0 };
398
399        let mut top_flows: Vec<(String, u64)> = flow_counts.into_iter().collect();
400        top_flows.sort_by(|a, b| b.1.cmp(&a.1));
401        top_flows.truncate(10);
402
403        let mut status_breakdown: Vec<(String, u64)> = status_counts.into_iter().collect();
404        status_breakdown.sort_by_key(|(k, _)| k.clone());
405
406        TraceStoreStats {
407            total_recorded: self.total_recorded,
408            buffered: self.entries.len(),
409            avg_latency_ms,
410            max_latency_ms,
411            total_tokens_input,
412            total_tokens_output,
413            total_steps,
414            total_anchor_checks,
415            total_anchor_breaches,
416            total_errors,
417            total_retries,
418            top_flows,
419            status_breakdown,
420        }
421    }
422
423    /// Aggregate traces within a time window (seconds from now).
424    /// Returns percentiles, error rate, and per-flow stats.
425    /// If window_secs is 0, aggregates all buffered traces.
426    pub fn aggregate(&self, window_secs: u64) -> TraceAggregate {
427        let now = wall_clock_secs();
428        let cutoff = if window_secs > 0 { now.saturating_sub(window_secs) } else { 0 };
429
430        let window_entries: Vec<&TraceEntry> = self.entries
431            .iter()
432            .filter(|e| e.timestamp >= cutoff)
433            .collect();
434
435        let count = window_entries.len();
436        if count == 0 {
437            return TraceAggregate {
438                window_secs,
439                count: 0,
440                error_rate: 0.0,
441                avg_latency_ms: 0,
442                p50_latency_ms: 0,
443                p95_latency_ms: 0,
444                p99_latency_ms: 0,
445                min_latency_ms: 0,
446                max_latency_ms: 0,
447                total_tokens: 0,
448                avg_steps: 0.0,
449                flows: Vec::new(),
450            };
451        }
452
453        let mut latencies: Vec<u64> = window_entries.iter().map(|e| e.latency_ms).collect();
454        latencies.sort();
455
456        let error_count = window_entries.iter().filter(|e| e.errors > 0).count();
457        let total_latency: u64 = latencies.iter().sum();
458        let total_tokens: u64 = window_entries.iter().map(|e| e.tokens_input + e.tokens_output).sum();
459        let total_steps: f64 = window_entries.iter().map(|e| e.steps_executed as f64).sum();
460
461        // Per-flow aggregation
462        let mut flow_map: std::collections::HashMap<String, (u64, u64, usize)> = std::collections::HashMap::new();
463        for e in &window_entries {
464            let entry = flow_map.entry(e.flow_name.clone()).or_insert((0, 0, 0));
465            entry.0 += 1; // count
466            entry.1 += e.latency_ms; // total latency
467            if e.errors > 0 { entry.2 += 1; } // errors
468        }
469        let mut flows: Vec<FlowAggregate> = flow_map.into_iter().map(|(name, (cnt, lat, errs))| {
470            FlowAggregate {
471                flow_name: name,
472                count: cnt,
473                avg_latency_ms: if cnt > 0 { lat / cnt } else { 0 },
474                errors: errs as u64,
475            }
476        }).collect();
477        flows.sort_by(|a, b| b.count.cmp(&a.count));
478
479        TraceAggregate {
480            window_secs,
481            count: count as u64,
482            error_rate: error_count as f64 / count as f64,
483            avg_latency_ms: total_latency / count as u64,
484            p50_latency_ms: percentile(&latencies, 50),
485            p95_latency_ms: percentile(&latencies, 95),
486            p99_latency_ms: percentile(&latencies, 99),
487            min_latency_ms: latencies[0],
488            max_latency_ms: latencies[latencies.len() - 1],
489            total_tokens,
490            avg_steps: total_steps / count as f64,
491            flows,
492        }
493    }
494
495    /// Set correlation ID on a trace. Returns true if found.
496    pub fn set_correlation(&mut self, id: u64, correlation_id: &str) -> bool {
497        match self.get_mut(id) {
498            Some(entry) => {
499                entry.correlation_id = Some(correlation_id.to_string());
500                true
501            }
502            None => false,
503        }
504    }
505
506    /// Find all traces with a given correlation ID.
507    pub fn by_correlation(&self, correlation_id: &str) -> Vec<&TraceEntry> {
508        self.entries.iter()
509            .filter(|e| e.correlation_id.as_deref() == Some(correlation_id))
510            .collect()
511    }
512
513    /// Clear all buffered traces.
514    pub fn clear(&mut self) {
515        self.entries.clear();
516    }
517}
518
/// Percentile of an ascending-sorted slice using the nearest-rank method.
///
/// The rank is `ceil(pct / 100 * len)`, clamped to `1..=len`, so `pct == 0`
/// yields the smallest element and any `pct >= 100` yields the largest.
/// Returns 0 for an empty slice.
fn percentile(sorted: &[u64], pct: usize) -> u64 {
    let len = sorted.len();
    if len == 0 {
        return 0;
    }
    // Capping pct keeps the multiplication small and maps >100 to the maximum.
    let rank = (pct.min(100) * len + 99) / 100;
    // Without the lower clamp, pct == 0 would give rank 0 and underflow below.
    sorted[rank.clamp(1, len) - 1]
}
525
526/// Aggregate result for a time window.
527#[derive(Debug, Clone, Serialize)]
528pub struct TraceAggregate {
529    pub window_secs: u64,
530    pub count: u64,
531    pub error_rate: f64,
532    pub avg_latency_ms: u64,
533    pub p50_latency_ms: u64,
534    pub p95_latency_ms: u64,
535    pub p99_latency_ms: u64,
536    pub min_latency_ms: u64,
537    pub max_latency_ms: u64,
538    pub total_tokens: u64,
539    pub avg_steps: f64,
540    pub flows: Vec<FlowAggregate>,
541}
542
543/// Per-flow aggregate within a window.
544#[derive(Debug, Clone, Serialize)]
545pub struct FlowAggregate {
546    pub flow_name: String,
547    pub count: u64,
548    pub avg_latency_ms: u64,
549    pub errors: u64,
550}
551
552// ── Filter ──────────────────────────────────────────────────────────────
553
554/// Filter for querying traces.
555#[derive(Debug, Clone, Default, Deserialize)]
556pub struct TraceFilter {
557    /// Filter by flow name.
558    pub flow_name: Option<String>,
559    /// Filter by status.
560    pub status: Option<String>,
561    /// Filter by client key.
562    pub client_key: Option<String>,
563    /// Only traces with latency >= this (ms).
564    pub min_latency_ms: Option<u64>,
565    /// Only traces with errors > 0.
566    pub has_errors: Option<bool>,
567    /// Only traces with this annotation tag.
568    pub tag: Option<String>,
569}
570
571impl TraceFilter {
572    pub fn matches(&self, entry: &TraceEntry) -> bool {
573        if let Some(ref name) = self.flow_name {
574            if entry.flow_name != *name {
575                return false;
576            }
577        }
578        if let Some(ref status) = self.status {
579            if entry.status.as_str() != status.as_str() {
580                return false;
581            }
582        }
583        if let Some(ref key) = self.client_key {
584            if entry.client_key != *key {
585                return false;
586            }
587        }
588        if let Some(min_lat) = self.min_latency_ms {
589            if entry.latency_ms < min_lat {
590                return false;
591            }
592        }
593        if let Some(has_err) = self.has_errors {
594            if has_err && entry.errors == 0 {
595                return false;
596            }
597            if !has_err && entry.errors > 0 {
598                return false;
599            }
600        }
601        if let Some(ref tag) = self.tag {
602            let has_tag = entry.annotations.iter().any(|a| a.tags.contains(tag));
603            if !has_tag {
604                return false;
605            }
606        }
607        true
608    }
609}
610
611// ── Stats ───────────────────────────────────────────────────────────────
612
613/// Aggregate statistics across buffered traces.
614#[derive(Debug, Clone, Serialize)]
615pub struct TraceStoreStats {
616    pub total_recorded: u64,
617    pub buffered: usize,
618    pub avg_latency_ms: u64,
619    pub max_latency_ms: u64,
620    pub total_tokens_input: u64,
621    pub total_tokens_output: u64,
622    pub total_steps: usize,
623    pub total_anchor_checks: usize,
624    pub total_anchor_breaches: usize,
625    pub total_errors: usize,
626    pub total_retries: usize,
627    pub top_flows: Vec<(String, u64)>,
628    pub status_breakdown: Vec<(String, u64)>,
629}
630
631// ── Export formats ─────────────────────────────────────────────────────
632
/// Output formats available for trace export.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExportFormat {
    /// JSON Lines — one JSON object per trace, OpenTelemetry-like span structure.
    JsonLines,
    /// CSV — a header row followed by one row per trace.
    Csv,
    /// Prometheus exposition — aggregate metrics over the exported traces.
    Prometheus,
}

impl ExportFormat {
    /// Pick a format from its name, ignoring case.
    ///
    /// `"csv"` selects CSV, `"prometheus"` or `"prom"` selects Prometheus,
    /// and anything else (including the empty string) falls back to
    /// JSON Lines.
    pub fn from_str(s: &str) -> Self {
        let wanted = s.to_lowercase();
        if wanted == "csv" {
            Self::Csv
        } else if wanted == "prometheus" || wanted == "prom" {
            Self::Prometheus
        } else {
            Self::JsonLines
        }
    }

    /// MIME content type to send with output in this format.
    pub fn content_type(&self) -> &'static str {
        match *self {
            Self::Prometheus => "text/plain; version=0.0.4; charset=utf-8",
            Self::Csv => "text/csv",
            Self::JsonLines => "application/x-ndjson",
        }
    }
}
663
664/// An OpenTelemetry-like span representation for trace export.
665#[derive(Debug, Clone, Serialize)]
666pub struct TraceSpan {
667    /// Trace ID (matches TraceEntry.id).
668    pub trace_id: String,
669    /// Span name (flow name).
670    pub name: String,
671    /// Start time (Unix seconds).
672    pub start_time_unix_secs: u64,
673    /// Duration in milliseconds.
674    pub duration_ms: u64,
675    /// Status code: "ok", "error", "partial", "timeout".
676    pub status: String,
677    /// Resource attributes.
678    pub resource: TraceSpanResource,
679    /// Span attributes (key-value metrics).
680    pub attributes: TraceSpanAttributes,
681    /// Events (sub-spans).
682    pub events: Vec<TraceSpanEvent>,
683}
684
685/// Resource metadata for a trace span.
686#[derive(Debug, Clone, Serialize)]
687pub struct TraceSpanResource {
688    pub service_name: String,
689    pub service_version: String,
690    pub source_file: String,
691    pub backend: String,
692    pub client_key: String,
693}
694
695/// Numeric attributes for a trace span.
696#[derive(Debug, Clone, Serialize)]
697pub struct TraceSpanAttributes {
698    pub steps_executed: usize,
699    pub tokens_input: u64,
700    pub tokens_output: u64,
701    pub tokens_total: u64,
702    pub anchor_checks: usize,
703    pub anchor_breaches: usize,
704    pub errors: usize,
705    pub retries: usize,
706}
707
708/// An event within a trace span (maps from TraceEvent).
709#[derive(Debug, Clone, Serialize)]
710pub struct TraceSpanEvent {
711    pub name: String,
712    pub offset_ms: u64,
713    pub attributes: std::collections::HashMap<String, String>,
714}
715
716/// Convert a TraceEntry to an OpenTelemetry-like span.
717pub fn entry_to_span(entry: &TraceEntry) -> TraceSpan {
718    let events = entry
719        .events
720        .iter()
721        .map(|e| {
722            let mut attrs = std::collections::HashMap::new();
723            if !e.step_name.is_empty() {
724                attrs.insert("step".to_string(), e.step_name.clone());
725            }
726            if !e.detail.is_empty() {
727                attrs.insert("detail".to_string(), e.detail.clone());
728            }
729            TraceSpanEvent {
730                name: e.event_type.clone(),
731                offset_ms: e.offset_ms,
732                attributes: attrs,
733            }
734        })
735        .collect();
736
737    TraceSpan {
738        trace_id: format!("axt-{}", entry.id),
739        name: entry.flow_name.clone(),
740        start_time_unix_secs: entry.timestamp,
741        duration_ms: entry.latency_ms,
742        status: entry.status.as_str().to_string(),
743        resource: TraceSpanResource {
744            service_name: "axon-server".to_string(),
745            service_version: crate::runner::AXON_VERSION.to_string(),
746            source_file: entry.source_file.clone(),
747            backend: entry.backend.clone(),
748            client_key: entry.client_key.clone(),
749        },
750        attributes: TraceSpanAttributes {
751            steps_executed: entry.steps_executed,
752            tokens_input: entry.tokens_input,
753            tokens_output: entry.tokens_output,
754            tokens_total: entry.tokens_input + entry.tokens_output,
755            anchor_checks: entry.anchor_checks,
756            anchor_breaches: entry.anchor_breaches,
757            errors: entry.errors,
758            retries: entry.retries,
759        },
760        events,
761    }
762}
763
764/// Export traces as JSON Lines (one JSON object per line).
765pub fn export_jsonl(entries: &[&TraceEntry]) -> String {
766    let mut out = String::new();
767    for entry in entries {
768        let span = entry_to_span(entry);
769        if let Ok(line) = serde_json::to_string(&span) {
770            out.push_str(&line);
771            out.push('\n');
772        }
773    }
774    out
775}
776
777/// Export traces as CSV with header row.
778pub fn export_csv(entries: &[&TraceEntry]) -> String {
779    let mut out = String::new();
780    out.push_str("trace_id,timestamp,flow_name,status,steps_executed,latency_ms,tokens_input,tokens_output,anchor_checks,anchor_breaches,errors,retries,source_file,backend,client_key,event_count\n");
781    for entry in entries {
782        out.push_str(&format!(
783            "axt-{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}\n",
784            entry.id,
785            entry.timestamp,
786            entry.flow_name,
787            entry.status.as_str(),
788            entry.steps_executed,
789            entry.latency_ms,
790            entry.tokens_input,
791            entry.tokens_output,
792            entry.anchor_checks,
793            entry.anchor_breaches,
794            entry.errors,
795            entry.retries,
796            entry.source_file,
797            entry.backend,
798            entry.client_key,
799            entry.events.len(),
800        ));
801    }
802    out
803}
804
805/// Export aggregate metrics from traces as Prometheus exposition format.
806pub fn export_prometheus(entries: &[&TraceEntry]) -> String {
807    let count = entries.len() as u64;
808    let mut total_latency: u64 = 0;
809    let mut max_latency: u64 = 0;
810    let mut total_tokens_in: u64 = 0;
811    let mut total_tokens_out: u64 = 0;
812    let mut total_steps: u64 = 0;
813    let mut total_errors: u64 = 0;
814    let mut total_retries: u64 = 0;
815    let mut total_anchor_checks: u64 = 0;
816    let mut total_anchor_breaches: u64 = 0;
817    let mut status_counts: std::collections::HashMap<String, u64> = std::collections::HashMap::new();
818
819    for e in entries {
820        total_latency += e.latency_ms;
821        if e.latency_ms > max_latency {
822            max_latency = e.latency_ms;
823        }
824        total_tokens_in += e.tokens_input;
825        total_tokens_out += e.tokens_output;
826        total_steps += e.steps_executed as u64;
827        total_errors += e.errors as u64;
828        total_retries += e.retries as u64;
829        total_anchor_checks += e.anchor_checks as u64;
830        total_anchor_breaches += e.anchor_breaches as u64;
831        *status_counts.entry(e.status.as_str().to_string()).or_insert(0) += 1;
832    }
833
834    let avg_latency = if count > 0 { total_latency / count } else { 0 };
835
836    let mut out = String::new();
837
838    out.push_str("# HELP axon_trace_export_count Number of traces in this export.\n");
839    out.push_str("# TYPE axon_trace_export_count gauge\n");
840    out.push_str(&format!("axon_trace_export_count {}\n\n", count));
841
842    out.push_str("# HELP axon_trace_export_latency_avg_ms Average latency across exported traces.\n");
843    out.push_str("# TYPE axon_trace_export_latency_avg_ms gauge\n");
844    out.push_str(&format!("axon_trace_export_latency_avg_ms {}\n\n", avg_latency));
845
846    out.push_str("# HELP axon_trace_export_latency_max_ms Maximum latency across exported traces.\n");
847    out.push_str("# TYPE axon_trace_export_latency_max_ms gauge\n");
848    out.push_str(&format!("axon_trace_export_latency_max_ms {}\n\n", max_latency));
849
850    out.push_str("# HELP axon_trace_export_tokens_total Total tokens in exported traces.\n");
851    out.push_str("# TYPE axon_trace_export_tokens_total counter\n");
852    out.push_str(&format!("axon_trace_export_tokens_total{{type=\"input\"}} {}\n", total_tokens_in));
853    out.push_str(&format!("axon_trace_export_tokens_total{{type=\"output\"}} {}\n\n", total_tokens_out));
854
855    out.push_str("# HELP axon_trace_export_steps_total Total steps executed in exported traces.\n");
856    out.push_str("# TYPE axon_trace_export_steps_total counter\n");
857    out.push_str(&format!("axon_trace_export_steps_total {}\n\n", total_steps));
858
859    out.push_str("# HELP axon_trace_export_errors_total Total errors in exported traces.\n");
860    out.push_str("# TYPE axon_trace_export_errors_total counter\n");
861    out.push_str(&format!("axon_trace_export_errors_total {}\n\n", total_errors));
862
863    out.push_str("# HELP axon_trace_export_retries_total Total retries in exported traces.\n");
864    out.push_str("# TYPE axon_trace_export_retries_total counter\n");
865    out.push_str(&format!("axon_trace_export_retries_total {}\n\n", total_retries));
866
867    out.push_str("# HELP axon_trace_export_anchor_checks_total Total anchor checks in exported traces.\n");
868    out.push_str("# TYPE axon_trace_export_anchor_checks_total counter\n");
869    out.push_str(&format!("axon_trace_export_anchor_checks_total {}\n\n", total_anchor_checks));
870
871    out.push_str("# HELP axon_trace_export_anchor_breaches_total Total anchor breaches in exported traces.\n");
872    out.push_str("# TYPE axon_trace_export_anchor_breaches_total counter\n");
873    out.push_str(&format!("axon_trace_export_anchor_breaches_total {}\n\n", total_anchor_breaches));
874
875    if !status_counts.is_empty() {
876        out.push_str("# HELP axon_trace_export_by_status Count of exported traces by status.\n");
877        out.push_str("# TYPE axon_trace_export_by_status gauge\n");
878        let mut sorted: Vec<_> = status_counts.into_iter().collect();
879        sorted.sort_by_key(|(k, _)| k.clone());
880        for (status, n) in sorted {
881            out.push_str(&format!("axon_trace_export_by_status{{status=\"{}\"}} {}\n", status, n));
882        }
883        out.push('\n');
884    }
885
886    out
887}
888
889// ── Helpers ─────────────────────────────────────────────────────────────
890
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the lookup fails and
/// this returns `0` instead of panicking.
fn wall_clock_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // Clock is set before 1970-01-01: report zero elapsed seconds.
        Err(_) => 0,
    }
}
897
898/// Build a trace entry (convenience constructor for server use).
899pub fn build_trace(
900    flow_name: &str,
901    source_file: &str,
902    backend: &str,
903    client_key: &str,
904    status: TraceStatus,
905    steps: usize,
906    latency_ms: u64,
907) -> TraceEntry {
908    TraceEntry {
909        id: 0, // set by store
910        timestamp: 0, // set by store
911        flow_name: flow_name.to_string(),
912        status,
913        steps_executed: steps,
914        latency_ms,
915        tokens_input: 0,
916        tokens_output: 0,
917        anchor_checks: 0,
918        anchor_breaches: 0,
919        errors: 0,
920        retries: 0,
921        source_file: source_file.to_string(),
922        backend: backend.to_string(),
923        client_key: client_key.to_string(),
924        events: Vec::new(),
925        replay_of: None,
926        annotations: Vec::new(),
927        correlation_id: None,
928    }
929}
930
931// ── Tests ────────────────────────────────────────────────────────────────
932
#[cfg(test)]
mod tests {
    use super::*;

    // ── Fixtures ────────────────────────────────────────────────────────

    /// Store built from `TraceStoreConfig::default()`.
    fn default_store() -> TraceStore {
        TraceStore::new(TraceStoreConfig::default())
    }

    /// Store built from `TraceStoreConfig::disabled()`.
    fn off_store() -> TraceStore {
        TraceStore::new(TraceStoreConfig::disabled())
    }

    /// Trace fixture: 3 steps, 150 ms latency, 100/50 input/output tokens,
    /// 2 anchor checks, and a single `step_start` event.
    fn sample_trace(name: &str, status: TraceStatus) -> TraceEntry {
        let first_event = TraceEvent {
            event_type: "step_start".into(),
            offset_ms: 0,
            step_name: "step1".into(),
            detail: "starting".into(),
        };

        let mut trace = build_trace(name, "test.axon", "anthropic", "token_a", status, 3, 150);
        trace.tokens_input = 100;
        trace.tokens_output = 50;
        trace.anchor_checks = 2;
        trace.events.push(first_event);
        trace
    }

    // ── Recording / retrieval ───────────────────────────────────────────

    /// The first recorded trace gets id 1, a non-zero timestamp, and can be
    /// fetched back by id.
    #[test]
    fn record_and_retrieve() {
        let mut store = default_store();
        let trace_id = store.record(sample_trace("FlowA", TraceStatus::Success));
        assert_eq!(trace_id, 1);
        assert_eq!(store.len(), 1);

        let stored = store.get(trace_id).unwrap();
        assert_eq!(stored.flow_name, "FlowA");
        assert_eq!(stored.status, TraceStatus::Success);
        assert_eq!(stored.steps_executed, 3);
        assert!(stored.timestamp > 0);
    }

    /// With capacity 3, recording 5 traces keeps only the newest 3 while
    /// the total-recorded counter still reports 5.
    #[test]
    fn ring_buffer_eviction() {
        let mut store = TraceStore::new(TraceStoreConfig {
            capacity: 3,
            enabled: true,
            max_events_per_trace: 100,
            max_age_secs: 0,
        });

        for n in 0..5 {
            let flow = format!("Flow{}", n);
            store.record(sample_trace(&flow, TraceStatus::Success));
        }

        assert_eq!(store.len(), 3);
        assert_eq!(store.total_recorded(), 5);

        // Newest first: Flow4, Flow3, Flow2.
        let newest_first = store.recent(10, None);
        assert_eq!(newest_first[0].flow_name, "Flow4");
        assert_eq!(newest_first[2].flow_name, "Flow2");
    }

    /// A disabled store returns id 0 and keeps nothing.
    #[test]
    fn disabled_store() {
        let mut store = off_store();
        let trace_id = store.record(sample_trace("X", TraceStatus::Success));
        assert_eq!(trace_id, 0);
        assert_eq!(store.len(), 0);
        assert_eq!(store.total_recorded(), 0);
    }

    // Phase 33.c (§Fase 33.c) — reserve_id / record_with_id

    /// `reserve_id` hands out 1, 2, 3 in order without persisting entries.
    #[test]
    fn reserve_id_monotonic_and_consumes_next_id() {
        let mut store = default_store();
        for expected in 1..=3 {
            assert_eq!(store.reserve_id(), expected);
        }
        // reserve_id only allocates the sequence number; nothing is stored.
        assert_eq!(store.len(), 0);
        assert_eq!(store.total_recorded(), 0);
    }

    /// On a disabled store `reserve_id` yields 0.
    #[test]
    fn reserve_id_disabled_store_returns_zero() {
        let mut store = off_store();
        assert_eq!(store.reserve_id(), 0);
    }

    /// An entry recorded under a reserved id is retrievable by that id.
    #[test]
    fn record_with_id_persists_under_reserved_id() {
        let mut store = default_store();
        let reserved = store.reserve_id();
        store.record_with_id(sample_trace("ReservedFlow", TraceStatus::Success), reserved);

        let stored = store
            .get(reserved)
            .expect("entry must exist under reserved id");
        assert_eq!(stored.id, reserved);
        assert_eq!(stored.flow_name, "ReservedFlow");
        assert_eq!(store.len(), 1);
        assert_eq!(store.total_recorded(), 1);
    }

    /// A plain `record` after `record_with_id` gets a fresh id that is
    /// strictly greater than the reserved one.
    #[test]
    fn record_with_id_does_not_advance_next_id() {
        let mut store = default_store();
        let reserved = store.reserve_id();
        store.record_with_id(sample_trace("X", TraceStatus::Success), reserved);

        // The follow-up record must take the next sequence number rather
        // than reuse the reserved id.
        let following = store.record(sample_trace("Y", TraceStatus::Success));
        assert!(
            following > reserved,
            "record after record_with_id must produce a strictly greater id"
        );
    }

    /// `record_with_id` on a disabled store stores nothing.
    #[test]
    fn record_with_id_disabled_store_is_noop() {
        let mut store = off_store();
        let reserved = store.reserve_id();
        store.record_with_id(sample_trace("X", TraceStatus::Success), reserved);
        assert_eq!(store.len(), 0);
        assert_eq!(store.total_recorded(), 0);
    }

    /// Live-streaming pattern from execute_sse_handler: the id is reserved
    /// up front so wire events can carry it, and the final entry is
    /// recorded under that same id once execution completes.
    #[test]
    fn reserve_then_record_preserves_audit_correlation() {
        let mut store = default_store();
        let wire_trace_id = store.reserve_id();

        // …flow executes; wire emits axon.token + axon.complete with
        // trace_id = wire_trace_id…

        let mut finished = sample_trace("LiveStreaming", TraceStatus::Success);
        finished.steps_executed = 5;
        finished.tokens_output = 42;
        store.record_with_id(finished, wire_trace_id);

        let audited = store
            .get(wire_trace_id)
            .expect("audit lookup must succeed");
        assert_eq!(audited.id, wire_trace_id);
        assert_eq!(audited.steps_executed, 5);
        assert_eq!(audited.tokens_output, 42);
    }

    // ── Filtering ───────────────────────────────────────────────────────

    /// Filtering by flow name returns both "Alpha" traces.
    #[test]
    fn filter_by_flow_name() {
        let mut store = default_store();
        store.record(sample_trace("Alpha", TraceStatus::Success));
        store.record(sample_trace("Beta", TraceStatus::Success));
        store.record(sample_trace("Alpha", TraceStatus::Failed));

        let only_alpha = TraceFilter {
            flow_name: Some("Alpha".into()),
            ..TraceFilter::default()
        };
        let matches = store.recent(10, Some(&only_alpha));
        assert_eq!(matches.len(), 2);
    }

    /// Filtering by status "failed" returns only trace "B".
    #[test]
    fn filter_by_status() {
        let mut store = default_store();
        store.record(sample_trace("A", TraceStatus::Success));
        store.record(sample_trace("B", TraceStatus::Failed));
        store.record(sample_trace("C", TraceStatus::Success));

        let only_failed = TraceFilter {
            status: Some("failed".into()),
            ..TraceFilter::default()
        };
        let matches = store.recent(10, Some(&only_failed));
        assert_eq!(matches.len(), 1);
        assert_eq!(matches[0].flow_name, "B");
    }

    /// Filtering with `has_errors = true` returns only the trace whose
    /// error count is non-zero.
    #[test]
    fn filter_by_has_errors() {
        let mut store = default_store();

        let mut clean = sample_trace("A", TraceStatus::Success);
        clean.errors = 0;
        store.record(clean);

        let mut broken = sample_trace("B", TraceStatus::Failed);
        broken.errors = 2;
        store.record(broken);

        let only_errored = TraceFilter {
            has_errors: Some(true),
            ..TraceFilter::default()
        };
        let matches = store.recent(10, Some(&only_errored));
        assert_eq!(matches.len(), 1);
        assert_eq!(matches[0].flow_name, "B");
    }

    // ── Stats ───────────────────────────────────────────────────────────

    /// Aggregates over three traces: latency average/max, token and error
    /// totals, and the most frequent flow.
    #[test]
    fn stats_computation() {
        let mut store = default_store();

        let mut alpha_fast = sample_trace("Alpha", TraceStatus::Success);
        alpha_fast.latency_ms = 100;
        alpha_fast.tokens_input = 200;
        alpha_fast.tokens_output = 100;
        store.record(alpha_fast);

        let mut alpha_slow = sample_trace("Alpha", TraceStatus::Success);
        alpha_slow.latency_ms = 300;
        alpha_slow.tokens_input = 400;
        alpha_slow.tokens_output = 200;
        alpha_slow.errors = 1;
        store.record(alpha_slow);

        // Keeps the fixture's 100 input tokens.
        let mut beta_failed = sample_trace("Beta", TraceStatus::Failed);
        beta_failed.latency_ms = 500;
        beta_failed.errors = 2;
        store.record(beta_failed);

        let stats = store.stats();
        assert_eq!(stats.total_recorded, 3);
        assert_eq!(stats.buffered, 3);
        assert_eq!(stats.avg_latency_ms, 300); // (100 + 300 + 500) / 3
        assert_eq!(stats.max_latency_ms, 500);
        assert_eq!(stats.total_tokens_input, 700); // 200 + 400 + 100
        assert_eq!(stats.total_errors, 3); // 0 + 1 + 2
        assert_eq!(stats.top_flows[0].0, "Alpha");
        assert_eq!(stats.top_flows[0].1, 2);
    }

    /// Stats of an empty store are all zero.
    #[test]
    fn stats_empty_store() {
        let stats = default_store().stats();
        assert_eq!(stats.total_recorded, 0);
        assert_eq!(stats.avg_latency_ms, 0);
        assert_eq!(stats.max_latency_ms, 0);
    }

    // ── Serialization ───────────────────────────────────────────────────

    /// Each status maps to its lowercase label, and `Success` serializes to
    /// the JSON string "success".
    #[test]
    fn trace_status_serde() {
        assert_eq!(TraceStatus::Success.as_str(), "success");
        assert_eq!(TraceStatus::Failed.as_str(), "failed");
        assert_eq!(TraceStatus::Partial.as_str(), "partial");
        assert_eq!(TraceStatus::Timeout.as_str(), "timeout");

        let as_json = serde_json::to_value(TraceStatus::Success).unwrap();
        assert_eq!(as_json, "success");
    }

    /// A trace entry serializes to JSON with the expected top-level fields.
    #[test]
    fn trace_entry_serializable() {
        let trace = sample_trace("TestFlow", TraceStatus::Success);
        let as_json = serde_json::to_value(&trace).unwrap();
        assert_eq!(as_json["flow_name"], "TestFlow");
        assert_eq!(as_json["status"], "success");
        assert_eq!(as_json["steps_executed"], 3);
        assert!(as_json["events"].is_array());
    }

    /// Stats of an empty store serialize to JSON.
    #[test]
    fn stats_serializable() {
        let stats = default_store().stats();
        let as_json = serde_json::to_value(&stats).unwrap();
        assert_eq!(as_json["total_recorded"], 0);
        assert_eq!(as_json["buffered"], 0);
        assert!(as_json["top_flows"].is_array());
    }

    /// The default config serializes with capacity 500, enabled, and a
    /// 200-event cap per trace.
    #[test]
    fn config_serializable() {
        let as_json = serde_json::to_value(&TraceStoreConfig::default()).unwrap();
        assert_eq!(as_json["capacity"], 500);
        assert_eq!(as_json["enabled"], true);
        assert_eq!(as_json["max_events_per_trace"], 200);
    }

    // ── Limits / lifecycle ──────────────────────────────────────────────

    /// A trace carrying 11 events is stored with only 3 when
    /// `max_events_per_trace` is 3.
    #[test]
    fn event_truncation() {
        let mut store = TraceStore::new(TraceStoreConfig {
            capacity: 10,
            enabled: true,
            max_events_per_trace: 3,
            max_age_secs: 0,
        });

        // Ten extra events on top of the fixture's single one.
        let mut noisy = sample_trace("X", TraceStatus::Success);
        noisy.events.extend((0..10).map(|offset| TraceEvent {
            event_type: "test".into(),
            offset_ms: offset,
            step_name: "s".into(),
            detail: "d".into(),
        }));

        let trace_id = store.record(noisy);
        let stored = store.get(trace_id).unwrap();
        assert_eq!(stored.events.len(), 3);
    }

    /// `clear` empties the buffer but leaves the total-recorded counter.
    #[test]
    fn clear_preserves_total() {
        let mut store = default_store();
        store.record(sample_trace("A", TraceStatus::Success));
        store.record(sample_trace("B", TraceStatus::Success));
        assert_eq!(store.len(), 2);

        store.clear();
        assert!(store.is_empty());
        assert_eq!(store.len(), 0);
        assert_eq!(store.total_recorded(), 2);
    }

    // ── Export tests ────────────────────────────────────────────────────

    /// Format names parse case-insensitively; "json" and unknown names map
    /// to JSON Lines.
    #[test]
    fn export_format_parsing() {
        let cases = [
            ("jsonl", ExportFormat::JsonLines),
            ("JSONL", ExportFormat::JsonLines),
            ("json", ExportFormat::JsonLines),
            ("csv", ExportFormat::Csv),
            ("CSV", ExportFormat::Csv),
            ("prometheus", ExportFormat::Prometheus),
            ("prom", ExportFormat::Prometheus),
            ("unknown", ExportFormat::JsonLines),
        ];
        for (raw, expected) in cases.iter() {
            assert_eq!(
                ExportFormat::from_str(*raw),
                *expected,
                "unexpected format for {:?}",
                raw
            );
        }
    }

    /// Each export format reports its content type.
    #[test]
    fn export_format_content_type() {
        assert_eq!(ExportFormat::JsonLines.content_type(), "application/x-ndjson");
        assert_eq!(ExportFormat::Csv.content_type(), "text/csv");
        assert!(ExportFormat::Prometheus
            .content_type()
            .starts_with("text/plain"));
    }

    /// `entry_to_span` carries the entry's identity, resource info,
    /// attributes, and events over to the span.
    #[test]
    fn entry_to_span_conversion() {
        let mut store = default_store();
        let trace_id = store.record(sample_trace("FlowX", TraceStatus::Success));
        let span = entry_to_span(store.get(trace_id).unwrap());

        // Identity and timing.
        assert_eq!(span.trace_id, format!("axt-{}", trace_id));
        assert_eq!(span.name, "FlowX");
        assert_eq!(span.status, "success");
        assert_eq!(span.duration_ms, 150);

        // Resource.
        assert_eq!(span.resource.service_name, "axon-server");
        assert_eq!(span.resource.backend, "anthropic");
        assert_eq!(span.resource.client_key, "token_a");

        // Attributes.
        assert_eq!(span.attributes.steps_executed, 3);
        assert_eq!(span.attributes.tokens_input, 100);
        assert_eq!(span.attributes.tokens_output, 50);
        assert_eq!(span.attributes.tokens_total, 150);
        assert_eq!(span.attributes.anchor_checks, 2);

        // Events.
        assert_eq!(span.events.len(), 1);
        assert_eq!(span.events[0].name, "step_start");
    }

    /// JSON Lines export emits one valid JSON document per trace, newest
    /// first.
    #[test]
    fn export_jsonl_format() {
        let mut store = default_store();
        store.record(sample_trace("A", TraceStatus::Success));
        store.record(sample_trace("B", TraceStatus::Failed));
        let entries = store.recent(10, None);

        let exported = export_jsonl(&entries);
        let rows: Vec<&str> = exported.lines().collect();
        assert_eq!(rows.len(), 2);

        // Every row must parse as JSON; the newest trace comes first.
        let newest: serde_json::Value = serde_json::from_str(rows[0]).unwrap();
        assert_eq!(newest["name"], "B");
        assert_eq!(newest["status"], "failed");
        assert!(newest["trace_id"].as_str().unwrap().starts_with("axt-"));
        assert_eq!(
            newest["resource"]["service_name"].as_str().unwrap(),
            "axon-server"
        );

        let oldest: serde_json::Value = serde_json::from_str(rows[1]).unwrap();
        assert_eq!(oldest["name"], "A");
        assert_eq!(oldest["status"], "success");
    }

    /// CSV export emits a header row followed by one row per trace, newest
    /// first.
    #[test]
    fn export_csv_format() {
        let mut store = default_store();
        store.record(sample_trace("FlowA", TraceStatus::Success));
        store.record(sample_trace("FlowB", TraceStatus::Failed));
        let entries = store.recent(10, None);

        let exported = export_csv(&entries);
        let rows: Vec<&str> = exported.lines().collect();
        assert_eq!(rows.len(), 3); // header + 2 data rows

        // Header row.
        let header = rows[0];
        assert!(header.starts_with("trace_id,"));
        for column in ["flow_name", "latency_ms", "event_count"].iter() {
            assert!(header.contains(*column), "header lacks column {}", column);
        }

        // Data rows, newest first.
        assert!(rows[1].contains("FlowB"));
        assert!(rows[1].contains("failed"));
        assert!(rows[2].contains("FlowA"));
        assert!(rows[2].contains("success"));
    }

    /// Prometheus export reports count, latency average/max, error total,
    /// per-status counts, and HELP/TYPE metadata.
    #[test]
    fn export_prometheus_format() {
        let mut store = default_store();

        let mut ok_trace = sample_trace("A", TraceStatus::Success);
        ok_trace.latency_ms = 200;
        ok_trace.errors = 0;
        store.record(ok_trace);

        let mut failed_trace = sample_trace("B", TraceStatus::Failed);
        failed_trace.latency_ms = 400;
        failed_trace.errors = 2;
        store.record(failed_trace);

        let entries = store.recent(10, None);
        let exported = export_prometheus(&entries);

        let expected_fragments = [
            "axon_trace_export_count 2",
            "axon_trace_export_latency_avg_ms 300", // (200 + 400) / 2
            "axon_trace_export_latency_max_ms 400",
            "axon_trace_export_errors_total 2",
            "axon_trace_export_by_status{status=\"success\"} 1",
            "axon_trace_export_by_status{status=\"failed\"} 1",
            "# HELP axon_trace_export_count",
            "# TYPE axon_trace_export_count gauge",
        ];
        for fragment in expected_fragments.iter() {
            assert!(
                exported.contains(*fragment),
                "prometheus output lacks {:?}",
                fragment
            );
        }
    }

    /// Exporting no traces yields empty JSON Lines, a header-only CSV, and
    /// zeroed Prometheus gauges.
    #[test]
    fn export_empty_traces() {
        let nothing: Vec<&TraceEntry> = Vec::new();

        assert!(export_jsonl(&nothing).is_empty());

        // Header only.
        assert_eq!(export_csv(&nothing).lines().count(), 1);

        let prom = export_prometheus(&nothing);
        assert!(prom.contains("axon_trace_export_count 0"));
        assert!(prom.contains("axon_trace_export_latency_avg_ms 0"));
    }

    /// A span serializes to JSON with the expected value kinds per field.
    #[test]
    fn span_serializable() {
        let mut store = default_store();
        store.record(sample_trace("Test", TraceStatus::Success));

        let span = entry_to_span(store.get(1).unwrap());
        let as_json = serde_json::to_value(&span).unwrap();
        assert!(as_json["trace_id"].is_string());
        assert!(as_json["resource"].is_object());
        assert!(as_json["attributes"].is_object());
        assert!(as_json["events"].is_array());
    }
}
1371}