// forge_runtime/daemon/runner.rs
1//! Daemon runner with restart logic and leader election.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use forge_core::CircuitBreakerClient;
8use forge_core::Result;
9use forge_core::daemon::{DaemonContext, DaemonStatus};
10use futures_util::FutureExt;
11use sqlx::PgPool;
12use tokio::sync::{broadcast, watch};
13use tracing::{error, info, warn};
14use uuid::Uuid;
15
16use super::registry::DaemonRegistry;
17
/// Configuration for the daemon runner.
#[derive(Debug, Clone)]
pub struct DaemonRunnerConfig {
    /// How often to check daemon health.
    // NOTE(review): not read anywhere in this file — presumably consumed by a
    // health-check loop elsewhere (or planned); confirm before relying on it.
    pub health_check_interval: Duration,
    /// How often to send heartbeats.
    // NOTE(review): likewise unused within this file as of this revision.
    pub heartbeat_interval: Duration,
}
26
27impl Default for DaemonRunnerConfig {
28    fn default() -> Self {
29        Self {
30            health_check_interval: Duration::from_secs(30),
31            heartbeat_interval: Duration::from_secs(10),
32        }
33    }
34}
35
/// Manages running all registered daemons.
pub struct DaemonRunner {
    // Registry of daemon definitions to spawn; shared with the rest of the runtime.
    registry: Arc<DaemonRegistry>,
    // Postgres pool used for status bookkeeping and leader election.
    pool: PgPool,
    // HTTP client handed to each daemon's context (cloned per daemon).
    http_client: CircuitBreakerClient,
    // Identity of this node, recorded in `forge_daemons` and used for leadership.
    node_id: Uuid,
    // Runner tuning knobs; see DaemonRunnerConfig.
    config: DaemonRunnerConfig,
    // Process-wide shutdown broadcast; `run` blocks until this fires.
    shutdown_rx: broadcast::Receiver<()>,
}
45
impl DaemonRunner {
    /// Create a new daemon runner.
    ///
    /// Starts with [`DaemonRunnerConfig::default`]; override via
    /// [`Self::with_config`]. `shutdown_rx` is the process-wide shutdown
    /// broadcast that [`Self::run`] waits on.
    pub fn new(
        registry: Arc<DaemonRegistry>,
        pool: PgPool,
        http_client: CircuitBreakerClient,
        node_id: Uuid,
        shutdown_rx: broadcast::Receiver<()>,
    ) -> Self {
        Self {
            registry,
            pool,
            http_client,
            node_id,
            config: DaemonRunnerConfig::default(),
            shutdown_rx,
        }
    }

    /// Set custom configuration (builder-style; consumes and returns `self`).
    pub fn with_config(mut self, config: DaemonRunnerConfig) -> Self {
        self.config = config;
        self
    }

    /// Run all registered daemons.
    ///
    /// Spawns one `run_daemon_loop` task per registry entry, then blocks until
    /// the shutdown broadcast fires, signals each daemon's watch channel,
    /// waits a short grace period, and records the stop in the database.
    /// Consumes `self`; returns after shutdown bookkeeping completes.
    pub async fn run(mut self) -> Result<()> {
        if self.registry.is_empty() {
            info!("No daemons registered, daemon runner idle");
            // Wait for shutdown
            let _ = self.shutdown_rx.recv().await;
            return Ok(());
        }

        info!(
            "Starting daemon runner with {} daemons",
            self.registry.len()
        );

        // Create individual shutdown channels for each daemon
        let mut daemon_handles: HashMap<String, DaemonHandle> = HashMap::new();

        // Start each daemon
        for (name, entry) in self.registry.daemons() {
            let info = &entry.info;

            // Create shutdown channel for this daemon
            let (shutdown_tx, shutdown_rx) = watch::channel(false);

            // NOTE(review): `restarts` and `status` here are never updated after
            // creation — restart counting happens inside `run_daemon_loop`.
            let handle = DaemonHandle {
                name: name.to_string(),
                instance_id: Uuid::new_v4(),
                shutdown_tx,
                restarts: 0,
                status: DaemonStatus::Pending,
            };

            // Record daemon in database (best-effort: a failure is logged, not fatal)
            if let Err(e) = self.record_daemon_start(&handle).await {
                error!(daemon = name, error = %e, "Failed to record daemon start");
            }

            // Spawn daemon task. Clone everything the task needs so it owns its
            // state and outlives this loop iteration.
            let daemon_entry = entry.clone();
            let pool = self.pool.clone();
            let http_client = self.http_client.clone();
            let daemon_name = name.to_string();
            let startup_delay = info.startup_delay;
            let restart_on_panic = info.restart_on_panic;
            let restart_delay = info.restart_delay;
            let max_restarts = info.max_restarts;
            let leader_elected = info.leader_elected;
            let node_id = self.node_id;

            tokio::spawn(async move {
                run_daemon_loop(
                    daemon_name,
                    daemon_entry,
                    pool,
                    http_client,
                    shutdown_rx,
                    node_id,
                    startup_delay,
                    restart_on_panic,
                    restart_delay,
                    max_restarts,
                    leader_elected,
                )
                .await
            });

            daemon_handles.insert(name.to_string(), handle);
        }

        // Wait for shutdown signal
        let _ = self.shutdown_rx.recv().await;
        info!("Daemon runner received shutdown signal");

        // Signal all daemons to stop (send on each per-daemon watch channel)
        for (name, handle) in &daemon_handles {
            info!(daemon = name, "Signaling daemon to stop");
            let _ = handle.shutdown_tx.send(true);
        }

        // Give daemons time to clean up.
        // NOTE(review): fixed 2 s grace period; daemons still running after it
        // are not awaited — consider making this configurable via
        // DaemonRunnerConfig, whose intervals are currently unused.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Update daemon status in database (best-effort)
        for (name, handle) in &daemon_handles {
            if let Err(e) = self.record_daemon_stop(handle).await {
                warn!(daemon = name, error = %e, "Failed to record daemon stop");
            }
        }

        Ok(())
    }

    /// Upsert the daemon's row in `forge_daemons`, marking a fresh start
    /// (resets `started_at`, `last_heartbeat`, clears `last_error`).
    async fn record_daemon_start(&self, handle: &DaemonHandle) -> Result<()> {
        sqlx::query(
            r#"
            INSERT INTO forge_daemons (name, node_id, instance_id, status, restarts, started_at, last_heartbeat)
            VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
            ON CONFLICT (name) DO UPDATE SET
                node_id = EXCLUDED.node_id,
                instance_id = EXCLUDED.instance_id,
                status = EXCLUDED.status,
                restarts = EXCLUDED.restarts,
                started_at = NOW(),
                last_heartbeat = NOW(),
                last_error = NULL
            "#,
        )
        .bind(&handle.name)
        .bind(self.node_id)
        .bind(handle.instance_id)
        .bind(handle.status.as_str())
        .bind(handle.restarts as i32)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }

    /// Mark the daemon row as stopped, matched by (name, instance_id) so a
    /// row taken over by another instance is not clobbered.
    async fn record_daemon_stop(&self, handle: &DaemonHandle) -> Result<()> {
        sqlx::query(
            r#"
            UPDATE forge_daemons
            SET status = 'stopped', last_heartbeat = NOW()
            WHERE name = $1 AND instance_id = $2
            "#,
        )
        .bind(&handle.name)
        .bind(handle.instance_id)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }
}
207
/// Per-daemon bookkeeping held by `DaemonRunner::run`.
struct DaemonHandle {
    // Registered daemon name (unique key in `forge_daemons`).
    name: String,
    // Instance id recorded at start; `run_daemon_loop` generates separate
    // per-run instance ids for logging that are not written back here.
    instance_id: Uuid,
    // Per-daemon stop signal; set to `true` when the runner shuts down.
    shutdown_tx: watch::Sender<bool>,
    // NOTE(review): never incremented — restart counts live in `run_daemon_loop`.
    restarts: u32,
    // Status at registration time (Pending); not updated afterwards.
    status: DaemonStatus,
}
215
/// Supervises a single daemon for its whole lifecycle: startup delay,
/// optional leader election, execution (with panic capture), and
/// restart-with-delay until shutdown or the restart policy stops it.
///
/// Spawned once per registered daemon by `DaemonRunner::run`. Status
/// transitions are reported to `forge_daemons` on a best-effort basis
/// (failures are logged, never fatal).
#[allow(clippy::too_many_arguments)]
async fn run_daemon_loop(
    name: String,
    entry: Arc<super::registry::DaemonEntry>,
    pool: PgPool,
    http_client: CircuitBreakerClient,
    mut shutdown_rx: watch::Receiver<bool>,
    node_id: Uuid,
    startup_delay: Duration,
    restart_on_panic: bool,
    restart_delay: Duration,
    max_restarts: Option<u32>,
    leader_elected: bool,
) {
    let mut restarts = 0u32;

    // Apply startup delay, but abort immediately if shutdown arrives first.
    if !startup_delay.is_zero() {
        info!(daemon = %name, delay = ?startup_delay, "Waiting startup delay");
        tokio::select! {
            _ = tokio::time::sleep(startup_delay) => {}
            _ = shutdown_rx.changed() => {
                info!(daemon = %name, "Shutdown during startup delay");
                return;
            }
        }
    }

    loop {
        // Check shutdown before attempting to run
        if *shutdown_rx.borrow() {
            info!(daemon = %name, "Daemon shutting down");
            break;
        }

        // Try to acquire leadership if required.
        // NOTE(review): this runs every loop iteration, so an incumbent leader
        // re-attempts the advisory lock on each restart — see the caveats on
        // `try_acquire_leadership` about pooled connections and lock stacking.
        if leader_elected {
            match try_acquire_leadership(&pool, &name, node_id).await {
                Ok(true) => {
                    info!(daemon = %name, "Acquired leadership");
                }
                Ok(false) => {
                    // Another node has leadership; wait 5 s and retry,
                    // unless shutdown arrives first.
                    tokio::select! {
                        _ = tokio::time::sleep(Duration::from_secs(5)) => {}
                        _ = shutdown_rx.changed() => {
                            info!(daemon = %name, "Shutdown while waiting for leadership");
                            return;
                        }
                    }
                    continue;
                }
                Err(e) => {
                    warn!(daemon = %name, error = %e, "Failed to check leadership");
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue;
                }
            }
        }

        // Update status to running (best-effort)
        if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Running).await {
            warn!(daemon = %name, error = %e, "Failed to update status");
        }

        // Fresh instance id per run attempt, used for logging and the context.
        let instance_id = Uuid::new_v4();
        info!(daemon = %name, instance = %instance_id, "Starting daemon");

        // Create context with shutdown receiver
        let (daemon_shutdown_tx, daemon_shutdown_rx) = watch::channel(false);

        // Forward the runner-level shutdown signal into this run's channel.
        // NOTE(review): a new forwarder task is spawned per restart and only
        // exits on shutdown or sender drop, so long-restarting daemons
        // accumulate idle forwarder tasks — confirm this is acceptable.
        let shutdown_rx_clone = shutdown_rx.clone();
        let shutdown_tx_clone = daemon_shutdown_tx.clone();
        tokio::spawn(async move {
            let mut rx = shutdown_rx_clone;
            while rx.changed().await.is_ok() {
                if *rx.borrow() {
                    let _ = shutdown_tx_clone.send(true);
                    break;
                }
            }
        });

        let ctx = DaemonContext::new(
            name.clone(),
            instance_id,
            pool.clone(),
            http_client.inner().clone(),
            daemon_shutdown_rx,
        );

        // Run the daemon, capturing panics so one daemon cannot take down the
        // runner. AssertUnwindSafe: we never reuse state that the panicked
        // future could have left inconsistent (a fresh context is built per run).
        let result = std::panic::AssertUnwindSafe((entry.handler)(&ctx))
            .catch_unwind()
            .await;

        match result {
            Ok(Ok(())) => {
                // Clean exit: record Stopped and leave the loop (no restart).
                info!(daemon = %name, "Daemon completed successfully");
                if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Stopped).await {
                    warn!(daemon = %name, error = %e, "Failed to update status");
                }
                break;
            }
            Ok(Err(e)) => {
                // Daemon returned an error: record it, then fall through to
                // the restart policy below.
                error!(daemon = %name, error = %e, "Daemon failed with error");
                if let Err(e) = record_daemon_error(&pool, &name, &e.to_string()).await {
                    warn!(daemon = %name, error = %e, "Failed to record error");
                }
            }
            Err(_) => {
                // Daemon panicked: record a generic message, then fall through
                // to the restart policy below.
                error!(daemon = %name, "Daemon panicked");
                if let Err(e) = record_daemon_error(&pool, &name, "Daemon panicked").await {
                    warn!(daemon = %name, error = %e, "Failed to record panic");
                }
            }
        }

        // Check shutdown again
        if *shutdown_rx.borrow() {
            info!(daemon = %name, "Daemon shutting down after failure");
            break;
        }

        // Check restart policy.
        // NOTE(review): despite its name, `restart_on_panic` also gates
        // restarts after Err returns (both paths reach here) — confirm intent.
        if !restart_on_panic {
            warn!(daemon = %name, "Restart disabled, daemon stopping");
            if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Failed).await {
                warn!(daemon = %name, error = %e, "Failed to update status");
            }
            break;
        }

        restarts += 1;

        // Check max restarts (None means unlimited)
        if let Some(max) = max_restarts {
            if restarts >= max {
                error!(daemon = %name, restarts = restarts, max = max, "Max restarts exceeded");
                if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Failed).await {
                    warn!(daemon = %name, error = %e, "Failed to update status");
                }
                break;
            }
        }

        // Update status to restarting (best-effort)
        if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Restarting).await {
            warn!(daemon = %name, error = %e, "Failed to update status");
        }

        info!(daemon = %name, restarts = restarts, delay = ?restart_delay, "Restarting daemon");

        // Wait before restart, aborting early on shutdown
        tokio::select! {
            _ = tokio::time::sleep(restart_delay) => {}
            _ = shutdown_rx.changed() => {
                info!(daemon = %name, "Shutdown during restart delay");
                break;
            }
        }
    }

    // Release leadership if we held it (best-effort; errors ignored)
    if leader_elected {
        let _ = release_leadership(&pool, &name, node_id).await;
    }
}
385
/// Attempt to become leader for `daemon_name` via a Postgres advisory lock.
///
/// Returns `Ok(true)` when the lock was obtained (caller is leader) and
/// `Ok(false)` when another session already holds it.
///
/// NOTE(review): `pg_try_advisory_lock` takes a *session*-level lock, but this
/// query runs on an arbitrary connection from `pool`. Consequences to verify:
/// the lock sticks to whichever pooled connection executed it (and is held
/// until that connection closes); a re-acquire from a *different* pooled
/// connection of the same process will fail; a re-acquire on the *same*
/// connection stacks and needs a matching number of unlocks. A dedicated,
/// long-lived connection for leadership would avoid all three.
async fn try_acquire_leadership(pool: &PgPool, daemon_name: &str, node_id: Uuid) -> Result<bool> {
    // Use advisory lock for leader election
    // Hash the daemon name to get a consistent lock ID.
    // NOTE(review): simple polynomial hash — two daemon names colliding would
    // silently share one lock. Must stay identical to `release_leadership`.
    let lock_id = daemon_name
        .bytes()
        .fold(0i64, |acc, b| acc.wrapping_add(b as i64).wrapping_mul(31));

    let result: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)")
        .bind(lock_id)
        .fetch_one(pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

    if result.0 {
        // Update daemon record with our node_id so observers see the leader
        sqlx::query("UPDATE forge_daemons SET node_id = $1 WHERE name = $2")
            .bind(node_id)
            .bind(daemon_name)
            .execute(pool)
            .await
            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
    }

    Ok(result.0)
}
411
/// Release the advisory lock taken by `try_acquire_leadership`.
///
/// NOTE(review): the unlock runs on an arbitrary pooled connection; if it is
/// not the connection that holds the session-level lock, `pg_advisory_unlock`
/// returns false (a warning server-side) and the lock remains held — the
/// failure is invisible here because the boolean result is not inspected.
async fn release_leadership(pool: &PgPool, daemon_name: &str, _node_id: Uuid) -> Result<()> {
    // Must compute the same lock id as `try_acquire_leadership`.
    let lock_id = daemon_name
        .bytes()
        .fold(0i64, |acc, b| acc.wrapping_add(b as i64).wrapping_mul(31));

    sqlx::query("SELECT pg_advisory_unlock($1)")
        .bind(lock_id)
        .execute(pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

    Ok(())
}
425
426async fn update_daemon_status(pool: &PgPool, name: &str, status: DaemonStatus) -> Result<()> {
427    sqlx::query("UPDATE forge_daemons SET status = $1, last_heartbeat = NOW() WHERE name = $2")
428        .bind(status.as_str())
429        .bind(name)
430        .execute(pool)
431        .await
432        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
433
434    Ok(())
435}
436
437async fn record_daemon_error(pool: &PgPool, name: &str, error: &str) -> Result<()> {
438    sqlx::query(
439        "UPDATE forge_daemons SET status = 'failed', last_error = $1, last_heartbeat = NOW() WHERE name = $2",
440    )
441    .bind(error)
442    .bind(name)
443    .execute(pool)
444    .await
445    .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
446
447    Ok(())
448}
449
#[cfg(test)]
mod tests {
    use super::*;

    /// The default config must carry the documented intervals.
    #[test]
    fn test_default_config() {
        let DaemonRunnerConfig {
            health_check_interval,
            heartbeat_interval,
        } = DaemonRunnerConfig::default();
        assert_eq!(health_check_interval, Duration::from_secs(30));
        assert_eq!(heartbeat_interval, Duration::from_secs(10));
    }
}