p2panda_auth/test_utils/
orderer.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::fmt::Debug;
6use std::rc::Rc;
7
8use rand::RngCore;
9use rand::rngs::StdRng;
10use thiserror::Error;
11
12use crate::group::{GroupAction, GroupControlMessage, GroupMember};
13use crate::test_utils::{
14    Conditions, MemberId, MessageId, PartialOrderer, PartialOrdererState, TestGroupState,
15    TestGroupStore,
16};
17use crate::traits::{GroupStore, Operation, Orderer};
18
19#[derive(Debug, Error)]
20pub enum OrdererError {}
21
22#[derive(Clone, Debug)]
23pub struct TestOrdererState {
24    pub inner: Rc<RefCell<TestOrdererStateInner>>,
25}
26
27#[derive(Clone, Debug)]
28pub struct TestOrdererStateInner {
29    pub my_id: MemberId,
30    pub group_store: TestGroupStore,
31    pub orderer_y: PartialOrdererState<MessageId>,
32    pub messages: HashMap<MessageId, TestOperation>,
33    pub rng: StdRng,
34}
35
36impl TestOrdererState {
37    pub fn new(my_id: MemberId, group_store: TestGroupStore, rng: StdRng) -> Self {
38        let inner = TestOrdererStateInner {
39            my_id,
40            group_store,
41            messages: Default::default(),
42            orderer_y: PartialOrdererState::default(),
43            rng,
44        };
45        Self {
46            inner: Rc::new(RefCell::new(inner)),
47        }
48    }
49
50    pub fn my_id(&self) -> MemberId {
51        self.inner.borrow().my_id
52    }
53}
54
55#[derive(Clone, Debug, Default)]
56pub struct TestOrderer {}
57
58impl Orderer<MemberId, MessageId, GroupControlMessage<MemberId, Conditions>> for TestOrderer {
59    type State = TestOrdererState;
60
61    type Error = OrdererError;
62
63    type Operation = TestOperation;
64
65    /// Construct the next operation which should include meta-data required for establishing order
66    /// between different operations.
67    ///
68    /// In this implementation causal order is established between operations using a graph
69    /// structure. Every operation contains a pointer to both the previous operations in a single auth
70    /// group graph, and also the tips of any sub-group graphs.
71    fn next_message(
72        y: Self::State,
73        control_message: &GroupControlMessage<MemberId, Conditions>,
74    ) -> Result<(Self::State, Self::Operation), Self::Error> {
75        let group_id = control_message.group_id();
76        let group_y = {
77            let y_inner = y.inner.borrow();
78
79            // Instantiate a new group.
80            let mut group_y = TestGroupState::new(
81                y_inner.my_id,
82                group_id,
83                y_inner.group_store.clone(),
84                y.clone(),
85            );
86
87            // If this isn't a create message, retrieve the current group state from the store.
88            if !control_message.is_create() {
89                let y = TestGroupStore::get(&y_inner.group_store, &group_id)
90                    .expect("get group state from store")
91                    .expect("group exists");
92                group_y = y;
93            }
94
95            group_y
96        };
97
98        // Get the "dependencies" of this operation. Dependencies are any other operations from any
99        // group which should be processed before this one. The transitive_heads method traverses
100        // the current group graph to all tips, and recurses into any sub-groups.
101        let mut dependencies = group_y
102            .transitive_heads()
103            .expect("retrieve transitive heads");
104
105        // If this operation adds a new member to the group, and that member itself is a
106        // sub-group, include their current transitive heads in our dependencies.
107        if let GroupControlMessage {
108            action:
109                GroupAction::Add {
110                    member: GroupMember::Group(id),
111                    ..
112                },
113            ..
114        } = control_message
115        {
116            let added_sub_group = group_y.get_sub_group(*id).expect("sub-group exists");
117            dependencies.extend(
118                &added_sub_group
119                    .transitive_heads()
120                    .expect("retrieve transitive heads"),
121            );
122        };
123
124        // If this is a create action, check if any of the initial members are sub-groups and if
125        // so include their current transitive heads in our dependencies.
126        if let GroupControlMessage {
127            action: GroupAction::Create { initial_members },
128            ..
129        } = control_message
130        {
131            for (member, _) in initial_members {
132                if let GroupMember::Group(id) = member {
133                    let sub_group = group_y.get_sub_group(*id).expect("sub-group exists");
134                    dependencies.extend(
135                        &sub_group
136                            .transitive_heads()
137                            .expect("retrieve transitive heads"),
138                    );
139                }
140            }
141        };
142
143        // The previous field includes only the tip operations for the target group graph.
144        let previous = group_y.heads();
145
146        // Generate a new random operation id.
147        let next_id = {
148            let mut y_mut = y.inner.borrow_mut();
149            y_mut.rng.next_u32()
150        };
151
152        // Construct the actual operation.
153        let operation = TestOperation {
154            id: next_id,
155            author: y.my_id(),
156            dependencies: dependencies.into_iter().collect::<Vec<_>>(),
157            previous: previous.into_iter().collect::<Vec<_>>(),
158            payload: control_message.clone(),
159        };
160
161        // Queue the operation in the orderer.
162        //
163        // Even though we know the operation is ready for processing (ordering dependencies are
164        // met), we need to queue it so that the orderer progresses to the correct state.
165        //
166        // TODO: we should rather update the orderer state directly as this method (next_message) is
167        // always called locally and we can assume that our own messages are processed immediately.
168        let y_i = TestOrderer::queue(y, &operation)?;
169
170        Ok((y_i, operation))
171    }
172
173    fn queue(y: Self::State, message: &Self::Operation) -> Result<Self::State, Self::Error> {
174        let id = message.id();
175
176        {
177            let mut inner: std::cell::RefMut<'_, TestOrdererStateInner> = y.inner.borrow_mut();
178            inner.messages.insert(id, message.clone());
179
180            let dependencies = message.dependencies();
181
182            if !PartialOrderer::ready(&inner.orderer_y, &dependencies).unwrap() {
183                let (orderer_y_i, _) =
184                    PartialOrderer::mark_pending(inner.orderer_y.clone(), id, dependencies.clone())
185                        .unwrap();
186                inner.orderer_y = orderer_y_i;
187            } else {
188                let (orderer_y_i, _) =
189                    PartialOrderer::mark_ready(inner.orderer_y.clone(), id).unwrap();
190                let orderer_y_ii = PartialOrderer::process_pending(orderer_y_i, id).unwrap();
191                inner.orderer_y = orderer_y_ii;
192            }
193        }
194
195        Ok(y)
196    }
197
198    fn next_ready_message(
199        y: Self::State,
200    ) -> Result<(Self::State, Option<Self::Operation>), Self::Error> {
201        let next_msg = {
202            let mut inner = y.inner.borrow_mut();
203            let (orderer_y_i, msg) =
204                PartialOrderer::take_next_ready(inner.orderer_y.clone()).unwrap();
205
206            inner.orderer_y = orderer_y_i;
207            msg
208        };
209
210        let next_msg = match next_msg {
211            Some(msg) => y.inner.borrow().messages.get(&msg).cloned(),
212            None => None,
213        };
214
215        Ok((y, next_msg))
216    }
217}
218
219#[derive(Clone, Debug)]
220pub struct TestOperation {
221    pub id: u32,
222    pub author: char,
223    pub dependencies: Vec<u32>,
224    pub previous: Vec<u32>,
225    pub payload: GroupControlMessage<char, ()>,
226}
227
228impl Operation<char, u32, GroupControlMessage<char, ()>> for TestOperation {
229    fn id(&self) -> u32 {
230        self.id
231    }
232
233    fn author(&self) -> char {
234        self.author
235    }
236
237    fn dependencies(&self) -> Vec<u32> {
238        self.dependencies.clone()
239    }
240
241    fn previous(&self) -> Vec<u32> {
242        self.previous.clone()
243    }
244
245    fn payload(&self) -> GroupControlMessage<char, ()> {
246        self.payload.clone()
247    }
248}