agent_procs/daemon/
log_writer.rs1use crate::protocol::Stream as ProtoStream;
2use std::path::Path;
3use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
4use tokio::sync::broadcast;
5
6#[derive(Debug, Clone)]
8pub struct OutputLine {
9 pub process: String,
10 pub stream: ProtoStream,
11 pub line: String,
12}
13
14pub async fn capture_output<R: tokio::io::AsyncRead + Unpin>(
17 reader: R,
18 log_path: &Path,
19 process_name: &str,
20 stream: ProtoStream,
21 tx: broadcast::Sender<OutputLine>,
22 max_bytes: u64,
23) {
24 let mut lines = BufReader::new(reader).lines();
25 let mut file = match tokio::fs::File::create(log_path).await {
26 Ok(f) => f,
27 Err(_) => return,
28 };
29 let mut bytes_written: u64 = 0;
30
31 while let Ok(Some(line)) = lines.next_line().await {
32 let line_bytes = line.len() as u64 + 1; if max_bytes > 0 && bytes_written + line_bytes > max_bytes {
35 drop(file);
37 let rotated = log_path.with_extension(
38 format!("{}.1", log_path.extension().unwrap_or_default().to_string_lossy())
39 );
40 let _ = tokio::fs::rename(log_path, &rotated).await;
41 file = match tokio::fs::File::create(log_path).await {
42 Ok(f) => f,
43 Err(_) => return,
44 };
45 bytes_written = 0;
46 }
47
48 let _ = file.write_all(line.as_bytes()).await;
49 let _ = file.write_all(b"\n").await;
50 let _ = file.flush().await;
51 bytes_written += line_bytes;
52
53 let _ = tx.send(OutputLine {
55 process: process_name.to_string(),
56 stream,
57 line,
58 });
59 }
60}