exocore_chain/pending/
memory.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    ops::RangeBounds,
4    sync::Arc,
5};
6
7use bytes::Bytes;
8
9use super::*;
10use crate::operation::Operation;
11
12/// In memory pending store
13pub struct MemoryPendingStore {
14    operations_timeline: BTreeMap<OperationId, GroupId>,
15    groups_operations: HashMap<GroupId, GroupOperations>,
16}
17
18impl MemoryPendingStore {
19    pub fn new() -> MemoryPendingStore {
20        MemoryPendingStore {
21            operations_timeline: BTreeMap::new(),
22            groups_operations: HashMap::new(),
23        }
24    }
25
26    #[cfg(test)]
27    pub fn clear(&mut self) {
28        self.operations_timeline.clear();
29        self.groups_operations.clear();
30    }
31}
32
33impl Default for MemoryPendingStore {
34    fn default() -> Self {
35        MemoryPendingStore::new()
36    }
37}
38
39impl PendingStore for MemoryPendingStore {
40    fn put_operation(&mut self, operation: operation::NewOperation) -> Result<bool, Error> {
41        let operation_reader = operation.get_operation_reader()?;
42        let operation_type = operation.get_type()?;
43
44        let group_id = operation_reader.get_group_id();
45        let group_operations = self
46            .groups_operations
47            .entry(group_id)
48            .or_insert_with(GroupOperations::new);
49
50        let operation_id = operation_reader.get_operation_id();
51        group_operations.operations.insert(
52            operation_id,
53            GroupOperation {
54                operation_id,
55                operation_type,
56                commit_status: CommitStatus::Unknown,
57                frame: Arc::new(operation.frame),
58            },
59        );
60
61        let existed = self
62            .operations_timeline
63            .insert(operation_id, group_id)
64            .is_some();
65        Ok(existed)
66    }
67
68    fn update_operation_commit_status(
69        &mut self,
70        operation_id: u64,
71        status: CommitStatus,
72    ) -> Result<(), Error> {
73        let group_id = self
74            .operations_timeline
75            .get(&operation_id)
76            .ok_or(Error::NotFound)?;
77        let group = self
78            .groups_operations
79            .get_mut(group_id)
80            .ok_or(Error::NotFound)?;
81        let group_operation: &mut GroupOperation = group
82            .operations
83            .get_mut(&operation_id)
84            .ok_or(Error::NotFound)?;
85
86        group_operation.commit_status = status;
87
88        Ok(())
89    }
90
91    fn get_operation(&self, operation_id: OperationId) -> Result<Option<StoredOperation>, Error> {
92        let operation = self
93            .operations_timeline
94            .get(&operation_id)
95            .and_then(|group_id| {
96                self.groups_operations
97                    .get(group_id)
98                    .and_then(|group_operations| {
99                        group_operations
100                            .operations
101                            .get(&operation_id)
102                            .map(|op| (*group_id, op))
103                    })
104            })
105            .map(|(group_id, op)| StoredOperation {
106                group_id,
107                operation_id: op.operation_id,
108                operation_type: op.operation_type,
109                commit_status: op.commit_status,
110                frame: Arc::clone(&op.frame),
111            });
112
113        Ok(operation)
114    }
115
116    fn get_group_operations(
117        &self,
118        group_id: GroupId,
119    ) -> Result<Option<StoredOperationsGroup>, Error> {
120        let operations = self.groups_operations.get(&group_id).map(|group_ops| {
121            let operations = group_ops
122                .operations
123                .values()
124                .map(|op| StoredOperation {
125                    group_id,
126                    operation_id: op.operation_id,
127                    operation_type: op.operation_type,
128                    commit_status: op.commit_status,
129                    frame: Arc::clone(&op.frame),
130                })
131                .collect();
132
133            StoredOperationsGroup {
134                group_id,
135                operations,
136            }
137        });
138
139        Ok(operations)
140    }
141
142    fn operations_iter<R>(&self, range: R) -> Result<TimelineIterator, Error>
143    where
144        R: RangeBounds<OperationId>,
145    {
146        let ids_iterator = self
147            .operations_timeline
148            .range(range)
149            .map(|(op_id, group_id)| (*op_id, *group_id));
150
151        Ok(Box::new(OperationsIterator {
152            store: self,
153            ids_iterator: Box::new(ids_iterator),
154        }))
155    }
156
157    fn operations_count(&self) -> usize {
158        self.operations_timeline.len()
159    }
160
161    fn delete_operation(&mut self, operation_id: OperationId) -> Result<(), Error> {
162        if let Some(group_operations_id) = self
163            .groups_operations
164            .get(&operation_id)
165            .map(|group| group.operations.keys())
166        {
167            // the operation is a group, we delete all its operations
168            for operation_id in group_operations_id {
169                self.operations_timeline.remove(operation_id);
170            }
171            self.operations_timeline.remove(&operation_id);
172            self.groups_operations.remove(&operation_id);
173        } else {
174            // operation is part of a group, we delete the operation from it
175            if let Some(group_id) = self.operations_timeline.get(&operation_id) {
176                if let Some(group) = self.groups_operations.get_mut(group_id) {
177                    group.operations.remove(&operation_id);
178                }
179            }
180            self.operations_timeline.remove(&operation_id);
181        }
182
183        Ok(())
184    }
185}
186
187impl MemoryPendingStore {
188    fn get_group_operation(
189        &self,
190        group_id: GroupId,
191        operation_id: OperationId,
192    ) -> Option<&GroupOperation> {
193        self.groups_operations
194            .get(&group_id)
195            .and_then(|group_ops| group_ops.operations.get(&operation_id))
196    }
197}
198
199struct GroupOperations {
200    operations: BTreeMap<OperationId, GroupOperation>,
201}
202
203impl GroupOperations {
204    fn new() -> GroupOperations {
205        GroupOperations {
206            operations: BTreeMap::new(),
207        }
208    }
209}
210
211struct GroupOperation {
212    operation_id: OperationId,
213    operation_type: operation::OperationType,
214    commit_status: CommitStatus,
215    frame: Arc<crate::operation::OperationFrame<Bytes>>,
216}
217
218struct OperationsIterator<'store> {
219    store: &'store MemoryPendingStore,
220    ids_iterator: Box<dyn Iterator<Item = (OperationId, GroupId)> + 'store>,
221}
222
223impl<'store> Iterator for OperationsIterator<'store> {
224    type Item = StoredOperation;
225
226    fn next(&mut self) -> Option<StoredOperation> {
227        let (operation_id, group_id) = self.ids_iterator.next()?;
228        let group_operation = self.store.get_group_operation(group_id, operation_id)?;
229
230        Some(StoredOperation {
231            group_id,
232            operation_id,
233            operation_type: group_operation.operation_type,
234            commit_status: group_operation.commit_status,
235            frame: Arc::clone(&group_operation.frame),
236        })
237    }
238}
239
240#[cfg(test)]
241mod test {
242    use exocore_core::cell::LocalNode;
243
244    use super::*;
245    use crate::engine::testing::create_dummy_new_entry_op;
246
247    #[test]
248    fn put_and_retrieve_operation() -> anyhow::Result<()> {
249        let local_node = LocalNode::generate();
250        let mut store = MemoryPendingStore::new();
251
252        store.put_operation(create_dummy_new_entry_op(&local_node, 105, 200))?;
253        store.put_operation(create_dummy_new_entry_op(&local_node, 100, 200))?;
254        store.put_operation(create_dummy_new_entry_op(&local_node, 102, 201))?;
255
256        let timeline: Vec<(OperationId, GroupId)> = store
257            .operations_iter(..)?
258            .map(|op| (op.operation_id, op.group_id))
259            .collect();
260        assert_eq!(timeline, vec![(100, 200), (102, 201), (105, 200),]);
261
262        assert!(store.get_operation(42)?.is_none());
263
264        let group_operations = store.get_group_operations(200)?.unwrap();
265        assert_eq!(group_operations.group_id, 200);
266
267        let op_ids = group_operations
268            .operations
269            .iter()
270            .map(|op| op.operation_id)
271            .collect::<Vec<OperationId>>();
272
273        assert_eq!(op_ids, vec![100, 105]);
274
275        Ok(())
276    }
277
278    #[test]
279    fn operations_iteration() -> anyhow::Result<()> {
280        let local_node = LocalNode::generate();
281        let mut store = MemoryPendingStore::new();
282
283        store
284            .put_operation(create_dummy_new_entry_op(&local_node, 105, 200))
285            .unwrap();
286        store
287            .put_operation(create_dummy_new_entry_op(&local_node, 100, 200))
288            .unwrap();
289        store
290            .put_operation(create_dummy_new_entry_op(&local_node, 102, 201))
291            .unwrap();
292        store
293            .put_operation(create_dummy_new_entry_op(&local_node, 107, 202))
294            .unwrap();
295        store
296            .put_operation(create_dummy_new_entry_op(&local_node, 110, 203))
297            .unwrap();
298
299        assert_eq!(store.operations_iter(..)?.count(), 5);
300
301        Ok(())
302    }
303
304    #[test]
305    fn operations_delete() -> anyhow::Result<()> {
306        let local_node = LocalNode::generate();
307        let mut store = MemoryPendingStore::new();
308
309        store.put_operation(create_dummy_new_entry_op(&local_node, 101, 200))?;
310        store.put_operation(create_dummy_new_entry_op(&local_node, 102, 200))?;
311        store.put_operation(create_dummy_new_entry_op(&local_node, 103, 200))?;
312        store.put_operation(create_dummy_new_entry_op(&local_node, 104, 200))?;
313
314        // delete a single operation within a group
315        store.delete_operation(103)?;
316        assert!(store.get_operation(103)?.is_none());
317
318        let operations = store.get_group_operations(200)?.unwrap();
319        assert!(!operations
320            .operations
321            .iter()
322            .any(|op| op.operation_id == 103));
323
324        // delete a group operation
325        store.delete_operation(200)?;
326        assert!(store.get_operation(200)?.is_none());
327        assert!(store.get_group_operations(200)?.is_none());
328
329        Ok(())
330    }
331
332    #[test]
333    fn operation_commit_status() -> anyhow::Result<()> {
334        let local_node = LocalNode::generate();
335        let mut store = MemoryPendingStore::new();
336
337        store.put_operation(create_dummy_new_entry_op(&local_node, 101, 200))?;
338
339        let operation = store.get_operation(101)?.unwrap();
340        assert_eq!(CommitStatus::Unknown, operation.commit_status);
341
342        store.update_operation_commit_status(101, CommitStatus::Committed(1, 2))?;
343        let operation = store.get_operation(101)?.unwrap();
344        assert_eq!(CommitStatus::Committed(1, 2), operation.commit_status);
345
346        assert!(store
347            .update_operation_commit_status(666, CommitStatus::Committed(2, 3))
348            .is_err());
349
350        Ok(())
351    }
352}