Skip to main content

forge_core/testing/
mock_dispatch.rs

1//! Mock dispatchers for testing job and workflow dispatch.
2//!
3//! Provides mock implementations that record dispatched jobs and workflows
4//! for verification in tests.
5
6#![allow(clippy::unwrap_used, clippy::indexing_slicing)]
7
8use std::sync::RwLock;
9
10use chrono::{DateTime, Utc};
11use uuid::Uuid;
12
13use crate::error::{ForgeError, Result};
14use crate::job::JobStatus;
15use crate::workflow::WorkflowStatus;
16
17/// Record of a dispatched job.
18#[derive(Debug, Clone)]
19pub struct DispatchedJob {
20    /// Job ID.
21    pub id: Uuid,
22    /// Job type name.
23    pub job_type: String,
24    /// Job arguments (serialized).
25    pub args: serde_json::Value,
26    /// When the job was dispatched.
27    pub dispatched_at: DateTime<Utc>,
28    /// Current status (for test simulation).
29    pub status: JobStatus,
30    /// Cancellation reason (if any).
31    pub cancel_reason: Option<String>,
32}
33
34/// Record of a started workflow.
35#[derive(Debug, Clone)]
36pub struct StartedWorkflow {
37    /// Run ID.
38    pub run_id: Uuid,
39    /// Workflow name.
40    pub workflow_name: String,
41    /// Input (serialized).
42    pub input: serde_json::Value,
43    /// When the workflow was started.
44    pub started_at: DateTime<Utc>,
45    /// Current status.
46    pub status: WorkflowStatus,
47}
48
49/// Mock job dispatcher for testing.
50///
51/// Records dispatched jobs for later verification.
52///
53/// # Example
54///
55/// ```ignore
56/// let dispatch = MockJobDispatch::new();
57/// dispatch.dispatch("send_email", json!({"to": "test@example.com"})).await?;
58///
59/// dispatch.assert_dispatched("send_email");
60/// dispatch.assert_dispatched_with("send_email", |args| {
61///     args["to"] == "test@example.com"
62/// });
63/// ```
64pub struct MockJobDispatch {
65    jobs: RwLock<Vec<DispatchedJob>>,
66}
67
68impl MockJobDispatch {
69    /// Create a new mock job dispatcher.
70    pub fn new() -> Self {
71        Self {
72            jobs: RwLock::new(Vec::new()),
73        }
74    }
75
76    /// Dispatch a job (records for later verification).
77    pub async fn dispatch<T: serde::Serialize>(&self, job_type: &str, args: T) -> Result<Uuid> {
78        let id = Uuid::new_v4();
79        let args_json =
80            serde_json::to_value(args).map_err(|e| ForgeError::Serialization(e.to_string()))?;
81
82        let job = DispatchedJob {
83            id,
84            job_type: job_type.to_string(),
85            args: args_json,
86            dispatched_at: Utc::now(),
87            status: JobStatus::Pending,
88            cancel_reason: None,
89        };
90
91        self.jobs.write().expect("jobs lock poisoned").push(job);
92        Ok(id)
93    }
94
95    /// Get all dispatched jobs.
96    pub fn dispatched_jobs(&self) -> Vec<DispatchedJob> {
97        self.jobs.read().expect("jobs lock poisoned").clone()
98    }
99
100    /// Get jobs of a specific type.
101    pub fn jobs_of_type(&self, job_type: &str) -> Vec<DispatchedJob> {
102        self.jobs
103            .read()
104            .expect("jobs lock poisoned")
105            .iter()
106            .filter(|j| j.job_type == job_type)
107            .cloned()
108            .collect()
109    }
110
111    /// Assert that a job type was dispatched.
112    pub fn assert_dispatched(&self, job_type: &str) {
113        let jobs = self.jobs.read().expect("jobs lock poisoned");
114        let found = jobs.iter().any(|j| j.job_type == job_type);
115        assert!(
116            found,
117            "Expected job '{}' to be dispatched, but it wasn't. Dispatched jobs: {:?}",
118            job_type,
119            jobs.iter().map(|j| &j.job_type).collect::<Vec<_>>()
120        );
121    }
122
123    /// Assert that a job was dispatched with matching arguments.
124    pub fn assert_dispatched_with<F>(&self, job_type: &str, predicate: F)
125    where
126        F: Fn(&serde_json::Value) -> bool,
127    {
128        let jobs = self.jobs.read().expect("jobs lock poisoned");
129        let found = jobs
130            .iter()
131            .any(|j| j.job_type == job_type && predicate(&j.args));
132        assert!(
133            found,
134            "Expected job '{}' with matching args to be dispatched",
135            job_type
136        );
137    }
138
139    /// Assert that a job type was not dispatched.
140    pub fn assert_not_dispatched(&self, job_type: &str) {
141        let jobs = self.jobs.read().expect("jobs lock poisoned");
142        let found = jobs.iter().any(|j| j.job_type == job_type);
143        assert!(
144            !found,
145            "Expected job '{}' NOT to be dispatched, but it was",
146            job_type
147        );
148    }
149
150    /// Assert that a specific number of jobs were dispatched.
151    pub fn assert_dispatch_count(&self, job_type: &str, expected: usize) {
152        let jobs = self.jobs.read().expect("jobs lock poisoned");
153        let count = jobs.iter().filter(|j| j.job_type == job_type).count();
154        assert_eq!(
155            count, expected,
156            "Expected {} dispatches of '{}', but found {}",
157            expected, job_type, count
158        );
159    }
160
161    /// Clear all recorded jobs.
162    pub fn clear(&self) {
163        self.jobs.write().expect("jobs lock poisoned").clear();
164    }
165
166    /// Mark a job as completed (for test simulation).
167    pub fn complete_job(&self, job_id: Uuid) {
168        let mut jobs = self.jobs.write().expect("jobs lock poisoned");
169        if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
170            job.status = JobStatus::Completed;
171        }
172    }
173
174    /// Mark a job as failed (for test simulation).
175    pub fn fail_job(&self, job_id: Uuid) {
176        let mut jobs = self.jobs.write().expect("jobs lock poisoned");
177        if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
178            job.status = JobStatus::Failed;
179        }
180    }
181
182    /// Mark a job as cancelled (for test simulation).
183    pub fn cancel_job(&self, job_id: Uuid, reason: Option<String>) {
184        let mut jobs = self.jobs.write().expect("jobs lock poisoned");
185        if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
186            job.status = JobStatus::Cancelled;
187            job.cancel_reason = reason;
188        }
189    }
190}
191
192impl Default for MockJobDispatch {
193    fn default() -> Self {
194        Self::new()
195    }
196}
197
198impl crate::function::JobDispatch for MockJobDispatch {
199    fn get_info(&self, _job_type: &str) -> Option<crate::job::JobInfo> {
200        None
201    }
202
203    fn dispatch_by_name(
204        &self,
205        job_type: &str,
206        args: serde_json::Value,
207        _owner_subject: Option<String>,
208    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Uuid>> + Send + '_>> {
209        let job_type = job_type.to_string();
210        Box::pin(async move { self.dispatch(&job_type, args).await })
211    }
212
213    fn cancel(
214        &self,
215        job_id: Uuid,
216        reason: Option<String>,
217    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send + '_>> {
218        Box::pin(async move {
219            self.cancel_job(job_id, reason);
220            Ok(true)
221        })
222    }
223}
224
225/// Mock workflow dispatcher for testing.
226///
227/// Records started workflows for later verification.
228///
229/// # Example
230///
231/// ```ignore
232/// let dispatch = MockWorkflowDispatch::new();
233/// dispatch.start("onboarding", json!({"user_id": "123"})).await?;
234///
235/// dispatch.assert_started("onboarding");
236/// ```
237pub struct MockWorkflowDispatch {
238    workflows: RwLock<Vec<StartedWorkflow>>,
239}
240
241impl MockWorkflowDispatch {
242    /// Create a new mock workflow dispatcher.
243    pub fn new() -> Self {
244        Self {
245            workflows: RwLock::new(Vec::new()),
246        }
247    }
248
249    /// Start a workflow (records for later verification).
250    pub async fn start<T: serde::Serialize>(&self, workflow_name: &str, input: T) -> Result<Uuid> {
251        let run_id = Uuid::new_v4();
252        let input_json =
253            serde_json::to_value(input).map_err(|e| ForgeError::Serialization(e.to_string()))?;
254
255        let workflow = StartedWorkflow {
256            run_id,
257            workflow_name: workflow_name.to_string(),
258            input: input_json,
259            started_at: Utc::now(),
260            status: WorkflowStatus::Created,
261        };
262
263        self.workflows
264            .write()
265            .expect("workflows lock poisoned")
266            .push(workflow);
267        Ok(run_id)
268    }
269
270    /// Get all started workflows.
271    pub fn started_workflows(&self) -> Vec<StartedWorkflow> {
272        self.workflows
273            .read()
274            .expect("workflows lock poisoned")
275            .clone()
276    }
277
278    /// Get workflows of a specific name.
279    pub fn workflows_named(&self, name: &str) -> Vec<StartedWorkflow> {
280        self.workflows
281            .read()
282            .expect("workflows lock poisoned")
283            .iter()
284            .filter(|w| w.workflow_name == name)
285            .cloned()
286            .collect()
287    }
288
289    /// Assert that a workflow was started.
290    pub fn assert_started(&self, workflow_name: &str) {
291        let workflows = self.workflows.read().expect("workflows lock poisoned");
292        let found = workflows.iter().any(|w| w.workflow_name == workflow_name);
293        assert!(
294            found,
295            "Expected workflow '{}' to be started, but it wasn't. Started workflows: {:?}",
296            workflow_name,
297            workflows
298                .iter()
299                .map(|w| &w.workflow_name)
300                .collect::<Vec<_>>()
301        );
302    }
303
304    /// Assert that a workflow was started with matching input.
305    pub fn assert_started_with<F>(&self, workflow_name: &str, predicate: F)
306    where
307        F: Fn(&serde_json::Value) -> bool,
308    {
309        let workflows = self.workflows.read().expect("workflows lock poisoned");
310        let found = workflows
311            .iter()
312            .any(|w| w.workflow_name == workflow_name && predicate(&w.input));
313        assert!(
314            found,
315            "Expected workflow '{}' with matching input to be started",
316            workflow_name
317        );
318    }
319
320    /// Assert that a workflow was not started.
321    pub fn assert_not_started(&self, workflow_name: &str) {
322        let workflows = self.workflows.read().expect("workflows lock poisoned");
323        let found = workflows.iter().any(|w| w.workflow_name == workflow_name);
324        assert!(
325            !found,
326            "Expected workflow '{}' NOT to be started, but it was",
327            workflow_name
328        );
329    }
330
331    /// Assert that a specific number of workflows were started.
332    pub fn assert_start_count(&self, workflow_name: &str, expected: usize) {
333        let workflows = self.workflows.read().expect("workflows lock poisoned");
334        let count = workflows
335            .iter()
336            .filter(|w| w.workflow_name == workflow_name)
337            .count();
338        assert_eq!(
339            count, expected,
340            "Expected {} starts of '{}', but found {}",
341            expected, workflow_name, count
342        );
343    }
344
345    /// Clear all recorded workflows.
346    pub fn clear(&self) {
347        self.workflows
348            .write()
349            .expect("workflows lock poisoned")
350            .clear();
351    }
352
353    /// Mark a workflow as completed (for test simulation).
354    pub fn complete_workflow(&self, run_id: Uuid) {
355        let mut workflows = self.workflows.write().expect("workflows lock poisoned");
356        if let Some(workflow) = workflows.iter_mut().find(|w| w.run_id == run_id) {
357            workflow.status = WorkflowStatus::Completed;
358        }
359    }
360
361    /// Mark a workflow as failed (for test simulation).
362    pub fn fail_workflow(&self, run_id: Uuid) {
363        let mut workflows = self.workflows.write().expect("workflows lock poisoned");
364        if let Some(workflow) = workflows.iter_mut().find(|w| w.run_id == run_id) {
365            workflow.status = WorkflowStatus::Failed;
366        }
367    }
368}
369
370impl Default for MockWorkflowDispatch {
371    fn default() -> Self {
372        Self::new()
373    }
374}
375
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A dispatched job is recorded under its own type and no other.
    #[tokio::test]
    async fn test_mock_job_dispatch() {
        let mock = MockJobDispatch::new();

        let id = mock
            .dispatch("send_email", json!({"to": "test@example.com"}))
            .await
            .unwrap();

        assert!(!id.is_nil());
        mock.assert_dispatched("send_email");
        mock.assert_not_dispatched("other_job");
    }

    /// The argument predicate sees the JSON that was dispatched.
    #[tokio::test]
    async fn test_job_dispatch_with_args() {
        let mock = MockJobDispatch::new();
        let payload = json!({"to": "test@example.com"});

        mock.dispatch("send_email", payload).await.unwrap();

        mock.assert_dispatched_with("send_email", |args| args["to"] == "test@example.com");
    }

    /// Dispatch counts are tracked per job type.
    #[tokio::test]
    async fn test_job_dispatch_count() {
        let mock = MockJobDispatch::new();

        for job_type in ["job_a", "job_b", "job_a"] {
            mock.dispatch(job_type, json!({})).await.unwrap();
        }

        mock.assert_dispatch_count("job_a", 2);
        mock.assert_dispatch_count("job_b", 1);
    }

    /// A started workflow is recorded under its own name and no other.
    #[tokio::test]
    async fn test_mock_workflow_dispatch() {
        let mock = MockWorkflowDispatch::new();

        let id = mock
            .start("onboarding", json!({"user_id": "123"}))
            .await
            .unwrap();

        assert!(!id.is_nil());
        mock.assert_started("onboarding");
        mock.assert_not_started("other_workflow");
    }

    /// The input predicate sees the JSON that the workflow was started with.
    #[tokio::test]
    async fn test_workflow_dispatch_with_input() {
        let mock = MockWorkflowDispatch::new();
        let payload = json!({"user_id": "123"});

        mock.start("onboarding", payload).await.unwrap();

        mock.assert_started_with("onboarding", |input| input["user_id"] == "123");
    }

    /// `clear` removes every recorded job.
    #[tokio::test]
    async fn test_clear() {
        let mock = MockJobDispatch::new();
        mock.dispatch("test", json!({})).await.unwrap();

        let before = mock.dispatched_jobs().len();
        assert_eq!(before, 1);

        mock.clear();

        let after = mock.dispatched_jobs().len();
        assert_eq!(after, 0);
    }

    /// A job starts as pending and becomes completed after `complete_job`.
    #[tokio::test]
    async fn test_job_status_simulation() {
        let mock = MockJobDispatch::new();
        let id = mock.dispatch("test", json!({})).await.unwrap();

        let recorded = mock.dispatched_jobs();
        assert_eq!(recorded[0].status, JobStatus::Pending);

        mock.complete_job(id);

        let recorded = mock.dispatched_jobs();
        assert_eq!(recorded[0].status, JobStatus::Completed);
    }

    /// `cancel_job` sets both the cancelled status and the supplied reason.
    #[tokio::test]
    async fn test_job_cancel_simulation() {
        let mock = MockJobDispatch::new();
        let id = mock.dispatch("test", json!({})).await.unwrap();

        let reason = Some("user request".to_string());
        mock.cancel_job(id, reason);

        let recorded = mock.dispatched_jobs();
        assert_eq!(recorded[0].status, JobStatus::Cancelled);
        assert_eq!(recorded[0].cancel_reason.as_deref(), Some("user request"));
    }
}