Skip to main content

agent_procs/daemon/
log_writer.rs

1use crate::daemon::log_index::{IndexRecord, IndexWriter, idx_path_for};
2use crate::protocol::Stream as ProtoStream;
3use std::path::Path;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::sync::broadcast;
8
/// How many rotated log files are retained when no other limit is specified.
pub const DEFAULT_MAX_ROTATED_FILES: u32 = 5;
11
12/// A line of output from a child process.
13#[derive(Debug, Clone)]
14pub struct OutputLine {
15    pub process: String,
16    pub stream: ProtoStream,
17    pub line: String,
18}
19
20/// Reads lines from a child's stdout or stderr, writes each line to a log file
21/// with a sidecar `.idx` index, and broadcasts it to subscribers.
22#[allow(clippy::too_many_arguments)]
23pub async fn capture_output<R: tokio::io::AsyncRead + Unpin>(
24    reader: R,
25    log_path: &Path,
26    process_name: &str,
27    stream: ProtoStream,
28    tx: broadcast::Sender<OutputLine>,
29    max_bytes: u64,
30    max_rotated_files: u32,
31    seq: Arc<AtomicU64>,
32) {
33    let mut lines = BufReader::new(reader).lines();
34    let mut file = match tokio::fs::File::create(log_path).await {
35        Ok(f) => f,
36        Err(e) => {
37            tracing::warn!(path = %log_path.display(), process = %process_name, error = %e, "cannot create log file");
38            return;
39        }
40    };
41
42    let idx_path = idx_path_for(log_path);
43    let seq_base = seq.load(Ordering::Relaxed);
44    let mut idx_writer = match IndexWriter::create(&idx_path, seq_base) {
45        Ok(w) => w,
46        Err(e) => {
47            tracing::warn!(path = %idx_path.display(), error = %e, "cannot create index file");
48            return;
49        }
50    };
51
52    let mut bytes_written: u64 = 0;
53    let mut lines_since_idx_flush: u32 = 0;
54
55    while let Ok(Some(line)) = lines.next_line().await {
56        // Write to log file (with rotation check)
57        let line_bytes = line.len() as u64 + 1; // +1 for newline
58        if max_bytes > 0 && bytes_written + line_bytes > max_bytes {
59            // Flush index before rotation
60            let _ = idx_writer.flush();
61
62            // Rotate: cascade .N-1 → .N, then current → .1, delete excess
63            drop(file);
64            drop(idx_writer);
65            rotate_log_files(log_path, max_rotated_files).await;
66            file = match tokio::fs::File::create(log_path).await {
67                Ok(f) => f,
68                Err(e) => {
69                    tracing::warn!(path = %log_path.display(), process = %process_name, error = %e, "cannot recreate log file after rotation");
70                    return;
71                }
72            };
73            let new_seq_base = seq.load(Ordering::Relaxed);
74            idx_writer = match IndexWriter::create(&idx_path, new_seq_base) {
75                Ok(w) => w,
76                Err(e) => {
77                    tracing::warn!(path = %idx_path.display(), error = %e, "cannot recreate index file after rotation");
78                    return;
79                }
80            };
81            bytes_written = 0;
82            lines_since_idx_flush = 0;
83        }
84
85        // Record byte offset before writing the line
86        let byte_offset = bytes_written;
87        let line_seq = seq.fetch_add(1, Ordering::Relaxed);
88
89        // Write index record (buffered; flushed periodically below)
90        let _ = idx_writer.append(IndexRecord {
91            byte_offset,
92            seq: line_seq,
93        });
94
95        let _ = file.write_all(line.as_bytes()).await;
96        let _ = file.write_all(b"\n").await;
97        let _ = file.flush().await;
98        bytes_written += line_bytes;
99
100        // Flush index periodically to reduce syscalls while keeping
101        // the sidecar reasonably up-to-date for TUI reads.
102        lines_since_idx_flush += 1;
103        if lines_since_idx_flush >= 64 {
104            let _ = idx_writer.flush();
105            lines_since_idx_flush = 0;
106        }
107
108        // Broadcast to wait engine / followers
109        let _ = tx.send(OutputLine {
110            process: process_name.to_string(),
111            stream,
112            line,
113        });
114    }
115    // Flush remaining buffered index entries
116    let _ = idx_writer.flush();
117}
118
119/// Cascade-rotate log files and their `.idx` companions:
120/// shift .N-1 → .N down to .1 → .2, then rename current → .1,
121/// and delete files beyond `max_rotated_files`.
122async fn rotate_log_files(log_path: &Path, max_rotated_files: u32) {
123    let ext = log_path
124        .extension()
125        .unwrap_or_default()
126        .to_string_lossy()
127        .to_string();
128
129    // Shift existing rotated files: .N-1 → .N (starting from highest to avoid overwriting)
130    for i in (1..max_rotated_files).rev() {
131        let from = log_path.with_extension(format!("{}.{}", ext, i));
132        let to = log_path.with_extension(format!("{}.{}", ext, i + 1));
133        let _ = tokio::fs::rename(&from, &to).await;
134        // Also rename idx companion
135        let idx_from = idx_path_for(&from);
136        let idx_to = idx_path_for(&to);
137        let _ = tokio::fs::rename(&idx_from, &idx_to).await;
138    }
139
140    // Rename current → .1
141    let rotated_1 = log_path.with_extension(format!("{}.1", ext));
142    let _ = tokio::fs::rename(log_path, &rotated_1).await;
143    // Rename current idx → .1.idx
144    let idx_current = idx_path_for(log_path);
145    let idx_rotated_1 = idx_path_for(&rotated_1);
146    let _ = tokio::fs::rename(&idx_current, &idx_rotated_1).await;
147
148    // Delete excess files beyond max_rotated_files
149    let excess = log_path.with_extension(format!("{}.{}", ext, max_rotated_files + 1));
150    let _ = tokio::fs::remove_file(&excess).await;
151    let idx_excess = idx_path_for(&excess);
152    let _ = tokio::fs::remove_file(&idx_excess).await;
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::daemon::log_index::IndexReader;
    use std::sync::atomic::AtomicU64;

    /// Size limit large enough that the small test inputs never rotate.
    const ROOMY_LIMIT: u64 = 1024 * 1024;

    /// Fresh shared sequence counter starting at zero.
    fn zero_seq() -> Arc<AtomicU64> {
        Arc::new(AtomicU64::new(0))
    }

    /// Five lines in: the log holds five lines and the sidecar index records
    /// the matching byte offsets and sequence numbers.
    #[tokio::test]
    async fn test_capture_output_creates_index() {
        let tmp = tempfile::tempdir().unwrap();
        let log_file = tmp.path().join("test.stdout");
        let (sender, _receiver) = broadcast::channel(16);
        let counter = zero_seq();

        let payload: &[u8] = b"line 0\nline 1\nline 2\nline 3\nline 4\n";
        capture_output(
            payload,
            &log_file,
            "test",
            ProtoStream::Stdout,
            sender,
            ROOMY_LIMIT,
            5,
            Arc::clone(&counter),
        )
        .await;

        // Every input line must have reached the log file.
        let logged = std::fs::read_to_string(&log_file).unwrap();
        assert_eq!(logged.lines().count(), 5);

        // The index sits next to the log and has one record per line.
        let mut index = IndexReader::open(&idx_path_for(&log_file))
            .unwrap()
            .unwrap();
        assert_eq!(index.line_count(), 5);

        // Record 0 starts at the beginning of the file with seq 0.
        let first = index.read_record(0).unwrap();
        assert_eq!(first.byte_offset, 0);
        assert_eq!(first.seq, 0);

        // Record 1 starts after "line 0\n" (7 bytes) with seq 1.
        let second = index.read_record(1).unwrap();
        assert_eq!(second.byte_offset, 7);
        assert_eq!(second.seq, 1);

        // One sequence number was consumed per line.
        assert_eq!(counter.load(Ordering::Relaxed), 5);
    }

    /// Two captures sharing one counter hand out consecutive sequence numbers.
    #[tokio::test]
    async fn test_capture_output_shared_seq_counter() {
        let tmp = tempfile::tempdir().unwrap();
        let out_file = tmp.path().join("test.stdout");
        let err_file = tmp.path().join("test.stderr");
        let (sender, _receiver) = broadcast::channel(16);
        let counter = zero_seq();

        // First capture: stdout takes seq 0 and 1.
        let out_payload: &[u8] = b"out1\nout2\n";
        capture_output(
            out_payload,
            &out_file,
            "test",
            ProtoStream::Stdout,
            sender.clone(),
            ROOMY_LIMIT,
            5,
            Arc::clone(&counter),
        )
        .await;
        assert_eq!(counter.load(Ordering::Relaxed), 2);

        // Second capture: stderr continues from seq 2.
        let err_payload: &[u8] = b"err1\nerr2\n";
        capture_output(
            err_payload,
            &err_file,
            "test",
            ProtoStream::Stderr,
            sender,
            ROOMY_LIMIT,
            5,
            Arc::clone(&counter),
        )
        .await;
        assert_eq!(counter.load(Ordering::Relaxed), 4);

        // The stderr index must carry seq 2 and 3.
        let mut index = IndexReader::open(&idx_path_for(&err_file))
            .unwrap()
            .unwrap();
        assert_eq!(index.read_record(0).unwrap().seq, 2);
        assert_eq!(index.read_record(1).unwrap().seq, 3);
    }

    /// Rotation leaves both the live files and a `.1` generation, each with
    /// its index companion.
    #[tokio::test]
    async fn test_capture_output_rotation_creates_idx_companions() {
        let tmp = tempfile::tempdir().unwrap();
        let log_file = tmp.path().join("test.stdout");
        let (sender, _receiver) = broadcast::channel(64);
        let counter = zero_seq();

        // Each "line NN\n" is 8 bytes, so a 50-byte limit fits ~6 lines per
        // file and 20 lines force several rotations.
        let payload: String = (0..20).map(|i| format!("line {:02}\n", i)).collect();

        capture_output(
            payload.as_bytes(),
            &log_file,
            "test",
            ProtoStream::Stdout,
            sender,
            50,
            3,
            counter,
        )
        .await;

        // The live log and its index are still present.
        assert!(log_file.exists());
        assert!(idx_path_for(&log_file).exists());

        // At least the first rotated generation exists, with its index.
        let first_rotated = log_file.with_extension("stdout.1");
        assert!(
            first_rotated.exists(),
            "rotated log .1 should exist after rotation"
        );
        assert!(
            idx_path_for(&first_rotated).exists(),
            "rotated idx .1 should exist after rotation"
        );
    }
}