forge_core/testing/context/
mutation.rs1#![allow(clippy::unwrap_used, clippy::indexing_slicing)]
4
5use std::collections::HashMap;
6use std::sync::{Arc, Mutex};
7
8use sqlx::PgPool;
9use uuid::Uuid;
10
11use super::super::mock_dispatch::{MockJobDispatch, MockWorkflowDispatch};
12use super::super::mock_http::{MockHttp, MockRequest, MockResponse};
13use super::build_test_auth;
14use crate::Result;
15use crate::env::{EnvAccess, EnvProvider, MockEnvProvider};
16use crate::function::{AuthContext, OutboxBuffer, PendingJob, PendingWorkflow, RequestMetadata};
17
18pub struct TestMutationContext {
37 pub auth: AuthContext,
39 pub request: RequestMetadata,
41 pool: Option<PgPool>,
43 http: Arc<MockHttp>,
45 job_dispatch: Arc<MockJobDispatch>,
47 workflow_dispatch: Arc<MockWorkflowDispatch>,
49 env_provider: Arc<MockEnvProvider>,
51 outbox: Arc<Mutex<OutboxBuffer>>,
53}
54
55impl TestMutationContext {
56 pub fn builder() -> TestMutationContextBuilder {
58 TestMutationContextBuilder::default()
59 }
60
61 pub fn minimal() -> Self {
63 Self::builder().build()
64 }
65
66 pub fn authenticated(user_id: Uuid) -> Self {
68 Self::builder().as_user(user_id).build()
69 }
70
71 pub fn db(&self) -> Option<&PgPool> {
73 self.pool.as_ref()
74 }
75
76 pub fn http(&self) -> &MockHttp {
78 &self.http
79 }
80
81 pub fn job_dispatch(&self) -> &MockJobDispatch {
83 &self.job_dispatch
84 }
85
86 pub fn workflow_dispatch(&self) -> &MockWorkflowDispatch {
88 &self.workflow_dispatch
89 }
90
91 pub fn require_user_id(&self) -> Result<Uuid> {
93 self.auth.require_user_id()
94 }
95
96 pub fn require_subject(&self) -> Result<&str> {
98 self.auth.require_subject()
99 }
100
101 pub async fn dispatch_job<T: serde::Serialize>(&self, job_type: &str, args: T) -> Result<Uuid> {
103 self.job_dispatch.dispatch(job_type, args).await
104 }
105
106 pub async fn cancel_job(&self, job_id: Uuid, reason: Option<String>) -> Result<bool> {
108 self.job_dispatch.cancel_job(job_id, reason);
109 Ok(true)
110 }
111
112 pub async fn start_workflow<T: serde::Serialize>(&self, name: &str, input: T) -> Result<Uuid> {
114 self.workflow_dispatch.start(name, input).await
115 }
116
117 pub fn env_mock(&self) -> &MockEnvProvider {
119 &self.env_provider
120 }
121
122 pub fn pending_jobs(&self) -> Vec<PendingJob> {
124 self.outbox.lock().unwrap().jobs.clone()
125 }
126
127 pub fn pending_workflows(&self) -> Vec<PendingWorkflow> {
129 self.outbox.lock().unwrap().workflows.clone()
130 }
131
132 pub fn assert_job_buffered(&self, job_type: &str) {
134 let jobs = self.pending_jobs();
135 assert!(
136 jobs.iter().any(|j| j.job_type == job_type),
137 "Expected job '{}' to be buffered, but it was not. Buffered jobs: {:?}",
138 job_type,
139 jobs.iter().map(|j| &j.job_type).collect::<Vec<_>>()
140 );
141 }
142
143 pub fn assert_workflow_buffered(&self, workflow_name: &str) {
145 let workflows = self.pending_workflows();
146 assert!(
147 workflows.iter().any(|w| w.workflow_name == workflow_name),
148 "Expected workflow '{}' to be buffered, but it was not. Buffered workflows: {:?}",
149 workflow_name,
150 workflows
151 .iter()
152 .map(|w| &w.workflow_name)
153 .collect::<Vec<_>>()
154 );
155 }
156}
157
158impl EnvAccess for TestMutationContext {
159 fn env_provider(&self) -> &dyn EnvProvider {
160 self.env_provider.as_ref()
161 }
162}
163
164pub struct TestMutationContextBuilder {
166 user_id: Option<Uuid>,
167 roles: Vec<String>,
168 claims: HashMap<String, serde_json::Value>,
169 pool: Option<PgPool>,
170 http: MockHttp,
171 job_dispatch: Arc<MockJobDispatch>,
172 workflow_dispatch: Arc<MockWorkflowDispatch>,
173 env_vars: HashMap<String, String>,
174}
175
176impl Default for TestMutationContextBuilder {
177 fn default() -> Self {
178 Self {
179 user_id: None,
180 roles: Vec::new(),
181 claims: HashMap::new(),
182 pool: None,
183 http: MockHttp::new(),
184 job_dispatch: Arc::new(MockJobDispatch::new()),
185 workflow_dispatch: Arc::new(MockWorkflowDispatch::new()),
186 env_vars: HashMap::new(),
187 }
188 }
189}
190
191impl TestMutationContextBuilder {
192 pub fn as_user(mut self, id: Uuid) -> Self {
194 self.user_id = Some(id);
195 self
196 }
197
198 pub fn as_subject(mut self, subject: impl Into<String>) -> Self {
200 self.claims
201 .insert("sub".to_string(), serde_json::json!(subject.into()));
202 self
203 }
204
205 pub fn with_role(mut self, role: impl Into<String>) -> Self {
207 self.roles.push(role.into());
208 self
209 }
210
211 pub fn with_roles(mut self, roles: Vec<String>) -> Self {
213 self.roles.extend(roles);
214 self
215 }
216
217 pub fn with_claim(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
219 self.claims.insert(key.into(), value);
220 self
221 }
222
223 pub fn with_pool(mut self, pool: PgPool) -> Self {
225 self.pool = Some(pool);
226 self
227 }
228
229 pub fn mock_http<F>(self, pattern: &str, handler: F) -> Self
231 where
232 F: Fn(&MockRequest) -> MockResponse + Send + Sync + 'static,
233 {
234 self.http.add_mock_sync(pattern, handler);
235 self
236 }
237
238 pub fn mock_http_json<T: serde::Serialize>(self, pattern: &str, response: T) -> Self {
240 let json = serde_json::to_value(response).unwrap_or(serde_json::Value::Null);
241 self.mock_http(pattern, move |_| MockResponse::json(json.clone()))
242 }
243
244 pub fn with_job_dispatch(mut self, dispatch: Arc<MockJobDispatch>) -> Self {
246 self.job_dispatch = dispatch;
247 self
248 }
249
250 pub fn with_workflow_dispatch(mut self, dispatch: Arc<MockWorkflowDispatch>) -> Self {
252 self.workflow_dispatch = dispatch;
253 self
254 }
255
256 pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
258 self.env_vars.insert(key.into(), value.into());
259 self
260 }
261
262 pub fn with_envs(mut self, vars: HashMap<String, String>) -> Self {
264 self.env_vars.extend(vars);
265 self
266 }
267
268 pub fn build(self) -> TestMutationContext {
270 TestMutationContext {
271 auth: build_test_auth(self.user_id, self.roles, self.claims),
272 request: RequestMetadata::default(),
273 pool: self.pool,
274 http: Arc::new(self.http),
275 job_dispatch: self.job_dispatch,
276 workflow_dispatch: self.workflow_dispatch,
277 env_provider: Arc::new(MockEnvProvider::with_vars(self.env_vars)),
278 outbox: Arc::new(Mutex::new(OutboxBuffer::default())),
279 }
280 }
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    /// Dispatching a job yields a non-nil id and the mock dispatcher's
    /// `assert_dispatched` check passes for that job type.
    #[tokio::test]
    async fn test_dispatch_job() {
        let ctx = TestMutationContext::authenticated(Uuid::new_v4());
        let payload = serde_json::json!({"to": "test@example.com"});

        let job_id = ctx.dispatch_job("send_email", payload).await.unwrap();

        assert!(!job_id.is_nil());
        ctx.job_dispatch().assert_dispatched("send_email");
    }

    /// Starting a workflow yields a non-nil id and the mock dispatcher's
    /// `assert_started` check passes for that workflow name.
    #[tokio::test]
    async fn test_start_workflow() {
        let ctx = TestMutationContext::authenticated(Uuid::new_v4());
        let input = serde_json::json!({"user_id": "123"});

        let run_id = ctx.start_workflow("onboarding", input).await.unwrap();

        assert!(!run_id.is_nil());
        ctx.workflow_dispatch().assert_started("onboarding");
    }

    /// Cancelling a dispatched job returns `true` and the first recorded job
    /// carries the `Cancelled` status and the given reason.
    #[tokio::test]
    async fn test_cancel_job() {
        let ctx = TestMutationContext::authenticated(Uuid::new_v4());
        let payload = serde_json::json!({"to": "test@example.com"});
        let job_id = ctx.dispatch_job("send_email", payload).await.unwrap();

        let reason = Some("test cancel".to_string());
        let cancelled = ctx.cancel_job(job_id, reason).await.unwrap();

        assert!(cancelled);
        let jobs = ctx.job_dispatch().dispatched_jobs();
        let first = &jobs[0];
        assert_eq!(first.status, crate::job::JobStatus::Cancelled);
        assert_eq!(first.cancel_reason.as_deref(), Some("test cancel"));
    }

    /// On a fresh context the mock dispatcher's `assert_not_dispatched`
    /// check passes.
    #[tokio::test]
    async fn test_job_not_dispatched() {
        let ctx = TestMutationContext::authenticated(Uuid::new_v4());
        let dispatcher = ctx.job_dispatch();

        dispatcher.assert_not_dispatched("send_email");
    }
}