1use serde_json::Value;
7use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
8use tokio::task::JoinHandle;
9use tokio_util::sync::CancellationToken;
10use tracing::Level;
11use uuid::Uuid;
12
13#[derive(Debug, Clone, Copy)]
15pub enum StreamKind {
16 Stdout,
17 Stderr,
18}
19
20impl std::fmt::Display for StreamKind {
21 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 match self {
23 StreamKind::Stdout => write!(f, "stdout"),
24 StreamKind::Stderr => write!(f, "stderr"),
25 }
26 }
27}
28
29fn detect_log_level(line: &str) -> Level {
47 if let Some(level) = detect_json_level(line) {
48 return level;
49 }
50 if let Some(level) = detect_plain_level(line) {
51 return level;
52 }
53 Level::INFO
54}
55
56fn detect_plain_level(line: &str) -> Option<Level> {
57 let mut parts = line.split_whitespace();
58 let _timestamp = parts.next()?;
59 let level_str = parts.next()?;
60
61 match level_str {
62 "ERROR" | "error" => Some(Level::ERROR),
63 "WARN" | "warn" => Some(Level::WARN),
64 "INFO" | "info" => Some(Level::INFO),
65 "DEBUG" | "debug" => Some(Level::DEBUG),
66 "TRACE" | "trace" => Some(Level::TRACE),
67 _ => None,
68 }
69}
70
71fn detect_json_level(line: &str) -> Option<Level> {
72 let trimmed = line.trim_start();
73 if !trimmed.starts_with('{') || !trimmed.contains("\"level\"") {
74 return None;
75 }
76
77 let v: Value = serde_json::from_str(trimmed).ok()?;
78 let level = v.get("level")?.as_str()?.to_ascii_lowercase();
79
80 match level.as_str() {
81 "error" => Some(Level::ERROR),
82 "warn" => Some(Level::WARN),
83 "info" => Some(Level::INFO),
84 "debug" => Some(Level::DEBUG),
85 "trace" => Some(Level::TRACE),
86 _ => None,
87 }
88}
89
90fn forward_line(module: &str, instance_id: Uuid, stream: StreamKind, line: &str) {
94 let level = detect_log_level(line);
95
96 match level {
97 Level::ERROR => {
98 tracing::error!(
99 oop_module = %module,
100 oop_instance_id = %instance_id,
101 stream = %stream,
102 "{line}"
103 );
104 }
105 Level::WARN => {
106 tracing::warn!(
107 oop_module = %module,
108 oop_instance_id = %instance_id,
109 stream = %stream,
110 "{line}"
111 );
112 }
113 Level::INFO => {
114 tracing::info!(
115 oop_module = %module,
116 oop_instance_id = %instance_id,
117 stream = %stream,
118 "{line}"
119 );
120 }
121 Level::DEBUG => {
122 tracing::debug!(
123 oop_module = %module,
124 oop_instance_id = %instance_id,
125 stream = %stream,
126 "{line}"
127 );
128 }
129 Level::TRACE => {
130 tracing::trace!(
131 oop_module = %module,
132 oop_instance_id = %instance_id,
133 stream = %stream,
134 "{line}"
135 );
136 }
137 }
138}
139
140pub fn spawn_stream_forwarder<S>(
146 stream: S,
147 module: String,
148 instance_id: Uuid,
149 cancel: CancellationToken,
150 kind: StreamKind,
151) -> JoinHandle<()>
152where
153 S: AsyncRead + Unpin + Send + 'static,
154{
155 tokio::spawn(async move {
156 let reader = BufReader::new(stream);
157 let mut lines = reader.lines();
158
159 loop {
160 tokio::select! {
161 biased;
162
163 () = cancel.cancelled() => {
164 tracing::debug!(
165 oop_module = %module,
166 oop_instance_id = %instance_id,
167 stream = ?kind,
168 "log forwarder cancelled"
169 );
170 break;
171 }
172
173 result = lines.next_line() => {
174 match result {
175 Ok(Some(line)) => {
176 forward_line(&module, instance_id, kind, &line);
177 }
178 Ok(None) => {
179 tracing::debug!(
180 oop_module = %module,
181 oop_instance_id = %instance_id,
182 stream = ?kind,
183 "log stream closed"
184 );
185 break;
186 }
187 Err(e) => {
188 tracing::warn!(
189 oop_module = %module,
190 oop_instance_id = %instance_id,
191 stream = ?kind,
192 error = %e,
193 "log stream read error"
194 );
195 break;
196 }
197 }
198 }
199 }
200 }
201 })
202}
203
204#[cfg(test)]
205mod tests {
206 use super::*;
207
208 #[test]
209 fn test_detect_log_level_tracing_subscriber_format() {
210 assert_eq!(
212 detect_log_level("2025-12-08T00:10:18.2852399Z INFO hyperspot_server: shutdown"),
213 Level::INFO
214 );
215 assert_eq!(
216 detect_log_level(
217 "2025-12-08T00:10:18.2861457Z DEBUG modkit::bootstrap::backends::local: Sending termination signal"
218 ),
219 Level::DEBUG
220 );
221 assert_eq!(
222 detect_log_level("2025-12-08T00:10:18.2852399Z WARN some_module: warning message"),
223 Level::WARN
224 );
225 assert_eq!(
226 detect_log_level("2025-12-08T00:10:18.2852399Z ERROR some_module: error message"),
227 Level::ERROR
228 );
229 assert_eq!(
230 detect_log_level("2025-12-08T00:10:18.2852399Z TRACE some_module: trace message"),
231 Level::TRACE
232 );
233 }
234
235 #[test]
236 fn test_detect_log_level_with_spans() {
237 assert_eq!(
239 detect_log_level(
240 "2025-12-08T00:10:18.2864778Z DEBUG stop:stop: modkit::lifecycle: lifecycle task completed"
241 ),
242 Level::DEBUG
243 );
244 assert_eq!(
245 detect_log_level(
246 "2025-12-08T00:10:18.2865251Z INFO stop:stop: modkit::lifecycle: lifecycle stopped"
247 ),
248 Level::INFO
249 );
250 }
251
252 #[test]
253 fn test_detect_log_level_default() {
254 assert_eq!(detect_log_level("some random line"), Level::INFO);
256 assert_eq!(detect_log_level(""), Level::INFO);
257 assert_eq!(detect_log_level("Starting server..."), Level::INFO);
258 }
259
260 #[test]
261 fn test_detect_log_level_json_format() {
262 assert_eq!(
264 detect_log_level(
265 r#"{"timestamp":"2025-12-09T21:09:40.0028859Z","level":"INFO","fields":{"message":"test"},"target":"module"}"#
266 ),
267 Level::INFO
268 );
269 assert_eq!(
270 detect_log_level(
271 r#"{"timestamp":"2025-12-09T21:09:40.0028859Z","level":"DEBUG","fields":{"message":"test"},"target":"module"}"#
272 ),
273 Level::DEBUG
274 );
275 assert_eq!(
276 detect_log_level(
277 r#"{"timestamp":"2025-12-09T21:09:40.0028859Z","level":"WARN","fields":{"message":"test"},"target":"module"}"#
278 ),
279 Level::WARN
280 );
281 assert_eq!(
282 detect_log_level(
283 r#"{"timestamp":"2025-12-09T21:09:40.0028859Z","level":"ERROR","fields":{"message":"test"},"target":"module"}"#
284 ),
285 Level::ERROR
286 );
287 assert_eq!(
288 detect_log_level(
289 r#"{"timestamp":"2025-12-09T21:09:40.0028859Z","level":"TRACE","fields":{"message":"test"},"target":"module"}"#
290 ),
291 Level::TRACE
292 );
293 }
294
295 #[test]
296 fn test_detect_log_level_json_format_lowercase() {
297 assert_eq!(
299 detect_log_level(r#"{"level":"info","message":"test"}"#),
300 Level::INFO
301 );
302 assert_eq!(
303 detect_log_level(r#"{"level":"debug","message":"test"}"#),
304 Level::DEBUG
305 );
306 assert_eq!(
307 detect_log_level(r#"{"level":"warn","message":"test"}"#),
308 Level::WARN
309 );
310 assert_eq!(
311 detect_log_level(r#"{"level":"error","message":"test"}"#),
312 Level::ERROR
313 );
314 }
315
316 #[test]
317 fn test_stream_kind_display() {
318 assert_eq!(format!("{}", StreamKind::Stdout), "stdout");
319 assert_eq!(format!("{}", StreamKind::Stderr), "stderr");
320 }
321}