1#![allow(clippy::type_complexity)]
6use std::collections::{HashMap, HashSet};
7use std::hash::Hash as StdHash;
8use std::marker::PhantomData;
9
10use serde::{Deserialize, Serialize};
11use thiserror::Error;
12
13use crate::crypto::Rng;
14use crate::crypto::aead::AeadError;
15use crate::key_bundle::OneTimeKeyBundle;
16use crate::message_scheme::dcgka::{
17 ControlMessage, Dcgka, DcgkaError, DcgkaState, DirectMessage, OperationOutput, ProcessInput,
18};
19use crate::message_scheme::message::{decrypt_message, encrypt_message};
20use crate::message_scheme::ratchet::{
21 DecryptionRatchet, DecryptionRatchetState, Generation, RatchetError, RatchetSecret,
22 RatchetSecretState,
23};
24use crate::traits::{
25 AckedGroupMembership, ForwardSecureGroupMessage, ForwardSecureMessageContent,
26 ForwardSecureOrdering, IdentityHandle, IdentityManager, IdentityRegistry, OperationId,
27 PreKeyManager, PreKeyRegistry,
28};
29
/// Namespace type for all group operations.
///
/// It holds no data of its own: every associated function takes a [`GroupState`] by value and
/// hands the updated state back to the caller together with its result.
pub struct MessageGroup<ID, OP, PKI, DGM, KMG, ORD> {
    /// Binds the type parameters, which are otherwise unused by the struct itself.
    _marker: PhantomData<(ID, OP, PKI, DGM, KMG, ORD)>,
}
34
35#[derive(Debug, Serialize, Deserialize)]
37#[cfg_attr(any(test, feature = "test_utils"), derive(Clone))]
38pub struct GroupState<ID, OP, PKI, DGM, KMG, ORD>
39where
40 ID: IdentityHandle,
41 OP: OperationId,
42 PKI: IdentityRegistry<ID, PKI::State> + PreKeyRegistry<ID, OneTimeKeyBundle>,
43 PKI::State: Clone,
44 DGM: AckedGroupMembership<ID, OP>,
45 KMG: IdentityManager<KMG::State> + PreKeyManager,
46 KMG::State: Clone,
47 ORD: ForwardSecureOrdering<ID, OP, DGM>,
48{
49 pub(crate) my_id: ID,
50 pub(crate) dcgka: DcgkaState<ID, OP, PKI, DGM, KMG>,
51 pub(crate) orderer: ORD::State,
52 pub(crate) welcome: Option<ORD::Message>,
53 pub(crate) ratchet: Option<RatchetSecretState>,
54 pub(crate) decryption_ratchet: HashMap<ID, DecryptionRatchetState>,
55 pub(crate) config: GroupConfig,
56}
57
58impl<ID, OP, PKI, DGM, KMG, ORD> MessageGroup<ID, OP, PKI, DGM, KMG, ORD>
59where
60 ID: IdentityHandle,
61 OP: OperationId,
62 PKI: IdentityRegistry<ID, PKI::State> + PreKeyRegistry<ID, OneTimeKeyBundle>,
63 PKI::State: Clone,
64 DGM: AckedGroupMembership<ID, OP>,
65 KMG: IdentityManager<KMG::State> + PreKeyManager,
66 KMG::State: Clone,
67 ORD: ForwardSecureOrdering<ID, OP, DGM>,
68{
69 pub fn init(
73 my_id: ID,
74 my_keys: KMG::State,
75 pki: PKI::State,
76 dgm: DGM::State,
77 orderer: ORD::State,
78 config: GroupConfig,
79 ) -> GroupState<ID, OP, PKI, DGM, KMG, ORD> {
80 GroupState {
81 my_id,
82 dcgka: Dcgka::init(my_id, my_keys, pki, dgm),
83 orderer,
84 welcome: None,
85 ratchet: None,
86 decryption_ratchet: HashMap::new(),
87 config,
88 }
89 }
90
91 pub fn create(
93 mut y: GroupState<ID, OP, PKI, DGM, KMG, ORD>,
94 initial_members: Vec<ID>,
95 rng: &Rng,
96 ) -> GroupResult<ORD::Message, ID, OP, PKI, DGM, KMG, ORD> {
97 if y.ratchet.is_some() {
100 return Err(GroupError::GroupAlreadyEstablished);
101 }
102
103 let (y_dcgka_i, pre) = Dcgka::create(y.dcgka, initial_members, rng)?;
105 y.dcgka = y_dcgka_i;
106
107 let (mut y_i, message) = Self::process_local(y, pre, rng)?;
108
109 let y_orderer_i = ORD::set_welcome(y_i.orderer, &message).map_err(GroupError::Orderer)?;
111 y_i.orderer = y_orderer_i;
112
113 Ok((y_i, message))
114 }
115
116 pub fn add(
118 mut y: GroupState<ID, OP, PKI, DGM, KMG, ORD>,
119 added: ID,
120 rng: &Rng,
121 ) -> GroupResult<ORD::Message, ID, OP, PKI, DGM, KMG, ORD> {
122 if y.ratchet.is_none() {
123 return Err(GroupError::GroupNotYetEstablished);
124 }
125
126 if y.my_id == added {
127 return Err(GroupError::NotAddOurselves);
128 }
129
130 if Self::members(&y)?.contains(&added) {
131 return Err(GroupError::AddedExistsAlready(added));
132 }
133
134 let (y_dcgka_i, pre) = Dcgka::add(y.dcgka, added, rng)?;
136 y.dcgka = y_dcgka_i;
137
138 Self::process_local(y, pre, rng)
139 }
140
141 pub fn remove(
143 mut y: GroupState<ID, OP, PKI, DGM, KMG, ORD>,
144 removed: ID,
145 rng: &Rng,
146 ) -> GroupResult<ORD::Message, ID, OP, PKI, DGM, KMG, ORD> {
147 if y.ratchet.is_none() {
148 return Err(GroupError::GroupNotYetEstablished);
149 }
150
151 if !Self::members(&y)?.contains(&removed) {
152 return Err(GroupError::InexistentRemovedMember(removed));
153 }
154
155 let (y_dcgka_i, pre) = Dcgka::remove(y.dcgka, removed, rng)?;
157 y.dcgka = y_dcgka_i;
158
159 Self::process_local(y, pre, rng)
160 }
161
162 pub fn update(
164 mut y: GroupState<ID, OP, PKI, DGM, KMG, ORD>,
165 rng: &Rng,
166 ) -> GroupResult<ORD::Message, ID, OP, PKI, DGM, KMG, ORD> {
167 if y.ratchet.is_none() {
168 return Err(GroupError::GroupNotYetEstablished);
169 }
170
171 let (y_dcgka_i, pre) = Dcgka::update(y.dcgka, rng)?;
173 y.dcgka = y_dcgka_i;
174
175 Self::process_local(y, pre, rng)
176 }
177
178 pub fn receive(
186 mut y: GroupState<ID, OP, PKI, DGM, KMG, ORD>,
187 message: &ORD::Message,
188 rng: &Rng,
189 ) -> GroupResult<Option<GroupOutput<ID, OP, DGM, ORD>>, ID, OP, PKI, DGM, KMG, ORD> {
190 let members_pre = Self::members(&y)?;
193
194 let message_type = message.content();
195 let is_established = y.ratchet.is_some();
196 let mut is_create_or_welcome = false;
197
198 if let ForwardSecureMessageContent::Control(ControlMessage::Create {
201 ref initial_members,
202 }) = message_type
203 {
204 if is_established {
205 return Err(GroupError::GroupAlreadyEstablished);
206 }
207
208 if initial_members.contains(&y.my_id) {
209 is_create_or_welcome = true;
210 }
211 }
212
213 if let ForwardSecureMessageContent::Control(ControlMessage::Add { added }) = message_type
215 && !is_established
216 && added == y.my_id
217 {
218 is_create_or_welcome = true;
219 }
220
221 let y_orderer_i = ORD::queue(y.orderer, message).map_err(GroupError::Orderer)?;
222 y.orderer = y_orderer_i;
223
224 if !is_established && !is_create_or_welcome {
225 return Ok((y, None));
229 }
230
231 if !is_established && is_create_or_welcome {
232 let y_orderer_i = ORD::set_welcome(y.orderer, message).map_err(GroupError::Orderer)?;
235 y.orderer = y_orderer_i;
236
237 y.welcome = Some(message.clone());
239
240 let (y_i, result) = Self::process_ready(y, message, rng)?;
242
243 let members_post = Self::members(&y_i)?;
244
245 return Ok((
246 y_i,
247 result.map(|output| GroupOutput::new(vec![output], members_pre, members_post)),
248 ));
249 }
250
251 let mut events = Vec::new();
252 let mut y_loop = y;
253
254 loop {
256 let (y_orderer_next, result) =
257 ORD::next_ready_message(y_loop.orderer).map_err(GroupError::Orderer)?;
258 y_loop.orderer = y_orderer_next;
259
260 let Some(message) = result else {
261 break;
264 };
265
266 if let Some(welcome) = &y_loop.welcome {
267 if welcome.id() == message.id() {
269 continue;
270 }
271 }
272
273 let (y_next, result) = Self::process_ready(y_loop, &message, rng)?;
274 y_loop = y_next;
275 if let Some(message) = result {
276 events.push(message);
277 }
278 }
279
280 let members_post = Self::members(&y_loop)?;
281
282 Ok((
283 y_loop,
284 Some(GroupOutput::new(events, members_pre, members_post)),
285 ))
286 }
287
288 pub fn send(
293 mut y: GroupState<ID, OP, PKI, DGM, KMG, ORD>,
294 plaintext: &[u8],
295 ) -> GroupResult<ORD::Message, ID, OP, PKI, DGM, KMG, ORD> {
296 let Some(y_ratchet) = y.ratchet else {
297 return Err(GroupError::GroupNotYetEstablished);
298 };
299
300 let (y_ratchet_i, generation, ciphertext) = Self::encrypt(y_ratchet, plaintext)?;
302 y.ratchet = Some(y_ratchet_i);
303
304 let (y_orderer_i, message) =
306 ORD::next_application_message(y.orderer, generation, ciphertext)
307 .map_err(GroupError::Orderer)?;
308 y.orderer = y_orderer_i;
309
310 Ok((y, message))
311 }
312
313 pub fn members(
315 y: &GroupState<ID, OP, PKI, DGM, KMG, ORD>,
316 ) -> Result<HashSet<ID>, GroupError<ID, OP, PKI, DGM, KMG, ORD>> {
317 let members = Dcgka::member_view(&y.dcgka, &y.my_id)?;
318 Ok(members)
319 }
320
321 fn process_local(
323 mut y: GroupState<ID, OP, PKI, DGM, KMG, ORD>,
324 output: OperationOutput<ID, OP, DGM>,
325 rng: &Rng,
326 ) -> GroupResult<ORD::Message, ID, OP, PKI, DGM, KMG, ORD> {
327 let (y_orderer_i, message) =
329 ORD::next_control_message(y.orderer, &output.control_message, &output.direct_messages)
330 .map_err(GroupError::Orderer)?;
331 y.orderer = y_orderer_i;
332
333 let (y_dcgka_i, process) = Dcgka::process_local(y.dcgka, message.id(), output, rng)?;
335 y.dcgka = y_dcgka_i;
336
337 y.ratchet = Some(RatchetSecret::init(
339 process
340 .me_update_secret
341 .expect("local operation always yields an update secret for us")
342 .into(),
343 ));
344
345 Ok((y, message))
346 }
347
348 fn process_ready(
350 y: GroupState<ID, OP, PKI, DGM, KMG, ORD>,
351 message: &ORD::Message,
352 rng: &Rng,
353 ) -> GroupResult<Option<GroupEvent<ID, OP, DGM, ORD>>, ID, OP, PKI, DGM, KMG, ORD> {
354 match message.content() {
355 ForwardSecureMessageContent::Control(control_message) => {
356 let direct_message = message
357 .direct_messages()
358 .into_iter()
359 .find(|dm| dm.recipient == y.my_id);
360
361 let (y_i, output) = Self::process_remote(
362 y,
363 message.id(),
364 message.sender(),
365 control_message,
366 direct_message,
367 rng,
368 )?;
369
370 let is_removed = !Self::members(&y_i)?.contains(&y_i.my_id);
372 if is_removed {
373 Ok((y_i, Some(GroupEvent::RemovedOurselves)))
374 } else {
375 Ok((y_i, output.map(|msg| GroupEvent::Control(msg))))
376 }
377 }
378 ForwardSecureMessageContent::Application {
379 ciphertext,
380 generation,
381 } => {
382 let (y_i, plaintext) = Self::decrypt(y, message.sender(), ciphertext, generation)?;
383 Ok((
384 y_i,
385 Some(GroupEvent::Application {
386 plaintext,
387 message_id: message.id(),
388 }),
389 ))
390 }
391 }
392 }
393
394 fn process_remote(
396 mut y: GroupState<ID, OP, PKI, DGM, KMG, ORD>,
397 seq: OP,
398 sender: ID,
399 control_message: ControlMessage<ID, OP>,
400 direct_message: Option<DirectMessage<ID, OP, DGM>>,
401 rng: &Rng,
402 ) -> GroupResult<Option<ORD::Message>, ID, OP, PKI, DGM, KMG, ORD> {
403 let (y_dcgka_i, output) = Dcgka::process_remote(
404 y.dcgka,
405 ProcessInput {
406 seq,
407 sender,
408 control_message,
409 direct_message,
410 },
411 rng,
412 )?;
413 y.dcgka = y_dcgka_i;
414
415 if let Some(me_update_secret) = output.me_update_secret {
417 y.ratchet = Some(RatchetSecret::init(me_update_secret.into()));
418 }
419
420 if let Some(sender_update_secret) = output.sender_update_secret {
422 y.decryption_ratchet
423 .insert(sender, DecryptionRatchet::init(sender_update_secret.into()));
424 }
425
426 if let Some(output_control_message) = output.control_message {
427 let (y_orderer_i, output_message) = ORD::next_control_message(
429 y.orderer,
430 &output_control_message,
431 &output.direct_messages,
432 )
433 .map_err(GroupError::Orderer)?;
434 y.orderer = y_orderer_i;
435 Ok((y, Some(output_message)))
436 } else {
437 Ok((y, None))
438 }
439 }
440
441 fn encrypt(
443 y_ratchet: RatchetSecretState,
444 plaintext: &[u8],
445 ) -> Result<(RatchetSecretState, Generation, Vec<u8>), GroupError<ID, OP, PKI, DGM, KMG, ORD>>
446 {
447 let (y_ratchet_i, generation, key_material) =
449 RatchetSecret::ratchet_forward(y_ratchet).map_err(GroupError::EncryptionRatchet)?;
450
451 let ciphertext = encrypt_message(plaintext, key_material)?;
453
454 Ok((y_ratchet_i, generation, ciphertext))
455 }
456
457 fn decrypt(
459 mut y: GroupState<ID, OP, PKI, DGM, KMG, ORD>,
460 sender: ID,
461 ciphertext: Vec<u8>,
462 generation: Generation,
463 ) -> GroupResult<Vec<u8>, ID, OP, PKI, DGM, KMG, ORD> {
464 let Some(y_decryption_ratchet) = y.decryption_ratchet.remove(&sender) else {
465 return Err(GroupError::DecryptionRachetUnavailable(sender, generation));
466 };
467
468 let (y_decryption_ratchet_i, key_material) = DecryptionRatchet::secret_for_decryption(
470 y_decryption_ratchet,
471 generation,
472 y.config.maximum_forward_distance,
473 y.config.out_of_order_tolerance,
474 )
475 .map_err(GroupError::DecryptionRatchet)?;
476 y.decryption_ratchet.insert(sender, y_decryption_ratchet_i);
477
478 let plaintext = decrypt_message(&ciphertext, key_material)?;
480
481 Ok((y, plaintext))
482 }
483}
484
485pub type GroupResult<T, ID, OP, PKI, DGM, KMG, ORD> =
486 Result<(GroupState<ID, OP, PKI, DGM, KMG, ORD>, T), GroupError<ID, OP, PKI, DGM, KMG, ORD>>;
487
488#[derive(Clone, Default)]
490pub struct GroupOutput<ID, OP, DGM, ORD>
491where
492 ID: Clone + Eq + PartialEq + StdHash,
493 DGM: AckedGroupMembership<ID, OP>,
494 ORD: ForwardSecureOrdering<ID, OP, DGM>,
495{
496 pub events: Vec<GroupEvent<ID, OP, DGM, ORD>>,
497 pub removed_members: HashSet<ID>,
498 pub added_members: HashSet<ID>,
499}
500
501impl<ID, OP, DGM, ORD> GroupOutput<ID, OP, DGM, ORD>
502where
503 ID: Clone + Eq + PartialEq + StdHash,
504 DGM: AckedGroupMembership<ID, OP>,
505 ORD: ForwardSecureOrdering<ID, OP, DGM>,
506{
507 pub(crate) fn new(
508 events: Vec<GroupEvent<ID, OP, DGM, ORD>>,
509 pre_members: HashSet<ID>,
510 post_members: HashSet<ID>,
511 ) -> Self {
512 let added_members = post_members.difference(&pre_members).cloned().collect();
513 let removed_members = pre_members.difference(&post_members).cloned().collect();
514
515 GroupOutput {
516 events,
517 removed_members,
518 added_members,
519 }
520 }
521}
522
523#[derive(Clone, Debug, PartialEq, Eq)]
525pub enum GroupEvent<ID, OP, DGM, ORD>
526where
527 DGM: AckedGroupMembership<ID, OP>,
528 ORD: ForwardSecureOrdering<ID, OP, DGM>,
529{
530 Control(ORD::Message),
532
533 Application { plaintext: Vec<u8>, message_id: OP },
535
536 RemovedOurselves,
538}
539
540#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
542pub struct GroupConfig {
543 pub maximum_forward_distance: u32,
546
547 pub out_of_order_tolerance: u32,
552}
553
554impl Default for GroupConfig {
555 fn default() -> Self {
556 Self {
557 maximum_forward_distance: 1000,
558 out_of_order_tolerance: 100,
559 }
560 }
561}
562
563#[derive(Debug, Error)]
564pub enum GroupError<ID, OP, PKI, DGM, KMG, ORD>
565where
566 PKI: IdentityRegistry<ID, PKI::State> + PreKeyRegistry<ID, OneTimeKeyBundle>,
567 DGM: AckedGroupMembership<ID, OP>,
568 KMG: PreKeyManager,
569 ORD: ForwardSecureOrdering<ID, OP, DGM>,
570{
571 #[error(transparent)]
572 Dcgka(#[from] DcgkaError<ID, OP, PKI, DGM, KMG>),
573
574 #[error(transparent)]
575 Orderer(ORD::Error),
576
577 #[error(transparent)]
578 EncryptionRatchet(RatchetError),
579
580 #[error(transparent)]
581 DecryptionRatchet(RatchetError),
582
583 #[error(transparent)]
584 Aead(#[from] AeadError),
585
586 #[error("creating or joining a group is not possible, state is already established")]
587 GroupAlreadyEstablished,
588
589 #[error("state is not ready yet, group needs to be created or joined first")]
590 GroupNotYetEstablished,
591
592 #[error("can not add ourselves to the group")]
593 NotAddOurselves,
594
595 #[error("received \"create\" or \"add\" message addressing us but no direct message attached")]
596 DirectMessageMissing,
597
598 #[error("decryption ratchet not established yet to process the message from {0} @{1}")]
599 DecryptionRachetUnavailable(ID, Generation),
600
601 #[error("to-be-added member {0} is already part of the group")]
602 AddedExistsAlready(ID),
603
604 #[error("to-be-removed member {0} is not part of the group")]
605 InexistentRemovedMember(ID),
606}
607
#[cfg(test)]
mod tests {
    use crate::crypto::Rng;
    use crate::message_scheme::test_utils::network::Network;

    /// Two members: group creation followed by a single application message.
    #[test]
    fn simple_group() {
        let (alice, bob) = (0, 1);

        let mut network = Network::new([alice, bob], Rng::from_seed([1; 32]));

        // Alice creates a group with Bob as initial member.
        network.create(alice, vec![bob]);

        // Only control messages were exchanged so far.
        let results = network.process();
        assert!(
            results.is_empty(),
            "no decrypted application messages expected"
        );

        // Both sides agree on the members of the group.
        for member in [alice, bob] {
            assert_eq!(network.members(&member), vec![alice, bob]);
        }

        // Alice sends a message which Bob decrypts.
        network.send(alice, b"Hello everyone!");
        assert_eq!(
            network.process(),
            vec![(alice, bob, b"Hello everyone!".to_vec())],
        );
    }

    /// A member added later on can decrypt messages sent after joining.
    #[test]
    fn welcome() {
        let (alice, bob, charlie) = (0, 1, 2);

        let mut network = Network::new([alice, bob, charlie], Rng::from_seed([1; 32]));

        // Alice creates a group with Bob as initial member.
        network.create(alice, vec![bob]);
        network.process();

        // Bob updates the group.
        network.update(bob);
        network.process();

        // Bob sends a message which Alice decrypts.
        network.send(bob, b"Huhu");
        assert_eq!(network.process(), vec![(bob, alice, b"Huhu".to_vec())],);

        // Bob adds Charlie.
        network.add(bob, charlie);
        network.process();

        // Alice sends a message which both Bob and Charlie decrypt.
        network.send(alice, b"Hello everyone!");
        assert_eq!(
            network.process(),
            vec![
                (alice, bob, b"Hello everyone!".to_vec()),
                (alice, charlie, b"Hello everyone!".to_vec()),
            ],
        );
    }

    /// An update and an add issued before either is delivered still lead to a working group.
    #[test]
    fn concurrency() {
        let (alice, bob, charlie) = (0, 1, 2);

        let mut network = Network::new([alice, bob, charlie], Rng::from_seed([1; 32]));

        // Alice creates a group with Bob as initial member.
        network.create(alice, vec![bob]);
        network.process();

        // Bob updates the group and Alice adds Charlie before the network delivers anything.
        network.update(bob);
        network.add(alice, charlie);
        network.process();

        // Bob sends a message which both Alice and Charlie decrypt.
        network.send(bob, b"Hello everyone!");
        assert_eq!(
            network.process(),
            vec![
                (bob, alice, b"Hello everyone!".to_vec()),
                (bob, charlie, b"Hello everyone!".to_vec()),
            ],
        );
    }

    /// Removing another member and removing ourselves both run through.
    #[test]
    fn removal() {
        let (alice, bob, charlie) = (0, 1, 2);

        let mut network = Network::new([alice, bob, charlie], Rng::from_seed([1; 32]));

        network.create(alice, vec![alice, bob, charlie]);
        network.process();

        // Alice removes Bob.
        network.remove(alice, bob);
        network.process();

        // Charlie removes themselves.
        network.remove(charlie, charlie);
        network.process();
    }
}