use std::collections::HashMap;
use std::num::NonZeroU32;
use std::sync::Arc;
mod effects;
mod execute_pipeline;
mod projection;
mod projection_pipeline;
pub use eventcore_types::{
AttemptNumber, CommandError, CommandLogic, CommandStreams, DelayMilliseconds, Event,
FailureContext, FailureStrategy, NewEvents, Projector, StreamDeclarations, StreamId,
StreamPosition, StreamResolver,
};
use eventcore_types::{EventStore, MaxRetries, StreamVersion, StreamWrites};
pub use projection::{ProjectionConfig, ProjectionError, run_projection};
#[cfg(feature = "macros")]
pub use eventcore_macros::Command;
#[cfg(feature = "postgres")]
pub use eventcore_postgres as postgres;
#[cfg(feature = "sqlite")]
pub use eventcore_sqlite as sqlite;
#[macro_export]
macro_rules! require {
($condition:expr, $error:expr $(,)?) => {
if !$condition {
return ::core::result::Result::Err(
::core::convert::Into::<$crate::CommandError>::into($error),
);
}
};
($condition:expr, $format:expr, $($arg:expr),+ $(,)?) => {
if !$condition {
let message = ::std::format!($format, $($arg),+);
return ::core::result::Result::Err(
::core::convert::Into::<$crate::CommandError>::into(message),
);
}
};
}
#[derive(Debug)]
pub struct ExecutionResponse {
attempts: NonZeroU32,
}
impl ExecutionResponse {
pub fn new(attempts: NonZeroU32) -> Self {
Self { attempts }
}
pub fn attempts(&self) -> u32 {
self.attempts.get()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackoffStrategy {
Fixed {
delay_ms: DelayMilliseconds,
},
Exponential {
base_ms: DelayMilliseconds,
},
}
pub trait MetricsHook: Send + Sync {
fn on_retry_attempt(&self, ctx: &RetryContext);
}
#[derive(Debug, Clone)]
pub struct RetryContext {
pub streams: Vec<StreamId>,
pub attempt: AttemptNumber,
pub delay_ms: DelayMilliseconds,
}
#[derive(Clone)]
pub struct RetryPolicy {
max_retries: MaxRetries,
backoff_strategy: BackoffStrategy,
metrics_hook: Option<Arc<dyn MetricsHook>>,
}
impl RetryPolicy {
pub fn new() -> Self {
Self {
max_retries: MaxRetries::new(4),
backoff_strategy: BackoffStrategy::Exponential {
base_ms: DelayMilliseconds::new(10),
},
metrics_hook: None,
}
}
pub fn max_retries(mut self, retries: u32) -> Self {
self.max_retries = MaxRetries::new(retries);
self
}
pub fn backoff_strategy(mut self, strategy: BackoffStrategy) -> Self {
self.backoff_strategy = strategy;
self
}
pub fn with_metrics_hook<H: MetricsHook + 'static>(mut self, hook: H) -> Self {
self.metrics_hook = Some(Arc::new(hook));
self
}
}
impl Default for RetryPolicy {
fn default() -> Self {
Self::new()
}
}
fn calculate_jitter_factor(random_value: f64) -> f64 {
1.0 + (random_value - 0.5) * 0.4
}
fn apply_jitter(base_delay: u64, jitter_factor: f64) -> u64 {
(base_delay as f64 * jitter_factor) as u64
}
pub(crate) fn build_stream_writes_from_events<C: CommandLogic>(
events: Vec<C::Event>,
expected_versions: HashMap<StreamId, StreamVersion>,
) -> Result<StreamWrites, CommandError> {
expected_versions
.into_iter()
.try_fold(
StreamWrites::new(),
|writes, (stream_id, expected_version)| {
writes.register_stream(stream_id, expected_version)
},
)
.and_then(|writes| {
events
.into_iter()
.try_fold(writes, |writes, event| writes.append(event))
})
.map_err(CommandError::EventStoreError)
}
pub(crate) fn compute_retry_delay_ms(
strategy: &BackoffStrategy,
attempt: u32,
) -> DelayMilliseconds {
match strategy {
BackoffStrategy::Fixed { delay_ms } => *delay_ms,
BackoffStrategy::Exponential { base_ms } => {
let base_ms_u64: u64 = (*base_ms).into();
let base_delay = 2_u64
.checked_pow(attempt)
.and_then(|exp| base_ms_u64.checked_mul(exp))
.unwrap_or(u64::MAX);
let random_value = rand::random::<f64>();
let jitter_factor = calculate_jitter_factor(random_value);
DelayMilliseconds::new(apply_jitter(base_delay, jitter_factor))
}
}
}
#[tracing::instrument(name = "execute", skip_all, fields())]
pub async fn execute<C, S>(
store: S,
command: C,
policy: RetryPolicy,
) -> Result<ExecutionResponse, CommandError>
where
C: CommandLogic,
S: EventStore,
{
use effects::{StoreEffect, StoreEffectResult};
use execute_pipeline::{ExecutePipeline, PipelineOutcome, PipelineStep};
let mut pipeline = ExecutePipeline::new(command, policy);
let mut step = pipeline.step();
loop {
match step {
PipelineStep::Done(PipelineOutcome::Success(response)) => return Ok(response),
PipelineStep::Done(PipelineOutcome::Error(err)) => return Err(err),
PipelineStep::Yield(StoreEffect::ReadStream { stream_id }) => {
let result = store.read_stream::<C::Event>(stream_id).await;
step = pipeline.resume(StoreEffectResult::StreamRead(result));
}
PipelineStep::Yield(StoreEffect::AppendEvents { writes }) => {
let result = store.append_events(writes).await;
step = pipeline.resume(StoreEffectResult::EventsAppended(result));
}
PipelineStep::Yield(StoreEffect::Sleep { duration }) => {
tokio::time::sleep(duration).await;
step = pipeline.resume(StoreEffectResult::Slept);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use eventcore_memory::InMemoryEventStore;
use eventcore_types::{EventStoreError, EventStreamReader, EventStreamSlice};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TestEvent {
stream_id: StreamId,
}
impl Event for TestEvent {
fn stream_id(&self) -> &StreamId {
&self.stream_id
}
fn event_type_name() -> &'static str {
"TestEvent"
}
}
struct MockCommand {
stream_id: StreamId,
handle_called: Arc<AtomicBool>,
}
impl CommandStreams for MockCommand {
fn stream_declarations(&self) -> StreamDeclarations {
StreamDeclarations::single(self.stream_id.clone())
}
}
impl CommandLogic for MockCommand {
type Event = TestEvent;
type State = ();
fn apply(&self, state: Self::State, _event: &Self::Event) -> Self::State {
state
}
fn handle(&self, _state: Self::State) -> Result<NewEvents<Self::Event>, CommandError> {
self.handle_called.store(true, Ordering::SeqCst);
Ok(NewEvents::default())
}
}
#[tokio::test]
async fn test_execute_calls_command_handle() {
let store = InMemoryEventStore::new();
let stream_id = StreamId::try_new("test-stream").expect("valid stream id");
let handle_called = Arc::new(AtomicBool::new(false));
let command = MockCommand {
stream_id,
handle_called: Arc::clone(&handle_called),
};
let result = execute(&store, command, RetryPolicy::new()).await;
assert!(result.is_ok(), "execute() should succeed");
assert!(
handle_called.load(Ordering::SeqCst),
"execute() must call command.handle()"
);
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TestEventWithValue {
stream_id: StreamId,
value: i32,
}
impl Event for TestEventWithValue {
fn stream_id(&self) -> &StreamId {
&self.stream_id
}
fn event_type_name() -> &'static str {
"TestEventWithValue"
}
}
#[derive(Default, Clone, Debug, PartialEq)]
struct TestState {
value: i32,
}
struct StateCapturingCommand {
stream_id: StreamId,
captured_state: Arc<std::sync::Mutex<Option<TestState>>>,
}
impl CommandStreams for StateCapturingCommand {
fn stream_declarations(&self) -> StreamDeclarations {
StreamDeclarations::single(self.stream_id.clone())
}
}
impl CommandLogic for StateCapturingCommand {
type Event = TestEventWithValue;
type State = TestState;
fn apply(&self, mut state: Self::State, event: &Self::Event) -> Self::State {
state.value += event.value;
state
}
fn handle(&self, state: Self::State) -> Result<NewEvents<Self::Event>, CommandError> {
*self.captured_state.lock().unwrap() = Some(state);
Ok(NewEvents::default())
}
}
#[tokio::test]
async fn test_read_stream_failure_propagates_as_event_store_error() {
struct FailingEventStore;
impl EventStore for FailingEventStore {
async fn read_stream<E: Event>(
&self,
_stream_id: StreamId,
) -> Result<EventStreamReader<E>, EventStoreError> {
Err(EventStoreError::VersionConflict {
stream_id: _stream_id,
expected: StreamVersion::new(0),
actual: StreamVersion::new(1),
})
}
async fn append_events(
&self,
_writes: StreamWrites,
) -> Result<EventStreamSlice, EventStoreError> {
unimplemented!("Not needed for this test")
}
}
let store = FailingEventStore;
let stream_id = StreamId::try_new("test-stream").expect("valid stream id");
let command = MockCommand {
stream_id,
handle_called: Arc::new(AtomicBool::new(false)),
};
let result = execute(&store, command, RetryPolicy::new()).await;
assert!(
matches!(result, Err(CommandError::EventStoreError(_))),
"read_stream() failure should propagate as CommandError::EventStoreError, got: {:?}",
result
);
}
#[tokio::test]
async fn test_execute_reconstructs_state_from_existing_events() {
let store = InMemoryEventStore::new();
let stream_id = StreamId::try_new("account-123").expect("valid stream id");
let seed_event = TestEventWithValue {
stream_id: stream_id.clone(),
value: 50,
};
let writes = StreamWrites::new()
.register_stream(stream_id.clone(), StreamVersion::new(0))
.and_then(|writes| writes.append(seed_event))
.expect("seed append should succeed");
let _ = store
.append_events(writes)
.await
.expect("seed event to be stored");
let captured_state = Arc::new(std::sync::Mutex::new(None));
let command = StateCapturingCommand {
stream_id: stream_id.clone(),
captured_state: captured_state.clone(),
};
let _ = execute(&store, command, RetryPolicy::new())
.await
.expect("command execution to succeed");
let final_state = captured_state.lock().unwrap().clone().unwrap();
assert_eq!(
final_state.value, 50,
"execute() must reconstruct state from existing events before calling handle()"
);
}
#[tokio::test]
#[tracing_test::traced_test]
async fn test_execute_retries_automatically_on_version_conflict() {
use tokio::sync::Mutex;
struct ConflictOnceStore {
inner: InMemoryEventStore,
conflict_injected: Arc<Mutex<bool>>,
}
impl EventStore for ConflictOnceStore {
async fn read_stream<E: Event>(
&self,
stream_id: StreamId,
) -> Result<EventStreamReader<E>, EventStoreError> {
self.inner.read_stream(stream_id).await
}
async fn append_events(
&self,
writes: StreamWrites,
) -> Result<EventStreamSlice, EventStoreError> {
let mut injected = self.conflict_injected.lock().await;
if !*injected {
*injected = true;
Err(EventStoreError::VersionConflict {
stream_id: StreamId::try_new("conflict-test").expect("valid"),
expected: StreamVersion::new(0),
actual: StreamVersion::new(1),
})
} else {
self.inner.append_events(writes).await
}
}
}
let store = ConflictOnceStore {
inner: InMemoryEventStore::new(),
conflict_injected: Arc::new(Mutex::new(false)),
};
let stream_id = StreamId::try_new("test-stream").expect("valid stream id");
let command = MockCommand {
stream_id,
handle_called: Arc::new(AtomicBool::new(false)),
};
let result = execute(&store, command, RetryPolicy::new()).await;
assert!(
result.is_ok(),
"execute() should retry automatically and succeed, but got: {:?}",
result
);
assert!(
logs_contain("attempt="),
"logs should contain structured attempt field"
);
assert!(
logs_contain("delay_ms="),
"logs should contain structured delay_ms field"
);
assert!(
logs_contain("streams="),
"logs should contain structured streams field"
);
}
#[tokio::test]
async fn test_execute_returns_error_after_exhausting_retries() {
let store = AlwaysConflictStore::new();
let stream_id = StreamId::try_new("test-stream").expect("valid stream id");
let command = MockCommand {
stream_id,
handle_called: Arc::new(AtomicBool::new(false)),
};
let result = execute(&store, command, RetryPolicy::new()).await;
assert!(
matches!(result, Err(CommandError::ConcurrencyError(_))),
"should return ConcurrencyError after exhausting retries, but got: {:?}",
result
);
let error = result.unwrap_err();
if let CommandError::ConcurrencyError(_) = &error {
let error_msg = error.to_string();
assert_eq!(
error_msg, "concurrency conflict after 4 retry attempts",
"error message should clearly explain that retries were exhausted"
);
}
}
#[tokio::test]
async fn test_execute_with_custom_retry_policy() {
let store = AlwaysConflictStore::new();
let policy = RetryPolicy::new().max_retries(1);
let stream_id = StreamId::try_new("test-stream").expect("valid stream id");
let command = MockCommand {
stream_id,
handle_called: Arc::new(AtomicBool::new(false)),
};
let result = execute(&store, command, policy).await;
assert!(
matches!(result, Err(CommandError::ConcurrencyError(1))),
"should fail after exactly 1 retry as configured in policy, but got: {:?}",
result
);
}
struct AlwaysConflictStore {
inner: InMemoryEventStore,
}
impl AlwaysConflictStore {
fn new() -> Self {
Self {
inner: InMemoryEventStore::new(),
}
}
}
impl EventStore for AlwaysConflictStore {
async fn read_stream<E: Event>(
&self,
stream_id: StreamId,
) -> Result<EventStreamReader<E>, EventStoreError> {
self.inner.read_stream(stream_id).await
}
async fn append_events(
&self,
_writes: StreamWrites,
) -> Result<EventStreamSlice, EventStoreError> {
Err(EventStoreError::VersionConflict {
stream_id: StreamId::try_new("always-conflict").expect("valid"),
expected: StreamVersion::new(0),
actual: StreamVersion::new(1),
})
}
}
struct ConflictNTimesStore {
inner: InMemoryEventStore,
conflict_count: Arc<tokio::sync::Mutex<u32>>,
conflicts_to_inject: u32,
}
impl ConflictNTimesStore {
fn new(conflicts_to_inject: u32) -> Self {
Self {
inner: InMemoryEventStore::new(),
conflict_count: Arc::new(tokio::sync::Mutex::new(0)),
conflicts_to_inject,
}
}
}
impl EventStore for ConflictNTimesStore {
async fn read_stream<E: Event>(
&self,
stream_id: StreamId,
) -> Result<EventStreamReader<E>, EventStoreError> {
self.inner.read_stream(stream_id).await
}
async fn append_events(
&self,
writes: StreamWrites,
) -> Result<EventStreamSlice, EventStoreError> {
let mut count = self.conflict_count.lock().await;
if *count < self.conflicts_to_inject {
*count += 1;
Err(EventStoreError::VersionConflict {
stream_id: StreamId::try_new("conflict-n-times").expect("valid"),
expected: StreamVersion::new(0),
actual: StreamVersion::new(1),
})
} else {
self.inner.append_events(writes).await
}
}
}
#[tokio::test]
async fn test_execute_with_fixed_backoff_strategy() {
let policy = RetryPolicy::new()
.max_retries(2)
.backoff_strategy(BackoffStrategy::Fixed {
delay_ms: DelayMilliseconds::new(50),
});
let store = ConflictNTimesStore::new(2);
let stream_id = StreamId::try_new("test-stream").expect("valid stream id");
let command = MockCommand {
stream_id,
handle_called: Arc::new(AtomicBool::new(false)),
};
let start = std::time::Instant::now();
let result = execute(&store, command, policy).await;
let elapsed = start.elapsed();
assert!(result.is_ok(), "command should succeed after 2 retries");
assert!(
elapsed.as_millis() >= 70 && elapsed.as_millis() <= 130,
"expected ~100ms for 2 retries with 50ms fixed backoff, got {}ms",
elapsed.as_millis()
);
}
#[tokio::test]
async fn test_execute_with_zero_max_retries_disables_retry() {
let policy = RetryPolicy::new().max_retries(0);
let store = AlwaysConflictStore::new();
let stream_id = StreamId::try_new("test-stream").expect("valid stream id");
let command = MockCommand {
stream_id,
handle_called: Arc::new(AtomicBool::new(false)),
};
let start = std::time::Instant::now();
let result = execute(&store, command, policy).await;
let elapsed = start.elapsed();
assert!(
matches!(result, Err(CommandError::ConcurrencyError(0))),
"should return ConcurrencyError(0) for zero max_retries, but got: {:?}",
result
);
assert!(
elapsed.as_millis() < 10,
"expected <10ms for immediate failure, got {}ms",
elapsed.as_millis()
);
}
#[tokio::test]
#[tracing_test::traced_test]
async fn test_execute_emits_tracing_spans_and_events() {
let store = ConflictNTimesStore::new(2);
let policy = RetryPolicy::new().max_retries(3);
let stream_id = StreamId::try_new("account-123").expect("valid stream id");
let command = MockCommand {
stream_id,
handle_called: Arc::new(AtomicBool::new(false)),
};
let result = execute(&store, command, policy.clone()).await;
assert!(
result.is_ok(),
"command should succeed after 2 retries, got: {:?}",
result
);
assert!(
logs_contain(":execute:"),
"should create execution span named 'execute'"
);
assert!(
logs_contain("command execution succeeded") || logs_contain("execution complete"),
"should log success event when command completes"
)
}
#[tokio::test]
#[tracing_test::traced_test]
async fn test_retry_warnings_include_structured_fields() {
let store = ConflictNTimesStore::new(2);
let policy = RetryPolicy::new().max_retries(3);
let stream_id = StreamId::try_new("test-stream-123").expect("valid stream id");
let command = MockCommand {
stream_id: stream_id.clone(),
handle_called: Arc::new(AtomicBool::new(false)),
};
let result = execute(&store, command, policy).await;
assert!(result.is_ok(), "command should succeed after 2 retries");
assert!(
logs_contain("attempt="),
"logs should contain attempt field"
);
assert!(
logs_contain("delay_ms="),
"logs should contain delay_ms field"
);
assert!(
logs_contain("streams="),
"logs should contain streams field"
);
}
#[tokio::test]
#[tracing_test::traced_test]
async fn test_error_event_when_retries_exhausted() {
let store = AlwaysConflictStore::new();
let policy = RetryPolicy::new().max_retries(2);
let stream_id = StreamId::try_new("always-fails").expect("valid stream id");
let command = MockCommand {
stream_id,
handle_called: Arc::new(AtomicBool::new(false)),
};
let result = execute(&store, command, policy).await;
assert!(
matches!(result, Err(CommandError::ConcurrencyError(2))),
"should fail after exhausting retries"
);
assert!(
logs_contain("ERROR"),
"should log error event when retries exhausted"
);
assert!(
logs_contain("max_retries="),
"error event should include max_retries field"
);
assert!(
logs_contain("streams="),
"error event should include streams field"
);
}
#[tokio::test]
async fn test_metrics_hook_called_during_retry() {
use std::sync::atomic::AtomicUsize;
struct MockMetricsHook {
retry_count: Arc<AtomicUsize>,
}
impl MetricsHook for MockMetricsHook {
fn on_retry_attempt(&self, _ctx: &RetryContext) {
let _ = self.retry_count.fetch_add(1, Ordering::SeqCst);
}
}
let retry_count = Arc::new(AtomicUsize::new(0));
let hook = MockMetricsHook {
retry_count: Arc::clone(&retry_count),
};
let policy = RetryPolicy::new().max_retries(2).with_metrics_hook(hook);
let store = ConflictNTimesStore::new(1);
let stream_id = StreamId::try_new("test-stream").expect("valid stream id");
let command = MockCommand {
stream_id,
handle_called: Arc::new(AtomicBool::new(false)),
};
let result = execute(&store, command, policy).await;
assert!(result.is_ok(), "command should succeed after retry");
assert_eq!(
retry_count.load(Ordering::SeqCst),
1,
"metrics hook should be called once for the single retry attempt"
);
}
#[cfg(test)]
mod jitter_tests {
use super::*;
#[test]
fn test_calculate_jitter_factor_minimum() {
let result = calculate_jitter_factor(0.0);
assert_eq!(result, 0.8);
}
#[test]
fn test_calculate_jitter_factor_no_jitter() {
let result = calculate_jitter_factor(0.5);
assert_eq!(result, 1.0);
}
#[test]
fn test_calculate_jitter_factor_maximum() {
let result = calculate_jitter_factor(1.0);
assert_eq!(result, 1.2);
}
#[test]
fn test_apply_jitter_minimum() {
let result = apply_jitter(100, 0.8);
assert_eq!(result, 80);
}
#[test]
fn test_apply_jitter_no_jitter() {
let result = apply_jitter(100, 1.0);
assert_eq!(result, 100);
}
#[test]
fn test_apply_jitter_maximum() {
let result = apply_jitter(100, 1.2);
assert_eq!(result, 120);
}
#[test]
fn test_apply_jitter_zero_base_delay() {
let result = apply_jitter(0, 1.0);
assert_eq!(result, 0);
}
#[test]
fn test_apply_jitter_large_values() {
let result = apply_jitter(10000, 1.1);
assert_eq!(result, 11000);
}
}
}