systemprompt_api/services/health/
monitor.rs1use anyhow::Result;
2use std::time::Duration;
3use systemprompt_database::{DbPool, ServiceRepository};
4use systemprompt_scheduler::ProcessCleanup;
5use tokio::task::JoinHandle;
6use tracing::{info, warn};
7
8#[derive(Debug)]
9pub struct ProcessMonitor {
10 db_pool: DbPool,
11 monitor_handle: Option<JoinHandle<()>>,
12 check_interval: Duration,
13}
14
15impl ProcessMonitor {
16 pub const fn new(db_pool: DbPool) -> Self {
17 Self {
18 db_pool,
19 monitor_handle: None,
20 check_interval: Duration::from_secs(30),
21 }
22 }
23
24 pub const fn with_interval(db_pool: DbPool, interval: Duration) -> Self {
25 Self {
26 db_pool,
27 monitor_handle: None,
28 check_interval: interval,
29 }
30 }
31
32 pub fn start(&mut self) {
33 if self.monitor_handle.is_some() {
34 warn!("Process monitor already started");
35 return;
36 }
37
38 info!("Starting centralized process monitoring");
39
40 let db_pool_clone = std::sync::Arc::clone(&self.db_pool);
41 let interval = self.check_interval;
42
43 let handle = tokio::spawn(async move { Self::monitor_loop(db_pool_clone, interval).await });
44
45 self.monitor_handle = Some(handle);
46 info!("Centralized process monitoring started");
47 }
48
49 pub fn stop(&mut self) {
50 if let Some(handle) = self.monitor_handle.take() {
51 info!("Stopping process monitoring");
52 handle.abort();
53 info!("Process monitoring stopped");
54 }
55 }
56
57 pub const fn is_running(&self) -> bool {
58 self.monitor_handle.is_some()
59 }
60
61 async fn monitor_loop(db_pool: DbPool, check_interval: Duration) {
62 info!(
63 interval_secs = check_interval.as_secs(),
64 "Process monitor loop started"
65 );
66
67 let mut interval = tokio::time::interval(check_interval);
68
69 loop {
70 interval.tick().await;
71
72 if let Err(e) = Self::perform_monitoring_cycle(&db_pool).await {
73 warn!(error = %e, "Monitoring cycle failed");
74 }
75 }
76 }
77
78 async fn perform_monitoring_cycle(db_pool: &DbPool) -> Result<()> {
79 let repository = ServiceRepository::new(std::sync::Arc::clone(db_pool));
80 let services = repository.get_running_services_with_pid().await?;
81
82 if services.is_empty() {
83 return Ok(());
84 }
85
86 let mut healthy_count = 0;
87 let mut crashed_count = 0;
88
89 for service in services {
90 if let Some(pid) = service.pid {
91 let pid = pid as u32;
92
93 if Self::process_exists(pid) {
94 healthy_count += 1;
95 } else {
96 repository.mark_service_crashed(&service.name).await?;
97
98 crashed_count += 1;
99 warn!(
100 module = %service.module_name,
101 service = %service.name,
102 pid = pid,
103 "Detected crashed service"
104 );
105 }
106 }
107 }
108
109 if crashed_count == 0 {
110 info!(healthy = healthy_count, "All services healthy");
111 } else {
112 warn!(
113 healthy = healthy_count,
114 crashed = crashed_count,
115 "Service health check completed with failures"
116 );
117 }
118
119 Ok(())
120 }
121
122 fn process_exists(pid: u32) -> bool {
123 ProcessCleanup::process_exists(pid)
124 }
125
126 pub async fn health_check_all(&self) -> Result<HealthSummary> {
127 info!("Running health check on all services");
128
129 let repository = ServiceRepository::new(std::sync::Arc::clone(&self.db_pool));
130 let services = repository.get_running_services_with_pid().await?;
131
132 let mut summary = HealthSummary::default();
133
134 for service in services {
135 if let Some(pid) = service.pid {
136 let pid = pid as u32;
137 let healthy = Self::process_exists(pid);
138
139 info!(
140 module = %service.module_name,
141 service = %service.name,
142 pid = pid,
143 healthy = healthy,
144 "Service health status"
145 );
146
147 *summary
148 .modules
149 .entry(service.module_name)
150 .or_insert_with(ModuleHealth::default) += if healthy {
151 ModuleHealth {
152 healthy: 1,
153 crashed: 0,
154 }
155 } else {
156 ModuleHealth {
157 healthy: 0,
158 crashed: 1,
159 }
160 };
161 }
162 }
163
164 let total_healthy = summary.modules.values().map(|m| m.healthy).sum::<u32>();
165 let total_crashed = summary.modules.values().map(|m| m.crashed).sum::<u32>();
166
167 if total_crashed == 0 {
168 info!(healthy = total_healthy, "All services are healthy");
169 } else {
170 warn!(
171 healthy = total_healthy,
172 total = total_healthy + total_crashed,
173 "Some services are unhealthy"
174 );
175 }
176
177 Ok(summary)
178 }
179}
180
181impl Drop for ProcessMonitor {
182 fn drop(&mut self) {
183 if let Some(handle) = self.monitor_handle.take() {
184 handle.abort();
185 }
186 }
187}
188
189#[derive(Debug, Default)]
190pub struct HealthSummary {
191 pub modules: std::collections::HashMap<String, ModuleHealth>,
192}
193
/// Number of healthy and crashed services belonging to one module.
#[derive(Clone, Copy, Debug, Default)]
pub struct ModuleHealth {
    /// Services whose process was found alive.
    pub healthy: u32,
    /// Services whose process was not found.
    pub crashed: u32,
}
199
200impl std::ops::AddAssign for ModuleHealth {
201 fn add_assign(&mut self, other: Self) {
202 self.healthy += other.healthy;
203 self.crashed += other.crashed;
204 }
205}
206
207impl HealthSummary {
208 pub fn total_healthy(&self) -> u32 {
209 self.modules.values().map(|m| m.healthy).sum()
210 }
211
212 pub fn total_crashed(&self) -> u32 {
213 self.modules.values().map(|m| m.crashed).sum()
214 }
215
216 pub fn is_all_healthy(&self) -> bool {
217 self.total_crashed() == 0
218 }
219}