1use std::sync::RwLock;
7
8use chrono::{DateTime, Utc};
9use uuid::Uuid;
10
11use crate::error::{ForgeError, Result};
12use crate::job::JobStatus;
13use crate::workflow::WorkflowStatus;
14
15#[derive(Debug, Clone)]
17pub struct DispatchedJob {
18 pub id: Uuid,
20 pub job_type: String,
22 pub args: serde_json::Value,
24 pub dispatched_at: DateTime<Utc>,
26 pub status: JobStatus,
28 pub cancel_reason: Option<String>,
30}
31
32#[derive(Debug, Clone)]
34pub struct StartedWorkflow {
35 pub run_id: Uuid,
37 pub workflow_name: String,
39 pub input: serde_json::Value,
41 pub started_at: DateTime<Utc>,
43 pub status: WorkflowStatus,
45}
46
47pub struct MockJobDispatch {
63 jobs: RwLock<Vec<DispatchedJob>>,
64}
65
66impl MockJobDispatch {
67 pub fn new() -> Self {
69 Self {
70 jobs: RwLock::new(Vec::new()),
71 }
72 }
73
74 pub async fn dispatch<T: serde::Serialize>(&self, job_type: &str, args: T) -> Result<Uuid> {
76 let id = Uuid::new_v4();
77 let args_json =
78 serde_json::to_value(args).map_err(|e| ForgeError::Serialization(e.to_string()))?;
79
80 let job = DispatchedJob {
81 id,
82 job_type: job_type.to_string(),
83 args: args_json,
84 dispatched_at: Utc::now(),
85 status: JobStatus::Pending,
86 cancel_reason: None,
87 };
88
89 self.jobs.write().unwrap().push(job);
90 Ok(id)
91 }
92
93 pub fn dispatched_jobs(&self) -> Vec<DispatchedJob> {
95 self.jobs.read().unwrap().clone()
96 }
97
98 pub fn jobs_of_type(&self, job_type: &str) -> Vec<DispatchedJob> {
100 self.jobs
101 .read()
102 .unwrap()
103 .iter()
104 .filter(|j| j.job_type == job_type)
105 .cloned()
106 .collect()
107 }
108
109 pub fn assert_dispatched(&self, job_type: &str) {
111 let jobs = self.jobs.read().unwrap();
112 let found = jobs.iter().any(|j| j.job_type == job_type);
113 assert!(
114 found,
115 "Expected job '{}' to be dispatched, but it wasn't. Dispatched jobs: {:?}",
116 job_type,
117 jobs.iter().map(|j| &j.job_type).collect::<Vec<_>>()
118 );
119 }
120
121 pub fn assert_dispatched_with<F>(&self, job_type: &str, predicate: F)
123 where
124 F: Fn(&serde_json::Value) -> bool,
125 {
126 let jobs = self.jobs.read().unwrap();
127 let found = jobs
128 .iter()
129 .any(|j| j.job_type == job_type && predicate(&j.args));
130 assert!(
131 found,
132 "Expected job '{}' with matching args to be dispatched",
133 job_type
134 );
135 }
136
137 pub fn assert_not_dispatched(&self, job_type: &str) {
139 let jobs = self.jobs.read().unwrap();
140 let found = jobs.iter().any(|j| j.job_type == job_type);
141 assert!(
142 !found,
143 "Expected job '{}' NOT to be dispatched, but it was",
144 job_type
145 );
146 }
147
148 pub fn assert_dispatch_count(&self, job_type: &str, expected: usize) {
150 let jobs = self.jobs.read().unwrap();
151 let count = jobs.iter().filter(|j| j.job_type == job_type).count();
152 assert_eq!(
153 count, expected,
154 "Expected {} dispatches of '{}', but found {}",
155 expected, job_type, count
156 );
157 }
158
159 pub fn clear(&self) {
161 self.jobs.write().unwrap().clear();
162 }
163
164 pub fn complete_job(&self, job_id: Uuid) {
166 let mut jobs = self.jobs.write().unwrap();
167 if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
168 job.status = JobStatus::Completed;
169 }
170 }
171
172 pub fn fail_job(&self, job_id: Uuid) {
174 let mut jobs = self.jobs.write().unwrap();
175 if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
176 job.status = JobStatus::Failed;
177 }
178 }
179
180 pub fn cancel_job(&self, job_id: Uuid, reason: Option<String>) {
182 let mut jobs = self.jobs.write().unwrap();
183 if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
184 job.status = JobStatus::Cancelled;
185 job.cancel_reason = reason;
186 }
187 }
188}
189
190impl Default for MockJobDispatch {
191 fn default() -> Self {
192 Self::new()
193 }
194}
195
196impl crate::function::JobDispatch for MockJobDispatch {
197 fn get_info(&self, _job_type: &str) -> Option<crate::job::JobInfo> {
198 None
199 }
200
201 fn dispatch_by_name(
202 &self,
203 job_type: &str,
204 args: serde_json::Value,
205 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Uuid>> + Send + '_>> {
206 let job_type = job_type.to_string();
207 Box::pin(async move { self.dispatch(&job_type, args).await })
208 }
209
210 fn cancel(
211 &self,
212 job_id: Uuid,
213 reason: Option<String>,
214 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send + '_>> {
215 Box::pin(async move {
216 self.cancel_job(job_id, reason);
217 Ok(true)
218 })
219 }
220}
221
222pub struct MockWorkflowDispatch {
235 workflows: RwLock<Vec<StartedWorkflow>>,
236}
237
238impl MockWorkflowDispatch {
239 pub fn new() -> Self {
241 Self {
242 workflows: RwLock::new(Vec::new()),
243 }
244 }
245
246 pub async fn start<T: serde::Serialize>(&self, workflow_name: &str, input: T) -> Result<Uuid> {
248 let run_id = Uuid::new_v4();
249 let input_json =
250 serde_json::to_value(input).map_err(|e| ForgeError::Serialization(e.to_string()))?;
251
252 let workflow = StartedWorkflow {
253 run_id,
254 workflow_name: workflow_name.to_string(),
255 input: input_json,
256 started_at: Utc::now(),
257 status: WorkflowStatus::Created,
258 };
259
260 self.workflows.write().unwrap().push(workflow);
261 Ok(run_id)
262 }
263
264 pub fn started_workflows(&self) -> Vec<StartedWorkflow> {
266 self.workflows.read().unwrap().clone()
267 }
268
269 pub fn workflows_named(&self, name: &str) -> Vec<StartedWorkflow> {
271 self.workflows
272 .read()
273 .unwrap()
274 .iter()
275 .filter(|w| w.workflow_name == name)
276 .cloned()
277 .collect()
278 }
279
280 pub fn assert_started(&self, workflow_name: &str) {
282 let workflows = self.workflows.read().unwrap();
283 let found = workflows.iter().any(|w| w.workflow_name == workflow_name);
284 assert!(
285 found,
286 "Expected workflow '{}' to be started, but it wasn't. Started workflows: {:?}",
287 workflow_name,
288 workflows
289 .iter()
290 .map(|w| &w.workflow_name)
291 .collect::<Vec<_>>()
292 );
293 }
294
295 pub fn assert_started_with<F>(&self, workflow_name: &str, predicate: F)
297 where
298 F: Fn(&serde_json::Value) -> bool,
299 {
300 let workflows = self.workflows.read().unwrap();
301 let found = workflows
302 .iter()
303 .any(|w| w.workflow_name == workflow_name && predicate(&w.input));
304 assert!(
305 found,
306 "Expected workflow '{}' with matching input to be started",
307 workflow_name
308 );
309 }
310
311 pub fn assert_not_started(&self, workflow_name: &str) {
313 let workflows = self.workflows.read().unwrap();
314 let found = workflows.iter().any(|w| w.workflow_name == workflow_name);
315 assert!(
316 !found,
317 "Expected workflow '{}' NOT to be started, but it was",
318 workflow_name
319 );
320 }
321
322 pub fn assert_start_count(&self, workflow_name: &str, expected: usize) {
324 let workflows = self.workflows.read().unwrap();
325 let count = workflows
326 .iter()
327 .filter(|w| w.workflow_name == workflow_name)
328 .count();
329 assert_eq!(
330 count, expected,
331 "Expected {} starts of '{}', but found {}",
332 expected, workflow_name, count
333 );
334 }
335
336 pub fn clear(&self) {
338 self.workflows.write().unwrap().clear();
339 }
340
341 pub fn complete_workflow(&self, run_id: Uuid) {
343 let mut workflows = self.workflows.write().unwrap();
344 if let Some(workflow) = workflows.iter_mut().find(|w| w.run_id == run_id) {
345 workflow.status = WorkflowStatus::Completed;
346 }
347 }
348
349 pub fn fail_workflow(&self, run_id: Uuid) {
351 let mut workflows = self.workflows.write().unwrap();
352 if let Some(workflow) = workflows.iter_mut().find(|w| w.run_id == run_id) {
353 workflow.status = WorkflowStatus::Failed;
354 }
355 }
356}
357
358impl Default for MockWorkflowDispatch {
359 fn default() -> Self {
360 Self::new()
361 }
362}
363
#[cfg(test)]
mod tests {
    use super::*;

    /// A dispatched job is found by type, and other types are reported absent.
    #[tokio::test]
    async fn test_mock_job_dispatch() {
        let mock = MockJobDispatch::new();
        let payload = serde_json::json!({"to": "test@example.com"});

        let id = mock.dispatch("send_email", payload).await.unwrap();

        assert!(!id.is_nil());
        mock.assert_dispatched("send_email");
        mock.assert_not_dispatched("other_job");
    }

    /// The args predicate sees the JSON that was passed to `dispatch`.
    #[tokio::test]
    async fn test_job_dispatch_with_args() {
        let mock = MockJobDispatch::new();
        let payload = serde_json::json!({"to": "test@example.com"});

        mock.dispatch("send_email", payload).await.unwrap();

        mock.assert_dispatched_with("send_email", |args| args["to"] == "test@example.com");
    }

    /// Dispatch counts are tracked per job type.
    #[tokio::test]
    async fn test_job_dispatch_count() {
        let mock = MockJobDispatch::new();

        for job_type in ["job_a", "job_b", "job_a"] {
            mock.dispatch(job_type, serde_json::json!({}))
                .await
                .unwrap();
        }

        mock.assert_dispatch_count("job_a", 2);
        mock.assert_dispatch_count("job_b", 1);
    }

    /// A started workflow is found by name, and other names are reported absent.
    #[tokio::test]
    async fn test_mock_workflow_dispatch() {
        let mock = MockWorkflowDispatch::new();
        let input = serde_json::json!({"user_id": "123"});

        let id = mock.start("onboarding", input).await.unwrap();

        assert!(!id.is_nil());
        mock.assert_started("onboarding");
        mock.assert_not_started("other_workflow");
    }

    /// The input predicate sees the JSON that was passed to `start`.
    #[tokio::test]
    async fn test_workflow_dispatch_with_input() {
        let mock = MockWorkflowDispatch::new();
        let input = serde_json::json!({"user_id": "123"});

        mock.start("onboarding", input).await.unwrap();

        mock.assert_started_with("onboarding", |input| input["user_id"] == "123");
    }

    /// `clear` drops everything that was recorded.
    #[tokio::test]
    async fn test_clear() {
        let mock = MockJobDispatch::new();
        mock.dispatch("test", serde_json::json!({})).await.unwrap();

        assert_eq!(mock.dispatched_jobs().len(), 1);
        mock.clear();
        assert_eq!(mock.dispatched_jobs().len(), 0);
    }

    /// A job starts as `Pending` and becomes `Completed` after `complete_job`.
    #[tokio::test]
    async fn test_job_status_simulation() {
        let mock = MockJobDispatch::new();
        let id = mock.dispatch("test", serde_json::json!({})).await.unwrap();

        let before = mock.dispatched_jobs();
        assert_eq!(before[0].status, JobStatus::Pending);

        mock.complete_job(id);

        let after = mock.dispatched_jobs();
        assert_eq!(after[0].status, JobStatus::Completed);
    }

    /// `cancel_job` sets the `Cancelled` status and keeps the supplied reason.
    #[tokio::test]
    async fn test_job_cancel_simulation() {
        let mock = MockJobDispatch::new();
        let id = mock.dispatch("test", serde_json::json!({})).await.unwrap();

        mock.cancel_job(id, Some("user request".to_string()));

        let recorded = mock.dispatched_jobs();
        assert_eq!(recorded[0].status, JobStatus::Cancelled);
        assert_eq!(recorded[0].cancel_reason.as_deref(), Some("user request"));
    }
}