1pub mod app;
11pub mod input;
12pub mod ui;
13
14use crate::cli;
15use crate::paths;
16use crate::protocol::{Request, Response, Stream as ProtoStream};
17use app::App;
18use crossterm::{
19 event::{Event, EventStream},
20 execute,
21 terminal::{EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode},
22};
23use futures::StreamExt;
24use ratatui::prelude::*;
25use std::io;
26use std::io::{BufRead as _, BufReader as StdBufReader};
27use std::sync::Arc;
28use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
29use tokio::sync::mpsc;
30use tokio::time::{Duration, interval};
31
32enum AppEvent {
33 Key(crossterm::event::KeyEvent),
34 OutputLine {
35 process: String,
36 stream: ProtoStream,
37 line: String,
38 },
39 StatusUpdate(Vec<crate::protocol::ProcessInfo>),
40 OutputStreamClosed,
41}
42
43pub async fn run(session: &str) -> i32 {
44 if cli::connect(session, false).await.is_err() {
46 eprintln!(
47 "error: no daemon running for session '{}'. Start processes first.",
48 session
49 );
50 return 1;
51 }
52
53 if let Err(e) = enable_raw_mode() {
55 eprintln!("error: failed to enable raw mode: {}", e);
56 return 1;
57 }
58 let mut stdout = io::stdout();
59 if let Err(e) = execute!(stdout, EnterAlternateScreen) {
60 let _ = disable_raw_mode();
61 eprintln!("error: failed to enter alternate screen: {}", e);
62 return 1;
63 }
64
65 let original_hook = std::panic::take_hook();
67 std::panic::set_hook(Box::new(move |panic_info| {
68 let _ = disable_raw_mode();
69 let _ = execute!(io::stdout(), LeaveAlternateScreen);
70 original_hook(panic_info);
71 }));
72
73 let mut terminal = match Terminal::new(CrosstermBackend::new(stdout)) {
74 Ok(t) => t,
75 Err(e) => {
76 let _ = disable_raw_mode();
77 let _ = execute!(io::stdout(), LeaveAlternateScreen);
78 eprintln!("error: failed to initialize terminal: {}", e);
79 return 1;
80 }
81 };
82 let mut app = App::new();
83
84 load_historical_logs(session, &mut app);
86
87 let (tx, mut rx) = mpsc::channel::<AppEvent>(256);
89
90 let session_str = session.to_string();
92 let output_tx = tx.clone();
93 tokio::spawn(async move {
94 output_stream_reader(&session_str, output_tx).await;
95 });
96
97 let session_str = session.to_string();
99 let status_tx = tx.clone();
100 tokio::spawn(async move {
101 status_poller(&session_str, status_tx).await;
102 });
103
104 let key_tx = tx.clone();
106 tokio::spawn(async move {
107 key_reader(key_tx).await;
108 });
109
110 let sigterm_tx = tx.clone();
112 tokio::spawn(async move {
113 if let Ok(mut sig) =
114 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
115 {
116 sig.recv().await;
117 let _ = sigterm_tx
118 .send(AppEvent::Key(crossterm::event::KeyEvent::new(
119 crossterm::event::KeyCode::Char('q'),
120 crossterm::event::KeyModifiers::empty(),
121 )))
122 .await;
123 }
124 });
125
126 let reconnect_count = Arc::new(std::sync::atomic::AtomicU32::new(0));
128 const MAX_RECONNECT_ATTEMPTS: u32 = 10;
129
130 while app.running {
132 if let Err(e) = terminal.draw(|f| ui::draw(f, &app)) {
133 eprintln!("error: terminal draw failed: {}", e);
134 break;
135 }
136
137 if let Some(event) = rx.recv().await {
139 match event {
140 AppEvent::Key(key) => {
141 let action = input::handle_key(key);
142 match action {
143 input::Action::SelectNext => app.select_next(),
144 input::Action::SelectPrev => app.select_prev(),
145 input::Action::CycleStream => app.cycle_stream_mode(),
146 input::Action::TogglePause => app.toggle_pause(),
147 input::Action::Quit => app.quit(),
148 input::Action::QuitAndStop => app.quit_and_stop(),
149 input::Action::Stop => {
150 if let Some(name) = app.selected_name() {
151 let _ = cli::request(
152 session,
153 &Request::Stop {
154 target: name.to_string(),
155 },
156 false,
157 )
158 .await;
159 }
160 }
161 input::Action::StopAll => {
162 let _ = cli::request(session, &Request::StopAll, false).await;
163 }
164 input::Action::Restart => {
165 if let Some(name) = app.selected_name() {
166 let _ = cli::request(
167 session,
168 &Request::Restart {
169 target: name.to_string(),
170 },
171 false,
172 )
173 .await;
174 }
175 }
176 input::Action::None => {}
177 }
178 }
179 AppEvent::OutputLine {
180 process,
181 stream,
182 line,
183 } => {
184 app.push_output(&process, stream, &line);
185 }
186 AppEvent::StatusUpdate(processes) => {
187 app.update_processes(processes);
188 }
189 AppEvent::OutputStreamClosed => {
190 let count = reconnect_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
191 if count < MAX_RECONNECT_ATTEMPTS {
192 let session_str = session.to_string();
193 let reconnect_tx = tx.clone();
194 let rc = Arc::clone(&reconnect_count);
195 tokio::spawn(async move {
196 let delay = Duration::from_secs(2u64.saturating_pow(count).min(30));
198 tokio::time::sleep(delay).await;
199 output_stream_reader(&session_str, reconnect_tx).await;
200 rc.store(0, std::sync::atomic::Ordering::Relaxed);
202 });
203 }
204 }
205 }
206 }
207 }
208
209 if app.stop_all_on_quit {
211 let _ = cli::request(session, &Request::StopAll, false).await;
212 }
213
214 let _ = disable_raw_mode();
216 let _ = execute!(terminal.backend_mut(), LeaveAlternateScreen);
217 0
218}
219
220async fn output_stream_reader(session: &str, tx: mpsc::Sender<AppEvent>) {
221 let stream = match cli::connect(session, false).await {
222 Ok(s) => s,
223 Err(_) => {
224 let _ = tx.send(AppEvent::OutputStreamClosed).await;
225 return;
226 }
227 };
228
229 let (reader, mut writer) = stream.into_split();
230
231 let req = Request::Logs {
233 target: None,
234 tail: 0,
235 follow: true,
236 stderr: false,
237 all: true,
238 timeout_secs: None, lines: None,
240 };
241 let mut json = match serde_json::to_string(&req) {
242 Ok(j) => j,
243 Err(_) => return,
244 };
245 json.push('\n');
246 if writer.write_all(json.as_bytes()).await.is_err() {
247 return;
248 }
249 if writer.flush().await.is_err() {
250 return;
251 }
252
253 let mut lines = BufReader::new(reader);
254 loop {
255 let mut line = String::new();
256 match lines.read_line(&mut line).await {
257 Ok(0) | Err(_) => break, Ok(_) => {
259 if let Ok(resp) = serde_json::from_str::<Response>(&line) {
260 match resp {
261 Response::LogLine {
262 process,
263 stream,
264 line,
265 } => {
266 let _ = tx
267 .send(AppEvent::OutputLine {
268 process,
269 stream,
270 line,
271 })
272 .await;
273 }
274 Response::LogEnd => break,
275 _ => {}
276 }
277 }
278 }
279 }
280 }
281
282 let _ = tx.send(AppEvent::OutputStreamClosed).await;
283}
284
285async fn status_poller(session: &str, tx: mpsc::Sender<AppEvent>) {
286 let mut ticker = interval(Duration::from_secs(2));
287 loop {
288 ticker.tick().await;
289 if let Ok(Response::Status { processes }) =
290 cli::request(session, &Request::Status, false).await
291 {
292 if tx.send(AppEvent::StatusUpdate(processes)).await.is_err() {
293 break; }
295 }
296 }
297}
298
299async fn key_reader(tx: mpsc::Sender<AppEvent>) {
300 let mut reader = EventStream::new();
301 while let Some(Ok(event)) = reader.next().await {
302 if let Event::Key(key) = event {
303 if tx.send(AppEvent::Key(key)).await.is_err() {
304 break;
305 }
306 }
307 }
308}
309
310fn load_historical_logs(session: &str, app: &mut App) {
313 let log_dir = paths::log_dir(session);
314
315 let entries = match std::fs::read_dir(&log_dir) {
316 Ok(e) => e,
317 Err(_) => return,
318 };
319
320 for entry in entries.flatten() {
321 let name = entry.file_name().to_string_lossy().to_string();
322 let (proc_name, stream) = if let Some(p) = name.strip_suffix(".stdout") {
323 (p.to_string(), ProtoStream::Stdout)
324 } else if let Some(p) = name.strip_suffix(".stderr") {
325 (p.to_string(), ProtoStream::Stderr)
326 } else {
327 continue;
328 };
329
330 if let Ok(file) = std::fs::File::open(entry.path()) {
331 for line in StdBufReader::new(file).lines().map_while(Result::ok) {
332 app.push_output(&proc_name, stream, &line);
333 }
334 }
335 }
336}