use crate::patterns::ask::ReplyTo;
use crate::{DefaultWithPersistenceId, EventSourcedEntity, IsFullPopulationCommand, NamedEntity};
use async_trait::async_trait;
use std::fmt::{Debug, Display};
use tokio::sync::broadcast;
/// Capability of an event-sourced entity type to load its recorded events
/// through a persistence context of type `EntityPersistenceStoreContext`.
#[async_trait]
pub trait ReadEventsFromPersistenceStore<EntityPersistenceStoreContext>:
    EventSourcedEntity + Sized
{
    /// Reads the events stored for `persistence_id`.
    ///
    /// `EventSourcedSystem::load_entity` treats an empty vector as "no such
    /// entity in the store" and otherwise applies the events, in the returned
    /// order, to a default entity.
    ///
    /// # Errors
    ///
    /// Whatever the implementation reports when the read fails.
    async fn read_events_from_persistence_store(
        persistence_context: &EntityPersistenceStoreContext,
        persistence_id: &Self::PersistenceId,
    ) -> anyhow::Result<Vec<Self::Event>>;
}
/// Capability of an event-sourced entity to record new events through a
/// persistence context of type `EntityPersistenceStoreContext`.
#[async_trait]
pub trait WriteEventsToPersistenceStore<EntityPersistenceStoreContext>:
    EventSourcedEntity + Sized
{
    /// Writes `events` for this entity to the persistence store.
    ///
    /// `EventSourcedSystem` calls this once per handled command, with every
    /// event that command produced, before any of those events is applied to
    /// the entity.
    ///
    /// # Errors
    ///
    /// Whatever the implementation reports when the write fails.
    async fn write_events_to_persistence_store(
        &self,
        persistence_context: &EntityPersistenceStoreContext,
        events: &Vec<Self::Event>,
    ) -> anyhow::Result<()>;
}
/// Capability of an event-sourced entity to be moved out of, and back into,
/// a cache of type `EntityCacheContext`.
pub trait TakeFromAndWriteToCache<EntityCacheContext>: EventSourcedEntity + Sized {
    /// Hands over ownership of the cached entity for `persistence_id`, or
    /// returns `None` when the cache has no such entity.
    fn take_from_cache(
        cache_context: &mut EntityCacheContext,
        persistence_id: &Self::PersistenceId,
    ) -> Option<Self>;

    /// Moves this entity into the cache.
    fn write_to_cache(self, cache_context: &mut EntityCacheContext);
}
/// Capability of an event-sourced entity type to publish its events on a
/// stream reached through `EventStreamContext`.
#[async_trait]
pub trait PublishEventToStream<EventStreamContext>: EventSourcedEntity + Clone {
    /// Publishes one event, wrapped in its [`EventEnvelope`], to the stream.
    ///
    /// # Errors
    ///
    /// Whatever the implementation reports when publishing fails.
    async fn publish_to_event_stream(
        stream_context: &mut EventStreamContext,
        event_envelope: EventEnvelope<Self>,
    ) -> anyhow::Result<()>;
}
/// Capability of an event-sourced entity type to hand out receivers for its
/// event stream, reached through `EventStreamContext`.
#[async_trait]
pub trait SubscribeToEventStream<EventStreamContext>: EventSourcedEntity + Clone + Sized {
    /// Returns a broadcast receiver yielding [`EventEnvelope`]s of this entity type.
    ///
    /// # Errors
    ///
    /// Whatever the implementation reports when subscribing fails.
    async fn subscribe_to_event_stream(
        stream_context: &mut EventStreamContext,
    ) -> anyhow::Result<broadcast::Receiver<EventEnvelope<Self>>>;
}
/// A single event of `Entity` together with the metadata attached when it is
/// published to the event stream.
#[derive(Clone, Debug)]
pub struct EventEnvelope<Entity: EventSourcedEntity + ?Sized> {
    /// Value of the publishing system's offset counter for this event.
    pub offset: u64,
    /// Identifier of the entity that produced the event.
    pub persistence_id: Entity::PersistenceId,
    /// The event itself.
    pub event: Entity::Event,
}
/// Runtime that loads entities, dispatches commands to them and publishes the
/// resulting events, using caller-supplied cache, persistence and stream contexts.
pub struct EventSourcedSystem<
    EntityCacheContext,
    EntityPersistenceStoreContext,
    EventStreamContext,
> {
    /// Counter stamped on published envelopes; starts at 0 and is incremented
    /// once per dispatched event.
    offset: u64,
    /// Handle passed to `TakeFromAndWriteToCache` calls.
    entity_cache_context: EntityCacheContext,
    /// Handle passed to the persistence-store read/write traits.
    entity_persistence_store_context: EntityPersistenceStoreContext,
    /// Handle passed to the publish/subscribe stream traits.
    event_stream_context: EventStreamContext,
}
/// Builds a snapshot name of the form `<entity_name>|<persistence_id>`.
///
/// Neither part is escaped, so a `|` inside an argument appears verbatim in
/// the result.
pub fn snapshot_name(entity_name: &str, persistence_id: &str) -> String {
    let parts = [entity_name, persistence_id];
    parts.join("|")
}
impl<EntityCacheContext, EntityPersistenceStoreContext, EventStreamContext>
    EventSourcedSystem<EntityCacheContext, EntityPersistenceStoreContext, EventStreamContext>
{
    /// Creates a system around the three supplied contexts with the event
    /// offset counter set to zero.
    ///
    /// # Errors
    ///
    /// The current body never fails; the `Result` is part of the public signature.
    pub async fn init(
        entity_cache_context: EntityCacheContext,
        entity_persistence_store_context: EntityPersistenceStoreContext,
        event_stream_context: EventStreamContext,
    ) -> anyhow::Result<Self> {
        Ok(Self {
            offset: 0,
            entity_cache_context,
            entity_persistence_store_context,
            event_stream_context,
        })
    }

    /// Lets `entity` handle `c`, persists the resulting events, then applies
    /// and publishes them one by one.
    ///
    /// All events are written to the persistence store in one call before any
    /// of them is applied. For each event the system offset is incremented,
    /// the event is applied to `entity`, and an [`EventEnvelope`] carrying the
    /// new offset is published.
    ///
    /// # Errors
    ///
    /// Returns the first error from command handling, the persistence write
    /// or publishing. A publishing error stops the loop, so later events of
    /// the same command stay persisted but are neither applied nor published.
    async fn dispatch_command<Command, Event, Entity>(
        &mut self,
        c: Command,
        entity: &mut Entity,
    ) -> anyhow::Result<()>
    where
        Entity: EventSourcedEntity<Command = Command, Event = Event> + DefaultWithPersistenceId<Entity::PersistenceId> + WriteEventsToPersistenceStore<EntityPersistenceStoreContext> + NamedEntity + PublishEventToStream<EventStreamContext>,
        Command: Debug + Send + Sync + 'static,
        Event: Clone + Debug + Send + Sync + 'static,
    {
        let events = entity.handle_command(c)?;
        entity
            .write_events_to_persistence_store(&self.entity_persistence_store_context, &events)
            .await?;
        for event in events {
            self.offset += 1;
            // The entity consumes a copy; the original is moved into the envelope below.
            entity.apply_event(event.clone());
            let event_envelope: EventEnvelope<Entity> = EventEnvelope {
                offset: self.offset,
                persistence_id: entity.persistence_id(),
                event,
            };
            Entity::publish_to_event_stream(&mut self.event_stream_context, event_envelope).await?;
        }
        Ok(())
    }

    /// Obtains the entity for `persistence_id` from the cache or, when allowed,
    /// by replaying its events from the persistence store.
    ///
    /// Returns `Ok(None)` when the cache has no entry and either
    /// `use_persistence_store` is `false` or the store holds no events for the id.
    /// A returned entity is owned by the caller; this method does not write it
    /// back to the cache.
    ///
    /// # Errors
    ///
    /// Propagates a failure to read events from the persistence store.
    async fn load_entity<Entity>(
        &mut self,
        persistence_id: &Entity::PersistenceId,
        use_persistence_store: bool,
    ) -> anyhow::Result<Option<Entity>>
    where
        Entity: EventSourcedEntity + DefaultWithPersistenceId<Entity::PersistenceId> + NamedEntity,
        Entity: Clone + TakeFromAndWriteToCache<EntityCacheContext> + ReadEventsFromPersistenceStore<EntityPersistenceStoreContext>,
    {
        if let Some(cached) = Entity::take_from_cache(&mut self.entity_cache_context, persistence_id) {
            tracing::trace!("Entity {} found in cache", persistence_id);
            return Ok(Some(cached));
        }

        if !use_persistence_store {
            tracing::trace!("Entity {} not found in cache, ignoring persistence store as use_persistence_store is false", persistence_id);
            return Ok(None);
        }

        tracing::trace!("Entity {} not found in cache, checking persistence store", persistence_id);
        let stored_events = Entity::read_events_from_persistence_store(&self.entity_persistence_store_context, persistence_id).await?;
        if stored_events.is_empty() {
            tracing::trace!("Entity {} not found in persistence store, returning None", persistence_id);
            return Ok(None);
        }

        // Rebuild state by replaying the stored events onto a default entity.
        let mut entity = Entity::default_with_persistence_id(persistence_id);
        for event in stored_events {
            entity.apply_event(event);
        }
        Ok(Some(entity))
    }

    /// Loads (or creates) the entity for `persistence_id`, dispatches `c` to
    /// it and writes the entity to the cache.
    ///
    /// The persistence store is skipped during loading when
    /// `c.is_full_population_command()` is `true`. When no entity is found a
    /// default one is created for the id.
    ///
    /// # Errors
    ///
    /// Propagates errors from loading and from dispatching the command.
    async fn find_entity_and_dispatch_command<Entity>(
        &mut self,
        persistence_id: &Entity::PersistenceId,
        c: Entity::Command,
    ) -> anyhow::Result<()>
    where
        Entity: EventSourcedEntity + DefaultWithPersistenceId<Entity::PersistenceId> + NamedEntity,
        <Entity as EventSourcedEntity>::PersistenceId: Display,
        Entity: Clone + TakeFromAndWriteToCache<EntityCacheContext> + ReadEventsFromPersistenceStore<EntityPersistenceStoreContext> + WriteEventsToPersistenceStore<EntityPersistenceStoreContext> + PublishEventToStream<EventStreamContext>,
        Entity::Event: Clone + Debug + Send + Sync + 'static,
    {
        let use_persistence_store = !c.is_full_population_command();
        let mut entity = match self.load_entity::<Entity>(persistence_id, use_persistence_store).await? {
            Some(found) => {
                tracing::trace!("Entity {} found in persistence store", persistence_id);
                found
            }
            None => {
                tracing::trace!("Entity with persistence_id {} not found in persistence store, creating default_with_persistence_id", persistence_id);
                Entity::default_with_persistence_id(persistence_id)
            }
        };
        // NOTE(review): if dispatch fails, `?` returns before `write_to_cache`, so an
        // entity that `load_entity` took from the cache is not put back. Presumably the
        // next access rebuilds it from the persistence store — confirm this is intended.
        self.dispatch_command(c, &mut entity).await?;
        entity.write_to_cache(&mut self.entity_cache_context);
        Ok(())
    }

    /// Loads the entity for `persistence_id`, applies `mapper` to it and
    /// writes the entity to the cache.
    ///
    /// Returns `Ok(None)`, after logging a warning, when the entity is in
    /// neither the cache nor the persistence store.
    ///
    /// # Errors
    ///
    /// Propagates errors from loading the entity.
    async fn find_entity_and_return_value<Entity, R, MP>(
        &mut self,
        persistence_id: &Entity::PersistenceId,
        mapper: MP,
    ) -> anyhow::Result<Option<R>>
    where
        Entity: EventSourcedEntity + DefaultWithPersistenceId<Entity::PersistenceId> + NamedEntity,
        Entity: Clone + TakeFromAndWriteToCache<EntityCacheContext> + ReadEventsFromPersistenceStore<EntityPersistenceStoreContext> + WriteEventsToPersistenceStore<EntityPersistenceStoreContext> + PublishEventToStream<EventStreamContext>,
        Entity::Event: Clone + Debug + Send + Sync + 'static,
        MP: FnOnce(&Entity) -> R,
    {
        match self.load_entity::<Entity>(persistence_id, true).await? {
            Some(entity) => {
                tracing::trace!("Entity {} found in persistence store", persistence_id);
                let value = mapper(&entity);
                // The entity is owned here, so it has to be stored in the cache explicitly.
                entity.write_to_cache(&mut self.entity_cache_context);
                Ok(Some(value))
            }
            None => {
                tracing::warn!("Entity with persistence_id {} not found in persistence store", persistence_id);
                Ok(None)
            }
        }
    }

    /// Sends `command` to the entity identified by `persistence_id` without
    /// expecting a reply, creating the entity when it does not exist yet.
    ///
    /// # Errors
    ///
    /// Propagates errors from loading the entity, handling the command,
    /// persisting its events or publishing them.
    pub async fn tell<Entity>(
        &mut self,
        persistence_id: &Entity::PersistenceId,
        command: Entity::Command,
    ) -> anyhow::Result<()>
    where
        Entity: EventSourcedEntity + DefaultWithPersistenceId<Entity::PersistenceId> + NamedEntity,
        Entity: Clone + TakeFromAndWriteToCache<EntityCacheContext> + ReadEventsFromPersistenceStore<EntityPersistenceStoreContext> + WriteEventsToPersistenceStore<EntityPersistenceStoreContext> + PublishEventToStream<EventStreamContext>,
        Entity::Event: Clone + Debug + Send + Sync + 'static,
    {
        self.find_entity_and_dispatch_command::<Entity>(persistence_id, command).await
    }

    /// Sends a command that carries a reply channel and returns the reply.
    ///
    /// `fc` receives a fresh `ReplyTo<R>` and must build the command from it.
    /// The reply is awaited only after the command has been dispatched
    /// successfully.
    ///
    /// # Errors
    ///
    /// Propagates dispatch errors, and the error produced when awaiting the
    /// reply receiver fails.
    pub async fn ask<Entity, R, FC>(
        &mut self,
        persistence_id: &Entity::PersistenceId,
        fc: FC,
    ) -> anyhow::Result<R>
    where
        Entity: EventSourcedEntity + DefaultWithPersistenceId<Entity::PersistenceId> + NamedEntity,
        Entity: Clone + TakeFromAndWriteToCache<EntityCacheContext> + ReadEventsFromPersistenceStore<EntityPersistenceStoreContext> + WriteEventsToPersistenceStore<EntityPersistenceStoreContext> + PublishEventToStream<EventStreamContext>,
        Entity::Event: Clone + Debug + Send + Sync + 'static,
        FC: FnOnce(ReplyTo<R>) -> Entity::Command,
    {
        let (rx, reply_to) = ReplyTo::new();
        let command = fc(reply_to);
        self.find_entity_and_dispatch_command::<Entity>(persistence_id, command).await?;
        Ok(rx.await?)
    }

    /// Reads a value from the entity identified by `persistence_id` without
    /// sending it a command.
    ///
    /// Returns `Ok(None)` when the entity exists in neither the cache nor the
    /// persistence store; otherwise `Ok(Some(mapper(&entity)))`.
    ///
    /// # Errors
    ///
    /// Propagates errors from loading the entity.
    pub async fn get<Entity, R, MP>(
        &mut self,
        persistence_id: &Entity::PersistenceId,
        mapper: MP,
    ) -> anyhow::Result<Option<R>>
    where
        Entity: EventSourcedEntity + DefaultWithPersistenceId<Entity::PersistenceId> + NamedEntity,
        Entity: Clone + TakeFromAndWriteToCache<EntityCacheContext> + ReadEventsFromPersistenceStore<EntityPersistenceStoreContext> + WriteEventsToPersistenceStore<EntityPersistenceStoreContext> + PublishEventToStream<EventStreamContext>,
        Entity::Event: Clone + Debug + Send + Sync + 'static,
        MP: FnOnce(&Entity) -> R,
    {
        self.find_entity_and_return_value::<Entity, R, MP>(persistence_id, mapper).await
    }

    /// Returns a broadcast receiver for the event stream of `Entity`,
    /// delegating to [`SubscribeToEventStream::subscribe_to_event_stream`].
    ///
    /// # Errors
    ///
    /// Propagates the error reported by the entity's subscribe implementation.
    pub async fn subscribe_to_event_stream<Entity>(
        &mut self,
    ) -> anyhow::Result<broadcast::Receiver<EventEnvelope<Entity>>>
    where
        Entity: EventSourcedEntity + SubscribeToEventStream<EventStreamContext>,
    {
        Entity::subscribe_to_event_stream(&mut self.event_stream_context).await
    }

    /// Consumes the system and hands back the event stream context; the cache
    /// and persistence store contexts are dropped.
    ///
    /// # Errors
    ///
    /// The current body never fails; the `Result` is part of the public signature.
    pub async fn stop(self) -> anyhow::Result<EventStreamContext> {
        Ok(self.event_stream_context)
    }
}