p2panda_encryption/message_scheme/test_utils/
ordering.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3// TODO: A complete ordering solution following the full ordering specification for the "message
4// encryption" scheme will be provided as soon as "access control" work has been finished.
5use std::collections::HashMap;
6use std::marker::PhantomData;
7
8use serde::{Deserialize, Serialize};
9use thiserror::Error;
10
11use crate::message_scheme::{ControlMessage, DirectMessage, Generation};
12use crate::ordering::{Orderer, OrdererError, OrdererState};
13use crate::test_utils::{MemberId, MessageId};
14use crate::traits::{
15    AckedGroupMembership, ForwardSecureGroupMessage, ForwardSecureMessageContent,
16    ForwardSecureOrdering,
17};
18
/// Minimal orderer used to exercise the "message encryption" group APIs in tests.
///
/// NOTE: This orderer does _not_ fulfill the complete specification for correct ordering. It
/// assumes that peers process all messages after each member has published at most one control
/// or application message. It is also very inefficient, since every published message points at
/// _every_ previously published message from all peers.
///
/// That is enough for the current testing setup, but anything "production ready" and robust
/// across all concurrency scenarios requires a more sophisticated solution.
#[derive(Debug, Clone)]
pub struct ForwardSecureOrderer<DGM> {
    /// Zero-sized marker binding the orderer to the group membership type `DGM`.
    _marker: PhantomData<DGM>,
}
32
33impl<DGM> ForwardSecureOrderer<DGM>
34where
35    DGM: Clone + AckedGroupMembership<MemberId, MessageId>,
36{
37    pub fn init(my_id: MemberId) -> ForwardSecureOrdererState<DGM> {
38        ForwardSecureOrdererState {
39            next_message_seq: 0,
40            orderer: Orderer::init(),
41            my_id,
42            messages: HashMap::new(),
43            welcome_message: None,
44        }
45    }
46}
47
48#[derive(Clone, Debug, Serialize, Deserialize)]
49pub struct ForwardSecureOrdererState<DGM>
50where
51    DGM: Clone + AckedGroupMembership<MemberId, MessageId>,
52{
53    /// Sequence number of the next, message to-be published.
54    next_message_seq: usize,
55
56    /// Our own member id.
57    my_id: MemberId,
58
59    /// Internal helper to order messages based on their "previous" dependencies.
60    orderer: OrdererState<MessageId>,
61
62    /// In-memory store of all messages.
63    messages: HashMap<MessageId, TestMessage<DGM>>,
64
65    /// "Create" or "Add" message which got us into the group.
66    welcome_message: Option<TestMessage<DGM>>,
67}
68
69impl<DGM> ForwardSecureOrdering<MemberId, MessageId, DGM> for ForwardSecureOrderer<DGM>
70where
71    DGM: std::fmt::Debug
72        + Clone
73        + AckedGroupMembership<MemberId, MessageId>
74        + Serialize
75        + for<'a> Deserialize<'a>,
76{
77    type State = ForwardSecureOrdererState<DGM>;
78
79    type Error = ForwardSecureOrdererError;
80
81    type Message = TestMessage<DGM>;
82
83    fn next_control_message(
84        mut y: Self::State,
85        control_message: &ControlMessage<MemberId, MessageId>,
86        direct_messages: &[DirectMessage<MemberId, MessageId, DGM>],
87    ) -> Result<(Self::State, Self::Message), Self::Error> {
88        let seq = y.next_message_seq;
89        let sender = y.my_id;
90
91        // This is a very naive implementation where every message points at _every_ known,
92        // previous message as an "dependency". This allows us to not write any code which tracks
93        // transitive dependencies.
94        let previous = y.messages.keys().cloned().collect();
95
96        let message = TestMessage {
97            seq,
98            sender,
99            previous,
100            content: TestMessageContent::System {
101                control_message: control_message.to_owned(),
102                direct_messages: direct_messages.to_owned(),
103            },
104        };
105
106        y.messages.insert(message.id(), message.clone());
107        y.next_message_seq += 1;
108
109        Ok((y, message))
110    }
111
112    fn next_application_message(
113        mut y: Self::State,
114        generation: Generation,
115        ciphertext: Vec<u8>,
116    ) -> Result<(Self::State, Self::Message), Self::Error> {
117        let seq = y.next_message_seq;
118        let sender = y.my_id;
119
120        // This is a very naive implementation where every message points at _every_ known,
121        // previous message as an "dependency". This allows us to not write any code which tracks
122        // transitive dependencies.
123        let previous = y.messages.keys().cloned().collect();
124
125        let message = TestMessage {
126            seq,
127            sender,
128            previous,
129            content: TestMessageContent::Application {
130                ciphertext,
131                generation,
132            },
133        };
134
135        y.messages.insert(message.id(), message.clone());
136        y.next_message_seq += 1;
137
138        Ok((y, message))
139    }
140
141    fn queue(mut y: Self::State, message: &Self::Message) -> Result<Self::State, Self::Error> {
142        let id = message.id();
143
144        y.messages.insert(id, message.clone());
145
146        // Clear dependencies list from own messages, we didn't queue them as we know that we've
147        // seen and processed them.
148        let previous: Vec<MessageId> = message
149            .previous
150            .iter()
151            .filter(|id| id.sender != y.my_id)
152            .cloned()
153            .collect();
154
155        if !Orderer::ready(&y.orderer, &previous)? {
156            let (y_orderer_i, _) = Orderer::mark_pending(y.orderer, id, previous)?;
157            y.orderer = y_orderer_i;
158            return Ok(y);
159        }
160
161        let (y_orderer_i, _) = Orderer::mark_ready(y.orderer, id)?;
162        let y_orderer_ii = Orderer::process_pending(y_orderer_i, id)?;
163        y.orderer = y_orderer_ii;
164
165        Ok(y)
166    }
167
168    fn set_welcome(
169        mut y: Self::State,
170        message: &Self::Message,
171    ) -> Result<Self::State, Self::Error> {
172        y.welcome_message = Some(message.clone());
173        Ok(y)
174    }
175
176    fn next_ready_message(
177        y: Self::State,
178    ) -> Result<(Self::State, Option<Self::Message>), Self::Error> {
179        // We have not joined the group yet, don't process any messages yet.
180        let Some(welcome) = y.welcome_message.clone() else {
181            return Ok((y, None));
182        };
183
184        let mut y_loop = y;
185        loop {
186            let (y_next, next_ready) = Orderer::take_next_ready(y_loop.orderer)?;
187            y_loop.orderer = y_next;
188
189            let message = next_ready.map(|id| {
190                y_loop
191                    .messages
192                    .get(&id)
193                    .expect("ids map consistently to messages")
194                    .to_owned()
195            });
196
197            if let Some(message) = message {
198                // Don't forward welcome message, it was already processed.
199                if message.id() == welcome.id() {
200                    continue;
201                }
202
203                // Control messages can be ignored if message is before our welcome. Concurrent
204                // messages need to be processed.
205                //
206                // This is a naive implementation where we assume that every member processed every
207                // control message after one round and where every message points at _every_
208                // previously-created message.
209                if let ForwardSecureMessageContent::Control { .. } = message.content()
210                    && welcome.previous.contains(&message.id())
211                {
212                    continue;
213                }
214
215                // Application messages can be ignored if before or concurrent to welcome.
216                if let ForwardSecureMessageContent::Application { .. } = message.content()
217                    && !message.previous.contains(&welcome.id())
218                {
219                    continue;
220                }
221
222                return Ok((y_loop, Some(message)));
223            } else {
224                return Ok((y_loop, None));
225            }
226        }
227    }
228}
229
230#[derive(Clone, Debug, Serialize, Deserialize)]
231pub struct TestMessage<DGM>
232where
233    DGM: Clone + AckedGroupMembership<MemberId, MessageId>,
234{
235    seq: usize,
236    sender: usize,
237    previous: Vec<MessageId>,
238    content: TestMessageContent<DGM>,
239}
240
241#[derive(Clone, Debug, Serialize, Deserialize)]
242pub enum TestMessageContent<DGM>
243where
244    DGM: Clone + AckedGroupMembership<MemberId, MessageId>,
245{
246    Application {
247        ciphertext: Vec<u8>,
248        generation: Generation,
249    },
250    System {
251        control_message: ControlMessage<MemberId, MessageId>,
252        direct_messages: Vec<DirectMessage<MemberId, MessageId, DGM>>,
253    },
254}
255
256impl<DGM> ForwardSecureGroupMessage<MemberId, MessageId, DGM> for TestMessage<DGM>
257where
258    DGM: Clone + AckedGroupMembership<MemberId, MessageId>,
259{
260    fn id(&self) -> MessageId {
261        MessageId {
262            sender: self.sender,
263            seq: self.seq,
264        }
265    }
266
267    fn sender(&self) -> MemberId {
268        self.sender
269    }
270
271    fn content(&self) -> ForwardSecureMessageContent<MemberId, MessageId> {
272        match &self.content {
273            TestMessageContent::Application {
274                ciphertext,
275                generation,
276            } => ForwardSecureMessageContent::Application {
277                ciphertext: ciphertext.to_owned(),
278                generation: *generation,
279            },
280            TestMessageContent::System {
281                control_message, ..
282            } => ForwardSecureMessageContent::Control(control_message.to_owned()),
283        }
284    }
285
286    fn direct_messages(&self) -> Vec<DirectMessage<MemberId, MessageId, DGM>> {
287        match &self.content {
288            TestMessageContent::Application { .. } => Vec::new(),
289            TestMessageContent::System {
290                direct_messages, ..
291            } => direct_messages.clone(),
292        }
293    }
294}
295
296#[derive(Debug, Error)]
297pub enum ForwardSecureOrdererError {
298    #[error(transparent)]
299    Orderer(#[from] OrdererError),
300}