Skip to main content

solti_exec/subprocess/
logger.rs

1//! # Logger: subprocess output stream processing.
2//!
3//! Captures stdout/stderr from a spawned subprocess, truncates long lines, and emits them via `tracing` at configurable log levels.
4//!
5//! ## How it fits
6//! ```text
7//! run_subprocess()
8//!     ├──► child.stdout.take() ──► tokio::spawn(log_stream(Stdout))
9//!     └──► child.stderr.take() ──► tokio::spawn(log_stream(Stderr))
10//!
//! log_stream(reader, run_id, stream_kind, config, output_sink)
//!     └──► for each line:
//!           ├──► truncate_line(line, max_chars)
//!           │     ├──► short? → Cow::Borrowed (zero alloc)
//!           │     └──► long?  → Cow::Owned("prefix... (truncated N bytes)")
//!           │
//!           ├──► emit via tracing:
//!           │     ├──► stdout + stdout_info  → info!
//!           │     ├──► stderr + stderr_warn  → warn!
//!           │     └──► otherwise             → debug!
//!           │
//!           └──► output_sink present? → push the (truncated) line to it
21//! ```
22//!
23//! ## Configuration
24//!
//! | Field             | Default | What it does                                          |
//! |-------------------|---------|-------------------------------------------------------|
//! | `max_line_length` | 4096    | truncate lines beyond this (chars)                    |
//! | `max_line_bytes`  | 65536   | hard byte cap per line; excess is drained up to `\n`  |
//! | `stdout_info`     | true    | log stdout at INFO (else DEBUG)                       |
//! | `stderr_warn`     | true    | log stderr at WARN (else DEBUG)                       |
30
31use std::borrow::Cow;
32
33use bytes::Bytes;
34use solti_runner::OutputSink;
35use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
36use tracing::{debug, info, warn};
37
/// Settings that control how subprocess output is captured and logged.
///
/// Two independent limits apply to every line: a byte cap on what is read
/// from the pipe (`max_line_bytes`) and a character cap on what is emitted
/// (`max_line_length`). The two boolean flags pick the `tracing` level per
/// stream.
///
/// ## Also
///
/// - [`SubprocessBackendConfig`](super::SubprocessBackendConfig) carries `LogConfig` as a field.
/// - `log_stream` is the async function that reads, truncates and emits lines.
#[derive(Debug, Clone, Copy)]
pub struct LogConfig {
    /// Maximum number of Unicode chars kept in an emitted line; longer lines
    /// are truncated before being logged.
    pub max_line_length: usize,
    /// Hard per-line limit in bytes; anything past it is drained and
    /// discarded up to the next `\n`.
    pub max_line_bytes: usize,
    /// `true` logs stdout at INFO, `false` logs it at DEBUG.
    pub stdout_info: bool,
    /// `true` logs stderr at WARN, `false` logs it at DEBUG.
    pub stderr_warn: bool,
}
55
56impl Default for LogConfig {
57    fn default() -> Self {
58        Self {
59            max_line_bytes: 64 * 1024,
60            max_line_length: 4096,
61            stdout_info: true,
62            stderr_warn: true,
63        }
64    }
65}
66
/// Identifies which output stream of the subprocess a line came from.
#[derive(Debug, Clone, Copy)]
pub(crate) enum StreamKind {
    /// The child's standard output.
    Stdout,
    /// The child's standard error.
    Stderr,
}
73
74impl StreamKind {
75    pub(crate) fn as_str(self) -> &'static str {
76        match self {
77            Self::Stdout => "stdout",
78            Self::Stderr => "stderr",
79        }
80    }
81
82    fn use_elevated_level(self, config: &LogConfig) -> bool {
83        match self {
84            Self::Stdout => config.stdout_info,
85            Self::Stderr => config.stderr_warn,
86        }
87    }
88}
89
90/// Log subprocess output stream line-by-line with truncation.
91pub(crate) async fn log_stream<R>(
92    reader: R,
93    run_id: &str,
94    stream: StreamKind,
95    config: &LogConfig,
96    output_sink: Option<&OutputSink>,
97) where
98    R: tokio::io::AsyncRead + Unpin,
99{
100    let mut reader = BufReader::new(reader);
101    let stream_name = stream.as_str();
102    let mut line_count = 0u64;
103    let mut buf: Vec<u8> = Vec::with_capacity(256);
104
105    loop {
106        buf.clear();
107        let read_result = (&mut reader)
108            .take(config.max_line_bytes as u64)
109            .read_until(b'\n', &mut buf)
110            .await;
111
112        let bytes_read = match read_result {
113            Ok(0) => break,
114            Ok(n) => n,
115            Err(e) => {
116                warn!(
117                    task = %run_id,
118                    stream = %stream_name,
119                    error = %e,
120                    line_num = line_count,
121                    "error while reading subprocess stream"
122                );
123                break;
124            }
125        };
126
127        let hit_cap = bytes_read == config.max_line_bytes && !buf.ends_with(b"\n");
128        if buf.ends_with(b"\n") {
129            buf.pop();
130            if buf.ends_with(b"\r") {
131                buf.pop();
132            }
133        }
134        let raw_line = String::from_utf8_lossy(&buf).into_owned();
135        let raw_line = if hit_cap {
136            format!(
137                "{raw_line} ...[line exceeded {} bytes, truncated]",
138                config.max_line_bytes
139            )
140        } else {
141            raw_line
142        };
143
144        if hit_cap {
145            let mut scratch = [0u8; 8 * 1024];
146            loop {
147                let drained = match reader.read(&mut scratch).await {
148                    Ok(0) => break,
149                    Ok(n) => n,
150                    Err(_) => break,
151                };
152                if let Some(nl) = scratch[..drained].iter().position(|&b| b == b'\n') {
153                    let _ = nl;
154                    break;
155                }
156            }
157        }
158
159        let line = truncate_line(&raw_line, config.max_line_length);
160        line_count += 1;
161
162        if stream.use_elevated_level(config) {
163            match stream {
164                StreamKind::Stdout => info!(
165                    task = %run_id,
166                    stream = %stream_name,
167                    line_num = line_count,
168                    "{}",
169                    line
170                ),
171                StreamKind::Stderr => warn!(
172                    task = %run_id,
173                    stream = %stream_name,
174                    line_num = line_count,
175                    "{}",
176                    line
177                ),
178            }
179        } else {
180            debug!(
181                task = %run_id,
182                stream = %stream_name,
183                line_num = line_count,
184                "{}",
185                line
186            );
187        }
188
189        if let Some(sink) = output_sink {
190            let bytes_line: Bytes = match line {
191                Cow::Borrowed(s) => Bytes::copy_from_slice(s.as_bytes()),
192                Cow::Owned(s) => Bytes::from(s),
193            };
194            match stream {
195                StreamKind::Stdout => sink.stdout_line(bytes_line),
196                StreamKind::Stderr => sink.stderr_line(bytes_line),
197            }
198        }
199    }
200
201    debug!(
202        task = %run_id,
203        stream = %stream_name,
204        total_lines = line_count,
205        "stream closed"
206    );
207}
208
/// Shorten `line` to at most `max_chars` Unicode scalar values.
///
/// The cut always falls on a char boundary, so multi-byte UTF-8 text is never
/// split. A line that already fits is handed back as `Cow::Borrowed` without
/// allocating. A longer line becomes an owned string made of the kept prefix
/// followed by `... (truncated N bytes)`, where `N` is the byte length of the
/// dropped tail (bytes rather than chars, so the tail is never scanned).
pub(crate) fn truncate_line(line: &str, max_chars: usize) -> Cow<'_, str> {
    // Byte offset of the first char that does not fit, if there is one.
    let cut = line
        .char_indices()
        .map(|(offset, _)| offset)
        .nth(max_chars);

    if let Some(at) = cut {
        let (kept, dropped) = line.split_at(at);
        let skipped_bytes = dropped.len();
        return Cow::Owned(format!("{kept}... (truncated {skipped_bytes} bytes)"));
    }

    Cow::Borrowed(line)
}
225
/// Unit tests for `truncate_line` and `log_stream`.
///
/// The `log_stream` tests feed an in-memory byte slice as the reader and
/// observe what reaches a broadcast-backed `OutputSink`.
#[cfg(test)]
mod tests {
    use super::*;

    use solti_model::OutputEvent;
    use solti_runner::OutputSink;
    use tokio::sync::broadcast;

    /// Drain every event currently queued on `rx` and return the text of the
    /// `Chunk` events, in arrival order.
    fn queued_chunk_lines(rx: &mut broadcast::Receiver<OutputEvent>) -> Vec<String> {
        let mut lines = Vec::new();
        while let Ok(ev) = rx.try_recv() {
            if let OutputEvent::Chunk(c) = ev {
                lines.push(std::str::from_utf8(&c.line).unwrap().to_string());
            }
        }
        lines
    }

    #[test]
    fn truncate_line_short_line_borrowed() {
        let result = truncate_line("hello", 10);
        assert!(matches!(result, Cow::Borrowed(_)));
        assert_eq!(&*result, "hello");
    }

    #[test]
    fn truncate_line_exact_length_borrowed() {
        let result = truncate_line("hello", 5);
        assert!(matches!(result, Cow::Borrowed(_)));
        assert_eq!(&*result, "hello");
    }

    #[test]
    fn truncate_line_truncates_long_line() {
        let result = truncate_line("hello world", 5);
        assert!(matches!(result, Cow::Owned(_)));
        assert_eq!(&*result, "hello... (truncated 6 bytes)");
    }

    #[test]
    fn truncate_line_empty_string_borrowed() {
        let result = truncate_line("", 10);
        assert!(matches!(result, Cow::Borrowed(_)));
        assert_eq!(&*result, "");
    }

    #[test]
    fn truncate_line_unicode_cyrillic() {
        let result = truncate_line("привет", 2);
        assert_eq!(&*result, "пр... (truncated 8 bytes)");
    }

    #[test]
    fn truncate_line_unicode_hebrew() {
        let result = truncate_line("שלום", 2);
        assert_eq!(&*result, "של... (truncated 4 bytes)");
    }

    #[test]
    fn truncate_line_single_char_limit() {
        let result = truncate_line("abc", 1);
        assert_eq!(&*result, "a... (truncated 2 bytes)");
    }

    // Edge case: a zero-char budget keeps nothing but still reports the tail.
    #[test]
    fn truncate_line_zero_limit() {
        let result = truncate_line("abc", 0);
        assert!(matches!(result, Cow::Owned(_)));
        assert_eq!(&*result, "... (truncated 3 bytes)");

        let empty = truncate_line("", 0);
        assert!(matches!(empty, Cow::Borrowed(_)));
        assert_eq!(&*empty, "");
    }

    #[tokio::test]
    async fn log_stream_pushes_each_stdout_line_to_sink() {
        let (tx, mut rx) = broadcast::channel::<OutputEvent>(16);
        let sink = OutputSink::new(tx, 1);

        let reader = "alpha\nbeta\ngamma\n".as_bytes();
        log_stream(
            reader,
            "task-1",
            StreamKind::Stdout,
            &LogConfig::default(),
            Some(&sink),
        )
        .await;

        let mut lines = Vec::new();
        while let Ok(ev) = rx.try_recv() {
            if let OutputEvent::Chunk(c) = ev {
                assert_eq!(c.stream, solti_model::StreamKind::Stdout);
                lines.push(std::str::from_utf8(&c.line).unwrap().to_string());
            }
        }
        assert_eq!(lines, vec!["alpha", "beta", "gamma"]);
    }

    #[tokio::test]
    async fn log_stream_pushes_stderr_line_with_stderr_kind() {
        let (tx, mut rx) = broadcast::channel::<OutputEvent>(16);
        let sink = OutputSink::new(tx, 1);

        log_stream(
            "boom\n".as_bytes(),
            "task-2",
            StreamKind::Stderr,
            &LogConfig::default(),
            Some(&sink),
        )
        .await;

        match rx.recv().await.unwrap() {
            OutputEvent::Chunk(c) => {
                assert_eq!(c.stream, solti_model::StreamKind::Stderr);
                assert_eq!(&c.line[..], b"boom");
            }
            other => panic!("expected Chunk, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn log_stream_pushes_truncated_line_not_raw() {
        let cfg = LogConfig {
            max_line_length: 5,
            ..LogConfig::default()
        };
        let (tx, mut rx) = broadcast::channel::<OutputEvent>(16);
        let sink = OutputSink::new(tx, 1);

        log_stream(
            "hello world\n".as_bytes(),
            "task-3",
            StreamKind::Stdout,
            &cfg,
            Some(&sink),
        )
        .await;

        match rx.recv().await.unwrap() {
            OutputEvent::Chunk(c) => {
                let line_text = std::str::from_utf8(&c.line).expect("line must be UTF-8");
                assert!(
                    line_text.starts_with("hello"),
                    "expected truncated, got {line_text:?}"
                );
                assert!(line_text.contains("truncated"));
            }
            other => panic!("expected Chunk, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn log_stream_with_none_sink_is_a_noop_for_subscribers() {
        log_stream(
            "noisy\n".as_bytes(),
            "task-4",
            StreamKind::Stdout,
            &LogConfig::default(),
            None,
        )
        .await;
    }

    // Edge case: Windows line endings are stripped, and a final line without
    // a trailing newline is still delivered.
    #[tokio::test]
    async fn log_stream_strips_crlf_and_keeps_unterminated_last_line() {
        let (tx, mut rx) = broadcast::channel::<OutputEvent>(16);
        let sink = OutputSink::new(tx, 1);

        log_stream(
            "alpha\r\nbeta".as_bytes(),
            "task-5",
            StreamKind::Stdout,
            &LogConfig::default(),
            Some(&sink),
        )
        .await;

        assert_eq!(queued_chunk_lines(&mut rx), vec!["alpha", "beta"]);
    }

    // Edge case: an empty stream produces no output lines at all.
    #[tokio::test]
    async fn log_stream_empty_input_pushes_no_lines() {
        let (tx, mut rx) = broadcast::channel::<OutputEvent>(16);
        let sink = OutputSink::new(tx, 1);

        log_stream(
            "".as_bytes(),
            "task-6",
            StreamKind::Stderr,
            &LogConfig::default(),
            Some(&sink),
        )
        .await;

        assert!(queued_chunk_lines(&mut rx).is_empty());
    }
}