1use crate::daemon::log_index::{IndexReader, idx_path_for};
8use crate::tui::app::LineSource;
9use std::io::{self, BufRead, BufReader, Seek, SeekFrom};
10use std::path::{Path, PathBuf};
11use std::time::Instant;
12
/// One log file on disk together with its companion index file.
///
/// A stream is made of zero or more rotated files plus the live file; each of
/// them is described by one `Segment`.
#[derive(Clone)]
struct Segment {
    /// Path of the log file that holds the line text.
    log_path: PathBuf,
    /// Path of the index file, derived from `log_path` with `idx_path_for`.
    idx_path: PathBuf,
    /// Number of lines in this segment: taken from the index metadata when
    /// that is readable, otherwise counted from the log file (0 on failure).
    line_count: usize,
}
20
/// Result of the last segment discovery for both streams, with the time it
/// was taken so that it can be expired.
struct SegmentCache {
    /// Segments of the stdout stream, oldest first.
    stdout: Vec<Segment>,
    /// Segments of the stderr stream, oldest first.
    stderr: Vec<Segment>,
    /// Moment at which both lists were discovered.
    refreshed_at: Instant,
}
27
/// Maximum age of the segment cache in milliseconds; once the cache is older
/// than this, `DiskLogReader::segments` rediscovers the files on disk.
const SEGMENT_CACHE_TTL_MS: u128 = 500;
30
31use crate::tui::app::StreamMode;
32
/// Reads the stdout/stderr log files of one process from a log directory,
/// presenting rotated files and the live file as one continuous line range.
pub struct DiskLogReader {
    /// Directory that contains the log files.
    log_dir: PathBuf,
    /// Process name; log files are named `<process>.<stream>[.<n>]`.
    process: String,
    /// Lazily filled, time-limited cache of the discovered segments.
    cache: Option<SegmentCache>,
}
40
41impl DiskLogReader {
42 pub fn new(log_dir: PathBuf, process: String) -> Self {
43 Self {
44 log_dir,
45 process,
46 cache: None,
47 }
48 }
49
50 fn segments(&mut self, source: LineSource) -> &[Segment] {
52 let stale = self
53 .cache
54 .as_ref()
55 .is_none_or(|c| c.refreshed_at.elapsed().as_millis() > SEGMENT_CACHE_TTL_MS);
56 if stale {
57 let stdout = Self::discover_segments(&self.log_dir, &self.process, LineSource::Stdout);
58 let stderr = Self::discover_segments(&self.log_dir, &self.process, LineSource::Stderr);
59 self.cache = Some(SegmentCache {
60 stdout,
61 stderr,
62 refreshed_at: Instant::now(),
63 });
64 }
65 let cache = self.cache.as_ref().unwrap();
66 match source {
67 LineSource::Stdout => &cache.stdout,
68 LineSource::Stderr => &cache.stderr,
69 }
70 }
71
72 pub fn line_count(&mut self, source: LineSource) -> usize {
74 self.segments(source).iter().map(|s| s.line_count).sum()
75 }
76
77 pub fn line_count_both(&mut self) -> usize {
79 let stdout: usize = self
81 .segments(LineSource::Stdout)
82 .iter()
83 .map(|s| s.line_count)
84 .sum();
85 let stderr: usize = self
86 .segments(LineSource::Stderr)
87 .iter()
88 .map(|s| s.line_count)
89 .sum();
90 stdout + stderr
91 }
92
93 pub fn read_lines(
96 &mut self,
97 source: LineSource,
98 start: usize,
99 count: usize,
100 ) -> io::Result<Vec<String>> {
101 let segments = self.segments(source).to_vec();
102 read_lines_from_segments(&segments, start, count)
103 }
104
105 pub fn read_interleaved(
107 &mut self,
108 start: usize,
109 count: usize,
110 ) -> io::Result<Vec<(LineSource, String)>> {
111 let merged = self.build_merged_index()?;
112 let end = (start + count).min(merged.len());
113 if start >= end {
114 return Ok(Vec::new());
115 }
116
117 let window = &merged[start..end];
118
119 let mut stdout_needed: Vec<(usize, usize)> = Vec::new(); let mut stderr_needed: Vec<(usize, usize)> = Vec::new();
123 for (i, &(source, stream_line)) in window.iter().enumerate() {
124 match source {
125 LineSource::Stdout => stdout_needed.push((i, stream_line)),
126 LineSource::Stderr => stderr_needed.push((i, stream_line)),
127 }
128 }
129
130 let mut result: Vec<(LineSource, String)> =
131 vec![(LineSource::Stdout, String::new()); end - start];
132
133 for &(result_idx, stream_line) in &stdout_needed {
135 let line = Self::read_single_line_from(self.segments(LineSource::Stdout), stream_line)?;
136 result[result_idx] = (LineSource::Stdout, line);
137 }
138 for &(result_idx, stream_line) in &stderr_needed {
140 let line = Self::read_single_line_from(self.segments(LineSource::Stderr), stream_line)?;
141 result[result_idx] = (LineSource::Stderr, line);
142 }
143
144 Ok(result)
145 }
146
147 pub fn scan_matching_lines(&mut self, filter: &str, mode: StreamMode) -> Vec<usize> {
151 self.scan_matching_lines_from(filter, mode, 0)
152 }
153
154 pub fn scan_matching_lines_from(
157 &mut self,
158 filter: &str,
159 mode: StreamMode,
160 start_from: usize,
161 ) -> Vec<usize> {
162 match mode {
163 StreamMode::Stdout => self.scan_single_stream(filter, LineSource::Stdout, start_from),
164 StreamMode::Stderr => self.scan_single_stream(filter, LineSource::Stderr, start_from),
165 StreamMode::Both => self.scan_interleaved(filter, start_from),
166 }
167 }
168
169 fn scan_single_stream(
170 &mut self,
171 filter: &str,
172 source: LineSource,
173 start_from: usize,
174 ) -> Vec<usize> {
175 let total = self.line_count(source);
176 if total <= start_from {
177 return Vec::new();
178 }
179 let mut matching = Vec::new();
180 const CHUNK: usize = 1000;
181 let mut offset = start_from;
182 while offset < total {
183 let count = CHUNK.min(total - offset);
184 if let Ok(lines) = self.read_lines(source, offset, count) {
185 for (i, line) in lines.iter().enumerate() {
186 if line.contains(filter) {
187 matching.push(offset + i);
188 }
189 }
190 }
191 offset += count;
192 }
193 matching
194 }
195
196 fn scan_interleaved(&mut self, filter: &str, start_from: usize) -> Vec<usize> {
197 let total = self.line_count_both();
198 if total <= start_from {
199 return Vec::new();
200 }
201 let mut matching = Vec::new();
202 const CHUNK: usize = 1000;
203 let mut offset = start_from;
204 while offset < total {
205 let count = CHUNK.min(total - offset);
206 if let Ok(lines) = self.read_interleaved(offset, count) {
207 for (i, (_, line)) in lines.iter().enumerate() {
208 if line.contains(filter) {
209 matching.push(offset + i);
210 }
211 }
212 }
213 offset += count;
214 }
215 matching
216 }
217
218 pub fn read_scattered_lines(
221 &mut self,
222 mode: StreamMode,
223 line_numbers: &[usize],
224 ) -> Vec<(LineSource, String)> {
225 if line_numbers.is_empty() {
226 return Vec::new();
227 }
228 match mode {
229 StreamMode::Stdout | StreamMode::Stderr => {
230 let source = if mode == StreamMode::Stdout {
231 LineSource::Stdout
232 } else {
233 LineSource::Stderr
234 };
235 let mut result = Vec::with_capacity(line_numbers.len());
236 for &ln in line_numbers {
237 if let Ok(lines) = self.read_lines(source, ln, 1)
238 && let Some(line) = lines.into_iter().next()
239 {
240 result.push((source, line));
241 continue;
242 }
243 result.push((source, String::new()));
244 }
245 result
246 }
247 StreamMode::Both => {
248 let mut result = Vec::with_capacity(line_numbers.len());
249 for &ln in line_numbers {
250 if let Ok(lines) = self.read_interleaved(ln, 1)
251 && let Some(entry) = lines.into_iter().next()
252 {
253 result.push(entry);
254 continue;
255 }
256 result.push((LineSource::Stdout, String::new()));
257 }
258 result
259 }
260 }
261 }
262
263 fn build_merged_index(&mut self) -> io::Result<Vec<(LineSource, usize)>> {
265 let mut entries: Vec<(u64, LineSource, usize)> = Vec::new();
266
267 for source in [LineSource::Stdout, LineSource::Stderr] {
268 let segments = self.segments(source).to_vec();
269 let mut line_offset = 0;
270 for seg in &segments {
271 if let Ok(Some(mut reader)) = IndexReader::open(&seg.idx_path) {
272 let records = reader.read_range(0, seg.line_count)?;
273 for (i, rec) in records.iter().enumerate() {
274 entries.push((rec.seq, source, line_offset + i));
275 }
276 } else {
277 for i in 0..seg.line_count {
278 entries.push((u64::MAX, source, line_offset + i));
279 }
280 }
281 line_offset += seg.line_count;
282 }
283 }
284
285 entries.sort_by_key(|&(seq, _, _)| seq);
286 Ok(entries
287 .into_iter()
288 .map(|(_, src, line)| (src, line))
289 .collect())
290 }
291
292 fn read_single_line_from(segments: &[Segment], absolute_line: usize) -> io::Result<String> {
294 let mut cumulative = 0;
295 for seg in segments {
296 if absolute_line < cumulative + seg.line_count {
297 let line_in_seg = absolute_line - cumulative;
298 let lines = read_lines_from_segment(&seg.log_path, &seg.idx_path, line_in_seg, 1)?;
299 return lines.into_iter().next().ok_or_else(|| {
300 io::Error::new(io::ErrorKind::UnexpectedEof, "line not found in segment")
301 });
302 }
303 cumulative += seg.line_count;
304 }
305 Err(io::Error::new(
306 io::ErrorKind::InvalidInput,
307 format!(
308 "line {} out of range (total: {})",
309 absolute_line, cumulative
310 ),
311 ))
312 }
313
314 fn discover_segments(log_dir: &Path, process: &str, source: LineSource) -> Vec<Segment> {
316 let stream = match source {
317 LineSource::Stdout => "stdout",
318 LineSource::Stderr => "stderr",
319 };
320 let base = log_dir.join(format!("{}.{}", process, stream));
321
322 let mut rotated: Vec<(u32, Segment)> = Vec::new();
323 for n in 1u32.. {
324 let log_path = base.with_extension(format!("{}.{}", stream, n));
325 if !log_path.exists() {
326 break;
327 }
328 let idx_path = idx_path_for(&log_path);
329 let line_count = idx_line_count(&idx_path)
330 .unwrap_or_else(|| count_lines_in_file(&log_path).unwrap_or(0));
331 rotated.push((
332 n,
333 Segment {
334 log_path,
335 idx_path,
336 line_count,
337 },
338 ));
339 }
340
341 rotated.sort_by(|a, b| b.0.cmp(&a.0));
343 let mut segments: Vec<Segment> = rotated.into_iter().map(|(_, s)| s).collect();
344
345 if base.exists() {
346 let idx_path = idx_path_for(&base);
347 let line_count = idx_line_count(&idx_path)
348 .unwrap_or_else(|| count_lines_in_file(&base).unwrap_or(0));
349 segments.push(Segment {
350 log_path: base,
351 idx_path,
352 line_count,
353 });
354 }
355
356 segments
357 }
358}
359
360fn read_lines_from_segments(
362 segments: &[Segment],
363 start: usize,
364 count: usize,
365) -> io::Result<Vec<String>> {
366 let mut result = Vec::with_capacity(count);
367 let mut cumulative = 0;
368
369 for seg in segments {
370 let seg_end = cumulative + seg.line_count;
371 let window_end = start + count;
372
373 if seg_end <= start || cumulative >= window_end {
374 cumulative = seg_end;
375 continue;
376 }
377
378 let read_start = start.max(cumulative) - cumulative;
379 let read_end = window_end.min(seg_end) - cumulative;
380 let lines = read_lines_from_segment(
381 &seg.log_path,
382 &seg.idx_path,
383 read_start,
384 read_end - read_start,
385 )?;
386 result.extend(lines);
387
388 cumulative = seg_end;
389 }
390 Ok(result)
391}
392
393fn idx_line_count(idx_path: &Path) -> Option<usize> {
395 IndexReader::line_count_from_metadata(idx_path).ok()
396}
397
/// Counts the lines of the file at `path`.
///
/// A final line without a trailing newline counts as a line; an empty file
/// has 0 lines. The content is treated as raw bytes, so lines that are not
/// valid UTF-8 are counted like any other.
///
/// # Errors
///
/// Returns the error if the file cannot be opened or a read fails. Reading
/// stops at the first error, so a persistent read failure (for example when
/// `path` is a directory) is reported instead of being retried forever.
fn count_lines_in_file(path: &Path) -> io::Result<usize> {
    let mut reader = BufReader::new(std::fs::File::open(path)?);
    let mut line = Vec::new();
    let mut lines = 0;
    // `read_until` returns 0 only at end of file; a non-empty final chunk
    // without a newline is still a line.
    while reader.read_until(b'\n', &mut line)? != 0 {
        lines += 1;
        line.clear();
    }
    Ok(lines)
}
403
404fn read_lines_from_segment(
407 log_path: &Path,
408 idx_path: &Path,
409 start: usize,
410 count: usize,
411) -> io::Result<Vec<String>> {
412 if count == 0 {
413 return Ok(Vec::new());
414 }
415
416 if let Ok(Some(mut idx_reader)) = IndexReader::open(idx_path) {
418 let records = idx_reader.read_range(start, count)?;
419 if records.is_empty() {
420 return Ok(Vec::new());
421 }
422 let file = std::fs::File::open(log_path)?;
423 let mut reader = BufReader::new(file);
424 reader.seek(SeekFrom::Start(records[0].byte_offset))?;
426 let mut result = Vec::with_capacity(records.len());
427 for _ in 0..records.len() {
428 let mut line = String::new();
429 reader.read_line(&mut line)?;
430 if line.ends_with('\n') {
431 line.pop();
432 }
433 result.push(line);
434 }
435 return Ok(result);
436 }
437
438 let file = std::fs::File::open(log_path)?;
440 let lines: Vec<String> = BufReader::new(file)
441 .lines()
442 .skip(start)
443 .take(count)
444 .collect::<io::Result<Vec<_>>>()?;
445 Ok(lines)
446}
447
#[cfg(test)]
mod tests {
    use super::*;
    use crate::daemon::log_index::{IndexRecord, IndexWriter};

    /// Reader for the process named `test` whose logs live in `dir`.
    fn reader_for(dir: &Path) -> DiskLogReader {
        DiskLogReader::new(dir.to_path_buf(), "test".to_string())
    }

    /// Creates the index file for `dir/filename` with one record per
    /// `(byte_offset, seq)` pair.
    fn write_index(dir: &Path, filename: &str, seq_base: u64, records: &[(u64, u64)]) {
        let idx_path = idx_path_for(&dir.join(filename));
        let mut writer = IndexWriter::create(&idx_path, seq_base).unwrap();
        for &(byte_offset, seq) in records {
            writer.append(IndexRecord { byte_offset, seq }).unwrap();
        }
        writer.flush().unwrap();
    }

    /// Writes `lines` (newline-terminated) to `dir/filename` together with an
    /// index whose sequence numbers count up from `seq_base`.
    fn create_log_with_index(dir: &Path, filename: &str, lines: &[&str], seq_base: u64) {
        let mut content = String::new();
        let mut records = Vec::with_capacity(lines.len());
        for (i, line) in lines.iter().enumerate() {
            // The line starts where the content written so far ends.
            records.push((content.len() as u64, seq_base + i as u64));
            content.push_str(line);
            content.push('\n');
        }
        write_index(dir, filename, seq_base, &records);
        std::fs::write(dir.join(filename), content).unwrap();
    }

    #[test]
    fn test_line_count_single_file() {
        let dir = tempfile::tempdir().unwrap();
        create_log_with_index(dir.path(), "test.stdout", &["line1", "line2", "line3"], 0);

        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count(LineSource::Stdout), 3);
        assert_eq!(reader.line_count(LineSource::Stderr), 0);
    }

    #[test]
    fn test_read_lines_single_file() {
        let dir = tempfile::tempdir().unwrap();
        let content = ["aaa", "bbb", "ccc", "ddd", "eee"];
        create_log_with_index(dir.path(), "test.stdout", &content, 0);

        let mut reader = reader_for(dir.path());
        let lines = reader.read_lines(LineSource::Stdout, 1, 3).unwrap();
        assert_eq!(lines, vec!["bbb", "ccc", "ddd"]);
    }

    #[test]
    fn test_read_lines_across_rotated() {
        let dir = tempfile::tempdir().unwrap();
        // Highest rotation number holds the oldest lines.
        let files: [(&str, [&str; 2], u64); 3] = [
            ("test.stdout.2", ["old1", "old2"], 0),
            ("test.stdout.1", ["mid1", "mid2"], 2),
            ("test.stdout", ["new1", "new2"], 4),
        ];
        for (name, lines, seq_base) in files {
            create_log_with_index(dir.path(), name, &lines, seq_base);
        }

        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count(LineSource::Stdout), 6);

        let lines = reader.read_lines(LineSource::Stdout, 1, 4).unwrap();
        assert_eq!(lines, vec!["old2", "mid1", "mid2", "new1"]);
    }

    #[test]
    fn test_read_interleaved() {
        let dir = tempfile::tempdir().unwrap();
        create_log_with_index(dir.path(), "test.stdout", &["out0", "out2", "out4"], 0);
        // Re-create the stdout index so its lines take the even sequence numbers.
        write_index(dir.path(), "test.stdout", 0, &[(0, 0), (5, 2), (10, 4)]);

        // stderr takes the odd sequence numbers.
        std::fs::write(dir.path().join("test.stderr"), "err1\nerr3\n").unwrap();
        write_index(dir.path(), "test.stderr", 1, &[(0, 1), (5, 3)]);

        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count_both(), 5);

        let interleaved = reader.read_interleaved(0, 5).unwrap();
        let expected = vec![
            (LineSource::Stdout, "out0".to_string()),
            (LineSource::Stderr, "err1".to_string()),
            (LineSource::Stdout, "out2".to_string()),
            (LineSource::Stderr, "err3".to_string()),
            (LineSource::Stdout, "out4".to_string()),
        ];
        assert_eq!(interleaved.len(), 5);
        assert_eq!(interleaved, expected);
    }

    #[test]
    fn test_fallback_no_index() {
        let dir = tempfile::tempdir().unwrap();
        // Log file only, no index next to it.
        std::fs::write(dir.path().join("test.stdout"), "aaa\nbbb\nccc\n").unwrap();

        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count(LineSource::Stdout), 3);

        let lines = reader.read_lines(LineSource::Stdout, 1, 2).unwrap();
        assert_eq!(lines, vec!["bbb", "ccc"]);
    }

    #[test]
    fn test_scan_matching_lines_stdout() {
        let dir = tempfile::tempdir().unwrap();
        let content = [
            "hello world",
            "goodbye world",
            "hello again",
            "foo bar",
            "hello!",
        ];
        create_log_with_index(dir.path(), "test.stdout", &content, 0);

        let mut reader = reader_for(dir.path());

        let hello = reader.scan_matching_lines("hello", StreamMode::Stdout);
        assert_eq!(hello, vec![0, 2, 4]);

        let world = reader.scan_matching_lines("world", StreamMode::Stdout);
        assert_eq!(world, vec![0, 1]);

        let none = reader.scan_matching_lines("nonexistent", StreamMode::Stdout);
        assert!(none.is_empty());
    }

    #[test]
    fn test_scan_matching_lines_both() {
        let dir = tempfile::tempdir().unwrap();

        // Merged order by seq: out-hello (0), err-hello (1), out-world (2).
        std::fs::write(dir.path().join("test.stdout"), "out-hello\nout-world\n").unwrap();
        write_index(dir.path(), "test.stdout", 0, &[(0, 0), (10, 2)]);

        std::fs::write(dir.path().join("test.stderr"), "err-hello\n").unwrap();
        write_index(dir.path(), "test.stderr", 1, &[(0, 1)]);

        let mut reader = reader_for(dir.path());
        let matches = reader.scan_matching_lines("hello", StreamMode::Both);
        assert_eq!(matches, vec![0, 1]);
    }

    #[test]
    fn test_read_scattered_lines() {
        let dir = tempfile::tempdir().unwrap();
        let content = ["line0", "line1", "line2", "line3", "line4"];
        create_log_with_index(dir.path(), "test.stdout", &content, 0);

        let mut reader = reader_for(dir.path());
        let result = reader.read_scattered_lines(StreamMode::Stdout, &[0, 2, 4]);
        let expected = vec![
            (LineSource::Stdout, "line0".to_string()),
            (LineSource::Stdout, "line2".to_string()),
            (LineSource::Stdout, "line4".to_string()),
        ];
        assert_eq!(result.len(), 3);
        assert_eq!(result, expected);
    }

    #[test]
    fn test_empty_dir() {
        let dir = tempfile::tempdir().unwrap();
        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count(LineSource::Stdout), 0);
        assert_eq!(reader.line_count_both(), 0);
    }
}