Skip to main content

forge_runtime/daemon/
runner.rs

1//! Daemon runner with restart logic and leader election.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use forge_core::CircuitBreakerClient;
8use forge_core::Result;
9use forge_core::daemon::{DaemonContext, DaemonStatus};
10use forge_core::function::{JobDispatch, WorkflowDispatch};
11use futures_util::FutureExt;
12use sqlx::PgPool;
13use tokio::sync::{broadcast, watch};
14use tracing::{Instrument, Span, field};
15use uuid::Uuid;
16
17use super::registry::DaemonRegistry;
18
19/// Configuration for the daemon runner.
20#[derive(Debug, Clone)]
21pub struct DaemonRunnerConfig {
22    /// How often to check daemon health.
23    pub health_check_interval: Duration,
24    /// How often to send heartbeats.
25    pub heartbeat_interval: Duration,
26}
27
28impl Default for DaemonRunnerConfig {
29    fn default() -> Self {
30        Self {
31            health_check_interval: Duration::from_secs(30),
32            heartbeat_interval: Duration::from_secs(10),
33        }
34    }
35}
36
37/// Manages running all registered daemons.
38pub struct DaemonRunner {
39    registry: Arc<DaemonRegistry>,
40    pool: PgPool,
41    http_client: CircuitBreakerClient,
42    node_id: Uuid,
43    config: DaemonRunnerConfig,
44    shutdown_rx: broadcast::Receiver<()>,
45    job_dispatch: Option<Arc<dyn JobDispatch>>,
46    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
47}
48
49impl DaemonRunner {
50    /// Create a new daemon runner.
51    pub fn new(
52        registry: Arc<DaemonRegistry>,
53        pool: PgPool,
54        http_client: CircuitBreakerClient,
55        node_id: Uuid,
56        shutdown_rx: broadcast::Receiver<()>,
57    ) -> Self {
58        Self {
59            registry,
60            pool,
61            http_client,
62            node_id,
63            config: DaemonRunnerConfig::default(),
64            shutdown_rx,
65            job_dispatch: None,
66            workflow_dispatch: None,
67        }
68    }
69
70    /// Set job dispatcher for daemon contexts.
71    pub fn with_job_dispatch(mut self, dispatcher: Arc<dyn JobDispatch>) -> Self {
72        self.job_dispatch = Some(dispatcher);
73        self
74    }
75
76    /// Set workflow dispatcher for daemon contexts.
77    pub fn with_workflow_dispatch(mut self, dispatcher: Arc<dyn WorkflowDispatch>) -> Self {
78        self.workflow_dispatch = Some(dispatcher);
79        self
80    }
81
82    /// Set custom configuration.
83    pub fn with_config(mut self, config: DaemonRunnerConfig) -> Self {
84        self.config = config;
85        self
86    }
87
88    /// Run all registered daemons.
89    pub async fn run(mut self) -> Result<()> {
90        let runner_span = tracing::info_span!(
91            "daemon.runner",
92            daemon.node_id = %self.node_id,
93            daemon.count = self.registry.len(),
94            daemon.uptime_ms = field::Empty,
95        );
96        let start_time = Instant::now();
97
98        async {
99            if self.registry.is_empty() {
100                tracing::debug!("No daemons registered, daemon runner idle");
101                // Wait for shutdown
102                let _ = self.shutdown_rx.recv().await;
103                Span::current().record("daemon.uptime_ms", start_time.elapsed().as_millis() as u64);
104                return Ok(());
105            }
106
107            tracing::info!(count = self.registry.len(), "Daemon runner starting");
108
109            // Create individual shutdown channels for each daemon
110            let mut daemon_handles: HashMap<String, DaemonHandle> = HashMap::new();
111
112            // Start each daemon
113            for (name, entry) in self.registry.daemons() {
114                let info = &entry.info;
115
116                // Create shutdown channel for this daemon
117                let (shutdown_tx, shutdown_rx) = watch::channel(false);
118
119                let handle = DaemonHandle {
120                    name: name.to_string(),
121                    instance_id: Uuid::new_v4(),
122                    shutdown_tx,
123                    restarts: 0,
124                    status: DaemonStatus::Pending,
125                };
126
127                // Record daemon in database
128                if let Err(e) = self.record_daemon_start(&handle).await {
129                    tracing::debug!(daemon = name, error = %e, "Failed to record daemon start");
130                }
131
132                tracing::info!(
133                    daemon.name = name,
134                    daemon.instance_id = %handle.instance_id,
135                    daemon.leader_elected = info.leader_elected,
136                    "Starting daemon"
137                );
138
139                // Spawn daemon task
140                let daemon_entry = entry.clone();
141                let pool = self.pool.clone();
142                let http_client = self.http_client.clone();
143                let daemon_name = name.to_string();
144                let startup_delay = info.startup_delay;
145                let restart_on_panic = info.restart_on_panic;
146                let restart_delay = info.restart_delay;
147                let max_restarts = info.max_restarts;
148                let leader_elected = info.leader_elected;
149                let node_id = self.node_id;
150                let job_dispatch = self.job_dispatch.clone();
151                let workflow_dispatch = self.workflow_dispatch.clone();
152
153                tokio::spawn(async move {
154                    run_daemon_loop(
155                        daemon_name,
156                        daemon_entry,
157                        pool,
158                        http_client,
159                        shutdown_rx,
160                        node_id,
161                        startup_delay,
162                        restart_on_panic,
163                        restart_delay,
164                        max_restarts,
165                        leader_elected,
166                        job_dispatch,
167                        workflow_dispatch,
168                    )
169                    .await
170                });
171
172                daemon_handles.insert(name.to_string(), handle);
173            }
174
175            // Wait for shutdown signal
176            let _ = self.shutdown_rx.recv().await;
177            tracing::info!("Daemon runner received shutdown signal");
178
179            // Signal all daemons to stop
180            for (name, handle) in &daemon_handles {
181                tracing::info!(daemon.name = name, "Signaling daemon to stop");
182                let _ = handle.shutdown_tx.send(true);
183            }
184
185            // Give daemons time to clean up
186            tokio::time::sleep(Duration::from_secs(2)).await;
187
188            // Update daemon status in database
189            for (name, handle) in &daemon_handles {
190                if let Err(e) = self.record_daemon_stop(handle).await {
191                    tracing::debug!(daemon = name, error = %e, "Failed to record daemon stop");
192                }
193            }
194
195            Span::current().record("daemon.uptime_ms", start_time.elapsed().as_millis() as u64);
196            tracing::info!(
197                daemon.uptime_ms = start_time.elapsed().as_millis() as u64,
198                "Daemon runner stopped"
199            );
200            Ok(())
201        }
202        .instrument(runner_span)
203        .await
204    }
205
206    async fn record_daemon_start(&self, handle: &DaemonHandle) -> Result<()> {
207        sqlx::query(
208            r#"
209            INSERT INTO forge_daemons (name, node_id, instance_id, status, restarts, started_at, last_heartbeat)
210            VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
211            ON CONFLICT (name) DO UPDATE SET
212                node_id = EXCLUDED.node_id,
213                instance_id = EXCLUDED.instance_id,
214                status = EXCLUDED.status,
215                restarts = EXCLUDED.restarts,
216                started_at = NOW(),
217                last_heartbeat = NOW(),
218                last_error = NULL
219            "#,
220        )
221        .bind(&handle.name)
222        .bind(self.node_id)
223        .bind(handle.instance_id)
224        .bind(handle.status.as_str())
225        .bind(handle.restarts as i32)
226        .execute(&self.pool)
227        .await
228        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
229
230        Ok(())
231    }
232
233    async fn record_daemon_stop(&self, handle: &DaemonHandle) -> Result<()> {
234        sqlx::query(
235            r#"
236            UPDATE forge_daemons
237            SET status = 'stopped', last_heartbeat = NOW()
238            WHERE name = $1 AND instance_id = $2
239            "#,
240        )
241        .bind(&handle.name)
242        .bind(handle.instance_id)
243        .execute(&self.pool)
244        .await
245        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
246
247        Ok(())
248    }
249}
250
251struct DaemonHandle {
252    name: String,
253    instance_id: Uuid,
254    shutdown_tx: watch::Sender<bool>,
255    restarts: u32,
256    status: DaemonStatus,
257}
258
259#[allow(clippy::too_many_arguments)]
260async fn run_daemon_loop(
261    name: String,
262    entry: Arc<super::registry::DaemonEntry>,
263    pool: PgPool,
264    http_client: CircuitBreakerClient,
265    mut shutdown_rx: watch::Receiver<bool>,
266    node_id: Uuid,
267    startup_delay: Duration,
268    restart_on_panic: bool,
269    restart_delay: Duration,
270    max_restarts: Option<u32>,
271    leader_elected: bool,
272    job_dispatch: Option<Arc<dyn JobDispatch>>,
273    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
274) {
275    let daemon_span = tracing::info_span!(
276        "daemon.lifecycle",
277        daemon.name = %name,
278        daemon.node_id = %node_id,
279        daemon.leader_elected = leader_elected,
280        daemon.restart_count = field::Empty,
281        daemon.uptime_ms = field::Empty,
282        daemon.final_status = field::Empty,
283        otel.name = %format!("daemon {}", name),
284    );
285    let daemon_start = Instant::now();
286
287    async {
288        let mut restarts = 0u32;
289
290        // Apply startup delay
291        if !startup_delay.is_zero() {
292            tracing::debug!(delay_ms = startup_delay.as_millis() as u64, "Waiting startup delay");
293            tokio::select! {
294                _ = tokio::time::sleep(startup_delay) => {}
295                _ = shutdown_rx.changed() => {
296                    tracing::debug!("Shutdown during startup delay");
297                    Span::current().record("daemon.final_status", "shutdown_during_startup");
298                    return;
299                }
300            }
301        }
302
303        loop {
304            // Check shutdown before attempting to run
305            if *shutdown_rx.borrow() {
306                tracing::debug!("Daemon shutting down");
307                Span::current().record("daemon.final_status", "shutdown");
308                break;
309            }
310
311            // Try to acquire leadership if required
312            if leader_elected {
313                match try_acquire_leadership(&pool, &name, node_id).await {
314                    Ok(true) => {
315                        tracing::info!("Acquired leadership");
316                    }
317                    Ok(false) => {
318                        // Another node has leadership, wait and retry
319                        tracing::debug!("Waiting for leadership");
320                        tokio::select! {
321                            _ = tokio::time::sleep(Duration::from_secs(5)) => {}
322                            _ = shutdown_rx.changed() => {
323                                tracing::debug!("Shutdown while waiting for leadership");
324                                Span::current().record("daemon.final_status", "shutdown_waiting_leadership");
325                                return;
326                            }
327                        }
328                        continue;
329                    }
330                    Err(e) => {
331                        tracing::debug!(error = %e, "Failed to check leadership");
332                        tokio::time::sleep(Duration::from_secs(1)).await;
333                        continue;
334                    }
335                }
336            }
337
338            // Update status to running
339            if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Running).await {
340                tracing::debug!(error = %e, "Failed to update daemon status");
341            }
342
343            let instance_id = Uuid::new_v4();
344            let execution_start = Instant::now();
345
346            let exec_span = tracing::info_span!(
347                "daemon.execute",
348                daemon.instance_id = %instance_id,
349                daemon.execution_duration_ms = field::Empty,
350                daemon.status = field::Empty,
351            );
352
353            let result = async {
354                tracing::info!(instance_id = %instance_id, "Daemon instance starting");
355
356                // Create context with shutdown receiver
357                let (daemon_shutdown_tx, daemon_shutdown_rx) = watch::channel(false);
358
359                // Forward shutdown signal
360                let shutdown_rx_clone = shutdown_rx.clone();
361                let shutdown_tx_clone = daemon_shutdown_tx.clone();
362                tokio::spawn(async move {
363                    let mut rx = shutdown_rx_clone;
364                    while rx.changed().await.is_ok() {
365                        if *rx.borrow() {
366                            let _ = shutdown_tx_clone.send(true);
367                            break;
368                        }
369                    }
370                });
371
372                let mut ctx = DaemonContext::new(
373                    name.clone(),
374                    instance_id,
375                    pool.clone(),
376                    http_client.inner().clone(),
377                    daemon_shutdown_rx,
378                );
379                if let Some(ref jd) = job_dispatch {
380                    ctx = ctx.with_job_dispatch(jd.clone());
381                }
382                if let Some(ref wd) = workflow_dispatch {
383                    ctx = ctx.with_workflow_dispatch(wd.clone());
384                }
385
386                // Run the daemon
387                let result = std::panic::AssertUnwindSafe((entry.handler)(&ctx))
388                    .catch_unwind()
389                    .await;
390
391                let exec_duration = execution_start.elapsed().as_millis() as u64;
392                Span::current().record("daemon.execution_duration_ms", exec_duration);
393
394                result
395            }
396            .instrument(exec_span)
397            .await;
398
399            match result {
400                Ok(Ok(())) => {
401                    tracing::info!("Daemon completed gracefully");
402                    Span::current().record("daemon.final_status", "completed");
403                    if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Stopped).await {
404                        tracing::debug!(daemon = %name, error = %e, "Status update failed");
405                    }
406                    break;
407                }
408                Ok(Err(e)) => {
409                    let recorded = record_daemon_error(&pool, &name, &e.to_string()).await.is_ok();
410                    tracing::error!(error = %e, recorded, "Daemon failed");
411                }
412                Err(_) => {
413                    let recorded = record_daemon_error(&pool, &name, "Daemon panicked").await.is_ok();
414                    tracing::error!(recorded, "Daemon panicked");
415                }
416            }
417
418            // Check shutdown again
419            if *shutdown_rx.borrow() {
420                tracing::debug!("Daemon shutting down after failure");
421                Span::current().record("daemon.final_status", "shutdown_after_failure");
422                break;
423            }
424
425            // Check restart policy
426            if !restart_on_panic {
427                tracing::warn!("Restart disabled, daemon stopping");
428                Span::current().record("daemon.final_status", "failed_no_restart");
429                if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Failed).await {
430                    tracing::debug!(daemon = %name, error = %e, "Status update failed");
431                }
432                break;
433            }
434
435            restarts += 1;
436            Span::current().record("daemon.restart_count", restarts);
437
438            // Check max restarts
439            if let Some(max) = max_restarts
440                && restarts >= max
441            {
442                tracing::error!(restarts, max, "Max restarts exceeded");
443                Span::current().record("daemon.final_status", "max_restarts_exceeded");
444                if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Failed).await {
445                    tracing::debug!(daemon = %name, error = %e, "Status update failed");
446                }
447                break;
448            }
449
450            // Update status to restarting
451            if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Restarting).await {
452                tracing::debug!(daemon = %name, error = %e, "Status update failed");
453            }
454
455            tracing::warn!(
456                restarts,
457                restart_delay_ms = restart_delay.as_millis() as u64,
458                "Restarting daemon"
459            );
460
461            // Wait before restart
462            tokio::select! {
463                _ = tokio::time::sleep(restart_delay) => {}
464                _ = shutdown_rx.changed() => {
465                    tracing::debug!("Shutdown during restart delay");
466                    Span::current().record("daemon.final_status", "shutdown_during_restart");
467                    break;
468                }
469            }
470        }
471
472        let uptime = daemon_start.elapsed().as_millis() as u64;
473        Span::current().record("daemon.uptime_ms", uptime);
474        Span::current().record("daemon.restart_count", restarts);
475
476        // Release leadership if we held it
477        if leader_elected
478            && let Err(e) = release_leadership(&pool, &name, node_id).await
479        {
480            tracing::debug!(daemon = %name, error = %e, "Failed to release leadership");
481        }
482
483        tracing::info!(
484            uptime_ms = uptime,
485            restart_count = restarts,
486            "Daemon lifecycle ended"
487        );
488    }
489    .instrument(daemon_span)
490    .await
491}
492
493async fn try_acquire_leadership(pool: &PgPool, daemon_name: &str, node_id: Uuid) -> Result<bool> {
494    // Use advisory lock for leader election
495    // Hash the daemon name to get a consistent lock ID
496    let lock_id = daemon_name
497        .bytes()
498        .fold(0i64, |acc, b| acc.wrapping_add(b as i64).wrapping_mul(31));
499
500    let result: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)")
501        .bind(lock_id)
502        .fetch_one(pool)
503        .await
504        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
505
506    if result.0 {
507        // Update daemon record with our node_id
508        sqlx::query("UPDATE forge_daemons SET node_id = $1 WHERE name = $2")
509            .bind(node_id)
510            .bind(daemon_name)
511            .execute(pool)
512            .await
513            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
514    }
515
516    Ok(result.0)
517}
518
519async fn release_leadership(pool: &PgPool, daemon_name: &str, _node_id: Uuid) -> Result<()> {
520    let lock_id = daemon_name
521        .bytes()
522        .fold(0i64, |acc, b| acc.wrapping_add(b as i64).wrapping_mul(31));
523
524    sqlx::query("SELECT pg_advisory_unlock($1)")
525        .bind(lock_id)
526        .execute(pool)
527        .await
528        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
529
530    Ok(())
531}
532
533async fn update_daemon_status(pool: &PgPool, name: &str, status: DaemonStatus) -> Result<()> {
534    sqlx::query("UPDATE forge_daemons SET status = $1, last_heartbeat = NOW() WHERE name = $2")
535        .bind(status.as_str())
536        .bind(name)
537        .execute(pool)
538        .await
539        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
540
541    Ok(())
542}
543
544async fn record_daemon_error(pool: &PgPool, name: &str, error: &str) -> Result<()> {
545    sqlx::query(
546        "UPDATE forge_daemons SET status = 'failed', last_error = $1, last_heartbeat = NOW() WHERE name = $2",
547    )
548    .bind(error)
549    .bind(name)
550    .execute(pool)
551    .await
552    .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
553
554    Ok(())
555}
556
557#[cfg(test)]
558mod tests {
559    use super::*;
560
561    #[test]
562    fn test_default_config() {
563        let config = DaemonRunnerConfig::default();
564        assert_eq!(config.health_check_interval, Duration::from_secs(30));
565        assert_eq!(config.heartbeat_interval, Duration::from_secs(10));
566    }
567}