1use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use forge_core::CircuitBreakerClient;
8use forge_core::Result;
9use forge_core::daemon::{DaemonContext, DaemonStatus};
10use futures_util::FutureExt;
11use sqlx::PgPool;
12use tokio::sync::{broadcast, watch};
13use tracing::{Instrument, Span, field};
14use uuid::Uuid;
15
16use super::registry::DaemonRegistry;
17
/// Tunable intervals for the daemon runner.
#[derive(Debug, Clone)]
pub struct DaemonRunnerConfig {
    /// Interval between daemon health checks.
    pub health_check_interval: Duration,
    /// Interval between liveness heartbeats.
    pub heartbeat_interval: Duration,
}

impl Default for DaemonRunnerConfig {
    /// Defaults: 30-second health checks, 10-second heartbeats.
    fn default() -> Self {
        const HEALTH_CHECK_SECS: u64 = 30;
        const HEARTBEAT_SECS: u64 = 10;
        DaemonRunnerConfig {
            health_check_interval: Duration::from_secs(HEALTH_CHECK_SECS),
            heartbeat_interval: Duration::from_secs(HEARTBEAT_SECS),
        }
    }
}
35
/// Supervises all registered daemons on this node: spawns one lifecycle
/// task per registry entry and tears them down on the shutdown broadcast.
pub struct DaemonRunner {
    // Registry of daemon entries to run (shared, read-only here).
    registry: Arc<DaemonRegistry>,
    // Postgres pool used for the forge_daemons bookkeeping table.
    pool: PgPool,
    // HTTP client handed to each daemon's context (cloned per daemon).
    http_client: CircuitBreakerClient,
    // Identity of this node, recorded in forge_daemons and used for leadership.
    node_id: Uuid,
    // Runner intervals. NOTE(review): stored but not read anywhere in this
    // file — confirm the intervals are consumed elsewhere.
    config: DaemonRunnerConfig,
    // Process-wide shutdown signal; any received value stops the runner.
    shutdown_rx: broadcast::Receiver<()>,
}
45
impl DaemonRunner {
    /// Builds a runner over `registry` with the default
    /// [`DaemonRunnerConfig`]. `shutdown_rx` is the process-wide shutdown
    /// broadcast that tells [`DaemonRunner::run`] to stop all daemons.
    pub fn new(
        registry: Arc<DaemonRegistry>,
        pool: PgPool,
        http_client: CircuitBreakerClient,
        node_id: Uuid,
        shutdown_rx: broadcast::Receiver<()>,
    ) -> Self {
        Self {
            registry,
            pool,
            http_client,
            node_id,
            config: DaemonRunnerConfig::default(),
            shutdown_rx,
        }
    }

    /// Replaces the default config (builder-style; consumes and returns self).
    pub fn with_config(mut self, config: DaemonRunnerConfig) -> Self {
        self.config = config;
        self
    }

    /// Runs every registered daemon until the shutdown broadcast fires.
    ///
    /// For each registry entry this spawns a detached task running
    /// [`run_daemon_loop`], upserts a `forge_daemons` row, then parks on
    /// `shutdown_rx`. On shutdown it flips each daemon's watch channel,
    /// waits a fixed 2s grace period, and marks each row stopped.
    ///
    /// NOTE(review): the spawned `JoinHandle`s are dropped, so daemons are
    /// never actually joined — a daemon that ignores the watch signal keeps
    /// running past the 2s grace until the process exits. Confirm this
    /// best-effort shutdown is intended.
    pub async fn run(mut self) -> Result<()> {
        // Root span for the whole runner; uptime is recorded on exit.
        let runner_span = tracing::info_span!(
            "daemon.runner",
            daemon.node_id = %self.node_id,
            daemon.count = self.registry.len(),
            daemon.uptime_ms = field::Empty,
        );
        let start_time = Instant::now();

        async {
            // Nothing registered: just idle until shutdown so the caller's
            // lifecycle stays uniform.
            if self.registry.is_empty() {
                tracing::debug!("No daemons registered, daemon runner idle");
                let _ = self.shutdown_rx.recv().await;
                Span::current().record("daemon.uptime_ms", start_time.elapsed().as_millis() as u64);
                return Ok(());
            }

            tracing::info!(count = self.registry.len(), "Daemon runner starting");

            let mut daemon_handles: HashMap<String, DaemonHandle> = HashMap::new();

            for (name, entry) in self.registry.daemons() {
                let info = &entry.info;

                // Per-daemon stop signal; `true` means "shut down".
                let (shutdown_tx, shutdown_rx) = watch::channel(false);

                let handle = DaemonHandle {
                    name: name.to_string(),
                    instance_id: Uuid::new_v4(),
                    shutdown_tx,
                    restarts: 0,
                    status: DaemonStatus::Pending,
                };

                // Bookkeeping failures are logged at debug and otherwise
                // ignored — the daemon still starts.
                if let Err(e) = self.record_daemon_start(&handle).await {
                    tracing::debug!(daemon = name, error = %e, "Failed to record daemon start");
                }

                tracing::info!(
                    daemon.name = name,
                    daemon.instance_id = %handle.instance_id,
                    daemon.leader_elected = info.leader_elected,
                    "Starting daemon"
                );

                // Clone everything the detached lifecycle task needs, since
                // it must own its captures ('static spawn).
                let daemon_entry = entry.clone();
                let pool = self.pool.clone();
                let http_client = self.http_client.clone();
                let daemon_name = name.to_string();
                let startup_delay = info.startup_delay;
                let restart_on_panic = info.restart_on_panic;
                let restart_delay = info.restart_delay;
                let max_restarts = info.max_restarts;
                let leader_elected = info.leader_elected;
                let node_id = self.node_id;

                // Detached: the JoinHandle is intentionally dropped (see
                // method NOTE above).
                tokio::spawn(async move {
                    run_daemon_loop(
                        daemon_name,
                        daemon_entry,
                        pool,
                        http_client,
                        shutdown_rx,
                        node_id,
                        startup_delay,
                        restart_on_panic,
                        restart_delay,
                        max_restarts,
                        leader_elected,
                    )
                    .await
                });

                daemon_handles.insert(name.to_string(), handle);
            }

            // Park here until the process-wide shutdown signal arrives.
            let _ = self.shutdown_rx.recv().await;
            tracing::info!("Daemon runner received shutdown signal");

            // Fan the stop signal out to every daemon's watch channel.
            for (name, handle) in &daemon_handles {
                tracing::info!(daemon.name = name, "Signaling daemon to stop");
                let _ = handle.shutdown_tx.send(true);
            }

            // Fixed grace period for daemons to observe the signal.
            tokio::time::sleep(Duration::from_secs(2)).await;

            for (name, handle) in &daemon_handles {
                if let Err(e) = self.record_daemon_stop(handle).await {
                    tracing::debug!(daemon = name, error = %e, "Failed to record daemon stop");
                }
            }

            Span::current().record("daemon.uptime_ms", start_time.elapsed().as_millis() as u64);
            tracing::info!(
                daemon.uptime_ms = start_time.elapsed().as_millis() as u64,
                "Daemon runner stopped"
            );
            Ok(())
        }
        .instrument(runner_span)
        .await
    }

    /// Upserts this daemon's row in `forge_daemons`, resetting `started_at`,
    /// heartbeat, and clearing `last_error`. Relies on a unique constraint on
    /// `name` (see `ON CONFLICT (name)`).
    async fn record_daemon_start(&self, handle: &DaemonHandle) -> Result<()> {
        sqlx::query(
            r#"
            INSERT INTO forge_daemons (name, node_id, instance_id, status, restarts, started_at, last_heartbeat)
            VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
            ON CONFLICT (name) DO UPDATE SET
                node_id = EXCLUDED.node_id,
                instance_id = EXCLUDED.instance_id,
                status = EXCLUDED.status,
                restarts = EXCLUDED.restarts,
                started_at = NOW(),
                last_heartbeat = NOW(),
                last_error = NULL
            "#,
        )
        .bind(&handle.name)
        .bind(self.node_id)
        .bind(handle.instance_id)
        .bind(handle.status.as_str())
        .bind(handle.restarts as i32)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }

    /// Marks the daemon's row as stopped. The `instance_id` guard means the
    /// update is a no-op if another instance has since taken over the row.
    async fn record_daemon_stop(&self, handle: &DaemonHandle) -> Result<()> {
        sqlx::query(
            r#"
            UPDATE forge_daemons
            SET status = 'stopped', last_heartbeat = NOW()
            WHERE name = $1 AND instance_id = $2
            "#,
        )
        .bind(&handle.name)
        .bind(handle.instance_id)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }
}
229
/// Per-daemon bookkeeping kept by the runner while a daemon is active.
struct DaemonHandle {
    // Registry name of the daemon; also the conflict key used by the
    // forge_daemons upsert.
    name: String,
    // Unique id for this particular launch of the daemon.
    instance_id: Uuid,
    // Sending `true` tells the daemon's lifecycle loop to stop.
    shutdown_tx: watch::Sender<bool>,
    // Restart counter. NOTE(review): never incremented in this file — the
    // authoritative count lives inside run_daemon_loop; this stays 0.
    restarts: u32,
    // Status recorded to the database at start time.
    status: DaemonStatus,
}
237
/// Full lifecycle for one daemon: optional startup delay, optional leader
/// election, execute the handler with panic capture, then apply the restart
/// policy until the daemon completes, fails permanently, or shutdown fires.
///
/// All database writes here are best-effort: errors are logged (mostly at
/// debug) and never abort the loop.
#[allow(clippy::too_many_arguments)]
async fn run_daemon_loop(
    name: String,
    entry: Arc<super::registry::DaemonEntry>,
    pool: PgPool,
    http_client: CircuitBreakerClient,
    mut shutdown_rx: watch::Receiver<bool>,
    node_id: Uuid,
    startup_delay: Duration,
    restart_on_panic: bool,
    restart_delay: Duration,
    max_restarts: Option<u32>,
    leader_elected: bool,
) {
    // One span for the whole lifecycle; restart count / uptime / final
    // status are filled in as they become known.
    let daemon_span = tracing::info_span!(
        "daemon.lifecycle",
        daemon.name = %name,
        daemon.node_id = %node_id,
        daemon.leader_elected = leader_elected,
        daemon.restart_count = field::Empty,
        daemon.uptime_ms = field::Empty,
        daemon.final_status = field::Empty,
        otel.name = %format!("daemon {}", name),
    );
    let daemon_start = Instant::now();

    async {
        let mut restarts = 0u32;

        // Optional delay before first start; abandon it if shutdown arrives
        // first. Any change on the watch channel is treated as shutdown
        // (the runner only ever sends `true`).
        if !startup_delay.is_zero() {
            tracing::debug!(delay_ms = startup_delay.as_millis() as u64, "Waiting startup delay");
            tokio::select! {
                _ = tokio::time::sleep(startup_delay) => {}
                _ = shutdown_rx.changed() => {
                    tracing::debug!("Shutdown during startup delay");
                    Span::current().record("daemon.final_status", "shutdown_during_startup");
                    return;
                }
            }
        }

        loop {
            // Check for a shutdown that arrived between iterations.
            if *shutdown_rx.borrow() {
                tracing::debug!("Daemon shutting down");
                Span::current().record("daemon.final_status", "shutdown");
                break;
            }

            // Leader-elected daemons poll for the advisory lock every 5s
            // and only run while holding it.
            if leader_elected {
                match try_acquire_leadership(&pool, &name, node_id).await {
                    Ok(true) => {
                        tracing::info!("Acquired leadership");
                    }
                    Ok(false) => {
                        tracing::debug!("Waiting for leadership");
                        tokio::select! {
                            _ = tokio::time::sleep(Duration::from_secs(5)) => {}
                            _ = shutdown_rx.changed() => {
                                tracing::debug!("Shutdown while waiting for leadership");
                                Span::current().record("daemon.final_status", "shutdown_waiting_leadership");
                                return;
                            }
                        }
                        continue;
                    }
                    Err(e) => {
                        // DB error while checking: back off briefly and retry.
                        tracing::debug!(error = %e, "Failed to check leadership");
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        continue;
                    }
                }
            }

            if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Running).await {
                tracing::debug!(error = %e, "Failed to update daemon status");
            }

            // Fresh instance id per execution attempt (restarts get new ids).
            let instance_id = Uuid::new_v4();
            let execution_start = Instant::now();

            let exec_span = tracing::info_span!(
                "daemon.execute",
                daemon.instance_id = %instance_id,
                daemon.execution_duration_ms = field::Empty,
                daemon.status = field::Empty,
            );

            let result = async {
                tracing::info!(instance_id = %instance_id, "Daemon instance starting");

                // The daemon gets its own watch channel; a relay task below
                // forwards the runner's shutdown signal into it.
                let (daemon_shutdown_tx, daemon_shutdown_rx) = watch::channel(false);

                // NOTE(review): a new relay task is spawned per execution
                // attempt and only exits when the signal fires or the
                // runner's sender drops — restarts accumulate idle relay
                // tasks until shutdown. Bounded, but worth confirming.
                let shutdown_rx_clone = shutdown_rx.clone();
                let shutdown_tx_clone = daemon_shutdown_tx.clone();
                tokio::spawn(async move {
                    let mut rx = shutdown_rx_clone;
                    while rx.changed().await.is_ok() {
                        if *rx.borrow() {
                            let _ = shutdown_tx_clone.send(true);
                            break;
                        }
                    }
                });

                let ctx = DaemonContext::new(
                    name.clone(),
                    instance_id,
                    pool.clone(),
                    http_client.inner().clone(),
                    daemon_shutdown_rx,
                );

                // Run the handler with panic capture so a panicking daemon
                // becomes an `Err(_)` outcome instead of killing the task.
                let result = std::panic::AssertUnwindSafe((entry.handler)(&ctx))
                    .catch_unwind()
                    .await;

                let exec_duration = execution_start.elapsed().as_millis() as u64;
                Span::current().record("daemon.execution_duration_ms", exec_duration);

                result
            }
            .instrument(exec_span)
            .await;

            // Outcome: Ok(Ok) = graceful finish, Ok(Err) = handler error,
            // Err = panic caught by catch_unwind.
            match result {
                Ok(Ok(())) => {
                    tracing::info!("Daemon completed gracefully");
                    Span::current().record("daemon.final_status", "completed");
                    let _ = update_daemon_status(&pool, &name, DaemonStatus::Stopped).await;
                    break;
                }
                Ok(Err(e)) => {
                    let recorded = record_daemon_error(&pool, &name, &e.to_string()).await.is_ok();
                    tracing::error!(error = %e, recorded, "Daemon failed");
                }
                Err(_) => {
                    let recorded = record_daemon_error(&pool, &name, "Daemon panicked").await.is_ok();
                    tracing::error!(recorded, "Daemon panicked");
                }
            }

            // If shutdown raced the failure, don't bother restarting.
            if *shutdown_rx.borrow() {
                tracing::debug!("Daemon shutting down after failure");
                Span::current().record("daemon.final_status", "shutdown_after_failure");
                break;
            }

            if !restart_on_panic {
                tracing::warn!("Restart disabled, daemon stopping");
                Span::current().record("daemon.final_status", "failed_no_restart");
                let _ = update_daemon_status(&pool, &name, DaemonStatus::Failed).await;
                break;
            }

            restarts += 1;
            Span::current().record("daemon.restart_count", restarts);

            // With Some(max), the daemon gets at most `max` failed attempts
            // before being marked failed permanently.
            if let Some(max) = max_restarts
                && restarts >= max
            {
                tracing::error!(restarts, max, "Max restarts exceeded");
                Span::current().record("daemon.final_status", "max_restarts_exceeded");
                let _ = update_daemon_status(&pool, &name, DaemonStatus::Failed).await;
                break;
            }

            let _ = update_daemon_status(&pool, &name, DaemonStatus::Restarting).await;

            tracing::warn!(
                restarts,
                restart_delay_ms = restart_delay.as_millis() as u64,
                "Restarting daemon"
            );

            // Fixed (non-backoff) delay between restarts, abortable by shutdown.
            tokio::select! {
                _ = tokio::time::sleep(restart_delay) => {}
                _ = shutdown_rx.changed() => {
                    tracing::debug!("Shutdown during restart delay");
                    Span::current().record("daemon.final_status", "shutdown_during_restart");
                    break;
                }
            }
        }

        let uptime = daemon_start.elapsed().as_millis() as u64;
        Span::current().record("daemon.uptime_ms", uptime);
        Span::current().record("daemon.restart_count", restarts);

        // Best-effort release so another node can take over promptly.
        if leader_elected {
            let _ = release_leadership(&pool, &name, node_id).await;
        }

        tracing::info!(
            uptime_ms = uptime,
            restart_count = restarts,
            "Daemon lifecycle ended"
        );
    }
    .instrument(daemon_span)
    .await
}
453
454async fn try_acquire_leadership(pool: &PgPool, daemon_name: &str, node_id: Uuid) -> Result<bool> {
455 let lock_id = daemon_name
458 .bytes()
459 .fold(0i64, |acc, b| acc.wrapping_add(b as i64).wrapping_mul(31));
460
461 let result: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)")
462 .bind(lock_id)
463 .fetch_one(pool)
464 .await
465 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
466
467 if result.0 {
468 sqlx::query("UPDATE forge_daemons SET node_id = $1 WHERE name = $2")
470 .bind(node_id)
471 .bind(daemon_name)
472 .execute(pool)
473 .await
474 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
475 }
476
477 Ok(result.0)
478}
479
480async fn release_leadership(pool: &PgPool, daemon_name: &str, _node_id: Uuid) -> Result<()> {
481 let lock_id = daemon_name
482 .bytes()
483 .fold(0i64, |acc, b| acc.wrapping_add(b as i64).wrapping_mul(31));
484
485 sqlx::query("SELECT pg_advisory_unlock($1)")
486 .bind(lock_id)
487 .execute(pool)
488 .await
489 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
490
491 Ok(())
492}
493
494async fn update_daemon_status(pool: &PgPool, name: &str, status: DaemonStatus) -> Result<()> {
495 sqlx::query("UPDATE forge_daemons SET status = $1, last_heartbeat = NOW() WHERE name = $2")
496 .bind(status.as_str())
497 .bind(name)
498 .execute(pool)
499 .await
500 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
501
502 Ok(())
503}
504
505async fn record_daemon_error(pool: &PgPool, name: &str, error: &str) -> Result<()> {
506 sqlx::query(
507 "UPDATE forge_daemons SET status = 'failed', last_error = $1, last_heartbeat = NOW() WHERE name = $2",
508 )
509 .bind(error)
510 .bind(name)
511 .execute(pool)
512 .await
513 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
514
515 Ok(())
516}
517
#[cfg(test)]
mod tests {
    use super::*;

    /// Default config carries the documented 30s/10s intervals.
    #[test]
    fn test_default_config() {
        let DaemonRunnerConfig {
            health_check_interval,
            heartbeat_interval,
        } = DaemonRunnerConfig::default();
        assert_eq!(health_check_interval, Duration::from_secs(30));
        assert_eq!(heartbeat_interval, Duration::from_secs(10));
    }
}