forge_core/testing/context/
mutation.rs1#![allow(clippy::unwrap_used, clippy::indexing_slicing)]
4
5use std::collections::HashMap;
6use std::sync::{Arc, Mutex};
7
8use sqlx::PgPool;
9use uuid::Uuid;
10
11use super::super::mock_dispatch::{MockJobDispatch, MockWorkflowDispatch};
12use super::super::mock_http::{MockHttp, MockRequest, MockResponse};
13use super::build_test_auth;
14use crate::Result;
15use crate::env::{EnvAccess, EnvProvider, MockEnvProvider};
16use crate::function::{AuthContext, OutboxBuffer, PendingJob, PendingWorkflow, RequestMetadata};
17
18pub struct TestMutationContext {
37 pub auth: AuthContext,
39 pub request: RequestMetadata,
41 pool: Option<PgPool>,
43 http: Arc<MockHttp>,
45 job_dispatch: Arc<MockJobDispatch>,
47 workflow_dispatch: Arc<MockWorkflowDispatch>,
49 env_provider: Arc<MockEnvProvider>,
51 outbox: Arc<Mutex<OutboxBuffer>>,
53}
54
55impl TestMutationContext {
56 pub fn builder() -> TestMutationContextBuilder {
58 TestMutationContextBuilder::default()
59 }
60
61 pub fn minimal() -> Self {
63 Self::builder().build()
64 }
65
66 pub fn authenticated(user_id: Uuid) -> Self {
68 Self::builder().as_user(user_id).build()
69 }
70
71 pub fn db(&self) -> Option<&PgPool> {
73 self.pool.as_ref()
74 }
75
76 pub fn http(&self) -> &MockHttp {
78 &self.http
79 }
80
81 pub fn job_dispatch(&self) -> &MockJobDispatch {
83 &self.job_dispatch
84 }
85
86 pub fn workflow_dispatch(&self) -> &MockWorkflowDispatch {
88 &self.workflow_dispatch
89 }
90
91 pub fn user_id(&self) -> Result<Uuid> {
93 self.auth.require_user_id()
94 }
95
96 pub async fn dispatch_job<T: serde::Serialize>(&self, job_type: &str, args: T) -> Result<Uuid> {
98 self.job_dispatch.dispatch(job_type, args).await
99 }
100
101 pub async fn cancel_job(&self, job_id: Uuid, reason: Option<String>) -> Result<bool> {
103 self.job_dispatch.cancel_job(job_id, reason);
104 Ok(true)
105 }
106
107 pub async fn start_workflow<T: serde::Serialize>(&self, name: &str, input: T) -> Result<Uuid> {
109 self.workflow_dispatch.start(name, input).await
110 }
111
112 pub fn env_mock(&self) -> &MockEnvProvider {
114 &self.env_provider
115 }
116
117 pub fn pending_jobs(&self) -> Vec<PendingJob> {
119 self.outbox.lock().unwrap().jobs.clone()
120 }
121
122 pub fn pending_workflows(&self) -> Vec<PendingWorkflow> {
124 self.outbox.lock().unwrap().workflows.clone()
125 }
126
127 pub fn assert_job_buffered(&self, job_type: &str) {
129 let jobs = self.pending_jobs();
130 assert!(
131 jobs.iter().any(|j| j.job_type == job_type),
132 "Expected job '{}' to be buffered, but it was not. Buffered jobs: {:?}",
133 job_type,
134 jobs.iter().map(|j| &j.job_type).collect::<Vec<_>>()
135 );
136 }
137
138 pub fn assert_workflow_buffered(&self, workflow_name: &str) {
140 let workflows = self.pending_workflows();
141 assert!(
142 workflows.iter().any(|w| w.workflow_name == workflow_name),
143 "Expected workflow '{}' to be buffered, but it was not. Buffered workflows: {:?}",
144 workflow_name,
145 workflows
146 .iter()
147 .map(|w| &w.workflow_name)
148 .collect::<Vec<_>>()
149 );
150 }
151}
152
153impl EnvAccess for TestMutationContext {
154 fn env_provider(&self) -> &dyn EnvProvider {
155 self.env_provider.as_ref()
156 }
157}
158
159pub struct TestMutationContextBuilder {
161 user_id: Option<Uuid>,
162 roles: Vec<String>,
163 claims: HashMap<String, serde_json::Value>,
164 pool: Option<PgPool>,
165 http: MockHttp,
166 job_dispatch: Arc<MockJobDispatch>,
167 workflow_dispatch: Arc<MockWorkflowDispatch>,
168 env_vars: HashMap<String, String>,
169}
170
171impl Default for TestMutationContextBuilder {
172 fn default() -> Self {
173 Self {
174 user_id: None,
175 roles: Vec::new(),
176 claims: HashMap::new(),
177 pool: None,
178 http: MockHttp::new(),
179 job_dispatch: Arc::new(MockJobDispatch::new()),
180 workflow_dispatch: Arc::new(MockWorkflowDispatch::new()),
181 env_vars: HashMap::new(),
182 }
183 }
184}
185
186impl TestMutationContextBuilder {
187 pub fn as_user(mut self, id: Uuid) -> Self {
189 self.user_id = Some(id);
190 self
191 }
192
193 pub fn as_subject(mut self, subject: impl Into<String>) -> Self {
195 self.claims
196 .insert("sub".to_string(), serde_json::json!(subject.into()));
197 self
198 }
199
200 pub fn with_role(mut self, role: impl Into<String>) -> Self {
202 self.roles.push(role.into());
203 self
204 }
205
206 pub fn with_roles(mut self, roles: Vec<String>) -> Self {
208 self.roles.extend(roles);
209 self
210 }
211
212 pub fn with_claim(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
214 self.claims.insert(key.into(), value);
215 self
216 }
217
218 pub fn with_pool(mut self, pool: PgPool) -> Self {
220 self.pool = Some(pool);
221 self
222 }
223
224 pub fn mock_http<F>(self, pattern: &str, handler: F) -> Self
226 where
227 F: Fn(&MockRequest) -> MockResponse + Send + Sync + 'static,
228 {
229 self.http.add_mock_sync(pattern, handler);
230 self
231 }
232
233 pub fn mock_http_json<T: serde::Serialize>(self, pattern: &str, response: T) -> Self {
235 let json = serde_json::to_value(response).unwrap_or(serde_json::Value::Null);
236 self.mock_http(pattern, move |_| MockResponse::json(json.clone()))
237 }
238
239 pub fn with_job_dispatch(mut self, dispatch: Arc<MockJobDispatch>) -> Self {
241 self.job_dispatch = dispatch;
242 self
243 }
244
245 pub fn with_workflow_dispatch(mut self, dispatch: Arc<MockWorkflowDispatch>) -> Self {
247 self.workflow_dispatch = dispatch;
248 self
249 }
250
251 pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
253 self.env_vars.insert(key.into(), value.into());
254 self
255 }
256
257 pub fn with_envs(mut self, vars: HashMap<String, String>) -> Self {
259 self.env_vars.extend(vars);
260 self
261 }
262
263 pub fn build(self) -> TestMutationContext {
265 TestMutationContext {
266 auth: build_test_auth(self.user_id, self.roles, self.claims),
267 request: RequestMetadata::default(),
268 pool: self.pool,
269 http: Arc::new(self.http),
270 job_dispatch: self.job_dispatch,
271 workflow_dispatch: self.workflow_dispatch,
272 env_provider: Arc::new(MockEnvProvider::with_vars(self.env_vars)),
273 outbox: Arc::new(Mutex::new(OutboxBuffer::default())),
274 }
275 }
276}
277
#[cfg(test)]
mod tests {
    use super::*;

    /// Dispatching a job yields a non-nil id and is recorded by the mock.
    #[tokio::test]
    async fn test_dispatch_job() {
        let ctx = TestMutationContext::authenticated(Uuid::new_v4());
        let payload = serde_json::json!({"to": "test@example.com"});

        let dispatched = ctx.dispatch_job("send_email", payload).await;
        let job_id = dispatched.unwrap();

        assert!(!job_id.is_nil());
        ctx.job_dispatch().assert_dispatched("send_email");
    }

    /// Starting a workflow yields a non-nil id and is recorded by the mock.
    #[tokio::test]
    async fn test_start_workflow() {
        let ctx = TestMutationContext::authenticated(Uuid::new_v4());
        let input = serde_json::json!({"user_id": "123"});

        let started = ctx.start_workflow("onboarding", input).await;
        let run_id = started.unwrap();

        assert!(!run_id.is_nil());
        ctx.workflow_dispatch().assert_started("onboarding");
    }

    /// Cancelling a dispatched job marks it cancelled and stores the reason.
    #[tokio::test]
    async fn test_cancel_job() {
        let ctx = TestMutationContext::authenticated(Uuid::new_v4());
        let payload = serde_json::json!({"to": "test@example.com"});

        let job_id = ctx.dispatch_job("send_email", payload).await.unwrap();

        let reason = Some(String::from("test cancel"));
        let cancelled = ctx.cancel_job(job_id, reason).await.unwrap();
        assert!(cancelled);

        let recorded = ctx.job_dispatch().dispatched_jobs();
        let first = &recorded[0];
        assert_eq!(first.status, crate::job::JobStatus::Cancelled);
        assert_eq!(first.cancel_reason.as_deref(), Some("test cancel"));
    }

    /// A fresh context has not dispatched any job.
    #[tokio::test]
    async fn test_job_not_dispatched() {
        let user = Uuid::new_v4();
        let ctx = TestMutationContext::authenticated(user);

        ctx.job_dispatch().assert_not_dispatched("send_email");
    }
}