// systemprompt_api/services/health/monitor.rs
use anyhow::Result;
8use std::time::Duration;
9use systemprompt_database::{DbPool, ServiceRepository};
10use systemprompt_scheduler::ProcessCleanup;
11use tokio::task::JoinHandle;
12use tracing::{info, warn};
13
14#[derive(Debug)]
15pub struct ProcessMonitor {
16 db_pool: DbPool,
17 monitor_handle: Option<JoinHandle<()>>,
18 check_interval: Duration,
19}
20
21impl ProcessMonitor {
22 pub const fn new(db_pool: DbPool) -> Self {
23 Self {
24 db_pool,
25 monitor_handle: None,
26 check_interval: Duration::from_secs(30),
27 }
28 }
29
30 pub const fn with_interval(db_pool: DbPool, interval: Duration) -> Self {
31 Self {
32 db_pool,
33 monitor_handle: None,
34 check_interval: interval,
35 }
36 }
37
38 pub fn start(&mut self) {
39 if self.monitor_handle.is_some() {
40 warn!("Process monitor already started");
41 return;
42 }
43
44 info!("Starting centralized process monitoring");
45
46 let db_pool_clone = std::sync::Arc::clone(&self.db_pool);
47 let interval = self.check_interval;
48
49 let handle = tokio::spawn(async move { Self::monitor_loop(db_pool_clone, interval).await });
50
51 self.monitor_handle = Some(handle);
52 info!("Centralized process monitoring started");
53 }
54
55 pub fn stop(&mut self) {
56 if let Some(handle) = self.monitor_handle.take() {
57 info!("Stopping process monitoring");
58 handle.abort();
59 info!("Process monitoring stopped");
60 }
61 }
62
63 pub const fn is_running(&self) -> bool {
64 self.monitor_handle.is_some()
65 }
66
67 async fn monitor_loop(db_pool: DbPool, check_interval: Duration) {
68 info!(
69 interval_secs = check_interval.as_secs(),
70 "Process monitor loop started"
71 );
72
73 let mut interval = tokio::time::interval(check_interval);
74
75 loop {
76 interval.tick().await;
77
78 if let Err(e) = Self::perform_monitoring_cycle(&db_pool).await {
79 warn!(error = %e, "Monitoring cycle failed");
80 }
81 }
82 }
83
84 async fn perform_monitoring_cycle(db_pool: &DbPool) -> Result<()> {
85 let repository = ServiceRepository::new(db_pool)?;
86 let services = repository.get_running_services_with_pid().await?;
87
88 if services.is_empty() {
89 return Ok(());
90 }
91
92 let mut healthy_count = 0;
93 let mut crashed_count = 0;
94
95 for service in services {
96 if let Some(pid) = service.pid {
97 let pid = pid as u32;
98
99 if Self::process_exists(pid) {
100 healthy_count += 1;
101 } else {
102 repository.mark_service_crashed(&service.name).await?;
103
104 crashed_count += 1;
105 warn!(
106 module = %service.module_name,
107 service = %service.name,
108 pid = pid,
109 "Detected crashed service"
110 );
111 }
112 }
113 }
114
115 if crashed_count == 0 {
116 info!(healthy = healthy_count, "All services healthy");
117 } else {
118 warn!(
119 healthy = healthy_count,
120 crashed = crashed_count,
121 "Service health check completed with failures"
122 );
123 }
124
125 Ok(())
126 }
127
128 fn process_exists(pid: u32) -> bool {
129 ProcessCleanup::process_exists(pid)
130 }
131
132 pub async fn health_check_all(&self) -> Result<HealthSummary> {
133 info!("Running health check on all services");
134
135 let repository = ServiceRepository::new(&self.db_pool)?;
136 let services = repository.get_running_services_with_pid().await?;
137
138 let mut summary = HealthSummary::default();
139
140 for service in services {
141 if let Some(pid) = service.pid {
142 let pid = pid as u32;
143 let healthy = Self::process_exists(pid);
144
145 info!(
146 module = %service.module_name,
147 service = %service.name,
148 pid = pid,
149 healthy = healthy,
150 "Service health status"
151 );
152
153 *summary
154 .modules
155 .entry(service.module_name)
156 .or_insert_with(ModuleHealth::default) += if healthy {
157 ModuleHealth {
158 healthy: 1,
159 crashed: 0,
160 }
161 } else {
162 ModuleHealth {
163 healthy: 0,
164 crashed: 1,
165 }
166 };
167 }
168 }
169
170 let total_healthy = summary.modules.values().map(|m| m.healthy).sum::<u32>();
171 let total_crashed = summary.modules.values().map(|m| m.crashed).sum::<u32>();
172
173 if total_crashed == 0 {
174 info!(healthy = total_healthy, "All services are healthy");
175 } else {
176 warn!(
177 healthy = total_healthy,
178 total = total_healthy + total_crashed,
179 "Some services are unhealthy"
180 );
181 }
182
183 Ok(summary)
184 }
185}
186
187impl Drop for ProcessMonitor {
188 fn drop(&mut self) {
189 if let Some(handle) = self.monitor_handle.take() {
190 handle.abort();
191 }
192 }
193}
194
195#[derive(Debug, Default)]
196pub struct HealthSummary {
197 pub modules: std::collections::HashMap<String, ModuleHealth>,
198}
199
/// Healthy and crashed service counts for a single module.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub struct ModuleHealth {
    /// Number of services whose process was found alive.
    pub healthy: u32,
    /// Number of services whose process was not found.
    pub crashed: u32,
}

impl std::ops::AddAssign for ModuleHealth {
    /// Adds the counters of `other` field by field.
    ///
    /// Each counter saturates at `u32::MAX` instead of panicking (debug
    /// builds) or wrapping to a small number (release builds) on overflow.
    fn add_assign(&mut self, other: Self) {
        self.healthy = self.healthy.saturating_add(other.healthy);
        self.crashed = self.crashed.saturating_add(other.crashed);
    }
}
212
213impl HealthSummary {
214 pub fn total_healthy(&self) -> u32 {
215 self.modules.values().map(|m| m.healthy).sum()
216 }
217
218 pub fn total_crashed(&self) -> u32 {
219 self.modules.values().map(|m| m.crashed).sum()
220 }
221
222 pub fn is_all_healthy(&self) -> bool {
223 self.total_crashed() == 0
224 }
225}