1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use forge_core::CircuitBreakerClient;
8use forge_core::Result;
9use forge_core::daemon::{DaemonContext, DaemonStatus};
10use futures_util::FutureExt;
11use sqlx::PgPool;
12use tokio::sync::{broadcast, watch};
13use tracing::{error, info, warn};
14use uuid::Uuid;
15
16use super::registry::DaemonRegistry;
17
/// Tuning knobs for [`DaemonRunner`].
///
/// NOTE(review): neither interval is read by `DaemonRunner` in this module;
/// they are carried as configuration only — confirm before relying on them.
#[derive(Clone, Debug)]
pub struct DaemonRunnerConfig {
    /// Intended interval between daemon health checks (default: 30 seconds).
    pub health_check_interval: Duration,
    /// Intended interval between daemon heartbeats (default: 10 seconds).
    pub heartbeat_interval: Duration,
}
26
27impl Default for DaemonRunnerConfig {
28 fn default() -> Self {
29 Self {
30 health_check_interval: Duration::from_secs(30),
31 heartbeat_interval: Duration::from_secs(10),
32 }
33 }
34}
35
36pub struct DaemonRunner {
38 registry: Arc<DaemonRegistry>,
39 pool: PgPool,
40 http_client: CircuitBreakerClient,
41 node_id: Uuid,
42 config: DaemonRunnerConfig,
43 shutdown_rx: broadcast::Receiver<()>,
44}
45
46impl DaemonRunner {
47 pub fn new(
49 registry: Arc<DaemonRegistry>,
50 pool: PgPool,
51 http_client: CircuitBreakerClient,
52 node_id: Uuid,
53 shutdown_rx: broadcast::Receiver<()>,
54 ) -> Self {
55 Self {
56 registry,
57 pool,
58 http_client,
59 node_id,
60 config: DaemonRunnerConfig::default(),
61 shutdown_rx,
62 }
63 }
64
65 pub fn with_config(mut self, config: DaemonRunnerConfig) -> Self {
67 self.config = config;
68 self
69 }
70
71 pub async fn run(mut self) -> Result<()> {
73 if self.registry.is_empty() {
74 info!("No daemons registered, daemon runner idle");
75 let _ = self.shutdown_rx.recv().await;
77 return Ok(());
78 }
79
80 info!(
81 "Starting daemon runner with {} daemons",
82 self.registry.len()
83 );
84
85 let mut daemon_handles: HashMap<String, DaemonHandle> = HashMap::new();
87
88 for (name, entry) in self.registry.daemons() {
90 let info = &entry.info;
91
92 let (shutdown_tx, shutdown_rx) = watch::channel(false);
94
95 let handle = DaemonHandle {
96 name: name.to_string(),
97 instance_id: Uuid::new_v4(),
98 shutdown_tx,
99 restarts: 0,
100 status: DaemonStatus::Pending,
101 };
102
103 if let Err(e) = self.record_daemon_start(&handle).await {
105 error!(daemon = name, error = %e, "Failed to record daemon start");
106 }
107
108 let daemon_entry = entry.clone();
110 let pool = self.pool.clone();
111 let http_client = self.http_client.clone();
112 let daemon_name = name.to_string();
113 let startup_delay = info.startup_delay;
114 let restart_on_panic = info.restart_on_panic;
115 let restart_delay = info.restart_delay;
116 let max_restarts = info.max_restarts;
117 let leader_elected = info.leader_elected;
118 let node_id = self.node_id;
119
120 tokio::spawn(async move {
121 run_daemon_loop(
122 daemon_name,
123 daemon_entry,
124 pool,
125 http_client,
126 shutdown_rx,
127 node_id,
128 startup_delay,
129 restart_on_panic,
130 restart_delay,
131 max_restarts,
132 leader_elected,
133 )
134 .await
135 });
136
137 daemon_handles.insert(name.to_string(), handle);
138 }
139
140 let _ = self.shutdown_rx.recv().await;
142 info!("Daemon runner received shutdown signal");
143
144 for (name, handle) in &daemon_handles {
146 info!(daemon = name, "Signaling daemon to stop");
147 let _ = handle.shutdown_tx.send(true);
148 }
149
150 tokio::time::sleep(Duration::from_secs(2)).await;
152
153 for (name, handle) in &daemon_handles {
155 if let Err(e) = self.record_daemon_stop(handle).await {
156 warn!(daemon = name, error = %e, "Failed to record daemon stop");
157 }
158 }
159
160 Ok(())
161 }
162
163 async fn record_daemon_start(&self, handle: &DaemonHandle) -> Result<()> {
164 sqlx::query(
165 r#"
166 INSERT INTO forge_daemons (name, node_id, instance_id, status, restarts, started_at, last_heartbeat)
167 VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
168 ON CONFLICT (name) DO UPDATE SET
169 node_id = EXCLUDED.node_id,
170 instance_id = EXCLUDED.instance_id,
171 status = EXCLUDED.status,
172 restarts = EXCLUDED.restarts,
173 started_at = NOW(),
174 last_heartbeat = NOW(),
175 last_error = NULL
176 "#,
177 )
178 .bind(&handle.name)
179 .bind(self.node_id)
180 .bind(handle.instance_id)
181 .bind(handle.status.as_str())
182 .bind(handle.restarts as i32)
183 .execute(&self.pool)
184 .await
185 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
186
187 Ok(())
188 }
189
190 async fn record_daemon_stop(&self, handle: &DaemonHandle) -> Result<()> {
191 sqlx::query(
192 r#"
193 UPDATE forge_daemons
194 SET status = 'stopped', last_heartbeat = NOW()
195 WHERE name = $1 AND instance_id = $2
196 "#,
197 )
198 .bind(&handle.name)
199 .bind(handle.instance_id)
200 .execute(&self.pool)
201 .await
202 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
203
204 Ok(())
205 }
206}
207
208struct DaemonHandle {
209 name: String,
210 instance_id: Uuid,
211 shutdown_tx: watch::Sender<bool>,
212 restarts: u32,
213 status: DaemonStatus,
214}
215
216#[allow(clippy::too_many_arguments)]
217async fn run_daemon_loop(
218 name: String,
219 entry: Arc<super::registry::DaemonEntry>,
220 pool: PgPool,
221 http_client: CircuitBreakerClient,
222 mut shutdown_rx: watch::Receiver<bool>,
223 node_id: Uuid,
224 startup_delay: Duration,
225 restart_on_panic: bool,
226 restart_delay: Duration,
227 max_restarts: Option<u32>,
228 leader_elected: bool,
229) {
230 let mut restarts = 0u32;
231
232 if !startup_delay.is_zero() {
234 info!(daemon = %name, delay = ?startup_delay, "Waiting startup delay");
235 tokio::select! {
236 _ = tokio::time::sleep(startup_delay) => {}
237 _ = shutdown_rx.changed() => {
238 info!(daemon = %name, "Shutdown during startup delay");
239 return;
240 }
241 }
242 }
243
244 loop {
245 if *shutdown_rx.borrow() {
247 info!(daemon = %name, "Daemon shutting down");
248 break;
249 }
250
251 if leader_elected {
253 match try_acquire_leadership(&pool, &name, node_id).await {
254 Ok(true) => {
255 info!(daemon = %name, "Acquired leadership");
256 }
257 Ok(false) => {
258 tokio::select! {
260 _ = tokio::time::sleep(Duration::from_secs(5)) => {}
261 _ = shutdown_rx.changed() => {
262 info!(daemon = %name, "Shutdown while waiting for leadership");
263 return;
264 }
265 }
266 continue;
267 }
268 Err(e) => {
269 warn!(daemon = %name, error = %e, "Failed to check leadership");
270 tokio::time::sleep(Duration::from_secs(1)).await;
271 continue;
272 }
273 }
274 }
275
276 if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Running).await {
278 warn!(daemon = %name, error = %e, "Failed to update status");
279 }
280
281 let instance_id = Uuid::new_v4();
282 info!(daemon = %name, instance = %instance_id, "Starting daemon");
283
284 let (daemon_shutdown_tx, daemon_shutdown_rx) = watch::channel(false);
286
287 let shutdown_rx_clone = shutdown_rx.clone();
289 let shutdown_tx_clone = daemon_shutdown_tx.clone();
290 tokio::spawn(async move {
291 let mut rx = shutdown_rx_clone;
292 while rx.changed().await.is_ok() {
293 if *rx.borrow() {
294 let _ = shutdown_tx_clone.send(true);
295 break;
296 }
297 }
298 });
299
300 let ctx = DaemonContext::new(
301 name.clone(),
302 instance_id,
303 pool.clone(),
304 http_client.inner().clone(),
305 daemon_shutdown_rx,
306 );
307
308 let result = std::panic::AssertUnwindSafe((entry.handler)(&ctx))
310 .catch_unwind()
311 .await;
312
313 match result {
314 Ok(Ok(())) => {
315 info!(daemon = %name, "Daemon completed successfully");
316 if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Stopped).await {
317 warn!(daemon = %name, error = %e, "Failed to update status");
318 }
319 break;
320 }
321 Ok(Err(e)) => {
322 error!(daemon = %name, error = %e, "Daemon failed with error");
323 if let Err(e) = record_daemon_error(&pool, &name, &e.to_string()).await {
324 warn!(daemon = %name, error = %e, "Failed to record error");
325 }
326 }
327 Err(_) => {
328 error!(daemon = %name, "Daemon panicked");
329 if let Err(e) = record_daemon_error(&pool, &name, "Daemon panicked").await {
330 warn!(daemon = %name, error = %e, "Failed to record panic");
331 }
332 }
333 }
334
335 if *shutdown_rx.borrow() {
337 info!(daemon = %name, "Daemon shutting down after failure");
338 break;
339 }
340
341 if !restart_on_panic {
343 warn!(daemon = %name, "Restart disabled, daemon stopping");
344 if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Failed).await {
345 warn!(daemon = %name, error = %e, "Failed to update status");
346 }
347 break;
348 }
349
350 restarts += 1;
351
352 if let Some(max) = max_restarts {
354 if restarts >= max {
355 error!(daemon = %name, restarts = restarts, max = max, "Max restarts exceeded");
356 if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Failed).await {
357 warn!(daemon = %name, error = %e, "Failed to update status");
358 }
359 break;
360 }
361 }
362
363 if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Restarting).await {
365 warn!(daemon = %name, error = %e, "Failed to update status");
366 }
367
368 info!(daemon = %name, restarts = restarts, delay = ?restart_delay, "Restarting daemon");
369
370 tokio::select! {
372 _ = tokio::time::sleep(restart_delay) => {}
373 _ = shutdown_rx.changed() => {
374 info!(daemon = %name, "Shutdown during restart delay");
375 break;
376 }
377 }
378 }
379
380 if leader_elected {
382 let _ = release_leadership(&pool, &name, node_id).await;
383 }
384}
385
386async fn try_acquire_leadership(pool: &PgPool, daemon_name: &str, node_id: Uuid) -> Result<bool> {
387 let lock_id = daemon_name
390 .bytes()
391 .fold(0i64, |acc, b| acc.wrapping_add(b as i64).wrapping_mul(31));
392
393 let result: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)")
394 .bind(lock_id)
395 .fetch_one(pool)
396 .await
397 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
398
399 if result.0 {
400 sqlx::query("UPDATE forge_daemons SET node_id = $1 WHERE name = $2")
402 .bind(node_id)
403 .bind(daemon_name)
404 .execute(pool)
405 .await
406 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
407 }
408
409 Ok(result.0)
410}
411
412async fn release_leadership(pool: &PgPool, daemon_name: &str, _node_id: Uuid) -> Result<()> {
413 let lock_id = daemon_name
414 .bytes()
415 .fold(0i64, |acc, b| acc.wrapping_add(b as i64).wrapping_mul(31));
416
417 sqlx::query("SELECT pg_advisory_unlock($1)")
418 .bind(lock_id)
419 .execute(pool)
420 .await
421 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
422
423 Ok(())
424}
425
426async fn update_daemon_status(pool: &PgPool, name: &str, status: DaemonStatus) -> Result<()> {
427 sqlx::query("UPDATE forge_daemons SET status = $1, last_heartbeat = NOW() WHERE name = $2")
428 .bind(status.as_str())
429 .bind(name)
430 .execute(pool)
431 .await
432 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
433
434 Ok(())
435}
436
437async fn record_daemon_error(pool: &PgPool, name: &str, error: &str) -> Result<()> {
438 sqlx::query(
439 "UPDATE forge_daemons SET status = 'failed', last_error = $1, last_heartbeat = NOW() WHERE name = $2",
440 )
441 .bind(error)
442 .bind(name)
443 .execute(pool)
444 .await
445 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
446
447 Ok(())
448}
449
#[cfg(test)]
mod tests {
    use super::*;

    /// The default runner config uses a 30s health-check and a 10s heartbeat interval.
    #[test]
    fn test_default_config() {
        let DaemonRunnerConfig {
            health_check_interval,
            heartbeat_interval,
        } = DaemonRunnerConfig::default();

        assert_eq!(health_check_interval, Duration::from_secs(30));
        assert_eq!(heartbeat_interval, Duration::from_secs(10));
    }
}