Skip to main content

forge_core/testing/
mock_dispatch.rs

1//! Mock dispatchers for testing job and workflow dispatch.
2//!
3//! Provides mock implementations that record dispatched jobs and workflows
4//! for verification in tests.
5
6use std::sync::RwLock;
7
8use chrono::{DateTime, Utc};
9use uuid::Uuid;
10
11use crate::error::{ForgeError, Result};
12use crate::job::JobStatus;
13use crate::workflow::WorkflowStatus;
14
15/// Record of a dispatched job.
16#[derive(Debug, Clone)]
17pub struct DispatchedJob {
18    /// Job ID.
19    pub id: Uuid,
20    /// Job type name.
21    pub job_type: String,
22    /// Job arguments (serialized).
23    pub args: serde_json::Value,
24    /// When the job was dispatched.
25    pub dispatched_at: DateTime<Utc>,
26    /// Current status (for test simulation).
27    pub status: JobStatus,
28}
29
30/// Record of a started workflow.
31#[derive(Debug, Clone)]
32pub struct StartedWorkflow {
33    /// Run ID.
34    pub run_id: Uuid,
35    /// Workflow name.
36    pub workflow_name: String,
37    /// Input (serialized).
38    pub input: serde_json::Value,
39    /// When the workflow was started.
40    pub started_at: DateTime<Utc>,
41    /// Current status.
42    pub status: WorkflowStatus,
43}
44
45/// Mock job dispatcher for testing.
46///
47/// Records dispatched jobs for later verification.
48///
49/// # Example
50///
51/// ```ignore
52/// let dispatch = MockJobDispatch::new();
53/// dispatch.dispatch("send_email", json!({"to": "test@example.com"})).await?;
54///
55/// dispatch.assert_dispatched("send_email");
56/// dispatch.assert_dispatched_with("send_email", |args| {
57///     args["to"] == "test@example.com"
58/// });
59/// ```
60pub struct MockJobDispatch {
61    jobs: RwLock<Vec<DispatchedJob>>,
62}
63
64impl MockJobDispatch {
65    /// Create a new mock job dispatcher.
66    pub fn new() -> Self {
67        Self {
68            jobs: RwLock::new(Vec::new()),
69        }
70    }
71
72    /// Dispatch a job (records for later verification).
73    pub async fn dispatch<T: serde::Serialize>(&self, job_type: &str, args: T) -> Result<Uuid> {
74        let id = Uuid::new_v4();
75        let args_json =
76            serde_json::to_value(args).map_err(|e| ForgeError::Serialization(e.to_string()))?;
77
78        let job = DispatchedJob {
79            id,
80            job_type: job_type.to_string(),
81            args: args_json,
82            dispatched_at: Utc::now(),
83            status: JobStatus::Pending,
84        };
85
86        self.jobs.write().unwrap().push(job);
87        Ok(id)
88    }
89
90    /// Get all dispatched jobs.
91    pub fn dispatched_jobs(&self) -> Vec<DispatchedJob> {
92        self.jobs.read().unwrap().clone()
93    }
94
95    /// Get jobs of a specific type.
96    pub fn jobs_of_type(&self, job_type: &str) -> Vec<DispatchedJob> {
97        self.jobs
98            .read()
99            .unwrap()
100            .iter()
101            .filter(|j| j.job_type == job_type)
102            .cloned()
103            .collect()
104    }
105
106    /// Assert that a job type was dispatched.
107    pub fn assert_dispatched(&self, job_type: &str) {
108        let jobs = self.jobs.read().unwrap();
109        let found = jobs.iter().any(|j| j.job_type == job_type);
110        assert!(
111            found,
112            "Expected job '{}' to be dispatched, but it wasn't. Dispatched jobs: {:?}",
113            job_type,
114            jobs.iter().map(|j| &j.job_type).collect::<Vec<_>>()
115        );
116    }
117
118    /// Assert that a job was dispatched with matching arguments.
119    pub fn assert_dispatched_with<F>(&self, job_type: &str, predicate: F)
120    where
121        F: Fn(&serde_json::Value) -> bool,
122    {
123        let jobs = self.jobs.read().unwrap();
124        let found = jobs
125            .iter()
126            .any(|j| j.job_type == job_type && predicate(&j.args));
127        assert!(
128            found,
129            "Expected job '{}' with matching args to be dispatched",
130            job_type
131        );
132    }
133
134    /// Assert that a job type was not dispatched.
135    pub fn assert_not_dispatched(&self, job_type: &str) {
136        let jobs = self.jobs.read().unwrap();
137        let found = jobs.iter().any(|j| j.job_type == job_type);
138        assert!(
139            !found,
140            "Expected job '{}' NOT to be dispatched, but it was",
141            job_type
142        );
143    }
144
145    /// Assert that a specific number of jobs were dispatched.
146    pub fn assert_dispatch_count(&self, job_type: &str, expected: usize) {
147        let jobs = self.jobs.read().unwrap();
148        let count = jobs.iter().filter(|j| j.job_type == job_type).count();
149        assert_eq!(
150            count, expected,
151            "Expected {} dispatches of '{}', but found {}",
152            expected, job_type, count
153        );
154    }
155
156    /// Clear all recorded jobs.
157    pub fn clear(&self) {
158        self.jobs.write().unwrap().clear();
159    }
160
161    /// Mark a job as completed (for test simulation).
162    pub fn complete_job(&self, job_id: Uuid) {
163        let mut jobs = self.jobs.write().unwrap();
164        if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
165            job.status = JobStatus::Completed;
166        }
167    }
168
169    /// Mark a job as failed (for test simulation).
170    pub fn fail_job(&self, job_id: Uuid) {
171        let mut jobs = self.jobs.write().unwrap();
172        if let Some(job) = jobs.iter_mut().find(|j| j.id == job_id) {
173            job.status = JobStatus::Failed;
174        }
175    }
176}
177
178impl Default for MockJobDispatch {
179    fn default() -> Self {
180        Self::new()
181    }
182}
183
184/// Mock workflow dispatcher for testing.
185///
186/// Records started workflows for later verification.
187///
188/// # Example
189///
190/// ```ignore
191/// let dispatch = MockWorkflowDispatch::new();
192/// dispatch.start("onboarding", json!({"user_id": "123"})).await?;
193///
194/// dispatch.assert_started("onboarding");
195/// ```
196pub struct MockWorkflowDispatch {
197    workflows: RwLock<Vec<StartedWorkflow>>,
198}
199
200impl MockWorkflowDispatch {
201    /// Create a new mock workflow dispatcher.
202    pub fn new() -> Self {
203        Self {
204            workflows: RwLock::new(Vec::new()),
205        }
206    }
207
208    /// Start a workflow (records for later verification).
209    pub async fn start<T: serde::Serialize>(&self, workflow_name: &str, input: T) -> Result<Uuid> {
210        let run_id = Uuid::new_v4();
211        let input_json =
212            serde_json::to_value(input).map_err(|e| ForgeError::Serialization(e.to_string()))?;
213
214        let workflow = StartedWorkflow {
215            run_id,
216            workflow_name: workflow_name.to_string(),
217            input: input_json,
218            started_at: Utc::now(),
219            status: WorkflowStatus::Created,
220        };
221
222        self.workflows.write().unwrap().push(workflow);
223        Ok(run_id)
224    }
225
226    /// Get all started workflows.
227    pub fn started_workflows(&self) -> Vec<StartedWorkflow> {
228        self.workflows.read().unwrap().clone()
229    }
230
231    /// Get workflows of a specific name.
232    pub fn workflows_named(&self, name: &str) -> Vec<StartedWorkflow> {
233        self.workflows
234            .read()
235            .unwrap()
236            .iter()
237            .filter(|w| w.workflow_name == name)
238            .cloned()
239            .collect()
240    }
241
242    /// Assert that a workflow was started.
243    pub fn assert_started(&self, workflow_name: &str) {
244        let workflows = self.workflows.read().unwrap();
245        let found = workflows.iter().any(|w| w.workflow_name == workflow_name);
246        assert!(
247            found,
248            "Expected workflow '{}' to be started, but it wasn't. Started workflows: {:?}",
249            workflow_name,
250            workflows
251                .iter()
252                .map(|w| &w.workflow_name)
253                .collect::<Vec<_>>()
254        );
255    }
256
257    /// Assert that a workflow was started with matching input.
258    pub fn assert_started_with<F>(&self, workflow_name: &str, predicate: F)
259    where
260        F: Fn(&serde_json::Value) -> bool,
261    {
262        let workflows = self.workflows.read().unwrap();
263        let found = workflows
264            .iter()
265            .any(|w| w.workflow_name == workflow_name && predicate(&w.input));
266        assert!(
267            found,
268            "Expected workflow '{}' with matching input to be started",
269            workflow_name
270        );
271    }
272
273    /// Assert that a workflow was not started.
274    pub fn assert_not_started(&self, workflow_name: &str) {
275        let workflows = self.workflows.read().unwrap();
276        let found = workflows.iter().any(|w| w.workflow_name == workflow_name);
277        assert!(
278            !found,
279            "Expected workflow '{}' NOT to be started, but it was",
280            workflow_name
281        );
282    }
283
284    /// Assert that a specific number of workflows were started.
285    pub fn assert_start_count(&self, workflow_name: &str, expected: usize) {
286        let workflows = self.workflows.read().unwrap();
287        let count = workflows
288            .iter()
289            .filter(|w| w.workflow_name == workflow_name)
290            .count();
291        assert_eq!(
292            count, expected,
293            "Expected {} starts of '{}', but found {}",
294            expected, workflow_name, count
295        );
296    }
297
298    /// Clear all recorded workflows.
299    pub fn clear(&self) {
300        self.workflows.write().unwrap().clear();
301    }
302
303    /// Mark a workflow as completed (for test simulation).
304    pub fn complete_workflow(&self, run_id: Uuid) {
305        let mut workflows = self.workflows.write().unwrap();
306        if let Some(workflow) = workflows.iter_mut().find(|w| w.run_id == run_id) {
307            workflow.status = WorkflowStatus::Completed;
308        }
309    }
310
311    /// Mark a workflow as failed (for test simulation).
312    pub fn fail_workflow(&self, run_id: Uuid) {
313        let mut workflows = self.workflows.write().unwrap();
314        if let Some(workflow) = workflows.iter_mut().find(|w| w.run_id == run_id) {
315            workflow.status = WorkflowStatus::Failed;
316        }
317    }
318}
319
320impl Default for MockWorkflowDispatch {
321    fn default() -> Self {
322        Self::new()
323    }
324}
325
326#[cfg(test)]
327mod tests {
328    use super::*;
329
330    #[tokio::test]
331    async fn test_mock_job_dispatch() {
332        let dispatch = MockJobDispatch::new();
333
334        let job_id = dispatch
335            .dispatch("send_email", serde_json::json!({"to": "test@example.com"}))
336            .await
337            .unwrap();
338
339        assert!(!job_id.is_nil());
340        dispatch.assert_dispatched("send_email");
341        dispatch.assert_not_dispatched("other_job");
342    }
343
344    #[tokio::test]
345    async fn test_job_dispatch_with_args() {
346        let dispatch = MockJobDispatch::new();
347
348        dispatch
349            .dispatch("send_email", serde_json::json!({"to": "test@example.com"}))
350            .await
351            .unwrap();
352
353        dispatch.assert_dispatched_with("send_email", |args| args["to"] == "test@example.com");
354    }
355
356    #[tokio::test]
357    async fn test_job_dispatch_count() {
358        let dispatch = MockJobDispatch::new();
359
360        dispatch
361            .dispatch("job_a", serde_json::json!({}))
362            .await
363            .unwrap();
364        dispatch
365            .dispatch("job_b", serde_json::json!({}))
366            .await
367            .unwrap();
368        dispatch
369            .dispatch("job_a", serde_json::json!({}))
370            .await
371            .unwrap();
372
373        dispatch.assert_dispatch_count("job_a", 2);
374        dispatch.assert_dispatch_count("job_b", 1);
375    }
376
377    #[tokio::test]
378    async fn test_mock_workflow_dispatch() {
379        let dispatch = MockWorkflowDispatch::new();
380
381        let run_id = dispatch
382            .start("onboarding", serde_json::json!({"user_id": "123"}))
383            .await
384            .unwrap();
385
386        assert!(!run_id.is_nil());
387        dispatch.assert_started("onboarding");
388        dispatch.assert_not_started("other_workflow");
389    }
390
391    #[tokio::test]
392    async fn test_workflow_dispatch_with_input() {
393        let dispatch = MockWorkflowDispatch::new();
394
395        dispatch
396            .start("onboarding", serde_json::json!({"user_id": "123"}))
397            .await
398            .unwrap();
399
400        dispatch.assert_started_with("onboarding", |input| input["user_id"] == "123");
401    }
402
403    #[tokio::test]
404    async fn test_clear() {
405        let dispatch = MockJobDispatch::new();
406        dispatch
407            .dispatch("test", serde_json::json!({}))
408            .await
409            .unwrap();
410
411        assert_eq!(dispatch.dispatched_jobs().len(), 1);
412        dispatch.clear();
413        assert_eq!(dispatch.dispatched_jobs().len(), 0);
414    }
415
416    #[tokio::test]
417    async fn test_job_status_simulation() {
418        let dispatch = MockJobDispatch::new();
419        let job_id = dispatch
420            .dispatch("test", serde_json::json!({}))
421            .await
422            .unwrap();
423
424        let jobs = dispatch.dispatched_jobs();
425        assert_eq!(jobs[0].status, JobStatus::Pending);
426
427        dispatch.complete_job(job_id);
428
429        let jobs = dispatch.dispatched_jobs();
430        assert_eq!(jobs[0].status, JobStatus::Completed);
431    }
432}