mi6-otel-server 0.2.3

OpenTelemetry HTTP server for mi6
Documentation
//! HTTP server implementation for receiving OpenTelemetry logs.

use std::io::Write;
use std::process::{Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};

use anyhow::Result;

use mi6_core::{Config, ExportLogsServiceRequest, OtelMode, Storage, process_logs_request};

const HEALTH_RESPONSE: &str = r#"{"service":"mi6-otel","status":"ok"}"#;

/// Run the OTLP HTTP server.
///
/// This function starts an HTTP server on the specified port that receives
/// OpenTelemetry logs and stores them using the provided storage backend.
///
/// # Arguments
/// * `storage` - Storage backend (only used in library mode)
/// * `port` - Port to listen on
/// * `mode` - Processing mode (None uses config default)
///
/// # Endpoints
/// - `POST /v1/logs` - Receive and process OpenTelemetry log data
/// - `GET /health` - Health check endpoint for service identification
///
/// # Panics
/// This function runs indefinitely and only returns on error.
pub fn run_server<S: Storage>(storage: &S, port: u16, mode: Option<OtelMode>) -> Result<()> {
    let config = Config::load().unwrap_or_default();
    let machine_id = config.machine_id();

    // Determine the effective mode
    let effective_mode = mode.unwrap_or(config.otel.mode);

    // Track if we've fallen back to library mode
    let using_fallback = AtomicBool::new(false);

    let addr = format!("127.0.0.1:{}", port);
    let server = tiny_http::Server::http(&addr)
        .map_err(|e| anyhow::anyhow!("failed to start HTTP server: {}", e))?;

    eprintln!(
        "[mi6] OTel server listening on http://{} (mode: {})",
        addr, effective_mode
    );

    loop {
        let mut request = match server.recv() {
            Ok(req) => req,
            Err(e) => {
                eprintln!("[mi6] Error receiving request: {}", e);
                continue;
            }
        };

        let url = request.url().to_string();
        let method = request.method().to_string();

        // Handle POST /v1/logs
        if method == "POST" && url == "/v1/logs" {
            let mut body = String::new();
            if request.as_reader().read_to_string(&mut body).is_ok() {
                let count = match effective_mode {
                    OtelMode::RelayCli => {
                        // Try relay-cli mode first, fall back to library on failure
                        match process_via_cli(&body) {
                            Ok(c) => c,
                            Err(e) => {
                                if !using_fallback.swap(true, Ordering::Relaxed) {
                                    eprintln!(
                                        "[mi6] Warning: relay-cli failed ({}), falling back to library mode",
                                        e
                                    );
                                }
                                process_logs_library(storage, &body, &machine_id)
                            }
                        }
                    }
                    OtelMode::Library => process_logs_library(storage, &body, &machine_id),
                };

                if count > 0 {
                    eprintln!("[mi6] Stored {} API request(s)", count);
                }
            }
            let response = tiny_http::Response::empty(200);
            let _ = request.respond(response);
        } else if url == "/health" {
            // Return specific JSON response for service identification
            let response = if let Ok(header) =
                tiny_http::Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..])
            {
                tiny_http::Response::from_string(HEALTH_RESPONSE).with_header(header)
            } else {
                tiny_http::Response::from_string(HEALTH_RESPONSE)
            };
            let _ = request.respond(response);
        } else {
            // Unknown endpoint
            let response = tiny_http::Response::empty(404);
            let _ = request.respond(response);
        }
    }
}

/// Process logs via the CLI (relay-cli mode).
///
/// Spawns `mi6 ingest otel` and pipes the OTLP JSON to it.
/// Returns the number of events processed (always 0 since we don't get count back).
fn process_via_cli(body: &str) -> Result<usize> {
    let mut child = Command::new("mi6")
        .args(["ingest", "otel"])
        .stdin(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .map_err(|e| anyhow::anyhow!("failed to spawn mi6: {}", e))?;

    if let Some(mut stdin) = child.stdin.take() {
        stdin
            .write_all(body.as_bytes())
            .map_err(|e| anyhow::anyhow!("failed to write to mi6 stdin: {}", e))?;
    }

    let output = child
        .wait_with_output()
        .map_err(|e| anyhow::anyhow!("failed to wait for mi6: {}", e))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        anyhow::bail!("mi6 ingest otel failed: {}", stderr.trim());
    }

    // We don't know the actual count from CLI output, return 0
    // The CLI itself logs the count to stderr
    Ok(0)
}

/// Process logs using the library directly (library mode).
fn process_logs_library<S: Storage>(storage: &S, body: &str, machine_id: &str) -> usize {
    let Ok(request) = serde_json::from_str::<ExportLogsServiceRequest>(body) else {
        return 0;
    };
    process_logs_request(storage, &request, machine_id)
}