forge_core/daemon/
context.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::sync::{Mutex, watch};
5use tracing::Span;
6use uuid::Uuid;
7
8use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
9use crate::function::{JobDispatch, WorkflowDispatch};
10use crate::http::CircuitBreakerClient;
11
12pub struct DaemonContext {
14 pub daemon_name: String,
16 pub instance_id: Uuid,
18 db_pool: sqlx::PgPool,
20 http_client: CircuitBreakerClient,
22 http_timeout: Option<Duration>,
25 shutdown_rx: Mutex<watch::Receiver<bool>>,
27 job_dispatch: Option<Arc<dyn JobDispatch>>,
29 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
31 env_provider: Arc<dyn EnvProvider>,
33 span: Span,
35}
36
37impl DaemonContext {
38 pub fn new(
40 daemon_name: String,
41 instance_id: Uuid,
42 db_pool: sqlx::PgPool,
43 http_client: CircuitBreakerClient,
44 shutdown_rx: watch::Receiver<bool>,
45 ) -> Self {
46 Self {
47 daemon_name,
48 instance_id,
49 db_pool,
50 http_client,
51 http_timeout: None,
52 shutdown_rx: Mutex::new(shutdown_rx),
53 job_dispatch: None,
54 workflow_dispatch: None,
55 env_provider: Arc::new(RealEnvProvider::new()),
56 span: Span::current(),
57 }
58 }
59
60 pub fn with_job_dispatch(mut self, dispatcher: Arc<dyn JobDispatch>) -> Self {
62 self.job_dispatch = Some(dispatcher);
63 self
64 }
65
66 pub fn with_workflow_dispatch(mut self, dispatcher: Arc<dyn WorkflowDispatch>) -> Self {
68 self.workflow_dispatch = Some(dispatcher);
69 self
70 }
71
72 pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
74 self.env_provider = provider;
75 self
76 }
77
78 pub fn db(&self) -> crate::function::ForgeDb {
79 crate::function::ForgeDb::from_pool(&self.db_pool)
80 }
81
82 pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'static>> {
84 Ok(crate::function::ForgeConn::Pool(
85 self.db_pool.acquire().await?,
86 ))
87 }
88
89 pub fn http(&self) -> crate::http::HttpClient {
90 self.http_client.with_timeout(self.http_timeout)
91 }
92
93 pub fn raw_http(&self) -> &reqwest::Client {
94 self.http_client.inner()
95 }
96
97 pub fn http_with_circuit_breaker(&self) -> crate::http::HttpClient {
98 self.http()
99 }
100
101 pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
102 self.http_timeout = timeout;
103 }
104
105 pub async fn dispatch_job<T: serde::Serialize>(
107 &self,
108 job_type: &str,
109 args: T,
110 ) -> crate::Result<Uuid> {
111 let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
112 crate::error::ForgeError::Internal("Job dispatch not available".to_string())
113 })?;
114
115 let args_json = serde_json::to_value(args)?;
116 dispatcher.dispatch_by_name(job_type, args_json, None).await
117 }
118
119 pub async fn start_workflow<T: serde::Serialize>(
121 &self,
122 workflow_name: &str,
123 input: T,
124 ) -> crate::Result<Uuid> {
125 let dispatcher = self.workflow_dispatch.as_ref().ok_or_else(|| {
126 crate::error::ForgeError::Internal("Workflow dispatch not available".to_string())
127 })?;
128
129 let input_json = serde_json::to_value(input)?;
130 dispatcher
131 .start_by_name(workflow_name, input_json, None)
132 .await
133 }
134
135 pub fn is_shutdown_requested(&self) -> bool {
137 self.shutdown_rx
139 .try_lock()
140 .map(|rx| *rx.borrow())
141 .unwrap_or(false)
142 }
143
144 pub async fn shutdown_signal(&self) {
155 let mut rx = self.shutdown_rx.lock().await;
156 while !*rx.borrow_and_update() {
158 if rx.changed().await.is_err() {
159 break;
161 }
162 }
163 }
164
165 pub async fn heartbeat(&self) -> crate::Result<()> {
167 tracing::trace!(daemon.name = %self.daemon_name, "Sending heartbeat");
168
169 sqlx::query(
170 r#"
171 UPDATE forge_daemons
172 SET last_heartbeat = NOW()
173 WHERE name = $1 AND instance_id = $2
174 "#,
175 )
176 .bind(&self.daemon_name)
177 .bind(self.instance_id)
178 .execute(&self.db_pool)
179 .await
180 .map_err(|e| crate::ForgeError::Database(e.to_string()))?;
181
182 Ok(())
183 }
184
185 pub fn trace_id(&self) -> String {
189 self.instance_id.to_string()
190 }
191
192 pub fn span(&self) -> &Span {
196 &self.span
197 }
198}
199
200impl EnvAccess for DaemonContext {
201 fn env_provider(&self) -> &dyn EnvProvider {
202 self.env_provider.as_ref()
203 }
204}
205
206#[cfg(test)]
207#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
208mod tests {
209 use super::*;
210
211 #[tokio::test]
212 async fn test_daemon_context_creation() {
213 let pool = sqlx::postgres::PgPoolOptions::new()
214 .max_connections(1)
215 .connect_lazy("postgres://localhost/nonexistent")
216 .expect("Failed to create mock pool");
217
218 let (shutdown_tx, shutdown_rx) = watch::channel(false);
219 let instance_id = Uuid::new_v4();
220
221 let ctx = DaemonContext::new(
222 "test_daemon".to_string(),
223 instance_id,
224 pool,
225 CircuitBreakerClient::with_defaults(reqwest::Client::new()),
226 shutdown_rx,
227 );
228
229 assert_eq!(ctx.daemon_name, "test_daemon");
230 assert_eq!(ctx.instance_id, instance_id);
231 assert!(!ctx.is_shutdown_requested());
232
233 shutdown_tx.send(true).unwrap();
235 assert!(ctx.is_shutdown_requested());
236 }
237
238 #[tokio::test]
239 async fn test_shutdown_signal() {
240 let pool = sqlx::postgres::PgPoolOptions::new()
241 .max_connections(1)
242 .connect_lazy("postgres://localhost/nonexistent")
243 .expect("Failed to create mock pool");
244
245 let (shutdown_tx, shutdown_rx) = watch::channel(false);
246
247 let ctx = DaemonContext::new(
248 "test_daemon".to_string(),
249 Uuid::new_v4(),
250 pool,
251 CircuitBreakerClient::with_defaults(reqwest::Client::new()),
252 shutdown_rx,
253 );
254
255 tokio::spawn(async move {
257 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
258 shutdown_tx.send(true).unwrap();
259 });
260
261 tokio::time::timeout(std::time::Duration::from_millis(200), ctx.shutdown_signal())
263 .await
264 .expect("Shutdown signal should complete");
265
266 assert!(ctx.is_shutdown_requested());
267 }
268}