1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
//! Module for creating and managing long-running Subscriptions
//! to incoming events in the [`EventStore`].
//!
//! [`EventStore`]: ../store/trait.EventStore.html

use std::error::Error as StdError;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use futures::future::{ok, BoxFuture, FutureExt};
use futures::stream::{BoxStream, StreamExt, TryStreamExt};

use crate::store::{EventStore, Persisted, Select};

/// Stream of events returned by the [`EventSubscriber::subscribe_all`] method.
///
/// [`EventSubscriber::subscribe_all`]: trait.EventSubscriber.html#method.subscribe_all
pub type EventStream<'a, S> = BoxStream<
    'a,
    Result<
        Persisted<<S as EventSubscriber>::SourceId, <S as EventSubscriber>::Event>,
        <S as EventSubscriber>::Error,
    >,
>;

/// Component to let users subscribe to newly-inserted events into the [`EventStore`].
///
/// Check out [`subscribe_all`] for more information.
///
/// Additional information can be found in the [_Volatile Subscription_] section
/// of eventstore.com
///
/// [_Volatile Subscription_]: https://eventstore.com/docs/getting-started/reading-subscribing-events/index.html#volatile-subscriptions
/// [`EventStore`]: ../store/trait.EventStore.html
pub trait EventSubscriber {
    /// Type of the Source id, typically an [`AggregateId`].
    ///
    /// [`AggregateId`]: ../aggregate/type.AggregateId.html
    type SourceId: Eq;

    /// Event type stored in the [`EventStore`], typically an [`Aggregate::Event`].
    ///
    /// [`Aggregate::Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
    /// [`EventStore`]: ../store/trait.EventStore.html
    type Event;

    /// Possible errors returned when receiving events from the notification channel.
    type Error;

    /// Subscribes to all new events persisted in the [`EventStore`], from
    /// the moment of calling this function, in the future.
    ///
    /// Since this is a long-running stream, make sure not to *block*
    /// or await the full computation of the stream.
    ///
    /// Prefer using a `while let` consumer for this [`EventStream`]:
    ///
    /// ```text
    /// let stream = subscriber.subscribe_all().await?;
    ///
    /// while let Some(event) = stream.next().await {
    ///     // Do stuff with the received event...
    /// }
    /// ```
    ///
    /// [`EventStore`]: ../store/trait.EventStore.html
    /// [`EventStream`]: type.EventStream.html
    fn subscribe_all(&self) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;
}

/// Stream of events returned by the [`Subscription::resume`] method.
///
/// [`Subscription::resume`]: trait.Subscription.html#method.resume
pub type SubscriptionStream<'a, S> = BoxStream<
    'a,
    Result<
        Persisted<<S as Subscription>::SourceId, <S as Subscription>::Event>,
        <S as Subscription>::Error,
    >,
>;

/// A Subscription to an [`EventStream`] which can be "checkpointed":
/// keeps a record of the latest message processed by itself using [`checkpoint`],
/// and can resume working from such message by using the [`resume`].
///
/// [`EventStream`]: type.EventStream.html
/// [`resume`]: trait.Subscription.html#method.resume
/// [`checkpoint`]: trait.Subscription.html#method.checkpoint
pub trait Subscription {
    /// Type of the Source id, typically an [`AggregateId`].
    ///
    /// [`AggregateId`]: ../aggregate/type.AggregateId.html
    type SourceId: Eq;

    /// Event type stored in the [`EventStore`], typically an [`Aggregate::Event`].
    ///
    /// [`Aggregate::Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
    /// [`EventStore`]: ../store/trait.EventStore.html
    type Event;

    /// Possible errors returned when receiving events from the notification channel.
    type Error;

    /// Resumes the current state of a `Subscription` by returning the [`EventStream`],
    /// starting from the last event processed by the `Subscription`.
    ///
    /// [`EventStream`]: type.EventStream.html
    fn resume(&self) -> BoxFuture<Result<SubscriptionStream<Self>, Self::Error>>;

    /// Saves the provided version (or sequence number) as the latest
    /// version processed.
    fn checkpoint(&self, version: u32) -> BoxFuture<Result<(), Self::Error>>;
}

/// Error type returned by a [`Transient`] Subscription.
///
/// [`Transient`]: struct.Transient.html
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Error caused by the Subscription's [`EventStore`].
    ///
    /// [`EventStore`]: ../store/trait.EventStore.html
    #[error("error received while listening to the event stream from the store: {0}")]
    Store(#[source] anyhow::Error),

    /// Error caused by the Subscription's [`EventSubscriber`].
    ///
    /// [`EventSubscriber`]: trait.EventSubscriber.html
    #[error("error received while listening to the event stream subscription: {0}")]
    Subscription(#[source] anyhow::Error),
}

/// [`Subscription`] type which gets deleted once the process using it
/// gets terminated.
///
/// Useful for in-memory or one-off [`Projection`]s.
///
/// [`Subscription`]: trait.Subscription.html
/// [`Projection`]: ../projection/trait.Projection.html
pub struct Transient<Store, Subscriber> {
    store: Store,
    subscriber: Subscriber,
    last_sequence_number: Arc<AtomicU32>,
}

impl<Store, Subscriber> Transient<Store, Subscriber> {
    /// Creates a new [`Subscription`] using the specified [`EventStore`]
    /// and [`EventSubscriber`] to create the [`SubscriptionStream`] from.
    ///
    /// [`Subscription`]: trait.Subscription.html
    /// [`EventStore`]: ../store/trait.EventStore.html
    /// [`EventSubscriber`]: trait.EventSubscriber.html
    /// [`SubscriptionStream`]: type.SubscriptionStream.html
    pub fn new(store: Store, subscriber: Subscriber) -> Self {
        Self {
            store,
            subscriber,
            last_sequence_number: Default::default(),
        }
    }

    /// Specifies the sequence number of the `Event` the [`SubscriptionStream`]
    /// should start from when calling [`run`].
    ///
    /// [`SubscriptionStream`]: type.SubscriptionStream.html
    /// [`run`]: struct.Transient.html#method.run
    pub fn from(self, sequence_number: u32) -> Self {
        self.last_sequence_number
            .store(sequence_number, Ordering::Relaxed);

        self
    }
}

impl<Store, Subscriber> Subscription for Transient<Store, Subscriber>
where
    Store: EventStore + Send + Sync,
    Subscriber: EventSubscriber<
            SourceId = <Store as EventStore>::SourceId,
            Event = <Store as EventStore>::Event,
        > + Send
        + Sync,
    <Store as EventStore>::SourceId: Send + Sync,
    <Store as EventStore>::Event: Send + Sync,
    <Store as EventStore>::Error: StdError + Send + Sync + 'static,
    <Subscriber as EventSubscriber>::Error: StdError + Send + Sync + 'static,
{
    type SourceId = Store::SourceId;
    type Event = Store::Event;
    type Error = Error;

    fn resume(&self) -> BoxFuture<Result<SubscriptionStream<Self>, Self::Error>> {
        Box::pin(async move {
            // Create the Subscription first, so that once the future has been resolved
            // we'll start receiving events right away.
            //
            // This is to avoid losing events when waiting for the one-off stream
            // to resolve its future.
            //
            // The impact is that we _might_ get duplicated events from the one-off stream
            // and the subscription stream. Luckily, we can discard those by
            // keeping an internal state of the last processed sequence number,
            // and discard all those events that are found.
            let subscription = self
                .subscriber
                .subscribe_all()
                .await
                .map_err(anyhow::Error::from)
                .map_err(Error::Store)?;

            let one_off_stream = self
                .store
                .stream_all(Select::From(
                    self.last_sequence_number.load(Ordering::Relaxed),
                ))
                .await
                .map_err(anyhow::Error::from)
                .map_err(Error::Subscription)?;

            let stream = one_off_stream
                .map_err(anyhow::Error::from)
                .map_err(Error::Store)
                .chain(
                    subscription
                        .map_err(anyhow::Error::from)
                        .map_err(Error::Subscription),
                )
                .try_filter_map(move |event| async move {
                    let expected_sequence_number =
                        self.last_sequence_number.load(Ordering::Relaxed);

                    let event_sequence_number = event.sequence_number();

                    if event_sequence_number < expected_sequence_number {
                        #[cfg(feature = "with-tracing")]
                        tracing::trace!(
                            event.sequence_number = event_sequence_number,
                            subscription.sequence_number = expected_sequence_number,
                            "Duplicated event detected; skipping"
                        );

                        return Ok(None);
                    }

                    Ok(Some(event))
                })
                .boxed();

            Ok(stream)
        })
    }

    fn checkpoint(&self, version: u32) -> BoxFuture<Result<(), Self::Error>> {
        // Checkpointing happens in memory on the atomic sequence number checkpoint.
        self.last_sequence_number.store(version, Ordering::Relaxed);
        ok(()).boxed()
    }
}