agent_procs/daemon/
log_writer.rs1use crate::protocol::Stream as ProtoStream;
2use std::path::Path;
3use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
4use tokio::sync::broadcast;
5
6#[derive(Debug, Clone)]
8pub struct OutputLine {
9 pub process: String,
10 pub stream: ProtoStream,
11 pub line: String,
12}
13
14pub async fn capture_output<R: tokio::io::AsyncRead + Unpin>(
17 reader: R,
18 log_path: &Path,
19 process_name: &str,
20 stream: ProtoStream,
21 tx: broadcast::Sender<OutputLine>,
22 max_bytes: u64,
23) {
24 let mut lines = BufReader::new(reader).lines();
25 let mut file = match tokio::fs::File::create(log_path).await {
26 Ok(f) => f,
27 Err(e) => {
28 tracing::warn!(path = %log_path.display(), process = %process_name, error = %e, "cannot create log file");
29 return;
30 }
31 };
32 let mut bytes_written: u64 = 0;
33
34 while let Ok(Some(line)) = lines.next_line().await {
35 let line_bytes = line.len() as u64 + 1; if max_bytes > 0 && bytes_written + line_bytes > max_bytes {
38 drop(file);
40 let rotated = log_path.with_extension(format!(
41 "{}.1",
42 log_path.extension().unwrap_or_default().to_string_lossy()
43 ));
44 let _ = tokio::fs::rename(log_path, &rotated).await;
45 file = match tokio::fs::File::create(log_path).await {
46 Ok(f) => f,
47 Err(e) => {
48 tracing::warn!(path = %log_path.display(), process = %process_name, error = %e, "cannot recreate log file after rotation");
49 return;
50 }
51 };
52 bytes_written = 0;
53 }
54
55 let _ = file.write_all(line.as_bytes()).await;
56 let _ = file.write_all(b"\n").await;
57 let _ = file.flush().await;
58 bytes_written += line_bytes;
59
60 let _ = tx.send(OutputLine {
62 process: process_name.to_string(),
63 stream,
64 line,
65 });
66 }
67}