use crate::ValidatedMessage;
use async_trait::async_trait;
use either::Either;
use futures_util::stream;
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};
pub trait Consumer {
type AckToken: AcknowledgeToken;
type Error;
type Stream: stream::Stream<
Item = Result<AcknowledgeableMessage<Self::AckToken, ValidatedMessage>, Self::Error>,
>;
fn stream(self) -> Self::Stream;
fn consume<M>(self, decoder: M::Decoder) -> MessageStream<Self::Stream, M::Decoder, M>
where
Self: Sized,
M: DecodableMessage,
{
MessageStream {
stream: self.stream(),
decoder,
_message_type: std::marker::PhantomData,
}
}
}
pub trait DecodableMessage {
type Error;
type Decoder;
fn decode(msg: ValidatedMessage, decoder: &Self::Decoder) -> Result<Self, Self::Error>
where
Self: Sized;
}
#[derive(Debug)]
#[must_use = "Messages should be ack'ed to prevent repeated delivery, or nack'ed to improve responsiveness"]
pub struct AcknowledgeableMessage<A, M> {
pub ack_token: A,
pub message: M,
}
impl<A, M> AcknowledgeableMessage<A, M>
where
A: AcknowledgeToken,
{
pub async fn ack(self) -> Result<M, A::AckError> {
self.ack_token.ack().await?;
Ok(self.message)
}
pub async fn nack(self) -> Result<M, A::NackError> {
self.ack_token.nack().await?;
Ok(self.message)
}
pub async fn modify_deadline(&mut self, seconds: u32) -> Result<(), A::ModifyError> {
self.ack_token.modify_deadline(seconds).await
}
}
impl<A, M> std::ops::Deref for AcknowledgeableMessage<A, M> {
type Target = M;
fn deref(&self) -> &M {
&self.message
}
}
impl<A, M> std::ops::DerefMut for AcknowledgeableMessage<A, M> {
fn deref_mut(&mut self) -> &mut M {
&mut self.message
}
}
#[async_trait]
#[must_use = "Messages should be ack'ed to prevent repeated delivery, or nack'ed to improve responsiveness"]
pub trait AcknowledgeToken {
type AckError;
type NackError;
type ModifyError;
async fn ack(self) -> Result<(), Self::AckError>;
async fn nack(self) -> Result<(), Self::NackError>;
async fn modify_deadline(&mut self, seconds: u32) -> Result<(), Self::ModifyError>;
}
#[pin_project]
#[derive(Debug)]
pub struct MessageStream<S, D, M> {
#[pin]
stream: S,
decoder: D,
_message_type: std::marker::PhantomData<M>,
}
impl<S, D, M, AckToken, StreamError> stream::Stream for MessageStream<S, D, M>
where
S: stream::Stream<
Item = Result<AcknowledgeableMessage<AckToken, ValidatedMessage>, StreamError>,
>,
M: DecodableMessage<Decoder = D>,
{
#[allow(clippy::type_complexity)] type Item = Result<AcknowledgeableMessage<AckToken, M>, Either<StreamError, M::Error>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let decoder = this.decoder;
this.stream.poll_next(cx).map(|opt| {
opt.map(|res| {
res.map_err(Either::Left).and_then(
|AcknowledgeableMessage { ack_token, message }| {
Ok(AcknowledgeableMessage {
ack_token,
message: M::decode(message, decoder).map_err(Either::Right)?,
})
},
)
})
})
}
}