eventually_redis/
lib.rs

1//! Redis backend implementation for [`eventually` crate](https://crates.io/crates/eventually).
2//!
3//! ## Event Store
4//!
5//! `eventually-redis` supports the [`eventually::EventStore`] trait through
6//! the [`EventStore`] type.
7//!
8//! ## Event Subscriber
9//!
10//! `eventually-redis` supports the [`eventually::EventSubscriber`] trait
11//! through the [`EventSubscriber`] type.
12//!
//! [`eventually::EventStore`]: ../eventually/trait.EventStore.html
//! [`eventually::EventSubscriber`]: ../eventually/trait.EventSubscriber.html
//! [`EventStore`]: struct.EventStore.html
//! [`EventSubscriber`]: struct.EventSubscriber.html
16
17use std::convert::TryFrom;
18use std::fmt::{Debug, Display};
19
20use eventually_core::store::{
21    AppendError, EventStream as StoreEventStream, Expected, Persisted, Select,
22};
23use eventually_core::subscription::EventStream as SubscriberEventStream;
24
25use futures::future::BoxFuture;
26use futures::stream::{Stream, StreamExt, TryStreamExt};
27
28use lazy_static::lazy_static;
29
30use redis::streams::{StreamId, StreamRangeReply};
31use redis::{AsyncCommands, RedisError, RedisResult};
32
33use serde::{Deserialize, Serialize};
34
/// Default number of entries requested per page from Redis
/// (`XRANGE .. COUNT n`) by the [`EventStore::stream`] and
/// [`EventStore::stream_all`] operations.
///
/// This value is used whenever no explicit page size has been set through
/// the [`EventStoreBuilder::stream_page_size`] option.
///
/// [`EventStore::stream`]: struct.EventStore.html#tymethod.stream
/// [`EventStore::stream_all`]: struct.EventStore.html#tymethod.stream_all
/// [`EventStoreBuilder::stream_page_size`]: struct.EventStoreBuilder.html#method.stream_page_size
pub const STREAM_PAGE_DEFAULT: usize = 128;
45
// Lua source of the append script, embedded into the binary at compile time
// from the `append_to_store.lua` file that sits next to this source file.
static APPEND_TO_STORE_SOURCE: &str = std::include_str!("append_to_store.lua");

lazy_static! {
    // Script handle built from the embedded Lua source; `lazy_static!`
    // initializes it on first access, and `EventStore::append` invokes it.
    static ref APPEND_TO_STORE_SCRIPT: redis::Script = redis::Script::new(APPEND_TO_STORE_SOURCE);
}
51
52/// Builder type for [`EventStore`] and [`EventSubscriber`] types.
53///
54/// The same builder instance can be used to build multiple instances of [`EventStore`]
55/// and [`EventSubscriber`].
56///
57/// [`EventStore`]: struct.EventStore.html
58/// [`EventSubscriber`]: struct.EventSubscriber.html
59#[derive(Clone)]
60pub struct EventStoreBuilder {
61    client: redis::Client,
62    stream_page_size: Option<usize>,
63}
64
65impl EventStoreBuilder {
66    /// Creates a new builder instance using the specified Redis client.
67    pub fn new(client: redis::Client) -> Self {
68        Self {
69            client,
70            stream_page_size: None,
71        }
72    }
73
74    /// Changes the page size used by the [`Stream`] returned in [`EventStore::stream`]
75    /// and [`EventStore::stream_all`].
76    ///
77    /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
78    /// [`EventStore::stream`]: struct.EventStore.html#tymethod.stream
79    /// [`EventStore::stream_all`]: struct.EventStore.html#tymethod.stream_all
80    pub fn stream_page_size(mut self, size: usize) -> Self {
81        self.stream_page_size = Some(size);
82        self
83    }
84
85    /// Builds a new [`EventStore`] instance.
86    ///
87    /// This method returns an `std::future::Future` completing after a
88    /// connection with Redis is successfully established.
89    ///
90    /// [`EventStore`]: struct.EventStore.html
91    pub async fn build_store<Id, Event>(
92        &self,
93        stream_name: &'static str,
94    ) -> RedisResult<EventStore<Id, Event>> {
95        Ok(EventStore {
96            stream_name,
97            conn: self.client.get_multiplexed_async_connection().await?,
98            stream_page_size: self.stream_page_size.unwrap_or(STREAM_PAGE_DEFAULT),
99            id: std::marker::PhantomData,
100            event: std::marker::PhantomData,
101        })
102    }
103
104    /// Builds a new [`EventSubscriber`] instance.
105    ///
106    /// [`EventSubscriber`]: struct.EventSubscriber.html
107    pub fn build_subscriber<Id, Event>(
108        &self,
109        stream_name: &'static str,
110    ) -> EventSubscriber<Id, Event> {
111        EventSubscriber {
112            stream_name,
113            client: self.client.clone(),
114            id: std::marker::PhantomData,
115            event: std::marker::PhantomData,
116        }
117    }
118}
119
/// Result type whose error is the crate's [`StoreError`] type.
///
/// [`StoreError`]: enum.StoreError.html
pub type StoreResult<T> = Result<T, StoreError>;
124
/// Error types returned by the [`eventually::EventStore`] implementation
/// on the [`EventStore`] type.
///
/// [`eventually::EventStore`]: ../eventually/trait.EventStore.html
/// [`EventStore`]: struct.EventStore.html
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Error returned when encoding events to JSON fails during [`append`].
    ///
    /// [`append`]: struct.EventStore.html#tymethod.append
    #[error("failed to encode events: {0}")]
    EncodeEvents(#[source] serde_json::Error),

    /// Error returned when decoding events from JSON fails
    /// during either [`stream`] or [`stream_all`].
    ///
    /// [`stream`]: struct.EventStore.html#tymethod.stream
    /// [`stream_all`]: struct.EventStore.html#tymethod.stream_all
    #[error("failed to decode events: {0}")]
    DecodeEvents(#[source] serde_json::Error),

    /// Error returned when reading the stream coming from `XRANGE .. COUNT n`
    /// fails during either [`stream`] or [`stream_all`].
    ///
    /// [`stream`]: struct.EventStore.html#tymethod.stream
    /// [`stream_all`]: struct.EventStore.html#tymethod.stream_all
    #[error("failed while reading stream from Redis: {0}")]
    Stream(#[source] RedisError),

    /// Error returned when a Redis stream entry does not contain the
    /// expected key; the payload is the name of the missing key.
    #[error("no key from Redis result: `{0}`")]
    NoKey(&'static str),

    /// Error returned when the source id of a Redis stream entry
    /// cannot be decoded.
    #[error("failed to decode source_id from Redis entry: {0}")]
    DecodeSourceId(#[source] anyhow::Error),
}

impl AppendError for StoreError {
    /// Always reports `false`: none of the [`StoreError`] variants
    /// represents a version conflict.
    ///
    /// [`StoreError`]: enum.StoreError.html
    #[inline]
    fn is_conflict_error(&self) -> bool {
        false
    }
}
171
172/// Redis backend implementation for [`eventually::EventStore`] trait.
173///
174/// [`eventually::EventStore`]: ../eventually/trait.EventStore.html
175#[derive(Clone)]
176pub struct EventStore<Id, Event> {
177    stream_name: &'static str,
178    conn: redis::aio::MultiplexedConnection,
179    stream_page_size: usize,
180    id: std::marker::PhantomData<Id>,
181    event: std::marker::PhantomData<Event>,
182}
183
184impl<Id, Event> eventually_core::store::EventStore for EventStore<Id, Event>
185where
186    Id: TryFrom<String> + Display + Eq + Clone + Send + Sync,
187    <Id as TryFrom<String>>::Error: std::error::Error + Send + Sync + 'static,
188    Event: Serialize + Send + Sync,
189    for<'de> Event: Deserialize<'de>,
190{
191    type SourceId = Id;
192    type Event = Event;
193    type Error = StoreError;
194
195    fn append(
196        &mut self,
197        id: Self::SourceId,
198        version: Expected,
199        events: Vec<Self::Event>,
200    ) -> BoxFuture<StoreResult<u32>> {
201        let fut = async move {
202            let events = events
203                .iter()
204                .map(serde_json::to_string)
205                .collect::<Result<Vec<_>, _>>()
206                .map_err(StoreError::EncodeEvents)?;
207
208            Ok(APPEND_TO_STORE_SCRIPT
209                .key(self.stream_name)
210                .key(id.to_string())
211                .arg(match version {
212                    Expected::Any => -1,
213                    Expected::Exact(v) => v as i64,
214                })
215                .arg(events)
216                .invoke_async(&mut self.conn)
217                .await
218                .unwrap())
219        };
220
221        Box::pin(fut)
222    }
223
224    fn stream(
225        &self,
226        id: Self::SourceId,
227        select: Select,
228    ) -> BoxFuture<StoreResult<StoreEventStream<Self>>> {
229        let fut = async move {
230            let stream_name = format!("{}.{}", self.stream_name, id.to_string());
231
232            let paginator = RedisPaginatedStream {
233                conn: self.conn.clone(),
234                stream_name,
235                page_size: self.stream_page_size,
236                from: match select {
237                    Select::All => 0,
238                    Select::From(v) => v as usize,
239                },
240            };
241
242            Ok(paginator
243                .into_stream()
244                .map_err(StoreError::Stream)
245                .map(move |res| res.map(|v| (id.clone(), v)))
246                .and_then(move |(id, entry)| async move {
247                    let event: Vec<u8> = entry
248                        .get("event")
249                        .ok_or_else(|| StoreError::NoKey("event"))?;
250                    let event: Event =
251                        serde_json::from_slice(&event).map_err(StoreError::DecodeEvents)?;
252
253                    let (version, sequence_number) = parse_entry_id(&entry.id);
254
255                    Ok(Persisted::from(id, event)
256                        .sequence_number(sequence_number as u32)
257                        .version(version as u32))
258                })
259                .boxed())
260        };
261
262        Box::pin(fut)
263    }
264
265    fn stream_all(&self, select: Select) -> BoxFuture<StoreResult<StoreEventStream<Self>>> {
266        let fut = async move {
267            let paginator = RedisPaginatedStream {
268                conn: self.conn.clone(),
269                stream_name: self.stream_name.to_owned(),
270                page_size: self.stream_page_size,
271                from: match select {
272                    Select::All => 0,
273                    Select::From(v) => v as usize,
274                },
275            };
276
277            Ok(paginator
278                .into_stream()
279                .map_err(StoreError::Stream)
280                .and_then(|entry| async move {
281                    let source_id: String = entry
282                        .get("source_id")
283                        .ok_or_else(|| StoreError::NoKey("source_id"))?;
284
285                    let source_id: Id = Id::try_from(source_id)
286                        .map_err(anyhow::Error::from)
287                        .map_err(StoreError::DecodeSourceId)?;
288
289                    let event: Vec<u8> = entry
290                        .get("event")
291                        .ok_or_else(|| StoreError::NoKey("event"))?;
292                    let event: Event =
293                        serde_json::from_slice(&event).map_err(StoreError::DecodeEvents)?;
294
295                    let (sequence_number, version) = parse_entry_id(&entry.id);
296
297                    Ok(Persisted::from(source_id, event)
298                        .sequence_number(sequence_number as u32)
299                        .version(version as u32))
300                })
301                .boxed())
302        };
303
304        Box::pin(fut)
305    }
306
307    fn remove(&mut self, _id: Self::SourceId) -> BoxFuture<StoreResult<()>> {
308        unimplemented!()
309    }
310}
311
/// Result type whose error is the crate's [`SubscriberError`] type.
///
/// [`SubscriberError`]: enum.SubscriberError.html
pub type SubscriberResult<T> = Result<T, SubscriberError>;
316
317/// Error types returned by the [`eventually::EventSubscriber`] implementation
318/// on the [`EventSubscriber`] type.
319///
320/// [`eventually::EventSubscriber`]: ../eventually/trait.EventSubscriber.html
321/// [`EventSubscriber`]: struct.EventSubscriber.html
322#[derive(Debug, thiserror::Error)]
323pub enum SubscriberError {
324    /// Error returned when failed to establish a [`PubSub`] connection
325    /// with Redis.
326    ///
327    /// [`PubSub`]: https://docs.rs/redis/0.17.0/redis/aio/struct.PubSub.html
328    #[error("failed to establish connection with Redis: {0}")]
329    Connection(#[source] RedisError),
330
331    /// Error returned when failed to get the payload from a `SUBSCRIBE` event.
332    #[error("failed to get payload from message: {0}")]
333    Payload(#[source] RedisError),
334
335    /// Error returned when failed to decode the payload received
336    /// from JSON.
337    #[error("failed to decode published message: {0}")]
338    DecodeMessage(#[source] serde_json::Error),
339
340    /// Error returned when failed to execute the `SUBSCRIBE` command
341    /// to receive notification on the stream topic.
342    #[error("failed to subscriber to stream events: {0}")]
343    Subscribe(#[source] RedisError),
344
345    /// Error returned when attempting to decode the source id from the
346    /// notification payload.
347    #[error("failed to decode source_id from published message: {0}")]
348    DecodeSourceId(#[source] anyhow::Error),
349}
350
351/// Redis backend implementation for [`eventually::EventSubscriber`] trait.
352///
353/// [`eventually::EventSubscriber`]: ../eventually/trait.EventSubscriber.html
354pub struct EventSubscriber<Id, Event> {
355    stream_name: &'static str,
356    client: redis::Client,
357    id: std::marker::PhantomData<Id>,
358    event: std::marker::PhantomData<Event>,
359}
360
361impl<Id, Event> eventually_core::subscription::EventSubscriber for EventSubscriber<Id, Event>
362where
363    Id: TryFrom<String> + Eq + Send + Sync,
364    <Id as TryFrom<String>>::Error: std::error::Error + Send + Sync + 'static,
365    Event: Send + Sync,
366    for<'de> Event: Deserialize<'de>,
367{
368    type SourceId = Id;
369    type Event = Event;
370    type Error = SubscriberError;
371
372    fn subscribe_all(&self) -> BoxFuture<SubscriberResult<SubscriberEventStream<Self>>> {
373        #[derive(Deserialize)]
374        struct SubscribeMessage<Event> {
375            source_id: String,
376            sequence_number: u32,
377            version: u32,
378            event: Event,
379        }
380
381        let fut = async move {
382            let mut pubsub = self
383                .client
384                .get_async_connection()
385                .await
386                .map_err(SubscriberError::Connection)?
387                .into_pubsub();
388
389            pubsub
390                .subscribe(self.stream_name)
391                .await
392                .map_err(SubscriberError::Subscribe)?;
393
394            Ok(pubsub
395                .into_on_message()
396                .map(|msg| msg.get_payload::<Vec<u8>>())
397                .map_err(SubscriberError::Payload)
398                .and_then(|payload| async move {
399                    let msg: SubscribeMessage<Event> =
400                        serde_json::from_slice(&payload).map_err(SubscriberError::DecodeMessage)?;
401
402                    let source_id = Id::try_from(msg.source_id)
403                        .map_err(anyhow::Error::from)
404                        .map_err(SubscriberError::DecodeSourceId)?;
405
406                    Ok(Persisted::from(source_id, msg.event)
407                        .sequence_number(msg.sequence_number)
408                        .version(msg.version))
409                })
410                .boxed())
411        };
412
413        Box::pin(fut)
414    }
415}
416
417/// [`futures::Stream`] implementation for Redis `XRANGE` operations.
418///
419/// [`futures::Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
420struct RedisPaginatedStream {
421    conn: redis::aio::MultiplexedConnection,
422    stream_name: String,
423    page_size: usize,
424    from: usize,
425}
426
427impl RedisPaginatedStream {
428    /// Returns a [`futures::Stream`] instance out of the paginated stream instance.
429    ///
430    /// This implementation will fetch all requested entries from a Redis Stream
431    /// using pagination.
432    ///
433    /// Each page is as big as `page_size`; for each page requested,
434    /// all the entries in [`StreamRangeReply`] are yielded in the stream,
435    /// until the entries are fully exhausted.
436    ///
437    /// The stream stop when all entries in the Redis Stream have been returned.
438    ///
439    /// [`futures::Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
440    /// [`StreamRangeReply`]: https://docs.rs/redis/0.17.0/redis/streams/struct.StreamRangeReply.html
441    fn into_stream(mut self) -> impl Stream<Item = RedisResult<StreamId>> + 'static {
442        async_stream::try_stream! {
443            let mut from = self.from;
444
445            loop {
446                let result: StreamRangeReply = self.conn.xrange_count(&self.stream_name, from, "+", self.page_size).await?;
447                let ids = result.ids;
448                let size = ids.len();
449
450                for id in ids {
451                    from = parse_entry_id(&id.id).0 + 1;
452                    yield id;
453                }
454
455                if size < self.page_size {
456                    break;
457                }
458            }
459        }
460    }
461}
462
/// Splits a Redis stream entry id of the form `<first>-<second>` into its two
/// numeric components.
///
/// Anything following a second `-` is ignored.
///
/// # Panics
///
/// Panics if `id` has fewer than two `-`-separated components, or if either
/// of the first two components is not a valid `usize`.
fn parse_entry_id(id: &str) -> (usize, usize) {
    // Walk the components lazily instead of collecting them into a `Vec`.
    let mut parts = id.split('-');

    let mut next_component = || -> usize {
        parts
            .next()
            .unwrap_or_else(|| panic!("malformed Redis entry id `{}`: expected `<n>-<m>`", id))
            .parse()
            .unwrap_or_else(|err| panic!("malformed Redis entry id `{}`: {}", id, err))
    };

    let first = next_component();
    let second = next_component();

    (first, second)
}