use std::time::Duration;
use eventcore_types::{
CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
StreamVersion, StreamWriteEntry, StreamWrites,
};
use nutype::nutype;
use serde_json::{Value, json};
use sqlx::types::Json;
use sqlx::{Pool, Postgres, Row, postgres::PgPoolOptions, query};
use thiserror::Error;
use tracing::{error, info, instrument, warn};
use uuid::Uuid;
/// Errors returned while constructing a [`PostgresEventStore`].
#[derive(Debug, Error)]
pub enum PostgresEventStoreError {
/// The connection pool could not be created; the underlying `sqlx` error is kept as the source.
#[error("failed to create postgres connection pool")]
ConnectionFailed(#[source] sqlx::Error),
}
/// Upper bound on the number of pooled database connections.
///
/// Wraps a `NonZeroU32`, so a pool size of zero cannot be represented.
#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRef, Into))]
pub struct MaxConnections(std::num::NonZeroU32);
/// Connection-pool settings used when a store or coordinator builds its own pool.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
/// Maximum number of connections the pool may hold (forwarded to `PgPoolOptions::max_connections`).
pub max_connections: MaxConnections,
/// Forwarded to `PgPoolOptions::acquire_timeout`.
pub acquire_timeout: Duration,
/// Forwarded to `PgPoolOptions::idle_timeout`.
pub idle_timeout: Duration,
}
impl Default for PostgresConfig {
    /// Returns the stock pool settings: 10 connections, a 30 second acquire
    /// timeout and a 600 second idle timeout.
    fn default() -> Self {
        // Evaluated at compile time, so the `None` arm can never be hit at runtime.
        const DEFAULT_MAX_CONNECTIONS: std::num::NonZeroU32 =
            match std::num::NonZeroU32::new(10) {
                Some(nonzero) => nonzero,
                None => unreachable!(),
            };

        let max_connections = MaxConnections::new(DEFAULT_MAX_CONNECTIONS);
        let acquire_timeout = Duration::from_secs(30);
        let idle_timeout = Duration::from_secs(600);

        Self {
            max_connections,
            acquire_timeout,
            idle_timeout,
        }
    }
}
/// Event store backed by a Postgres connection pool.
#[derive(Debug, Clone)]
pub struct PostgresEventStore {
// Pool every query in this type runs against.
pool: Pool<Postgres>,
}
impl PostgresEventStore {
pub async fn new<S: Into<String>>(
connection_string: S,
) -> Result<Self, PostgresEventStoreError> {
Self::with_config(connection_string, PostgresConfig::default()).await
}
pub async fn with_config<S: Into<String>>(
connection_string: S,
config: PostgresConfig,
) -> Result<Self, PostgresEventStoreError> {
let connection_string = connection_string.into();
let max_connections: std::num::NonZeroU32 = config.max_connections.into();
let pool = PgPoolOptions::new()
.max_connections(max_connections.get())
.acquire_timeout(config.acquire_timeout)
.idle_timeout(config.idle_timeout)
.connect(&connection_string)
.await
.map_err(PostgresEventStoreError::ConnectionFailed)?;
Ok(Self { pool })
}
pub fn from_pool(pool: Pool<Postgres>) -> Self {
Self { pool }
}
#[cfg_attr(test, mutants::skip)] pub async fn ping(&self) {
let _ = query("SELECT 1")
.execute(&self.pool)
.await
.expect("postgres ping failed");
}
#[cfg_attr(test, mutants::skip)] pub async fn migrate(&self) {
sqlx::migrate!("./migrations")
.run(&self.pool)
.await
.expect("postgres migration failed");
}
}
impl EventStore for PostgresEventStore {
#[instrument(name = "postgres.read_stream", skip(self))]
async fn read_stream<E: Event>(
&self,
stream_id: StreamId,
) -> Result<EventStreamReader<E>, EventStoreError> {
info!(
stream = %stream_id,
"[postgres.read_stream] reading events from postgres"
);
let rows = query(
"SELECT event_data FROM eventcore_events WHERE stream_id = $1 ORDER BY stream_version ASC",
)
.bind(stream_id.as_ref())
.fetch_all(&self.pool)
.await
.map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
let mut events = Vec::with_capacity(rows.len());
for row in rows {
let payload: Value = row
.try_get("event_data")
.map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
let event = serde_json::from_value(payload).map_err(|error| {
EventStoreError::DeserializationFailed {
stream_id: stream_id.clone(),
detail: error.to_string(),
}
})?;
events.push(event);
}
Ok(EventStreamReader::new(events))
}
#[instrument(name = "postgres.append_events", skip(self, writes))]
async fn append_events(
&self,
writes: StreamWrites,
) -> Result<EventStreamSlice, EventStoreError> {
let expected_versions = writes.expected_versions().clone();
let entries = writes.into_entries();
if entries.is_empty() {
return Ok(EventStreamSlice);
}
info!(
stream_count = expected_versions.len(),
event_count = entries.len(),
"[postgres.append_events] appending events to postgres"
);
let expected_versions_json: Value = expected_versions
.iter()
.map(|(stream_id, version)| {
(stream_id.as_ref().to_string(), json!(version.into_inner()))
})
.collect();
let mut tx = self
.pool
.begin()
.await
.map_err(|error| map_sqlx_error(error, Operation::BeginTransaction))?;
let _ = query("SELECT set_config('eventcore.expected_versions', $1, true)")
.bind(expected_versions_json.to_string())
.execute(&mut *tx)
.await
.map_err(|error| map_sqlx_error(error, Operation::SetExpectedVersions))?;
for entry in entries {
let StreamWriteEntry {
stream_id,
event_type,
event_data,
..
} = entry;
let event_id = Uuid::now_v7();
let _ = query(
"INSERT INTO eventcore_events (event_id, stream_id, event_type, event_data, metadata)
VALUES ($1, $2, $3, $4, $5)",
)
.bind(event_id)
.bind(stream_id.as_ref())
.bind(event_type)
.bind(Json(event_data))
.bind(Json(json!({})))
.execute(&mut *tx)
.await
.map_err(|error| map_sqlx_error(error, Operation::AppendEvents))?;
}
tx.commit()
.await
.map_err(|error| map_sqlx_error(error, Operation::CommitTransaction))?;
Ok(EventStreamSlice)
}
}
impl CheckpointStore for PostgresEventStore {
    type Error = PostgresCheckpointError;

    /// Loads the last saved position for the subscription `name`.
    ///
    /// Returns `Ok(None)` when no row exists for `name`.
    ///
    /// # Errors
    ///
    /// Returns [`PostgresCheckpointError::DatabaseError`] when the query fails
    /// or the `last_position` column cannot be decoded as a UUID.
    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
            .bind(name)
            .fetch_optional(&self.pool)
            .await
            .map_err(PostgresCheckpointError::DatabaseError)?;

        match row {
            Some(row) => {
                // `try_get` reports a decode failure as an error instead of
                // panicking the way `Row::get` does.
                let position: Uuid = row
                    .try_get("last_position")
                    .map_err(PostgresCheckpointError::DatabaseError)?;
                Ok(Some(StreamPosition::new(position)))
            }
            None => Ok(None),
        }
    }

    /// Upserts `position` as the checkpoint of subscription `name` and
    /// refreshes `updated_at`.
    ///
    /// # Errors
    ///
    /// Returns [`PostgresCheckpointError::DatabaseError`] when the statement fails.
    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
        let position_uuid: Uuid = position.into_inner();
        let _ = query(
            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)\n\
             VALUES ($1, $2, NOW())\n\
             ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
        )
        .bind(name)
        .bind(position_uuid)
        .execute(&self.pool)
        .await
        .map_err(PostgresCheckpointError::DatabaseError)?;
        Ok(())
    }
}
impl EventReader for PostgresEventStore {
type Error = EventStoreError;
async fn read_events<E: Event>(
&self,
filter: EventFilter,
page: EventPage,
) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
let after_event_id: Option<Uuid> = page.after_position().map(|p| p.into_inner());
let limit: i64 = page.limit().into_inner() as i64;
let rows = if let Some(prefix) = filter.stream_prefix() {
let prefix_str = prefix.as_ref();
if let Some(after_id) = after_event_id {
let query_str = r#"
SELECT event_id, event_data, stream_id
FROM eventcore_events
WHERE event_id > $1
AND stream_id LIKE $2 || '%'
ORDER BY event_id
LIMIT $3
"#;
query(query_str)
.bind(after_id)
.bind(prefix_str)
.bind(limit)
.fetch_all(&self.pool)
.await
} else {
let query_str = r#"
SELECT event_id, event_data, stream_id
FROM eventcore_events
WHERE stream_id LIKE $1 || '%'
ORDER BY event_id
LIMIT $2
"#;
query(query_str)
.bind(prefix_str)
.bind(limit)
.fetch_all(&self.pool)
.await
}
} else if let Some(after_id) = after_event_id {
let query_str = r#"
SELECT event_id, event_data, stream_id
FROM eventcore_events
WHERE event_id > $1
ORDER BY event_id
LIMIT $2
"#;
query(query_str)
.bind(after_id)
.bind(limit)
.fetch_all(&self.pool)
.await
} else {
let query_str = r#"
SELECT event_id, event_data, stream_id
FROM eventcore_events
ORDER BY event_id
LIMIT $1
"#;
query(query_str).bind(limit).fetch_all(&self.pool).await
}
.map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
let events: Vec<(E, StreamPosition)> = rows
.into_iter()
.filter_map(|row| {
let event_data: Json<Value> = row.get("event_data");
let event_id: Uuid = row.get("event_id");
serde_json::from_value::<E>(event_data.0)
.ok()
.map(|e| (e, StreamPosition::new(event_id)))
})
.collect();
Ok(events)
}
}
/// Translates a `sqlx` error into an [`EventStoreError`].
///
/// Database errors with SQLSTATE `P0001` or `23505` are logged as version
/// conflicts and parsed from the database message. Everything else is logged
/// and returned as `EventStoreError::StoreFailure` for `operation`.
fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
    if let sqlx::Error::Database(db_error) = &error {
        let is_conflict = matches!(db_error.code().as_deref(), Some("P0001" | "23505"));
        if is_conflict {
            warn!(
                error = %db_error,
                "[postgres.version_conflict] optimistic concurrency check failed"
            );
            return parse_version_conflict_from_db_error(db_error.message());
        }
    }

    error!(
        error = %error,
        operation = %operation,
        "[postgres.database_error] database operation failed"
    );
    EventStoreError::StoreFailure { operation }
}
/// Builds a `VersionConflict` error from a database error message.
///
/// When the message does not have the expected shape, a placeholder conflict
/// on the stream `unknown-conflict-stream` with both versions set to 0 is
/// returned instead.
fn parse_version_conflict_from_db_error(message: &str) -> EventStoreError {
    try_parse_conflict_message(message).unwrap_or_else(|| {
        let placeholder_stream =
            StreamId::try_new("unknown-conflict-stream").expect("static stream id is valid");
        EventStoreError::VersionConflict {
            stream_id: placeholder_stream,
            expected: StreamVersion::new(0),
            actual: StreamVersion::new(0),
        }
    })
}
/// Parses a message of the form
/// `version_conflict: stream "<id>" expected version <N>, actual <M>`.
///
/// Returns `None` as soon as any part of the message deviates from that
/// shape, a number fails to parse, or the stream id is rejected.
fn try_parse_conflict_message(message: &str) -> Option<EventStoreError> {
    // Split at the first closing quote: stream id before it, versions after it.
    let (stream_id_str, tail) = message
        .strip_prefix("version_conflict: stream \"")?
        .split_once('"')?;

    let versions = tail.strip_prefix(" expected version ")?;
    let expected_str = versions.split(',').next()?;
    // Text after the last "actual " marker.
    let actual_str = tail.rsplit("actual ").next()?;

    let expected = expected_str.trim().parse::<usize>().ok()?;
    let actual = actual_str.trim().parse::<usize>().ok()?;
    let stream_id = StreamId::try_new(stream_id_str).ok()?;

    Some(EventStoreError::VersionConflict {
        stream_id,
        expected: StreamVersion::new(expected),
        actual: StreamVersion::new(actual),
    })
}
/// Errors produced by the Postgres checkpoint store implementations.
#[derive(Debug, Error)]
pub enum PostgresCheckpointError {
/// The connection pool could not be created.
#[error("failed to create postgres connection pool")]
ConnectionFailed(#[source] sqlx::Error),
/// A query or migration failed; the underlying `sqlx` error is kept as the source.
#[error("database operation failed: {0}")]
DatabaseError(#[source] sqlx::Error),
}
/// Standalone checkpoint store backed by a Postgres connection pool.
#[derive(Debug, Clone)]
pub struct PostgresCheckpointStore {
// Pool every checkpoint query runs against.
pool: Pool<Postgres>,
}
impl PostgresCheckpointStore {
pub async fn new<S: Into<String>>(
connection_string: S,
) -> Result<Self, PostgresCheckpointError> {
Self::with_config(connection_string, PostgresConfig::default()).await
}
pub async fn with_config<S: Into<String>>(
connection_string: S,
config: PostgresConfig,
) -> Result<Self, PostgresCheckpointError> {
let connection_string = connection_string.into();
let max_connections: std::num::NonZeroU32 = config.max_connections.into();
let pool = PgPoolOptions::new()
.max_connections(max_connections.get())
.acquire_timeout(config.acquire_timeout)
.idle_timeout(config.idle_timeout)
.connect(&connection_string)
.await
.map_err(PostgresCheckpointError::ConnectionFailed)?;
sqlx::migrate!("./migrations")
.run(&pool)
.await
.map_err(|e| {
PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
})?;
Ok(Self { pool })
}
pub fn from_pool(pool: Pool<Postgres>) -> Self {
Self { pool }
}
}
impl CheckpointStore for PostgresCheckpointStore {
    type Error = PostgresCheckpointError;

    /// Loads the last saved position for the subscription `name`.
    ///
    /// Returns `Ok(None)` when no row exists for `name`.
    ///
    /// # Errors
    ///
    /// Returns [`PostgresCheckpointError::DatabaseError`] when the query fails
    /// or the `last_position` column cannot be decoded as a UUID.
    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
            .bind(name)
            .fetch_optional(&self.pool)
            .await
            .map_err(PostgresCheckpointError::DatabaseError)?;

        match row {
            Some(row) => {
                // `try_get` reports a decode failure as an error instead of
                // panicking the way `Row::get` does.
                let position: Uuid = row
                    .try_get("last_position")
                    .map_err(PostgresCheckpointError::DatabaseError)?;
                Ok(Some(StreamPosition::new(position)))
            }
            None => Ok(None),
        }
    }

    /// Upserts `position` as the checkpoint of subscription `name` and
    /// refreshes `updated_at`.
    ///
    /// # Errors
    ///
    /// Returns [`PostgresCheckpointError::DatabaseError`] when the statement fails.
    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
        let position_uuid: Uuid = position.into_inner();
        let _ = query(
            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)\n\
             VALUES ($1, $2, NOW())\n\
             ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
        )
        .bind(name)
        .bind(position_uuid)
        .execute(&self.pool)
        .await
        .map_err(PostgresCheckpointError::DatabaseError)?;
        Ok(())
    }
}
/// Errors produced while trying to take projector leadership.
#[derive(Debug, Error)]
pub enum CoordinationError {
/// `pg_try_advisory_lock` returned false for this subscription's lock key.
#[error(
"leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
)]
LeadershipNotAcquired { subscription_name: String },
/// Acquiring a connection or running a query failed.
#[error("database operation failed: {0}")]
DatabaseError(#[source] sqlx::Error),
}
/// Holds an advisory lock together with the pooled connection it was taken on.
///
/// Dropping the guard runs `pg_advisory_unlock` for `lock_key` on that connection.
pub struct CoordinationGuard {
// Key passed to `pg_try_advisory_lock` and later to `pg_advisory_unlock`.
lock_key: i64,
// Connection the lock was acquired on; an `Option` so `Drop` can move it out with `take()`.
connection: Option<sqlx::pool::PoolConnection<Postgres>>,
}
impl std::fmt::Debug for CoordinationGuard {
    /// Prints only `lock_key`; the connection is left out and the output is
    /// marked as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { lock_key, .. } = self;
        f.debug_struct("CoordinationGuard")
            .field("lock_key", lock_key)
            .finish_non_exhaustive()
    }
}
impl Drop for CoordinationGuard {
    /// Releases the advisory lock on the connection it was acquired on.
    ///
    /// On a multi-threaded Tokio runtime the unlock runs to completion before
    /// `drop` returns (`block_in_place` + `block_on`). On any other runtime
    /// flavor the unlock is spawned as a detached task. Failures are only
    /// logged. `Handle::current()` is called, so this must run inside a Tokio
    /// runtime whenever the guard still owns its connection.
    fn drop(&mut self) {
        let Some(mut connection) = self.connection.take() else {
            return;
        };
        let lock_key = self.lock_key;
        let handle = tokio::runtime::Handle::current();

        if handle.runtime_flavor() != tokio::runtime::RuntimeFlavor::MultiThread {
            // Blocking is not possible here, so hand the connection to a background task.
            drop(tokio::spawn(async move {
                let unlock = query("SELECT pg_advisory_unlock($1)")
                    .bind(lock_key)
                    .execute(&mut *connection)
                    .await;
                if let Err(e) = unlock {
                    warn!(
                        lock_key = lock_key,
                        error = %e,
                        "failed to release advisory lock on drop (async)"
                    );
                }
            }));
            return;
        }

        tokio::task::block_in_place(|| {
            handle.block_on(async {
                let unlock = query("SELECT pg_advisory_unlock($1)")
                    .bind(lock_key)
                    .execute(&mut *connection)
                    .await;
                if let Err(e) = unlock {
                    warn!(
                        lock_key = lock_key,
                        error = %e,
                        "failed to release advisory lock on drop"
                    );
                }
            });
        });
    }
}
/// Projector coordinator that takes Postgres advisory locks through a connection pool.
#[derive(Debug, Clone)]
pub struct PostgresProjectorCoordinator {
// Pool from which a dedicated connection is taken for each lock attempt.
pool: Pool<Postgres>,
}
impl PostgresProjectorCoordinator {
pub async fn new<S: Into<String>>(connection_string: S) -> Result<Self, CoordinationError> {
Self::with_config(connection_string, PostgresConfig::default()).await
}
pub async fn with_config<S: Into<String>>(
connection_string: S,
config: PostgresConfig,
) -> Result<Self, CoordinationError> {
let connection_string = connection_string.into();
let max_connections: std::num::NonZeroU32 = config.max_connections.into();
let pool = PgPoolOptions::new()
.max_connections(max_connections.get())
.acquire_timeout(config.acquire_timeout)
.idle_timeout(config.idle_timeout)
.connect(&connection_string)
.await
.map_err(CoordinationError::DatabaseError)?;
Ok(Self { pool })
}
pub fn from_pool(pool: Pool<Postgres>) -> Self {
Self { pool }
}
}
/// Derives the advisory lock key for a subscription name.
///
/// The key is the 64-bit FNV-1a hash of the name's UTF-8 bytes, with the
/// bits reinterpreted as `i64` (so it may be negative).
fn advisory_lock_key(subscription_name: &str) -> i64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = subscription_name
        .bytes()
        .fold(FNV_OFFSET_BASIS, |state, byte| {
            (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });

    // Bit-for-bit reinterpretation; wrapping into the negative range is intended.
    digest as i64
}
/// Attempts to take the advisory lock for `subscription_name` without waiting.
///
/// A connection is checked out of `pool` and `pg_try_advisory_lock` is run on
/// it. On success that connection is moved into the returned guard.
///
/// # Errors
///
/// * [`CoordinationError::DatabaseError`] when no connection can be acquired
///   or the query fails.
/// * [`CoordinationError::LeadershipNotAcquired`] when the lock function
///   reports `false`.
async fn try_acquire_advisory_lock(
    pool: &Pool<Postgres>,
    subscription_name: &str,
) -> Result<CoordinationGuard, CoordinationError> {
    let lock_key = advisory_lock_key(subscription_name);

    let mut connection = match pool.acquire().await {
        Ok(connection) => connection,
        Err(source) => return Err(CoordinationError::DatabaseError(source)),
    };

    let acquired: bool = query("SELECT pg_try_advisory_lock($1)")
        .bind(lock_key)
        .fetch_one(&mut *connection)
        .await
        .map_err(CoordinationError::DatabaseError)?
        .get(0);

    if !acquired {
        return Err(CoordinationError::LeadershipNotAcquired {
            subscription_name: subscription_name.to_string(),
        });
    }

    Ok(CoordinationGuard {
        lock_key,
        connection: Some(connection),
    })
}
impl ProjectorCoordinator for PostgresProjectorCoordinator {
    type Error = CoordinationError;
    type Guard = CoordinationGuard;

    /// Tries to take the advisory lock for `subscription_name` using this
    /// coordinator's pool; the returned guard keeps the lock until dropped.
    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
        let guard = try_acquire_advisory_lock(&self.pool, subscription_name).await?;
        Ok(guard)
    }
}
impl ProjectorCoordinator for PostgresEventStore {
    type Error = CoordinationError;
    type Guard = CoordinationGuard;

    /// Tries to take the advisory lock for `subscription_name` using the
    /// event store's pool; the returned guard keeps the lock until dropped.
    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
        let guard = try_acquire_advisory_lock(&self.pool, subscription_name).await?;
        Ok(guard)
    }
}