// modkit/backends/log_forwarder.rs

//! Log forwarding for `OoP` module stdout/stderr
//!
//! This module provides utilities for capturing stdout/stderr from child processes
//! and forwarding each line to the parent's tracing system with proper context.

use std::borrow::Cow;

use serde_json::Value;
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::Level;
use uuid::Uuid;

/// Stream type identifier for logging.
///
/// Tags each forwarded line with the child output stream it was read from.
/// The type is a plain `Copy` tag that can be compared and hashed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StreamKind {
    /// The child's standard output.
    Stdout,
    /// The child's standard error.
    Stderr,
}

20impl std::fmt::Display for StreamKind {
21    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22        match self {
23            StreamKind::Stdout => write!(f, "stdout"),
24            StreamKind::Stderr => write!(f, "stderr"),
25        }
26    }
27}
28
29/// Detect log level from a tracing-subscriber formatted line.
30///
31/// Supports two formats:
32///
33/// 1. Plain text format (tracing-subscriber default):
34/// ```text
35/// 2025-12-08T00:10:18.2852399Z  INFO module_name: message
36/// 2025-12-08T00:10:18.2852399Z DEBUG module_name: message
37/// ```
38///
39/// 2. JSON format (tracing-subscriber with json layer):
40/// ```json
41/// {"timestamp":"2025-12-09T21:09:40Z","level":"INFO","fields":{"message":"..."},"target":"..."}
42/// {"timestamp":"2025-12-09T21:09:40Z","level":"DEBUG","fields":{"message":"..."},"target":"..."}
43/// ```
44///
45/// Returns INFO as the default for unrecognized formats.
46fn detect_log_level(line: &str) -> Level {
47    if let Some(level) = detect_json_level(line) {
48        return level;
49    }
50    if let Some(level) = detect_plain_level(line) {
51        return level;
52    }
53    Level::INFO
54}
55
56fn detect_plain_level(line: &str) -> Option<Level> {
57    let mut parts = line.split_whitespace();
58    let _timestamp = parts.next()?;
59    let level_str = parts.next()?;
60
61    match level_str {
62        "ERROR" | "error" => Some(Level::ERROR),
63        "WARN" | "warn" => Some(Level::WARN),
64        "INFO" | "info" => Some(Level::INFO),
65        "DEBUG" | "debug" => Some(Level::DEBUG),
66        "TRACE" | "trace" => Some(Level::TRACE),
67        _ => None,
68    }
69}
70
71fn detect_json_level(line: &str) -> Option<Level> {
72    let trimmed = line.trim_start();
73    if !trimmed.starts_with('{') || !trimmed.contains("\"level\"") {
74        return None;
75    }
76
77    let v: Value = serde_json::from_str(trimmed).ok()?;
78    let level = v.get("level")?.as_str()?.to_ascii_lowercase();
79
80    match level.as_str() {
81        "error" => Some(Level::ERROR),
82        "warn" => Some(Level::WARN),
83        "info" => Some(Level::INFO),
84        "debug" => Some(Level::DEBUG),
85        "trace" => Some(Level::TRACE),
86        _ => None,
87    }
88}
89
90/// Forward a single line to tracing with the detected level.
91///
92/// Uses dynamic dispatch via `tracing::event!` macro with appropriate level.
93fn forward_line(module: &str, instance_id: Uuid, stream: StreamKind, line: &str) {
94    let level = detect_log_level(line);
95
96    match level {
97        Level::ERROR => {
98            tracing::error!(
99                oop_module = %module,
100                oop_instance_id = %instance_id,
101                stream = %stream,
102                "{line}"
103            );
104        }
105        Level::WARN => {
106            tracing::warn!(
107                oop_module = %module,
108                oop_instance_id = %instance_id,
109                stream = %stream,
110                "{line}"
111            );
112        }
113        Level::INFO => {
114            tracing::info!(
115                oop_module = %module,
116                oop_instance_id = %instance_id,
117                stream = %stream,
118                "{line}"
119            );
120        }
121        Level::DEBUG => {
122            tracing::debug!(
123                oop_module = %module,
124                oop_instance_id = %instance_id,
125                stream = %stream,
126                "{line}"
127            );
128        }
129        Level::TRACE => {
130            tracing::trace!(
131                oop_module = %module,
132                oop_instance_id = %instance_id,
133                stream = %stream,
134                "{line}"
135            );
136        }
137    }
138}
139
140/// Spawn a task that reads lines from stdout and forwards them to tracing.
141///
142/// The task will run until either:
143/// - The stream is closed (child process exits)
144/// - The cancellation token is triggered
145pub fn spawn_stream_forwarder<S>(
146    stream: S,
147    module: String,
148    instance_id: Uuid,
149    cancel: CancellationToken,
150    kind: StreamKind,
151) -> JoinHandle<()>
152where
153    S: AsyncRead + Unpin + Send + 'static,
154{
155    tokio::spawn(async move {
156        let reader = BufReader::new(stream);
157        let mut lines = reader.lines();
158
159        loop {
160            tokio::select! {
161                biased;
162
163                () = cancel.cancelled() => {
164                    tracing::debug!(
165                        oop_module = %module,
166                        oop_instance_id = %instance_id,
167                        stream = ?kind,
168                        "log forwarder cancelled"
169                    );
170                    break;
171                }
172
173                result = lines.next_line() => {
174                    match result {
175                        Ok(Some(line)) => {
176                            forward_line(&module, instance_id, kind, &line);
177                        }
178                        Ok(None) => {
179                            tracing::debug!(
180                                oop_module = %module,
181                                oop_instance_id = %instance_id,
182                                stream = ?kind,
183                                "log stream closed"
184                            );
185                            break;
186                        }
187                        Err(e) => {
188                            tracing::warn!(
189                                oop_module = %module,
190                                oop_instance_id = %instance_id,
191                                stream = ?kind,
192                                error = %e,
193                                "log stream read error"
194                            );
195                            break;
196                        }
197                    }
198                }
199            }
200        }
201    })
202}
203
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a tracing-subscriber style JSON record with the given level string.
    fn json_record(level: &str) -> String {
        format!(
            r#"{{"timestamp":"2025-12-09T21:09:40.0028859Z","level":"{level}","fields":{{"message":"test"}},"target":"module"}}"#
        )
    }

    /// Asserts that every `(line, expected level)` pair is detected as expected.
    fn assert_levels(cases: &[(&str, Level)]) {
        for (line, expected) in cases {
            assert_eq!(detect_log_level(line), *expected, "line: {line}");
        }
    }

    #[test]
    fn test_detect_log_level_tracing_subscriber_format() {
        // Real tracing-subscriber format examples
        assert_levels(&[
            (
                "2025-12-08T00:10:18.2852399Z  INFO hyperspot_server: shutdown",
                Level::INFO,
            ),
            (
                "2025-12-08T00:10:18.2861457Z DEBUG modkit::bootstrap::backends::local: Sending termination signal",
                Level::DEBUG,
            ),
            (
                "2025-12-08T00:10:18.2852399Z  WARN some_module: warning message",
                Level::WARN,
            ),
            (
                "2025-12-08T00:10:18.2852399Z ERROR some_module: error message",
                Level::ERROR,
            ),
            (
                "2025-12-08T00:10:18.2852399Z TRACE some_module: trace message",
                Level::TRACE,
            ),
        ]);
    }

    #[test]
    fn test_detect_log_level_with_spans() {
        // tracing-subscriber with span context
        assert_levels(&[
            (
                "2025-12-08T00:10:18.2864778Z DEBUG stop:stop: modkit::lifecycle: lifecycle task completed",
                Level::DEBUG,
            ),
            (
                "2025-12-08T00:10:18.2865251Z  INFO stop:stop: modkit::lifecycle: lifecycle stopped",
                Level::INFO,
            ),
        ]);
    }

    #[test]
    fn test_detect_log_level_default() {
        // Lines without recognized level pattern default to INFO
        assert_levels(&[
            ("some random line", Level::INFO),
            ("", Level::INFO),
            ("Starting server...", Level::INFO),
        ]);
    }

    #[test]
    fn test_detect_log_level_json_format() {
        // JSON format with uppercase level
        let cases = [
            ("INFO", Level::INFO),
            ("DEBUG", Level::DEBUG),
            ("WARN", Level::WARN),
            ("ERROR", Level::ERROR),
            ("TRACE", Level::TRACE),
        ];
        for (name, expected) in cases {
            assert_eq!(detect_log_level(&json_record(name)), expected, "level: {name}");
        }
    }

    #[test]
    fn test_detect_log_level_json_format_lowercase() {
        // JSON format with lowercase level (some loggers use lowercase)
        let cases = [
            ("info", Level::INFO),
            ("debug", Level::DEBUG),
            ("warn", Level::WARN),
            ("error", Level::ERROR),
        ];
        for (name, expected) in cases {
            let record = format!(r#"{{"level":"{name}","message":"test"}}"#);
            assert_eq!(detect_log_level(&record), expected, "level: {name}");
        }
    }

    #[test]
    fn test_detect_log_level_json_without_usable_level() {
        // Truncated records, non-string levels and records without a level
        // all fall back to the INFO default
        assert_levels(&[
            (r#"{"level":"ERROR""#, Level::INFO),
            (r#"{"level":42,"message":"test"}"#, Level::INFO),
            (r#"{"message":"no level here"}"#, Level::INFO),
        ]);
    }

    #[test]
    fn test_stream_kind_display() {
        assert_eq!(format!("{}", StreamKind::Stdout), "stdout");
        assert_eq!(format!("{}", StreamKind::Stderr), "stderr");
    }
}