Skip to main content

forge_core/daemon/
context.rs

1use std::sync::Arc;
2
3use tokio::sync::{Mutex, watch};
4use tracing::Span;
5use uuid::Uuid;
6
7use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
8
9/// Context available to daemon handlers.
10pub struct DaemonContext {
11    /// Daemon name.
12    pub daemon_name: String,
13    /// Unique instance ID for this daemon execution.
14    pub instance_id: Uuid,
15    /// Database pool.
16    db_pool: sqlx::PgPool,
17    /// HTTP client for external calls.
18    http_client: reqwest::Client,
19    /// Shutdown signal receiver (wrapped in Mutex for interior mutability).
20    shutdown_rx: Mutex<watch::Receiver<bool>>,
21    /// Environment variable provider.
22    env_provider: Arc<dyn EnvProvider>,
23    /// Parent span for trace propagation.
24    span: Span,
25}
26
27impl DaemonContext {
28    /// Create a new daemon context.
29    pub fn new(
30        daemon_name: String,
31        instance_id: Uuid,
32        db_pool: sqlx::PgPool,
33        http_client: reqwest::Client,
34        shutdown_rx: watch::Receiver<bool>,
35    ) -> Self {
36        Self {
37            daemon_name,
38            instance_id,
39            db_pool,
40            http_client,
41            shutdown_rx: Mutex::new(shutdown_rx),
42            env_provider: Arc::new(RealEnvProvider::new()),
43            span: Span::current(),
44        }
45    }
46
47    /// Set environment provider.
48    pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
49        self.env_provider = provider;
50        self
51    }
52
53    pub fn db(&self) -> &sqlx::PgPool {
54        &self.db_pool
55    }
56
57    pub fn http(&self) -> &reqwest::Client {
58        &self.http_client
59    }
60
61    /// Check if shutdown has been requested.
62    pub fn is_shutdown_requested(&self) -> bool {
63        // Use try_lock to avoid blocking; if can't lock, assume not shutdown
64        self.shutdown_rx
65            .try_lock()
66            .map(|rx| *rx.borrow())
67            .unwrap_or(false)
68    }
69
70    /// Wait for shutdown signal.
71    ///
72    /// Use this in a `tokio::select!` to handle graceful shutdown:
73    ///
74    /// ```ignore
75    /// tokio::select! {
76    ///     _ = tokio::time::sleep(Duration::from_secs(60)) => {}
77    ///     _ = ctx.shutdown_signal() => break,
78    /// }
79    /// ```
80    pub async fn shutdown_signal(&self) {
81        let mut rx = self.shutdown_rx.lock().await;
82        // Wait until the value becomes true
83        while !*rx.borrow_and_update() {
84            if rx.changed().await.is_err() {
85                // Channel closed, treat as shutdown
86                break;
87            }
88        }
89    }
90
91    /// Send heartbeat to indicate daemon is alive.
92    pub async fn heartbeat(&self) -> crate::Result<()> {
93        tracing::trace!(daemon.name = %self.daemon_name, "Sending heartbeat");
94
95        sqlx::query(
96            r#"
97            UPDATE forge_daemons
98            SET last_heartbeat = NOW()
99            WHERE name = $1 AND instance_id = $2
100            "#,
101        )
102        .bind(&self.daemon_name)
103        .bind(self.instance_id)
104        .execute(&self.db_pool)
105        .await
106        .map_err(|e| crate::ForgeError::Database(e.to_string()))?;
107
108        Ok(())
109    }
110
111    /// Get the trace ID for this daemon execution.
112    ///
113    /// Returns the instance_id as a correlation ID.
114    pub fn trace_id(&self) -> String {
115        self.instance_id.to_string()
116    }
117
118    /// Get the parent span for trace propagation.
119    ///
120    /// Use this to create child spans within daemon handlers.
121    pub fn span(&self) -> &Span {
122        &self.span
123    }
124}
125
126impl EnvAccess for DaemonContext {
127    fn env_provider(&self) -> &dyn EnvProvider {
128        self.env_provider.as_ref()
129    }
130}
131
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a `test_daemon` context on top of a lazily connected pool, so no
    /// database connection is attempted by the construction itself.
    fn make_context(instance_id: Uuid, shutdown_rx: watch::Receiver<bool>) -> DaemonContext {
        let lazy_pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool");

        DaemonContext::new(
            "test_daemon".to_string(),
            instance_id,
            lazy_pool,
            reqwest::Client::new(),
            shutdown_rx,
        )
    }

    /// A fresh context exposes its identity and reflects the shutdown flag.
    #[tokio::test]
    async fn test_daemon_context_creation() {
        let (tx, rx) = watch::channel(false);
        let id = Uuid::new_v4();
        let ctx = make_context(id, rx);

        assert_eq!(ctx.daemon_name, "test_daemon");
        assert_eq!(ctx.instance_id, id);
        assert!(!ctx.is_shutdown_requested());

        // Flip the flag and observe it through the context.
        tx.send(true).unwrap();
        assert!(ctx.is_shutdown_requested());
    }

    /// `shutdown_signal` resolves once the sender publishes `true`.
    #[tokio::test]
    async fn test_shutdown_signal() {
        let (tx, rx) = watch::channel(false);
        let ctx = make_context(Uuid::new_v4(), rx);

        // Publish the shutdown request from another task after a short delay.
        let send_delay = Duration::from_millis(50);
        tokio::spawn(async move {
            tokio::time::sleep(send_delay).await;
            tx.send(true).unwrap();
        });

        // The wait must finish well inside the deadline.
        let deadline = Duration::from_millis(200);
        tokio::time::timeout(deadline, ctx.shutdown_signal())
            .await
            .expect("Shutdown signal should complete");

        assert!(ctx.is_shutdown_requested());
    }
}