use super::{ExternalService, ServiceError};
use crate::protocol::{ServiceRequest, ServiceResponse};
use async_trait::async_trait;
use reqwest::{Client, Url};
use serde::Serialize;
use std::collections::HashMap;
use tracing::warn;
pub struct OtelService {
client: Client,
endpoint: Url,
headers: HashMap<String, String>,
}
impl OtelService {
pub fn new(endpoint: Url, headers: HashMap<String, String>) -> Self {
Self {
client: Client::new(),
endpoint,
headers,
}
}
pub fn from_env() -> Result<Self, anyhow::Error> {
let endpoint_str = std::env::var("OTLP_ENDPOINT")?;
let endpoint = Url::parse(&endpoint_str)?;
let mut headers = HashMap::new();
if let Ok(h_str) = std::env::var("OTLP_HEADERS") {
for pair in h_str.split(',') {
let trimmed = pair.trim();
if trimmed.is_empty() {
continue;
}
if let Some((k, v)) = trimmed.split_once('=') {
headers.insert(k.trim().to_string(), v.trim().to_string());
} else {
warn!(
"OTelService: ignoring malformed OTLP_HEADERS entry without '=': {:?}",
trimmed
);
}
}
}
Ok(Self::new(endpoint, headers))
}
}
#[derive(Serialize)]
struct TracesData {
resource_spans: Vec<ResourceSpan>,
}
#[derive(Serialize)]
struct ResourceSpan {
resource: Resource,
scope_spans: Vec<ScopeSpan>,
}
#[derive(Serialize)]
struct Resource {
attributes: Vec<KeyValue>,
}
#[derive(Serialize)]
struct ScopeSpan {
scope: InstrumentationScope,
spans: Vec<Span>,
}
#[derive(Serialize)]
struct InstrumentationScope {
name: String,
version: String,
}
#[derive(Serialize)]
struct Span {
trace_id: String,
span_id: String,
name: String,
start_time_unix_nano: u64,
end_time_unix_nano: u64,
attributes: Vec<KeyValue>,
kind: i32, }
#[derive(Serialize)]
struct KeyValue {
key: String,
value: AnyValue,
}
#[derive(Serialize)]
struct AnyValue {
string_value: Option<String>,
}
#[derive(Serialize)]
struct MetricsData {
resource_metrics: Vec<ResourceMetric>,
}
#[derive(Serialize)]
struct ResourceMetric {
resource: Resource,
scope_metrics: Vec<ScopeMetric>,
}
#[derive(Serialize)]
struct ScopeMetric {
scope: InstrumentationScope,
metrics: Vec<Metric>,
}
#[derive(Serialize)]
struct Metric {
name: String,
gauge: Option<Gauge>,
}
#[derive(Serialize)]
struct Gauge {
data_points: Vec<NumberDataPoint>,
}
#[derive(Serialize)]
struct NumberDataPoint {
time_unix_nano: u64,
as_double: f64,
attributes: Vec<KeyValue>,
}
impl From<HashMap<String, String>> for AnyValue {
fn from(_: HashMap<String, String>) -> Self {
AnyValue { string_value: None }
}
}
fn map_attributes(attrs: &HashMap<String, String>) -> Vec<KeyValue> {
attrs
.iter()
.map(|(k, v)| KeyValue {
key: k.clone(),
value: AnyValue {
string_value: Some(v.clone()),
},
})
.collect()
}
#[async_trait]
impl ExternalService for OtelService {
type Request = ServiceRequest;
type Response = ServiceResponse;
async fn call(&self, req: Self::Request) -> Result<Self::Response, ServiceError> {
match req {
ServiceRequest::OtelSpan {
trace_id,
span_id,
name,
start_ns,
end_ns,
attributes,
} => {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| {
warn!("SystemTime error (clock skew?): {}", e);
ServiceError::Api {
code: 500,
message: format!("SystemTime error: {}", e),
}
})?
.as_nanos() as u64;
let payload = TracesData {
resource_spans: vec![ResourceSpan {
resource: Resource { attributes: vec![] },
scope_spans: vec![ScopeSpan {
scope: InstrumentationScope {
name: "exomonad".into(),
version: "0.1.0".into(),
},
spans: vec![Span {
trace_id,
span_id,
name,
start_time_unix_nano: start_ns.unwrap_or(now),
end_time_unix_nano: end_ns.unwrap_or(now),
attributes: map_attributes(&attributes.unwrap_or_default()),
kind: 1, }],
}],
}],
};
let url = self
.endpoint
.join("/v1/traces")
.map_err(|e| ServiceError::Api {
code: 500,
message: format!("URL error: {}", e),
})?;
let mut builder = self.client.post(url).json(&payload);
for (k, v) in &self.headers {
builder = builder.header(k, v);
}
let response = builder.send().await?;
if !response.status().is_success() {
return Err(ServiceError::Api {
code: response.status().as_u16() as i32,
message: response.text().await.unwrap_or_default(),
});
}
Ok(ServiceResponse::Ack)
}
ServiceRequest::OtelMetric {
name,
value,
labels,
} => {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| {
warn!("SystemTime error (clock skew?): {}", e);
ServiceError::Api {
code: 500,
message: format!("SystemTime error: {}", e),
}
})?
.as_nanos() as u64;
let payload = MetricsData {
resource_metrics: vec![ResourceMetric {
resource: Resource { attributes: vec![] },
scope_metrics: vec![ScopeMetric {
scope: InstrumentationScope {
name: "exomonad".into(),
version: "0.1.0".into(),
},
metrics: vec![Metric {
name,
gauge: Some(Gauge {
data_points: vec![NumberDataPoint {
time_unix_nano: now,
as_double: value,
attributes: map_attributes(&labels),
}],
}),
}],
}],
}],
};
let url = self
.endpoint
.join("/v1/metrics")
.map_err(|e| ServiceError::Api {
code: 500,
message: format!("URL error: {}", e),
})?;
let mut builder = self.client.post(url).json(&payload);
for (k, v) in &self.headers {
builder = builder.header(k, v);
}
let response = builder.send().await?;
if !response.status().is_success() {
return Err(ServiceError::Api {
code: response.status().as_u16() as i32,
message: response.text().await.unwrap_or_default(),
});
}
Ok(ServiceResponse::Ack)
}
_ => panic!("Invalid request type for OtelService"),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
#[tokio::test]
async fn test_otel_span() {
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/v1/traces"))
.respond_with(ResponseTemplate::new(200))
.mount(&mock_server)
.await;
let service = OtelService::new(mock_server.uri().parse().unwrap(), HashMap::new());
let req = ServiceRequest::OtelSpan {
trace_id: "12345678901234567890123456789012".into(),
span_id: "1234567890123456".into(),
name: "test-span".into(),
start_ns: Some(1000),
end_ns: Some(2000),
attributes: Some(HashMap::new()),
};
match service.call(req).await.unwrap() {
ServiceResponse::Ack => {}
_ => panic!("Wrong response type"),
}
}
#[tokio::test]
async fn test_otel_url_error() {
let endpoint = Url::parse("mailto:user@example.com").unwrap();
let service = OtelService::new(endpoint, HashMap::new());
let req = ServiceRequest::OtelMetric {
name: "test-metric".into(),
value: 1.0,
labels: HashMap::new(),
};
let result = service.call(req).await;
match result {
Err(ServiceError::Api { code, message }) => {
assert_eq!(code, 500);
assert!(message.contains("URL error"));
}
_ => panic!("Expected URL error, got {:?}", result),
}
}
}