#![cfg(feature = "not-send-futures")]
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use fmodel_rust::aggregate::{
EventRepository, EventSourcedAggregate, StateRepository, StateStoredAggregate,
};
use fmodel_rust::decider::Decider;
use fmodel_rust::Identifier;
use tokio::task;
use crate::api::{CreateOrderCommand, OrderCommand, OrderCreatedEvent, OrderEvent, OrderState};
use crate::application::AggregateError;
mod api;
mod application;
struct InMemoryOrderEventRepository {
events: RefCell<Vec<(OrderEvent, i32)>>,
}
impl InMemoryOrderEventRepository {
fn new() -> Self {
Self {
events: RefCell::new(vec![]),
}
}
}
impl EventRepository<OrderCommand, OrderEvent, i32, AggregateError>
for InMemoryOrderEventRepository
{
async fn fetch_events(
&self,
command: &OrderCommand,
) -> Result<Vec<(OrderEvent, i32)>, AggregateError> {
let events = self.events.borrow(); Ok(events
.iter()
.cloned()
.filter(|(e, _)| e.identifier() == command.identifier())
.collect())
}
async fn save(&self, events: &[OrderEvent]) -> Result<Vec<(OrderEvent, i32)>, AggregateError> {
let latest_version = {
let events_vec = self.events.borrow(); events
.first()
.and_then(|first_event| {
events_vec
.iter()
.filter(|(e, _)| e.identifier() == first_event.identifier())
.map(|(_, v)| *v)
.last()
})
.unwrap_or(-1)
};
let mut current_version = latest_version;
let new_events: Vec<(OrderEvent, i32)> = events
.iter()
.map(|event| {
current_version += 1;
(event.clone(), current_version)
})
.collect();
self.events.borrow_mut().extend_from_slice(&new_events);
Ok(new_events)
}
async fn version_provider(&self, event: &OrderEvent) -> Result<Option<i32>, AggregateError> {
let events = self.events.borrow();
Ok(events
.iter()
.filter(|(e, _)| e.identifier() == event.identifier())
.map(|(_, v)| *v)
.last())
}
}
struct InMemoryOrderStateRepository {
states: RefCell<HashMap<u32, (OrderState, i32)>>,
}
impl InMemoryOrderStateRepository {
fn new() -> Self {
Self {
states: RefCell::new(HashMap::new()),
}
}
}
impl StateRepository<OrderCommand, OrderState, i32, AggregateError>
for InMemoryOrderStateRepository
{
async fn fetch_state(
&self,
command: &OrderCommand,
) -> Result<Option<(OrderState, i32)>, AggregateError> {
let states = self.states.borrow();
Ok(states
.get(&command.identifier().parse::<u32>().unwrap())
.cloned())
}
async fn save(
&self,
state: &OrderState,
version: &Option<i32>,
) -> Result<(OrderState, i32), AggregateError> {
let mut states = self.states.borrow_mut();
let version = version.unwrap_or(0);
states.insert(state.order_id, (state.clone(), version + 1));
Ok((state.clone(), version))
}
}
fn decider<'a>() -> Decider<'a, OrderCommand, OrderState, OrderEvent> {
Decider {
decide: Box::new(|command, _state| match command {
OrderCommand::Create(cmd) => Ok(vec![OrderEvent::Created(OrderCreatedEvent {
order_id: cmd.order_id,
customer_name: cmd.customer_name.clone(),
items: cmd.items.clone(),
})]),
OrderCommand::Update(_cmd) => Ok(vec![]),
OrderCommand::Cancel(_cmd) => Ok(vec![]),
}),
evolve: Box::new(|state, _event| state.clone()),
initial_state: Box::new(|| OrderState {
order_id: 0,
customer_name: "".to_string(),
items: vec![],
is_cancelled: false,
}),
}
}
#[tokio::test]
async fn es_test_not_send() {
let repository = InMemoryOrderEventRepository::new();
let aggregate = EventSourcedAggregate::new(
repository,
decider().map_error(|()| AggregateError::DomainError("Decider error".to_string())),
);
let task1 = async {
let command = OrderCommand::Create(CreateOrderCommand {
order_id: 1,
customer_name: "Alice".to_string(),
items: vec!["Item1".to_string()],
});
let result = aggregate.handle(&command).await;
assert!(result.is_ok());
};
let task2 = async {
let command = OrderCommand::Create(CreateOrderCommand {
order_id: 1,
customer_name: "John Doe".to_string(),
items: vec!["Item 1".to_string(), "Item 2".to_string()],
});
let result = aggregate.handle(&command).await;
assert!(result.is_ok());
};
tokio::join!(task1, task2);
}
#[tokio::test]
async fn es_test_not_send_with_spawn_local() {
let local = task::LocalSet::new();
local
.run_until(async {
let repository = InMemoryOrderEventRepository::new();
let aggregate = Rc::new(EventSourcedAggregate::new(
repository,
decider().map_error(|()| AggregateError::DomainError("Decider error".to_string())),
));
let aggregate1 = Rc::clone(&aggregate);
let aggregate2 = Rc::clone(&aggregate);
let handle1 = task::spawn_local(async move {
let command = OrderCommand::Create(CreateOrderCommand {
order_id: 1,
customer_name: "Alice".to_string(),
items: vec!["Item1".to_string()],
});
let result = aggregate1.handle(&command).await;
assert!(result.is_ok());
});
let handle2 = task::spawn_local(async move {
let command = OrderCommand::Create(CreateOrderCommand {
order_id: 2,
customer_name: "Bob".to_string(),
items: vec!["Item2".to_string()],
});
let result = aggregate2.handle(&command).await;
assert!(result.is_ok());
});
let (result1, result2) = tokio::join!(handle1, handle2);
assert!(result1.is_ok());
assert!(result2.is_ok());
})
.await;
}
#[tokio::test]
async fn ss_test_not_send() {
let repository = InMemoryOrderStateRepository::new();
let aggregate = StateStoredAggregate::new(
repository,
decider().map_error(|()| AggregateError::DomainError("Decider error".to_string())),
);
let command = OrderCommand::Create(CreateOrderCommand {
order_id: 1,
customer_name: "Alice".to_string(),
items: vec!["Item1".to_string()],
});
let result = aggregate.handle(&command).await;
assert!(result.is_ok());
}