forge_runtime/daemon/runner.rs

//! Daemon runner with restart logic and leader election.
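//!
//! Each registered daemon runs in its own Tokio task: the runner applies the
//! configured startup delay, gates leader-elected daemons on a Postgres
//! advisory lock, captures panics, and restarts failed daemons according to
//! their restart policy, recording lifecycle state in the `forge_daemons`
//! table throughout.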

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use forge_core::CircuitBreakerClient;
use forge_core::Result;
use forge_core::daemon::{DaemonContext, DaemonStatus};
use futures_util::FutureExt;
use sqlx::PgPool;
use tokio::sync::{broadcast, watch};
use tracing::{Instrument, Span, field};
use uuid::Uuid;

use super::registry::DaemonRegistry;

/// Configuration for the daemon runner.
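///
/// A minimal override sketch; the doctest is marked `ignore` because the
/// type path depends on how this module is re-exported:
/// ```ignore
/// use std::time::Duration;
///
/// let config = DaemonRunnerConfig {
///     health_check_interval: Duration::from_secs(60),
///     heartbeat_interval: Duration::from_secs(5),
/// };
/// ```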
#[derive(Debug, Clone)]
pub struct DaemonRunnerConfig {
    /// How often to check daemon health.
    pub health_check_interval: Duration,
    /// How often to send heartbeats.
    pub heartbeat_interval: Duration,
}

impl Default for DaemonRunnerConfig {
    fn default() -> Self {
        Self {
            health_check_interval: Duration::from_secs(30),
            heartbeat_interval: Duration::from_secs(10),
        }
    }
}

/// Manages running all registered daemons.
pub struct DaemonRunner {
    registry: Arc<DaemonRegistry>,
    pool: PgPool,
    http_client: CircuitBreakerClient,
    node_id: Uuid,
    config: DaemonRunnerConfig,
    shutdown_rx: broadcast::Receiver<()>,
}

impl DaemonRunner {
    /// Create a new daemon runner.
    pub fn new(
        registry: Arc<DaemonRegistry>,
        pool: PgPool,
        http_client: CircuitBreakerClient,
        node_id: Uuid,
        shutdown_rx: broadcast::Receiver<()>,
    ) -> Self {
        Self {
            registry,
            pool,
            http_client,
            node_id,
            config: DaemonRunnerConfig::default(),
            shutdown_rx,
        }
    }

    /// Set custom configuration.
    pub fn with_config(mut self, config: DaemonRunnerConfig) -> Self {
        self.config = config;
        self
    }

    /// Run all registered daemons.
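    ///
    /// A minimal wiring sketch; `registry`, `pool`, and `http_client` are
    /// assumed to be constructed elsewhere, so the doctest is marked
    /// `ignore`:
    /// ```ignore
    /// let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
    /// let runner = DaemonRunner::new(registry, pool, http_client, Uuid::new_v4(), shutdown_rx);
    /// let handle = tokio::spawn(runner.run());
    /// // ... later, trigger a graceful shutdown:
    /// let _ = shutdown_tx.send(());
    /// handle.await??;
    /// ```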
    pub async fn run(mut self) -> Result<()> {
        let runner_span = tracing::info_span!(
            "daemon.runner",
            daemon.node_id = %self.node_id,
            daemon.count = self.registry.len(),
            daemon.uptime_ms = field::Empty,
        );
        let start_time = Instant::now();

        async {
            if self.registry.is_empty() {
                tracing::debug!("No daemons registered, daemon runner idle");
                // Wait for shutdown
                let _ = self.shutdown_rx.recv().await;
                Span::current().record("daemon.uptime_ms", start_time.elapsed().as_millis() as u64);
                return Ok(());
            }

            tracing::info!(count = self.registry.len(), "Daemon runner starting");

            // Create individual shutdown channels for each daemon
            let mut daemon_handles: HashMap<String, DaemonHandle> = HashMap::new();

            // Start each daemon
            for (name, entry) in self.registry.daemons() {
                let info = &entry.info;

                // Create shutdown channel for this daemon
                let (shutdown_tx, shutdown_rx) = watch::channel(false);

                let handle = DaemonHandle {
                    name: name.to_string(),
                    instance_id: Uuid::new_v4(),
                    shutdown_tx,
                    restarts: 0,
                    status: DaemonStatus::Pending,
                };

                // Record daemon in database
                if let Err(e) = self.record_daemon_start(&handle).await {
                    tracing::debug!(daemon = name, error = %e, "Failed to record daemon start");
                }

                tracing::info!(
                    daemon.name = name,
                    daemon.instance_id = %handle.instance_id,
                    daemon.leader_elected = info.leader_elected,
                    "Starting daemon"
                );

                // Spawn daemon task
                let daemon_entry = entry.clone();
                let pool = self.pool.clone();
                let http_client = self.http_client.clone();
                let daemon_name = name.to_string();
                let startup_delay = info.startup_delay;
                let restart_on_panic = info.restart_on_panic;
                let restart_delay = info.restart_delay;
                let max_restarts = info.max_restarts;
                let leader_elected = info.leader_elected;
                let node_id = self.node_id;

                tokio::spawn(async move {
                    run_daemon_loop(
                        daemon_name,
                        daemon_entry,
                        pool,
                        http_client,
                        shutdown_rx,
                        node_id,
                        startup_delay,
                        restart_on_panic,
                        restart_delay,
                        max_restarts,
                        leader_elected,
                    )
                    .await
                });

                daemon_handles.insert(name.to_string(), handle);
            }

            // Wait for shutdown signal
            let _ = self.shutdown_rx.recv().await;
            tracing::info!("Daemon runner received shutdown signal");

            // Signal all daemons to stop
            for (name, handle) in &daemon_handles {
                tracing::info!(daemon.name = name, "Signaling daemon to stop");
                let _ = handle.shutdown_tx.send(true);
            }

            // Give daemons time to clean up
            tokio::time::sleep(Duration::from_secs(2)).await;

            // Update daemon status in database
            for (name, handle) in &daemon_handles {
                if let Err(e) = self.record_daemon_stop(handle).await {
                    tracing::debug!(daemon = name, error = %e, "Failed to record daemon stop");
                }
            }

            Span::current().record("daemon.uptime_ms", start_time.elapsed().as_millis() as u64);
            tracing::info!(
                daemon.uptime_ms = start_time.elapsed().as_millis() as u64,
                "Daemon runner stopped"
            );
            Ok(())
        }
        .instrument(runner_span)
        .await
    }

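    /// Upsert this daemon's row in `forge_daemons`, claiming the name for
    /// this node and clearing any previous error.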
    async fn record_daemon_start(&self, handle: &DaemonHandle) -> Result<()> {
        sqlx::query(
            r#"
            INSERT INTO forge_daemons (name, node_id, instance_id, status, restarts, started_at, last_heartbeat)
            VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
            ON CONFLICT (name) DO UPDATE SET
                node_id = EXCLUDED.node_id,
                instance_id = EXCLUDED.instance_id,
                status = EXCLUDED.status,
                restarts = EXCLUDED.restarts,
                started_at = NOW(),
                last_heartbeat = NOW(),
                last_error = NULL
            "#,
        )
        .bind(&handle.name)
        .bind(self.node_id)
        .bind(handle.instance_id)
        .bind(handle.status.as_str())
        .bind(handle.restarts as i32)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }

    async fn record_daemon_stop(&self, handle: &DaemonHandle) -> Result<()> {
        sqlx::query(
            r#"
            UPDATE forge_daemons
            SET status = 'stopped', last_heartbeat = NOW()
            WHERE name = $1 AND instance_id = $2
            "#,
        )
        .bind(&handle.name)
        .bind(handle.instance_id)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }
}

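/// Book-keeping for one spawned daemon task: its identity, the per-daemon
/// shutdown channel, and the status last recorded in `forge_daemons`.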
struct DaemonHandle {
    name: String,
    instance_id: Uuid,
    shutdown_tx: watch::Sender<bool>,
    restarts: u32,
    status: DaemonStatus,
}

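/// Drives a single daemon instance: applies the startup delay, gates on
/// leader election when required, runs the handler with panic capture, and
/// restarts it per policy until shutdown or the restart budget is spent.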
#[allow(clippy::too_many_arguments)]
async fn run_daemon_loop(
    name: String,
    entry: Arc<super::registry::DaemonEntry>,
    pool: PgPool,
    http_client: CircuitBreakerClient,
    mut shutdown_rx: watch::Receiver<bool>,
    node_id: Uuid,
    startup_delay: Duration,
    restart_on_panic: bool,
    restart_delay: Duration,
    max_restarts: Option<u32>,
    leader_elected: bool,
) {
    let daemon_span = tracing::info_span!(
        "daemon.lifecycle",
        daemon.name = %name,
        daemon.node_id = %node_id,
        daemon.leader_elected = leader_elected,
        daemon.restart_count = field::Empty,
        daemon.uptime_ms = field::Empty,
        daemon.final_status = field::Empty,
        otel.name = %format!("daemon {}", name),
    );
    let daemon_start = Instant::now();

    async {
        let mut restarts = 0u32;

        // Apply startup delay
        if !startup_delay.is_zero() {
            tracing::debug!(delay_ms = startup_delay.as_millis() as u64, "Waiting startup delay");
            tokio::select! {
                _ = tokio::time::sleep(startup_delay) => {}
                _ = shutdown_rx.changed() => {
                    tracing::debug!("Shutdown during startup delay");
                    Span::current().record("daemon.final_status", "shutdown_during_startup");
                    return;
                }
            }
        }

        loop {
            // Check shutdown before attempting to run
            if *shutdown_rx.borrow() {
                tracing::debug!("Daemon shutting down");
                Span::current().record("daemon.final_status", "shutdown");
                break;
            }

            // Try to acquire leadership if required
            if leader_elected {
                match try_acquire_leadership(&pool, &name, node_id).await {
                    Ok(true) => {
                        tracing::info!("Acquired leadership");
                    }
                    Ok(false) => {
                        // Another node has leadership, wait and retry
                        tracing::debug!("Waiting for leadership");
                        tokio::select! {
                            _ = tokio::time::sleep(Duration::from_secs(5)) => {}
                            _ = shutdown_rx.changed() => {
                                tracing::debug!("Shutdown while waiting for leadership");
                                Span::current().record("daemon.final_status", "shutdown_waiting_leadership");
                                return;
                            }
                        }
                        continue;
                    }
                    Err(e) => {
                        tracing::debug!(error = %e, "Failed to check leadership");
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        continue;
                    }
                }
            }

            // Update status to running
            if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Running).await {
                tracing::debug!(error = %e, "Failed to update daemon status");
            }

            let instance_id = Uuid::new_v4();
            let execution_start = Instant::now();

            let exec_span = tracing::info_span!(
                "daemon.execute",
                daemon.instance_id = %instance_id,
                daemon.execution_duration_ms = field::Empty,
                daemon.status = field::Empty,
            );

            let result = async {
                tracing::info!(instance_id = %instance_id, "Daemon instance starting");

                // Create context with shutdown receiver
                let (daemon_shutdown_tx, daemon_shutdown_rx) = watch::channel(false);

                // Forward shutdown signal
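                // The outer `watch` channel outlives individual instances,
                // so each run gets a fresh channel and this bridge task
                // forwards the shared shutdown signal into it.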
                let shutdown_rx_clone = shutdown_rx.clone();
                let shutdown_tx_clone = daemon_shutdown_tx.clone();
                tokio::spawn(async move {
                    let mut rx = shutdown_rx_clone;
                    while rx.changed().await.is_ok() {
                        if *rx.borrow() {
                            let _ = shutdown_tx_clone.send(true);
                            break;
                        }
                    }
                });

                let ctx = DaemonContext::new(
                    name.clone(),
                    instance_id,
                    pool.clone(),
                    http_client.inner().clone(),
                    daemon_shutdown_rx,
                );

                // Run the daemon
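                // `catch_unwind` turns a panic in the handler into an `Err`,
                // so one misbehaving daemon cannot take down the runner.
                // `AssertUnwindSafe` is sound here because the future is
                // dropped after a panic and its state is never observed again.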
                let result = std::panic::AssertUnwindSafe((entry.handler)(&ctx))
                    .catch_unwind()
                    .await;

                let exec_duration = execution_start.elapsed().as_millis() as u64;
                Span::current().record("daemon.execution_duration_ms", exec_duration);

                result
            }
            .instrument(exec_span)
            .await;

            match result {
                Ok(Ok(())) => {
                    tracing::info!("Daemon completed gracefully");
                    Span::current().record("daemon.final_status", "completed");
                    let _ = update_daemon_status(&pool, &name, DaemonStatus::Stopped).await;
                    break;
                }
                Ok(Err(e)) => {
                    let recorded = record_daemon_error(&pool, &name, &e.to_string()).await.is_ok();
                    tracing::error!(error = %e, recorded, "Daemon failed");
                }
                Err(_) => {
                    let recorded = record_daemon_error(&pool, &name, "Daemon panicked").await.is_ok();
                    tracing::error!(recorded, "Daemon panicked");
                }
            }

            // Check shutdown again
            if *shutdown_rx.borrow() {
                tracing::debug!("Daemon shutting down after failure");
                Span::current().record("daemon.final_status", "shutdown_after_failure");
                break;
            }

            // Check restart policy
            if !restart_on_panic {
                tracing::warn!("Restart disabled, daemon stopping");
                Span::current().record("daemon.final_status", "failed_no_restart");
                let _ = update_daemon_status(&pool, &name, DaemonStatus::Failed).await;
                break;
            }

            restarts += 1;
            Span::current().record("daemon.restart_count", restarts);

            // Check max restarts
            if let Some(max) = max_restarts
                && restarts >= max
            {
                tracing::error!(restarts, max, "Max restarts exceeded");
                Span::current().record("daemon.final_status", "max_restarts_exceeded");
                let _ = update_daemon_status(&pool, &name, DaemonStatus::Failed).await;
                break;
            }

            // Update status to restarting
            let _ = update_daemon_status(&pool, &name, DaemonStatus::Restarting).await;

            tracing::warn!(
                restarts,
                restart_delay_ms = restart_delay.as_millis() as u64,
                "Restarting daemon"
            );

            // Wait before restart
            tokio::select! {
                _ = tokio::time::sleep(restart_delay) => {}
                _ = shutdown_rx.changed() => {
                    tracing::debug!("Shutdown during restart delay");
                    Span::current().record("daemon.final_status", "shutdown_during_restart");
                    break;
                }
            }
        }

        let uptime = daemon_start.elapsed().as_millis() as u64;
        Span::current().record("daemon.uptime_ms", uptime);
        Span::current().record("daemon.restart_count", restarts);

        // Release leadership if we held it
        if leader_elected {
            let _ = release_leadership(&pool, &name, node_id).await;
        }

        tracing::info!(
            uptime_ms = uptime,
            restart_count = restarts,
            "Daemon lifecycle ended"
        );
    }
    .instrument(daemon_span)
    .await
}

async fn try_acquire_leadership(pool: &PgPool, daemon_name: &str, node_id: Uuid) -> Result<bool> {
    // Use advisory lock for leader election
    // Hash the daemon name to get a consistent lock ID
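    // Note: this simple polynomial hash can collide for distinct names, and
    // Postgres advisory locks are session-scoped, so the lock is held by
    // whichever pooled connection ran the query.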
    let lock_id = daemon_name
        .bytes()
        .fold(0i64, |acc, b| acc.wrapping_add(b as i64).wrapping_mul(31));

    let result: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)")
        .bind(lock_id)
        .fetch_one(pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

    if result.0 {
        // Update daemon record with our node_id
        sqlx::query("UPDATE forge_daemons SET node_id = $1 WHERE name = $2")
            .bind(node_id)
            .bind(daemon_name)
            .execute(pool)
            .await
            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
    }

    Ok(result.0)
}

async fn release_leadership(pool: &PgPool, daemon_name: &str, _node_id: Uuid) -> Result<()> {
    let lock_id = daemon_name
        .bytes()
        .fold(0i64, |acc, b| acc.wrapping_add(b as i64).wrapping_mul(31));

    sqlx::query("SELECT pg_advisory_unlock($1)")
        .bind(lock_id)
        .execute(pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

    Ok(())
}

async fn update_daemon_status(pool: &PgPool, name: &str, status: DaemonStatus) -> Result<()> {
    sqlx::query("UPDATE forge_daemons SET status = $1, last_heartbeat = NOW() WHERE name = $2")
        .bind(status.as_str())
        .bind(name)
        .execute(pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

    Ok(())
}

async fn record_daemon_error(pool: &PgPool, name: &str, error: &str) -> Result<()> {
    sqlx::query(
        "UPDATE forge_daemons SET status = 'failed', last_error = $1, last_heartbeat = NOW() WHERE name = $2",
    )
    .bind(error)
    .bind(name)
    .execute(pool)
    .await
    .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_default_config() {
        let config = DaemonRunnerConfig::default();
        assert_eq!(config.health_check_interval, Duration::from_secs(30));
        assert_eq!(config.heartbeat_interval, Duration::from_secs(10));
    }
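
    // A sanity check of the advisory-lock ID derivation used by
    // `try_acquire_leadership` and `release_leadership`; the closure below
    // mirrors the fold expression in those functions.
    #[test]
    fn test_lock_id_is_deterministic() {
        let lock_id = |name: &str| {
            name.bytes()
                .fold(0i64, |acc, b| acc.wrapping_add(b as i64).wrapping_mul(31))
        };
        assert_eq!(lock_id("worker"), lock_id("worker"));
        assert_ne!(lock_id("worker"), lock_id("reaper"));
    }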
}