Skip to main content

agent_procs/tui/
app.rs

1use crate::protocol::{ProcessInfo, Stream};
2use crate::tui::disk_log_reader::DiskLogReader;
3use std::collections::{HashMap, VecDeque};
4
5const MAX_BUFFER_LINES: usize = 10_000;
6
/// Which output stream(s) of the selected process the output pane shows.
///
/// Equality is total for this fieldless enum, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMode {
    /// Show stdout lines only.
    Stdout,
    /// Show stderr lines only.
    Stderr,
    /// Show stdout and stderr lines together.
    Both,
}
13
/// Tag recording which stream a single buffered line was read from.
///
/// Equality is total for this fieldless enum, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LineSource {
    /// The line came from the process's stdout.
    Stdout,
    /// The line came from the process's stderr.
    Stderr,
}
19
20/// Single ring buffer storing all output with source tags.
21/// Stdout/stderr views are filtered from the same data — no duplication.
22pub struct OutputBuffer {
23    lines: VecDeque<(LineSource, String)>,
24    max_lines: usize,
25    stdout_count: usize,
26    stderr_count: usize,
27}
28
29impl OutputBuffer {
30    pub fn new(max_lines: usize) -> Self {
31        Self {
32            lines: VecDeque::with_capacity(max_lines),
33            max_lines,
34            stdout_count: 0,
35            stderr_count: 0,
36        }
37    }
38
39    pub fn push(&mut self, source: LineSource, line: String) {
40        if self.lines.len() == self.max_lines
41            && let Some((evicted, _)) = self.lines.pop_front()
42        {
43            match evicted {
44                LineSource::Stdout => self.stdout_count -= 1,
45                LineSource::Stderr => self.stderr_count -= 1,
46            }
47        }
48        match source {
49            LineSource::Stdout => self.stdout_count += 1,
50            LineSource::Stderr => self.stderr_count += 1,
51        }
52        self.lines.push_back((source, line));
53    }
54
55    /// O(1) count of total lines.
56    pub fn len(&self) -> usize {
57        self.lines.len()
58    }
59
60    pub fn is_empty(&self) -> bool {
61        self.lines.is_empty()
62    }
63
64    /// O(1) count of stdout lines.
65    pub fn stdout_count(&self) -> usize {
66        self.stdout_count
67    }
68
69    /// O(1) count of stderr lines.
70    pub fn stderr_count(&self) -> usize {
71        self.stderr_count
72    }
73
74    pub fn stdout_lines(&self) -> impl Iterator<Item = &str> {
75        self.lines
76            .iter()
77            .filter(|(src, _)| *src == LineSource::Stdout)
78            .map(|(_, s)| s.as_str())
79    }
80
81    pub fn stderr_lines(&self) -> impl Iterator<Item = &str> {
82        self.lines
83            .iter()
84            .filter(|(src, _)| *src == LineSource::Stderr)
85            .map(|(_, s)| s.as_str())
86    }
87
88    pub fn all_lines(&self) -> impl Iterator<Item = (LineSource, &str)> {
89        self.lines.iter().map(|(src, s)| (*src, s.as_str()))
90    }
91}
92
/// How the TUI currently interprets key presses.
///
/// Equality is total for this fieldless enum, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InputMode {
    /// Keys are treated as normal keybindings.
    Normal,
    /// Keys are being typed into the filter pattern.
    FilterInput,
}
101
102pub struct App {
103    pub processes: Vec<ProcessInfo>,
104    pub selected: usize,
105    pub buffers: HashMap<String, OutputBuffer>,
106    pub stream_mode: StreamMode,
107    pub paused: bool,
108    pub scroll_offsets: HashMap<String, usize>,
109    pub running: bool,
110    pub stop_all_on_quit: bool,
111    pub input_mode: InputMode,
112    /// In-progress filter text while the user is typing.
113    pub filter_buf: String,
114    /// Active filter applied to output lines. `None` means no filter.
115    pub filter: Option<String>,
116    /// Cached visible height of the output pane (set during render).
117    pub visible_height: usize,
118    /// Disk-backed log readers for each process.
119    pub disk_readers: HashMap<String, DiskLogReader>,
120}
121
122impl Default for App {
123    fn default() -> Self {
124        Self::new()
125    }
126}
127
128impl App {
129    pub fn new() -> Self {
130        Self {
131            processes: Vec::new(),
132            selected: 0,
133            buffers: HashMap::new(),
134            stream_mode: StreamMode::Stdout,
135            paused: false,
136            scroll_offsets: HashMap::new(),
137            running: true,
138            stop_all_on_quit: false,
139            input_mode: InputMode::Normal,
140            filter_buf: String::new(),
141            filter: None,
142            visible_height: 20,
143            disk_readers: HashMap::new(),
144        }
145    }
146
147    pub fn update_processes(&mut self, processes: Vec<ProcessInfo>) {
148        self.processes = processes;
149        if self.selected >= self.processes.len() && !self.processes.is_empty() {
150            self.selected = self.processes.len() - 1;
151        }
152    }
153
154    pub fn selected_name(&self) -> Option<&str> {
155        self.processes.get(self.selected).map(|p| p.name.as_str())
156    }
157
158    pub fn select_next(&mut self) {
159        if !self.processes.is_empty() {
160            self.selected = (self.selected + 1) % self.processes.len();
161        }
162    }
163
164    pub fn select_prev(&mut self) {
165        if !self.processes.is_empty() {
166            self.selected = if self.selected == 0 {
167                self.processes.len() - 1
168            } else {
169                self.selected - 1
170            };
171        }
172    }
173
174    pub fn cycle_stream_mode(&mut self) {
175        self.stream_mode = match self.stream_mode {
176            StreamMode::Stdout => StreamMode::Stderr,
177            StreamMode::Stderr => StreamMode::Both,
178            StreamMode::Both => StreamMode::Stdout,
179        };
180    }
181
182    pub fn toggle_pause(&mut self) {
183        self.paused = !self.paused;
184        if !self.paused {
185            // Reset scroll offset to bottom on unpause
186            if let Some(name) = self.processes.get(self.selected).map(|p| p.name.clone()) {
187                self.scroll_offsets.remove(&name);
188            }
189        }
190    }
191
192    /// Scroll up by the given number of lines. Auto-pauses if not already paused.
193    /// Clamps to the maximum scrollable range so overshooting the top is impossible.
194    pub fn scroll_up_by(&mut self, lines: usize) {
195        if !self.paused {
196            self.paused = true;
197        }
198        if let Some(name) = self.selected_name().map(str::to_string) {
199            let max_offset = self
200                .line_count_for(&name)
201                .saturating_sub(self.visible_height);
202            let offset = self.scroll_offsets.entry(name).or_insert(0);
203            *offset = offset.saturating_add(lines).min(max_offset);
204        }
205    }
206
207    /// Scroll up by half a page.
208    pub fn scroll_up(&mut self) {
209        let half_page = (self.visible_height / 2).max(1);
210        self.scroll_up_by(half_page);
211    }
212
213    /// Scroll down by the given number of lines. If we reach the bottom, unpause.
214    pub fn scroll_down_by(&mut self, lines: usize) {
215        if let Some(name) = self.selected_name().map(str::to_string) {
216            let offset = self.scroll_offsets.entry(name).or_insert(0);
217            *offset = offset.saturating_sub(lines);
218            if *offset == 0 {
219                self.paused = false;
220            }
221        }
222    }
223
224    /// Scroll down by half a page.
225    pub fn scroll_down(&mut self) {
226        let half_page = (self.visible_height / 2).max(1);
227        self.scroll_down_by(half_page);
228    }
229
230    pub fn scroll_to_top(&mut self) {
231        if !self.paused {
232            self.paused = true;
233        }
234        if let Some(name) = self.selected_name().map(str::to_string) {
235            let total = self.line_count_for(&name);
236            self.scroll_offsets.insert(name, total);
237        }
238    }
239
240    pub fn scroll_to_bottom(&mut self) {
241        if let Some(name) = self.selected_name().map(str::to_string) {
242            self.scroll_offsets.remove(&name);
243            self.paused = false;
244        }
245    }
246
247    /// Count visible lines for the selected process.
248    /// When a filter is active, uses only the hot buffer.
249    /// Otherwise, uses the disk-backed total (authoritative).
250    fn line_count_for(&mut self, name: &str) -> usize {
251        if self.filter.is_some() {
252            return self.hot_line_count(name, self.filter.as_deref());
253        }
254        self.total_line_count(name)
255    }
256
257    /// Total line count combining disk history and hot buffer.
258    /// Uses disk as authoritative; falls back to hot buffer if larger
259    /// (e.g. right after a process restart before disk catches up).
260    fn total_line_count(&mut self, name: &str) -> usize {
261        let hot = self.hot_line_count(name, None);
262        let disk = self.disk_line_count(name);
263        disk.max(hot)
264    }
265
266    /// Disk line count for the current stream mode.
267    fn disk_line_count(&mut self, name: &str) -> usize {
268        let mode = self.stream_mode;
269        self.disk_readers.get_mut(name).map_or(0, |r| match mode {
270            StreamMode::Stdout => r.line_count(LineSource::Stdout),
271            StreamMode::Stderr => r.line_count(LineSource::Stderr),
272            StreamMode::Both => r.line_count_both(),
273        })
274    }
275
276    /// Hot buffer line count, optionally filtered by a substring pattern.
277    /// O(1) when `filter` is `None`; O(n) when filtering.
278    fn hot_line_count(&self, name: &str, filter: Option<&str>) -> usize {
279        let Some(buf) = self.buffers.get(name) else {
280            return 0;
281        };
282        if let Some(pat) = filter {
283            let matches = |line: &str| line.contains(pat);
284            return match self.stream_mode {
285                StreamMode::Stdout => buf.stdout_lines().filter(|l| matches(l)).count(),
286                StreamMode::Stderr => buf.stderr_lines().filter(|l| matches(l)).count(),
287                StreamMode::Both => buf.all_lines().filter(|(_, l)| matches(l)).count(),
288            };
289        }
290        match self.stream_mode {
291            StreamMode::Stdout => buf.stdout_count(),
292            StreamMode::Stderr => buf.stderr_count(),
293            StreamMode::Both => buf.len(),
294        }
295    }
296
297    /// Fetch exactly the visible window of lines for rendering.
298    ///
299    /// When a filter is active, returns `None` — the caller should fall back
300    /// to the old collect-all-from-hot-buffer approach.
301    pub fn visible_lines(
302        &mut self,
303        name: &str,
304        visible_height: usize,
305    ) -> Option<Vec<(LineSource, String)>> {
306        if self.filter.is_some() {
307            return None; // caller falls back to hot buffer with filter
308        }
309
310        let total = self.total_line_count(name);
311        let scroll_offset = if self.paused {
312            self.scroll_offsets.get(name).copied().unwrap_or(0)
313        } else {
314            0
315        };
316
317        let window_end = total.saturating_sub(scroll_offset);
318        let window_start = window_end.saturating_sub(visible_height);
319        let count = window_end - window_start;
320
321        if count == 0 {
322            return Some(Vec::new());
323        }
324
325        let hot_len = self.hot_line_count(name, None);
326        let disk_count = self.disk_line_count(name);
327        // Boundary: lines before this come from disk, at or after from hot buffer.
328        let disk_boundary = disk_count.saturating_sub(hot_len);
329
330        if window_start >= disk_boundary {
331            // Entire window in hot buffer
332            let hot_start = window_start - disk_boundary;
333            Some(self.hot_buffer_range(name, hot_start, count))
334        } else if window_end <= disk_boundary {
335            // Entire window on disk
336            Some(self.disk_read_range(name, window_start, count))
337        } else {
338            // Split at boundary
339            let disk_part = disk_boundary - window_start;
340            let hot_part = window_end - disk_boundary;
341            let mut lines = self.disk_read_range(name, window_start, disk_part);
342            lines.extend(self.hot_buffer_range(name, 0, hot_part));
343            Some(lines)
344        }
345    }
346
347    /// Read a range from the hot buffer (no filter).
348    fn hot_buffer_range(
349        &self,
350        name: &str,
351        start: usize,
352        count: usize,
353    ) -> Vec<(LineSource, String)> {
354        let Some(buf) = self.buffers.get(name) else {
355            return Vec::new();
356        };
357        match self.stream_mode {
358            StreamMode::Stdout => buf
359                .stdout_lines()
360                .skip(start)
361                .take(count)
362                .map(|l| (LineSource::Stdout, l.to_string()))
363                .collect(),
364            StreamMode::Stderr => buf
365                .stderr_lines()
366                .skip(start)
367                .take(count)
368                .map(|l| (LineSource::Stderr, l.to_string()))
369                .collect(),
370            StreamMode::Both => buf
371                .all_lines()
372                .skip(start)
373                .take(count)
374                .map(|(src, l)| (src, l.to_string()))
375                .collect(),
376        }
377    }
378
379    /// Read a range from the disk reader.
380    fn disk_read_range(
381        &mut self,
382        name: &str,
383        start: usize,
384        count: usize,
385    ) -> Vec<(LineSource, String)> {
386        let Some(reader) = self.disk_readers.get_mut(name) else {
387            return Vec::new();
388        };
389        match self.stream_mode {
390            StreamMode::Stdout => reader
391                .read_lines(LineSource::Stdout, start, count)
392                .unwrap_or_default()
393                .into_iter()
394                .map(|l| (LineSource::Stdout, l))
395                .collect(),
396            StreamMode::Stderr => reader
397                .read_lines(LineSource::Stderr, start, count)
398                .unwrap_or_default()
399                .into_iter()
400                .map(|l| (LineSource::Stderr, l))
401                .collect(),
402            StreamMode::Both => reader.read_interleaved(start, count).unwrap_or_default(),
403        }
404    }
405
406    pub fn start_filter(&mut self) {
407        self.input_mode = InputMode::FilterInput;
408        self.filter_buf = self.filter.clone().unwrap_or_default();
409    }
410
411    pub fn confirm_filter(&mut self) {
412        self.input_mode = InputMode::Normal;
413        if self.filter_buf.is_empty() {
414            self.filter = None;
415        } else {
416            self.filter = Some(self.filter_buf.clone());
417        }
418    }
419
420    pub fn cancel_filter(&mut self) {
421        self.input_mode = InputMode::Normal;
422        self.filter_buf.clear();
423    }
424
425    pub fn clear_filter(&mut self) {
426        self.filter = None;
427        self.filter_buf.clear();
428    }
429
430    pub fn push_output(&mut self, process: &str, stream: Stream, line: &str) {
431        let buf = self
432            .buffers
433            .entry(process.to_string())
434            .or_insert_with(|| OutputBuffer::new(MAX_BUFFER_LINES));
435        let source = match stream {
436            Stream::Stdout => LineSource::Stdout,
437            Stream::Stderr => LineSource::Stderr,
438        };
439        buf.push(source, line.to_string());
440    }
441
442    pub fn quit(&mut self) {
443        self.running = false;
444    }
445
446    pub fn quit_and_stop(&mut self) {
447        self.stop_all_on_quit = true;
448        self.running = false;
449    }
450
451    pub fn running_count(&self) -> usize {
452        self.processes
453            .iter()
454            .filter(|p| p.state == crate::protocol::ProcessState::Running)
455            .count()
456    }
457
458    pub fn exited_count(&self) -> usize {
459        self.processes
460            .iter()
461            .filter(|p| p.state == crate::protocol::ProcessState::Exited)
462            .count()
463    }
464}
465
466#[cfg(test)]
467mod tests {
468    use super::*;
469    use crate::protocol::{ProcessInfo, ProcessState, Stream};
470
471    fn make_process(name: &str, state: ProcessState) -> ProcessInfo {
472        let exit_code = if state == ProcessState::Exited {
473            Some(0)
474        } else {
475            None
476        };
477        let uptime_secs = if state == ProcessState::Running {
478            Some(10)
479        } else {
480            None
481        };
482        ProcessInfo {
483            name: name.into(),
484            id: format!("p-{}", name),
485            pid: 100,
486            state,
487            exit_code,
488            uptime_secs,
489            command: "true".into(),
490            port: None,
491            url: None,
492        }
493    }
494
495    #[test]
496    fn test_select_next_wraps() {
497        let mut app = App::new();
498        app.update_processes(vec![
499            make_process("a", ProcessState::Running),
500            make_process("b", ProcessState::Running),
501            make_process("c", ProcessState::Running),
502        ]);
503        app.selected = 2; // last item
504        app.select_next();
505        assert_eq!(app.selected, 0);
506    }
507
508    #[test]
509    fn test_select_prev_wraps() {
510        let mut app = App::new();
511        app.update_processes(vec![
512            make_process("a", ProcessState::Running),
513            make_process("b", ProcessState::Running),
514            make_process("c", ProcessState::Running),
515        ]);
516        app.selected = 0;
517        app.select_prev();
518        assert_eq!(app.selected, 2);
519    }
520
521    #[test]
522    fn test_cycle_stream_mode() {
523        let mut app = App::new();
524        assert_eq!(app.stream_mode, StreamMode::Stdout);
525        app.cycle_stream_mode();
526        assert_eq!(app.stream_mode, StreamMode::Stderr);
527        app.cycle_stream_mode();
528        assert_eq!(app.stream_mode, StreamMode::Both);
529        app.cycle_stream_mode();
530        assert_eq!(app.stream_mode, StreamMode::Stdout);
531    }
532
533    #[test]
534    fn test_toggle_pause() {
535        let mut app = App::new();
536        assert!(!app.paused);
537        app.toggle_pause();
538        assert!(app.paused);
539        app.toggle_pause();
540        assert!(!app.paused);
541    }
542
543    #[test]
544    fn test_push_output() {
545        let mut app = App::new();
546        app.push_output("web", Stream::Stdout, "hello world");
547        let buf = app.buffers.get("web").unwrap();
548        assert_eq!(buf.stdout_lines().count(), 1);
549        assert_eq!(buf.stdout_lines().next().unwrap(), "hello world");
550    }
551
552    #[test]
553    fn test_running_count() {
554        let mut app = App::new();
555        app.update_processes(vec![
556            make_process("a", ProcessState::Running),
557            make_process("b", ProcessState::Exited),
558            make_process("c", ProcessState::Running),
559        ]);
560        assert_eq!(app.running_count(), 2);
561    }
562
563    #[test]
564    fn test_exited_count() {
565        let mut app = App::new();
566        app.update_processes(vec![
567            make_process("a", ProcessState::Running),
568            make_process("b", ProcessState::Exited),
569            make_process("c", ProcessState::Exited),
570        ]);
571        assert_eq!(app.exited_count(), 2);
572    }
573
574    #[test]
575    fn test_output_buffer_counters() {
576        let mut buf = OutputBuffer::new(5);
577        assert_eq!(buf.len(), 0);
578        assert_eq!(buf.stdout_count(), 0);
579        assert_eq!(buf.stderr_count(), 0);
580        assert!(buf.is_empty());
581
582        buf.push(LineSource::Stdout, "a".into());
583        buf.push(LineSource::Stderr, "b".into());
584        buf.push(LineSource::Stdout, "c".into());
585        assert_eq!(buf.len(), 3);
586        assert_eq!(buf.stdout_count(), 2);
587        assert_eq!(buf.stderr_count(), 1);
588
589        // Fill to capacity and trigger eviction
590        buf.push(LineSource::Stdout, "d".into());
591        buf.push(LineSource::Stderr, "e".into());
592        assert_eq!(buf.len(), 5);
593        assert_eq!(buf.stdout_count(), 3);
594        assert_eq!(buf.stderr_count(), 2);
595
596        // Push one more — evicts "a" (Stdout)
597        buf.push(LineSource::Stderr, "f".into());
598        assert_eq!(buf.len(), 5);
599        assert_eq!(buf.stdout_count(), 2);
600        assert_eq!(buf.stderr_count(), 3);
601    }
602
603    #[test]
604    fn test_visible_lines_hot_buffer_only() {
605        let mut app = App::new();
606        // No disk readers, just hot buffer
607        for i in 0..20 {
608            app.push_output("web", Stream::Stdout, &format!("line {}", i));
609        }
610        app.update_processes(vec![make_process("web", ProcessState::Running)]);
611        app.visible_height = 10;
612
613        // Unpaused: should get the last 10 lines
614        let lines = app.visible_lines("web", 10).unwrap();
615        assert_eq!(lines.len(), 10);
616        assert_eq!(lines[0].1, "line 10");
617        assert_eq!(lines[9].1, "line 19");
618    }
619
620    #[test]
621    fn test_visible_lines_paused_scrolled() {
622        let mut app = App::new();
623        for i in 0..50 {
624            app.push_output("web", Stream::Stdout, &format!("line {}", i));
625        }
626        app.update_processes(vec![make_process("web", ProcessState::Running)]);
627        app.visible_height = 10;
628        app.paused = true;
629        app.scroll_offsets.insert("web".into(), 20);
630
631        let lines = app.visible_lines("web", 10).unwrap();
632        assert_eq!(lines.len(), 10);
633        // 50 total, scroll_offset=20, visible=10 → window [20..30)
634        assert_eq!(lines[0].1, "line 20");
635        assert_eq!(lines[9].1, "line 29");
636    }
637
638    #[test]
639    fn test_visible_lines_with_disk_reader() {
640        use crate::daemon::log_index::{IndexRecord, IndexWriter, idx_path_for};
641        use crate::tui::disk_log_reader::DiskLogReader;
642
643        let dir = tempfile::tempdir().unwrap();
644
645        // Create a disk log with 100 lines
646        let log_path = dir.path().join("web.stdout");
647        let idx_path = idx_path_for(&log_path);
648        let mut log_content = String::new();
649        let mut writer = IndexWriter::create(&idx_path, 0).unwrap();
650        let mut offset = 0u64;
651        for i in 0..100 {
652            let line = format!("disk line {}", i);
653            writer
654                .append(IndexRecord {
655                    byte_offset: offset,
656                    seq: i,
657                })
658                .unwrap();
659            log_content.push_str(&line);
660            log_content.push('\n');
661            offset += line.len() as u64 + 1;
662        }
663        writer.flush().unwrap();
664        std::fs::write(&log_path, log_content).unwrap();
665
666        let mut app = App::new();
667        app.disk_readers.insert(
668            "web".into(),
669            DiskLogReader::new(dir.path().to_path_buf(), "web".into()),
670        );
671
672        // Push the last 10 lines into hot buffer (simulating live streaming)
673        for i in 90..100 {
674            app.push_output("web", Stream::Stdout, &format!("disk line {}", i));
675        }
676
677        app.update_processes(vec![make_process("web", ProcessState::Running)]);
678        app.visible_height = 10;
679
680        // Total should be 100 (disk is authoritative)
681        assert_eq!(app.total_line_count("web"), 100);
682
683        // Unpaused: last 10 lines from hot buffer
684        let lines = app.visible_lines("web", 10).unwrap();
685        assert_eq!(lines.len(), 10);
686        assert_eq!(lines[0].1, "disk line 90");
687        assert_eq!(lines[9].1, "disk line 99");
688
689        // Scroll to top: should read from disk
690        app.paused = true;
691        app.scroll_offsets.insert("web".into(), 90);
692        let lines = app.visible_lines("web", 10).unwrap();
693        assert_eq!(lines.len(), 10);
694        assert_eq!(lines[0].1, "disk line 0");
695        assert_eq!(lines[9].1, "disk line 9");
696
697        // Scroll to middle (spanning disk/hot boundary)
698        // hot buffer has lines 90-99, disk boundary = 100 - 10 = 90
699        // scroll_offset=5 → window_end=95, window_start=85
700        // lines 85-89 from disk, lines 90-94 from hot
701        app.scroll_offsets.insert("web".into(), 5);
702        let lines = app.visible_lines("web", 10).unwrap();
703        assert_eq!(lines.len(), 10);
704        assert_eq!(lines[0].1, "disk line 85");
705        assert_eq!(lines[4].1, "disk line 89");
706        assert_eq!(lines[5].1, "disk line 90");
707        assert_eq!(lines[9].1, "disk line 94");
708    }
709
710    #[test]
711    fn test_visible_lines_returns_none_when_filtered() {
712        let mut app = App::new();
713        app.push_output("web", Stream::Stdout, "hello");
714        app.filter = Some("hello".into());
715
716        // Should return None so caller falls back to hot buffer filtering
717        assert!(app.visible_lines("web", 10).is_none());
718    }
719
720    #[test]
721    fn test_hot_line_count_with_and_without_filter() {
722        let mut app = App::new();
723        app.push_output("web", Stream::Stdout, "hello world");
724        app.push_output("web", Stream::Stdout, "goodbye world");
725        app.push_output("web", Stream::Stdout, "hello again");
726
727        // Unfiltered: O(1) count
728        assert_eq!(app.hot_line_count("web", None), 3);
729
730        // Filtered: O(n) scan
731        assert_eq!(app.hot_line_count("web", Some("hello")), 2);
732        assert_eq!(app.hot_line_count("web", Some("xyz")), 0);
733    }
734}