ferro_queue/
dispatcher.rs1use crate::{Error, Job, JobPayload, Queue, QueueConfig};
4use serde::{de::DeserializeOwned, Serialize};
5use std::sync::OnceLock;
6use std::time::Duration;
7
8static TENANT_ID_HOOK: OnceLock<fn() -> Option<i64>> = OnceLock::new();
12
13pub fn register_tenant_capture_hook(f: fn() -> Option<i64>) {
19 let _ = TENANT_ID_HOOK.set(f);
20}
21
22pub struct PendingDispatch<J> {
27 job: J,
28 queue: Option<&'static str>,
29 delay: Option<Duration>,
30 tenant_id: Option<i64>,
32}
33
34impl<J> PendingDispatch<J>
35where
36 J: Job + Serialize + DeserializeOwned,
37{
38 pub fn new(job: J) -> Self {
40 Self {
41 job,
42 queue: None,
43 delay: None,
44 tenant_id: None,
45 }
46 }
47
48 pub fn on_queue(mut self, queue: &'static str) -> Self {
50 self.queue = Some(queue);
51 self
52 }
53
54 pub fn delay(mut self, duration: Duration) -> Self {
56 self.delay = Some(duration);
57 self
58 }
59
60 pub fn for_tenant(mut self, tenant_id: i64) -> Self {
66 self.tenant_id = Some(tenant_id);
67 self
68 }
69
70 fn captured_tenant_id(&self) -> Option<i64> {
74 self.tenant_id
75 .or_else(|| TENANT_ID_HOOK.get().and_then(|f| f()))
76 }
77
78 pub async fn dispatch(self) -> Result<(), Error> {
86 if QueueConfig::is_sync_mode() {
87 return self.dispatch_immediately().await;
88 }
89
90 self.dispatch_to_queue().await
91 }
92
93 async fn dispatch_immediately(self) -> Result<(), Error> {
98 let job_name = self.job.name();
99
100 if self.delay.is_some() {
101 tracing::debug!(
102 job = %job_name,
103 "Job delay ignored in sync mode"
104 );
105 }
106
107 if self.tenant_id.is_some() {
108 tracing::debug!(
109 job = %job_name,
110 tenant_id = ?self.tenant_id,
111 "for_tenant() ignored in sync mode — current task tenant context applies"
112 );
113 }
114
115 tracing::debug!(job = %job_name, "Executing job synchronously");
116
117 match self.job.handle().await {
118 Ok(()) => {
119 tracing::debug!(job = %job_name, "Job completed successfully");
120 Ok(())
121 }
122 Err(e) => {
123 tracing::error!(job = %job_name, error = %e, "Job failed");
124 self.job.failed(&e).await;
125 Err(e)
126 }
127 }
128 }
129
130 async fn dispatch_to_queue(self) -> Result<(), Error> {
132 let conn = Queue::connection();
133 let queue = self.queue.unwrap_or(&conn.config().default_queue);
134 let tenant_id = self.captured_tenant_id();
135
136 let payload = match self.delay {
137 Some(delay) => JobPayload::with_delay(&self.job, queue, delay)?,
138 None => JobPayload::new(&self.job, queue)?,
139 };
140
141 let payload = payload.with_tenant_id(tenant_id);
142
143 conn.push(payload).await
144 }
145
146 pub fn dispatch_now(self)
151 where
152 J: Send + 'static,
153 {
154 tokio::spawn(async move {
155 if let Err(e) = self.dispatch().await {
156 tracing::error!(error = %e, "Failed to dispatch job");
157 }
158 });
159 }
160
161 #[deprecated(since = "0.2.0", note = "Use dispatch_now() instead")]
165 pub fn dispatch_sync(self)
166 where
167 J: Send + 'static,
168 {
169 self.dispatch_now()
170 }
171}
172
173pub async fn dispatch<J>(job: J) -> Result<(), Error>
193where
194 J: Job + Serialize + DeserializeOwned,
195{
196 PendingDispatch::new(job).dispatch().await
197}
198
199pub async fn dispatch_to<J>(job: J, queue: &'static str) -> Result<(), Error>
201where
202 J: Job + Serialize + DeserializeOwned,
203{
204 PendingDispatch::new(job).on_queue(queue).dispatch().await
205}
206
207pub async fn dispatch_later<J>(job: J, delay: Duration) -> Result<(), Error>
211where
212 J: Job + Serialize + DeserializeOwned,
213{
214 PendingDispatch::new(job).delay(delay).dispatch().await
215}
216
217#[cfg(test)]
218mod tests {
219 use super::*;
220 use crate::async_trait;
221 use serial_test::serial;
222 use std::env;
223 use std::sync::atomic::{AtomicBool, Ordering};
224 use std::sync::Arc;
225
226 struct EnvGuard {
228 vars: Vec<String>,
229 }
230
231 impl EnvGuard {
232 fn set(key: &str, value: &str) -> Self {
233 env::set_var(key, value);
234 Self {
235 vars: vec![key.to_string()],
236 }
237 }
238 }
239
240 impl Drop for EnvGuard {
241 fn drop(&mut self) {
242 for var in &self.vars {
243 env::remove_var(var);
244 }
245 }
246 }
247
248 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
249 struct TestJob {
250 #[serde(skip)]
251 executed: Arc<AtomicBool>,
252 }
253
254 impl TestJob {
255 fn new() -> (Self, Arc<AtomicBool>) {
256 let executed = Arc::new(AtomicBool::new(false));
257 (
258 Self {
259 executed: executed.clone(),
260 },
261 executed,
262 )
263 }
264 }
265
266 #[async_trait]
267 impl Job for TestJob {
268 async fn handle(&self) -> Result<(), Error> {
269 self.executed.store(true, Ordering::SeqCst);
270 Ok(())
271 }
272 }
273
274 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
275 struct FailingJob;
276
277 #[async_trait]
278 impl Job for FailingJob {
279 async fn handle(&self) -> Result<(), Error> {
280 Err(Error::job_failed("FailingJob", "intentional failure"))
281 }
282 }
283
284 #[tokio::test]
285 #[serial]
286 async fn test_sync_mode_executes_immediately() {
287 let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");
288
289 let (job, executed) = TestJob::new();
290 assert!(!executed.load(Ordering::SeqCst));
291
292 let result = PendingDispatch::new(job).dispatch().await;
293 assert!(result.is_ok());
294 assert!(executed.load(Ordering::SeqCst));
295 }
296
297 #[tokio::test]
298 #[serial]
299 async fn test_sync_mode_handles_failure() {
300 let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");
301
302 let result = PendingDispatch::new(FailingJob).dispatch().await;
303 assert!(result.is_err());
304 }
305
306 #[tokio::test]
307 #[serial]
308 async fn test_sync_mode_ignores_delay() {
309 let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");
310
311 let (job, executed) = TestJob::new();
312
313 let start = std::time::Instant::now();
314 let result = PendingDispatch::new(job)
315 .delay(Duration::from_secs(10))
316 .dispatch()
317 .await;
318
319 assert!(result.is_ok());
320 assert!(executed.load(Ordering::SeqCst));
321 assert!(start.elapsed() < Duration::from_secs(5));
323 }
324
325 #[tokio::test]
326 #[serial]
327 async fn test_sync_mode_ignores_queue() {
328 let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");
329
330 let (job, executed) = TestJob::new();
331
332 let result = PendingDispatch::new(job)
333 .on_queue("high-priority")
334 .dispatch()
335 .await;
336
337 assert!(result.is_ok());
338 assert!(executed.load(Ordering::SeqCst));
339 }
340
341 #[test]
344 fn test_for_tenant_stores_explicit_override() {
345 let (job, _) = TestJob::new();
346 let pending = PendingDispatch::new(job).for_tenant(99);
347 assert_eq!(pending.tenant_id, Some(99));
348 }
349
350 #[test]
351 fn test_for_tenant_explicit_wins_over_hook() {
352 let (job, _) = TestJob::new();
356 let pending = PendingDispatch::new(job).for_tenant(99);
357 assert_eq!(pending.captured_tenant_id(), Some(99));
359 }
360
361 #[test]
362 fn test_no_tenant_id_by_default() {
363 let (job, _) = TestJob::new();
364 let pending = PendingDispatch::new(job);
365 assert_eq!(pending.tenant_id, None);
366 }
367
368 #[test]
369 fn test_hook_registration_second_call_is_noop() {
370 register_tenant_capture_hook(|| Some(42));
373 register_tenant_capture_hook(|| Some(999)); let result = TENANT_ID_HOOK.get().map(|f| f());
376 let _ = result;
380 }
381
382 #[test]
383 fn test_hook_registration_captures_at_dispatch_time() {
384 register_tenant_capture_hook(|| Some(42));
387 let (job, _) = TestJob::new();
389 let pending = PendingDispatch::new(job);
390 let captured = pending.captured_tenant_id();
392 assert!(captured.is_none() || captured == Some(42));
395 }
396}