1use crate::protocol::{ProcessInfo, Stream};
2use crate::tui::disk_log_reader::DiskLogReader;
3use std::collections::{HashMap, VecDeque};
4
/// Maximum number of lines kept in memory for each process's [`OutputBuffer`].
const MAX_BUFFER_LINES: usize = 10_000;
6
/// Selects which stream(s) of a process's output are counted and returned
/// for display.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StreamMode {
    /// Only lines that came from stdout.
    Stdout,
    /// Only lines that came from stderr.
    Stderr,
    /// Lines from both streams together.
    Both,
}
13
/// The stream a single buffered line originated from.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LineSource {
    /// The line was written to stdout.
    Stdout,
    /// The line was written to stderr.
    Stderr,
}
19
20pub struct OutputBuffer {
23 lines: VecDeque<(LineSource, String)>,
24 max_lines: usize,
25 stdout_count: usize,
26 stderr_count: usize,
27}
28
29impl OutputBuffer {
30 pub fn new(max_lines: usize) -> Self {
31 Self {
32 lines: VecDeque::with_capacity(max_lines),
33 max_lines,
34 stdout_count: 0,
35 stderr_count: 0,
36 }
37 }
38
39 pub fn push(&mut self, source: LineSource, line: String) {
40 if self.lines.len() == self.max_lines
41 && let Some((evicted, _)) = self.lines.pop_front()
42 {
43 match evicted {
44 LineSource::Stdout => self.stdout_count -= 1,
45 LineSource::Stderr => self.stderr_count -= 1,
46 }
47 }
48 match source {
49 LineSource::Stdout => self.stdout_count += 1,
50 LineSource::Stderr => self.stderr_count += 1,
51 }
52 self.lines.push_back((source, line));
53 }
54
55 pub fn len(&self) -> usize {
57 self.lines.len()
58 }
59
60 pub fn is_empty(&self) -> bool {
61 self.lines.is_empty()
62 }
63
64 pub fn stdout_count(&self) -> usize {
66 self.stdout_count
67 }
68
69 pub fn stderr_count(&self) -> usize {
71 self.stderr_count
72 }
73
74 pub fn stdout_lines(&self) -> impl Iterator<Item = &str> {
75 self.lines
76 .iter()
77 .filter(|(src, _)| *src == LineSource::Stdout)
78 .map(|(_, s)| s.as_str())
79 }
80
81 pub fn stderr_lines(&self) -> impl Iterator<Item = &str> {
82 self.lines
83 .iter()
84 .filter(|(src, _)| *src == LineSource::Stderr)
85 .map(|(_, s)| s.as_str())
86 }
87
88 pub fn all_lines(&self) -> impl Iterator<Item = (LineSource, &str)> {
89 self.lines.iter().map(|(src, s)| (*src, s.as_str()))
90 }
91}
92
/// Keyboard input state of the application.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InputMode {
    /// Default mode; no filter text is being edited.
    Normal,
    /// A filter is being edited (entered via `App::start_filter`, left via
    /// `App::confirm_filter` or `App::cancel_filter`).
    FilterInput,
}
101
102pub struct App {
103 pub processes: Vec<ProcessInfo>,
104 pub selected: usize,
105 pub buffers: HashMap<String, OutputBuffer>,
106 pub stream_mode: StreamMode,
107 pub paused: bool,
108 pub scroll_offsets: HashMap<String, usize>,
109 pub running: bool,
110 pub stop_all_on_quit: bool,
111 pub input_mode: InputMode,
112 pub filter_buf: String,
114 pub filter: Option<String>,
116 pub visible_height: usize,
118 pub disk_readers: HashMap<String, DiskLogReader>,
120}
121
122impl Default for App {
123 fn default() -> Self {
124 Self::new()
125 }
126}
127
128impl App {
129 pub fn new() -> Self {
130 Self {
131 processes: Vec::new(),
132 selected: 0,
133 buffers: HashMap::new(),
134 stream_mode: StreamMode::Stdout,
135 paused: false,
136 scroll_offsets: HashMap::new(),
137 running: true,
138 stop_all_on_quit: false,
139 input_mode: InputMode::Normal,
140 filter_buf: String::new(),
141 filter: None,
142 visible_height: 20,
143 disk_readers: HashMap::new(),
144 }
145 }
146
147 pub fn update_processes(&mut self, processes: Vec<ProcessInfo>) {
148 self.processes = processes;
149 if self.selected >= self.processes.len() && !self.processes.is_empty() {
150 self.selected = self.processes.len() - 1;
151 }
152 }
153
154 pub fn selected_name(&self) -> Option<&str> {
155 self.processes.get(self.selected).map(|p| p.name.as_str())
156 }
157
158 pub fn select_next(&mut self) {
159 if !self.processes.is_empty() {
160 self.selected = (self.selected + 1) % self.processes.len();
161 }
162 }
163
164 pub fn select_prev(&mut self) {
165 if !self.processes.is_empty() {
166 self.selected = if self.selected == 0 {
167 self.processes.len() - 1
168 } else {
169 self.selected - 1
170 };
171 }
172 }
173
174 pub fn cycle_stream_mode(&mut self) {
175 self.stream_mode = match self.stream_mode {
176 StreamMode::Stdout => StreamMode::Stderr,
177 StreamMode::Stderr => StreamMode::Both,
178 StreamMode::Both => StreamMode::Stdout,
179 };
180 }
181
182 pub fn toggle_pause(&mut self) {
183 self.paused = !self.paused;
184 if !self.paused {
185 if let Some(name) = self.processes.get(self.selected).map(|p| p.name.clone()) {
187 self.scroll_offsets.remove(&name);
188 }
189 }
190 }
191
192 pub fn scroll_up_by(&mut self, lines: usize) {
195 if !self.paused {
196 self.paused = true;
197 }
198 if let Some(name) = self.selected_name().map(str::to_string) {
199 let max_offset = self
200 .line_count_for(&name)
201 .saturating_sub(self.visible_height);
202 let offset = self.scroll_offsets.entry(name).or_insert(0);
203 *offset = offset.saturating_add(lines).min(max_offset);
204 }
205 }
206
207 pub fn scroll_up(&mut self) {
209 let half_page = (self.visible_height / 2).max(1);
210 self.scroll_up_by(half_page);
211 }
212
213 pub fn scroll_down_by(&mut self, lines: usize) {
215 if let Some(name) = self.selected_name().map(str::to_string) {
216 let offset = self.scroll_offsets.entry(name).or_insert(0);
217 *offset = offset.saturating_sub(lines);
218 if *offset == 0 {
219 self.paused = false;
220 }
221 }
222 }
223
224 pub fn scroll_down(&mut self) {
226 let half_page = (self.visible_height / 2).max(1);
227 self.scroll_down_by(half_page);
228 }
229
230 pub fn scroll_to_top(&mut self) {
231 if !self.paused {
232 self.paused = true;
233 }
234 if let Some(name) = self.selected_name().map(str::to_string) {
235 let total = self.line_count_for(&name);
236 self.scroll_offsets.insert(name, total);
237 }
238 }
239
240 pub fn scroll_to_bottom(&mut self) {
241 if let Some(name) = self.selected_name().map(str::to_string) {
242 self.scroll_offsets.remove(&name);
243 self.paused = false;
244 }
245 }
246
247 fn line_count_for(&mut self, name: &str) -> usize {
251 if self.filter.is_some() {
252 return self.hot_line_count(name, self.filter.as_deref());
253 }
254 self.total_line_count(name)
255 }
256
257 fn total_line_count(&mut self, name: &str) -> usize {
261 let hot = self.hot_line_count(name, None);
262 let disk = self.disk_line_count(name);
263 disk.max(hot)
264 }
265
266 fn disk_line_count(&mut self, name: &str) -> usize {
268 let mode = self.stream_mode;
269 self.disk_readers.get_mut(name).map_or(0, |r| match mode {
270 StreamMode::Stdout => r.line_count(LineSource::Stdout),
271 StreamMode::Stderr => r.line_count(LineSource::Stderr),
272 StreamMode::Both => r.line_count_both(),
273 })
274 }
275
276 fn hot_line_count(&self, name: &str, filter: Option<&str>) -> usize {
279 let Some(buf) = self.buffers.get(name) else {
280 return 0;
281 };
282 if let Some(pat) = filter {
283 let matches = |line: &str| line.contains(pat);
284 return match self.stream_mode {
285 StreamMode::Stdout => buf.stdout_lines().filter(|l| matches(l)).count(),
286 StreamMode::Stderr => buf.stderr_lines().filter(|l| matches(l)).count(),
287 StreamMode::Both => buf.all_lines().filter(|(_, l)| matches(l)).count(),
288 };
289 }
290 match self.stream_mode {
291 StreamMode::Stdout => buf.stdout_count(),
292 StreamMode::Stderr => buf.stderr_count(),
293 StreamMode::Both => buf.len(),
294 }
295 }
296
297 pub fn visible_lines(
302 &mut self,
303 name: &str,
304 visible_height: usize,
305 ) -> Option<Vec<(LineSource, String)>> {
306 if self.filter.is_some() {
307 return None; }
309
310 let total = self.total_line_count(name);
311 let scroll_offset = if self.paused {
312 self.scroll_offsets.get(name).copied().unwrap_or(0)
313 } else {
314 0
315 };
316
317 let window_end = total.saturating_sub(scroll_offset);
318 let window_start = window_end.saturating_sub(visible_height);
319 let count = window_end - window_start;
320
321 if count == 0 {
322 return Some(Vec::new());
323 }
324
325 let hot_len = self.hot_line_count(name, None);
326 let disk_count = self.disk_line_count(name);
327 let disk_boundary = disk_count.saturating_sub(hot_len);
329
330 if window_start >= disk_boundary {
331 let hot_start = window_start - disk_boundary;
333 Some(self.hot_buffer_range(name, hot_start, count))
334 } else if window_end <= disk_boundary {
335 Some(self.disk_read_range(name, window_start, count))
337 } else {
338 let disk_part = disk_boundary - window_start;
340 let hot_part = window_end - disk_boundary;
341 let mut lines = self.disk_read_range(name, window_start, disk_part);
342 lines.extend(self.hot_buffer_range(name, 0, hot_part));
343 Some(lines)
344 }
345 }
346
347 fn hot_buffer_range(
349 &self,
350 name: &str,
351 start: usize,
352 count: usize,
353 ) -> Vec<(LineSource, String)> {
354 let Some(buf) = self.buffers.get(name) else {
355 return Vec::new();
356 };
357 match self.stream_mode {
358 StreamMode::Stdout => buf
359 .stdout_lines()
360 .skip(start)
361 .take(count)
362 .map(|l| (LineSource::Stdout, l.to_string()))
363 .collect(),
364 StreamMode::Stderr => buf
365 .stderr_lines()
366 .skip(start)
367 .take(count)
368 .map(|l| (LineSource::Stderr, l.to_string()))
369 .collect(),
370 StreamMode::Both => buf
371 .all_lines()
372 .skip(start)
373 .take(count)
374 .map(|(src, l)| (src, l.to_string()))
375 .collect(),
376 }
377 }
378
379 fn disk_read_range(
381 &mut self,
382 name: &str,
383 start: usize,
384 count: usize,
385 ) -> Vec<(LineSource, String)> {
386 let Some(reader) = self.disk_readers.get_mut(name) else {
387 return Vec::new();
388 };
389 match self.stream_mode {
390 StreamMode::Stdout => reader
391 .read_lines(LineSource::Stdout, start, count)
392 .unwrap_or_default()
393 .into_iter()
394 .map(|l| (LineSource::Stdout, l))
395 .collect(),
396 StreamMode::Stderr => reader
397 .read_lines(LineSource::Stderr, start, count)
398 .unwrap_or_default()
399 .into_iter()
400 .map(|l| (LineSource::Stderr, l))
401 .collect(),
402 StreamMode::Both => reader.read_interleaved(start, count).unwrap_or_default(),
403 }
404 }
405
406 pub fn start_filter(&mut self) {
407 self.input_mode = InputMode::FilterInput;
408 self.filter_buf = self.filter.clone().unwrap_or_default();
409 }
410
411 pub fn confirm_filter(&mut self) {
412 self.input_mode = InputMode::Normal;
413 if self.filter_buf.is_empty() {
414 self.filter = None;
415 } else {
416 self.filter = Some(self.filter_buf.clone());
417 }
418 }
419
420 pub fn cancel_filter(&mut self) {
421 self.input_mode = InputMode::Normal;
422 self.filter_buf.clear();
423 }
424
425 pub fn clear_filter(&mut self) {
426 self.filter = None;
427 self.filter_buf.clear();
428 }
429
430 pub fn push_output(&mut self, process: &str, stream: Stream, line: &str) {
431 let buf = self
432 .buffers
433 .entry(process.to_string())
434 .or_insert_with(|| OutputBuffer::new(MAX_BUFFER_LINES));
435 let source = match stream {
436 Stream::Stdout => LineSource::Stdout,
437 Stream::Stderr => LineSource::Stderr,
438 };
439 buf.push(source, line.to_string());
440 }
441
442 pub fn quit(&mut self) {
443 self.running = false;
444 }
445
446 pub fn quit_and_stop(&mut self) {
447 self.stop_all_on_quit = true;
448 self.running = false;
449 }
450
451 pub fn running_count(&self) -> usize {
452 self.processes
453 .iter()
454 .filter(|p| p.state == crate::protocol::ProcessState::Running)
455 .count()
456 }
457
458 pub fn exited_count(&self) -> usize {
459 self.processes
460 .iter()
461 .filter(|p| p.state == crate::protocol::ProcessState::Exited)
462 .count()
463 }
464}
465
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::{ProcessInfo, ProcessState, Stream};

    /// Builds a `ProcessInfo` fixture: exited processes get exit code 0,
    /// running processes get an uptime of 10 seconds.
    fn make_process(name: &str, state: ProcessState) -> ProcessInfo {
        let exit_code = if state == ProcessState::Exited {
            Some(0)
        } else {
            None
        };
        let uptime_secs = if state == ProcessState::Running {
            Some(10)
        } else {
            None
        };
        ProcessInfo {
            name: name.into(),
            id: format!("p-{}", name),
            pid: 100,
            state,
            exit_code,
            uptime_secs,
            command: "true".into(),
            port: None,
            url: None,
        }
    }

    #[test]
    fn test_select_next_wraps() {
        let mut app = App::new();
        app.update_processes(vec![
            make_process("a", ProcessState::Running),
            make_process("b", ProcessState::Running),
            make_process("c", ProcessState::Running),
        ]);
        // Start on the last entry so the next step has to wrap.
        app.selected = 2;
        app.select_next();
        assert_eq!(app.selected, 0);
    }

    #[test]
    fn test_select_prev_wraps() {
        let mut app = App::new();
        app.update_processes(vec![
            make_process("a", ProcessState::Running),
            make_process("b", ProcessState::Running),
            make_process("c", ProcessState::Running),
        ]);
        app.selected = 0;
        app.select_prev();
        assert_eq!(app.selected, 2);
    }

    #[test]
    fn test_cycle_stream_mode() {
        let mut app = App::new();
        assert_eq!(app.stream_mode, StreamMode::Stdout);
        app.cycle_stream_mode();
        assert_eq!(app.stream_mode, StreamMode::Stderr);
        app.cycle_stream_mode();
        assert_eq!(app.stream_mode, StreamMode::Both);
        app.cycle_stream_mode();
        assert_eq!(app.stream_mode, StreamMode::Stdout);
    }

    #[test]
    fn test_toggle_pause() {
        let mut app = App::new();
        assert!(!app.paused);
        app.toggle_pause();
        assert!(app.paused);
        app.toggle_pause();
        assert!(!app.paused);
    }

    #[test]
    fn test_push_output() {
        let mut app = App::new();
        app.push_output("web", Stream::Stdout, "hello world");
        let buf = app.buffers.get("web").unwrap();
        assert_eq!(buf.stdout_lines().count(), 1);
        assert_eq!(buf.stdout_lines().next().unwrap(), "hello world");
    }

    #[test]
    fn test_running_count() {
        let mut app = App::new();
        app.update_processes(vec![
            make_process("a", ProcessState::Running),
            make_process("b", ProcessState::Exited),
            make_process("c", ProcessState::Running),
        ]);
        assert_eq!(app.running_count(), 2);
    }

    #[test]
    fn test_exited_count() {
        let mut app = App::new();
        app.update_processes(vec![
            make_process("a", ProcessState::Running),
            make_process("b", ProcessState::Exited),
            make_process("c", ProcessState::Exited),
        ]);
        assert_eq!(app.exited_count(), 2);
    }

    #[test]
    fn test_output_buffer_counters() {
        let mut buf = OutputBuffer::new(5);
        assert_eq!(buf.len(), 0);
        assert_eq!(buf.stdout_count(), 0);
        assert_eq!(buf.stderr_count(), 0);
        assert!(buf.is_empty());

        buf.push(LineSource::Stdout, "a".into());
        buf.push(LineSource::Stderr, "b".into());
        buf.push(LineSource::Stdout, "c".into());
        assert_eq!(buf.len(), 3);
        assert_eq!(buf.stdout_count(), 2);
        assert_eq!(buf.stderr_count(), 1);

        // Fill the buffer to its capacity of 5.
        buf.push(LineSource::Stdout, "d".into());
        buf.push(LineSource::Stderr, "e".into());
        assert_eq!(buf.len(), 5);
        assert_eq!(buf.stdout_count(), 3);
        assert_eq!(buf.stderr_count(), 2);

        // One more push evicts the oldest line ("a", stdout).
        buf.push(LineSource::Stderr, "f".into());
        assert_eq!(buf.len(), 5);
        assert_eq!(buf.stdout_count(), 2);
        assert_eq!(buf.stderr_count(), 3);
    }

    #[test]
    fn test_visible_lines_hot_buffer_only() {
        let mut app = App::new();
        // No disk reader is registered, so everything comes from memory.
        for i in 0..20 {
            app.push_output("web", Stream::Stdout, &format!("line {}", i));
        }
        app.update_processes(vec![make_process("web", ProcessState::Running)]);
        app.visible_height = 10;

        // Not paused: the window is the newest 10 lines.
        let lines = app.visible_lines("web", 10).unwrap();
        assert_eq!(lines.len(), 10);
        assert_eq!(lines[0].1, "line 10");
        assert_eq!(lines[9].1, "line 19");
    }

    #[test]
    fn test_visible_lines_paused_scrolled() {
        let mut app = App::new();
        for i in 0..50 {
            app.push_output("web", Stream::Stdout, &format!("line {}", i));
        }
        app.update_processes(vec![make_process("web", ProcessState::Running)]);
        app.visible_height = 10;
        app.paused = true;
        app.scroll_offsets.insert("web".into(), 20);

        let lines = app.visible_lines("web", 10).unwrap();
        assert_eq!(lines.len(), 10);
        // 50 lines, offset 20: the window ends at line 30 and starts at 20.
        assert_eq!(lines[0].1, "line 20");
        assert_eq!(lines[9].1, "line 29");
    }

    #[test]
    fn test_visible_lines_with_disk_reader() {
        use crate::daemon::log_index::{IndexRecord, IndexWriter, idx_path_for};
        use crate::tui::disk_log_reader::DiskLogReader;

        let dir = tempfile::tempdir().unwrap();

        // Write 100 lines to the stdout log together with its index.
        let log_path = dir.path().join("web.stdout");
        let idx_path = idx_path_for(&log_path);
        let mut log_content = String::new();
        let mut writer = IndexWriter::create(&idx_path, 0).unwrap();
        let mut offset = 0u64;
        for i in 0..100 {
            let line = format!("disk line {}", i);
            writer
                .append(IndexRecord {
                    byte_offset: offset,
                    seq: i,
                })
                .unwrap();
            log_content.push_str(&line);
            log_content.push('\n');
            offset += line.len() as u64 + 1;
        }
        writer.flush().unwrap();
        std::fs::write(&log_path, log_content).unwrap();

        let mut app = App::new();
        app.disk_readers.insert(
            "web".into(),
            DiskLogReader::new(dir.path().to_path_buf(), "web".into()),
        );

        // The in-memory buffer holds only the last 10 of those lines.
        for i in 90..100 {
            app.push_output("web", Stream::Stdout, &format!("disk line {}", i));
        }

        app.update_processes(vec![make_process("web", ProcessState::Running)]);
        app.visible_height = 10;

        // The total is the disk count, not disk + memory.
        assert_eq!(app.total_line_count("web"), 100);

        // Live view: served entirely from memory.
        let lines = app.visible_lines("web", 10).unwrap();
        assert_eq!(lines.len(), 10);
        assert_eq!(lines[0].1, "disk line 90");
        assert_eq!(lines[9].1, "disk line 99");

        // Scrolled to the top: served entirely from disk.
        app.paused = true;
        app.scroll_offsets.insert("web".into(), 90);
        let lines = app.visible_lines("web", 10).unwrap();
        assert_eq!(lines.len(), 10);
        assert_eq!(lines[0].1, "disk line 0");
        assert_eq!(lines[9].1, "disk line 9");

        // Offset 5: window 85..95 straddles the disk/memory boundary at 90.
        app.scroll_offsets.insert("web".into(), 5);
        let lines = app.visible_lines("web", 10).unwrap();
        assert_eq!(lines.len(), 10);
        assert_eq!(lines[0].1, "disk line 85");
        assert_eq!(lines[4].1, "disk line 89");
        assert_eq!(lines[5].1, "disk line 90");
        assert_eq!(lines[9].1, "disk line 94");
    }

    #[test]
    fn test_visible_lines_returns_none_when_filtered() {
        let mut app = App::new();
        app.push_output("web", Stream::Stdout, "hello");
        app.filter = Some("hello".into());

        // With a filter active, visible_lines declines to produce a window.
        assert!(app.visible_lines("web", 10).is_none());
    }

    #[test]
    fn test_hot_line_count_with_and_without_filter() {
        let mut app = App::new();
        app.push_output("web", Stream::Stdout, "hello world");
        app.push_output("web", Stream::Stdout, "goodbye world");
        app.push_output("web", Stream::Stdout, "hello again");

        // Unfiltered: every stdout line counts.
        assert_eq!(app.hot_line_count("web", None), 3);

        // Filtered: only lines containing the pattern count.
        assert_eq!(app.hot_line_count("web", Some("hello")), 2);
        assert_eq!(app.hot_line_count("web", Some("xyz")), 0);
    }
}