1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use sqlx::{Postgres, Transaction};
6use tokio::sync::Mutex as AsyncMutex;
7use uuid::Uuid;
8
9use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
10use crate::function::{JobDispatch, KvHandle, WorkflowDispatch};
11use crate::http::CircuitBreakerClient;
12
/// Shared handle to the database transaction a webhook runs in.
///
/// The transaction lives in an `Option` guarded by an async mutex and is shared
/// through an `Arc`: [`WebhookContext::with_transaction`] keeps one clone inside
/// the context and returns another to the caller. The slot can be `None`; the
/// dispatch methods on [`WebhookContext`] report "Transaction already taken"
/// when they find it empty.
// NOTE(review): presumably the caller takes the transaction out of the slot to
// commit or roll back once the handler returns — confirm against the call site.
pub type WebhookTxHandle = Arc<AsyncMutex<Option<Transaction<'static, Postgres>>>>;
19
/// Per-request context for a webhook: request metadata, database access, an
/// HTTP client, optional job/workflow dispatchers, an environment provider and
/// an optional KV handle.
///
/// Marked `#[non_exhaustive]`, so code outside this crate cannot build it with
/// a struct literal and must go through the constructors.
#[non_exhaustive]
pub struct WebhookContext {
    /// Webhook name supplied at construction.
    pub webhook_name: String,
    /// Request identifier supplied at construction.
    pub request_id: String,
    /// Idempotency key for this request; `None` unless set through
    /// `with_idempotency_key`.
    pub idempotency_key: Option<String>,
    /// Request headers. `header()` lower-cases the requested name before the
    /// lookup and does not touch the stored keys.
    // NOTE(review): this presumably relies on the caller inserting lower-cased
    // keys — confirm where the map is built.
    headers: HashMap<String, String>,
    /// Connection pool, used directly when the context is not transactional.
    db_pool: sqlx::PgPool,
    /// Shared transaction handle; `Some` only when built via `with_transaction`.
    tx: Option<WebhookTxHandle>,
    /// HTTP client wrapper that `http()` and `raw_http()` are derived from.
    http_client: CircuitBreakerClient,
    /// Timeout handed to `CircuitBreakerClient::with_timeout` by `http()`;
    /// `None` by default.
    http_timeout: Option<Duration>,
    /// Job dispatcher; job methods return an internal error while this is `None`.
    job_dispatch: Option<Arc<dyn JobDispatch>>,
    /// Workflow dispatcher; workflow methods return an internal error while
    /// this is `None`.
    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
    /// Provider backing the `EnvAccess` implementation; defaults to a
    /// `RealEnvProvider`.
    env_provider: Arc<dyn EnvProvider>,
    /// KV handle; `kv()` returns an internal error while this is `None`.
    kv: Option<Arc<dyn KvHandle>>,
}
40
41impl WebhookContext {
42 pub fn new(
44 webhook_name: String,
45 request_id: String,
46 headers: HashMap<String, String>,
47 db_pool: sqlx::PgPool,
48 http_client: CircuitBreakerClient,
49 ) -> Self {
50 Self {
51 webhook_name,
52 request_id,
53 idempotency_key: None,
54 headers,
55 db_pool,
56 tx: None,
57 http_client,
58 http_timeout: None,
59 job_dispatch: None,
60 workflow_dispatch: None,
61 env_provider: Arc::new(RealEnvProvider::new()),
62 kv: None,
63 }
64 }
65
66 pub fn with_transaction(
73 webhook_name: String,
74 request_id: String,
75 headers: HashMap<String, String>,
76 db_pool: sqlx::PgPool,
77 tx: Transaction<'static, Postgres>,
78 http_client: CircuitBreakerClient,
79 ) -> (Self, WebhookTxHandle) {
80 let handle: WebhookTxHandle = Arc::new(AsyncMutex::new(Some(tx)));
81 let ctx = Self {
82 webhook_name,
83 request_id,
84 idempotency_key: None,
85 headers,
86 db_pool,
87 tx: Some(handle.clone()),
88 http_client,
89 http_timeout: None,
90 job_dispatch: None,
91 workflow_dispatch: None,
92 env_provider: Arc::new(RealEnvProvider::new()),
93 kv: None,
94 };
95 (ctx, handle)
96 }
97
98 pub fn is_transactional(&self) -> bool {
100 self.tx.is_some()
101 }
102
103 pub fn with_kv(mut self, kv: Arc<dyn KvHandle>) -> Self {
106 self.kv = Some(kv);
107 self
108 }
109
110 pub fn kv(&self) -> crate::error::Result<&dyn KvHandle> {
112 self.kv
113 .as_deref()
114 .ok_or_else(|| crate::error::ForgeError::internal("KV store not available"))
115 }
116
117 pub fn with_idempotency_key(mut self, key: Option<String>) -> Self {
119 self.idempotency_key = key;
120 self
121 }
122
123 pub fn with_job_dispatch(mut self, dispatcher: Arc<dyn JobDispatch>) -> Self {
125 self.job_dispatch = Some(dispatcher);
126 self
127 }
128
129 pub fn with_workflow_dispatch(mut self, dispatcher: Arc<dyn WorkflowDispatch>) -> Self {
131 self.workflow_dispatch = Some(dispatcher);
132 self
133 }
134
135 pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
137 self.env_provider = provider;
138 self
139 }
140
141 pub fn db(&self) -> crate::function::ForgeDb {
143 crate::function::ForgeDb::from_pool(&self.db_pool)
144 }
145
146 pub fn db_conn(&self) -> crate::function::DbConn<'_> {
151 match &self.tx {
152 Some(tx) => crate::function::DbConn::Transaction(tx.clone(), &self.db_pool),
153 None => crate::function::DbConn::Pool(self.db_pool.clone()),
154 }
155 }
156
157 pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'_>> {
163 match &self.tx {
164 Some(tx) => Ok(crate::function::ForgeConn::Tx(tx.lock().await)),
165 None => Ok(crate::function::ForgeConn::Pool(
166 self.db_pool.acquire().await?,
167 )),
168 }
169 }
170
171 pub fn http(&self) -> crate::http::HttpClient {
173 self.http_client.with_timeout(self.http_timeout)
174 }
175
176 pub fn raw_http(&self) -> &reqwest::Client {
178 self.http_client.inner()
179 }
180
181 pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
182 self.http_timeout = timeout;
183 }
184
185 pub fn header(&self, name: &str) -> Option<&str> {
189 self.headers.get(&name.to_lowercase()).map(|s| s.as_str())
190 }
191
192 pub fn headers(&self) -> &HashMap<String, String> {
194 &self.headers
195 }
196
197 pub async fn dispatch_job<T: serde::Serialize>(
211 &self,
212 job_type: &str,
213 args: T,
214 ) -> crate::error::Result<Uuid> {
215 let dispatcher = self
216 .job_dispatch
217 .as_ref()
218 .ok_or_else(|| crate::error::ForgeError::internal("Job dispatch not available"))?;
219 let args_json = serde_json::to_value(args)?;
220
221 if let Some(tx) = &self.tx {
222 let mut guard = tx.lock().await;
223 let conn = guard.as_mut().ok_or_else(|| {
224 crate::error::ForgeError::internal("Transaction already taken; cannot dispatch job")
225 })?;
226 return dispatcher
227 .dispatch_in_conn(conn, job_type, args_json, None, None)
228 .await;
229 }
230
231 dispatcher
232 .dispatch_by_name(job_type, args_json, None, None)
233 .await
234 }
235
236 pub async fn dispatch<J: crate::ForgeJob>(&self, args: J::Args) -> crate::error::Result<Uuid> {
239 self.dispatch_job(J::info().name, args).await
240 }
241
242 pub async fn start_workflow<T: serde::Serialize>(
244 &self,
245 workflow_name: &str,
246 input: T,
247 ) -> crate::error::Result<Uuid> {
248 let dispatcher = self
249 .workflow_dispatch
250 .as_ref()
251 .ok_or_else(|| crate::error::ForgeError::internal("Workflow dispatch not available"))?;
252 let input_json = serde_json::to_value(input)?;
253
254 if let Some(tx) = &self.tx {
255 let mut guard = tx.lock().await;
256 let conn = guard.as_mut().ok_or_else(|| {
257 crate::error::ForgeError::internal(
258 "Transaction already taken; cannot start workflow",
259 )
260 })?;
261 return dispatcher
262 .start_in_conn(conn, workflow_name, input_json, None, None)
263 .await;
264 }
265
266 dispatcher
267 .start_by_name(workflow_name, input_json, None, None)
268 .await
269 }
270
271 pub async fn start<W: crate::ForgeWorkflow>(
273 &self,
274 input: W::Input,
275 ) -> crate::error::Result<Uuid> {
276 self.start_workflow(W::info().name, input).await
277 }
278
279 pub async fn cancel_job(
281 &self,
282 job_id: Uuid,
283 reason: Option<String>,
284 ) -> crate::error::Result<bool> {
285 let dispatcher = self
286 .job_dispatch
287 .as_ref()
288 .ok_or_else(|| crate::error::ForgeError::internal("Job dispatch not available"))?;
289 dispatcher.cancel(job_id, reason).await
290 }
291}
292
293impl EnvAccess for WebhookContext {
294 fn env_provider(&self) -> &dyn EnvProvider {
295 self.env_provider.as_ref()
296 }
297}
298
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Builds a context over a lazily created pool and checks the public fields
    /// plus header lookup with differing name casing.
    #[tokio::test]
    async fn test_webhook_context_creation() {
        let headers: HashMap<String, String> = [
            ("x-github-event", "push"),
            ("x-github-delivery", "abc-123"),
        ]
        .into_iter()
        .map(|(name, value)| (name.to_string(), value.to_string()))
        .collect();

        // `connect_lazy` only configures the pool, so the bogus URL is fine here.
        let pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool");
        let http_client = CircuitBreakerClient::with_defaults(reqwest::Client::new());

        let ctx = WebhookContext::new(
            "github_webhook".to_string(),
            "req-123".to_string(),
            headers,
            pool,
            http_client,
        )
        .with_idempotency_key(Some("abc-123".to_string()));

        assert_eq!(ctx.webhook_name, "github_webhook");
        assert_eq!(ctx.request_id, "req-123");
        assert_eq!(ctx.idempotency_key.as_deref(), Some("abc-123"));

        // The requested name is lower-cased before the lookup.
        assert_eq!(ctx.header("X-GitHub-Event"), Some("push"));
        assert_eq!(ctx.header("x-github-event"), Some("push"));
        assert!(ctx.header("nonexistent").is_none());
    }
}