Skip to main content

systemprompt_api/services/health/
monitor.rs

1use anyhow::Result;
2use std::time::Duration;
3use systemprompt_database::{DbPool, ServiceRepository};
4use systemprompt_scheduler::ProcessCleanup;
5use tokio::task::JoinHandle;
6use tracing::{info, warn};
7
8#[derive(Debug)]
9pub struct ProcessMonitor {
10    db_pool: DbPool,
11    monitor_handle: Option<JoinHandle<()>>,
12    check_interval: Duration,
13}
14
15impl ProcessMonitor {
16    pub const fn new(db_pool: DbPool) -> Self {
17        Self {
18            db_pool,
19            monitor_handle: None,
20            check_interval: Duration::from_secs(30),
21        }
22    }
23
24    pub const fn with_interval(db_pool: DbPool, interval: Duration) -> Self {
25        Self {
26            db_pool,
27            monitor_handle: None,
28            check_interval: interval,
29        }
30    }
31
32    pub fn start(&mut self) {
33        if self.monitor_handle.is_some() {
34            warn!("Process monitor already started");
35            return;
36        }
37
38        info!("Starting centralized process monitoring");
39
40        let db_pool_clone = std::sync::Arc::clone(&self.db_pool);
41        let interval = self.check_interval;
42
43        let handle = tokio::spawn(async move { Self::monitor_loop(db_pool_clone, interval).await });
44
45        self.monitor_handle = Some(handle);
46        info!("Centralized process monitoring started");
47    }
48
49    pub fn stop(&mut self) {
50        if let Some(handle) = self.monitor_handle.take() {
51            info!("Stopping process monitoring");
52            handle.abort();
53            info!("Process monitoring stopped");
54        }
55    }
56
57    pub const fn is_running(&self) -> bool {
58        self.monitor_handle.is_some()
59    }
60
61    async fn monitor_loop(db_pool: DbPool, check_interval: Duration) {
62        info!(
63            interval_secs = check_interval.as_secs(),
64            "Process monitor loop started"
65        );
66
67        let mut interval = tokio::time::interval(check_interval);
68
69        loop {
70            interval.tick().await;
71
72            if let Err(e) = Self::perform_monitoring_cycle(&db_pool).await {
73                warn!(error = %e, "Monitoring cycle failed");
74            }
75        }
76    }
77
78    async fn perform_monitoring_cycle(db_pool: &DbPool) -> Result<()> {
79        let repository = ServiceRepository::new(std::sync::Arc::clone(db_pool));
80        let services = repository.get_running_services_with_pid().await?;
81
82        if services.is_empty() {
83            return Ok(());
84        }
85
86        let mut healthy_count = 0;
87        let mut crashed_count = 0;
88
89        for service in services {
90            if let Some(pid) = service.pid {
91                let pid = pid as u32;
92
93                if Self::process_exists(pid) {
94                    healthy_count += 1;
95                } else {
96                    repository.mark_service_crashed(&service.name).await?;
97
98                    crashed_count += 1;
99                    warn!(
100                        module = %service.module_name,
101                        service = %service.name,
102                        pid = pid,
103                        "Detected crashed service"
104                    );
105                }
106            }
107        }
108
109        if crashed_count == 0 {
110            info!(healthy = healthy_count, "All services healthy");
111        } else {
112            warn!(
113                healthy = healthy_count,
114                crashed = crashed_count,
115                "Service health check completed with failures"
116            );
117        }
118
119        Ok(())
120    }
121
122    fn process_exists(pid: u32) -> bool {
123        ProcessCleanup::process_exists(pid)
124    }
125
126    pub async fn health_check_all(&self) -> Result<HealthSummary> {
127        info!("Running health check on all services");
128
129        let repository = ServiceRepository::new(std::sync::Arc::clone(&self.db_pool));
130        let services = repository.get_running_services_with_pid().await?;
131
132        let mut summary = HealthSummary::default();
133
134        for service in services {
135            if let Some(pid) = service.pid {
136                let pid = pid as u32;
137                let healthy = Self::process_exists(pid);
138
139                info!(
140                    module = %service.module_name,
141                    service = %service.name,
142                    pid = pid,
143                    healthy = healthy,
144                    "Service health status"
145                );
146
147                *summary
148                    .modules
149                    .entry(service.module_name)
150                    .or_insert_with(ModuleHealth::default) += if healthy {
151                    ModuleHealth {
152                        healthy: 1,
153                        crashed: 0,
154                    }
155                } else {
156                    ModuleHealth {
157                        healthy: 0,
158                        crashed: 1,
159                    }
160                };
161            }
162        }
163
164        let total_healthy = summary.modules.values().map(|m| m.healthy).sum::<u32>();
165        let total_crashed = summary.modules.values().map(|m| m.crashed).sum::<u32>();
166
167        if total_crashed == 0 {
168            info!(healthy = total_healthy, "All services are healthy");
169        } else {
170            warn!(
171                healthy = total_healthy,
172                total = total_healthy + total_crashed,
173                "Some services are unhealthy"
174            );
175        }
176
177        Ok(summary)
178    }
179}
180
181impl Drop for ProcessMonitor {
182    fn drop(&mut self) {
183        if let Some(handle) = self.monitor_handle.take() {
184            handle.abort();
185        }
186    }
187}
188
189#[derive(Debug, Default)]
190pub struct HealthSummary {
191    pub modules: std::collections::HashMap<String, ModuleHealth>,
192}
193
/// Healthy/crashed service counters for a single module.
///
/// The type is a plain pair of counters, so it is `Copy` and comparable with
/// `==`, which lets callers and tests assert on whole tallies at once.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub struct ModuleHealth {
    /// Number of services whose process was found running.
    pub healthy: u32,
    /// Number of services whose process was not found.
    pub crashed: u32,
}
199
200impl std::ops::AddAssign for ModuleHealth {
201    fn add_assign(&mut self, other: Self) {
202        self.healthy += other.healthy;
203        self.crashed += other.crashed;
204    }
205}
206
207impl HealthSummary {
208    pub fn total_healthy(&self) -> u32 {
209        self.modules.values().map(|m| m.healthy).sum()
210    }
211
212    pub fn total_crashed(&self) -> u32 {
213        self.modules.values().map(|m| m.crashed).sum()
214    }
215
216    pub fn is_all_healthy(&self) -> bool {
217        self.total_crashed() == 0
218    }
219}