use mockforge_tracing::exporter::ExporterType;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum TraceCollectorError {
#[error("HTTP request failed: {0}")]
HttpError(#[from] reqwest::Error),
#[error("JSON parsing failed: {0}")]
JsonError(#[from] serde_json::Error),
#[error("Invalid configuration: {0}")]
ConfigError(String),
#[error("Trace backend unavailable: {0}")]
BackendUnavailable(String),
}
#[derive(Debug, Clone)]
pub struct TraceCollectorConfig {
pub backend_type: ExporterType,
pub jaeger_endpoint: Option<String>,
pub otlp_endpoint: Option<String>,
pub timeout: Duration,
pub max_traces: usize,
}
impl Default for TraceCollectorConfig {
fn default() -> Self {
Self {
backend_type: ExporterType::Jaeger,
jaeger_endpoint: Some("http://localhost:16686".to_string()), otlp_endpoint: None,
timeout: Duration::from_secs(30),
max_traces: 100,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectedTrace {
pub trace_id: String,
pub span_id: String,
pub parent_span_id: Option<String>,
pub name: String,
pub start_time: String,
pub end_time: String,
pub duration_ms: u64,
pub attributes: std::collections::HashMap<String, serde_json::Value>,
}
pub struct TraceCollector {
client: Client,
config: TraceCollectorConfig,
}
impl TraceCollector {
pub fn new(config: TraceCollectorConfig) -> Self {
let client = Client::builder()
.timeout(config.timeout)
.build()
.expect("Failed to create HTTP client");
Self { client, config }
}
pub async fn collect_traces(&self) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
match self.config.backend_type {
ExporterType::Jaeger => self.collect_from_jaeger().await,
ExporterType::Otlp => self.collect_from_otlp().await,
}
}
pub async fn get_trace_by_id(
&self,
trace_id: &str,
) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
match self.config.backend_type {
ExporterType::Jaeger => self.get_trace_from_jaeger(trace_id).await,
ExporterType::Otlp => self.get_trace_from_otlp(trace_id).await,
}
}
async fn collect_from_jaeger(&self) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
let endpoint = self.config.jaeger_endpoint.as_ref().ok_or_else(|| {
TraceCollectorError::ConfigError("Jaeger endpoint not configured".to_string())
})?;
let url = format!("{}/api/traces", endpoint);
let start_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_millis()
- 3600000;
let params = [
("start", start_time.to_string()),
("limit", self.config.max_traces.to_string()),
];
let response = self.client.get(&url).query(¶ms).send().await?;
if !response.status().is_success() {
return Err(TraceCollectorError::BackendUnavailable(format!(
"Jaeger API returned status: {}",
response.status()
)));
}
let jaeger_response: JaegerTracesResponse = response.json().await?;
let mut traces = Vec::new();
for trace_data in jaeger_response.data {
for span in trace_data.spans {
let trace = CollectedTrace {
trace_id: span.trace_id,
span_id: span.span_id,
parent_span_id: span.parent_span_id,
name: span.operation_name,
start_time: format!(
"{:?}",
UNIX_EPOCH + Duration::from_micros(span.start_time)
),
end_time: {
let end_micros = span.start_time.saturating_add(span.duration);
format!("{:?}", UNIX_EPOCH + Duration::from_micros(end_micros))
},
duration_ms: span.duration / 1000, attributes: {
let mut attrs = std::collections::HashMap::new();
for tag in &span.tags {
attrs.insert(tag.key.clone(), tag.value.clone());
}
attrs
},
};
traces.push(trace);
}
}
Ok(traces)
}
async fn get_trace_from_jaeger(
&self,
trace_id: &str,
) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
let endpoint = self.config.jaeger_endpoint.as_ref().ok_or_else(|| {
TraceCollectorError::ConfigError("Jaeger endpoint not configured".to_string())
})?;
let url = format!("{}/api/traces/{}", endpoint, trace_id);
let response = self.client.get(&url).send().await?;
if !response.status().is_success() {
return Err(TraceCollectorError::BackendUnavailable(format!(
"Jaeger API returned status: {}",
response.status()
)));
}
let jaeger_response: JaegerTracesResponse = response.json().await?;
let mut traces = Vec::new();
for trace_data in jaeger_response.data {
for span in trace_data.spans {
let trace = CollectedTrace {
trace_id: span.trace_id,
span_id: span.span_id,
parent_span_id: span.parent_span_id,
name: span.operation_name,
start_time: format!(
"{:?}",
UNIX_EPOCH + Duration::from_micros(span.start_time)
),
end_time: {
let end_micros = span.start_time.saturating_add(span.duration);
format!("{:?}", UNIX_EPOCH + Duration::from_micros(end_micros))
},
duration_ms: span.duration / 1000, attributes: {
let mut attrs = std::collections::HashMap::new();
for tag in &span.tags {
attrs.insert(tag.key.clone(), tag.value.clone());
}
attrs
},
};
traces.push(trace);
}
}
Ok(traces)
}
async fn collect_from_otlp(&self) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
Ok(Vec::new())
}
async fn get_trace_from_otlp(
&self,
_trace_id: &str,
) -> Result<Vec<CollectedTrace>, TraceCollectorError> {
Ok(Vec::new())
}
}
#[derive(Deserialize)]
struct JaegerTracesResponse {
data: Vec<JaegerTraceData>,
}
#[derive(Deserialize)]
struct JaegerTraceData {
spans: Vec<JaegerSpan>,
}
#[derive(Deserialize)]
struct JaegerSpan {
trace_id: String,
span_id: String,
parent_span_id: Option<String>,
operation_name: String,
start_time: u64, duration: u64, tags: Vec<JaegerTag>,
}
#[derive(Deserialize, Serialize)]
struct JaegerTag {
key: String,
value: serde_json::Value,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_default_config() {
let config = TraceCollectorConfig::default();
assert_eq!(config.backend_type, ExporterType::Jaeger);
assert_eq!(config.max_traces, 100);
assert_eq!(config.timeout, Duration::from_secs(30));
}
#[tokio::test]
async fn test_collect_traces_jaeger_unavailable() {
let config = TraceCollectorConfig {
backend_type: ExporterType::Jaeger,
jaeger_endpoint: Some("http://nonexistent:16686".to_string()),
..Default::default()
};
let collector = TraceCollector::new(config);
let result = collector.collect_traces().await;
assert!(result.is_err());
}
}