Skip to main content

agent_procs/daemon/
log_writer.rs

1use crate::protocol::Stream as ProtoStream;
2use std::path::Path;
3use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
4use tokio::sync::broadcast;
5
6/// A line of output from a child process.
7#[derive(Debug, Clone)]
8pub struct OutputLine {
9    pub process: String,
10    pub stream: ProtoStream,
11    pub line: String,
12}
13
14/// Reads lines from a child's stdout or stderr, writes each line to a log file,
15/// and broadcasts it to subscribers (wait engine, --follow clients).
16pub async fn capture_output<R: tokio::io::AsyncRead + Unpin>(
17    reader: R,
18    log_path: &Path,
19    process_name: &str,
20    stream: ProtoStream,
21    tx: broadcast::Sender<OutputLine>,
22    max_bytes: u64,
23) {
24    let mut lines = BufReader::new(reader).lines();
25    let mut file = match tokio::fs::File::create(log_path).await {
26        Ok(f) => f,
27        Err(e) => {
28            tracing::warn!(path = %log_path.display(), process = %process_name, error = %e, "cannot create log file");
29            return;
30        }
31    };
32    let mut bytes_written: u64 = 0;
33
34    while let Ok(Some(line)) = lines.next_line().await {
35        // Write to log file (with rotation check)
36        let line_bytes = line.len() as u64 + 1; // +1 for newline
37        if max_bytes > 0 && bytes_written + line_bytes > max_bytes {
38            // Rotate: close current, rename to .1, start fresh
39            drop(file);
40            let rotated = log_path.with_extension(format!(
41                "{}.1",
42                log_path.extension().unwrap_or_default().to_string_lossy()
43            ));
44            let _ = tokio::fs::rename(log_path, &rotated).await;
45            file = match tokio::fs::File::create(log_path).await {
46                Ok(f) => f,
47                Err(e) => {
48                    tracing::warn!(path = %log_path.display(), process = %process_name, error = %e, "cannot recreate log file after rotation");
49                    return;
50                }
51            };
52            bytes_written = 0;
53        }
54
55        let _ = file.write_all(line.as_bytes()).await;
56        let _ = file.write_all(b"\n").await;
57        let _ = file.flush().await;
58        bytes_written += line_bytes;
59
60        // Broadcast to wait engine / followers
61        let _ = tx.send(OutputLine {
62            process: process_name.to_string(),
63            stream,
64            line,
65        });
66    }
67}