ferro_queue/
dispatcher.rs1use crate::{Error, Job, JobPayload, Queue, QueueConfig};
4use serde::{de::DeserializeOwned, Serialize};
5use std::time::Duration;
6
/// A job paired with optional dispatch settings, waiting to be dispatched.
///
/// Built with [`PendingDispatch::new`] and refined with the chaining methods
/// `on_queue` and `delay`. Nothing is executed or enqueued until one of the
/// `dispatch*` methods consumes the value, hence the `#[must_use]` marker.
#[must_use = "a PendingDispatch does nothing until one of its dispatch methods is called"]
pub struct PendingDispatch<J> {
    /// The job to run or enqueue.
    job: J,
    /// Explicit target queue; `None` falls back to the connection's default queue.
    queue: Option<&'static str>,
    /// Optional delay applied when the job is pushed to a queue.
    delay: Option<Duration>,
}
16
17impl<J> PendingDispatch<J>
18where
19 J: Job + Serialize + DeserializeOwned,
20{
21 pub fn new(job: J) -> Self {
23 Self {
24 job,
25 queue: None,
26 delay: None,
27 }
28 }
29
30 pub fn on_queue(mut self, queue: &'static str) -> Self {
32 self.queue = Some(queue);
33 self
34 }
35
36 pub fn delay(mut self, duration: Duration) -> Self {
38 self.delay = Some(duration);
39 self
40 }
41
42 pub async fn dispatch(self) -> Result<(), Error> {
50 if QueueConfig::is_sync_mode() {
51 return self.dispatch_immediately().await;
52 }
53
54 self.dispatch_to_queue().await
55 }
56
57 async fn dispatch_immediately(self) -> Result<(), Error> {
59 let job_name = self.job.name();
60
61 if self.delay.is_some() {
62 tracing::debug!(
63 job = %job_name,
64 "Job delay ignored in sync mode"
65 );
66 }
67
68 tracing::debug!(job = %job_name, "Executing job synchronously");
69
70 match self.job.handle().await {
71 Ok(()) => {
72 tracing::debug!(job = %job_name, "Job completed successfully");
73 Ok(())
74 }
75 Err(e) => {
76 tracing::error!(job = %job_name, error = %e, "Job failed");
77 self.job.failed(&e).await;
78 Err(e)
79 }
80 }
81 }
82
83 async fn dispatch_to_queue(self) -> Result<(), Error> {
85 let conn = Queue::connection();
86 let queue = self.queue.unwrap_or(&conn.config().default_queue);
87
88 let payload = match self.delay {
89 Some(delay) => JobPayload::with_delay(&self.job, queue, delay)?,
90 None => JobPayload::new(&self.job, queue)?,
91 };
92
93 conn.push(payload).await
94 }
95
96 pub fn dispatch_now(self)
101 where
102 J: Send + 'static,
103 {
104 tokio::spawn(async move {
105 if let Err(e) = self.dispatch().await {
106 tracing::error!(error = %e, "Failed to dispatch job");
107 }
108 });
109 }
110
111 #[deprecated(since = "0.2.0", note = "Use dispatch_now() instead")]
115 pub fn dispatch_sync(self)
116 where
117 J: Send + 'static,
118 {
119 self.dispatch_now()
120 }
121}
122
123pub async fn dispatch<J>(job: J) -> Result<(), Error>
143where
144 J: Job + Serialize + DeserializeOwned,
145{
146 PendingDispatch::new(job).dispatch().await
147}
148
149pub async fn dispatch_to<J>(job: J, queue: &'static str) -> Result<(), Error>
151where
152 J: Job + Serialize + DeserializeOwned,
153{
154 PendingDispatch::new(job).on_queue(queue).dispatch().await
155}
156
157pub async fn dispatch_later<J>(job: J, delay: Duration) -> Result<(), Error>
161where
162 J: Job + Serialize + DeserializeOwned,
163{
164 PendingDispatch::new(job).delay(delay).dispatch().await
165}
166
167#[cfg(test)]
168mod tests {
169 use super::*;
170 use crate::async_trait;
171 use serial_test::serial;
172 use std::env;
173 use std::sync::atomic::{AtomicBool, Ordering};
174 use std::sync::Arc;
175
/// Test helper that overrides an environment variable and undoes the override
/// when dropped.
///
/// Each entry stores the variable name together with the value it held before
/// the override (`None` if it was unset), so dropping the guard puts the
/// environment back the way it was rather than always deleting the variable.
struct EnvGuard {
    vars: Vec<(String, Option<std::ffi::OsString>)>,
}

impl EnvGuard {
    /// Sets `key` to `value` and returns a guard that reverts the change on drop.
    fn set(key: &str, value: &str) -> Self {
        // Capture the prior state before overwriting it; `var_os` is used so a
        // non-UTF-8 value is remembered as well.
        let previous = env::var_os(key);
        env::set_var(key, value);
        Self {
            vars: vec![(key.to_string(), previous)],
        }
    }
}

impl Drop for EnvGuard {
    fn drop(&mut self) {
        // Walk the entries newest-first so the oldest recorded value is the one
        // that ends up in the environment.
        for (key, previous) in self.vars.iter().rev() {
            match previous {
                Some(original) => env::set_var(key, original),
                None => env::remove_var(key),
            }
        }
    }
}
197
198 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
199 struct TestJob {
200 #[serde(skip)]
201 executed: Arc<AtomicBool>,
202 }
203
204 impl TestJob {
205 fn new() -> (Self, Arc<AtomicBool>) {
206 let executed = Arc::new(AtomicBool::new(false));
207 (
208 Self {
209 executed: executed.clone(),
210 },
211 executed,
212 )
213 }
214 }
215
216 #[async_trait]
217 impl Job for TestJob {
218 async fn handle(&self) -> Result<(), Error> {
219 self.executed.store(true, Ordering::SeqCst);
220 Ok(())
221 }
222 }
223
224 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
225 struct FailingJob;
226
227 #[async_trait]
228 impl Job for FailingJob {
229 async fn handle(&self) -> Result<(), Error> {
230 Err(Error::job_failed("FailingJob", "intentional failure"))
231 }
232 }
233
234 #[tokio::test]
235 #[serial]
236 async fn test_sync_mode_executes_immediately() {
237 let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");
238
239 let (job, executed) = TestJob::new();
240 assert!(!executed.load(Ordering::SeqCst));
241
242 let result = PendingDispatch::new(job).dispatch().await;
243 assert!(result.is_ok());
244 assert!(executed.load(Ordering::SeqCst));
245 }
246
247 #[tokio::test]
248 #[serial]
249 async fn test_sync_mode_handles_failure() {
250 let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");
251
252 let result = PendingDispatch::new(FailingJob).dispatch().await;
253 assert!(result.is_err());
254 }
255
256 #[tokio::test]
257 #[serial]
258 async fn test_sync_mode_ignores_delay() {
259 let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");
260
261 let (job, executed) = TestJob::new();
262
263 let start = std::time::Instant::now();
264 let result = PendingDispatch::new(job)
265 .delay(Duration::from_secs(10))
266 .dispatch()
267 .await;
268
269 assert!(result.is_ok());
270 assert!(executed.load(Ordering::SeqCst));
271 assert!(start.elapsed() < Duration::from_secs(5));
273 }
274
275 #[tokio::test]
276 #[serial]
277 async fn test_sync_mode_ignores_queue() {
278 let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");
279
280 let (job, executed) = TestJob::new();
281
282 let result = PendingDispatch::new(job)
283 .on_queue("high-priority")
284 .dispatch()
285 .await;
286
287 assert!(result.is_ok());
288 assert!(executed.load(Ordering::SeqCst));
289 }
290}