forge_core/daemon/
context.rs1use std::sync::Arc;
2
3use tokio::sync::{Mutex, watch};
4use tracing::Span;
5use uuid::Uuid;
6
7use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
8
9pub struct DaemonContext {
11 pub daemon_name: String,
13 pub instance_id: Uuid,
15 db_pool: sqlx::PgPool,
17 http_client: reqwest::Client,
19 shutdown_rx: Mutex<watch::Receiver<bool>>,
21 env_provider: Arc<dyn EnvProvider>,
23 span: Span,
25}
26
27impl DaemonContext {
28 pub fn new(
30 daemon_name: String,
31 instance_id: Uuid,
32 db_pool: sqlx::PgPool,
33 http_client: reqwest::Client,
34 shutdown_rx: watch::Receiver<bool>,
35 ) -> Self {
36 Self {
37 daemon_name,
38 instance_id,
39 db_pool,
40 http_client,
41 shutdown_rx: Mutex::new(shutdown_rx),
42 env_provider: Arc::new(RealEnvProvider::new()),
43 span: Span::current(),
44 }
45 }
46
47 pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
49 self.env_provider = provider;
50 self
51 }
52
53 pub fn db(&self) -> &sqlx::PgPool {
54 &self.db_pool
55 }
56
57 pub fn http(&self) -> &reqwest::Client {
58 &self.http_client
59 }
60
61 pub fn is_shutdown_requested(&self) -> bool {
63 self.shutdown_rx
65 .try_lock()
66 .map(|rx| *rx.borrow())
67 .unwrap_or(false)
68 }
69
70 pub async fn shutdown_signal(&self) {
81 let mut rx = self.shutdown_rx.lock().await;
82 while !*rx.borrow_and_update() {
84 if rx.changed().await.is_err() {
85 break;
87 }
88 }
89 }
90
91 pub async fn heartbeat(&self) -> crate::Result<()> {
93 tracing::trace!(daemon.name = %self.daemon_name, "Sending heartbeat");
94
95 sqlx::query(
96 r#"
97 UPDATE forge_daemons
98 SET last_heartbeat = NOW()
99 WHERE name = $1 AND instance_id = $2
100 "#,
101 )
102 .bind(&self.daemon_name)
103 .bind(self.instance_id)
104 .execute(&self.db_pool)
105 .await
106 .map_err(|e| crate::ForgeError::Database(e.to_string()))?;
107
108 Ok(())
109 }
110
111 pub fn trace_id(&self) -> String {
115 self.instance_id.to_string()
116 }
117
118 pub fn span(&self) -> &Span {
122 &self.span
123 }
124}
125
126impl EnvAccess for DaemonContext {
127 fn env_provider(&self) -> &dyn EnvProvider {
128 self.env_provider.as_ref()
129 }
130}
131
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a context around a lazily-connecting pool (no connection is
    /// attempted) and returns it together with the shutdown sender.
    fn context_for(instance_id: Uuid) -> (DaemonContext, watch::Sender<bool>) {
        let pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool");
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        let ctx = DaemonContext::new(
            String::from("test_daemon"),
            instance_id,
            pool,
            reqwest::Client::new(),
            shutdown_rx,
        );

        (ctx, shutdown_tx)
    }

    #[tokio::test]
    async fn test_daemon_context_creation() {
        let instance_id = Uuid::new_v4();
        let (ctx, shutdown_tx) = context_for(instance_id);

        assert_eq!(ctx.daemon_name, "test_daemon");
        assert_eq!(ctx.instance_id, instance_id);
        assert!(!ctx.is_shutdown_requested());

        // Flipping the flag must become visible through the context.
        shutdown_tx.send(true).unwrap();
        assert!(ctx.is_shutdown_requested());
    }

    #[tokio::test]
    async fn test_shutdown_signal() {
        let (ctx, shutdown_tx) = context_for(Uuid::new_v4());

        // Request shutdown from another task after a short delay.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            shutdown_tx.send(true).unwrap();
        });

        let waited = tokio::time::timeout(Duration::from_millis(200), ctx.shutdown_signal()).await;
        waited.expect("Shutdown signal should complete");

        assert!(ctx.is_shutdown_requested());
    }
}