1use std::{
2 collections::{BTreeMap, HashMap},
3 ops::RangeBounds,
4 sync::Arc,
5};
6
7use bytes::Bytes;
8
9use super::*;
10use crate::operation::Operation;
11
12pub struct MemoryPendingStore {
14 operations_timeline: BTreeMap<OperationId, GroupId>,
15 groups_operations: HashMap<GroupId, GroupOperations>,
16}
17
18impl MemoryPendingStore {
19 pub fn new() -> MemoryPendingStore {
20 MemoryPendingStore {
21 operations_timeline: BTreeMap::new(),
22 groups_operations: HashMap::new(),
23 }
24 }
25
26 #[cfg(test)]
27 pub fn clear(&mut self) {
28 self.operations_timeline.clear();
29 self.groups_operations.clear();
30 }
31}
32
33impl Default for MemoryPendingStore {
34 fn default() -> Self {
35 MemoryPendingStore::new()
36 }
37}
38
39impl PendingStore for MemoryPendingStore {
40 fn put_operation(&mut self, operation: operation::NewOperation) -> Result<bool, Error> {
41 let operation_reader = operation.get_operation_reader()?;
42 let operation_type = operation.get_type()?;
43
44 let group_id = operation_reader.get_group_id();
45 let group_operations = self
46 .groups_operations
47 .entry(group_id)
48 .or_insert_with(GroupOperations::new);
49
50 let operation_id = operation_reader.get_operation_id();
51 group_operations.operations.insert(
52 operation_id,
53 GroupOperation {
54 operation_id,
55 operation_type,
56 commit_status: CommitStatus::Unknown,
57 frame: Arc::new(operation.frame),
58 },
59 );
60
61 let existed = self
62 .operations_timeline
63 .insert(operation_id, group_id)
64 .is_some();
65 Ok(existed)
66 }
67
68 fn update_operation_commit_status(
69 &mut self,
70 operation_id: u64,
71 status: CommitStatus,
72 ) -> Result<(), Error> {
73 let group_id = self
74 .operations_timeline
75 .get(&operation_id)
76 .ok_or(Error::NotFound)?;
77 let group = self
78 .groups_operations
79 .get_mut(group_id)
80 .ok_or(Error::NotFound)?;
81 let group_operation: &mut GroupOperation = group
82 .operations
83 .get_mut(&operation_id)
84 .ok_or(Error::NotFound)?;
85
86 group_operation.commit_status = status;
87
88 Ok(())
89 }
90
91 fn get_operation(&self, operation_id: OperationId) -> Result<Option<StoredOperation>, Error> {
92 let operation = self
93 .operations_timeline
94 .get(&operation_id)
95 .and_then(|group_id| {
96 self.groups_operations
97 .get(group_id)
98 .and_then(|group_operations| {
99 group_operations
100 .operations
101 .get(&operation_id)
102 .map(|op| (*group_id, op))
103 })
104 })
105 .map(|(group_id, op)| StoredOperation {
106 group_id,
107 operation_id: op.operation_id,
108 operation_type: op.operation_type,
109 commit_status: op.commit_status,
110 frame: Arc::clone(&op.frame),
111 });
112
113 Ok(operation)
114 }
115
116 fn get_group_operations(
117 &self,
118 group_id: GroupId,
119 ) -> Result<Option<StoredOperationsGroup>, Error> {
120 let operations = self.groups_operations.get(&group_id).map(|group_ops| {
121 let operations = group_ops
122 .operations
123 .values()
124 .map(|op| StoredOperation {
125 group_id,
126 operation_id: op.operation_id,
127 operation_type: op.operation_type,
128 commit_status: op.commit_status,
129 frame: Arc::clone(&op.frame),
130 })
131 .collect();
132
133 StoredOperationsGroup {
134 group_id,
135 operations,
136 }
137 });
138
139 Ok(operations)
140 }
141
142 fn operations_iter<R>(&self, range: R) -> Result<TimelineIterator, Error>
143 where
144 R: RangeBounds<OperationId>,
145 {
146 let ids_iterator = self
147 .operations_timeline
148 .range(range)
149 .map(|(op_id, group_id)| (*op_id, *group_id));
150
151 Ok(Box::new(OperationsIterator {
152 store: self,
153 ids_iterator: Box::new(ids_iterator),
154 }))
155 }
156
157 fn operations_count(&self) -> usize {
158 self.operations_timeline.len()
159 }
160
161 fn delete_operation(&mut self, operation_id: OperationId) -> Result<(), Error> {
162 if let Some(group_operations_id) = self
163 .groups_operations
164 .get(&operation_id)
165 .map(|group| group.operations.keys())
166 {
167 for operation_id in group_operations_id {
169 self.operations_timeline.remove(operation_id);
170 }
171 self.operations_timeline.remove(&operation_id);
172 self.groups_operations.remove(&operation_id);
173 } else {
174 if let Some(group_id) = self.operations_timeline.get(&operation_id) {
176 if let Some(group) = self.groups_operations.get_mut(group_id) {
177 group.operations.remove(&operation_id);
178 }
179 }
180 self.operations_timeline.remove(&operation_id);
181 }
182
183 Ok(())
184 }
185}
186
187impl MemoryPendingStore {
188 fn get_group_operation(
189 &self,
190 group_id: GroupId,
191 operation_id: OperationId,
192 ) -> Option<&GroupOperation> {
193 self.groups_operations
194 .get(&group_id)
195 .and_then(|group_ops| group_ops.operations.get(&operation_id))
196 }
197}
198
199struct GroupOperations {
200 operations: BTreeMap<OperationId, GroupOperation>,
201}
202
203impl GroupOperations {
204 fn new() -> GroupOperations {
205 GroupOperations {
206 operations: BTreeMap::new(),
207 }
208 }
209}
210
211struct GroupOperation {
212 operation_id: OperationId,
213 operation_type: operation::OperationType,
214 commit_status: CommitStatus,
215 frame: Arc<crate::operation::OperationFrame<Bytes>>,
216}
217
218struct OperationsIterator<'store> {
219 store: &'store MemoryPendingStore,
220 ids_iterator: Box<dyn Iterator<Item = (OperationId, GroupId)> + 'store>,
221}
222
223impl<'store> Iterator for OperationsIterator<'store> {
224 type Item = StoredOperation;
225
226 fn next(&mut self) -> Option<StoredOperation> {
227 let (operation_id, group_id) = self.ids_iterator.next()?;
228 let group_operation = self.store.get_group_operation(group_id, operation_id)?;
229
230 Some(StoredOperation {
231 group_id,
232 operation_id,
233 operation_type: group_operation.operation_type,
234 commit_status: group_operation.commit_status,
235 frame: Arc::clone(&group_operation.frame),
236 })
237 }
238}
239
240#[cfg(test)]
241mod test {
242 use exocore_core::cell::LocalNode;
243
244 use super::*;
245 use crate::engine::testing::create_dummy_new_entry_op;
246
247 #[test]
248 fn put_and_retrieve_operation() -> anyhow::Result<()> {
249 let local_node = LocalNode::generate();
250 let mut store = MemoryPendingStore::new();
251
252 store.put_operation(create_dummy_new_entry_op(&local_node, 105, 200))?;
253 store.put_operation(create_dummy_new_entry_op(&local_node, 100, 200))?;
254 store.put_operation(create_dummy_new_entry_op(&local_node, 102, 201))?;
255
256 let timeline: Vec<(OperationId, GroupId)> = store
257 .operations_iter(..)?
258 .map(|op| (op.operation_id, op.group_id))
259 .collect();
260 assert_eq!(timeline, vec![(100, 200), (102, 201), (105, 200),]);
261
262 assert!(store.get_operation(42)?.is_none());
263
264 let group_operations = store.get_group_operations(200)?.unwrap();
265 assert_eq!(group_operations.group_id, 200);
266
267 let op_ids = group_operations
268 .operations
269 .iter()
270 .map(|op| op.operation_id)
271 .collect::<Vec<OperationId>>();
272
273 assert_eq!(op_ids, vec![100, 105]);
274
275 Ok(())
276 }
277
278 #[test]
279 fn operations_iteration() -> anyhow::Result<()> {
280 let local_node = LocalNode::generate();
281 let mut store = MemoryPendingStore::new();
282
283 store
284 .put_operation(create_dummy_new_entry_op(&local_node, 105, 200))
285 .unwrap();
286 store
287 .put_operation(create_dummy_new_entry_op(&local_node, 100, 200))
288 .unwrap();
289 store
290 .put_operation(create_dummy_new_entry_op(&local_node, 102, 201))
291 .unwrap();
292 store
293 .put_operation(create_dummy_new_entry_op(&local_node, 107, 202))
294 .unwrap();
295 store
296 .put_operation(create_dummy_new_entry_op(&local_node, 110, 203))
297 .unwrap();
298
299 assert_eq!(store.operations_iter(..)?.count(), 5);
300
301 Ok(())
302 }
303
304 #[test]
305 fn operations_delete() -> anyhow::Result<()> {
306 let local_node = LocalNode::generate();
307 let mut store = MemoryPendingStore::new();
308
309 store.put_operation(create_dummy_new_entry_op(&local_node, 101, 200))?;
310 store.put_operation(create_dummy_new_entry_op(&local_node, 102, 200))?;
311 store.put_operation(create_dummy_new_entry_op(&local_node, 103, 200))?;
312 store.put_operation(create_dummy_new_entry_op(&local_node, 104, 200))?;
313
314 store.delete_operation(103)?;
316 assert!(store.get_operation(103)?.is_none());
317
318 let operations = store.get_group_operations(200)?.unwrap();
319 assert!(!operations
320 .operations
321 .iter()
322 .any(|op| op.operation_id == 103));
323
324 store.delete_operation(200)?;
326 assert!(store.get_operation(200)?.is_none());
327 assert!(store.get_group_operations(200)?.is_none());
328
329 Ok(())
330 }
331
332 #[test]
333 fn operation_commit_status() -> anyhow::Result<()> {
334 let local_node = LocalNode::generate();
335 let mut store = MemoryPendingStore::new();
336
337 store.put_operation(create_dummy_new_entry_op(&local_node, 101, 200))?;
338
339 let operation = store.get_operation(101)?.unwrap();
340 assert_eq!(CommitStatus::Unknown, operation.commit_status);
341
342 store.update_operation_commit_status(101, CommitStatus::Committed(1, 2))?;
343 let operation = store.get_operation(101)?.unwrap();
344 assert_eq!(CommitStatus::Committed(1, 2), operation.commit_status);
345
346 assert!(store
347 .update_operation_commit_status(666, CommitStatus::Committed(2, 3))
348 .is_err());
349
350 Ok(())
351 }
352}