1use crate::daemon::{Daemon, RunOptions};
2use crate::env::PITCHFORK_LOGS_DIR;
3use crate::ipc::client::IpcClient;
4use crate::pitchfork_toml::PitchforkToml;
5use crate::procs::{ProcessStats, PROCS};
6use crate::Result;
7use fuzzy_matcher::skim::SkimMatcherV2;
8use fuzzy_matcher::FuzzyMatcher;
9use miette::bail;
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::fs;
12use std::path::PathBuf;
13use std::time::Instant;
14
15const MAX_STAT_HISTORY: usize = 60;
17
18#[derive(Debug, Clone, Copy)]
20pub struct StatsSnapshot {
21 pub cpu_percent: f32,
22 pub memory_bytes: u64,
23 pub disk_read_bytes: u64,
24 pub disk_write_bytes: u64,
25}
26
27impl From<&ProcessStats> for StatsSnapshot {
28 fn from(stats: &ProcessStats) -> Self {
29 Self {
30 cpu_percent: stats.cpu_percent,
31 memory_bytes: stats.memory_bytes,
32 disk_read_bytes: stats.disk_read_bytes,
33 disk_write_bytes: stats.disk_write_bytes,
34 }
35 }
36}
37
38#[derive(Debug, Clone, Default)]
40pub struct StatsHistory {
41 pub samples: VecDeque<StatsSnapshot>,
42}
43
44impl StatsHistory {
45 pub fn push(&mut self, snapshot: StatsSnapshot) {
46 self.samples.push_back(snapshot);
47 while self.samples.len() > MAX_STAT_HISTORY {
48 self.samples.pop_front();
49 }
50 }
51
52 pub fn cpu_values(&self) -> Vec<f32> {
53 self.samples.iter().map(|s| s.cpu_percent).collect()
54 }
55
56 pub fn memory_values(&self) -> Vec<u64> {
57 self.samples.iter().map(|s| s.memory_bytes).collect()
58 }
59
60 pub fn disk_read_values(&self) -> Vec<u64> {
61 self.samples.iter().map(|s| s.disk_read_bytes).collect()
62 }
63
64 pub fn disk_write_values(&self) -> Vec<u64> {
65 self.samples.iter().map(|s| s.disk_write_bytes).collect()
66 }
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum View {
71 Dashboard,
72 Logs,
73 Help,
74 Confirm,
75 Loading,
76 Details,
77}
78
79#[derive(Debug, Clone)]
80pub enum PendingAction {
81 Stop(String),
82 Restart(String),
83 Disable(String),
84 BatchStop(Vec<String>),
86 BatchRestart(Vec<String>),
87 BatchDisable(Vec<String>),
88}
89
90#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
91pub enum SortColumn {
92 #[default]
93 Name,
94 Status,
95 Cpu,
96 Memory,
97 Uptime,
98}
99
100impl SortColumn {
101 pub fn next(self) -> Self {
102 match self {
103 Self::Name => Self::Status,
104 Self::Status => Self::Cpu,
105 Self::Cpu => Self::Memory,
106 Self::Memory => Self::Uptime,
107 Self::Uptime => Self::Name,
108 }
109 }
110}
111
112#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
113pub enum SortOrder {
114 #[default]
115 Ascending,
116 Descending,
117}
118
119impl SortOrder {
120 pub fn toggle(self) -> Self {
121 match self {
122 Self::Ascending => Self::Descending,
123 Self::Descending => Self::Ascending,
124 }
125 }
126
127 pub fn indicator(self) -> &'static str {
128 match self {
129 Self::Ascending => "↑",
130 Self::Descending => "↓",
131 }
132 }
133}
134
135pub struct App {
136 pub daemons: Vec<Daemon>,
137 pub disabled: Vec<String>,
138 pub selected: usize,
139 pub view: View,
140 pub prev_view: View,
141 pub log_content: Vec<String>,
142 pub log_daemon_id: Option<String>,
143 pub log_scroll: usize,
144 pub log_follow: bool, pub message: Option<String>,
146 pub message_time: Option<Instant>,
147 pub process_stats: HashMap<u32, ProcessStats>, pub stats_history: HashMap<String, StatsHistory>, pub pending_action: Option<PendingAction>,
150 pub loading_text: Option<String>,
151 pub search_query: String,
152 pub search_active: bool,
153 pub sort_column: SortColumn,
155 pub sort_order: SortOrder,
156 pub log_search_query: String,
158 pub log_search_active: bool,
159 pub log_search_matches: Vec<usize>, pub log_search_current: usize, pub details_daemon_id: Option<String>,
163 pub logs_expanded: bool,
165 pub multi_select: HashSet<String>,
167 pub config_daemon_ids: HashSet<String>,
169 pub show_available: bool,
171}
172
173impl App {
174 pub fn new() -> Self {
175 Self {
176 daemons: Vec::new(),
177 disabled: Vec::new(),
178 selected: 0,
179 view: View::Dashboard,
180 prev_view: View::Dashboard,
181 log_content: Vec::new(),
182 log_daemon_id: None,
183 log_scroll: 0,
184 log_follow: true,
185 message: None,
186 message_time: None,
187 process_stats: HashMap::new(),
188 stats_history: HashMap::new(),
189 pending_action: None,
190 loading_text: None,
191 search_query: String::new(),
192 search_active: false,
193 sort_column: SortColumn::default(),
194 sort_order: SortOrder::default(),
195 log_search_query: String::new(),
196 log_search_active: false,
197 log_search_matches: Vec::new(),
198 log_search_current: 0,
199 details_daemon_id: None,
200 logs_expanded: false,
201 multi_select: HashSet::new(),
202 config_daemon_ids: HashSet::new(),
203 show_available: true, }
205 }
206
207 pub fn confirm_action(&mut self, action: PendingAction) {
208 self.pending_action = Some(action);
209 self.view = View::Confirm;
210 }
211
212 pub fn cancel_confirm(&mut self) {
213 self.pending_action = None;
214 self.view = View::Dashboard;
215 }
216
217 pub fn take_pending_action(&mut self) -> Option<PendingAction> {
218 self.view = View::Dashboard;
219 self.pending_action.take()
220 }
221
222 pub fn start_loading(&mut self, text: impl Into<String>) {
223 self.prev_view = self.view;
224 self.loading_text = Some(text.into());
225 self.view = View::Loading;
226 }
227
228 pub fn stop_loading(&mut self) {
229 self.loading_text = None;
230 self.view = self.prev_view;
231 }
232
233 pub fn start_search(&mut self) {
235 self.search_active = true;
236 }
237
238 pub fn end_search(&mut self) {
239 self.search_active = false;
240 }
241
242 pub fn clear_search(&mut self) {
243 self.search_query.clear();
244 self.search_active = false;
245 self.selected = 0;
246 }
247
248 pub fn search_push(&mut self, c: char) {
249 self.search_query.push(c);
250 self.selected = 0;
252 }
253
254 pub fn search_pop(&mut self) {
255 self.search_query.pop();
256 self.selected = 0;
257 }
258
259 pub fn filtered_daemons(&self) -> Vec<&Daemon> {
260 let mut filtered: Vec<&Daemon> = if self.search_query.is_empty() {
261 self.daemons.iter().collect()
262 } else {
263 let matcher = SkimMatcherV2::default();
265 let mut scored: Vec<_> = self
266 .daemons
267 .iter()
268 .filter_map(|d| {
269 matcher
270 .fuzzy_match(&d.id, &self.search_query)
271 .map(|score| (d, score))
272 })
273 .collect();
274 scored.sort_by(|a, b| b.1.cmp(&a.1));
276 scored.into_iter().map(|(d, _)| d).collect()
277 };
278
279 filtered.sort_by(|a, b| {
281 let cmp = match self.sort_column {
282 SortColumn::Name => a.id.to_lowercase().cmp(&b.id.to_lowercase()),
283 SortColumn::Status => {
284 let status_order = |d: &Daemon| match &d.status {
285 crate::daemon_status::DaemonStatus::Running => 0,
286 crate::daemon_status::DaemonStatus::Waiting => 1,
287 crate::daemon_status::DaemonStatus::Stopping => 2,
288 crate::daemon_status::DaemonStatus::Stopped => 3,
289 crate::daemon_status::DaemonStatus::Errored(_) => 4,
290 crate::daemon_status::DaemonStatus::Failed(_) => 5,
291 };
292 status_order(a).cmp(&status_order(b))
293 }
294 SortColumn::Cpu => {
295 let cpu_a = a
296 .pid
297 .and_then(|p| self.get_stats(p))
298 .map(|s| s.cpu_percent)
299 .unwrap_or(0.0);
300 let cpu_b = b
301 .pid
302 .and_then(|p| self.get_stats(p))
303 .map(|s| s.cpu_percent)
304 .unwrap_or(0.0);
305 cpu_a
306 .partial_cmp(&cpu_b)
307 .unwrap_or(std::cmp::Ordering::Equal)
308 }
309 SortColumn::Memory => {
310 let mem_a = a
311 .pid
312 .and_then(|p| self.get_stats(p))
313 .map(|s| s.memory_bytes)
314 .unwrap_or(0);
315 let mem_b = b
316 .pid
317 .and_then(|p| self.get_stats(p))
318 .map(|s| s.memory_bytes)
319 .unwrap_or(0);
320 mem_a.cmp(&mem_b)
321 }
322 SortColumn::Uptime => {
323 let up_a = a
324 .pid
325 .and_then(|p| self.get_stats(p))
326 .map(|s| s.uptime_secs)
327 .unwrap_or(0);
328 let up_b = b
329 .pid
330 .and_then(|p| self.get_stats(p))
331 .map(|s| s.uptime_secs)
332 .unwrap_or(0);
333 up_a.cmp(&up_b)
334 }
335 };
336 match self.sort_order {
337 SortOrder::Ascending => cmp,
338 SortOrder::Descending => cmp.reverse(),
339 }
340 });
341
342 filtered
343 }
344
345 pub fn cycle_sort(&mut self) {
347 self.sort_column = self.sort_column.next();
349 self.selected = 0;
350 }
351
352 pub fn toggle_sort_order(&mut self) {
353 self.sort_order = self.sort_order.toggle();
354 self.selected = 0;
355 }
356
357 pub fn selected_daemon(&self) -> Option<&Daemon> {
358 let filtered = self.filtered_daemons();
359 filtered.get(self.selected).copied()
360 }
361
362 pub fn select_next(&mut self) {
363 let count = self.filtered_daemons().len();
364 if count > 0 {
365 self.selected = (self.selected + 1) % count;
366 }
367 }
368
369 pub fn select_prev(&mut self) {
370 let count = self.filtered_daemons().len();
371 if count > 0 {
372 self.selected = self.selected.checked_sub(1).unwrap_or(count - 1);
373 }
374 }
375
376 pub fn toggle_log_follow(&mut self) {
378 self.log_follow = !self.log_follow;
379 if self.log_follow && !self.log_content.is_empty() {
380 self.log_scroll = self.log_content.len().saturating_sub(20);
382 }
383 }
384
385 pub fn toggle_logs_expanded(&mut self) {
387 self.logs_expanded = !self.logs_expanded;
388 }
389
390 pub fn toggle_select(&mut self) {
392 if let Some(daemon) = self.selected_daemon() {
393 let id = daemon.id.clone();
394 if self.multi_select.contains(&id) {
395 self.multi_select.remove(&id);
396 } else {
397 self.multi_select.insert(id);
398 }
399 }
400 }
401
402 pub fn select_all_visible(&mut self) {
403 let ids: Vec<String> = self
405 .filtered_daemons()
406 .iter()
407 .map(|d| d.id.clone())
408 .collect();
409 for id in ids {
410 self.multi_select.insert(id);
411 }
412 }
413
414 pub fn clear_selection(&mut self) {
415 self.multi_select.clear();
416 }
417
418 pub fn is_selected(&self, daemon_id: &str) -> bool {
419 self.multi_select.contains(daemon_id)
420 }
421
422 pub fn has_selection(&self) -> bool {
423 !self.multi_select.is_empty()
424 }
425
426 pub fn selected_daemon_ids(&self) -> Vec<String> {
427 self.multi_select.iter().cloned().collect()
428 }
429
430 pub fn set_message(&mut self, msg: impl Into<String>) {
431 self.message = Some(msg.into());
432 self.message_time = Some(Instant::now());
433 }
434
435 pub fn clear_stale_message(&mut self) {
436 if let Some(time) = self.message_time {
437 if time.elapsed().as_secs() >= 3 {
438 self.message = None;
439 self.message_time = None;
440 }
441 }
442 }
443
444 pub fn get_stats(&self, pid: u32) -> Option<&ProcessStats> {
445 self.process_stats.get(&pid)
446 }
447
448 fn refresh_process_stats(&mut self) {
449 PROCS.refresh_processes();
450 self.process_stats.clear();
451 for daemon in &self.daemons {
452 if let Some(pid) = daemon.pid {
453 if let Some(stats) = PROCS.get_stats(pid) {
454 self.process_stats.insert(pid, stats);
455 let history = self.stats_history.entry(daemon.id.clone()).or_default();
457 history.push(StatsSnapshot::from(&stats));
458 }
459 }
460 }
461 }
462
463 pub fn get_stats_history(&self, daemon_id: &str) -> Option<&StatsHistory> {
465 self.stats_history.get(daemon_id)
466 }
467
468 pub async fn refresh(&mut self, client: &IpcClient) -> Result<()> {
469 self.daemons = client.active_daemons().await?;
470 self.daemons.retain(|d| d.id != "pitchfork");
472 self.disabled = client.get_disabled_daemons().await?;
473
474 self.refresh_config_daemons();
476
477 self.refresh_process_stats();
479
480 self.clear_stale_message();
482
483 let total_count = self.total_daemon_count();
485 if total_count > 0 && self.selected >= total_count {
486 self.selected = total_count - 1;
487 }
488
489 if self.view == View::Logs {
491 if let Some(id) = self.log_daemon_id.clone() {
492 self.load_logs(&id);
493 }
494 }
495
496 Ok(())
497 }
498
499 fn refresh_config_daemons(&mut self) {
500 use crate::daemon_status::DaemonStatus;
501
502 let config = PitchforkToml::all_merged();
503 let active_ids: HashSet<String> = self.daemons.iter().map(|d| d.id.clone()).collect();
504
505 self.config_daemon_ids.clear();
507 for daemon_id in config.daemons.keys() {
508 if !active_ids.contains(daemon_id) && daemon_id != "pitchfork" {
509 self.config_daemon_ids.insert(daemon_id.clone());
510
511 if self.show_available {
513 let placeholder = Daemon {
514 id: daemon_id.clone(),
515 title: None,
516 pid: None,
517 shell_pid: None,
518 status: DaemonStatus::Stopped,
519 dir: None,
520 autostop: false,
521 cron_schedule: None,
522 cron_retrigger: None,
523 last_exit_success: None,
524 retry: 0,
525 retry_count: 0,
526 ready_delay: None,
527 ready_output: None,
528 ready_http: None,
529 ready_port: None,
530 };
531 self.daemons.push(placeholder);
532 }
533 }
534 }
535 }
536
537 pub fn is_config_only(&self, daemon_id: &str) -> bool {
539 self.config_daemon_ids.contains(daemon_id)
540 }
541
542 pub fn toggle_show_available(&mut self) {
544 self.show_available = !self.show_available;
545 }
546
547 fn total_daemon_count(&self) -> usize {
549 self.filtered_daemons().len()
550 }
551
552 pub fn scroll_logs_down(&mut self) {
553 if self.log_content.len() > 20 {
554 let max_scroll = self.log_content.len().saturating_sub(20);
555 self.log_scroll = (self.log_scroll + 1).min(max_scroll);
556 }
557 }
558
559 pub fn scroll_logs_up(&mut self) {
560 self.log_scroll = self.log_scroll.saturating_sub(1);
561 }
562
563 pub fn scroll_logs_page_down(&mut self, visible_lines: usize) {
565 let half_page = visible_lines / 2;
566 if self.log_content.len() > visible_lines {
567 let max_scroll = self.log_content.len().saturating_sub(visible_lines);
568 self.log_scroll = (self.log_scroll + half_page).min(max_scroll);
569 }
570 }
571
572 pub fn scroll_logs_page_up(&mut self, visible_lines: usize) {
574 let half_page = visible_lines / 2;
575 self.log_scroll = self.log_scroll.saturating_sub(half_page);
576 }
577
578 pub fn start_log_search(&mut self) {
580 self.log_search_active = true;
581 self.log_search_query.clear();
582 self.log_search_matches.clear();
583 self.log_search_current = 0;
584 }
585
586 pub fn end_log_search(&mut self) {
587 self.log_search_active = false;
588 }
589
590 pub fn clear_log_search(&mut self) {
591 self.log_search_query.clear();
592 self.log_search_active = false;
593 self.log_search_matches.clear();
594 self.log_search_current = 0;
595 }
596
597 pub fn log_search_push(&mut self, c: char) {
598 self.log_search_query.push(c);
599 self.update_log_search_matches();
600 }
601
602 pub fn log_search_pop(&mut self) {
603 self.log_search_query.pop();
604 self.update_log_search_matches();
605 }
606
607 fn update_log_search_matches(&mut self) {
608 self.log_search_matches.clear();
609 if !self.log_search_query.is_empty() {
610 let query = self.log_search_query.to_lowercase();
611 for (i, line) in self.log_content.iter().enumerate() {
612 if line.to_lowercase().contains(&query) {
613 self.log_search_matches.push(i);
614 }
615 }
616 if !self.log_search_matches.is_empty() {
618 self.log_search_current = 0;
619 self.jump_to_log_match();
620 }
621 }
622 }
623
624 pub fn log_search_next(&mut self) {
625 if !self.log_search_matches.is_empty() {
626 self.log_search_current = (self.log_search_current + 1) % self.log_search_matches.len();
627 self.jump_to_log_match();
628 }
629 }
630
631 pub fn log_search_prev(&mut self) {
632 if !self.log_search_matches.is_empty() {
633 self.log_search_current = self
634 .log_search_current
635 .checked_sub(1)
636 .unwrap_or(self.log_search_matches.len() - 1);
637 self.jump_to_log_match();
638 }
639 }
640
641 fn jump_to_log_match(&mut self) {
642 if let Some(&line_idx) = self.log_search_matches.get(self.log_search_current) {
643 let half_page = 10; self.log_scroll = line_idx.saturating_sub(half_page);
646 self.log_follow = false;
647 }
648 }
649
650 pub fn show_details(&mut self, daemon_id: &str) {
652 self.details_daemon_id = Some(daemon_id.to_string());
653 self.prev_view = self.view;
654 self.view = View::Details;
655 }
656
657 pub fn hide_details(&mut self) {
658 self.details_daemon_id = None;
659 self.view = View::Dashboard;
660 }
661
662 pub fn view_daemon_details(&mut self, daemon_id: &str) {
664 self.log_daemon_id = Some(daemon_id.to_string());
665 self.logs_expanded = false; self.load_logs(daemon_id);
667 self.view = View::Logs; }
669
670 fn load_logs(&mut self, daemon_id: &str) {
671 let log_path = Self::log_path(daemon_id);
672 let prev_len = self.log_content.len();
673
674 self.log_content = if log_path.exists() {
675 fs::read_to_string(&log_path)
676 .unwrap_or_default()
677 .lines()
678 .map(String::from)
679 .collect()
680 } else {
681 vec!["No logs available".to_string()]
682 };
683
684 if self.log_follow {
686 if self.log_content.len() > 20 {
687 self.log_scroll = self.log_content.len().saturating_sub(20);
688 } else {
689 self.log_scroll = 0;
690 }
691 } else if prev_len == 0 {
692 if self.log_content.len() > 20 {
694 self.log_scroll = self.log_content.len().saturating_sub(20);
695 }
696 }
697 }
699
700 fn log_path(daemon_id: &str) -> PathBuf {
701 PITCHFORK_LOGS_DIR
702 .join(daemon_id)
703 .join(format!("{daemon_id}.log"))
704 }
705
706 pub fn show_help(&mut self) {
707 self.view = View::Help;
708 }
709
710 pub fn back_to_dashboard(&mut self) {
711 self.view = View::Dashboard;
712 self.log_daemon_id = None;
713 self.log_content.clear();
714 self.log_scroll = 0;
715 }
716
717 pub fn stats(&self) -> (usize, usize, usize, usize, usize) {
719 let available = self.config_daemon_ids.len();
720 let total = self.daemons.len();
721 let running = self
722 .daemons
723 .iter()
724 .filter(|d| d.status.is_running())
725 .count();
726 let stopped = self
728 .daemons
729 .iter()
730 .filter(|d| d.status.is_stopped() && !self.config_daemon_ids.contains(&d.id))
731 .count();
732 let errored = self
733 .daemons
734 .iter()
735 .filter(|d| d.status.is_errored() || d.status.is_failed())
736 .count();
737 (total, running, stopped, errored, available)
738 }
739
740 pub fn is_disabled(&self, daemon_id: &str) -> bool {
741 self.disabled.contains(&daemon_id.to_string())
742 }
743
744 pub async fn start_daemon(&mut self, client: &IpcClient, daemon_id: &str) -> Result<()> {
745 let config = PitchforkToml::all_merged();
747 let daemon_config = config
748 .daemons
749 .get(daemon_id)
750 .ok_or_else(|| miette::miette!("Daemon '{}' not found in config", daemon_id))?;
751
752 let cmd = shell_words::split(&daemon_config.run)
753 .map_err(|e| miette::miette!("Failed to parse command: {}", e))?;
754
755 if cmd.is_empty() {
756 bail!("Daemon '{}' has empty run command", daemon_id);
757 }
758
759 let (cron_schedule, cron_retrigger) = daemon_config
760 .cron
761 .as_ref()
762 .map(|c| (Some(c.schedule.clone()), Some(c.retrigger)))
763 .unwrap_or((None, None));
764
765 let opts = RunOptions {
766 id: daemon_id.to_string(),
767 cmd,
768 force: false,
769 shell_pid: None,
770 dir: std::env::current_dir().unwrap_or_default(),
771 autostop: false,
772 cron_schedule,
773 cron_retrigger,
774 retry: daemon_config.retry,
775 retry_count: 0,
776 ready_delay: daemon_config.ready_delay,
777 ready_output: daemon_config.ready_output.clone(),
778 ready_http: daemon_config.ready_http.clone(),
779 ready_port: daemon_config.ready_port,
780 wait_ready: false,
781 };
782
783 client.run(opts).await?;
784 self.set_message(format!("Started {}", daemon_id));
785 Ok(())
786 }
787}
788
789impl Default for App {
790 fn default() -> Self {
791 Self::new()
792 }
793}