// forge_core/webhook/context.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use uuid::Uuid;
6
7use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
8use crate::function::JobDispatch;
9use crate::http::CircuitBreakerClient;
10
11/// Context available to webhook handlers.
12pub struct WebhookContext {
13    /// Webhook name.
14    pub webhook_name: String,
15    /// Unique request ID for this webhook invocation.
16    pub request_id: String,
17    /// Idempotency key if extracted from request.
18    pub idempotency_key: Option<String>,
19    /// Request headers (lowercase keys).
20    headers: HashMap<String, String>,
21    /// Database pool.
22    db_pool: sqlx::PgPool,
23    /// HTTP client for external calls.
24    http_client: CircuitBreakerClient,
25    /// Default timeout for outbound HTTP requests made through the
26    /// circuit-breaker client. `None` means unlimited.
27    http_timeout: Option<Duration>,
28    /// Job dispatcher for async processing.
29    job_dispatch: Option<Arc<dyn JobDispatch>>,
30    /// Environment variable provider.
31    env_provider: Arc<dyn EnvProvider>,
32}
33
34impl WebhookContext {
35    /// Create a new webhook context.
36    pub fn new(
37        webhook_name: String,
38        request_id: String,
39        headers: HashMap<String, String>,
40        db_pool: sqlx::PgPool,
41        http_client: CircuitBreakerClient,
42    ) -> Self {
43        Self {
44            webhook_name,
45            request_id,
46            idempotency_key: None,
47            headers,
48            db_pool,
49            http_client,
50            http_timeout: None,
51            job_dispatch: None,
52            env_provider: Arc::new(RealEnvProvider::new()),
53        }
54    }
55
56    /// Set idempotency key.
57    pub fn with_idempotency_key(mut self, key: Option<String>) -> Self {
58        self.idempotency_key = key;
59        self
60    }
61
62    /// Set job dispatcher.
63    pub fn with_job_dispatch(mut self, dispatcher: Arc<dyn JobDispatch>) -> Self {
64        self.job_dispatch = Some(dispatcher);
65        self
66    }
67
68    /// Set environment provider.
69    pub fn with_env_provider(mut self, provider: Arc<dyn EnvProvider>) -> Self {
70        self.env_provider = provider;
71        self
72    }
73
74    /// Get database pool.
75    pub fn db(&self) -> crate::function::ForgeDb {
76        crate::function::ForgeDb::from_pool(&self.db_pool)
77    }
78
79    /// Get a `DbConn` for use in shared helper functions.
80    pub fn db_conn(&self) -> crate::function::DbConn<'_> {
81        crate::function::DbConn::Pool(self.db_pool.clone())
82    }
83
84    /// Acquire a connection compatible with sqlx compile-time checked macros.
85    pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'static>> {
86        Ok(crate::function::ForgeConn::Pool(
87            self.db_pool.acquire().await?,
88        ))
89    }
90
91    /// Get the HTTP client for external requests.
92    pub fn http(&self) -> crate::http::HttpClient {
93        self.http_client.with_timeout(self.http_timeout)
94    }
95
96    /// Get the raw reqwest client, bypassing circuit breaker execution.
97    pub fn raw_http(&self) -> &reqwest::Client {
98        self.http_client.inner()
99    }
100
101    pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
102        self.http_timeout = timeout;
103    }
104
105    /// Get a request header value.
106    ///
107    /// Header names are case-insensitive.
108    pub fn header(&self, name: &str) -> Option<&str> {
109        self.headers.get(&name.to_lowercase()).map(|s| s.as_str())
110    }
111
112    /// Get all headers.
113    pub fn headers(&self) -> &HashMap<String, String> {
114        &self.headers
115    }
116
117    /// Dispatch a background job for async processing.
118    ///
119    /// This is the recommended way to handle webhook events:
120    /// 1. Validate the webhook signature
121    /// 2. Dispatch a job to process the event
122    /// 3. Return 202 Accepted immediately
123    ///
124    /// # Arguments
125    /// * `job_type` - The registered name of the job type
126    /// * `args` - The arguments for the job (will be serialized to JSON)
127    ///
128    /// # Returns
129    /// The UUID of the dispatched job, or an error if dispatch is not available.
130    pub async fn dispatch_job<T: serde::Serialize>(
131        &self,
132        job_type: &str,
133        args: T,
134    ) -> crate::error::Result<Uuid> {
135        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
136            crate::error::ForgeError::Internal("Job dispatch not available".into())
137        })?;
138        let args_json = serde_json::to_value(args)?;
139        dispatcher.dispatch_by_name(job_type, args_json, None).await
140    }
141
142    /// Request cancellation for a job.
143    pub async fn cancel_job(
144        &self,
145        job_id: Uuid,
146        reason: Option<String>,
147    ) -> crate::error::Result<bool> {
148        let dispatcher = self.job_dispatch.as_ref().ok_or_else(|| {
149            crate::error::ForgeError::Internal("Job dispatch not available".into())
150        })?;
151        dispatcher.cancel(job_id, reason).await
152    }
153}
154
155impl EnvAccess for WebhookContext {
156    fn env_provider(&self) -> &dyn EnvProvider {
157        self.env_provider.as_ref()
158    }
159}
160
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Builds a context and checks its public fields and header lookup,
    /// including lookup by a differently-cased header name.
    #[tokio::test]
    async fn test_webhook_context_creation() {
        let headers = HashMap::from([
            ("x-github-event".to_string(), "push".to_string()),
            ("x-github-delivery".to_string(), "abc-123".to_string()),
        ]);

        // NOTE(review): the pool is created lazily against a nonexistent
        // database; presumably no connection is opened unless the pool is
        // actually used — confirm against the sqlx docs.
        let lazy_pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("Failed to create mock pool");
        let http_client = CircuitBreakerClient::with_defaults(reqwest::Client::new());

        let ctx = WebhookContext::new(
            "github_webhook".to_string(),
            "req-123".to_string(),
            headers,
            lazy_pool,
            http_client,
        )
        .with_idempotency_key(Some("abc-123".to_string()));

        // Public identifying fields.
        assert_eq!(ctx.webhook_name, "github_webhook");
        assert_eq!(ctx.request_id, "req-123");
        assert_eq!(ctx.idempotency_key.as_deref(), Some("abc-123"));

        // Header lookup ignores the case of the requested name.
        assert_eq!(ctx.header("X-GitHub-Event"), Some("push"));
        assert_eq!(ctx.header("x-github-event"), Some("push"));
        assert_eq!(ctx.header("nonexistent"), None);
    }
}