Skip to main content

simple_queue/queue/
wait_for_job.rs

1use std::sync::OnceLock;
2
3use dashmap::DashMap;
4
5use crate::{BoxDynError, Job};
6
7static WAITING_CHANNELS: OnceLock<DashMap<uuid::Uuid, tokio::sync::oneshot::Sender<()>>> =
8    OnceLock::new();
9
10fn get_waiting_channels() -> &'static DashMap<uuid::Uuid, tokio::sync::oneshot::Sender<()>> {
11    WAITING_CHANNELS.get_or_init(|| DashMap::new())
12}
13
14/// Register a oneshot channel for the given job ID and return the receiver.
15/// When the job completes (i.e., the associated [`Guard`] is dropped), the receiver
16/// will be signaled with `Ok(())`.
17pub fn wait_for_job(id: uuid::Uuid) -> tokio::sync::oneshot::Receiver<()> {
18    let (tx, rx) = tokio::sync::oneshot::channel();
19    let dm = get_waiting_channels();
20    dm.insert(id, tx);
21    rx
22}
23
24/// A RAII guard that signals job completion when dropped.
25/// If a sender is held, dropping this guard will send `()` to the paired receiver,
26/// unblocking any caller awaiting [`wait_for_job`].
27pub struct Guard(Option<tokio::sync::oneshot::Sender<()>>);
28/// Remove the waiting channel for the given job ID and return a [`Guard`].
29/// Returns `None` if no waiter was registered for this job ID.
30/// When the returned guard is dropped, the paired receiver will be signaled.
31pub fn get_waiting_guard(id: uuid::Uuid) -> Option<Guard> {
32    let dm = get_waiting_channels();
33    if let Some((_id, ch)) = dm.remove(&id) {
34        Some(Guard(Some(ch)))
35    } else {
36        None
37    }
38}
39impl Drop for Guard {
40    fn drop(&mut self) {
41        if let Some(ch) = self.0.take() {
42            let _ = ch.send(());
43        }
44    }
45}
46
47type WaitingRx = tokio::sync::oneshot::Receiver<()>;
48impl super::SimpleQueue {
49    /// Insert a single job into the queue and return a receiver that will be
50    /// signaled when the job completes. If the insert was a no-op (e.g., due to
51    /// a duplicate unique key), `Ok(None)` is returned.
52    ///
53    /// **Note:** The channel returns on the first processing attempt, regardless
54    /// of whether the job succeeds or fails.
55    pub async fn insert_job_and_wait(
56        &self,
57        job: Job,
58    ) -> Result<Option<(WaitingRx, uuid::Uuid)>, BoxDynError> {
59        self.insert_job(job).await.map(|res| {
60            if let Some(id) = res {
61                let rx = wait_for_job(id);
62                Some((rx, id))
63            } else {
64                None
65            }
66        })
67    }
68    /// Insert multiple jobs into the queue and return a receiver for each job
69    /// that will be signaled when the corresponding job completes.
70    ///
71    /// **Note:** Each channel returns on the first processing attempt, regardless
72    /// of whether the job succeeds or fails.
73    pub async fn insert_jobs_and_wait(
74        &self,
75        jobs: Vec<Job>,
76    ) -> Result<Vec<(WaitingRx, uuid::Uuid)>, BoxDynError> {
77        self.insert_jobs(jobs).await.map(|vec| {
78            vec.into_iter()
79                .map(|id| {
80                    let rx = wait_for_job(id);
81                    (rx, id)
82                })
83                .collect()
84        })
85    }
86}