Skip to main content

ferro_queue/
dispatcher.rs

1//! Job dispatching utilities.
2
3use crate::{Error, Job, JobPayload, Queue, QueueConfig};
4use serde::{de::DeserializeOwned, Serialize};
5use std::time::Duration;
6
/// A pending job dispatch.
///
/// This builder collects how a job should be dispatched (target queue and
/// optional delay) before actually sending it to the queue. Constructing or
/// configuring the builder sends nothing; the job only goes anywhere once one
/// of the `dispatch*` methods consumes the builder, so dropping it discards
/// the job.
#[must_use = "a pending dispatch does nothing until one of its dispatch methods is called"]
pub struct PendingDispatch<J> {
    /// The job that will be dispatched.
    job: J,
    /// Explicit queue name set via `on_queue`; `None` falls back to the
    /// connection's default queue when the job is pushed.
    queue: Option<&'static str>,
    /// Delay set via `delay`; `None` means the job is pushed without a delay.
    delay: Option<Duration>,
}
16
17impl<J> PendingDispatch<J>
18where
19    J: Job + Serialize + DeserializeOwned,
20{
21    /// Create a new pending dispatch.
22    pub fn new(job: J) -> Self {
23        Self {
24            job,
25            queue: None,
26            delay: None,
27        }
28    }
29
30    /// Set the queue to dispatch to.
31    pub fn on_queue(mut self, queue: &'static str) -> Self {
32        self.queue = Some(queue);
33        self
34    }
35
36    /// Set a delay before the job is available for processing.
37    pub fn delay(mut self, duration: Duration) -> Self {
38        self.delay = Some(duration);
39        self
40    }
41
42    /// Dispatch the job to the queue.
43    ///
44    /// In sync mode (`QUEUE_CONNECTION=sync`), the job is executed immediately
45    /// in the current task. This is useful for development and testing.
46    ///
47    /// In redis mode (`QUEUE_CONNECTION=redis`), the job is pushed to the
48    /// Redis queue for background processing by a worker.
49    pub async fn dispatch(self) -> Result<(), Error> {
50        if QueueConfig::is_sync_mode() {
51            return self.dispatch_immediately().await;
52        }
53
54        self.dispatch_to_queue().await
55    }
56
57    /// Execute the job immediately (sync mode).
58    async fn dispatch_immediately(self) -> Result<(), Error> {
59        let job_name = self.job.name();
60
61        if self.delay.is_some() {
62            tracing::debug!(
63                job = %job_name,
64                "Job delay ignored in sync mode"
65            );
66        }
67
68        tracing::debug!(job = %job_name, "Executing job synchronously");
69
70        match self.job.handle().await {
71            Ok(()) => {
72                tracing::debug!(job = %job_name, "Job completed successfully");
73                Ok(())
74            }
75            Err(e) => {
76                tracing::error!(job = %job_name, error = %e, "Job failed");
77                self.job.failed(&e).await;
78                Err(e)
79            }
80        }
81    }
82
83    /// Push the job to the Redis queue.
84    async fn dispatch_to_queue(self) -> Result<(), Error> {
85        let conn = Queue::connection();
86        let queue = self.queue.unwrap_or(&conn.config().default_queue);
87
88        let payload = match self.delay {
89            Some(delay) => JobPayload::with_delay(&self.job, queue, delay)?,
90            None => JobPayload::new(&self.job, queue)?,
91        };
92
93        conn.push(payload).await
94    }
95
96    /// Dispatch the job in a background task (fire and forget).
97    ///
98    /// This spawns the dispatch as a background task, useful when you
99    /// don't need to wait for the dispatch to complete.
100    pub fn dispatch_now(self)
101    where
102        J: Send + 'static,
103    {
104        tokio::spawn(async move {
105            if let Err(e) = self.dispatch().await {
106                tracing::error!(error = %e, "Failed to dispatch job");
107            }
108        });
109    }
110
111    /// Dispatch the job synchronously (fire and forget).
112    ///
113    /// This spawns the dispatch as a background task.
114    #[deprecated(since = "0.2.0", note = "Use dispatch_now() instead")]
115    pub fn dispatch_sync(self)
116    where
117        J: Send + 'static,
118    {
119        self.dispatch_now()
120    }
121}
122
123/// Dispatch a job using the global queue.
124///
125/// In sync mode, the job executes immediately. In redis mode, it's
126/// queued for background processing.
127///
128/// # Example
129///
130/// ```rust,ignore
131/// use ferro_queue::{dispatch, Job, Error};
132///
133/// #[derive(Debug, Serialize, Deserialize)]
134/// struct MyJob { data: String }
135///
136/// impl Job for MyJob {
137///     async fn handle(&self) -> Result<(), Error> { Ok(()) }
138/// }
139///
140/// dispatch(MyJob { data: "hello".into() }).await?;
141/// ```
142pub async fn dispatch<J>(job: J) -> Result<(), Error>
143where
144    J: Job + Serialize + DeserializeOwned,
145{
146    PendingDispatch::new(job).dispatch().await
147}
148
149/// Dispatch a job to a specific queue.
150pub async fn dispatch_to<J>(job: J, queue: &'static str) -> Result<(), Error>
151where
152    J: Job + Serialize + DeserializeOwned,
153{
154    PendingDispatch::new(job).on_queue(queue).dispatch().await
155}
156
157/// Dispatch a job with a delay.
158///
159/// Note: In sync mode, the delay is ignored and the job executes immediately.
160pub async fn dispatch_later<J>(job: J, delay: Duration) -> Result<(), Error>
161where
162    J: Job + Serialize + DeserializeOwned,
163{
164    PendingDispatch::new(job).delay(delay).dispatch().await
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use crate::async_trait;
    use serial_test::serial;
    use std::env;
    use std::ffi::OsString;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    /// Guard that restores environment variables on drop, ensuring cleanup even on panic.
    ///
    /// For every variable it sets, the guard remembers the value that was there
    /// before (if any). On drop the variable is put back to that value, or
    /// removed when it was previously unset, so a test never clobbers a value
    /// that was already present in the process environment.
    struct EnvGuard {
        /// `(name, value before the guard touched it)` for each variable set.
        vars: Vec<(String, Option<OsString>)>,
    }

    impl EnvGuard {
        /// Set `key` to `value`, remembering whatever `key` held before.
        fn set(key: &str, value: &str) -> Self {
            let previous = env::var_os(key);
            env::set_var(key, value);
            Self {
                vars: vec![(key.to_string(), previous)],
            }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            for (key, previous) in &self.vars {
                match previous {
                    Some(value) => env::set_var(key, value),
                    None => env::remove_var(key),
                }
            }
        }
    }

    /// Job that records its execution in a shared flag.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct TestJob {
        // Skipped by serde: the flag only exists so the test can observe execution.
        #[serde(skip)]
        executed: Arc<AtomicBool>,
    }

    impl TestJob {
        /// Create a job together with a handle to its "executed" flag.
        fn new() -> (Self, Arc<AtomicBool>) {
            let executed = Arc::new(AtomicBool::new(false));
            (
                Self {
                    executed: executed.clone(),
                },
                executed,
            )
        }
    }

    #[async_trait]
    impl Job for TestJob {
        async fn handle(&self) -> Result<(), Error> {
            self.executed.store(true, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Job whose handler always fails.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct FailingJob;

    #[async_trait]
    impl Job for FailingJob {
        async fn handle(&self) -> Result<(), Error> {
            Err(Error::job_failed("FailingJob", "intentional failure"))
        }
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_executes_immediately() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let (job, executed) = TestJob::new();
        assert!(!executed.load(Ordering::SeqCst));

        let result = PendingDispatch::new(job).dispatch().await;
        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_handles_failure() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let result = PendingDispatch::new(FailingJob).dispatch().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_ignores_delay() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let (job, executed) = TestJob::new();

        let start = std::time::Instant::now();
        let result = PendingDispatch::new(job)
            .delay(Duration::from_secs(10))
            .dispatch()
            .await;

        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
        // Should complete quickly, not wait 10 seconds
        assert!(start.elapsed() < Duration::from_secs(5));
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_ignores_queue() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let (job, executed) = TestJob::new();

        let result = PendingDispatch::new(job)
            .on_queue("high-priority")
            .dispatch()
            .await;

        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
    }
}