coerce/persistent/journal/
provider.rs

1use crate::persistent::journal::storage::JournalStorageRef;
2use std::sync::Arc;
3
4pub trait StorageProvider: 'static + Send + Sync {
5    fn journal_storage(&self) -> Option<JournalStorageRef>;
6}
7
8pub type StorageProviderRef = Arc<dyn StorageProvider>;
9
10pub mod inmemory {
11    use crate::persistent::journal::provider::StorageProvider;
12    use crate::persistent::journal::storage::{JournalEntry, JournalStorage, JournalStorageRef};
13    use parking_lot::RwLock;
14    use std::collections::hash_map::Entry;
15    use std::collections::HashMap;
16    use std::mem;
17    use std::sync::Arc;
18
19    #[derive(Debug)]
20    struct ActorJournal {
21        snapshots: Vec<JournalEntry>,
22        messages: Vec<JournalEntry>,
23    }
24
25    impl ActorJournal {
26        pub fn from_snapshot(entry: JournalEntry) -> ActorJournal {
27            ActorJournal {
28                snapshots: vec![entry],
29                messages: vec![],
30            }
31        }
32
33        pub fn from_message(entry: JournalEntry) -> ActorJournal {
34            ActorJournal {
35                snapshots: vec![],
36                messages: vec![entry],
37            }
38        }
39
40        pub fn from_messages(messages: Vec<JournalEntry>) -> ActorJournal {
41            ActorJournal {
42                snapshots: vec![],
43                messages,
44            }
45        }
46    }
47
48    #[derive(Default)]
49    pub struct InMemoryJournalStorage {
50        store: RwLock<HashMap<String, ActorJournal>>,
51    }
52
53    #[derive(Default)]
54    pub struct InMemoryStorageProvider {
55        store: Arc<InMemoryJournalStorage>,
56    }
57
58    impl InMemoryStorageProvider {
59        pub fn new() -> InMemoryStorageProvider {
60            Self::default()
61        }
62    }
63
64    impl StorageProvider for InMemoryStorageProvider {
65        fn journal_storage(&self) -> Option<JournalStorageRef> {
66            Some(self.store.clone())
67        }
68    }
69
70    #[async_trait]
71    impl JournalStorage for InMemoryJournalStorage {
72        async fn write_snapshot(
73            &self,
74            persistence_id: &str,
75            entry: JournalEntry,
76        ) -> anyhow::Result<()> {
77            let mut store = self.store.write();
78            if let Some(journal) = store.get_mut(persistence_id) {
79                journal.snapshots.push(entry);
80            } else {
81                store.insert(
82                    persistence_id.to_string(),
83                    ActorJournal::from_snapshot(entry),
84                );
85            }
86
87            Ok(())
88        }
89
90        async fn write_message(
91            &self,
92            persistence_id: &str,
93            entry: JournalEntry,
94        ) -> anyhow::Result<()> {
95            let mut store = self.store.write();
96            if let Some(journal) = store.get_mut(persistence_id) {
97                journal.messages.push(entry);
98            } else {
99                store.insert(
100                    persistence_id.to_string(),
101                    ActorJournal::from_message(entry),
102                );
103            }
104
105            Ok(())
106        }
107
108        async fn write_message_batch(
109            &self,
110            persistence_id: &str,
111            entries: Vec<JournalEntry>,
112        ) -> anyhow::Result<()> {
113            let mut store = self.store.write();
114            if let Some(journal) = store.get_mut(persistence_id) {
115                let mut entries = entries;
116                journal.messages.append(&mut entries);
117            } else {
118                store.insert(
119                    persistence_id.to_string(),
120                    ActorJournal::from_messages(entries),
121                );
122            }
123
124            Ok(())
125        }
126
127        async fn read_latest_snapshot(
128            &self,
129            persistence_id: &str,
130        ) -> anyhow::Result<Option<JournalEntry>> {
131            let store = self.store.read();
132            Ok(store
133                .get(persistence_id)
134                .and_then(|j| j.snapshots.last().cloned()))
135        }
136
137        async fn read_latest_messages(
138            &self,
139            persistence_id: &str,
140            from_sequence: i64,
141        ) -> anyhow::Result<Option<Vec<JournalEntry>>> {
142            let store = self.store.read();
143            Ok(store.get(persistence_id).map(|journal| {
144                let messages = match from_sequence {
145                    0 => journal.messages.clone(),
146                    from_sequence => {
147                        let starting_message = journal
148                            .messages
149                            .iter()
150                            .enumerate()
151                            .find(|(_index, j)| j.sequence > from_sequence)
152                            .map(|(index, _j)| index);
153
154                        if let Some(starting_index) = starting_message {
155                            journal.messages[starting_index..].iter().cloned().collect()
156                        } else {
157                            vec![]
158                        }
159                    }
160                };
161
162                trace!(
163                    "storage found {} messages for persistence_id={}, from_sequence={}",
164                    messages.len(),
165                    persistence_id,
166                    from_sequence
167                );
168
169                messages
170            }))
171        }
172
173        async fn read_message(
174            &self,
175            persistence_id: &str,
176            sequence_id: i64,
177        ) -> anyhow::Result<Option<JournalEntry>> {
178            let mut store = self.store.read();
179            let journal = store.get(persistence_id);
180            match journal {
181                None => Ok(None),
182                Some(journal) => Ok(journal
183                    .messages
184                    .iter()
185                    .find(|n| n.sequence == sequence_id)
186                    .cloned()),
187            }
188        }
189
190        async fn read_messages(
191            &self,
192            persistence_id: &str,
193            from_sequence: i64,
194            to_sequence: i64,
195        ) -> anyhow::Result<Option<Vec<JournalEntry>>> {
196            let mut store = self.store.read();
197            let journal = store.get(persistence_id);
198            match journal {
199                None => Ok(None),
200                Some(journal) => {
201                    if journal.messages.is_empty() {
202                        Ok(None)
203                    } else {
204                        let first_seq = journal.messages.first().map(|m| m.sequence).unwrap();
205                        let final_seq = journal.messages.last().map(|m| m.sequence).unwrap();
206
207                        if to_sequence >= final_seq {
208                            if first_seq >= from_sequence {
209                                Ok(Some(journal.messages.clone()))
210                            } else {
211                                let starting_message = journal
212                                    .messages
213                                    .iter()
214                                    .enumerate()
215                                    .find(|(_index, j)| j.sequence > from_sequence)
216                                    .map(|(index, _j)| index);
217
218                                if let Some(starting_index) = starting_message {
219                                    Ok(Some(
220                                        journal.messages[starting_index..]
221                                            .iter()
222                                            .cloned()
223                                            .collect(),
224                                    ))
225                                } else {
226                                    Ok(Some(vec![]))
227                                }
228                            }
229                        } else if first_seq >= from_sequence {
230                            let end_message = journal
231                                .messages
232                                .iter()
233                                .enumerate()
234                                .find(|(_index, j)| j.sequence > from_sequence)
235                                .map(|(index, _j)| index);
236
237                            if let Some(end_index) = end_message {
238                                Ok(Some(
239                                    journal.messages[..end_index].iter().cloned().collect(),
240                                ))
241                            } else {
242                                Ok(Some(vec![]))
243                            }
244                        } else {
245                            panic!("todo: this")
246                        }
247                    }
248                }
249            }
250        }
251
252        async fn delete_messages_to(
253            &self,
254            persistence_id: &str,
255            to_sequence: i64,
256        ) -> anyhow::Result<()> {
257            let mut store = self.store.write();
258            let mut journal = store.entry(persistence_id.to_string());
259            if let Entry::Occupied(mut journal) = journal {
260                let journal = journal.get_mut();
261
262                fn get_messages_to(
263                    to_sequence: i64,
264                    journal: &mut ActorJournal,
265                ) -> Vec<JournalEntry> {
266                    let starting_message = journal
267                        .messages
268                        .iter()
269                        .enumerate()
270                        .find(|(_index, j)| j.sequence >= to_sequence)
271                        .map(|(index, _j)| index);
272
273                    starting_message.map_or_else(|| vec![], |m| journal.messages.split_off(m))
274                }
275
276                let messages = if let Some(newest_msg) = journal.messages.last() {
277                    if newest_msg.sequence < to_sequence {
278                        vec![]
279                    } else {
280                        get_messages_to(to_sequence, journal)
281                    }
282                } else {
283                    get_messages_to(to_sequence, journal)
284                };
285
286                *journal = ActorJournal {
287                    snapshots: mem::take(&mut journal.snapshots),
288                    messages,
289                };
290            }
291
292            Ok(())
293        }
294
295        async fn delete_all(&self, persistence_id: &str) -> anyhow::Result<()> {
296            let mut store = self.store.write();
297            store.remove(persistence_id);
298            Ok(())
299        }
300    }
301}