net_queue/
lib.rs

1use std::error::Error;
2use std::fmt::Debug;
3use std::future::Future;
4use std::ops::Deref;
5
6use futures::Stream;
7
8use url::Url;
9
10#[cfg(feature = "rabbitmq")]
11mod amqp;
12#[cfg(feature = "rabbitmq")]
13pub use amqp::*;
14
15#[cfg(feature = "local")]
16mod local;
17#[cfg(feature = "local")]
18pub use local::*;
19
20/// The receive-only side of a queue
21#[async_trait::async_trait]
22pub trait InputQueue {
23    /// The type of handle used to ack/nack items received from this queue
24    type Handle: JobHandle<Err = Self::Err>;
25
26    /// The type of error that can occur while getting an item from this queue
27    type Err: Debug;
28
29    /// The type of [`Stream`] that this [`InputQueue`] produces
30    ///
31    /// [`Stream`]: futures::Stream
32    /// [`InputQueue`]: Self
33    type Stream: Stream<Item = Result<JobResult<Self::Handle>, Self::Err>> + Unpin;
34
35    /// Receive a message from this [`InputQueue`]
36    ///
37    /// [`InputQueue`]: Self
38    async fn get(&self) -> Result<JobResult<Self::Handle>, Self::Err>;
39
40    /// Convert this [`InputQueue`] into a [`Stream`]
41    ///
42    /// [`InputQueue`]: Self
43    /// [`Stream`]: futures::Stream
44    async fn into_stream(self) -> Self::Stream;
45}
46
47/// The send-only side of a queue
48#[async_trait::async_trait]
49pub trait OutputQueue {
50    /// The type of error that can occur sending messages to this [`OutputQueue`]
51    ///
52    /// [`OutputQueue`]: Self
53    type Err: Debug;
54
55    /// Put a job in this [`OutputQueue`]
56    ///
57    /// [`OutputQueue`]: Self
58    async fn put<D>(&self, data: D) -> Result<(), Self::Err>
59    where
60        D: AsRef<[u8]> + Send;
61
62    /// Close this [`OutputQueue`] signaling we don't want to receive anymore messages
63    ///
64    /// [`OutputQueue`]: Self
65    async fn close(&self) -> Result<(), Self::Err>;
66}
67
68#[async_trait::async_trait]
69/// The queue factory trait that takes care of creating queues
70pub trait MakeQueue: Send + Sync {
71    /// The type of [`InputQueue`] returned by this factory
72    ///
73    /// [`InputQueue`]: self::InputQueue
74    type InputQueue: InputQueue<Err = Self::Err>;
75
76    /// The type of [`OutputQueue`] returned by this factory
77    ///
78    /// [`OutputQueue`]: self::OutputQueue
79    type OutputQueue: OutputQueue<Err = Self::Err>;
80
81    /// The type of error that can occur when creating a job queue
82    type Err: Error + Send + Sync;
83
84    /// Create a new job queue using this factory
85    async fn input_queue(&self, name: &str, url: Url) -> Result<Self::InputQueue, Self::Err>;
86
87    /// Create a new [`OutputQueue`] with this [`MakeQueue`]
88    ///
89    /// [`OutputQueue`]: self::OutputQueue
90    /// [`MakeQueue`]: self::MakeQueue
91    async fn output_queue(&self, name: &str, url: Url) -> Result<Self::OutputQueue, Self::Err>;
92}
93
94/// A trait to manager job timeouts and (n)acks
95#[async_trait::async_trait]
96pub trait JobHandle: Send + Sync + 'static {
97    /// Type of errors that can occur
98    type Err: Debug;
99
100    /// Ack the job referred by this `JobHandle`
101    async fn ack_job(&self) -> Result<(), Self::Err>;
102
103    /// N-ack the job referred by this [`JobHandle`], this must trigger a requeue if the
104    /// amount of tries has not exceeded the maximum amount
105    async fn nack_job(&self) -> Result<(), Self::Err>;
106}
107
108/// A struct that holds both the job data and a JobHandle used to acknowledge jobs completion
109pub struct JobResult<H>
110where
111    H: JobHandle + 'static,
112{
113    handle: Option<H>,
114    job: Vec<u8>,
115}
116
117impl<H> JobResult<H>
118where
119    H: JobHandle,
120{
121    /// Create a new JobResult from a job and a JobHandle to acknowledge job completion
122    pub fn new(job: Vec<u8>, handle: H) -> Self {
123        Self {
124            handle: handle.into(),
125            job,
126        }
127    }
128
129    async fn run_with_handle<F>(&mut self, f: impl FnOnce(H) -> F) -> Result<(), H::Err>
130    where
131        F: Future<Output = Result<(), H::Err>>,
132    {
133        if let Some(handle) = self.handle.take() {
134            (f)(handle).await
135        } else {
136            Ok(())
137        }
138    }
139
140    /// Get a reference to the job contained in this `JobResult`
141    pub fn job(&self) -> &Vec<u8> {
142        &self.job
143    }
144
145    /// Split this result into its handle if it has not been already used and the actual job content
146    pub fn split(self) -> (Option<H>, Vec<u8>) {
147        (self.handle, self.job)
148    }
149
150    /// Nack the job associated with this `JobResult`
151    pub async fn nack_job(&mut self) -> Result<(), H::Err> {
152        self.run_with_handle(|h| async move { h.nack_job().await })
153            .await
154    }
155
156    /// Ack the job associated with this `JobResult`
157    pub async fn ack_job(&mut self) -> Result<(), H::Err> {
158        self.run_with_handle(|h| async move { h.ack_job().await })
159            .await
160    }
161}
162
163impl<H> PartialEq for JobResult<H>
164where
165    H: JobHandle,
166{
167    fn eq(&self, other: &Self) -> bool {
168        self.job == other.job
169    }
170}
171
172impl<H> Deref for JobResult<H>
173where
174    H: JobHandle + Send + Sync + 'static,
175{
176    type Target = Vec<u8>;
177
178    fn deref(&self) -> &Self::Target {
179        &self.job
180    }
181}