use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use forge_core::CircuitBreakerClient;
use forge_core::Result;
use forge_core::daemon::{DaemonContext, DaemonStatus};
use forge_core::function::{JobDispatch, WorkflowDispatch};
use futures_util::FutureExt;
use sqlx::PgPool;
use tokio::sync::{broadcast, watch};
use tracing::{Instrument, Span, field};
use uuid::Uuid;

use super::registry::DaemonRegistry;

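/// Timing configuration for the daemon runner.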
#[derive(Debug, Clone)]
pub struct DaemonRunnerConfig {
    pub health_check_interval: Duration,
    pub heartbeat_interval: Duration,
}

impl Default for DaemonRunnerConfig {
    fn default() -> Self {
        Self {
            health_check_interval: Duration::from_secs(30),
            heartbeat_interval: Duration::from_secs(10),
        }
    }
}

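/// Supervises the daemons in a [`DaemonRegistry`]: spawns one background
/// task per daemon, restarts failed daemons according to each daemon's
/// policy, and tears everything down when the shutdown signal fires.
///
/// A minimal wiring sketch (assumes `registry`, `pool`, `http_client`,
/// `node_id`, and `shutdown_rx` were constructed elsewhere; only the
/// builder calls shown are this module's API):
///
/// ```ignore
/// let runner = DaemonRunner::new(registry, pool, http_client, node_id, shutdown_rx)
///     .with_config(DaemonRunnerConfig::default());
/// tokio::spawn(runner.run());
/// ```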
pub struct DaemonRunner {
    registry: Arc<DaemonRegistry>,
    pool: PgPool,
    http_client: CircuitBreakerClient,
    node_id: Uuid,
    config: DaemonRunnerConfig,
    shutdown_rx: broadcast::Receiver<()>,
    job_dispatch: Option<Arc<dyn JobDispatch>>,
    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
}

impl DaemonRunner {
    pub fn new(
        registry: Arc<DaemonRegistry>,
        pool: PgPool,
        http_client: CircuitBreakerClient,
        node_id: Uuid,
        shutdown_rx: broadcast::Receiver<()>,
    ) -> Self {
        Self {
            registry,
            pool,
            http_client,
            node_id,
            config: DaemonRunnerConfig::default(),
            shutdown_rx,
            job_dispatch: None,
            workflow_dispatch: None,
        }
    }

    pub fn with_job_dispatch(mut self, dispatcher: Arc<dyn JobDispatch>) -> Self {
        self.job_dispatch = Some(dispatcher);
        self
    }

    pub fn with_workflow_dispatch(mut self, dispatcher: Arc<dyn WorkflowDispatch>) -> Self {
        self.workflow_dispatch = Some(dispatcher);
        self
    }

    pub fn with_config(mut self, config: DaemonRunnerConfig) -> Self {
        self.config = config;
        self
    }

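    /// Runs all registered daemons until the shutdown signal fires: spawns
    /// one supervision task per daemon, waits for shutdown, then signals
    /// each daemon and records its final state in `forge_daemons`.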
    pub async fn run(mut self) -> Result<()> {
        let runner_span = tracing::info_span!(
            "daemon.runner",
            daemon.node_id = %self.node_id,
            daemon.count = self.registry.len(),
            daemon.uptime_ms = field::Empty,
        );
        let start_time = Instant::now();

        async {
            if self.registry.is_empty() {
                tracing::debug!("No daemons registered, daemon runner idle");
                let _ = self.shutdown_rx.recv().await;
                Span::current().record("daemon.uptime_ms", start_time.elapsed().as_millis() as u64);
                return Ok(());
            }

            tracing::info!(count = self.registry.len(), "Daemon runner starting");

            let mut daemon_handles: HashMap<String, DaemonHandle> = HashMap::new();

            for (name, entry) in self.registry.daemons() {
                let info = &entry.info;

                let (shutdown_tx, shutdown_rx) = watch::channel(false);

                let handle = DaemonHandle {
                    name: name.to_string(),
                    instance_id: Uuid::new_v4(),
                    shutdown_tx,
                    restarts: 0,
                    status: DaemonStatus::Pending,
                };

                if let Err(e) = self.record_daemon_start(&handle).await {
                    tracing::debug!(daemon = name, error = %e, "Failed to record daemon start");
                }

                tracing::info!(
                    daemon.name = name,
                    daemon.instance_id = %handle.instance_id,
                    daemon.leader_elected = info.leader_elected,
                    "Starting daemon"
                );

                // Clone everything the supervision task needs so it owns its state.
                let daemon_entry = entry.clone();
                let pool = self.pool.clone();
                let http_client = self.http_client.clone();
                let daemon_name = name.to_string();
                let startup_delay = info.startup_delay;
                let restart_on_panic = info.restart_on_panic;
                let restart_delay = info.restart_delay;
                let max_restarts = info.max_restarts;
                let leader_elected = info.leader_elected;
                let node_id = self.node_id;
                let job_dispatch = self.job_dispatch.clone();
                let workflow_dispatch = self.workflow_dispatch.clone();

                tokio::spawn(async move {
                    run_daemon_loop(
                        daemon_name,
                        daemon_entry,
                        pool,
                        http_client,
                        shutdown_rx,
                        node_id,
                        startup_delay,
                        restart_on_panic,
                        restart_delay,
                        max_restarts,
                        leader_elected,
                        job_dispatch,
                        workflow_dispatch,
                    )
                    .await
                });

                daemon_handles.insert(name.to_string(), handle);
            }

            let _ = self.shutdown_rx.recv().await;
            tracing::info!("Daemon runner received shutdown signal");

            for (name, handle) in &daemon_handles {
                tracing::info!(daemon.name = name, "Signaling daemon to stop");
                let _ = handle.shutdown_tx.send(true);
            }

            // Grace period so daemons can observe the signal before their
            // final status is recorded.
            tokio::time::sleep(Duration::from_secs(2)).await;

            for (name, handle) in &daemon_handles {
                if let Err(e) = self.record_daemon_stop(handle).await {
                    tracing::debug!(daemon = name, error = %e, "Failed to record daemon stop");
                }
            }

            Span::current().record("daemon.uptime_ms", start_time.elapsed().as_millis() as u64);
            tracing::info!(
                daemon.uptime_ms = start_time.elapsed().as_millis() as u64,
                "Daemon runner stopped"
            );
            Ok(())
        }
        .instrument(runner_span)
        .await
    }

    async fn record_daemon_start(&self, handle: &DaemonHandle) -> Result<()> {
        sqlx::query(
            r#"
            INSERT INTO forge_daemons (name, node_id, instance_id, status, restarts, started_at, last_heartbeat)
            VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
            ON CONFLICT (name) DO UPDATE SET
                node_id = EXCLUDED.node_id,
                instance_id = EXCLUDED.instance_id,
                status = EXCLUDED.status,
                restarts = EXCLUDED.restarts,
                started_at = NOW(),
                last_heartbeat = NOW(),
                last_error = NULL
            "#,
        )
        .bind(&handle.name)
        .bind(self.node_id)
        .bind(handle.instance_id)
        .bind(handle.status.as_str())
        .bind(handle.restarts as i32)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }

    async fn record_daemon_stop(&self, handle: &DaemonHandle) -> Result<()> {
        sqlx::query(
            r#"
            UPDATE forge_daemons
            SET status = 'stopped', last_heartbeat = NOW()
            WHERE name = $1 AND instance_id = $2
            "#,
        )
        .bind(&handle.name)
        .bind(handle.instance_id)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }
}

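/// Bookkeeping for one spawned daemon: holds the shutdown sender and the
/// identifiers used for the start/stop rows in `forge_daemons`.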
struct DaemonHandle {
    name: String,
    instance_id: Uuid,
    shutdown_tx: watch::Sender<bool>,
    restarts: u32,
    status: DaemonStatus,
}

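/// Supervision loop for a single daemon: waits out the startup delay,
/// acquires leadership when `leader_elected` is set, executes the handler
/// while catching panics, and applies the restart policy until shutdown or
/// a terminal state.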
#[allow(clippy::too_many_arguments)]
async fn run_daemon_loop(
    name: String,
    entry: Arc<super::registry::DaemonEntry>,
    pool: PgPool,
    http_client: CircuitBreakerClient,
    mut shutdown_rx: watch::Receiver<bool>,
    node_id: Uuid,
    startup_delay: Duration,
    restart_on_panic: bool,
    restart_delay: Duration,
    max_restarts: Option<u32>,
    leader_elected: bool,
    job_dispatch: Option<Arc<dyn JobDispatch>>,
    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
) {
    let daemon_span = tracing::info_span!(
        "daemon.lifecycle",
        daemon.name = %name,
        daemon.node_id = %node_id,
        daemon.leader_elected = leader_elected,
        daemon.restart_count = field::Empty,
        daemon.uptime_ms = field::Empty,
        daemon.final_status = field::Empty,
        otel.name = %format!("daemon {}", name),
    );
    let daemon_start = Instant::now();

    async {
        let mut restarts = 0u32;

        if !startup_delay.is_zero() {
            tracing::debug!(delay_ms = startup_delay.as_millis() as u64, "Waiting startup delay");
            tokio::select! {
                _ = tokio::time::sleep(startup_delay) => {}
                _ = shutdown_rx.changed() => {
                    tracing::debug!("Shutdown during startup delay");
                    Span::current().record("daemon.final_status", "shutdown_during_startup");
                    return;
                }
            }
        }

        loop {
            if *shutdown_rx.borrow() {
                tracing::debug!("Daemon shutting down");
                Span::current().record("daemon.final_status", "shutdown");
                break;
            }

            if leader_elected {
                match try_acquire_leadership(&pool, &name, node_id).await {
                    Ok(true) => {
                        tracing::info!("Acquired leadership");
                    }
                    Ok(false) => {
                        tracing::debug!("Waiting for leadership");
                        tokio::select! {
                            _ = tokio::time::sleep(Duration::from_secs(5)) => {}
                            _ = shutdown_rx.changed() => {
                                tracing::debug!("Shutdown while waiting for leadership");
                                Span::current().record("daemon.final_status", "shutdown_waiting_leadership");
                                return;
                            }
                        }
                        continue;
                    }
                    Err(e) => {
                        tracing::debug!(error = %e, "Failed to check leadership");
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        continue;
                    }
                }
            }

            if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Running).await {
                tracing::debug!(error = %e, "Failed to update daemon status");
            }

            let instance_id = Uuid::new_v4();
            let execution_start = Instant::now();

            let exec_span = tracing::info_span!(
                "daemon.execute",
                daemon.instance_id = %instance_id,
                daemon.execution_duration_ms = field::Empty,
                daemon.status = field::Empty,
            );

            let result = async {
                tracing::info!(instance_id = %instance_id, "Daemon instance starting");

                let (daemon_shutdown_tx, daemon_shutdown_rx) = watch::channel(false);

                // Forward the runner-level shutdown signal into the
                // per-instance channel handed to the daemon context.
                let shutdown_rx_clone = shutdown_rx.clone();
                let shutdown_tx_clone = daemon_shutdown_tx.clone();
                tokio::spawn(async move {
                    let mut rx = shutdown_rx_clone;
                    while rx.changed().await.is_ok() {
                        if *rx.borrow() {
                            let _ = shutdown_tx_clone.send(true);
                            break;
                        }
                    }
                });

                let mut ctx = DaemonContext::new(
                    name.clone(),
                    instance_id,
                    pool.clone(),
                    http_client.inner().clone(),
                    daemon_shutdown_rx,
                );
                if let Some(ref jd) = job_dispatch {
                    ctx = ctx.with_job_dispatch(jd.clone());
                }
                if let Some(ref wd) = workflow_dispatch {
                    ctx = ctx.with_workflow_dispatch(wd.clone());
                }

                // Catch panics so a misbehaving daemon can be restarted
                // instead of taking the runner down with it.
                let result = std::panic::AssertUnwindSafe((entry.handler)(&ctx))
                    .catch_unwind()
                    .await;

                let exec_duration = execution_start.elapsed().as_millis() as u64;
                Span::current().record("daemon.execution_duration_ms", exec_duration);

                result
            }
            .instrument(exec_span)
            .await;

            match result {
                Ok(Ok(())) => {
                    tracing::info!("Daemon completed gracefully");
                    Span::current().record("daemon.final_status", "completed");
                    if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Stopped).await {
                        tracing::debug!(daemon = %name, error = %e, "Status update failed");
                    }
                    break;
                }
                Ok(Err(e)) => {
                    let recorded = record_daemon_error(&pool, &name, &e.to_string()).await.is_ok();
                    tracing::error!(error = %e, recorded, "Daemon failed");
                }
                Err(_) => {
                    let recorded = record_daemon_error(&pool, &name, "Daemon panicked").await.is_ok();
                    tracing::error!(recorded, "Daemon panicked");
                }
            }

            if *shutdown_rx.borrow() {
                tracing::debug!("Daemon shutting down after failure");
                Span::current().record("daemon.final_status", "shutdown_after_failure");
                break;
            }

            if !restart_on_panic {
                tracing::warn!("Restart disabled, daemon stopping");
                Span::current().record("daemon.final_status", "failed_no_restart");
                if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Failed).await {
                    tracing::debug!(daemon = %name, error = %e, "Status update failed");
                }
                break;
            }

            restarts += 1;
            Span::current().record("daemon.restart_count", restarts);

            if let Some(max) = max_restarts
                && restarts >= max
            {
                tracing::error!(restarts, max, "Max restarts exceeded");
                Span::current().record("daemon.final_status", "max_restarts_exceeded");
                if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Failed).await {
                    tracing::debug!(daemon = %name, error = %e, "Status update failed");
                }
                break;
            }

            if let Err(e) = update_daemon_status(&pool, &name, DaemonStatus::Restarting).await {
                tracing::debug!(daemon = %name, error = %e, "Status update failed");
            }

            tracing::warn!(
                restarts,
                restart_delay_ms = restart_delay.as_millis() as u64,
                "Restarting daemon"
            );

            tokio::select! {
                _ = tokio::time::sleep(restart_delay) => {}
                _ = shutdown_rx.changed() => {
                    tracing::debug!("Shutdown during restart delay");
                    Span::current().record("daemon.final_status", "shutdown_during_restart");
                    break;
                }
            }
        }

        let uptime = daemon_start.elapsed().as_millis() as u64;
        Span::current().record("daemon.uptime_ms", uptime);
        Span::current().record("daemon.restart_count", restarts);

        if leader_elected
            && let Err(e) = release_leadership(&pool, &name, node_id).await
        {
            tracing::debug!(daemon = %name, error = %e, "Failed to release leadership");
        }

        tracing::info!(
            uptime_ms = uptime,
            restart_count = restarts,
            "Daemon lifecycle ended"
        );
    }
    .instrument(daemon_span)
    .await
}

/// Derives a stable advisory-lock key from the daemon name. A simple
/// polynomial hash, so distinct names can in principle collide on the
/// same lock id.
fn advisory_lock_id(daemon_name: &str) -> i64 {
    daemon_name
        .bytes()
        .fold(0i64, |acc, b| acc.wrapping_add(b as i64).wrapping_mul(31))
}

async fn try_acquire_leadership(pool: &PgPool, daemon_name: &str, node_id: Uuid) -> Result<bool> {
    let lock_id = advisory_lock_id(daemon_name);

    let result: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)")
        .bind(lock_id)
        .fetch_one(pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

    if result.0 {
        sqlx::query("UPDATE forge_daemons SET node_id = $1 WHERE name = $2")
            .bind(node_id)
            .bind(daemon_name)
            .execute(pool)
            .await
            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
    }

    Ok(result.0)
}

async fn release_leadership(pool: &PgPool, daemon_name: &str, _node_id: Uuid) -> Result<()> {
    let lock_id = advisory_lock_id(daemon_name);

    sqlx::query("SELECT pg_advisory_unlock($1)")
        .bind(lock_id)
        .execute(pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

    Ok(())
}

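// Postgres advisory locks are session-scoped, and the acquire/release
// functions above each check a connection out of the pool, so the unlock can
// land on a different session than the one holding the lock. A sketch of a
// variant that pins one connection for the lock's lifetime
// (`try_acquire_leadership_pinned` is illustrative only and not wired into
// the runner):
#[allow(dead_code)]
async fn try_acquire_leadership_pinned(
    pool: &PgPool,
    daemon_name: &str,
) -> Result<Option<sqlx::pool::PoolConnection<sqlx::Postgres>>> {
    let lock_id = advisory_lock_id(daemon_name);

    // Check out one connection and take the lock on it specifically.
    let mut conn = pool
        .acquire()
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
    let acquired: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)")
        .bind(lock_id)
        .fetch_one(&mut *conn)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

    // Hand the connection to the caller while the lock is held; the matching
    // pg_advisory_unlock must run on this same connection.
    Ok(acquired.0.then_some(conn))
}
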
async fn update_daemon_status(pool: &PgPool, name: &str, status: DaemonStatus) -> Result<()> {
    sqlx::query("UPDATE forge_daemons SET status = $1, last_heartbeat = NOW() WHERE name = $2")
        .bind(status.as_str())
        .bind(name)
        .execute(pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

    Ok(())
}

async fn record_daemon_error(pool: &PgPool, name: &str, error: &str) -> Result<()> {
    sqlx::query(
        "UPDATE forge_daemons SET status = 'failed', last_error = $1, last_heartbeat = NOW() WHERE name = $2",
    )
    .bind(error)
    .bind(name)
    .execute(pool)
    .await
    .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_default_config() {
        let config = DaemonRunnerConfig::default();
        assert_eq!(config.health_check_interval, Duration::from_secs(30));
        assert_eq!(config.heartbeat_interval, Duration::from_secs(10));
    }
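
    // The lock key must be a pure function of the daemon name, since acquire
    // and release derive it independently; this exercises the extracted
    // advisory_lock_id helper above.
    #[test]
    fn test_advisory_lock_id_is_deterministic() {
        assert_eq!(advisory_lock_id("scheduler"), advisory_lock_id("scheduler"));
        assert_ne!(advisory_lock_id("a"), advisory_lock_id("b"));
    }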
}