// forge_core/testing/mock_dispatch.rs

1//! Mock dispatchers for testing job and workflow dispatch.
2//!
3//! Provides mock implementations that record dispatched jobs and workflows
4//! for verification in tests.
5
6use std::sync::RwLock;
7
8use chrono::{DateTime, Utc};
9use uuid::Uuid;
10
11use crate::error::{ForgeError, Result};
12use crate::job::JobStatus;
13use crate::workflow::WorkflowStatus;
14
15/// Record of a dispatched job.
16#[derive(Debug, Clone)]
17pub struct DispatchedJob {
18    /// Job ID.
19    pub id: Uuid,
20    /// Job type name.
21    pub job_type: String,
22    /// Job arguments (serialized).
23    pub args: serde_json::Value,
24    /// When the job was dispatched.
25    pub dispatched_at: DateTime<Utc>,
26    /// Current status (for test simulation).
27    pub status: JobStatus,
28    /// Cancellation reason (if any).
29    pub cancel_reason: Option<String>,
30}
31
32/// Record of a started workflow.
33#[derive(Debug, Clone)]
34pub struct StartedWorkflow {
35    /// Run ID.
36    pub run_id: Uuid,
37    /// Workflow name.
38    pub workflow_name: String,
39    /// Input (serialized).
40    pub input: serde_json::Value,
41    /// When the workflow was started.
42    pub started_at: DateTime<Utc>,
43    /// Current status.
44    pub status: WorkflowStatus,
45}
46
47/// Mock job dispatcher for testing.
48///
49/// Records dispatched jobs for later verification.
50///
51/// # Example
52///
53/// ```ignore
54/// let dispatch = MockJobDispatch::new();
55/// dispatch.dispatch("send_email", json!({"to": "test@example.com"})).await?;
56///
57/// dispatch.assert_dispatched("send_email");
58/// dispatch.assert_dispatched_with("send_email", |args| {
59///     args["to"] == "test@example.com"
60/// });
61/// ```
62pub struct MockJobDispatch {
63    jobs: RwLock<Vec<DispatchedJob>>,
64}
65
66impl MockJobDispatch {
67    /// Create a new mock job dispatcher.
68    pub fn new() -> Self {
69        Self {
70            jobs: RwLock::new(Vec::new()),
71        }
72    }
73
74    /// Dispatch a job (records for later verification).
75    pub async fn dispatch<T: serde::Serialize>(&self, job_type: &str, args: T) -> Result<Uuid> {
76        let id = Uuid::new_v4();
77        let args_json =
78            serde_json::to_value(args).map_err(|e| ForgeError::Serialization(e.to_string()))?;
79
80        let job = DispatchedJob {
81            id,
82            job_type: job_type.to_string(),
83            args: args_json,
84            dispatched_at: Utc::now(),
85            status: JobStatus::Pending,
86            cancel_reason: None,
87        };
88
89        self.jobs.write().unwrap().push(job);
90        Ok(id)
91    }
92
93    /// Get all dispatched jobs.
94    pub fn dispatched_jobs(&self) -> Vec<DispatchedJob> {
95        self.jobs.read().unwrap().clone()
96    }
97
98    /// Get jobs of a specific type.
99    pub fn jobs_of_type(&self, job_type: &str) -> Vec<DispatchedJob> {
100        self.jobs
101            .read()
102            .unwrap()
103            .iter()
104            .filter(|j| j.job_type == job_type)
105            .cloned()
106            .collect()
107    }
108
109    /// Assert that a job type was dispatched.
110    pub fn assert_dispatched(&self, job_type: &str) {
111        let jobs = self.jobs.read().unwrap();
112        let found = jobs.iter().any(|j| j.job_type == job_type);
113        assert!(
114            found,
115            "Expected job '{}' to be dispatched, but it wasn't. Dispatched jobs: {:?}",
116            job_type,
117            jobs.iter().map(|j| &j.job_type).collect::<Vec<_>>()
118        );
119    }
120
121    /// Assert that a job was dispatched with matching arguments.
122    pub fn assert_dispatched_with<F>(&self, job_type: &str, predicate: F)
123    where
124        F: Fn(&serde_json::Value) -> bool,
125    {
126        let jobs = self.jobs.read().unwrap();
127        let found = jobs
128            .iter()
129            .any(|j| j.job_type == job_type && predicate(&j.args));
130        assert!(
131            found,
132            "Expected job '{}' with matching args to be dispatched",
133            job_type
134        );
135    }
136
137    /// Assert that a job type was not dispatched.
138    pub fn assert_not_dispatched(&self, job_type: &str) {
139        let jobs = self.jobs.read().unwrap();
140        let found = jobs.iter().any(|j| j.job_type == job_type);
141        assert!(
142            !found,
143            "Expected job '{}' NOT to be dispatched, but it was",
144            job_type
145        );
146    }
147
148    /// Assert that a specific number of jobs were dispatched.
149    pub fn assert_dispatch_count(&self, job_type: &str, expected: usize) {
150        let jobs = self.jobs.read().unwrap();
151        let count = jobs.iter().filter(|j| j.job_type == job_type).count();
152        assert_eq!(
153            count, expected,
154            "Expected {} dispatches of '{}', but found {}",
155            expected, job_type, count
156        );
157    }
158
159    /// Clear all recorded jobs.
160    pub fn clear(&self) {
161        self.jobs.write().unwrap().clear();
162    }
163
164    /// Mark a job as completed (for test simulation).
165    pub fn complete_job(&self, job_id: Uuid) {
166        let mut jobs = self.jobs.write().unwrap();
167        if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
168            job.status = JobStatus::Completed;
169        }
170    }
171
172    /// Mark a job as failed (for test simulation).
173    pub fn fail_job(&self, job_id: Uuid) {
174        let mut jobs = self.jobs.write().unwrap();
175        if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
176            job.status = JobStatus::Failed;
177        }
178    }
179
180    /// Mark a job as cancelled (for test simulation).
181    pub fn cancel_job(&self, job_id: Uuid, reason: Option<String>) {
182        let mut jobs = self.jobs.write().unwrap();
183        if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
184            job.status = JobStatus::Cancelled;
185            job.cancel_reason = reason;
186        }
187    }
188}
189
190impl Default for MockJobDispatch {
191    fn default() -> Self {
192        Self::new()
193    }
194}
195
196impl crate::function::JobDispatch for MockJobDispatch {
197    fn get_info(&self, _job_type: &str) -> Option<crate::job::JobInfo> {
198        None
199    }
200
201    fn dispatch_by_name(
202        &self,
203        job_type: &str,
204        args: serde_json::Value,
205    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Uuid>> + Send + '_>> {
206        let job_type = job_type.to_string();
207        Box::pin(async move { self.dispatch(&job_type, args).await })
208    }
209
210    fn cancel(
211        &self,
212        job_id: Uuid,
213        reason: Option<String>,
214    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<bool>> + Send + '_>> {
215        Box::pin(async move {
216            self.cancel_job(job_id, reason);
217            Ok(true)
218        })
219    }
220}
221
222/// Mock workflow dispatcher for testing.
223///
224/// Records started workflows for later verification.
225///
226/// # Example
227///
228/// ```ignore
229/// let dispatch = MockWorkflowDispatch::new();
230/// dispatch.start("onboarding", json!({"user_id": "123"})).await?;
231///
232/// dispatch.assert_started("onboarding");
233/// ```
234pub struct MockWorkflowDispatch {
235    workflows: RwLock<Vec<StartedWorkflow>>,
236}
237
238impl MockWorkflowDispatch {
239    /// Create a new mock workflow dispatcher.
240    pub fn new() -> Self {
241        Self {
242            workflows: RwLock::new(Vec::new()),
243        }
244    }
245
246    /// Start a workflow (records for later verification).
247    pub async fn start<T: serde::Serialize>(&self, workflow_name: &str, input: T) -> Result<Uuid> {
248        let run_id = Uuid::new_v4();
249        let input_json =
250            serde_json::to_value(input).map_err(|e| ForgeError::Serialization(e.to_string()))?;
251
252        let workflow = StartedWorkflow {
253            run_id,
254            workflow_name: workflow_name.to_string(),
255            input: input_json,
256            started_at: Utc::now(),
257            status: WorkflowStatus::Created,
258        };
259
260        self.workflows.write().unwrap().push(workflow);
261        Ok(run_id)
262    }
263
264    /// Get all started workflows.
265    pub fn started_workflows(&self) -> Vec<StartedWorkflow> {
266        self.workflows.read().unwrap().clone()
267    }
268
269    /// Get workflows of a specific name.
270    pub fn workflows_named(&self, name: &str) -> Vec<StartedWorkflow> {
271        self.workflows
272            .read()
273            .unwrap()
274            .iter()
275            .filter(|w| w.workflow_name == name)
276            .cloned()
277            .collect()
278    }
279
280    /// Assert that a workflow was started.
281    pub fn assert_started(&self, workflow_name: &str) {
282        let workflows = self.workflows.read().unwrap();
283        let found = workflows.iter().any(|w| w.workflow_name == workflow_name);
284        assert!(
285            found,
286            "Expected workflow '{}' to be started, but it wasn't. Started workflows: {:?}",
287            workflow_name,
288            workflows
289                .iter()
290                .map(|w| &w.workflow_name)
291                .collect::<Vec<_>>()
292        );
293    }
294
295    /// Assert that a workflow was started with matching input.
296    pub fn assert_started_with<F>(&self, workflow_name: &str, predicate: F)
297    where
298        F: Fn(&serde_json::Value) -> bool,
299    {
300        let workflows = self.workflows.read().unwrap();
301        let found = workflows
302            .iter()
303            .any(|w| w.workflow_name == workflow_name && predicate(&w.input));
304        assert!(
305            found,
306            "Expected workflow '{}' with matching input to be started",
307            workflow_name
308        );
309    }
310
311    /// Assert that a workflow was not started.
312    pub fn assert_not_started(&self, workflow_name: &str) {
313        let workflows = self.workflows.read().unwrap();
314        let found = workflows.iter().any(|w| w.workflow_name == workflow_name);
315        assert!(
316            !found,
317            "Expected workflow '{}' NOT to be started, but it was",
318            workflow_name
319        );
320    }
321
322    /// Assert that a specific number of workflows were started.
323    pub fn assert_start_count(&self, workflow_name: &str, expected: usize) {
324        let workflows = self.workflows.read().unwrap();
325        let count = workflows
326            .iter()
327            .filter(|w| w.workflow_name == workflow_name)
328            .count();
329        assert_eq!(
330            count, expected,
331            "Expected {} starts of '{}', but found {}",
332            expected, workflow_name, count
333        );
334    }
335
336    /// Clear all recorded workflows.
337    pub fn clear(&self) {
338        self.workflows.write().unwrap().clear();
339    }
340
341    /// Mark a workflow as completed (for test simulation).
342    pub fn complete_workflow(&self, run_id: Uuid) {
343        let mut workflows = self.workflows.write().unwrap();
344        if let Some(workflow) = workflows.iter_mut().find(|w| w.run_id == run_id) {
345            workflow.status = WorkflowStatus::Completed;
346        }
347    }
348
349    /// Mark a workflow as failed (for test simulation).
350    pub fn fail_workflow(&self, run_id: Uuid) {
351        let mut workflows = self.workflows.write().unwrap();
352        if let Some(workflow) = workflows.iter_mut().find(|w| w.run_id == run_id) {
353            workflow.status = WorkflowStatus::Failed;
354        }
355    }
356}
357
358impl Default for MockWorkflowDispatch {
359    fn default() -> Self {
360        Self::new()
361    }
362}
363
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A dispatched job gets a non-nil ID and is visible to the assertions.
    #[tokio::test]
    async fn test_mock_job_dispatch() {
        let mock = MockJobDispatch::new();
        let payload = json!({"to": "test@example.com"});

        let id = mock.dispatch("send_email", payload).await.unwrap();

        assert!(!id.is_nil());
        mock.assert_dispatched("send_email");
        mock.assert_not_dispatched("other_job");
    }

    /// The argument predicate sees the JSON that was dispatched.
    #[tokio::test]
    async fn test_job_dispatch_with_args() {
        let mock = MockJobDispatch::new();
        let payload = json!({"to": "test@example.com"});

        mock.dispatch("send_email", payload).await.unwrap();

        mock.assert_dispatched_with("send_email", |args| args["to"] == "test@example.com");
    }

    /// Dispatch counts are tracked per job type.
    #[tokio::test]
    async fn test_job_dispatch_count() {
        let mock = MockJobDispatch::new();

        for job_type in ["job_a", "job_b", "job_a"] {
            mock.dispatch(job_type, json!({})).await.unwrap();
        }

        mock.assert_dispatch_count("job_a", 2);
        mock.assert_dispatch_count("job_b", 1);
    }

    /// A started workflow gets a non-nil run ID and is visible to the assertions.
    #[tokio::test]
    async fn test_mock_workflow_dispatch() {
        let mock = MockWorkflowDispatch::new();
        let input = json!({"user_id": "123"});

        let run = mock.start("onboarding", input).await.unwrap();

        assert!(!run.is_nil());
        mock.assert_started("onboarding");
        mock.assert_not_started("other_workflow");
    }

    /// The input predicate sees the JSON the workflow was started with.
    #[tokio::test]
    async fn test_workflow_dispatch_with_input() {
        let mock = MockWorkflowDispatch::new();
        let input = json!({"user_id": "123"});

        mock.start("onboarding", input).await.unwrap();

        mock.assert_started_with("onboarding", |input| input["user_id"] == "123");
    }

    /// `clear` drops every recorded job.
    #[tokio::test]
    async fn test_clear() {
        let mock = MockJobDispatch::new();
        mock.dispatch("test", json!({})).await.unwrap();
        assert_eq!(mock.dispatched_jobs().len(), 1);

        mock.clear();

        assert_eq!(mock.dispatched_jobs().len(), 0);
    }

    /// Jobs start as pending and can be moved to completed.
    #[tokio::test]
    async fn test_job_status_simulation() {
        let mock = MockJobDispatch::new();
        let id = mock.dispatch("test", json!({})).await.unwrap();
        assert_eq!(mock.dispatched_jobs()[0].status, JobStatus::Pending);

        mock.complete_job(id);

        assert_eq!(mock.dispatched_jobs()[0].status, JobStatus::Completed);
    }

    /// Cancelling records both the new status and the supplied reason.
    #[tokio::test]
    async fn test_job_cancel_simulation() {
        let mock = MockJobDispatch::new();
        let id = mock.dispatch("test", json!({})).await.unwrap();

        mock.cancel_job(id, Some("user request".to_string()));

        let recorded = mock.dispatched_jobs();
        let first = &recorded[0];
        assert_eq!(first.status, JobStatus::Cancelled);
        assert_eq!(first.cancel_reason.as_deref(), Some("user request"));
    }
}