p2panda_stream/ordering/partial/
store.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::fmt::Debug;
5use std::hash::Hash as StdHash;
6
7use crate::partial::PartialOrderError;
8
9/// Trait defining a store API for handling ready and pending dependencies.
10///
11/// An implementation of this store trait provides the following functionality:
12/// - maintain a list of all items which have all their dependencies met
13/// - maintain a list of items which don't have their dependencies met
14/// - return all pending items which depend on a given item
15#[allow(async_fn_in_trait)]
16pub trait PartialOrderStore<K>
17where
18    K: Clone + Copy + StdHash + PartialEq + Eq,
19{
20    /// Add an item to the store which has all it's dependencies met already. If this is the first
21    /// time the item has been added it should also be pushed to the end of a "ready" queue.
22    async fn mark_ready(&mut self, key: K) -> Result<bool, PartialOrderError>;
23
24    /// Add an item which does not have all it's dependencies met yet.
25    async fn mark_pending(
26        &mut self,
27        key: K,
28        dependencies: Vec<K>,
29    ) -> Result<bool, PartialOrderError>;
30
31    /// Get all pending items which directly depend on the given key.
32    async fn get_next_pending(
33        &self,
34        key: K,
35    ) -> Result<Option<HashSet<(K, Vec<K>)>>, PartialOrderError>;
36
37    /// Take the next ready item from the ready queue.
38    async fn take_next_ready(&mut self) -> Result<Option<K>, PartialOrderError>;
39
40    /// Remove all items from the pending queue which depend on the passed key.
41    async fn remove_pending(&mut self, key: K) -> Result<bool, PartialOrderError>;
42
43    /// Returns `true` of all the passed keys are present in the ready list.
44    async fn ready(&self, keys: &[K]) -> Result<bool, PartialOrderError>;
45}
46
/// In-memory implementation of the `PartialOrderStore` trait, backed by standard-library
/// collections.
#[derive(Clone)]
pub struct MemoryStore<K> {
    /// Every key that has been marked ready. Used for membership checks; taking a key from
    /// `ready_queue` does not remove it from this set.
    pub(crate) ready: HashSet<K>,

    /// Ready keys in the order they were first marked ready, consumed from the front.
    pub(crate) ready_queue: VecDeque<K>,

    /// Pending items indexed by a dependency they are waiting on. Each entry pairs the waiting
    /// item's key with the complete dependency list it was registered with.
    pub(crate) pending: HashMap<K, HashSet<(K, Vec<K>)>>,
}
54
55impl<K> Default for MemoryStore<K> {
56    fn default() -> Self {
57        Self {
58            ready: HashSet::new(),
59            ready_queue: VecDeque::new(),
60            pending: HashMap::new(),
61        }
62    }
63}
64
65impl<K> PartialOrderStore<K> for MemoryStore<K>
66where
67    K: Clone + Copy + Debug + StdHash + PartialEq + Eq,
68{
69    async fn mark_ready(&mut self, key: K) -> Result<bool, PartialOrderError> {
70        let result = self.ready.insert(key);
71        if result {
72            self.ready_queue.push_back(key);
73        }
74        Ok(result)
75    }
76
77    async fn mark_pending(
78        &mut self,
79        key: K,
80        dependencies: Vec<K>,
81    ) -> Result<bool, PartialOrderError> {
82        let insert_occured = false;
83        for dep_key in &dependencies {
84            if self.ready.contains(dep_key) {
85                continue;
86            }
87
88            let dependents = self.pending.entry(*dep_key).or_default();
89            dependents.insert((key, dependencies.clone()));
90        }
91
92        Ok(insert_occured)
93    }
94
95    async fn get_next_pending(
96        &self,
97        key: K,
98    ) -> Result<Option<HashSet<(K, Vec<K>)>>, PartialOrderError> {
99        Ok(self.pending.get(&key).cloned())
100    }
101
102    async fn take_next_ready(&mut self) -> Result<Option<K>, PartialOrderError> {
103        Ok(self.ready_queue.pop_front())
104    }
105
106    async fn remove_pending(&mut self, key: K) -> Result<bool, PartialOrderError> {
107        Ok(self.pending.remove(&key).is_some())
108    }
109
110    async fn ready(&self, dependencies: &[K]) -> Result<bool, PartialOrderError> {
111        let deps_set = HashSet::from_iter(dependencies.iter().cloned());
112        let result = self.ready.is_superset(&deps_set);
113        Ok(result)
114    }
115}