nitinol_inmemory_adaptor/
store.rs1use std::collections::{BTreeSet, HashMap};
2use std::fmt::{Debug, Formatter};
3use std::sync::Arc;
4use async_trait::async_trait;
5use nitinol_core::identifier::EntityId;
6use nitinol_protocol::errors::ProtocolError;
7use nitinol_protocol::io::{Reader, Writer};
8use nitinol_protocol::Payload;
9use tokio::sync::RwLock;
10use crate::errors::MemoryIoError;
11use crate::lock::OptLock;
12
13pub struct InMemoryEventStore {
14 store: Arc<RwLock<HashMap<EntityId, OptLock<BTreeSet<Payload>>>>>
15}
16
17impl Debug for InMemoryEventStore {
18 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
19 write!(f, "InMemoryEventStore")
20 }
21}
22
23impl Clone for InMemoryEventStore {
24 fn clone(&self) -> Self {
25 Self { store: Arc::clone(&self.store) }
26 }
27}
28
29impl Default for InMemoryEventStore {
30 fn default() -> Self {
31 Self { store: Arc::new(RwLock::new(HashMap::new())) }
32 }
33}
34
35#[async_trait]
36impl Writer for InMemoryEventStore {
37 async fn write(&self, aggregate_id: EntityId, payload: Payload) -> Result<(), ProtocolError> {
38 let guard = self.store.read().await;
39 if !guard.contains_key(&aggregate_id) {
40 tracing::debug!("not found entity: {}", aggregate_id);
41
42 drop(guard); tracing::debug!("create new store for entity: {}", aggregate_id);
45 let mut guard = self.store.write().await;
46 let mut init = BTreeSet::new();
47 init.insert(payload);
48 guard.insert(aggregate_id, OptLock::new(init));
49 tracing::debug!("create successfully.");
50 return Ok(())
51 }
52
53 let Some(lock) = guard.get(&aggregate_id) else {
54 panic!("If the target store does not exist, a new one should be created, so it should definitely exist.");
55 };
56
57 let mut lock = lock.write().await
58 .map_err(|e| ProtocolError::Write(Box::new(e)))?;
59
60 tracing::debug!("writing: {:?}", payload);
61
62 lock.insert(payload);
63
64 tracing::debug!("write successfully.");
65 Ok(())
66 }
67}
68
69#[async_trait]
70impl Reader for InMemoryEventStore {
71 async fn read(&self, id: EntityId, seq: i64) -> Result<Payload, ProtocolError> {
72 tracing::debug!("read: id={}, seq={}", id, seq);
73
74 let guard = self.store.read().await;
75 let Some(lock) = guard.get(&id) else {
76 return Err(ProtocolError::Read(Box::new(MemoryIoError::NotFound(id))));
77 };
78 let found = loop {
79 match lock.read().await {
80 Ok(guard) => {
81 let found = guard.iter()
82 .find(|payload| payload.sequence_id.eq(&seq))
83 .cloned();
84 match guard.sync().await {
85 Ok(_) => break found,
86 Err(e) => {
87 tracing::error!("{}", e);
88 continue;
89 }
90 }
91 }
92 Err(e) => {
93 tracing::error!("{}", e);
94 continue;
95 }
96 }
97 };
98
99 found.ok_or(ProtocolError::Read(Box::new(MemoryIoError::NotFound(id))))
100 }
101
102 async fn read_to(&self, id: EntityId, from: i64, to: i64) -> Result<BTreeSet<Payload>, ProtocolError> {
103 tracing::debug!("read_to: id={}, from={}, to={}", id, from, to);
104
105 let guard = self.store.read().await;
106 let Some(lock) = guard.get(&id) else {
107 return Ok(BTreeSet::new());
108 };
109 let found = loop {
110 match lock.read().await {
111 Ok(guard) => {
112 let found = guard.iter()
113 .filter(|payload| from <= payload.sequence_id && payload.sequence_id <= to)
114 .cloned()
115 .collect::<BTreeSet<_>>();
116 match guard.sync().await {
117 Ok(_) => break found,
118 Err(e) => {
119 tracing::error!("{}", e);
120 continue;
121 }
122 }
123 }
124 Err(e) => {
125 tracing::error!("{}", e);
126 continue;
127 }
128 }
129 };
130
131 tracing::trace!("len={}", found.len());
132
133 Ok(found)
134 }
135}