Skip to main content

agent_procs/tui/
disk_log_reader.rs

1//! Random-access line reading from disk-backed logs using sidecar index files.
2//!
3//! [`DiskLogReader`] provides windowed access to log history, allowing the TUI
4//! to scroll through the entire output history without loading everything into
5//! memory.
6
7use crate::daemon::log_index::{IndexReader, idx_path_for};
8use crate::tui::app::LineSource;
9use std::io::{self, BufRead, BufReader, Seek, SeekFrom};
10use std::path::{Path, PathBuf};
11use std::time::Instant;
12
/// One log segment: a log file paired with its sidecar index.
#[derive(Clone)]
struct Segment {
    /// Path of the log file holding the line text.
    log_path: PathBuf,
    /// Path of the sidecar index that belongs to `log_path`.
    idx_path: PathBuf,
    /// Number of lines this segment contributes to its stream.
    line_count: usize,
}
20
21/// Cached segment list with a short TTL to avoid repeated filesystem probes.
22struct SegmentCache {
23    stdout: Vec<Segment>,
24    stderr: Vec<Segment>,
25    refreshed_at: Instant,
26}
27
/// Maximum age, in milliseconds, of a cached segment enumeration before the
/// filesystem is probed again.
const SEGMENT_CACHE_TTL_MS: u128 = 500;
30
31/// Provides random-access line reading from disk log files using the sidecar
32/// `.idx` index.  Handles rotated files (`.1`, `.2`, ...) transparently.
33pub struct DiskLogReader {
34    log_dir: PathBuf,
35    process: String,
36    cache: Option<SegmentCache>,
37}
38
39impl DiskLogReader {
40    pub fn new(log_dir: PathBuf, process: String) -> Self {
41        Self {
42            log_dir,
43            process,
44            cache: None,
45        }
46    }
47
48    /// Get cached segments for a source, refreshing if stale.
49    fn segments(&mut self, source: LineSource) -> &[Segment] {
50        let stale = self
51            .cache
52            .as_ref()
53            .is_none_or(|c| c.refreshed_at.elapsed().as_millis() > SEGMENT_CACHE_TTL_MS);
54        if stale {
55            let stdout = Self::discover_segments(&self.log_dir, &self.process, LineSource::Stdout);
56            let stderr = Self::discover_segments(&self.log_dir, &self.process, LineSource::Stderr);
57            self.cache = Some(SegmentCache {
58                stdout,
59                stderr,
60                refreshed_at: Instant::now(),
61            });
62        }
63        let cache = self.cache.as_ref().unwrap();
64        match source {
65            LineSource::Stdout => &cache.stdout,
66            LineSource::Stderr => &cache.stderr,
67        }
68    }
69
70    /// Total line count for a stream across current + rotated files.
71    pub fn line_count(&mut self, source: LineSource) -> usize {
72        self.segments(source).iter().map(|s| s.line_count).sum()
73    }
74
75    /// Total line count for merged both-streams view.
76    pub fn line_count_both(&mut self) -> usize {
77        // Single cache refresh covers both streams.
78        let stdout: usize = self
79            .segments(LineSource::Stdout)
80            .iter()
81            .map(|s| s.line_count)
82            .sum();
83        let stderr: usize = self
84            .segments(LineSource::Stderr)
85            .iter()
86            .map(|s| s.line_count)
87            .sum();
88        stdout + stderr
89    }
90
91    /// Read lines `[start..start+count)` for a single stream, spanning
92    /// rotated files oldest-to-newest.
93    pub fn read_lines(
94        &mut self,
95        source: LineSource,
96        start: usize,
97        count: usize,
98    ) -> io::Result<Vec<String>> {
99        let segments = self.segments(source).to_vec();
100        read_lines_from_segments(&segments, start, count)
101    }
102
103    /// Read interleaved lines for "Both" mode, merge-sorted by sequence number.
104    pub fn read_interleaved(
105        &mut self,
106        start: usize,
107        count: usize,
108    ) -> io::Result<Vec<(LineSource, String)>> {
109        let merged = self.build_merged_index()?;
110        let end = (start + count).min(merged.len());
111        if start >= end {
112            return Ok(Vec::new());
113        }
114
115        let window = &merged[start..end];
116
117        // Batch reads: group by source, read each stream's lines in one pass,
118        // then assemble in merged order.
119        let mut stdout_needed: Vec<(usize, usize)> = Vec::new(); // (result_idx, stream_line)
120        let mut stderr_needed: Vec<(usize, usize)> = Vec::new();
121        for (i, &(source, stream_line)) in window.iter().enumerate() {
122            match source {
123                LineSource::Stdout => stdout_needed.push((i, stream_line)),
124                LineSource::Stderr => stderr_needed.push((i, stream_line)),
125            }
126        }
127
128        let mut result: Vec<(LineSource, String)> =
129            vec![(LineSource::Stdout, String::new()); end - start];
130
131        // Read stdout lines in batch
132        for &(result_idx, stream_line) in &stdout_needed {
133            let line = Self::read_single_line_from(self.segments(LineSource::Stdout), stream_line)?;
134            result[result_idx] = (LineSource::Stdout, line);
135        }
136        // Read stderr lines in batch
137        for &(result_idx, stream_line) in &stderr_needed {
138            let line = Self::read_single_line_from(self.segments(LineSource::Stderr), stream_line)?;
139            result[result_idx] = (LineSource::Stderr, line);
140        }
141
142        Ok(result)
143    }
144
145    /// Build merged index: all lines from both streams sorted by sequence number.
146    fn build_merged_index(&mut self) -> io::Result<Vec<(LineSource, usize)>> {
147        let mut entries: Vec<(u64, LineSource, usize)> = Vec::new();
148
149        for source in [LineSource::Stdout, LineSource::Stderr] {
150            let segments = self.segments(source).to_vec();
151            let mut line_offset = 0;
152            for seg in &segments {
153                if let Ok(Some(mut reader)) = IndexReader::open(&seg.idx_path) {
154                    let records = reader.read_range(0, seg.line_count)?;
155                    for (i, rec) in records.iter().enumerate() {
156                        entries.push((rec.seq, source, line_offset + i));
157                    }
158                } else {
159                    for i in 0..seg.line_count {
160                        entries.push((u64::MAX, source, line_offset + i));
161                    }
162                }
163                line_offset += seg.line_count;
164            }
165        }
166
167        entries.sort_by_key(|&(seq, _, _)| seq);
168        Ok(entries
169            .into_iter()
170            .map(|(_, src, line)| (src, line))
171            .collect())
172    }
173
174    /// Read a single line by absolute position within pre-fetched segments.
175    fn read_single_line_from(segments: &[Segment], absolute_line: usize) -> io::Result<String> {
176        let mut cumulative = 0;
177        for seg in segments {
178            if absolute_line < cumulative + seg.line_count {
179                let line_in_seg = absolute_line - cumulative;
180                let lines = read_lines_from_segment(&seg.log_path, &seg.idx_path, line_in_seg, 1)?;
181                return lines.into_iter().next().ok_or_else(|| {
182                    io::Error::new(io::ErrorKind::UnexpectedEof, "line not found in segment")
183                });
184            }
185            cumulative += seg.line_count;
186        }
187        Err(io::Error::new(
188            io::ErrorKind::InvalidInput,
189            format!(
190                "line {} out of range (total: {})",
191                absolute_line, cumulative
192            ),
193        ))
194    }
195
196    /// Probe the filesystem for log segments of a given stream, ordered oldest first.
197    fn discover_segments(log_dir: &Path, process: &str, source: LineSource) -> Vec<Segment> {
198        let stream = match source {
199            LineSource::Stdout => "stdout",
200            LineSource::Stderr => "stderr",
201        };
202        let base = log_dir.join(format!("{}.{}", process, stream));
203
204        let mut rotated: Vec<(u32, Segment)> = Vec::new();
205        for n in 1u32.. {
206            let log_path = base.with_extension(format!("{}.{}", stream, n));
207            if !log_path.exists() {
208                break;
209            }
210            let idx_path = idx_path_for(&log_path);
211            let line_count = idx_line_count(&idx_path)
212                .unwrap_or_else(|| count_lines_in_file(&log_path).unwrap_or(0));
213            rotated.push((
214                n,
215                Segment {
216                    log_path,
217                    idx_path,
218                    line_count,
219                },
220            ));
221        }
222
223        // Sort by N descending so highest-N (oldest) comes first
224        rotated.sort_by(|a, b| b.0.cmp(&a.0));
225        let mut segments: Vec<Segment> = rotated.into_iter().map(|(_, s)| s).collect();
226
227        if base.exists() {
228            let idx_path = idx_path_for(&base);
229            let line_count = idx_line_count(&idx_path)
230                .unwrap_or_else(|| count_lines_in_file(&base).unwrap_or(0));
231            segments.push(Segment {
232                log_path: base,
233                idx_path,
234                line_count,
235            });
236        }
237
238        segments
239    }
240}
241
242/// Read lines from a span of segments by absolute line range.
243fn read_lines_from_segments(
244    segments: &[Segment],
245    start: usize,
246    count: usize,
247) -> io::Result<Vec<String>> {
248    let mut result = Vec::with_capacity(count);
249    let mut cumulative = 0;
250
251    for seg in segments {
252        let seg_end = cumulative + seg.line_count;
253        let window_end = start + count;
254
255        if seg_end <= start || cumulative >= window_end {
256            cumulative = seg_end;
257            continue;
258        }
259
260        let read_start = start.max(cumulative) - cumulative;
261        let read_end = window_end.min(seg_end) - cumulative;
262        let lines = read_lines_from_segment(
263            &seg.log_path,
264            &seg.idx_path,
265            read_start,
266            read_end - read_start,
267        )?;
268        result.extend(lines);
269
270        cumulative = seg_end;
271    }
272    Ok(result)
273}
274
275/// Get line count from an index file's metadata (no content read).
276fn idx_line_count(idx_path: &Path) -> Option<usize> {
277    IndexReader::line_count_from_metadata(idx_path).ok()
278}
279
/// Fallback: count lines by scanning the log file.
///
/// A line is a run of bytes ended by `\n`; a final run without a trailing
/// `\n` also counts.  The bytes are not decoded, so non-UTF-8 content is
/// counted like any other line.
///
/// # Errors
///
/// Returns the error from opening the file or from the first failed read.
/// Counting via `lines().count()` would instead treat every failed read as
/// one more line and never finish if the error persists (for example when
/// `path` is a directory).
fn count_lines_in_file(path: &Path) -> io::Result<usize> {
    let mut reader = BufReader::new(std::fs::File::open(path)?);
    // One scratch buffer reused for every line; only its length matters.
    let mut scratch = Vec::new();
    let mut lines = 0usize;
    while reader.read_until(b'\n', &mut scratch)? != 0 {
        lines += 1;
        scratch.clear();
    }
    Ok(lines)
}
285
286/// Read `count` lines starting at `start` from a single segment.
287/// Uses the index for seeking when available, falls back to sequential scan.
288fn read_lines_from_segment(
289    log_path: &Path,
290    idx_path: &Path,
291    start: usize,
292    count: usize,
293) -> io::Result<Vec<String>> {
294    if count == 0 {
295        return Ok(Vec::new());
296    }
297
298    // Try indexed read
299    if let Ok(Some(mut idx_reader)) = IndexReader::open(idx_path) {
300        let records = idx_reader.read_range(start, count)?;
301        if records.is_empty() {
302            return Ok(Vec::new());
303        }
304        let file = std::fs::File::open(log_path)?;
305        let mut reader = BufReader::new(file);
306        // Seek to first record and read sequentially (records are contiguous)
307        reader.seek(SeekFrom::Start(records[0].byte_offset))?;
308        let mut result = Vec::with_capacity(records.len());
309        for _ in 0..records.len() {
310            let mut line = String::new();
311            reader.read_line(&mut line)?;
312            if line.ends_with('\n') {
313                line.pop();
314            }
315            result.push(line);
316        }
317        return Ok(result);
318    }
319
320    // Fallback: sequential scan
321    let file = std::fs::File::open(log_path)?;
322    let lines: Vec<String> = BufReader::new(file)
323        .lines()
324        .skip(start)
325        .take(count)
326        .collect::<io::Result<Vec<_>>>()?;
327    Ok(lines)
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use crate::daemon::log_index::{IndexRecord, IndexWriter};

    /// Builds a reader for the process named `"test"` rooted at `dir`.
    fn reader_for(dir: &Path) -> DiskLogReader {
        DiskLogReader::new(dir.to_path_buf(), "test".to_string())
    }

    /// Creates the index for `dir/filename` with one record per
    /// `(byte_offset, seq)` pair.
    fn write_index(dir: &Path, filename: &str, seq_base: u64, records: &[(u64, u64)]) {
        let idx_path = idx_path_for(&dir.join(filename));
        let mut writer = IndexWriter::create(&idx_path, seq_base).unwrap();
        for &(byte_offset, seq) in records {
            writer.append(IndexRecord { byte_offset, seq }).unwrap();
        }
        writer.flush().unwrap();
    }

    /// Helper: create a log file with the given lines and a matching index.
    ///
    /// Every line is written with a trailing `\n`; line `i` gets sequence
    /// number `seq_base + i`.
    fn create_log_with_index(dir: &Path, filename: &str, lines: &[&str], seq_base: u64) {
        let mut records = Vec::with_capacity(lines.len());
        let mut content = String::new();
        for (i, line) in lines.iter().enumerate() {
            // The record points at where this line starts in the file.
            records.push((content.len() as u64, seq_base + i as u64));
            content.push_str(line);
            content.push('\n');
        }
        write_index(dir, filename, seq_base, &records);
        std::fs::write(dir.join(filename), content).unwrap();
    }

    #[test]
    fn test_line_count_single_file() {
        let dir = tempfile::tempdir().unwrap();
        create_log_with_index(dir.path(), "test.stdout", &["line1", "line2", "line3"], 0);

        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count(LineSource::Stdout), 3);
        assert_eq!(reader.line_count(LineSource::Stderr), 0);
    }

    #[test]
    fn test_read_lines_single_file() {
        let dir = tempfile::tempdir().unwrap();
        let contents = ["aaa", "bbb", "ccc", "ddd", "eee"];
        create_log_with_index(dir.path(), "test.stdout", &contents, 0);

        let mut reader = reader_for(dir.path());
        let lines = reader.read_lines(LineSource::Stdout, 1, 3).unwrap();
        assert_eq!(lines, vec!["bbb", "ccc", "ddd"]);
    }

    #[test]
    fn test_read_lines_across_rotated() {
        let dir = tempfile::tempdir().unwrap();
        // Oldest first: `.2`, then `.1`, then the current file.
        create_log_with_index(dir.path(), "test.stdout.2", &["old1", "old2"], 0);
        create_log_with_index(dir.path(), "test.stdout.1", &["mid1", "mid2"], 2);
        create_log_with_index(dir.path(), "test.stdout", &["new1", "new2"], 4);

        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count(LineSource::Stdout), 6);

        // The window starts in the oldest file and ends in the newest.
        let lines = reader.read_lines(LineSource::Stdout, 1, 4).unwrap();
        assert_eq!(lines, vec!["old2", "mid1", "mid2", "new1"]);
    }

    #[test]
    fn test_read_interleaved() {
        let dir = tempfile::tempdir().unwrap();

        // Stdout carries seq 0, 2, 4: write the log, then replace its index
        // with one that has those sequence numbers.
        create_log_with_index(dir.path(), "test.stdout", &["out0", "out2", "out4"], 0);
        write_index(dir.path(), "test.stdout", 0, &[(0, 0), (5, 2), (10, 4)]);

        // Stderr carries seq 1, 3.
        std::fs::write(dir.path().join("test.stderr"), "err1\nerr3\n").unwrap();
        write_index(dir.path(), "test.stderr", 1, &[(0, 1), (5, 3)]);

        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count_both(), 5);

        let interleaved = reader.read_interleaved(0, 5).unwrap();
        assert_eq!(interleaved.len(), 5);
        assert_eq!(interleaved[0], (LineSource::Stdout, "out0".to_string()));
        assert_eq!(interleaved[1], (LineSource::Stderr, "err1".to_string()));
        assert_eq!(interleaved[2], (LineSource::Stdout, "out2".to_string()));
        assert_eq!(interleaved[3], (LineSource::Stderr, "err3".to_string()));
        assert_eq!(interleaved[4], (LineSource::Stdout, "out4".to_string()));
    }

    #[test]
    fn test_fallback_no_index() {
        let dir = tempfile::tempdir().unwrap();
        // A log file with no sidecar index at all.
        std::fs::write(dir.path().join("test.stdout"), "aaa\nbbb\nccc\n").unwrap();

        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count(LineSource::Stdout), 3);

        let lines = reader.read_lines(LineSource::Stdout, 1, 2).unwrap();
        assert_eq!(lines, vec!["bbb", "ccc"]);
    }

    #[test]
    fn test_empty_dir() {
        let dir = tempfile::tempdir().unwrap();
        let mut reader = reader_for(dir.path());
        assert_eq!(reader.line_count(LineSource::Stdout), 0);
        assert_eq!(reader.line_count_both(), 0);
    }
}