// agent_procs/daemon/log_writer.rs

1use crate::daemon::log_index::{IndexRecord, IndexWriter, idx_path_for};
2use crate::protocol::Stream as ProtoStream;
3use std::path::Path;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::sync::broadcast;
8
/// Default number of rotated log files to keep.
///
/// `rotate_if_exists` passes this value to `rotate_log_files` as the
/// retention limit; `capture_output` takes its own limit as an argument.
pub const DEFAULT_MAX_ROTATED_FILES: u32 = 5;
11
/// A line of output from a child process.
///
/// One value is sent on the broadcast channel for each line that
/// `LogWriteState::write_line` writes to the log file.
#[derive(Debug, Clone)]
pub struct OutputLine {
    /// Name of the process the line is attributed to.
    pub process: String,
    /// Stream (as defined by the protocol module) the line was captured on.
    pub stream: ProtoStream,
    /// Text of the line as handed to the writer; the newline that is written
    /// to the log file is not part of this string.
    pub line: String,
}
19
/// State for writing lines to a log file with index and broadcast.
///
/// The first four fields change as lines are written and as the log is
/// rotated; the remaining fields are borrowed from the caller and stay fixed
/// for the lifetime of the value.
struct LogWriteState<'a> {
    /// Open handle to the current log file; `None` while the file is closed
    /// for rotation or when it could not be recreated afterwards.
    file: Option<tokio::fs::File>,
    /// Writer for the sidecar index of the current log file.
    idx_writer: IndexWriter,
    /// Bytes accounted to the current log file (line length plus newline per
    /// line); used as the byte offset of the next index record.
    bytes_written: u64,
    /// Lines appended to the index since it was last flushed.
    lines_since_idx_flush: u32,
    // Invariant context (set once, never changes)
    /// Path of the live log file.
    log_path: &'a Path,
    /// Process name stamped on broadcast lines and log messages.
    process_name: &'a str,
    /// Stream stamped on broadcast lines.
    stream: ProtoStream,
    /// Channel on which written lines are broadcast.
    tx: &'a broadcast::Sender<OutputLine>,
    /// Size threshold that triggers rotation; `0` disables rotation.
    max_bytes: u64,
    /// Retention limit handed to `rotate_log_files`.
    max_rotated_files: u32,
    /// Counter that supplies the sequence number of every written line.
    seq: &'a AtomicU64,
}
35
36impl LogWriteState<'_> {
37    async fn write_line(&mut self, line: &str) {
38        let line_bytes = line.len() as u64 + 1;
39        if self.max_bytes > 0 && self.bytes_written + line_bytes > self.max_bytes {
40            let _ = self.idx_writer.flush();
41            // Drop file handle before rotation
42            self.file = None;
43
44            rotate_log_files(self.log_path, self.max_rotated_files).await;
45            self.file = match tokio::fs::File::create(self.log_path).await {
46                Ok(f) => Some(f),
47                Err(e) => {
48                    tracing::warn!(path = %self.log_path.display(), process = %self.process_name, error = %e, "cannot recreate log file after rotation");
49                    return;
50                }
51            };
52            let idx_path = idx_path_for(self.log_path);
53            let new_seq_base = self.seq.load(Ordering::Relaxed);
54            self.idx_writer = match IndexWriter::create(&idx_path, new_seq_base) {
55                Ok(w) => w,
56                Err(e) => {
57                    tracing::warn!(path = %idx_path.display(), error = %e, "cannot recreate index file after rotation");
58                    return;
59                }
60            };
61            self.bytes_written = 0;
62            self.lines_since_idx_flush = 0;
63        }
64
65        let Some(ref mut file) = self.file else {
66            return;
67        };
68
69        let byte_offset = self.bytes_written;
70        let line_seq = self.seq.fetch_add(1, Ordering::Relaxed);
71        let _ = self.idx_writer.append(IndexRecord {
72            byte_offset,
73            seq: line_seq,
74        });
75
76        let _ = file.write_all(line.as_bytes()).await;
77        let _ = file.write_all(b"\n").await;
78        let _ = file.flush().await;
79        self.bytes_written += line_bytes;
80
81        self.lines_since_idx_flush += 1;
82        if self.lines_since_idx_flush >= 64 {
83            let _ = self.idx_writer.flush();
84            self.lines_since_idx_flush = 0;
85        }
86
87        let _ = self.tx.send(OutputLine {
88            process: self.process_name.to_string(),
89            stream: self.stream,
90            line: line.to_string(),
91        });
92    }
93}
94
95/// Reads lines from a child's stdout or stderr, writes each line to a log file
96/// with a sidecar `.idx` index, and broadcasts it to subscribers.
97///
98/// Also accepts a supervisor channel for synthetic log lines (e.g. restart
99/// annotations). After the pipe hits EOF, continues draining the supervisor
100/// channel until it is closed.
101#[allow(clippy::too_many_arguments)]
102pub async fn capture_output<R: tokio::io::AsyncRead + Unpin>(
103    reader: R,
104    log_path: &Path,
105    process_name: &str,
106    stream: ProtoStream,
107    tx: broadcast::Sender<OutputLine>,
108    max_bytes: u64,
109    max_rotated_files: u32,
110    seq: Arc<AtomicU64>,
111    mut sup_rx: tokio::sync::mpsc::Receiver<String>,
112) {
113    let mut lines = BufReader::new(reader).lines();
114    let file = match tokio::fs::File::create(log_path).await {
115        Ok(f) => f,
116        Err(e) => {
117            tracing::warn!(path = %log_path.display(), process = %process_name, error = %e, "cannot create log file");
118            return;
119        }
120    };
121
122    let idx_path = idx_path_for(log_path);
123    let seq_base = seq.load(Ordering::Relaxed);
124    let idx_writer = match IndexWriter::create(&idx_path, seq_base) {
125        Ok(w) => w,
126        Err(e) => {
127            tracing::warn!(path = %idx_path.display(), error = %e, "cannot create index file");
128            return;
129        }
130    };
131
132    let mut state = LogWriteState {
133        file: Some(file),
134        idx_writer,
135        bytes_written: 0,
136        lines_since_idx_flush: 0,
137        log_path,
138        process_name,
139        stream,
140        tx: &tx,
141        max_bytes,
142        max_rotated_files,
143        seq: &seq,
144    };
145
146    // Main loop: select between pipe reads and supervisor channel
147    loop {
148        let line = tokio::select! {
149            result = lines.next_line() => {
150                match result {
151                    Ok(Some(line)) => line,
152                    _ => break, // pipe EOF — fall through to drain supervisor channel
153                }
154            }
155            Some(sup_line) = sup_rx.recv() => sup_line,
156        };
157
158        state.write_line(&line).await;
159    }
160
161    // After pipe EOF, drain remaining supervisor lines
162    while let Some(sup_line) = sup_rx.recv().await {
163        state.write_line(&sup_line).await;
164    }
165
166    // Flush remaining buffered index entries
167    let _ = state.idx_writer.flush();
168}
169
170/// Rotate log files if the path exists. Used by `respawn_in_place()`.
171pub async fn rotate_if_exists(log_path: &Path) {
172    if log_path.exists() {
173        rotate_log_files(log_path, DEFAULT_MAX_ROTATED_FILES).await;
174    }
175}
176
177/// Cascade-rotate log files and their `.idx` companions:
178/// shift .N-1 → .N down to .1 → .2, then rename current → .1,
179/// and delete files beyond `max_rotated_files`.
180pub(crate) async fn rotate_log_files(log_path: &Path, max_rotated_files: u32) {
181    let ext = log_path
182        .extension()
183        .unwrap_or_default()
184        .to_string_lossy()
185        .to_string();
186
187    // Shift existing rotated files: .N-1 → .N (starting from highest to avoid overwriting)
188    for i in (1..max_rotated_files).rev() {
189        let from = log_path.with_extension(format!("{}.{}", ext, i));
190        let to = log_path.with_extension(format!("{}.{}", ext, i + 1));
191        let _ = tokio::fs::rename(&from, &to).await;
192        // Also rename idx companion
193        let idx_from = idx_path_for(&from);
194        let idx_to = idx_path_for(&to);
195        let _ = tokio::fs::rename(&idx_from, &idx_to).await;
196    }
197
198    // Rename current → .1
199    let rotated_1 = log_path.with_extension(format!("{}.1", ext));
200    let _ = tokio::fs::rename(log_path, &rotated_1).await;
201    // Rename current idx → .1.idx
202    let idx_current = idx_path_for(log_path);
203    let idx_rotated_1 = idx_path_for(&rotated_1);
204    let _ = tokio::fs::rename(&idx_current, &idx_rotated_1).await;
205
206    // Delete excess files beyond max_rotated_files
207    let excess = log_path.with_extension(format!("{}.{}", ext, max_rotated_files + 1));
208    let _ = tokio::fs::remove_file(&excess).await;
209    let idx_excess = idx_path_for(&excess);
210    let _ = tokio::fs::remove_file(&idx_excess).await;
211}
212
#[cfg(test)]
mod tests {
    use super::*;
    use crate::daemon::log_index::IndexReader;
    use std::sync::atomic::AtomicU64;

    /// Supervisor receiver whose sender is already dropped, so the channel
    /// reports closed on the first `recv`.
    fn dummy_sup_rx() -> tokio::sync::mpsc::Receiver<String> {
        tokio::sync::mpsc::channel::<String>(1).1
    }

    /// Five piped lines produce a five-line log, a five-record index with the
    /// expected offsets and sequence numbers, and advance the counter to 5.
    #[tokio::test]
    async fn test_capture_output_creates_index() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("test.stdout");
        let (tx, _rx) = broadcast::channel(16);
        let seq = Arc::new(AtomicU64::new(0));

        let piped: &[u8] = b"line 0\nline 1\nline 2\nline 3\nline 4\n";
        capture_output(
            piped,
            &log_path,
            "test",
            ProtoStream::Stdout,
            tx,
            1024 * 1024,
            5,
            Arc::clone(&seq),
            dummy_sup_rx(),
        )
        .await;

        // Every line reached the log file.
        let written = std::fs::read_to_string(&log_path).unwrap();
        assert_eq!(written.lines().count(), 5);

        // The sidecar index sits next to the log and has one record per line.
        let mut index = IndexReader::open(&idx_path_for(&log_path))
            .unwrap()
            .unwrap();
        assert_eq!(index.line_count(), 5);

        // Record 0 starts the file and carries the first sequence number.
        let first = index.read_record(0).unwrap();
        assert_eq!(first.byte_offset, 0);
        assert_eq!(first.seq, 0);

        // Record 1 starts after "line 0\n" (7 bytes).
        let second = index.read_record(1).unwrap();
        assert_eq!(second.byte_offset, 7);
        assert_eq!(second.seq, 1);

        // One sequence number was consumed per line.
        assert_eq!(seq.load(Ordering::Relaxed), 5);
    }

    /// Two captures sharing one counter hand out consecutive sequence numbers.
    #[tokio::test]
    async fn test_capture_output_shared_seq_counter() {
        let tmp = tempfile::tempdir().unwrap();
        let stdout_path = tmp.path().join("test.stdout");
        let stderr_path = tmp.path().join("test.stderr");
        let (tx, _rx) = broadcast::channel(16);
        let seq = Arc::new(AtomicU64::new(0));

        // stdout goes first and takes sequence numbers 0 and 1.
        let stdout_bytes: &[u8] = b"out1\nout2\n";
        capture_output(
            stdout_bytes,
            &stdout_path,
            "test",
            ProtoStream::Stdout,
            tx.clone(),
            1024 * 1024,
            5,
            Arc::clone(&seq),
            dummy_sup_rx(),
        )
        .await;
        assert_eq!(seq.load(Ordering::Relaxed), 2);

        // stderr continues from 2.
        let stderr_bytes: &[u8] = b"err1\nerr2\n";
        capture_output(
            stderr_bytes,
            &stderr_path,
            "test",
            ProtoStream::Stderr,
            tx,
            1024 * 1024,
            5,
            Arc::clone(&seq),
            dummy_sup_rx(),
        )
        .await;
        assert_eq!(seq.load(Ordering::Relaxed), 4);

        // The stderr index holds sequence numbers 2 and 3.
        let mut stderr_index = IndexReader::open(&idx_path_for(&stderr_path))
            .unwrap()
            .unwrap();
        assert_eq!(stderr_index.read_record(0).unwrap().seq, 2);
        assert_eq!(stderr_index.read_record(1).unwrap().seq, 3);
    }

    /// Exceeding `max_bytes` rotates both the log and its index.
    #[tokio::test]
    async fn test_capture_output_rotation_creates_idx_companions() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("test.stdout");
        let (tx, _rx) = broadcast::channel(64);
        let seq = Arc::new(AtomicU64::new(0));

        // Each "line NN\n" is 8 bytes, so a 50-byte limit fits about six
        // lines per file and twenty lines force several rotations.
        let piped: String = (0..20).map(|i| format!("line {:02}\n", i)).collect();

        capture_output(
            piped.as_bytes(),
            &log_path,
            "test",
            ProtoStream::Stdout,
            tx,
            50,
            3,
            seq,
            dummy_sup_rx(),
        )
        .await;

        // The live log and its index are still present.
        assert!(log_path.exists());
        assert!(idx_path_for(&log_path).exists());

        // So is at least the first rotated generation, index included.
        let rotated = log_path.with_extension("stdout.1");
        assert!(
            rotated.exists(),
            "rotated log .1 should exist after rotation"
        );
        assert!(
            idx_path_for(&rotated).exists(),
            "rotated idx .1 should exist after rotation"
        );
    }

    /// Supervisor lines sent after pipe EOF are appended after the pipe lines.
    #[tokio::test]
    async fn test_capture_output_supervisor_channel() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("test.stdout");
        let (tx, _rx) = broadcast::channel(16);
        let seq = Arc::new(AtomicU64::new(0));
        let (sup_tx, sup_rx) = tokio::sync::mpsc::channel::<String>(16);

        // A duplex pipe lets the test control ordering: the pipe data and its
        // EOF are delivered first, the supervisor line only after a pause.
        let (mut pipe_in, pipe_out) = tokio::io::duplex(1024);
        use tokio::io::AsyncWriteExt as _;
        let feeder = tokio::spawn(async move {
            pipe_in.write_all(b"line 0\nline 1\n").await.unwrap();
            pipe_in.shutdown().await.unwrap();
            // Give the capture loop time to see the pipe EOF first.
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            sup_tx
                .send("[agent-procs] Restarted".to_string())
                .await
                .unwrap();
            drop(sup_tx);
        });

        capture_output(
            pipe_out,
            &log_path,
            "test",
            ProtoStream::Stdout,
            tx,
            1024 * 1024,
            5,
            Arc::clone(&seq),
            sup_rx,
        )
        .await;

        feeder.await.unwrap();

        let written = std::fs::read_to_string(&log_path).unwrap();
        let lines: Vec<&str> = written.lines().collect();
        assert_eq!(lines, ["line 0", "line 1", "[agent-procs] Restarted"]);

        // Pipe and supervisor lines are all indexed.
        let index = IndexReader::open(&idx_path_for(&log_path))
            .unwrap()
            .unwrap();
        assert_eq!(index.line_count(), 3);

        // Three lines, three sequence numbers.
        assert_eq!(seq.load(Ordering::Relaxed), 3);
    }
}