eventually_core/
subscription.rs

1//! Module for creating and managing long-running Subscriptions
2//! to incoming events in the [`EventStore`].
3//!
4//! [`EventStore`]: ../store/trait.EventStore.html
5
6use std::error::Error as StdError;
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::sync::Arc;
9
10use futures::future::{ok, BoxFuture, FutureExt};
11use futures::stream::{BoxStream, StreamExt, TryStreamExt};
12
13use crate::store::{EventStore, Persisted, Select};
14
15/// Stream of events returned by the [`EventSubscriber::subscribe_all`] method.
16///
17/// [`EventSubscriber::subscribe_all`]: trait.EventSubscriber.html#method.subscribe_all
18pub type EventStream<'a, S> = BoxStream<
19    'a,
20    Result<
21        Persisted<<S as EventSubscriber>::SourceId, <S as EventSubscriber>::Event>,
22        <S as EventSubscriber>::Error,
23    >,
24>;
25
26/// Component to let users subscribe to newly-inserted events into the [`EventStore`].
27///
28/// Check out [`subscribe_all`] for more information.
29///
30/// Additional information can be found in the [_Volatile Subscription_] section
31/// of eventstore.com
32///
33/// [_Volatile Subscription_]: https://eventstore.com/docs/getting-started/reading-subscribing-events/index.html#volatile-subscriptions
34/// [`EventStore`]: ../store/trait.EventStore.html
35pub trait EventSubscriber {
36    /// Type of the Source id, typically an [`AggregateId`].
37    ///
38    /// [`AggregateId`]: ../aggregate/type.AggregateId.html
39    type SourceId: Eq;
40
41    /// Event type stored in the [`EventStore`], typically an [`Aggregate::Event`].
42    ///
43    /// [`Aggregate::Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
44    /// [`EventStore`]: ../store/trait.EventStore.html
45    type Event;
46
47    /// Possible errors returned when receiving events from the notification channel.
48    type Error;
49
50    /// Subscribes to all new events persisted in the [`EventStore`], from
51    /// the moment of calling this function, in the future.
52    ///
53    /// Since this is a long-running stream, make sure not to *block*
54    /// or await the full computation of the stream.
55    ///
56    /// Prefer using a `while let` consumer for this [`EventStream`]:
57    ///
58    /// ```text
59    /// let stream = subscriber.subscribe_all().await?;
60    ///
61    /// while let Some(event) = stream.next().await {
62    ///     // Do stuff with the received event...
63    /// }
64    /// ```
65    ///
66    /// [`EventStore`]: ../store/trait.EventStore.html
67    /// [`EventStream`]: type.EventStream.html
68    fn subscribe_all(&self) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;
69}
70
71/// Stream of events returned by the [`Subscription::resume`] method.
72///
73/// [`Subscription::resume`]: trait.Subscription.html#method.resume
74pub type SubscriptionStream<'a, S> = BoxStream<
75    'a,
76    Result<
77        Persisted<<S as Subscription>::SourceId, <S as Subscription>::Event>,
78        <S as Subscription>::Error,
79    >,
80>;
81
82/// A Subscription to an [`EventStream`] which can be "checkpointed":
83/// keeps a record of the latest message processed by itself using [`checkpoint`],
84/// and can resume working from such message by using the [`resume`].
85///
86/// [`EventStream`]: type.EventStream.html
87/// [`resume`]: trait.Subscription.html#method.resume
88/// [`checkpoint`]: trait.Subscription.html#method.checkpoint
89pub trait Subscription {
90    /// Type of the Source id, typically an [`AggregateId`].
91    ///
92    /// [`AggregateId`]: ../aggregate/type.AggregateId.html
93    type SourceId: Eq;
94
95    /// Event type stored in the [`EventStore`], typically an [`Aggregate::Event`].
96    ///
97    /// [`Aggregate::Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
98    /// [`EventStore`]: ../store/trait.EventStore.html
99    type Event;
100
101    /// Possible errors returned when receiving events from the notification channel.
102    type Error;
103
104    /// Resumes the current state of a `Subscription` by returning the [`EventStream`],
105    /// starting from the last event processed by the `Subscription`.
106    ///
107    /// [`EventStream`]: type.EventStream.html
108    fn resume(&self) -> BoxFuture<Result<SubscriptionStream<Self>, Self::Error>>;
109
110    /// Saves the provided version (or sequence number) as the latest
111    /// version processed.
112    fn checkpoint(&self, version: u32) -> BoxFuture<Result<(), Self::Error>>;
113}
114
115/// Error type returned by a [`Transient`] Subscription.
116///
117/// [`Transient`]: struct.Transient.html
118#[derive(Debug, thiserror::Error)]
119pub enum Error {
120    /// Error caused by the Subscription's [`EventStore`].
121    ///
122    /// [`EventStore`]: ../store/trait.EventStore.html
123    #[error("error received while listening to the event stream from the store: {0}")]
124    Store(#[source] anyhow::Error),
125
126    /// Error caused by the Subscription's [`EventSubscriber`].
127    ///
128    /// [`EventSubscriber`]: trait.EventSubscriber.html
129    #[error("error received while listening to the event stream subscription: {0}")]
130    Subscription(#[source] anyhow::Error),
131}
132
133/// [`Subscription`] type which gets deleted once the process using it
134/// gets terminated.
135///
136/// Useful for in-memory or one-off [`Projection`]s.
137///
138/// [`Subscription`]: trait.Subscription.html
139/// [`Projection`]: ../projection/trait.Projection.html
140pub struct Transient<Store, Subscriber> {
141    store: Store,
142    subscriber: Subscriber,
143    last_sequence_number: Arc<AtomicU32>,
144}
145
146impl<Store, Subscriber> Transient<Store, Subscriber> {
147    /// Creates a new [`Subscription`] using the specified [`EventStore`]
148    /// and [`EventSubscriber`] to create the [`SubscriptionStream`] from.
149    ///
150    /// [`Subscription`]: trait.Subscription.html
151    /// [`EventStore`]: ../store/trait.EventStore.html
152    /// [`EventSubscriber`]: trait.EventSubscriber.html
153    /// [`SubscriptionStream`]: type.SubscriptionStream.html
154    pub fn new(store: Store, subscriber: Subscriber) -> Self {
155        Self {
156            store,
157            subscriber,
158            last_sequence_number: Default::default(),
159        }
160    }
161
162    /// Specifies the sequence number of the `Event` the [`SubscriptionStream`]
163    /// should start from when calling [`run`].
164    ///
165    /// [`SubscriptionStream`]: type.SubscriptionStream.html
166    /// [`run`]: struct.Transient.html#method.run
167    pub fn from(self, sequence_number: u32) -> Self {
168        self.last_sequence_number
169            .store(sequence_number, Ordering::Relaxed);
170
171        self
172    }
173}
174
175impl<Store, Subscriber> Subscription for Transient<Store, Subscriber>
176where
177    Store: EventStore + Send + Sync,
178    Subscriber: EventSubscriber<
179            SourceId = <Store as EventStore>::SourceId,
180            Event = <Store as EventStore>::Event,
181        > + Send
182        + Sync,
183    <Store as EventStore>::SourceId: Send + Sync,
184    <Store as EventStore>::Event: Send + Sync,
185    <Store as EventStore>::Error: StdError + Send + Sync + 'static,
186    <Subscriber as EventSubscriber>::Error: StdError + Send + Sync + 'static,
187{
188    type SourceId = Store::SourceId;
189    type Event = Store::Event;
190    type Error = Error;
191
192    fn resume(&self) -> BoxFuture<Result<SubscriptionStream<Self>, Self::Error>> {
193        Box::pin(async move {
194            // Create the Subscription first, so that once the future has been resolved
195            // we'll start receiving events right away.
196            //
197            // This is to avoid losing events when waiting for the one-off stream
198            // to resolve its future.
199            //
200            // The impact is that we _might_ get duplicated events from the one-off stream
201            // and the subscription stream. Luckily, we can discard those by
202            // keeping an internal state of the last processed sequence number,
203            // and discard all those events that are found.
204            let subscription = self
205                .subscriber
206                .subscribe_all()
207                .await
208                .map_err(anyhow::Error::from)
209                .map_err(Error::Store)?;
210
211            let one_off_stream = self
212                .store
213                .stream_all(Select::From(
214                    self.last_sequence_number.load(Ordering::Relaxed),
215                ))
216                .await
217                .map_err(anyhow::Error::from)
218                .map_err(Error::Subscription)?;
219
220            let stream = one_off_stream
221                .map_err(anyhow::Error::from)
222                .map_err(Error::Store)
223                .chain(
224                    subscription
225                        .map_err(anyhow::Error::from)
226                        .map_err(Error::Subscription),
227                )
228                .try_filter_map(move |event| async move {
229                    let expected_sequence_number =
230                        self.last_sequence_number.load(Ordering::Relaxed);
231
232                    let event_sequence_number = event.sequence_number();
233
234                    if event_sequence_number < expected_sequence_number {
235                        #[cfg(feature = "with-tracing")]
236                        tracing::trace!(
237                            event.sequence_number = event_sequence_number,
238                            subscription.sequence_number = expected_sequence_number,
239                            "Duplicated event detected; skipping"
240                        );
241
242                        return Ok(None);
243                    }
244
245                    Ok(Some(event))
246                })
247                .boxed();
248
249            Ok(stream)
250        })
251    }
252
253    fn checkpoint(&self, version: u32) -> BoxFuture<Result<(), Self::Error>> {
254        // Checkpointing happens in memory on the atomic sequence number checkpoint.
255        self.last_sequence_number.store(version, Ordering::Relaxed);
256        ok(()).boxed()
257    }
258}