use std::collections::HashMap;
use std::convert::Infallible;
use std::hash::Hash;
use std::sync::Mutex;
use async_trait::async_trait;
use klauthed_core::domain::{AggregateRoot, Repository};
pub struct InMemoryRepository<A>
where
A: AggregateRoot,
{
inner: Mutex<Inner<A>>,
}
struct Inner<A>
where
A: AggregateRoot,
{
store: HashMap<A::Id, A>,
drained: Vec<A::Event>,
}
impl<A> InMemoryRepository<A>
where
A: AggregateRoot,
A::Id: Eq + Hash + Clone,
{
pub fn new() -> Self {
Self { inner: Mutex::new(Inner { store: HashMap::new(), drained: Vec::new() }) }
}
pub fn len(&self) -> usize {
self.lock().store.len()
}
pub fn is_empty(&self) -> bool {
self.lock().store.is_empty()
}
pub fn contains(&self, id: &A::Id) -> bool {
self.lock().store.contains_key(id)
}
pub fn insert(&self, aggregate: A) {
let mut inner = self.lock();
inner.store.insert(aggregate.id().clone(), aggregate);
}
pub fn drained_events(&self) -> Vec<A::Event> {
std::mem::take(&mut self.lock().drained)
}
pub fn drained_event_count(&self) -> usize {
self.lock().drained.len()
}
fn lock(&self) -> std::sync::MutexGuard<'_, Inner<A>> {
self.inner.lock().unwrap_or_else(std::sync::PoisonError::into_inner)
}
}
impl<A> Default for InMemoryRepository<A>
where
A: AggregateRoot,
A::Id: Eq + Hash + Clone,
{
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl<A> Repository<A> for InMemoryRepository<A>
where
A: AggregateRoot + Clone + Send + Sync,
A::Id: Eq + Hash + Clone + Send + Sync,
A::Event: Send,
{
type Error = Infallible;
async fn find(&self, id: &A::Id) -> Result<Option<A>, Infallible> {
Ok(self.lock().store.get(id).cloned())
}
async fn save(&self, aggregate: &mut A) -> Result<(), Infallible> {
let events = aggregate.take_events();
let key = aggregate.id().clone();
let mut inner = self.lock();
inner.drained.extend(events);
inner.store.insert(key, aggregate.clone());
Ok(())
}
async fn delete(&self, id: &A::Id) -> Result<(), Infallible> {
self.lock().store.remove(id);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use klauthed_core::domain::{DomainEvent, Entity, EventLog};
use klauthed_core::id::Id;
use crate::ids::seeded_id;
struct AccountTag;
type AccountId = Id<AccountTag>;
#[derive(Debug, Clone, PartialEq, Eq)]
enum AccountEvent {
Opened { owner: String },
Deposited { amount: i64 },
}
impl DomainEvent for AccountEvent {
fn event_type(&self) -> &'static str {
match self {
AccountEvent::Opened { .. } => "account.opened",
AccountEvent::Deposited { .. } => "account.deposited",
}
}
}
#[derive(Debug, Clone)]
struct Account {
id: AccountId,
balance: i64,
events: EventLog<AccountEvent>,
}
impl Entity for Account {
type Id = AccountId;
fn id(&self) -> &AccountId {
&self.id
}
}
impl AggregateRoot for Account {
type Event = AccountEvent;
fn aggregate_type() -> &'static str {
"account"
}
fn event_log(&self) -> &EventLog<AccountEvent> {
&self.events
}
fn event_log_mut(&mut self) -> &mut EventLog<AccountEvent> {
&mut self.events
}
}
impl Account {
fn open(id: AccountId, owner: &str) -> Self {
let mut account = Account { id, balance: 0, events: EventLog::new() };
account.record(AccountEvent::Opened { owner: owner.to_owned() });
account
}
fn deposit(&mut self, amount: i64) {
self.balance += amount;
self.record(AccountEvent::Deposited { amount });
}
}
#[tokio::test]
async fn save_find_delete_round_trip() {
let repo = InMemoryRepository::<Account>::new();
assert!(repo.is_empty());
let id = seeded_id::<AccountTag>(1);
let mut account = Account::open(id, "alice");
account.deposit(25);
repo.save(&mut account).await.unwrap();
assert_eq!(repo.len(), 1);
assert!(repo.contains(&id));
let loaded = repo.find(&id).await.unwrap().expect("present");
assert_eq!(loaded.balance, 25);
assert!(loaded.pending_events().is_empty());
repo.delete(&id).await.unwrap();
assert!(!repo.contains(&id));
assert!(repo.find(&id).await.unwrap().is_none());
}
#[tokio::test]
async fn captures_drained_events_in_order() {
let repo = InMemoryRepository::<Account>::new();
let mut a = Account::open(seeded_id::<AccountTag>(1), "alice");
a.deposit(10);
let mut b = Account::open(seeded_id::<AccountTag>(2), "bob");
repo.save(&mut a).await.unwrap();
repo.save(&mut b).await.unwrap();
assert_eq!(repo.drained_event_count(), 3);
let events = repo.drained_events();
assert_eq!(
events,
vec![
AccountEvent::Opened { owner: "alice".into() },
AccountEvent::Deposited { amount: 10 },
AccountEvent::Opened { owner: "bob".into() },
]
);
assert_eq!(repo.drained_event_count(), 0);
}
#[tokio::test]
async fn insert_seeds_without_draining() {
let repo = InMemoryRepository::<Account>::new();
let account = Account::open(seeded_id::<AccountTag>(7), "carol");
repo.insert(account);
assert_eq!(repo.len(), 1);
assert_eq!(repo.drained_event_count(), 0);
}
#[tokio::test]
async fn usable_as_dyn_repository() {
let repo: Box<dyn Repository<Account, Error = Infallible>> =
Box::new(InMemoryRepository::<Account>::new());
let id = seeded_id::<AccountTag>(3);
let mut account = Account::open(id, "dave");
repo.save(&mut account).await.unwrap();
assert!(repo.find(&id).await.unwrap().is_some());
}
}