ferro_queue/
dispatcher.rs1use crate::{Error, Job, QueueConfig};
4use serde::{de::DeserializeOwned, Serialize};
5use std::sync::OnceLock;
6use std::time::Duration;
7
8static TENANT_ID_HOOK: OnceLock<fn() -> Option<i64>> = OnceLock::new();
12
13pub fn register_tenant_capture_hook(f: fn() -> Option<i64>) {
19 let _ = TENANT_ID_HOOK.set(f);
20}
21
22pub struct PendingDispatch<J> {
27 job: J,
28 queue: Option<&'static str>,
29 delay: Option<Duration>,
30 tenant_id: Option<i64>,
32}
33
34impl<J> PendingDispatch<J>
35where
36 J: Job + Serialize + DeserializeOwned,
37{
38 pub fn new(job: J) -> Self {
40 Self {
41 job,
42 queue: None,
43 delay: None,
44 tenant_id: None,
45 }
46 }
47
48 pub fn on_queue(mut self, queue: &'static str) -> Self {
50 self.queue = Some(queue);
51 self
52 }
53
54 pub fn delay(mut self, duration: Duration) -> Self {
56 self.delay = Some(duration);
57 self
58 }
59
60 pub fn for_tenant(mut self, tenant_id: i64) -> Self {
66 self.tenant_id = Some(tenant_id);
67 self
68 }
69
70 fn captured_tenant_id(&self) -> Option<i64> {
74 self.tenant_id
75 .or_else(|| TENANT_ID_HOOK.get().and_then(|f| f()))
76 }
77
78 pub async fn dispatch(self) -> Result<(), Error> {
86 if QueueConfig::is_sync_mode() {
87 return self.dispatch_immediately().await;
88 }
89
90 self.dispatch_to_queue().await
91 }
92
93 async fn dispatch_immediately(self) -> Result<(), Error> {
98 let job_name = self.job.name();
99
100 if self.delay.is_some() {
101 tracing::debug!(
102 job = %job_name,
103 "Job delay ignored in sync mode"
104 );
105 }
106
107 if self.tenant_id.is_some() {
108 tracing::debug!(
109 job = %job_name,
110 tenant_id = ?self.tenant_id,
111 "for_tenant() ignored in sync mode — current task tenant context applies"
112 );
113 }
114
115 tracing::debug!(job = %job_name, "Executing job synchronously");
116
117 match self.job.handle().await {
118 Ok(()) => {
119 tracing::debug!(job = %job_name, "Job completed successfully");
120 Ok(())
121 }
122 Err(e) => {
123 tracing::error!(job = %job_name, error = %e, "Job failed");
124 self.job.failed(&e).await;
125 Err(e)
126 }
127 }
128 }
129
130 async fn dispatch_to_queue(self) -> Result<(), Error> {
132 let conn = crate::db::Queue::connection();
133 let queue = self.queue.unwrap_or("default");
134 let tenant_id = self.captured_tenant_id();
135 let now = chrono::Utc::now();
136 let available_at = match self.delay {
137 Some(d) => now + chrono::Duration::from_std(d).unwrap_or_default(),
138 None => now,
139 };
140 let payload = serde_json::to_string(&self.job)
141 .map_err(|e| Error::SerializationFailed(e.to_string()))?;
142 let job_type = self.job.name().to_string();
143 let max_retries = self.job.max_retries();
144 let idempotency_key = self.job.idempotency_key();
145 crate::db::enqueue(
146 conn,
147 queue,
148 &job_type,
149 &payload,
150 max_retries,
151 idempotency_key.as_deref(),
152 tenant_id,
153 available_at,
154 )
155 .await
156 }
157
158 pub fn dispatch_now(self)
163 where
164 J: Send + 'static,
165 {
166 tokio::spawn(async move {
167 if let Err(e) = self.dispatch().await {
168 tracing::error!(error = %e, "Failed to dispatch job");
169 }
170 });
171 }
172
173 #[deprecated(since = "0.2.0", note = "Use dispatch_now() instead")]
177 pub fn dispatch_sync(self)
178 where
179 J: Send + 'static,
180 {
181 self.dispatch_now()
182 }
183}
184
185pub async fn dispatch<J>(job: J) -> Result<(), Error>
205where
206 J: Job + Serialize + DeserializeOwned,
207{
208 PendingDispatch::new(job).dispatch().await
209}
210
211pub async fn dispatch_to<J>(job: J, queue: &'static str) -> Result<(), Error>
213where
214 J: Job + Serialize + DeserializeOwned,
215{
216 PendingDispatch::new(job).on_queue(queue).dispatch().await
217}
218
219pub async fn dispatch_later<J>(job: J, delay: Duration) -> Result<(), Error>
223where
224 J: Job + Serialize + DeserializeOwned,
225{
226 PendingDispatch::new(job).delay(delay).dispatch().await
227}
228
229#[cfg(test)]
230mod tests {
231 use super::*;
232 use crate::async_trait;
233 use serial_test::serial;
234 use std::env;
235 use std::sync::atomic::{AtomicBool, Ordering};
236 use std::sync::Arc;
237
238 struct EnvGuard {
240 vars: Vec<String>,
241 }
242
243 impl EnvGuard {
244 fn set(key: &str, value: &str) -> Self {
245 env::set_var(key, value);
246 Self {
247 vars: vec![key.to_string()],
248 }
249 }
250 }
251
252 impl Drop for EnvGuard {
253 fn drop(&mut self) {
254 for var in &self.vars {
255 env::remove_var(var);
256 }
257 }
258 }
259
260 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
261 struct TestJob {
262 #[serde(skip)]
263 executed: Arc<AtomicBool>,
264 }
265
266 impl TestJob {
267 fn new() -> (Self, Arc<AtomicBool>) {
268 let executed = Arc::new(AtomicBool::new(false));
269 (
270 Self {
271 executed: executed.clone(),
272 },
273 executed,
274 )
275 }
276 }
277
278 #[async_trait]
279 impl Job for TestJob {
280 async fn handle(&self) -> Result<(), Error> {
281 self.executed.store(true, Ordering::SeqCst);
282 Ok(())
283 }
284 }
285
286 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
287 struct FailingJob;
288
289 #[async_trait]
290 impl Job for FailingJob {
291 async fn handle(&self) -> Result<(), Error> {
292 Err(Error::job_failed("FailingJob", "intentional failure"))
293 }
294 }
295
296 #[tokio::test]
297 #[serial]
298 async fn test_sync_mode_executes_immediately() {
299 let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");
300
301 let (job, executed) = TestJob::new();
302 assert!(!executed.load(Ordering::SeqCst));
303
304 let result = PendingDispatch::new(job).dispatch().await;
305 assert!(result.is_ok());
306 assert!(executed.load(Ordering::SeqCst));
307 }
308
309 #[tokio::test]
310 #[serial]
311 async fn test_sync_mode_handles_failure() {
312 let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");
313
314 let result = PendingDispatch::new(FailingJob).dispatch().await;
315 assert!(result.is_err());
316 }
317
318 #[tokio::test]
319 #[serial]
320 async fn test_sync_mode_ignores_delay() {
321 let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");
322
323 let (job, executed) = TestJob::new();
324
325 let start = std::time::Instant::now();
326 let result = PendingDispatch::new(job)
327 .delay(Duration::from_secs(10))
328 .dispatch()
329 .await;
330
331 assert!(result.is_ok());
332 assert!(executed.load(Ordering::SeqCst));
333 assert!(start.elapsed() < Duration::from_secs(5));
335 }
336
337 #[tokio::test]
338 #[serial]
339 async fn test_sync_mode_ignores_queue() {
340 let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");
341
342 let (job, executed) = TestJob::new();
343
344 let result = PendingDispatch::new(job)
345 .on_queue("high-priority")
346 .dispatch()
347 .await;
348
349 assert!(result.is_ok());
350 assert!(executed.load(Ordering::SeqCst));
351 }
352
353 #[test]
356 fn test_for_tenant_stores_explicit_override() {
357 let (job, _) = TestJob::new();
358 let pending = PendingDispatch::new(job).for_tenant(99);
359 assert_eq!(pending.tenant_id, Some(99));
360 }
361
362 #[test]
363 fn test_for_tenant_explicit_wins_over_hook() {
364 let (job, _) = TestJob::new();
368 let pending = PendingDispatch::new(job).for_tenant(99);
369 assert_eq!(pending.captured_tenant_id(), Some(99));
371 }
372
373 #[test]
374 fn test_no_tenant_id_by_default() {
375 let (job, _) = TestJob::new();
376 let pending = PendingDispatch::new(job);
377 assert_eq!(pending.tenant_id, None);
378 }
379
380 #[test]
381 fn test_hook_registration_second_call_is_noop() {
382 register_tenant_capture_hook(|| Some(42));
385 register_tenant_capture_hook(|| Some(999)); let result = TENANT_ID_HOOK.get().map(|f| f());
388 let _ = result;
392 }
393
394 #[test]
395 fn test_hook_registration_captures_at_dispatch_time() {
396 register_tenant_capture_hook(|| Some(42));
399 let (job, _) = TestJob::new();
401 let pending = PendingDispatch::new(job);
402 let captured = pending.captured_tenant_id();
404 assert!(captured.is_none() || captured == Some(42));
407 }
408}