use std::marker::PhantomData;
use postgres::Client;
use cqrs_es2::{
Aggregate,
AggregateError,
EventEnvelope,
Query,
QueryProcessor,
};
use super::query_context::QueryContext;
/// Repository that loads and updates a serialized query (view) `V` of
/// aggregate `A` through a Postgres [`Client`].
///
/// `V` and `A` are not stored; they are carried only at the type level via
/// `PhantomData`.
pub struct GenericQueryRepository<V, A>
where
    V: Query<A>,
    A: Aggregate,
{
    /// Postgres client used for every statement this repository issues.
    conn: Client,
    /// Name of the query; it is formatted into the `FROM` clause of the
    /// SELECT statements, so it doubles as the table name.
    query_name: String,
    /// Optional callback that receives errors which are not returned to the
    /// caller. `None` means such errors are dropped.
    error_handler: Option<Box<ErrorHandler>>,
    /// Zero-sized marker tying the repository to its view and aggregate types.
    _phantom: PhantomData<(V, A)>,
}
/// Signature of the optional error callback: a closure that takes ownership
/// of an `AggregateError` and returns nothing.
type ErrorHandler = dyn Fn(AggregateError);
impl<V, A> GenericQueryRepository<V, A>
where
V: Query<A>,
A: Aggregate,
{
#[must_use]
pub fn new(
query_name: &str,
conn: Client,
) -> Self {
GenericQueryRepository {
conn,
query_name: query_name.to_string(),
error_handler: None,
_phantom: PhantomData,
}
}
pub fn with_error_handler(
&mut self,
error_handler: Box<ErrorHandler>,
) {
self.error_handler = Some(error_handler);
}
#[must_use]
pub fn view_name(&self) -> String {
self.query_name.to_string()
}
fn load_mut(
&mut self,
query_instance_id: String,
) -> Result<(V, QueryContext<V>), AggregateError> {
let query = format!(
"SELECT version,payload FROM {} WHERE \
query_instance_id= $1",
&self.query_name
);
let result = match self
.conn
.query(query.as_str(), &[&query_instance_id])
{
Ok(result) => result,
Err(e) => {
return Err(AggregateError::new(
e.to_string().as_str(),
));
},
};
match result.iter().next() {
Some(row) => {
let view_name = self.query_name.clone();
let version = row.get("version");
let payload = row.get("payload");
let view = serde_json::from_value(payload)?;
let view_context = QueryContext::new(
view_name,
query_instance_id,
version,
PhantomData,
);
Ok((view, view_context))
},
None => {
let view_context = QueryContext::new(
self.query_name.clone(),
query_instance_id,
0,
PhantomData,
);
Ok((Default::default(), view_context))
},
}
}
pub fn apply_events(
&mut self,
query_instance_id: &str,
events: &[EventEnvelope<A>],
) {
match self.load_mut(query_instance_id.to_string()) {
Ok((mut view, mut view_context)) => {
for event in events {
view.update(event);
}
view_context.commit(&mut self.conn, view);
},
Err(e) => {
match &self.error_handler {
None => {},
Some(handler) => {
(handler)(e);
},
}
},
};
}
pub fn load(
&mut self,
query_instance_id: String,
) -> Option<V> {
let query = format!(
"SELECT version,payload FROM {} WHERE \
query_instance_id= $1",
&self.query_name
);
let result = match self
.conn
.query(query.as_str(), &[&query_instance_id])
{
Ok(result) => result,
Err(err) => {
panic!(
"unable to load view '{}' with id: '{}', \
encountered: {}",
&query_instance_id, &self.query_name, err
);
},
};
match result.iter().next() {
Some(row) => {
let payload = row.get("payload");
match serde_json::from_value(payload) {
Ok(view) => Some(view),
Err(e) => {
match &self.error_handler {
None => {},
Some(handler) => {
(handler)(e.into());
},
}
None
},
}
},
None => None,
}
}
}
impl<Q, A> QueryProcessor<A> for GenericQueryRepository<Q, A>
where
    Q: Query<A>,
    A: Aggregate,
{
    /// Forwards the events for `query_instance_id` to
    /// `GenericQueryRepository::apply_events`.
    fn dispatch(
        &mut self,
        query_instance_id: &str,
        events: &[EventEnvelope<A>],
    ) {
        // Both arguments already have the borrowed types `apply_events`
        // expects, so they are passed through without an intermediate
        // `String` allocation or an extra reference layer.
        self.apply_events(query_instance_id, events);
    }
}