p2panda_encryption/data_scheme/test_utils/
ordering.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3use std::collections::HashMap;
4use std::marker::PhantomData;
5
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
9use crate::crypto::xchacha20::XAeadNonce;
10use crate::data_scheme::{ControlMessage, DirectMessage, GroupSecretId};
11use crate::ordering::{Orderer, OrdererError, OrdererState};
12use crate::test_utils::{MemberId, MessageId};
13use crate::traits::{GroupMembership, GroupMessage, GroupMessageContent, Ordering};
14
/// Message orderer used when testing the "data encryption" group APIs.
///
/// All messages are kept in memory, which is sufficient for the current testing setup. Anything
/// "production ready" will require a more sophisticated solution.
///
/// The type itself stores no data (only a marker for `DGM`); the actual orderer state lives in
/// [`MessageOrdererState`].
#[derive(Debug, Clone)]
pub struct MessageOrderer<DGM> {
    _marker: PhantomData<DGM>,
}
23
24impl<DGM> MessageOrderer<DGM>
25where
26    DGM: Clone + GroupMembership<MemberId, MessageId>,
27{
28    pub fn init(my_id: MemberId) -> MessageOrdererState<DGM> {
29        MessageOrdererState {
30            next_message_seq: 0,
31            previous: HashMap::new(),
32            orderer: Orderer::init(),
33            my_id,
34            messages: HashMap::new(),
35            welcome_message: None,
36        }
37    }
38}
39
40#[derive(Clone, Debug, Serialize, Deserialize)]
41pub struct MessageOrdererState<DGM>
42where
43    DGM: Clone + GroupMembership<MemberId, MessageId>,
44{
45    /// Sequence number of the next, message to-be published.
46    next_message_seq: usize,
47
48    /// Our own member id.
49    my_id: MemberId,
50
51    /// Internal helper to order messages based on their "previous" dependencies.
52    orderer: OrdererState<MessageId>,
53
54    /// Latest known message id's from each group member. This is the "head" of the DAG.
55    previous: HashMap<MemberId, MessageId>,
56
57    /// In-memory store of all messages.
58    messages: HashMap<MessageId, TestMessage<DGM>>,
59
60    /// "Create" or "Add" message which got us into the group.
61    welcome_message: Option<TestMessage<DGM>>,
62}
63
64impl<DGM> Ordering<MemberId, MessageId, DGM> for MessageOrderer<DGM>
65where
66    DGM: std::fmt::Debug
67        + Clone
68        + GroupMembership<MemberId, MessageId>
69        + Serialize
70        + for<'a> Deserialize<'a>,
71{
72    type State = MessageOrdererState<DGM>;
73
74    type Error = MessageOrdererError;
75
76    type Message = TestMessage<DGM>;
77
78    fn next_control_message(
79        mut y: Self::State,
80        control_message: &ControlMessage<MemberId>,
81        direct_messages: &[DirectMessage<MemberId, MessageId, DGM>],
82    ) -> Result<(Self::State, Self::Message), Self::Error> {
83        let seq = y.next_message_seq;
84        let sender = y.my_id;
85        let previous = y.previous.values().cloned().collect();
86
87        let message = TestMessage {
88            seq,
89            sender,
90            previous,
91            content: TestMessageContent::System {
92                control_message: control_message.to_owned(),
93                direct_messages: direct_messages.to_owned(),
94            },
95        };
96
97        y.next_message_seq += 1;
98        y.previous.insert(y.my_id, message.id());
99
100        Ok((y, message))
101    }
102
103    fn next_application_message(
104        mut y: Self::State,
105        group_secret_id: GroupSecretId,
106        nonce: XAeadNonce,
107        ciphertext: Vec<u8>,
108    ) -> Result<(Self::State, Self::Message), Self::Error> {
109        let seq = y.next_message_seq;
110        let sender = y.my_id;
111        let previous = y.previous.values().cloned().collect();
112
113        let message = TestMessage {
114            seq,
115            sender,
116            previous,
117            content: TestMessageContent::Application {
118                ciphertext,
119                group_secret_id,
120                nonce,
121            },
122        };
123
124        y.next_message_seq += 1;
125
126        Ok((y, message))
127    }
128
129    fn queue(mut y: Self::State, message: &Self::Message) -> Result<Self::State, Self::Error> {
130        let id = message.id();
131
132        // TODO: We keep all messages in memory currently which is bad. This needs a persistence
133        // layer as soon as we've looked into how it all plays together with our access control and
134        // stream APIs.
135        y.messages.insert(id, message.clone());
136
137        let previous: Vec<MessageId> = message
138            .previous
139            .iter()
140            .filter(|id| id.sender != y.my_id)
141            .cloned()
142            .collect();
143
144        if !Orderer::ready(&y.orderer, &previous)? {
145            let (y_orderer_i, _) = Orderer::mark_pending(y.orderer, id, previous)?;
146            y.orderer = y_orderer_i;
147            return Ok(y);
148        }
149
150        let (y_orderer_i, _) = Orderer::mark_ready(y.orderer, id)?;
151        let y_orderer_ii = Orderer::process_pending(y_orderer_i, id)?;
152        y.orderer = y_orderer_ii;
153
154        Ok(y)
155    }
156
157    fn set_welcome(
158        mut y: Self::State,
159        message: &Self::Message,
160    ) -> Result<Self::State, Self::Error> {
161        y.welcome_message = Some(message.clone());
162        Ok(y)
163    }
164
165    fn next_ready_message(
166        mut y: Self::State,
167    ) -> Result<(Self::State, Option<Self::Message>), Self::Error> {
168        // We have not joined the group yet, don't process any messages yet.
169        if y.welcome_message.is_none() {
170            return Ok((y, None));
171        };
172
173        let (y_orderer_i, next_ready) = Orderer::take_next_ready(y.orderer)?;
174        y.orderer = y_orderer_i;
175
176        let message = next_ready.map(|id| {
177            y.messages
178                .get(&id)
179                .expect("ids map consistently to messages")
180                .to_owned()
181        });
182
183        if let Some(ref message) = message
184            && let GroupMessageContent::Control(_) = message.content()
185        {
186            // Mark messages as "last seen" so we can mention the "previous" ones as soon
187            // as we publish a message ourselves.
188            y.previous.insert(message.sender(), message.id());
189        }
190
191        Ok((y, message))
192    }
193}
194
195#[derive(Clone, Debug, Serialize, Deserialize)]
196pub struct TestMessage<DGM>
197where
198    DGM: Clone + GroupMembership<MemberId, MessageId>,
199{
200    seq: usize,
201    sender: usize,
202    previous: Vec<MessageId>,
203    content: TestMessageContent<DGM>,
204}
205
206#[derive(Clone, Debug, Serialize, Deserialize)]
207pub enum TestMessageContent<DGM>
208where
209    DGM: Clone + GroupMembership<MemberId, MessageId>,
210{
211    Application {
212        ciphertext: Vec<u8>,
213        group_secret_id: GroupSecretId,
214        nonce: XAeadNonce,
215    },
216    System {
217        control_message: ControlMessage<MemberId>,
218        direct_messages: Vec<DirectMessage<MemberId, MessageId, DGM>>,
219    },
220}
221
222impl<DGM> GroupMessage<MemberId, MessageId, DGM> for TestMessage<DGM>
223where
224    DGM: Clone + GroupMembership<MemberId, MessageId>,
225{
226    fn id(&self) -> MessageId {
227        MessageId {
228            sender: self.sender,
229            seq: self.seq,
230        }
231    }
232
233    fn sender(&self) -> MemberId {
234        self.sender
235    }
236
237    fn content(&self) -> GroupMessageContent<MemberId> {
238        match &self.content {
239            TestMessageContent::Application {
240                ciphertext,
241                group_secret_id,
242                nonce,
243            } => GroupMessageContent::Application {
244                group_secret_id: *group_secret_id,
245                nonce: *nonce,
246                ciphertext: ciphertext.to_vec(),
247            },
248            TestMessageContent::System {
249                control_message, ..
250            } => GroupMessageContent::Control(control_message.clone()),
251        }
252    }
253
254    fn direct_messages(&self) -> Vec<DirectMessage<MemberId, MessageId, DGM>> {
255        match &self.content {
256            TestMessageContent::Application { .. } => Vec::new(),
257            TestMessageContent::System {
258                direct_messages, ..
259            } => direct_messages.clone(),
260        }
261    }
262}
263
264#[derive(Debug, Error)]
265pub enum MessageOrdererError {
266    #[error(transparent)]
267    Orderer(#[from] OrdererError),
268}