// solti_exec/subprocess/logger.rs
use std::borrow::Cow;
32
33use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
34use tracing::{debug, info, warn};
35
/// Settings that control how the output of a subprocess is written to the log.
#[derive(Clone, Copy, Debug)]
pub struct LogConfig {
    /// Maximum number of characters kept from one line when it is logged.
    ///
    /// `log_stream` passes this value to `truncate_line`, which cuts longer
    /// lines and appends a note with the number of bytes that were dropped.
    pub max_line_length: usize,
    /// Maximum number of bytes `log_stream` reads for a single input line.
    ///
    /// A line that reaches this cap without a newline is logged with a
    /// "line exceeded" marker.
    pub max_line_bytes: usize,
    /// When `true`, stdout lines are logged at `info` level; otherwise at `debug`.
    pub stdout_info: bool,
    /// When `true`, stderr lines are logged at `warn` level; otherwise at `debug`.
    pub stderr_warn: bool,
}
53
54impl Default for LogConfig {
55 fn default() -> Self {
56 Self {
57 max_line_bytes: 64 * 1024,
58 max_line_length: 4096,
59 stdout_info: true,
60 stderr_warn: true,
61 }
62 }
63}
64
/// Identifies which output stream of the subprocess a line was read from.
#[derive(Clone, Copy, Debug)]
pub(crate) enum StreamKind {
    /// Standard output of the subprocess.
    Stdout,
    /// Standard error of the subprocess.
    Stderr,
}
71
72impl StreamKind {
73 pub(crate) fn as_str(self) -> &'static str {
74 match self {
75 Self::Stdout => "stdout",
76 Self::Stderr => "stderr",
77 }
78 }
79
80 fn use_elevated_level(self, config: &LogConfig) -> bool {
81 match self {
82 Self::Stdout => config.stdout_info,
83 Self::Stderr => config.stderr_warn,
84 }
85 }
86}
87
88pub(crate) async fn log_stream<R>(reader: R, run_id: &str, stream: StreamKind, config: &LogConfig)
93where
94 R: tokio::io::AsyncRead + Unpin,
95{
96 let mut reader = BufReader::new(reader);
97 let stream_name = stream.as_str();
98 let mut line_count = 0u64;
99 let mut buf: Vec<u8> = Vec::with_capacity(256);
100
101 loop {
102 buf.clear();
103 let read_result = (&mut reader)
104 .take(config.max_line_bytes as u64)
105 .read_until(b'\n', &mut buf)
106 .await;
107
108 let bytes_read = match read_result {
109 Ok(0) => break,
110 Ok(n) => n,
111 Err(e) => {
112 warn!(
113 task = %run_id,
114 stream = %stream_name,
115 error = %e,
116 line_num = line_count,
117 "error while reading subprocess stream"
118 );
119 break;
120 }
121 };
122
123 let hit_cap = bytes_read == config.max_line_bytes && !buf.ends_with(b"\n");
124 if buf.ends_with(b"\n") {
125 buf.pop();
126 if buf.ends_with(b"\r") {
127 buf.pop();
128 }
129 }
130 let raw_line = String::from_utf8_lossy(&buf).into_owned();
131 let raw_line = if hit_cap {
132 format!(
133 "{raw_line} ...[line exceeded {} bytes, truncated]",
134 config.max_line_bytes
135 )
136 } else {
137 raw_line
138 };
139
140 if hit_cap {
141 let mut scratch = [0u8; 8 * 1024];
142 loop {
143 let drained = match reader.read(&mut scratch).await {
144 Ok(0) => break,
145 Ok(n) => n,
146 Err(_) => break,
147 };
148 if let Some(nl) = scratch[..drained].iter().position(|&b| b == b'\n') {
149 let _ = nl;
150 break;
151 }
152 }
153 }
154
155 let line = truncate_line(&raw_line, config.max_line_length);
156 line_count += 1;
157
158 if stream.use_elevated_level(config) {
159 match stream {
160 StreamKind::Stdout => info!(
161 task = %run_id,
162 stream = %stream_name,
163 line_num = line_count,
164 "{}",
165 line
166 ),
167 StreamKind::Stderr => warn!(
168 task = %run_id,
169 stream = %stream_name,
170 line_num = line_count,
171 "{}",
172 line
173 ),
174 }
175 } else {
176 debug!(
177 task = %run_id,
178 stream = %stream_name,
179 line_num = line_count,
180 "{}",
181 line
182 );
183 }
184 }
185
186 debug!(
187 task = %run_id,
188 stream = %stream_name,
189 total_lines = line_count,
190 "stream closed"
191 );
192}
193
/// Shortens `line` to at most `max_chars` characters.
///
/// A line that already fits is returned borrowed, without allocating.
/// Otherwise the first `max_chars` characters are kept (the cut always falls
/// on a character boundary) and a suffix stating how many bytes were dropped
/// is appended.
pub(crate) fn truncate_line(line: &str, max_chars: usize) -> Cow<'_, str> {
    // Byte offset of the first character that no longer fits, if there is one.
    let cut = line
        .char_indices()
        .nth(max_chars)
        .map(|(offset, _)| offset);

    if let Some(cut) = cut {
        let (kept, dropped) = line.split_at(cut);
        let skipped_bytes = dropped.len();
        return Cow::Owned(format!("{kept}... (truncated {skipped_bytes} bytes)"));
    }

    Cow::Borrowed(line)
}
210
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns `true` when `value` is the borrowed variant, i.e. no new string
    /// was allocated.
    fn is_borrowed(value: &Cow<'_, str>) -> bool {
        matches!(value, Cow::Borrowed(_))
    }

    // A line shorter than the limit comes back unchanged and borrowed.
    #[test]
    fn truncate_line_short_line_borrowed() {
        let out = truncate_line("hello", 10);
        assert!(is_borrowed(&out));
        assert_eq!(out, "hello");
    }

    // A line of exactly the limit is not truncated.
    #[test]
    fn truncate_line_exact_length_borrowed() {
        let out = truncate_line("hello", 5);
        assert!(is_borrowed(&out));
        assert_eq!(out, "hello");
    }

    // A longer line is cut and annotated with the dropped byte count.
    #[test]
    fn truncate_line_truncates_long_line() {
        let out = truncate_line("hello world", 5);
        assert!(!is_borrowed(&out));
        assert_eq!(out, "hello... (truncated 6 bytes)");
    }

    // The empty string never needs truncation.
    #[test]
    fn truncate_line_empty_string_borrowed() {
        let out = truncate_line("", 10);
        assert!(is_borrowed(&out));
        assert_eq!(out, "");
    }

    // The limit counts characters, while the note counts bytes (2 per letter here).
    #[test]
    fn truncate_line_unicode_cyrillic() {
        let out = truncate_line("привет", 2);
        assert_eq!(out, "пр... (truncated 8 bytes)");
    }

    #[test]
    fn truncate_line_unicode_hebrew() {
        let out = truncate_line("שלום", 2);
        assert_eq!(out, "של... (truncated 4 bytes)");
    }

    // A limit of one keeps exactly the first character.
    #[test]
    fn truncate_line_single_char_limit() {
        let out = truncate_line("abc", 1);
        assert_eq!(out, "a... (truncated 2 bytes)");
    }
}