use crate::error::ReceiverError;
use crate::http::routes::AppState;
use crate::protocol::{json as json_parser, protobuf};
use axum::{
body::Bytes,
extract::State,
http::{HeaderMap, StatusCode},
response::{IntoResponse, Response},
Json,
};
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use serde_json::json;
use tracing::{debug, error};
pub async fn handle_health(State(state): State<AppState>) -> Response {
if state.health_checker.is_ready() {
(StatusCode::OK, Json(json!({"status": "healthy"}))).into_response()
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({"status": "unhealthy"})),
)
.into_response()
}
}
pub async fn handle_metrics(
State(state): State<AppState>,
headers: HeaderMap,
body: Bytes,
) -> Response {
debug!("Received metrics request: {} bytes", body.len());
let content_type = match get_content_type(&headers) {
Ok(ct) => ct,
Err(e) => return e.into_response(),
};
let request: ExportMetricsServiceRequest = match content_type.as_str() {
"application/x-protobuf" => match protobuf::parse_message(&body) {
Ok(req) => req,
Err(e) => {
error!("Failed to parse protobuf metrics request: {}", e);
return e.into_response();
},
},
"application/json" => match json_parser::parse_metrics_json(&body) {
Ok(req) => req,
Err(e) => {
error!("Failed to parse JSON metrics request: {}", e);
return e.into_response();
},
},
_ => {
return ReceiverError::InvalidContentType(format!(
"Unsupported Content-Type: {}. Expected application/x-protobuf or application/json",
content_type
))
.into_response();
},
};
match state.metrics_handler.process(request).await {
Ok(_) => (StatusCode::OK, Json(json!({"status": "success"}))).into_response(),
Err(e) => {
error!("Failed to process metrics: {}", e);
e.into_response()
},
}
}
pub async fn handle_logs(
State(state): State<AppState>,
headers: HeaderMap,
body: Bytes,
) -> Response {
debug!("Received logs request: {} bytes", body.len());
let content_type = match get_content_type(&headers) {
Ok(ct) => ct,
Err(e) => return e.into_response(),
};
let request: ExportLogsServiceRequest = match content_type.as_str() {
"application/x-protobuf" => match protobuf::parse_message(&body) {
Ok(req) => req,
Err(e) => {
error!("Failed to parse protobuf logs request: {}", e);
return e.into_response();
},
},
"application/json" => match json_parser::parse_logs_json(&body) {
Ok(req) => req,
Err(e) => {
error!("Failed to parse JSON logs request: {}", e);
return e.into_response();
},
},
_ => {
return ReceiverError::InvalidContentType(format!(
"Unsupported Content-Type: {}. Expected application/x-protobuf or application/json",
content_type
))
.into_response();
},
};
match state.logs_handler.process(request).await {
Ok(_) => (StatusCode::OK, Json(json!({"status": "success"}))).into_response(),
Err(e) => {
error!("Failed to process logs: {}", e);
e.into_response()
},
}
}
pub async fn handle_traces(
State(state): State<AppState>,
headers: HeaderMap,
body: Bytes,
) -> Response {
debug!("Received traces request: {} bytes", body.len());
let content_type = match get_content_type(&headers) {
Ok(ct) => ct,
Err(e) => return e.into_response(),
};
let request: ExportTraceServiceRequest = match content_type.as_str() {
"application/x-protobuf" => match protobuf::parse_message(&body) {
Ok(req) => req,
Err(e) => {
error!("Failed to parse protobuf traces request: {}", e);
return e.into_response();
},
},
"application/json" => match json_parser::parse_traces_json(&body) {
Ok(req) => req,
Err(e) => {
error!("Failed to parse JSON traces request: {}", e);
return e.into_response();
},
},
_ => {
return ReceiverError::InvalidContentType(format!(
"Unsupported Content-Type: {}. Expected application/x-protobuf or application/json",
content_type
))
.into_response();
},
};
match state.traces_handler.process(request).await {
Ok(_) => (StatusCode::OK, Json(json!({"status": "success"}))).into_response(),
Err(e) => {
error!("Failed to process traces: {}", e);
e.into_response()
},
}
}
pub async fn handle_unified(
State(state): State<AppState>,
headers: HeaderMap,
body: Bytes,
) -> Response {
debug!("Received unified OTLP request: {} bytes", body.len());
if let Err(e) = validate_content_type(&headers, "application/x-protobuf") {
return e.into_response();
}
if let Ok(request) = protobuf::parse_message::<ExportMetricsServiceRequest>(&body) {
return match state.metrics_handler.process(request).await {
Ok(_) => (StatusCode::OK, Json(json!({"status": "success"}))).into_response(),
Err(e) => e.into_response(),
};
}
if let Ok(request) = protobuf::parse_message::<ExportLogsServiceRequest>(&body) {
return match state.logs_handler.process(request).await {
Ok(_) => (StatusCode::OK, Json(json!({"status": "success"}))).into_response(),
Err(e) => e.into_response(),
};
}
if let Ok(request) = protobuf::parse_message::<ExportTraceServiceRequest>(&body) {
return match state.traces_handler.process(request).await {
Ok(_) => (StatusCode::OK, Json(json!({"status": "success"}))).into_response(),
Err(e) => e.into_response(),
};
}
ReceiverError::InvalidSignalType("Could not parse as any OTLP signal type".to_string())
.into_response()
}
fn get_content_type(headers: &HeaderMap) -> Result<String, ReceiverError> {
if let Some(content_type) = headers.get("content-type") {
let content_type_str = content_type.to_str().map_err(|_| {
ReceiverError::InvalidContentType("Invalid Content-Type header".to_string())
})?;
let base_type = content_type_str
.split(';')
.next()
.unwrap_or(content_type_str)
.trim();
Ok(base_type.to_string())
} else {
Err(ReceiverError::InvalidContentType(
"Missing Content-Type header".to_string(),
))
}
}
fn validate_content_type(headers: &HeaderMap, expected: &str) -> Result<(), ReceiverError> {
let content_type = get_content_type(headers)?;
if content_type.starts_with(expected) {
Ok(())
} else {
Err(ReceiverError::InvalidContentType(format!(
"Expected Content-Type: {}, got: {}",
expected, content_type
)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use axum::http::header::CONTENT_TYPE;
#[test]
fn test_get_content_type_protobuf() {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, "application/x-protobuf".parse().unwrap());
assert_eq!(
get_content_type(&headers).unwrap(),
"application/x-protobuf"
);
}
#[test]
fn test_get_content_type_json() {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
assert_eq!(get_content_type(&headers).unwrap(), "application/json");
}
#[test]
fn test_get_content_type_with_charset() {
let mut headers = HeaderMap::new();
headers.insert(
CONTENT_TYPE,
"application/x-protobuf; charset=utf-8".parse().unwrap(),
);
assert_eq!(
get_content_type(&headers).unwrap(),
"application/x-protobuf"
);
}
#[test]
fn test_get_content_type_missing() {
let headers = HeaderMap::new();
assert!(get_content_type(&headers).is_err());
}
#[test]
fn test_validate_content_type_success() {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, "application/x-protobuf".parse().unwrap());
assert!(validate_content_type(&headers, "application/x-protobuf").is_ok());
}
#[test]
fn test_validate_content_type_with_charset() {
let mut headers = HeaderMap::new();
headers.insert(
CONTENT_TYPE,
"application/x-protobuf; charset=utf-8".parse().unwrap(),
);
assert!(validate_content_type(&headers, "application/x-protobuf").is_ok());
}
#[test]
fn test_validate_content_type_missing() {
let headers = HeaderMap::new();
assert!(validate_content_type(&headers, "application/x-protobuf").is_err());
}
#[test]
fn test_validate_content_type_wrong() {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
assert!(validate_content_type(&headers, "application/x-protobuf").is_err());
}
}