Skip to main content

agent_procs/tui/
mod.rs

1//! Terminal UI for real-time process monitoring.
2//!
3//! The TUI connects to a running daemon session and displays a split-pane
4//! layout: a process list on the left and streaming output on the right.
5//! It polls status every 2 seconds and streams output via `Logs --follow`.
6//!
7//! See [`app`] for state management, [`input`] for keybindings, and
8//! [`ui`] for rendering.
9
10pub mod app;
11pub mod input;
12pub mod ui;
13
14use crate::cli;
15use crate::paths;
16use crate::protocol::{Request, Response, Stream as ProtoStream};
17use app::App;
18use crossterm::{
19    event::{Event, EventStream},
20    execute,
21    terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode},
22};
23use futures::StreamExt;
24use ratatui::prelude::*;
25use std::io;
26use std::io::{BufRead as _, BufReader as StdBufReader};
27use std::sync::Arc;
28use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
29use tokio::sync::mpsc;
30use tokio::time::{Duration, interval};
31
32enum AppEvent {
33    Key(crossterm::event::KeyEvent),
34    OutputLine {
35        process: String,
36        stream: ProtoStream,
37        line: String,
38    },
39    StatusUpdate(Vec<crate::protocol::ProcessInfo>),
40    OutputStreamClosed,
41}
42
43pub async fn run(session: &str) -> i32 {
44    // Verify daemon is running
45    if cli::connect(session, false).await.is_err() {
46        eprintln!(
47            "error: no daemon running for session '{}'. Start processes first.",
48            session
49        );
50        return 1;
51    }
52
53    // Set up terminal
54    if let Err(e) = enable_raw_mode() {
55        eprintln!("error: failed to enable raw mode: {}", e);
56        return 1;
57    }
58    let mut stdout = io::stdout();
59    if let Err(e) = execute!(stdout, EnterAlternateScreen) {
60        let _ = disable_raw_mode();
61        eprintln!("error: failed to enter alternate screen: {}", e);
62        return 1;
63    }
64
65    // Install panic hook to restore terminal
66    let original_hook = std::panic::take_hook();
67    std::panic::set_hook(Box::new(move |panic_info| {
68        let _ = disable_raw_mode();
69        let _ = execute!(io::stdout(), LeaveAlternateScreen);
70        original_hook(panic_info);
71    }));
72
73    let mut terminal = match Terminal::new(CrosstermBackend::new(stdout)) {
74        Ok(t) => t,
75        Err(e) => {
76            let _ = disable_raw_mode();
77            let _ = execute!(io::stdout(), LeaveAlternateScreen);
78            eprintln!("error: failed to initialize terminal: {}", e);
79            return 1;
80        }
81    };
82    let mut app = App::new();
83
84    // Load historical output from log files on disk
85    load_historical_logs(session, &mut app);
86
87    // Channel for events from background tasks
88    let (tx, mut rx) = mpsc::channel::<AppEvent>(256);
89
90    // Start output stream reader
91    let session_str = session.to_string();
92    let output_tx = tx.clone();
93    tokio::spawn(async move {
94        output_stream_reader(&session_str, output_tx).await;
95    });
96
97    // Start status poller
98    let session_str = session.to_string();
99    let status_tx = tx.clone();
100    tokio::spawn(async move {
101        status_poller(&session_str, status_tx).await;
102    });
103
104    // Start key event reader
105    let key_tx = tx.clone();
106    tokio::spawn(async move {
107        key_reader(key_tx).await;
108    });
109
110    // Handle SIGTERM gracefully (restore terminal before exit)
111    let sigterm_tx = tx.clone();
112    tokio::spawn(async move {
113        if let Ok(mut sig) =
114            tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
115        {
116            sig.recv().await;
117            let _ = sigterm_tx
118                .send(AppEvent::Key(crossterm::event::KeyEvent::new(
119                    crossterm::event::KeyCode::Char('q'),
120                    crossterm::event::KeyModifiers::empty(),
121                )))
122                .await;
123        }
124    });
125
126    // Track reconnection attempts for backoff
127    let reconnect_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
128    const MAX_RECONNECT_ATTEMPTS: u32 = 10;
129
130    // Main render loop
131    while app.running {
132        if let Err(e) = terminal.draw(|f| ui::draw(f, &app)) {
133            eprintln!("error: terminal draw failed: {}", e);
134            break;
135        }
136
137        // Wait for next event
138        if let Some(event) = rx.recv().await {
139            match event {
140                AppEvent::Key(key) => {
141                    let action = input::handle_key(key);
142                    match action {
143                        input::Action::SelectNext => app.select_next(),
144                        input::Action::SelectPrev => app.select_prev(),
145                        input::Action::CycleStream => app.cycle_stream_mode(),
146                        input::Action::TogglePause => app.toggle_pause(),
147                        input::Action::Quit => app.quit(),
148                        input::Action::QuitAndStop => app.quit_and_stop(),
149                        input::Action::Stop => {
150                            if let Some(name) = app.selected_name() {
151                                let _ = cli::request(
152                                    session,
153                                    &Request::Stop {
154                                        target: name.to_string(),
155                                    },
156                                    false,
157                                )
158                                .await;
159                            }
160                        }
161                        input::Action::StopAll => {
162                            let _ = cli::request(session, &Request::StopAll, false).await;
163                        }
164                        input::Action::Restart => {
165                            if let Some(name) = app.selected_name() {
166                                let _ = cli::request(
167                                    session,
168                                    &Request::Restart {
169                                        target: name.to_string(),
170                                    },
171                                    false,
172                                )
173                                .await;
174                            }
175                        }
176                        input::Action::None => {}
177                    }
178                }
179                AppEvent::OutputLine {
180                    process,
181                    stream,
182                    line,
183                } => {
184                    app.push_output(&process, stream, &line);
185                }
186                AppEvent::StatusUpdate(processes) => {
187                    app.update_processes(processes);
188                }
189                AppEvent::OutputStreamClosed => {
190                    let count = reconnect_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
191                    if count < MAX_RECONNECT_ATTEMPTS {
192                        let session_str = session.to_string();
193                        let reconnect_tx = tx.clone();
194                        let rc = Arc::clone(&reconnect_count);
195                        tokio::spawn(async move {
196                            // Exponential backoff: 2s, 4s, 8s, ... capped at 30s
197                            let delay = Duration::from_secs(2u64.saturating_pow(count).min(30));
198                            tokio::time::sleep(delay).await;
199                            output_stream_reader(&session_str, reconnect_tx).await;
200                            // Reset counter on successful reconnection
201                            rc.store(0, std::sync::atomic::Ordering::Relaxed);
202                        });
203                    }
204                }
205            }
206        }
207    }
208
209    // Stop all if requested
210    if app.stop_all_on_quit {
211        let _ = cli::request(session, &Request::StopAll, false).await;
212    }
213
214    // Restore terminal
215    let _ = disable_raw_mode();
216    let _ = execute!(terminal.backend_mut(), LeaveAlternateScreen);
217    0
218}
219
220async fn output_stream_reader(session: &str, tx: mpsc::Sender<AppEvent>) {
221    let stream = match cli::connect(session, false).await {
222        Ok(s) => s,
223        Err(_) => {
224            let _ = tx.send(AppEvent::OutputStreamClosed).await;
225            return;
226        }
227    };
228
229    let (reader, mut writer) = stream.into_split();
230
231    // Send follow request with no timeout (infinite streaming)
232    let req = Request::Logs {
233        target: None,
234        tail: 0,
235        follow: true,
236        stderr: false,
237        all: true,
238        timeout_secs: None, // infinite — TUI manages its own lifetime
239        lines: None,
240    };
241    let mut json = match serde_json::to_string(&req) {
242        Ok(j) => j,
243        Err(_) => return,
244    };
245    json.push('\n');
246    if writer.write_all(json.as_bytes()).await.is_err() {
247        return;
248    }
249    if writer.flush().await.is_err() {
250        return;
251    }
252
253    let mut lines = BufReader::new(reader);
254    loop {
255        let mut line = String::new();
256        match lines.read_line(&mut line).await {
257            Ok(0) | Err(_) => break, // EOF or error
258            Ok(_) => {
259                if let Ok(resp) = serde_json::from_str::<Response>(&line) {
260                    match resp {
261                        Response::LogLine {
262                            process,
263                            stream,
264                            line,
265                        } => {
266                            let _ = tx
267                                .send(AppEvent::OutputLine {
268                                    process,
269                                    stream,
270                                    line,
271                                })
272                                .await;
273                        }
274                        Response::LogEnd => break,
275                        _ => {}
276                    }
277                }
278            }
279        }
280    }
281
282    let _ = tx.send(AppEvent::OutputStreamClosed).await;
283}
284
285async fn status_poller(session: &str, tx: mpsc::Sender<AppEvent>) {
286    let mut ticker = interval(Duration::from_secs(2));
287    loop {
288        ticker.tick().await;
289        if let Ok(Response::Status { processes }) =
290            cli::request(session, &Request::Status, false).await
291        {
292            if tx.send(AppEvent::StatusUpdate(processes)).await.is_err() {
293                break; // Receiver dropped, TUI is shutting down
294            }
295        }
296    }
297}
298
299async fn key_reader(tx: mpsc::Sender<AppEvent>) {
300    let mut reader = EventStream::new();
301    while let Some(Ok(event)) = reader.next().await {
302        if let Event::Key(key) = event {
303            if tx.send(AppEvent::Key(key)).await.is_err() {
304                break;
305            }
306        }
307    }
308}
309
310/// Load the tail of each process's log files from disk into the app's output buffers.
311/// This populates the TUI with historical output from before it was launched.
312fn load_historical_logs(session: &str, app: &mut App) {
313    let log_dir = paths::log_dir(session);
314
315    let entries = match std::fs::read_dir(&log_dir) {
316        Ok(e) => e,
317        Err(_) => return,
318    };
319
320    for entry in entries.flatten() {
321        let name = entry.file_name().to_string_lossy().to_string();
322        let (proc_name, stream) = if let Some(p) = name.strip_suffix(".stdout") {
323            (p.to_string(), ProtoStream::Stdout)
324        } else if let Some(p) = name.strip_suffix(".stderr") {
325            (p.to_string(), ProtoStream::Stderr)
326        } else {
327            continue;
328        };
329
330        if let Ok(file) = std::fs::File::open(entry.path()) {
331            for line in StdBufReader::new(file).lines().map_while(Result::ok) {
332                app.push_output(&proc_name, stream, &line);
333            }
334        }
335    }
336}