use std::time::Duration;
use mire::{
Aggregate, EventData, EventHandler, EventStore, ExpectedVersion, HandledEvent,
ProjectionRunner, Snapshot, Subscription,
};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
mod common;
/// Event vocabulary shared by every test aggregate in this file.
///
/// Serde writes it as an internally tagged enum: the variant name is stored
/// under the `"type"` key next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum CounterEvent {
    /// The counter moved by `by`.
    Incremented { by: i64 },
}

impl EventData for CounterEvent {
    /// Returns the event-type string associated with each variant.
    fn event_type(&self) -> &'static str {
        match *self {
            Self::Incremented { .. } => "counter.incremented",
        }
    }
}
/// Minimal aggregate used by most tests: a running sum of increments.
#[derive(Debug, Default, Serialize, Deserialize)]
struct Counter {
    total: i64,
}

impl Aggregate for Counter {
    type Event = CounterEvent;

    /// Category name; tests build stream ids as `counter-<id>` from it.
    fn stream_category() -> &'static str {
        "counter"
    }

    /// Folds a single event into the running total.
    fn apply(&mut self, event: &CounterEvent) {
        // `CounterEvent` has one variant, so the pattern is irrefutable.
        let CounterEvent::Incremented { by } = event;
        self.total += by;
    }
}

impl Snapshot for Counter {
    const SNAPSHOT_VERSION: i32 = 1;
    // NOTE(review): presumably "snapshot every 2 events" — confirm against `mire::Snapshot`.
    const SNAPSHOT_FREQUENCY: i64 = 2;
}
/// Single entry point through which the tests in this file obtain their `EventStore`.
///
/// Simply forwards to `common::store()`. Callers treat `None` as "skip this test"
/// and print "skipping: DATABASE_URL not set" — presumably `common::store()` returns
/// `None` when that variable is missing (confirm in the `common` module).
async fn store() -> Option<EventStore> {
common::store().await
}
/// Records two increments (3 and 7), saves them, and checks that both the
/// in-memory aggregate and a fresh load report version 2 and a total of 10.
///
/// Skips with a message on stderr when no store is available.
#[tokio::test]
async fn records_and_rebuilds_from_the_log() {
    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let aggregate_id = Uuid::new_v4().to_string();
    let mut live = store
        .load_or_default::<Counter>(&aggregate_id)
        .await
        .unwrap();
    for by in [3, 7] {
        live.record(CounterEvent::Incremented { by });
    }
    store.save(&mut live).await.unwrap();
    assert_eq!(live.version, 2);

    // Re-read through the store instead of trusting the in-memory copy.
    let replayed = store
        .load::<Counter>(&aggregate_id)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(replayed.state.total, 10);
    assert_eq!(replayed.version, 2);
}
/// After one event has been saved, an append that still expects version 0
/// must be rejected with `ConcurrencyConflict`.
#[tokio::test]
async fn rejects_stale_writes() {
    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let aggregate_id = Uuid::new_v4().to_string();
    let mut root = store
        .load_or_default::<Counter>(&aggregate_id)
        .await
        .unwrap();
    root.record(CounterEvent::Incremented { by: 1 });
    store.save(&mut root).await.unwrap();

    // One event has already been saved, so `Exact(0)` no longer matches the stream.
    let stale_batch = [mire::Event::new(CounterEvent::Incremented { by: 1 })];
    let outcome = store
        .append::<CounterEvent>(
            &root.stream_id,
            Counter::stream_category(),
            ExpectedVersion::Exact(0),
            &stale_batch,
        )
        .await;

    let conflicted = matches!(
        outcome,
        Err(mire::EventStoreError::ConcurrencyConflict { .. })
    );
    assert!(conflicted);
}
/// Appending with `ExpectedVersion::NoStream` to a stream that already holds
/// five events must fail with a conflict reporting `expected = 0`, `actual = 5`.
#[tokio::test]
async fn no_stream_on_existing_returns_actual_version() {
    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let aggregate_id = Uuid::new_v4().to_string();
    let mut root = store
        .load_or_default::<Counter>(&aggregate_id)
        .await
        .unwrap();
    for _ in 0..5 {
        root.record(CounterEvent::Incremented { by: 1 });
    }
    store.save(&mut root).await.unwrap();
    assert_eq!(root.version, 5);

    let extra = [mire::Event::new(CounterEvent::Incremented { by: 1 })];
    let outcome = store
        .append::<CounterEvent>(
            &root.stream_id,
            Counter::stream_category(),
            ExpectedVersion::NoStream,
            &extra,
        )
        .await;

    match outcome {
        Err(mire::EventStoreError::ConcurrencyConflict {
            expected, actual, ..
        }) => {
            assert_eq!(expected, 0);
            assert_eq!(actual, 5, "NoStream on existing must surface actual=5");
        }
        // Anything else (including success) is a test failure; show what came back.
        other => panic!("expected ConcurrencyConflict, got: {other:?}"),
    }
}
/// Spawns 100 tasks that each append one event to the same stream with
/// `ExpectedVersion::Any`, then checks that the stored stream versions are
/// exactly `1..=100` with no gaps or duplicates.
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn parallel_any_appends_produce_contiguous_versions() {
    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let stream_id = format!("counter-{}", Uuid::new_v4());
    let n_writers: i64 = 100;

    // Spawn every writer first; they are awaited only once all are running.
    let writers: Vec<_> = (0..n_writers)
        .map(|_| {
            let store = store.clone();
            let stream_id = stream_id.clone();
            tokio::spawn(async move {
                store
                    .append::<CounterEvent>(
                        &stream_id,
                        Counter::stream_category(),
                        ExpectedVersion::Any,
                        &[mire::Event::new(CounterEvent::Incremented { by: 1 })],
                    )
                    .await
                    .expect("Any must never produce ConcurrencyConflict")
            })
        })
        .collect();
    for writer in writers {
        writer.await.unwrap();
    }

    let recorded = store
        .read_stream(&stream_id, mire::StreamQuery::default())
        .await
        .unwrap();
    let mut versions: Vec<i64> = recorded.iter().map(|e| e.stream_version).collect();
    versions.sort_unstable();
    let expected: Vec<i64> = (1..=n_writers).collect();
    assert_eq!(versions, expected, "version range must be contiguous 1..=N");
}
/// Sixteen writers race to append to one stream, each retrying with
/// `NoStream`/`Exact(v)` until it has landed four events.
///
/// Losing a race must surface as `ConcurrencyConflict`; a database error with
/// SQLSTATE `23505` (unique violation) fails the test, as does any other
/// error. At the end the stream version must equal `writers * events_per_writer`.
///
/// Builds its own pool (40 connections) from `DATABASE_URL` rather than using
/// the shared `store()` helper.
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn exact_contention_never_violates_unique_constraint() {
    let Ok(url) = std::env::var("DATABASE_URL") else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };
    let pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(40)
        .connect(&url)
        .await
        .expect("connect");
    let store = common::store_with_pool(pool).await;

    let stream_id = format!("counter-{}", Uuid::new_v4());
    let writers: i64 = 16;
    let events_per_writer: i64 = 4;

    let tasks: Vec<_> = (0..writers)
        .map(|_| {
            let store = store.clone();
            let stream_id = stream_id.clone();
            tokio::spawn(async move {
                let mut wins: i64 = 0;
                while wins < events_per_writer {
                    // Read the current version, then try to claim the next slot.
                    let current: Option<i64> = sqlx::query_scalar(
                        "SELECT stream_version FROM es_streams WHERE stream_id = $1",
                    )
                    .bind(&stream_id)
                    .fetch_optional(store.pool())
                    .await
                    .unwrap();
                    // A missing row and a stored version of 0 both mean "no stream yet".
                    let expected = match current.unwrap_or(0) {
                        0 => ExpectedVersion::NoStream,
                        version => ExpectedVersion::Exact(version),
                    };

                    let attempt = store
                        .append::<CounterEvent>(
                            &stream_id,
                            Counter::stream_category(),
                            expected,
                            &[mire::Event::new(CounterEvent::Incremented { by: 1 })],
                        )
                        .await;
                    match attempt {
                        Ok(_) => wins += 1,
                        // Lost the race: loop and retry against the new version.
                        Err(mire::EventStoreError::ConcurrencyConflict { .. }) => {}
                        Err(mire::EventStoreError::Database(e)) => {
                            let code = e.as_database_error().and_then(|d| d.code());
                            if code.as_deref() == Some("23505") {
                                panic!(
                                    "UNIQUE(stream_id, stream_version) violation under \
                                     contended Exact(v) — CAS allowed a duplicate slot: {e}"
                                );
                            }
                            panic!("unexpected DB error: {e}");
                        }
                        Err(e) => panic!("unexpected error: {e}"),
                    }
                }
            })
        })
        .collect();
    for task in tasks {
        task.await.unwrap();
    }

    let final_version: i64 =
        sqlx::query_scalar("SELECT stream_version FROM es_streams WHERE stream_id = $1")
            .bind(&stream_id)
            .fetch_one(store.pool())
            .await
            .unwrap();
    assert_eq!(final_version, writers * events_per_writer);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn long_append_does_not_block_concurrent_reader() {
let Some(store) = store().await else {
eprintln!("skipping: DATABASE_URL not set");
return;
};
let id = Uuid::new_v4().to_string();
let stream_id = format!("counter-{id}");
let mut counter = store.load_or_default::<Counter>(&id).await.unwrap();
counter.record(CounterEvent::Incremented { by: 1 });
store.save(&mut counter).await.unwrap();
let writer_store = store.clone();
let writer_stream_id = stream_id.clone();
let writer = tokio::spawn(async move {
let mut tx = writer_store.pool().begin().await.unwrap();
mire::EventStore::append_in_tx::<CounterEvent>(
&mut tx,
&writer_stream_id,
Counter::stream_category(),
ExpectedVersion::Exact(1),
&[mire::Event::new(CounterEvent::Incremented { by: 1 })],
)
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
tx.commit().await.unwrap();
});
tokio::time::sleep(Duration::from_millis(200)).await;
let reader_start = tokio::time::Instant::now();
let _row: i64 =
sqlx::query_scalar("SELECT stream_version FROM es_streams WHERE stream_id = $1")
.bind(&stream_id)
.fetch_one(store.pool())
.await
.unwrap();
let reader_elapsed = reader_start.elapsed();
assert!(
reader_elapsed < Duration::from_millis(500),
"reader was blocked by an in-flight append for {reader_elapsed:?} — \
lock contract is broken",
);
writer.await.unwrap();
}
/// Saves two events, then a third, through `save_snapshotting`, and checks that
/// (a) the `<stream>-snapshot` stream is non-empty and its newest entry carries
/// `"version": 2`, and (b) `load_snapshotted` yields total 6 at version 3.
#[tokio::test]
async fn snapshot_seeds_then_replays_tail() {
    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let aggregate_id = Uuid::new_v4().to_string();
    let mut root = store
        .load_or_default::<Counter>(&aggregate_id)
        .await
        .unwrap();

    // First save: two events.
    for by in [1, 2] {
        root.record(CounterEvent::Incremented { by });
    }
    store.save_snapshotting(&mut root).await.unwrap();

    // Second save: one more event.
    root.record(CounterEvent::Incremented { by: 3 });
    store.save_snapshotting(&mut root).await.unwrap();

    let snapshot_stream = format!("{}-snapshot", root.stream_id);
    let snapshots = store
        .read_stream(&snapshot_stream, mire::StreamQuery::default())
        .await
        .unwrap();
    assert!(!snapshots.is_empty(), "a snapshot should be written");
    let envelope = &snapshots.last().unwrap().data;
    assert_eq!(envelope["version"], 2);

    let loaded = store
        .load_snapshotted::<Counter>(&aggregate_id)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(loaded.state.total, 6);
    assert_eq!(loaded.version, 3);
}
/// Snapshot-versioning fixture: the "old" shape of the aggregate.
///
/// Shares the `"snapver-test"` category with `SnapV2` and declares
/// `SNAPSHOT_VERSION = 1`.
#[derive(Debug, Default, Serialize, Deserialize)]
struct SnapV1 {
    total: i64,
}

impl Aggregate for SnapV1 {
    type Event = CounterEvent;

    fn stream_category() -> &'static str {
        "snapver-test"
    }

    /// Adds the increment to the total unchanged.
    fn apply(&mut self, event: &CounterEvent) {
        match event {
            CounterEvent::Incremented { by } => self.total += by,
        }
    }
}

impl Snapshot for SnapV1 {
    const SNAPSHOT_VERSION: i32 = 1;
    const SNAPSHOT_FREQUENCY: i64 = 2;
}
/// Snapshot-versioning fixture: the "new" shape of the aggregate.
///
/// Same fields and category as `SnapV1`, but `SNAPSHOT_VERSION = 2` and a
/// different fold, so a test can tell a replay from a reused V1 snapshot.
#[derive(Debug, Default, Serialize, Deserialize)]
struct SnapV2 {
    total: i64,
}

impl Aggregate for SnapV2 {
    type Event = CounterEvent;

    fn stream_category() -> &'static str {
        "snapver-test"
    }

    /// Adds ten times the increment, unlike `SnapV1` which adds it as-is.
    fn apply(&mut self, event: &CounterEvent) {
        match event {
            CounterEvent::Incremented { by } => self.total += by * 10,
        }
    }
}

impl Snapshot for SnapV2 {
    const SNAPSHOT_VERSION: i32 = 2;
    const SNAPSHOT_FREQUENCY: i64 = 2;
}
/// Writes two events through `SnapV1` with `save_snapshotting`, then loads the
/// same id as `SnapV2`.
///
/// `SnapV2` multiplies every increment by 10, so a total of 30 (not 3) shows
/// the state came from replaying the events with `SnapV2`'s fold.
#[tokio::test]
async fn snapshot_from_an_older_version_is_ignored_not_deserialized() {
    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let aggregate_id = Uuid::new_v4().to_string();

    let mut old_shape = store
        .load_or_default::<SnapV1>(&aggregate_id)
        .await
        .unwrap();
    for by in [1, 2] {
        old_shape.record(CounterEvent::Incremented { by });
    }
    store.save_snapshotting(&mut old_shape).await.unwrap();
    assert_eq!(old_shape.state.total, 3);

    let loaded = store
        .load_snapshotted::<SnapV2>(&aggregate_id)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(
        loaded.state.total, 30,
        "a snapshot from an older SNAPSHOT_VERSION must be ignored and the log replayed"
    );
    assert_eq!(loaded.version, 2);
}
/// Corrupt-snapshot fixture: the aggregate shape used to write the stream.
///
/// Shares the `"snapcorrupt-test"` category and `SNAPSHOT_VERSION = 1` with
/// `SnapReader`, but has only the `total` field.
#[derive(Debug, Default, Serialize, Deserialize)]
struct SnapWriter {
    total: i64,
}

impl Aggregate for SnapWriter {
    type Event = CounterEvent;

    fn stream_category() -> &'static str {
        "snapcorrupt-test"
    }

    /// Adds the increment to the total unchanged.
    fn apply(&mut self, event: &CounterEvent) {
        match event {
            CounterEvent::Incremented { by } => self.total += by,
        }
    }
}

impl Snapshot for SnapWriter {
    const SNAPSHOT_VERSION: i32 = 1;
    const SNAPSHOT_FREQUENCY: i64 = 2;
}
/// Corrupt-snapshot fixture: the aggregate shape used to read the stream back.
///
/// Declares the same category and `SNAPSHOT_VERSION` as `SnapWriter`, but adds
/// `required_new_field`, which `SnapWriter`'s serialized form does not contain.
/// Its fold multiplies by 7 so a replay is distinguishable from a reused snapshot.
#[derive(Debug, Default, Serialize, Deserialize)]
struct SnapReader {
    total: i64,
    required_new_field: String,
}

impl Aggregate for SnapReader {
    type Event = CounterEvent;

    fn stream_category() -> &'static str {
        "snapcorrupt-test"
    }

    /// Adds seven times the increment.
    fn apply(&mut self, event: &CounterEvent) {
        match event {
            CounterEvent::Incremented { by } => self.total += by * 7,
        }
    }
}

impl Snapshot for SnapReader {
    const SNAPSHOT_VERSION: i32 = 1;
    const SNAPSHOT_FREQUENCY: i64 = 2;
}
/// Writes two events through `SnapWriter` with `save_snapshotting`, then loads
/// the same id as `SnapReader`, whose extra field is absent from anything
/// `SnapWriter` serialized.
///
/// The load must succeed, and the total must be 21 (`(1 + 2) * 7`), i.e. the
/// result of folding the events with `SnapReader::apply`.
#[tokio::test]
async fn snapshot_that_fails_to_deserialize_falls_back_to_replay() {
    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let aggregate_id = Uuid::new_v4().to_string();

    let mut writer = store
        .load_or_default::<SnapWriter>(&aggregate_id)
        .await
        .unwrap();
    for by in [1, 2] {
        writer.record(CounterEvent::Incremented { by });
    }
    store.save_snapshotting(&mut writer).await.unwrap();

    let loaded = store
        .load_snapshotted::<SnapReader>(&aggregate_id)
        .await
        .expect("an undeserializable snapshot must not error the load")
        .unwrap();
    assert_eq!(
        loaded.state.total, 21,
        "an incompatible snapshot must fall back to a full replay"
    );
    assert_eq!(loaded.version, 2);
}
/// Checks subscription behaviour around a transaction that is still open.
///
/// Sequence: open a transaction and append to stream A without committing;
/// save an event to stream B; poll a fresh subscription and require that B's
/// event is not delivered yet; commit A; then poll a second fresh subscription
/// (up to 400 times, 20 ms apart) until both A's and B's events have been seen.
#[tokio::test]
async fn watermark_does_not_skip_in_flight_lower_transactions() {
    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };
    let pool = store.pool().clone();

    // Stream A: appended inside a transaction that stays open for now.
    let mut open_tx = store.begin_transaction().await.unwrap();
    let stream_a = format!("counter-{}", Uuid::new_v4());
    open_tx
        .append::<CounterEvent>(
            &stream_a,
            "counter",
            ExpectedVersion::Any,
            &[mire::Event::new(CounterEvent::Incremented { by: 1 })],
        )
        .await
        .unwrap();

    // Stream B: saved through the store while A's transaction is still open.
    let id_b = Uuid::new_v4().to_string();
    let mut b = store.load_or_default::<Counter>(&id_b).await.unwrap();
    b.record(CounterEvent::Incremented { by: 2 });
    store.save(&mut b).await.unwrap();

    let mut early_sub = Subscription::create(
        store.clone(),
        pool.clone(),
        format!("watermark-{}", Uuid::new_v4()),
        500,
    )
    .await
    .unwrap();
    let before = early_sub.poll_category("counter").await.unwrap();
    let leaked_b = before.iter().any(|e| e.stream_id == b.stream_id);
    assert!(
        !leaked_b,
        "must not consume past an in-flight lower transaction"
    );

    open_tx.commit().await.unwrap();

    let mut late_sub = Subscription::create(
        store.clone(),
        pool.clone(),
        format!("watermark2-{}", Uuid::new_v4()),
        500,
    )
    .await
    .unwrap();
    let mut seen_a = false;
    let mut seen_b = false;
    for _ in 0..400 {
        let batch = late_sub.poll_category("counter").await.unwrap();
        for e in batch {
            seen_a |= e.stream_id == stream_a;
            seen_b |= e.stream_id == b.stream_id;
        }
        if seen_a && seen_b {
            break;
        }
        tokio::time::sleep(Duration::from_millis(20)).await;
    }
    assert!(seen_a && seen_b, "both events must eventually be delivered");
}
/// Counter aggregate used by the projection tests.
///
/// Identical to `Counter` except for its `"counter-proj"` category.
#[derive(Debug, Default, Serialize, Deserialize)]
struct ProjCounter {
    total: i64,
}

impl Aggregate for ProjCounter {
    type Event = CounterEvent;

    fn stream_category() -> &'static str {
        "counter-proj"
    }

    /// Folds a single event into the running total.
    fn apply(&mut self, event: &CounterEvent) {
        let CounterEvent::Incremented { by } = event;
        self.total += by;
    }
}
/// Projection handler that keeps a per-stream running total in a SQL table.
struct CounterTotals {
    /// Pool the upserts are executed on.
    db: PgPool,
    /// Name of the target table; interpolated into the SQL text as-is.
    table: String,
}

impl EventHandler for CounterTotals {
    type Aggregate = ProjCounter;

    /// Upserts one event: inserts `(stream_id, by)` or, when the stream already
    /// has a row, adds `by` to the stored total.
    async fn handle(&self, event: HandledEvent<CounterEvent>) -> anyhow::Result<()> {
        let CounterEvent::Incremented { by: delta } = event.event;

        // The table name cannot be a bind parameter, hence `format!` +
        // `AssertSqlSafe`; the values themselves are still bound.
        let upsert = format!(
            "INSERT INTO {} (stream_id, total) VALUES ($1, $2)\n\
             ON CONFLICT (stream_id) DO UPDATE SET total = {}.total + EXCLUDED.total",
            self.table, self.table
        );
        sqlx::query(sqlx::AssertSqlSafe(upsert))
            .bind(event.stream_id())
            .bind(delta)
            .execute(&self.db)
            .await?;
        Ok(())
    }
}
/// Runs a `ProjectionRunner` with the `CounterTotals` handler against a
/// freshly created, uniquely named table and checks that the two saved
/// increments (4 and 6) end up as a total of 10 for the stream.
///
/// The read model is polled (up to 400 times, 20 ms apart) until the full
/// total is visible. `CounterTotals` upserts one event at a time on its own
/// pool, so the intermediate total 4 can be observed; stopping at the first
/// non-empty read would make the final assertion race with the runner.
#[tokio::test]
async fn projection_runner_builds_read_model() {
    // Sum of the increments recorded below (4 + 6).
    const EXPECTED_TOTAL: i64 = 10;

    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };
    let pool = store.pool().clone();

    // Per-run table so concurrent or repeated runs never share a read model.
    let suffix = Uuid::new_v4().simple().to_string();
    let table = format!("counter_totals_{suffix}");
    sqlx::raw_sql(sqlx::AssertSqlSafe(format!(
        "CREATE TABLE IF NOT EXISTS {table} (stream_id TEXT PRIMARY KEY, total BIGINT NOT NULL)"
    )))
    .execute(&pool)
    .await
    .unwrap();

    let id = Uuid::new_v4().to_string();
    let mut counter = store.load_or_default::<ProjCounter>(&id).await.unwrap();
    counter.record(CounterEvent::Incremented { by: 4 });
    counter.record(CounterEvent::Incremented { by: 6 });
    store.save(&mut counter).await.unwrap();

    let runner = ProjectionRunner::builder(store.clone())
        .poll_interval(Duration::from_millis(20))
        .subscribe(
            format!("counter-totals-{suffix}"),
            CounterTotals {
                db: pool.clone(),
                table: table.clone(),
            },
        )
        .build();
    let token = CancellationToken::new();
    let handle = {
        let token = token.clone();
        tokio::spawn(async move { runner.run(token).await })
    };

    let mut total = None;
    for _ in 0..400 {
        total = sqlx::query_scalar::<_, i64>(sqlx::AssertSqlSafe(format!(
            "SELECT total FROM {table} WHERE stream_id = $1"
        )))
        .bind(&counter.stream_id)
        .fetch_optional(&pool)
        .await
        .unwrap();
        // Wait for both events to be applied, not just the first one.
        if total == Some(EXPECTED_TOTAL) {
            break;
        }
        tokio::time::sleep(Duration::from_millis(20)).await;
    }

    token.cancel();
    handle.await.unwrap().unwrap();
    assert_eq!(total, Some(EXPECTED_TOTAL));

    // Clean up only after a pass; a failing run leaves the table for inspection.
    sqlx::raw_sql(sqlx::AssertSqlSafe(format!("DROP TABLE IF EXISTS {table}")))
        .execute(&pool)
        .await
        .unwrap();
}
/// Handler that rejects every event by returning an error.
struct AlwaysFails;

impl EventHandler for AlwaysFails {
    type Aggregate = ProjCounter;

    /// Ignores the event and returns a fixed error.
    async fn handle(&self, _event: HandledEvent<CounterEvent>) -> anyhow::Result<()> {
        Err(anyhow::anyhow!("poison: this handler always fails"))
    }
}
/// Runs a projection whose handler always returns an error, with
/// `max_attempts(2)`.
///
/// The runner must finish with an `Err` within 30 seconds, and the
/// subscription's `last_position` in `es_subscriptions` must still be 0
/// (a missing row counts as 0).
#[tokio::test]
async fn poison_handler_stops_loudly_without_advancing_checkpoint() {
    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    // One event for the handler to fail on.
    let aggregate_id = Uuid::new_v4().to_string();
    let mut root = store
        .load_or_default::<ProjCounter>(&aggregate_id)
        .await
        .unwrap();
    root.record(CounterEvent::Incremented { by: 1 });
    store.save(&mut root).await.unwrap();

    let subscription_name = format!("poison-sub-{}", Uuid::new_v4().simple());
    let runner = ProjectionRunner::builder(store.clone())
        .poll_interval(Duration::from_millis(20))
        .max_attempts(2)
        .subscribe(subscription_name.clone(), AlwaysFails)
        .build();

    // The token is never cancelled: the runner has to stop on its own.
    let deadline = Duration::from_secs(30);
    let outcome = tokio::time::timeout(deadline, runner.run(CancellationToken::new()))
        .await
        .expect("a poison handler must stop the runner, not hang forever");
    assert!(
        outcome.is_err(),
        "a handler failing past max_attempts must stop the subscription loudly, got: {outcome:?}"
    );

    let last_position: Option<i64> =
        sqlx::query_scalar("SELECT last_position FROM es_subscriptions WHERE subscription_id = $1")
            .bind(&subscription_name)
            .fetch_optional(store.pool())
            .await
            .unwrap();
    let checkpoint = last_position.unwrap_or(0);
    assert_eq!(
        checkpoint, 0,
        "a poison event must not advance the checkpoint (no silent skip)"
    );
}
/// Handler whose `handle` panics instead of returning an `Err`.
struct PanickingHandler;
impl EventHandler for PanickingHandler {
type Aggregate = ProjCounter;
/// Ignores the event and panics unconditionally; it never returns a value.
async fn handle(&self, _event: HandledEvent<CounterEvent>) -> anyhow::Result<()> {
panic!("boom: a handler panic, not a graceful Err");
}
}
/// Runs a projection whose handler panics, with `max_attempts(2)`.
///
/// The panic must not escape `run`: the runner has to finish with an `Err`
/// within 30 seconds, and the subscription's `last_position` in
/// `es_subscriptions` must still be 0 (a missing row counts as 0).
#[tokio::test]
async fn panicking_handler_is_contained_and_stops_the_runner() {
    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    // One event for the handler to panic on.
    let aggregate_id = Uuid::new_v4().to_string();
    let mut root = store
        .load_or_default::<ProjCounter>(&aggregate_id)
        .await
        .unwrap();
    root.record(CounterEvent::Incremented { by: 1 });
    store.save(&mut root).await.unwrap();

    let subscription_name = format!("panic-sub-{}", Uuid::new_v4().simple());
    let runner = ProjectionRunner::builder(store.clone())
        .poll_interval(Duration::from_millis(20))
        .max_attempts(2)
        .subscribe(subscription_name.clone(), PanickingHandler)
        .build();

    // The token is never cancelled: the runner has to stop on its own.
    let deadline = Duration::from_secs(30);
    let outcome = tokio::time::timeout(deadline, runner.run(CancellationToken::new()))
        .await
        .expect("a panicking handler must stop the runner, not hang");
    assert!(
        outcome.is_err(),
        "a panicking handler must surface as a run error, got: {outcome:?}"
    );

    let last_position: Option<i64> =
        sqlx::query_scalar("SELECT last_position FROM es_subscriptions WHERE subscription_id = $1")
            .bind(&subscription_name)
            .fetch_optional(store.pool())
            .await
            .unwrap();
    let checkpoint = last_position.unwrap_or(0);
    assert_eq!(
        checkpoint, 0,
        "a panicking handler must not advance the checkpoint"
    );
}
/// Saves one event, polls a fresh subscription on the `"counter"` category
/// (up to 400 times, 20 ms apart) until that event's stream shows up, then
/// checkpoints and checks that the subscription's position is above zero.
#[tokio::test]
async fn subscription_polls_and_checkpoints() {
    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let aggregate_id = Uuid::new_v4().to_string();
    let mut root = store
        .load_or_default::<Counter>(&aggregate_id)
        .await
        .unwrap();
    root.record(CounterEvent::Incremented { by: 5 });
    store.save(&mut root).await.unwrap();

    let mut subscription = Subscription::create(
        store.clone(),
        store.pool().clone(),
        format!("test-sub-{}", Uuid::new_v4()),
        100,
    )
    .await
    .unwrap();

    let mut seen = false;
    for _ in 0..400 {
        let batch = subscription.poll_category("counter").await.unwrap();
        seen = batch.iter().any(|e| e.stream_id == root.stream_id);
        if seen {
            break;
        }
        tokio::time::sleep(Duration::from_millis(20)).await;
    }
    assert!(seen, "our event should eventually be delivered");

    subscription.checkpoint().await.unwrap();
    assert!(subscription.position() > 0);
}
/// Saves 1001 unit increments in one go and checks that a fresh load reports
/// version 1001 and total 1001.
///
/// NOTE(review): 1001 is presumably one more than the store's read page size —
/// confirm against `mire`.
#[tokio::test]
async fn load_reads_streams_longer_than_the_page_limit() {
    const N: i64 = 1001;

    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let aggregate_id = Uuid::new_v4().to_string();
    let mut root = store
        .load_or_default::<Counter>(&aggregate_id)
        .await
        .unwrap();
    let unit_increments = (0..N).map(|_| CounterEvent::Incremented { by: 1 });
    root.record_many(unit_increments);
    store.save(&mut root).await.unwrap();
    assert_eq!(root.version, N);

    let reloaded = store
        .load::<Counter>(&aggregate_id)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(reloaded.version, N);
    assert_eq!(
        reloaded.state.total, N,
        "every event must be folded, not just the first page"
    );
}
/// Pure (no database) checks of `AggregateRoot::hydrate`.
///
/// Three cases: a history with a version gap (1, 2, 4) and a history shorter
/// than the version passed as the third argument (1, 2, 3 vs. 5) must both be
/// rejected as `StreamCorruption`; a contiguous history (1, 2, 3 vs. 3) must
/// hydrate to version 3 with total 3.
#[test]
fn hydrate_rejects_non_contiguous_history() {
    use mire::{AggregateRoot, RecordedEvent};

    // Builds a unit-increment event whose global position equals its stream version.
    let mk = |version: i64| RecordedEvent {
        global_position: version,
        stream_id: "counter-x".into(),
        stream_version: version,
        event_type: "counter.incremented".into(),
        data: serde_json::json!({ "type": "Incremented", "by": 1 }),
        metadata: serde_json::json!({}),
        transaction_id: 1,
        created_at: chrono::Utc::now(),
    };
    let history =
        |versions: &[i64]| -> Vec<RecordedEvent> { versions.iter().map(|&v| mk(v)).collect() };

    // Version 3 is missing.
    let gapped = history(&[1, 2, 4]);
    let res = AggregateRoot::<Counter>::hydrate("counter-x".into(), &gapped, 4);
    assert!(
        matches!(res, Err(mire::EventStoreError::StreamCorruption { .. })),
        "expected StreamCorruption for a version gap"
    );

    // Contiguous, but stops two events short of the version passed in.
    let truncated = history(&[1, 2, 3]);
    let res = AggregateRoot::<Counter>::hydrate("counter-x".into(), &truncated, 5);
    assert!(
        matches!(res, Err(mire::EventStoreError::StreamCorruption { .. })),
        "expected StreamCorruption for a truncated read"
    );

    let good = history(&[1, 2, 3]);
    let root = AggregateRoot::<Counter>::hydrate("counter-x".into(), &good, 3).unwrap();
    assert_eq!(root.version, 3);
    assert_eq!(root.state.total, 3);
}
/// A save that fails with `ConcurrencyConflict` must leave the aggregate's
/// pending events in place.
///
/// The conflict is provoked by loading the aggregate after one event has been
/// saved and then resetting its `version` field to 0 before recording.
#[tokio::test]
async fn failed_save_keeps_pending_events_for_retry() {
    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let aggregate_id = Uuid::new_v4().to_string();

    let mut first = store
        .load_or_default::<Counter>(&aggregate_id)
        .await
        .unwrap();
    first.record(CounterEvent::Incremented { by: 1 });
    store.save(&mut first).await.unwrap();

    let mut stale = store
        .load_or_default::<Counter>(&aggregate_id)
        .await
        .unwrap();
    // Rewind the version so this root looks as if it loaded before the save above.
    stale.version = 0;
    stale.record(CounterEvent::Incremented { by: 5 });
    assert_eq!(stale.pending_count(), 1);

    let err = store.save(&mut stale).await.unwrap_err();
    let conflicted = matches!(err, mire::EventStoreError::ConcurrencyConflict { .. });
    assert!(conflicted);

    assert_eq!(
        stale.pending_count(),
        1,
        "pending event must survive a failed save so a retry is not a no-op"
    );
    assert!(stale.has_pending());
}
/// Appending an empty batch still checks the expected version.
///
/// With the stream at version 1, `Exact(0)` plus no events must fail with
/// `ConcurrencyConflict`, while `Exact(1)` plus no events must succeed and
/// return 1.
#[tokio::test]
async fn empty_append_enforces_expected_version() {
    let Some(store) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let id = Uuid::new_v4().to_string();
    let stream_id = format!("counter-{id}");

    // Bring the stream to version 1.
    let mut root = store.load_or_default::<Counter>(&id).await.unwrap();
    root.record(CounterEvent::Incremented { by: 1 });
    store.save(&mut root).await.unwrap();

    let mismatch = store
        .append::<CounterEvent>(&stream_id, "counter", ExpectedVersion::Exact(0), &[])
        .await
        .unwrap_err();
    let conflicted = matches!(mismatch, mire::EventStoreError::ConcurrencyConflict { .. });
    assert!(conflicted);

    let version_after = store
        .append::<CounterEvent>(&stream_id, "counter", ExpectedVersion::Exact(1), &[])
        .await
        .unwrap();
    assert_eq!(version_after, 1);
}