1use crate::daemon::log_index::{IndexRecord, IndexWriter, idx_path_for};
2use crate::protocol::Stream as ProtoStream;
3use std::path::Path;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::sync::broadcast;
8
/// Number of rotated log files kept when a caller does not supply its own
/// limit; `rotate_if_exists` passes this value to `rotate_log_files`.
pub const DEFAULT_MAX_ROTATED_FILES: u32 = 5;
11
/// One captured line of output, as published on the broadcast channel after
/// it has been written to the log file.
#[derive(Debug, Clone)]
pub struct OutputLine {
    /// Name of the process the line is attributed to.
    pub process: String,
    /// Stream the line is attributed to.
    pub stream: ProtoStream,
    /// Text of the line; the newline terminator is added separately when the
    /// line is written to the log file and is not part of this field.
    pub line: String,
}
19
/// Mutable bookkeeping for one log file being written by `capture_output`:
/// the open file, its companion index writer, and the counters that drive
/// rotation and periodic index flushing.
struct LogWriteState<'a> {
    /// Currently open log file; `None` while the file is closed for rotation
    /// or after it could not be recreated.
    file: Option<tokio::fs::File>,
    /// Writer for the index file that accompanies the current log file.
    idx_writer: IndexWriter,
    /// Bytes accounted to the current log file (line bytes plus one newline
    /// per line); also used as the byte offset of the next line.
    bytes_written: u64,
    /// Lines appended to the index since it was last flushed.
    lines_since_idx_flush: u32,
    /// Path of the live (un-rotated) log file.
    log_path: &'a Path,
    /// Process name attached to broadcast lines and log diagnostics.
    process_name: &'a str,
    /// Stream tag attached to broadcast lines.
    stream: ProtoStream,
    /// Channel on which every written line is re-published.
    tx: &'a broadcast::Sender<OutputLine>,
    /// Size threshold that triggers rotation; `0` disables rotation.
    max_bytes: u64,
    /// Limit handed to `rotate_log_files` when rotating.
    max_rotated_files: u32,
    /// Sequence counter; each written line takes the next value.
    seq: &'a AtomicU64,
}
35
36impl LogWriteState<'_> {
37 async fn write_line(&mut self, line: &str) {
38 let line_bytes = line.len() as u64 + 1;
39 if self.max_bytes > 0 && self.bytes_written + line_bytes > self.max_bytes {
40 let _ = self.idx_writer.flush();
41 self.file = None;
43
44 rotate_log_files(self.log_path, self.max_rotated_files).await;
45 self.file = match tokio::fs::File::create(self.log_path).await {
46 Ok(f) => Some(f),
47 Err(e) => {
48 tracing::warn!(path = %self.log_path.display(), process = %self.process_name, error = %e, "cannot recreate log file after rotation");
49 return;
50 }
51 };
52 let idx_path = idx_path_for(self.log_path);
53 let new_seq_base = self.seq.load(Ordering::Relaxed);
54 self.idx_writer = match IndexWriter::create(&idx_path, new_seq_base) {
55 Ok(w) => w,
56 Err(e) => {
57 tracing::warn!(path = %idx_path.display(), error = %e, "cannot recreate index file after rotation");
58 return;
59 }
60 };
61 self.bytes_written = 0;
62 self.lines_since_idx_flush = 0;
63 }
64
65 let Some(ref mut file) = self.file else {
66 return;
67 };
68
69 let byte_offset = self.bytes_written;
70 let line_seq = self.seq.fetch_add(1, Ordering::Relaxed);
71 let _ = self.idx_writer.append(IndexRecord {
72 byte_offset,
73 seq: line_seq,
74 });
75
76 let _ = file.write_all(line.as_bytes()).await;
77 let _ = file.write_all(b"\n").await;
78 let _ = file.flush().await;
79 self.bytes_written += line_bytes;
80
81 self.lines_since_idx_flush += 1;
82 if self.lines_since_idx_flush >= 64 {
83 let _ = self.idx_writer.flush();
84 self.lines_since_idx_flush = 0;
85 }
86
87 let _ = self.tx.send(OutputLine {
88 process: self.process_name.to_string(),
89 stream: self.stream,
90 line: line.to_string(),
91 });
92 }
93}
94
95#[allow(clippy::too_many_arguments)]
102pub async fn capture_output<R: tokio::io::AsyncRead + Unpin>(
103 reader: R,
104 log_path: &Path,
105 process_name: &str,
106 stream: ProtoStream,
107 tx: broadcast::Sender<OutputLine>,
108 max_bytes: u64,
109 max_rotated_files: u32,
110 seq: Arc<AtomicU64>,
111 mut sup_rx: tokio::sync::mpsc::Receiver<String>,
112) {
113 let mut lines = BufReader::new(reader).lines();
114 let file = match tokio::fs::File::create(log_path).await {
115 Ok(f) => f,
116 Err(e) => {
117 tracing::warn!(path = %log_path.display(), process = %process_name, error = %e, "cannot create log file");
118 return;
119 }
120 };
121
122 let idx_path = idx_path_for(log_path);
123 let seq_base = seq.load(Ordering::Relaxed);
124 let idx_writer = match IndexWriter::create(&idx_path, seq_base) {
125 Ok(w) => w,
126 Err(e) => {
127 tracing::warn!(path = %idx_path.display(), error = %e, "cannot create index file");
128 return;
129 }
130 };
131
132 let mut state = LogWriteState {
133 file: Some(file),
134 idx_writer,
135 bytes_written: 0,
136 lines_since_idx_flush: 0,
137 log_path,
138 process_name,
139 stream,
140 tx: &tx,
141 max_bytes,
142 max_rotated_files,
143 seq: &seq,
144 };
145
146 loop {
148 let line = tokio::select! {
149 result = lines.next_line() => {
150 match result {
151 Ok(Some(line)) => line,
152 _ => break, }
154 }
155 Some(sup_line) = sup_rx.recv() => sup_line,
156 };
157
158 state.write_line(&line).await;
159 }
160
161 while let Some(sup_line) = sup_rx.recv().await {
163 state.write_line(&sup_line).await;
164 }
165
166 let _ = state.idx_writer.flush();
168}
169
170pub async fn rotate_if_exists(log_path: &Path) {
172 if log_path.exists() {
173 rotate_log_files(log_path, DEFAULT_MAX_ROTATED_FILES).await;
174 }
175}
176
177pub(crate) async fn rotate_log_files(log_path: &Path, max_rotated_files: u32) {
181 let ext = log_path
182 .extension()
183 .unwrap_or_default()
184 .to_string_lossy()
185 .to_string();
186
187 for i in (1..max_rotated_files).rev() {
189 let from = log_path.with_extension(format!("{}.{}", ext, i));
190 let to = log_path.with_extension(format!("{}.{}", ext, i + 1));
191 let _ = tokio::fs::rename(&from, &to).await;
192 let idx_from = idx_path_for(&from);
194 let idx_to = idx_path_for(&to);
195 let _ = tokio::fs::rename(&idx_from, &idx_to).await;
196 }
197
198 let rotated_1 = log_path.with_extension(format!("{}.1", ext));
200 let _ = tokio::fs::rename(log_path, &rotated_1).await;
201 let idx_current = idx_path_for(log_path);
203 let idx_rotated_1 = idx_path_for(&rotated_1);
204 let _ = tokio::fs::rename(&idx_current, &idx_rotated_1).await;
205
206 let excess = log_path.with_extension(format!("{}.{}", ext, max_rotated_files + 1));
208 let _ = tokio::fs::remove_file(&excess).await;
209 let idx_excess = idx_path_for(&excess);
210 let _ = tokio::fs::remove_file(&idx_excess).await;
211}
212
#[cfg(test)]
mod tests {
    use super::*;
    use crate::daemon::log_index::IndexReader;
    use std::sync::atomic::AtomicU64;

    /// Supervisor receiver with no live sender: the sender is dropped when
    /// this helper returns, so the tests that use it inject no extra lines.
    fn dummy_sup_rx() -> tokio::sync::mpsc::Receiver<String> {
        let (_sender, receiver) = tokio::sync::mpsc::channel::<String>(1);
        receiver
    }

    /// Five input lines produce five log lines and five index records with
    /// matching byte offsets and sequence numbers.
    #[tokio::test]
    async fn test_capture_output_creates_index() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("test.stdout");
        let (tx, _rx) = broadcast::channel(16);
        let seq = Arc::new(AtomicU64::new(0));

        let payload = b"line 0\nline 1\nline 2\nline 3\nline 4\n";
        capture_output(
            &payload[..],
            &log_path,
            "test",
            ProtoStream::Stdout,
            tx,
            1024 * 1024,
            5,
            Arc::clone(&seq),
            dummy_sup_rx(),
        )
        .await;

        let written = std::fs::read_to_string(&log_path).unwrap();
        assert_eq!(written.lines().count(), 5);

        let mut idx = IndexReader::open(&idx_path_for(&log_path))
            .unwrap()
            .unwrap();
        assert_eq!(idx.line_count(), 5);

        let first = idx.read_record(0).unwrap();
        assert_eq!(first.byte_offset, 0);
        assert_eq!(first.seq, 0);

        // "line 0\n" is 7 bytes, so the second line starts at offset 7.
        let second = idx.read_record(1).unwrap();
        assert_eq!(second.byte_offset, 7);
        assert_eq!(second.seq, 1);

        assert_eq!(seq.load(Ordering::Relaxed), 5);
    }

    /// Two captures sharing one counter continue the same sequence: the
    /// second capture's index starts where the first one stopped.
    #[tokio::test]
    async fn test_capture_output_shared_seq_counter() {
        let tmp = tempfile::tempdir().unwrap();
        let stdout_path = tmp.path().join("test.stdout");
        let stderr_path = tmp.path().join("test.stderr");
        let (tx, _rx) = broadcast::channel(16);
        let seq = Arc::new(AtomicU64::new(0));

        let out_bytes = b"out1\nout2\n";
        capture_output(
            &out_bytes[..],
            &stdout_path,
            "test",
            ProtoStream::Stdout,
            tx.clone(),
            1024 * 1024,
            5,
            Arc::clone(&seq),
            dummy_sup_rx(),
        )
        .await;

        assert_eq!(seq.load(Ordering::Relaxed), 2);

        let err_bytes = b"err1\nerr2\n";
        capture_output(
            &err_bytes[..],
            &stderr_path,
            "test",
            ProtoStream::Stderr,
            tx,
            1024 * 1024,
            5,
            Arc::clone(&seq),
            dummy_sup_rx(),
        )
        .await;

        assert_eq!(seq.load(Ordering::Relaxed), 4);

        let mut idx = IndexReader::open(&idx_path_for(&stderr_path))
            .unwrap()
            .unwrap();
        assert_eq!(idx.read_record(0).unwrap().seq, 2);
        assert_eq!(idx.read_record(1).unwrap().seq, 3);
    }

    /// With a 50-byte limit and 8-byte lines the log must rotate, and both
    /// the live and the rotated log need an index file next to them.
    #[tokio::test]
    async fn test_capture_output_rotation_creates_idx_companions() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("test.stdout");
        let (tx, _rx) = broadcast::channel(64);
        let seq = Arc::new(AtomicU64::new(0));

        // Twenty lines of the form "line NN".
        let input: String = (0..20).map(|i| format!("line {:02}\n", i)).collect();

        capture_output(
            input.as_bytes(),
            &log_path,
            "test",
            ProtoStream::Stdout,
            tx,
            50,
            3,
            seq,
            dummy_sup_rx(),
        )
        .await;

        assert!(log_path.exists());
        assert!(idx_path_for(&log_path).exists());

        let rotated = log_path.with_extension("stdout.1");
        assert!(
            rotated.exists(),
            "rotated log .1 should exist after rotation"
        );
        assert!(
            idx_path_for(&rotated).exists(),
            "rotated idx .1 should exist after rotation"
        );
    }

    /// A line sent on the supervisor channel after the process output has
    /// ended is still logged, indexed and counted.
    #[tokio::test]
    async fn test_capture_output_supervisor_channel() {
        use tokio::io::AsyncWriteExt as _;

        let tmp = tempfile::tempdir().unwrap();
        let log_path = tmp.path().join("test.stdout");
        let (tx, _rx) = broadcast::channel(16);
        let seq = Arc::new(AtomicU64::new(0));
        let (sup_tx, sup_rx) = tokio::sync::mpsc::channel::<String>(16);

        let (mut writer, server) = tokio::io::duplex(1024);
        let producer = tokio::spawn(async move {
            writer.write_all(b"line 0\nline 1\n").await.unwrap();
            writer.shutdown().await.unwrap();
            // Give the capture loop time to see end of input before the
            // supervisor line arrives.
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            sup_tx
                .send("[agent-procs] Restarted".to_string())
                .await
                .unwrap();
            drop(sup_tx);
        });

        capture_output(
            server,
            &log_path,
            "test",
            ProtoStream::Stdout,
            tx,
            1024 * 1024,
            5,
            Arc::clone(&seq),
            sup_rx,
        )
        .await;

        producer.await.unwrap();

        let written = std::fs::read_to_string(&log_path).unwrap();
        let logged: Vec<&str> = written.lines().collect();
        assert_eq!(logged.len(), 3);
        assert_eq!(logged[0], "line 0");
        assert_eq!(logged[1], "line 1");
        assert_eq!(logged[2], "[agent-procs] Restarted");

        let idx = IndexReader::open(&idx_path_for(&log_path))
            .unwrap()
            .unwrap();
        assert_eq!(idx.line_count(), 3);

        assert_eq!(seq.load(Ordering::Relaxed), 3);
    }
}