use std::{
collections::HashMap,
marker::PhantomData,
};
use postgres::Client;
use serde_json::Value;
use cqrs_es2::{
AggregateContext,
AggregateError,
EventEnvelope,
IAggregate,
IEventStore,
};
use crate::sql::{
INSERT_EVENT,
INSERT_SNAPSHOT,
SELECT_EVENTS,
SELECT_SNAPSHOT,
UPDATE_SNAPSHOT,
};
/// Event store for aggregates of type `A` that talks to the database
/// through a [`postgres::Client`] and keeps a snapshot row per aggregate
/// alongside the stored events.
pub struct SnapshotEventStore<A>
where
    A: IAggregate,
{
    /// Database client used for every query and transaction of this store.
    conn: Client,
    /// Binds the store to the aggregate type `A` without holding a value
    /// of that type.
    _phantom: PhantomData<A>,
}
impl<A> SnapshotEventStore<A>
where
    A: IAggregate,
{
    /// Creates a store that takes ownership of `conn` and uses it for all
    /// subsequent database access.
    pub fn new(conn: Client) -> Self {
        Self {
            conn,
            _phantom: PhantomData,
        }
    }
}
impl<A: IAggregate> IEventStore<A> for SnapshotEventStore<A> {
/// Loads the stored events of the aggregate identified by `aggregate_id`.
///
/// Runs `SELECT_EVENTS` with the aggregate type of `A` and the given id,
/// then turns every returned row into an [`EventEnvelope`] built from the
/// row's `aggregate_id`, `sequence` and deserialized `payload` columns.
/// The envelopes are returned in the order the rows were returned.
///
/// # Panics
///
/// Panics if the query fails or if a row's payload cannot be
/// deserialized into `A::Event`.
fn load(
    &mut self,
    aggregate_id: &str,
) -> Vec<EventEnvelope<A>> {
    let aggregate_type = A::aggregate_type();
    let requested_id = aggregate_id.to_string();

    let rows = self
        .conn
        .query(SELECT_EVENTS, &[&aggregate_type, &requested_id])
        .unwrap_or_else(|e| panic!("{:?}", e));

    rows.iter()
        .map(|row| {
            let stored_id: String = row.get("aggregate_id");
            let raw_sequence: i64 = row.get("sequence");
            let payload: A::Event =
                serde_json::from_value(row.get("payload"))
                    .unwrap_or_else(|err| {
                        panic!(
                            "bad payload found in events table for aggregate id {} with error: {}",
                            &requested_id, err
                        )
                    });

            EventEnvelope::new(
                stored_id,
                raw_sequence as usize,
                payload,
            )
        })
        .collect()
}
/// Loads the current state of the aggregate identified by `aggregate_id`
/// from its snapshot.
///
/// Runs `SELECT_SNAPSHOT` with the aggregate type of `A` and the given id.
/// When no row is returned the context holds `A::default()` with a
/// `current_sequence` of `0`; otherwise the aggregate is deserialized from
/// the row's `payload` column and `current_sequence` is taken from its
/// `last_sequence` column.
///
/// # Panics
///
/// Panics if the query fails or if the snapshot payload cannot be
/// deserialized into `A`.
fn load_aggregate(
    &mut self,
    aggregate_id: &str,
) -> AggregateContext<A> {
    let agg_type = A::aggregate_type();

    let rows = self
        .conn
        .query(SELECT_SNAPSHOT, &[&agg_type, &aggregate_id])
        .unwrap_or_else(|e| panic!("{:?}", e));

    // Only the first returned row is considered.
    let (aggregate, current_sequence) = match rows.first() {
        None => (A::default(), 0),
        Some(row) => {
            let s: i64 = row.get("last_sequence");
            let val: Value = row.get("payload");
            // Name the aggregate and the underlying error instead of a
            // bare `unwrap`, matching how `load` reports bad payloads.
            let aggregate: A = serde_json::from_value(val)
                .unwrap_or_else(|err| {
                    panic!(
                        "bad snapshot payload found for aggregate id {} with error: {}",
                        aggregate_id, err
                    )
                });
            (aggregate, s as usize)
        },
    };

    AggregateContext {
        aggregate_id: aggregate_id.to_string(),
        aggregate,
        current_sequence,
    }
}
/// Persists `events` for the aggregate described by `context` and writes
/// its refreshed snapshot, all within a single database transaction.
///
/// The events are wrapped via `wrap_events` (using the context's
/// aggregate id and current sequence plus `metadata`), inserted one by one
/// with `INSERT_EVENT`, and applied to a copy of the context's aggregate.
/// The resulting aggregate is then serialized and stored with
/// `INSERT_SNAPSHOT` when the context's `current_sequence` is `0`, or with
/// `UPDATE_SNAPSHOT` otherwise. On success the wrapped events are
/// returned.
///
/// # Errors
///
/// Returns [`AggregateError::TechnicalError`] when:
///
/// * the transaction cannot be started or committed,
/// * an event payload, event metadata or the aggregate snapshot cannot be
///   serialized to JSON,
/// * inserting an event fails — with the message `"optimistic lock error"`
///   when the database reports SQLSTATE `23505`,
/// * inserting or updating the snapshot fails.
fn commit(
    &mut self,
    events: Vec<A::Event>,
    context: AggregateContext<A>,
    metadata: HashMap<String, String>,
) -> Result<Vec<EventEnvelope<A>>, AggregateError> {
    let mut updated_aggregate = context.aggregate.clone();
    let agg_type = A::aggregate_type().to_string();
    let aggregate_id = context.aggregate_id.as_str();
    let current_sequence = context.current_sequence;

    let wrapped_events = self.wrap_events(
        aggregate_id,
        current_sequence,
        events,
        metadata,
    );

    // Every early return below drops `trans` without committing; per the
    // `postgres` crate documentation a dropped transaction is rolled back,
    // so no partial write survives an error.
    let mut trans = self
        .conn
        .transaction()
        .map_err(|err| AggregateError::TechnicalError(err.to_string()))?;

    // Stays at the context's sequence when there are no events to store.
    let mut last_sequence = current_sequence as i64;

    for event in &wrapped_events {
        let sequence = event.sequence as i64;
        last_sequence = sequence;

        let payload =
            serde_json::to_value(&event.payload).map_err(|err| {
                AggregateError::TechnicalError(format!(
                    "bad payload found in events table for aggregate id {} with error: {}",
                    aggregate_id, err
                ))
            })?;

        let event_metadata =
            serde_json::to_value(&event.metadata).map_err(|err| {
                AggregateError::TechnicalError(format!(
                    "bad metadata found in events table for aggregate id {} with error: {}",
                    aggregate_id, err
                ))
            })?;

        if let Err(err) = trans.execute(
            INSERT_EVENT,
            &[
                &agg_type,
                &aggregate_id,
                &sequence,
                &payload,
                &event_metadata,
            ],
        ) {
            // SQLSTATE 23505 is PostgreSQL's unique_violation; it is
            // reported as an optimistic lock failure (presumably another
            // writer already stored this sequence — verify against the
            // events table constraints).
            let message = match err.code() {
                Some(state) if state.code() == "23505" => {
                    "optimistic lock error".to_string()
                },
                _ => format!(
                    "unable to insert event table for aggregate id {} with error: {}\n and payload: {}",
                    aggregate_id, err, &payload
                ),
            };
            return Err(AggregateError::TechnicalError(message));
        }

        updated_aggregate.apply(&event.payload);
    }

    let aggregate_payload =
        serde_json::to_value(updated_aggregate).map_err(|err| {
            AggregateError::TechnicalError(format!(
                "unable to serialize snapshot for aggregate id {} with error: {}",
                aggregate_id, err
            ))
        })?;

    // A context sequence of 0 means no snapshot was loaded for this
    // aggregate, so a row is inserted; otherwise the existing one is updated.
    let (statement, action) = if current_sequence == 0 {
        (INSERT_SNAPSHOT, "insert")
    } else {
        (UPDATE_SNAPSHOT, "update")
    };

    trans
        .execute(
            statement,
            &[
                &agg_type,
                &aggregate_id,
                &last_sequence,
                &aggregate_payload,
            ],
        )
        .map_err(|err| {
            AggregateError::TechnicalError(format!(
                "unable to {} snapshot for aggregate id {} with error: {}\n and payload: {}",
                action, aggregate_id, err, &aggregate_payload
            ))
        })?;

    trans
        .commit()
        .map_err(|err| AggregateError::TechnicalError(err.to_string()))?;

    Ok(wrapped_events)
}
}