use axum::Extension;
use axum::Router;
use axum::body::Body;
use axum::extract::DefaultBodyLimit;
use axum::http::{HeaderMap, HeaderValue, StatusCode, header};
use axum::response::Response;
use axum::routing::post;
use micromegas_ingestion::web_ingestion_service::WebIngestionService;
use micromegas_otel_ingestion::error::OtelError;
use micromegas_otel_ingestion::handler;
use micromegas_tracing::prelude::*;
use prost::Message;
use std::sync::Arc;
use tower_http::decompression::RequestDecompressionLayer;
use tower_http::limit::RequestBodyLimitLayer;
/// Size cap (20 MiB) handed to `RequestBodyLimitLayer` in `otlp_router`.
const OTLP_BODY_LIMIT_BYTES: usize = 20 * 1024 * 1024;
/// Size cap (300 MiB) handed to `DefaultBodyLimit::max` in `otlp_router`.
const OTLP_DECOMPRESSED_BODY_LIMIT_BYTES: usize = 300 * 1024 * 1024;
/// Value written to the `Retry-After` header of retryable error responses.
const RETRY_AFTER_SECONDS: u32 = 30;
/// Media type required on incoming requests and set on every response built in this file.
const CONTENT_TYPE_PROTOBUF: &str = "application/x-protobuf";
/// Verifies that the request declares the protobuf media type.
///
/// Only the media type itself is inspected: anything after the first `;`
/// (parameters such as a charset) is ignored, surrounding whitespace is
/// trimmed, and the comparison is ASCII case-insensitive.
///
/// # Errors
///
/// Returns [`OtlpHttpError::WrongContentType`] when the `Content-Type` header
/// is absent, cannot be read as a string, or names a different media type.
fn check_content_type(headers: &HeaderMap) -> Result<(), OtlpHttpError> {
    let is_protobuf = headers
        .get(header::CONTENT_TYPE)
        .and_then(|raw| raw.to_str().ok())
        .and_then(|text| text.split(';').next())
        // Compare in place instead of allocating a lowercased copy per request.
        .is_some_and(|media| media.trim().eq_ignore_ascii_case(CONTENT_TYPE_PROTOBUF));
    if is_protobuf {
        Ok(())
    } else {
        Err(OtlpHttpError::WrongContentType)
    }
}
/// Failures that the OTLP HTTP handlers turn into an error response.
enum OtlpHttpError {
    /// The request's `Content-Type` was missing or not `application/x-protobuf`.
    WrongContentType,
    /// An error reported by the OTLP ingestion layer.
    Otel(OtelError),
}
impl OtlpHttpError {
    /// Consumes the error and renders it as a protobuf-encoded error response.
    ///
    /// A content-type rejection becomes `415 Unsupported Media Type` and is
    /// never marked retryable. Ingestion errors supply their own HTTP status,
    /// status code, public message and retry hint; the full error is logged
    /// before the response is built.
    fn into_otlp_response(self) -> Response {
        let err = match self {
            OtlpHttpError::WrongContentType => {
                return build_error_response(
                    StatusCode::UNSUPPORTED_MEDIA_TYPE,
                    // 3 is INVALID_ARGUMENT in the gRPC status code table.
                    3,
                    "Content-Type must be application/x-protobuf",
                    false,
                );
            }
            OtlpHttpError::Otel(err) => err,
        };

        let retryable = err.is_retryable();
        // A value that is not a valid HTTP status degrades to 500.
        let status =
            StatusCode::from_u16(err.http_status()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
        let code = err.grpc_code();
        error!("OTLP error: {}", err);
        build_error_response(status, code, &err.public_message(), retryable)
    }
}
/// Builds an error response whose body is a protobuf-encoded `Status` message.
///
/// * `status` - HTTP status of the response.
/// * `code` - numeric code stored in the `Status` message.
/// * `message` - text stored in the `Status` message.
/// * `retryable` - when `true`, a `Retry-After` header carrying
///   `RETRY_AFTER_SECONDS` is added.
///
/// # Panics
///
/// Panics if the response builder rejects its inputs; the status and header
/// passed here are already validated values.
fn build_error_response(status: StatusCode, code: i32, message: &str, retryable: bool) -> Response {
    let proto_status = micromegas_otel_ingestion::proto::Status {
        code,
        message: message.to_string(),
    };
    let payload = proto_status.encode_to_vec();
    let mut response = Response::builder()
        .status(status)
        .header(
            header::CONTENT_TYPE,
            HeaderValue::from_static(CONTENT_TYPE_PROTOBUF),
        )
        .body(Body::from(payload))
        .expect("building OTLP error response");
    if retryable {
        // Integer -> HeaderValue is infallible, so the header can never be
        // silently skipped and no temporary String is needed.
        response
            .headers_mut()
            .insert(header::RETRY_AFTER, HeaderValue::from(RETRY_AFTER_SECONDS));
    }
    response
}
/// Wraps a protobuf message in a `200 OK` response.
///
/// The message is encoded to bytes and sent with the
/// `application/x-protobuf` content type.
///
/// # Panics
///
/// Panics if the response builder rejects its inputs; the status and header
/// passed here are already validated values.
fn proto_response<M: Message>(msg: M) -> Response {
    let content_type = HeaderValue::from_static(CONTENT_TYPE_PROTOBUF);
    let payload = Body::from(msg.encode_to_vec());
    Response::builder()
        .status(StatusCode::OK)
        .header(header::CONTENT_TYPE, content_type)
        .body(payload)
        .expect("building OTLP success response")
}
/// Handles an OTLP logs request.
///
/// Rejects the request when its content type is not protobuf; otherwise
/// passes the body to `handler::ingest_logs` and encodes either its reply or
/// its error as a protobuf response.
async fn logs_handler(
    Extension(service): Extension<Arc<WebIngestionService>>,
    headers: HeaderMap,
    body: bytes::Bytes,
) -> Response {
    let outcome = match check_content_type(&headers) {
        Ok(()) => handler::ingest_logs(service, body).await,
        Err(rejection) => return rejection.into_otlp_response(),
    };
    match outcome {
        Ok(reply) => proto_response(reply),
        Err(failure) => OtlpHttpError::Otel(failure).into_otlp_response(),
    }
}
/// Handles an OTLP metrics request.
///
/// Rejects the request when its content type is not protobuf; otherwise
/// passes the body to `handler::ingest_metrics` and encodes either its reply
/// or its error as a protobuf response.
async fn metrics_handler(
    Extension(service): Extension<Arc<WebIngestionService>>,
    headers: HeaderMap,
    body: bytes::Bytes,
) -> Response {
    let outcome = match check_content_type(&headers) {
        Ok(()) => handler::ingest_metrics(service, body).await,
        Err(rejection) => return rejection.into_otlp_response(),
    };
    match outcome {
        Ok(reply) => proto_response(reply),
        Err(failure) => OtlpHttpError::Otel(failure).into_otlp_response(),
    }
}
/// Handles an OTLP traces request.
///
/// Rejects the request when its content type is not protobuf; otherwise
/// passes the body to `handler::ingest_traces` and encodes either its reply
/// or its error as a protobuf response.
async fn traces_handler(
    Extension(service): Extension<Arc<WebIngestionService>>,
    headers: HeaderMap,
    body: bytes::Bytes,
) -> Response {
    let outcome = match check_content_type(&headers) {
        Ok(()) => handler::ingest_traces(service, body).await,
        Err(rejection) => return rejection.into_otlp_response(),
    };
    match outcome {
        Ok(reply) => proto_response(reply),
        Err(failure) => OtlpHttpError::Otel(failure).into_otlp_response(),
    }
}
/// Builds the router exposing the OTLP/HTTP logs, metrics and traces endpoints.
///
/// All three routes accept `POST` only and share the same middleware stack.
/// The layers are added in the order decompression, request body limit,
/// default body limit.
// NOTE(review): per axum's documented layer ordering the layer added last runs
// first, so the 20 MiB limit should apply to the body as received and the
// 300 MiB limit to the body the handler extracts after decompression — confirm
// before reordering.
pub fn otlp_router() -> Router {
    let decompression = RequestDecompressionLayer::new().gzip(true);
    let wire_limit = RequestBodyLimitLayer::new(OTLP_BODY_LIMIT_BYTES);
    let extracted_limit = DefaultBodyLimit::max(OTLP_DECOMPRESSED_BODY_LIMIT_BYTES);

    Router::new()
        .route("/ingestion/otlp/v1/logs", post(logs_handler))
        .route("/ingestion/otlp/v1/metrics", post(metrics_handler))
        .route("/ingestion/otlp/v1/traces", post(traces_handler))
        .layer(decompression)
        .layer(wire_limit)
        .layer(extracted_limit)
}