1use super::app::App;
2use super::input;
3use crate::cli;
4use crate::cli::logs::tail_file;
5use crate::paths;
6use crate::protocol::{Request, Response, Stream as ProtoStream};
7use crossterm::event::{Event, EventStream, MouseButton, MouseEventKind};
8use futures::StreamExt;
9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
10use tokio::sync::mpsc;
11use tokio::time::{Duration, interval};
12
13pub enum AppEvent {
14 Key(crossterm::event::KeyEvent),
15 Mouse(crossterm::event::MouseEvent),
16 OutputLine {
17 process: String,
18 stream: ProtoStream,
19 line: String,
20 },
21 StatusUpdate(Vec<crate::protocol::ProcessInfo>),
22 OutputStreamClosed,
23}
24
/// Tracks where the output stream stands in its reconnect cycle.
enum ReconnectState {
    /// Output is flowing, or no stream closure has been observed yet.
    Connected,
    /// A reconnect has been scheduled; `attempt` counts how many have been
    /// scheduled since the last time the state was `Connected`.
    Reconnecting { attempt: u32 },
    /// The attempt budget is exhausted; no further reconnects are scheduled.
    Failed,
}
30
/// Upper bound on consecutive reconnect attempts before the output stream is
/// considered permanently lost (`ReconnectState::Failed`).
pub(self) const MAX_RECONNECT_ATTEMPTS: u32 = 10;
32
33pub struct TuiEventLoop {
34 rx: mpsc::Receiver<AppEvent>,
35 tx: mpsc::Sender<AppEvent>,
36 session: String,
37 reconnect_state: ReconnectState,
38}
39
40impl TuiEventLoop {
41 pub fn new(session: &str) -> Self {
42 let (tx, rx) = mpsc::channel::<AppEvent>(256);
43 Self {
44 rx,
45 tx,
46 session: session.to_string(),
47 reconnect_state: ReconnectState::Connected,
48 }
49 }
50
51 pub fn spawn_background_tasks(&self) {
53 let session_str = self.session.clone();
54 let output_tx = self.tx.clone();
55 tokio::spawn(async move {
56 output_stream_reader(&session_str, output_tx).await;
57 });
58
59 let session_str = self.session.clone();
60 let status_tx = self.tx.clone();
61 tokio::spawn(async move {
62 status_poller(&session_str, status_tx).await;
63 });
64
65 let key_tx = self.tx.clone();
66 tokio::spawn(async move {
67 key_reader(key_tx).await;
68 });
69
70 let sigterm_tx = self.tx.clone();
71 tokio::spawn(async move {
72 if let Ok(mut sig) =
73 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
74 {
75 sig.recv().await;
76 let _ = sigterm_tx
77 .send(AppEvent::Key(crossterm::event::KeyEvent::new(
78 crossterm::event::KeyCode::Char('q'),
79 crossterm::event::KeyModifiers::empty(),
80 )))
81 .await;
82 }
83 });
84 }
85
86 pub async fn run(
88 &mut self,
89 terminal: &mut ratatui::Terminal<ratatui::prelude::CrosstermBackend<std::io::Stdout>>,
90 app: &mut App,
91 ) {
92 while app.running {
93 if let Err(e) = terminal.draw(|f| super::ui::draw(f, app)) {
94 eprintln!("error: terminal draw failed: {}", e);
95 break;
96 }
97
98 if let Some(event) = self.rx.recv().await {
99 match event {
100 AppEvent::Key(key) => {
101 self.handle_key(app, key).await;
102 }
103 AppEvent::Mouse(mouse) => {
104 Self::handle_mouse(app, mouse);
105 }
106 AppEvent::OutputLine {
107 process,
108 stream,
109 line,
110 } => {
111 self.reconnect_state = ReconnectState::Connected;
112 app.push_output(&process, stream, &line);
113 }
114 AppEvent::StatusUpdate(processes) => {
115 app.update_processes(processes);
116 }
117 AppEvent::OutputStreamClosed => {
118 self.handle_reconnect();
119 }
120 }
121 }
122 }
123
124 if app.stop_all_on_quit {
125 let _ = cli::request(&self.session, &Request::Shutdown, false).await;
126 }
127 }
128
129 async fn handle_key(&self, app: &mut App, key: crossterm::event::KeyEvent) {
130 if app.input_mode == super::app::InputMode::FilterInput {
131 match input::handle_filter_key(key) {
132 input::FilterAction::Char(c) => app.filter_buf.push(c),
133 input::FilterAction::Backspace => {
134 app.filter_buf.pop();
135 }
136 input::FilterAction::Confirm => app.confirm_filter(),
137 input::FilterAction::Cancel => app.cancel_filter(),
138 }
139 } else {
140 let action = input::handle_key(key);
141 match action {
142 input::Action::SelectNext => app.select_next(),
143 input::Action::SelectPrev => app.select_prev(),
144 input::Action::CycleStream => app.cycle_stream_mode(),
145 input::Action::TogglePause => app.toggle_pause(),
146 input::Action::ScrollUp => app.scroll_up(),
147 input::Action::ScrollDown => app.scroll_down(),
148 input::Action::ScrollToTop => app.scroll_to_top(),
149 input::Action::ScrollToBottom => app.scroll_to_bottom(),
150 input::Action::StartFilter => app.start_filter(),
151 input::Action::ClearFilter => app.clear_filter(),
152 input::Action::Quit => app.quit(),
153 input::Action::QuitAndStop => app.quit_and_stop(),
154 input::Action::Stop => {
155 if let Some(name) = app.selected_name() {
156 let _ = cli::request(
157 &self.session,
158 &Request::Stop {
159 target: name.to_string(),
160 },
161 false,
162 )
163 .await;
164 }
165 }
166 input::Action::StopAll => {
167 let _ = cli::request(&self.session, &Request::StopAll, false).await;
168 }
169 input::Action::Restart => {
170 if let Some(name) = app.selected_name() {
171 let _ = cli::request(
172 &self.session,
173 &Request::Restart {
174 target: name.to_string(),
175 },
176 false,
177 )
178 .await;
179 }
180 }
181 input::Action::None => {}
182 }
183 }
184 }
185
186 fn handle_mouse(app: &mut App, mouse: crossterm::event::MouseEvent) {
187 const MOUSE_SCROLL_LINES: usize = 3;
188 match mouse.kind {
189 MouseEventKind::ScrollUp => app.scroll_up_by(MOUSE_SCROLL_LINES),
190 MouseEventKind::ScrollDown => app.scroll_down_by(MOUSE_SCROLL_LINES),
191 MouseEventKind::Down(MouseButton::Left) => {
192 if mouse.column < 22 {
193 let row = mouse.row.saturating_sub(1) as usize;
194 if row < app.processes.len() {
195 app.selected = row;
196 }
197 }
198 }
199 _ => {}
200 }
201 }
202
203 fn handle_reconnect(&mut self) {
204 let attempt = match self.reconnect_state {
205 ReconnectState::Connected => 0,
206 ReconnectState::Reconnecting { attempt } => attempt,
207 ReconnectState::Failed => return,
208 };
209
210 if attempt >= MAX_RECONNECT_ATTEMPTS {
211 self.reconnect_state = ReconnectState::Failed;
212 return;
213 }
214
215 self.reconnect_state = ReconnectState::Reconnecting {
216 attempt: attempt + 1,
217 };
218
219 let session_str = self.session.clone();
220 let reconnect_tx = self.tx.clone();
221 tokio::spawn(async move {
222 let delay = Duration::from_secs(2u64.saturating_pow(attempt).min(30));
223 tokio::time::sleep(delay).await;
224 output_stream_reader(&session_str, reconnect_tx).await;
225 });
226 }
227}
228
229pub async fn output_stream_reader(session: &str, tx: mpsc::Sender<AppEvent>) {
230 let stream = match cli::connect(session, false).await {
231 Ok(s) => s,
232 Err(_) => {
233 let _ = tx.send(AppEvent::OutputStreamClosed).await;
234 return;
235 }
236 };
237
238 let (reader, mut writer) = stream.into_split();
239
240 let req = Request::Logs {
241 target: None,
242 tail: 0,
243 follow: true,
244 stderr: false,
245 all: true,
246 timeout_secs: None,
247 lines: None,
248 };
249 let mut json = match serde_json::to_string(&req) {
250 Ok(j) => j,
251 Err(_) => return,
252 };
253 json.push('\n');
254 if writer.write_all(json.as_bytes()).await.is_err() {
255 return;
256 }
257 if writer.flush().await.is_err() {
258 return;
259 }
260
261 let mut lines = BufReader::new(reader);
262 loop {
263 let mut line = String::new();
264 match lines.read_line(&mut line).await {
265 Ok(0) | Err(_) => break,
266 Ok(_) => {
267 if let Ok(resp) = serde_json::from_str::<Response>(&line) {
268 match resp {
269 Response::LogLine {
270 process,
271 stream,
272 line,
273 } => {
274 let _ = tx
275 .send(AppEvent::OutputLine {
276 process,
277 stream,
278 line,
279 })
280 .await;
281 }
282 Response::LogEnd => break,
283 _ => {}
284 }
285 }
286 }
287 }
288 }
289
290 let _ = tx.send(AppEvent::OutputStreamClosed).await;
291}
292
293pub async fn status_poller(session: &str, tx: mpsc::Sender<AppEvent>) {
294 let mut ticker = interval(Duration::from_secs(2));
295 loop {
296 ticker.tick().await;
297 if let Ok(Response::Status { processes }) =
298 cli::request(session, &Request::Status, false).await
299 {
300 if tx.send(AppEvent::StatusUpdate(processes)).await.is_err() {
301 break;
302 }
303 }
304 }
305}
306
307pub async fn key_reader(tx: mpsc::Sender<AppEvent>) {
308 let mut reader = EventStream::new();
309 while let Some(Ok(event)) = reader.next().await {
310 let app_event = match event {
311 Event::Key(key) => AppEvent::Key(key),
312 Event::Mouse(mouse) => AppEvent::Mouse(mouse),
313 _ => continue,
314 };
315 if tx.send(app_event).await.is_err() {
316 break;
317 }
318 }
319}
320
321pub fn load_historical_logs(session: &str, app: &mut App) {
324 let log_dir = paths::log_dir(session);
325
326 let entries = match std::fs::read_dir(&log_dir) {
327 Ok(e) => e,
328 Err(_) => return,
329 };
330
331 for entry in entries.flatten() {
332 let name = entry.file_name().to_string_lossy().to_string();
333 let (proc_name, stream) = if let Some(p) = name.strip_suffix(".stdout") {
334 (p.to_string(), ProtoStream::Stdout)
335 } else if let Some(p) = name.strip_suffix(".stderr") {
336 (p.to_string(), ProtoStream::Stderr)
337 } else {
338 continue;
339 };
340
341 if let Ok(lines) = tail_file(&entry.path(), 1000) {
342 for line in lines {
343 app.push_output(&proc_name, stream, &line);
344 }
345 }
346 }
347}