forge_core/daemon/
context.rs1use std::sync::Arc;
2
3use tokio::sync::{Mutex, watch};
4use uuid::Uuid;
5
6use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
7
8pub struct DaemonContext {
10 pub daemon_name: String,
12 pub instance_id: Uuid,
14 db_pool: sqlx::PgPool,
16 http_client: reqwest::Client,
18 shutdown_rx: Mutex<watch::Receiver<bool>>,
20 env_provider: Arc<dyn EnvProvider>,
22}
23
24impl DaemonContext {
25 pub fn new(
27 daemon_name: String,
28 instance_id: Uuid,
29 db_pool: sqlx::PgPool,
30 http_client: reqwest::Client,
31 shutdown_rx: watch::Receiver<bool>,
32 ) -> Self {
33 Self {
34 daemon_name,
35 instance_id,
36 db_pool,
37 http_client,
38 shutdown_rx: Mutex::new(shutdown_rx),
39 env_provider: Arc::new(RealEnvProvider::new()),
40 }
41 }
42
43 pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
45 self.env_provider = provider;
46 self
47 }
48
49 pub fn db(&self) -> &sqlx::PgPool {
51 &self.db_pool
52 }
53
54 pub fn http(&self) -> &reqwest::Client {
56 &self.http_client
57 }
58
59 pub fn is_shutdown_requested(&self) -> bool {
61 self.shutdown_rx
63 .try_lock()
64 .map(|rx| *rx.borrow())
65 .unwrap_or(false)
66 }
67
68 pub async fn shutdown_signal(&self) {
79 let mut rx = self.shutdown_rx.lock().await;
80 while !*rx.borrow_and_update() {
82 if rx.changed().await.is_err() {
83 break;
85 }
86 }
87 }
88
89 pub async fn heartbeat(&self) -> crate::Result<()> {
91 sqlx::query(
92 r#"
93 UPDATE forge_daemons
94 SET last_heartbeat = NOW()
95 WHERE name = $1 AND instance_id = $2
96 "#,
97 )
98 .bind(&self.daemon_name)
99 .bind(self.instance_id)
100 .execute(&self.db_pool)
101 .await
102 .map_err(|e| crate::ForgeError::Database(e.to_string()))?;
103
104 Ok(())
105 }
106}
107
108impl EnvAccess for DaemonContext {
109 fn env_provider(&self) -> &dyn EnvProvider {
110 self.env_provider.as_ref()
111 }
112}
113
114#[cfg(test)]
115mod tests {
116 use super::*;
117
118 #[tokio::test]
119 async fn test_daemon_context_creation() {
120 let pool = sqlx::postgres::PgPoolOptions::new()
121 .max_connections(1)
122 .connect_lazy("postgres://localhost/nonexistent")
123 .expect("Failed to create mock pool");
124
125 let (shutdown_tx, shutdown_rx) = watch::channel(false);
126 let instance_id = Uuid::new_v4();
127
128 let ctx = DaemonContext::new(
129 "test_daemon".to_string(),
130 instance_id,
131 pool,
132 reqwest::Client::new(),
133 shutdown_rx,
134 );
135
136 assert_eq!(ctx.daemon_name, "test_daemon");
137 assert_eq!(ctx.instance_id, instance_id);
138 assert!(!ctx.is_shutdown_requested());
139
140 shutdown_tx.send(true).unwrap();
142 assert!(ctx.is_shutdown_requested());
143 }
144
145 #[tokio::test]
146 async fn test_shutdown_signal() {
147 let pool = sqlx::postgres::PgPoolOptions::new()
148 .max_connections(1)
149 .connect_lazy("postgres://localhost/nonexistent")
150 .expect("Failed to create mock pool");
151
152 let (shutdown_tx, shutdown_rx) = watch::channel(false);
153
154 let ctx = DaemonContext::new(
155 "test_daemon".to_string(),
156 Uuid::new_v4(),
157 pool,
158 reqwest::Client::new(),
159 shutdown_rx,
160 );
161
162 tokio::spawn(async move {
164 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
165 shutdown_tx.send(true).unwrap();
166 });
167
168 tokio::time::timeout(std::time::Duration::from_millis(200), ctx.shutdown_signal())
170 .await
171 .expect("Shutdown signal should complete");
172
173 assert!(ctx.is_shutdown_requested());
174 }
175}