Skip to main content

agent_procs/tui/
event_loop.rs

1use super::app::App;
2use super::disk_log_reader::DiskLogReader;
3use super::input;
4use crate::cli;
5use crate::cli::logs::tail_file;
6use crate::paths;
7use crate::protocol::{Request, Response, Stream as ProtoStream};
8use crossterm::event::{Event, EventStream, MouseButton, MouseEventKind};
9use futures::StreamExt;
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
11use tokio::sync::mpsc;
12use tokio::time::{Duration, interval};
13
14pub enum AppEvent {
15    Key(crossterm::event::KeyEvent),
16    Mouse(crossterm::event::MouseEvent),
17    OutputLine {
18        process: String,
19        stream: ProtoStream,
20        line: String,
21    },
22    StatusUpdate(Vec<crate::protocol::ProcessInfo>),
23    OutputStreamClosed,
24}
25
/// Connection state of the daemon output stream as tracked by the event loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReconnectState {
    /// The stream is believed to be healthy.
    Connected,
    /// The stream closed and reconnect number `attempt` has been scheduled.
    Reconnecting { attempt: u32 },
    /// All reconnect attempts were used up; no further attempts are made.
    Failed,
}

/// Number of reconnect attempts scheduled before the state becomes `Failed`.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
33
34pub struct TuiEventLoop {
35    rx: mpsc::Receiver<AppEvent>,
36    tx: mpsc::Sender<AppEvent>,
37    session: String,
38    reconnect_state: ReconnectState,
39}
40
41impl TuiEventLoop {
42    pub fn new(session: &str) -> Self {
43        let (tx, rx) = mpsc::channel::<AppEvent>(256);
44        Self {
45            rx,
46            tx,
47            session: session.to_string(),
48            reconnect_state: ReconnectState::Connected,
49        }
50    }
51
52    /// Spawn all background tasks (output stream, status poller, key reader, SIGTERM).
53    pub fn spawn_background_tasks(&self) {
54        let session_str = self.session.clone();
55        let output_tx = self.tx.clone();
56        tokio::spawn(async move {
57            output_stream_reader(&session_str, output_tx).await;
58        });
59
60        let session_str = self.session.clone();
61        let status_tx = self.tx.clone();
62        tokio::spawn(async move {
63            status_poller(&session_str, status_tx).await;
64        });
65
66        let key_tx = self.tx.clone();
67        tokio::spawn(async move {
68            key_reader(key_tx).await;
69        });
70
71        let sigterm_tx = self.tx.clone();
72        tokio::spawn(async move {
73            if let Ok(mut sig) =
74                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
75            {
76                sig.recv().await;
77                let _ = sigterm_tx
78                    .send(AppEvent::Key(crossterm::event::KeyEvent::new(
79                        crossterm::event::KeyCode::Char('q'),
80                        crossterm::event::KeyModifiers::empty(),
81                    )))
82                    .await;
83            }
84        });
85    }
86
87    /// Run the main event loop, dispatching events to the app and drawing the terminal.
88    pub async fn run(
89        &mut self,
90        terminal: &mut ratatui::Terminal<ratatui::prelude::CrosstermBackend<std::io::Stdout>>,
91        app: &mut App,
92    ) {
93        while app.running {
94            if let Err(e) = terminal.draw(|f| super::ui::draw(f, app)) {
95                eprintln!("error: terminal draw failed: {}", e);
96                break;
97            }
98
99            if let Some(event) = self.rx.recv().await {
100                match event {
101                    AppEvent::Key(key) => {
102                        self.handle_key(app, key).await;
103                    }
104                    AppEvent::Mouse(mouse) => {
105                        Self::handle_mouse(app, mouse);
106                    }
107                    AppEvent::OutputLine {
108                        process,
109                        stream,
110                        line,
111                    } => {
112                        self.reconnect_state = ReconnectState::Connected;
113                        app.push_output(&process, stream, &line);
114                    }
115                    AppEvent::StatusUpdate(processes) => {
116                        // Create disk readers for new processes
117                        let log_dir = paths::log_dir(&self.session);
118                        for p in &processes {
119                            if !app.disk_readers.contains_key(&p.name) {
120                                app.disk_readers.insert(
121                                    p.name.clone(),
122                                    DiskLogReader::new(log_dir.clone(), p.name.clone()),
123                                );
124                            }
125                        }
126                        app.update_processes(processes);
127                    }
128                    AppEvent::OutputStreamClosed => {
129                        self.handle_reconnect();
130                    }
131                }
132            }
133        }
134
135        if app.stop_all_on_quit {
136            let _ = cli::request(&self.session, &Request::Shutdown, false).await;
137        }
138    }
139
140    async fn handle_key(&self, app: &mut App, key: crossterm::event::KeyEvent) {
141        if app.input_mode == super::app::InputMode::FilterInput {
142            match input::handle_filter_key(key) {
143                input::FilterAction::Char(c) => app.filter_buf.push(c),
144                input::FilterAction::Backspace => {
145                    app.filter_buf.pop();
146                }
147                input::FilterAction::Confirm => app.confirm_filter(),
148                input::FilterAction::Cancel => app.cancel_filter(),
149            }
150        } else {
151            let action = input::handle_key(key);
152            match action {
153                input::Action::SelectNext => app.select_next(),
154                input::Action::SelectPrev => app.select_prev(),
155                input::Action::CycleStream => app.cycle_stream_mode(),
156                input::Action::TogglePause => app.toggle_pause(),
157                input::Action::ScrollUp => app.scroll_up(),
158                input::Action::ScrollDown => app.scroll_down(),
159                input::Action::ScrollToTop => app.scroll_to_top(),
160                input::Action::ScrollToBottom => app.scroll_to_bottom(),
161                input::Action::StartFilter => app.start_filter(),
162                input::Action::ClearFilter => app.clear_filter(),
163                input::Action::Quit => app.quit(),
164                input::Action::QuitAndStop => app.quit_and_stop(),
165                input::Action::Stop => {
166                    if let Some(name) = app.selected_name() {
167                        let _ = cli::request(
168                            &self.session,
169                            &Request::Stop {
170                                target: name.to_string(),
171                            },
172                            false,
173                        )
174                        .await;
175                    }
176                }
177                input::Action::StopAll => {
178                    let _ = cli::request(&self.session, &Request::StopAll, false).await;
179                }
180                input::Action::Restart => {
181                    if let Some(name) = app.selected_name() {
182                        let _ = cli::request(
183                            &self.session,
184                            &Request::Restart {
185                                target: name.to_string(),
186                            },
187                            false,
188                        )
189                        .await;
190                    }
191                }
192                input::Action::None => {}
193            }
194        }
195    }
196
197    fn handle_mouse(app: &mut App, mouse: crossterm::event::MouseEvent) {
198        const MOUSE_SCROLL_LINES: usize = 3;
199        match mouse.kind {
200            MouseEventKind::ScrollUp => app.scroll_up_by(MOUSE_SCROLL_LINES),
201            MouseEventKind::ScrollDown => app.scroll_down_by(MOUSE_SCROLL_LINES),
202            MouseEventKind::Down(MouseButton::Left) => {
203                if mouse.column < 22 {
204                    let row = mouse.row.saturating_sub(1) as usize;
205                    if row < app.processes.len() {
206                        app.selected = row;
207                    }
208                }
209            }
210            _ => {}
211        }
212    }
213
214    fn handle_reconnect(&mut self) {
215        let attempt = match self.reconnect_state {
216            ReconnectState::Connected => 0,
217            ReconnectState::Reconnecting { attempt } => attempt,
218            ReconnectState::Failed => return,
219        };
220
221        if attempt >= MAX_RECONNECT_ATTEMPTS {
222            self.reconnect_state = ReconnectState::Failed;
223            return;
224        }
225
226        self.reconnect_state = ReconnectState::Reconnecting {
227            attempt: attempt + 1,
228        };
229
230        let session_str = self.session.clone();
231        let reconnect_tx = self.tx.clone();
232        tokio::spawn(async move {
233            let delay = Duration::from_secs(2u64.saturating_pow(attempt).min(30));
234            tokio::time::sleep(delay).await;
235            output_stream_reader(&session_str, reconnect_tx).await;
236        });
237    }
238}
239
240pub async fn output_stream_reader(session: &str, tx: mpsc::Sender<AppEvent>) {
241    let stream = match cli::connect(session, false).await {
242        Ok(s) => s,
243        Err(_) => {
244            let _ = tx.send(AppEvent::OutputStreamClosed).await;
245            return;
246        }
247    };
248
249    let (reader, mut writer) = stream.into_split();
250
251    let req = Request::Logs {
252        target: None,
253        tail: 0,
254        follow: true,
255        stderr: false,
256        all: true,
257        timeout_secs: None,
258        lines: None,
259    };
260    let mut json = match serde_json::to_string(&req) {
261        Ok(j) => j,
262        Err(_) => return,
263    };
264    json.push('\n');
265    if writer.write_all(json.as_bytes()).await.is_err() {
266        return;
267    }
268    if writer.flush().await.is_err() {
269        return;
270    }
271
272    let mut lines = BufReader::new(reader);
273    loop {
274        let mut line = String::new();
275        match lines.read_line(&mut line).await {
276            Ok(0) | Err(_) => break,
277            Ok(_) => {
278                if let Ok(resp) = serde_json::from_str::<Response>(&line) {
279                    match resp {
280                        Response::LogLine {
281                            process,
282                            stream,
283                            line,
284                        } => {
285                            let _ = tx
286                                .send(AppEvent::OutputLine {
287                                    process,
288                                    stream,
289                                    line,
290                                })
291                                .await;
292                        }
293                        Response::LogEnd => break,
294                        _ => {}
295                    }
296                }
297            }
298        }
299    }
300
301    let _ = tx.send(AppEvent::OutputStreamClosed).await;
302}
303
304pub async fn status_poller(session: &str, tx: mpsc::Sender<AppEvent>) {
305    let mut ticker = interval(Duration::from_secs(2));
306    loop {
307        ticker.tick().await;
308        if let Ok(Response::Status { processes }) =
309            cli::request(session, &Request::Status, false).await
310            && tx.send(AppEvent::StatusUpdate(processes)).await.is_err()
311        {
312            break;
313        }
314    }
315}
316
317pub async fn key_reader(tx: mpsc::Sender<AppEvent>) {
318    let mut reader = EventStream::new();
319    while let Some(Ok(event)) = reader.next().await {
320        let app_event = match event {
321            Event::Key(key) => AppEvent::Key(key),
322            Event::Mouse(mouse) => AppEvent::Mouse(mouse),
323            _ => continue,
324        };
325        if tx.send(app_event).await.is_err() {
326            break;
327        }
328    }
329}
330
331/// Initialize disk readers and pre-populate the hot buffer with recent lines.
332///
333/// Scans the log directory to discover process names, creates a `DiskLogReader`
334/// for each, and loads the last 1000 lines per stream into the hot buffer so
335/// the initial view has content before live lines arrive.
336pub fn init_disk_readers(session: &str, app: &mut App) {
337    let log_dir = paths::log_dir(session);
338
339    let entries = match std::fs::read_dir(&log_dir) {
340        Ok(e) => e,
341        Err(_) => return,
342    };
343
344    // Discover process names from existing log files
345    let mut seen_processes: std::collections::HashSet<String> = std::collections::HashSet::new();
346    let mut file_entries: Vec<(String, ProtoStream, std::path::PathBuf)> = Vec::new();
347
348    for entry in entries.flatten() {
349        let name = entry.file_name().to_string_lossy().to_string();
350        // Match current log files (e.g., "web.stdout", "web.stderr")
351        // Skip rotated (.1, .2, ...) and index (.idx) files for hot buffer loading
352        let (proc_name, stream) = if let Some(p) = name.strip_suffix(".stdout") {
353            (p.to_string(), ProtoStream::Stdout)
354        } else if let Some(p) = name.strip_suffix(".stderr") {
355            (p.to_string(), ProtoStream::Stderr)
356        } else {
357            continue;
358        };
359
360        seen_processes.insert(proc_name.clone());
361        file_entries.push((proc_name, stream, entry.path()));
362    }
363
364    // Create disk readers for all discovered processes
365    for proc_name in &seen_processes {
366        app.disk_readers.insert(
367            proc_name.clone(),
368            DiskLogReader::new(log_dir.clone(), proc_name.clone()),
369        );
370    }
371
372    // Pre-populate hot buffer with last 1000 lines from current log files
373    for (proc_name, stream, path) in &file_entries {
374        if let Ok(lines) = tail_file(path, 1000) {
375            for line in lines {
376                app.push_output(proc_name, *stream, &line);
377            }
378        }
379    }
380}