evented 0.2.4

Event Sourcing in Rust on top of PostgreSQL
Documentation
# evented

[![Crates.io][crates-badge]][crates-url]
[![license][license-badge]][license-url]

[crates-badge]: https://img.shields.io/crates/v/evented
[crates-url]: https://crates.io/crates/evented
[license-badge]: https://img.shields.io/github/license/hseeberger/evented
[license-url]: https://github.com/hseeberger/evented/blob/main/LICENSE

evented is a library for [Event Sourcing](https://martinfowler.com/eaaDev/EventSourcing.html) where state changes are persisted as events. It originated from [eventsourced](https://github.com/hseeberger/eventsourced), but offers additional strong consistency features by tightly coupling to [PostgreSQL](https://www.postgresql.org/) as well as other features like event metadata.

The core abstraction of evented is an `EventSourcedEntity` which can be identified via an ID: an `Entity` implementation defines its state and event handling and associated `Command` implementations define its command handling.

When an event-sourced entity receives a command, the respective command handler is called, which either returns a sequence of to be persisted events plus metadata or a rejection. If events are returned, these are transactionally persisted, thereby also invoking an optional `EventListener`. Concurrency is handled by optimistic locking via per entity versions.

When creating an event-sourced entity, its events are loaded and its state is conctructed by applying them to its event handler. This state is then used by the command handlers to decide whether a command should be accepted – resulting in events to be persisted and applied – or rejected.

It is also possible to create asynchronous `Projection`s. These transactionally process events by populating their view(s) and storing the sequence number of the processed event. Currently only events by type projections are supported.

The following shows a simple example from the tests which uses an event listener to build a synchronous and consistent view. Actually this view probably need not be consistent, but real-world use cases for consistent views include uniqueness checks, e.g. email addresses for user entities.

```rust
#[derive(Debug, Default, PartialEq, Eq)]
pub struct Counter(u64);

impl Entity for Counter {
    type Id = Uuid;
    type Event = Event;
    type Metadata = Metadata;

    const TYPE_NAME: &'static str = "counter";

    fn handle_event(&mut self, event: Self::Event) {
        match event {
            Event::Increased { inc, .. } => self.0 += inc,
            Event::Decreased { dec, .. } => self.0 -= dec,
        }
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub enum Event {
    Increased { id: Uuid, inc: u64 },
    Decreased { id: Uuid, dec: u64 },
}

#[derive(Debug)]
pub struct Increase(u64);

impl Command for Increase {
    type Entity = Counter;
    type Rejection = Overflow;

    async fn handle(
        self,
        id: &<Self::Entity as Entity>::Id,
        entity: &Self::Entity,
    ) -> Result<
        Vec<
            impl Into<
                EventWithMetadata<
                    <Self::Entity as Entity>::Event,
                    <Self::Entity as Entity>::Metadata,
                >,
            >,
        >,
        Self::Rejection,
    > {
        let Increase(inc) = self;
        if entity.0 > u64::MAX - inc {
            Err(Overflow)
        } else {
            let increased = Event::Increased { id: *id, inc };
            let metadata = Metadata {
                timestamp: OffsetDateTime::now_utc(),
            };
            Ok(vec![increased.with_metadata(metadata)])
        }
    }
}

#[derive(Debug, PartialEq, Eq)]
pub struct Overflow;

#[derive(Debug)]
pub struct Decrease(u64);

impl Command for Decrease {
    type Entity = Counter;
    type Rejection = Underflow;

    async fn handle(
        self,
        id: &<Self::Entity as Entity>::Id,
        entity: &Self::Entity,
    ) -> Result<
        Vec<
            impl Into<
                EventWithMetadata<
                    <Self::Entity as Entity>::Event,
                    <Self::Entity as Entity>::Metadata,
                >,
            >,
        >,
        Self::Rejection,
    > {
        let Decrease(dec) = self;
        if entity.0 < dec {
            Err::<Vec<_>, Underflow>(Underflow)
        } else {
            let decreased = Event::Decreased { id: *id, dec };
            let metadata = Metadata {
                timestamp: OffsetDateTime::now_utc(),
            };
            Ok(vec![decreased.with_metadata(metadata)])
        }
    }
}

#[derive(Debug, PartialEq, Eq)]
pub struct Underflow;

#[derive(Debug, Serialize, Deserialize)]
pub struct Metadata {
    timestamp: OffsetDateTime,
}

struct Listener;

impl EventListener<Event> for Listener {
    async fn listen(
        &mut self,
        event: &Event,
        tx: &mut Transaction<'_, sqlx::Postgres>,
    ) -> Result<(), BoxError> {
        match event {
            Event::Increased { id, inc } => {
                let value = sqlx::query("SELECT value FROM counters WHERE id = $1")
                    .bind(id)
                    .fetch_optional(&mut **tx)
                    .await
                    .map_err(Box::new)?
                    .map(|row| row.try_get::<i64, _>(0))
                    .transpose()?;
                match value {
                    Some(value) => {
                        sqlx::query("UPDATE counters SET value = $1 WHERE id = $2")
                            .bind(value + *inc as i64)
                            .bind(id)
                            .execute(&mut **tx)
                            .await
                            .map_err(Box::new)?;
                    }

                    None => {
                        sqlx::query("INSERT INTO counters VALUES ($1, $2)")
                            .bind(id)
                            .bind(*inc as i64)
                            .execute(&mut **tx)
                            .await
                            .map_err(Box::new)?;
                    }
                }
                Ok(())
            }

            _ => Ok(()),
        }
    }
}
```

More examples can be found in the [examples](./examples) directory.

## License ##

This code is open source software licensed under the [Apache 2.0 License](http://www.apache.org/licenses/LICENSE-2.0.html).