use std::error::Error;
use std::fmt::Debug;
use std::future::Future;
use std::ops::Deref;
use futures::Stream;
use url::Url;
#[cfg(feature = "rabbitmq")]
mod amqp;
#[cfg(feature = "rabbitmq")]
pub use amqp::*;
#[cfg(feature = "local")]
mod local;
#[cfg(feature = "local")]
pub use local::*;
/// A source of jobs that can be read one at a time or as a stream.
///
/// Every item is a [`JobResult`] carrying the job bytes together with a
/// [`JobHandle`] whose error type is the same as this queue's `Err`.
#[async_trait::async_trait]
pub trait InputQueue {
/// Handle type attached to each job; its `Err` must equal `Self::Err`.
type Handle: JobHandle<Err = Self::Err>;
/// Error type reported by this queue. Only `Debug` is required.
type Err: Debug;
/// Stream type returned by `into_stream`. It yields the same
/// `Result<JobResult<Self::Handle>, Self::Err>` items as `get` and must be `Unpin`.
type Stream: Stream<Item = Result<JobResult<Self::Handle>, Self::Err>> + Unpin;
/// Asynchronously produces a single job (or an error) while borrowing the queue.
// NOTE(review): presumably waits for the next available job; the waiting
// behaviour is defined by the implementor — confirm against the backends.
async fn get(&self) -> Result<JobResult<Self::Handle>, Self::Err>;
/// Consumes the queue and converts it into `Self::Stream`.
async fn into_stream(self) -> Self::Stream;
}
/// A sink that accepts raw byte payloads.
#[async_trait::async_trait]
pub trait OutputQueue {
/// Error type reported by this queue. Only `Debug` is required.
type Err: Debug;
/// Asynchronously submits `data` to the queue.
///
/// `data` may be any `Send` value that can be viewed as a byte slice
/// (for example `Vec<u8>`, `&[u8]` or `String`).
async fn put<D>(&self, data: D) -> Result<(), Self::Err>
where
D: AsRef<[u8]> + Send;
/// Asynchronously closes the queue.
// NOTE(review): whether `put` may still be called after `close` is up to the
// implementor — confirm against the backends.
async fn close(&self) -> Result<(), Self::Err>;
}
/// Factory for matching input and output queues.
///
/// Both queue types it produces report the same error type as the factory itself.
#[async_trait::async_trait]
pub trait MakeQueue: Send + Sync {
/// Input queue type created by `input_queue`; its `Err` equals `Self::Err`.
type InputQueue: InputQueue<Err = Self::Err>;
/// Output queue type created by `output_queue`; its `Err` equals `Self::Err`.
type OutputQueue: OutputQueue<Err = Self::Err>;
/// Error type shared by the factory and its queues; a full
/// `std::error::Error` that is `Send + Sync`.
type Err: Error + Send + Sync;
/// Asynchronously builds an input queue from a `name` and a `url`.
// NOTE(review): presumably `name` identifies the queue and `url` the backend
// endpoint — confirm against the implementations.
async fn input_queue(&self, name: &str, url: Url) -> Result<Self::InputQueue, Self::Err>;
/// Asynchronously builds an output queue from a `name` and a `url`.
// NOTE(review): same assumed meaning of `name` / `url` as for `input_queue`.
async fn output_queue(&self, name: &str, url: Url) -> Result<Self::OutputQueue, Self::Err>;
}
/// Handle used to report the outcome of a single job.
///
/// Implementors must be `Send + Sync + 'static`.
#[async_trait::async_trait]
pub trait JobHandle: Send + Sync + 'static {
/// Error type reported when acknowledging fails. Only `Debug` is required.
type Err: Debug;
/// Asynchronously acknowledges the job.
// NOTE(review): presumably signals successful processing to the backend — confirm.
async fn ack_job(&self) -> Result<(), Self::Err>;
/// Asynchronously negatively-acknowledges the job.
// NOTE(review): presumably signals failed processing (e.g. for redelivery) — confirm.
async fn nack_job(&self) -> Result<(), Self::Err>;
}
/// A job payload bundled with the handle used to acknowledge it.
///
/// The handle is stored as an `Option` so that it can be moved out exactly
/// once, either by `ack_job` / `nack_job` or by `split`.
pub struct JobResult<H: JobHandle + 'static> {
    /// Acknowledgement handle; `None` once it has been consumed.
    handle: Option<H>,
    /// Raw bytes of the job.
    job: Vec<u8>,
}
impl<H: JobHandle> JobResult<H> {
    /// Wraps the bytes of a job together with its acknowledgement handle.
    pub fn new(job: Vec<u8>, handle: H) -> Self {
        Self {
            handle: Some(handle),
            job,
        }
    }

    /// Moves the handle out (leaving `None` behind) and awaits `f` on it.
    ///
    /// When the handle has already been taken this does nothing and
    /// returns `Ok(())`.
    async fn run_with_handle<F>(&mut self, f: impl FnOnce(H) -> F) -> Result<(), H::Err>
    where
        F: Future<Output = Result<(), H::Err>>,
    {
        match self.handle.take() {
            Some(taken) => f(taken).await,
            None => Ok(()),
        }
    }

    /// Borrows the raw job bytes.
    pub fn job(&self) -> &Vec<u8> {
        let Self { job, .. } = self;
        job
    }

    /// Breaks the result apart into its (possibly already consumed) handle
    /// and the job bytes.
    pub fn split(self) -> (Option<H>, Vec<u8>) {
        let Self { handle, job } = self;
        (handle, job)
    }

    /// Negatively-acknowledges the job through its handle.
    ///
    /// The handle is consumed by the call, so any later `ack_job` /
    /// `nack_job` on the same value is a no-op returning `Ok(())`.
    pub async fn nack_job(&mut self) -> Result<(), H::Err> {
        let reject = |handle: H| async move { handle.nack_job().await };
        self.run_with_handle(reject).await
    }

    /// Acknowledges the job through its handle.
    ///
    /// The handle is consumed by the call, so any later `ack_job` /
    /// `nack_job` on the same value is a no-op returning `Ok(())`.
    pub async fn ack_job(&mut self) -> Result<(), H::Err> {
        let accept = |handle: H| async move { handle.ack_job().await };
        self.run_with_handle(accept).await
    }
}
/// Two results are equal when their job bytes are equal; the handles are
/// not taken into account.
impl<H: JobHandle> PartialEq for JobResult<H> {
    fn eq(&self, rhs: &Self) -> bool {
        self.job.as_slice() == rhs.job.as_slice()
    }
}
/// Lets a `JobResult` be used wherever a `&Vec<u8>` (and, by auto-deref,
/// a `&[u8]`) of the job bytes is expected.
impl<H: JobHandle + Send + Sync + 'static> Deref for JobResult<H> {
    type Target = Vec<u8>;

    fn deref(&self) -> &Vec<u8> {
        let Self { job, .. } = self;
        job
    }
}