1use std::collections::HashMap;
4use std::time::Duration;
5
6use chrono::{DateTime, Utc};
7use serde::{Serialize, de::DeserializeOwned};
8use uuid::Uuid;
9
10use forge_core::error::{ForgeError, Result};
11use forge_core::function::{AuthContext, MutationContext, QueryContext};
12use forge_core::job::JobStatus;
13use forge_core::workflow::WorkflowStatus;
14
15use super::TestConfig;
16use super::mock::{MockHttp, MockRequest, MockResponse};
17
18pub struct TestContext {
23 pool: Option<sqlx::PgPool>,
25 #[allow(dead_code)]
27 tx: Option<sqlx::Transaction<'static, sqlx::Postgres>>,
28 mock_http: MockHttp,
30 auth: AuthContext,
32 #[allow(dead_code)]
34 config: TestConfig,
35 dispatched_jobs: Vec<DispatchedJob>,
37 started_workflows: Vec<StartedWorkflow>,
39}
40
41#[derive(Debug, Clone)]
43pub struct DispatchedJob {
44 pub id: Uuid,
46 pub job_type: String,
48 pub input: serde_json::Value,
50 pub dispatched_at: DateTime<Utc>,
52 pub status: JobStatus,
54}
55
56#[derive(Debug, Clone)]
58pub struct StartedWorkflow {
59 pub run_id: Uuid,
61 pub workflow_name: String,
63 pub input: serde_json::Value,
65 pub started_at: DateTime<Utc>,
67 pub status: WorkflowStatus,
69 pub completed_steps: Vec<String>,
71}
72
73impl TestContext {
74 pub fn new_without_db() -> Self {
76 Self {
77 pool: None,
78 tx: None,
79 mock_http: MockHttp::new(),
80 auth: AuthContext::unauthenticated(),
81 config: TestConfig::default(),
82 dispatched_jobs: Vec::new(),
83 started_workflows: Vec::new(),
84 }
85 }
86
87 pub async fn new() -> Result<Self> {
89 let config = TestConfig::default();
90 Self::with_config(config).await
91 }
92
93 pub async fn with_config(config: TestConfig) -> Result<Self> {
95 let pool = if let Some(ref url) = config.database_url {
96 Some(
97 sqlx::postgres::PgPoolOptions::new()
98 .max_connections(config.max_connections)
99 .acquire_timeout(Duration::from_secs(30))
100 .connect(url)
101 .await
102 .map_err(|e| ForgeError::Database(e.to_string()))?,
103 )
104 } else {
105 None
106 };
107
108 Ok(Self {
109 pool,
110 tx: None,
111 mock_http: MockHttp::new(),
112 auth: AuthContext::unauthenticated(),
113 config,
114 dispatched_jobs: Vec::new(),
115 started_workflows: Vec::new(),
116 })
117 }
118
119 pub fn builder() -> TestContextBuilder {
121 TestContextBuilder::new()
122 }
123
124 pub fn pool(&self) -> Option<&sqlx::PgPool> {
126 self.pool.as_ref()
127 }
128
129 pub fn auth(&self) -> &AuthContext {
131 &self.auth
132 }
133
134 pub fn user_id(&self) -> Option<Uuid> {
136 if self.auth.is_authenticated() {
137 self.auth.user_id()
138 } else {
139 None
140 }
141 }
142
143 pub fn set_user(&mut self, user_id: Uuid) {
145 self.auth = AuthContext::authenticated(user_id, vec![], HashMap::new());
146 }
147
148 pub fn mock_http(&self) -> &MockHttp {
150 &self.mock_http
151 }
152
153 pub fn mock_http_mut(&mut self) -> &mut MockHttp {
155 &mut self.mock_http
156 }
157
158 pub async fn query<F, I, O>(&self, _func: F, _input: I) -> Result<O>
160 where
161 F: Fn(
162 QueryContext,
163 I,
164 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<O>> + Send>>,
165 I: Serialize,
166 O: DeserializeOwned,
167 {
168 Err(ForgeError::Internal(
170 "Query execution requires database connection".to_string(),
171 ))
172 }
173
174 pub async fn mutate<F, I, O>(&self, _func: F, _input: I) -> Result<O>
176 where
177 F: Fn(
178 MutationContext,
179 I,
180 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<O>> + Send>>,
181 I: Serialize,
182 O: DeserializeOwned,
183 {
184 Err(ForgeError::Internal(
186 "Mutation execution requires database connection".to_string(),
187 ))
188 }
189
190 pub fn dispatch_job(&mut self, job_type: &str, input: serde_json::Value) -> Uuid {
192 let job_id = Uuid::new_v4();
193 self.dispatched_jobs.push(DispatchedJob {
194 id: job_id,
195 job_type: job_type.to_string(),
196 input,
197 dispatched_at: Utc::now(),
198 status: JobStatus::Pending,
199 });
200 job_id
201 }
202
203 pub fn cancel_job(&mut self, job_id: Uuid) {
205 if let Some(job) = self.dispatched_jobs.iter_mut().find(|j| j.id == job_id) {
206 job.status = JobStatus::Cancelled;
207 }
208 }
209
210 pub fn dispatched_jobs(&self) -> &[DispatchedJob] {
212 &self.dispatched_jobs
213 }
214
215 pub fn job_dispatched(&self, job_type: &str) -> bool {
217 self.dispatched_jobs.iter().any(|j| j.job_type == job_type)
218 }
219
220 pub fn job_status(&self, job_id: Uuid) -> Option<JobStatus> {
222 self.dispatched_jobs
223 .iter()
224 .find(|j| j.id == job_id)
225 .map(|j| j.status)
226 }
227
228 pub fn complete_job(&mut self, job_id: Uuid) {
230 if let Some(job) = self.dispatched_jobs.iter_mut().find(|j| j.id == job_id) {
231 job.status = JobStatus::Completed;
232 }
233 }
234
235 pub fn run_jobs(&mut self) {
237 for job in &mut self.dispatched_jobs {
238 if job.status == JobStatus::Pending {
239 job.status = JobStatus::Completed;
240 }
241 }
242 }
243
244 pub fn start_workflow(&mut self, workflow_name: &str, input: serde_json::Value) -> Uuid {
246 let run_id = Uuid::new_v4();
247 self.started_workflows.push(StartedWorkflow {
248 run_id,
249 workflow_name: workflow_name.to_string(),
250 input,
251 started_at: Utc::now(),
252 status: WorkflowStatus::Created,
253 completed_steps: Vec::new(),
254 });
255 run_id
256 }
257
258 pub fn started_workflows(&self) -> &[StartedWorkflow] {
260 &self.started_workflows
261 }
262
263 pub fn workflow_status(&self, run_id: Uuid) -> Option<WorkflowStatus> {
265 self.started_workflows
266 .iter()
267 .find(|w| w.run_id == run_id)
268 .map(|w| w.status)
269 }
270
271 pub fn complete_workflow_step(&mut self, run_id: Uuid, step_name: &str) {
273 if let Some(workflow) = self
274 .started_workflows
275 .iter_mut()
276 .find(|w| w.run_id == run_id)
277 {
278 workflow.completed_steps.push(step_name.to_string());
279 }
280 }
281
282 pub fn complete_workflow(&mut self, run_id: Uuid) {
284 if let Some(workflow) = self
285 .started_workflows
286 .iter_mut()
287 .find(|w| w.run_id == run_id)
288 {
289 workflow.status = WorkflowStatus::Completed;
290 }
291 }
292
293 pub fn workflow_step_completed(&self, run_id: Uuid, step_name: &str) -> bool {
295 self.started_workflows
296 .iter()
297 .find(|w| w.run_id == run_id)
298 .map(|w| w.completed_steps.contains(&step_name.to_string()))
299 .unwrap_or(false)
300 }
301}
302
303pub struct TestContextBuilder {
305 config: TestConfig,
306 user_id: Option<Uuid>,
307 roles: Vec<String>,
308 custom_claims: HashMap<String, serde_json::Value>,
309 mock_http: MockHttp,
310}
311
312impl TestContextBuilder {
313 pub fn new() -> Self {
315 Self {
316 config: TestConfig::default(),
317 user_id: None,
318 roles: Vec::new(),
319 custom_claims: HashMap::new(),
320 mock_http: MockHttp::new(),
321 }
322 }
323
324 pub fn database_url(mut self, url: impl Into<String>) -> Self {
326 self.config.database_url = Some(url.into());
327 self
328 }
329
330 pub fn as_user(mut self, user_id: Uuid) -> Self {
332 self.user_id = Some(user_id);
333 self
334 }
335
336 pub fn with_roles(mut self, roles: Vec<String>) -> Self {
338 self.roles = roles;
339 self
340 }
341
342 pub fn with_claims(mut self, claims: HashMap<String, serde_json::Value>) -> Self {
344 self.custom_claims = claims;
345 self
346 }
347
348 pub fn with_logging(mut self, enabled: bool) -> Self {
350 self.config.logging = enabled;
351 self
352 }
353
354 pub fn mock_http(
356 mut self,
357 pattern: &str,
358 handler: impl Fn(&MockRequest) -> MockResponse + Send + Sync + 'static,
359 ) -> Self {
360 self.mock_http.add_mock(pattern, handler);
361 self
362 }
363
364 pub async fn build(self) -> Result<TestContext> {
366 let mut ctx = TestContext::with_config(self.config).await?;
367
368 if let Some(user_id) = self.user_id {
369 ctx.auth = AuthContext::authenticated(user_id, self.roles, self.custom_claims);
370 }
371
372 ctx.mock_http = self.mock_http;
373
374 Ok(ctx)
375 }
376}
377
378impl Default for TestContextBuilder {
379 fn default() -> Self {
380 Self::new()
381 }
382}
383
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder setters store the user id and the logging flag.
    #[test]
    fn test_context_builder() {
        let user = Uuid::new_v4();
        let builder = TestContextBuilder::new().as_user(user).with_logging(true);

        assert!(builder.user_id.is_some());
        assert!(builder.config.logging);
    }

    /// A database-free context has no pool and is unauthenticated.
    #[test]
    fn test_context_without_db() {
        let context = TestContext::new_without_db();

        assert!(context.pool().is_none());
        assert!(!context.auth().is_authenticated());
    }

    /// A dispatched job starts pending and can be completed by id.
    #[test]
    fn test_job_dispatch() {
        let mut context = TestContext::new_without_db();
        let payload = serde_json::json!({"to": "test@example.com"});
        let id = context.dispatch_job("send_email", payload);

        assert!(context.job_dispatched("send_email"));
        assert_eq!(context.job_status(id), Some(JobStatus::Pending));

        context.complete_job(id);
        assert_eq!(context.job_status(id), Some(JobStatus::Completed));
    }

    /// Workflow runs track status and completed steps independently.
    #[test]
    fn test_workflow_tracking() {
        let mut context = TestContext::new_without_db();
        let payload = serde_json::json!({"email": "test@example.com"});
        let run = context.start_workflow("onboarding", payload);

        assert_eq!(context.workflow_status(run), Some(WorkflowStatus::Created));

        context.complete_workflow_step(run, "create_user");
        assert!(context.workflow_step_completed(run, "create_user"));
        assert!(!context.workflow_step_completed(run, "send_email"));

        context.complete_workflow(run);
        assert_eq!(
            context.workflow_status(run),
            Some(WorkflowStatus::Completed)
        );
    }

    /// `run_jobs` completes every pending job.
    #[test]
    fn test_run_jobs() {
        let mut context = TestContext::new_without_db();
        let first = context.dispatch_job("job1", serde_json::json!({}));
        let second = context.dispatch_job("job2", serde_json::json!({}));

        context.run_jobs();

        assert_eq!(context.job_status(first), Some(JobStatus::Completed));
        assert_eq!(context.job_status(second), Some(JobStatus::Completed));
    }
}