Skip to main content

solti_exec/subprocess/
logger.rs

1//! # Logger: subprocess output stream processing.
2//!
3//! Captures stdout/stderr from a spawned subprocess, truncates long lines, and emits them via `tracing` at configurable log levels.
4//!
5//! ## How it fits
6//! ```text
7//! run_subprocess()
8//!     ├──► child.stdout.take() ──► tokio::spawn(log_stream(Stdout))
9//!     └──► child.stderr.take() ──► tokio::spawn(log_stream(Stderr))
10//!
11//! log_stream(reader, run_id, stream_kind, config)
12//!     └──► for each line:
13//!           ├──► truncate_line(line, max_chars)
14//!           │     ├──► short? → Cow::Borrowed (zero alloc)
15//!           │     └──► long?  → Cow::Owned("prefix... (truncated N bytes)")
16//!           │
17//!           └──► emit via tracing:
18//!                 ├──► stdout + stdout_info  → info!
19//!                 ├──► stderr + stderr_warn  → warn!
20//!                 └──► otherwise             → debug!
21//! ```
22//!
23//! ## Configuration
24//!
25//! | Field             | Default | What it does                       |
26//! |-------------------|---------|------------------------------------|
27//! | `max_line_length` | 4096    | truncate lines beyond this (chars) |
28//! | `stdout_info`     | true    | log stdout at INFO (else DEBUG)    |
29//! | `stderr_warn`     | true    | log stderr at WARN (else DEBUG)    |
30
31use std::borrow::Cow;
32
33use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
34use tracing::{debug, info, warn};
35
36/// Configuration for subprocess output logging.
37///
38/// ## Also
39///
40/// - [`SubprocessBackendConfig`](super::SubprocessBackendConfig) carries `LogConfig` as a field.
41/// - `log_stream` async function that reads + truncates + emits lines.
42#[derive(Debug, Clone, Copy)]
43pub struct LogConfig {
44    /// Max line length (in Unicode chars) before truncation of the emitted line.
45    pub max_line_length: usize,
46    /// Hard byte cap per line; bytes past it are drained until next `\n`.
47    pub max_line_bytes: usize,
48    /// Log stdout at INFO level (`false` = DEBUG).
49    pub stdout_info: bool,
50    /// Log stderr at WARN level (`false` = DEBUG).
51    pub stderr_warn: bool,
52}
53
54impl Default for LogConfig {
55    fn default() -> Self {
56        Self {
57            max_line_bytes: 64 * 1024,
58            max_line_length: 4096,
59            stdout_info: true,
60            stderr_warn: true,
61        }
62    }
63}
64
65/// Subprocess output stream kind.
66#[derive(Debug, Clone, Copy)]
67pub(crate) enum StreamKind {
68    Stdout,
69    Stderr,
70}
71
72impl StreamKind {
73    pub(crate) fn as_str(self) -> &'static str {
74        match self {
75            Self::Stdout => "stdout",
76            Self::Stderr => "stderr",
77        }
78    }
79
80    fn use_elevated_level(self, config: &LogConfig) -> bool {
81        match self {
82            Self::Stdout => config.stdout_info,
83            Self::Stderr => config.stderr_warn,
84        }
85    }
86}
87
88/// Log subprocess output stream line-by-line with truncation.
89///
90/// Spawned as a tokio task per stream (stdout / stderr).
91/// Reads lines until EOF or error, emitting each via `tracing`.
92pub(crate) async fn log_stream<R>(reader: R, run_id: &str, stream: StreamKind, config: &LogConfig)
93where
94    R: tokio::io::AsyncRead + Unpin,
95{
96    let mut reader = BufReader::new(reader);
97    let stream_name = stream.as_str();
98    let mut line_count = 0u64;
99    let mut buf: Vec<u8> = Vec::with_capacity(256);
100
101    loop {
102        buf.clear();
103        let read_result = (&mut reader)
104            .take(config.max_line_bytes as u64)
105            .read_until(b'\n', &mut buf)
106            .await;
107
108        let bytes_read = match read_result {
109            Ok(0) => break,
110            Ok(n) => n,
111            Err(e) => {
112                warn!(
113                    task = %run_id,
114                    stream = %stream_name,
115                    error = %e,
116                    line_num = line_count,
117                    "error while reading subprocess stream"
118                );
119                break;
120            }
121        };
122
123        let hit_cap = bytes_read == config.max_line_bytes && !buf.ends_with(b"\n");
124        if buf.ends_with(b"\n") {
125            buf.pop();
126            if buf.ends_with(b"\r") {
127                buf.pop();
128            }
129        }
130        let raw_line = String::from_utf8_lossy(&buf).into_owned();
131        let raw_line = if hit_cap {
132            format!(
133                "{raw_line} ...[line exceeded {} bytes, truncated]",
134                config.max_line_bytes
135            )
136        } else {
137            raw_line
138        };
139
140        if hit_cap {
141            let mut scratch = [0u8; 8 * 1024];
142            loop {
143                let drained = match reader.read(&mut scratch).await {
144                    Ok(0) => break,
145                    Ok(n) => n,
146                    Err(_) => break,
147                };
148                if let Some(nl) = scratch[..drained].iter().position(|&b| b == b'\n') {
149                    let _ = nl;
150                    break;
151                }
152            }
153        }
154
155        let line = truncate_line(&raw_line, config.max_line_length);
156        line_count += 1;
157
158        if stream.use_elevated_level(config) {
159            match stream {
160                StreamKind::Stdout => info!(
161                    task = %run_id,
162                    stream = %stream_name,
163                    line_num = line_count,
164                    "{}",
165                    line
166                ),
167                StreamKind::Stderr => warn!(
168                    task = %run_id,
169                    stream = %stream_name,
170                    line_num = line_count,
171                    "{}",
172                    line
173                ),
174            }
175        } else {
176            debug!(
177                task = %run_id,
178                stream = %stream_name,
179                line_num = line_count,
180                "{}",
181                line
182            );
183        }
184    }
185
186    debug!(
187        task = %run_id,
188        stream = %stream_name,
189        total_lines = line_count,
190        "stream closed"
191    );
192}
193
194/// Truncate line by Unicode scalar count, safe for UTF-8.
195///
196/// Returns `Cow::Borrowed` when no truncation is needed (zero-alloc for the common case).
197/// Reports truncated bytes (O(1)) instead of chars to avoid scanning the entire tail.
198pub(crate) fn truncate_line(line: &str, max_chars: usize) -> Cow<'_, str> {
199    match line.char_indices().nth(max_chars) {
200        None => Cow::Borrowed(line),
201        Some((i, _)) => {
202            let skipped_bytes = line.len() - i;
203            Cow::Owned(format!(
204                "{}... (truncated {skipped_bytes} bytes)",
205                &line[..i]
206            ))
207        }
208    }
209}
210
211#[cfg(test)]
212mod tests {
213    use super::*;
214
215    #[test]
216    fn truncate_line_short_line_borrowed() {
217        let result = truncate_line("hello", 10);
218        assert!(matches!(result, Cow::Borrowed(_)));
219        assert_eq!(&*result, "hello");
220    }
221
222    #[test]
223    fn truncate_line_exact_length_borrowed() {
224        let result = truncate_line("hello", 5);
225        assert!(matches!(result, Cow::Borrowed(_)));
226        assert_eq!(&*result, "hello");
227    }
228
229    #[test]
230    fn truncate_line_truncates_long_line() {
231        let result = truncate_line("hello world", 5);
232        assert!(matches!(result, Cow::Owned(_)));
233        assert_eq!(&*result, "hello... (truncated 6 bytes)");
234    }
235
236    #[test]
237    fn truncate_line_empty_string_borrowed() {
238        let result = truncate_line("", 10);
239        assert!(matches!(result, Cow::Borrowed(_)));
240        assert_eq!(&*result, "");
241    }
242
243    #[test]
244    fn truncate_line_unicode_cyrillic() {
245        let result = truncate_line("привет", 2);
246        assert_eq!(&*result, "пр... (truncated 8 bytes)");
247    }
248
249    #[test]
250    fn truncate_line_unicode_hebrew() {
251        let result = truncate_line("שלום", 2);
252        assert_eq!(&*result, "של... (truncated 4 bytes)");
253    }
254
255    #[test]
256    fn truncate_line_single_char_limit() {
257        let result = truncate_line("abc", 1);
258        assert_eq!(&*result, "a... (truncated 2 bytes)");
259    }
260}