Skip to main content

agent_procs/tui/
event_loop.rs

1use crate::cli;
2use crate::paths;
3use crate::protocol::{Request, Response, Stream as ProtoStream};
4use super::app::App;
5use super::input;
6use crossterm::event::{Event, EventStream, MouseButton, MouseEventKind};
7use futures::StreamExt;
8use crate::cli::logs::tail_file;
9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
10use tokio::sync::mpsc;
11use tokio::time::{Duration, interval};
12
13pub enum AppEvent {
14    Key(crossterm::event::KeyEvent),
15    Mouse(crossterm::event::MouseEvent),
16    OutputLine {
17        process: String,
18        stream: ProtoStream,
19        line: String,
20    },
21    StatusUpdate(Vec<crate::protocol::ProcessInfo>),
22    OutputStreamClosed,
23}
24
25enum ReconnectState {
26    Connected,
27    Reconnecting { attempt: u32 },
28    Failed,
29}
30
31const MAX_RECONNECT_ATTEMPTS: u32 = 10;
32
33pub struct TuiEventLoop {
34    rx: mpsc::Receiver<AppEvent>,
35    tx: mpsc::Sender<AppEvent>,
36    session: String,
37    reconnect_state: ReconnectState,
38}
39
40impl TuiEventLoop {
41    pub fn new(session: &str) -> Self {
42        let (tx, rx) = mpsc::channel::<AppEvent>(256);
43        Self {
44            rx,
45            tx,
46            session: session.to_string(),
47            reconnect_state: ReconnectState::Connected,
48        }
49    }
50
51    /// Spawn all background tasks (output stream, status poller, key reader, SIGTERM).
52    pub fn spawn_background_tasks(&self) {
53        let session_str = self.session.clone();
54        let output_tx = self.tx.clone();
55        tokio::spawn(async move {
56            output_stream_reader(&session_str, output_tx).await;
57        });
58
59        let session_str = self.session.clone();
60        let status_tx = self.tx.clone();
61        tokio::spawn(async move {
62            status_poller(&session_str, status_tx).await;
63        });
64
65        let key_tx = self.tx.clone();
66        tokio::spawn(async move {
67            key_reader(key_tx).await;
68        });
69
70        let sigterm_tx = self.tx.clone();
71        tokio::spawn(async move {
72            if let Ok(mut sig) =
73                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
74            {
75                sig.recv().await;
76                let _ = sigterm_tx
77                    .send(AppEvent::Key(crossterm::event::KeyEvent::new(
78                        crossterm::event::KeyCode::Char('q'),
79                        crossterm::event::KeyModifiers::empty(),
80                    )))
81                    .await;
82            }
83        });
84    }
85
86    /// Run the main event loop, dispatching events to the app and drawing the terminal.
87    pub async fn run(
88        &mut self,
89        terminal: &mut ratatui::Terminal<ratatui::prelude::CrosstermBackend<std::io::Stdout>>,
90        app: &mut App,
91    ) {
92        while app.running {
93            if let Err(e) = terminal.draw(|f| super::ui::draw(f, app)) {
94                eprintln!("error: terminal draw failed: {}", e);
95                break;
96            }
97
98            if let Some(event) = self.rx.recv().await {
99                match event {
100                    AppEvent::Key(key) => {
101                        self.handle_key(app, key).await;
102                    }
103                    AppEvent::Mouse(mouse) => {
104                        Self::handle_mouse(app, mouse);
105                    }
106                    AppEvent::OutputLine {
107                        process,
108                        stream,
109                        line,
110                    } => {
111                        self.reconnect_state = ReconnectState::Connected;
112                        app.push_output(&process, stream, &line);
113                    }
114                    AppEvent::StatusUpdate(processes) => {
115                        app.update_processes(processes);
116                    }
117                    AppEvent::OutputStreamClosed => {
118                        self.handle_reconnect();
119                    }
120                }
121            }
122        }
123
124        if app.stop_all_on_quit {
125            let _ = cli::request(&self.session, &Request::Shutdown, false).await;
126        }
127    }
128
129    async fn handle_key(&self, app: &mut App, key: crossterm::event::KeyEvent) {
130        if app.input_mode == super::app::InputMode::FilterInput {
131            match input::handle_filter_key(key) {
132                input::FilterAction::Char(c) => app.filter_buf.push(c),
133                input::FilterAction::Backspace => {
134                    app.filter_buf.pop();
135                }
136                input::FilterAction::Confirm => app.confirm_filter(),
137                input::FilterAction::Cancel => app.cancel_filter(),
138            }
139        } else {
140            let action = input::handle_key(key);
141            match action {
142                input::Action::SelectNext => app.select_next(),
143                input::Action::SelectPrev => app.select_prev(),
144                input::Action::CycleStream => app.cycle_stream_mode(),
145                input::Action::TogglePause => app.toggle_pause(),
146                input::Action::ScrollUp => app.scroll_up(),
147                input::Action::ScrollDown => app.scroll_down(),
148                input::Action::ScrollToTop => app.scroll_to_top(),
149                input::Action::ScrollToBottom => app.scroll_to_bottom(),
150                input::Action::StartFilter => app.start_filter(),
151                input::Action::ClearFilter => app.clear_filter(),
152                input::Action::Quit => app.quit(),
153                input::Action::QuitAndStop => app.quit_and_stop(),
154                input::Action::Stop => {
155                    if let Some(name) = app.selected_name() {
156                        let _ = cli::request(
157                            &self.session,
158                            &Request::Stop {
159                                target: name.to_string(),
160                            },
161                            false,
162                        )
163                        .await;
164                    }
165                }
166                input::Action::StopAll => {
167                    let _ = cli::request(&self.session, &Request::StopAll, false).await;
168                }
169                input::Action::Restart => {
170                    if let Some(name) = app.selected_name() {
171                        let _ = cli::request(
172                            &self.session,
173                            &Request::Restart {
174                                target: name.to_string(),
175                            },
176                            false,
177                        )
178                        .await;
179                    }
180                }
181                input::Action::None => {}
182            }
183        }
184    }
185
186    fn handle_mouse(app: &mut App, mouse: crossterm::event::MouseEvent) {
187        match mouse.kind {
188            MouseEventKind::ScrollUp => app.scroll_up(),
189            MouseEventKind::ScrollDown => app.scroll_down(),
190            MouseEventKind::Down(MouseButton::Left) => {
191                if mouse.column < 22 {
192                    let row = mouse.row.saturating_sub(1) as usize;
193                    if row < app.processes.len() {
194                        app.selected = row;
195                    }
196                }
197            }
198            _ => {}
199        }
200    }
201
202    fn handle_reconnect(&mut self) {
203        let attempt = match self.reconnect_state {
204            ReconnectState::Connected => 0,
205            ReconnectState::Reconnecting { attempt } => attempt,
206            ReconnectState::Failed => return,
207        };
208
209        if attempt >= MAX_RECONNECT_ATTEMPTS {
210            self.reconnect_state = ReconnectState::Failed;
211            return;
212        }
213
214        self.reconnect_state = ReconnectState::Reconnecting {
215            attempt: attempt + 1,
216        };
217
218        let session_str = self.session.clone();
219        let reconnect_tx = self.tx.clone();
220        tokio::spawn(async move {
221            let delay = Duration::from_secs(2u64.saturating_pow(attempt).min(30));
222            tokio::time::sleep(delay).await;
223            output_stream_reader(&session_str, reconnect_tx).await;
224        });
225    }
226}
227
228pub async fn output_stream_reader(session: &str, tx: mpsc::Sender<AppEvent>) {
229    let stream = match cli::connect(session, false).await {
230        Ok(s) => s,
231        Err(_) => {
232            let _ = tx.send(AppEvent::OutputStreamClosed).await;
233            return;
234        }
235    };
236
237    let (reader, mut writer) = stream.into_split();
238
239    let req = Request::Logs {
240        target: None,
241        tail: 0,
242        follow: true,
243        stderr: false,
244        all: true,
245        timeout_secs: None,
246        lines: None,
247    };
248    let mut json = match serde_json::to_string(&req) {
249        Ok(j) => j,
250        Err(_) => return,
251    };
252    json.push('\n');
253    if writer.write_all(json.as_bytes()).await.is_err() {
254        return;
255    }
256    if writer.flush().await.is_err() {
257        return;
258    }
259
260    let mut lines = BufReader::new(reader);
261    loop {
262        let mut line = String::new();
263        match lines.read_line(&mut line).await {
264            Ok(0) | Err(_) => break,
265            Ok(_) => {
266                if let Ok(resp) = serde_json::from_str::<Response>(&line) {
267                    match resp {
268                        Response::LogLine {
269                            process,
270                            stream,
271                            line,
272                        } => {
273                            let _ = tx
274                                .send(AppEvent::OutputLine {
275                                    process,
276                                    stream,
277                                    line,
278                                })
279                                .await;
280                        }
281                        Response::LogEnd => break,
282                        _ => {}
283                    }
284                }
285            }
286        }
287    }
288
289    let _ = tx.send(AppEvent::OutputStreamClosed).await;
290}
291
292pub async fn status_poller(session: &str, tx: mpsc::Sender<AppEvent>) {
293    let mut ticker = interval(Duration::from_secs(2));
294    loop {
295        ticker.tick().await;
296        if let Ok(Response::Status { processes }) =
297            cli::request(session, &Request::Status, false).await
298        {
299            if tx.send(AppEvent::StatusUpdate(processes)).await.is_err() {
300                break;
301            }
302        }
303    }
304}
305
306pub async fn key_reader(tx: mpsc::Sender<AppEvent>) {
307    let mut reader = EventStream::new();
308    while let Some(Ok(event)) = reader.next().await {
309        let app_event = match event {
310            Event::Key(key) => AppEvent::Key(key),
311            Event::Mouse(mouse) => AppEvent::Mouse(mouse),
312            _ => continue,
313        };
314        if tx.send(app_event).await.is_err() {
315            break;
316        }
317    }
318}
319
320/// Load the tail of each process's log files from disk into the app's output buffers.
321/// Uses `tail_file` to read only the last 1000 lines instead of entire files.
322pub fn load_historical_logs(session: &str, app: &mut App) {
323    let log_dir = paths::log_dir(session);
324
325    let entries = match std::fs::read_dir(&log_dir) {
326        Ok(e) => e,
327        Err(_) => return,
328    };
329
330    for entry in entries.flatten() {
331        let name = entry.file_name().to_string_lossy().to_string();
332        let (proc_name, stream) = if let Some(p) = name.strip_suffix(".stdout") {
333            (p.to_string(), ProtoStream::Stdout)
334        } else if let Some(p) = name.strip_suffix(".stderr") {
335            (p.to_string(), ProtoStream::Stderr)
336        } else {
337            continue;
338        };
339
340        if let Ok(lines) = tail_file(&entry.path(), 1000) {
341            for line in lines {
342                app.push_output(&proc_name, stream, &line);
343            }
344        }
345    }
346}