chekov 0.1.1

CQRS/ES Framework
use crate::application::Application;
use crate::message::{ExecuteAppender, ExecuteReader, ExecuteStreamInfo};
pub use ::event_store::prelude::Event;
pub use ::event_store::prelude::RecordedEvent;
use actix::{Addr, Context, MailboxError, SystemService, WrapFuture};
use event_store::prelude::EventStoreError;
use event_store::prelude::Stream;
use uuid::Uuid;

pub use event_store::prelude::PostgresBackend;

pub(crate) struct EventStore<A: Application> {
    pub(crate) addr: Addr<event_store::EventStore<A::Storage>>,
}

impl<A> EventStore<A>
where
    A: Application,
{
    pub async fn with_appender(
        appender: event_store::prelude::Appender,
    ) -> Result<Result<Vec<Uuid>, EventStoreError>, MailboxError> {
        Self::from_registry().send(ExecuteAppender(appender)).await
    }
    pub async fn with_reader(
        reader: event_store::prelude::Reader,
    ) -> Result<Result<Vec<RecordedEvent>, EventStoreError>, MailboxError> {
        Self::from_registry().send(ExecuteReader(reader)).await
    }

    #[allow(dead_code)]
    pub async fn stream_info(
        stream_uuid: &str,
    ) -> Result<Result<event_store::prelude::Stream, EventStoreError>, MailboxError> {
        Self::from_registry()
            .send(ExecuteStreamInfo(stream_uuid.to_string()))
            .await
    }
}

impl<A> actix::Handler<ExecuteReader> for EventStore<A>
where
    A: Application,
{
    type Result = actix::ResponseActFuture<Self, Result<Vec<RecordedEvent>, EventStoreError>>;

    fn handle(&mut self, reader: ExecuteReader, _: &mut Self::Context) -> Self::Result {
        let addr = self.addr.clone();
        Box::pin(async move { reader.0.execute(addr).await }.into_actor(self))
    }
}

impl<A> actix::Handler<ExecuteAppender> for EventStore<A>
where
    A: Application,
{
    type Result = actix::ResponseActFuture<Self, Result<Vec<uuid::Uuid>, EventStoreError>>;

    fn handle(&mut self, appender: ExecuteAppender, _: &mut Self::Context) -> Self::Result {
        let addr = self.addr.clone();
        Box::pin(async move { appender.0.execute(addr).await }.into_actor(self))
    }
}

impl<A> actix::Handler<ExecuteStreamInfo> for EventStore<A>
where
    A: Application,
{
    type Result = actix::ResponseActFuture<Self, Result<Stream, EventStoreError>>;

    fn handle(&mut self, appender: ExecuteStreamInfo, _: &mut Self::Context) -> Self::Result {
        let addr = self.addr.clone();

        Box::pin(
            async move {
                match addr
                    .send(event_store::prelude::StreamInfo {
                        correlation_id: uuid::Uuid::new_v4(),
                        stream_uuid: appender.0,
                    })
                    .await
                {
                    Ok(Ok(stream)) => Ok(stream.into_owned()),
                    _ => Err(EventStoreError::Any),
                }
            }
            .into_actor(self),
        )
    }
}

impl<A> actix::Actor for EventStore<A>
where
    A: Application,
{
    type Context = Context<Self>;
}

impl<A> actix::SystemService for EventStore<A> where A: Application {}
impl<A> actix::Supervised for EventStore<A> where A: Application {}
impl<A> std::default::Default for EventStore<A>
where
    A: Application,
{
    fn default() -> Self {
        unimplemented!()
    }
}