simple_queue/queue/wait_for_job.rs
1use std::sync::OnceLock;
2
3use dashmap::DashMap;
4
5use crate::{BoxDynError, Job};
6
7static WAITING_CHANNELS: OnceLock<DashMap<uuid::Uuid, tokio::sync::oneshot::Sender<()>>> =
8 OnceLock::new();
9
10fn get_waiting_channels() -> &'static DashMap<uuid::Uuid, tokio::sync::oneshot::Sender<()>> {
11 WAITING_CHANNELS.get_or_init(|| DashMap::new())
12}
13
14/// Register a oneshot channel for the given job ID and return the receiver.
15/// When the job completes (i.e., the associated [`Guard`] is dropped), the receiver
16/// will be signaled with `Ok(())`.
17pub fn wait_for_job(id: uuid::Uuid) -> tokio::sync::oneshot::Receiver<()> {
18 let (tx, rx) = tokio::sync::oneshot::channel();
19 let dm = get_waiting_channels();
20 dm.insert(id, tx);
21 rx
22}
23
24/// A RAII guard that signals job completion when dropped.
25/// If a sender is held, dropping this guard will send `()` to the paired receiver,
26/// unblocking any caller awaiting [`wait_for_job`].
27pub struct Guard(Option<tokio::sync::oneshot::Sender<()>>);
28/// Remove the waiting channel for the given job ID and return a [`Guard`].
29/// Returns `None` if no waiter was registered for this job ID.
30/// When the returned guard is dropped, the paired receiver will be signaled.
31pub fn get_waiting_guard(id: uuid::Uuid) -> Option<Guard> {
32 let dm = get_waiting_channels();
33 if let Some((_id, ch)) = dm.remove(&id) {
34 Some(Guard(Some(ch)))
35 } else {
36 None
37 }
38}
39impl Drop for Guard {
40 fn drop(&mut self) {
41 if let Some(ch) = self.0.take() {
42 let _ = ch.send(());
43 }
44 }
45}
46
47type WaitingRx = tokio::sync::oneshot::Receiver<()>;
48impl super::SimpleQueue {
49 /// Insert a single job into the queue and return a receiver that will be
50 /// signaled when the job completes. If the insert was a no-op (e.g., due to
51 /// a duplicate unique key), `Ok(None)` is returned.
52 ///
53 /// **Note:** The channel returns on the first processing attempt, regardless
54 /// of whether the job succeeds or fails.
55 pub async fn insert_job_and_wait(
56 &self,
57 job: Job,
58 ) -> Result<Option<(WaitingRx, uuid::Uuid)>, BoxDynError> {
59 self.insert_job(job).await.map(|res| {
60 if let Some(id) = res {
61 let rx = wait_for_job(id);
62 Some((rx, id))
63 } else {
64 None
65 }
66 })
67 }
68 /// Insert multiple jobs into the queue and return a receiver for each job
69 /// that will be signaled when the corresponding job completes.
70 ///
71 /// **Note:** Each channel returns on the first processing attempt, regardless
72 /// of whether the job succeeds or fails.
73 pub async fn insert_jobs_and_wait(
74 &self,
75 jobs: Vec<Job>,
76 ) -> Result<Vec<(WaitingRx, uuid::Uuid)>, BoxDynError> {
77 self.insert_jobs(jobs).await.map(|vec| {
78 vec.into_iter()
79 .map(|id| {
80 let rx = wait_for_job(id);
81 (rx, id)
82 })
83 .collect()
84 })
85 }
86}