Skip to main content

forge_core/daemon/
context.rs

1use std::sync::Arc;
2
3use tokio::sync::{Mutex, watch};
4use uuid::Uuid;
5
6use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
7
/// Context available to daemon handlers.
///
/// Bundles a daemon's identity (name and instance ID) with the resources it
/// is handed at construction: a Postgres pool, an HTTP client, the receiving
/// end of the shutdown channel, and an environment-variable provider.
pub struct DaemonContext {
    /// Daemon name. Together with `instance_id` it selects the
    /// `forge_daemons` row that `heartbeat` updates.
    pub daemon_name: String,
    /// Unique instance ID for this daemon execution.
    pub instance_id: Uuid,
    /// Database pool; exposed read-only through `db()`.
    db_pool: sqlx::PgPool,
    /// HTTP client for external calls; exposed read-only through `http()`.
    http_client: reqwest::Client,
    /// Shutdown signal receiver. The channel carries a `bool`, and `true`
    /// means shutdown has been requested. Wrapped in a `tokio::sync::Mutex`
    /// so that methods taking `&self` can reach the receiver (interior
    /// mutability).
    shutdown_rx: Mutex<watch::Receiver<bool>>,
    /// Environment variable provider. `new` installs a `RealEnvProvider`;
    /// `with_env_provider` replaces it.
    env_provider: Arc<dyn EnvProvider>,
}
23
24impl DaemonContext {
25    /// Create a new daemon context.
26    pub fn new(
27        daemon_name: String,
28        instance_id: Uuid,
29        db_pool: sqlx::PgPool,
30        http_client: reqwest::Client,
31        shutdown_rx: watch::Receiver<bool>,
32    ) -> Self {
33        Self {
34            daemon_name,
35            instance_id,
36            db_pool,
37            http_client,
38            shutdown_rx: Mutex::new(shutdown_rx),
39            env_provider: Arc::new(RealEnvProvider::new()),
40        }
41    }
42
43    /// Set environment provider.
44    pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
45        self.env_provider = provider;
46        self
47    }
48
49    /// Get database pool.
50    pub fn db(&self) -> &sqlx::PgPool {
51        &self.db_pool
52    }
53
54    /// Get HTTP client.
55    pub fn http(&self) -> &reqwest::Client {
56        &self.http_client
57    }
58
59    /// Check if shutdown has been requested.
60    pub fn is_shutdown_requested(&self) -> bool {
61        // Use try_lock to avoid blocking; if can't lock, assume not shutdown
62        self.shutdown_rx
63            .try_lock()
64            .map(|rx| *rx.borrow())
65            .unwrap_or(false)
66    }
67
68    /// Wait for shutdown signal.
69    ///
70    /// Use this in a `tokio::select!` to handle graceful shutdown:
71    ///
72    /// ```ignore
73    /// tokio::select! {
74    ///     _ = tokio::time::sleep(Duration::from_secs(60)) => {}
75    ///     _ = ctx.shutdown_signal() => break,
76    /// }
77    /// ```
78    pub async fn shutdown_signal(&self) {
79        let mut rx = self.shutdown_rx.lock().await;
80        // Wait until the value becomes true
81        while !*rx.borrow_and_update() {
82            if rx.changed().await.is_err() {
83                // Channel closed, treat as shutdown
84                break;
85            }
86        }
87    }
88
89    /// Send heartbeat to indicate daemon is alive.
90    pub async fn heartbeat(&self) -> crate::Result<()> {
91        sqlx::query(
92            r#"
93            UPDATE forge_daemons
94            SET last_heartbeat = NOW()
95            WHERE name = $1 AND instance_id = $2
96            "#,
97        )
98        .bind(&self.daemon_name)
99        .bind(self.instance_id)
100        .execute(&self.db_pool)
101        .await
102        .map_err(|e| crate::ForgeError::Database(e.to_string()))?;
103
104        Ok(())
105    }
106}
107
108impl EnvAccess for DaemonContext {
109    fn env_provider(&self) -> &dyn EnvProvider {
110        self.env_provider.as_ref()
111    }
112}
113
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `test_daemon` context around a lazily-created pool.
    ///
    /// None of the tests here issue a query, so the connection string only
    /// has to parse; it does not need to point at a reachable database.
    fn make_context(instance_id: Uuid, shutdown_rx: watch::Receiver<bool>) -> DaemonContext {
        let pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool");

        DaemonContext::new(
            "test_daemon".to_string(),
            instance_id,
            pool,
            reqwest::Client::new(),
            shutdown_rx,
        )
    }

    /// A fresh context reports its identity, and the shutdown flag follows
    /// the value sent on the channel.
    #[tokio::test]
    async fn test_daemon_context_creation() {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let instance_id = Uuid::new_v4();
        let ctx = make_context(instance_id, shutdown_rx);

        assert_eq!(ctx.daemon_name, "test_daemon");
        assert_eq!(ctx.instance_id, instance_id);
        assert!(!ctx.is_shutdown_requested());

        // Signal shutdown
        shutdown_tx.send(true).unwrap();
        assert!(ctx.is_shutdown_requested());
    }

    /// `shutdown_signal` resolves once another task sends `true`.
    #[tokio::test]
    async fn test_shutdown_signal() {
        use std::time::Duration;

        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let ctx = make_context(Uuid::new_v4(), shutdown_rx);

        // Background task flips the flag after a short delay.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            shutdown_tx.send(true).unwrap();
        });

        // The wait must finish well inside the timeout.
        let outcome = tokio::time::timeout(Duration::from_millis(200), ctx.shutdown_signal()).await;
        outcome.expect("Shutdown signal should complete");

        assert!(ctx.is_shutdown_requested());
    }
}