1use crate::Result;
2use crate::daemon::{Daemon, RunOptions};
3use crate::env::PITCHFORK_LOGS_DIR;
4use crate::ipc::client::IpcClient;
5use crate::pitchfork_toml::PitchforkToml;
6use crate::procs::{PROCS, ProcessStats};
7use fuzzy_matcher::FuzzyMatcher;
8use fuzzy_matcher::skim::SkimMatcherV2;
9use miette::bail;
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::fs;
12use std::path::PathBuf;
13use std::time::Instant;
14
/// Maximum number of samples a `StatsHistory` keeps; `StatsHistory::push` drops the oldest
/// samples once this limit is exceeded.
const MAX_STAT_HISTORY: usize = 60;
17
/// A copyable record of the four resource figures kept per sample in a stats history.
#[derive(Debug, Clone, Copy)]
pub struct StatsSnapshot {
    /// CPU figure, mirrored from `ProcessStats::cpu_percent`.
    pub cpu_percent: f32,
    /// Memory figure, mirrored from `ProcessStats::memory_bytes`.
    pub memory_bytes: u64,
    /// Disk-read figure, mirrored from `ProcessStats::disk_read_bytes`.
    pub disk_read_bytes: u64,
    /// Disk-write figure, mirrored from `ProcessStats::disk_write_bytes`.
    pub disk_write_bytes: u64,
}
26
27impl From<&ProcessStats> for StatsSnapshot {
28 fn from(stats: &ProcessStats) -> Self {
29 Self {
30 cpu_percent: stats.cpu_percent,
31 memory_bytes: stats.memory_bytes,
32 disk_read_bytes: stats.disk_read_bytes,
33 disk_write_bytes: stats.disk_write_bytes,
34 }
35 }
36}
37
38#[derive(Debug, Clone, Default)]
40pub struct StatsHistory {
41 pub samples: VecDeque<StatsSnapshot>,
42}
43
44impl StatsHistory {
45 pub fn push(&mut self, snapshot: StatsSnapshot) {
46 self.samples.push_back(snapshot);
47 while self.samples.len() > MAX_STAT_HISTORY {
48 self.samples.pop_front();
49 }
50 }
51
52 pub fn cpu_values(&self) -> Vec<f32> {
53 self.samples.iter().map(|s| s.cpu_percent).collect()
54 }
55
56 pub fn memory_values(&self) -> Vec<u64> {
57 self.samples.iter().map(|s| s.memory_bytes).collect()
58 }
59
60 pub fn disk_read_values(&self) -> Vec<u64> {
61 self.samples.iter().map(|s| s.disk_read_bytes).collect()
62 }
63
64 pub fn disk_write_values(&self) -> Vec<u64> {
65 self.samples.iter().map(|s| s.disk_write_bytes).collect()
66 }
67}
68
/// The screen the application is currently showing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum View {
    /// The initial view, and the one most actions return to.
    Dashboard,
    /// Log view for one daemon; entered by `App::view_daemon_details`.
    Logs,
    /// Help view; entered by `App::show_help`.
    Help,
    /// Confirmation view for a pending action; entered by `App::confirm_action`.
    Confirm,
    /// Loading view; entered by `App::start_loading`.
    Loading,
    /// Details view for one daemon; entered by `App::show_details`.
    Details,
}
78
/// An action held in `App::pending_action` while the confirmation view is shown.
///
/// NOTE(review): the `String` payloads are presumably daemon ids — confirm against the callers.
#[derive(Debug, Clone)]
pub enum PendingAction {
    /// Stop a single target.
    Stop(String),
    /// Restart a single target.
    Restart(String),
    /// Disable a single target.
    Disable(String),
    /// Stop several targets at once.
    BatchStop(Vec<String>),
    /// Restart several targets at once.
    BatchRestart(Vec<String>),
    /// Disable several targets at once.
    BatchDisable(Vec<String>),
}
89
/// The column the daemon list is ordered by.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SortColumn {
    /// Order by daemon id (the default).
    #[default]
    Name,
    /// Order by daemon status.
    Status,
    /// Order by CPU usage.
    Cpu,
    /// Order by memory usage.
    Memory,
    /// Order by uptime.
    Uptime,
}

impl SortColumn {
    /// Returns the column that follows `self` in the cycle
    /// `Name -> Status -> Cpu -> Memory -> Uptime -> Name`.
    pub const fn next(self) -> Self {
        match self {
            Self::Name => Self::Status,
            Self::Status => Self::Cpu,
            Self::Cpu => Self::Memory,
            Self::Memory => Self::Uptime,
            // The last column wraps back around to the first.
            Self::Uptime => Self::Name,
        }
    }
}
111
/// The direction in which the daemon list is ordered.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SortOrder {
    /// Smallest first (the default).
    #[default]
    Ascending,
    /// Largest first.
    Descending,
}

impl SortOrder {
    /// Returns the opposite direction.
    pub const fn toggle(self) -> Self {
        match self {
            Self::Ascending => Self::Descending,
            Self::Descending => Self::Ascending,
        }
    }

    /// Returns the arrow glyph for this direction: `↑` for ascending, `↓` for descending.
    pub const fn indicator(self) -> &'static str {
        match self {
            Self::Ascending => "↑",
            Self::Descending => "↓",
        }
    }
}
134
135pub struct App {
136 pub daemons: Vec<Daemon>,
137 pub disabled: Vec<String>,
138 pub selected: usize,
139 pub view: View,
140 pub prev_view: View,
141 pub log_content: Vec<String>,
142 pub log_daemon_id: Option<String>,
143 pub log_scroll: usize,
144 pub log_follow: bool, pub message: Option<String>,
146 pub message_time: Option<Instant>,
147 pub process_stats: HashMap<u32, ProcessStats>, pub stats_history: HashMap<String, StatsHistory>, pub pending_action: Option<PendingAction>,
150 pub loading_text: Option<String>,
151 pub search_query: String,
152 pub search_active: bool,
153 pub sort_column: SortColumn,
155 pub sort_order: SortOrder,
156 pub log_search_query: String,
158 pub log_search_active: bool,
159 pub log_search_matches: Vec<usize>, pub log_search_current: usize, pub details_daemon_id: Option<String>,
163 pub logs_expanded: bool,
165 pub multi_select: HashSet<String>,
167 pub config_daemon_ids: HashSet<String>,
169 pub show_available: bool,
171}
172
173impl App {
174 pub fn new() -> Self {
175 Self {
176 daemons: Vec::new(),
177 disabled: Vec::new(),
178 selected: 0,
179 view: View::Dashboard,
180 prev_view: View::Dashboard,
181 log_content: Vec::new(),
182 log_daemon_id: None,
183 log_scroll: 0,
184 log_follow: true,
185 message: None,
186 message_time: None,
187 process_stats: HashMap::new(),
188 stats_history: HashMap::new(),
189 pending_action: None,
190 loading_text: None,
191 search_query: String::new(),
192 search_active: false,
193 sort_column: SortColumn::default(),
194 sort_order: SortOrder::default(),
195 log_search_query: String::new(),
196 log_search_active: false,
197 log_search_matches: Vec::new(),
198 log_search_current: 0,
199 details_daemon_id: None,
200 logs_expanded: false,
201 multi_select: HashSet::new(),
202 config_daemon_ids: HashSet::new(),
203 show_available: true, }
205 }
206
207 pub fn confirm_action(&mut self, action: PendingAction) {
208 self.pending_action = Some(action);
209 self.view = View::Confirm;
210 }
211
212 pub fn cancel_confirm(&mut self) {
213 self.pending_action = None;
214 self.view = View::Dashboard;
215 }
216
217 pub fn take_pending_action(&mut self) -> Option<PendingAction> {
218 self.view = View::Dashboard;
219 self.pending_action.take()
220 }
221
222 pub fn start_loading(&mut self, text: impl Into<String>) {
223 self.prev_view = self.view;
224 self.loading_text = Some(text.into());
225 self.view = View::Loading;
226 }
227
228 pub fn stop_loading(&mut self) {
229 self.loading_text = None;
230 self.view = self.prev_view;
231 }
232
233 pub fn start_search(&mut self) {
235 self.search_active = true;
236 }
237
238 pub fn end_search(&mut self) {
239 self.search_active = false;
240 }
241
242 pub fn clear_search(&mut self) {
243 self.search_query.clear();
244 self.search_active = false;
245 self.selected = 0;
246 }
247
248 pub fn search_push(&mut self, c: char) {
249 self.search_query.push(c);
250 self.selected = 0;
252 }
253
254 pub fn search_pop(&mut self) {
255 self.search_query.pop();
256 self.selected = 0;
257 }
258
259 pub fn filtered_daemons(&self) -> Vec<&Daemon> {
260 let mut filtered: Vec<&Daemon> = if self.search_query.is_empty() {
261 self.daemons.iter().collect()
262 } else {
263 let matcher = SkimMatcherV2::default();
265 let mut scored: Vec<_> = self
266 .daemons
267 .iter()
268 .filter_map(|d| {
269 matcher
270 .fuzzy_match(&d.id, &self.search_query)
271 .map(|score| (d, score))
272 })
273 .collect();
274 scored.sort_by(|a, b| b.1.cmp(&a.1));
276 scored.into_iter().map(|(d, _)| d).collect()
277 };
278
279 filtered.sort_by(|a, b| {
281 let cmp = match self.sort_column {
282 SortColumn::Name => a.id.to_lowercase().cmp(&b.id.to_lowercase()),
283 SortColumn::Status => {
284 let status_order = |d: &Daemon| match &d.status {
285 crate::daemon_status::DaemonStatus::Running => 0,
286 crate::daemon_status::DaemonStatus::Waiting => 1,
287 crate::daemon_status::DaemonStatus::Stopping => 2,
288 crate::daemon_status::DaemonStatus::Stopped => 3,
289 crate::daemon_status::DaemonStatus::Errored(_) => 4,
290 crate::daemon_status::DaemonStatus::Failed(_) => 5,
291 };
292 status_order(a).cmp(&status_order(b))
293 }
294 SortColumn::Cpu => {
295 let cpu_a = a
296 .pid
297 .and_then(|p| self.get_stats(p))
298 .map(|s| s.cpu_percent)
299 .unwrap_or(0.0);
300 let cpu_b = b
301 .pid
302 .and_then(|p| self.get_stats(p))
303 .map(|s| s.cpu_percent)
304 .unwrap_or(0.0);
305 cpu_a
306 .partial_cmp(&cpu_b)
307 .unwrap_or(std::cmp::Ordering::Equal)
308 }
309 SortColumn::Memory => {
310 let mem_a = a
311 .pid
312 .and_then(|p| self.get_stats(p))
313 .map(|s| s.memory_bytes)
314 .unwrap_or(0);
315 let mem_b = b
316 .pid
317 .and_then(|p| self.get_stats(p))
318 .map(|s| s.memory_bytes)
319 .unwrap_or(0);
320 mem_a.cmp(&mem_b)
321 }
322 SortColumn::Uptime => {
323 let up_a = a
324 .pid
325 .and_then(|p| self.get_stats(p))
326 .map(|s| s.uptime_secs)
327 .unwrap_or(0);
328 let up_b = b
329 .pid
330 .and_then(|p| self.get_stats(p))
331 .map(|s| s.uptime_secs)
332 .unwrap_or(0);
333 up_a.cmp(&up_b)
334 }
335 };
336 match self.sort_order {
337 SortOrder::Ascending => cmp,
338 SortOrder::Descending => cmp.reverse(),
339 }
340 });
341
342 filtered
343 }
344
345 pub fn cycle_sort(&mut self) {
347 self.sort_column = self.sort_column.next();
349 self.selected = 0;
350 }
351
352 pub fn toggle_sort_order(&mut self) {
353 self.sort_order = self.sort_order.toggle();
354 self.selected = 0;
355 }
356
357 pub fn selected_daemon(&self) -> Option<&Daemon> {
358 let filtered = self.filtered_daemons();
359 filtered.get(self.selected).copied()
360 }
361
362 pub fn select_next(&mut self) {
363 let count = self.filtered_daemons().len();
364 if count > 0 {
365 self.selected = (self.selected + 1) % count;
366 }
367 }
368
369 pub fn select_prev(&mut self) {
370 let count = self.filtered_daemons().len();
371 if count > 0 {
372 self.selected = self.selected.checked_sub(1).unwrap_or(count - 1);
373 }
374 }
375
376 pub fn toggle_log_follow(&mut self) {
378 self.log_follow = !self.log_follow;
379 if self.log_follow && !self.log_content.is_empty() {
380 self.log_scroll = self.log_content.len().saturating_sub(20);
382 }
383 }
384
385 pub fn toggle_logs_expanded(&mut self) {
387 self.logs_expanded = !self.logs_expanded;
388 }
389
390 pub fn toggle_select(&mut self) {
392 if let Some(daemon) = self.selected_daemon() {
393 let id = daemon.id.clone();
394 if self.multi_select.contains(&id) {
395 self.multi_select.remove(&id);
396 } else {
397 self.multi_select.insert(id);
398 }
399 }
400 }
401
402 pub fn select_all_visible(&mut self) {
403 let ids: Vec<String> = self
405 .filtered_daemons()
406 .iter()
407 .map(|d| d.id.clone())
408 .collect();
409 for id in ids {
410 self.multi_select.insert(id);
411 }
412 }
413
414 pub fn clear_selection(&mut self) {
415 self.multi_select.clear();
416 }
417
418 pub fn is_selected(&self, daemon_id: &str) -> bool {
419 self.multi_select.contains(daemon_id)
420 }
421
422 pub fn has_selection(&self) -> bool {
423 !self.multi_select.is_empty()
424 }
425
426 pub fn selected_daemon_ids(&self) -> Vec<String> {
427 self.multi_select.iter().cloned().collect()
428 }
429
430 pub fn set_message(&mut self, msg: impl Into<String>) {
431 self.message = Some(msg.into());
432 self.message_time = Some(Instant::now());
433 }
434
435 pub fn clear_stale_message(&mut self) {
436 if let Some(time) = self.message_time
437 && time.elapsed().as_secs() >= 3
438 {
439 self.message = None;
440 self.message_time = None;
441 }
442 }
443
444 pub fn get_stats(&self, pid: u32) -> Option<&ProcessStats> {
445 self.process_stats.get(&pid)
446 }
447
448 fn refresh_process_stats(&mut self) {
449 PROCS.refresh_processes();
450 self.process_stats.clear();
451 for daemon in &self.daemons {
452 if let Some(pid) = daemon.pid
453 && let Some(stats) = PROCS.get_stats(pid)
454 {
455 self.process_stats.insert(pid, stats);
456 let history = self.stats_history.entry(daemon.id.clone()).or_default();
458 history.push(StatsSnapshot::from(&stats));
459 }
460 }
461 }
462
463 pub fn get_stats_history(&self, daemon_id: &str) -> Option<&StatsHistory> {
465 self.stats_history.get(daemon_id)
466 }
467
468 pub async fn refresh(&mut self, client: &IpcClient) -> Result<()> {
469 self.daemons = client.active_daemons().await?;
470 self.daemons.retain(|d| d.id != "pitchfork");
472 self.disabled = client.get_disabled_daemons().await?;
473
474 self.refresh_config_daemons();
476
477 self.refresh_process_stats();
479
480 self.clear_stale_message();
482
483 let total_count = self.total_daemon_count();
485 if total_count > 0 && self.selected >= total_count {
486 self.selected = total_count - 1;
487 }
488
489 if self.view == View::Logs
491 && let Some(id) = self.log_daemon_id.clone()
492 {
493 self.load_logs(&id);
494 }
495
496 Ok(())
497 }
498
499 fn refresh_config_daemons(&mut self) {
500 use crate::daemon_status::DaemonStatus;
501
502 let config = PitchforkToml::all_merged();
503 let active_ids: HashSet<String> = self.daemons.iter().map(|d| d.id.clone()).collect();
504
505 self.config_daemon_ids.clear();
507 for daemon_id in config.daemons.keys() {
508 if !active_ids.contains(daemon_id) && daemon_id != "pitchfork" {
509 self.config_daemon_ids.insert(daemon_id.clone());
510
511 if self.show_available {
513 let placeholder = Daemon {
514 id: daemon_id.clone(),
515 title: None,
516 pid: None,
517 shell_pid: None,
518 status: DaemonStatus::Stopped,
519 dir: None,
520 autostop: false,
521 cron_schedule: None,
522 cron_retrigger: None,
523 last_exit_success: None,
524 retry: 0,
525 retry_count: 0,
526 ready_delay: None,
527 ready_output: None,
528 ready_http: None,
529 ready_port: None,
530 depends: vec![],
531 };
532 self.daemons.push(placeholder);
533 }
534 }
535 }
536 }
537
538 pub fn is_config_only(&self, daemon_id: &str) -> bool {
540 self.config_daemon_ids.contains(daemon_id)
541 }
542
543 pub fn toggle_show_available(&mut self) {
545 self.show_available = !self.show_available;
546 }
547
548 fn total_daemon_count(&self) -> usize {
550 self.filtered_daemons().len()
551 }
552
553 pub fn scroll_logs_down(&mut self) {
554 if self.log_content.len() > 20 {
555 let max_scroll = self.log_content.len().saturating_sub(20);
556 self.log_scroll = (self.log_scroll + 1).min(max_scroll);
557 }
558 }
559
560 pub fn scroll_logs_up(&mut self) {
561 self.log_scroll = self.log_scroll.saturating_sub(1);
562 }
563
564 pub fn scroll_logs_page_down(&mut self, visible_lines: usize) {
566 let half_page = visible_lines / 2;
567 if self.log_content.len() > visible_lines {
568 let max_scroll = self.log_content.len().saturating_sub(visible_lines);
569 self.log_scroll = (self.log_scroll + half_page).min(max_scroll);
570 }
571 }
572
573 pub fn scroll_logs_page_up(&mut self, visible_lines: usize) {
575 let half_page = visible_lines / 2;
576 self.log_scroll = self.log_scroll.saturating_sub(half_page);
577 }
578
579 pub fn start_log_search(&mut self) {
581 self.log_search_active = true;
582 self.log_search_query.clear();
583 self.log_search_matches.clear();
584 self.log_search_current = 0;
585 }
586
587 pub fn end_log_search(&mut self) {
588 self.log_search_active = false;
589 }
590
591 pub fn clear_log_search(&mut self) {
592 self.log_search_query.clear();
593 self.log_search_active = false;
594 self.log_search_matches.clear();
595 self.log_search_current = 0;
596 }
597
598 pub fn log_search_push(&mut self, c: char) {
599 self.log_search_query.push(c);
600 self.update_log_search_matches();
601 }
602
603 pub fn log_search_pop(&mut self) {
604 self.log_search_query.pop();
605 self.update_log_search_matches();
606 }
607
608 fn update_log_search_matches(&mut self) {
609 self.log_search_matches.clear();
610 if !self.log_search_query.is_empty() {
611 let query = self.log_search_query.to_lowercase();
612 for (i, line) in self.log_content.iter().enumerate() {
613 if line.to_lowercase().contains(&query) {
614 self.log_search_matches.push(i);
615 }
616 }
617 if !self.log_search_matches.is_empty() {
619 self.log_search_current = 0;
620 self.jump_to_log_match();
621 }
622 }
623 }
624
625 pub fn log_search_next(&mut self) {
626 if !self.log_search_matches.is_empty() {
627 self.log_search_current = (self.log_search_current + 1) % self.log_search_matches.len();
628 self.jump_to_log_match();
629 }
630 }
631
632 pub fn log_search_prev(&mut self) {
633 if !self.log_search_matches.is_empty() {
634 self.log_search_current = self
635 .log_search_current
636 .checked_sub(1)
637 .unwrap_or(self.log_search_matches.len() - 1);
638 self.jump_to_log_match();
639 }
640 }
641
642 fn jump_to_log_match(&mut self) {
643 if let Some(&line_idx) = self.log_search_matches.get(self.log_search_current) {
644 let half_page = 10; self.log_scroll = line_idx.saturating_sub(half_page);
647 self.log_follow = false;
648 }
649 }
650
651 pub fn show_details(&mut self, daemon_id: &str) {
653 self.details_daemon_id = Some(daemon_id.to_string());
654 self.prev_view = self.view;
655 self.view = View::Details;
656 }
657
658 pub fn hide_details(&mut self) {
659 self.details_daemon_id = None;
660 self.view = View::Dashboard;
661 }
662
663 pub fn view_daemon_details(&mut self, daemon_id: &str) {
665 self.log_daemon_id = Some(daemon_id.to_string());
666 self.logs_expanded = false; self.load_logs(daemon_id);
668 self.view = View::Logs; }
670
671 fn load_logs(&mut self, daemon_id: &str) {
672 let log_path = Self::log_path(daemon_id);
673 let prev_len = self.log_content.len();
674
675 self.log_content = if log_path.exists() {
676 fs::read_to_string(&log_path)
677 .unwrap_or_default()
678 .lines()
679 .map(String::from)
680 .collect()
681 } else {
682 vec!["No logs available".to_string()]
683 };
684
685 if self.log_follow {
687 if self.log_content.len() > 20 {
688 self.log_scroll = self.log_content.len().saturating_sub(20);
689 } else {
690 self.log_scroll = 0;
691 }
692 } else if prev_len == 0 {
693 if self.log_content.len() > 20 {
695 self.log_scroll = self.log_content.len().saturating_sub(20);
696 }
697 }
698 }
700
701 fn log_path(daemon_id: &str) -> PathBuf {
702 PITCHFORK_LOGS_DIR
703 .join(daemon_id)
704 .join(format!("{daemon_id}.log"))
705 }
706
707 pub fn show_help(&mut self) {
708 self.view = View::Help;
709 }
710
711 pub fn back_to_dashboard(&mut self) {
712 self.view = View::Dashboard;
713 self.log_daemon_id = None;
714 self.log_content.clear();
715 self.log_scroll = 0;
716 }
717
718 pub fn stats(&self) -> (usize, usize, usize, usize, usize) {
720 let available = self.config_daemon_ids.len();
721 let total = self.daemons.len();
722 let running = self
723 .daemons
724 .iter()
725 .filter(|d| d.status.is_running())
726 .count();
727 let stopped = self
729 .daemons
730 .iter()
731 .filter(|d| d.status.is_stopped() && !self.config_daemon_ids.contains(&d.id))
732 .count();
733 let errored = self
734 .daemons
735 .iter()
736 .filter(|d| d.status.is_errored() || d.status.is_failed())
737 .count();
738 (total, running, stopped, errored, available)
739 }
740
741 pub fn is_disabled(&self, daemon_id: &str) -> bool {
742 self.disabled.contains(&daemon_id.to_string())
743 }
744
745 pub async fn start_daemon(&mut self, client: &IpcClient, daemon_id: &str) -> Result<()> {
746 let config = PitchforkToml::all_merged();
748 let daemon_config = config
749 .daemons
750 .get(daemon_id)
751 .ok_or_else(|| miette::miette!("Daemon '{}' not found in config", daemon_id))?;
752
753 let cmd = shell_words::split(&daemon_config.run)
754 .map_err(|e| miette::miette!("Failed to parse command: {}", e))?;
755
756 if cmd.is_empty() {
757 bail!("Daemon '{}' has empty run command", daemon_id);
758 }
759
760 let (cron_schedule, cron_retrigger) = daemon_config
761 .cron
762 .as_ref()
763 .map(|c| (Some(c.schedule.clone()), Some(c.retrigger)))
764 .unwrap_or((None, None));
765
766 let opts = RunOptions {
767 id: daemon_id.to_string(),
768 cmd,
769 force: false,
770 shell_pid: None,
771 dir: std::env::current_dir().unwrap_or_default(),
772 autostop: false,
773 cron_schedule,
774 cron_retrigger,
775 retry: daemon_config.retry,
776 retry_count: 0,
777 ready_delay: daemon_config.ready_delay,
778 ready_output: daemon_config.ready_output.clone(),
779 ready_http: daemon_config.ready_http.clone(),
780 ready_port: daemon_config.ready_port,
781 wait_ready: false,
782 depends: daemon_config.depends.clone(),
783 };
784
785 client.run(opts).await?;
786 self.set_message(format!("Started {}", daemon_id));
787 Ok(())
788 }
789}
790
791impl Default for App {
792 fn default() -> Self {
793 Self::new()
794 }
795}