/// Errors produced by the event store.
///
/// `Display` text for every variant comes from its `#[error(...)]` attribute.
/// Two variants wrap `serde_json::Error`, but only [`EventStoreError::Serialization`]
/// carries `#[from]`, so a bare `?` on a `serde_json::Error` always yields
/// `Serialization`; `Deserialization` has to be constructed explicitly.
#[derive(Debug, thiserror::Error)]
pub enum EventStoreError {
/// The stream's version did not match the version the caller expected.
#[error(
"concurrency conflict on stream '{stream_id}': expected version {expected}, found {actual}"
)]
ConcurrencyConflict {
/// Identifier of the stream on which the conflict occurred.
stream_id: String,
/// Version the caller expected.
expected: i64,
/// Version that was actually found.
actual: i64,
},
/// No stream matched the lookup.
// NOTE(review): the payload is presumably the stream id — confirm at the construction sites.
#[error("stream not found: '{0}'")]
StreamNotFound(String),
/// A stored event could not be decoded from JSON.
#[error(
"failed to deserialize event at position {global_position} in stream '{stream_id}' (type: {event_type}): {source}"
)]
Deserialization {
/// Stream the failing event belongs to.
stream_id: String,
/// Position of the failing event, as reported in the message.
global_position: i64,
/// Type name recorded for the failing event.
event_type: String,
/// Underlying JSON error. thiserror treats a field named `source` as the
/// error's `source()`; it is also interpolated into the message above.
source: serde_json::Error,
},
/// A `serde_json` failure converted automatically through `From` (see `#[from]`).
#[error("event serialization error: {0}")]
Serialization(#[from] serde_json::Error),
/// A `sqlx` failure converted automatically through `From` (see `#[from]`).
#[error("database error: {0}")]
Database(#[from] sqlx::Error),
}
/// Coarse classification of a database error, as produced by
/// `EventStoreError::db_kind`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbErrorKind {
/// Assigned to `sqlx` I/O, TLS, pool-timed-out and pool-closed errors.
Connection,
/// Assigned to database error code `55P03`.
LockTimeout,
/// Assigned to database error code `40P01`.
Deadlock,
/// Assigned to database error code `23505`.
UniqueViolation,
/// Assigned to database error code `23503`.
ForeignKeyViolation,
/// Any other database error, including those that carry no error code.
Other,
}
impl EventStoreError {
pub fn db_kind(&self) -> Option<DbErrorKind> {
let Self::Database(e) = self else {
return None;
};
match e {
sqlx::Error::Io(_) => return Some(DbErrorKind::Connection),
sqlx::Error::PoolTimedOut => return Some(DbErrorKind::Connection),
sqlx::Error::PoolClosed => return Some(DbErrorKind::Connection),
sqlx::Error::Tls(_) => return Some(DbErrorKind::Connection),
_ => {}
}
let code = e.as_database_error().and_then(|d| d.code());
Some(match code.as_deref() {
Some("55P03") => DbErrorKind::LockTimeout,
Some("40P01") => DbErrorKind::Deadlock,
Some("23505") => DbErrorKind::UniqueViolation,
Some("23503") => DbErrorKind::ForeignKeyViolation,
_ => DbErrorKind::Other,
})
}
pub fn is_retryable(&self) -> bool {
matches!(
self.db_kind(),
Some(DbErrorKind::Connection | DbErrorKind::Deadlock | DbErrorKind::LockTimeout)
)
}
}