p2panda_auth/test_utils/
partial_ord.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::hash::Hash as StdHash;
5use std::marker::PhantomData;
6
7use serde::{Deserialize, Serialize};
8use thiserror::Error;
9
/// Dependency-checking queue which releases items once they are "ready".
///
/// Items may point at other items as their "dependencies", which together form a DAG (Directed
/// Acyclic Graph). The orderer looks at incoming items, checks whether all of their dependencies
/// have been met and yields a linearized sequence of "dependency checked" items.
///
/// The type itself carries no data (only a marker for the key type `T`); all bookkeeping lives
/// in a [`PartialOrdererState`] value which is handed into and returned from the associated
/// functions.
#[derive(Debug)]
pub struct PartialOrderer<T> {
    /// Ties the otherwise unused key type `T` to this struct.
    _marker: PhantomData<T>,
}
19
20#[derive(Clone, Debug, Serialize, Deserialize)]
21pub struct PartialOrdererState<T>
22where
23    T: PartialEq + Eq + StdHash,
24{
25    ready: HashSet<T>,
26    ready_queue: VecDeque<T>,
27    pending: HashMap<T, HashSet<(T, Vec<T>)>>,
28}
29
30impl<T> Default for PartialOrdererState<T>
31where
32    T: PartialEq + Eq + StdHash,
33{
34    fn default() -> Self {
35        Self {
36            ready: Default::default(),
37            ready_queue: Default::default(),
38            pending: Default::default(),
39        }
40    }
41}
42
43impl<T> PartialOrderer<T>
44where
45    T: Copy + Clone + PartialEq + Eq + StdHash,
46{
47    pub fn mark_ready(
48        mut y: PartialOrdererState<T>,
49        key: T,
50    ) -> Result<(PartialOrdererState<T>, bool), PartialOrdererError> {
51        let result = y.ready.insert(key);
52        if result {
53            y.ready_queue.push_back(key);
54        }
55        Ok((y, result))
56    }
57
58    pub fn mark_pending(
59        mut y: PartialOrdererState<T>,
60        key: T,
61        dependencies: Vec<T>,
62    ) -> Result<(PartialOrdererState<T>, bool), PartialOrdererError> {
63        let insert_occured = false;
64        for dep_key in &dependencies {
65            if y.ready.contains(dep_key) {
66                continue;
67            }
68
69            let dependents = y.pending.entry(*dep_key).or_default();
70            dependents.insert((key, dependencies.clone()));
71        }
72
73        Ok((y, insert_occured))
74    }
75
76    #[allow(clippy::type_complexity)]
77    pub fn get_next_pending(
78        y: &PartialOrdererState<T>,
79        key: T,
80    ) -> Result<Option<HashSet<(T, Vec<T>)>>, PartialOrdererError> {
81        Ok(y.pending.get(&key).cloned())
82    }
83
84    pub fn take_next_ready(
85        mut y: PartialOrdererState<T>,
86    ) -> Result<(PartialOrdererState<T>, Option<T>), PartialOrdererError> {
87        let result = y.ready_queue.pop_front();
88        Ok((y, result))
89    }
90
91    pub fn remove_pending(
92        mut y: PartialOrdererState<T>,
93        key: T,
94    ) -> Result<(PartialOrdererState<T>, bool), PartialOrdererError> {
95        let result = y.pending.remove(&key).is_some();
96        Ok((y, result))
97    }
98
99    pub fn ready(
100        y: &PartialOrdererState<T>,
101        dependencies: &[T],
102    ) -> Result<bool, PartialOrdererError> {
103        let deps_set = HashSet::from_iter(dependencies.iter().cloned());
104        let result = y.ready.is_superset(&deps_set);
105        Ok(result)
106    }
107
108    pub fn process_pending(
109        y: PartialOrdererState<T>,
110        key: T,
111    ) -> Result<PartialOrdererState<T>, PartialOrdererError> {
112        // Get all items which depend on the passed key.
113        let Some(dependents) = Self::get_next_pending(&y, key)? else {
114            return Ok(y);
115        };
116
117        // For each dependent check if it has all it's dependencies met, if not then we do nothing
118        // as it is still in a pending state.
119        let mut y_loop = y;
120        for (next_key, next_deps) in dependents {
121            if !Self::ready(&y_loop, &next_deps)? {
122                continue;
123            }
124
125            let (y_next, _) = Self::mark_ready(y_loop, next_key)?;
126            y_loop = y_next;
127
128            // Recurse down the dependency graph by now checking any pending items which depend on
129            // the current item.
130            let y_next = Self::process_pending(y_loop, next_key)?;
131            y_loop = y_next;
132        }
133
134        // Finally remove this item from the pending items queue.
135        let (y_i, _) = Self::remove_pending(y_loop, key)?;
136
137        Ok(y_i)
138    }
139}
140
141#[derive(Debug, Error)]
142pub enum PartialOrdererError {
143    // TODO: For now the orderer API is infallible, but we keep the error type around for later, as
144    // in it's current form the orderer would need to keep too much memory around for processing
145    // and we'll probably start to introduce a persistence backend (which can fail).
146}