use eventcore_types::{
Event, EventStore, EventStoreError, EventStreamReader, EventStreamSlice, StreamId,
StreamVersion, StreamWrites,
};
pub struct DeterministicConflictStore<S> {
inner: S,
remaining_conflicts: std::sync::atomic::AtomicU32,
}
impl<S> DeterministicConflictStore<S> {
pub fn new(store: S, conflict_count: u32) -> Self {
Self {
inner: store,
remaining_conflicts: std::sync::atomic::AtomicU32::new(conflict_count),
}
}
}
impl<S> EventStore for DeterministicConflictStore<S>
where
S: EventStore + Sync,
{
async fn read_stream<E: Event>(
&self,
stream_id: StreamId,
) -> Result<EventStreamReader<E>, EventStoreError> {
self.inner.read_stream(stream_id).await
}
async fn append_events(
&self,
writes: StreamWrites,
) -> Result<EventStreamSlice, EventStoreError> {
let remaining = self.remaining_conflicts.fetch_update(
std::sync::atomic::Ordering::SeqCst,
std::sync::atomic::Ordering::SeqCst,
|n| if n > 0 { Some(n - 1) } else { None },
);
if remaining.is_ok() {
return Err(EventStoreError::VersionConflict {
stream_id: StreamId::try_new("deterministic-conflict").expect("valid"),
expected: StreamVersion::new(0),
actual: StreamVersion::new(1),
});
}
self.inner.append_events(writes).await
}
}