p2panda_stream/ordering/partial/
mod.rs1pub mod operations;
4pub mod store;
5
6use std::fmt::{Debug, Display};
7use std::hash::Hash as StdHash;
8use std::marker::PhantomData;
9
10use thiserror::Error;
11
12pub use crate::partial::store::{MemoryStore, PartialOrderStore};
13
14#[derive(Debug, Error)]
16pub enum PartialOrderError {
17 #[error("store error: {0}")]
18 StoreError(String),
19}
20
/// Orders items identified by keys of type `K`, keeping all state in a store of type `S`.
///
/// The struct itself holds no items; every method delegates bookkeeping to `store`.
#[derive(Debug)]
pub struct PartialOrder<K, S> {
    /// Backing store which tracks the state of processed items.
    store: S,

    /// Binds the key type `K` to this struct without storing a value of it.
    _phantom: PhantomData<K>,
}
88
89impl<K, S> PartialOrder<K, S>
90where
91 K: Clone + Copy + Display + StdHash + PartialEq + Eq,
92 S: PartialOrderStore<K>,
93{
94 pub fn new(store: S) -> Self {
95 Self {
96 store,
97 _phantom: PhantomData,
98 }
99 }
100
101 pub async fn next(&mut self) -> Result<Option<K>, PartialOrderError> {
103 self.store.take_next_ready().await
104 }
105
106 pub async fn process(&mut self, key: K, dependencies: &[K]) -> Result<(), PartialOrderError> {
108 if !self.store.ready(dependencies).await? {
109 self.store.mark_pending(key, dependencies.to_vec()).await?;
110 return Ok(());
111 }
112
113 self.store.mark_ready(key).await?;
114
115 self.process_pending(key).await?;
118
119 Ok(())
120 }
121
122 async fn process_pending(&mut self, key: K) -> Result<(), PartialOrderError> {
124 let Some(dependents) = self.store.get_next_pending(key).await? else {
126 return Ok(());
127 };
128
129 for (next_key, next_deps) in dependents {
132 if !self.store.ready(&next_deps).await? {
133 continue;
134 }
135
136 self.store.mark_ready(next_key).await?;
137
138 Box::pin(self.process_pending(next_key)).await?;
141 }
142
143 self.store.remove_pending(key).await?;
145
146 Ok(())
147 }
148}
149
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::{MemoryStore, PartialOrder};

    // Asserts the lengths of the store's `ready`, `pending` and `ready_queue` collections.
    macro_rules! assert_store_sizes {
        ($checker:ident, $ready:expr, $pending:expr, $queued:expr) => {{
            assert_eq!($checker.store.ready.len(), $ready);
            assert_eq!($checker.store.pending.len(), $pending);
            assert_eq!($checker.store.ready_queue.len(), $queued);
        }};
    }

    #[tokio::test]
    async fn partial_order() {
        // "B" depends on "A", "C" depends on "B" and "D" depends on both "B" and "C".
        let graph = [
            ("A", vec![]),
            ("B", vec!["A"]),
            ("C", vec!["B"]),
            ("D", vec!["B", "C"]),
        ];

        let mut checker = PartialOrder::new(MemoryStore::default());

        // Each step: (index into `graph`, expected `ready`, `pending`, `ready_queue` lengths).
        // "D" is processed before "C", so it has to wait until "C" arrives.
        let steps = [(0, 1, 0, 1), (1, 2, 0, 2), (3, 2, 1, 2), (2, 4, 0, 4)];
        for (index, ready, pending, queued) in steps {
            let (key, dependencies) = &graph[index];
            checker.process(*key, dependencies).await.unwrap();
            assert_store_sizes!(checker, ready, pending, queued);
        }

        for expected in ["A", "B", "C", "D"] {
            assert_eq!(checker.next().await.unwrap(), Some(expected));
        }
        assert!(checker.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn partial_order_with_recursion() {
        // A single chain from "A" to "G" in which "B" is missing at first.
        let incomplete_graph = [
            ("A", vec![]),
            ("C", vec!["B"]),
            ("D", vec!["C"]),
            ("E", vec!["D"]),
            ("F", vec!["E"]),
            ("G", vec!["F"]),
        ];

        let mut checker = PartialOrder::new(MemoryStore::default());
        for (key, dependencies) in incomplete_graph {
            checker.process(key, &dependencies).await.unwrap();
        }
        assert_store_sizes!(checker, 1, 5, 1);

        // Processing the missing item makes the whole chain ready in one go.
        let (key, dependencies) = ("B", vec!["A"]);
        checker.process(key, &dependencies).await.unwrap();
        assert_store_sizes!(checker, 7, 0, 7);

        for expected in ["A", "B", "C", "D", "E", "F", "G"] {
            assert_eq!(checker.next().await.unwrap(), Some(expected));
        }
        assert!(checker.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn complex_graph() {
        // Two branches leave "A": "B1" -> "C1" and "B2" -> {"C2", "C3"}. "D" joins all three "C"
        // items. "B2" is missing at first.
        let incomplete_graph = [
            ("A", vec![]),
            ("B1", vec!["A"]),
            ("C1", vec!["B1"]),
            ("C2", vec!["B2"]),
            ("C3", vec!["B2"]),
            ("D", vec!["C1", "C2", "C3"]),
        ];

        let mut checker = PartialOrder::new(MemoryStore::default());
        for (key, dependencies) in incomplete_graph {
            checker.process(key, &dependencies).await.unwrap();
        }
        assert_store_sizes!(checker, 3, 3, 3);

        for expected in ["A", "B1", "C1"] {
            assert_eq!(checker.next().await.unwrap(), Some(expected));
        }
        assert!(checker.next().await.unwrap().is_none());

        assert_eq!(checker.store.ready_queue.len(), 0);

        let (key, dependencies) = ("B2", vec!["A"]);
        checker.process(key, &dependencies).await.unwrap();
        assert_store_sizes!(checker, 7, 0, 4);

        assert_eq!(checker.next().await.unwrap().unwrap(), "B2");

        // "C2" and "C3" may come out in either order.
        let mut concurrent_items = HashSet::from(["C2", "C3"]);
        for _ in 0..2 {
            let item = checker.next().await.unwrap().unwrap();
            assert!(concurrent_items.remove(item));
        }

        assert_eq!(checker.next().await.unwrap().unwrap(), "D");
        assert!(checker.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn very_out_of_order() {
        // Same shape as in `complex_graph`, but the root "A" is processed last.
        let out_of_order_graph = [
            ("D", vec!["C1", "C2", "C3"]),
            ("C1", vec!["B1"]),
            ("B1", vec!["A"]),
            ("B2", vec!["A"]),
            ("C3", vec!["B2"]),
            ("C2", vec!["B2"]),
            ("A", vec![]),
        ];

        let mut checker = PartialOrder::new(MemoryStore::default());
        for (key, dependencies) in out_of_order_graph {
            checker.process(key, &dependencies).await.unwrap();
        }
        assert_store_sizes!(checker, 7, 0, 7);

        assert_eq!(checker.next().await.unwrap(), Some("A"));

        // The five middle items may come out in any order, each exactly once.
        let mut concurrent_items = HashSet::from(["B1", "B2", "C1", "C2", "C3"]);
        for _ in 0..5 {
            let item = checker.next().await.unwrap().unwrap();
            assert!(concurrent_items.remove(item));
        }

        assert_eq!(checker.next().await.unwrap().unwrap(), "D");
        assert!(checker.next().await.unwrap().is_none());
    }
}