yarli_cli/yarli-store/src/
memory.rs1use std::collections::{HashMap, HashSet};
7use std::sync::RwLock;
8
9use crate::yarli_core::domain::{Event, EventId};
10
11use crate::yarli_store::error::StoreError;
12use crate::yarli_store::event_store::{EventQuery, EventStore};
13
14#[derive(Debug, Default)]
19pub struct InMemoryEventStore {
20 events: RwLock<Vec<Event>>,
21 event_index: RwLock<HashMap<EventId, usize>>,
23 idempotency_keys: RwLock<HashSet<String>>,
25}
26
27impl InMemoryEventStore {
28 pub fn new() -> Self {
29 Self::default()
30 }
31}
32
33impl EventStore for InMemoryEventStore {
34 fn append(&self, event: Event) -> Result<(), StoreError> {
35 if let Some(ref key) = event.idempotency_key {
37 let keys = self.idempotency_keys.read().unwrap();
38 if keys.contains(key) {
39 return Err(StoreError::DuplicateIdempotencyKey(key.clone()));
40 }
41 }
42
43 let mut events = self.events.write().unwrap();
44 let mut index = self.event_index.write().unwrap();
45 let mut keys = self.idempotency_keys.write().unwrap();
46
47 if let Some(ref key) = event.idempotency_key {
49 if keys.contains(key) {
50 return Err(StoreError::DuplicateIdempotencyKey(key.clone()));
51 }
52 }
53
54 if index.contains_key(&event.event_id) {
56 return Err(StoreError::DuplicateEventId(event.event_id));
57 }
58
59 let pos = events.len();
60 index.insert(event.event_id, pos);
61 if let Some(ref key) = event.idempotency_key {
62 keys.insert(key.clone());
63 }
64 events.push(event);
65
66 Ok(())
67 }
68
69 fn get(&self, event_id: EventId) -> Result<Event, StoreError> {
70 let index = self.event_index.read().unwrap();
71 let events = self.events.read().unwrap();
72
73 match index.get(&event_id) {
74 Some(&pos) => Ok(events[pos].clone()),
75 None => Err(StoreError::EventNotFound(event_id)),
76 }
77 }
78
79 fn query(&self, query: &EventQuery) -> Result<Vec<Event>, StoreError> {
80 let events = self.events.read().unwrap();
81 let index = self.event_index.read().unwrap();
82
83 let start_pos = match query.after_event_id {
85 Some(after_id) => match index.get(&after_id) {
86 Some(&pos) => pos + 1,
87 None => return Err(StoreError::EventNotFound(after_id)),
88 },
89 None => 0,
90 };
91
92 let mut results: Vec<Event> = events[start_pos..]
93 .iter()
94 .filter(|e| {
95 if let Some(ref et) = query.entity_type {
96 if e.entity_type != *et {
97 return false;
98 }
99 }
100 if let Some(ref eid) = query.entity_id {
101 if e.entity_id != *eid {
102 return false;
103 }
104 }
105 if let Some(ref cid) = query.correlation_id {
106 if e.correlation_id != *cid {
107 return false;
108 }
109 }
110 if let Some(ref evt) = query.event_type {
111 if e.event_type != *evt {
112 return false;
113 }
114 }
115 true
116 })
117 .cloned()
118 .collect();
119
120 if let Some(limit) = query.limit {
122 results.truncate(limit);
123 }
124
125 Ok(results)
126 }
127
128 fn all(&self) -> Result<Vec<Event>, StoreError> {
129 let events = self.events.read().unwrap();
130 Ok(events.clone())
131 }
132
133 fn len(&self) -> usize {
134 self.events.read().unwrap().len()
135 }
136}
137
138#[cfg(test)]
139mod tests;