use std::time::{Duration, Instant};
use mire::{Event, EventData, EventStore, ExpectedVersion, RecordedEvent, Subscription};
use serde::{Deserialize, Serialize};
use sqlx::{PgPool, Row};
use uuid::Uuid;
mod common;
/// Event payload appended by the tests in this file (constructed through `ev`).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Marked {
/// Free-form label supplied by the test, e.g. "v1" or "t1".
mark: String,
}
// `Marked` reports one fixed event-type name regardless of its `mark` value.
impl EventData for Marked {
/// Event-type name reported for every `Marked` payload.
fn event_type(&self) -> &'static str {
"ordering.marked"
}
}
/// Returns whatever `common::store()` yields.
///
/// Every test in this file treats `None` as "DATABASE_URL not set" and returns
/// early without running.
async fn store() -> Option<EventStore> {
common::store().await
}
/// Wraps `mark` in a `Marked` payload and builds a new `Event` around it.
fn ev(mark: &str) -> Event<Marked> {
    let payload = Marked {
        mark: mark.to_owned(),
    };
    Event::new(payload)
}
/// Executes `SELECT pg_current_xact_id()` on `executor`.
///
/// Per the PostgreSQL documentation this call assigns a transaction id to the
/// current transaction if it does not have one yet; the tests call it right
/// after opening a transaction so that the xid is fixed at that point rather
/// than at the first write.
///
/// # Panics
///
/// Panics with "force xid" if the query fails.
async fn force_xid<'e, E>(executor: E)
where
    E: sqlx::Executor<'e, Database = sqlx::Postgres>,
{
    let statement = sqlx::query("SELECT pg_current_xact_id()");
    statement.execute(executor).await.expect("force xid");
}
/// Polls `sub` for `category` until `pred` accepts everything delivered so far.
///
/// Each poll's results are appended to a running list; `pred` is evaluated on
/// the whole list after every poll. Between polls the task sleeps for 25 ms.
///
/// # Panics
///
/// Panics if a poll fails ("poll"), or if `pred` is still unsatisfied after a
/// poll that finished once `deadline` had elapsed. The timeout message lists
/// the `(stream_id, stream_version)` pairs delivered up to that point.
async fn collect_until(
    sub: &mut Subscription,
    category: &str,
    deadline: Duration,
    pred: impl Fn(&[RecordedEvent]) -> bool,
) -> Vec<RecordedEvent> {
    let started = Instant::now();
    let mut delivered: Vec<RecordedEvent> = Vec::new();
    loop {
        let batch = sub.poll_category(category).await.expect("poll");
        delivered.extend(batch);
        if pred(&delivered) {
            break delivered;
        }
        // The deadline is only checked after an unsuccessful poll, so a
        // satisfied predicate always wins over a late deadline.
        assert!(
            started.elapsed() < deadline,
            "timed out waiting for deliveries; got {:?}",
            delivered
                .iter()
                .map(|e| (e.stream_id.clone(), e.stream_version))
                .collect::<Vec<_>>()
        );
        tokio::time::sleep(Duration::from_millis(25)).await;
    }
}
/// Asserts that, within each stream, versions appear in strictly increasing
/// order across `events`.
///
/// Events of different streams may interleave freely. A repeated version for
/// the same stream also fails, because the comparison is strict.
fn assert_per_stream_order(events: &[RecordedEvent]) {
    use std::collections::HashMap;

    // Most recently seen version per stream id.
    let mut newest: HashMap<&str, i64> = HashMap::new();
    for event in events {
        let stream = event.stream_id.as_str();
        let Some(prev) = newest.insert(stream, event.stream_version) else {
            // First event seen for this stream: nothing to compare against.
            continue;
        };
        assert!(
            event.stream_version > prev,
            "per-stream ordering violated on {}: delivered v{} after v{}",
            event.stream_id,
            event.stream_version,
            prev
        );
    }
}
/// Looks up the `(transaction_id, global_position)` pair stored in `es_events`
/// for the event at `version` of `stream_id`.
///
/// The transaction id is read as text and parsed into a `u64`.
///
/// # Panics
///
/// Panics if the row does not exist ("event exists") or if the transaction id
/// text does not parse as a `u64`.
async fn cursor_of(pool: &PgPool, stream_id: &str, version: i64) -> (u64, i64) {
    let row = sqlx::query(
        "SELECT transaction_id::text AS txid, global_position
FROM es_events WHERE stream_id = $1 AND stream_version = $2",
    )
    .bind(stream_id)
    .bind(version)
    .fetch_one(pool)
    .await
    .expect("event exists");
    let txid: String = row.get("txid");
    // Use `expect` like the rest of the file so a failure names what broke.
    let txid: u64 = txid
        .parse()
        .expect("es_events.transaction_id text parses as u64");
    (txid, row.get("global_position"))
}
/// Reads the persisted `(last_transaction_id, last_position)` pair of the
/// subscription `id` from `es_subscriptions`.
///
/// The transaction id is read as text and parsed into a `u64`.
///
/// # Panics
///
/// Panics if the row does not exist ("subscription row") or if the transaction
/// id text does not parse as a `u64`.
async fn subscription_cursor(pool: &PgPool, id: &str) -> (u64, i64) {
    let row = sqlx::query(
        "SELECT last_transaction_id::text AS txid, last_position
FROM es_subscriptions WHERE subscription_id = $1",
    )
    .bind(id)
    .fetch_one(pool)
    .await
    .expect("subscription row");
    let txid: String = row.get("txid");
    // Use `expect` like the rest of the file so a failure names what broke.
    let txid: u64 = txid
        .parse()
        .expect("es_subscriptions.last_transaction_id text parses as u64");
    (txid, row.get("last_position"))
}
/// Writes v1 and v2 of `stream` so that v2 is appended by a transaction which
/// was opened, and had its xid forced, before v1 was written.
///
/// Statement order is the whole point of this helper:
/// 1. open transaction A and force its xid,
/// 2. append v1 through `store.append`,
/// 3. append v2 inside A and commit A.
///
/// NOTE(review): this presumably leaves v2 with a lower transaction id than
/// v1, assuming `store.append` runs in its own, later transaction — confirm
/// against `EventStore::append`.
async fn inverted_pair(store: &EventStore, category: &str, stream: &str) {
    // Step 1: transaction A gets its xid first.
    let mut early_tx = store.begin_transaction().await.expect("begin A");
    force_xid(&mut **early_tx.tx()).await;

    // Step 2: v1 is written outside of A.
    store
        .append(stream, category, ExpectedVersion::NoStream, &[ev("v1")])
        .await
        .expect("append v1");

    // Step 3: v2 is written inside A, which only commits now.
    early_tx
        .append(stream, category, ExpectedVersion::Exact(1), &[ev("v2")])
        .await
        .expect("append v2");
    early_tx.commit().await.expect("commit A");
}
/// An inverted pair (see `inverted_pair`) must be delivered as v1 then v2.
///
/// The subscription is created with `100` as its fourth argument (presumably
/// the per-poll batch limit — confirm against `Subscription::create`), so both
/// events can fit in one poll.
#[tokio::test]
async fn same_batch_inversion_is_reordered() {
    let Some(es) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    // A fresh random suffix keeps category, stream and subscription unique per run.
    let suffix = Uuid::new_v4().simple().to_string();
    let category = format!("ordsb{suffix}");
    let stream = format!("{category}-s1");
    let sub_name = format!("ord-sb-{suffix}");

    let mut sub = Subscription::create(es.clone(), es.pool().clone(), sub_name, 100)
        .await
        .expect("create sub");

    inverted_pair(&es, &category, &stream).await;

    let delivered = collect_until(&mut sub, &category, Duration::from_secs(10), |seen| {
        seen.len() >= 2
    })
    .await;

    assert_eq!(delivered.len(), 2);
    assert_per_stream_order(&delivered);
    let order = (delivered[0].stream_version, delivered[1].stream_version);
    assert_eq!(
        order,
        (1, 2),
        "v1 must be delivered before v2 even though v2 sorts first by xid"
    );
}
/// Same scenario as the same-batch test, but the subscription is created with
/// `1` as its fourth argument (presumably a per-poll limit of one event —
/// confirm against `Subscription::create`). The pair must still arrive as
/// v1 then v2.
#[tokio::test]
async fn limit_cut_does_not_split_inversion() {
    let Some(es) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let suffix = Uuid::new_v4().simple().to_string();
    let category = format!("ordlc{suffix}");
    let stream = format!("{category}-s1");
    let sub_name = format!("ord-lc-{suffix}");

    let mut sub = Subscription::create(es.clone(), es.pool().clone(), sub_name, 1)
        .await
        .expect("create sub");

    inverted_pair(&es, &category, &stream).await;

    let delivered = collect_until(&mut sub, &category, Duration::from_secs(10), |seen| {
        seen.len() >= 2
    })
    .await;

    assert_per_stream_order(&delivered);
    // `collect_until` only returns once at least two events are present, so
    // indexing the first two is safe.
    let order = (delivered[0].stream_version, delivered[1].stream_version);
    assert_eq!(order, (1, 2));
}
/// While a "pin" transaction is open, nothing from the inverted stream may be
/// delivered; after the pin is rolled back, v1 and v2 arrive in order.
///
/// Transactions are opened and given xids in the order A, pin; v1 is then
/// appended through `store.append` and v2 through A.
///
/// NOTE(review): the pin presumably keeps v1 invisible to the subscription
/// because v1's xid is above the pin's — confirm against the subscription's
/// visibility rule.
#[tokio::test]
async fn cross_poll_inversion_is_held_back() {
    let Some(es) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let suffix = Uuid::new_v4().simple().to_string();
    let category = format!("ordcp{suffix}");
    let stream = format!("{category}-s1");
    let sub_name = format!("ord-cp-{suffix}");

    let mut sub = Subscription::create(es.clone(), es.pool().clone(), sub_name, 100)
        .await
        .expect("create sub");

    // xid allocation order: A first, then the pin.
    let mut tx_a = es.begin_transaction().await.expect("begin A");
    force_xid(&mut **tx_a.tx()).await;
    let mut pin = es.pool().begin().await.expect("begin pin");
    force_xid(&mut *pin).await;

    // v1 is written outside A; v2 inside A, which then commits.
    es.append(&stream, &category, ExpectedVersion::NoStream, &[ev("v1")])
        .await
        .expect("append v1");
    tx_a.append(&stream, &category, ExpectedVersion::Exact(1), &[ev("v2")])
        .await
        .expect("append v2");
    tx_a.commit().await.expect("commit A");

    // Probe for 800 ms with the pin still open: every poll must come back empty.
    let probe_window = Duration::from_millis(800);
    let probe_started = Instant::now();
    while probe_started.elapsed() < probe_window {
        let premature = sub.poll_category(&category).await.expect("poll");
        assert!(
            premature.is_empty(),
            "nothing from this stream may be delivered while v1 is blocked; got {:?}",
            premature
                .iter()
                .map(|e| e.stream_version)
                .collect::<Vec<_>>()
        );
        tokio::time::sleep(Duration::from_millis(25)).await;
    }

    // Releasing the pin must unblock both events, in order.
    pin.rollback().await.expect("release pin");
    let delivered = collect_until(&mut sub, &category, Duration::from_secs(10), |seen| {
        seen.len() >= 2
    })
    .await;

    assert_per_stream_order(&delivered);
    let order = (delivered[0].stream_version, delivered[1].stream_version);
    assert_eq!(order, (1, 2));
}
/// While stream S is held back behind an open pin transaction, an event on an
/// unrelated stream T must still be delivered, and the persisted checkpoint
/// must stay below S's held v2 so that a re-created subscription still
/// receives S's v1 and v2.
///
/// Transactions are opened and given xids in the order A, D, pin. v1 of S is
/// appended through `store.append`, v2 of S through A, and t1 of T through D.
#[tokio::test]
async fn unrelated_streams_flow_during_holdback_and_checkpoint_lags() {
    let Some(es) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let suffix = Uuid::new_v4().simple().to_string();
    let category = format!("ordfl{suffix}");
    let stream_s = format!("{category}-s");
    let stream_t = format!("{category}-t");
    let sub_id = format!("ord-fl-{suffix}");

    let mut sub = Subscription::create(es.clone(), es.pool().clone(), sub_id.clone(), 100)
        .await
        .expect("create sub");

    // xid allocation order: A, then D, then the pin.
    let mut tx_a = es.begin_transaction().await.expect("begin A");
    force_xid(&mut **tx_a.tx()).await;
    let mut tx_d = es.begin_transaction().await.expect("begin D");
    force_xid(&mut **tx_d.tx()).await;
    let mut pin = es.pool().begin().await.expect("begin pin");
    force_xid(&mut *pin).await;

    // Stream S: v1 outside any scope, v2 inside A.
    es.append(&stream_s, &category, ExpectedVersion::NoStream, &[ev("v1")])
        .await
        .expect("append v1");
    tx_a.append(&stream_s, &category, ExpectedVersion::Exact(1), &[ev("v2")])
        .await
        .expect("append v2");
    tx_a.commit().await.expect("commit A");

    // Stream T: a single event inside D.
    tx_d.append(&stream_t, &category, ExpectedVersion::NoStream, &[ev("t1")])
        .await
        .expect("append t1");
    tx_d.commit().await.expect("commit D");

    // With the pin still open, T must flow while S stays silent.
    let first_wave = collect_until(&mut sub, &category, Duration::from_secs(10), |seen| {
        seen.iter().any(|e| e.stream_id == stream_t)
    })
    .await;
    assert!(
        !first_wave.iter().any(|e| e.stream_id == stream_s),
        "S's v2 must be held while its v1 is undeliverable; delivered {:?}",
        first_wave
            .iter()
            .map(|e| (e.stream_id.clone(), e.stream_version))
            .collect::<Vec<_>>()
    );

    // The persisted cursor must not have moved past the held v2 of S.
    sub.checkpoint().await.expect("checkpoint");
    let v2_cursor = cursor_of(es.pool(), &stream_s, 2).await;
    let cp = subscription_cursor(es.pool(), &sub_id).await;
    assert!(
        cp < v2_cursor,
        "checkpoint {cp:?} must lag behind the held event at {v2_cursor:?}"
    );

    // Release the pin, then resume under the same subscription id with a new
    // `Subscription` value; it must see S's v1 followed by v2.
    pin.rollback().await.expect("release pin");
    let mut resumed = Subscription::create(es.clone(), es.pool().clone(), sub_id.clone(), 100)
        .await
        .expect("recreate sub");
    let second_wave = collect_until(&mut resumed, &category, Duration::from_secs(10), |seen| {
        seen.iter()
            .filter(|e| e.stream_id == stream_s)
            .map(|e| e.stream_version)
            .eq([1_i64, 2])
    })
    .await;
    assert_per_stream_order(&second_wave);
}
/// A subscription row forged at v2's cursor, on a stream whose v1 was written
/// by `inverted_pair`, must make polling fail with `OrderingViolation` rather
/// than deliver v1.
///
/// The violation is expected to name the stream, `version == 1` and
/// `last_delivered == 2`.
#[tokio::test]
async fn legacy_inverted_checkpoint_trips_backstop_loudly() {
    let Some(es) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let suffix = Uuid::new_v4().simple().to_string();
    let category = format!("ordbs{suffix}");
    let stream = format!("{category}-s1");
    let sub_id = format!("ord-bs-{suffix}");

    inverted_pair(&es, &category, &stream).await;

    // Insert the subscription row by hand so its checkpoint sits exactly on v2.
    let (v2_txid, v2_pos) = cursor_of(es.pool(), &stream, 2).await;
    sqlx::query(
        "INSERT INTO es_subscriptions (subscription_id, last_position, last_transaction_id)
VALUES ($1, $2, $3::text::xid8)",
    )
    .bind(&sub_id)
    .bind(v2_pos)
    .bind(v2_txid.to_string())
    .execute(es.pool())
    .await
    .expect("forge legacy checkpoint");

    let mut sub = Subscription::create(es.clone(), es.pool().clone(), sub_id.clone(), 100)
        .await
        .expect("create sub");

    let patience = Duration::from_secs(10);
    let started = Instant::now();
    loop {
        match sub.poll_category(&category).await {
            // The expected outcome: the backstop reports the inversion.
            Err(mire::EventStoreError::OrderingViolation {
                stream_id,
                version,
                last_delivered,
                ..
            }) => {
                assert_eq!(stream_id, stream);
                assert_eq!(version, 1);
                assert_eq!(last_delivered, 2);
                break;
            }
            Err(e) => panic!("unexpected error: {e}"),
            // Any delivery at all is a failure.
            Ok(got) if !got.is_empty() => panic!(
                "v1 must not be delivered after a checkpoint that passed v2; got {:?}",
                got.iter().map(|e| e.stream_version).collect::<Vec<_>>()
            ),
            // Empty poll: keep trying until the deadline.
            Ok(_) => {
                assert!(
                    started.elapsed() < patience,
                    "timed out waiting for the backstop to trip"
                );
                tokio::time::sleep(Duration::from_millis(25)).await;
            }
        }
    }
}
// Concurrent-writer test: STREAMS writer tasks each append EVENTS_PER_STREAM
// events to their own stream while a background task keeps opening and rolling
// back short-lived transactions. Afterwards every stream must be delivered
// complete (versions 1..=EVENTS_PER_STREAM) and in order.
#[tokio::test]
async fn chaos_concurrent_scopes_never_regress_per_stream_order() {
let Some(store) = store().await else {
eprintln!("skipping: DATABASE_URL not set");
return;
};
let suffix = Uuid::new_v4().simple().to_string();
let category = format!("ordch{suffix}");
const STREAMS: usize = 4;
const EVENTS_PER_STREAM: i64 = 20;
// The fourth argument (7) is presumably the per-poll batch limit — TODO confirm
// against `Subscription::create`.
let mut sub = Subscription::create(
store.clone(),
store.pool().clone(),
format!("ord-ch-{suffix}"),
7, )
.await
.expect("create sub");
// Background "pin" task: until `pin_stop` is set, repeatedly open a
// transaction, force its xid, hold it for 15 ms, then roll it back.
let pin_stop = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let pin_task = {
let pool = store.pool().clone();
let stop = pin_stop.clone();
tokio::spawn(async move {
while !stop.load(std::sync::atomic::Ordering::Relaxed) {
let mut pin = pool.begin().await.expect("pin begin");
force_xid(&mut *pin).await;
tokio::time::sleep(Duration::from_millis(15)).await;
pin.rollback().await.expect("pin rollback");
}
})
};
// One writer task per stream. Each writer appends its versions sequentially:
// even versions go through an explicit transaction scope whose xid is forced
// before a 3-9 ms sleep, odd versions go through `store.append` directly.
let writers: Vec<_> = (0..STREAMS)
.map(|s| {
let store = store.clone();
let category = category.clone();
tokio::spawn(async move {
let stream = format!("{category}-s{s}");
for v in 1..=EVENTS_PER_STREAM {
let expected = if v == 1 {
ExpectedVersion::NoStream
} else {
ExpectedVersion::Exact(v - 1)
};
if v % 2 == 0 {
let mut scope = store.begin_transaction().await.expect("begin");
force_xid(&mut **scope.tx()).await;
tokio::time::sleep(Duration::from_millis(3 + (v as u64 % 7))).await;
scope
.append(&stream, &category, expected, &[ev(&format!("v{v}"))])
.await
.expect("scoped append");
scope.commit().await.expect("commit");
} else {
store
.append(&stream, &category, expected, &[ev(&format!("v{v}"))])
.await
.expect("append");
}
}
})
})
.collect();
// Wait for every writer, then stop and join the pin task.
for w in writers {
w.await.expect("writer");
}
pin_stop.store(true, std::sync::atomic::Ordering::Relaxed);
pin_task.await.expect("pin task");
// NOTE(review): the subscription is first polled only here, after all writers
// and the pin task have finished, and each stream's writer is sequential.
// If the intent is to exercise hold-back while writes are in flight, polling
// would need to run concurrently with the writers — confirm intent.
let total = STREAMS as i64 * EVENTS_PER_STREAM;
let got = collect_until(&mut sub, &category, Duration::from_secs(30), |g| {
g.len() as i64 >= total
})
.await;
assert_eq!(got.len() as i64, total);
assert_per_stream_order(&got);
// Per stream: exactly versions 1..=EVENTS_PER_STREAM, in that order.
for s in 0..STREAMS {
let stream = format!("{category}-s{s}");
let versions: Vec<i64> = got
.iter()
.filter(|e| e.stream_id == stream)
.map(|e| e.stream_version)
.collect();
assert_eq!(
versions,
(1..=EVENTS_PER_STREAM).collect::<Vec<i64>>(),
"stream {stream} must arrive complete and in order"
);
}
}
/// After `sub.reset()`, re-delivery of already-delivered events must succeed
/// and yield v1 then v2 again, rather than failing a poll.
#[tokio::test]
async fn restart_replay_does_not_trip_backstop() {
    let Some(es) = store().await else {
        eprintln!("skipping: DATABASE_URL not set");
        return;
    };

    let suffix = Uuid::new_v4().simple().to_string();
    let category = format!("ordrs{suffix}");
    let stream = format!("{category}-s1");
    let sub_name = format!("ord-rs-{suffix}");

    let mut sub = Subscription::create(es.clone(), es.pool().clone(), sub_name, 100)
        .await
        .expect("create sub");

    // Two plain appends, no inversion involved.
    es.append(&stream, &category, ExpectedVersion::NoStream, &[ev("v1")])
        .await
        .expect("append v1");
    es.append(&stream, &category, ExpectedVersion::Exact(1), &[ev("v2")])
        .await
        .expect("append v2");

    // First pass: both events are delivered in order.
    let first_pass = collect_until(&mut sub, &category, Duration::from_secs(10), |seen| {
        seen.len() >= 2
    })
    .await;
    assert_per_stream_order(&first_pass);

    // Second pass after a reset: the same two events, same order.
    sub.reset().await.expect("reset");
    let replay = collect_until(&mut sub, &category, Duration::from_secs(10), |seen| {
        seen.len() >= 2
    })
    .await;
    assert_per_stream_order(&replay);

    let versions: Vec<i64> = replay.iter().map(|e| e.stream_version).collect();
    assert_eq!(versions, vec![1, 2]);
}