mi6_otel_server/
server.rs

1//! HTTP server implementation for receiving OpenTelemetry logs.
2
3use std::io::Write;
4use std::process::{Command, Stdio};
5use std::sync::atomic::{AtomicBool, Ordering};
6
7use anyhow::Result;
8
9use mi6_core::{Config, ExportLogsServiceRequest, OtelMode, Storage, process_logs_request};
10
/// JSON body returned by the `/health` endpoint so callers can identify this service.
11const HEALTH_RESPONSE: &str = r#"{"service":"mi6-otel","status":"ok"}"#;
12
13/// Run the OTLP HTTP server.
14///
15/// This function starts an HTTP server on the specified port that receives
16/// OpenTelemetry logs and stores them using the provided storage backend.
17///
18/// # Arguments
19/// * `storage` - Storage backend (only used in library mode)
20/// * `port` - Port to listen on
21/// * `mode` - Processing mode (None uses config default)
22///
23/// # Endpoints
24/// - `POST /v1/logs` - Receive and process OpenTelemetry log data
25/// - `GET /health` - Health check endpoint for service identification
26///
27/// # Panics
28/// This function runs indefinitely and only returns on error.
29pub fn run_server<S: Storage>(storage: &S, port: u16, mode: Option<OtelMode>) -> Result<()> {
30    let config = Config::load().unwrap_or_default();
31    let machine_id = config.machine_id();
32
33    // Determine the effective mode
34    let effective_mode = mode.unwrap_or(config.otel.mode);
35
36    // Track if we've fallen back to library mode
37    let using_fallback = AtomicBool::new(false);
38
39    let addr = format!("127.0.0.1:{}", port);
40    let server = tiny_http::Server::http(&addr)
41        .map_err(|e| anyhow::anyhow!("failed to start HTTP server: {}", e))?;
42
43    eprintln!(
44        "[mi6] OTel server listening on http://{} (mode: {})",
45        addr, effective_mode
46    );
47
48    loop {
49        let mut request = match server.recv() {
50            Ok(req) => req,
51            Err(e) => {
52                eprintln!("[mi6] Error receiving request: {}", e);
53                continue;
54            }
55        };
56
57        let url = request.url().to_string();
58        let method = request.method().to_string();
59
60        // Handle POST /v1/logs
61        if method == "POST" && url == "/v1/logs" {
62            let mut body = String::new();
63            if request.as_reader().read_to_string(&mut body).is_ok() {
64                let count = match effective_mode {
65                    OtelMode::RelayCli => {
66                        // Try relay-cli mode first, fall back to library on failure
67                        match process_via_cli(&body) {
68                            Ok(c) => c,
69                            Err(e) => {
70                                if !using_fallback.swap(true, Ordering::Relaxed) {
71                                    eprintln!(
72                                        "[mi6] Warning: relay-cli failed ({}), falling back to library mode",
73                                        e
74                                    );
75                                }
76                                process_logs_library(storage, &body, &machine_id)
77                            }
78                        }
79                    }
80                    OtelMode::Library => process_logs_library(storage, &body, &machine_id),
81                };
82
83                if count > 0 {
84                    eprintln!("[mi6] Stored {} API request(s)", count);
85                }
86            }
87            let response = tiny_http::Response::empty(200);
88            let _ = request.respond(response);
89        } else if url == "/health" {
90            // Return specific JSON response for service identification
91            let response = if let Ok(header) =
92                tiny_http::Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..])
93            {
94                tiny_http::Response::from_string(HEALTH_RESPONSE).with_header(header)
95            } else {
96                tiny_http::Response::from_string(HEALTH_RESPONSE)
97            };
98            let _ = request.respond(response);
99        } else {
100            // Unknown endpoint
101            let response = tiny_http::Response::empty(404);
102            let _ = request.respond(response);
103        }
104    }
105}
106
107/// Process logs via the CLI (relay-cli mode).
108///
109/// Spawns `mi6 ingest otel` and pipes the OTLP JSON to it.
110/// Returns the number of events processed (always 0 since we don't get count back).
111fn process_via_cli(body: &str) -> Result<usize> {
112    let mut child = Command::new("mi6")
113        .args(["ingest", "otel"])
114        .stdin(Stdio::piped())
115        .stderr(Stdio::piped())
116        .spawn()
117        .map_err(|e| anyhow::anyhow!("failed to spawn mi6: {}", e))?;
118
119    if let Some(mut stdin) = child.stdin.take() {
120        stdin
121            .write_all(body.as_bytes())
122            .map_err(|e| anyhow::anyhow!("failed to write to mi6 stdin: {}", e))?;
123    }
124
125    let output = child
126        .wait_with_output()
127        .map_err(|e| anyhow::anyhow!("failed to wait for mi6: {}", e))?;
128
129    if !output.status.success() {
130        let stderr = String::from_utf8_lossy(&output.stderr);
131        anyhow::bail!("mi6 ingest otel failed: {}", stderr.trim());
132    }
133
134    // We don't know the actual count from CLI output, return 0
135    // The CLI itself logs the count to stderr
136    Ok(0)
137}
138
139/// Process logs using the library directly (library mode).
140fn process_logs_library<S: Storage>(storage: &S, body: &str, machine_id: &str) -> usize {
141    let Ok(request) = serde_json::from_str::<ExportLogsServiceRequest>(body) else {
142        return 0;
143    };
144    process_logs_request(storage, &request, machine_id)
145}