1use mockforge_tracing::exporter::ExporterType;
4use reqwest::Client;
5use serde::{Deserialize, Serialize};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7use thiserror::Error;
8
9#[derive(Error, Debug)]
11pub enum TraceCollectorError {
12 #[error("HTTP request failed: {0}")]
13 HttpError(#[from] reqwest::Error),
14
15 #[error("JSON parsing failed: {0}")]
16 JsonError(#[from] serde_json::Error),
17
18 #[error("Invalid configuration: {0}")]
19 ConfigError(String),
20
21 #[error("Trace backend unavailable: {0}")]
22 BackendUnavailable(String),
23}
24
25#[derive(Debug, Clone)]
27pub struct TraceCollectorConfig {
28 pub backend_type: ExporterType,
30 pub jaeger_endpoint: Option<String>,
32 pub otlp_endpoint: Option<String>,
34 pub timeout: Duration,
36 pub max_traces: usize,
38}
39
40impl Default for TraceCollectorConfig {
41 fn default() -> Self {
42 Self {
43 backend_type: ExporterType::Jaeger,
44 jaeger_endpoint: Some("http://localhost:16686".to_string()), otlp_endpoint: None,
46 timeout: Duration::from_secs(30),
47 max_traces: 100,
48 }
49 }
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct CollectedTrace {
55 pub trace_id: String,
56 pub span_id: String,
57 pub parent_span_id: Option<String>,
58 pub name: String,
59 pub start_time: String,
60 pub end_time: String,
61 pub duration_ms: u64,
62 pub attributes: std::collections::HashMap<String, serde_json::Value>,
63}
64
65pub struct TraceCollector {
67 client: Client,
68 config: TraceCollectorConfig,
69}
70
71impl TraceCollector {
72 pub fn new(config: TraceCollectorConfig) -> Self {
74 let client = Client::builder()
75 .timeout(config.timeout)
76 .build()
77 .expect("Failed to create HTTP client");
78
79 Self { client, config }
80 }
81
82 pub async fn collect_traces(&self) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
84 match self.config.backend_type {
85 ExporterType::Jaeger => self.collect_from_jaeger().await,
86 ExporterType::Otlp => self.collect_from_otlp().await,
87 }
88 }
89
90 pub async fn get_trace_by_id(
92 &self,
93 trace_id: &str,
94 ) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
95 match self.config.backend_type {
96 ExporterType::Jaeger => self.get_trace_from_jaeger(trace_id).await,
97 ExporterType::Otlp => self.get_trace_from_otlp(trace_id).await,
98 }
99 }
100
101 async fn collect_from_jaeger(&self) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
103 let endpoint = self.config.jaeger_endpoint.as_ref().ok_or_else(|| {
104 TraceCollectorError::ConfigError("Jaeger endpoint not configured".to_string())
105 })?;
106
107 let url = format!("{}/api/traces", endpoint);
109
110 let start_time = SystemTime::now()
112 .duration_since(SystemTime::UNIX_EPOCH)
113 .unwrap_or_default()
114 .as_millis()
115 - 3600000; let params = [
118 ("start", start_time.to_string()),
119 ("limit", self.config.max_traces.to_string()),
120 ];
121
122 let response = self.client.get(&url).query(¶ms).send().await?;
123
124 if !response.status().is_success() {
125 return Err(TraceCollectorError::BackendUnavailable(format!(
126 "Jaeger API returned status: {}",
127 response.status()
128 )));
129 }
130
131 let jaeger_response: JaegerTracesResponse = response.json().await?;
132
133 let mut traces = Vec::new();
135 for trace_data in jaeger_response.data {
136 for span in trace_data.spans {
137 let trace = CollectedTrace {
138 trace_id: span.trace_id,
139 span_id: span.span_id,
140 parent_span_id: span.parent_span_id,
141 name: span.operation_name,
142 start_time: format!(
143 "{:?}",
144 UNIX_EPOCH + Duration::from_micros(span.start_time)
145 ),
146 end_time: {
147 let end_micros = span.start_time.saturating_add(span.duration);
148 format!("{:?}", UNIX_EPOCH + Duration::from_micros(end_micros))
149 },
150 duration_ms: span.duration / 1000, attributes: {
152 let mut attrs = std::collections::HashMap::new();
153 for tag in &span.tags {
154 attrs.insert(tag.key.clone(), tag.value.clone());
155 }
156 attrs
157 },
158 };
159 traces.push(trace);
160 }
161 }
162
163 Ok(traces)
164 }
165
166 async fn get_trace_from_jaeger(
168 &self,
169 trace_id: &str,
170 ) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
171 let endpoint = self.config.jaeger_endpoint.as_ref().ok_or_else(|| {
172 TraceCollectorError::ConfigError("Jaeger endpoint not configured".to_string())
173 })?;
174
175 let url = format!("{}/api/traces/{}", endpoint, trace_id);
177
178 let response = self.client.get(&url).send().await?;
179
180 if !response.status().is_success() {
181 return Err(TraceCollectorError::BackendUnavailable(format!(
182 "Jaeger API returned status: {}",
183 response.status()
184 )));
185 }
186
187 let jaeger_response: JaegerTracesResponse = response.json().await?;
188
189 let mut traces = Vec::new();
191 for trace_data in jaeger_response.data {
192 for span in trace_data.spans {
193 let trace = CollectedTrace {
194 trace_id: span.trace_id,
195 span_id: span.span_id,
196 parent_span_id: span.parent_span_id,
197 name: span.operation_name,
198 start_time: format!(
199 "{:?}",
200 UNIX_EPOCH + Duration::from_micros(span.start_time)
201 ),
202 end_time: {
203 let end_micros = span.start_time.saturating_add(span.duration);
204 format!("{:?}", UNIX_EPOCH + Duration::from_micros(end_micros))
205 },
206 duration_ms: span.duration / 1000, attributes: {
208 let mut attrs = std::collections::HashMap::new();
209 for tag in &span.tags {
210 attrs.insert(tag.key.clone(), tag.value.clone());
211 }
212 attrs
213 },
214 };
215 traces.push(trace);
216 }
217 }
218
219 Ok(traces)
220 }
221
222 async fn collect_from_otlp(&self) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
224 Ok(Vec::new())
228 }
229
230 async fn get_trace_from_otlp(
232 &self,
233 _trace_id: &str,
234 ) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
235 Ok(Vec::new())
239 }
240}
241
242#[derive(Deserialize)]
244struct JaegerTracesResponse {
245 data: Vec<JaegerTraceData>,
246}
247
248#[derive(Deserialize)]
249struct JaegerTraceData {
250 spans: Vec<JaegerSpan>,
251}
252
253#[derive(Deserialize)]
254struct JaegerSpan {
255 trace_id: String,
256 span_id: String,
257 parent_span_id: Option<String>,
258 operation_name: String,
259 start_time: u64, duration: u64, tags: Vec<JaegerTag>,
262}
263
264#[derive(Deserialize, Serialize)]
265struct JaegerTag {
266 key: String,
267 value: serde_json::Value,
268}
269
270#[cfg(test)]
271mod tests {
272 use super::*;
273
274 #[test]
275 fn test_default_config() {
276 let config = TraceCollectorConfig::default();
277 assert_eq!(config.backend_type, ExporterType::Jaeger);
278 assert_eq!(config.max_traces, 100);
279 assert_eq!(config.timeout, Duration::from_secs(30));
280 }
281
282 #[tokio::test]
283 async fn test_collect_traces_jaeger_unavailable() {
284 let config = TraceCollectorConfig {
285 backend_type: ExporterType::Jaeger,
286 jaeger_endpoint: Some("http://nonexistent:16686".to_string()),
287 ..Default::default()
288 };
289
290 let collector = TraceCollector::new(config);
291 let result = collector.collect_traces().await;
292
293 assert!(result.is_err());
295 }
296}