use eventcore::{
Command, CommandError, CommandLogic, CommandStreams, Event, NewEvents, RetryPolicy,
StreamDeclarations, StreamId, collect_events, execute,
};
use eventcore_memory::InMemoryEventStore;
use eventcore_types::EventStore;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
enum TransferEvent {
Debited { account_id: StreamId, cents: u64 },
Credited { account_id: StreamId, cents: u64 },
}
impl Event for TransferEvent {
fn stream_id(&self) -> &StreamId {
match self {
TransferEvent::Debited { account_id, .. }
| TransferEvent::Credited { account_id, .. } => account_id,
}
}
fn event_type_name() -> &'static str {
"TransferEvent"
}
}
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
struct TransferLedger {
debits_recorded: u32,
credits_recorded: u32,
}
impl TransferLedger {
fn record(mut self, event: &TransferEvent) -> Self {
match event {
TransferEvent::Debited { .. } => self.debits_recorded += 1,
TransferEvent::Credited { .. } => self.credits_recorded += 1,
}
self
}
fn already_completed(&self) -> bool {
self.debits_recorded > 0 && self.credits_recorded > 0
}
}
struct ManualTransfer {
source: StreamId,
destination: StreamId,
cents: u64,
}
impl CommandStreams for ManualTransfer {
fn stream_declarations(&self) -> StreamDeclarations {
StreamDeclarations::try_from_streams(vec![self.source.clone(), self.destination.clone()])
.expect("manual transfer declares unique streams")
}
}
impl CommandLogic for ManualTransfer {
type Event = TransferEvent;
type State = TransferLedger;
fn apply(&self, state: Self::State, event: &Self::Event) -> Self::State {
state.record(event)
}
fn handle(&self, state: Self::State) -> Result<NewEvents<Self::Event>, CommandError> {
if state.already_completed() {
return Err(CommandError::from(
"transfer already applied to both streams",
));
}
Ok(vec![
TransferEvent::Debited {
account_id: self.source.clone(),
cents: self.cents,
},
TransferEvent::Credited {
account_id: self.destination.clone(),
cents: self.cents,
},
]
.into())
}
}
#[derive(Command)]
struct DerivedTransfer {
#[stream]
source: StreamId,
#[stream]
destination: StreamId,
cents: u64,
}
impl CommandLogic for DerivedTransfer {
type Event = TransferEvent;
type State = TransferLedger;
fn apply(&self, state: Self::State, event: &Self::Event) -> Self::State {
state.record(event)
}
fn handle(&self, state: Self::State) -> Result<NewEvents<Self::Event>, CommandError> {
if state.already_completed() {
return Err(CommandError::from(
"transfer already applied to both streams",
));
}
Ok(vec![
TransferEvent::Debited {
account_id: self.source.clone(),
cents: self.cents,
},
TransferEvent::Credited {
account_id: self.destination.clone(),
cents: self.cents,
},
]
.into())
}
}
fn new_stream_id() -> StreamId {
StreamId::try_new(Uuid::now_v7().to_string()).expect("valid stream id")
}
#[tokio::test]
async fn migrating_from_manual_impl_preserves_behavior() {
let source_account = new_stream_id();
let destination_account = new_stream_id();
let cents = 6_500u64;
let manual_command = ManualTransfer {
source: source_account.clone(),
destination: destination_account.clone(),
cents,
};
let derived_command = DerivedTransfer {
source: source_account.clone(),
destination: destination_account.clone(),
cents,
};
let manual_streams: Vec<StreamId> = manual_command
.stream_declarations()
.iter()
.cloned()
.collect();
let derived_streams: Vec<StreamId> = derived_command
.stream_declarations()
.iter()
.cloned()
.collect();
let manual_store = InMemoryEventStore::new();
let derived_store = InMemoryEventStore::new();
let _ = execute(&manual_store, manual_command, RetryPolicy::new())
.await
.expect("manual command to succeed");
let _ = execute(&derived_store, derived_command, RetryPolicy::new())
.await
.expect("derived command to succeed");
assert_eq!(
manual_streams, derived_streams,
"#[derive(Command)] should declare the same streams as the manual implementation",
);
let manual_source_events = collect_events(
manual_store
.read_stream::<TransferEvent>(source_account.clone())
.await
.expect("reading manual source stream should succeed"),
)
.await
.expect("collecting manual source events");
let derived_source_events = collect_events(
derived_store
.read_stream::<TransferEvent>(source_account.clone())
.await
.expect("reading derived source stream should succeed"),
)
.await
.expect("collecting derived source events");
assert_eq!(
manual_source_events, derived_source_events,
"source stream should observe identical debits after migration",
);
let manual_destination_events = collect_events(
manual_store
.read_stream::<TransferEvent>(destination_account.clone())
.await
.expect("reading manual destination stream should succeed"),
)
.await
.expect("collecting manual destination events");
let derived_destination_events = collect_events(
derived_store
.read_stream::<TransferEvent>(destination_account.clone())
.await
.expect("reading derived destination stream should succeed"),
)
.await
.expect("collecting derived destination events");
assert_eq!(
manual_destination_events, derived_destination_events,
"destination stream should observe identical credits after migration",
);
}