p2panda_stream/ordering/partial/
store.rs1use std::collections::{HashMap, HashSet, VecDeque};
4use std::fmt::Debug;
5use std::hash::Hash as StdHash;
6
7use crate::partial::PartialOrderError;
8
9#[allow(async_fn_in_trait)]
16pub trait PartialOrderStore<K>
17where
18 K: Clone + Copy + StdHash + PartialEq + Eq,
19{
20 async fn mark_ready(&mut self, key: K) -> Result<bool, PartialOrderError>;
23
24 async fn mark_pending(
26 &mut self,
27 key: K,
28 dependencies: Vec<K>,
29 ) -> Result<bool, PartialOrderError>;
30
31 async fn get_next_pending(
33 &self,
34 key: K,
35 ) -> Result<Option<HashSet<(K, Vec<K>)>>, PartialOrderError>;
36
37 async fn take_next_ready(&mut self) -> Result<Option<K>, PartialOrderError>;
39
40 async fn remove_pending(&mut self, key: K) -> Result<bool, PartialOrderError>;
42
43 async fn ready(&self, keys: &[K]) -> Result<bool, PartialOrderError>;
45}
46
/// In-memory bookkeeping of ready and pending keys.
///
/// All state lives in standard collections, so nothing is persisted beyond the lifetime of the
/// value.
#[derive(Clone, Debug)]
pub struct MemoryStore<K> {
    /// Keys that have been marked ready; used for membership checks.
    pub(crate) ready: HashSet<K>,

    /// Keys in the order they were first marked ready; consumed from the front.
    pub(crate) ready_queue: VecDeque<K>,

    /// For each dependency key that is not ready yet, the entries waiting on it. Every entry is
    /// a dependent key together with the full dependency list it was registered with.
    pub(crate) pending: HashMap<K, HashSet<(K, Vec<K>)>>,
}
54
55impl<K> Default for MemoryStore<K> {
56 fn default() -> Self {
57 Self {
58 ready: HashSet::new(),
59 ready_queue: VecDeque::new(),
60 pending: HashMap::new(),
61 }
62 }
63}
64
65impl<K> PartialOrderStore<K> for MemoryStore<K>
66where
67 K: Clone + Copy + Debug + StdHash + PartialEq + Eq,
68{
69 async fn mark_ready(&mut self, key: K) -> Result<bool, PartialOrderError> {
70 let result = self.ready.insert(key);
71 if result {
72 self.ready_queue.push_back(key);
73 }
74 Ok(result)
75 }
76
77 async fn mark_pending(
78 &mut self,
79 key: K,
80 dependencies: Vec<K>,
81 ) -> Result<bool, PartialOrderError> {
82 let insert_occured = false;
83 for dep_key in &dependencies {
84 if self.ready.contains(dep_key) {
85 continue;
86 }
87
88 let dependents = self.pending.entry(*dep_key).or_default();
89 dependents.insert((key, dependencies.clone()));
90 }
91
92 Ok(insert_occured)
93 }
94
95 async fn get_next_pending(
96 &self,
97 key: K,
98 ) -> Result<Option<HashSet<(K, Vec<K>)>>, PartialOrderError> {
99 Ok(self.pending.get(&key).cloned())
100 }
101
102 async fn take_next_ready(&mut self) -> Result<Option<K>, PartialOrderError> {
103 Ok(self.ready_queue.pop_front())
104 }
105
106 async fn remove_pending(&mut self, key: K) -> Result<bool, PartialOrderError> {
107 Ok(self.pending.remove(&key).is_some())
108 }
109
110 async fn ready(&self, dependencies: &[K]) -> Result<bool, PartialOrderError> {
111 let deps_set = HashSet::from_iter(dependencies.iter().cloned());
112 let result = self.ready.is_superset(&deps_set);
113 Ok(result)
114 }
115}