coerce/persistent/journal/
provider.rs1use crate::persistent::journal::storage::JournalStorageRef;
2use std::sync::Arc;
3
4pub trait StorageProvider: 'static + Send + Sync {
5 fn journal_storage(&self) -> Option<JournalStorageRef>;
6}
7
8pub type StorageProviderRef = Arc<dyn StorageProvider>;
9
10pub mod inmemory {
11 use crate::persistent::journal::provider::StorageProvider;
12 use crate::persistent::journal::storage::{JournalEntry, JournalStorage, JournalStorageRef};
13 use parking_lot::RwLock;
14 use std::collections::hash_map::Entry;
15 use std::collections::HashMap;
16 use std::mem;
17 use std::sync::Arc;
18
19 #[derive(Debug)]
20 struct ActorJournal {
21 snapshots: Vec<JournalEntry>,
22 messages: Vec<JournalEntry>,
23 }
24
25 impl ActorJournal {
26 pub fn from_snapshot(entry: JournalEntry) -> ActorJournal {
27 ActorJournal {
28 snapshots: vec![entry],
29 messages: vec![],
30 }
31 }
32
33 pub fn from_message(entry: JournalEntry) -> ActorJournal {
34 ActorJournal {
35 snapshots: vec![],
36 messages: vec![entry],
37 }
38 }
39
40 pub fn from_messages(messages: Vec<JournalEntry>) -> ActorJournal {
41 ActorJournal {
42 snapshots: vec![],
43 messages,
44 }
45 }
46 }
47
48 #[derive(Default)]
49 pub struct InMemoryJournalStorage {
50 store: RwLock<HashMap<String, ActorJournal>>,
51 }
52
53 #[derive(Default)]
54 pub struct InMemoryStorageProvider {
55 store: Arc<InMemoryJournalStorage>,
56 }
57
58 impl InMemoryStorageProvider {
59 pub fn new() -> InMemoryStorageProvider {
60 Self::default()
61 }
62 }
63
64 impl StorageProvider for InMemoryStorageProvider {
65 fn journal_storage(&self) -> Option<JournalStorageRef> {
66 Some(self.store.clone())
67 }
68 }
69
70 #[async_trait]
71 impl JournalStorage for InMemoryJournalStorage {
72 async fn write_snapshot(
73 &self,
74 persistence_id: &str,
75 entry: JournalEntry,
76 ) -> anyhow::Result<()> {
77 let mut store = self.store.write();
78 if let Some(journal) = store.get_mut(persistence_id) {
79 journal.snapshots.push(entry);
80 } else {
81 store.insert(
82 persistence_id.to_string(),
83 ActorJournal::from_snapshot(entry),
84 );
85 }
86
87 Ok(())
88 }
89
90 async fn write_message(
91 &self,
92 persistence_id: &str,
93 entry: JournalEntry,
94 ) -> anyhow::Result<()> {
95 let mut store = self.store.write();
96 if let Some(journal) = store.get_mut(persistence_id) {
97 journal.messages.push(entry);
98 } else {
99 store.insert(
100 persistence_id.to_string(),
101 ActorJournal::from_message(entry),
102 );
103 }
104
105 Ok(())
106 }
107
108 async fn write_message_batch(
109 &self,
110 persistence_id: &str,
111 entries: Vec<JournalEntry>,
112 ) -> anyhow::Result<()> {
113 let mut store = self.store.write();
114 if let Some(journal) = store.get_mut(persistence_id) {
115 let mut entries = entries;
116 journal.messages.append(&mut entries);
117 } else {
118 store.insert(
119 persistence_id.to_string(),
120 ActorJournal::from_messages(entries),
121 );
122 }
123
124 Ok(())
125 }
126
127 async fn read_latest_snapshot(
128 &self,
129 persistence_id: &str,
130 ) -> anyhow::Result<Option<JournalEntry>> {
131 let store = self.store.read();
132 Ok(store
133 .get(persistence_id)
134 .and_then(|j| j.snapshots.last().cloned()))
135 }
136
137 async fn read_latest_messages(
138 &self,
139 persistence_id: &str,
140 from_sequence: i64,
141 ) -> anyhow::Result<Option<Vec<JournalEntry>>> {
142 let store = self.store.read();
143 Ok(store.get(persistence_id).map(|journal| {
144 let messages = match from_sequence {
145 0 => journal.messages.clone(),
146 from_sequence => {
147 let starting_message = journal
148 .messages
149 .iter()
150 .enumerate()
151 .find(|(_index, j)| j.sequence > from_sequence)
152 .map(|(index, _j)| index);
153
154 if let Some(starting_index) = starting_message {
155 journal.messages[starting_index..].iter().cloned().collect()
156 } else {
157 vec![]
158 }
159 }
160 };
161
162 trace!(
163 "storage found {} messages for persistence_id={}, from_sequence={}",
164 messages.len(),
165 persistence_id,
166 from_sequence
167 );
168
169 messages
170 }))
171 }
172
173 async fn read_message(
174 &self,
175 persistence_id: &str,
176 sequence_id: i64,
177 ) -> anyhow::Result<Option<JournalEntry>> {
178 let mut store = self.store.read();
179 let journal = store.get(persistence_id);
180 match journal {
181 None => Ok(None),
182 Some(journal) => Ok(journal
183 .messages
184 .iter()
185 .find(|n| n.sequence == sequence_id)
186 .cloned()),
187 }
188 }
189
190 async fn read_messages(
191 &self,
192 persistence_id: &str,
193 from_sequence: i64,
194 to_sequence: i64,
195 ) -> anyhow::Result<Option<Vec<JournalEntry>>> {
196 let mut store = self.store.read();
197 let journal = store.get(persistence_id);
198 match journal {
199 None => Ok(None),
200 Some(journal) => {
201 if journal.messages.is_empty() {
202 Ok(None)
203 } else {
204 let first_seq = journal.messages.first().map(|m| m.sequence).unwrap();
205 let final_seq = journal.messages.last().map(|m| m.sequence).unwrap();
206
207 if to_sequence >= final_seq {
208 if first_seq >= from_sequence {
209 Ok(Some(journal.messages.clone()))
210 } else {
211 let starting_message = journal
212 .messages
213 .iter()
214 .enumerate()
215 .find(|(_index, j)| j.sequence > from_sequence)
216 .map(|(index, _j)| index);
217
218 if let Some(starting_index) = starting_message {
219 Ok(Some(
220 journal.messages[starting_index..]
221 .iter()
222 .cloned()
223 .collect(),
224 ))
225 } else {
226 Ok(Some(vec![]))
227 }
228 }
229 } else if first_seq >= from_sequence {
230 let end_message = journal
231 .messages
232 .iter()
233 .enumerate()
234 .find(|(_index, j)| j.sequence > from_sequence)
235 .map(|(index, _j)| index);
236
237 if let Some(end_index) = end_message {
238 Ok(Some(
239 journal.messages[..end_index].iter().cloned().collect(),
240 ))
241 } else {
242 Ok(Some(vec![]))
243 }
244 } else {
245 panic!("todo: this")
246 }
247 }
248 }
249 }
250 }
251
252 async fn delete_messages_to(
253 &self,
254 persistence_id: &str,
255 to_sequence: i64,
256 ) -> anyhow::Result<()> {
257 let mut store = self.store.write();
258 let mut journal = store.entry(persistence_id.to_string());
259 if let Entry::Occupied(mut journal) = journal {
260 let journal = journal.get_mut();
261
262 fn get_messages_to(
263 to_sequence: i64,
264 journal: &mut ActorJournal,
265 ) -> Vec<JournalEntry> {
266 let starting_message = journal
267 .messages
268 .iter()
269 .enumerate()
270 .find(|(_index, j)| j.sequence >= to_sequence)
271 .map(|(index, _j)| index);
272
273 starting_message.map_or_else(|| vec![], |m| journal.messages.split_off(m))
274 }
275
276 let messages = if let Some(newest_msg) = journal.messages.last() {
277 if newest_msg.sequence < to_sequence {
278 vec![]
279 } else {
280 get_messages_to(to_sequence, journal)
281 }
282 } else {
283 get_messages_to(to_sequence, journal)
284 };
285
286 *journal = ActorJournal {
287 snapshots: mem::take(&mut journal.snapshots),
288 messages,
289 };
290 }
291
292 Ok(())
293 }
294
295 async fn delete_all(&self, persistence_id: &str) -> anyhow::Result<()> {
296 let mut store = self.store.write();
297 store.remove(persistence_id);
298 Ok(())
299 }
300 }
301}