use std::io::Write;
use std::process::{Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use anyhow::Result;
use mi6_core::{Config, ExportLogsServiceRequest, OtelMode, Storage, process_logs_request};
/// JSON body served by the `/health` route of [`run_server`].
const HEALTH_RESPONSE: &str = r#"{"service":"mi6-otel","status":"ok"}"#;
/// Runs a blocking HTTP server on `127.0.0.1:<port>` that receives OTel log exports.
///
/// Routes:
/// - `POST /v1/logs` — reads the body as UTF-8 text and ingests it according to
///   the effective mode. Responds `200` once the body has been read, or `400`
///   when the body cannot be read (I/O error or non-UTF-8 payload).
/// - `/health` (any method) — responds with [`HEALTH_RESPONSE`] as JSON.
/// - anything else — responds `404`.
///
/// `mode` overrides the mode from the loaded configuration when it is `Some`.
/// If the configuration fails to load, its `Default` value is used.
///
/// In `OtelMode::RelayCli`, every request is first handed to the CLI; when that
/// fails the payload is processed through the library path instead, and a
/// warning is printed the first time this happens.
///
/// # Errors
///
/// Returns an error only when the HTTP listener cannot be started. After that
/// the function loops forever and never returns.
pub fn run_server<S: Storage>(storage: &S, port: u16, mode: Option<OtelMode>) -> Result<()> {
    let config = Config::load().unwrap_or_default();
    let machine_id = config.machine_id();
    let effective_mode = mode.unwrap_or(config.otel.mode);
    // Set once the first relay-cli failure has been reported, so the warning
    // is printed a single time rather than on every request.
    let using_fallback = AtomicBool::new(false);

    let addr = format!("127.0.0.1:{}", port);
    let server = tiny_http::Server::http(&addr)
        .map_err(|e| anyhow::anyhow!("failed to start HTTP server: {}", e))?;
    eprintln!(
        "[mi6] OTel server listening on http://{} (mode: {})",
        addr, effective_mode
    );

    loop {
        let mut request = match server.recv() {
            Ok(req) => req,
            Err(e) => {
                eprintln!("[mi6] Error receiving request: {}", e);
                continue;
            }
        };

        // Owned copies: the request must be borrowed mutably below to read its body.
        let url = request.url().to_string();
        let method = request.method().to_string();

        match (method.as_str(), url.as_str()) {
            ("POST", "/v1/logs") => {
                let mut body = String::new();
                if let Err(e) = request.as_reader().read_to_string(&mut body) {
                    // The payload could not be decoded, so nothing was stored.
                    // Tell the exporter instead of pretending it was accepted.
                    eprintln!("[mi6] Failed to read request body: {}", e);
                    let _ = request.respond(tiny_http::Response::empty(400));
                    continue;
                }

                let count = match effective_mode {
                    OtelMode::RelayCli => match process_via_cli(&body) {
                        Ok(c) => c,
                        Err(e) => {
                            // `swap` returns the previous value: only the first
                            // failure sees `false` and prints the warning.
                            if !using_fallback.swap(true, Ordering::Relaxed) {
                                eprintln!(
                                    "[mi6] Warning: relay-cli failed ({}), falling back to library mode",
                                    e
                                );
                            }
                            process_logs_library(storage, &body, &machine_id)
                        }
                    },
                    OtelMode::Library => process_logs_library(storage, &body, &machine_id),
                };
                if count > 0 {
                    eprintln!("[mi6] Stored {} API request(s)", count);
                }

                // Respond failures are ignored: the client may already be gone.
                let _ = request.respond(tiny_http::Response::empty(200));
            }
            (_, "/health") => {
                let response = tiny_http::Response::from_string(HEALTH_RESPONSE);
                // Attach the JSON content type when the header can be built;
                // otherwise serve the same body without it.
                let response = match tiny_http::Header::from_bytes(
                    &b"Content-Type"[..],
                    &b"application/json"[..],
                ) {
                    Ok(header) => response.with_header(header),
                    Err(_) => response,
                };
                let _ = request.respond(response);
            }
            _ => {
                let _ = request.respond(tiny_http::Response::empty(404));
            }
        }
    }
}
/// Hands an OTel logs payload to the `mi6 ingest otel` command via its stdin.
///
/// The child's stderr is captured and included in the error when it exits
/// with a non-zero status. Its stdout is not captured, so the number of stored
/// records is not known here and `Ok(0)` is returned on success.
///
/// # Errors
///
/// Returns an error when the command cannot be spawned, cannot be waited on,
/// exits unsuccessfully, or when writing the payload to its stdin fails.
fn process_via_cli(body: &str) -> Result<usize> {
    let mut child = Command::new("mi6")
        .args(["ingest", "otel"])
        .stdin(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .map_err(|e| anyhow::anyhow!("failed to spawn mi6: {}", e))?;

    // Keep the write result instead of returning early: the child must always
    // be waited on, otherwise it is never reaped after a failed write.
    // The stdin handle is dropped when the closure returns, which signals EOF.
    // NOTE(review): stderr is only drained by `wait_with_output` below, so a
    // child that emits a lot of stderr before reading stdin could stall a large
    // write — confirm whether that can happen with `mi6 ingest otel`.
    let write_result = child
        .stdin
        .take()
        .map_or(Ok(()), |mut stdin| stdin.write_all(body.as_bytes()));

    let output = child
        .wait_with_output()
        .map_err(|e| anyhow::anyhow!("failed to wait for mi6: {}", e))?;

    // Prefer the child's own diagnostics: a failed write (e.g. broken pipe) is
    // usually a symptom of the child having exited with an error.
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        anyhow::bail!("mi6 ingest otel failed: {}", stderr.trim());
    }

    write_result.map_err(|e| anyhow::anyhow!("failed to write to mi6 stdin: {}", e))?;
    Ok(0)
}
/// Parses `body` as a JSON `ExportLogsServiceRequest` and passes it to
/// `process_logs_request` together with `storage` and `machine_id`.
///
/// Returns whatever `process_logs_request` returns, or `0` when `body` is not
/// valid JSON for that request type (the parse error is discarded).
fn process_logs_library<S: Storage>(storage: &S, body: &str, machine_id: &str) -> usize {
    match serde_json::from_str::<ExportLogsServiceRequest>(body) {
        Ok(parsed) => process_logs_request(storage, &parsed, machine_id),
        Err(_) => 0,
    }
}