ferro_queue/
dispatcher.rs1use crate::{Error, Job, JobPayload, Queue, QueueConfig};
4use serde::{de::DeserializeOwned, Serialize};
5use std::time::Duration;
6
7pub struct PendingDispatch<J> {
12 job: J,
13 queue: Option<&'static str>,
14 delay: Option<Duration>,
15}
16
17impl<J> PendingDispatch<J>
18where
19 J: Job + Serialize + DeserializeOwned,
20{
21 pub fn new(job: J) -> Self {
23 Self {
24 job,
25 queue: None,
26 delay: None,
27 }
28 }
29
30 pub fn on_queue(mut self, queue: &'static str) -> Self {
32 self.queue = Some(queue);
33 self
34 }
35
36 pub fn delay(mut self, duration: Duration) -> Self {
38 self.delay = Some(duration);
39 self
40 }
41
42 pub async fn dispatch(self) -> Result<(), Error> {
50 if QueueConfig::is_sync_mode() {
51 return self.dispatch_immediately().await;
52 }
53
54 self.dispatch_to_queue().await
55 }
56
57 async fn dispatch_immediately(self) -> Result<(), Error> {
59 let job_name = self.job.name();
60
61 if self.delay.is_some() {
62 tracing::debug!(
63 job = %job_name,
64 "Job delay ignored in sync mode"
65 );
66 }
67
68 tracing::debug!(job = %job_name, "Executing job synchronously");
69
70 match self.job.handle().await {
71 Ok(()) => {
72 tracing::debug!(job = %job_name, "Job completed successfully");
73 Ok(())
74 }
75 Err(e) => {
76 tracing::error!(job = %job_name, error = %e, "Job failed");
77 self.job.failed(&e).await;
78 Err(e)
79 }
80 }
81 }
82
83 async fn dispatch_to_queue(self) -> Result<(), Error> {
85 let conn = Queue::connection();
86 let queue = self.queue.unwrap_or(&conn.config().default_queue);
87
88 let payload = match self.delay {
89 Some(delay) => JobPayload::with_delay(&self.job, queue, delay)?,
90 None => JobPayload::new(&self.job, queue)?,
91 };
92
93 conn.push(payload).await
94 }
95
96 pub fn dispatch_now(self)
101 where
102 J: Send + 'static,
103 {
104 tokio::spawn(async move {
105 if let Err(e) = self.dispatch().await {
106 tracing::error!(error = %e, "Failed to dispatch job");
107 }
108 });
109 }
110
111 #[deprecated(since = "0.2.0", note = "Use dispatch_now() instead")]
115 pub fn dispatch_sync(self)
116 where
117 J: Send + 'static,
118 {
119 self.dispatch_now()
120 }
121}
122
123pub async fn dispatch<J>(job: J) -> Result<(), Error>
143where
144 J: Job + Serialize + DeserializeOwned,
145{
146 PendingDispatch::new(job).dispatch().await
147}
148
149pub async fn dispatch_to<J>(job: J, queue: &'static str) -> Result<(), Error>
151where
152 J: Job + Serialize + DeserializeOwned,
153{
154 PendingDispatch::new(job).on_queue(queue).dispatch().await
155}
156
157pub async fn dispatch_later<J>(job: J, delay: Duration) -> Result<(), Error>
161where
162 J: Job + Serialize + DeserializeOwned,
163{
164 PendingDispatch::new(job).delay(delay).dispatch().await
165}
166
167#[cfg(test)]
168mod tests {
169 use super::*;
170 use crate::async_trait;
171 use serial_test::serial;
172 use std::env;
173 use std::sync::atomic::{AtomicBool, Ordering};
174 use std::sync::Arc;
175
176 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
177 struct TestJob {
178 #[serde(skip)]
179 executed: Arc<AtomicBool>,
180 }
181
182 impl TestJob {
183 fn new() -> (Self, Arc<AtomicBool>) {
184 let executed = Arc::new(AtomicBool::new(false));
185 (
186 Self {
187 executed: executed.clone(),
188 },
189 executed,
190 )
191 }
192 }
193
194 #[async_trait]
195 impl Job for TestJob {
196 async fn handle(&self) -> Result<(), Error> {
197 self.executed.store(true, Ordering::SeqCst);
198 Ok(())
199 }
200 }
201
202 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
203 struct FailingJob;
204
205 #[async_trait]
206 impl Job for FailingJob {
207 async fn handle(&self) -> Result<(), Error> {
208 Err(Error::job_failed("FailingJob", "intentional failure"))
209 }
210 }
211
212 #[tokio::test]
213 #[serial]
214 async fn test_sync_mode_executes_immediately() {
215 env::set_var("QUEUE_CONNECTION", "sync");
216
217 let (job, executed) = TestJob::new();
218 assert!(!executed.load(Ordering::SeqCst));
219
220 let result = PendingDispatch::new(job).dispatch().await;
221 assert!(result.is_ok());
222 assert!(executed.load(Ordering::SeqCst));
223
224 env::remove_var("QUEUE_CONNECTION");
225 }
226
227 #[tokio::test]
228 #[serial]
229 async fn test_sync_mode_handles_failure() {
230 env::set_var("QUEUE_CONNECTION", "sync");
231
232 let result = PendingDispatch::new(FailingJob).dispatch().await;
233 assert!(result.is_err());
234
235 env::remove_var("QUEUE_CONNECTION");
236 }
237
238 #[tokio::test]
239 #[serial]
240 async fn test_sync_mode_ignores_delay() {
241 env::set_var("QUEUE_CONNECTION", "sync");
242
243 let (job, executed) = TestJob::new();
244
245 let start = std::time::Instant::now();
246 let result = PendingDispatch::new(job)
247 .delay(Duration::from_secs(10))
248 .dispatch()
249 .await;
250
251 assert!(result.is_ok());
252 assert!(executed.load(Ordering::SeqCst));
253 assert!(start.elapsed() < Duration::from_secs(1));
255
256 env::remove_var("QUEUE_CONNECTION");
257 }
258
259 #[tokio::test]
260 #[serial]
261 async fn test_sync_mode_ignores_queue() {
262 env::set_var("QUEUE_CONNECTION", "sync");
263
264 let (job, executed) = TestJob::new();
265
266 let result = PendingDispatch::new(job)
267 .on_queue("high-priority")
268 .dispatch()
269 .await;
270
271 assert!(result.is_ok());
272 assert!(executed.load(Ordering::SeqCst));
273
274 env::remove_var("QUEUE_CONNECTION");
275 }
276}