Skip to main content

agent_procs/daemon/
log_writer.rs

1use crate::protocol::Stream as ProtoStream;
2use std::path::Path;
3use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
4use tokio::sync::broadcast;
5
6/// A line of output from a child process.
7#[derive(Debug, Clone)]
8pub struct OutputLine {
9    pub process: String,
10    pub stream: ProtoStream,
11    pub line: String,
12}
13
14/// Reads lines from a child's stdout or stderr, writes each line to a log file,
15/// and broadcasts it to subscribers (wait engine, --follow clients).
16pub async fn capture_output<R: tokio::io::AsyncRead + Unpin>(
17    reader: R,
18    log_path: &Path,
19    process_name: &str,
20    stream: ProtoStream,
21    tx: broadcast::Sender<OutputLine>,
22    max_bytes: u64,
23) {
24    let mut lines = BufReader::new(reader).lines();
25    let mut file = match tokio::fs::File::create(log_path).await {
26        Ok(f) => f,
27        Err(e) => {
28            eprintln!(
29                "warning: cannot create log file {:?} for {}: {}",
30                log_path, process_name, e
31            );
32            return;
33        }
34    };
35    let mut bytes_written: u64 = 0;
36
37    while let Ok(Some(line)) = lines.next_line().await {
38        // Write to log file (with rotation check)
39        let line_bytes = line.len() as u64 + 1; // +1 for newline
40        if max_bytes > 0 && bytes_written + line_bytes > max_bytes {
41            // Rotate: close current, rename to .1, start fresh
42            drop(file);
43            let rotated = log_path.with_extension(format!(
44                "{}.1",
45                log_path.extension().unwrap_or_default().to_string_lossy()
46            ));
47            let _ = tokio::fs::rename(log_path, &rotated).await;
48            file = match tokio::fs::File::create(log_path).await {
49                Ok(f) => f,
50                Err(e) => {
51                    eprintln!(
52                        "warning: cannot recreate log file {:?} for {} after rotation: {}",
53                        log_path, process_name, e
54                    );
55                    return;
56                }
57            };
58            bytes_written = 0;
59        }
60
61        let _ = file.write_all(line.as_bytes()).await;
62        let _ = file.write_all(b"\n").await;
63        let _ = file.flush().await;
64        bytes_written += line_bytes;
65
66        // Broadcast to wait engine / followers
67        let _ = tx.send(OutputLine {
68            process: process_name.to_string(),
69            stream,
70            line,
71        });
72    }
73}