1use crate::daemon::log_index::{IndexReader, idx_path_for};
8use crate::tui::app::LineSource;
9use std::io::{self, BufRead, BufReader, Seek, SeekFrom};
10use std::path::{Path, PathBuf};
11use std::time::Instant;
12
13#[derive(Clone)]
15struct Segment {
16 log_path: PathBuf,
17 idx_path: PathBuf,
18 line_count: usize,
19}
20
21struct SegmentCache {
23 stdout: Vec<Segment>,
24 stderr: Vec<Segment>,
25 refreshed_at: Instant,
26}
27
28const SEGMENT_CACHE_TTL_MS: u128 = 500;
30
31pub struct DiskLogReader {
34 log_dir: PathBuf,
35 process: String,
36 cache: Option<SegmentCache>,
37}
38
39impl DiskLogReader {
40 pub fn new(log_dir: PathBuf, process: String) -> Self {
41 Self {
42 log_dir,
43 process,
44 cache: None,
45 }
46 }
47
48 fn segments(&mut self, source: LineSource) -> &[Segment] {
50 let stale = self
51 .cache
52 .as_ref()
53 .is_none_or(|c| c.refreshed_at.elapsed().as_millis() > SEGMENT_CACHE_TTL_MS);
54 if stale {
55 let stdout = Self::discover_segments(&self.log_dir, &self.process, LineSource::Stdout);
56 let stderr = Self::discover_segments(&self.log_dir, &self.process, LineSource::Stderr);
57 self.cache = Some(SegmentCache {
58 stdout,
59 stderr,
60 refreshed_at: Instant::now(),
61 });
62 }
63 let cache = self.cache.as_ref().unwrap();
64 match source {
65 LineSource::Stdout => &cache.stdout,
66 LineSource::Stderr => &cache.stderr,
67 }
68 }
69
70 pub fn line_count(&mut self, source: LineSource) -> usize {
72 self.segments(source).iter().map(|s| s.line_count).sum()
73 }
74
75 pub fn line_count_both(&mut self) -> usize {
77 let stdout: usize = self
79 .segments(LineSource::Stdout)
80 .iter()
81 .map(|s| s.line_count)
82 .sum();
83 let stderr: usize = self
84 .segments(LineSource::Stderr)
85 .iter()
86 .map(|s| s.line_count)
87 .sum();
88 stdout + stderr
89 }
90
91 pub fn read_lines(
94 &mut self,
95 source: LineSource,
96 start: usize,
97 count: usize,
98 ) -> io::Result<Vec<String>> {
99 let segments = self.segments(source).to_vec();
100 read_lines_from_segments(&segments, start, count)
101 }
102
103 pub fn read_interleaved(
105 &mut self,
106 start: usize,
107 count: usize,
108 ) -> io::Result<Vec<(LineSource, String)>> {
109 let merged = self.build_merged_index()?;
110 let end = (start + count).min(merged.len());
111 if start >= end {
112 return Ok(Vec::new());
113 }
114
115 let window = &merged[start..end];
116
117 let mut stdout_needed: Vec<(usize, usize)> = Vec::new(); let mut stderr_needed: Vec<(usize, usize)> = Vec::new();
121 for (i, &(source, stream_line)) in window.iter().enumerate() {
122 match source {
123 LineSource::Stdout => stdout_needed.push((i, stream_line)),
124 LineSource::Stderr => stderr_needed.push((i, stream_line)),
125 }
126 }
127
128 let mut result: Vec<(LineSource, String)> =
129 vec![(LineSource::Stdout, String::new()); end - start];
130
131 for &(result_idx, stream_line) in &stdout_needed {
133 let line = Self::read_single_line_from(self.segments(LineSource::Stdout), stream_line)?;
134 result[result_idx] = (LineSource::Stdout, line);
135 }
136 for &(result_idx, stream_line) in &stderr_needed {
138 let line = Self::read_single_line_from(self.segments(LineSource::Stderr), stream_line)?;
139 result[result_idx] = (LineSource::Stderr, line);
140 }
141
142 Ok(result)
143 }
144
145 fn build_merged_index(&mut self) -> io::Result<Vec<(LineSource, usize)>> {
147 let mut entries: Vec<(u64, LineSource, usize)> = Vec::new();
148
149 for source in [LineSource::Stdout, LineSource::Stderr] {
150 let segments = self.segments(source).to_vec();
151 let mut line_offset = 0;
152 for seg in &segments {
153 if let Ok(Some(mut reader)) = IndexReader::open(&seg.idx_path) {
154 let records = reader.read_range(0, seg.line_count)?;
155 for (i, rec) in records.iter().enumerate() {
156 entries.push((rec.seq, source, line_offset + i));
157 }
158 } else {
159 for i in 0..seg.line_count {
160 entries.push((u64::MAX, source, line_offset + i));
161 }
162 }
163 line_offset += seg.line_count;
164 }
165 }
166
167 entries.sort_by_key(|&(seq, _, _)| seq);
168 Ok(entries
169 .into_iter()
170 .map(|(_, src, line)| (src, line))
171 .collect())
172 }
173
174 fn read_single_line_from(segments: &[Segment], absolute_line: usize) -> io::Result<String> {
176 let mut cumulative = 0;
177 for seg in segments {
178 if absolute_line < cumulative + seg.line_count {
179 let line_in_seg = absolute_line - cumulative;
180 let lines = read_lines_from_segment(&seg.log_path, &seg.idx_path, line_in_seg, 1)?;
181 return lines.into_iter().next().ok_or_else(|| {
182 io::Error::new(io::ErrorKind::UnexpectedEof, "line not found in segment")
183 });
184 }
185 cumulative += seg.line_count;
186 }
187 Err(io::Error::new(
188 io::ErrorKind::InvalidInput,
189 format!(
190 "line {} out of range (total: {})",
191 absolute_line, cumulative
192 ),
193 ))
194 }
195
196 fn discover_segments(log_dir: &Path, process: &str, source: LineSource) -> Vec<Segment> {
198 let stream = match source {
199 LineSource::Stdout => "stdout",
200 LineSource::Stderr => "stderr",
201 };
202 let base = log_dir.join(format!("{}.{}", process, stream));
203
204 let mut rotated: Vec<(u32, Segment)> = Vec::new();
205 for n in 1u32.. {
206 let log_path = base.with_extension(format!("{}.{}", stream, n));
207 if !log_path.exists() {
208 break;
209 }
210 let idx_path = idx_path_for(&log_path);
211 let line_count = idx_line_count(&idx_path)
212 .unwrap_or_else(|| count_lines_in_file(&log_path).unwrap_or(0));
213 rotated.push((
214 n,
215 Segment {
216 log_path,
217 idx_path,
218 line_count,
219 },
220 ));
221 }
222
223 rotated.sort_by(|a, b| b.0.cmp(&a.0));
225 let mut segments: Vec<Segment> = rotated.into_iter().map(|(_, s)| s).collect();
226
227 if base.exists() {
228 let idx_path = idx_path_for(&base);
229 let line_count = idx_line_count(&idx_path)
230 .unwrap_or_else(|| count_lines_in_file(&base).unwrap_or(0));
231 segments.push(Segment {
232 log_path: base,
233 idx_path,
234 line_count,
235 });
236 }
237
238 segments
239 }
240}
241
242fn read_lines_from_segments(
244 segments: &[Segment],
245 start: usize,
246 count: usize,
247) -> io::Result<Vec<String>> {
248 let mut result = Vec::with_capacity(count);
249 let mut cumulative = 0;
250
251 for seg in segments {
252 let seg_end = cumulative + seg.line_count;
253 let window_end = start + count;
254
255 if seg_end <= start || cumulative >= window_end {
256 cumulative = seg_end;
257 continue;
258 }
259
260 let read_start = start.max(cumulative) - cumulative;
261 let read_end = window_end.min(seg_end) - cumulative;
262 let lines = read_lines_from_segment(
263 &seg.log_path,
264 &seg.idx_path,
265 read_start,
266 read_end - read_start,
267 )?;
268 result.extend(lines);
269
270 cumulative = seg_end;
271 }
272 Ok(result)
273}
274
275fn idx_line_count(idx_path: &Path) -> Option<usize> {
277 IndexReader::line_count_from_metadata(idx_path).ok()
278}
279
280fn count_lines_in_file(path: &Path) -> io::Result<usize> {
282 let file = std::fs::File::open(path)?;
283 Ok(BufReader::new(file).lines().count())
284}
285
286fn read_lines_from_segment(
289 log_path: &Path,
290 idx_path: &Path,
291 start: usize,
292 count: usize,
293) -> io::Result<Vec<String>> {
294 if count == 0 {
295 return Ok(Vec::new());
296 }
297
298 if let Ok(Some(mut idx_reader)) = IndexReader::open(idx_path) {
300 let records = idx_reader.read_range(start, count)?;
301 if records.is_empty() {
302 return Ok(Vec::new());
303 }
304 let file = std::fs::File::open(log_path)?;
305 let mut reader = BufReader::new(file);
306 reader.seek(SeekFrom::Start(records[0].byte_offset))?;
308 let mut result = Vec::with_capacity(records.len());
309 for _ in 0..records.len() {
310 let mut line = String::new();
311 reader.read_line(&mut line)?;
312 if line.ends_with('\n') {
313 line.pop();
314 }
315 result.push(line);
316 }
317 return Ok(result);
318 }
319
320 let file = std::fs::File::open(log_path)?;
322 let lines: Vec<String> = BufReader::new(file)
323 .lines()
324 .skip(start)
325 .take(count)
326 .collect::<io::Result<Vec<_>>>()?;
327 Ok(lines)
328}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333 use crate::daemon::log_index::{IndexRecord, IndexWriter};
334
335 fn create_log_with_index(dir: &Path, filename: &str, lines: &[&str], seq_base: u64) {
337 let log_path = dir.join(filename);
338 let idx_path = idx_path_for(&log_path);
339
340 let mut log_content = String::new();
341 let mut writer = IndexWriter::create(&idx_path, seq_base).unwrap();
342 let mut offset: u64 = 0;
343
344 for (i, line) in lines.iter().enumerate() {
345 writer
346 .append(IndexRecord {
347 byte_offset: offset,
348 seq: seq_base + i as u64,
349 })
350 .unwrap();
351 log_content.push_str(line);
352 log_content.push('\n');
353 offset += line.len() as u64 + 1;
354 }
355 writer.flush().unwrap();
356 std::fs::write(&log_path, log_content).unwrap();
357 }
358
359 #[test]
360 fn test_line_count_single_file() {
361 let dir = tempfile::tempdir().unwrap();
362 create_log_with_index(dir.path(), "test.stdout", &["line1", "line2", "line3"], 0);
363
364 let mut reader = DiskLogReader::new(dir.path().to_path_buf(), "test".to_string());
365 assert_eq!(reader.line_count(LineSource::Stdout), 3);
366 assert_eq!(reader.line_count(LineSource::Stderr), 0);
367 }
368
369 #[test]
370 fn test_read_lines_single_file() {
371 let dir = tempfile::tempdir().unwrap();
372 create_log_with_index(
373 dir.path(),
374 "test.stdout",
375 &["aaa", "bbb", "ccc", "ddd", "eee"],
376 0,
377 );
378
379 let mut reader = DiskLogReader::new(dir.path().to_path_buf(), "test".to_string());
380 let lines = reader.read_lines(LineSource::Stdout, 1, 3).unwrap();
381 assert_eq!(lines, vec!["bbb", "ccc", "ddd"]);
382 }
383
384 #[test]
385 fn test_read_lines_across_rotated() {
386 let dir = tempfile::tempdir().unwrap();
387 create_log_with_index(dir.path(), "test.stdout.2", &["old1", "old2"], 0);
389 create_log_with_index(dir.path(), "test.stdout.1", &["mid1", "mid2"], 2);
391 create_log_with_index(dir.path(), "test.stdout", &["new1", "new2"], 4);
393
394 let mut reader = DiskLogReader::new(dir.path().to_path_buf(), "test".to_string());
395 assert_eq!(reader.line_count(LineSource::Stdout), 6);
396
397 let lines = reader.read_lines(LineSource::Stdout, 1, 4).unwrap();
399 assert_eq!(lines, vec!["old2", "mid1", "mid2", "new1"]);
400 }
401
402 #[test]
403 fn test_read_interleaved() {
404 let dir = tempfile::tempdir().unwrap();
405 create_log_with_index(dir.path(), "test.stdout", &["out0", "out2", "out4"], 0);
407 let idx_path = idx_path_for(&dir.path().join("test.stdout"));
409 let mut writer = IndexWriter::create(&idx_path, 0).unwrap();
410 writer
411 .append(IndexRecord {
412 byte_offset: 0,
413 seq: 0,
414 })
415 .unwrap();
416 writer
417 .append(IndexRecord {
418 byte_offset: 5,
419 seq: 2,
420 })
421 .unwrap();
422 writer
423 .append(IndexRecord {
424 byte_offset: 10,
425 seq: 4,
426 })
427 .unwrap();
428 writer.flush().unwrap();
429
430 let stderr_content = "err1\nerr3\n";
432 std::fs::write(dir.path().join("test.stderr"), stderr_content).unwrap();
433 let idx_path = idx_path_for(&dir.path().join("test.stderr"));
434 let mut writer = IndexWriter::create(&idx_path, 1).unwrap();
435 writer
436 .append(IndexRecord {
437 byte_offset: 0,
438 seq: 1,
439 })
440 .unwrap();
441 writer
442 .append(IndexRecord {
443 byte_offset: 5,
444 seq: 3,
445 })
446 .unwrap();
447 writer.flush().unwrap();
448
449 let mut reader = DiskLogReader::new(dir.path().to_path_buf(), "test".to_string());
450 assert_eq!(reader.line_count_both(), 5);
451
452 let interleaved = reader.read_interleaved(0, 5).unwrap();
453 assert_eq!(interleaved.len(), 5);
454 assert_eq!(interleaved[0], (LineSource::Stdout, "out0".to_string()));
455 assert_eq!(interleaved[1], (LineSource::Stderr, "err1".to_string()));
456 assert_eq!(interleaved[2], (LineSource::Stdout, "out2".to_string()));
457 assert_eq!(interleaved[3], (LineSource::Stderr, "err3".to_string()));
458 assert_eq!(interleaved[4], (LineSource::Stdout, "out4".to_string()));
459 }
460
461 #[test]
462 fn test_fallback_no_index() {
463 let dir = tempfile::tempdir().unwrap();
464 std::fs::write(dir.path().join("test.stdout"), "aaa\nbbb\nccc\n").unwrap();
466
467 let mut reader = DiskLogReader::new(dir.path().to_path_buf(), "test".to_string());
468 assert_eq!(reader.line_count(LineSource::Stdout), 3);
469
470 let lines = reader.read_lines(LineSource::Stdout, 1, 2).unwrap();
471 assert_eq!(lines, vec!["bbb", "ccc"]);
472 }
473
474 #[test]
475 fn test_empty_dir() {
476 let dir = tempfile::tempdir().unwrap();
477 let mut reader = DiskLogReader::new(dir.path().to_path_buf(), "test".to_string());
478 assert_eq!(reader.line_count(LineSource::Stdout), 0);
479 assert_eq!(reader.line_count_both(), 0);
480 }
481}