sonda-server 1.6.1

HTTP control plane for Sonda — synthetic telemetry generator
//! POST /events — synchronous single-event emission.

use std::collections::HashMap;
use std::time::Instant;

use axum::extract::State;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Json, Response};
use serde::{Deserialize, Serialize};
use serde_json::json;
use sonda_core::emit::{emit_log, emit_metric};
use sonda_core::encoder::EncoderConfig;
use sonda_core::model::log::{LogEvent, Severity};
use sonda_core::model::metric::{Labels, MetricEvent};
use sonda_core::sink::SinkConfig;
use sonda_core::SondaError;
use tracing::info;

use crate::routes::sink_warnings::{collect_warnings_for_sink, log_warnings};
use crate::state::AppState;

#[derive(Debug, Deserialize)]
pub struct LogPayload {
    pub severity: Severity,
    pub message: String,
    #[serde(default)]
    pub fields: HashMap<String, String>,
}

#[derive(Debug, Deserialize)]
pub struct MetricPayload {
    pub name: String,
    pub value: f64,
}

#[derive(Debug, Deserialize)]
#[serde(tag = "signal_type")]
pub enum EventRequest {
    #[serde(rename = "logs")]
    Logs {
        #[serde(default)]
        labels: HashMap<String, String>,
        log: LogPayload,
        encoder: EncoderConfig,
        sink: SinkConfig,
    },
    #[serde(rename = "metrics")]
    Metrics {
        #[serde(default)]
        labels: HashMap<String, String>,
        metric: MetricPayload,
        encoder: EncoderConfig,
        sink: SinkConfig,
    },
}

#[derive(Debug, Serialize)]
pub struct EventAck {
    pub sent: bool,
    pub signal_type: &'static str,
    pub latency_ms: u128,
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub warnings: Vec<String>,
}

pub async fn post_events(State(_state): State<AppState>, body: axum::body::Bytes) -> Response {
    let req: EventRequest = match serde_json::from_slice::<EventRequest>(&body) {
        Ok(r) => r,
        Err(e) => return bad_request(format!("invalid event body: {e}")),
    };

    let mut warnings: Vec<String> = Vec::new();
    let warning_label = match &req {
        EventRequest::Logs { .. } => "events.logs",
        EventRequest::Metrics { .. } => "events.metrics",
    };
    match &req {
        EventRequest::Logs { sink, .. } | EventRequest::Metrics { sink, .. } => {
            collect_warnings_for_sink(sink, warning_label, &mut warnings);
        }
    }
    log_warnings("POST /events", &warnings);

    let started = Instant::now();
    let (signal_type, sink_type, emit_result) = match req {
        EventRequest::Logs {
            labels,
            log,
            encoder,
            sink,
        } => {
            let event = match build_log_event(log, &labels) {
                Ok(e) => e,
                Err(e) => return error_response(e),
            };
            let sink_type = sink_kind(&sink);
            let labels_for_sink = (!labels.is_empty()).then_some(labels);
            let result =
                run_blocking(move || emit_log(&event, &encoder, &sink, labels_for_sink.as_ref()))
                    .await;
            ("logs", sink_type, result)
        }
        EventRequest::Metrics {
            labels,
            metric,
            encoder,
            sink,
        } => {
            let event = match build_metric_event(metric, &labels) {
                Ok(e) => e,
                Err(e) => return error_response(e),
            };
            let sink_type = sink_kind(&sink);
            let labels_for_sink = (!labels.is_empty()).then_some(labels);
            let result = run_blocking(move || {
                emit_metric(&event, &encoder, &sink, labels_for_sink.as_ref())
            })
            .await;
            ("metrics", sink_type, result)
        }
    };
    let latency_ms = started.elapsed().as_millis();

    match emit_result {
        Ok(()) => {
            info!(
                signal_type = signal_type,
                sink_type = sink_type,
                latency_ms = latency_ms,
                result = "ok",
                "POST /events: event delivered"
            );
            (
                StatusCode::OK,
                Json(EventAck {
                    sent: true,
                    signal_type,
                    latency_ms,
                    warnings,
                }),
            )
                .into_response()
        }
        Err(err) => {
            info!(
                signal_type = signal_type,
                sink_type = sink_type,
                latency_ms = latency_ms,
                result = "error",
                error = %err,
                "POST /events: event delivery failed"
            );
            error_response(err)
        }
    }
}

fn build_log_event(
    log: LogPayload,
    labels: &HashMap<String, String>,
) -> Result<LogEvent, SondaError> {
    let labels = labels_from_map(labels)?;
    let fields: std::collections::BTreeMap<String, String> = log.fields.into_iter().collect();
    Ok(LogEvent::new(log.severity, log.message, labels, fields))
}

fn build_metric_event(
    metric: MetricPayload,
    labels: &HashMap<String, String>,
) -> Result<MetricEvent, SondaError> {
    let labels = labels_from_map(labels)?;
    MetricEvent::new(metric.name, metric.value, labels)
}

fn labels_from_map(map: &HashMap<String, String>) -> Result<Labels, SondaError> {
    if map.is_empty() {
        return Ok(Labels::default());
    }
    let pairs: Vec<(&str, &str)> = map.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect();
    Labels::from_pairs(&pairs)
}

async fn run_blocking<F>(f: F) -> Result<(), SondaError>
where
    F: FnOnce() -> Result<(), SondaError> + Send + 'static,
{
    match tokio::task::spawn_blocking(f).await {
        Ok(r) => r,
        Err(_join_err) => Err(SondaError::Runtime(
            sonda_core::RuntimeError::ThreadPanicked,
        )),
    }
}

fn error_response(err: SondaError) -> Response {
    match err {
        SondaError::Config(e) => unprocessable(format!("{e}")),
        SondaError::Sink(e) => bad_gateway(format!("sink error: {e}")),
        SondaError::Encoder(e) => internal_error(format!("encoder error: {e}")),
        SondaError::Generator(e) => internal_error(format!("generator error: {e}")),
        SondaError::Runtime(e) => internal_error(format!("runtime error: {e}")),
        _ => internal_error("unexpected error variant"),
    }
}

fn sink_kind(sink: &SinkConfig) -> &'static str {
    match sink {
        SinkConfig::Stdout => "stdout",
        SinkConfig::File { .. } => "file",
        SinkConfig::Tcp { .. } => "tcp",
        SinkConfig::Udp { .. } => "udp",
        #[cfg(feature = "http")]
        SinkConfig::HttpPush { .. } => "http_push",
        #[cfg(feature = "http")]
        SinkConfig::Loki { .. } => "loki",
        #[cfg(feature = "remote-write")]
        SinkConfig::RemoteWrite { .. } => "remote_write",
        #[cfg(feature = "kafka")]
        SinkConfig::Kafka { .. } => "kafka",
        #[cfg(feature = "otlp")]
        SinkConfig::OtlpGrpc { .. } => "otlp_grpc",
        _ => "other",
    }
}

fn bad_request(detail: impl std::fmt::Display) -> Response {
    let body = json!({ "error": "bad_request", "detail": detail.to_string() });
    (StatusCode::BAD_REQUEST, Json(body)).into_response()
}

fn unprocessable(detail: impl std::fmt::Display) -> Response {
    let body = json!({ "error": "unprocessable_entity", "detail": detail.to_string() });
    (StatusCode::UNPROCESSABLE_ENTITY, Json(body)).into_response()
}

fn bad_gateway(detail: impl std::fmt::Display) -> Response {
    let body = json!({ "error": "bad_gateway", "detail": detail.to_string() });
    (StatusCode::BAD_GATEWAY, Json(body)).into_response()
}

fn internal_error(detail: impl std::fmt::Display) -> Response {
    let body = json!({ "error": "internal_server_error", "detail": detail.to_string() });
    (StatusCode::INTERNAL_SERVER_ERROR, Json(body)).into_response()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn deserializes_logs_branch() {
        let payload = serde_json::json!({
            "signal_type": "logs",
            "labels": {"event": "deploy_start"},
            "log": {"severity": "info", "message": "go", "fields": {}},
            "encoder": {"type": "json_lines"},
            "sink": {"type": "stdout"},
        });
        let req: EventRequest = serde_json::from_value(payload).expect("must parse");
        match req {
            EventRequest::Logs { log, .. } => {
                assert_eq!(log.message, "go");
                assert_eq!(log.severity, Severity::Info);
            }
            _ => panic!("expected Logs branch"),
        }
    }

    #[test]
    fn deserializes_metrics_branch() {
        let payload = serde_json::json!({
            "signal_type": "metrics",
            "labels": {},
            "metric": {"name": "x", "value": 1.0},
            "encoder": {"type": "prometheus_text"},
            "sink": {"type": "stdout"},
        });
        let req: EventRequest = serde_json::from_value(payload).expect("must parse");
        match req {
            EventRequest::Metrics { metric, .. } => {
                assert_eq!(metric.name, "x");
                assert_eq!(metric.value, 1.0);
            }
            _ => panic!("expected Metrics branch"),
        }
    }

    #[test]
    fn unknown_signal_type_fails_to_deserialize() {
        let payload = serde_json::json!({
            "signal_type": "traces",
            "encoder": {"type": "json_lines"},
            "sink": {"type": "stdout"},
        });
        let err = serde_json::from_value::<EventRequest>(payload)
            .expect_err("unknown signal_type must error");
        let msg = err.to_string();
        assert!(
            msg.contains("traces") || msg.to_lowercase().contains("variant"),
            "error must mention the bad tag, got: {msg}"
        );
    }

    #[test]
    fn missing_log_message_fails_to_deserialize() {
        let payload = serde_json::json!({
            "signal_type": "logs",
            "log": {"severity": "info"},
            "encoder": {"type": "json_lines"},
            "sink": {"type": "stdout"},
        });
        let err = serde_json::from_value::<EventRequest>(payload)
            .expect_err("missing message must error");
        assert!(
            err.to_string().to_lowercase().contains("message"),
            "error must mention the missing field, got: {err}"
        );
    }

    #[test]
    fn error_response_maps_config_to_422() {
        let err = SondaError::Config(sonda_core::ConfigError::InvalidValue("bad".to_string()));
        let resp = error_response(err);
        assert_eq!(resp.status(), StatusCode::UNPROCESSABLE_ENTITY);
    }

    #[test]
    fn error_response_maps_sink_to_502() {
        let err = SondaError::Sink(std::io::Error::new(
            std::io::ErrorKind::ConnectionRefused,
            "nope",
        ));
        let resp = error_response(err);
        assert_eq!(resp.status(), StatusCode::BAD_GATEWAY);
    }

    #[test]
    fn error_response_maps_runtime_to_500() {
        let err = SondaError::Runtime(sonda_core::RuntimeError::ThreadPanicked);
        let resp = error_response(err);
        assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
    }

    #[test]
    fn sink_kind_tags_match_yaml_type_names() {
        assert_eq!(sink_kind(&SinkConfig::Stdout), "stdout");
        assert_eq!(
            sink_kind(&SinkConfig::Tcp {
                address: "x:1".into(),
                retry: None,
            }),
            "tcp"
        );
        assert_eq!(
            sink_kind(&SinkConfig::Udp {
                address: "x:1".into(),
            }),
            "udp"
        );
    }

    #[test]
    fn labels_from_empty_map_returns_default() {
        let map: HashMap<String, String> = HashMap::new();
        let labels = labels_from_map(&map).expect("empty must succeed");
        assert!(labels.is_empty());
    }

    #[test]
    fn labels_from_map_rejects_invalid_keys() {
        let mut map = HashMap::new();
        map.insert("1bad".to_string(), "value".to_string());
        let err = labels_from_map(&map).expect_err("invalid key must fail");
        assert!(matches!(err, SondaError::Config(_)));
    }
}