ferro_queue/
dispatcher.rs

1//! Job dispatching utilities.
2
3use crate::{Error, Job, JobPayload, Queue, QueueConfig};
4use serde::{de::DeserializeOwned, Serialize};
5use std::time::Duration;
6
/// A pending job dispatch.
///
/// This builder collects the options for a dispatch (target queue, delay)
/// before the job is actually sent. Nothing happens until one of the
/// dispatching methods is called, so dropping the builder discards the job.
#[derive(Debug)]
#[must_use = "a pending dispatch does nothing until it is dispatched"]
pub struct PendingDispatch<J> {
    /// The job that will be executed or enqueued.
    job: J,
    /// Explicit target queue; `None` means no queue was chosen by the caller.
    queue: Option<&'static str>,
    /// Requested delay before the job becomes available; `None` means no delay.
    delay: Option<Duration>,
}
16
17impl<J> PendingDispatch<J>
18where
19    J: Job + Serialize + DeserializeOwned,
20{
21    /// Create a new pending dispatch.
22    pub fn new(job: J) -> Self {
23        Self {
24            job,
25            queue: None,
26            delay: None,
27        }
28    }
29
30    /// Set the queue to dispatch to.
31    pub fn on_queue(mut self, queue: &'static str) -> Self {
32        self.queue = Some(queue);
33        self
34    }
35
36    /// Set a delay before the job is available for processing.
37    pub fn delay(mut self, duration: Duration) -> Self {
38        self.delay = Some(duration);
39        self
40    }
41
42    /// Dispatch the job to the queue.
43    ///
44    /// In sync mode (`QUEUE_CONNECTION=sync`), the job is executed immediately
45    /// in the current task. This is useful for development and testing.
46    ///
47    /// In redis mode (`QUEUE_CONNECTION=redis`), the job is pushed to the
48    /// Redis queue for background processing by a worker.
49    pub async fn dispatch(self) -> Result<(), Error> {
50        if QueueConfig::is_sync_mode() {
51            return self.dispatch_immediately().await;
52        }
53
54        self.dispatch_to_queue().await
55    }
56
57    /// Execute the job immediately (sync mode).
58    async fn dispatch_immediately(self) -> Result<(), Error> {
59        let job_name = self.job.name();
60
61        if self.delay.is_some() {
62            tracing::debug!(
63                job = %job_name,
64                "Job delay ignored in sync mode"
65            );
66        }
67
68        tracing::debug!(job = %job_name, "Executing job synchronously");
69
70        match self.job.handle().await {
71            Ok(()) => {
72                tracing::debug!(job = %job_name, "Job completed successfully");
73                Ok(())
74            }
75            Err(e) => {
76                tracing::error!(job = %job_name, error = %e, "Job failed");
77                self.job.failed(&e).await;
78                Err(e)
79            }
80        }
81    }
82
83    /// Push the job to the Redis queue.
84    async fn dispatch_to_queue(self) -> Result<(), Error> {
85        let conn = Queue::connection();
86        let queue = self.queue.unwrap_or(&conn.config().default_queue);
87
88        let payload = match self.delay {
89            Some(delay) => JobPayload::with_delay(&self.job, queue, delay)?,
90            None => JobPayload::new(&self.job, queue)?,
91        };
92
93        conn.push(payload).await
94    }
95
96    /// Dispatch the job in a background task (fire and forget).
97    ///
98    /// This spawns the dispatch as a background task, useful when you
99    /// don't need to wait for the dispatch to complete.
100    pub fn dispatch_now(self)
101    where
102        J: Send + 'static,
103    {
104        tokio::spawn(async move {
105            if let Err(e) = self.dispatch().await {
106                tracing::error!(error = %e, "Failed to dispatch job");
107            }
108        });
109    }
110
111    /// Dispatch the job synchronously (fire and forget).
112    ///
113    /// This spawns the dispatch as a background task.
114    #[deprecated(since = "0.2.0", note = "Use dispatch_now() instead")]
115    pub fn dispatch_sync(self)
116    where
117        J: Send + 'static,
118    {
119        self.dispatch_now()
120    }
121}
122
123/// Dispatch a job using the global queue.
124///
125/// In sync mode, the job executes immediately. In redis mode, it's
126/// queued for background processing.
127///
128/// # Example
129///
130/// ```rust,ignore
131/// use ferro_queue::{dispatch, Job, Error};
132///
133/// #[derive(Debug, Serialize, Deserialize)]
134/// struct MyJob { data: String }
135///
136/// impl Job for MyJob {
137///     async fn handle(&self) -> Result<(), Error> { Ok(()) }
138/// }
139///
140/// dispatch(MyJob { data: "hello".into() }).await?;
141/// ```
142pub async fn dispatch<J>(job: J) -> Result<(), Error>
143where
144    J: Job + Serialize + DeserializeOwned,
145{
146    PendingDispatch::new(job).dispatch().await
147}
148
149/// Dispatch a job to a specific queue.
150pub async fn dispatch_to<J>(job: J, queue: &'static str) -> Result<(), Error>
151where
152    J: Job + Serialize + DeserializeOwned,
153{
154    PendingDispatch::new(job).on_queue(queue).dispatch().await
155}
156
157/// Dispatch a job with a delay.
158///
159/// Note: In sync mode, the delay is ignored and the job executes immediately.
160pub async fn dispatch_later<J>(job: J, delay: Duration) -> Result<(), Error>
161where
162    J: Job + Serialize + DeserializeOwned,
163{
164    PendingDispatch::new(job).delay(delay).dispatch().await
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use crate::async_trait;
    use serial_test::serial;
    use std::env;
    use std::ffi::OsString;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    /// Environment variable that selects the queue connection mode.
    const QUEUE_CONNECTION: &str = "QUEUE_CONNECTION";

    /// Forces `QUEUE_CONNECTION=sync` for as long as the guard is alive.
    ///
    /// The previous value is restored when the guard is dropped, which also
    /// happens while unwinding from a failed assertion, so a failing test
    /// cannot leak sync mode into the tests that run after it.
    struct SyncModeGuard {
        previous: Option<OsString>,
    }

    impl SyncModeGuard {
        /// Remember the current value and switch the process to sync mode.
        fn activate() -> Self {
            let previous = env::var_os(QUEUE_CONNECTION);
            env::set_var(QUEUE_CONNECTION, "sync");
            Self { previous }
        }
    }

    impl Drop for SyncModeGuard {
        fn drop(&mut self) {
            match self.previous.take() {
                Some(value) => env::set_var(QUEUE_CONNECTION, value),
                None => env::remove_var(QUEUE_CONNECTION),
            }
        }
    }

    /// Job that records whether its handler ran.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct TestJob {
        // The flag is test-only state, so it is kept out of the payload.
        #[serde(skip)]
        executed: Arc<AtomicBool>,
    }

    impl TestJob {
        /// Build a job together with a handle to its "executed" flag.
        fn new() -> (Self, Arc<AtomicBool>) {
            let executed = Arc::new(AtomicBool::new(false));
            (
                Self {
                    executed: executed.clone(),
                },
                executed,
            )
        }
    }

    #[async_trait]
    impl Job for TestJob {
        async fn handle(&self) -> Result<(), Error> {
            self.executed.store(true, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Job whose handler always fails.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct FailingJob;

    #[async_trait]
    impl Job for FailingJob {
        async fn handle(&self) -> Result<(), Error> {
            Err(Error::job_failed("FailingJob", "intentional failure"))
        }
    }

    // All tests below mutate the process-wide environment, hence `#[serial]`.

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_executes_immediately() {
        let _sync_mode = SyncModeGuard::activate();

        let (job, executed) = TestJob::new();
        assert!(!executed.load(Ordering::SeqCst));

        let result = PendingDispatch::new(job).dispatch().await;
        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_handles_failure() {
        let _sync_mode = SyncModeGuard::activate();

        let result = PendingDispatch::new(FailingJob).dispatch().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_ignores_delay() {
        let _sync_mode = SyncModeGuard::activate();

        let (job, executed) = TestJob::new();

        let start = std::time::Instant::now();
        let result = PendingDispatch::new(job)
            .delay(Duration::from_secs(10))
            .dispatch()
            .await;

        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
        // Should complete quickly, not wait 10 seconds
        assert!(start.elapsed() < Duration::from_secs(1));
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_ignores_queue() {
        let _sync_mode = SyncModeGuard::activate();

        let (job, executed) = TestJob::new();

        let result = PendingDispatch::new(job)
            .on_queue("high-priority")
            .dispatch()
            .await;

        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
    }
}