1use std::error::Error;
2use std::fmt::Debug;
3use std::future::Future;
4use std::ops::Deref;
5
6use futures::Stream;
7
8use url::Url;
9
10#[cfg(feature = "rabbitmq")]
11mod amqp;
12#[cfg(feature = "rabbitmq")]
13pub use amqp::*;
14
15#[cfg(feature = "local")]
16mod local;
17#[cfg(feature = "local")]
18pub use local::*;
19
/// Consumer side of a job queue.
///
/// Jobs are delivered as [`JobResult`] values, which carry the payload bytes
/// together with a [`JobHandle`] of type `Self::Handle`.
#[async_trait::async_trait]
pub trait InputQueue {
    /// Handle type attached to every delivered job.
    ///
    /// Its error type is constrained to be the same as the queue's own `Err`.
    type Handle: JobHandle<Err = Self::Err>;

    /// Error type returned by this queue's operations.
    type Err: Debug;

    /// Stream type produced by `into_stream`.
    ///
    /// It yields the same `Result<JobResult<_>, _>` items that `get` returns
    /// and is required to be `Unpin`.
    type Stream: Stream<Item = Result<JobResult<Self::Handle>, Self::Err>> + Unpin;

    /// Asynchronously obtain a single job from the queue.
    ///
    /// NOTE(review): whether this waits or fails when no job is available is
    /// decided by the implementation — confirm per backend.
    async fn get(&self) -> Result<JobResult<Self::Handle>, Self::Err>;

    /// Consume the queue and convert it into a `Self::Stream` of jobs.
    async fn into_stream(self) -> Self::Stream;
}
46
/// Producer side of a job queue: accepts raw byte payloads.
#[async_trait::async_trait]
pub trait OutputQueue {
    /// Error type returned by this queue's operations.
    type Err: Debug;

    /// Asynchronously submit `data` to the queue.
    ///
    /// Any value that can be viewed as a byte slice is accepted. The `Send`
    /// bound is presumably there because `async_trait` produces `Send` futures
    /// by default and `data` is held inside that future.
    async fn put<D>(&self, data: D) -> Result<(), Self::Err>
    where
        D: AsRef<[u8]> + Send;

    /// Asynchronously close the queue.
    ///
    /// NOTE(review): what closing entails (flushing, releasing a connection,
    /// rejecting later `put` calls) is implementation-defined — confirm per backend.
    async fn close(&self) -> Result<(), Self::Err>;
}
67
/// Factory that builds input and output queues.
///
/// Both queue types, as well as the factory itself, report failures through
/// the single `Self::Err` type.
#[async_trait::async_trait]
pub trait MakeQueue: Send + Sync {
    /// Concrete [`InputQueue`] produced by this factory.
    type InputQueue: InputQueue<Err = Self::Err>;

    /// Concrete [`OutputQueue`] produced by this factory.
    type OutputQueue: OutputQueue<Err = Self::Err>;

    /// Error type shared by the factory and the queues it creates.
    type Err: Error + Send + Sync;

    /// Asynchronously create an input queue for `name` and `url`.
    ///
    /// NOTE(review): how `name` and `url` are interpreted is up to the
    /// implementation (presumably queue identifier and backend address) — confirm.
    async fn input_queue(&self, name: &str, url: Url) -> Result<Self::InputQueue, Self::Err>;

    /// Asynchronously create an output queue for `name` and `url`.
    ///
    /// NOTE(review): same caveat as `input_queue` regarding the meaning of
    /// `name` and `url`.
    async fn output_queue(&self, name: &str, url: Url) -> Result<Self::OutputQueue, Self::Err>;
}
93
/// Handle through which a delivered job is acknowledged or rejected.
///
/// Implementors must be `Send + Sync + 'static`.
#[async_trait::async_trait]
pub trait JobHandle: Send + Sync + 'static {
    /// Error type returned when acknowledging or rejecting fails.
    type Err: Debug;

    /// Asynchronously acknowledge the job.
    ///
    /// NOTE(review): presumably tells the backend the job was processed
    /// successfully — exact semantics depend on the implementation.
    async fn ack_job(&self) -> Result<(), Self::Err>;

    /// Asynchronously negatively-acknowledge (reject) the job.
    ///
    /// NOTE(review): whether the backend requeues or drops a rejected job is
    /// implementation-defined — confirm per backend.
    async fn nack_job(&self) -> Result<(), Self::Err>;
}
107
/// A job received from a queue: the payload bytes plus the handle used to
/// acknowledge or reject it.
///
/// Note: `JobHandle` already has `'static` as a supertrait, so the extra
/// `'static` bound below adds no further restriction.
pub struct JobResult<H>
where
    H: JobHandle + 'static,
{
    // `Some` until the handle is taken out by `ack_job`/`nack_job`; `None` afterwards.
    handle: Option<H>,
    // Raw job payload.
    job: Vec<u8>,
}
116
117impl<H> JobResult<H>
118where
119 H: JobHandle,
120{
121 pub fn new(job: Vec<u8>, handle: H) -> Self {
123 Self {
124 handle: handle.into(),
125 job,
126 }
127 }
128
129 async fn run_with_handle<F>(&mut self, f: impl FnOnce(H) -> F) -> Result<(), H::Err>
130 where
131 F: Future<Output = Result<(), H::Err>>,
132 {
133 if let Some(handle) = self.handle.take() {
134 (f)(handle).await
135 } else {
136 Ok(())
137 }
138 }
139
140 pub fn job(&self) -> &Vec<u8> {
142 &self.job
143 }
144
145 pub fn split(self) -> (Option<H>, Vec<u8>) {
147 (self.handle, self.job)
148 }
149
150 pub async fn nack_job(&mut self) -> Result<(), H::Err> {
152 self.run_with_handle(|h| async move { h.nack_job().await })
153 .await
154 }
155
156 pub async fn ack_job(&mut self) -> Result<(), H::Err> {
158 self.run_with_handle(|h| async move { h.ack_job().await })
159 .await
160 }
161}
162
163impl<H> PartialEq for JobResult<H>
164where
165 H: JobHandle,
166{
167 fn eq(&self, other: &Self) -> bool {
168 self.job == other.job
169 }
170}
171
172impl<H> Deref for JobResult<H>
173where
174 H: JobHandle + Send + Sync + 'static,
175{
176 type Target = Vec<u8>;
177
178 fn deref(&self) -> &Self::Target {
179 &self.job
180 }
181}