Skip to main content

agent_procs/tui/
disk_log_reader.rs

1//! Random-access line reading from disk-backed logs using sidecar index files.
2//!
3//! [`DiskLogReader`] provides windowed access to log history, allowing the TUI
4//! to scroll through the entire output history without loading everything into
5//! memory.
6
7use crate::daemon::log_index::{IndexReader, idx_path_for};
8use crate::tui::app::LineSource;
9use std::io::{self, BufRead, BufReader, Seek, SeekFrom};
10use std::path::{Path, PathBuf};
11use std::time::Instant;
12
/// One on-disk piece of a stream's history: a log file, the path of its
/// sidecar index, and the number of lines attributed to it.
#[derive(Clone)]
struct Segment {
    /// Lines attributed to this segment (from index metadata or a file scan).
    line_count: usize,
    /// The log file holding the raw output lines.
    log_path: PathBuf,
    /// The sidecar index file that belongs to `log_path`.
    idx_path: PathBuf,
}

/// Segment lists for both streams, remembered for a short time so that
/// back-to-back queries do not re-probe the filesystem.
struct SegmentCache {
    /// Moment the lists were captured; compared against the TTL constant.
    refreshed_at: Instant,
    /// Stdout segments, oldest first.
    stdout: Vec<Segment>,
    /// Stderr segments, oldest first.
    stderr: Vec<Segment>,
}

/// Milliseconds a `SegmentCache` stays valid before segments are rediscovered.
const SEGMENT_CACHE_TTL_MS: u128 = 500;
30
31use crate::tui::app::StreamMode;
32
33/// Provides random-access line reading from disk log files using the sidecar
34/// `.idx` index.  Handles rotated files (`.1`, `.2`, ...) transparently.
35pub struct DiskLogReader {
36    log_dir: PathBuf,
37    process: String,
38    cache: Option<SegmentCache>,
39}
40
41impl DiskLogReader {
42    pub fn new(log_dir: PathBuf, process: String) -> Self {
43        Self {
44            log_dir,
45            process,
46            cache: None,
47        }
48    }
49
50    /// Get cached segments for a source, refreshing if stale.
51    fn segments(&mut self, source: LineSource) -> &[Segment] {
52        let stale = self
53            .cache
54            .as_ref()
55            .is_none_or(|c| c.refreshed_at.elapsed().as_millis() > SEGMENT_CACHE_TTL_MS);
56        if stale {
57            let stdout = Self::discover_segments(&self.log_dir, &self.process, LineSource::Stdout);
58            let stderr = Self::discover_segments(&self.log_dir, &self.process, LineSource::Stderr);
59            self.cache = Some(SegmentCache {
60                stdout,
61                stderr,
62                refreshed_at: Instant::now(),
63            });
64        }
65        let cache = self.cache.as_ref().unwrap();
66        match source {
67            LineSource::Stdout => &cache.stdout,
68            LineSource::Stderr => &cache.stderr,
69        }
70    }
71
72    /// Total line count for a stream across current + rotated files.
73    pub fn line_count(&mut self, source: LineSource) -> usize {
74        self.segments(source).iter().map(|s| s.line_count).sum()
75    }
76
77    /// Total line count for merged both-streams view.
78    pub fn line_count_both(&mut self) -> usize {
79        // Single cache refresh covers both streams.
80        let stdout: usize = self
81            .segments(LineSource::Stdout)
82            .iter()
83            .map(|s| s.line_count)
84            .sum();
85        let stderr: usize = self
86            .segments(LineSource::Stderr)
87            .iter()
88            .map(|s| s.line_count)
89            .sum();
90        stdout + stderr
91    }
92
93    /// Read lines `[start..start+count)` for a single stream, spanning
94    /// rotated files oldest-to-newest.
95    pub fn read_lines(
96        &mut self,
97        source: LineSource,
98        start: usize,
99        count: usize,
100    ) -> io::Result<Vec<String>> {
101        let segments = self.segments(source).to_vec();
102        read_lines_from_segments(&segments, start, count)
103    }
104
105    /// Read interleaved lines for "Both" mode, merge-sorted by sequence number.
106    pub fn read_interleaved(
107        &mut self,
108        start: usize,
109        count: usize,
110    ) -> io::Result<Vec<(LineSource, String)>> {
111        let merged = self.build_merged_index()?;
112        let end = (start + count).min(merged.len());
113        if start >= end {
114            return Ok(Vec::new());
115        }
116
117        let window = &merged[start..end];
118
119        // Batch reads: group by source, read each stream's lines in one pass,
120        // then assemble in merged order.
121        let mut stdout_needed: Vec<(usize, usize)> = Vec::new(); // (result_idx, stream_line)
122        let mut stderr_needed: Vec<(usize, usize)> = Vec::new();
123        for (i, &(source, stream_line)) in window.iter().enumerate() {
124            match source {
125                LineSource::Stdout => stdout_needed.push((i, stream_line)),
126                LineSource::Stderr => stderr_needed.push((i, stream_line)),
127            }
128        }
129
130        let mut result: Vec<(LineSource, String)> =
131            vec![(LineSource::Stdout, String::new()); end - start];
132
133        // Read stdout lines in batch
134        for &(result_idx, stream_line) in &stdout_needed {
135            let line = Self::read_single_line_from(self.segments(LineSource::Stdout), stream_line)?;
136            result[result_idx] = (LineSource::Stdout, line);
137        }
138        // Read stderr lines in batch
139        for &(result_idx, stream_line) in &stderr_needed {
140            let line = Self::read_single_line_from(self.segments(LineSource::Stderr), stream_line)?;
141            result[result_idx] = (LineSource::Stderr, line);
142        }
143
144        Ok(result)
145    }
146
147    /// Scan all lines in the given stream mode and return indices of lines
148    /// matching the filter substring. The returned indices are positions within
149    /// the mode's address space (single-stream or interleaved).
150    pub fn scan_matching_lines(&mut self, filter: &str, mode: StreamMode) -> Vec<usize> {
151        self.scan_matching_lines_from(filter, mode, 0)
152    }
153
154    /// Scan lines starting from `start_from` in the given stream mode.
155    /// Returns indices of matching lines and can be used for incremental updates.
156    pub fn scan_matching_lines_from(
157        &mut self,
158        filter: &str,
159        mode: StreamMode,
160        start_from: usize,
161    ) -> Vec<usize> {
162        match mode {
163            StreamMode::Stdout => self.scan_single_stream(filter, LineSource::Stdout, start_from),
164            StreamMode::Stderr => self.scan_single_stream(filter, LineSource::Stderr, start_from),
165            StreamMode::Both => self.scan_interleaved(filter, start_from),
166        }
167    }
168
169    fn scan_single_stream(
170        &mut self,
171        filter: &str,
172        source: LineSource,
173        start_from: usize,
174    ) -> Vec<usize> {
175        let total = self.line_count(source);
176        if total <= start_from {
177            return Vec::new();
178        }
179        let mut matching = Vec::new();
180        const CHUNK: usize = 1000;
181        let mut offset = start_from;
182        while offset < total {
183            let count = CHUNK.min(total - offset);
184            if let Ok(lines) = self.read_lines(source, offset, count) {
185                for (i, line) in lines.iter().enumerate() {
186                    if line.contains(filter) {
187                        matching.push(offset + i);
188                    }
189                }
190            }
191            offset += count;
192        }
193        matching
194    }
195
196    fn scan_interleaved(&mut self, filter: &str, start_from: usize) -> Vec<usize> {
197        let total = self.line_count_both();
198        if total <= start_from {
199            return Vec::new();
200        }
201        let mut matching = Vec::new();
202        const CHUNK: usize = 1000;
203        let mut offset = start_from;
204        while offset < total {
205            let count = CHUNK.min(total - offset);
206            if let Ok(lines) = self.read_interleaved(offset, count) {
207                for (i, (_, line)) in lines.iter().enumerate() {
208                    if line.contains(filter) {
209                        matching.push(offset + i);
210                    }
211                }
212            }
213            offset += count;
214        }
215        matching
216    }
217
218    /// Read specific non-contiguous lines by index within the given mode's
219    /// address space. Used by filtered scrollback to render only matching lines.
220    pub fn read_scattered_lines(
221        &mut self,
222        mode: StreamMode,
223        line_numbers: &[usize],
224    ) -> Vec<(LineSource, String)> {
225        if line_numbers.is_empty() {
226            return Vec::new();
227        }
228        match mode {
229            StreamMode::Stdout | StreamMode::Stderr => {
230                let source = if mode == StreamMode::Stdout {
231                    LineSource::Stdout
232                } else {
233                    LineSource::Stderr
234                };
235                let mut result = Vec::with_capacity(line_numbers.len());
236                for &ln in line_numbers {
237                    if let Ok(lines) = self.read_lines(source, ln, 1)
238                        && let Some(line) = lines.into_iter().next()
239                    {
240                        result.push((source, line));
241                        continue;
242                    }
243                    result.push((source, String::new()));
244                }
245                result
246            }
247            StreamMode::Both => {
248                let mut result = Vec::with_capacity(line_numbers.len());
249                for &ln in line_numbers {
250                    if let Ok(lines) = self.read_interleaved(ln, 1)
251                        && let Some(entry) = lines.into_iter().next()
252                    {
253                        result.push(entry);
254                        continue;
255                    }
256                    result.push((LineSource::Stdout, String::new()));
257                }
258                result
259            }
260        }
261    }
262
263    /// Build merged index: all lines from both streams sorted by sequence number.
264    fn build_merged_index(&mut self) -> io::Result<Vec<(LineSource, usize)>> {
265        let mut entries: Vec<(u64, LineSource, usize)> = Vec::new();
266
267        for source in [LineSource::Stdout, LineSource::Stderr] {
268            let segments = self.segments(source).to_vec();
269            let mut line_offset = 0;
270            for seg in &segments {
271                if let Ok(Some(mut reader)) = IndexReader::open(&seg.idx_path) {
272                    let records = reader.read_range(0, seg.line_count)?;
273                    for (i, rec) in records.iter().enumerate() {
274                        entries.push((rec.seq, source, line_offset + i));
275                    }
276                } else {
277                    for i in 0..seg.line_count {
278                        entries.push((u64::MAX, source, line_offset + i));
279                    }
280                }
281                line_offset += seg.line_count;
282            }
283        }
284
285        entries.sort_by_key(|&(seq, _, _)| seq);
286        Ok(entries
287            .into_iter()
288            .map(|(_, src, line)| (src, line))
289            .collect())
290    }
291
292    /// Read a single line by absolute position within pre-fetched segments.
293    fn read_single_line_from(segments: &[Segment], absolute_line: usize) -> io::Result<String> {
294        let mut cumulative = 0;
295        for seg in segments {
296            if absolute_line < cumulative + seg.line_count {
297                let line_in_seg = absolute_line - cumulative;
298                let lines = read_lines_from_segment(&seg.log_path, &seg.idx_path, line_in_seg, 1)?;
299                return lines.into_iter().next().ok_or_else(|| {
300                    io::Error::new(io::ErrorKind::UnexpectedEof, "line not found in segment")
301                });
302            }
303            cumulative += seg.line_count;
304        }
305        Err(io::Error::new(
306            io::ErrorKind::InvalidInput,
307            format!(
308                "line {} out of range (total: {})",
309                absolute_line, cumulative
310            ),
311        ))
312    }
313
314    /// Probe the filesystem for log segments of a given stream, ordered oldest first.
315    fn discover_segments(log_dir: &Path, process: &str, source: LineSource) -> Vec<Segment> {
316        let stream = match source {
317            LineSource::Stdout => "stdout",
318            LineSource::Stderr => "stderr",
319        };
320        let base = log_dir.join(format!("{}.{}", process, stream));
321
322        let mut rotated: Vec<(u32, Segment)> = Vec::new();
323        for n in 1u32.. {
324            let log_path = base.with_extension(format!("{}.{}", stream, n));
325            if !log_path.exists() {
326                break;
327            }
328            let idx_path = idx_path_for(&log_path);
329            let line_count = idx_line_count(&idx_path)
330                .unwrap_or_else(|| count_lines_in_file(&log_path).unwrap_or(0));
331            rotated.push((
332                n,
333                Segment {
334                    log_path,
335                    idx_path,
336                    line_count,
337                },
338            ));
339        }
340
341        // Sort by N descending so highest-N (oldest) comes first
342        rotated.sort_by(|a, b| b.0.cmp(&a.0));
343        let mut segments: Vec<Segment> = rotated.into_iter().map(|(_, s)| s).collect();
344
345        if base.exists() {
346            let idx_path = idx_path_for(&base);
347            let line_count = idx_line_count(&idx_path)
348                .unwrap_or_else(|| count_lines_in_file(&base).unwrap_or(0));
349            segments.push(Segment {
350                log_path: base,
351                idx_path,
352                line_count,
353            });
354        }
355
356        segments
357    }
358}
359
360/// Read lines from a span of segments by absolute line range.
361fn read_lines_from_segments(
362    segments: &[Segment],
363    start: usize,
364    count: usize,
365) -> io::Result<Vec<String>> {
366    let mut result = Vec::with_capacity(count);
367    let mut cumulative = 0;
368
369    for seg in segments {
370        let seg_end = cumulative + seg.line_count;
371        let window_end = start + count;
372
373        if seg_end <= start || cumulative >= window_end {
374            cumulative = seg_end;
375            continue;
376        }
377
378        let read_start = start.max(cumulative) - cumulative;
379        let read_end = window_end.min(seg_end) - cumulative;
380        let lines = read_lines_from_segment(
381            &seg.log_path,
382            &seg.idx_path,
383            read_start,
384            read_end - read_start,
385        )?;
386        result.extend(lines);
387
388        cumulative = seg_end;
389    }
390    Ok(result)
391}
392
393/// Get line count from an index file's metadata (no content read).
394fn idx_line_count(idx_path: &Path) -> Option<usize> {
395    IndexReader::line_count_from_metadata(idx_path).ok()
396}
397
/// Fallback: count lines by scanning the log file.
///
/// Counts `\n` bytes, plus one for a final line without a trailing newline.
/// This is the number of items `BufRead::lines` would yield. Working on raw
/// bytes avoids allocating and UTF-8-validating every line. Read errors are
/// returned instead of being counted as lines, because a persistent error
/// would make `lines()` yield `Err` forever.
///
/// # Errors
/// Returns any error from opening or reading `path`.
fn count_lines_in_file(path: &Path) -> io::Result<usize> {
    let mut reader = BufReader::new(std::fs::File::open(path)?);
    let mut newlines = 0;
    let mut last_byte = None;
    loop {
        let chunk = match reader.fill_buf() {
            Ok(chunk) => chunk,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        if chunk.is_empty() {
            break;
        }
        newlines += chunk.iter().filter(|&&b| b == b'\n').count();
        last_byte = chunk.last().copied();
        let consumed = chunk.len();
        reader.consume(consumed);
    }
    let unterminated_tail = matches!(last_byte, Some(b) if b != b'\n');
    Ok(newlines + usize::from(unterminated_tail))
}
403
404/// Read `count` lines starting at `start` from a single segment.
405/// Uses the index for seeking when available, falls back to sequential scan.
406fn read_lines_from_segment(
407    log_path: &Path,
408    idx_path: &Path,
409    start: usize,
410    count: usize,
411) -> io::Result<Vec<String>> {
412    if count == 0 {
413        return Ok(Vec::new());
414    }
415
416    // Try indexed read
417    if let Ok(Some(mut idx_reader)) = IndexReader::open(idx_path) {
418        let records = idx_reader.read_range(start, count)?;
419        if records.is_empty() {
420            return Ok(Vec::new());
421        }
422        let file = std::fs::File::open(log_path)?;
423        let mut reader = BufReader::new(file);
424        // Seek to first record and read sequentially (records are contiguous)
425        reader.seek(SeekFrom::Start(records[0].byte_offset))?;
426        let mut result = Vec::with_capacity(records.len());
427        for _ in 0..records.len() {
428            let mut line = String::new();
429            reader.read_line(&mut line)?;
430            if line.ends_with('\n') {
431                line.pop();
432            }
433            result.push(line);
434        }
435        return Ok(result);
436    }
437
438    // Fallback: sequential scan
439    let file = std::fs::File::open(log_path)?;
440    let lines: Vec<String> = BufReader::new(file)
441        .lines()
442        .skip(start)
443        .take(count)
444        .collect::<io::Result<Vec<_>>>()?;
445    Ok(lines)
446}
447
#[cfg(test)]
mod tests {
    use super::*;
    use crate::daemon::log_index::{IndexRecord, IndexWriter};

    /// Helper: write `content` verbatim to `dir/filename`, plus a matching
    /// index holding `records` as `(byte_offset, seq)` pairs.
    fn write_log_with_records(
        dir: &Path,
        filename: &str,
        content: &str,
        seq_base: u64,
        records: &[(u64, u64)],
    ) {
        let log_path = dir.join(filename);
        let idx_path = idx_path_for(&log_path);
        let mut writer = IndexWriter::create(&idx_path, seq_base).unwrap();
        for &(byte_offset, seq) in records {
            writer.append(IndexRecord { byte_offset, seq }).unwrap();
        }
        writer.flush().unwrap();
        std::fs::write(&log_path, content).unwrap();
    }

    /// Helper: create a log file with the given lines and a matching index
    /// whose sequence numbers run `seq_base, seq_base + 1, ...`.
    fn create_log_with_index(dir: &Path, filename: &str, lines: &[&str], seq_base: u64) {
        let mut content = String::new();
        let mut records = Vec::with_capacity(lines.len());
        for (seq, line) in (seq_base..).zip(lines) {
            records.push((content.len() as u64, seq));
            content.push_str(line);
            content.push('\n');
        }
        write_log_with_records(dir, filename, &content, seq_base, &records);
    }

    /// Helper: stdout lines at seq 0, 2, 4 and stderr lines at seq 1, 3.
    fn create_interleaved_fixture(dir: &Path) {
        write_log_with_records(
            dir,
            "test.stdout",
            "out0\nout2\nout4\n",
            0,
            &[(0, 0), (5, 2), (10, 4)],
        );
        write_log_with_records(dir, "test.stderr", "err1\nerr3\n", 1, &[(0, 1), (5, 3)]);
    }

    fn reader_for(dir: &Path) -> DiskLogReader {
        DiskLogReader::new(dir.to_path_buf(), "test".to_string())
    }

    #[test]
    fn test_line_count_single_file() {
        let dir = tempfile::tempdir().unwrap();
        create_log_with_index(dir.path(), "test.stdout", &["line1", "line2", "line3"], 0);

        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count(LineSource::Stdout), 3);
        assert_eq!(reader.line_count(LineSource::Stderr), 0);
    }

    #[test]
    fn test_read_lines_single_file() {
        let dir = tempfile::tempdir().unwrap();
        create_log_with_index(
            dir.path(),
            "test.stdout",
            &["aaa", "bbb", "ccc", "ddd", "eee"],
            0,
        );

        let mut reader = reader_for(dir.path());
        let lines = reader.read_lines(LineSource::Stdout, 1, 3).unwrap();
        assert_eq!(lines, vec!["bbb", "ccc", "ddd"]);
    }

    #[test]
    fn test_read_lines_across_rotated() {
        let dir = tempfile::tempdir().unwrap();
        // Rotated file .2 (oldest)
        create_log_with_index(dir.path(), "test.stdout.2", &["old1", "old2"], 0);
        // Rotated file .1
        create_log_with_index(dir.path(), "test.stdout.1", &["mid1", "mid2"], 2);
        // Current file (newest)
        create_log_with_index(dir.path(), "test.stdout", &["new1", "new2"], 4);

        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count(LineSource::Stdout), 6);

        // Read across segments
        let lines = reader.read_lines(LineSource::Stdout, 1, 4).unwrap();
        assert_eq!(lines, vec!["old2", "mid1", "mid2", "new1"]);
    }

    #[test]
    fn test_read_interleaved() {
        let dir = tempfile::tempdir().unwrap();
        create_interleaved_fixture(dir.path());

        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count_both(), 5);

        let interleaved = reader.read_interleaved(0, 5).unwrap();
        assert_eq!(
            interleaved,
            vec![
                (LineSource::Stdout, "out0".to_string()),
                (LineSource::Stderr, "err1".to_string()),
                (LineSource::Stdout, "out2".to_string()),
                (LineSource::Stderr, "err3".to_string()),
                (LineSource::Stdout, "out4".to_string()),
            ]
        );
    }

    #[test]
    fn test_read_interleaved_window_past_end() {
        let dir = tempfile::tempdir().unwrap();
        create_interleaved_fixture(dir.path());

        let mut reader = reader_for(dir.path());
        let tail = reader.read_interleaved(3, 10).unwrap();
        assert_eq!(
            tail,
            vec![
                (LineSource::Stderr, "err3".to_string()),
                (LineSource::Stdout, "out4".to_string()),
            ]
        );
        assert!(reader.read_interleaved(5, 1).unwrap().is_empty());
    }

    #[test]
    fn test_fallback_no_index() {
        let dir = tempfile::tempdir().unwrap();
        // Log file without index
        std::fs::write(dir.path().join("test.stdout"), "aaa\nbbb\nccc\n").unwrap();

        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count(LineSource::Stdout), 3);

        let lines = reader.read_lines(LineSource::Stdout, 1, 2).unwrap();
        assert_eq!(lines, vec!["bbb", "ccc"]);
    }

    #[test]
    fn test_scan_matching_lines_stdout() {
        let dir = tempfile::tempdir().unwrap();
        create_log_with_index(
            dir.path(),
            "test.stdout",
            &[
                "hello world",
                "goodbye world",
                "hello again",
                "foo bar",
                "hello!",
            ],
            0,
        );

        let mut reader = reader_for(dir.path());
        assert_eq!(
            reader.scan_matching_lines("hello", StreamMode::Stdout),
            vec![0, 2, 4]
        );
        assert_eq!(
            reader.scan_matching_lines("world", StreamMode::Stdout),
            vec![0, 1]
        );
        assert!(reader
            .scan_matching_lines("nonexistent", StreamMode::Stdout)
            .is_empty());
        // Incremental scan only reports matches at or after `start_from`.
        assert_eq!(
            reader.scan_matching_lines_from("hello", StreamMode::Stdout, 1),
            vec![2, 4]
        );
    }

    #[test]
    fn test_scan_matching_lines_both() {
        let dir = tempfile::tempdir().unwrap();
        // stdout: seq 0, 2
        write_log_with_records(
            dir.path(),
            "test.stdout",
            "out-hello\nout-world\n",
            0,
            &[(0, 0), (10, 2)],
        );
        // stderr: seq 1
        write_log_with_records(dir.path(), "test.stderr", "err-hello\n", 1, &[(0, 1)]);

        let mut reader = reader_for(dir.path());
        // Interleaved: [out-hello(0), err-hello(1), out-world(2)]
        let matches = reader.scan_matching_lines("hello", StreamMode::Both);
        assert_eq!(matches, vec![0, 1]); // indices in interleaved space
    }

    #[test]
    fn test_read_scattered_lines() {
        let dir = tempfile::tempdir().unwrap();
        create_log_with_index(
            dir.path(),
            "test.stdout",
            &["line0", "line1", "line2", "line3", "line4"],
            0,
        );

        let mut reader = reader_for(dir.path());
        let result = reader.read_scattered_lines(StreamMode::Stdout, &[0, 2, 4]);
        assert_eq!(
            result,
            vec![
                (LineSource::Stdout, "line0".to_string()),
                (LineSource::Stdout, "line2".to_string()),
                (LineSource::Stdout, "line4".to_string()),
            ]
        );

        // Out-of-range positions come back as blank lines.
        let result = reader.read_scattered_lines(StreamMode::Stdout, &[5]);
        assert_eq!(result, vec![(LineSource::Stdout, String::new())]);
    }

    #[test]
    fn test_read_scattered_lines_both() {
        let dir = tempfile::tempdir().unwrap();
        create_interleaved_fixture(dir.path());

        let mut reader = reader_for(dir.path());
        let result = reader.read_scattered_lines(StreamMode::Both, &[4, 1, 9]);
        assert_eq!(
            result,
            vec![
                (LineSource::Stdout, "out4".to_string()),
                (LineSource::Stderr, "err1".to_string()),
                (LineSource::Stdout, String::new()),
            ]
        );
    }

    #[test]
    fn test_empty_dir() {
        let dir = tempfile::tempdir().unwrap();
        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count(LineSource::Stdout), 0);
        assert_eq!(reader.line_count_both(), 0);
        assert!(reader
            .read_lines(LineSource::Stdout, 0, 10)
            .unwrap()
            .is_empty());
        assert!(reader.read_interleaved(0, 10).unwrap().is_empty());
    }
}