p2panda_stream/ordering/partial/
mod.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3pub mod operations;
4pub mod store;
5
6use std::fmt::{Debug, Display};
7use std::hash::Hash as StdHash;
8use std::marker::PhantomData;
9
10use thiserror::Error;
11
12pub use crate::partial::store::{MemoryStore, PartialOrderStore};
13
14/// Error types which may be returned from `PartialOrder` methods.
15#[derive(Debug, Error)]
16pub enum PartialOrderError {
17    #[error("store error: {0}")]
18    StoreError(String),
19}
20
/// Struct for establishing partial order over a set of items which form a dependency graph.
///
/// A partial order sorts items based on their causal relationships. An item can be "before",
/// "after" or "at the same time" as any other item.
///
/// This functionality is required when, for example, processing a set of messages where some
/// messages _must_ be processed before others. Such a set naturally forms a graph structure in
/// which each item has a chain of dependencies. Another example is a package dependency tree,
/// where a package depends on one or many others. To know in which order these packages should
/// be installed, we need to partially order the set and process it from start to finish.
///
/// There are various approaches which can be taken when linearizing items in a graph structure.
/// The approach taken in this module establishes a partial order over all items in the set. The
/// word "partial" indicates that some items may not be directly comparable. Items in different
/// branches of the graph may not have a direct path between them, and so we don't know "which
/// should come first". In fact, as there is no dependency relation between them, it makes no
/// difference which comes first. Depending on the order in which items are processed, the
/// ordering process may therefore arrive at different results (it is a non-deterministic
/// algorithm).
///
/// Items in the process of being ordered are considered to be in one of two states. They are
/// in a "ready" state when all their dependencies have themselves been processed, and in a
/// "pending" state when their dependencies have not yet been processed.
///
/// If an item is in a "pending" state then it is held in a pending queue. When its dependencies
/// are later processed and become "ready", the formerly "pending" item is moved to the "ready"
/// queue. This check is applied transitively, so every pending dependent which becomes ready as
/// a consequence is moved as well.
///
/// Example graph:
///
/// ```text
/// A <-- B2 <-- C
///   \-- B1 <--/
/// ```
///
/// Both of the following are possible and valid orderings for the above graph:
///
/// ```text
/// [A, B1, B2, C]
/// [A, B2, B1, C]
/// ```
///
/// Items will not be placed into a partial order until all their dependencies are met. In the
/// following example, item C will not be visited as we have not processed all of its
/// dependencies.
///
/// Example graph:
///
/// ```text
/// A <-- ?? <-- C
///   \-- B1 <--/
/// ```
///
/// C is not processed yet as we are missing one of its dependencies:
///
/// ```text
/// [A, B1]
/// ```
///
/// Note that no checks are made for cycles occurring in the graph; this should be validated on
/// another layer. Items which are part of a cycle can never have all their dependencies met
/// and will therefore remain pending.
#[derive(Debug)]
pub struct PartialOrder<K, S> {
    /// Store for managing "ready" and "pending" items.
    store: S,
    /// Ties the item key type `K` to the struct; no `K` value is stored directly.
    _phantom: PhantomData<K>,
}
88
89impl<K, S> PartialOrder<K, S>
90where
91    K: Clone + Copy + Display + StdHash + PartialEq + Eq,
92    S: PartialOrderStore<K>,
93{
94    pub fn new(store: S) -> Self {
95        Self {
96            store,
97            _phantom: PhantomData,
98        }
99    }
100
101    /// Pop the next item from the ready queue.
102    pub async fn next(&mut self) -> Result<Option<K>, PartialOrderError> {
103        self.store.take_next_ready().await
104    }
105
106    /// Process a new item which may be in a "ready" or "pending" state.
107    pub async fn process(&mut self, key: K, dependencies: &[K]) -> Result<(), PartialOrderError> {
108        if !self.store.ready(dependencies).await? {
109            self.store.mark_pending(key, dependencies.to_vec()).await?;
110            return Ok(());
111        }
112
113        self.store.mark_ready(key).await?;
114
115        // We added a new ready item to the store so now we want to process any pending items
116        // which depend on it as they may now have transitioned into a ready state.
117        self.process_pending(key).await?;
118
119        Ok(())
120    }
121
122    /// Recursively check if any pending items now have their dependencies met.
123    async fn process_pending(&mut self, key: K) -> Result<(), PartialOrderError> {
124        // Get all items which depend on the passed key.
125        let Some(dependents) = self.store.get_next_pending(key).await? else {
126            return Ok(());
127        };
128
129        // For each dependent check if it has all it's dependencies met, if not then we do nothing
130        // as it is still in a pending state.
131        for (next_key, next_deps) in dependents {
132            if !self.store.ready(&next_deps).await? {
133                continue;
134            }
135
136            self.store.mark_ready(next_key).await?;
137
138            // Recurse down the dependency graph by now checking any pending items which depend on
139            // the current item.
140            Box::pin(self.process_pending(next_key)).await?;
141        }
142
143        // Finally remove this item from the pending items queue.
144        self.store.remove_pending(key).await?;
145
146        Ok(())
147    }
148}
149
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::{MemoryStore, PartialOrder};

    #[tokio::test]
    async fn partial_order() {
        // Graph
        //
        // A <-- B <--------- D
        //        \--- C <---/
        //
        let graph = [
            ("A", vec![]),
            ("B", vec!["A"]),
            ("C", vec!["B"]),
            ("D", vec!["B", "C"]),
        ];

        // A has no dependencies and so it's added straight to the processed set and ready queue.
        let store = MemoryStore::default();
        let mut checker = PartialOrder::new(store);
        let (key, dependencies) = &graph[0];
        checker.process(*key, dependencies).await.unwrap();
        assert_eq!(checker.store.ready.len(), 1);
        assert_eq!(checker.store.pending.len(), 0);
        assert_eq!(checker.store.ready_queue.len(), 1);

        // B has its dependencies met and so it too is added to the processed set and ready
        // queue.
        let (key, dependencies) = &graph[1];
        checker.process(*key, dependencies).await.unwrap();
        assert_eq!(checker.store.ready.len(), 2);
        assert_eq!(checker.store.pending.len(), 0);
        assert_eq!(checker.store.ready_queue.len(), 2);

        // D doesn't have both its dependencies met yet so it waits in the pending queue.
        let (key, dependencies) = &graph[3];
        checker.process(*key, dependencies).await.unwrap();
        assert_eq!(checker.store.ready.len(), 2);
        assert_eq!(checker.store.pending.len(), 1);
        assert_eq!(checker.store.ready_queue.len(), 2);

        // C satisfies D's dependencies and so both C & D are added to the processed set
        // and ready queue.
        let (key, dependencies) = &graph[2];
        checker.process(*key, dependencies).await.unwrap();
        assert_eq!(checker.store.ready.len(), 4);
        assert_eq!(checker.store.pending.len(), 0);
        assert_eq!(checker.store.ready_queue.len(), 4);

        for expected in ["A", "B", "C", "D"] {
            assert_eq!(checker.next().await.unwrap(), Some(expected));
        }
        assert!(checker.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn partial_order_with_recursion() {
        // Graph
        //
        // A <-- B <-- C <-- D <-- E <-- F <-- G
        //
        // B is initially missing, so every item after A has to wait in the pending queue until
        // B arrives and releases the whole chain.
        let incomplete_graph = [
            ("A", vec![]),
            ("C", vec!["B"]),
            ("D", vec!["C"]),
            ("E", vec!["D"]),
            ("F", vec!["E"]),
            ("G", vec!["F"]),
        ];

        let store = MemoryStore::default();
        let mut checker = PartialOrder::new(store);
        for (key, dependencies) in incomplete_graph {
            checker.process(key, &dependencies).await.unwrap();
        }
        assert_eq!(checker.store.ready.len(), 1);
        assert_eq!(checker.store.pending.len(), 5);
        assert_eq!(checker.store.ready_queue.len(), 1);

        let missing_dependency = ("B", vec!["A"]);

        checker
            .process(missing_dependency.0, &missing_dependency.1)
            .await
            .unwrap();
        assert_eq!(checker.store.ready.len(), 7);
        assert_eq!(checker.store.pending.len(), 0);
        assert_eq!(checker.store.ready_queue.len(), 7);

        for expected in ["A", "B", "C", "D", "E", "F", "G"] {
            assert_eq!(checker.next().await.unwrap(), Some(expected));
        }
        assert!(checker.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn complex_graph() {
        // Graph
        //
        // A <-- B1 <-- C1 <--\
        //   \-- ?? <-- C2 <-- D
        //        \---- C3 <--/
        //
        let incomplete_graph = [
            ("A", vec![]),
            ("B1", vec!["A"]),
            // This item is missing.
            // ("B2", vec!["A"]),
            ("C1", vec!["B1"]),
            ("C2", vec!["B2"]),
            ("C3", vec!["B2"]),
            ("D", vec!["C1", "C2", "C3"]),
        ];

        let store = MemoryStore::default();
        let mut checker = PartialOrder::new(store);
        for (key, dependencies) in incomplete_graph {
            checker.process(key, &dependencies).await.unwrap();
        }

        // A, B1 and C1 have their dependencies met and were already processed.
        assert_eq!(checker.store.ready.len(), 3);
        assert_eq!(checker.store.pending.len(), 3);
        assert_eq!(checker.store.ready_queue.len(), 3);

        for expected in ["A", "B1", "C1"] {
            assert_eq!(checker.next().await.unwrap(), Some(expected));
        }
        assert!(checker.next().await.unwrap().is_none());

        // No more ready items.
        assert_eq!(checker.store.ready_queue.len(), 0);

        // Process the missing item.
        let missing_dependency = ("B2", vec!["A"]);
        checker
            .process(missing_dependency.0, &missing_dependency.1)
            .await
            .unwrap();

        // All items have now been processed and new ones are waiting in the ready queue.
        assert_eq!(checker.store.ready.len(), 7);
        assert_eq!(checker.store.pending.len(), 0);
        assert_eq!(checker.store.ready_queue.len(), 4);

        let mut concurrent_items = HashSet::from(["C2", "C3"]);

        let item = checker.next().await.unwrap().unwrap();
        assert_eq!(item, "B2");
        for _ in 0..2 {
            let item = checker.next().await.unwrap().unwrap();
            assert!(concurrent_items.remove(item));
        }
        let item = checker.next().await.unwrap().unwrap();
        assert_eq!(item, "D");
        assert!(checker.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn very_out_of_order() {
        // Graph
        //
        // A <-- B1 <-- C1 <--\
        //   \-- B2 <-- C2 <-- D
        //        \---- C3 <--/
        //
        let out_of_order_graph = [
            ("D", vec!["C1", "C2", "C3"]),
            ("C1", vec!["B1"]),
            ("B1", vec!["A"]),
            ("B2", vec!["A"]),
            ("C3", vec!["B2"]),
            ("C2", vec!["B2"]),
            ("A", vec![]),
        ];

        let store = MemoryStore::default();
        let mut checker = PartialOrder::new(store);
        for (key, dependencies) in out_of_order_graph {
            checker.process(key, &dependencies).await.unwrap();
        }

        assert_eq!(checker.store.ready.len(), 7);
        assert_eq!(checker.store.pending.len(), 0);
        assert_eq!(checker.store.ready_queue.len(), 7);

        let item = checker.next().await.unwrap();
        assert_eq!(item, Some("A"));

        let mut concurrent_items = HashSet::from(["B1", "B2", "C1", "C2", "C3"]);

        for _ in 0..5 {
            let item = checker.next().await.unwrap().unwrap();
            assert!(concurrent_items.remove(item));
        }
        assert!(concurrent_items.is_empty());

        let item = checker.next().await.unwrap().unwrap();
        assert_eq!(item, "D");
        assert!(checker.next().await.unwrap().is_none());
    }
}