use super::event::*;
use crate::aggregate::UncommittedEvent;
use crate::error::{Error, Result as TsResult};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, StreamExt, TryStreamExt};
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use std::fmt::Debug;
use tracing::{debug, trace};
use uuid::Uuid;
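/// Ordering requirement applied when committing a batch of events.
///
/// - `None`: append without any ordering check.
/// - `First`: the batch must be the first ever committed for the aggregate.
/// - `Following(t)`: the batch must follow the event committed at time `t`.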
#[derive(Debug)]
pub enum CommitOrder {
None,
First,
Following(DateTime<Utc>),
}
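/// Append-only storage for the events of a single aggregate type.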
#[async_trait]
pub trait EventStore {
    type Event: Send + Sync + TimesourceEventPayload + Debug;
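    /// Persists `events` for `aggregate_id`, enforcing `order`, and returns the
    /// ids assigned to the newly stored events.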
async fn commit(
&self,
aggregate_id: Uuid,
order: CommitOrder,
events: &[UncommittedEvent<Self::Event>],
) -> TsResult<Vec<u64>>;
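    /// Streams every persisted event for `aggregate_id`.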
async fn aggregate_stream(
&self,
aggregate_id: Uuid,
) -> BoxStream<'_, TsResult<Persisted<Self::Event>>>;
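    /// Deletes all persisted events for `aggregate_id`.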
async fn remove(&mut self, aggregate_id: Uuid) -> TsResult<()>;
}
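/// Builder that connects to Postgres/TimescaleDB and produces a [`TimescaleStore`].
///
/// A minimal usage sketch; the DSN, `MyEvent` payload type, and aggregate type
/// name below are placeholders, not values defined by this crate:
///
/// ```ignore
/// let store = EventStoreBuilder::new("postgres://user:pass@localhost/events")
///     .with_migrations(true)
///     .build::<MyEvent>("my_aggregate")
///     .await?;
/// ```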
#[derive(Debug)]
pub struct EventStoreBuilder<'a> {
with_migrations: bool,
dsn: &'a str,
}
impl<'a> EventStoreBuilder<'a> {
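    /// Creates a builder for the given Postgres DSN. Migrations are enabled by
    /// default; disable them with [`Self::with_migrations`].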
pub fn new(dsn: &'a str) -> Self {
Self {
with_migrations: true,
dsn,
}
}
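    /// Sets whether the embedded sqlx migrations run during [`Self::build`].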
pub fn with_migrations(mut self, value: bool) -> Self {
self.with_migrations = value;
self
}
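    /// Connects to the database, optionally runs migrations, resolves the id of
    /// `aggregate_type_name`, and upserts the metadata row for `Event`.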
pub async fn build<Event>(&self, aggregate_type_name: &str) -> TsResult<TimescaleStore<Event>>
where
Event: Send + Sync + Debug + TimesourceEventPayload,
{
        let pool = PgPoolOptions::new().connect(self.dsn).await?;
if self.with_migrations {
sqlx::migrate!().run(&pool).await?;
}
let aggregate_type_id =
sqlx::query_file_scalar!("queries/aggregate_type/id.sql", aggregate_type_name)
.fetch_one(&pool)
.await?
.ok_or_else(|| Error::InvalidData("Unable to get aggregate type id".into()))?;
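        // NOTE: the fetched rows are discarded; this read acts as a sanity check
        // that events are queryable for the resolved aggregate type.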
sqlx::query_file!(
"queries/event/get_all_name_events.sql",
aggregate_type_id,
100
)
.fetch_all(&pool)
.await?;
trace!(
aggregate_type_name,
aggregate_type_id,
"Aggregate root data received"
);
let event_meta_id = sqlx::query_file_scalar!(
"queries/event/upsert_event_meta.sql",
aggregate_type_id,
Event::name(),
Event::version(),
&Event::encoding() as _
)
.fetch_one(&pool)
.await?;
        Ok(TimescaleStore {
            payload: std::marker::PhantomData,
            pool,
            event_meta_id: event_meta_id
                .ok_or_else(|| Error::InvalidData("Unable to get event meta id".into()))?,
            aggregate_type_id,
        })
}
}
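/// Mirrors the Postgres composite type `append_event_data`. Exactly one of
/// `bytes` or `json` is set, matching the payload encoding.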
#[derive(sqlx::Type)]
#[sqlx(type_name = "append_event_data")]
struct AppendEventInput {
bytes: Option<Vec<u8>>,
json: Option<String>,
meta_id: i32,
time: i64,
}
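/// Maps to `_append_event_data`, the Postgres array type of [`AppendEventInput`].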
#[derive(sqlx::Type)]
#[sqlx(type_name = "_append_event_data")]
struct VecAppendEventInput(Vec<AppendEventInput>);
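/// Postgres/TimescaleDB-backed [`EventStore`] for a single event payload type.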
#[derive(Clone, Debug)]
pub struct TimescaleStore<Event> {
payload: std::marker::PhantomData<Event>,
pool: PgPool,
pub event_meta_id: i32,
pub aggregate_type_id: i32,
}
#[async_trait]
impl<Event> EventStore for TimescaleStore<Event>
where
Event: Send + Sync + TimesourceEventPayload + Debug,
{
type Event = Event;
#[tracing::instrument]
async fn commit(
&self,
aggregate_id: Uuid,
order: CommitOrder,
events: &[UncommittedEvent<Self::Event>],
) -> TsResult<Vec<u64>> {
if events.is_empty() {
return Err(Error::InvalidData(
"list of events can't be empty".to_string(),
));
}
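        // Encode each event into the composite row shape expected by the append
        // query.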
let mut event_data: Vec<AppendEventInput> = Vec::with_capacity(events.len());
for event in events {
            // Events are ordered by their commit timestamp, stored as UTC
            // nanoseconds.
            let time = event.utc.timestamp_nanos();
            // Exactly one of `bytes`/`json` is populated, matching the payload
            // encoding.
            let (bytes, json) = match event.data.encode_to_payload()? {
                EventPayloadEncoding::Bytes(bytes) => (Some(bytes), None),
                EventPayloadEncoding::Json(json) => (None, Some(json)),
            };
            event_data.push(AppendEventInput {
                bytes,
                json,
                meta_id: self.event_meta_id,
                time,
            });
}
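        // Translate the requested ordering into the (orderly, last_known_time)
        // pair the append query understands.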
let (orderly, last_known_time) = match order {
CommitOrder::None => (false, None),
CommitOrder::First => (true, None),
CommitOrder::Following(utc) => (true, Some(utc.timestamp_nanos())),
};
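        // One round trip: the append query stores the whole batch and returns
        // the assigned ids as a single, possibly NULL, array.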
let inserted_ids = sqlx::query_file_scalar!(
"queries/event/append.sql",
self.aggregate_type_id,
&aggregate_id,
&VecAppendEventInput(event_data) as _,
orderly,
last_known_time
)
.fetch_one(&self.pool)
.await?;
debug!(?inserted_ids, "Committed events");
        Ok(inserted_ids
            .unwrap_or_default()
            .into_iter()
            .map(|id| id as u64)
            .collect())
}
#[tracing::instrument]
async fn aggregate_stream(
&self,
aggregate_id: Uuid,
) -> BoxStream<'_, TsResult<Persisted<Self::Event>>> {
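        // Decode rows lazily: each EventRow is converted into a Persisted event
        // as it arrives from Postgres.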
sqlx::query_file_as!(
EventRow,
"queries/event/get_aggregate_all.sql",
self.aggregate_type_id,
aggregate_id,
)
.fetch(&self.pool)
.map_err(Error::from)
        .map(|row| row.and_then(|row| Persisted::try_from(row).map_err(Error::from)))
.boxed()
}
#[tracing::instrument]
async fn remove(&mut self, aggregate_id: Uuid) -> TsResult<()> {
sqlx::query_file!(
"queries/event/delete_aggregate.sql",
self.aggregate_type_id,
&aggregate_id
)
.execute(&self.pool)
.await
.map_err(Error::from)
.map(|_| ())
}
}