Skip to main content

klauthed_testing/
repository.rs

1//! A generic in-memory [`Repository`] for tests.
2//!
3//! [`InMemoryRepository`] is a thread-safe, dependency-free stand-in for the real
4//! data-layer repositories, so unit tests can exercise aggregate behavior without
5//! a database. It stores cloned aggregate state keyed by id, captures the events
6//! drained on each [`save`](Repository::save) (mirroring how a real outbox-backed
7//! repository consumes them), and exposes inspection helpers
8//! ([`len`](InMemoryRepository::len), [`contains`](InMemoryRepository::contains),
9//! [`drained_events`](InMemoryRepository::drained_events)).
10//!
11//! The aggregate's id must be usable as a map key: `A::Id: Eq + Hash + Clone`.
12//! [`klauthed_core::id::Id`] satisfies this for any marker type.
13//!
14//! Future work (intentionally out of scope here): fault injection (forced save
15//! errors), optimistic-concurrency conflict simulation by version.
16//!
17//! ```
18//! # use klauthed_core::domain::{AggregateRoot, DomainEvent, Entity, EventLog, Repository};
19//! # use klauthed_core::id::Id;
20//! # use klauthed_testing::repository::InMemoryRepository;
21//! # struct AccountTag;
22//! # type AccountId = Id<AccountTag>;
23//! # #[derive(Debug, Clone, PartialEq, Eq)]
24//! # enum AccountEvent { Opened }
25//! # impl DomainEvent for AccountEvent {
26//! #     fn event_type(&self) -> &'static str { "account.opened" }
27//! # }
28//! # #[derive(Debug, Clone)]
29//! # struct Account { id: AccountId, events: EventLog<AccountEvent> }
30//! # impl Entity for Account { type Id = AccountId; fn id(&self) -> &AccountId { &self.id } }
31//! # impl AggregateRoot for Account {
32//! #     type Event = AccountEvent;
33//! #     fn aggregate_type() -> &'static str { "account" }
34//! #     fn event_log(&self) -> &EventLog<AccountEvent> { &self.events }
35//! #     fn event_log_mut(&mut self) -> &mut EventLog<AccountEvent> { &mut self.events }
36//! # }
37//! # async fn example() {
38//! let repo = InMemoryRepository::<Account>::new();
39//! let mut account = Account { id: AccountId::new(), events: EventLog::new() };
40//! account.record(AccountEvent::Opened);
41//! let id = *account.id();
42//!
43//! repo.save(&mut account).await.unwrap();
44//! assert_eq!(repo.len(), 1);
45//! assert!(repo.contains(&id));
46//! assert_eq!(repo.drained_events(), vec![AccountEvent::Opened]);
47//! # }
48//! ```
49
50use std::collections::HashMap;
51use std::convert::Infallible;
52use std::hash::Hash;
53use std::sync::Mutex;
54
55use async_trait::async_trait;
56
57use klauthed_core::domain::{AggregateRoot, Repository};
58
59/// A thread-safe, in-memory [`Repository`] implementation for tests.
60///
61/// Keys aggregates by their id (`A::Id: Eq + Hash + Clone`) and stores cloned
62/// state. On [`save`](Repository::save) it drains the aggregate's pending events
63/// (as a real repository would, e.g. into an outbox) and records them for later
64/// inspection via [`drained_events`](InMemoryRepository::drained_events).
65///
66/// Its [`Error`](Repository::Error) is [`Infallible`]: nothing here can fail.
67pub struct InMemoryRepository<A>
68where
69    A: AggregateRoot,
70{
71    inner: Mutex<Inner<A>>,
72}
73
74struct Inner<A>
75where
76    A: AggregateRoot,
77{
78    store: HashMap<A::Id, A>,
79    drained: Vec<A::Event>,
80}
81
82impl<A> InMemoryRepository<A>
83where
84    A: AggregateRoot,
85    A::Id: Eq + Hash + Clone,
86{
87    /// An empty repository.
88    pub fn new() -> Self {
89        Self { inner: Mutex::new(Inner { store: HashMap::new(), drained: Vec::new() }) }
90    }
91
92    /// The number of stored aggregates.
93    pub fn len(&self) -> usize {
94        self.lock().store.len()
95    }
96
97    /// Whether the repository holds no aggregates.
98    pub fn is_empty(&self) -> bool {
99        self.lock().store.is_empty()
100    }
101
102    /// Whether an aggregate with `id` is stored.
103    pub fn contains(&self, id: &A::Id) -> bool {
104        self.lock().store.contains_key(id)
105    }
106
107    /// Insert (or replace) an aggregate's state directly, without draining its
108    /// events — a convenience for seeding fixtures.
109    pub fn insert(&self, aggregate: A) {
110        let mut inner = self.lock();
111        inner.store.insert(aggregate.id().clone(), aggregate);
112    }
113
114    /// Remove and return all events captured from `save` calls so far.
115    ///
116    /// Each [`save`](Repository::save) appends the aggregate's drained events
117    /// here, preserving order across saves. Calling this clears the buffer.
118    pub fn drained_events(&self) -> Vec<A::Event> {
119        std::mem::take(&mut self.lock().drained)
120    }
121
122    /// The number of events captured from `save` calls (without draining them).
123    pub fn drained_event_count(&self) -> usize {
124        self.lock().drained.len()
125    }
126
127    fn lock(&self) -> std::sync::MutexGuard<'_, Inner<A>> {
128        self.inner.lock().unwrap_or_else(std::sync::PoisonError::into_inner)
129    }
130}
131
132impl<A> Default for InMemoryRepository<A>
133where
134    A: AggregateRoot,
135    A::Id: Eq + Hash + Clone,
136{
137    fn default() -> Self {
138        Self::new()
139    }
140}
141
142#[async_trait]
143impl<A> Repository<A> for InMemoryRepository<A>
144where
145    A: AggregateRoot + Clone + Send + Sync,
146    A::Id: Eq + Hash + Clone + Send + Sync,
147    A::Event: Send,
148{
149    type Error = Infallible;
150
151    async fn find(&self, id: &A::Id) -> Result<Option<A>, Infallible> {
152        Ok(self.lock().store.get(id).cloned())
153    }
154
155    async fn save(&self, aggregate: &mut A) -> Result<(), Infallible> {
156        // Mirror a real repository: drain pending events (would go to an outbox)
157        // and persist the current state.
158        let events = aggregate.take_events();
159        let key = aggregate.id().clone();
160        let mut inner = self.lock();
161        inner.drained.extend(events);
162        inner.store.insert(key, aggregate.clone());
163        Ok(())
164    }
165
166    async fn delete(&self, id: &A::Id) -> Result<(), Infallible> {
167        self.lock().store.remove(id);
168        Ok(())
169    }
170}
171
172#[cfg(test)]
173mod tests {
174    use super::*;
175    use klauthed_core::domain::{DomainEvent, Entity, EventLog};
176    use klauthed_core::id::Id;
177
178    use crate::ids::seeded_id;
179
180    struct AccountTag;
181    type AccountId = Id<AccountTag>;
182
183    #[derive(Debug, Clone, PartialEq, Eq)]
184    enum AccountEvent {
185        Opened { owner: String },
186        Deposited { amount: i64 },
187    }
188
189    impl DomainEvent for AccountEvent {
190        fn event_type(&self) -> &'static str {
191            match self {
192                AccountEvent::Opened { .. } => "account.opened",
193                AccountEvent::Deposited { .. } => "account.deposited",
194            }
195        }
196    }
197
198    #[derive(Debug, Clone)]
199    struct Account {
200        id: AccountId,
201        balance: i64,
202        events: EventLog<AccountEvent>,
203    }
204
205    impl Entity for Account {
206        type Id = AccountId;
207        fn id(&self) -> &AccountId {
208            &self.id
209        }
210    }
211
212    impl AggregateRoot for Account {
213        type Event = AccountEvent;
214        fn aggregate_type() -> &'static str {
215            "account"
216        }
217        fn event_log(&self) -> &EventLog<AccountEvent> {
218            &self.events
219        }
220        fn event_log_mut(&mut self) -> &mut EventLog<AccountEvent> {
221            &mut self.events
222        }
223    }
224
225    impl Account {
226        fn open(id: AccountId, owner: &str) -> Self {
227            let mut account = Account { id, balance: 0, events: EventLog::new() };
228            account.record(AccountEvent::Opened { owner: owner.to_owned() });
229            account
230        }
231        fn deposit(&mut self, amount: i64) {
232            self.balance += amount;
233            self.record(AccountEvent::Deposited { amount });
234        }
235    }
236
237    #[tokio::test]
238    async fn save_find_delete_round_trip() {
239        let repo = InMemoryRepository::<Account>::new();
240        assert!(repo.is_empty());
241
242        let id = seeded_id::<AccountTag>(1);
243        let mut account = Account::open(id, "alice");
244        account.deposit(25);
245
246        repo.save(&mut account).await.unwrap();
247        assert_eq!(repo.len(), 1);
248        assert!(repo.contains(&id));
249
250        let loaded = repo.find(&id).await.unwrap().expect("present");
251        assert_eq!(loaded.balance, 25);
252        // Events were drained on save, so loaded state has none pending.
253        assert!(loaded.pending_events().is_empty());
254
255        repo.delete(&id).await.unwrap();
256        assert!(!repo.contains(&id));
257        assert!(repo.find(&id).await.unwrap().is_none());
258    }
259
260    #[tokio::test]
261    async fn captures_drained_events_in_order() {
262        let repo = InMemoryRepository::<Account>::new();
263        let mut a = Account::open(seeded_id::<AccountTag>(1), "alice");
264        a.deposit(10);
265        let mut b = Account::open(seeded_id::<AccountTag>(2), "bob");
266
267        repo.save(&mut a).await.unwrap();
268        repo.save(&mut b).await.unwrap();
269
270        assert_eq!(repo.drained_event_count(), 3);
271        let events = repo.drained_events();
272        assert_eq!(
273            events,
274            vec![
275                AccountEvent::Opened { owner: "alice".into() },
276                AccountEvent::Deposited { amount: 10 },
277                AccountEvent::Opened { owner: "bob".into() },
278            ]
279        );
280        // Draining clears the buffer.
281        assert_eq!(repo.drained_event_count(), 0);
282    }
283
284    #[tokio::test]
285    async fn insert_seeds_without_draining() {
286        let repo = InMemoryRepository::<Account>::new();
287        let account = Account::open(seeded_id::<AccountTag>(7), "carol");
288        repo.insert(account);
289        assert_eq!(repo.len(), 1);
290        // insert does not touch the drained-events buffer.
291        assert_eq!(repo.drained_event_count(), 0);
292    }
293
294    #[tokio::test]
295    async fn usable_as_dyn_repository() {
296        let repo: Box<dyn Repository<Account, Error = Infallible>> =
297            Box::new(InMemoryRepository::<Account>::new());
298        let id = seeded_id::<AccountTag>(3);
299        let mut account = Account::open(id, "dave");
300        repo.save(&mut account).await.unwrap();
301        assert!(repo.find(&id).await.unwrap().is_some());
302    }
303}