event_store 0.1.1

Crate to deal with every aspect of an eventstore
    // clippy::cargo
//! # EventStore
//! The `EventStore` will allow you to deal with every aspects of the event sourcing part of Chekov.
//! ## Appending an event
//! Events are appended by using the fluent API exposed at the root level of the event_store crate:
//! ```rust
//! use event_store::prelude::*;
//! # use uuid::Uuid;
//! # use std::convert::TryFrom;
//! # use actix::Actor;
//! #
//! # #[derive(serde::Deserialize, serde::Serialize)]
//! # struct MyEvent{
//! #     account_id: Uuid
//! # }
//! # impl Event for MyEvent {
//! #     fn event_type(&self) -> &'static str { "MyEvent" }
//! #     fn all_event_types() -> Vec<&'static str> { vec!["MyEvent"] }
//! # }
//! # impl TryFrom<RecordedEvent> for MyEvent {
//! #     type Error = ();
//! #      fn try_from(e: RecordedEvent) -> Result<Self, ()> {
//! #        serde_json::from_value(e.data).map_err(|_| ())
//! #      }
//! # }
//! # #[actix::main]
//! # async fn reading() -> Result<(), Box<dyn std::error::Error>>{
//! # let mut event_store = EventStore::builder().storage(InMemoryBackend::default()).build().await.unwrap().start();
//! let stream_uuid = Uuid::new_v4().to_string();
//! let my_event = MyEvent { account_id: Uuid::new_v4() };
//! event_store::append()
//!   .event(&my_event)?
//!   .to(&stream_uuid)?
//!   .execute(event_store)
//!   .await;
//! # Ok(())
//! # }
//! ```
//! ## Reading from stream
//! A `Stream` can be read with the fluent API exposed at the root level of the event_store crate:
//! ```rust
//! use event_store::prelude::*;
//! # use uuid::Uuid;
//! # use actix::Actor;
//! #
//! # #[actix::main]
//! # async fn reading() -> Result<(), Box<dyn std::error::Error>>{
//! # let mut event_store = EventStore::builder().storage(InMemoryBackend::default()).build().await.unwrap().start();
//! let stream_uuid = Uuid::new_v4().to_string();
//! event_store::read()
//!   .stream(&stream_uuid)?
//!   .from(ReadVersion::Origin)
//!   .limit(10)
//!   .execute(event_store)
//!   .await;
//! # Ok(())
//! # }
//! ```

mod connection;
mod error;
mod event;
mod expected_version;
mod read_version;
mod storage;
mod stream;
mod subscriptions;

use actix::prelude::*;
use connection::{Append, Connection, CreateStream, Read, StreamInfo};
use error::EventStoreError;
pub use event::Event;
use event::{ParseEventError, RecordedEvent};
use expected_version::ExpectedVersion;
use tracing::{debug, info, instrument, trace, warn};

use read_version::ReadVersion;
use std::borrow::Cow;
use storage::{appender::Appender, reader::Reader, Storage};
use tracing_futures::Instrument;
use uuid::Uuid;

/// An `EventStore` that hold a storage connection
pub struct EventStore<S: Storage> {
    connection: Addr<Connection<S>>,

impl<S: Storage> std::default::Default for EventStore<S> {
    fn default() -> Self {

impl<S> ::actix::Supervised for EventStore<S> where S: 'static + Storage + std::marker::Unpin {}
impl<S> ::actix::Actor for EventStore<S>
    S: 'static + Storage + std::marker::Unpin,
    type Context = ::actix::Context<Self>;

impl<S: Storage> Handler<storage::reader::ReadStreamRequest> for EventStore<S> {
    type Result = actix::ResponseActFuture<Self, Result<Vec<RecordedEvent>, EventStoreError>>;

    #[tracing::instrument(name = "EventStore::ReadStreamRequest", skip(self, e, _ctx), fields(correlation_id = %e.correlation_id))]
    fn handle(
        &mut self,
        e: storage::reader::ReadStreamRequest,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
        let stream: String = e.stream.to_string();

        info!("Attempting to read {} stream event(s)", stream);

        let connection = self.connection.clone();

        let fut = async move {
            match connection
                .send(Read {
                    correlation_id: e.correlation_id,
                    #[cfg(feature = "verbose")]
                    stream: stream.clone(),
                    #[cfg(not(feature = "verbose"))]
                    version: e.version,
                    limit: e.limit,
                Ok(events) => {
                    #[cfg(feature = "verbose")]
                    info!("Read {} event(s) to {}", events.len(), stream);
                Err(e) => {
                    #[cfg(feature = "verbose")]
                    info!("Failed to read event(s) from {}", stream);


impl<S: Storage> Handler<StreamInfo> for EventStore<S> {
    type Result = actix::ResponseActFuture<
        Result<Cow<'static, crate::stream::Stream>, EventStoreError>,

    #[tracing::instrument(name = "EventStore::StreamInfo", skip(self, e, _ctx), fields(correlation_id = %e.correlation_id))]
    fn handle(&mut self, e: StreamInfo, _ctx: &mut Self::Context) -> Self::Result {
        debug!("Asking for stream {} infos", e.stream_uuid);

        let connection = self.connection.clone();
        let fut = async move { connection.send(e).await? }.into_actor(self);


impl<S: Storage> Handler<CreateStream> for EventStore<S> {
    type Result = actix::ResponseActFuture<
        Result<Cow<'static, crate::stream::Stream>, EventStoreError>,

    #[tracing::instrument(name = "EventStore::CreateStream", skip(self, e, _ctx), fields(correlation_id = %e.correlation_id))]
    fn handle(&mut self, e: CreateStream, _ctx: &mut Self::Context) -> Self::Result {
        debug!("Creating stream {}", e.stream_uuid);

        let connection = self.connection.clone();
        let fut = async move { connection.send(e).await? }.into_actor(self);


impl<S: Storage> Handler<storage::appender::AppendToStreamRequest> for EventStore<S> {
    type Result = actix::ResponseActFuture<Self, Result<Vec<Uuid>, EventStoreError>>;

    #[tracing::instrument(name = "EventStore::AppendToStream", skip(self, e, _ctx), fields(correlation_id = %e.correlation_id))]
    fn handle(
        &mut self,
        e: storage::appender::AppendToStreamRequest,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
        let stream: String = e.stream.to_string();

        #[cfg(feature = "verbose")]
        let events_number = e.events.len();
            "Attempting to append {} event(s) to {} with ExpectedVersion::{:?}",

        let connection = self.connection.clone();
        let fut = async move {
            match connection
                .send(Append {
                    correlation_id: e.correlation_id,
                    #[cfg(feature = "verbose")]
                    stream: stream.clone(),
                    #[cfg(not(feature = "verbose"))]
                    expected_version: e.expected_version,
                    events: e.events,
                Ok(events) => {
                    #[cfg(feature = "verbose")]
                    info!("Appended {} event(s) to {}", events.len(), stream);
                Err(e) => {
                    #[cfg(feature = "verbose")]
                    info!("Failed to append {} event(s) to {}", events_number, stream);


impl<S> EventStore<S>
    S: 'static + Storage + std::marker::Unpin,
    pub fn builder() -> EventStoreBuilder<S> {
        EventStoreBuilder { storage: None }

/// Create an `Appender` to append events
pub fn append() -> Appender {

/// Create a `Reader` to read a stream
pub fn read() -> Reader {

/// Builder use to simplify the `EventStore` creation
pub struct EventStoreBuilder<S: Storage> {
    storage: Option<S>,

impl<S> EventStoreBuilder<S>
    S: Storage,
    /// Define which storage will be used by this building `EventStore`
    pub fn storage(mut self, storage: S) -> Self {
        self.storage = Some(storage);


    /// Try to build the previously configured `EventStore`
    /// # Errors
    /// For now this method can fail only if you haven't define a `Storage`
    // #[instrument(level = "trace", name = "my_name", skip(self))]
    #[instrument(level = "trace", name = "EventStoreBuilder::build", skip(self))]
    pub async fn build(self) -> Result<EventStore<S>, ()> {
        if self.storage.is_none() {
            return Err(());

        trace!("Creating EventStore with {} storage", S::storage_name());
        Ok(EventStore {
            connection: Connection::make(self.storage.unwrap()).start(),

pub mod prelude {
    pub use crate::connection::StreamInfo;
    pub use crate::error::EventStoreError;
    pub use crate::event::{Event, RecordedEvent, RecordedEvents, UnsavedEvent};
    pub use crate::expected_version::ExpectedVersion;
    pub use crate::read_version::ReadVersion;
    pub use crate::storage::{
        appender::Appender, inmemory::InMemoryBackend, postgres::PostgresBackend, reader::Reader,
        Storage, StorageError,
    pub use crate::stream::Stream;
    pub use crate::EventStore;