1use std::cell::RefCell;
7use std::collections::{HashMap, HashSet, VecDeque};
8use std::fmt::Debug;
9use std::hash::Hash as StdHash;
10use std::marker::PhantomData;
11use std::rc::Rc;
12
13use rand::RngCore;
14use rand::rngs::StdRng;
15use thiserror::Error;
16
17use crate::group::{
18 GroupControlMessage, GroupCrdt, GroupCrdtError, GroupCrdtInnerState, GroupCrdtState,
19};
20use crate::test_utils::{Conditions, MemberId, MessageId, TestOperation, TestResolver};
21use crate::traits::{Operation, Orderer};
22
23pub type TestAuthState = GroupCrdtInnerState<MemberId, MessageId, Conditions, TestOperation>;
24pub type TestGroupState = GroupCrdtState<MemberId, MessageId, Conditions, TestOrderer>;
25pub type TestGroup = GroupCrdt<MemberId, MessageId, Conditions, TestResolver, TestOrderer>;
26pub type TestGroupError =
27 GroupCrdtError<MemberId, MessageId, Conditions, TestResolver, TestOrderer>;
28
29#[derive(Debug, Error)]
30pub enum OrdererError {}
31
32#[derive(Clone, Debug)]
33pub struct TestOrdererState {
34 pub my_id: MemberId,
35 pub auth_heads: Rc<RefCell<Vec<MessageId>>>,
36 pub orderer_y: PartialOrdererState<MessageId>,
37 pub messages: HashMap<MessageId, TestOperation>,
38 pub rng: StdRng,
39}
40
41impl TestOrdererState {
42 pub fn my_id(&self) -> MemberId {
43 self.my_id
44 }
45}
46
47#[derive(Clone, Debug, Default)]
48pub struct TestOrderer {}
49
50impl TestOrderer {
51 pub fn init(
52 my_id: MemberId,
53 auth_heads: Rc<RefCell<Vec<MessageId>>>,
54 rng: StdRng,
55 ) -> TestOrdererState {
56 TestOrdererState {
57 my_id,
58 auth_heads,
59 messages: Default::default(),
60 orderer_y: PartialOrdererState::default(),
61 rng,
62 }
63 }
64}
65
66impl Orderer<MemberId, MessageId, GroupControlMessage<MemberId, Conditions>> for TestOrderer {
67 type State = TestOrdererState;
68
69 type Error = OrdererError;
70
71 type Operation = TestOperation;
72
73 fn next_message(
80 mut y: Self::State,
81 control_message: &GroupControlMessage<MemberId, Conditions>,
82 ) -> Result<(Self::State, Self::Operation), Self::Error> {
83 let auth_dependencies = y.auth_heads.borrow().to_owned();
85
86 let next_id = { y.rng.next_u32() };
88
89 let operation = TestOperation {
91 id: next_id,
92 author: y.my_id(),
93 dependencies: auth_dependencies.to_vec(),
94 payload: control_message.clone(),
95 };
96
97 let y_i = TestOrderer::queue(y, &operation)?;
105
106 Ok((y_i, operation))
107 }
108
109 fn queue(mut y: Self::State, message: &Self::Operation) -> Result<Self::State, Self::Error> {
110 let id = message.id();
111
112 {
113 let dependencies = message.dependencies();
114
115 if !PartialOrderer::ready(&y.orderer_y, &dependencies).unwrap() {
116 let (orderer_y_i, _) =
117 PartialOrderer::mark_pending(y.orderer_y.clone(), id, dependencies.clone())
118 .unwrap();
119 y.orderer_y = orderer_y_i;
120 } else {
121 let (orderer_y_i, _) = PartialOrderer::mark_ready(y.orderer_y.clone(), id).unwrap();
122 let orderer_y_ii = PartialOrderer::process_pending(orderer_y_i, id).unwrap();
123 y.orderer_y = orderer_y_ii;
124 }
125 }
126
127 Ok(y)
128 }
129
130 fn next_ready_message(
131 mut y: Self::State,
132 ) -> Result<(Self::State, Option<Self::Operation>), Self::Error> {
133 let next_msg = {
134 let (orderer_y_i, msg) = PartialOrderer::take_next_ready(y.orderer_y.clone()).unwrap();
135 y.orderer_y = orderer_y_i;
136 msg
137 };
138
139 let next_msg = match next_msg {
140 Some(msg) => y.messages.get(&msg).cloned(),
141 None => None,
142 };
143
144 Ok((y, next_msg))
145 }
146}
147
148#[derive(Debug)]
154pub struct PartialOrderer<T> {
155 _marker: PhantomData<T>,
156}
157
158#[derive(Clone, Debug)]
159pub struct PartialOrdererState<T>
160where
161 T: PartialEq + Eq + StdHash,
162{
163 ready: HashSet<T>,
164 ready_queue: VecDeque<T>,
165 pending: HashMap<T, HashSet<(T, Vec<T>)>>,
166}
167
168impl<T> Default for PartialOrdererState<T>
169where
170 T: PartialEq + Eq + StdHash,
171{
172 fn default() -> Self {
173 Self {
174 ready: Default::default(),
175 ready_queue: Default::default(),
176 pending: Default::default(),
177 }
178 }
179}
180
181impl<T> PartialOrderer<T>
182where
183 T: Copy + Clone + PartialEq + Eq + StdHash,
184{
185 pub fn mark_ready(
186 mut y: PartialOrdererState<T>,
187 key: T,
188 ) -> Result<(PartialOrdererState<T>, bool), PartialOrdererError> {
189 let result = y.ready.insert(key);
190 if result {
191 y.ready_queue.push_back(key);
192 }
193 Ok((y, result))
194 }
195
196 pub fn mark_pending(
197 mut y: PartialOrdererState<T>,
198 key: T,
199 dependencies: Vec<T>,
200 ) -> Result<(PartialOrdererState<T>, bool), PartialOrdererError> {
201 let insert_occured = false;
202 for dep_key in &dependencies {
203 if y.ready.contains(dep_key) {
204 continue;
205 }
206
207 let dependents = y.pending.entry(*dep_key).or_default();
208 dependents.insert((key, dependencies.clone()));
209 }
210
211 Ok((y, insert_occured))
212 }
213
214 #[allow(clippy::type_complexity)]
215 pub fn get_next_pending(
216 y: &PartialOrdererState<T>,
217 key: T,
218 ) -> Result<Option<HashSet<(T, Vec<T>)>>, PartialOrdererError> {
219 Ok(y.pending.get(&key).cloned())
220 }
221
222 pub fn take_next_ready(
223 mut y: PartialOrdererState<T>,
224 ) -> Result<(PartialOrdererState<T>, Option<T>), PartialOrdererError> {
225 let result = y.ready_queue.pop_front();
226 Ok((y, result))
227 }
228
229 pub fn remove_pending(
230 mut y: PartialOrdererState<T>,
231 key: T,
232 ) -> Result<(PartialOrdererState<T>, bool), PartialOrdererError> {
233 let result = y.pending.remove(&key).is_some();
234 Ok((y, result))
235 }
236
237 pub fn ready(
238 y: &PartialOrdererState<T>,
239 dependencies: &[T],
240 ) -> Result<bool, PartialOrdererError> {
241 let deps_set = HashSet::from_iter(dependencies.iter().cloned());
242 let result = y.ready.is_superset(&deps_set);
243 Ok(result)
244 }
245
246 pub fn process_pending(
247 y: PartialOrdererState<T>,
248 key: T,
249 ) -> Result<PartialOrdererState<T>, PartialOrdererError> {
250 let Some(dependents) = Self::get_next_pending(&y, key)? else {
252 return Ok(y);
253 };
254
255 let mut y_loop = y;
258 for (next_key, next_deps) in dependents {
259 if !Self::ready(&y_loop, &next_deps)? {
260 continue;
261 }
262
263 let (y_next, _) = Self::mark_ready(y_loop, next_key)?;
264 y_loop = y_next;
265
266 let y_next = Self::process_pending(y_loop, next_key)?;
269 y_loop = y_next;
270 }
271
272 let (y_i, _) = Self::remove_pending(y_loop, key)?;
274
275 Ok(y_i)
276 }
277}
278
279#[derive(Debug, Error)]
280pub enum PartialOrdererError {
281 }