agent_procs/daemon/
log_writer.rs1use crate::protocol::Stream as ProtoStream;
2use std::path::Path;
3use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
4use tokio::sync::broadcast;
5
/// Default cap on the number of rotated log files kept per log.
///
/// NOTE(review): presumably passed as `max_rotated_files` by callers that do not
/// configure their own limit — confirm at the call sites.
pub const DEFAULT_MAX_ROTATED_FILES: u32 = 5;
8
9#[derive(Debug, Clone)]
11pub struct OutputLine {
12 pub process: String,
13 pub stream: ProtoStream,
14 pub line: String,
15}
16
17pub async fn capture_output<R: tokio::io::AsyncRead + Unpin>(
20 reader: R,
21 log_path: &Path,
22 process_name: &str,
23 stream: ProtoStream,
24 tx: broadcast::Sender<OutputLine>,
25 max_bytes: u64,
26 max_rotated_files: u32,
27) {
28 let mut lines = BufReader::new(reader).lines();
29 let mut file = match tokio::fs::File::create(log_path).await {
30 Ok(f) => f,
31 Err(e) => {
32 tracing::warn!(path = %log_path.display(), process = %process_name, error = %e, "cannot create log file");
33 return;
34 }
35 };
36 let mut bytes_written: u64 = 0;
37
38 while let Ok(Some(line)) = lines.next_line().await {
39 let line_bytes = line.len() as u64 + 1; if max_bytes > 0 && bytes_written + line_bytes > max_bytes {
42 drop(file);
44 rotate_log_files(log_path, max_rotated_files).await;
45 file = match tokio::fs::File::create(log_path).await {
46 Ok(f) => f,
47 Err(e) => {
48 tracing::warn!(path = %log_path.display(), process = %process_name, error = %e, "cannot recreate log file after rotation");
49 return;
50 }
51 };
52 bytes_written = 0;
53 }
54
55 let _ = file.write_all(line.as_bytes()).await;
56 let _ = file.write_all(b"\n").await;
57 let _ = file.flush().await;
58 bytes_written += line_bytes;
59
60 let _ = tx.send(OutputLine {
62 process: process_name.to_string(),
63 stream,
64 line,
65 });
66 }
67}
68
69async fn rotate_log_files(log_path: &Path, max_rotated_files: u32) {
72 let ext = log_path
73 .extension()
74 .unwrap_or_default()
75 .to_string_lossy()
76 .to_string();
77
78 for i in (1..max_rotated_files).rev() {
80 let from = log_path.with_extension(format!("{}.{}", ext, i));
81 let to = log_path.with_extension(format!("{}.{}", ext, i + 1));
82 let _ = tokio::fs::rename(&from, &to).await;
83 }
84
85 let rotated_1 = log_path.with_extension(format!("{}.1", ext));
87 let _ = tokio::fs::rename(log_path, &rotated_1).await;
88
89 let excess = log_path.with_extension(format!("{}.{}", ext, max_rotated_files + 1));
91 let _ = tokio::fs::remove_file(&excess).await;
92}