agent_procs/daemon/
log_writer.rs1use crate::protocol::Stream as ProtoStream;
2use std::path::Path;
3use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
4use tokio::sync::broadcast;
5
6#[derive(Debug, Clone)]
8pub struct OutputLine {
9 pub process: String,
10 pub stream: ProtoStream,
11 pub line: String,
12}
13
14pub async fn capture_output<R: tokio::io::AsyncRead + Unpin>(
17 reader: R,
18 log_path: &Path,
19 process_name: &str,
20 stream: ProtoStream,
21 tx: broadcast::Sender<OutputLine>,
22 max_bytes: u64,
23) {
24 let mut lines = BufReader::new(reader).lines();
25 let mut file = match tokio::fs::File::create(log_path).await {
26 Ok(f) => f,
27 Err(_) => return,
28 };
29 let mut bytes_written: u64 = 0;
30
31 while let Ok(Some(line)) = lines.next_line().await {
32 let line_bytes = line.len() as u64 + 1; if max_bytes > 0 && bytes_written + line_bytes > max_bytes {
35 drop(file);
37 let rotated = log_path.with_extension(format!(
38 "{}.1",
39 log_path.extension().unwrap_or_default().to_string_lossy()
40 ));
41 let _ = tokio::fs::rename(log_path, &rotated).await;
42 file = match tokio::fs::File::create(log_path).await {
43 Ok(f) => f,
44 Err(_) => return,
45 };
46 bytes_written = 0;
47 }
48
49 let _ = file.write_all(line.as_bytes()).await;
50 let _ = file.write_all(b"\n").await;
51 let _ = file.flush().await;
52 bytes_written += line_bytes;
53
54 let _ = tx.send(OutputLine {
56 process: process_name.to_string(),
57 stream,
58 line,
59 });
60 }
61}