1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/// Errors produced by the event store.
///
/// Covers version conflicts, missing streams, event (de)serialization
/// failures, log-integrity failures, subscription ordering violations, and
/// underlying `sqlx` database errors. For database failures, prefer
/// [`EventStoreError::db_kind`] / [`EventStoreError::is_retryable`] over
/// matching on `sqlx` types directly.
#[derive(Debug, thiserror::Error)]
pub enum EventStoreError {
    /// The version the caller expected for `stream_id` (`expected`) did not
    /// match the version actually found (`actual`).
    #[error(
        "concurrency conflict on stream '{stream_id}': expected version {expected}, found {actual}"
    )]
    ConcurrencyConflict {
        stream_id: String,
        expected: i64,
        actual: i64,
    },
    /// The requested stream was not found. The payload is presumably the
    /// stream id (it is rendered as one in the message) — confirm at call sites.
    #[error("stream not found: '{0}'")]
    StreamNotFound(String),
    /// A stored event could not be decoded from JSON.
    ///
    /// Identifies the event by stream, global position and event type.
    /// Because the field is named `source`, `thiserror` also reports the
    /// `serde_json` error as this error's `source()`. Note it is interpolated
    /// into the message as well, so chained error printers will show it twice.
    #[error(
        "failed to deserialize event at position {global_position} in stream '{stream_id}' (type: {event_type}): {source}"
    )]
    Deserialization {
        stream_id: String,
        global_position: i64,
        event_type: String,
        source: serde_json::Error,
    },
    /// JSON encoding of an event failed.
    ///
    /// `#[from]` generates `From<serde_json::Error>`, so `?` on *any*
    /// `serde_json::Error` converts into this variant. That includes decode
    /// errors on paths that do not build [`EventStoreError::Deserialization`]
    /// explicitly.
    #[error("event serialization error: {0}")]
    Serialization(#[from] serde_json::Error),
    /// The events read for a stream do not match its recorded
    /// `stream_version`: a gap, a non-contiguous version sequence, or a
    /// count/highest-version mismatch. This is a hard integrity failure
    /// (truncated read, partial append, or corrupted log) — surfaced
    /// rather than silently hydrating an aggregate from partial history,
    /// which would let a command be validated against wrong state and
    /// committed (review C1/SNAP-1).
    #[error(
        "stream '{stream_id}' is corrupt: recorded version {recorded_version} but {detail} (read {read_count} events)"
    )]
    StreamCorruption {
        stream_id: String,
        recorded_version: i64,
        read_count: i64,
        detail: String,
    },
    /// The always-on per-stream ordering backstop tripped (review
    /// CORE-2/SUB-1): within one continuous delivery session, an event
    /// arrived whose `stream_version` is not ahead of what this
    /// subscription already delivered for the stream. Either the log/
    /// checkpoint state is inconsistent (e.g. a pre-fix checkpoint that
    /// passed a delivered-out-of-order event) or there is an ordering bug —
    /// both mean a read model could silently regress, so delivery fails
    /// loudly instead of handing the event to a handler.
    #[error(
        "ordering violation on subscription '{subscription_id}', stream '{stream_id}': \
        event v{version} arrived but v{last_delivered} was already delivered \
        (checkpoint/log inconsistency or ordering bug — refusing to corrupt the read model)"
    )]
    OrderingViolation {
        subscription_id: String,
        stream_id: String,
        version: i64,
        last_delivered: i64,
    },
    /// Any error from `sqlx`. `#[from]` lets `?` convert `sqlx::Error`
    /// automatically and reports it as this error's `source()`. Use
    /// [`EventStoreError::db_kind`] to classify it.
    #[error("database error: {0}")]
    Database(#[from] sqlx::Error),
}
/// Coarse classification of [`EventStoreError::Database`] for callers that
/// want to react differently to common failure modes without depending on
/// `sqlx` types. Use [`EventStoreError::db_kind`] to inspect.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DbErrorKind {
    /// Could not talk to the database or obtain a connection from the pool:
    /// an I/O or TLS failure, a pool acquire timeout, or a closed pool.
    /// Usually transient. The exception is a closed pool, which will not
    /// recover on its own.
    Connection,
    /// Postgres `lock_not_available` (SQLSTATE `55P03`). A lock could not be
    /// acquired, e.g. because the session's `lock_timeout` expired or a
    /// `NOWAIT` lock request found the lock already held. Usually retryable.
    LockTimeout,
    /// Postgres `deadlock_detected` (SQLSTATE `40P01`). Retryable.
    Deadlock,
    /// Postgres `unique_violation` (SQLSTATE `23505`). Typically a bug
    /// or genuine duplicate; not blindly retryable.
    UniqueViolation,
    /// Postgres `foreign_key_violation` (SQLSTATE `23503`).
    ForeignKeyViolation,
    /// Anything else — inspect the underlying [`sqlx::Error`] if you
    /// need more.
    Other,
}
impl EventStoreError {
/// Classify a [`Database`](EventStoreError::Database) variant into a
/// coarse bucket. Returns `None` for non-database variants. Lets
/// callers branch on common cases (retry vs surface) without
/// reaching into `sqlx::Error` internals.
///
/// ```rust,ignore
/// match store.save(&mut agg).await {
/// Err(e) if e.db_kind() == Some(DbErrorKind::LockTimeout) => retry_later(),
/// Err(e) => bail!("save failed: {e}"),
/// Ok(_) => {}
/// }
/// ```
pub fn db_kind(&self) -> Option<DbErrorKind> {
let Self::Database(e) = self else {
return None;
};
match e {
sqlx::Error::Io(_) => return Some(DbErrorKind::Connection),
sqlx::Error::PoolTimedOut => return Some(DbErrorKind::Connection),
sqlx::Error::PoolClosed => return Some(DbErrorKind::Connection),
sqlx::Error::Tls(_) => return Some(DbErrorKind::Connection),
_ => {}
}
let code = e.as_database_error().and_then(|d| d.code());
Some(match code.as_deref() {
Some("55P03") => DbErrorKind::LockTimeout,
Some("40P01") => DbErrorKind::Deadlock,
Some("23505") => DbErrorKind::UniqueViolation,
Some("23503") => DbErrorKind::ForeignKeyViolation,
_ => DbErrorKind::Other,
})
}
/// Is this error worth retrying? Returns `true` for transient DB
/// failures (connection issues, deadlocks) and `false` for
/// semantic failures (concurrency conflicts, unique violations,
/// serialization errors).
pub fn is_retryable(&self) -> bool {
matches!(
self.db_kind(),
Some(DbErrorKind::Connection | DbErrorKind::Deadlock | DbErrorKind::LockTimeout)
)
}
}