use std::net::SocketAddr;
use std::sync::Arc;
use axum::Router;
use axum::body::Bytes;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::post;
use prost::Message;
use tracing::info;
use super::proto;
use super::receiver::{ingest_logs, ingest_metrics, ingest_traces};
use crate::control::state::SharedState;
pub async fn run(listen: SocketAddr, shared: Arc<SharedState>) -> std::io::Result<()> {
let router = Router::new()
.route(
"/opentelemetry.proto.collector.metrics.v1.MetricsService/Export",
post(grpc_metrics),
)
.route(
"/opentelemetry.proto.collector.trace.v1.TraceService/Export",
post(grpc_traces),
)
.route(
"/opentelemetry.proto.collector.logs.v1.LogsService/Export",
post(grpc_logs),
)
.with_state(shared);
let listener = tokio::net::TcpListener::bind(listen).await?;
info!(addr = %listen, "OTLP/gRPC receiver started");
axum::serve(listener, router)
.await
.map_err(std::io::Error::other)
}
async fn grpc_metrics(State(shared): State<Arc<SharedState>>, body: Bytes) -> impl IntoResponse {
let payload = grpc_decode(&body);
let Ok(req) = proto::ExportMetricsServiceRequest::decode(&payload[..]) else {
return grpc_error(StatusCode::BAD_REQUEST, "invalid protobuf");
};
ingest_metrics(&shared, &req).await;
grpc_response(&proto::ExportMetricsServiceResponse {})
}
async fn grpc_traces(State(shared): State<Arc<SharedState>>, body: Bytes) -> impl IntoResponse {
let payload = grpc_decode(&body);
let Ok(req) = proto::ExportTraceServiceRequest::decode(&payload[..]) else {
return grpc_error(StatusCode::BAD_REQUEST, "invalid protobuf");
};
ingest_traces(&shared, &req).await;
grpc_response(&proto::ExportTraceServiceResponse {})
}
async fn grpc_logs(State(shared): State<Arc<SharedState>>, body: Bytes) -> impl IntoResponse {
let payload = grpc_decode(&body);
let Ok(req) = proto::ExportLogsServiceRequest::decode(&payload[..]) else {
return grpc_error(StatusCode::BAD_REQUEST, "invalid protobuf");
};
ingest_logs(&shared, &req).await;
grpc_response(&proto::ExportLogsServiceResponse {})
}
fn grpc_decode(body: &Bytes) -> Vec<u8> {
if body.len() < 5 {
return body.to_vec();
}
let _compressed = body[0];
let len = u32::from_be_bytes([body[1], body[2], body[3], body[4]]) as usize;
let start = 5;
let end = (start + len).min(body.len());
body[start..end].to_vec()
}
fn grpc_response<M: Message>(msg: &M) -> (StatusCode, [(String, String); 2], Vec<u8>) {
let mut payload = Vec::new();
let _ = msg.encode(&mut payload);
let len = payload.len() as u32;
let mut framed = Vec::with_capacity(5 + payload.len());
framed.push(0); framed.extend_from_slice(&len.to_be_bytes());
framed.extend_from_slice(&payload);
(
StatusCode::OK,
[
("content-type".to_string(), "application/grpc".to_string()),
("grpc-status".to_string(), "0".to_string()),
],
framed,
)
}
fn grpc_error(status: StatusCode, msg: &str) -> (StatusCode, [(String, String); 2], Vec<u8>) {
(
status,
[
("content-type".to_string(), "application/grpc".to_string()),
("grpc-status".to_string(), "2".to_string()), ],
msg.as_bytes().to_vec(),
)
}