Skip to main content

agent_procs/daemon/
log_writer.rs

1use crate::protocol::Stream as ProtoStream;
2use std::path::Path;
3use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
4use tokio::sync::broadcast;
5
6/// Default number of rotated log files to keep.
7pub const DEFAULT_MAX_ROTATED_FILES: u32 = 5;
8
9/// A line of output from a child process.
10#[derive(Debug, Clone)]
11pub struct OutputLine {
12    pub process: String,
13    pub stream: ProtoStream,
14    pub line: String,
15}
16
17/// Reads lines from a child's stdout or stderr, writes each line to a log file,
18/// and broadcasts it to subscribers (wait engine, --follow clients).
19pub async fn capture_output<R: tokio::io::AsyncRead + Unpin>(
20    reader: R,
21    log_path: &Path,
22    process_name: &str,
23    stream: ProtoStream,
24    tx: broadcast::Sender<OutputLine>,
25    max_bytes: u64,
26    max_rotated_files: u32,
27) {
28    let mut lines = BufReader::new(reader).lines();
29    let mut file = match tokio::fs::File::create(log_path).await {
30        Ok(f) => f,
31        Err(e) => {
32            tracing::warn!(path = %log_path.display(), process = %process_name, error = %e, "cannot create log file");
33            return;
34        }
35    };
36    let mut bytes_written: u64 = 0;
37
38    while let Ok(Some(line)) = lines.next_line().await {
39        // Write to log file (with rotation check)
40        let line_bytes = line.len() as u64 + 1; // +1 for newline
41        if max_bytes > 0 && bytes_written + line_bytes > max_bytes {
42            // Rotate: cascade .N-1 → .N, then current → .1, delete excess
43            drop(file);
44            rotate_log_files(log_path, max_rotated_files).await;
45            file = match tokio::fs::File::create(log_path).await {
46                Ok(f) => f,
47                Err(e) => {
48                    tracing::warn!(path = %log_path.display(), process = %process_name, error = %e, "cannot recreate log file after rotation");
49                    return;
50                }
51            };
52            bytes_written = 0;
53        }
54
55        let _ = file.write_all(line.as_bytes()).await;
56        let _ = file.write_all(b"\n").await;
57        let _ = file.flush().await;
58        bytes_written += line_bytes;
59
60        // Broadcast to wait engine / followers
61        let _ = tx.send(OutputLine {
62            process: process_name.to_string(),
63            stream,
64            line,
65        });
66    }
67}
68
69/// Cascade-rotate log files: shift .N-1 → .N down to .1 → .2,
70/// then rename current → .1, and delete files beyond `max_rotated_files`.
71async fn rotate_log_files(log_path: &Path, max_rotated_files: u32) {
72    let ext = log_path
73        .extension()
74        .unwrap_or_default()
75        .to_string_lossy()
76        .to_string();
77
78    // Shift existing rotated files: .N-1 → .N (starting from highest to avoid overwriting)
79    for i in (1..max_rotated_files).rev() {
80        let from = log_path.with_extension(format!("{}.{}", ext, i));
81        let to = log_path.with_extension(format!("{}.{}", ext, i + 1));
82        let _ = tokio::fs::rename(&from, &to).await;
83    }
84
85    // Rename current → .1
86    let rotated_1 = log_path.with_extension(format!("{}.1", ext));
87    let _ = tokio::fs::rename(log_path, &rotated_1).await;
88
89    // Delete excess files beyond max_rotated_files
90    let excess = log_path.with_extension(format!("{}.{}", ext, max_rotated_files + 1));
91    let _ = tokio::fs::remove_file(&excess).await;
92}