1#![allow(clippy::unwrap_used, clippy::indexing_slicing)]
7
8use std::sync::RwLock;
9
10use chrono::{DateTime, Utc};
11use uuid::Uuid;
12
13use crate::error::{ForgeError, Result};
14use crate::job::JobStatus;
15use crate::workflow::WorkflowStatus;
16
17#[derive(Debug, Clone)]
19pub struct DispatchedJob {
20 pub id: Uuid,
22 pub job_type: String,
24 pub args: serde_json::Value,
26 pub dispatched_at: DateTime<Utc>,
28 pub status: JobStatus,
30 pub cancel_reason: Option<String>,
32}
33
34#[derive(Debug, Clone)]
36pub struct StartedWorkflow {
37 pub run_id: Uuid,
39 pub workflow_name: String,
41 pub input: serde_json::Value,
43 pub started_at: DateTime<Utc>,
45 pub status: WorkflowStatus,
47}
48
49pub struct MockJobDispatch {
65 jobs: RwLock<Vec<DispatchedJob>>,
66}
67
68impl MockJobDispatch {
69 pub fn new() -> Self {
71 Self {
72 jobs: RwLock::new(Vec::new()),
73 }
74 }
75
76 pub async fn dispatch<T: serde::Serialize>(&self, job_type: &str, args: T) -> Result<Uuid> {
78 let id = Uuid::new_v4();
79 let args_json =
80 serde_json::to_value(args).map_err(|e| ForgeError::Serialization(e.to_string()))?;
81
82 let job = DispatchedJob {
83 id,
84 job_type: job_type.to_string(),
85 args: args_json,
86 dispatched_at: Utc::now(),
87 status: JobStatus::Pending,
88 cancel_reason: None,
89 };
90
91 self.jobs.write().expect("jobs lock poisoned").push(job);
92 Ok(id)
93 }
94
95 pub fn dispatched_jobs(&self) -> Vec<DispatchedJob> {
97 self.jobs.read().expect("jobs lock poisoned").clone()
98 }
99
100 pub fn jobs_of_type(&self, job_type: &str) -> Vec<DispatchedJob> {
102 self.jobs
103 .read()
104 .expect("jobs lock poisoned")
105 .iter()
106 .filter(|j| j.job_type == job_type)
107 .cloned()
108 .collect()
109 }
110
111 pub fn assert_dispatched(&self, job_type: &str) {
113 let jobs = self.jobs.read().expect("jobs lock poisoned");
114 let found = jobs.iter().any(|j| j.job_type == job_type);
115 assert!(
116 found,
117 "Expected job '{}' to be dispatched, but it wasn't. Dispatched jobs: {:?}",
118 job_type,
119 jobs.iter().map(|j| &j.job_type).collect::<Vec<_>>()
120 );
121 }
122
123 pub fn assert_dispatched_with<F>(&self, job_type: &str, predicate: F)
125 where
126 F: Fn(&serde_json::Value) -> bool,
127 {
128 let jobs = self.jobs.read().expect("jobs lock poisoned");
129 let found = jobs
130 .iter()
131 .any(|j| j.job_type == job_type && predicate(&j.args));
132 assert!(
133 found,
134 "Expected job '{}' with matching args to be dispatched",
135 job_type
136 );
137 }
138
139 pub fn assert_not_dispatched(&self, job_type: &str) {
141 let jobs = self.jobs.read().expect("jobs lock poisoned");
142 let found = jobs.iter().any(|j| j.job_type == job_type);
143 assert!(
144 !found,
145 "Expected job '{}' NOT to be dispatched, but it was",
146 job_type
147 );
148 }
149
150 pub fn assert_dispatch_count(&self, job_type: &str, expected: usize) {
152 let jobs = self.jobs.read().expect("jobs lock poisoned");
153 let count = jobs.iter().filter(|j| j.job_type == job_type).count();
154 assert_eq!(
155 count, expected,
156 "Expected {} dispatches of '{}', but found {}",
157 expected, job_type, count
158 );
159 }
160
161 pub fn clear(&self) {
163 self.jobs.write().expect("jobs lock poisoned").clear();
164 }
165
166 pub fn complete_job(&self, job_id: Uuid) {
168 let mut jobs = self.jobs.write().expect("jobs lock poisoned");
169 if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
170 job.status = JobStatus::Completed;
171 }
172 }
173
174 pub fn fail_job(&self, job_id: Uuid) {
176 let mut jobs = self.jobs.write().expect("jobs lock poisoned");
177 if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
178 job.status = JobStatus::Failed;
179 }
180 }
181
182 pub fn cancel_job(&self, job_id: Uuid, reason: Option<String>) {
184 let mut jobs = self.jobs.write().expect("jobs lock poisoned");
185 if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
186 job.status = JobStatus::Cancelled;
187 job.cancel_reason = reason;
188 }
189 }
190}
191
192impl Default for MockJobDispatch {
193 fn default() -> Self {
194 Self::new()
195 }
196}
197
198impl crate::function::JobDispatch for MockJobDispatch {
199 fn get_info(&self, _job_type: &str) -> Option<crate::job::JobInfo> {
200 None
201 }
202
203 fn dispatch_by_name(
204 &self,
205 job_type: &str,
206 args: serde_json::Value,
207 _owner_subject: Option<String>,
208 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Uuid>> + Send + '_>> {
209 let job_type = job_type.to_string();
210 Box::pin(async move { self.dispatch(&job_type, args).await })
211 }
212
213 fn cancel(
214 &self,
215 job_id: Uuid,
216 reason: Option<String>,
217 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send + '_>> {
218 Box::pin(async move {
219 self.cancel_job(job_id, reason);
220 Ok(true)
221 })
222 }
223}
224
225pub struct MockWorkflowDispatch {
238 workflows: RwLock<Vec<StartedWorkflow>>,
239}
240
241impl MockWorkflowDispatch {
242 pub fn new() -> Self {
244 Self {
245 workflows: RwLock::new(Vec::new()),
246 }
247 }
248
249 pub async fn start<T: serde::Serialize>(&self, workflow_name: &str, input: T) -> Result<Uuid> {
251 let run_id = Uuid::new_v4();
252 let input_json =
253 serde_json::to_value(input).map_err(|e| ForgeError::Serialization(e.to_string()))?;
254
255 let workflow = StartedWorkflow {
256 run_id,
257 workflow_name: workflow_name.to_string(),
258 input: input_json,
259 started_at: Utc::now(),
260 status: WorkflowStatus::Created,
261 };
262
263 self.workflows
264 .write()
265 .expect("workflows lock poisoned")
266 .push(workflow);
267 Ok(run_id)
268 }
269
270 pub fn started_workflows(&self) -> Vec<StartedWorkflow> {
272 self.workflows
273 .read()
274 .expect("workflows lock poisoned")
275 .clone()
276 }
277
278 pub fn workflows_named(&self, name: &str) -> Vec<StartedWorkflow> {
280 self.workflows
281 .read()
282 .expect("workflows lock poisoned")
283 .iter()
284 .filter(|w| w.workflow_name == name)
285 .cloned()
286 .collect()
287 }
288
289 pub fn assert_started(&self, workflow_name: &str) {
291 let workflows = self.workflows.read().expect("workflows lock poisoned");
292 let found = workflows.iter().any(|w| w.workflow_name == workflow_name);
293 assert!(
294 found,
295 "Expected workflow '{}' to be started, but it wasn't. Started workflows: {:?}",
296 workflow_name,
297 workflows
298 .iter()
299 .map(|w| &w.workflow_name)
300 .collect::<Vec<_>>()
301 );
302 }
303
304 pub fn assert_started_with<F>(&self, workflow_name: &str, predicate: F)
306 where
307 F: Fn(&serde_json::Value) -> bool,
308 {
309 let workflows = self.workflows.read().expect("workflows lock poisoned");
310 let found = workflows
311 .iter()
312 .any(|w| w.workflow_name == workflow_name && predicate(&w.input));
313 assert!(
314 found,
315 "Expected workflow '{}' with matching input to be started",
316 workflow_name
317 );
318 }
319
320 pub fn assert_not_started(&self, workflow_name: &str) {
322 let workflows = self.workflows.read().expect("workflows lock poisoned");
323 let found = workflows.iter().any(|w| w.workflow_name == workflow_name);
324 assert!(
325 !found,
326 "Expected workflow '{}' NOT to be started, but it was",
327 workflow_name
328 );
329 }
330
331 pub fn assert_start_count(&self, workflow_name: &str, expected: usize) {
333 let workflows = self.workflows.read().expect("workflows lock poisoned");
334 let count = workflows
335 .iter()
336 .filter(|w| w.workflow_name == workflow_name)
337 .count();
338 assert_eq!(
339 count, expected,
340 "Expected {} starts of '{}', but found {}",
341 expected, workflow_name, count
342 );
343 }
344
345 pub fn clear(&self) {
347 self.workflows
348 .write()
349 .expect("workflows lock poisoned")
350 .clear();
351 }
352
353 pub fn complete_workflow(&self, run_id: Uuid) {
355 let mut workflows = self.workflows.write().expect("workflows lock poisoned");
356 if let Some(workflow) = workflows.iter_mut().find(|w| w.run_id == run_id) {
357 workflow.status = WorkflowStatus::Completed;
358 }
359 }
360
361 pub fn fail_workflow(&self, run_id: Uuid) {
363 let mut workflows = self.workflows.write().expect("workflows lock poisoned");
364 if let Some(workflow) = workflows.iter_mut().find(|w| w.run_id == run_id) {
365 workflow.status = WorkflowStatus::Failed;
366 }
367 }
368}
369
370impl Default for MockWorkflowDispatch {
371 fn default() -> Self {
372 Self::new()
373 }
374}
375
#[cfg(test)]
mod tests {
    use super::*;

    /// A dispatched job is visible under its own type and not under another.
    #[tokio::test]
    async fn test_mock_job_dispatch() {
        let mock = MockJobDispatch::new();
        let payload = serde_json::json!({"to": "test@example.com"});

        let id = mock.dispatch("send_email", payload).await.unwrap();

        assert!(!id.is_nil());
        mock.assert_dispatched("send_email");
        mock.assert_not_dispatched("other_job");
    }

    /// The argument predicate sees the JSON the job was dispatched with.
    #[tokio::test]
    async fn test_job_dispatch_with_args() {
        let mock = MockJobDispatch::new();
        let payload = serde_json::json!({"to": "test@example.com"});

        mock.dispatch("send_email", payload).await.unwrap();

        mock.assert_dispatched_with("send_email", |args| args["to"] == "test@example.com");
    }

    /// Dispatch counts are tracked per job type.
    #[tokio::test]
    async fn test_job_dispatch_count() {
        let mock = MockJobDispatch::new();

        for job_type in ["job_a", "job_b", "job_a"] {
            mock.dispatch(job_type, serde_json::json!({}))
                .await
                .unwrap();
        }

        mock.assert_dispatch_count("job_a", 2);
        mock.assert_dispatch_count("job_b", 1);
    }

    /// A started workflow is visible under its own name and not under another.
    #[tokio::test]
    async fn test_mock_workflow_dispatch() {
        let mock = MockWorkflowDispatch::new();
        let input = serde_json::json!({"user_id": "123"});

        let id = mock.start("onboarding", input).await.unwrap();

        assert!(!id.is_nil());
        mock.assert_started("onboarding");
        mock.assert_not_started("other_workflow");
    }

    /// The input predicate sees the JSON the workflow was started with.
    #[tokio::test]
    async fn test_workflow_dispatch_with_input() {
        let mock = MockWorkflowDispatch::new();
        let input = serde_json::json!({"user_id": "123"});

        mock.start("onboarding", input).await.unwrap();

        mock.assert_started_with("onboarding", |input| input["user_id"] == "123");
    }

    /// `clear` drops everything that was recorded.
    #[tokio::test]
    async fn test_clear() {
        let mock = MockJobDispatch::new();
        mock.dispatch("test", serde_json::json!({})).await.unwrap();

        assert_eq!(mock.dispatched_jobs().len(), 1);
        mock.clear();
        assert_eq!(mock.dispatched_jobs().len(), 0);
    }

    /// A job starts pending and can be moved to completed.
    #[tokio::test]
    async fn test_job_status_simulation() {
        let mock = MockJobDispatch::new();
        let id = mock.dispatch("test", serde_json::json!({})).await.unwrap();

        let before = mock.dispatched_jobs();
        assert_eq!(before[0].status, JobStatus::Pending);

        mock.complete_job(id);

        let after = mock.dispatched_jobs();
        assert_eq!(after[0].status, JobStatus::Completed);
    }

    /// Cancelling stores both the cancelled status and the reason.
    #[tokio::test]
    async fn test_job_cancel_simulation() {
        let mock = MockJobDispatch::new();
        let id = mock.dispatch("test", serde_json::json!({})).await.unwrap();

        mock.cancel_job(id, Some("user request".to_string()));

        let recorded = mock.dispatched_jobs();
        let job = &recorded[0];
        assert_eq!(job.status, JobStatus::Cancelled);
        assert_eq!(job.cancel_reason.as_deref(), Some("user request"));
    }
}