mockforge_chaos/
trace_collector.rs

1//! Trace collector for querying traces from Jaeger and OTLP backends
2
3use mockforge_tracing::exporter::ExporterType;
4use reqwest::Client;
5use serde::{Deserialize, Serialize};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7use thiserror::Error;
8
9/// Trace collector errors
10#[derive(Error, Debug)]
11pub enum TraceCollectorError {
12    #[error("HTTP request failed: {0}")]
13    HttpError(#[from] reqwest::Error),
14
15    #[error("JSON parsing failed: {0}")]
16    JsonError(#[from] serde_json::Error),
17
18    #[error("Invalid configuration: {0}")]
19    ConfigError(String),
20
21    #[error("Trace backend unavailable: {0}")]
22    BackendUnavailable(String),
23}
24
25/// Configuration for trace collection
26#[derive(Debug, Clone)]
27pub struct TraceCollectorConfig {
28    /// Backend type
29    pub backend_type: ExporterType,
30    /// Jaeger endpoint (for Jaeger backend)
31    pub jaeger_endpoint: Option<String>,
32    /// OTLP endpoint (for OTLP backend)
33    pub otlp_endpoint: Option<String>,
34    /// Query timeout
35    pub timeout: Duration,
36    /// Maximum number of traces to return
37    pub max_traces: usize,
38}
39
40impl Default for TraceCollectorConfig {
41    fn default() -> Self {
42        Self {
43            backend_type: ExporterType::Jaeger,
44            jaeger_endpoint: Some("http://localhost:16686".to_string()), // Jaeger UI endpoint
45            otlp_endpoint: None,
46            timeout: Duration::from_secs(30),
47            max_traces: 100,
48        }
49    }
50}
51
52/// Collected trace data
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct CollectedTrace {
55    pub trace_id: String,
56    pub span_id: String,
57    pub parent_span_id: Option<String>,
58    pub name: String,
59    pub start_time: String,
60    pub end_time: String,
61    pub duration_ms: u64,
62    pub attributes: std::collections::HashMap<String, serde_json::Value>,
63}
64
65/// Trace collector for querying backend systems
66pub struct TraceCollector {
67    client: Client,
68    config: TraceCollectorConfig,
69}
70
71impl TraceCollector {
72    /// Create a new trace collector
73    pub fn new(config: TraceCollectorConfig) -> Self {
74        let client = Client::builder()
75            .timeout(config.timeout)
76            .build()
77            .expect("Failed to create HTTP client");
78
79        Self { client, config }
80    }
81
82    /// Collect traces from the configured backend
83    pub async fn collect_traces(&self) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
84        match self.config.backend_type {
85            ExporterType::Jaeger => self.collect_from_jaeger().await,
86            ExporterType::Otlp => self.collect_from_otlp().await,
87        }
88    }
89
90    /// Get a specific trace by ID from the configured backend
91    pub async fn get_trace_by_id(
92        &self,
93        trace_id: &str,
94    ) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
95        match self.config.backend_type {
96            ExporterType::Jaeger => self.get_trace_from_jaeger(trace_id).await,
97            ExporterType::Otlp => self.get_trace_from_otlp(trace_id).await,
98        }
99    }
100
101    /// Collect traces from Jaeger backend
102    async fn collect_from_jaeger(&self) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
103        let endpoint = self.config.jaeger_endpoint.as_ref().ok_or_else(|| {
104            TraceCollectorError::ConfigError("Jaeger endpoint not configured".to_string())
105        })?;
106
107        // Query recent traces from Jaeger API
108        let url = format!("{}/api/traces", endpoint);
109
110        // For now, query traces from the last hour
111        let start_time = SystemTime::now()
112            .duration_since(SystemTime::UNIX_EPOCH)
113            .unwrap_or_default()
114            .as_millis()
115            - 3600000; // 1 hour ago
116
117        let params = [
118            ("start", start_time.to_string()),
119            ("limit", self.config.max_traces.to_string()),
120        ];
121
122        let response = self.client.get(&url).query(&params).send().await?;
123
124        if !response.status().is_success() {
125            return Err(TraceCollectorError::BackendUnavailable(format!(
126                "Jaeger API returned status: {}",
127                response.status()
128            )));
129        }
130
131        let jaeger_response: JaegerTracesResponse = response.json().await?;
132
133        // Convert Jaeger format to our format
134        let mut traces = Vec::new();
135        for trace_data in jaeger_response.data {
136            for span in trace_data.spans {
137                let trace = CollectedTrace {
138                    trace_id: span.trace_id,
139                    span_id: span.span_id,
140                    parent_span_id: span.parent_span_id,
141                    name: span.operation_name,
142                    start_time: format!(
143                        "{:?}",
144                        UNIX_EPOCH + Duration::from_micros(span.start_time)
145                    ),
146                    end_time: {
147                        let end_micros = span.start_time.saturating_add(span.duration);
148                        format!("{:?}", UNIX_EPOCH + Duration::from_micros(end_micros))
149                    },
150                    duration_ms: span.duration / 1000, // Convert microseconds to milliseconds
151                    attributes: {
152                        let mut attrs = std::collections::HashMap::new();
153                        for tag in &span.tags {
154                            attrs.insert(tag.key.clone(), tag.value.clone());
155                        }
156                        attrs
157                    },
158                };
159                traces.push(trace);
160            }
161        }
162
163        Ok(traces)
164    }
165
166    /// Get a specific trace from Jaeger backend
167    async fn get_trace_from_jaeger(
168        &self,
169        trace_id: &str,
170    ) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
171        let endpoint = self.config.jaeger_endpoint.as_ref().ok_or_else(|| {
172            TraceCollectorError::ConfigError("Jaeger endpoint not configured".to_string())
173        })?;
174
175        // Query specific trace from Jaeger API
176        let url = format!("{}/api/traces/{}", endpoint, trace_id);
177
178        let response = self.client.get(&url).send().await?;
179
180        if !response.status().is_success() {
181            return Err(TraceCollectorError::BackendUnavailable(format!(
182                "Jaeger API returned status: {}",
183                response.status()
184            )));
185        }
186
187        let jaeger_response: JaegerTracesResponse = response.json().await?;
188
189        // Convert Jaeger format to our format
190        let mut traces = Vec::new();
191        for trace_data in jaeger_response.data {
192            for span in trace_data.spans {
193                let trace = CollectedTrace {
194                    trace_id: span.trace_id,
195                    span_id: span.span_id,
196                    parent_span_id: span.parent_span_id,
197                    name: span.operation_name,
198                    start_time: format!(
199                        "{:?}",
200                        UNIX_EPOCH + Duration::from_micros(span.start_time)
201                    ),
202                    end_time: {
203                        let end_micros = span.start_time.saturating_add(span.duration);
204                        format!("{:?}", UNIX_EPOCH + Duration::from_micros(end_micros))
205                    },
206                    duration_ms: span.duration / 1000, // Convert microseconds to milliseconds
207                    attributes: {
208                        let mut attrs = std::collections::HashMap::new();
209                        for tag in &span.tags {
210                            attrs.insert(tag.key.clone(), tag.value.clone());
211                        }
212                        attrs
213                    },
214                };
215                traces.push(trace);
216            }
217        }
218
219        Ok(traces)
220    }
221
222    /// Collect traces from OTLP backend (placeholder - OTLP doesn't typically provide query API)
223    async fn collect_from_otlp(&self) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
224        // OTLP is primarily for exporting, not querying
225        // In a real implementation, you might need to query a separate trace storage backend
226        // For now, return empty results
227        Ok(Vec::new())
228    }
229
230    /// Get a specific trace from OTLP backend (placeholder)
231    async fn get_trace_from_otlp(
232        &self,
233        _trace_id: &str,
234    ) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
235        // OTLP is primarily for exporting, not querying
236        // In a real implementation, you might need to query a separate trace storage backend
237        // For now, return empty results
238        Ok(Vec::new())
239    }
240}
241
242/// Jaeger API response structures
243#[derive(Deserialize)]
244struct JaegerTracesResponse {
245    data: Vec<JaegerTraceData>,
246}
247
248#[derive(Deserialize)]
249struct JaegerTraceData {
250    spans: Vec<JaegerSpan>,
251}
252
253#[derive(Deserialize)]
254struct JaegerSpan {
255    trace_id: String,
256    span_id: String,
257    parent_span_id: Option<String>,
258    operation_name: String,
259    start_time: u64, // microseconds since epoch
260    duration: u64,   // microseconds
261    tags: Vec<JaegerTag>,
262}
263
264#[derive(Deserialize, Serialize)]
265struct JaegerTag {
266    key: String,
267    value: serde_json::Value,
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration targets a local Jaeger instance.
    #[test]
    fn test_default_config() {
        let config = TraceCollectorConfig::default();
        assert_eq!(config.backend_type, ExporterType::Jaeger);
        assert_eq!(config.jaeger_endpoint.as_deref(), Some("http://localhost:16686"));
        assert!(config.otlp_endpoint.is_none());
        assert_eq!(config.max_traces, 100);
        assert_eq!(config.timeout, Duration::from_secs(30));
    }

    /// An unreachable Jaeger endpoint must surface as an error.
    #[tokio::test]
    async fn test_collect_traces_jaeger_unavailable() {
        let config = TraceCollectorConfig {
            backend_type: ExporterType::Jaeger,
            // `.invalid` is a reserved TLD, so the host cannot resolve even
            // when DNS search domains are configured.
            jaeger_endpoint: Some("http://nonexistent.invalid:16686".to_string()),
            // Keep the test fast if the network stalls instead of failing.
            timeout: Duration::from_secs(2),
            ..Default::default()
        };

        let collector = TraceCollector::new(config);
        let result = collector.collect_traces().await;

        // Should fail due to unreachable endpoint
        assert!(result.is_err());
    }

    /// Without a Jaeger endpoint both query paths fail before any I/O.
    #[tokio::test]
    async fn test_missing_jaeger_endpoint_is_config_error() {
        let config = TraceCollectorConfig {
            backend_type: ExporterType::Jaeger,
            jaeger_endpoint: None,
            ..Default::default()
        };

        let collector = TraceCollector::new(config);

        assert!(matches!(
            collector.collect_traces().await,
            Err(TraceCollectorError::ConfigError(_))
        ));
        assert!(matches!(
            collector.get_trace_by_id("abc123").await,
            Err(TraceCollectorError::ConfigError(_))
        ));
    }

    /// The OTLP placeholders return empty results and never touch the network.
    #[tokio::test]
    async fn test_otlp_backend_returns_no_traces() {
        let config = TraceCollectorConfig {
            backend_type: ExporterType::Otlp,
            ..Default::default()
        };

        let collector = TraceCollector::new(config);

        let collected = collector.collect_traces().await.expect("OTLP collect should succeed");
        assert!(collected.is_empty());

        let single = collector
            .get_trace_by_id("abc123")
            .await
            .expect("OTLP lookup should succeed");
        assert!(single.is_empty());
    }
}
296}