nitinol_inmemory_adaptor/
store.rs

1use std::collections::{BTreeSet, HashMap};
2use std::fmt::{Debug, Formatter};
3use std::sync::Arc;
4use async_trait::async_trait;
5use nitinol_core::identifier::EntityId;
6use nitinol_protocol::errors::ProtocolError;
7use nitinol_protocol::io::{Reader, Writer};
8use nitinol_protocol::Payload;
9use tokio::sync::RwLock;
10use crate::errors::MemoryIoError;
11use crate::lock::OptLock;
12
13pub struct InMemoryEventStore {
14    store: Arc<RwLock<HashMap<EntityId, OptLock<BTreeSet<Payload>>>>>
15}
16
17impl Debug for InMemoryEventStore {
18    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
19        write!(f, "InMemoryEventStore")
20    }
21}
22
23impl Clone for InMemoryEventStore {
24    fn clone(&self) -> Self {
25        Self { store: Arc::clone(&self.store) }
26    }
27}
28
29impl Default for InMemoryEventStore {
30    fn default() -> Self {
31        Self { store: Arc::new(RwLock::new(HashMap::new())) }
32    }
33}
34
35#[async_trait]
36impl Writer for InMemoryEventStore {
37    async fn write(&self, aggregate_id: EntityId, payload: Payload) -> Result<(), ProtocolError> {
38        let guard = self.store.read().await;
39        if !guard.contains_key(&aggregate_id) {
40            tracing::debug!("not found entity: {}", aggregate_id);
41            
42            drop(guard); // release the read lock
43            
44            tracing::debug!("create new store for entity: {}", aggregate_id);
45            let mut guard = self.store.write().await;
46            let mut init = BTreeSet::new();
47            init.insert(payload);
48            guard.insert(aggregate_id, OptLock::new(init));
49            tracing::debug!("create successfully.");
50            return Ok(())
51        }
52        
53        let Some(lock) = guard.get(&aggregate_id) else {
54            panic!("If the target store does not exist, a new one should be created, so it should definitely exist.");
55        };
56        
57        let mut lock = lock.write().await
58            .map_err(|e| ProtocolError::Write(Box::new(e)))?;
59        
60        tracing::debug!("writing: {:?}", payload);
61        
62        lock.insert(payload);
63        
64        tracing::debug!("write successfully.");
65        Ok(())
66    }
67}
68
69#[async_trait]
70impl Reader for InMemoryEventStore {
71    async fn read(&self, id: EntityId, seq: i64) -> Result<Payload, ProtocolError> {
72        tracing::debug!("read: id={}, seq={}", id, seq);
73        
74        let guard = self.store.read().await;
75        let Some(lock) = guard.get(&id) else {
76            return Err(ProtocolError::Read(Box::new(MemoryIoError::NotFound(id))));
77        };
78        let found = loop {
79            match lock.read().await {
80                Ok(guard) => {
81                    let found = guard.iter()
82                        .find(|payload| payload.sequence_id.eq(&seq))
83                        .cloned();
84                    match guard.sync().await {
85                        Ok(_) => break found,
86                        Err(e) => {
87                            tracing::error!("{}", e);
88                            continue;
89                        }
90                    }
91                }
92                Err(e) => {
93                    tracing::error!("{}", e);
94                    continue;
95                }
96            }
97        };
98        
99        found.ok_or(ProtocolError::Read(Box::new(MemoryIoError::NotFound(id))))
100    }
101    
102    async fn read_to(&self, id: EntityId, from: i64, to: i64) -> Result<BTreeSet<Payload>, ProtocolError> {
103        tracing::debug!("read_to: id={}, from={}, to={}", id, from, to);
104        
105        let guard = self.store.read().await;
106        let Some(lock) = guard.get(&id) else {
107            return Ok(BTreeSet::new());
108        };
109        let found = loop {
110            match lock.read().await {
111                Ok(guard) => {
112                    let found = guard.iter()
113                        .filter(|payload| from <= payload.sequence_id && payload.sequence_id <= to)
114                        .cloned()
115                        .collect::<BTreeSet<_>>();
116                    match guard.sync().await {
117                        Ok(_) => break found,
118                        Err(e) => {
119                            tracing::error!("{}", e);
120                            continue;
121                        }
122                    }
123                }
124                Err(e) => {
125                    tracing::error!("{}", e);
126                    continue;
127                }
128            }
129        };
130        
131        tracing::trace!("len={}", found.len());
132        
133        Ok(found)
134    }
135}