use std::borrow::Cow;
use bytes::Bytes;
use solti_runner::OutputSink;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use tracing::{debug, info, warn};
#[derive(Debug, Clone, Copy)]
pub struct LogConfig {
pub max_line_length: usize,
pub max_line_bytes: usize,
pub stdout_info: bool,
pub stderr_warn: bool,
}
impl Default for LogConfig {
fn default() -> Self {
Self {
max_line_bytes: 64 * 1024,
max_line_length: 4096,
stdout_info: true,
stderr_warn: true,
}
}
}
#[derive(Debug, Clone, Copy)]
pub(crate) enum StreamKind {
Stdout,
Stderr,
}
impl StreamKind {
pub(crate) fn as_str(self) -> &'static str {
match self {
Self::Stdout => "stdout",
Self::Stderr => "stderr",
}
}
fn use_elevated_level(self, config: &LogConfig) -> bool {
match self {
Self::Stdout => config.stdout_info,
Self::Stderr => config.stderr_warn,
}
}
}
pub(crate) async fn log_stream<R>(
reader: R,
run_id: &str,
stream: StreamKind,
config: &LogConfig,
output_sink: Option<&OutputSink>,
) where
R: tokio::io::AsyncRead + Unpin,
{
let mut reader = BufReader::new(reader);
let stream_name = stream.as_str();
let mut line_count = 0u64;
let mut buf: Vec<u8> = Vec::with_capacity(256);
loop {
buf.clear();
let read_result = (&mut reader)
.take(config.max_line_bytes as u64)
.read_until(b'\n', &mut buf)
.await;
let bytes_read = match read_result {
Ok(0) => break,
Ok(n) => n,
Err(e) => {
warn!(
task = %run_id,
stream = %stream_name,
error = %e,
line_num = line_count,
"error while reading subprocess stream"
);
break;
}
};
let hit_cap = bytes_read == config.max_line_bytes && !buf.ends_with(b"\n");
if buf.ends_with(b"\n") {
buf.pop();
if buf.ends_with(b"\r") {
buf.pop();
}
}
let raw_line = String::from_utf8_lossy(&buf).into_owned();
let raw_line = if hit_cap {
format!(
"{raw_line} ...[line exceeded {} bytes, truncated]",
config.max_line_bytes
)
} else {
raw_line
};
if hit_cap {
let mut scratch = [0u8; 8 * 1024];
loop {
let drained = match reader.read(&mut scratch).await {
Ok(0) => break,
Ok(n) => n,
Err(_) => break,
};
if let Some(nl) = scratch[..drained].iter().position(|&b| b == b'\n') {
let _ = nl;
break;
}
}
}
let line = truncate_line(&raw_line, config.max_line_length);
line_count += 1;
if stream.use_elevated_level(config) {
match stream {
StreamKind::Stdout => info!(
task = %run_id,
stream = %stream_name,
line_num = line_count,
"{}",
line
),
StreamKind::Stderr => warn!(
task = %run_id,
stream = %stream_name,
line_num = line_count,
"{}",
line
),
}
} else {
debug!(
task = %run_id,
stream = %stream_name,
line_num = line_count,
"{}",
line
);
}
if let Some(sink) = output_sink {
let bytes_line: Bytes = match line {
Cow::Borrowed(s) => Bytes::copy_from_slice(s.as_bytes()),
Cow::Owned(s) => Bytes::from(s),
};
match stream {
StreamKind::Stdout => sink.stdout_line(bytes_line),
StreamKind::Stderr => sink.stderr_line(bytes_line),
}
}
}
debug!(
task = %run_id,
stream = %stream_name,
total_lines = line_count,
"stream closed"
);
}
pub(crate) fn truncate_line(line: &str, max_chars: usize) -> Cow<'_, str> {
match line.char_indices().nth(max_chars) {
None => Cow::Borrowed(line),
Some((i, _)) => {
let skipped_bytes = line.len() - i;
Cow::Owned(format!(
"{}... (truncated {skipped_bytes} bytes)",
&line[..i]
))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use solti_model::OutputEvent;
use solti_runner::OutputSink;
use tokio::sync::broadcast;
#[test]
fn truncate_line_short_line_borrowed() {
let result = truncate_line("hello", 10);
assert!(matches!(result, Cow::Borrowed(_)));
assert_eq!(&*result, "hello");
}
#[test]
fn truncate_line_exact_length_borrowed() {
let result = truncate_line("hello", 5);
assert!(matches!(result, Cow::Borrowed(_)));
assert_eq!(&*result, "hello");
}
#[test]
fn truncate_line_truncates_long_line() {
let result = truncate_line("hello world", 5);
assert!(matches!(result, Cow::Owned(_)));
assert_eq!(&*result, "hello... (truncated 6 bytes)");
}
#[test]
fn truncate_line_empty_string_borrowed() {
let result = truncate_line("", 10);
assert!(matches!(result, Cow::Borrowed(_)));
assert_eq!(&*result, "");
}
#[test]
fn truncate_line_unicode_cyrillic() {
let result = truncate_line("привет", 2);
assert_eq!(&*result, "пр... (truncated 8 bytes)");
}
#[test]
fn truncate_line_unicode_hebrew() {
let result = truncate_line("שלום", 2);
assert_eq!(&*result, "של... (truncated 4 bytes)");
}
#[test]
fn truncate_line_single_char_limit() {
let result = truncate_line("abc", 1);
assert_eq!(&*result, "a... (truncated 2 bytes)");
}
#[tokio::test]
async fn log_stream_pushes_each_stdout_line_to_sink() {
let (tx, mut rx) = broadcast::channel::<OutputEvent>(16);
let sink = OutputSink::new(tx, 1);
let reader = "alpha\nbeta\ngamma\n".as_bytes();
log_stream(
reader,
"task-1",
StreamKind::Stdout,
&LogConfig::default(),
Some(&sink),
)
.await;
let mut lines = Vec::new();
while let Ok(ev) = rx.try_recv() {
if let OutputEvent::Chunk(c) = ev {
assert_eq!(c.stream, solti_model::StreamKind::Stdout);
lines.push(std::str::from_utf8(&c.line).unwrap().to_string());
}
}
assert_eq!(lines, vec!["alpha", "beta", "gamma"]);
}
#[tokio::test]
async fn log_stream_pushes_stderr_line_with_stderr_kind() {
let (tx, mut rx) = broadcast::channel::<OutputEvent>(16);
let sink = OutputSink::new(tx, 1);
log_stream(
"boom\n".as_bytes(),
"task-2",
StreamKind::Stderr,
&LogConfig::default(),
Some(&sink),
)
.await;
match rx.recv().await.unwrap() {
OutputEvent::Chunk(c) => {
assert_eq!(c.stream, solti_model::StreamKind::Stderr);
assert_eq!(&c.line[..], b"boom");
}
other => panic!("expected Chunk, got {other:?}"),
}
}
#[tokio::test]
async fn log_stream_pushes_truncated_line_not_raw() {
let cfg = LogConfig {
max_line_length: 5,
..LogConfig::default()
};
let (tx, mut rx) = broadcast::channel::<OutputEvent>(16);
let sink = OutputSink::new(tx, 1);
log_stream(
"hello world\n".as_bytes(),
"task-3",
StreamKind::Stdout,
&cfg,
Some(&sink),
)
.await;
match rx.recv().await.unwrap() {
OutputEvent::Chunk(c) => {
let line_text = std::str::from_utf8(&c.line).expect("line must be UTF-8");
assert!(
line_text.starts_with("hello"),
"expected truncated, got {line_text:?}"
);
assert!(line_text.contains("truncated"));
}
other => panic!("expected Chunk, got {other:?}"),
}
}
#[tokio::test]
async fn log_stream_with_none_sink_is_a_noop_for_subscribers() {
log_stream(
"noisy\n".as_bytes(),
"task-4",
StreamKind::Stdout,
&LogConfig::default(),
None,
)
.await;
}
}