use crate::event_sourced_system::EventEnvelope;
use crate::EventSourcedEntity;
use async_trait::async_trait;
use std::marker::PhantomData;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tracing::{debug, error, info};
/// Callbacks driven by [`ReadProjection::run`]: a source of commands plus
/// handlers for those commands and for events received from the event stream.
///
/// `Entity` is the event-sourced entity whose events are delivered wrapped in an
/// [`EventEnvelope`]; `Command` is the implementor-defined command type.
#[async_trait]
pub trait ReadProjectionMessageHandler<Entity, Command>
where
Entity: EventSourcedEntity + Clone,
{
/// Yields the next command to handle, or `None` when there is nothing to hand out.
///
/// [`ReadProjection::run`] awaits this inside `tokio::select!` next to the
/// event stream and calls it again on every loop iteration.
/// NOTE(review): the pending future is presumably dropped whenever the event
/// branch wins the select, so implementations should be cancel-safe - confirm.
async fn get_next_command(&mut self) -> Option<Command>;
/// Handles one command obtained from [`Self::get_next_command`].
///
/// # Errors
/// A returned error is logged by [`ReadProjection::run`]; the loop keeps running.
async fn handle_command(&mut self, command: Command) -> anyhow::Result<()>;
/// Handles one event envelope received from the projection's event stream.
///
/// # Errors
/// A returned error is logged by [`ReadProjection::run`]; the loop keeps running.
async fn handle_event(&mut self, envelope: EventEnvelope<Entity>) -> anyhow::Result<()>;
}
/// A named consumer that feeds events from a broadcast channel, and commands
/// supplied by its handler, into a [`ReadProjectionMessageHandler`].
///
/// Create it with [`ReadProjection::new`] and drive it with [`ReadProjection::run`].
pub struct ReadProjection<Entity, Command, ItemHandler>
where
Entity: EventSourcedEntity + Clone,
ItemHandler: ReadProjectionMessageHandler<Entity, Command>,
{
// Human-readable name; only used in log output.
name: String,
// Receiving half of the broadcast channel carrying the event envelopes.
event_stream: broadcast::Receiver<EventEnvelope<Entity>>,
// Supplies commands and processes both commands and events.
item_handler: ItemHandler,
// `Command` appears in no other field; this marker keeps the type parameter in use.
pd: PhantomData<Command>,
}
/// Progress-logging interval: `ReadProjection::run` emits an info-level log line
/// for every envelope whose `offset` is a multiple of this value.
const LOG_COUNT_INTERVAL: u64 = 10;
/// Construction and the main processing loop of [`ReadProjection`].
impl<Entity, Command, ItemHandler> ReadProjection<Entity, Command, ItemHandler>
where
    Entity: EventSourcedEntity + Clone,
    EventEnvelope<Entity>: Clone,
    ItemHandler: ReadProjectionMessageHandler<Entity, Command>,
{
    /// Creates a projection called `name` that reads envelopes from
    /// `event_stream` and dispatches them, together with any commands the
    /// handler produces, to `item_handler`.
    ///
    /// `name` is converted into an owned `String` and is only used for logging.
    pub fn new<NAME>(
        name: NAME,
        event_stream: broadcast::Receiver<EventEnvelope<Entity>>,
        item_handler: ItemHandler,
    ) -> Self
    where
        NAME: Into<String>,
    {
        let name = name.into();
        Self {
            name,
            event_stream,
            item_handler,
            pd: PhantomData,
        }
    }

    /// Consumes the projection and processes commands and events until the
    /// event stream is closed.
    ///
    /// Each iteration waits on whichever is ready first: the next command from
    /// the handler or the next envelope from the broadcast receiver.
    ///
    /// * A command is passed to `handle_command`; a failure is logged and the
    ///   loop continues.
    /// * `RecvError::Lagged` is logged and the loop continues.
    /// * `RecvError::Closed` is logged and ends the loop, so this function returns.
    /// * An envelope is passed to `handle_event`; a failure is logged and the
    ///   loop continues.
    ///
    /// Handler errors are never propagated to the caller.
    pub async fn run(mut self) {
        info!("Running read projection: {}", self.name);

        loop {
            tokio::select! {
                // The `Some(..)` pattern means a `None` from the handler never
                // reaches this arm; only real commands are dispatched.
                Some(cmd) = self.item_handler.get_next_command() => {
                    if let Err(err) = self.item_handler.handle_command(cmd).await {
                        error!("Error handling command: {:?}", err);
                    }
                }
                received = self.event_stream.recv() => {
                    // Resolve the receive result first so the happy path below
                    // works on a plain envelope.
                    let envelope = match received {
                        Ok(envelope) => envelope,
                        Err(RecvError::Lagged(skipped)) => {
                            error!("Lagged by {} when reading stream", skipped);
                            continue;
                        }
                        Err(RecvError::Closed) => {
                            error!("Reached end of stream");
                            break;
                        }
                    };

                    debug!("Got event in read projection {} - {:?}", envelope.offset, envelope.event);

                    // Periodic progress line; note it is emitted before the
                    // envelope is handed to the handler.
                    let at_log_boundary = envelope.offset % LOG_COUNT_INTERVAL == 0;
                    if at_log_boundary {
                        info!("Read projection {} processed {} events", self.name, envelope.offset);
                    }

                    let outcome = self.item_handler.handle_event(envelope).await;
                    if let Err(err) = outcome {
                        error!("Error handling event: {:?}", err);
                    }
                }
            }
        }
    }
}