forge_core/testing/context/
mutation.rs1#![allow(clippy::unwrap_used, clippy::indexing_slicing)]
4
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Duration;
8
9use chrono::{DateTime, Utc};
10use sqlx::PgPool;
11use uuid::Uuid;
12
13use super::super::mock_dispatch::{MockJobDispatch, MockWorkflowDispatch};
14use super::super::mock_http::{MockHttp, MockRequest, MockResponse};
15use super::build_test_auth;
16use crate::Result;
17use crate::env::{EnvAccess, EnvProvider, MockEnvProvider};
18use crate::function::{
19 AuthContext, JobDispatch, MutationContext, RequestMetadata, WorkflowDispatch,
20};
21use crate::http::CircuitBreakerClient;
22
23pub struct TestMutationContext {
25 pub auth: AuthContext,
26 pub request: RequestMetadata,
27 pool: Option<PgPool>,
28 http: Arc<MockHttp>,
29 job_dispatch: Arc<MockJobDispatch>,
30 workflow_dispatch: Arc<MockWorkflowDispatch>,
31 env_provider: Arc<MockEnvProvider>,
32}
33
34impl TestMutationContext {
35 pub fn builder() -> TestMutationContextBuilder {
37 TestMutationContextBuilder::default()
38 }
39
40 pub fn minimal() -> Self {
42 Self::builder().build()
43 }
44
45 pub fn authenticated(user_id: Uuid) -> Self {
47 Self::builder().as_user(user_id).build()
48 }
49
50 pub fn db(&self) -> Option<&PgPool> {
52 self.pool.as_ref()
53 }
54
55 pub fn http(&self) -> &MockHttp {
57 &self.http
58 }
59
60 pub fn job_dispatch(&self) -> &MockJobDispatch {
62 &self.job_dispatch
63 }
64
65 pub fn workflow_dispatch(&self) -> &MockWorkflowDispatch {
67 &self.workflow_dispatch
68 }
69
70 pub fn user_id(&self) -> Result<Uuid> {
72 self.auth.require_user_id()
73 }
74
75 pub async fn dispatch_job<T: serde::Serialize>(&self, job_type: &str, args: T) -> Result<Uuid> {
77 self.job_dispatch.dispatch(job_type, args).await
78 }
79
80 pub async fn dispatch_job_at<T: serde::Serialize>(
82 &self,
83 job_type: &str,
84 args: T,
85 scheduled_at: DateTime<Utc>,
86 ) -> Result<Uuid> {
87 self.job_dispatch
88 .dispatch_at(job_type, args, scheduled_at)
89 .await
90 }
91
92 pub async fn dispatch_job_after<T: serde::Serialize>(
94 &self,
95 job_type: &str,
96 args: T,
97 delay: Duration,
98 ) -> Result<Uuid> {
99 let scheduled_at = Utc::now()
100 + chrono::Duration::from_std(delay)
101 .map_err(|_| crate::error::ForgeError::InvalidArgument("delay too large".into()))?;
102 self.job_dispatch
103 .dispatch_at(job_type, args, scheduled_at)
104 .await
105 }
106
107 pub async fn dispatch<J: crate::ForgeJob>(&self, args: J::Args) -> Result<Uuid> {
110 self.dispatch_job(J::info().name, args).await
111 }
112
113 pub async fn dispatch_at<J: crate::ForgeJob>(
115 &self,
116 args: J::Args,
117 scheduled_at: DateTime<Utc>,
118 ) -> Result<Uuid> {
119 self.dispatch_job_at(J::info().name, args, scheduled_at)
120 .await
121 }
122
123 pub async fn dispatch_after<J: crate::ForgeJob>(
125 &self,
126 args: J::Args,
127 delay: Duration,
128 ) -> Result<Uuid> {
129 self.dispatch_job_after(J::info().name, args, delay).await
130 }
131
132 pub async fn cancel_job(&self, job_id: Uuid, reason: Option<String>) -> Result<bool> {
134 self.job_dispatch.cancel_job(job_id, reason);
135 Ok(true)
136 }
137
138 pub async fn start_workflow<T: serde::Serialize>(&self, name: &str, input: T) -> Result<Uuid> {
140 self.workflow_dispatch.start(name, input).await
141 }
142
143 pub async fn start<W: crate::ForgeWorkflow>(&self, input: W::Input) -> Result<Uuid> {
145 self.start_workflow(W::info().name, input).await
146 }
147
148 pub fn env_mock(&self) -> &MockEnvProvider {
150 &self.env_provider
151 }
152
153 pub fn into_mutation_context(self, pool: sqlx::PgPool) -> MutationContext {
166 let job_dispatch: Option<Arc<dyn JobDispatch>> = Some(self.job_dispatch);
167 let workflow_dispatch: Option<Arc<dyn WorkflowDispatch>> = Some(self.workflow_dispatch);
168 MutationContext::with_env(
169 pool,
170 self.auth,
171 self.request,
172 CircuitBreakerClient::with_defaults(reqwest::Client::new()),
173 job_dispatch,
174 workflow_dispatch,
175 self.env_provider,
176 )
177 }
178}
179
180impl EnvAccess for TestMutationContext {
181 fn env_provider(&self) -> &dyn EnvProvider {
182 self.env_provider.as_ref()
183 }
184}
185
186pub struct TestMutationContextBuilder {
188 user_id: Option<Uuid>,
189 roles: Vec<String>,
190 claims: HashMap<String, serde_json::Value>,
191 pool: Option<PgPool>,
192 http: MockHttp,
193 job_dispatch: Arc<MockJobDispatch>,
194 workflow_dispatch: Arc<MockWorkflowDispatch>,
195 env_vars: HashMap<String, String>,
196}
197
198impl Default for TestMutationContextBuilder {
199 fn default() -> Self {
200 Self {
201 user_id: None,
202 roles: Vec::new(),
203 claims: HashMap::new(),
204 pool: None,
205 http: MockHttp::new(),
206 job_dispatch: Arc::new(MockJobDispatch::new()),
207 workflow_dispatch: Arc::new(MockWorkflowDispatch::new()),
208 env_vars: HashMap::new(),
209 }
210 }
211}
212
213impl TestMutationContextBuilder {
214 pub fn as_user(mut self, id: Uuid) -> Self {
216 self.user_id = Some(id);
217 self
218 }
219
220 pub fn as_subject(mut self, subject: impl Into<String>) -> Self {
222 self.claims
223 .insert("sub".to_string(), serde_json::json!(subject.into()));
224 self
225 }
226
227 pub fn with_role(mut self, role: impl Into<String>) -> Self {
229 self.roles.push(role.into());
230 self
231 }
232
233 pub fn with_roles(mut self, roles: Vec<String>) -> Self {
235 self.roles.extend(roles);
236 self
237 }
238
239 pub fn with_claim(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
241 self.claims.insert(key.into(), value);
242 self
243 }
244
245 pub fn with_pool(mut self, pool: PgPool) -> Self {
247 self.pool = Some(pool);
248 self
249 }
250
251 pub fn mock_http<F>(self, pattern: &str, handler: F) -> Self
253 where
254 F: Fn(&MockRequest) -> MockResponse + Send + Sync + 'static,
255 {
256 self.http.add_mock_sync(pattern, handler);
257 self
258 }
259
260 pub fn mock_http_json<T: serde::Serialize>(self, pattern: &str, response: T) -> Self {
262 let json = serde_json::to_value(response).unwrap_or(serde_json::Value::Null);
263 self.mock_http(pattern, move |_| MockResponse::json(json.clone()))
264 }
265
266 pub fn with_job_dispatch(mut self, dispatch: Arc<MockJobDispatch>) -> Self {
268 self.job_dispatch = dispatch;
269 self
270 }
271
272 pub fn with_workflow_dispatch(mut self, dispatch: Arc<MockWorkflowDispatch>) -> Self {
274 self.workflow_dispatch = dispatch;
275 self
276 }
277
278 pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
280 self.env_vars.insert(key.into(), value.into());
281 self
282 }
283
284 pub fn with_envs(mut self, vars: HashMap<String, String>) -> Self {
286 self.env_vars.extend(vars);
287 self
288 }
289
290 pub fn build(self) -> TestMutationContext {
292 TestMutationContext {
293 auth: build_test_auth(self.user_id, self.roles, self.claims),
294 request: RequestMetadata::default(),
295 pool: self.pool,
296 http: Arc::new(self.http),
297 job_dispatch: self.job_dispatch,
298 workflow_dispatch: self.workflow_dispatch,
299 env_provider: Arc::new(MockEnvProvider::with_vars(self.env_vars)),
300 }
301 }
302}
303
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::ForgeError;

    /// A dispatched job gets a non-nil id and is recorded by the mock.
    #[tokio::test]
    async fn test_dispatch_job() {
        let context = TestMutationContext::authenticated(Uuid::new_v4());
        let payload = serde_json::json!({"to": "test@example.com"});

        let id = context.dispatch_job("send_email", payload).await.unwrap();

        assert!(!id.is_nil());
        context.job_dispatch().assert_dispatched("send_email");
    }

    /// A started workflow gets a non-nil id and is recorded by the mock.
    #[tokio::test]
    async fn test_start_workflow() {
        let context = TestMutationContext::authenticated(Uuid::new_v4());
        let input = serde_json::json!({"user_id": "123"});

        let id = context.start_workflow("onboarding", input).await.unwrap();

        assert!(!id.is_nil());
        context.workflow_dispatch().assert_started("onboarding");
    }

    /// Cancelling a dispatched job marks it cancelled and keeps the reason.
    #[tokio::test]
    async fn test_cancel_job() {
        let context = TestMutationContext::authenticated(Uuid::new_v4());
        let payload = serde_json::json!({"to": "test@example.com"});
        let id = context.dispatch_job("send_email", payload).await.unwrap();

        let reason = Some("test cancel".to_string());
        let was_cancelled = context.cancel_job(id, reason).await.unwrap();

        assert!(was_cancelled);
        let recorded = context.job_dispatch().dispatched_jobs();
        assert_eq!(recorded[0].status, crate::job::JobStatus::Cancelled);
        assert_eq!(recorded[0].cancel_reason.as_deref(), Some("test cancel"));
    }

    /// A fresh context has no dispatched jobs.
    #[tokio::test]
    async fn test_job_not_dispatched() {
        let context = TestMutationContext::authenticated(Uuid::new_v4());

        context.job_dispatch().assert_not_dispatched("send_email");
    }

    /// `minimal()` yields an unauthenticated context without a pool.
    #[tokio::test]
    async fn minimal_is_unauthenticated_and_user_id_errors() {
        let context = TestMutationContext::minimal();

        assert!(!context.auth.is_authenticated());

        let err = context.user_id().unwrap_err();
        assert!(
            matches!(err, ForgeError::Unauthorized(_)),
            "expected Unauthorized, got {err:?}"
        );

        assert!(context.db().is_none());
    }

    /// `authenticated()` round-trips the supplied user id.
    #[tokio::test]
    async fn authenticated_helper_exposes_user_id() {
        let expected = Uuid::new_v4();
        let context = TestMutationContext::authenticated(expected);

        assert_eq!(context.user_id().unwrap(), expected);
        assert!(context.auth.is_authenticated());
    }

    /// A bare subject authenticates but provides no UUID user id.
    #[tokio::test]
    async fn as_subject_authenticates_without_uuid() {
        let builder = TestMutationContext::builder().as_subject("firebase|abc123");
        let context = builder.build();

        assert!(context.auth.is_authenticated());
        assert!(context.user_id().is_err(), "no UUID -> require_user_id fails");

        let expected_sub = serde_json::json!("firebase|abc123");
        assert_eq!(context.auth.claim("sub"), Some(&expected_sub));
    }

    /// `with_role` and `with_roles` accumulate instead of overwriting.
    #[tokio::test]
    async fn builder_with_role_and_with_roles_compose() {
        let extra_roles = vec!["admin".to_string(), "billing".to_string()];
        let context = TestMutationContext::builder()
            .as_user(Uuid::new_v4())
            .with_role("editor")
            .with_roles(extra_roles)
            .build();

        assert!(context.auth.has_role("editor"));
        assert!(context.auth.has_role("admin"));
        assert!(context.auth.has_role("billing"));
        assert!(!context.auth.has_role("ghost"));
    }

    /// Custom claims are readable from the built auth context.
    #[tokio::test]
    async fn with_claim_round_trips_through_auth_context() {
        let tenant = serde_json::json!("acme");
        let context = TestMutationContext::builder()
            .as_user(Uuid::new_v4())
            .with_claim("tenant", tenant.clone())
            .build();

        assert_eq!(context.auth.claim("tenant"), Some(&tenant));
    }

    /// Bulk-loaded variables are served, and reads are tracked by the mock.
    #[tokio::test]
    async fn with_envs_bulk_loads_provider() {
        let vars = HashMap::from([
            ("K1".to_string(), "v1".to_string()),
            ("K2".to_string(), "v2".to_string()),
        ]);

        let context = TestMutationContext::builder().with_envs(vars).build();

        assert_eq!(context.env("K1"), Some("v1".to_string()));
        assert_eq!(context.env("K2"), Some("v2".to_string()));
        assert!(context.env_mock().was_accessed("K1"));
    }

    /// Single and bulk environment setters can be combined.
    #[tokio::test]
    async fn with_env_single_and_with_envs_compose() {
        let bulk = HashMap::from([("BULK".to_string(), "b".to_string())]);

        let context = TestMutationContext::builder()
            .with_env("ONE", "1")
            .with_envs(bulk)
            .build();

        assert_eq!(context.env("ONE"), Some("1".to_string()));
        assert_eq!(context.env("BULK"), Some("b".to_string()));
    }

    /// A caller-supplied job dispatcher observes dispatches made via the context.
    #[tokio::test]
    async fn with_job_dispatch_shares_state() {
        let dispatcher = Arc::new(MockJobDispatch::new());
        let context = TestMutationContext::builder()
            .with_job_dispatch(Arc::clone(&dispatcher))
            .build();

        context
            .dispatch_job("ext", serde_json::json!({}))
            .await
            .unwrap();

        dispatcher.assert_dispatched("ext");
    }

    /// A caller-supplied workflow dispatcher observes starts made via the context.
    #[tokio::test]
    async fn with_workflow_dispatch_shares_state() {
        let dispatcher = Arc::new(MockWorkflowDispatch::new());
        let context = TestMutationContext::builder()
            .with_workflow_dispatch(Arc::clone(&dispatcher))
            .build();

        context
            .start_workflow("ext_wf", serde_json::json!({}))
            .await
            .unwrap();

        dispatcher.assert_started("ext_wf");
    }

    /// The bridged `MutationContext` keeps dispatching through the shared mock.
    #[cfg(feature = "testcontainers")]
    #[tokio::test]
    async fn bridge_to_mutation_context_preserves_mocks() {
        use crate::function::MutationContext;
        use crate::testing::db::TestDatabase;

        let database = TestDatabase::from_env().await.expect("test DB");
        let dispatcher = Arc::new(MockJobDispatch::new());
        let user = Uuid::new_v4();

        let bridged: MutationContext = TestMutationContext::builder()
            .as_user(user)
            .with_job_dispatch(Arc::clone(&dispatcher))
            .build()
            .into_mutation_context(database.pool().clone());

        bridged
            .dispatch_job("welcome_email", serde_json::json!({"to": "a@b"}))
            .await
            .expect("dispatch through bridged context");

        dispatcher.assert_dispatched("welcome_email");
    }

    /// A JSON mock registered by URL answers a matching request.
    #[tokio::test]
    async fn mock_http_json_executes_via_pattern() {
        let expected_body = serde_json::json!({"ok": true});
        let context = TestMutationContext::builder()
            .mock_http_json("https://api.test/echo", serde_json::json!({"ok": true}))
            .build();

        let request = MockRequest {
            method: "GET".to_string(),
            path: "/echo".to_string(),
            url: "https://api.test/echo".to_string(),
            headers: HashMap::new(),
            body: serde_json::Value::Null,
        };
        let response = context.http().execute(request).await;

        assert_eq!(response.status, 200);
        assert_eq!(response.body, expected_body);
        context.http().assert_called("https://api.test/echo");
    }
}