1use super::app::App;
2use super::disk_log_reader::DiskLogReader;
3use super::input;
4use crate::cli;
5use crate::cli::logs::tail_file;
6use crate::paths;
7use crate::protocol::{Request, Response, Stream as ProtoStream};
8use crossterm::event::{Event, EventStream, MouseButton, MouseEventKind};
9use futures::StreamExt;
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
11use tokio::sync::mpsc;
12use tokio::time::{Duration, interval};
13
/// Events delivered to the TUI event loop over its `mpsc` channel.
///
/// Every background task in this file communicates with the loop exclusively
/// by sending one of these variants.
pub enum AppEvent {
    /// A key event, forwarded from the terminal by `key_reader` (the SIGTERM
    /// task also sends a synthetic `'q'` key through this variant).
    Key(crossterm::event::KeyEvent),
    /// A mouse event forwarded from the terminal by `key_reader`.
    Mouse(crossterm::event::MouseEvent),
    /// One log line received from the followed log stream
    /// (built from a `Response::LogLine` in `output_stream_reader`).
    OutputLine {
        /// Name of the process that produced the line.
        process: String,
        /// Which stream the line was written to.
        stream: ProtoStream,
        /// The line's text as received.
        line: String,
    },
    /// Result of a `Request::Status` poll, sent by `status_poller`.
    StatusUpdate(Vec<crate::protocol::ProcessInfo>),
    /// Sent by `output_stream_reader` when its connection attempt fails or
    /// the log stream ends; the event loop reacts by scheduling a reconnect.
    OutputStreamClosed,
}
25
/// Tracks how the event loop is doing at keeping the log stream alive.
enum ReconnectState {
    /// Initial state; also restored whenever an `AppEvent::OutputLine` arrives.
    Connected,
    /// The stream closed and `attempt` reconnects have been scheduled so far.
    Reconnecting { attempt: u32 },
    /// `MAX_RECONNECT_ATTEMPTS` was reached; further stream-closed events are
    /// ignored and no more reconnects are scheduled.
    Failed,
}
31
/// Number of reconnects `TuiEventLoop::handle_reconnect` will schedule before
/// switching to `ReconnectState::Failed`.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
33
34pub struct TuiEventLoop {
35 rx: mpsc::Receiver<AppEvent>,
36 tx: mpsc::Sender<AppEvent>,
37 session: String,
38 reconnect_state: ReconnectState,
39}
40
41impl TuiEventLoop {
42 pub fn new(session: &str) -> Self {
43 let (tx, rx) = mpsc::channel::<AppEvent>(256);
44 Self {
45 rx,
46 tx,
47 session: session.to_string(),
48 reconnect_state: ReconnectState::Connected,
49 }
50 }
51
52 pub fn spawn_background_tasks(&self) {
54 let session_str = self.session.clone();
55 let output_tx = self.tx.clone();
56 tokio::spawn(async move {
57 output_stream_reader(&session_str, output_tx).await;
58 });
59
60 let session_str = self.session.clone();
61 let status_tx = self.tx.clone();
62 tokio::spawn(async move {
63 status_poller(&session_str, status_tx).await;
64 });
65
66 let key_tx = self.tx.clone();
67 tokio::spawn(async move {
68 key_reader(key_tx).await;
69 });
70
71 let sigterm_tx = self.tx.clone();
72 tokio::spawn(async move {
73 if let Ok(mut sig) =
74 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
75 {
76 sig.recv().await;
77 let _ = sigterm_tx
78 .send(AppEvent::Key(crossterm::event::KeyEvent::new(
79 crossterm::event::KeyCode::Char('q'),
80 crossterm::event::KeyModifiers::empty(),
81 )))
82 .await;
83 }
84 });
85 }
86
87 pub async fn run(
89 &mut self,
90 terminal: &mut ratatui::Terminal<ratatui::prelude::CrosstermBackend<std::io::Stdout>>,
91 app: &mut App,
92 ) {
93 while app.running {
94 if let Err(e) = terminal.draw(|f| super::ui::draw(f, app)) {
95 eprintln!("error: terminal draw failed: {}", e);
96 break;
97 }
98
99 if let Some(event) = self.rx.recv().await {
100 match event {
101 AppEvent::Key(key) => {
102 self.handle_key(app, key).await;
103 }
104 AppEvent::Mouse(mouse) => {
105 Self::handle_mouse(app, mouse);
106 }
107 AppEvent::OutputLine {
108 process,
109 stream,
110 line,
111 } => {
112 self.reconnect_state = ReconnectState::Connected;
113 app.push_output(&process, stream, &line);
114 }
115 AppEvent::StatusUpdate(processes) => {
116 let log_dir = paths::log_dir(&self.session);
118 for p in &processes {
119 if !app.disk_readers.contains_key(&p.name) {
120 app.disk_readers.insert(
121 p.name.clone(),
122 DiskLogReader::new(log_dir.clone(), p.name.clone()),
123 );
124 }
125 }
126 app.update_processes(processes);
127 }
128 AppEvent::OutputStreamClosed => {
129 self.handle_reconnect();
130 }
131 }
132 }
133 }
134
135 if app.stop_all_on_quit {
136 let _ = cli::request(&self.session, &Request::Shutdown, false).await;
137 }
138 }
139
140 async fn handle_key(&self, app: &mut App, key: crossterm::event::KeyEvent) {
141 if app.input_mode == super::app::InputMode::FilterInput {
142 match input::handle_filter_key(key) {
143 input::FilterAction::Char(c) => app.filter_buf.push(c),
144 input::FilterAction::Backspace => {
145 app.filter_buf.pop();
146 }
147 input::FilterAction::Confirm => app.confirm_filter(),
148 input::FilterAction::Cancel => app.cancel_filter(),
149 }
150 } else {
151 let action = input::handle_key(key);
152 match action {
153 input::Action::SelectNext => app.select_next(),
154 input::Action::SelectPrev => app.select_prev(),
155 input::Action::CycleStream => app.cycle_stream_mode(),
156 input::Action::TogglePause => app.toggle_pause(),
157 input::Action::ScrollUp => app.scroll_up(),
158 input::Action::ScrollDown => app.scroll_down(),
159 input::Action::ScrollToTop => app.scroll_to_top(),
160 input::Action::ScrollToBottom => app.scroll_to_bottom(),
161 input::Action::StartFilter => app.start_filter(),
162 input::Action::ClearFilter => app.clear_filter(),
163 input::Action::Quit => app.quit(),
164 input::Action::QuitAndStop => app.quit_and_stop(),
165 input::Action::Stop => {
166 if let Some(name) = app.selected_name() {
167 let _ = cli::request(
168 &self.session,
169 &Request::Stop {
170 target: name.to_string(),
171 },
172 false,
173 )
174 .await;
175 }
176 }
177 input::Action::StopAll => {
178 let _ = cli::request(&self.session, &Request::StopAll, false).await;
179 }
180 input::Action::Restart => {
181 if let Some(name) = app.selected_name() {
182 let _ = cli::request(
183 &self.session,
184 &Request::Restart {
185 target: name.to_string(),
186 },
187 false,
188 )
189 .await;
190 }
191 }
192 input::Action::None => {}
193 }
194 }
195 }
196
197 fn handle_mouse(app: &mut App, mouse: crossterm::event::MouseEvent) {
198 const MOUSE_SCROLL_LINES: usize = 3;
199 match mouse.kind {
200 MouseEventKind::ScrollUp => app.scroll_up_by(MOUSE_SCROLL_LINES),
201 MouseEventKind::ScrollDown => app.scroll_down_by(MOUSE_SCROLL_LINES),
202 MouseEventKind::Down(MouseButton::Left) => {
203 if mouse.column < 22 {
204 let row = mouse.row.saturating_sub(1) as usize;
205 if row < app.processes.len() {
206 app.selected = row;
207 }
208 }
209 }
210 _ => {}
211 }
212 }
213
214 fn handle_reconnect(&mut self) {
215 let attempt = match self.reconnect_state {
216 ReconnectState::Connected => 0,
217 ReconnectState::Reconnecting { attempt } => attempt,
218 ReconnectState::Failed => return,
219 };
220
221 if attempt >= MAX_RECONNECT_ATTEMPTS {
222 self.reconnect_state = ReconnectState::Failed;
223 return;
224 }
225
226 self.reconnect_state = ReconnectState::Reconnecting {
227 attempt: attempt + 1,
228 };
229
230 let session_str = self.session.clone();
231 let reconnect_tx = self.tx.clone();
232 tokio::spawn(async move {
233 let delay = Duration::from_secs(2u64.saturating_pow(attempt).min(30));
234 tokio::time::sleep(delay).await;
235 output_stream_reader(&session_str, reconnect_tx).await;
236 });
237 }
238}
239
240pub async fn output_stream_reader(session: &str, tx: mpsc::Sender<AppEvent>) {
241 let stream = match cli::connect(session, false).await {
242 Ok(s) => s,
243 Err(_) => {
244 let _ = tx.send(AppEvent::OutputStreamClosed).await;
245 return;
246 }
247 };
248
249 let (reader, mut writer) = stream.into_split();
250
251 let req = Request::Logs {
252 target: None,
253 tail: 0,
254 follow: true,
255 stderr: false,
256 all: true,
257 timeout_secs: None,
258 lines: None,
259 };
260 let mut json = match serde_json::to_string(&req) {
261 Ok(j) => j,
262 Err(_) => return,
263 };
264 json.push('\n');
265 if writer.write_all(json.as_bytes()).await.is_err() {
266 return;
267 }
268 if writer.flush().await.is_err() {
269 return;
270 }
271
272 let mut lines = BufReader::new(reader);
273 loop {
274 let mut line = String::new();
275 match lines.read_line(&mut line).await {
276 Ok(0) | Err(_) => break,
277 Ok(_) => {
278 if let Ok(resp) = serde_json::from_str::<Response>(&line) {
279 match resp {
280 Response::LogLine {
281 process,
282 stream,
283 line,
284 } => {
285 let _ = tx
286 .send(AppEvent::OutputLine {
287 process,
288 stream,
289 line,
290 })
291 .await;
292 }
293 Response::LogEnd => break,
294 _ => {}
295 }
296 }
297 }
298 }
299 }
300
301 let _ = tx.send(AppEvent::OutputStreamClosed).await;
302}
303
304pub async fn status_poller(session: &str, tx: mpsc::Sender<AppEvent>) {
305 let mut ticker = interval(Duration::from_secs(2));
306 loop {
307 ticker.tick().await;
308 if let Ok(Response::Status { processes }) =
309 cli::request(session, &Request::Status, false).await
310 && tx.send(AppEvent::StatusUpdate(processes)).await.is_err()
311 {
312 break;
313 }
314 }
315}
316
317pub async fn key_reader(tx: mpsc::Sender<AppEvent>) {
318 let mut reader = EventStream::new();
319 while let Some(Ok(event)) = reader.next().await {
320 let app_event = match event {
321 Event::Key(key) => AppEvent::Key(key),
322 Event::Mouse(mouse) => AppEvent::Mouse(mouse),
323 _ => continue,
324 };
325 if tx.send(app_event).await.is_err() {
326 break;
327 }
328 }
329}
330
331pub fn init_disk_readers(session: &str, app: &mut App) {
337 let log_dir = paths::log_dir(session);
338
339 let entries = match std::fs::read_dir(&log_dir) {
340 Ok(e) => e,
341 Err(_) => return,
342 };
343
344 let mut seen_processes: std::collections::HashSet<String> = std::collections::HashSet::new();
346 let mut file_entries: Vec<(String, ProtoStream, std::path::PathBuf)> = Vec::new();
347
348 for entry in entries.flatten() {
349 let name = entry.file_name().to_string_lossy().to_string();
350 let (proc_name, stream) = if let Some(p) = name.strip_suffix(".stdout") {
353 (p.to_string(), ProtoStream::Stdout)
354 } else if let Some(p) = name.strip_suffix(".stderr") {
355 (p.to_string(), ProtoStream::Stderr)
356 } else {
357 continue;
358 };
359
360 seen_processes.insert(proc_name.clone());
361 file_entries.push((proc_name, stream, entry.path()));
362 }
363
364 for proc_name in &seen_processes {
366 app.disk_readers.insert(
367 proc_name.clone(),
368 DiskLogReader::new(log_dir.clone(), proc_name.clone()),
369 );
370 }
371
372 for (proc_name, stream, path) in &file_entries {
374 if let Ok(lines) = tail_file(path, 1000) {
375 for line in lines {
376 app.push_output(proc_name, *stream, &line);
377 }
378 }
379 }
380}