1use std::collections::{HashMap, HashSet, VecDeque};
4
5use rand::SeedableRng;
6use rand::rngs::StdRng;
7use rand::seq::SliceRandom;
8
9use crate::Access;
10use crate::group::{GroupAction, GroupControlMessage, GroupMember};
11use crate::traits::{GroupStore, Operation, Orderer};
12
13use super::{
14 MemberId, MessageId, TestGroup, TestGroupState, TestGroupStore, TestOperation, TestOrderer,
15 TestOrdererState,
16};
17
18pub struct Network {
19 members: HashMap<MemberId, NetworkMember>,
20 queue: VecDeque<TestOperation>,
21 rng: StdRng,
22}
23
24pub struct NetworkMember {
25 id: MemberId,
26 group_store: TestGroupStore,
27 orderer_y: TestOrdererState,
28}
29
30impl Network {
31 pub fn new<const N: usize>(members: [MemberId; N], mut rng: StdRng) -> Self {
32 Self {
33 members: HashMap::from_iter(members.into_iter().map(|member_id| {
34 let group_store = TestGroupStore::default();
35 let orderer_y = TestOrdererState::new(
36 member_id,
37 group_store.clone(),
38 StdRng::from_rng(&mut rng),
39 );
40 (
41 member_id,
42 NetworkMember {
43 id: member_id,
44 group_store,
45 orderer_y,
46 },
47 )
48 })),
49 queue: VecDeque::new(),
50 rng,
51 }
52 }
53
54 pub fn create(
55 &mut self,
56 group_id: MemberId,
57 creator: MemberId,
58 initial_members: Vec<(GroupMember<MemberId>, Access<()>)>,
59 ) -> MessageId {
60 let y = self.get_y(&creator, &group_id);
61 let control_message = GroupControlMessage {
62 group_id,
63 action: GroupAction::Create { initial_members },
64 };
65 let (y_i, operation) = TestGroup::prepare(y, &control_message).unwrap();
66 let operation_id = operation.id();
67 let y_ii = TestGroup::process(y_i, &operation).unwrap();
68 self.queue.push_back(operation);
69 self.set_y(y_ii);
70 operation_id
71 }
72
73 pub fn add(
74 &mut self,
75 adder: MemberId,
76 added: GroupMember<MemberId>,
77 group_id: MemberId,
78 access: Access<()>,
79 ) -> MessageId {
80 let y = self.get_y(&adder, &group_id);
81 let control_message = GroupControlMessage {
82 group_id,
83 action: GroupAction::Add {
84 member: added,
85 access,
86 },
87 };
88 let (y_i, operation) = TestGroup::prepare(y, &control_message).unwrap();
89 let y_ii = TestGroup::process(y_i, &operation).unwrap();
90 let operation_id = operation.id();
91 self.queue.push_back(operation);
92 self.set_y(y_ii);
93 operation_id
94 }
95
96 pub fn remove(
97 &mut self,
98 remover: MemberId,
99 removed: GroupMember<MemberId>,
100 group_id: MemberId,
101 ) -> MessageId {
102 let y = self.get_y(&remover, &group_id);
103 let control_message = GroupControlMessage {
104 group_id,
105 action: GroupAction::Remove { member: removed },
106 };
107 let (y_i, operation) = TestGroup::prepare(y, &control_message).unwrap();
108 let y_ii = TestGroup::process(y_i, &operation).unwrap();
109 let operation_id = operation.id();
110 self.queue.push_back(operation);
111 self.set_y(y_ii);
112 operation_id
113 }
114
115 pub fn demote(
116 &mut self,
117 demoter: MemberId,
118 demoted: GroupMember<MemberId>,
119 group_id: MemberId,
120 access: Access<()>,
121 ) -> MessageId {
122 let y = self.get_y(&demoter, &group_id);
123 let control_message = GroupControlMessage {
124 group_id,
125 action: GroupAction::Demote {
126 member: demoted,
127 access,
128 },
129 };
130 let (y_i, operation) = TestGroup::prepare(y, &control_message).unwrap();
131 let y_ii = TestGroup::process(y_i, &operation).unwrap();
132 let operation_id = operation.id();
133 self.queue.push_back(operation);
134 self.set_y(y_ii);
135 operation_id
136 }
137
138 pub fn promote(
139 &mut self,
140 promoter: MemberId,
141 promoted: GroupMember<MemberId>,
142 group_id: MemberId,
143 access: Access<()>,
144 ) -> MessageId {
145 let y = self.get_y(&promoter, &group_id);
146 let control_message = GroupControlMessage {
147 group_id,
148 action: GroupAction::Promote {
149 member: promoted,
150 access,
151 },
152 };
153 let (y_i, operation) = TestGroup::prepare(y, &control_message).unwrap();
154 let y_ii = TestGroup::process(y_i, &operation).unwrap();
155 let operation_id = operation.id();
156 self.queue.push_back(operation);
157 self.set_y(y_ii);
158 operation_id
159 }
160
161 pub fn process_ooo(&mut self) {
162 if self.queue.is_empty() {
163 return;
164 }
165
166 let member_ids: Vec<MemberId> = self.members.keys().cloned().collect();
167
168 self.shuffle();
169 while let Some(operation) = self.queue.pop_front() {
170 for id in &member_ids {
171 self.shuffle();
173 self.member_process(id, &operation)
174 }
175 }
176 }
177
178 pub fn process(&mut self) {
179 if self.queue.is_empty() {
180 return;
181 }
182
183 let member_ids: Vec<MemberId> = self.members.keys().cloned().collect();
184
185 while let Some(operation) = self.queue.pop_front() {
186 for id in &member_ids {
187 self.member_process(id, &operation)
188 }
189 }
190 }
191
192 fn member_process(&mut self, member_id: &MemberId, operation: &TestOperation) {
193 if &operation.author() == member_id {
195 return;
196 }
197
198 let control_message = operation.payload();
199 let mut group_id = control_message.group_id();
200 let mut y = self.get_y(member_id, &group_id);
201 let orderer_y = TestOrderer::queue(y.orderer_y.clone(), operation).unwrap();
202
203 loop {
204 let (orderer_y, result) = TestOrderer::next_ready_message(orderer_y.clone()).unwrap();
205 y.orderer_y = orderer_y;
206 self.set_y(y.clone());
207
208 let Some(operation) = result else {
209 break;
210 };
211
212 if &operation.author() == member_id {
213 continue;
214 }
215
216 group_id = operation.payload().group_id();
217 y = self.get_y(member_id, &group_id);
218 y = TestGroup::process(y.clone(), &operation).unwrap();
219 self.set_y(y.clone());
220 }
221 }
222
223 pub fn members(
224 &self,
225 member: &MemberId,
226 group_id: &MemberId,
227 ) -> Vec<(GroupMember<MemberId>, Access<()>)> {
228 let group_y = self.get_y(member, group_id);
229 let mut members = group_y.members();
230 members.sort();
231 members
232 }
233
234 pub fn members_at(
235 &self,
236 member: &MemberId,
237 group_id: &MemberId,
238 dependencies: &[MessageId],
239 ) -> Vec<(GroupMember<MemberId>, Access<()>)> {
240 let group_y = self.get_y(member, group_id);
241 let mut members = group_y.members_at(&dependencies.iter().copied().collect::<HashSet<_>>());
242 members.sort();
243 members
244 }
245
246 pub fn transitive_members(
247 &self,
248 member: &MemberId,
249 group_id: &MemberId,
250 ) -> Vec<(MemberId, Access<()>)> {
251 let group_y = self.get_y(member, group_id);
252 let mut members = group_y
253 .transitive_members()
254 .expect("get transitive members");
255 members.sort();
256 members
257 }
258
259 pub fn transitive_members_at(
260 &self,
261 member: &MemberId,
262 group_id: &MemberId,
263 dependencies: &[MessageId],
264 ) -> Vec<(MemberId, Access<()>)> {
265 let group_y = self.get_y(member, group_id);
266 let mut members = group_y
267 .transitive_members_at(&dependencies.iter().copied().collect::<HashSet<_>>())
268 .expect("get transitive members");
269 members.sort();
270 members
271 }
272
273 fn shuffle(&mut self) {
274 let mut queue = self.queue.clone().into_iter().collect::<Vec<_>>();
275 queue.shuffle(&mut self.rng);
276 self.queue = VecDeque::from(queue);
277 }
278
279 pub fn get_y(&self, member: &MemberId, group_id: &MemberId) -> TestGroupState {
280 let member = self.members.get(member).expect("member exists");
281 let group_y = TestGroupStore::get(&member.group_store, group_id).unwrap();
282
283 match group_y {
284 Some(group_y) => group_y,
285 None => TestGroupState::new(
286 member.id,
287 *group_id,
288 member.group_store.clone(),
289 member.orderer_y.clone(),
290 ),
291 }
292 }
293
294 fn set_y(&mut self, y: TestGroupState) {
295 let member = self.members.get_mut(&y.my_id).expect("member exists");
296 member.group_store.insert(&y.id(), &y).unwrap();
297 }
298}