1use crate::cli;
2use crate::paths;
3use crate::protocol::{Request, Response, Stream as ProtoStream};
4use super::app::App;
5use super::input;
6use crossterm::event::{Event, EventStream, MouseButton, MouseEventKind};
7use futures::StreamExt;
8use crate::cli::logs::tail_file;
9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
10use tokio::sync::mpsc;
11use tokio::time::{Duration, interval};
12
/// Events delivered to the TUI event loop over its internal channel.
///
/// The producers are the background tasks in this file (`key_reader`,
/// `output_stream_reader`, `status_poller`) and the SIGTERM watcher spawned by
/// `TuiEventLoop::spawn_background_tasks`.
pub enum AppEvent {
    /// A keyboard event read from the terminal. The SIGTERM watcher also
    /// injects a synthetic `q` key press through this variant.
    Key(crossterm::event::KeyEvent),
    /// A mouse event read from the terminal.
    Mouse(crossterm::event::MouseEvent),
    /// One log line received on the live output stream.
    OutputLine {
        /// Name of the process that produced the line.
        process: String,
        /// Which of the process's output streams the line came from.
        stream: ProtoStream,
        /// The line's text as delivered in the `LogLine` response.
        line: String,
    },
    /// Result of a successful `Request::Status` poll.
    StatusUpdate(Vec<crate::protocol::ProcessInfo>),
    /// The output stream ended or could not be opened; the event loop reacts
    /// by scheduling a reconnect.
    OutputStreamClosed,
}
24
/// Connection state of the live output stream as tracked by the event loop.
///
/// The type is a plain value (at most one `u32`), so it derives the usual
/// value-type traits: it can be copied, compared and printed for diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReconnectState {
    /// Output lines are flowing, or no stream closure has been observed yet.
    Connected,
    /// The stream closed and reconnects have been scheduled; `attempt` is the
    /// number of reconnects scheduled so far.
    Reconnecting { attempt: u32 },
    /// The reconnect budget is exhausted; no further reconnects are scheduled.
    Failed,
}
30
/// Number of reconnects `TuiEventLoop::handle_reconnect` will schedule before
/// it switches to `ReconnectState::Failed` and stops retrying.
const MAX_RECONNECT_ATTEMPTS: u32 = 10;
32
/// Owns the event channel of the TUI and drives the draw/dispatch loop.
pub struct TuiEventLoop {
    /// Receiving end of the event channel, drained by `run`.
    rx: mpsc::Receiver<AppEvent>,
    /// Sending end of the event channel; cloned into every background task.
    /// Because the loop keeps this sender, the channel stays open for as long
    /// as the loop itself exists.
    tx: mpsc::Sender<AppEvent>,
    /// Session identifier passed to every `cli::connect` / `cli::request` call.
    session: String,
    /// Progress of output-stream reconnection (see `handle_reconnect`).
    reconnect_state: ReconnectState,
}
39
40impl TuiEventLoop {
41 pub fn new(session: &str) -> Self {
42 let (tx, rx) = mpsc::channel::<AppEvent>(256);
43 Self {
44 rx,
45 tx,
46 session: session.to_string(),
47 reconnect_state: ReconnectState::Connected,
48 }
49 }
50
51 pub fn spawn_background_tasks(&self) {
53 let session_str = self.session.clone();
54 let output_tx = self.tx.clone();
55 tokio::spawn(async move {
56 output_stream_reader(&session_str, output_tx).await;
57 });
58
59 let session_str = self.session.clone();
60 let status_tx = self.tx.clone();
61 tokio::spawn(async move {
62 status_poller(&session_str, status_tx).await;
63 });
64
65 let key_tx = self.tx.clone();
66 tokio::spawn(async move {
67 key_reader(key_tx).await;
68 });
69
70 let sigterm_tx = self.tx.clone();
71 tokio::spawn(async move {
72 if let Ok(mut sig) =
73 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
74 {
75 sig.recv().await;
76 let _ = sigterm_tx
77 .send(AppEvent::Key(crossterm::event::KeyEvent::new(
78 crossterm::event::KeyCode::Char('q'),
79 crossterm::event::KeyModifiers::empty(),
80 )))
81 .await;
82 }
83 });
84 }
85
86 pub async fn run(
88 &mut self,
89 terminal: &mut ratatui::Terminal<ratatui::prelude::CrosstermBackend<std::io::Stdout>>,
90 app: &mut App,
91 ) {
92 while app.running {
93 if let Err(e) = terminal.draw(|f| super::ui::draw(f, app)) {
94 eprintln!("error: terminal draw failed: {}", e);
95 break;
96 }
97
98 if let Some(event) = self.rx.recv().await {
99 match event {
100 AppEvent::Key(key) => {
101 self.handle_key(app, key).await;
102 }
103 AppEvent::Mouse(mouse) => {
104 Self::handle_mouse(app, mouse);
105 }
106 AppEvent::OutputLine {
107 process,
108 stream,
109 line,
110 } => {
111 self.reconnect_state = ReconnectState::Connected;
112 app.push_output(&process, stream, &line);
113 }
114 AppEvent::StatusUpdate(processes) => {
115 app.update_processes(processes);
116 }
117 AppEvent::OutputStreamClosed => {
118 self.handle_reconnect();
119 }
120 }
121 }
122 }
123
124 if app.stop_all_on_quit {
125 let _ = cli::request(&self.session, &Request::Shutdown, false).await;
126 }
127 }
128
129 async fn handle_key(&self, app: &mut App, key: crossterm::event::KeyEvent) {
130 if app.input_mode == super::app::InputMode::FilterInput {
131 match input::handle_filter_key(key) {
132 input::FilterAction::Char(c) => app.filter_buf.push(c),
133 input::FilterAction::Backspace => {
134 app.filter_buf.pop();
135 }
136 input::FilterAction::Confirm => app.confirm_filter(),
137 input::FilterAction::Cancel => app.cancel_filter(),
138 }
139 } else {
140 let action = input::handle_key(key);
141 match action {
142 input::Action::SelectNext => app.select_next(),
143 input::Action::SelectPrev => app.select_prev(),
144 input::Action::CycleStream => app.cycle_stream_mode(),
145 input::Action::TogglePause => app.toggle_pause(),
146 input::Action::ScrollUp => app.scroll_up(),
147 input::Action::ScrollDown => app.scroll_down(),
148 input::Action::ScrollToTop => app.scroll_to_top(),
149 input::Action::ScrollToBottom => app.scroll_to_bottom(),
150 input::Action::StartFilter => app.start_filter(),
151 input::Action::ClearFilter => app.clear_filter(),
152 input::Action::Quit => app.quit(),
153 input::Action::QuitAndStop => app.quit_and_stop(),
154 input::Action::Stop => {
155 if let Some(name) = app.selected_name() {
156 let _ = cli::request(
157 &self.session,
158 &Request::Stop {
159 target: name.to_string(),
160 },
161 false,
162 )
163 .await;
164 }
165 }
166 input::Action::StopAll => {
167 let _ = cli::request(&self.session, &Request::StopAll, false).await;
168 }
169 input::Action::Restart => {
170 if let Some(name) = app.selected_name() {
171 let _ = cli::request(
172 &self.session,
173 &Request::Restart {
174 target: name.to_string(),
175 },
176 false,
177 )
178 .await;
179 }
180 }
181 input::Action::None => {}
182 }
183 }
184 }
185
186 fn handle_mouse(app: &mut App, mouse: crossterm::event::MouseEvent) {
187 match mouse.kind {
188 MouseEventKind::ScrollUp => app.scroll_up(),
189 MouseEventKind::ScrollDown => app.scroll_down(),
190 MouseEventKind::Down(MouseButton::Left) => {
191 if mouse.column < 22 {
192 let row = mouse.row.saturating_sub(1) as usize;
193 if row < app.processes.len() {
194 app.selected = row;
195 }
196 }
197 }
198 _ => {}
199 }
200 }
201
202 fn handle_reconnect(&mut self) {
203 let attempt = match self.reconnect_state {
204 ReconnectState::Connected => 0,
205 ReconnectState::Reconnecting { attempt } => attempt,
206 ReconnectState::Failed => return,
207 };
208
209 if attempt >= MAX_RECONNECT_ATTEMPTS {
210 self.reconnect_state = ReconnectState::Failed;
211 return;
212 }
213
214 self.reconnect_state = ReconnectState::Reconnecting {
215 attempt: attempt + 1,
216 };
217
218 let session_str = self.session.clone();
219 let reconnect_tx = self.tx.clone();
220 tokio::spawn(async move {
221 let delay = Duration::from_secs(2u64.saturating_pow(attempt).min(30));
222 tokio::time::sleep(delay).await;
223 output_stream_reader(&session_str, reconnect_tx).await;
224 });
225 }
226}
227
228pub async fn output_stream_reader(session: &str, tx: mpsc::Sender<AppEvent>) {
229 let stream = match cli::connect(session, false).await {
230 Ok(s) => s,
231 Err(_) => {
232 let _ = tx.send(AppEvent::OutputStreamClosed).await;
233 return;
234 }
235 };
236
237 let (reader, mut writer) = stream.into_split();
238
239 let req = Request::Logs {
240 target: None,
241 tail: 0,
242 follow: true,
243 stderr: false,
244 all: true,
245 timeout_secs: None,
246 lines: None,
247 };
248 let mut json = match serde_json::to_string(&req) {
249 Ok(j) => j,
250 Err(_) => return,
251 };
252 json.push('\n');
253 if writer.write_all(json.as_bytes()).await.is_err() {
254 return;
255 }
256 if writer.flush().await.is_err() {
257 return;
258 }
259
260 let mut lines = BufReader::new(reader);
261 loop {
262 let mut line = String::new();
263 match lines.read_line(&mut line).await {
264 Ok(0) | Err(_) => break,
265 Ok(_) => {
266 if let Ok(resp) = serde_json::from_str::<Response>(&line) {
267 match resp {
268 Response::LogLine {
269 process,
270 stream,
271 line,
272 } => {
273 let _ = tx
274 .send(AppEvent::OutputLine {
275 process,
276 stream,
277 line,
278 })
279 .await;
280 }
281 Response::LogEnd => break,
282 _ => {}
283 }
284 }
285 }
286 }
287 }
288
289 let _ = tx.send(AppEvent::OutputStreamClosed).await;
290}
291
292pub async fn status_poller(session: &str, tx: mpsc::Sender<AppEvent>) {
293 let mut ticker = interval(Duration::from_secs(2));
294 loop {
295 ticker.tick().await;
296 if let Ok(Response::Status { processes }) =
297 cli::request(session, &Request::Status, false).await
298 {
299 if tx.send(AppEvent::StatusUpdate(processes)).await.is_err() {
300 break;
301 }
302 }
303 }
304}
305
306pub async fn key_reader(tx: mpsc::Sender<AppEvent>) {
307 let mut reader = EventStream::new();
308 while let Some(Ok(event)) = reader.next().await {
309 let app_event = match event {
310 Event::Key(key) => AppEvent::Key(key),
311 Event::Mouse(mouse) => AppEvent::Mouse(mouse),
312 _ => continue,
313 };
314 if tx.send(app_event).await.is_err() {
315 break;
316 }
317 }
318}
319
320pub fn load_historical_logs(session: &str, app: &mut App) {
323 let log_dir = paths::log_dir(session);
324
325 let entries = match std::fs::read_dir(&log_dir) {
326 Ok(e) => e,
327 Err(_) => return,
328 };
329
330 for entry in entries.flatten() {
331 let name = entry.file_name().to_string_lossy().to_string();
332 let (proc_name, stream) = if let Some(p) = name.strip_suffix(".stdout") {
333 (p.to_string(), ProtoStream::Stdout)
334 } else if let Some(p) = name.strip_suffix(".stderr") {
335 (p.to_string(), ProtoStream::Stderr)
336 } else {
337 continue;
338 };
339
340 if let Ok(lines) = tail_file(&entry.path(), 1000) {
341 for line in lines {
342 app.push_output(&proc_name, stream, &line);
343 }
344 }
345 }
346}