agent_procs/daemon/
log_writer.rs1use crate::protocol::Stream as ProtoStream;
2use std::path::Path;
3use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
4use tokio::sync::broadcast;
5
6#[derive(Debug, Clone)]
8pub struct OutputLine {
9 pub process: String,
10 pub stream: ProtoStream,
11 pub line: String,
12}
13
14pub async fn capture_output<R: tokio::io::AsyncRead + Unpin>(
17 reader: R,
18 log_path: &Path,
19 process_name: &str,
20 stream: ProtoStream,
21 tx: broadcast::Sender<OutputLine>,
22 max_bytes: u64,
23) {
24 let mut lines = BufReader::new(reader).lines();
25 let mut file = match tokio::fs::File::create(log_path).await {
26 Ok(f) => f,
27 Err(e) => {
28 eprintln!(
29 "warning: cannot create log file {:?} for {}: {}",
30 log_path, process_name, e
31 );
32 return;
33 }
34 };
35 let mut bytes_written: u64 = 0;
36
37 while let Ok(Some(line)) = lines.next_line().await {
38 let line_bytes = line.len() as u64 + 1; if max_bytes > 0 && bytes_written + line_bytes > max_bytes {
41 drop(file);
43 let rotated = log_path.with_extension(format!(
44 "{}.1",
45 log_path.extension().unwrap_or_default().to_string_lossy()
46 ));
47 let _ = tokio::fs::rename(log_path, &rotated).await;
48 file = match tokio::fs::File::create(log_path).await {
49 Ok(f) => f,
50 Err(e) => {
51 eprintln!(
52 "warning: cannot recreate log file {:?} for {} after rotation: {}",
53 log_path, process_name, e
54 );
55 return;
56 }
57 };
58 bytes_written = 0;
59 }
60
61 let _ = file.write_all(line.as_bytes()).await;
62 let _ = file.write_all(b"\n").await;
63 let _ = file.flush().await;
64 bytes_written += line_bytes;
65
66 let _ = tx.send(OutputLine {
68 process: process_name.to_string(),
69 stream,
70 line,
71 });
72 }
73}