use crate::server::AppState;
use axum::{extract::State, http::StatusCode, Json};
use serde_json::Value;
fn touch_activity(state: &AppState) {
if let Some(ref ts) = state.last_activity {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
ts.store(now, std::sync::atomic::Ordering::Relaxed);
}
}
/// HTTP handler that accepts a JSON metrics payload.
///
/// Refreshes the activity timestamp, then hands the payload to
/// `store_metrics` via `tokio::task::spawn_blocking`. The spawned task is not
/// awaited, so the handler always answers `200 OK`; a storage failure is only
/// reported on stderr.
pub async fn receive_metrics(
    State(state): State<AppState>,
    Json(payload): Json<Value>,
) -> StatusCode {
    touch_activity(&state);

    // Fire-and-forget: the join handle is dropped on purpose.
    let persist = move || match store_metrics(payload) {
        Ok(_) => {}
        Err(e) => eprintln!("[otel] store_metrics error: {e}"),
    };
    tokio::task::spawn_blocking(persist);

    StatusCode::OK
}
pub async fn receive_logs(
State(state): State<AppState>,
Json(payload): Json<Value>,
) -> StatusCode {
touch_activity(&state);
tokio::task::spawn_blocking(move || {
if let Err(e) = store_logs(payload) {
eprintln!("[otel] store_logs error: {e}");
}
});
StatusCode::OK
}
/// Parses a metrics export payload and persists the extracted records.
///
/// A payload that cannot be deserialized into `ExportMetricsRequest` is
/// reported on stderr and treated as a no-op (`Ok(())`), matching how
/// `store_logs` handles malformed input. Previously such payloads were
/// silently replaced with a default request, leaving no diagnostic trail.
///
/// Returns `Ok(())` without opening the index when no records were extracted.
///
/// # Errors
///
/// Propagates any error from `SessionIndex::new` or `insert_otel_metrics`.
fn store_metrics(payload: Value) -> crate::error::Result<()> {
    let req: crate::otel::parser::ExportMetricsRequest = match serde_json::from_value(payload) {
        Ok(r) => r,
        Err(e) => {
            // Malformed input is the sender's problem, not a storage failure:
            // log it and drop the payload rather than surfacing an error.
            eprintln!("[otel] /v1/metrics parse error: {e}");
            return Ok(());
        }
    };

    let records = crate::otel::extract_metric_records(&req);
    if records.is_empty() {
        return Ok(());
    }

    let index = crate::storage::SessionIndex::new()?;
    index.insert_otel_metrics(&records)
}
/// Parses a logs export payload and persists the extracted records.
///
/// A payload that fails to deserialize into `ExportLogsRequest` is reported on
/// stderr and the function returns `Ok(())`. When extraction yields no
/// records, the index is never opened.
///
/// # Errors
///
/// Propagates any error from `SessionIndex::new` or `insert_otel_logs`.
fn store_logs(payload: Value) -> crate::error::Result<()> {
    let parsed: Result<crate::otel::parser::ExportLogsRequest, _> =
        serde_json::from_value(payload);
    let req = match parsed {
        Err(e) => {
            eprintln!("[otel] /v1/logs parse error: {e}");
            return Ok(());
        }
        Ok(req) => req,
    };

    let records = crate::otel::extract_log_records(&req);
    if records.is_empty() {
        // Nothing to write; skip opening the index.
        Ok(())
    } else {
        let index = crate::storage::SessionIndex::new()?;
        index.insert_otel_logs(&records)
    }
}