forge_core/testing/context/
mutation.rs1use std::collections::HashMap;
4use std::sync::{Arc, Mutex};
5
6use sqlx::PgPool;
7use uuid::Uuid;
8
9use super::super::mock_dispatch::{MockJobDispatch, MockWorkflowDispatch};
10use super::super::mock_http::{MockHttp, MockRequest, MockResponse};
11use super::build_test_auth;
12use crate::Result;
13use crate::env::{EnvAccess, EnvProvider, MockEnvProvider};
14use crate::function::{AuthContext, OutboxBuffer, PendingJob, PendingWorkflow, RequestMetadata};
15
16pub struct TestMutationContext {
35 pub auth: AuthContext,
37 pub request: RequestMetadata,
39 pool: Option<PgPool>,
41 http: Arc<MockHttp>,
43 job_dispatch: Arc<MockJobDispatch>,
45 workflow_dispatch: Arc<MockWorkflowDispatch>,
47 env_provider: Arc<MockEnvProvider>,
49 outbox: Arc<Mutex<OutboxBuffer>>,
51}
52
53impl TestMutationContext {
54 pub fn builder() -> TestMutationContextBuilder {
56 TestMutationContextBuilder::default()
57 }
58
59 pub fn minimal() -> Self {
61 Self::builder().build()
62 }
63
64 pub fn authenticated(user_id: Uuid) -> Self {
66 Self::builder().as_user(user_id).build()
67 }
68
69 pub fn db(&self) -> Option<&PgPool> {
71 self.pool.as_ref()
72 }
73
74 pub fn http(&self) -> &MockHttp {
76 &self.http
77 }
78
79 pub fn job_dispatch(&self) -> &MockJobDispatch {
81 &self.job_dispatch
82 }
83
84 pub fn workflow_dispatch(&self) -> &MockWorkflowDispatch {
86 &self.workflow_dispatch
87 }
88
89 pub fn require_user_id(&self) -> Result<Uuid> {
91 self.auth.require_user_id()
92 }
93
94 pub fn require_subject(&self) -> Result<&str> {
96 self.auth.require_subject()
97 }
98
99 pub async fn dispatch_job<T: serde::Serialize>(&self, job_type: &str, args: T) -> Result<Uuid> {
101 self.job_dispatch.dispatch(job_type, args).await
102 }
103
104 pub async fn cancel_job(&self, job_id: Uuid, reason: Option<String>) -> Result<bool> {
106 self.job_dispatch.cancel_job(job_id, reason);
107 Ok(true)
108 }
109
110 pub async fn start_workflow<T: serde::Serialize>(&self, name: &str, input: T) -> Result<Uuid> {
112 self.workflow_dispatch.start(name, input).await
113 }
114
115 pub fn env_mock(&self) -> &MockEnvProvider {
117 &self.env_provider
118 }
119
120 pub fn pending_jobs(&self) -> Vec<PendingJob> {
122 self.outbox.lock().unwrap().jobs.clone()
123 }
124
125 pub fn pending_workflows(&self) -> Vec<PendingWorkflow> {
127 self.outbox.lock().unwrap().workflows.clone()
128 }
129
130 pub fn assert_job_buffered(&self, job_type: &str) {
132 let jobs = self.pending_jobs();
133 assert!(
134 jobs.iter().any(|j| j.job_type == job_type),
135 "Expected job '{}' to be buffered, but it was not. Buffered jobs: {:?}",
136 job_type,
137 jobs.iter().map(|j| &j.job_type).collect::<Vec<_>>()
138 );
139 }
140
141 pub fn assert_workflow_buffered(&self, workflow_name: &str) {
143 let workflows = self.pending_workflows();
144 assert!(
145 workflows.iter().any(|w| w.workflow_name == workflow_name),
146 "Expected workflow '{}' to be buffered, but it was not. Buffered workflows: {:?}",
147 workflow_name,
148 workflows
149 .iter()
150 .map(|w| &w.workflow_name)
151 .collect::<Vec<_>>()
152 );
153 }
154}
155
156impl EnvAccess for TestMutationContext {
157 fn env_provider(&self) -> &dyn EnvProvider {
158 self.env_provider.as_ref()
159 }
160}
161
162pub struct TestMutationContextBuilder {
164 user_id: Option<Uuid>,
165 roles: Vec<String>,
166 claims: HashMap<String, serde_json::Value>,
167 pool: Option<PgPool>,
168 http: MockHttp,
169 job_dispatch: Arc<MockJobDispatch>,
170 workflow_dispatch: Arc<MockWorkflowDispatch>,
171 env_vars: HashMap<String, String>,
172}
173
174impl Default for TestMutationContextBuilder {
175 fn default() -> Self {
176 Self {
177 user_id: None,
178 roles: Vec::new(),
179 claims: HashMap::new(),
180 pool: None,
181 http: MockHttp::new(),
182 job_dispatch: Arc::new(MockJobDispatch::new()),
183 workflow_dispatch: Arc::new(MockWorkflowDispatch::new()),
184 env_vars: HashMap::new(),
185 }
186 }
187}
188
189impl TestMutationContextBuilder {
190 pub fn as_user(mut self, id: Uuid) -> Self {
192 self.user_id = Some(id);
193 self
194 }
195
196 pub fn as_subject(mut self, subject: impl Into<String>) -> Self {
198 self.claims
199 .insert("sub".to_string(), serde_json::json!(subject.into()));
200 self
201 }
202
203 pub fn with_role(mut self, role: impl Into<String>) -> Self {
205 self.roles.push(role.into());
206 self
207 }
208
209 pub fn with_roles(mut self, roles: Vec<String>) -> Self {
211 self.roles.extend(roles);
212 self
213 }
214
215 pub fn with_claim(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
217 self.claims.insert(key.into(), value);
218 self
219 }
220
221 pub fn with_pool(mut self, pool: PgPool) -> Self {
223 self.pool = Some(pool);
224 self
225 }
226
227 pub fn mock_http<F>(self, pattern: &str, handler: F) -> Self
229 where
230 F: Fn(&MockRequest) -> MockResponse + Send + Sync + 'static,
231 {
232 self.http.add_mock_sync(pattern, handler);
233 self
234 }
235
236 pub fn mock_http_json<T: serde::Serialize>(self, pattern: &str, response: T) -> Self {
238 let json = serde_json::to_value(response).unwrap_or(serde_json::Value::Null);
239 self.mock_http(pattern, move |_| MockResponse::json(json.clone()))
240 }
241
242 pub fn with_job_dispatch(mut self, dispatch: Arc<MockJobDispatch>) -> Self {
244 self.job_dispatch = dispatch;
245 self
246 }
247
248 pub fn with_workflow_dispatch(mut self, dispatch: Arc<MockWorkflowDispatch>) -> Self {
250 self.workflow_dispatch = dispatch;
251 self
252 }
253
254 pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
256 self.env_vars.insert(key.into(), value.into());
257 self
258 }
259
260 pub fn with_envs(mut self, vars: HashMap<String, String>) -> Self {
262 self.env_vars.extend(vars);
263 self
264 }
265
266 pub fn build(self) -> TestMutationContext {
268 TestMutationContext {
269 auth: build_test_auth(self.user_id, self.roles, self.claims),
270 request: RequestMetadata::default(),
271 pool: self.pool,
272 http: Arc::new(self.http),
273 job_dispatch: self.job_dispatch,
274 workflow_dispatch: self.workflow_dispatch,
275 env_provider: Arc::new(MockEnvProvider::with_vars(self.env_vars)),
276 outbox: Arc::new(Mutex::new(OutboxBuffer::default())),
277 }
278 }
279}
280
#[cfg(test)]
mod tests {
    use super::*;

    /// Context authenticated as a freshly generated user id.
    fn authed_ctx() -> TestMutationContext {
        TestMutationContext::authenticated(Uuid::new_v4())
    }

    /// JSON arguments shared by the job-related tests.
    fn email_args() -> serde_json::Value {
        serde_json::json!({"to": "test@example.com"})
    }

    // Dispatching a job yields a non-nil id and is recorded by the mock.
    #[tokio::test]
    async fn test_dispatch_job() {
        let ctx = authed_ctx();

        let dispatched = ctx.dispatch_job("send_email", email_args()).await;
        let job_id = dispatched.unwrap();

        assert!(!job_id.is_nil());
        ctx.job_dispatch().assert_dispatched("send_email");
    }

    // Starting a workflow yields a non-nil id and is recorded by the mock.
    #[tokio::test]
    async fn test_start_workflow() {
        let ctx = authed_ctx();
        let input = serde_json::json!({"user_id": "123"});

        let started = ctx.start_workflow("onboarding", input).await;
        let run_id = started.unwrap();

        assert!(!run_id.is_nil());
        ctx.workflow_dispatch().assert_started("onboarding");
    }

    // Cancelling a dispatched job marks it cancelled and stores the reason.
    #[tokio::test]
    async fn test_cancel_job() {
        let ctx = authed_ctx();

        let job_id = ctx
            .dispatch_job("send_email", email_args())
            .await
            .unwrap();

        let reason = Some("test cancel".to_string());
        let cancelled = ctx.cancel_job(job_id, reason).await.unwrap();
        assert!(cancelled);

        let recorded = ctx.job_dispatch().dispatched_jobs();
        let first = &recorded[0];
        assert_eq!(first.status, crate::job::JobStatus::Cancelled);
        assert_eq!(first.cancel_reason.as_deref(), Some("test cancel"));
    }

    // A fresh context has not dispatched anything.
    #[tokio::test]
    async fn test_job_not_dispatched() {
        let ctx = authed_ctx();

        ctx.job_dispatch().assert_not_dispatched("send_email");
    }
}