Skip to main content

yarli_cli/yarli-store/src/
memory.rs

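//! In-memory event store for development and testing.
//!
//! Events live in a `Vec<Event>` behind a `RwLock`, next to lock-protected
//! lookup structures for event IDs and idempotency keys, so the store can be
//! shared between threads. Events are kept in append order and are never
//! mutated after insertion.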
5
6use std::collections::{HashMap, HashSet};
7use std::sync::RwLock;
8
9use crate::yarli_core::domain::{Event, EventId};
10
11use crate::yarli_store::error::StoreError;
12use crate::yarli_store::event_store::{EventQuery, EventStore};
13
14/// In-memory event store backed by a `Vec<Event>` behind a `RwLock`.
15///
16/// Suitable for single-process usage in development and tests.
17/// For production, use the Postgres-backed store.
18#[derive(Debug, Default)]
19pub struct InMemoryEventStore {
20    events: RwLock<Vec<Event>>,
21    /// Index: event_id -> position in events vec.
22    event_index: RwLock<HashMap<EventId, usize>>,
23    /// Set of known idempotency keys for dedup.
24    idempotency_keys: RwLock<HashSet<String>>,
25}
26
27impl InMemoryEventStore {
28    pub fn new() -> Self {
29        Self::default()
30    }
31}
32
33impl EventStore for InMemoryEventStore {
34    fn append(&self, event: Event) -> Result<(), StoreError> {
35        // Check idempotency key first (outside write lock).
36        if let Some(ref key) = event.idempotency_key {
37            let keys = self.idempotency_keys.read().unwrap();
38            if keys.contains(key) {
39                return Err(StoreError::DuplicateIdempotencyKey(key.clone()));
40            }
41        }
42
43        let mut events = self.events.write().unwrap();
44        let mut index = self.event_index.write().unwrap();
45        let mut keys = self.idempotency_keys.write().unwrap();
46
47        // Double-check idempotency key under write lock.
48        if let Some(ref key) = event.idempotency_key {
49            if keys.contains(key) {
50                return Err(StoreError::DuplicateIdempotencyKey(key.clone()));
51            }
52        }
53
54        // Check duplicate event ID.
55        if index.contains_key(&event.event_id) {
56            return Err(StoreError::DuplicateEventId(event.event_id));
57        }
58
59        let pos = events.len();
60        index.insert(event.event_id, pos);
61        if let Some(ref key) = event.idempotency_key {
62            keys.insert(key.clone());
63        }
64        events.push(event);
65
66        Ok(())
67    }
68
69    fn get(&self, event_id: EventId) -> Result<Event, StoreError> {
70        let index = self.event_index.read().unwrap();
71        let events = self.events.read().unwrap();
72
73        match index.get(&event_id) {
74            Some(&pos) => Ok(events[pos].clone()),
75            None => Err(StoreError::EventNotFound(event_id)),
76        }
77    }
78
79    fn query(&self, query: &EventQuery) -> Result<Vec<Event>, StoreError> {
80        let events = self.events.read().unwrap();
81        let index = self.event_index.read().unwrap();
82
83        // Resolve the starting position for after_event_id pagination.
84        let start_pos = match query.after_event_id {
85            Some(after_id) => match index.get(&after_id) {
86                Some(&pos) => pos + 1,
87                None => return Err(StoreError::EventNotFound(after_id)),
88            },
89            None => 0,
90        };
91
92        let mut results: Vec<Event> = events[start_pos..]
93            .iter()
94            .filter(|e| {
95                if let Some(ref et) = query.entity_type {
96                    if e.entity_type != *et {
97                        return false;
98                    }
99                }
100                if let Some(ref eid) = query.entity_id {
101                    if e.entity_id != *eid {
102                        return false;
103                    }
104                }
105                if let Some(ref cid) = query.correlation_id {
106                    if e.correlation_id != *cid {
107                        return false;
108                    }
109                }
110                if let Some(ref evt) = query.event_type {
111                    if e.event_type != *evt {
112                        return false;
113                    }
114                }
115                true
116            })
117            .cloned()
118            .collect();
119
120        // Events are already in insertion order (which is occurred_at ASC).
121        if let Some(limit) = query.limit {
122            results.truncate(limit);
123        }
124
125        Ok(results)
126    }
127
128    fn all(&self) -> Result<Vec<Event>, StoreError> {
129        let events = self.events.read().unwrap();
130        Ok(events.clone())
131    }
132
133    fn len(&self) -> usize {
134        self.events.read().unwrap().len()
135    }
136}
137
138#[cfg(test)]
139mod tests;