Skip to main content

forge_core/webhook/
context.rs

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use sqlx::{Postgres, Transaction};
use tokio::sync::Mutex as AsyncMutex;
use uuid::Uuid;

use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
use crate::function::{JobDispatch, KvHandle, WorkflowDispatch};
use crate::http::CircuitBreakerClient;
12
13/// Shared handle to the webhook's active transaction.
14///
15/// The runtime keeps ownership and commits/rolls back; the handler dispatches
16/// jobs and workflows on the same connection so they roll back with the
17/// webhook on error.
18pub type WebhookTxHandle = Arc<AsyncMutex<Option<Transaction<'static, Postgres>>>>;
19
20/// Context available to webhook handlers.
21#[non_exhaustive]
22pub struct WebhookContext {
23    pub webhook_name: String,
24    pub request_id: String,
25    pub idempotency_key: Option<String>,
26    /// Lowercase keys.
27    headers: HashMap<String, String>,
28    db_pool: sqlx::PgPool,
29    /// Held by the runtime; handler dispatches piggy-back on this connection
30    /// so they roll back with the webhook on error.
31    tx: Option<WebhookTxHandle>,
32    http_client: CircuitBreakerClient,
33    /// `None` means unlimited.
34    http_timeout: Option<Duration>,
35    job_dispatch: Option<Arc<dyn JobDispatch>>,
36    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
37    env_provider: Arc<dyn EnvProvider>,
38    kv: Option<Arc<dyn KvHandle>>,
39}
40
41impl WebhookContext {
42    /// Create a new webhook context.
43    pub fn new(
44        webhook_name: String,
45        request_id: String,
46        headers: HashMap<String, String>,
47        db_pool: sqlx::PgPool,
48        http_client: CircuitBreakerClient,
49    ) -> Self {
50        Self {
51            webhook_name,
52            request_id,
53            idempotency_key: None,
54            headers,
55            db_pool,
56            tx: None,
57            http_client,
58            http_timeout: None,
59            job_dispatch: None,
60            workflow_dispatch: None,
61            env_provider: Arc::new(RealEnvProvider::new()),
62            kv: None,
63        }
64    }
65
66    /// Build a webhook context that dispatches jobs and workflows on `tx`.
67    ///
68    /// The runtime opens the transaction, owns the handle, and commits or
69    /// rolls back based on the handler's result. Anything the handler
70    /// dispatches lands on the same connection, so failed webhooks don't
71    /// leave orphaned jobs behind.
72    pub fn with_transaction(
73        webhook_name: String,
74        request_id: String,
75        headers: HashMap<String, String>,
76        db_pool: sqlx::PgPool,
77        tx: Transaction<'static, Postgres>,
78        http_client: CircuitBreakerClient,
79    ) -> (Self, WebhookTxHandle) {
80        let handle: WebhookTxHandle = Arc::new(AsyncMutex::new(Some(tx)));
81        let ctx = Self {
82            webhook_name,
83            request_id,
84            idempotency_key: None,
85            headers,
86            db_pool,
87            tx: Some(handle.clone()),
88            http_client,
89            http_timeout: None,
90            job_dispatch: None,
91            workflow_dispatch: None,
92            env_provider: Arc::new(RealEnvProvider::new()),
93            kv: None,
94        };
95        (ctx, handle)
96    }
97
98    /// Whether dispatches will participate in an enclosing transaction.
99    pub fn is_transactional(&self) -> bool {
100        self.tx.is_some()
101    }
102
103    /// Attach a KV store handle. Called by the runtime before handing the
104    /// context to the handler.
105    pub fn with_kv(mut self, kv: Arc<dyn KvHandle>) -> Self {
106        self.kv = Some(kv);
107        self
108    }
109
110    /// Access the KV store.
111    pub fn kv(&self) -> crate::error::Result<&dyn KvHandle> {
112        self.kv
113            .as_deref()
114            .ok_or_else(|| crate::error::ForgeError::internal("KV store not available"))
115    }
116
117    /// Set idempotency key.
118    pub fn with_idempotency_key(mut self, key: Option<String>) -> Self {
119        self.idempotency_key = key;
120        self
121    }
122
123    /// Set job dispatcher.
124    pub fn with_job_dispatch(mut self, dispatcher: Arc<dyn JobDispatch>) -> Self {
125        self.job_dispatch = Some(dispatcher);
126        self
127    }
128
129    /// Set workflow dispatcher.
130    pub fn with_workflow_dispatch(mut self, dispatcher: Arc<dyn WorkflowDispatch>) -> Self {
131        self.workflow_dispatch = Some(dispatcher);
132        self
133    }
134
135    /// Set environment provider.
136    pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
137        self.env_provider = provider;
138        self
139    }
140
141    /// Get database pool.
142    pub fn db(&self) -> crate::function::ForgeDb {
143        crate::function::ForgeDb::from_pool(&self.db_pool)
144    }
145
146    /// Get a `DbConn` for use in shared helper functions.
147    ///
148    /// In transactional mode, returns a transaction-backed handle. Outside
149    /// a transaction, returns a pool-backed handle.
150    pub fn db_conn(&self) -> crate::function::DbConn<'_> {
151        match &self.tx {
152            Some(tx) => crate::function::DbConn::Transaction(tx.clone(), &self.db_pool),
153            None => crate::function::DbConn::Pool(self.db_pool.clone()),
154        }
155    }
156
157    /// Acquire a connection compatible with sqlx compile-time checked macros.
158    ///
159    /// In transactional mode, returns a guard over the active transaction so
160    /// the handler's queries participate in the same transaction as its
161    /// dispatches. Outside a transaction, acquires a fresh pool connection.
162    pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'_>> {
163        match &self.tx {
164            Some(tx) => Ok(crate::function::ForgeConn::Tx(tx.lock().await)),
165            None => Ok(crate::function::ForgeConn::Pool(
166                self.db_pool.acquire().await?,
167            )),
168        }
169    }
170
171    /// Get the HTTP client for external requests.
172    pub fn http(&self) -> crate::http::HttpClient {
173        self.http_client.with_timeout(self.http_timeout)
174    }
175
176    /// Get the raw reqwest client, bypassing circuit breaker execution.
177    pub fn raw_http(&self) -> &reqwest::Client {
178        self.http_client.inner()
179    }
180
181    pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
182        self.http_timeout = timeout;
183    }
184
185    /// Get a request header value.
186    ///
187    /// Header names are case-insensitive.
188    pub fn header(&self, name: &str) -> Option<&str> {
189        self.headers.get(&name.to_lowercase()).map(|s| s.as_str())
190    }
191
192    /// Get all headers.
193    pub fn headers(&self) -> &HashMap<String, String> {
194        &self.headers
195    }
196
197    /// Dispatch a background job for async processing.
198    ///
199    /// This is the recommended way to handle webhook events:
200    /// 1. Validate the webhook signature
201    /// 2. Dispatch a job to process the event
202    /// 3. Return 202 Accepted immediately
203    ///
204    /// # Arguments
205    /// * `job_type` - The registered name of the job type
206    /// * `args` - The arguments for the job (will be serialized to JSON)
207    ///
208    /// # Returns
209    /// The UUID of the dispatched job, or an error if dispatch is not available.
210    pub async fn dispatch_job<T: serde::Serialize>(
211        &self,
212        job_type: &str,
213        args: T,
214    ) -> crate::error::Result<Uuid> {
215        let dispatcher = self
216            .job_dispatch
217            .as_ref()
218            .ok_or_else(|| crate::error::ForgeError::internal("Job dispatch not available"))?;
219        let args_json = serde_json::to_value(args)?;
220
221        if let Some(tx) = &self.tx {
222            let mut guard = tx.lock().await;
223            let conn = guard.as_mut().ok_or_else(|| {
224                crate::error::ForgeError::internal("Transaction already taken; cannot dispatch job")
225            })?;
226            return dispatcher
227                .dispatch_in_conn(conn, job_type, args_json, None, None)
228                .await;
229        }
230
231        dispatcher
232            .dispatch_by_name(job_type, args_json, None, None)
233            .await
234    }
235
236    /// Type-safe dispatch: resolves the job name from the type's `ForgeJob`
237    /// impl and serializes the args at the call site.
238    pub async fn dispatch<J: crate::ForgeJob>(&self, args: J::Args) -> crate::error::Result<Uuid> {
239        self.dispatch_job(J::info().name, args).await
240    }
241
242    /// Start a workflow.
243    pub async fn start_workflow<T: serde::Serialize>(
244        &self,
245        workflow_name: &str,
246        input: T,
247    ) -> crate::error::Result<Uuid> {
248        let dispatcher = self
249            .workflow_dispatch
250            .as_ref()
251            .ok_or_else(|| crate::error::ForgeError::internal("Workflow dispatch not available"))?;
252        let input_json = serde_json::to_value(input)?;
253
254        if let Some(tx) = &self.tx {
255            let mut guard = tx.lock().await;
256            let conn = guard.as_mut().ok_or_else(|| {
257                crate::error::ForgeError::internal(
258                    "Transaction already taken; cannot start workflow",
259                )
260            })?;
261            return dispatcher
262                .start_in_conn(conn, workflow_name, input_json, None, None)
263                .await;
264        }
265
266        dispatcher
267            .start_by_name(workflow_name, input_json, None, None)
268            .await
269    }
270
271    /// Type-safe workflow start.
272    pub async fn start<W: crate::ForgeWorkflow>(
273        &self,
274        input: W::Input,
275    ) -> crate::error::Result<Uuid> {
276        self.start_workflow(W::info().name, input).await
277    }
278
279    /// Request cancellation for a job.
280    pub async fn cancel_job(
281        &self,
282        job_id: Uuid,
283        reason: Option<String>,
284    ) -> crate::error::Result<bool> {
285        let dispatcher = self
286            .job_dispatch
287            .as_ref()
288            .ok_or_else(|| crate::error::ForgeError::internal("Job dispatch not available"))?;
289        dispatcher.cancel(job_id, reason).await
290    }
291}
292
293impl EnvAccess for WebhookContext {
294    fn env_provider(&self) -> &dyn EnvProvider {
295        self.env_provider.as_ref()
296    }
297}
298
#[cfg(test)]
// Test code may unwrap and index directly: a panic is the failure signal.
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// A context built with `new` keeps its metadata, resolves headers
    /// case-insensitively, and starts without a transaction or KV store.
    #[tokio::test]
    async fn test_webhook_context_creation() {
        // `connect_lazy` opens no connection up front, so the unreachable URL
        // is fine: the test never touches the database.
        let pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool");

        let mut headers = HashMap::new();
        headers.insert("x-github-event".to_string(), "push".to_string());
        headers.insert("x-github-delivery".to_string(), "abc-123".to_string());

        let ctx = WebhookContext::new(
            "github_webhook".to_string(),
            "req-123".to_string(),
            headers,
            pool,
            CircuitBreakerClient::with_defaults(reqwest::Client::new()),
        )
        .with_idempotency_key(Some("abc-123".to_string()));

        assert_eq!(ctx.webhook_name, "github_webhook");
        assert_eq!(ctx.request_id, "req-123");
        assert_eq!(ctx.idempotency_key, Some("abc-123".to_string()));
        assert_eq!(ctx.header("X-GitHub-Event"), Some("push"));
        assert_eq!(ctx.header("x-github-event"), Some("push")); // case-insensitive
        assert!(ctx.header("nonexistent").is_none());
        assert_eq!(ctx.headers().len(), 2);

        // Defaults from `new`: no enclosing transaction, no KV store attached.
        assert!(!ctx.is_transactional());
        assert!(ctx.kv().is_err());
    }
}