use std::marker::PhantomData;
use async_trait::async_trait;
use cqrs_es::{Aggregate, EventEnvelope, Query, View};
use crate::{PersistenceError, QueryContext, ViewRepository};
/// A query that keeps a view of type `V` up to date in a [`ViewRepository`].
///
/// Events dispatched to this query are applied to the stored view (or to a
/// default view when none is stored yet) and the result is written back
/// through the repository. Persistence errors are forwarded to the optional
/// error handler; when no handler is set they are dropped.
pub struct GenericQuery<R, V, A>
where
R: ViewRepository<V, A>,
V: View<A>,
A: Aggregate,
{
// Repository used to load and store views.
repo: R,
// Optional callback that receives persistence errors; `None` until
// `use_error_handler` is called.
error_handler: Option<Box<ErrorHandler>>,
// Ties the otherwise unused `V` and `A` type parameters to the struct.
phantom: PhantomData<(V, A)>,
}
/// Callback signature for persistence-error handlers: it receives the
/// [`PersistenceError`] by value and returns nothing. The callback must be
/// `Send + Sync + 'static`.
type ErrorHandler = dyn Fn(PersistenceError) + Send + Sync + 'static;
impl<R, V, A> GenericQuery<R, V, A>
where
R: ViewRepository<V, A>,
V: View<A>,
A: Aggregate,
{
#[must_use]
pub fn new(repo: R) -> Self {
GenericQuery {
repo,
error_handler: None,
phantom: Default::default(),
}
}
pub fn use_error_handler(&mut self, error_handler: Box<ErrorHandler>) {
self.error_handler = Some(error_handler);
}
pub async fn load(&self, query_instance_id: String) -> Option<V> {
match self.repo.load(&query_instance_id).await {
Ok(option) => option.map(|(view, _)| view),
Err(e) => {
self.handle_error(e);
None
}
}
}
async fn load_mut(
&self,
query_instance_id: String,
) -> Result<(V, QueryContext), PersistenceError> {
match self.repo.load(&query_instance_id).await? {
None => {
let view_context = QueryContext::new(query_instance_id, 0);
Ok((Default::default(), view_context))
}
Some((view, context)) => Ok((view, context)),
}
}
pub(crate) async fn apply_events(
&self,
query_instance_id: &str,
events: &[EventEnvelope<A>],
) -> Result<(), PersistenceError> {
let (mut view, view_context) = self.load_mut(query_instance_id.to_string()).await?;
for event in events {
view.update(event);
}
self.repo.update_view(view, view_context).await?;
Ok(())
}
fn handle_error(&self, error: PersistenceError) {
match &self.error_handler {
None => {}
Some(handler) => {
(handler)(error);
}
}
}
}
#[async_trait]
impl<R, V, A> Query<A> for GenericQuery<R, V, A>
where
    R: ViewRepository<V, A>,
    V: View<A>,
    A: Aggregate,
{
    /// Applies `events` to the view identified by `query_instance_id`.
    ///
    /// This method has no return value, so a persistence failure cannot be
    /// reported to the caller. It is passed to the error handler instead, and
    /// is dropped when no handler is set.
    async fn dispatch(&self, query_instance_id: &str, events: &[EventEnvelope<A>]) {
        // `apply_events` takes `&str`, so the id is passed through as-is rather
        // than being copied into a temporary `String` on every dispatch.
        if let Err(err) = self.apply_events(query_instance_id, events).await {
            self.handle_error(err);
        }
    }
}