forge_core/daemon/
context.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::sync::{Mutex, watch};
5use tracing::Span;
6use uuid::Uuid;
7
8use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
9use crate::function::{JobDispatch, WorkflowDispatch};
10use crate::http::CircuitBreakerClient;
11
12pub struct DaemonContext {
14 pub daemon_name: String,
16 pub instance_id: Uuid,
18 db_pool: sqlx::PgPool,
20 http_client: CircuitBreakerClient,
22 http_timeout: Option<Duration>,
25 shutdown_rx: Mutex<watch::Receiver<bool>>,
27 job_dispatch: Option<Arc<dyn JobDispatch>>,
29 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
31 env_provider: Arc<dyn EnvProvider>,
33 span: Span,
35}
36
37impl DaemonContext {
38 pub fn new(
40 daemon_name: String,
41 instance_id: Uuid,
42 db_pool: sqlx::PgPool,
43 http_client: CircuitBreakerClient,
44 shutdown_rx: watch::Receiver<bool>,
45 ) -> Self {
46 Self {
47 daemon_name,
48 instance_id,
49 db_pool,
50 http_client,
51 http_timeout: None,
52 shutdown_rx: Mutex::new(shutdown_rx),
53 job_dispatch: None,
54 workflow_dispatch: None,
55 env_provider: Arc::new(RealEnvProvider::new()),
56 span: Span::current(),
57 }
58 }
59
60 pub fn with_job_dispatch(mut self, dispatcher: Arc<dyn JobDispatch>) -> Self {
62 self.job_dispatch = Some(dispatcher);
63 self
64 }
65
66 pub fn with_workflow_dispatch(mut self, dispatcher: Arc<dyn WorkflowDispatch>) -> Self {
68 self.workflow_dispatch = Some(dispatcher);
69 self
70 }
71
72 pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
74 self.env_provider = provider;
75 self
76 }
77
78 pub fn db(&self) -> crate::function::ForgeDb {
79 crate::function::ForgeDb::from_pool(&self.db_pool)
80 }
81
82 pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'static>> {
84 Ok(crate::function::ForgeConn::Pool(
85 self.db_pool.acquire().await?,
86 ))
87 }
88
89 pub fn http(&self) -> crate::http::HttpClient {
90 self.http_client.with_timeout(self.http_timeout)
91 }
92
93 pub fn raw_http(&self) -> &reqwest::Client {
94 self.http_client.inner()
95 }
96
97 pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
98 self.http_timeout = timeout;
99 }
100
101 pub async fn dispatch_job<T: serde::Serialize>(
103 &self,
104 job_type: &str,
105 args: T,
106 ) -> crate::Result<Uuid> {
107 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
108 crate::error::ForgeError::Internal("Job dispatch not available".to_string())
109 })?;
110
111 let args_json = serde_json::to_value(args)?;
112 dispatcher.dispatch_by_name(job_type, args_json, None).await
113 }
114
115 pub async fn start_workflow<T: serde::Serialize>(
117 &self,
118 workflow_name: &str,
119 input: T,
120 ) -> crate::Result<Uuid> {
121 let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
122 crate::error::ForgeError::Internal("Workflow dispatch not available".to_string())
123 })?;
124
125 let input_json = serde_json::to_value(input)?;
126 dispatcher
127 .start_by_name(workflow_name, input_json, None)
128 .await
129 }
130
131 pub fn is_shutdown_requested(&self) -> bool {
133 self.shutdown_rx
135 .try_lock()
136 .map(|rx| *rx.borrow())
137 .unwrap_or(false)
138 }
139
140 pub async fn shutdown_signal(&self) {
151 let mut rx = self.shutdown_rx.lock().await;
152 while !*rx.borrow_and_update() {
154 if rx.changed().await.is_err() {
155 break;
157 }
158 }
159 }
160
161 pub async fn heartbeat(&self) -> crate::Result<()> {
163 tracing::trace!(daemon.name = %self.daemon_name, "Sending heartbeat");
164
165 sqlx::query!(
166 r#"
167 UPDATE forge_daemons
168 SET last_heartbeat = NOW()
169 WHERE name = $1 AND instance_id = $2
170 "#,
171 self.daemon_name,
172 self.instance_id,
173 )
174 .execute(&self.db_pool)
175 .await
176 .map_err(|e| crate::ForgeError::Database(e.to_string()))?;
177
178 Ok(())
179 }
180
181 pub fn trace_id(&self) -> String {
185 self.instance_id.to_string()
186 }
187
188 pub fn span(&self) -> &Span {
192 &self.span
193 }
194}
195
196impl EnvAccess for DaemonContext {
197 fn env_provider(&self) -> &dyn EnvProvider {
198 self.env_provider.as_ref()
199 }
200}
201
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Builds a `test_daemon` context around a lazily created pool, so the
    /// tests can construct a context against a database URL that is never
    /// queried.
    fn make_context(instance_id: Uuid, shutdown_rx: watch::Receiver<bool>) -> DaemonContext {
        let lazy_pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool");
        let breaker = CircuitBreakerClient::with_defaults(reqwest::Client::new());

        DaemonContext::new(
            "test_daemon".to_string(),
            instance_id,
            lazy_pool,
            breaker,
            shutdown_rx,
        )
    }

    /// Construction stores the identity fields, and the shutdown flag tracks
    /// the value sent on the channel.
    #[tokio::test]
    async fn test_daemon_context_creation() {
        let (stop_tx, stop_rx) = watch::channel(false);
        let expected_id = Uuid::new_v4();

        let ctx = make_context(expected_id, stop_rx);

        assert_eq!(ctx.daemon_name, "test_daemon");
        assert_eq!(ctx.instance_id, expected_id);
        assert!(!ctx.is_shutdown_requested());

        stop_tx.send(true).unwrap();
        assert!(ctx.is_shutdown_requested());
    }

    /// `shutdown_signal` resolves once `true` is sent, and the flag is
    /// observable afterwards.
    #[tokio::test]
    async fn test_shutdown_signal() {
        let (stop_tx, stop_rx) = watch::channel(false);
        let ctx = make_context(Uuid::new_v4(), stop_rx);

        // Request shutdown from a background task after a short delay.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            stop_tx.send(true).unwrap();
        });

        let waited = tokio::time::timeout(Duration::from_millis(200), ctx.shutdown_signal()).await;
        waited.expect("Shutdown signal should complete");

        assert!(ctx.is_shutdown_requested());
    }
}