eventually_core/subscription.rs
1//! Module for creating and managing long-running Subscriptions
2//! to incoming events in the [`EventStore`].
3//!
4//! [`EventStore`]: ../store/trait.EventStore.html
5
6use std::error::Error as StdError;
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::sync::Arc;
9
10use futures::future::{ok, BoxFuture, FutureExt};
11use futures::stream::{BoxStream, StreamExt, TryStreamExt};
12
13use crate::store::{EventStore, Persisted, Select};
14
/// Stream of events returned by the [`EventSubscriber::subscribe_all`] method.
///
/// Every item is a `Result`: either a `Persisted` event, parameterized over
/// the subscriber's `SourceId` and `Event` types, or the subscriber's `Error`.
///
/// [`EventSubscriber::subscribe_all`]: trait.EventSubscriber.html#tymethod.subscribe_all
pub type EventStream<'a, S> = BoxStream<
    'a,
    Result<
        Persisted<<S as EventSubscriber>::SourceId, <S as EventSubscriber>::Event>,
        <S as EventSubscriber>::Error,
    >,
>;
25
26/// Component to let users subscribe to newly-inserted events into the [`EventStore`].
27///
28/// Check out [`subscribe_all`] for more information.
29///
30/// Additional information can be found in the [_Volatile Subscription_] section
31/// of eventstore.com
32///
33/// [_Volatile Subscription_]: https://eventstore.com/docs/getting-started/reading-subscribing-events/index.html#volatile-subscriptions
34/// [`EventStore`]: ../store/trait.EventStore.html
35pub trait EventSubscriber {
36 /// Type of the Source id, typically an [`AggregateId`].
37 ///
38 /// [`AggregateId`]: ../aggregate/type.AggregateId.html
39 type SourceId: Eq;
40
41 /// Event type stored in the [`EventStore`], typically an [`Aggregate::Event`].
42 ///
43 /// [`Aggregate::Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
44 /// [`EventStore`]: ../store/trait.EventStore.html
45 type Event;
46
47 /// Possible errors returned when receiving events from the notification channel.
48 type Error;
49
50 /// Subscribes to all new events persisted in the [`EventStore`], from
51 /// the moment of calling this function, in the future.
52 ///
53 /// Since this is a long-running stream, make sure not to *block*
54 /// or await the full computation of the stream.
55 ///
56 /// Prefer using a `while let` consumer for this [`EventStream`]:
57 ///
58 /// ```text
59 /// let stream = subscriber.subscribe_all().await?;
60 ///
61 /// while let Some(event) = stream.next().await {
62 /// // Do stuff with the received event...
63 /// }
64 /// ```
65 ///
66 /// [`EventStore`]: ../store/trait.EventStore.html
67 /// [`EventStream`]: type.EventStream.html
68 fn subscribe_all(&self) -> BoxFuture<Result<EventStream<Self>, Self::Error>>;
69}
70
/// Stream of events returned by the [`Subscription::resume`] method.
///
/// Every item is a `Result`: either a `Persisted` event, parameterized over
/// the subscription's `SourceId` and `Event` types, or the subscription's `Error`.
///
/// [`Subscription::resume`]: trait.Subscription.html#tymethod.resume
pub type SubscriptionStream<'a, S> = BoxStream<
    'a,
    Result<
        Persisted<<S as Subscription>::SourceId, <S as Subscription>::Event>,
        <S as Subscription>::Error,
    >,
>;
81
82/// A Subscription to an [`EventStream`] which can be "checkpointed":
83/// keeps a record of the latest message processed by itself using [`checkpoint`],
84/// and can resume working from such message by using the [`resume`].
85///
86/// [`EventStream`]: type.EventStream.html
87/// [`resume`]: trait.Subscription.html#method.resume
88/// [`checkpoint`]: trait.Subscription.html#method.checkpoint
89pub trait Subscription {
90 /// Type of the Source id, typically an [`AggregateId`].
91 ///
92 /// [`AggregateId`]: ../aggregate/type.AggregateId.html
93 type SourceId: Eq;
94
95 /// Event type stored in the [`EventStore`], typically an [`Aggregate::Event`].
96 ///
97 /// [`Aggregate::Event`]: ../aggregate/trait.Aggregate.html#associatedtype.Event
98 /// [`EventStore`]: ../store/trait.EventStore.html
99 type Event;
100
101 /// Possible errors returned when receiving events from the notification channel.
102 type Error;
103
104 /// Resumes the current state of a `Subscription` by returning the [`EventStream`],
105 /// starting from the last event processed by the `Subscription`.
106 ///
107 /// [`EventStream`]: type.EventStream.html
108 fn resume(&self) -> BoxFuture<Result<SubscriptionStream<Self>, Self::Error>>;
109
110 /// Saves the provided version (or sequence number) as the latest
111 /// version processed.
112 fn checkpoint(&self, version: u32) -> BoxFuture<Result<(), Self::Error>>;
113}
114
/// Error type returned by a [`Transient`] Subscription.
///
/// Both variants wrap the underlying failure in an `anyhow::Error`, which is
/// exposed as the error source and also included in the `Display` message.
///
/// [`Transient`]: struct.Transient.html
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Error caused by the Subscription's [`EventStore`].
    ///
    /// [`EventStore`]: ../store/trait.EventStore.html
    #[error("error received while listening to the event stream from the store: {0}")]
    Store(#[source] anyhow::Error),

    /// Error caused by the Subscription's [`EventSubscriber`].
    ///
    /// [`EventSubscriber`]: trait.EventSubscriber.html
    #[error("error received while listening to the event stream subscription: {0}")]
    Subscription(#[source] anyhow::Error),
}
132
/// In-memory [`Subscription`]: its checkpoint is kept in an atomic counter
/// owned by the value itself, so it is gone once the process using it
/// terminates.
///
/// Useful for in-memory or one-off [`Projection`]s.
///
/// [`Subscription`]: trait.Subscription.html
/// [`Projection`]: ../projection/trait.Projection.html
pub struct Transient<Store, Subscriber> {
    /// Store the one-off stream of already persisted events is read from.
    store: Store,
    /// Subscriber the stream of newly persisted events is obtained from.
    subscriber: Subscriber,
    /// Sequence number of the latest checkpoint; starts at zero.
    last_sequence_number: Arc<AtomicU32>,
}

impl<Store, Subscriber> Transient<Store, Subscriber> {
    /// Builds a new [`Subscription`] on top of the given [`EventStore`]
    /// and [`EventSubscriber`], which are used to assemble
    /// the [`SubscriptionStream`].
    ///
    /// The starting sequence number is zero; use [`from`] to change it.
    ///
    /// [`Subscription`]: trait.Subscription.html
    /// [`EventStore`]: ../store/trait.EventStore.html
    /// [`EventSubscriber`]: trait.EventSubscriber.html
    /// [`SubscriptionStream`]: type.SubscriptionStream.html
    /// [`from`]: struct.Transient.html#method.from
    pub fn new(store: Store, subscriber: Subscriber) -> Self {
        let last_sequence_number = Arc::new(AtomicU32::new(0));

        Transient {
            store,
            subscriber,
            last_sequence_number,
        }
    }

    /// Sets the sequence number of the `Event` the [`SubscriptionStream`]
    /// should start from when calling [`resume`], and hands the
    /// subscription back for further chaining.
    ///
    /// [`SubscriptionStream`]: type.SubscriptionStream.html
    /// [`resume`]: trait.Subscription.html#tymethod.resume
    pub fn from(self, sequence_number: u32) -> Self {
        let checkpoint = &self.last_sequence_number;
        checkpoint.store(sequence_number, Ordering::Relaxed);

        self
    }
}
174
175impl<Store, Subscriber> Subscription for Transient<Store, Subscriber>
176where
177 Store: EventStore + Send + Sync,
178 Subscriber: EventSubscriber<
179 SourceId = <Store as EventStore>::SourceId,
180 Event = <Store as EventStore>::Event,
181 > + Send
182 + Sync,
183 <Store as EventStore>::SourceId: Send + Sync,
184 <Store as EventStore>::Event: Send + Sync,
185 <Store as EventStore>::Error: StdError + Send + Sync + 'static,
186 <Subscriber as EventSubscriber>::Error: StdError + Send + Sync + 'static,
187{
188 type SourceId = Store::SourceId;
189 type Event = Store::Event;
190 type Error = Error;
191
192 fn resume(&self) -> BoxFuture<Result<SubscriptionStream<Self>, Self::Error>> {
193 Box::pin(async move {
194 // Create the Subscription first, so that once the future has been resolved
195 // we'll start receiving events right away.
196 //
197 // This is to avoid losing events when waiting for the one-off stream
198 // to resolve its future.
199 //
200 // The impact is that we _might_ get duplicated events from the one-off stream
201 // and the subscription stream. Luckily, we can discard those by
202 // keeping an internal state of the last processed sequence number,
203 // and discard all those events that are found.
204 let subscription = self
205 .subscriber
206 .subscribe_all()
207 .await
208 .map_err(anyhow::Error::from)
209 .map_err(Error::Store)?;
210
211 let one_off_stream = self
212 .store
213 .stream_all(Select::From(
214 self.last_sequence_number.load(Ordering::Relaxed),
215 ))
216 .await
217 .map_err(anyhow::Error::from)
218 .map_err(Error::Subscription)?;
219
220 let stream = one_off_stream
221 .map_err(anyhow::Error::from)
222 .map_err(Error::Store)
223 .chain(
224 subscription
225 .map_err(anyhow::Error::from)
226 .map_err(Error::Subscription),
227 )
228 .try_filter_map(move |event| async move {
229 let expected_sequence_number =
230 self.last_sequence_number.load(Ordering::Relaxed);
231
232 let event_sequence_number = event.sequence_number();
233
234 if event_sequence_number < expected_sequence_number {
235 #[cfg(feature = "with-tracing")]
236 tracing::trace!(
237 event.sequence_number = event_sequence_number,
238 subscription.sequence_number = expected_sequence_number,
239 "Duplicated event detected; skipping"
240 );
241
242 return Ok(None);
243 }
244
245 Ok(Some(event))
246 })
247 .boxed();
248
249 Ok(stream)
250 })
251 }
252
253 fn checkpoint(&self, version: u32) -> BoxFuture<Result<(), Self::Error>> {
254 // Checkpointing happens in memory on the atomic sequence number checkpoint.
255 self.last_sequence_number.store(version, Ordering::Relaxed);
256 ok(()).boxed()
257 }
258}