1use std::sync::RwLock;
7
8use chrono::{DateTime, Utc};
9use uuid::Uuid;
10
11use crate::error::{ForgeError, Result};
12use crate::job::JobStatus;
13use crate::workflow::WorkflowStatus;
14
15#[derive(Debug, Clone)]
17pub struct DispatchedJob {
18 pub id: Uuid,
20 pub job_type: String,
22 pub args: serde_json::Value,
24 pub dispatched_at: DateTime<Utc>,
26 pub status: JobStatus,
28}
29
30#[derive(Debug, Clone)]
32pub struct StartedWorkflow {
33 pub run_id: Uuid,
35 pub workflow_name: String,
37 pub input: serde_json::Value,
39 pub started_at: DateTime<Utc>,
41 pub status: WorkflowStatus,
43}
44
45pub struct MockJobDispatch {
61 jobs: RwLock<Vec<DispatchedJob>>,
62}
63
64impl MockJobDispatch {
65 pub fn new() -> Self {
67 Self {
68 jobs: RwLock::new(Vec::new()),
69 }
70 }
71
72 pub async fn dispatch<T: serde::Serialize>(&self, job_type: &str, args: T) -> Result<Uuid> {
74 let id = Uuid::new_v4();
75 let args_json =
76 serde_json::to_value(args).map_err(|e| ForgeError::Serialization(e.to_string()))?;
77
78 let job = DispatchedJob {
79 id,
80 job_type: job_type.to_string(),
81 args: args_json,
82 dispatched_at: Utc::now(),
83 status: JobStatus::Pending,
84 };
85
86 self.jobs.write().unwrap().push(job);
87 Ok(id)
88 }
89
90 pub fn dispatched_jobs(&self) -> Vec<DispatchedJob> {
92 self.jobs.read().unwrap().clone()
93 }
94
95 pub fn jobs_of_type(&self, job_type: &str) -> Vec<DispatchedJob> {
97 self.jobs
98 .read()
99 .unwrap()
100 .iter()
101 .filter(|j| j.job_type == job_type)
102 .cloned()
103 .collect()
104 }
105
106 pub fn assert_dispatched(&self, job_type: &str) {
108 let jobs = self.jobs.read().unwrap();
109 let found = jobs.iter().any(|j| j.job_type == job_type);
110 assert!(
111 found,
112 "Expected job '{}' to be dispatched, but it wasn't. Dispatched jobs: {:?}",
113 job_type,
114 jobs.iter().map(|j| &j.job_type).collect::<Vec<_>>()
115 );
116 }
117
118 pub fn assert_dispatched_with<F>(&self, job_type: &str, predicate: F)
120 where
121 F: Fn(&serde_json::Value) -> bool,
122 {
123 let jobs = self.jobs.read().unwrap();
124 let found = jobs
125 .iter()
126 .any(|j| j.job_type == job_type && predicate(&j.args));
127 assert!(
128 found,
129 "Expected job '{}' with matching args to be dispatched",
130 job_type
131 );
132 }
133
134 pub fn assert_not_dispatched(&self, job_type: &str) {
136 let jobs = self.jobs.read().unwrap();
137 let found = jobs.iter().any(|j| j.job_type == job_type);
138 assert!(
139 !found,
140 "Expected job '{}' NOT to be dispatched, but it was",
141 job_type
142 );
143 }
144
145 pub fn assert_dispatch_count(&self, job_type: &str, expected: usize) {
147 let jobs = self.jobs.read().unwrap();
148 let count = jobs.iter().filter(|j| j.job_type == job_type).count();
149 assert_eq!(
150 count, expected,
151 "Expected {} dispatches of '{}', but found {}",
152 expected, job_type, count
153 );
154 }
155
156 pub fn clear(&self) {
158 self.jobs.write().unwrap().clear();
159 }
160
161 pub fn complete_job(&self, job_id: Uuid) {
163 let mut jobs = self.jobs.write().unwrap();
164 if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
165 job.status = JobStatus::Completed;
166 }
167 }
168
169 pub fn fail_job(&self, job_id: Uuid) {
171 let mut jobs = self.jobs.write().unwrap();
172 if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
173 job.status = JobStatus::Failed;
174 }
175 }
176}
177
178impl Default for MockJobDispatch {
179 fn default() -> Self {
180 Self::new()
181 }
182}
183
184pub struct MockWorkflowDispatch {
197 workflows: RwLock<Vec<StartedWorkflow>>,
198}
199
200impl MockWorkflowDispatch {
201 pub fn new() -> Self {
203 Self {
204 workflows: RwLock::new(Vec::new()),
205 }
206 }
207
208 pub async fn start<T: serde::Serialize>(&self, workflow_name: &str, input: T) -> Result<Uuid> {
210 let run_id = Uuid::new_v4();
211 let input_json =
212 serde_json::to_value(input).map_err(|e| ForgeError::Serialization(e.to_string()))?;
213
214 let workflow = StartedWorkflow {
215 run_id,
216 workflow_name: workflow_name.to_string(),
217 input: input_json,
218 started_at: Utc::now(),
219 status: WorkflowStatus::Created,
220 };
221
222 self.workflows.write().unwrap().push(workflow);
223 Ok(run_id)
224 }
225
226 pub fn started_workflows(&self) -> Vec<StartedWorkflow> {
228 self.workflows.read().unwrap().clone()
229 }
230
231 pub fn workflows_named(&self, name: &str) -> Vec<StartedWorkflow> {
233 self.workflows
234 .read()
235 .unwrap()
236 .iter()
237 .filter(|w| w.workflow_name == name)
238 .cloned()
239 .collect()
240 }
241
242 pub fn assert_started(&self, workflow_name: &str) {
244 let workflows = self.workflows.read().unwrap();
245 let found = workflows.iter().any(|w| w.workflow_name == workflow_name);
246 assert!(
247 found,
248 "Expected workflow '{}' to be started, but it wasn't. Started workflows: {:?}",
249 workflow_name,
250 workflows
251 .iter()
252 .map(|w| &w.workflow_name)
253 .collect::<Vec<_>>()
254 );
255 }
256
257 pub fn assert_started_with<F>(&self, workflow_name: &str, predicate: F)
259 where
260 F: Fn(&serde_json::Value) -> bool,
261 {
262 let workflows = self.workflows.read().unwrap();
263 let found = workflows
264 .iter()
265 .any(|w| w.workflow_name == workflow_name && predicate(&w.input));
266 assert!(
267 found,
268 "Expected workflow '{}' with matching input to be started",
269 workflow_name
270 );
271 }
272
273 pub fn assert_not_started(&self, workflow_name: &str) {
275 let workflows = self.workflows.read().unwrap();
276 let found = workflows.iter().any(|w| w.workflow_name == workflow_name);
277 assert!(
278 !found,
279 "Expected workflow '{}' NOT to be started, but it was",
280 workflow_name
281 );
282 }
283
284 pub fn assert_start_count(&self, workflow_name: &str, expected: usize) {
286 let workflows = self.workflows.read().unwrap();
287 let count = workflows
288 .iter()
289 .filter(|w| w.workflow_name == workflow_name)
290 .count();
291 assert_eq!(
292 count, expected,
293 "Expected {} starts of '{}', but found {}",
294 expected, workflow_name, count
295 );
296 }
297
298 pub fn clear(&self) {
300 self.workflows.write().unwrap().clear();
301 }
302
303 pub fn complete_workflow(&self, run_id: Uuid) {
305 let mut workflows = self.workflows.write().unwrap();
306 if let Some(workflow) = workflows.iter_mut().find(|w| w.run_id == run_id) {
307 workflow.status = WorkflowStatus::Completed;
308 }
309 }
310
311 pub fn fail_workflow(&self, run_id: Uuid) {
313 let mut workflows = self.workflows.write().unwrap();
314 if let Some(workflow) = workflows.iter_mut().find(|w| w.run_id == run_id) {
315 workflow.status = WorkflowStatus::Failed;
316 }
317 }
318}
319
320impl Default for MockWorkflowDispatch {
321 fn default() -> Self {
322 Self::new()
323 }
324}
325
326#[cfg(test)]
327mod tests {
328 use super::*;
329
330 #[tokio::test]
331 async fn test_mock_job_dispatch() {
332 let dispatch = MockJobDispatch::new();
333
334 let job_id = dispatch
335 .dispatch("send_email", serde_json::json!({"to": "test@example.com"}))
336 .await
337 .unwrap();
338
339 assert!(!job_id.is_nil());
340 dispatch.assert_dispatched("send_email");
341 dispatch.assert_not_dispatched("other_job");
342 }
343
344 #[tokio::test]
345 async fn test_job_dispatch_with_args() {
346 let dispatch = MockJobDispatch::new();
347
348 dispatch
349 .dispatch("send_email", serde_json::json!({"to": "test@example.com"}))
350 .await
351 .unwrap();
352
353 dispatch.assert_dispatched_with("send_email", |args| args["to"] == "test@example.com");
354 }
355
356 #[tokio::test]
357 async fn test_job_dispatch_count() {
358 let dispatch = MockJobDispatch::new();
359
360 dispatch
361 .dispatch("job_a", serde_json::json!({}))
362 .await
363 .unwrap();
364 dispatch
365 .dispatch("job_b", serde_json::json!({}))
366 .await
367 .unwrap();
368 dispatch
369 .dispatch("job_a", serde_json::json!({}))
370 .await
371 .unwrap();
372
373 dispatch.assert_dispatch_count("job_a", 2);
374 dispatch.assert_dispatch_count("job_b", 1);
375 }
376
377 #[tokio::test]
378 async fn test_mock_workflow_dispatch() {
379 let dispatch = MockWorkflowDispatch::new();
380
381 let run_id = dispatch
382 .start("onboarding", serde_json::json!({"user_id": "123"}))
383 .await
384 .unwrap();
385
386 assert!(!run_id.is_nil());
387 dispatch.assert_started("onboarding");
388 dispatch.assert_not_started("other_workflow");
389 }
390
391 #[tokio::test]
392 async fn test_workflow_dispatch_with_input() {
393 let dispatch = MockWorkflowDispatch::new();
394
395 dispatch
396 .start("onboarding", serde_json::json!({"user_id": "123"}))
397 .await
398 .unwrap();
399
400 dispatch.assert_started_with("onboarding", |input| input["user_id"] == "123");
401 }
402
403 #[tokio::test]
404 async fn test_clear() {
405 let dispatch = MockJobDispatch::new();
406 dispatch
407 .dispatch("test", serde_json::json!({}))
408 .await
409 .unwrap();
410
411 assert_eq!(dispatch.dispatched_jobs().len(), 1);
412 dispatch.clear();
413 assert_eq!(dispatch.dispatched_jobs().len(), 0);
414 }
415
416 #[tokio::test]
417 async fn test_job_status_simulation() {
418 let dispatch = MockJobDispatch::new();
419 let job_id = dispatch
420 .dispatch("test", serde_json::json!({}))
421 .await
422 .unwrap();
423
424 let jobs = dispatch.dispatched_jobs();
425 assert_eq!(jobs[0].status, JobStatus::Pending);
426
427 dispatch.complete_job(job_id);
428
429 let jobs = dispatch.dispatched_jobs();
430 assert_eq!(jobs[0].status, JobStatus::Completed);
431 }
432}