1use std::borrow::Cow;
32
33use bytes::Bytes;
34use solti_runner::OutputSink;
35use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
36use tracing::{debug, info, warn};
37
/// Settings that control how subprocess output is read and logged by `log_stream`.
///
/// Note that the two size limits use different units: `max_line_length` counts
/// characters, `max_line_bytes` counts raw bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LogConfig {
    /// Maximum number of characters (`char`s) kept per emitted line; longer lines
    /// are cut by `truncate_line` and get a "truncated" suffix.
    pub max_line_length: usize,
    /// Maximum number of raw bytes read for a single line; the rest of an
    /// over-long line is discarded and the line is tagged as truncated.
    pub max_line_bytes: usize,
    /// When `true`, stdout lines are logged at `info`; otherwise at `debug`.
    pub stdout_info: bool,
    /// When `true`, stderr lines are logged at `warn`; otherwise at `debug`.
    pub stderr_warn: bool,
}

impl Default for LogConfig {
    /// Returns the default limits: 4096 characters per emitted line, 64 KiB read
    /// per line, stdout logged at `info` and stderr logged at `warn`.
    fn default() -> Self {
        // Fields are listed in declaration order so the two stay easy to compare.
        Self {
            max_line_length: 4096,
            max_line_bytes: 64 * 1024,
            stdout_info: true,
            stderr_warn: true,
        }
    }
}
66
67#[derive(Debug, Clone, Copy)]
69pub(crate) enum StreamKind {
70 Stdout,
71 Stderr,
72}
73
74impl StreamKind {
75 pub(crate) fn as_str(self) -> &'static str {
76 match self {
77 Self::Stdout => "stdout",
78 Self::Stderr => "stderr",
79 }
80 }
81
82 fn use_elevated_level(self, config: &LogConfig) -> bool {
83 match self {
84 Self::Stdout => config.stdout_info,
85 Self::Stderr => config.stderr_warn,
86 }
87 }
88}
89
90pub(crate) async fn log_stream<R>(
92 reader: R,
93 run_id: &str,
94 stream: StreamKind,
95 config: &LogConfig,
96 output_sink: Option<&OutputSink>,
97) where
98 R: tokio::io::AsyncRead + Unpin,
99{
100 let mut reader = BufReader::new(reader);
101 let stream_name = stream.as_str();
102 let mut line_count = 0u64;
103 let mut buf: Vec<u8> = Vec::with_capacity(256);
104
105 loop {
106 buf.clear();
107 let read_result = (&mut reader)
108 .take(config.max_line_bytes as u64)
109 .read_until(b'\n', &mut buf)
110 .await;
111
112 let bytes_read = match read_result {
113 Ok(0) => break,
114 Ok(n) => n,
115 Err(e) => {
116 warn!(
117 task = %run_id,
118 stream = %stream_name,
119 error = %e,
120 line_num = line_count,
121 "error while reading subprocess stream"
122 );
123 break;
124 }
125 };
126
127 let hit_cap = bytes_read == config.max_line_bytes && !buf.ends_with(b"\n");
128 if buf.ends_with(b"\n") {
129 buf.pop();
130 if buf.ends_with(b"\r") {
131 buf.pop();
132 }
133 }
134 let raw_line = String::from_utf8_lossy(&buf).into_owned();
135 let raw_line = if hit_cap {
136 format!(
137 "{raw_line} ...[line exceeded {} bytes, truncated]",
138 config.max_line_bytes
139 )
140 } else {
141 raw_line
142 };
143
144 if hit_cap {
145 let mut scratch = [0u8; 8 * 1024];
146 loop {
147 let drained = match reader.read(&mut scratch).await {
148 Ok(0) => break,
149 Ok(n) => n,
150 Err(_) => break,
151 };
152 if let Some(nl) = scratch[..drained].iter().position(|&b| b == b'\n') {
153 let _ = nl;
154 break;
155 }
156 }
157 }
158
159 let line = truncate_line(&raw_line, config.max_line_length);
160 line_count += 1;
161
162 if stream.use_elevated_level(config) {
163 match stream {
164 StreamKind::Stdout => info!(
165 task = %run_id,
166 stream = %stream_name,
167 line_num = line_count,
168 "{}",
169 line
170 ),
171 StreamKind::Stderr => warn!(
172 task = %run_id,
173 stream = %stream_name,
174 line_num = line_count,
175 "{}",
176 line
177 ),
178 }
179 } else {
180 debug!(
181 task = %run_id,
182 stream = %stream_name,
183 line_num = line_count,
184 "{}",
185 line
186 );
187 }
188
189 if let Some(sink) = output_sink {
190 let bytes_line: Bytes = match line {
191 Cow::Borrowed(s) => Bytes::copy_from_slice(s.as_bytes()),
192 Cow::Owned(s) => Bytes::from(s),
193 };
194 match stream {
195 StreamKind::Stdout => sink.stdout_line(bytes_line),
196 StreamKind::Stderr => sink.stderr_line(bytes_line),
197 }
198 }
199 }
200
201 debug!(
202 task = %run_id,
203 stream = %stream_name,
204 total_lines = line_count,
205 "stream closed"
206 );
207}
208
/// Limits `line` to at most `max_chars` characters.
///
/// Returns the input borrowed and untouched when it has `max_chars` characters or
/// fewer. Otherwise returns an owned string made of the first `max_chars`
/// characters followed by `... (truncated N bytes)`, where `N` is the number of
/// bytes (not characters) that were dropped. The cut always falls on a character
/// boundary.
pub(crate) fn truncate_line(line: &str, max_chars: usize) -> Cow<'_, str> {
    // Byte offset of the first character that does not fit, if there is one.
    let cut = match line.char_indices().nth(max_chars) {
        Some((offset, _)) => offset,
        None => return Cow::Borrowed(line),
    };

    let (kept, dropped) = line.split_at(cut);
    let dropped_len = dropped.len();
    Cow::Owned(format!("{kept}... (truncated {dropped_len} bytes)"))
}
225
#[cfg(test)]
mod tests {
    use super::*;

    use solti_model::OutputEvent;
    use solti_runner::OutputSink;
    use tokio::sync::broadcast;

    /// Builds an `OutputSink` over a fresh broadcast channel and returns it
    /// together with that channel's receiver.
    fn sink_with_receiver() -> (OutputSink, broadcast::Receiver<OutputEvent>) {
        let (tx, rx) = broadcast::channel::<OutputEvent>(16);
        (OutputSink::new(tx, 1), rx)
    }

    // ---- truncate_line -------------------------------------------------------

    #[test]
    fn truncate_line_short_line_borrowed() {
        let out = truncate_line("hello", 10);
        assert!(matches!(out, Cow::Borrowed("hello")));
    }

    #[test]
    fn truncate_line_exact_length_borrowed() {
        let out = truncate_line("hello", 5);
        assert!(matches!(out, Cow::Borrowed("hello")));
    }

    #[test]
    fn truncate_line_truncates_long_line() {
        let out = truncate_line("hello world", 5);
        assert!(matches!(out, Cow::Owned(_)));
        assert_eq!(&*out, "hello... (truncated 6 bytes)");
    }

    #[test]
    fn truncate_line_empty_string_borrowed() {
        let out = truncate_line("", 10);
        assert!(matches!(out, Cow::Borrowed("")));
    }

    #[test]
    fn truncate_line_unicode_cyrillic() {
        // Two-byte characters: the suffix reports dropped bytes, not characters.
        let out = truncate_line("привет", 2);
        assert_eq!(&*out, "пр... (truncated 8 bytes)");
    }

    #[test]
    fn truncate_line_unicode_hebrew() {
        let out = truncate_line("שלום", 2);
        assert_eq!(&*out, "של... (truncated 4 bytes)");
    }

    #[test]
    fn truncate_line_single_char_limit() {
        let out = truncate_line("abc", 1);
        assert_eq!(&*out, "a... (truncated 2 bytes)");
    }

    // ---- log_stream ----------------------------------------------------------

    #[tokio::test]
    async fn log_stream_pushes_each_stdout_line_to_sink() {
        let (sink, mut rx) = sink_with_receiver();
        let config = LogConfig::default();

        log_stream(
            "alpha\nbeta\ngamma\n".as_bytes(),
            "task-1",
            StreamKind::Stdout,
            &config,
            Some(&sink),
        )
        .await;

        // Collect every chunk that was published, in order.
        let mut lines = Vec::new();
        while let Ok(ev) = rx.try_recv() {
            if let OutputEvent::Chunk(c) = ev {
                assert_eq!(c.stream, solti_model::StreamKind::Stdout);
                lines.push(std::str::from_utf8(&c.line).unwrap().to_string());
            }
        }
        assert_eq!(lines, vec!["alpha", "beta", "gamma"]);
    }

    #[tokio::test]
    async fn log_stream_pushes_stderr_line_with_stderr_kind() {
        let (sink, mut rx) = sink_with_receiver();
        let config = LogConfig::default();

        log_stream(
            "boom\n".as_bytes(),
            "task-2",
            StreamKind::Stderr,
            &config,
            Some(&sink),
        )
        .await;

        match rx.recv().await.unwrap() {
            OutputEvent::Chunk(c) => {
                assert_eq!(c.stream, solti_model::StreamKind::Stderr);
                assert_eq!(&c.line[..], b"boom");
            }
            other => panic!("expected Chunk, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn log_stream_pushes_truncated_line_not_raw() {
        let (sink, mut rx) = sink_with_receiver();
        let cfg = LogConfig {
            max_line_length: 5,
            ..LogConfig::default()
        };

        log_stream(
            "hello world\n".as_bytes(),
            "task-3",
            StreamKind::Stdout,
            &cfg,
            Some(&sink),
        )
        .await;

        match rx.recv().await.unwrap() {
            OutputEvent::Chunk(c) => {
                let line_text = std::str::from_utf8(&c.line).expect("line must be UTF-8");
                assert!(
                    line_text.starts_with("hello"),
                    "expected truncated, got {line_text:?}"
                );
                assert!(line_text.contains("truncated"));
            }
            other => panic!("expected Chunk, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn log_stream_with_none_sink_is_a_noop_for_subscribers() {
        // Only checks that running without a sink completes without panicking.
        let config = LogConfig::default();
        log_stream(
            "noisy\n".as_bytes(),
            "task-4",
            StreamKind::Stdout,
            &config,
            None,
        )
        .await;
    }
}