use std::time::Duration;
use eventcore_types::{
CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
EventStream, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
StreamVersion, StreamWrites,
};
use futures::StreamExt;
use nutype::nutype;
use serde_json::value::RawValue;
use serde_json::{Value, json};
use sqlx::types::Json;
use sqlx::{Pool, Postgres, QueryBuilder, Row, postgres::PgPoolOptions, query};
use thiserror::Error;
use tracing::{error, info, instrument, warn};
use uuid::Uuid;
/// Error returned when a `PostgresEventStore` cannot be constructed.
#[derive(Debug, Error)]
pub enum PostgresEventStoreError {
/// Connecting the sqlx connection pool failed; the underlying `sqlx::Error` is the source.
#[error("failed to create postgres connection pool")]
ConnectionFailed(#[source] sqlx::Error),
}
/// Maximum size of a connection pool, as a non-zero count.
///
/// Converted into `std::num::NonZeroU32` and passed to `PgPoolOptions::max_connections`.
#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRef, Into))]
pub struct MaxConnections(std::num::NonZeroU32);
/// Connection-pool settings used by `PostgresEventStore`, `PostgresCheckpointStore` and
/// `PostgresProjectorCoordinator` when they build their own pool.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
/// Passed to `PgPoolOptions::max_connections`: the most connections the pool may hold open.
pub max_connections: MaxConnections,
/// Passed to `PgPoolOptions::acquire_timeout`: how long to wait for a free connection.
pub acquire_timeout: Duration,
/// Passed to `PgPoolOptions::idle_timeout`: how long an unused connection may stay open.
pub idle_timeout: Duration,
}
impl Default for PostgresConfig {
    /// Returns the stock pool settings: at most 10 connections, a 30 second acquire timeout
    /// and a 10 minute idle timeout.
    fn default() -> Self {
        // Evaluated at compile time; the `None` arm can never be taken because 10 is non-zero.
        const TEN_CONNECTIONS: std::num::NonZeroU32 = match std::num::NonZeroU32::new(10) {
            Some(limit) => limit,
            None => panic!("10 is non-zero"),
        };

        let max_connections = MaxConnections::new(TEN_CONNECTIONS);
        let acquire_timeout = Duration::from_secs(30);
        let idle_timeout = Duration::from_secs(10 * 60);

        Self {
            max_connections,
            acquire_timeout,
            idle_timeout,
        }
    }
}
/// PostgreSQL-backed event store. In this file it also implements `EventReader`,
/// `CheckpointStore` and `ProjectorCoordinator` over the same connection pool.
///
/// Cloning shares the same sqlx `Pool` (a reference-counted handle) rather than opening new
/// connections.
#[derive(Debug, Clone)]
pub struct PostgresEventStore {
pool: Pool<Postgres>,
}
impl PostgresEventStore {
pub async fn new<S: Into<String>>(
connection_string: S,
) -> Result<Self, PostgresEventStoreError> {
Self::with_config(connection_string, PostgresConfig::default()).await
}
pub async fn with_config<S: Into<String>>(
connection_string: S,
config: PostgresConfig,
) -> Result<Self, PostgresEventStoreError> {
let connection_string = connection_string.into();
let max_connections: std::num::NonZeroU32 = config.max_connections.into();
let pool = PgPoolOptions::new()
.max_connections(max_connections.get())
.acquire_timeout(config.acquire_timeout)
.idle_timeout(config.idle_timeout)
.connect(&connection_string)
.await
.map_err(PostgresEventStoreError::ConnectionFailed)?;
Ok(Self { pool })
}
pub fn from_pool(pool: Pool<Postgres>) -> Self {
Self { pool }
}
#[cfg_attr(test, mutants::skip)] pub async fn ping(&self) {
let _ = query("SELECT 1")
.execute(&self.pool)
.await
.expect("postgres ping failed");
}
#[cfg_attr(test, mutants::skip)] pub async fn migrate(&self) {
sqlx::migrate!("./migrations")
.run(&self.pool)
.await
.expect("postgres migration failed");
}
}
impl EventStore for PostgresEventStore {
    /// Streams every event recorded for `stream_id`, ordered by ascending `stream_version`.
    ///
    /// The query only runs once the returned stream is polled. The first failure is yielded as
    /// an `Err` item and ends the stream. Failures are: fetching a row, reading its
    /// `event_data` column, or deserializing it into `E`. The first two are mapped through
    /// `map_sqlx_error` with `Operation::ReadStream`; the last becomes
    /// `EventStoreError::DeserializationFailed`.
    #[instrument(name = "postgres.read_stream", skip(self))]
    async fn read_stream<E: Event>(
        &self,
        stream_id: StreamId,
    ) -> Result<EventStream<E>, EventStoreError> {
        info!(
            stream = %stream_id,
            "[postgres.read_stream] reading events from postgres"
        );
        let pool = self.pool.clone();
        let events = async_stream::stream! {
            let mut rows = query(
                "SELECT event_data FROM eventcore_events WHERE stream_id = $1 ORDER BY stream_version ASC",
            )
            .bind(stream_id.as_ref())
            .fetch(&pool);
            while let Some(fetched) = rows.next().await {
                let item = fetched
                    .and_then(|row| row.try_get::<Value, _>("event_data"))
                    .map_err(|error| map_sqlx_error(error, Operation::ReadStream))
                    .and_then(|payload| {
                        serde_json::from_value::<E>(payload).map_err(|error| {
                            EventStoreError::DeserializationFailed {
                                stream_id: stream_id.clone(),
                                detail: error.to_string(),
                            }
                        })
                    });
                // Surface the first failure to the consumer, then stop reading.
                let stop = item.is_err();
                yield item;
                if stop {
                    break;
                }
            }
        };
        Ok(EventStream::new(events))
    }

    /// Appends every event in `writes` inside a single transaction.
    ///
    /// First, the per-stream expected versions are serialized as a JSON object
    /// (`{"<stream id>": <version>, ...}`). That object is stored in the transaction-local
    /// setting `eventcore.expected_versions` before any row is inserted. Events are then
    /// inserted in multi-row batches of at most 1000. An empty `writes` returns immediately
    /// without touching the database.
    ///
    /// NOTE(review): no version check is visible in this function. Presumably a database
    /// trigger reads `eventcore.expected_versions`; confirm against the migrations.
    ///
    /// # Errors
    /// Database failures are mapped by `map_sqlx_error` with the failing operation
    /// (`BeginTransaction`, `SetExpectedVersions`, `AppendEvents` or `CommitTransaction`).
    #[instrument(name = "postgres.append_events", skip(self, writes))]
    async fn append_events(
        &self,
        writes: StreamWrites,
    ) -> Result<EventStreamSlice, EventStoreError> {
        const MAX_EVENTS_PER_INSERT: usize = 1000;

        let expected_versions = writes.expected_versions().clone();
        let entries = writes.into_entries();
        if entries.is_empty() {
            return Ok(EventStreamSlice);
        }
        info!(
            stream_count = expected_versions.len(),
            event_count = entries.len(),
            "[postgres.append_events] appending events to postgres"
        );

        let versions_by_stream: Value = expected_versions
            .iter()
            .map(|(stream_id, version)| {
                (stream_id.as_ref().to_string(), json!(version.into_inner()))
            })
            .collect();

        let mut tx = self
            .pool
            .begin()
            .await
            .map_err(|error| map_sqlx_error(error, Operation::BeginTransaction))?;

        // The third argument `true` scopes the setting to this transaction only.
        let _ = query("SELECT set_config('eventcore.expected_versions', $1, true)")
            .bind(versions_by_stream.to_string())
            .execute(&mut *tx)
            .await
            .map_err(|error| map_sqlx_error(error, Operation::SetExpectedVersions))?;

        let pending: Vec<(StreamId, &'static str, Box<RawValue>)> = entries
            .into_iter()
            .map(|entry| (entry.stream_id, entry.event_type, entry.event_data))
            .collect();

        for batch in pending.chunks(MAX_EVENTS_PER_INSERT) {
            let mut insert = QueryBuilder::<Postgres>::new(
                "INSERT INTO eventcore_events (event_id, stream_id, event_type, event_data, metadata) ",
            );
            let _ = insert.push_values(batch, |mut values, (stream_id, event_type, event_data)| {
                let _ = values
                    .push_bind(Uuid::now_v7())
                    .push_bind(stream_id.as_ref())
                    .push_bind(*event_type)
                    .push_bind(Json(event_data))
                    .push_bind(Json(json!({})));
            });
            let _ = insert
                .build()
                .execute(&mut *tx)
                .await
                .map_err(|error| map_sqlx_error(error, Operation::AppendEvents))?;
        }

        tx.commit()
            .await
            .map_err(|error| map_sqlx_error(error, Operation::CommitTransaction))?;
        Ok(EventStreamSlice)
    }
}
impl CheckpointStore for PostgresEventStore {
    type Error = PostgresCheckpointError;

    /// Loads the last saved position for subscription `name`, or `None` if nothing was saved.
    ///
    /// # Errors
    /// Returns [`PostgresCheckpointError::DatabaseError`] in two cases: the query fails, or
    /// the stored `last_position` cannot be decoded as a UUID (for example, it is NULL).
    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
            .bind(name)
            .fetch_optional(&self.pool)
            .await
            .map_err(PostgresCheckpointError::DatabaseError)?;
        let Some(row) = row else {
            return Ok(None);
        };
        // `try_get` rather than `get`: a decode failure is reported to the caller instead of
        // panicking inside the library.
        let position: Uuid = row
            .try_get("last_position")
            .map_err(PostgresCheckpointError::DatabaseError)?;
        Ok(Some(StreamPosition::new(position)))
    }

    /// Upserts the position for subscription `name` and stamps `updated_at` with `NOW()`.
    ///
    /// # Errors
    /// Returns [`PostgresCheckpointError::DatabaseError`] if the statement fails.
    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
        let position_uuid: Uuid = position.into_inner();
        let _ = query(
            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
        )
        .bind(name)
        .bind(position_uuid)
        .execute(&self.pool)
        .await
        .map_err(PostgresCheckpointError::DatabaseError)?;
        Ok(())
    }
}
impl EventReader for PostgresEventStore {
type Error = EventStoreError;
async fn read_events<E: Event>(
&self,
filter: EventFilter,
page: EventPage,
) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
let after_event_id: Option<Uuid> = page.after_position().map(|p| p.into_inner());
let limit: i64 = page.limit().into_inner() as i64;
let type_filter = filter.event_type().unwrap_or_else(|| E::event_type_name());
let pattern_regex = filter
.stream_pattern()
.map(|p| glob_to_anchored_regex(p.as_ref()));
let mut builder = QueryBuilder::<Postgres>::new(
"SELECT event_id, event_data, stream_id FROM eventcore_events WHERE event_type = ",
);
let _ = builder.push_bind(type_filter);
if let Some(after_id) = after_event_id {
let _ = builder.push(" AND event_id > ").push_bind(after_id);
}
if let Some(prefix) = filter.stream_prefix() {
let _ = builder
.push(" AND stream_id LIKE ")
.push_bind(prefix.as_ref().to_string())
.push(" || '%'");
} else if let Some(regex) = pattern_regex {
let _ = builder.push(" AND stream_id ~ ").push_bind(regex);
}
let _ = builder.push(" ORDER BY event_id LIMIT ").push_bind(limit);
let rows = builder
.build()
.fetch_all(&self.pool)
.await
.map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
let events: Vec<(E, StreamPosition)> = rows
.into_iter()
.filter_map(|row| {
let event_data: Json<Value> = row.get("event_data");
let event_id: Uuid = row.get("event_id");
serde_json::from_value::<E>(event_data.0)
.ok()
.map(|e| (e, StreamPosition::new(event_id)))
})
.collect();
Ok(events)
}
}
fn glob_to_anchored_regex(glob: &str) -> String {
let mut regex = String::with_capacity(glob.len() + 2);
regex.push('^');
let mut chars = glob.chars().peekable();
while let Some(c) = chars.next() {
match c {
'*' => regex.push_str(".*"),
'?' => regex.push('.'),
'[' => {
let mut class = String::new();
let mut closed = false;
if matches!(chars.peek(), Some('!')) {
let _ = chars.next();
class.push('^');
}
for inner in chars.by_ref() {
if inner == ']' {
closed = true;
break;
}
class.push(inner);
}
if closed {
regex.push('[');
regex.push_str(&class);
regex.push(']');
} else {
regex.push_str(®ex_escape("["));
regex.push_str(®ex_escape(&class));
}
}
other => regex.push_str(®ex_escape(&other.to_string())),
}
}
regex.push('$');
regex
}
/// Backslash-escapes every regex metacharacter in `literal`
/// (`. ^ $ * + ? ( ) [ ] { } | \`) so the result matches `literal` verbatim.
/// All other characters, including non-ASCII ones, are copied unchanged.
fn regex_escape(literal: &str) -> String {
    literal
        .chars()
        .fold(String::with_capacity(literal.len()), |mut out, ch| {
            if matches!(
                ch,
                '.' | '^' | '$' | '*' | '+' | '?' | '(' | ')' | '[' | ']' | '{' | '}' | '|' | '\\'
            ) {
                out.push('\\');
            }
            out.push(ch);
            out
        })
}
fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
if let sqlx::Error::Database(db_error) = &error {
let code = db_error.code();
let code_str = code.as_deref();
if code_str == Some("P0001") || code_str == Some("23505") {
warn!(
error = %db_error,
"[postgres.version_conflict] optimistic concurrency check failed"
);
return parse_version_conflict_from_db_error(db_error.message());
}
}
error!(
error = %error,
operation = %operation,
"[postgres.database_error] database operation failed"
);
EventStoreError::StoreFailure { operation }
}
/// Builds a `VersionConflict` from a database error message.
///
/// If the message does not have the shape `try_parse_conflict_message` understands, a
/// placeholder conflict is returned: stream `unknown-conflict-stream`, both versions 0.
fn parse_version_conflict_from_db_error(message: &str) -> EventStoreError {
    try_parse_conflict_message(message).unwrap_or_else(|| EventStoreError::VersionConflict {
        stream_id: StreamId::try_new("unknown-conflict-stream")
            .expect("static stream id is valid"),
        expected: StreamVersion::new(0),
        actual: StreamVersion::new(0),
    })
}
/// Parses messages shaped like
/// `version_conflict: stream "<id>" expected version <N>, actual <M>`.
///
/// `<N>` is the text up to the first comma after `expected version `. `<M>` is the text after
/// the last occurrence of `actual `. Both are trimmed and parsed as `usize`. Returns `None` if
/// any piece is missing, fails to parse, or `<id>` is not a valid `StreamId`.
fn try_parse_conflict_message(message: &str) -> Option<EventStoreError> {
    let body = message.strip_prefix("version_conflict: stream \"")?;
    let closing_quote = body.find('"')?;
    let (raw_stream_id, tail) = body.split_at(closing_quote);

    let expected = tail
        .strip_prefix("\" expected version ")?
        .split(',')
        .next()?
        .trim()
        .parse::<usize>()
        .ok()?;
    let actual = tail
        .rsplit("actual ")
        .next()?
        .trim()
        .parse::<usize>()
        .ok()?;
    let stream_id = StreamId::try_new(raw_stream_id).ok()?;

    Some(EventStoreError::VersionConflict {
        stream_id,
        expected: StreamVersion::new(expected),
        actual: StreamVersion::new(actual),
    })
}
/// Errors from the Postgres-backed `CheckpointStore` implementations and from constructing a
/// `PostgresCheckpointStore`.
#[derive(Debug, Error)]
pub enum PostgresCheckpointError {
/// Building the connection pool failed (returned by `PostgresCheckpointStore::with_config`).
#[error("failed to create postgres connection pool")]
ConnectionFailed(#[source] sqlx::Error),
/// A checkpoint query failed. Also used by `PostgresCheckpointStore::with_config` when a
/// migration fails; that error is wrapped as `sqlx::Error::Migrate`.
#[error("database operation failed: {0}")]
DatabaseError(#[source] sqlx::Error),
}
/// Standalone checkpoint store for subscription positions, backed by its own Postgres pool.
///
/// Reads and writes the `eventcore_subscription_versions` table.
#[derive(Debug, Clone)]
pub struct PostgresCheckpointStore {
pool: Pool<Postgres>,
}
impl PostgresCheckpointStore {
pub async fn new<S: Into<String>>(
connection_string: S,
) -> Result<Self, PostgresCheckpointError> {
Self::with_config(connection_string, PostgresConfig::default()).await
}
pub async fn with_config<S: Into<String>>(
connection_string: S,
config: PostgresConfig,
) -> Result<Self, PostgresCheckpointError> {
let connection_string = connection_string.into();
let max_connections: std::num::NonZeroU32 = config.max_connections.into();
let pool = PgPoolOptions::new()
.max_connections(max_connections.get())
.acquire_timeout(config.acquire_timeout)
.idle_timeout(config.idle_timeout)
.connect(&connection_string)
.await
.map_err(PostgresCheckpointError::ConnectionFailed)?;
sqlx::migrate!("./migrations")
.run(&pool)
.await
.map_err(|e| {
PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
})?;
Ok(Self { pool })
}
pub fn from_pool(pool: Pool<Postgres>) -> Self {
Self { pool }
}
}
impl CheckpointStore for PostgresCheckpointStore {
    type Error = PostgresCheckpointError;

    /// Loads the last saved position for subscription `name`, or `None` if nothing was saved.
    ///
    /// # Errors
    /// Returns [`PostgresCheckpointError::DatabaseError`] in two cases: the query fails, or
    /// the stored `last_position` cannot be decoded as a UUID (for example, it is NULL).
    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
            .bind(name)
            .fetch_optional(&self.pool)
            .await
            .map_err(PostgresCheckpointError::DatabaseError)?;
        let Some(row) = row else {
            return Ok(None);
        };
        // `try_get` rather than `get`: a decode failure is reported to the caller instead of
        // panicking inside the library.
        let position: Uuid = row
            .try_get("last_position")
            .map_err(PostgresCheckpointError::DatabaseError)?;
        Ok(Some(StreamPosition::new(position)))
    }

    /// Upserts the position for subscription `name` and stamps `updated_at` with `NOW()`.
    ///
    /// # Errors
    /// Returns [`PostgresCheckpointError::DatabaseError`] if the statement fails.
    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
        let position_uuid: Uuid = position.into_inner();
        let _ = query(
            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
        )
        .bind(name)
        .bind(position_uuid)
        .execute(&self.pool)
        .await
        .map_err(PostgresCheckpointError::DatabaseError)?;
        Ok(())
    }
}
/// Errors from trying to acquire projector leadership through a Postgres advisory lock.
#[derive(Debug, Error)]
pub enum CoordinationError {
/// `pg_try_advisory_lock` returned false: another session already holds the lock.
#[error(
"leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
)]
LeadershipNotAcquired { subscription_name: String },
/// Creating the pool, checking out a connection, or running the lock query failed.
#[error("database operation failed: {0}")]
DatabaseError(#[source] sqlx::Error),
}
/// Leadership token for one subscription. It owns the pooled connection whose database session
/// holds the advisory lock `lock_key`.
///
/// Dropping the guard runs `pg_advisory_unlock(lock_key)` on that connection (see the `Drop`
/// impl), so keep the guard alive for as long as leadership is needed.
pub struct CoordinationGuard {
/// Key produced by `advisory_lock_key` from the subscription name.
lock_key: i64,
/// The connection that took the lock; wrapped in `Option` so `Drop` can move it out with `take()`.
connection: Option<sqlx::pool::PoolConnection<Postgres>>,
}
impl std::fmt::Debug for CoordinationGuard {
    /// Shows only `lock_key`; the pooled connection is elided and rendered as `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("CoordinationGuard");
        let _ = out.field("lock_key", &self.lock_key);
        out.finish_non_exhaustive()
    }
}
impl Drop for CoordinationGuard {
    /// Releases the advisory lock held by this guard's connection.
    ///
    /// `pg_advisory_unlock` must run on the session that took the lock, so it is executed on
    /// the held connection:
    /// - on a multi-threaded Tokio runtime it runs synchronously via `block_in_place`;
    /// - on any other runtime flavor it runs in a detached spawned task;
    /// - with no Tokio runtime available, the connection is detached from the pool instead.
    ///   Dropping it then closes that session, which also releases its session-level locks.
    ///
    /// If the unlock query fails, the connection is detached as well. A pooled session must
    /// never go back into the pool while still holding the lock: Postgres advisory locks are
    /// re-entrant per session, so a later `try_acquire` could be handed that connection and
    /// "win" leadership a second time.
    fn drop(&mut self) {
        let Some(mut connection) = self.connection.take() else {
            return;
        };
        let lock_key = self.lock_key;

        // `Handle::current()` panics outside a runtime, and a panic inside `drop` during
        // unwinding aborts the process.
        let Ok(handle) = tokio::runtime::Handle::try_current() else {
            warn!(
                lock_key = lock_key,
                "no tokio runtime available to release advisory lock on drop; closing connection"
            );
            drop(connection.detach());
            return;
        };

        if handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread {
            let unlocked = tokio::task::block_in_place(|| {
                handle.block_on(
                    query("SELECT pg_advisory_unlock($1)")
                        .bind(lock_key)
                        .execute(&mut *connection),
                )
            });
            if let Err(e) = unlocked {
                warn!(
                    lock_key = lock_key,
                    error = %e,
                    "failed to release advisory lock on drop"
                );
                drop(connection.detach());
            }
        } else {
            // `block_in_place` panics on a current-thread runtime, so unlock asynchronously.
            drop(tokio::spawn(async move {
                let unlocked = query("SELECT pg_advisory_unlock($1)")
                    .bind(lock_key)
                    .execute(&mut *connection)
                    .await;
                if let Err(e) = unlocked {
                    warn!(
                        lock_key = lock_key,
                        error = %e,
                        "failed to release advisory lock on drop (async)"
                    );
                    drop(connection.detach());
                }
            }));
        }
    }
}
/// Coordinates projector leadership using Postgres session-level advisory locks
/// (`pg_try_advisory_lock`), over its own connection pool.
#[derive(Debug, Clone)]
pub struct PostgresProjectorCoordinator {
pool: Pool<Postgres>,
}
impl PostgresProjectorCoordinator {
pub async fn new<S: Into<String>>(connection_string: S) -> Result<Self, CoordinationError> {
Self::with_config(connection_string, PostgresConfig::default()).await
}
pub async fn with_config<S: Into<String>>(
connection_string: S,
config: PostgresConfig,
) -> Result<Self, CoordinationError> {
let connection_string = connection_string.into();
let max_connections: std::num::NonZeroU32 = config.max_connections.into();
let pool = PgPoolOptions::new()
.max_connections(max_connections.get())
.acquire_timeout(config.acquire_timeout)
.idle_timeout(config.idle_timeout)
.connect(&connection_string)
.await
.map_err(CoordinationError::DatabaseError)?;
Ok(Self { pool })
}
pub fn from_pool(pool: Pool<Postgres>) -> Self {
Self { pool }
}
}
/// Derives the 64-bit advisory-lock key for a subscription name.
///
/// The key is the 64-bit FNV-1a hash of the name's UTF-8 bytes. FNV-1a uses no random seed,
/// so every process computes the same key for the same name. The unsigned hash is
/// reinterpreted bit-for-bit as the signed value bound to `pg_try_advisory_lock`.
fn advisory_lock_key(subscription_name: &str) -> i64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = subscription_name
        .bytes()
        .fold(OFFSET_BASIS, |acc, byte| (acc ^ u64::from(byte)).wrapping_mul(PRIME));
    i64::from_ne_bytes(digest.to_ne_bytes())
}
/// Tries, without waiting, to take the advisory lock for `subscription_name`.
///
/// A connection is checked out of `pool` and `pg_try_advisory_lock` runs on it. If the lock is
/// taken, the connection is moved into the returned [`CoordinationGuard`], because the lock
/// belongs to that database session. Otherwise the connection simply returns to the pool.
///
/// Note: Postgres advisory locks are re-entrant within one session, so this succeeds if the
/// checked-out session already holds the lock.
///
/// # Errors
/// - [`CoordinationError::LeadershipNotAcquired`] if the lock is held by another session.
/// - [`CoordinationError::DatabaseError`] if checking out a connection, running the query, or
///   decoding its boolean result fails.
async fn try_acquire_advisory_lock(
    pool: &Pool<Postgres>,
    subscription_name: &str,
) -> Result<CoordinationGuard, CoordinationError> {
    let lock_key = advisory_lock_key(subscription_name);
    let mut connection = pool
        .acquire()
        .await
        .map_err(CoordinationError::DatabaseError)?;
    let row = query("SELECT pg_try_advisory_lock($1)")
        .bind(lock_key)
        .fetch_one(&mut *connection)
        .await
        .map_err(CoordinationError::DatabaseError)?;
    // `try_get` instead of `get`: a decode failure becomes an error rather than a panic.
    let acquired: bool = row.try_get(0).map_err(CoordinationError::DatabaseError)?;
    if !acquired {
        return Err(CoordinationError::LeadershipNotAcquired {
            subscription_name: subscription_name.to_string(),
        });
    }
    Ok(CoordinationGuard {
        lock_key,
        connection: Some(connection),
    })
}
impl ProjectorCoordinator for PostgresProjectorCoordinator {
    type Guard = CoordinationGuard;
    type Error = CoordinationError;

    /// Attempts, without waiting, to become leader for `subscription_name` by taking its
    /// advisory lock on a connection from this coordinator's pool.
    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
        let pool = &self.pool;
        try_acquire_advisory_lock(pool, subscription_name).await
    }
}
impl ProjectorCoordinator for PostgresEventStore {
    type Guard = CoordinationGuard;
    type Error = CoordinationError;

    /// Attempts, without waiting, to become leader for `subscription_name` by taking its
    /// advisory lock on a connection from the event store's own pool.
    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
        let pool = &self.pool;
        try_acquire_advisory_lock(pool, subscription_name).await
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for the glob-to-regex translation used by `stream_pattern` filters.
    use super::{glob_to_anchored_regex, regex_escape};

    #[test]
    fn star_translates_to_dot_star_anchored() {
        let regex = glob_to_anchored_regex("account-*");
        assert_eq!(regex, r"^account-.*$");
    }

    #[test]
    fn question_mark_translates_to_dot() {
        let regex = glob_to_anchored_regex("account-?");
        assert_eq!(regex, r"^account-.$");
    }

    #[test]
    fn character_class_is_preserved() {
        let regex = glob_to_anchored_regex("account-[0-9]*");
        assert_eq!(regex, r"^account-[0-9].*$");
    }

    #[test]
    fn negated_character_class_uses_caret() {
        let regex = glob_to_anchored_regex("account-[!0-9]");
        assert_eq!(regex, r"^account-[^0-9]$");
    }

    #[test]
    fn literal_regex_metacharacters_are_escaped() {
        let regex = glob_to_anchored_regex("a.c+(d)");
        assert_eq!(regex, r"^a\.c\+\(d\)$");
    }

    #[test]
    fn regex_escape_escapes_all_metacharacters() {
        let escaped = regex_escape(r".^$*+?()[]{}|\");
        assert_eq!(escaped, r"\.\^\$\*\+\?\(\)\[\]\{\}\|\\");
    }
}