1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use forge_core::CircuitBreakerClient;
8use forge_core::Result;
9use forge_core::daemon::{DaemonContext, DaemonStatus};
10use futures_util::FutureExt;
11use sqlx::PgPool;
12use tokio::sync::{broadcast, watch};
13use tracing::{Instrument, Span, field};
14use uuid::Uuid;
15
16use super::registry::DaemonRegistry;
17
/// Tuning knobs for [`DaemonRunner`].
///
/// NOTE(review): neither interval is read anywhere in this file — confirm the
/// health-check / heartbeat loops that consume them live elsewhere.
#[derive(Debug, Clone)]
pub struct DaemonRunnerConfig {
    /// Interval between daemon health checks (default: 30s).
    pub health_check_interval: Duration,
    /// Interval between liveness heartbeats (default: 10s).
    pub heartbeat_interval: Duration,
}
26
27impl Default for DaemonRunnerConfig {
28 fn default() -> Self {
29 Self {
30 health_check_interval: Duration::from_secs(30),
31 heartbeat_interval: Duration::from_secs(10),
32 }
33 }
34}
35
/// Supervises every daemon registered on this node: spawns one supervision
/// task per daemon, records lifecycle state in the `forge_daemons` table, and
/// tears everything down when the process-wide shutdown signal fires.
pub struct DaemonRunner {
    /// Registered daemons (handlers plus their restart/leadership policy).
    registry: Arc<DaemonRegistry>,
    /// Postgres pool used for daemon bookkeeping and leader election.
    pool: PgPool,
    /// HTTP client handed to each daemon through its `DaemonContext`.
    http_client: CircuitBreakerClient,
    /// Identity of this node, written into `forge_daemons.node_id`.
    node_id: Uuid,
    /// Runner tuning knobs. NOTE(review): stored but never read in this
    /// file — confirm the consumer lives elsewhere.
    config: DaemonRunnerConfig,
    /// Fires when the whole process is shutting down.
    shutdown_rx: broadcast::Receiver<()>,
}
45
impl DaemonRunner {
    /// Creates a runner over `registry` using the default
    /// [`DaemonRunnerConfig`]; override it with [`Self::with_config`].
    pub fn new(
        registry: Arc<DaemonRegistry>,
        pool: PgPool,
        http_client: CircuitBreakerClient,
        node_id: Uuid,
        shutdown_rx: broadcast::Receiver<()>,
    ) -> Self {
        Self {
            registry,
            pool,
            http_client,
            node_id,
            config: DaemonRunnerConfig::default(),
            shutdown_rx,
        }
    }

    /// Builder-style override of the runner configuration.
    pub fn with_config(mut self, config: DaemonRunnerConfig) -> Self {
        self.config = config;
        self
    }

    /// Spawns one supervision task per registered daemon, then parks until the
    /// process-wide shutdown broadcast arrives. On shutdown it signals every
    /// daemon, waits a fixed 2-second grace period, and records each daemon as
    /// stopped. All database bookkeeping here is best-effort: failures are
    /// logged at debug level and never abort the runner.
    pub async fn run(mut self) -> Result<()> {
        let runner_span = tracing::info_span!(
            "daemon.runner",
            daemon.node_id = %self.node_id,
            daemon.count = self.registry.len(),
            daemon.uptime_ms = field::Empty,
        );
        let start_time = Instant::now();

        async {
            // Nothing registered: just park until shutdown.
            if self.registry.is_empty() {
                tracing::debug!("No daemons registered, daemon runner idle");
                let _ = self.shutdown_rx.recv().await;
                Span::current().record("daemon.uptime_ms", start_time.elapsed().as_millis() as u64);
                return Ok(());
            }

            tracing::info!(count = self.registry.len(), "Daemon runner starting");

            let mut daemon_handles: HashMap<String, DaemonHandle> = HashMap::new();

            for (name, entry) in self.registry.daemons() {
                let info = &entry.info;

                // Per-daemon shutdown channel; sending `true` asks the
                // daemon's supervision loop to stop.
                let (shutdown_tx, shutdown_rx) = watch::channel(false);

                let handle = DaemonHandle {
                    name: name.to_string(),
                    instance_id: Uuid::new_v4(),
                    shutdown_tx,
                    restarts: 0,
                    status: DaemonStatus::Pending,
                };

                // Best-effort DB upsert; a failure must not block startup.
                if let Err(e) = self.record_daemon_start(&handle).await {
                    tracing::debug!(daemon = name, error = %e, "Failed to record daemon start");
                }

                tracing::info!(
                    daemon.name = name,
                    daemon.instance_id = %handle.instance_id,
                    daemon.leader_elected = info.leader_elected,
                    "Starting daemon"
                );

                // Clone everything the spawned task needs (it must be 'static).
                let daemon_entry = entry.clone();
                let pool = self.pool.clone();
                let http_client = self.http_client.clone();
                let daemon_name = name.to_string();
                let startup_delay = info.startup_delay;
                let restart_on_panic = info.restart_on_panic;
                let restart_delay = info.restart_delay;
                let max_restarts = info.max_restarts;
                let leader_elected = info.leader_elected;
                let node_id = self.node_id;

                tokio::spawn(async move {
                    run_daemon_loop(
                        daemon_name,
                        daemon_entry,
                        pool,
                        http_client,
                        shutdown_rx,
                        node_id,
                        startup_delay,
                        restart_on_panic,
                        restart_delay,
                        max_restarts,
                        leader_elected,
                    )
                    .await
                });

                daemon_handles.insert(name.to_string(), handle);
            }

            // Block here until the process-wide shutdown broadcast fires.
            let _ = self.shutdown_rx.recv().await;
            tracing::info!("Daemon runner received shutdown signal");

            for (name, handle) in &daemon_handles {
                tracing::info!(daemon.name = name, "Signaling daemon to stop");
                let _ = handle.shutdown_tx.send(true);
            }

            // Fixed grace period for daemons to wind down. The spawned loops
            // are not joined, so stragglers may still be running when their
            // rows are marked stopped below.
            tokio::time::sleep(Duration::from_secs(2)).await;

            for (name, handle) in &daemon_handles {
                if let Err(e) = self.record_daemon_stop(handle).await {
                    tracing::debug!(daemon = name, error = %e, "Failed to record daemon stop");
                }
            }

            Span::current().record("daemon.uptime_ms", start_time.elapsed().as_millis() as u64);
            tracing::info!(
                daemon.uptime_ms = start_time.elapsed().as_millis() as u64,
                "Daemon runner stopped"
            );
            Ok(())
        }
        .instrument(runner_span)
        .await
    }

    /// Upserts this daemon's row in `forge_daemons`, claiming it for this
    /// node, resetting the start/heartbeat timestamps, and clearing any
    /// previously recorded error.
    async fn record_daemon_start(&self, handle: &DaemonHandle) -> Result<()> {
        sqlx::query(
            r#"
            INSERT INTO forge_daemons (name, node_id, instance_id, status, restarts, started_at, last_heartbeat)
            VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
            ON CONFLICT (name) DO UPDATE SET
                node_id = EXCLUDED.node_id,
                instance_id = EXCLUDED.instance_id,
                status = EXCLUDED.status,
                restarts = EXCLUDED.restarts,
                started_at = NOW(),
                last_heartbeat = NOW(),
                last_error = NULL
            "#,
        )
        .bind(&handle.name)
        .bind(self.node_id)
        .bind(handle.instance_id)
        .bind(handle.status.as_str())
        .bind(handle.restarts as i32)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }

    /// Marks this daemon stopped — but only if the row still carries the
    /// `instance_id` this runner started it with, so a row re-claimed by
    /// another instance is left alone.
    async fn record_daemon_stop(&self, handle: &DaemonHandle) -> Result<()> {
        sqlx::query(
            r#"
            UPDATE forge_daemons
            SET status = 'stopped', last_heartbeat = NOW()
            WHERE name = $1 AND instance_id = $2
            "#,
        )
        .bind(&handle.name)
        .bind(handle.instance_id)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }
}
229
/// Bookkeeping record the runner keeps for each spawned daemon task.
struct DaemonHandle {
    /// Registry name of the daemon.
    name: String,
    /// Id assigned when the runner recorded the daemon's start. Executions
    /// inside `run_daemon_loop` generate their own per-attempt ids for
    /// tracing; this one is what the `forge_daemons` row carries.
    instance_id: Uuid,
    /// Sending `true` asks the daemon's supervision loop to stop.
    shutdown_tx: watch::Sender<bool>,
    /// Restart count at spawn time (always 0 here; the live count is kept
    /// inside `run_daemon_loop`).
    restarts: u32,
    /// Status written to `forge_daemons` when the daemon is recorded.
    status: DaemonStatus,
}
237
/// Supervision loop for a single daemon: optional startup delay, optional
/// leader election, then execute-and-restart according to the daemon's
/// restart policy until shutdown, graceful completion, restarts being
/// disabled, or the restart cap being hit. The whole lifecycle runs inside a
/// `daemon.lifecycle` span; each execution attempt gets a `daemon.execute`
/// child span.
#[allow(clippy::too_many_arguments)]
async fn run_daemon_loop(
    name: String,
    entry: Arc<super::registry::DaemonEntry>,
    pool: PgPool,
    http_client: CircuitBreakerClient,
    mut shutdown_rx: watch::Receiver<bool>,
    node_id: Uuid,
    startup_delay: Duration,
    restart_on_panic: bool,
    restart_delay: Duration,
    max_restarts: Option<u32>,
    leader_elected: bool,
) {
    let daemon_span = tracing::info_span!(
        "daemon.lifecycle",
        daemon.name = %name,
        daemon.node_id = %node_id,
        daemon.leader_elected = leader_elected,
        daemon.restart_count = field::Empty,
        daemon.uptime_ms = field::Empty,
        daemon.final_status = field::Empty,
        otel.name = %format!("daemon {}", name),
    );
    let daemon_start = Instant::now();

    async {
        let mut restarts = 0u32;

        // Honor the configured startup delay, but abort early on shutdown.
        if !startup_delay.is_zero() {
            tracing::debug!(delay_ms = startup_delay.as_millis() as u64, "Waiting startup delay");
            tokio::select! {
                _ = tokio::time::sleep(startup_delay) => {}
                _ = shutdown_rx.changed() => {
                    tracing::debug!("Shutdown during startup delay");
                    Span::current().record("daemon.final_status", "shutdown_during_startup");
                    return;
                }
            }
        }

        loop {
            // Check for shutdown before (re)starting an execution attempt.
            if *shutdown_rx.borrow() {
                tracing::debug!("Daemon shutting down");
                Span::current().record("daemon.final_status", "shutdown");
                break;
            }

            // Leader-elected daemons poll for the advisory lock every 5s
            // until they win it (or shutdown arrives first).
            if leader_elected {
                match try_acquire_leadership(&pool, &name, node_id).await {
                    Ok(true) => {
                        tracing::info!("Acquired leadership");
                    }
                    Ok(false) => {
                        tracing::debug!("Waiting for leadership");
                        tokio::select! {
                            _ = tokio::time::sleep(Duration::from_secs(5)) => {}
                            _ = shutdown_rx.changed() => {
                                tracing::debug!("Shutdown while waiting for leadership");
                                Span::current().record("daemon.final_status", "shutdown_waiting_leadership");
                                return;
                            }
                        }
                        continue;
                    }
                    Err(e) => {
                        // Treat DB errors as transient: retry after 1s.
                        tracing::debug!(error = %e, "Failed to check leadership");
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        continue;
                    }
                }
            }

            // Best-effort status update; failures are logged and ignored.
            if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Running).await {
                tracing::debug!(error = %e, "Failed to update daemon status");
            }

            // Fresh id per execution attempt. NOTE(review): this id only
            // feeds spans and the DaemonContext; the `forge_daemons` row
            // keeps the id recorded by the runner at startup — confirm that
            // mismatch is intentional.
            let instance_id = Uuid::new_v4();
            let execution_start = Instant::now();

            let exec_span = tracing::info_span!(
                "daemon.execute",
                daemon.instance_id = %instance_id,
                daemon.execution_duration_ms = field::Empty,
                daemon.status = field::Empty,
            );

            let result = async {
                tracing::info!(instance_id = %instance_id, "Daemon instance starting");

                // Per-execution shutdown channel handed to the handler.
                let (daemon_shutdown_tx, daemon_shutdown_rx) = watch::channel(false);

                // Forwarder task: relays the supervisor's shutdown signal
                // into the per-execution channel. NOTE(review): one such task
                // is spawned per execution attempt and only exits on a
                // shutdown signal or sender drop — confirm these cannot
                // accumulate across many restarts.
                let shutdown_rx_clone = shutdown_rx.clone();
                let shutdown_tx_clone = daemon_shutdown_tx.clone();
                tokio::spawn(async move {
                    let mut rx = shutdown_rx_clone;
                    while rx.changed().await.is_ok() {
                        if *rx.borrow() {
                            let _ = shutdown_tx_clone.send(true);
                            break;
                        }
                    }
                });

                let ctx = DaemonContext::new(
                    name.clone(),
                    instance_id,
                    pool.clone(),
                    http_client.inner().clone(),
                    daemon_shutdown_rx,
                );

                // Catch panics so a panicking handler is restartable instead
                // of killing this supervision task.
                let result = std::panic::AssertUnwindSafe((entry.handler)(&ctx))
                    .catch_unwind()
                    .await;

                let exec_duration = execution_start.elapsed().as_millis() as u64;
                Span::current().record("daemon.execution_duration_ms", exec_duration);

                result
            }
            .instrument(exec_span)
            .await;

            match result {
                // Graceful completion ends supervision for good.
                Ok(Ok(())) => {
                    tracing::info!("Daemon completed gracefully");
                    Span::current().record("daemon.final_status", "completed");
                    if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Stopped).await {
                        tracing::debug!(daemon = %name, error = %e, "Status update failed");
                    }
                    break;
                }
                // Handler returned an error: record it, then fall through to
                // the restart policy below.
                Ok(Err(e)) => {
                    let recorded = record_daemon_error(&pool, &name, &e.to_string()).await.is_ok();
                    tracing::error!(error = %e, recorded, "Daemon failed");
                }
                // Handler panicked; the panic payload itself is discarded.
                Err(_) => {
                    let recorded = record_daemon_error(&pool, &name, "Daemon panicked").await.is_ok();
                    tracing::error!(recorded, "Daemon panicked");
                }
            }

            // If shutdown arrived while the handler was failing, stop here
            // rather than restarting.
            if *shutdown_rx.borrow() {
                tracing::debug!("Daemon shutting down after failure");
                Span::current().record("daemon.final_status", "shutdown_after_failure");
                break;
            }

            if !restart_on_panic {
                tracing::warn!("Restart disabled, daemon stopping");
                Span::current().record("daemon.final_status", "failed_no_restart");
                if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Failed).await {
                    tracing::debug!(daemon = %name, error = %e, "Status update failed");
                }
                break;
            }

            restarts += 1;
            Span::current().record("daemon.restart_count", restarts);

            // `max_restarts == None` means unlimited restarts.
            if let Some(max) = max_restarts
                && restarts >= max
            {
                tracing::error!(restarts, max, "Max restarts exceeded");
                Span::current().record("daemon.final_status", "max_restarts_exceeded");
                if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Failed).await {
                    tracing::debug!(daemon = %name, error = %e, "Status update failed");
                }
                break;
            }

            if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Restarting).await {
                tracing::debug!(daemon = %name, error = %e, "Status update failed");
            }

            tracing::warn!(
                restarts,
                restart_delay_ms = restart_delay.as_millis() as u64,
                "Restarting daemon"
            );

            // Fixed back-off before the next attempt; shutdown cuts it short.
            tokio::select! {
                _ = tokio::time::sleep(restart_delay) => {}
                _ = shutdown_rx.changed() => {
                    tracing::debug!("Shutdown during restart delay");
                    Span::current().record("daemon.final_status", "shutdown_during_restart");
                    break;
                }
            }
        }

        let uptime = daemon_start.elapsed().as_millis() as u64;
        Span::current().record("daemon.uptime_ms", uptime);
        Span::current().record("daemon.restart_count", restarts);

        // Give the advisory lock back so another node can take over.
        if leader_elected
            && let Err(e) = release_leadership(&pool, &name, node_id).await
        {
            tracing::debug!(daemon = %name, error = %e, "Failed to release leadership");
        }

        tracing::info!(
            uptime_ms = uptime,
            restart_count = restarts,
            "Daemon lifecycle ended"
        );
    }
    .instrument(daemon_span)
    .await
}
463
464async fn try_acquire_leadership(pool: &PgPool, daemon_name: &str, node_id: Uuid) -> Result<bool> {
465 let lock_id = daemon_name
468 .bytes()
469 .fold(0i64, |acc, b| acc.wrapping_add(b as i64).wrapping_mul(31));
470
471 let result: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)")
472 .bind(lock_id)
473 .fetch_one(pool)
474 .await
475 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
476
477 if result.0 {
478 sqlx::query("UPDATE forge_daemons SET node_id = $1 WHERE name = $2")
480 .bind(node_id)
481 .bind(daemon_name)
482 .execute(pool)
483 .await
484 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
485 }
486
487 Ok(result.0)
488}
489
490async fn release_leadership(pool: &PgPool, daemon_name: &str, _node_id: Uuid) -> Result<()> {
491 let lock_id = daemon_name
492 .bytes()
493 .fold(0i64, |acc, b| acc.wrapping_add(b as i64).wrapping_mul(31));
494
495 sqlx::query("SELECT pg_advisory_unlock($1)")
496 .bind(lock_id)
497 .execute(pool)
498 .await
499 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
500
501 Ok(())
502}
503
504async fn update_daemon_status(pool: &PgPool, name: &str, status: DaemonStatus) -> Result<()> {
505 sqlx::query("UPDATE forge_daemons SET status = $1, last_heartbeat = NOW() WHERE name = $2")
506 .bind(status.as_str())
507 .bind(name)
508 .execute(pool)
509 .await
510 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
511
512 Ok(())
513}
514
515async fn record_daemon_error(pool: &PgPool, name: &str, error: &str) -> Result<()> {
516 sqlx::query(
517 "UPDATE forge_daemons SET status = 'failed', last_error = $1, last_heartbeat = NOW() WHERE name = $2",
518 )
519 .bind(error)
520 .bind(name)
521 .execute(pool)
522 .await
523 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
524
525 Ok(())
526}
527
528#[cfg(test)]
529mod tests {
530 use super::*;
531
532 #[test]
533 fn test_default_config() {
534 let config = DaemonRunnerConfig::default();
535 assert_eq!(config.health_check_interval, Duration::from_secs(30));
536 assert_eq!(config.heartbeat_interval, Duration::from_secs(10));
537 }
538}