mi6_otel_server/
server.rs1use std::io::Write;
4use std::process::{Command, Stdio};
5use std::sync::atomic::{AtomicBool, Ordering};
6
7use anyhow::Result;
8
9use mi6_core::{Config, ExportLogsServiceRequest, OtelMode, Storage, process_logs_request};
10
/// JSON body sent back by the `/health` route of the server in this file.
const HEALTH_RESPONSE: &str = r#"{"service":"mi6-otel","status":"ok"}"#;
12
13pub fn run_server<S: Storage>(storage: &S, port: u16, mode: Option<OtelMode>) -> Result<()> {
30 let config = Config::load().unwrap_or_default();
31 let machine_id = config.machine_id();
32
33 let effective_mode = mode.unwrap_or(config.otel.mode);
35
36 let using_fallback = AtomicBool::new(false);
38
39 let addr = format!("127.0.0.1:{}", port);
40 let server = tiny_http::Server::http(&addr)
41 .map_err(|e| anyhow::anyhow!("failed to start HTTP server: {}", e))?;
42
43 eprintln!(
44 "[mi6] OTel server listening on http://{} (mode: {})",
45 addr, effective_mode
46 );
47
48 loop {
49 let mut request = match server.recv() {
50 Ok(req) => req,
51 Err(e) => {
52 eprintln!("[mi6] Error receiving request: {}", e);
53 continue;
54 }
55 };
56
57 let url = request.url().to_string();
58 let method = request.method().to_string();
59
60 if method == "POST" && url == "/v1/logs" {
62 let mut body = String::new();
63 if request.as_reader().read_to_string(&mut body).is_ok() {
64 let count = match effective_mode {
65 OtelMode::RelayCli => {
66 match process_via_cli(&body) {
68 Ok(c) => c,
69 Err(e) => {
70 if !using_fallback.swap(true, Ordering::Relaxed) {
71 eprintln!(
72 "[mi6] Warning: relay-cli failed ({}), falling back to library mode",
73 e
74 );
75 }
76 process_logs_library(storage, &body, &machine_id)
77 }
78 }
79 }
80 OtelMode::Library => process_logs_library(storage, &body, &machine_id),
81 };
82
83 if count > 0 {
84 eprintln!("[mi6] Stored {} API request(s)", count);
85 }
86 }
87 let response = tiny_http::Response::empty(200);
88 let _ = request.respond(response);
89 } else if url == "/health" {
90 let response = if let Ok(header) =
92 tiny_http::Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..])
93 {
94 tiny_http::Response::from_string(HEALTH_RESPONSE).with_header(header)
95 } else {
96 tiny_http::Response::from_string(HEALTH_RESPONSE)
97 };
98 let _ = request.respond(response);
99 } else {
100 let response = tiny_http::Response::empty(404);
102 let _ = request.respond(response);
103 }
104 }
105}
106
107fn process_via_cli(body: &str) -> Result<usize> {
112 let mut child = Command::new("mi6")
113 .args(["ingest", "otel"])
114 .stdin(Stdio::piped())
115 .stderr(Stdio::piped())
116 .spawn()
117 .map_err(|e| anyhow::anyhow!("failed to spawn mi6: {}", e))?;
118
119 if let Some(mut stdin) = child.stdin.take() {
120 stdin
121 .write_all(body.as_bytes())
122 .map_err(|e| anyhow::anyhow!("failed to write to mi6 stdin: {}", e))?;
123 }
124
125 let output = child
126 .wait_with_output()
127 .map_err(|e| anyhow::anyhow!("failed to wait for mi6: {}", e))?;
128
129 if !output.status.success() {
130 let stderr = String::from_utf8_lossy(&output.stderr);
131 anyhow::bail!("mi6 ingest otel failed: {}", stderr.trim());
132 }
133
134 Ok(0)
137}
138
139fn process_logs_library<S: Storage>(storage: &S, body: &str, machine_id: &str) -> usize {
141 let Ok(request) = serde_json::from_str::<ExportLogsServiceRequest>(body) else {
142 return 0;
143 };
144 process_logs_request(storage, &request, machine_id)
145}