1use crate::Result;
2use crate::daemon::{Daemon, RunOptions};
3use crate::env::PITCHFORK_LOGS_DIR;
4use crate::ipc::client::IpcClient;
5use crate::pitchfork_toml::PitchforkToml;
6use crate::procs::{PROCS, ProcessStats};
7use fuzzy_matcher::FuzzyMatcher;
8use fuzzy_matcher::skim::SkimMatcherV2;
9use miette::bail;
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::fs;
12use std::path::PathBuf;
13use std::time::Instant;
14
/// Upper bound on the number of samples a [`StatsHistory`] keeps; once the
/// bound is exceeded the oldest samples are discarded first.
const MAX_STAT_HISTORY: usize = 60;
17
18#[derive(Debug, Clone, Copy)]
20pub struct StatsSnapshot {
21 pub cpu_percent: f32,
22 pub memory_bytes: u64,
23 pub disk_read_bytes: u64,
24 pub disk_write_bytes: u64,
25}
26
27impl From<&ProcessStats> for StatsSnapshot {
28 fn from(stats: &ProcessStats) -> Self {
29 Self {
30 cpu_percent: stats.cpu_percent,
31 memory_bytes: stats.memory_bytes,
32 disk_read_bytes: stats.disk_read_bytes,
33 disk_write_bytes: stats.disk_write_bytes,
34 }
35 }
36}
37
38#[derive(Debug, Clone, Default)]
40pub struct StatsHistory {
41 pub samples: VecDeque<StatsSnapshot>,
42}
43
44impl StatsHistory {
45 pub fn push(&mut self, snapshot: StatsSnapshot) {
46 self.samples.push_back(snapshot);
47 while self.samples.len() > MAX_STAT_HISTORY {
48 self.samples.pop_front();
49 }
50 }
51
52 pub fn cpu_values(&self) -> Vec<f32> {
53 self.samples.iter().map(|s| s.cpu_percent).collect()
54 }
55
56 pub fn memory_values(&self) -> Vec<u64> {
57 self.samples.iter().map(|s| s.memory_bytes).collect()
58 }
59
60 pub fn disk_read_values(&self) -> Vec<u64> {
61 self.samples.iter().map(|s| s.disk_read_bytes).collect()
62 }
63
64 pub fn disk_write_values(&self) -> Vec<u64> {
65 self.samples.iter().map(|s| s.disk_write_bytes).collect()
66 }
67}
68
/// The screen the application is currently showing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum View {
    /// Main daemon list; the view most actions return to.
    Dashboard,
    /// Log viewer for a single daemon.
    Logs,
    /// Help screen.
    Help,
    /// Confirmation prompt shown while a [`PendingAction`] is stored.
    Confirm,
    /// Loading overlay shown with a loading text.
    Loading,
    /// Details screen for a single daemon.
    Details,
}
78
/// An action that is held back until the user confirms it.
///
/// The single-daemon variants carry one daemon id; the `Batch*` variants
/// carry a list of daemon ids.
#[derive(Debug, Clone)]
pub enum PendingAction {
    /// Stop one daemon.
    Stop(String),
    /// Restart one daemon.
    Restart(String),
    /// Disable one daemon.
    Disable(String),
    /// Stop several daemons.
    BatchStop(Vec<String>),
    /// Restart several daemons.
    BatchRestart(Vec<String>),
    /// Disable several daemons.
    BatchDisable(Vec<String>),
}
89
/// Column by which the daemon list is ordered.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SortColumn {
    /// Order by daemon id (the default).
    #[default]
    Name,
    /// Order by daemon status.
    Status,
    /// Order by CPU usage.
    Cpu,
    /// Order by memory usage.
    Memory,
    /// Order by uptime.
    Uptime,
}

impl SortColumn {
    /// Returns the column that follows `self` in the fixed cycle
    /// `Name → Status → Cpu → Memory → Uptime → Name`.
    pub fn next(self) -> Self {
        use SortColumn::{Cpu, Memory, Name, Status, Uptime};

        match self {
            Name => Status,
            Status => Cpu,
            Cpu => Memory,
            Memory => Uptime,
            // Last column wraps around to the first.
            Uptime => Name,
        }
    }
}
111
/// Direction in which the daemon list is ordered.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SortOrder {
    /// Smallest first (the default).
    #[default]
    Ascending,
    /// Largest first.
    Descending,
}

impl SortOrder {
    /// Returns the opposite direction.
    pub fn toggle(self) -> Self {
        use SortOrder::{Ascending, Descending};

        match self {
            Ascending => Descending,
            Descending => Ascending,
        }
    }

    /// Returns the arrow glyph used to display this direction.
    pub fn indicator(self) -> &'static str {
        use SortOrder::{Ascending, Descending};

        match self {
            Ascending => "↑",
            Descending => "↓",
        }
    }
}
134
135pub struct App {
136 pub daemons: Vec<Daemon>,
137 pub disabled: Vec<String>,
138 pub selected: usize,
139 pub view: View,
140 pub prev_view: View,
141 pub log_content: Vec<String>,
142 pub log_daemon_id: Option<String>,
143 pub log_scroll: usize,
144 pub log_follow: bool, pub message: Option<String>,
146 pub message_time: Option<Instant>,
147 pub process_stats: HashMap<u32, ProcessStats>, pub stats_history: HashMap<String, StatsHistory>, pub pending_action: Option<PendingAction>,
150 pub loading_text: Option<String>,
151 pub search_query: String,
152 pub search_active: bool,
153 pub sort_column: SortColumn,
155 pub sort_order: SortOrder,
156 pub log_search_query: String,
158 pub log_search_active: bool,
159 pub log_search_matches: Vec<usize>, pub log_search_current: usize, pub details_daemon_id: Option<String>,
163 pub logs_expanded: bool,
165 pub multi_select: HashSet<String>,
167 pub config_daemon_ids: HashSet<String>,
169 pub show_available: bool,
171}
172
173impl App {
174 pub fn new() -> Self {
175 Self {
176 daemons: Vec::new(),
177 disabled: Vec::new(),
178 selected: 0,
179 view: View::Dashboard,
180 prev_view: View::Dashboard,
181 log_content: Vec::new(),
182 log_daemon_id: None,
183 log_scroll: 0,
184 log_follow: true,
185 message: None,
186 message_time: None,
187 process_stats: HashMap::new(),
188 stats_history: HashMap::new(),
189 pending_action: None,
190 loading_text: None,
191 search_query: String::new(),
192 search_active: false,
193 sort_column: SortColumn::default(),
194 sort_order: SortOrder::default(),
195 log_search_query: String::new(),
196 log_search_active: false,
197 log_search_matches: Vec::new(),
198 log_search_current: 0,
199 details_daemon_id: None,
200 logs_expanded: false,
201 multi_select: HashSet::new(),
202 config_daemon_ids: HashSet::new(),
203 show_available: true, }
205 }
206
207 pub fn confirm_action(&mut self, action: PendingAction) {
208 self.pending_action = Some(action);
209 self.view = View::Confirm;
210 }
211
212 pub fn cancel_confirm(&mut self) {
213 self.pending_action = None;
214 self.view = View::Dashboard;
215 }
216
217 pub fn take_pending_action(&mut self) -> Option<PendingAction> {
218 self.view = View::Dashboard;
219 self.pending_action.take()
220 }
221
222 pub fn start_loading(&mut self, text: impl Into<String>) {
223 self.prev_view = self.view;
224 self.loading_text = Some(text.into());
225 self.view = View::Loading;
226 }
227
228 pub fn stop_loading(&mut self) {
229 self.loading_text = None;
230 self.view = self.prev_view;
231 }
232
233 pub fn start_search(&mut self) {
235 self.search_active = true;
236 }
237
238 pub fn end_search(&mut self) {
239 self.search_active = false;
240 }
241
242 pub fn clear_search(&mut self) {
243 self.search_query.clear();
244 self.search_active = false;
245 self.selected = 0;
246 }
247
248 pub fn search_push(&mut self, c: char) {
249 self.search_query.push(c);
250 self.selected = 0;
252 }
253
254 pub fn search_pop(&mut self) {
255 self.search_query.pop();
256 self.selected = 0;
257 }
258
259 pub fn filtered_daemons(&self) -> Vec<&Daemon> {
260 let mut filtered: Vec<&Daemon> = if self.search_query.is_empty() {
261 self.daemons.iter().collect()
262 } else {
263 let matcher = SkimMatcherV2::default();
265 let mut scored: Vec<_> = self
266 .daemons
267 .iter()
268 .filter_map(|d| {
269 matcher
270 .fuzzy_match(&d.id, &self.search_query)
271 .map(|score| (d, score))
272 })
273 .collect();
274 scored.sort_by(|a, b| b.1.cmp(&a.1));
276 scored.into_iter().map(|(d, _)| d).collect()
277 };
278
279 filtered.sort_by(|a, b| {
281 let cmp = match self.sort_column {
282 SortColumn::Name => a.id.to_lowercase().cmp(&b.id.to_lowercase()),
283 SortColumn::Status => {
284 let status_order = |d: &Daemon| match &d.status {
285 crate::daemon_status::DaemonStatus::Running => 0,
286 crate::daemon_status::DaemonStatus::Waiting => 1,
287 crate::daemon_status::DaemonStatus::Stopping => 2,
288 crate::daemon_status::DaemonStatus::Stopped => 3,
289 crate::daemon_status::DaemonStatus::Errored(_) => 4,
290 crate::daemon_status::DaemonStatus::Failed(_) => 5,
291 };
292 status_order(a).cmp(&status_order(b))
293 }
294 SortColumn::Cpu => {
295 let cpu_a = a
296 .pid
297 .and_then(|p| self.get_stats(p))
298 .map(|s| s.cpu_percent)
299 .unwrap_or(0.0);
300 let cpu_b = b
301 .pid
302 .and_then(|p| self.get_stats(p))
303 .map(|s| s.cpu_percent)
304 .unwrap_or(0.0);
305 cpu_a
306 .partial_cmp(&cpu_b)
307 .unwrap_or(std::cmp::Ordering::Equal)
308 }
309 SortColumn::Memory => {
310 let mem_a = a
311 .pid
312 .and_then(|p| self.get_stats(p))
313 .map(|s| s.memory_bytes)
314 .unwrap_or(0);
315 let mem_b = b
316 .pid
317 .and_then(|p| self.get_stats(p))
318 .map(|s| s.memory_bytes)
319 .unwrap_or(0);
320 mem_a.cmp(&mem_b)
321 }
322 SortColumn::Uptime => {
323 let up_a = a
324 .pid
325 .and_then(|p| self.get_stats(p))
326 .map(|s| s.uptime_secs)
327 .unwrap_or(0);
328 let up_b = b
329 .pid
330 .and_then(|p| self.get_stats(p))
331 .map(|s| s.uptime_secs)
332 .unwrap_or(0);
333 up_a.cmp(&up_b)
334 }
335 };
336 match self.sort_order {
337 SortOrder::Ascending => cmp,
338 SortOrder::Descending => cmp.reverse(),
339 }
340 });
341
342 filtered
343 }
344
345 pub fn cycle_sort(&mut self) {
347 self.sort_column = self.sort_column.next();
349 self.selected = 0;
350 }
351
352 pub fn toggle_sort_order(&mut self) {
353 self.sort_order = self.sort_order.toggle();
354 self.selected = 0;
355 }
356
357 pub fn selected_daemon(&self) -> Option<&Daemon> {
358 let filtered = self.filtered_daemons();
359 filtered.get(self.selected).copied()
360 }
361
362 pub fn select_next(&mut self) {
363 let count = self.filtered_daemons().len();
364 if count > 0 {
365 self.selected = (self.selected + 1) % count;
366 }
367 }
368
369 pub fn select_prev(&mut self) {
370 let count = self.filtered_daemons().len();
371 if count > 0 {
372 self.selected = self.selected.checked_sub(1).unwrap_or(count - 1);
373 }
374 }
375
376 pub fn toggle_log_follow(&mut self) {
378 self.log_follow = !self.log_follow;
379 if self.log_follow && !self.log_content.is_empty() {
380 self.log_scroll = self.log_content.len().saturating_sub(20);
382 }
383 }
384
385 pub fn toggle_logs_expanded(&mut self) {
387 self.logs_expanded = !self.logs_expanded;
388 }
389
390 pub fn toggle_select(&mut self) {
392 if let Some(daemon) = self.selected_daemon() {
393 let id = daemon.id.clone();
394 if self.multi_select.contains(&id) {
395 self.multi_select.remove(&id);
396 } else {
397 self.multi_select.insert(id);
398 }
399 }
400 }
401
402 pub fn select_all_visible(&mut self) {
403 let ids: Vec<String> = self
405 .filtered_daemons()
406 .iter()
407 .map(|d| d.id.clone())
408 .collect();
409 for id in ids {
410 self.multi_select.insert(id);
411 }
412 }
413
414 pub fn clear_selection(&mut self) {
415 self.multi_select.clear();
416 }
417
418 pub fn is_selected(&self, daemon_id: &str) -> bool {
419 self.multi_select.contains(daemon_id)
420 }
421
422 pub fn has_selection(&self) -> bool {
423 !self.multi_select.is_empty()
424 }
425
426 pub fn selected_daemon_ids(&self) -> Vec<String> {
427 self.multi_select.iter().cloned().collect()
428 }
429
430 pub fn set_message(&mut self, msg: impl Into<String>) {
431 self.message = Some(msg.into());
432 self.message_time = Some(Instant::now());
433 }
434
435 pub fn clear_stale_message(&mut self) {
436 if let Some(time) = self.message_time
437 && time.elapsed().as_secs() >= 3
438 {
439 self.message = None;
440 self.message_time = None;
441 }
442 }
443
444 pub fn get_stats(&self, pid: u32) -> Option<&ProcessStats> {
445 self.process_stats.get(&pid)
446 }
447
448 fn refresh_process_stats(&mut self) {
449 PROCS.refresh_processes();
450 self.process_stats.clear();
451 for daemon in &self.daemons {
452 if let Some(pid) = daemon.pid
453 && let Some(stats) = PROCS.get_stats(pid)
454 {
455 self.process_stats.insert(pid, stats);
456 let history = self.stats_history.entry(daemon.id.clone()).or_default();
458 history.push(StatsSnapshot::from(&stats));
459 }
460 }
461 }
462
463 pub fn get_stats_history(&self, daemon_id: &str) -> Option<&StatsHistory> {
465 self.stats_history.get(daemon_id)
466 }
467
468 pub async fn refresh(&mut self, client: &IpcClient) -> Result<()> {
469 self.daemons = client.active_daemons().await?;
470 self.daemons.retain(|d| d.id != "pitchfork");
472 self.disabled = client.get_disabled_daemons().await?;
473
474 self.refresh_config_daemons();
476
477 self.refresh_process_stats();
479
480 self.clear_stale_message();
482
483 let total_count = self.total_daemon_count();
485 if total_count > 0 && self.selected >= total_count {
486 self.selected = total_count - 1;
487 }
488
489 if self.view == View::Logs
491 && let Some(id) = self.log_daemon_id.clone()
492 {
493 self.load_logs(&id);
494 }
495
496 Ok(())
497 }
498
499 fn refresh_config_daemons(&mut self) {
500 use crate::daemon_status::DaemonStatus;
501
502 let config = PitchforkToml::all_merged();
503 let active_ids: HashSet<String> = self.daemons.iter().map(|d| d.id.clone()).collect();
504
505 self.config_daemon_ids.clear();
507 for daemon_id in config.daemons.keys() {
508 if !active_ids.contains(daemon_id) && daemon_id != "pitchfork" {
509 self.config_daemon_ids.insert(daemon_id.clone());
510
511 if self.show_available {
513 let placeholder = Daemon {
514 id: daemon_id.clone(),
515 title: None,
516 pid: None,
517 shell_pid: None,
518 status: DaemonStatus::Stopped,
519 dir: None,
520 autostop: false,
521 cron_schedule: None,
522 cron_retrigger: None,
523 last_cron_triggered: None,
524 last_exit_success: None,
525 retry: 0,
526 retry_count: 0,
527 ready_delay: None,
528 ready_output: None,
529 ready_http: None,
530 ready_port: None,
531 depends: vec![],
532 };
533 self.daemons.push(placeholder);
534 }
535 }
536 }
537 }
538
539 pub fn is_config_only(&self, daemon_id: &str) -> bool {
541 self.config_daemon_ids.contains(daemon_id)
542 }
543
544 pub fn toggle_show_available(&mut self) {
546 self.show_available = !self.show_available;
547 }
548
549 fn total_daemon_count(&self) -> usize {
551 self.filtered_daemons().len()
552 }
553
554 pub fn scroll_logs_down(&mut self) {
555 if self.log_content.len() > 20 {
556 let max_scroll = self.log_content.len().saturating_sub(20);
557 self.log_scroll = (self.log_scroll + 1).min(max_scroll);
558 }
559 }
560
561 pub fn scroll_logs_up(&mut self) {
562 self.log_scroll = self.log_scroll.saturating_sub(1);
563 }
564
565 pub fn scroll_logs_page_down(&mut self, visible_lines: usize) {
567 let half_page = visible_lines / 2;
568 if self.log_content.len() > visible_lines {
569 let max_scroll = self.log_content.len().saturating_sub(visible_lines);
570 self.log_scroll = (self.log_scroll + half_page).min(max_scroll);
571 }
572 }
573
574 pub fn scroll_logs_page_up(&mut self, visible_lines: usize) {
576 let half_page = visible_lines / 2;
577 self.log_scroll = self.log_scroll.saturating_sub(half_page);
578 }
579
580 pub fn start_log_search(&mut self) {
582 self.log_search_active = true;
583 self.log_search_query.clear();
584 self.log_search_matches.clear();
585 self.log_search_current = 0;
586 }
587
588 pub fn end_log_search(&mut self) {
589 self.log_search_active = false;
590 }
591
592 pub fn clear_log_search(&mut self) {
593 self.log_search_query.clear();
594 self.log_search_active = false;
595 self.log_search_matches.clear();
596 self.log_search_current = 0;
597 }
598
599 pub fn log_search_push(&mut self, c: char) {
600 self.log_search_query.push(c);
601 self.update_log_search_matches();
602 }
603
604 pub fn log_search_pop(&mut self) {
605 self.log_search_query.pop();
606 self.update_log_search_matches();
607 }
608
609 fn update_log_search_matches(&mut self) {
610 self.log_search_matches.clear();
611 if !self.log_search_query.is_empty() {
612 let query = self.log_search_query.to_lowercase();
613 for (i, line) in self.log_content.iter().enumerate() {
614 if line.to_lowercase().contains(&query) {
615 self.log_search_matches.push(i);
616 }
617 }
618 if !self.log_search_matches.is_empty() {
620 self.log_search_current = 0;
621 self.jump_to_log_match();
622 }
623 }
624 }
625
626 pub fn log_search_next(&mut self) {
627 if !self.log_search_matches.is_empty() {
628 self.log_search_current = (self.log_search_current + 1) % self.log_search_matches.len();
629 self.jump_to_log_match();
630 }
631 }
632
633 pub fn log_search_prev(&mut self) {
634 if !self.log_search_matches.is_empty() {
635 self.log_search_current = self
636 .log_search_current
637 .checked_sub(1)
638 .unwrap_or(self.log_search_matches.len() - 1);
639 self.jump_to_log_match();
640 }
641 }
642
643 fn jump_to_log_match(&mut self) {
644 if let Some(&line_idx) = self.log_search_matches.get(self.log_search_current) {
645 let half_page = 10; self.log_scroll = line_idx.saturating_sub(half_page);
648 self.log_follow = false;
649 }
650 }
651
652 pub fn show_details(&mut self, daemon_id: &str) {
654 self.details_daemon_id = Some(daemon_id.to_string());
655 self.prev_view = self.view;
656 self.view = View::Details;
657 }
658
659 pub fn hide_details(&mut self) {
660 self.details_daemon_id = None;
661 self.view = View::Dashboard;
662 }
663
664 pub fn view_daemon_details(&mut self, daemon_id: &str) {
666 self.log_daemon_id = Some(daemon_id.to_string());
667 self.logs_expanded = false; self.load_logs(daemon_id);
669 self.view = View::Logs; }
671
672 fn load_logs(&mut self, daemon_id: &str) {
673 let log_path = Self::log_path(daemon_id);
674 let prev_len = self.log_content.len();
675
676 self.log_content = if log_path.exists() {
677 fs::read_to_string(&log_path)
678 .unwrap_or_default()
679 .lines()
680 .map(String::from)
681 .collect()
682 } else {
683 vec!["No logs available".to_string()]
684 };
685
686 if self.log_follow {
688 if self.log_content.len() > 20 {
689 self.log_scroll = self.log_content.len().saturating_sub(20);
690 } else {
691 self.log_scroll = 0;
692 }
693 } else if prev_len == 0 {
694 if self.log_content.len() > 20 {
696 self.log_scroll = self.log_content.len().saturating_sub(20);
697 }
698 }
699 }
701
702 fn log_path(daemon_id: &str) -> PathBuf {
703 PITCHFORK_LOGS_DIR
704 .join(daemon_id)
705 .join(format!("{daemon_id}.log"))
706 }
707
708 pub fn show_help(&mut self) {
709 self.view = View::Help;
710 }
711
712 pub fn back_to_dashboard(&mut self) {
713 self.view = View::Dashboard;
714 self.log_daemon_id = None;
715 self.log_content.clear();
716 self.log_scroll = 0;
717 }
718
719 pub fn stats(&self) -> (usize, usize, usize, usize, usize) {
721 let available = self.config_daemon_ids.len();
722 let total = self.daemons.len();
723 let running = self
724 .daemons
725 .iter()
726 .filter(|d| d.status.is_running())
727 .count();
728 let stopped = self
730 .daemons
731 .iter()
732 .filter(|d| d.status.is_stopped() && !self.config_daemon_ids.contains(&d.id))
733 .count();
734 let errored = self
735 .daemons
736 .iter()
737 .filter(|d| d.status.is_errored() || d.status.is_failed())
738 .count();
739 (total, running, stopped, errored, available)
740 }
741
742 pub fn is_disabled(&self, daemon_id: &str) -> bool {
743 self.disabled.contains(&daemon_id.to_string())
744 }
745
746 pub async fn start_daemon(&mut self, client: &IpcClient, daemon_id: &str) -> Result<()> {
747 let config = PitchforkToml::all_merged();
749 let daemon_config = config
750 .daemons
751 .get(daemon_id)
752 .ok_or_else(|| miette::miette!("Daemon '{}' not found in config", daemon_id))?;
753
754 let cmd = shell_words::split(&daemon_config.run)
755 .map_err(|e| miette::miette!("Failed to parse command: {}", e))?;
756
757 if cmd.is_empty() {
758 bail!("Daemon '{}' has empty run command", daemon_id);
759 }
760
761 let (cron_schedule, cron_retrigger) = daemon_config
762 .cron
763 .as_ref()
764 .map(|c| (Some(c.schedule.clone()), Some(c.retrigger)))
765 .unwrap_or((None, None));
766
767 let opts = RunOptions {
768 id: daemon_id.to_string(),
769 cmd,
770 force: false,
771 shell_pid: None,
772 dir: std::env::current_dir().unwrap_or_default(),
773 autostop: false,
774 cron_schedule,
775 cron_retrigger,
776 retry: daemon_config.retry.count(),
777 retry_count: 0,
778 ready_delay: daemon_config.ready_delay,
779 ready_output: daemon_config.ready_output.clone(),
780 ready_http: daemon_config.ready_http.clone(),
781 ready_port: daemon_config.ready_port,
782 wait_ready: false,
783 depends: daemon_config.depends.clone(),
784 };
785
786 client.run(opts).await?;
787 self.set_message(format!("Started {}", daemon_id));
788 Ok(())
789 }
790}
791
792impl Default for App {
793 fn default() -> Self {
794 Self::new()
795 }
796}