klauthed_testing/
repository.rs1use std::collections::HashMap;
51use std::convert::Infallible;
52use std::hash::Hash;
53use std::sync::Mutex;
54
55use async_trait::async_trait;
56
57use klauthed_core::domain::{AggregateRoot, Repository};
58
59pub struct InMemoryRepository<A>
68where
69 A: AggregateRoot,
70{
71 inner: Mutex<Inner<A>>,
72}
73
74struct Inner<A>
75where
76 A: AggregateRoot,
77{
78 store: HashMap<A::Id, A>,
79 drained: Vec<A::Event>,
80}
81
82impl<A> InMemoryRepository<A>
83where
84 A: AggregateRoot,
85 A::Id: Eq + Hash + Clone,
86{
87 pub fn new() -> Self {
89 Self { inner: Mutex::new(Inner { store: HashMap::new(), drained: Vec::new() }) }
90 }
91
92 pub fn len(&self) -> usize {
94 self.lock().store.len()
95 }
96
97 pub fn is_empty(&self) -> bool {
99 self.lock().store.is_empty()
100 }
101
102 pub fn contains(&self, id: &A::Id) -> bool {
104 self.lock().store.contains_key(id)
105 }
106
107 pub fn insert(&self, aggregate: A) {
110 let mut inner = self.lock();
111 inner.store.insert(aggregate.id().clone(), aggregate);
112 }
113
114 pub fn drained_events(&self) -> Vec<A::Event> {
119 std::mem::take(&mut self.lock().drained)
120 }
121
122 pub fn drained_event_count(&self) -> usize {
124 self.lock().drained.len()
125 }
126
127 fn lock(&self) -> std::sync::MutexGuard<'_, Inner<A>> {
128 self.inner.lock().unwrap_or_else(std::sync::PoisonError::into_inner)
129 }
130}
131
132impl<A> Default for InMemoryRepository<A>
133where
134 A: AggregateRoot,
135 A::Id: Eq + Hash + Clone,
136{
137 fn default() -> Self {
138 Self::new()
139 }
140}
141
142#[async_trait]
143impl<A> Repository<A> for InMemoryRepository<A>
144where
145 A: AggregateRoot + Clone + Send + Sync,
146 A::Id: Eq + Hash + Clone + Send + Sync,
147 A::Event: Send,
148{
149 type Error = Infallible;
150
151 async fn find(&self, id: &A::Id) -> Result<Option<A>, Infallible> {
152 Ok(self.lock().store.get(id).cloned())
153 }
154
155 async fn save(&self, aggregate: &mut A) -> Result<(), Infallible> {
156 let events = aggregate.take_events();
159 let key = aggregate.id().clone();
160 let mut inner = self.lock();
161 inner.drained.extend(events);
162 inner.store.insert(key, aggregate.clone());
163 Ok(())
164 }
165
166 async fn delete(&self, id: &A::Id) -> Result<(), Infallible> {
167 self.lock().store.remove(id);
168 Ok(())
169 }
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use klauthed_core::domain::{DomainEvent, Entity, EventLog};
    use klauthed_core::id::Id;

    use crate::ids::seeded_id;

    /// Marker type parameterising the id of the test aggregate.
    struct AccountTag;
    type AccountId = Id<AccountTag>;

    /// Events recorded by the [`Account`] test aggregate.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum AccountEvent {
        Opened { owner: String },
        Deposited { amount: i64 },
    }

    impl DomainEvent for AccountEvent {
        fn event_type(&self) -> &'static str {
            match self {
                Self::Opened { .. } => "account.opened",
                Self::Deposited { .. } => "account.deposited",
            }
        }
    }

    /// Minimal aggregate used to exercise the repository.
    #[derive(Debug, Clone)]
    struct Account {
        id: AccountId,
        balance: i64,
        events: EventLog<AccountEvent>,
    }

    impl Entity for Account {
        type Id = AccountId;

        fn id(&self) -> &Self::Id {
            &self.id
        }
    }

    impl AggregateRoot for Account {
        type Event = AccountEvent;

        fn aggregate_type() -> &'static str {
            "account"
        }

        fn event_log(&self) -> &EventLog<Self::Event> {
            &self.events
        }

        fn event_log_mut(&mut self) -> &mut EventLog<Self::Event> {
            &mut self.events
        }
    }

    impl Account {
        /// Creates an account with a zero balance and records an `Opened` event.
        fn open(id: AccountId, owner: &str) -> Self {
            let mut opened = Self { id, balance: 0, events: EventLog::new() };
            opened.record(AccountEvent::Opened { owner: String::from(owner) });
            opened
        }

        /// Adds `amount` to the balance and records a `Deposited` event.
        fn deposit(&mut self, amount: i64) {
            self.balance += amount;
            self.record(AccountEvent::Deposited { amount });
        }
    }

    #[tokio::test]
    async fn save_find_delete_round_trip() {
        let repo: InMemoryRepository<Account> = InMemoryRepository::new();
        assert!(repo.is_empty());

        let account_id = seeded_id::<AccountTag>(1);
        let mut alice = Account::open(account_id, "alice");
        alice.deposit(25);

        repo.save(&mut alice).await.unwrap();
        assert_eq!(repo.len(), 1);
        assert!(repo.contains(&account_id));

        let reloaded = repo.find(&account_id).await.unwrap().expect("present");
        assert_eq!(reloaded.balance, 25);
        // The stored copy must not carry any pending events.
        assert!(reloaded.pending_events().is_empty());

        repo.delete(&account_id).await.unwrap();
        assert!(!repo.contains(&account_id));
        assert!(repo.find(&account_id).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn captures_drained_events_in_order() {
        let repo: InMemoryRepository<Account> = InMemoryRepository::new();
        let mut alice = Account::open(seeded_id::<AccountTag>(1), "alice");
        alice.deposit(10);
        let mut bob = Account::open(seeded_id::<AccountTag>(2), "bob");

        repo.save(&mut alice).await.unwrap();
        repo.save(&mut bob).await.unwrap();

        let expected = vec![
            AccountEvent::Opened { owner: "alice".into() },
            AccountEvent::Deposited { amount: 10 },
            AccountEvent::Opened { owner: "bob".into() },
        ];

        assert_eq!(repo.drained_event_count(), 3);
        let captured = repo.drained_events();
        assert_eq!(captured, expected);
        // Reading the captured events empties the buffer.
        assert_eq!(repo.drained_event_count(), 0);
    }

    #[tokio::test]
    async fn insert_seeds_without_draining() {
        let repo: InMemoryRepository<Account> = InMemoryRepository::new();
        let carol = Account::open(seeded_id::<AccountTag>(7), "carol");

        repo.insert(carol);

        assert_eq!(repo.len(), 1);
        // `insert` stores the aggregate without capturing its events.
        assert_eq!(repo.drained_event_count(), 0);
    }

    #[tokio::test]
    async fn usable_as_dyn_repository() {
        let boxed: Box<dyn Repository<Account, Error = Infallible>> =
            Box::new(InMemoryRepository::<Account>::new());
        let account_id = seeded_id::<AccountTag>(3);
        let mut dave = Account::open(account_id, "dave");

        boxed.save(&mut dave).await.unwrap();

        assert!(boxed.find(&account_id).await.unwrap().is_some());
    }
}