p2panda_auth/test_utils/partial_ord.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! Test utilities for when a functional orderer is required to provide partial
4//! ordering of operations.
5
6use std::cell::RefCell;
7use std::collections::{HashMap, HashSet, VecDeque};
8use std::fmt::Debug;
9use std::hash::Hash as StdHash;
10use std::marker::PhantomData;
11use std::rc::Rc;
12
13use rand::RngCore;
14use rand::rngs::StdRng;
15use thiserror::Error;
16
17use crate::group::{
18    GroupControlMessage, GroupCrdt, GroupCrdtError, GroupCrdtInnerState, GroupCrdtState,
19};
20use crate::test_utils::{Conditions, MemberId, MessageId, TestOperation, TestResolver};
21use crate::traits::{Operation, Orderer};
22
/// Inner auth state of the group CRDT, instantiated with the test operation type.
pub type TestAuthState = GroupCrdtInnerState<MemberId, MessageId, Conditions, TestOperation>;
/// Group CRDT state wired up with the test orderer.
pub type TestGroupState = GroupCrdtState<MemberId, MessageId, Conditions, TestOrderer>;
/// Group CRDT using the test resolver and test orderer implementations.
pub type TestGroup = GroupCrdt<MemberId, MessageId, Conditions, TestResolver, TestOrderer>;
/// Errors produced by [`TestGroup`] operations.
pub type TestGroupError =
    GroupCrdtError<MemberId, MessageId, Conditions, TestResolver, TestOrderer>;
28
/// Error type of [`TestOrderer`]; currently infallible (no variants).
#[derive(Debug, Error)]
pub enum OrdererError {}
31
/// State held by [`TestOrderer`] for a single group member.
#[derive(Clone, Debug)]
pub struct TestOrdererState {
    /// Id of the local member this orderer state belongs to.
    pub my_id: MemberId,
    /// Shared handle onto the current heads of the auth group graph(s); read
    /// when constructing the dependencies of a newly created operation.
    pub auth_heads: Rc<RefCell<Vec<MessageId>>>,
    /// Dependency-tracking state of the underlying partial orderer.
    pub orderer_y: PartialOrdererState<MessageId>,
    /// Operations stored by id; looked up when returning ready messages.
    pub messages: HashMap<MessageId, TestOperation>,
    /// Source of randomness used to generate operation ids.
    pub rng: StdRng,
}
40
41impl TestOrdererState {
42    pub fn my_id(&self) -> MemberId {
43        self.my_id
44    }
45}
46
/// Stateless orderer implementation used in tests; all state lives in
/// [`TestOrdererState`].
#[derive(Clone, Debug, Default)]
pub struct TestOrderer {}
49
50impl TestOrderer {
51    pub fn init(
52        my_id: MemberId,
53        auth_heads: Rc<RefCell<Vec<MessageId>>>,
54        rng: StdRng,
55    ) -> TestOrdererState {
56        TestOrdererState {
57            my_id,
58            auth_heads,
59            messages: Default::default(),
60            orderer_y: PartialOrdererState::default(),
61            rng,
62        }
63    }
64}
65
66impl Orderer<MemberId, MessageId, GroupControlMessage<MemberId, Conditions>> for TestOrderer {
67    type State = TestOrdererState;
68
69    type Error = OrdererError;
70
71    type Operation = TestOperation;
72
73    /// Construct the next operation which should include meta-data required for establishing order
74    /// between different operations.
75    ///
76    /// In this implementation causal order is established between operations using a graph
77    /// structure. Every operation contains a pointer to both the previous operations in a single auth
78    /// group graph, and also the tips of any sub-group graphs.
79    fn next_message(
80        mut y: Self::State,
81        control_message: &GroupControlMessage<MemberId, Conditions>,
82    ) -> Result<(Self::State, Self::Operation), Self::Error> {
83        // Get auth dependencies. These are the current heads of all groups.
84        let auth_dependencies = y.auth_heads.borrow().to_owned();
85
86        // Generate a new random operation id.
87        let next_id = { y.rng.next_u32() };
88
89        // Construct the actual operation.
90        let operation = TestOperation {
91            id: next_id,
92            author: y.my_id(),
93            dependencies: auth_dependencies.to_vec(),
94            payload: control_message.clone(),
95        };
96
97        // Queue the operation in the orderer.
98        //
99        // Even though we know the operation is ready for processing (ordering dependencies are
100        // met), we need to queue it so that the orderer progresses to the correct state.
101        //
102        // TODO: we should rather update the orderer state directly as this method (next_message) is
103        // always called locally and we can assume that our own messages are processed immediately.
104        let y_i = TestOrderer::queue(y, &operation)?;
105
106        Ok((y_i, operation))
107    }
108
109    fn queue(mut y: Self::State, message: &Self::Operation) -> Result<Self::State, Self::Error> {
110        let id = message.id();
111
112        {
113            let dependencies = message.dependencies();
114
115            if !PartialOrderer::ready(&y.orderer_y, &dependencies).unwrap() {
116                let (orderer_y_i, _) =
117                    PartialOrderer::mark_pending(y.orderer_y.clone(), id, dependencies.clone())
118                        .unwrap();
119                y.orderer_y = orderer_y_i;
120            } else {
121                let (orderer_y_i, _) = PartialOrderer::mark_ready(y.orderer_y.clone(), id).unwrap();
122                let orderer_y_ii = PartialOrderer::process_pending(orderer_y_i, id).unwrap();
123                y.orderer_y = orderer_y_ii;
124            }
125        }
126
127        Ok(y)
128    }
129
130    fn next_ready_message(
131        mut y: Self::State,
132    ) -> Result<(Self::State, Option<Self::Operation>), Self::Error> {
133        let next_msg = {
134            let (orderer_y_i, msg) = PartialOrderer::take_next_ready(y.orderer_y.clone()).unwrap();
135            y.orderer_y = orderer_y_i;
136            msg
137        };
138
139        let next_msg = match next_msg {
140            Some(msg) => y.messages.get(&msg).cloned(),
141            None => None,
142        };
143
144        Ok((y, next_msg))
145    }
146}
147
/// Queue which checks if dependencies are met for an item and returns it as "ready".
///
/// Internally this assumes a structure where items can point at others as "dependencies", forming
/// a DAG (Directed Acyclic Graph). The "orderer" monitors incoming items, asserts that the
/// dependencies are met and yields a linearized sequence of "dependency checked" items.
#[derive(Debug)]
pub struct PartialOrderer<T> {
    // Zero-sized marker tying the item type `T` to this stateless orderer.
    _marker: PhantomData<T>,
}
157
/// State of a [`PartialOrderer`] over items of type `T`.
#[derive(Clone, Debug)]
pub struct PartialOrdererState<T>
where
    T: PartialEq + Eq + StdHash,
{
    // Items whose dependencies have all been met.
    ready: HashSet<T>,
    // Ready items in FIFO order, awaiting retrieval via `take_next_ready`.
    ready_queue: VecDeque<T>,
    // Maps a not-yet-ready dependency to the set of (item, dependencies)
    // pairs waiting on it.
    pending: HashMap<T, HashSet<(T, Vec<T>)>>,
}
167
168impl<T> Default for PartialOrdererState<T>
169where
170    T: PartialEq + Eq + StdHash,
171{
172    fn default() -> Self {
173        Self {
174            ready: Default::default(),
175            ready_queue: Default::default(),
176            pending: Default::default(),
177        }
178    }
179}
180
181impl<T> PartialOrderer<T>
182where
183    T: Copy + Clone + PartialEq + Eq + StdHash,
184{
185    pub fn mark_ready(
186        mut y: PartialOrdererState<T>,
187        key: T,
188    ) -> Result<(PartialOrdererState<T>, bool), PartialOrdererError> {
189        let result = y.ready.insert(key);
190        if result {
191            y.ready_queue.push_back(key);
192        }
193        Ok((y, result))
194    }
195
196    pub fn mark_pending(
197        mut y: PartialOrdererState<T>,
198        key: T,
199        dependencies: Vec<T>,
200    ) -> Result<(PartialOrdererState<T>, bool), PartialOrdererError> {
201        let insert_occured = false;
202        for dep_key in &dependencies {
203            if y.ready.contains(dep_key) {
204                continue;
205            }
206
207            let dependents = y.pending.entry(*dep_key).or_default();
208            dependents.insert((key, dependencies.clone()));
209        }
210
211        Ok((y, insert_occured))
212    }
213
214    #[allow(clippy::type_complexity)]
215    pub fn get_next_pending(
216        y: &PartialOrdererState<T>,
217        key: T,
218    ) -> Result<Option<HashSet<(T, Vec<T>)>>, PartialOrdererError> {
219        Ok(y.pending.get(&key).cloned())
220    }
221
222    pub fn take_next_ready(
223        mut y: PartialOrdererState<T>,
224    ) -> Result<(PartialOrdererState<T>, Option<T>), PartialOrdererError> {
225        let result = y.ready_queue.pop_front();
226        Ok((y, result))
227    }
228
229    pub fn remove_pending(
230        mut y: PartialOrdererState<T>,
231        key: T,
232    ) -> Result<(PartialOrdererState<T>, bool), PartialOrdererError> {
233        let result = y.pending.remove(&key).is_some();
234        Ok((y, result))
235    }
236
237    pub fn ready(
238        y: &PartialOrdererState<T>,
239        dependencies: &[T],
240    ) -> Result<bool, PartialOrdererError> {
241        let deps_set = HashSet::from_iter(dependencies.iter().cloned());
242        let result = y.ready.is_superset(&deps_set);
243        Ok(result)
244    }
245
246    pub fn process_pending(
247        y: PartialOrdererState<T>,
248        key: T,
249    ) -> Result<PartialOrdererState<T>, PartialOrdererError> {
250        // Get all items which depend on the passed key.
251        let Some(dependents) = Self::get_next_pending(&y, key)? else {
252            return Ok(y);
253        };
254
255        // For each dependent check if it has all it's dependencies met, if not then we do nothing
256        // as it is still in a pending state.
257        let mut y_loop = y;
258        for (next_key, next_deps) in dependents {
259            if !Self::ready(&y_loop, &next_deps)? {
260                continue;
261            }
262
263            let (y_next, _) = Self::mark_ready(y_loop, next_key)?;
264            y_loop = y_next;
265
266            // Recurse down the dependency graph by now checking any pending items which depend on
267            // the current item.
268            let y_next = Self::process_pending(y_loop, next_key)?;
269            y_loop = y_next;
270        }
271
272        // Finally remove this item from the pending items queue.
273        let (y_i, _) = Self::remove_pending(y_loop, key)?;
274
275        Ok(y_i)
276    }
277}
278
/// Error type of [`PartialOrderer`]; currently infallible (no variants).
#[derive(Debug, Error)]
pub enum PartialOrdererError {
    // TODO: For now the orderer API is infallible, but we keep the error type around for later, as
    // in its current form the orderer would need to keep too much memory around for processing
    // and we'll probably start to introduce a persistence backend (which can fail).
}