1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
//! Types, traits, and functions necessary to consume messages using hedwig
//!
//! See the [`Consumer`] trait.

use crate::message::ValidatedMessage;
use async_trait::async_trait;
use bytes::Bytes;
use either::Either;
use futures_util::stream;
use pin_project::pin_project;
use std::{
    pin::Pin,
    task::{Context, Poll},
};

pub use hedwig_core::message::DecodableMessage;

/// Message consumers ingest messages from a queue service and present them to the user application
/// as a [`Stream`](futures_util::stream::Stream).
///
/// ## Message Decoding
///
/// Messages pulled from the service are assumed to have been created by some [hedwig
/// publisher](crate::Publisher) and therefore were validated against the included schema
/// when publishing. It is the decoder's responsibility (when provided to functions like
/// [`consume`](Consumer::consume)) to check this schema and the accompanying payload for validity.
///
/// ## Acknowledging Messages
/// Typically message services deliver messages with a particular delivery time window, during
/// which this message won't be sent to other consumers. In AWS SQS this is called the [visibility
/// timeout][AWS], and in GCP PubSub this is the [ack deadline][GCP].
///
/// If a message is successfully acknowledged within this time, it will be considered processed and
/// not delivered to other consumers (and possibly deleted depending on the service's
/// configuration). A message can conversely be negatively-acknowledged, to indicate e.g.
/// processing has failed and the message should be delivered again to some consumer. This time
/// window can also be modified for each message, to allow for longer or shorter message processing
/// than the default configured time window.
///
/// Implementations of this trait do not ack/nack/modify messages themselves, and instead present
/// this functionality to users with the [`AcknowledgeableMessage`] type. Message processors are
/// responsible for handling message acknowledgement, including extensions for processing time as
/// necessary.
///
/// Bear in mind that message delivery and acknowledgement are all best-effort in distributed
/// message services. An acknowledged or extended message may still be re-delivered for any number
/// of reasons, and applications should be made resilient to such events.
///
/// [AWS]: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
/// [GCP]: https://cloud.google.com/pubsub/docs/subscriber
// If we had async drop, sending nacks on drop would be nice. Alas, rust isn't there yet
pub trait Consumer {
    /// Token type handed out alongside each message, used to ack/nack/modify that message
    type AckToken: AcknowledgeToken;
    /// Error type yielded by the stream in place of a message
    type Error;
    /// The stream type returned by [`stream`](Consumer::stream)
    ///
    /// Every item is either an undecoded, acknowledgeable message or a [`Self::Error`](Consumer::Error).
    type Stream: stream::Stream<
        Item = Result<AcknowledgeableMessage<Self::AckToken, ValidatedMessage<Bytes>>, Self::Error>,
    >;

    /// Consume this value and start pulling messages from the backing message service.
    ///
    /// Messages yielded by the returned stream are still undecoded. Most users want
    /// [`consume`](Consumer::consume), which layers decoding on top of this stream.
    fn stream(self) -> Self::Stream;

    /// Consume this value and produce a stream of decoded messages of type `M`.
    ///
    /// The given `decoder` is stored in the returned [`MessageStream`] and used for every message
    /// pulled from the underlying [`stream`](Consumer::stream). `M` must be a
    /// [decodable](DecodableMessage) message type whose decoder type matches `decoder`.
    fn consume<M>(self, decoder: M::Decoder) -> MessageStream<Self::Stream, M::Decoder, M>
    where
        Self: Sized,
        M: DecodableMessage,
    {
        // Start the raw stream first, then wrap it together with the decoder
        let stream = self.stream();
        MessageStream {
            stream,
            decoder,
            _message_type: std::marker::PhantomData::<M>,
        }
    }
}

/// A received message which can be acknowledged to prevent re-delivery by the backing message
/// service.
///
/// Pairs a message of type `M` with the acknowledgement token of type `A` that was delivered with
/// it. The wrapper dereferences to the underlying message, so the message's own methods can be
/// called directly on it.
///
/// See the documentation for acknowledging messages on [`Consumer`]
#[derive(Debug)]
#[must_use = "Messages should be ack'ed to prevent repeated delivery, or nack'ed to improve responsiveness"]
pub struct AcknowledgeableMessage<A, M> {
    /// The acknowledgement token which executes the ack/nack/modify operations
    pub ack_token: A,

    /// The underlying message
    pub message: M,
}

impl<A, M> AcknowledgeableMessage<A, M>
where
    A: AcknowledgeToken,
{
    /// Acknowledge this message, declaring that processing succeeded and the message should not
    /// be delivered to consumers again.
    ///
    /// Consumes the wrapper. On success the underlying message is handed back to the caller; on
    /// failure the token's error is returned and the message is dropped.
    pub async fn ack(self) -> Result<M, A::AckError> {
        let Self { ack_token, message } = self;
        ack_token.ack().await.map(|()| message)
    }

    /// Negatively acknowledge this message, declaring that processing failed and the message
    /// should be delivered to consumers again.
    ///
    /// Consumes the wrapper. On success the underlying message is handed back to the caller; on
    /// failure the token's error is returned and the message is dropped.
    pub async fn nack(self) -> Result<M, A::NackError> {
        let Self { ack_token, message } = self;
        ack_token.nack().await.map(|()| message)
    }

    /// Modify the acknowledgement deadline for this message to the given number of seconds.
    ///
    /// The new deadline will typically be this number of seconds after the service receives this
    /// modification request, though users should check their implementation's documented
    /// behavior.
    pub async fn modify_deadline(&mut self, seconds: u32) -> Result<(), A::ModifyError> {
        // Only the token is involved; the message itself is left untouched
        let token = &mut self.ack_token;
        token.modify_deadline(seconds).await
    }
}

impl<A, M> std::ops::Deref for AcknowledgeableMessage<A, M> {
    type Target = M;

    /// Borrow the underlying message, ignoring the acknowledgement token
    fn deref(&self) -> &Self::Target {
        let Self { message, .. } = self;
        message
    }
}

impl<A, M> std::ops::DerefMut for AcknowledgeableMessage<A, M> {
    /// Mutably borrow the underlying message, ignoring the acknowledgement token
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { message, .. } = self;
        message
    }
}

/// A token associated with some message received from a message service, used to issue an
/// ack/nack/modify request
///
/// Each operation has its own associated error type, so implementations can report different
/// failures for acknowledging, negatively acknowledging, and modifying the deadline.
///
/// See the documentation for acknowledging messages on [`Consumer`]
#[async_trait]
#[must_use = "Messages should be ack'ed to prevent repeated delivery, or nack'ed to improve responsiveness"]
pub trait AcknowledgeToken {
    /// Errors returned by [`ack`](AcknowledgeToken::ack)
    type AckError;
    /// Errors returned by [`nack`](AcknowledgeToken::nack)
    type NackError;
    /// Errors returned by [`modify_deadline`](AcknowledgeToken::modify_deadline)
    type ModifyError;

    /// Acknowledge the associated message
    ///
    /// Consumes the token, so it cannot be used again after acknowledging.
    async fn ack(self) -> Result<(), Self::AckError>;

    /// Negatively acknowledge the associated message
    ///
    /// Consumes the token, so it cannot be used again after negatively acknowledging.
    async fn nack(self) -> Result<(), Self::NackError>;

    /// Change the associated message's acknowledge deadline to the given number of seconds
    ///
    /// Borrows the token mutably rather than consuming it, so the message can still be
    /// ack'ed or nack'ed afterwards.
    // uses u32 seconds instead of e.g. Duration because SQS and PubSub both have second
    // granularity; Duration::from_millis(999) would truncate to 0, which might be surprising
    async fn modify_deadline(&mut self, seconds: u32) -> Result<(), Self::ModifyError>;
}

/// The stream returned by the [`consume`](Consumer::consume) function
///
/// Wraps a stream of undecoded messages together with the decoder used to turn each of them into
/// a message of type `M`.
#[pin_project]
#[derive(Debug)]
pub struct MessageStream<S, D, M> {
    // The wrapped stream of undecoded messages; marked `#[pin]` so it is projected as a pinned
    // reference and can be polled in place
    #[pin]
    stream: S,
    // The decoder passed to `M::decode` for each message pulled from `stream`
    decoder: D,
    // Marker tying the otherwise-unused type parameter `M` to this struct
    _message_type: std::marker::PhantomData<M>,
}

/// Decodes every message pulled from the wrapped stream into an `M`.
///
/// Errors from the wrapped stream are yielded as [`Either::Left`]; errors from decoding a message
/// are yielded as [`Either::Right`]. The acknowledgement token of a successfully decoded message
/// is carried over unchanged. When decoding fails, the token of that message is dropped together
/// with the undecoded payload.
impl<S, D, M, AckToken, StreamError> stream::Stream for MessageStream<S, D, M>
where
    S: stream::Stream<
        Item = Result<AcknowledgeableMessage<AckToken, ValidatedMessage<Bytes>>, StreamError>,
    >,
    M: DecodableMessage<Decoder = D>,
{
    #[allow(clippy::type_complexity)] // it is what it is, aliases would all be generic anyway
    type Item = Result<AcknowledgeableMessage<AckToken, M>, Either<StreamError, M::Error>>;

    /// Poll the wrapped stream and, if it produced a message, decode it.
    ///
    /// `Pending` and end-of-stream are passed through untouched.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let decoder = this.decoder;
        this.stream.poll_next(cx).map(|opt| {
            opt.map(|res| match res {
                // Keep the ack token, swap the raw payload for the decoded message
                Ok(AcknowledgeableMessage { ack_token, message }) => M::decode(message, decoder)
                    .map(|message| AcknowledgeableMessage { ack_token, message })
                    .map_err(Either::Right),
                Err(err) => Err(Either::Left(err)),
            })
        })
    }

    /// Forward the wrapped stream's size hint.
    ///
    /// Each item of the wrapped stream maps to exactly one item of this stream (either a decoded
    /// message or an error), so the bounds are identical.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.size_hint()
    }
}