eventsourced/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3//! Event sourced entities.
4//!
5//! EventSourced is inspired to a large degree by the amazing
6//! [Akka Persistence](https://doc.akka.io/docs/akka/current/typed/index-persistence.html) library.
7//! It provides a framework for implementing
8//! [Event Sourcing](https://martinfowler.com/eaaDev/EventSourcing.html) and
9//! [CQRS](https://www.martinfowler.com/bliki/CQRS.html).
10//!
11//! The [EventSourced] trait defines the event type and handling for event sourced entities. These
12//! are identifiable by a type name and ID and can be created with the [EventSourcedExt::entity]
13//! extension method. Commands can be defined via the [Command] trait which contains a command
14//! handler function to either reject a command or return an event. An event gets persisted to the
15//! event log and then applied to the event handler to return the new state of the entity.
16//!
17//! ```text
18//!                  ┌───────┐   ┌ ─ ─ ─ Entity─ ─ ─ ─
19//!                  │Command│                        │
20//! ┌ ─ ─ ─ ─ ─ ─    └───────┘   │ ┌────────────────┐
21//!     Client   │────────────────▶│ handle_command │─┼─────────┐
22//! └ ─ ─ ─ ─ ─ ─                │ └────────────────┘           │
23//!        ▲                           │    │         │         │ ┌─────┐
24//!         ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─     │read               │ │Event│
25//!                   ┌─────┐               ▼         │         ▼ └─────┘
26//!                   │Reply│    │     ┌─────────┐       ┌ ─ ─ ─ ─ ─ ─
27//!                   │  /  │          │  State  │    │     EventLog  │
28//!                   │Error│    │     └─────────┘       └ ─ ─ ─ ─ ─ ─
29//!                   └─────┘               ▲         │         │ ┌─────┐
30//!                              │     write│                   │ │Event│
31//!                                         │         │         │ └─────┘
32//!                              │ ┌────────────────┐           │
33//!                                │  handle_event  │◀┼─────────┘
34//!                              │ └────────────────┘
35//!                                                   │
36//!                              └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
37//! ```
38//!
39//! The [EventLog] and [SnapshotStore] traits define a pluggable event log and a pluggable snapshot
40//! store respectively. For [NATS](https://nats.io/) and [Postgres](https://www.postgresql.org/)
41//! these are implemented in the respective crates.
42//!
43//! [EventSourcedEntity::spawn] puts the event sourced entity on the given event log and snapshot
44//! store, returning an [EntityRef] which can be cheaply cloned and used to pass commands to the
45//! entity. Conversion of events and snapshot state to and from bytes happens via the given
46//! [Binarize] implementation; for [prost](https://github.com/tokio-rs/prost) and [serde_json](https://github.com/serde-rs/json)
47//! these are already provided. Snapshots are taken after the configured number of processed events
48//! to speed up future spawning.
49//!
50//! [EntityRef::handle_command] either returns [Command::Error] for a rejected command or
51//! [Command::Reply] for an accepted one, wrapped in another `Result` dealing with technical errors.
52//!
53//! Events can be queried from the event log by ID or by entity type. These queries can be used to
54//! build read side projections. There is early support for projections in the
55//! `eventsourced-projection` crate.
56
57pub mod binarize;
58pub mod event_log;
59pub mod snapshot_store;
60
61mod util;
62
63use crate::{
64    binarize::Binarize,
65    event_log::EventLog,
66    snapshot_store::{Snapshot, SnapshotStore},
67    util::StreamExt as ThisStreamExt,
68};
69use error_ext::{BoxError, StdErrorExt};
70use futures::{future::ok, TryStreamExt};
71use serde::{Deserialize, Serialize};
72use std::{
73    any::Any,
74    fmt::Debug,
75    marker::PhantomData,
76    num::{NonZeroU64, NonZeroUsize},
77};
78use thiserror::Error;
79use tokio::{
80    sync::{mpsc, oneshot},
81    task,
82};
83use tracing::{debug, error, instrument};
84
85type BoxedCommand<E> = Box<dyn ErasedCommand<E> + Send>;
86type BoxedCommandEffect<E> = Result<
87    (
88        Option<<E as EventSourced>::Event>,
89        Box<dyn FnOnce(&E) -> BoxedAny + Send + Sync>,
90    ),
91    BoxedAny,
92>;
93type BoxedAny = Box<dyn Any + Send>;
94type BoxedMsg<E> = (BoxedCommand<E>, oneshot::Sender<Result<BoxedAny, BoxedAny>>);
95
/// The state of an [EventSourcedEntity] together with its event handling.
///
/// Implementations define the ID type, the event type and the type name of the entity as well as
/// the event handler, which turns the current state and an event into the new state.
pub trait EventSourced: Send + Sync + Sized + 'static {
    /// The type of the ID identifying an entity of this type.
    type Id: Debug + Clone + Send;

    /// The type of the events of this entity.
    type Event: Debug + Send + Sync + 'static;

    /// The name of this entity type, passed to the event log together with the ID.
    const TYPE_NAME: &'static str;

    /// The event handler: consumes the current state and the given event and returns the new
    /// state.
    fn handle_event(self, event: Self::Event) -> Self;
}
113
114/// A command for a [EventSourced] implementation, defining command handling and replying.
115pub trait Command<E>
116where
117    Self: Debug + Send + 'static,
118    E: EventSourced,
119{
120    /// The type for replies.
121    type Reply: Send + Sync + 'static;
122
123    /// The type for rejecting this command.
124    type Error: Send + 'static;
125
126    /// The command handler, taking this command, and references to the ID and the state of
127    /// the event sourced entity, either rejecting this command via [CommandEffect::reject] or
128    /// returning an event using [CommandEffect::emit_and_reply] (or [CommandEffect::emit] in
129    /// case `Reply = ()`).
130    fn handle_command(self, id: &E::Id, state: &E) -> CommandEffect<E, Self::Reply, Self::Error>;
131}
132
133/// The result of handling a command, either emitting an event and replying or rejecting the
134/// command.
135pub enum CommandEffect<E, Reply, Error>
136where
137    E: EventSourced,
138{
139    EmitAndReply(E::Event, Box<dyn FnOnce(&E) -> Reply + Send + Sync>),
140    Reply(Reply),
141    Reject(Error),
142}
143
144impl<E, Reply, Error> CommandEffect<E, Reply, Error>
145where
146    E: EventSourced,
147{
148    /// Emit the given event, persist it, and after applying it to the state, use the given function
149    /// to create a reply. The new state is passed to the function after applying the event.
150    pub fn emit_and_reply(
151        event: E::Event,
152        make_reply: impl FnOnce(&E) -> Reply + Send + Sync + 'static,
153    ) -> Self {
154        Self::EmitAndReply(event, Box::new(make_reply))
155    }
156
157    /// Reply with the given value without emitting any event.
158    pub fn reply(reply: Reply) -> Self {
159        Self::Reply(reply)
160    }
161
162    /// Reject this command with the given error.
163    pub fn reject(error: Error) -> Self {
164        Self::Reject(error)
165    }
166}
167
168impl<E, Error> CommandEffect<E, (), Error>
169where
170    E: EventSourced,
171{
172    /// Persist the given event (and don't give a reply for Commands with Reply = ()).
173    pub fn emit(event: E::Event) -> Self {
174        Self::emit_and_reply(event, |_| ())
175    }
176}
177
178/// A handle representing a spawned [EventSourcedEntity], which can be used to pass it commands.
179#[derive(Debug, Clone)]
180pub struct EntityRef<E>
181where
182    E: EventSourced,
183{
184    command_in: mpsc::Sender<BoxedMsg<E>>,
185    id: E::Id,
186    _e: PhantomData<E>,
187}
188
189impl<E> EntityRef<E>
190where
191    E: EventSourced,
192{
193    /// The ID of the represented [EventSourcedEntity].
194    pub fn id(&self) -> &E::Id {
195        &self.id
196    }
197
198    /// Pass the given command to the represented [EventSourcedEntity]. The returned value is a
199    /// nested result where the outer one represents technical errors, e.g. problems connecting to
200    /// the event log, and the inner one comes from the command handler, i.e. signals potential
201    /// command rejection.
202    #[instrument(skip(self))]
203    pub async fn handle_command<C>(
204        &self,
205        command: C,
206    ) -> Result<Result<C::Reply, C::Error>, HandleCommandError>
207    where
208        C: Command<E>,
209    {
210        let (result_in, result_out) = oneshot::channel();
211        self.command_in
212            .send((Box::new(command), result_in))
213            .await
214            .map_err(|_| HandleCommandError("cannot send command".to_string()))?;
215        let result = result_out
216            .await
217            .map_err(|_| HandleCommandError("cannot receive command handler result".to_string()))?;
218        let result = result
219            .map_err(|error| *error.downcast::<C::Error>().expect("downcast error"))
220            .map(|reply| *reply.downcast::<C::Reply>().expect("downcast reply"));
221        Ok(result)
222    }
223}
224
225/// Extension methods for [EventSourced] entities.
226pub trait EventSourcedExt
227where
228    Self: EventSourced,
229{
230    /// Create a new [EventSourcedEntity] for this [EventSourced] implementation.
231    fn entity(self) -> EventSourcedEntity<Self> {
232        EventSourcedEntity(self)
233    }
234}
235
236impl<E> EventSourcedExt for E where E: EventSourced {}
237
238/// An [EventSourcedEntity] which can be `spawn`ed.
239#[derive(Debug, Clone)]
240pub struct EventSourcedEntity<E>(E)
241where
242    E: EventSourced;
243
244impl<E> EventSourcedEntity<E>
245where
246    E: EventSourced,
247{
248    /// Spawn this [EventSourcedEntity] with the given ID, settings, event log, snapshot store and
249    /// `Binarize` functions.
250    #[instrument(skip(self, event_log, snapshot_store, binarize))]
251    pub async fn spawn<L, S, B>(
252        self,
253        id: E::Id,
254        snapshot_after: Option<NonZeroU64>,
255        command_buffer: NonZeroUsize,
256        mut event_log: L,
257        mut snapshot_store: S,
258        binarize: B,
259    ) -> Result<EntityRef<E>, SpawnError>
260    where
261        L: EventLog<Id = E::Id>,
262        S: SnapshotStore<Id = E::Id>,
263        B: Binarize<E::Event, E>,
264    {
265        // Restore snapshot.
266        let (snapshot_seq_no, state) = snapshot_store
267            .load::<E, _, _>(&id, |bytes| binarize.state_from_bytes(bytes))
268            .await
269            .map_err(|error| SpawnError::LoadSnapshot(error.into()))?
270            .map(|Snapshot { seq_no, state }| {
271                debug!(?id, seq_no, "restored snapshot");
272                (seq_no, state)
273            })
274            .unzip();
275        let mut state = state.unwrap_or(self.0);
276
277        // Get and validate last sequence number.
278        let mut last_seq_no = event_log
279            .last_seq_no(E::TYPE_NAME, &id)
280            .await
281            .map_err(|error| SpawnError::LastNonZeroU64(error.into()))?;
282        if last_seq_no < snapshot_seq_no {
283            return Err(SpawnError::InvalidLastSeqNo(last_seq_no, snapshot_seq_no));
284        };
285
286        // Replay latest events.
287        if snapshot_seq_no < last_seq_no {
288            let seq_no = snapshot_seq_no
289                .map(|n| n.saturating_add(1))
290                .unwrap_or(NonZeroU64::MIN);
291            let to_seq_no = last_seq_no.unwrap(); // This is safe because of the above relation!
292            debug!(?id, seq_no, to_seq_no, "replaying events");
293
294            let events = event_log
295                .events_by_id::<E::Event, _, _>(E::TYPE_NAME, &id, seq_no, move |bytes| {
296                    binarize.event_from_bytes(bytes)
297                })
298                .await
299                .map_err(|error| SpawnError::EventsById(error.into()))?;
300
301            state = events
302                .map_err(|error| SpawnError::NextEvent(error.into()))
303                .take_until_predicate(move |result| {
304                    result
305                        .as_ref()
306                        .ok()
307                        .map(|&(seq_no, _)| seq_no >= to_seq_no)
308                        .unwrap_or(true)
309                })
310                .try_fold(state, |state, (_, event)| ok(state.handle_event(event)))
311                .await?;
312
313            debug!(?id, "replayed events");
314        }
315
316        // Spawn handler loop.
317        let (command_in, mut command_out) = mpsc::channel::<BoxedMsg<E>>(command_buffer.get());
318        task::spawn({
319            let id = id.clone();
320            let mut event_count = 0u64;
321
322            async move {
323                while let Some((command, result_sender)) = command_out.recv().await {
324                    debug!(?id, ?command, "handling command");
325
326                    let result = command.handle_command(&id, &state);
327                    match result {
328                        Ok((Some(event), make_reply)) => {
329                            debug!(?id, ?event, "persisting event");
330
331                            match event_log
332                                .persist::<E::Event, _, _>(
333                                    E::TYPE_NAME,
334                                    &id,
335                                    last_seq_no,
336                                    &event,
337                                    &|event| binarize.event_to_bytes(event),
338                                )
339                                .await
340                            {
341                                Ok(seq_no) => {
342                                    debug!(?id, ?event, seq_no, "persited event");
343
344                                    last_seq_no = Some(seq_no);
345                                    state = state.handle_event(event);
346
347                                    event_count += 1;
348                                    if snapshot_after
349                                        .map(|a| event_count % a == 0)
350                                        .unwrap_or_default()
351                                    {
352                                        debug!(?id, seq_no, event_count, "saving snapshot");
353
354                                        if let Err(error) = snapshot_store
355                                            .save(&id, seq_no, &state, &|state| {
356                                                binarize.state_to_bytes(state)
357                                            })
358                                            .await
359                                        {
360                                            error!(
361                                                error = error.as_chain(),
362                                                ?id,
363                                                "cannot save snapshot"
364                                            );
365                                        };
366                                    }
367
368                                    let reply = make_reply(&state);
369                                    if result_sender.send(Ok(reply)).is_err() {
370                                        error!(?id, "cannot send command reply");
371                                    };
372                                }
373
374                                Err(error) => {
375                                    error!(error = error.as_chain(), ?id, "cannot persist event");
376                                    // This is fatal, we must terminate the entity!
377                                    break;
378                                }
379                            }
380                        }
381
382                        Ok((None, make_reply)) => {
383                            let reply = make_reply(&state);
384                            if result_sender.send(Ok(reply)).is_err() {
385                                error!(?id, "cannot send command reply");
386                            }
387                        }
388
389                        Err(error) => {
390                            if result_sender.send(Err(error)).is_err() {
391                                error!(?id, "cannot send command error");
392                            }
393                        }
394                    };
395                }
396
397                debug!(?id, "entity terminated");
398            }
399        });
400
401        Ok(EntityRef {
402            command_in,
403            id,
404            _e: PhantomData,
405        })
406    }
407}
408
409/// A technical error, signaling that a command cannot be sent from an [EntityRef] to its event
410/// sourced entity or the result cannot be received from its event sourced entity.
411#[derive(Debug, Error, Serialize, Deserialize)]
412#[error("{0}")]
413pub struct HandleCommandError(String);
414
415/// A technical error when spawning an [EventSourcedEntity].
416#[derive(Debug, Error)]
417pub enum SpawnError {
418    #[error("cannot load snapshot from snapshot store")]
419    LoadSnapshot(#[source] BoxError),
420
421    #[error("last sequence number {0:?} less than snapshot sequence number {0:?}")]
422    InvalidLastSeqNo(Option<NonZeroU64>, Option<NonZeroU64>),
423
424    #[error("cannot get last seqence number from event log")]
425    LastNonZeroU64(#[source] BoxError),
426
427    #[error("cannot get events by ID stream from event log")]
428    EventsById(#[source] BoxError),
429
430    #[error("cannot get next event from events by ID stream")]
431    NextEvent(#[source] BoxError),
432}
433
434trait ErasedCommand<E>
435where
436    Self: Debug,
437    E: EventSourced,
438{
439    fn handle_command(self: Box<Self>, id: &E::Id, state: &E) -> BoxedCommandEffect<E>;
440}
441
442impl<C, E, Reply, Error> ErasedCommand<E> for C
443where
444    C: Command<E, Reply = Reply, Error = Error>,
445    E: EventSourced,
446    Reply: Send + Sync + 'static,
447    Error: Send + 'static,
448{
449    fn handle_command(self: Box<Self>, id: &E::Id, state: &E) -> BoxedCommandEffect<E> {
450        match <C as Command<E>>::handle_command(*self, id, state) {
451            CommandEffect::EmitAndReply(event, make_reply) => {
452                Ok((Some(event), Box::new(|s| Box::new(make_reply(s)))))
453            }
454            CommandEffect::Reply(reply) => Ok((None, Box::new(|_s| Box::new(reply)))),
455            CommandEffect::Reject(error) => Err(Box::new(error) as BoxedAny),
456        }
457    }
458}
459
#[cfg(all(test, feature = "serde_json"))]
mod tests {
    use crate::{
        binarize::serde_json::*,
        event_log::{test::TestEventLog, EventLog},
        snapshot_store::{test::TestSnapshotStore, SnapshotStore},
        Command, CommandEffect, EntityRef, EventSourced, EventSourcedExt,
    };
    use assert_matches::assert_matches;
    use error_ext::BoxError;
    use serde::{Deserialize, Serialize};
    use tracing_test::traced_test;
    use uuid::Uuid;

    /// A simple counter used as event sourced entity in the test.
    #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
    pub struct Counter(u64);

    impl EventSourced for Counter {
        type Id = Uuid;
        type Event = CounterEvent;

        const TYPE_NAME: &'static str = "counter";

        fn handle_event(self, event: CounterEvent) -> Self {
            let Self(value) = self;
            let value = match event {
                CounterEvent::Increased(_, delta) => value + delta,
                CounterEvent::Decreased(_, delta) => value - delta,
            };
            Self(value)
        }
    }

    /// The events of the counter, each carrying the ID of the counter and the delta.
    #[derive(Debug, Serialize, Deserialize)]
    pub enum CounterEvent {
        Increased(Uuid, u64),
        Decreased(Uuid, u64),
    }

    /// Command to increase the counter by the given value.
    #[derive(Debug)]
    pub struct IncreaseCounter(pub u64);

    impl Command<Counter> for IncreaseCounter {
        type Error = Overflow;
        type Reply = u64;

        fn handle_command(
            self,
            id: &Uuid,
            state: &Counter,
        ) -> CommandEffect<Counter, u64, Overflow> {
            let Self(delta) = self;
            match state.0.checked_add(delta) {
                Some(_) => CommandEffect::emit_and_reply(
                    CounterEvent::Increased(*id, delta),
                    |counter: &Counter| counter.0,
                ),
                None => CommandEffect::reject(Overflow),
            }
        }
    }

    /// Rejection of [IncreaseCounter]: the counter would exceed `u64::MAX`.
    #[derive(Debug)]
    pub struct Overflow;

    /// Command to decrease the counter by the given value.
    #[derive(Debug)]
    pub struct DecreaseCounter(pub u64);

    impl Command<Counter> for DecreaseCounter {
        type Error = Underflow;
        type Reply = u64;

        fn handle_command(
            self,
            id: &Uuid,
            state: &Counter,
        ) -> CommandEffect<Counter, u64, Underflow> {
            if state.0 < self.0 {
                return CommandEffect::reject(Underflow);
            }

            // Adding and subtracting the value of this command is a no-op, merely verifying that
            // closing over this command is possible.
            CommandEffect::emit_and_reply(
                CounterEvent::Decreased(*id, self.0),
                move |counter: &Counter| counter.0 + self.0 - self.0,
            )
        }
    }

    /// Rejection of [DecreaseCounter]: the counter would drop below zero.
    #[derive(Debug, PartialEq, Eq)]
    pub struct Underflow;

    /// Command to read the current value of the counter without emitting an event.
    #[derive(Debug)]
    pub struct GetCounter;

    impl Command<Counter> for GetCounter {
        type Error = ();
        type Reply = u64;

        fn handle_command(self, _id: &Uuid, state: &Counter) -> CommandEffect<Counter, u64, ()> {
            let Counter(current) = *state;
            CommandEffect::reply(current)
        }
    }

    #[tokio::test]
    #[traced_test]
    async fn test() -> Result<(), BoxError> {
        let id = Uuid::from_u128(1);

        // Event log containing 42 events, each increasing the counter by one.
        let mut event_log = TestEventLog::default();
        for _ in 0..42 {
            let event = CounterEvent::Increased(id, 1);
            event_log
                .persist("counter", &id, None, &event, &to_bytes)
                .await?;
        }

        // Snapshot taken after the first 21 events.
        let mut snapshot_store = TestSnapshotStore::default();
        let snapshot = Counter(21);
        snapshot_store
            .save(&id, 21.try_into()?, &snapshot, &to_bytes)
            .await?;

        let counter: EntityRef<Counter> = Counter::default()
            .entity()
            .spawn(
                id,
                None,
                1.try_into()?,
                event_log,
                snapshot_store,
                SerdeJsonBinarize,
            )
            .await?;

        let increased = counter.handle_command(IncreaseCounter(1)).await?;
        assert_matches!(increased, Ok(43));

        let rejected = counter.handle_command(DecreaseCounter(100)).await?;
        assert_matches!(rejected, Err(Underflow));

        let decreased = counter.handle_command(DecreaseCounter(1)).await?;
        assert_matches!(decreased, Ok(42));

        let current = counter.handle_command(GetCounter).await?;
        assert_matches!(current, Ok(42));

        Ok(())
    }
}