Skip to main content

systemprompt_api/services/health/
monitor.rs

1//! Background process monitor for managed services.
2//!
3//! [`ProcessMonitor`] spawns a polling loop that checks each running service's
4//! PID against the live process table, marks vanished processes as crashed, and
5//! reports aggregate state via [`HealthSummary`] and [`ModuleHealth`].
6
7use anyhow::Result;
8use std::time::Duration;
9use systemprompt_database::{DbPool, ServiceRepository};
10use systemprompt_scheduler::ProcessCleanup;
11use tokio::task::JoinHandle;
12use tracing::{info, warn};
13
/// Owns the background task that periodically checks each running service's PID
/// and records services whose process has disappeared as crashed.
#[derive(Debug)]
pub struct ProcessMonitor {
    /// Pool used to load running services and to mark crashed ones.
    db_pool: DbPool,
    /// Handle to the spawned polling task; `None` until `start` is called and again after `stop`.
    monitor_handle: Option<JoinHandle<()>>,
    /// Delay between consecutive polling cycles.
    check_interval: Duration,
}
20
21impl ProcessMonitor {
22    pub const fn new(db_pool: DbPool) -> Self {
23        Self {
24            db_pool,
25            monitor_handle: None,
26            check_interval: Duration::from_secs(30),
27        }
28    }
29
30    pub const fn with_interval(db_pool: DbPool, interval: Duration) -> Self {
31        Self {
32            db_pool,
33            monitor_handle: None,
34            check_interval: interval,
35        }
36    }
37
38    pub fn start(&mut self) {
39        if self.monitor_handle.is_some() {
40            warn!("Process monitor already started");
41            return;
42        }
43
44        info!("Starting centralized process monitoring");
45
46        let db_pool_clone = std::sync::Arc::clone(&self.db_pool);
47        let interval = self.check_interval;
48
49        let handle = tokio::spawn(async move { Self::monitor_loop(db_pool_clone, interval).await });
50
51        self.monitor_handle = Some(handle);
52        info!("Centralized process monitoring started");
53    }
54
55    pub fn stop(&mut self) {
56        if let Some(handle) = self.monitor_handle.take() {
57            info!("Stopping process monitoring");
58            handle.abort();
59            info!("Process monitoring stopped");
60        }
61    }
62
63    pub const fn is_running(&self) -> bool {
64        self.monitor_handle.is_some()
65    }
66
67    async fn monitor_loop(db_pool: DbPool, check_interval: Duration) {
68        info!(
69            interval_secs = check_interval.as_secs(),
70            "Process monitor loop started"
71        );
72
73        let mut interval = tokio::time::interval(check_interval);
74
75        loop {
76            interval.tick().await;
77
78            if let Err(e) = Self::perform_monitoring_cycle(&db_pool).await {
79                warn!(error = %e, "Monitoring cycle failed");
80            }
81        }
82    }
83
84    async fn perform_monitoring_cycle(db_pool: &DbPool) -> Result<()> {
85        let repository = ServiceRepository::new(db_pool)?;
86        let services = repository.get_running_services_with_pid().await?;
87
88        if services.is_empty() {
89            return Ok(());
90        }
91
92        let mut healthy_count = 0;
93        let mut crashed_count = 0;
94
95        for service in services {
96            if let Some(pid) = service.pid {
97                let pid = pid as u32;
98
99                if Self::process_exists(pid) {
100                    healthy_count += 1;
101                } else {
102                    repository.mark_service_crashed(&service.name).await?;
103
104                    crashed_count += 1;
105                    warn!(
106                        module = %service.module_name,
107                        service = %service.name,
108                        pid = pid,
109                        "Detected crashed service"
110                    );
111                }
112            }
113        }
114
115        if crashed_count == 0 {
116            info!(healthy = healthy_count, "All services healthy");
117        } else {
118            warn!(
119                healthy = healthy_count,
120                crashed = crashed_count,
121                "Service health check completed with failures"
122            );
123        }
124
125        Ok(())
126    }
127
128    fn process_exists(pid: u32) -> bool {
129        ProcessCleanup::process_exists(pid)
130    }
131
132    pub async fn health_check_all(&self) -> Result<HealthSummary> {
133        info!("Running health check on all services");
134
135        let repository = ServiceRepository::new(&self.db_pool)?;
136        let services = repository.get_running_services_with_pid().await?;
137
138        let mut summary = HealthSummary::default();
139
140        for service in services {
141            if let Some(pid) = service.pid {
142                let pid = pid as u32;
143                let healthy = Self::process_exists(pid);
144
145                info!(
146                    module = %service.module_name,
147                    service = %service.name,
148                    pid = pid,
149                    healthy = healthy,
150                    "Service health status"
151                );
152
153                *summary
154                    .modules
155                    .entry(service.module_name)
156                    .or_insert_with(ModuleHealth::default) += if healthy {
157                    ModuleHealth {
158                        healthy: 1,
159                        crashed: 0,
160                    }
161                } else {
162                    ModuleHealth {
163                        healthy: 0,
164                        crashed: 1,
165                    }
166                };
167            }
168        }
169
170        let total_healthy = summary.modules.values().map(|m| m.healthy).sum::<u32>();
171        let total_crashed = summary.modules.values().map(|m| m.crashed).sum::<u32>();
172
173        if total_crashed == 0 {
174            info!(healthy = total_healthy, "All services are healthy");
175        } else {
176            warn!(
177                healthy = total_healthy,
178                total = total_healthy + total_crashed,
179                "Some services are unhealthy"
180            );
181        }
182
183        Ok(summary)
184    }
185}
186
187impl Drop for ProcessMonitor {
188    fn drop(&mut self) {
189        if let Some(handle) = self.monitor_handle.take() {
190            handle.abort();
191        }
192    }
193}
194
/// Result of a full health check: healthy/crashed service counts grouped by module.
#[derive(Debug, Default)]
pub struct HealthSummary {
    /// Per-module counts, keyed by the services' module name.
    pub modules: std::collections::HashMap<String, ModuleHealth>,
}
199
/// Healthy/crashed service tally for a single module.
///
/// Equality is derived so tallies can be compared directly (e.g. with `assert_eq!`).
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub struct ModuleHealth {
    /// Number of services whose process was found alive.
    pub healthy: u32,
    /// Number of services whose process was not found.
    pub crashed: u32,
}
205
206impl std::ops::AddAssign for ModuleHealth {
207    fn add_assign(&mut self, other: Self) {
208        self.healthy += other.healthy;
209        self.crashed += other.crashed;
210    }
211}
212
213impl HealthSummary {
214    pub fn total_healthy(&self) -> u32 {
215        self.modules.values().map(|m| m.healthy).sum()
216    }
217
218    pub fn total_crashed(&self) -> u32 {
219        self.modules.values().map(|m| m.crashed).sum()
220    }
221
222    pub fn is_all_healthy(&self) -> bool {
223        self.total_crashed() == 0
224    }
225}