1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
//! Types, traits, and functions necessary to consume messages using hedwig
//!
//! See the [`Consumer`] trait.
use crate::message::ValidatedMessage;
use async_trait::async_trait;
use bytes::Bytes;
use either::Either;
use futures_util::stream;
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};
pub use hedwig_core::message::DecodableMessage;
/// Message consumers ingest messages from a queue service and present them to the user application
/// as a [`Stream`](futures_util::stream::Stream).
///
/// ## Message Decoding
///
/// Messages pulled from the service are assumed to have been created by some [hedwig
/// publisher](crate::Publisher) and therefore were validated against the included schema
/// when publishing. It is the decoder's responsibility (when provided to functions like
/// [`consume`](Consumer::consume)) to check this schema and the accompanying payload for validity.
///
/// ## Acknowledging Messages
/// Typically message services deliver messages with a particular delivery time window, during
/// which this message won't be sent to other consumers. In AWS SQS this is called the [visibility
/// timeout][AWS], and in GCP PubSub this is the [ack deadline][GCP].
///
/// If a message is successfully acknowledged within this time, it will be considered processed and
/// not delivered to other consumers (and possibly deleted depending on the service's
/// configuration). A message can conversely be negatively-acknowledged, to indicate e.g.
/// processing has failed and the message should be delivered again to some consumer. This time
/// window can also be modified for each message, to allow for longer or shorter message processing
/// than the default configured time window.
///
/// Implementations of this trait do not ack/nack/modify messages themselves, and instead present
/// this functionality to users with the [`AcknowledgeableMessage`] type. Message processors are
/// responsible for handling message acknowledgement, including extensions for processing time as
/// necessary.
///
/// Bear in mind that message delivery and acknowledgement are all best-effort in distributed
/// message services. An acknowledged or extended message may still be re-delivered for any number
/// of reasons, and applications should be made resilient to such events.
///
/// [AWS]: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
/// [GCP]: https://cloud.google.com/pubsub/docs/subscriber
// If we had async drop, sending nacks on drop would be nice. Alas, rust isn't there yet
pub trait Consumer {
    /// The acknowledgement token type attached to every message produced by the underlying
    /// service implementation
    type AckToken: AcknowledgeToken;

    /// The error type yielded when streaming messages fails
    type Error;

    /// The stream type returned by [`stream`](Consumer::stream)
    type Stream: stream::Stream<
        Item = Result<AcknowledgeableMessage<Self::AckToken, ValidatedMessage<Bytes>>, Self::Error>,
    >;

    /// Start pulling messages from the backing message service.
    ///
    /// Items of the returned stream carry the raw, not-yet-decoded payload. Most users want
    /// [`consume`](Consumer::consume) instead, which yields decoded messages.
    fn stream(self) -> Self::Stream;

    /// Turn this consumer into a stream of decoded messages.
    ///
    /// The raw stream obtained from [`stream`](Consumer::stream) is wrapped in a
    /// [`MessageStream`], which applies `decoder` to produce values of the
    /// [decodable](DecodableMessage) message type `M`.
    fn consume<M>(self, decoder: M::Decoder) -> MessageStream<Self::Stream, M::Decoder, M>
    where
        M: DecodableMessage,
        Self: Sized,
    {
        let stream = self.stream();
        MessageStream {
            stream,
            decoder,
            _message_type: std::marker::PhantomData,
        }
    }
}
/// A received message which can be acknowledged to prevent re-delivery by the backing message
/// service.
///
/// This is a plain pairing of a message with the token used to ack/nack/modify it. Both fields
/// are public, and the wrapper dereferences to the contained message, so the message's own
/// methods can be called directly on the wrapper.
///
/// See the documentation for acknowledging messages on [`Consumer`]
#[derive(Debug)]
#[must_use = "Messages should be ack'ed to prevent repeated delivery, or nack'ed to improve responsiveness"]
pub struct AcknowledgeableMessage<A, M> {
/// The acknowledgement token which executes the ack/nack/modify operations
pub ack_token: A,
/// The underlying message
pub message: M,
}
impl<A, M> AcknowledgeableMessage<A, M>
where
    A: AcknowledgeToken,
{
    /// Acknowledge this message, signalling that it was processed successfully and should not
    /// be delivered to consumers again.
    ///
    /// Consumes the wrapper; the contained message is handed back when the ack succeeds.
    pub async fn ack(self) -> Result<M, A::AckError> {
        let Self { ack_token, message } = self;
        ack_token.ack().await.map(|()| message)
    }

    /// Negatively acknowledge this message, signalling that processing failed and the message
    /// should be delivered to consumers again.
    ///
    /// Consumes the wrapper; the contained message is handed back when the nack succeeds.
    pub async fn nack(self) -> Result<M, A::NackError> {
        let Self { ack_token, message } = self;
        ack_token.nack().await.map(|()| message)
    }

    /// Change the acknowledgement deadline of this message to the given number of seconds.
    ///
    /// The new deadline will typically be this number of seconds after the service receives the
    /// modification request, though users should check their implementation's documented
    /// behavior.
    pub async fn modify_deadline(&mut self, seconds: u32) -> Result<(), A::ModifyError> {
        let token = &mut self.ack_token;
        token.modify_deadline(seconds).await
    }
}
/// Gives shared access to the wrapped message, so its methods can be called directly on the
/// wrapper.
impl<A, M> std::ops::Deref for AcknowledgeableMessage<A, M> {
    type Target = M;

    fn deref(&self) -> &Self::Target {
        let Self { message, .. } = self;
        message
    }
}
/// Gives exclusive access to the wrapped message, so it can be mutated in place through the
/// wrapper.
impl<A, M> std::ops::DerefMut for AcknowledgeableMessage<A, M> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { message, .. } = self;
        message
    }
}
/// A token associated with some message received from a message service, used to issue an
/// ack/nack/modify request
///
/// [`ack`](AcknowledgeToken::ack) and [`nack`](AcknowledgeToken::nack) take the token by value,
/// so a given token can be spent on at most one of them.
/// [`modify_deadline`](AcknowledgeToken::modify_deadline) only borrows the token, leaving it
/// available for a later ack or nack. Each operation reports failures through its own
/// associated error type.
///
/// See the documentation for acknowledging messages on [`Consumer`]
#[async_trait]
#[must_use = "Messages should be ack'ed to prevent repeated delivery, or nack'ed to improve responsiveness"]
pub trait AcknowledgeToken {
/// Errors returned by [`ack`](AcknowledgeToken::ack)
type AckError;
/// Errors returned by [`nack`](AcknowledgeToken::nack)
type NackError;
/// Errors returned by [`modify_deadline`](AcknowledgeToken::modify_deadline)
type ModifyError;
/// Acknowledge the associated message, consuming this token
async fn ack(self) -> Result<(), Self::AckError>;
/// Negatively acknowledge the associated message, consuming this token
async fn nack(self) -> Result<(), Self::NackError>;
/// Change the associated message's acknowledge deadline to the given number of seconds
///
/// The token is borrowed rather than consumed, so it can still be acked or nacked afterwards.
// uses u32 seconds instead of e.g. Duration because SQS and PubSub both have second
// granularity; Duration::from_millis(999) would truncate to 0, which might be surprising
async fn modify_deadline(&mut self, seconds: u32) -> Result<(), Self::ModifyError>;
}
/// The stream returned by the [`consume`](Consumer::consume) function
///
/// Wraps an inner stream `S` of undecoded messages together with a decoder `D`. Its `Stream`
/// implementation decodes each message yielded by the inner stream into the message type `M`.
#[pin_project]
#[derive(Debug)]
pub struct MessageStream<S, D, M> {
// The inner stream of undecoded messages; marked `#[pin]` so it can be polled through the
// pinned projection.
#[pin]
stream: S,
// The decoder handed to `M::decode` for every message.
decoder: D,
// Zero-sized marker recording the decoded message type `M`; no `M` value is stored.
_message_type: std::marker::PhantomData<M>,
}
/// Decodes every message produced by the inner stream into `M`, keeping its ack token attached.
impl<S, D, M, AckToken, StreamError> stream::Stream for MessageStream<S, D, M>
where
    S: stream::Stream<
        Item = Result<AcknowledgeableMessage<AckToken, ValidatedMessage<Bytes>>, StreamError>,
    >,
    M: DecodableMessage<Decoder = D>,
{
    /// A decoded message with its ack token on success; on failure, either the inner stream's
    /// error (`Left`) or the decoder's error (`Right`).
    #[allow(clippy::type_complexity)] // it is what it is, aliases would all be generic anyway
    type Item = Result<AcknowledgeableMessage<AckToken, M>, Either<StreamError, M::Error>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        // Pending and end-of-stream are forwarded from the inner stream unchanged.
        let received = match this.stream.poll_next(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(None) => return Poll::Ready(None),
            Poll::Ready(Some(received)) => received,
        };

        let item = match received {
            Err(stream_error) => Err(Either::Left(stream_error)),
            // When decoding fails only the decode error is returned; the ack token is dropped.
            Ok(AcknowledgeableMessage { ack_token, message }) => {
                match M::decode(message, this.decoder) {
                    Ok(decoded) => Ok(AcknowledgeableMessage {
                        ack_token,
                        message: decoded,
                    }),
                    Err(decode_error) => Err(Either::Right(decode_error)),
                }
            }
        };

        Poll::Ready(Some(item))
    }
}