1use crate::daemon::log_index::{IndexRecord, IndexWriter, idx_path_for};
2use crate::protocol::Stream as ProtoStream;
3use std::path::Path;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::sync::broadcast;
8
9pub const DEFAULT_MAX_ROTATED_FILES: u32 = 5;
11
12#[derive(Debug, Clone)]
14pub struct OutputLine {
15 pub process: String,
16 pub stream: ProtoStream,
17 pub line: String,
18}
19
20#[allow(clippy::too_many_arguments)]
23pub async fn capture_output<R: tokio::io::AsyncRead + Unpin>(
24 reader: R,
25 log_path: &Path,
26 process_name: &str,
27 stream: ProtoStream,
28 tx: broadcast::Sender<OutputLine>,
29 max_bytes: u64,
30 max_rotated_files: u32,
31 seq: Arc<AtomicU64>,
32) {
33 let mut lines = BufReader::new(reader).lines();
34 let mut file = match tokio::fs::File::create(log_path).await {
35 Ok(f) => f,
36 Err(e) => {
37 tracing::warn!(path = %log_path.display(), process = %process_name, error = %e, "cannot create log file");
38 return;
39 }
40 };
41
42 let idx_path = idx_path_for(log_path);
43 let seq_base = seq.load(Ordering::Relaxed);
44 let mut idx_writer = match IndexWriter::create(&idx_path, seq_base) {
45 Ok(w) => w,
46 Err(e) => {
47 tracing::warn!(path = %idx_path.display(), error = %e, "cannot create index file");
48 return;
49 }
50 };
51
52 let mut bytes_written: u64 = 0;
53 let mut lines_since_idx_flush: u32 = 0;
54
55 while let Ok(Some(line)) = lines.next_line().await {
56 let line_bytes = line.len() as u64 + 1; if max_bytes > 0 && bytes_written + line_bytes > max_bytes {
59 let _ = idx_writer.flush();
61
62 drop(file);
64 drop(idx_writer);
65 rotate_log_files(log_path, max_rotated_files).await;
66 file = match tokio::fs::File::create(log_path).await {
67 Ok(f) => f,
68 Err(e) => {
69 tracing::warn!(path = %log_path.display(), process = %process_name, error = %e, "cannot recreate log file after rotation");
70 return;
71 }
72 };
73 let new_seq_base = seq.load(Ordering::Relaxed);
74 idx_writer = match IndexWriter::create(&idx_path, new_seq_base) {
75 Ok(w) => w,
76 Err(e) => {
77 tracing::warn!(path = %idx_path.display(), error = %e, "cannot recreate index file after rotation");
78 return;
79 }
80 };
81 bytes_written = 0;
82 lines_since_idx_flush = 0;
83 }
84
85 let byte_offset = bytes_written;
87 let line_seq = seq.fetch_add(1, Ordering::Relaxed);
88
89 let _ = idx_writer.append(IndexRecord {
91 byte_offset,
92 seq: line_seq,
93 });
94
95 let _ = file.write_all(line.as_bytes()).await;
96 let _ = file.write_all(b"\n").await;
97 let _ = file.flush().await;
98 bytes_written += line_bytes;
99
100 lines_since_idx_flush += 1;
103 if lines_since_idx_flush >= 64 {
104 let _ = idx_writer.flush();
105 lines_since_idx_flush = 0;
106 }
107
108 let _ = tx.send(OutputLine {
110 process: process_name.to_string(),
111 stream,
112 line,
113 });
114 }
115 let _ = idx_writer.flush();
117}
118
119async fn rotate_log_files(log_path: &Path, max_rotated_files: u32) {
123 let ext = log_path
124 .extension()
125 .unwrap_or_default()
126 .to_string_lossy()
127 .to_string();
128
129 for i in (1..max_rotated_files).rev() {
131 let from = log_path.with_extension(format!("{}.{}", ext, i));
132 let to = log_path.with_extension(format!("{}.{}", ext, i + 1));
133 let _ = tokio::fs::rename(&from, &to).await;
134 let idx_from = idx_path_for(&from);
136 let idx_to = idx_path_for(&to);
137 let _ = tokio::fs::rename(&idx_from, &idx_to).await;
138 }
139
140 let rotated_1 = log_path.with_extension(format!("{}.1", ext));
142 let _ = tokio::fs::rename(log_path, &rotated_1).await;
143 let idx_current = idx_path_for(log_path);
145 let idx_rotated_1 = idx_path_for(&rotated_1);
146 let _ = tokio::fs::rename(&idx_current, &idx_rotated_1).await;
147
148 let excess = log_path.with_extension(format!("{}.{}", ext, max_rotated_files + 1));
150 let _ = tokio::fs::remove_file(&excess).await;
151 let idx_excess = idx_path_for(&excess);
152 let _ = tokio::fs::remove_file(&idx_excess).await;
153}
154
155#[cfg(test)]
156mod tests {
157 use super::*;
158 use crate::daemon::log_index::IndexReader;
159 use std::sync::atomic::AtomicU64;
160
161 #[tokio::test]
162 async fn test_capture_output_creates_index() {
163 let dir = tempfile::tempdir().unwrap();
164 let log_path = dir.path().join("test.stdout");
165 let (tx, _rx) = broadcast::channel(16);
166 let seq = Arc::new(AtomicU64::new(0));
167
168 let input = b"line 0\nline 1\nline 2\nline 3\nline 4\n";
170 let reader = &input[..];
171 capture_output(
172 reader,
173 &log_path,
174 "test",
175 ProtoStream::Stdout,
176 tx,
177 1024 * 1024,
178 5,
179 seq.clone(),
180 )
181 .await;
182
183 let content = std::fs::read_to_string(&log_path).unwrap();
185 assert_eq!(content.lines().count(), 5);
186
187 let idx_path = idx_path_for(&log_path);
189 let mut idx = IndexReader::open(&idx_path).unwrap().unwrap();
190 assert_eq!(idx.line_count(), 5);
191
192 let r0 = idx.read_record(0).unwrap();
194 assert_eq!(r0.byte_offset, 0);
195 assert_eq!(r0.seq, 0);
196
197 let r1 = idx.read_record(1).unwrap();
199 assert_eq!(r1.byte_offset, 7);
200 assert_eq!(r1.seq, 1);
201
202 assert_eq!(seq.load(Ordering::Relaxed), 5);
204 }
205
206 #[tokio::test]
207 async fn test_capture_output_shared_seq_counter() {
208 let dir = tempfile::tempdir().unwrap();
209 let stdout_path = dir.path().join("test.stdout");
210 let stderr_path = dir.path().join("test.stderr");
211 let (tx, _rx) = broadcast::channel(16);
212 let seq = Arc::new(AtomicU64::new(0));
213
214 let stdout_input = b"out1\nout2\n";
216 capture_output(
217 &stdout_input[..],
218 &stdout_path,
219 "test",
220 ProtoStream::Stdout,
221 tx.clone(),
222 1024 * 1024,
223 5,
224 seq.clone(),
225 )
226 .await;
227
228 assert_eq!(seq.load(Ordering::Relaxed), 2);
230
231 let stderr_input = b"err1\nerr2\n";
233 capture_output(
234 &stderr_input[..],
235 &stderr_path,
236 "test",
237 ProtoStream::Stderr,
238 tx,
239 1024 * 1024,
240 5,
241 seq.clone(),
242 )
243 .await;
244
245 assert_eq!(seq.load(Ordering::Relaxed), 4);
247
248 let idx_path = idx_path_for(&stderr_path);
250 let mut idx = IndexReader::open(&idx_path).unwrap().unwrap();
251 assert_eq!(idx.read_record(0).unwrap().seq, 2);
252 assert_eq!(idx.read_record(1).unwrap().seq, 3);
253 }
254
255 #[tokio::test]
256 async fn test_capture_output_rotation_creates_idx_companions() {
257 let dir = tempfile::tempdir().unwrap();
258 let log_path = dir.path().join("test.stdout");
259 let (tx, _rx) = broadcast::channel(64);
260 let seq = Arc::new(AtomicU64::new(0));
261
262 let mut input = String::new();
265 for i in 0..20 {
266 use std::fmt::Write;
267 let _ = writeln!(input, "line {:02}", i);
268 }
269
270 capture_output(
271 input.as_bytes(),
272 &log_path,
273 "test",
274 ProtoStream::Stdout,
275 tx,
276 50,
277 3,
278 seq,
279 )
280 .await;
281
282 assert!(log_path.exists());
284 assert!(idx_path_for(&log_path).exists());
285
286 let rotated = log_path.with_extension("stdout.1");
288 assert!(
289 rotated.exists(),
290 "rotated log .1 should exist after rotation"
291 );
292 assert!(
293 idx_path_for(&rotated).exists(),
294 "rotated idx .1 should exist after rotation"
295 );
296 }
297}