use crate::{
ids::LogId,
stores::{EventStore, EventStoreError},
AppendOptions, EventRecord,
};
use chrono::Utc;
use const_format::formatcp;
use deadpool_postgres::{GenericClient, Pool, PoolError};
use futures_util::TryStreamExt;
use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;
use tokio_postgres::{
error::SqlState,
types::{FromSql, Json, ToSql, Type},
Error, Row,
};
const SCHEMA_NAME: &str = "eventlogs";
const TABLE_NAME: &str = "events";
const QUALIFIED_TABLE_NAME: &str = formatcp!("{SCHEMA_NAME}.{TABLE_NAME}");
const PK_CONSTRAINT: &str = formatcp!("{TABLE_NAME}_pkey");
const IDEMPOTENCY_KEY_CONSTRAINT: &str = "idempotency_key_unique";
const COLUMN_LIST: &str = "log_id,event_index,recorded_at,idempotency_key,payload";
const SELECT_EVENTS: &str = formatcp!(
"select {COLUMN_LIST} from {QUALIFIED_TABLE_NAME}
where log_id = $1 and event_index >= $2
order by log_id, event_index"
);
const SELECT_EVENTS_WITH_LIMIT: &str = formatcp!("{SELECT_EVENTS} limit $3");
const INSERT_EVENT: &str =
formatcp!("insert into {QUALIFIED_TABLE_NAME} ({COLUMN_LIST}) values ($1,$2,$3,$4,$5)");
const SELECT_EVENT_FOR_IDEMPOTENCY_KEY: &str =
formatcp!("select log_id, event_index from {QUALIFIED_TABLE_NAME} where idempotency_key=$1");
impl From<PoolError> for EventStoreError {
fn from(value: PoolError) -> Self {
EventStoreError::DatabaseError {
error: Box::new(value),
}
}
}
impl From<Error> for EventStoreError {
fn from(value: Error) -> Self {
EventStoreError::DatabaseError {
error: Box::new(value),
}
}
}
impl<'a> FromSql<'a> for LogId {
fn from_sql(
_ty: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
let s = String::from_utf8_lossy(raw);
let log_id: LogId = s.parse()?;
Ok(log_id)
}
fn accepts(ty: &Type) -> bool {
matches!(ty, &Type::VARCHAR | &Type::CHAR)
}
}
impl<E> EventRecord<E> for Row
where
E: Serialize + DeserializeOwned + Debug + Send + Sync,
{
fn index(&self) -> u32 {
self.get("event_index")
}
fn recorded_at(&self) -> chrono::DateTime<Utc> {
self.get("recorded_at")
}
fn idempotency_key(&self) -> Option<String> {
self.get("idempotency_key")
}
fn event(&self) -> E {
self.get::<_, Json<E>>("payload").0
}
}
pub struct PostgresEventStore {
pool: Pool,
}
impl PostgresEventStore {
pub fn new(pool: Pool) -> Self {
PostgresEventStore { pool }
}
}
impl<E> EventStore<E> for PostgresEventStore
where
E: Serialize + DeserializeOwned + Debug + Send + Sync,
{
async fn append(
&self,
log_id: &LogId,
event: &E,
event_index: u32,
append_options: &AppendOptions,
) -> Result<(), EventStoreError> {
let conn = self.pool.get().await?;
let stmt = conn.prepare_cached(INSERT_EVENT).await?;
let result = conn
.execute(
&stmt,
&[
&log_id.to_string(),
&event_index,
&Utc::now(),
&append_options.idempotency_key,
&Json(event),
],
)
.await;
if let Err(ref e) = result {
if e.code() == Some(&SqlState::UNIQUE_VIOLATION) {
if let Some(dbe) = e.as_db_error() {
if dbe.constraint() == Some(PK_CONSTRAINT) {
return Err(EventStoreError::EventIndexAlreadyExists {
log_id: log_id.clone(),
event_index,
});
}
if dbe.constraint() == Some(IDEMPOTENCY_KEY_CONSTRAINT) {
if let Some(ref key) = append_options.idempotency_key {
let query = conn
.prepare_cached(SELECT_EVENT_FOR_IDEMPOTENCY_KEY)
.await?;
let row = conn.query_one(&query, &[&key]).await?;
return Err(EventStoreError::IdempotentReplay {
idempotency_key: key.clone(),
log_id: row.get("log_id"),
event_index: row.get("event_index"),
});
}
}
}
}
}
Ok(result.map(|_| ())?)
}
async fn load(
&self,
log_id: &LogId,
starting_index: u32,
max_events: u32,
) -> Result<
impl futures_util::Stream<Item = Result<impl EventRecord<E>, EventStoreError>>,
EventStoreError,
> {
let sql = if max_events == u32::MAX {
SELECT_EVENTS
} else {
SELECT_EVENTS_WITH_LIMIT
};
let conn = self.pool.get().await?;
let stmt = conn.prepare_cached(sql).await?;
let log_id_param = log_id.to_string();
let mut params: Vec<&(dyn ToSql + Sync)> = vec![&log_id_param, &starting_index];
let limit_param = max_events as i64;
if max_events < u32::MAX {
params.push(&limit_param);
}
let row_stream = conn.query_raw(&stmt, params).await?;
Ok(row_stream.map_err(|e| EventStoreError::DatabaseError { error: Box::new(e) }))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tests::TestEvent;
use deadpool_postgres::Config;
use deadpool_redis::Runtime;
use futures_util::StreamExt;
use tokio_postgres::NoTls;
use uuid::Uuid;
fn store() -> impl EventStore<TestEvent> {
let mut cfg = Config::new();
cfg.host = Some("localhost".to_string());
cfg.user = Some("postgres".to_string());
cfg.password = Some("ci-postgres-password".to_string());
cfg.dbname = Some("postgres".to_string());
let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();
PostgresEventStore::new(pool)
}
#[tokio::test]
async fn append_load() {
let log_id = LogId::new();
let store = store();
store
.append(&log_id, &TestEvent::Increment, 0, &AppendOptions::default())
.await
.unwrap();
let row_stream = store.load(&log_id, 0, u32::MAX).await.unwrap();
assert_eq!(row_stream.count().await, 1);
}
#[tokio::test]
async fn append_multiple_load() {
let log_id = LogId::new();
let store = store();
store
.append(&log_id, &TestEvent::Increment, 0, &AppendOptions::default())
.await
.unwrap();
store
.append(&log_id, &TestEvent::Decrement, 1, &AppendOptions::default())
.await
.unwrap();
store
.append(&log_id, &TestEvent::Increment, 2, &AppendOptions::default())
.await
.unwrap();
let row_stream = store.load(&log_id, 0, u32::MAX).await.unwrap();
assert_eq!(row_stream.count().await, 3);
}
#[tokio::test]
async fn idempotent_create() {
let log_id = LogId::new();
let store = store();
let idempotency_key = Uuid::now_v7().to_string();
let options = AppendOptions {
idempotency_key: Some(idempotency_key.clone()),
..Default::default()
};
store
.append(&log_id, &TestEvent::Increment, 0, &options)
.await
.unwrap();
let replay_log_id = LogId::new();
let result = store
.append(&replay_log_id, &TestEvent::Increment, 0, &options)
.await;
assert_eq!(
result,
Err(EventStoreError::IdempotentReplay {
idempotency_key: idempotency_key.clone(),
log_id: log_id.clone(), event_index: 0
})
)
}
#[tokio::test]
async fn concurrent_append() {
let log_id = LogId::new();
let store = store();
store
.append(&log_id, &TestEvent::Increment, 0, &AppendOptions::default())
.await
.unwrap();
store
.append(&log_id, &TestEvent::Decrement, 1, &AppendOptions::default())
.await
.unwrap();
let result = store
.append(&log_id, &TestEvent::Decrement, 1, &AppendOptions::default())
.await;
assert_eq!(
result,
Err(EventStoreError::EventIndexAlreadyExists {
log_id: log_id,
event_index: 1
})
)
}
#[tokio::test]
async fn idempotent_append() {
let log_id = LogId::new();
let store = store();
let idempotency_key = Uuid::now_v7().to_string();
let options = AppendOptions {
idempotency_key: Some(idempotency_key.clone()),
..Default::default()
};
store
.append(&log_id, &TestEvent::Increment, 0, &AppendOptions::default())
.await
.unwrap();
store
.append(&log_id, &TestEvent::Decrement, 1, &options)
.await
.unwrap();
let result = store
.append(&log_id, &TestEvent::Decrement, 2, &options)
.await;
assert_eq!(
result,
Err(EventStoreError::IdempotentReplay {
idempotency_key: idempotency_key.clone(),
log_id: log_id.clone(), event_index: 1 })
);
let row_stream = store.load(&log_id, 0, u32::MAX).await.unwrap();
assert_eq!(row_stream.count().await, 2);
}
#[tokio::test]
async fn load_limit() {
let log_id = LogId::new();
let store = store();
for i in 0..10 {
store
.append(&log_id, &TestEvent::Increment, i, &AppendOptions::default())
.await
.unwrap();
}
let row_stream = store.load(&log_id, 0, 5).await.unwrap();
assert_eq!(row_stream.count().await, 5);
}
}