// eventful 0.2.1
//
// A library for event sourcing in Rust
// Documentation
use crate::event_sourced_system::EventEnvelope;
use crate::EventSourcedEntity;
use async_trait::async_trait;
use std::marker::PhantomData;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tracing::{debug, error, info};

/// Callbacks driven by [`ReadProjection::run`].
///
/// The projection loop pulls commands from the handler via
/// [`get_next_command`](Self::get_next_command) and feeds both those commands
/// and the envelopes received from the event stream back into the handler.
#[async_trait]
pub trait ReadProjectionMessageHandler<Entity, Command>
where
    Entity: EventSourcedEntity + Clone,
{
    /// Yields the next command to process, or `None` when there is none.
    ///
    /// `ReadProjection::run` awaits this as one arm of a `tokio::select!`
    /// alongside the event stream. If the event arm completes first, the
    /// future returned here is dropped and this method is called again on the
    /// next loop iteration, so an implementation should not lose a command
    /// when its future is dropped before completion. A `None` result only
    /// disables the command arm for the current iteration.
    async fn get_next_command(&mut self) -> Option<Command>;

    /// Processes one command previously returned by `get_next_command`.
    ///
    /// An `Err` is logged by `ReadProjection::run`; the loop keeps running.
    async fn handle_command(&mut self, command: Command) -> anyhow::Result<()>;

    /// Processes one envelope received from the event stream.
    ///
    /// An `Err` is logged by `ReadProjection::run`; the loop keeps running.
    async fn handle_event(&mut self, envelope: EventEnvelope<Entity>) -> anyhow::Result<()>;
}

/// A named consumer that forwards commands and event envelopes to an
/// `ItemHandler`.
///
/// Construct it with `new` and drive it with `run`, which consumes the
/// projection.
pub struct ReadProjection<Entity, Command, ItemHandler>
where
    Entity: EventSourcedEntity + Clone,
    ItemHandler: ReadProjectionMessageHandler<Entity, Command>,
{
    /// Label used in this projection's log messages.
    name: String,
    /// Broadcast receiver from which `run` reads event envelopes.
    event_stream: broadcast::Receiver<EventEnvelope<Entity>>,
    /// Handler that supplies commands and receives commands and events.
    item_handler: ItemHandler,
    /// Marker that ties the `Command` type parameter to the struct; no
    /// `Command` value is stored.
    pd: PhantomData<Command>,
}

/// Progress-log cadence: `run` emits an info-level progress line whenever a
/// received envelope's offset is a multiple of this value.
const LOG_COUNT_INTERVAL: u64 = 10;

impl<Entity, Command, ItemHandler> ReadProjection<Entity, Command, ItemHandler>
where
    ItemHandler: ReadProjectionMessageHandler<Entity, Command>,
    Entity: EventSourcedEntity + Clone,
    EventEnvelope<Entity>: Clone,
{
    /// Creates a projection that is ready to be driven with [`run`](Self::run).
    ///
    /// * `name` - label included in this projection's log messages.
    /// * `event_stream` - broadcast receiver the projection reads envelopes from.
    /// * `item_handler` - handler that supplies commands and receives both
    ///   commands and event envelopes.
    pub fn new<NAME>(
        name: NAME,
        event_stream: broadcast::Receiver<EventEnvelope<Entity>>,
        item_handler: ItemHandler,
    ) -> Self
    where
        NAME: Into<String>,
    {
        Self {
            name: name.into(),
            event_stream,
            item_handler,
            pd: PhantomData,
        }
    }

    /// Runs the projection until the event stream is closed.
    ///
    /// Each loop iteration waits on whichever completes first:
    ///
    /// * a command from the handler, which is passed to `handle_command`;
    /// * an item from the event stream, which is passed to `handle_event`.
    ///
    /// Handler errors are logged and do not stop the loop. A `Lagged` result
    /// from the stream is logged and the loop continues; the skipped events
    /// are not replayed here. A `Closed` result ends the loop and the method
    /// returns.
    ///
    /// Every log line carries the projection name so that output from several
    /// projections can be told apart.
    pub async fn run(mut self) {
        info!("Running read projection: {}", self.name);
        loop {
            tokio::select! {
                // A `None` from the handler does not match the pattern, which
                // disables this arm for the current iteration only.
                Some(command) = self.item_handler.get_next_command() => {
                    if let Err(e) = self.item_handler.handle_command(command).await {
                        error!(
                            "Read projection {} failed to handle command: {:?}",
                            self.name, e
                        );
                    }
                }
                event_result = self.event_stream.recv() => {
                    match event_result {
                        Ok(envelope) => {
                            debug!(
                                "Read projection {} got event at offset {}: {:?}",
                                self.name, envelope.offset, envelope.event
                            );
                            // Keyed on the stream offset rather than a local
                            // counter, and emitted before the event is handled.
                            if envelope.offset % LOG_COUNT_INTERVAL == 0 {
                                info!(
                                    "Read projection {} processed {} events",
                                    self.name, envelope.offset
                                );
                            }
                            if let Err(e) = self.item_handler.handle_event(envelope).await {
                                error!(
                                    "Read projection {} failed to handle event: {:?}",
                                    self.name, e
                                );
                            }
                        }
                        Err(RecvError::Lagged(lagged_by)) => {
                            error!(
                                "Read projection {} lagged by {} when reading stream",
                                self.name, lagged_by
                            );
                        }
                        Err(RecvError::Closed) => {
                            error!("Read projection {} reached end of stream", self.name);
                            break;
                        }
                    }
                }
            }
        }
    }
}