1use gbp::{CodecError, ControlMessage, ErrorObject, GbpFrame};
23use gbp_core::{
24 ControlOpcode, ErrorClass, GbpFlags, GroupId, MemberId, NodeState, PayloadCodec, SequenceNo,
25 StreamId, StreamType, TransitionId, TransitionState, codes, errors::ErrorSpec, timeouts,
26};
27use gbp_mls::{MlsError, label_for};
28use std::collections::HashMap;
29use std::time::{Duration, Instant};
30
31#[derive(Debug, thiserror::Error)]
33pub enum NodeError {
34 #[error("codec: {0}")]
36 Codec(#[from] CodecError),
37 #[error("mls: {0}")]
39 Mls(#[from] MlsError),
40 #[error("invalid state: {0}")]
42 InvalidState(String),
43}
44
45pub struct OutboundFrame {
47 pub to: MemberId,
49 pub wire: Vec<u8>,
51}
52
53#[derive(Debug, Clone)]
55pub struct DeliveredPayload {
56 pub stream_type: StreamType,
58 pub stream_id: StreamId,
61 pub sequence_no: SequenceNo,
63 pub flags: u16,
65 pub plaintext: Vec<u8>,
67 pub codec: PayloadCodec,
69}
70
71#[derive(Debug, Clone)]
73pub enum Event {
74 StateChanged {
76 from: NodeState,
78 to: NodeState,
80 },
81 PayloadReceived(DeliveredPayload),
85 Control {
87 from: MemberId,
89 opcode: ControlOpcode,
91 transition_id: TransitionId,
93 request_id: u32,
95 args: Vec<u8>,
98 },
99 Error {
101 code: u16,
103 class: ErrorClass,
105 retryable: bool,
107 fatal: bool,
109 reason: String,
111 },
112 EpochAdvanced {
114 epoch: u64,
116 transition_id: TransitionId,
118 },
119 CoordinatorElectionNeeded,
123 BecameCoordinator,
126 CoordinatorClaim {
128 claimant: MemberId,
130 },
131}
132
133pub struct GroupNode {
140 pub member_id: MemberId,
142 pub is_coordinator: bool,
144 pub group_id: GroupId,
146 pub current_epoch: u64,
149 pub last_transition_id: TransitionId,
151 pub pending_transition_id: TransitionId,
153 pub state: NodeState,
155 pub transition_state: TransitionState,
157
158 out_seq: HashMap<(StreamType, StreamId), SequenceNo>,
159 in_hw: HashMap<(StreamType, StreamId), SequenceNo>,
160 events: Vec<Event>,
161
162 pending_commit_sender: Option<MemberId>,
166 prepare_deadline: Option<Instant>,
169 execute_deadline: Option<Instant>,
172 coordinator_last_seen: Option<Instant>,
175}
176
177impl GroupNode {
178 pub fn new(member_id: MemberId, group_id: GroupId) -> Self {
180 Self {
181 member_id,
182 group_id,
183 is_coordinator: false,
184 current_epoch: 0,
185 last_transition_id: 0,
186 pending_transition_id: 0,
187 state: NodeState::Idle,
188 transition_state: TransitionState::TIdle,
189 out_seq: HashMap::new(),
190 in_hw: HashMap::new(),
191 events: Vec::new(),
192 pending_commit_sender: None,
193 prepare_deadline: None,
194 execute_deadline: None,
195 coordinator_last_seen: None,
196 }
197 }
198
199 pub fn bootstrap_as_creator(&mut self, epoch: u64) {
201 self.transition(NodeState::Connecting);
202 self.transition(NodeState::EstablishingGroup);
203 self.current_epoch = epoch;
204 self.transition(NodeState::Active);
205 }
206
207 pub fn bootstrap_as_joiner(&mut self, epoch: u64, expected_first_tid: u32) {
216 self.transition(NodeState::Connecting);
217 self.transition(NodeState::EstablishingGroup);
218 self.current_epoch = epoch;
219 if expected_first_tid > 0 {
220 self.pending_transition_id = expected_first_tid;
221 self.transition_state = TransitionState::TPrepared;
222 }
223 self.transition(NodeState::Active);
224 }
225
226 pub fn drain_events(&mut self) -> Vec<Event> {
228 std::mem::take(&mut self.events)
229 }
230
231 pub fn member_stream_id(&self, base: u32) -> StreamId {
236 debug_assert!(
237 self.member_id < 1_000_000,
238 "member_id overflow: {0}",
239 self.member_id
240 );
241 base + self.member_id * 100
242 }
243
244 pub fn send_payload<S: Sealer>(
251 &mut self,
252 seal: &mut S,
253 target: MemberId,
254 stream_type: StreamType,
255 stream_id: StreamId,
256 flags: u16,
257 plaintext: &[u8],
258 codec: PayloadCodec,
259 ) -> Result<OutboundFrame, NodeError> {
260 self.assert_can_send()?;
261 let seq = self.next_seq(stream_type, stream_id);
262 let ciphertext = seal.seal(stream_type, seq, plaintext)?;
263 let frame = GbpFrame::new(
264 self.group_id,
265 self.current_epoch,
266 self.last_transition_id,
267 stream_type,
268 stream_id,
269 flags,
270 seq,
271 ciphertext,
272 codec.as_u8(),
273 );
274 Ok(OutboundFrame {
275 to: target,
276 wire: frame.to_cbor(),
277 })
278 }
279
280 pub fn send_control<S: Sealer>(
290 &mut self,
291 seal: &mut S,
292 target: MemberId,
293 opcode: ControlOpcode,
294 transition_id: TransitionId,
295 request_id: u32,
296 args: Vec<u8>,
297 ) -> Result<OutboundFrame, NodeError> {
298 let ctl = ControlMessage::with_args(
299 opcode as u16,
300 request_id,
301 self.member_id,
302 transition_id,
303 args,
304 );
305 let mut flags = GbpFlags::ordered_reliable_system();
306 if matches!(
307 opcode,
308 ControlOpcode::PrepareTransition
309 | ControlOpcode::ReadyForTransition
310 | ControlOpcode::ExecuteTransition
311 ) {
312 flags |= GbpFlags::CRITICAL;
313 }
314 match opcode {
318 ControlOpcode::PrepareTransition => {
319 self.pending_transition_id = transition_id;
320 self.transition_state = TransitionState::TPrepared;
321 self.prepare_deadline =
322 Some(Instant::now() + Duration::from_millis(timeouts::T_PREPARE_MAX_MS));
323 self.execute_deadline = None;
324 }
325 ControlOpcode::ReadyForTransition => {
326 self.execute_deadline =
327 Some(Instant::now() + Duration::from_millis(timeouts::T_EXECUTE_MAX_MS));
328 }
329 ControlOpcode::ExecuteTransition | ControlOpcode::AbortTransition => {
330 self.prepare_deadline = None;
331 self.execute_deadline = None;
332 if opcode == ControlOpcode::AbortTransition {
333 self.pending_transition_id = 0;
334 self.transition_state = TransitionState::TAborted;
335 }
336 }
337 _ => {}
338 }
339 let stream_id = self.member_stream_id(0);
340 self.send_payload(
341 seal,
342 target,
343 StreamType::Control,
344 stream_id,
345 flags,
346 &ctl.to_cbor(),
347 PayloadCodec::Cbor,
348 )
349 }
350
351 pub fn on_wire<S: Sealer>(
362 &mut self,
363 seal: &mut S,
364 wire: &[u8],
365 ) -> Result<Vec<Event>, NodeError> {
366 let frame = match GbpFrame::decode(wire) {
371 Ok(f) => f,
372 Err(e) => {
373 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, format!("frame decode: {e}"));
374 return Ok(self.drain_events());
375 }
376 };
377 self.deliver_frame(seal, frame)?;
378 Ok(self.drain_events())
379 }
380
381 fn deliver_frame<S: Sealer>(&mut self, seal: &mut S, frame: GbpFrame) -> Result<(), NodeError> {
382 if frame.version != 1 {
385 self.emit_err_spec(codes::UNSUPPORTED_VERSION, "version != 1");
386 return Ok(());
387 }
388 if frame.group_id_array() != self.group_id {
389 self.emit_err_spec(codes::UNKNOWN_GROUP, "group_id");
390 return Ok(());
391 }
392 if frame.epoch != self.current_epoch {
393 self.emit_err_spec(
394 codes::EPOCH_MISMATCH,
395 format!("got {}, expected {}", frame.epoch, self.current_epoch),
396 );
397 self.trigger_resync();
398 return Ok(());
399 }
400 if let Err(e) = frame.validate_payload_size() {
401 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, format!("payload size: {e}"));
402 return Ok(());
403 }
404 let flags = GbpFlags::from_bits(frame.flags);
405 let st = match frame.stream_type_typed() {
406 Ok(st) => st,
407 Err(_) => {
408 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "unknown stream_type");
409 return Ok(());
410 }
411 };
412
413 if st != StreamType::Control
419 && flags.has(GbpFlags::CRITICAL)
420 && frame.transition_id != self.last_transition_id
421 {
422 self.emit_err_spec(
423 codes::TRANSITION_MISMATCH,
424 format!(
425 "got tid={}, expected {}",
426 frame.transition_id, self.last_transition_id
427 ),
428 );
429 return Ok(());
430 }
431
432 let key = (st, frame.stream_id);
433 let hw = self.in_hw.get(&key).copied().unwrap_or(0);
434 if frame.sequence_no <= hw {
435 self.emit_err_spec(
436 codes::REPLAY_DETECTED,
437 format!(
438 "st={} sid={} seq={} hw={}",
439 st, frame.stream_id, frame.sequence_no, hw
440 ),
441 );
442 return Ok(());
443 }
444 self.in_hw.insert(key, frame.sequence_no);
445
446 let plain = match seal.open(st, frame.sequence_no, &frame.encrypted_payload) {
447 Ok(p) => p,
448 Err(e) => {
449 self.emit_err_named(
456 codes::DECRYPT_FAILED,
457 ErrorClass::Crypto,
458 true, false, format!("aead open: {e}"),
461 );
462 return Ok(());
463 }
464 };
465
466 match st {
467 StreamType::Control => self.handle_control(plain),
468 other => self.events.push(Event::PayloadReceived(DeliveredPayload {
469 stream_type: other,
470 stream_id: frame.stream_id,
471 sequence_no: frame.sequence_no,
472 flags: frame.flags,
473 plaintext: plain,
474 codec: frame.payload_codec(),
475 })),
476 }
477 Ok(())
478 }
479
480 fn handle_control(&mut self, plain: Vec<u8>) {
481 let c = match ControlMessage::from_cbor(&plain) {
482 Ok(c) => c,
483 Err(_) => {
484 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "control decode");
485 return;
486 }
487 };
488 let opcode = match ControlOpcode::try_from(c.opcode) {
489 Ok(op) => op,
490 Err(_) => {
491 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "unknown opcode");
492 return;
493 }
494 };
495 let tid_ok = match opcode {
497 ControlOpcode::PrepareTransition => {
501 c.transition_id > self.last_transition_id
502 && (self.pending_transition_id == 0
503 || self.pending_transition_id == c.transition_id)
504 }
505 ControlOpcode::ReadyForTransition
507 | ControlOpcode::ExecuteTransition
508 | ControlOpcode::AbortTransition => {
509 self.pending_transition_id != 0 && c.transition_id == self.pending_transition_id
510 }
511 _ => true,
514 };
515 if !tid_ok {
516 self.emit_err_spec(
517 codes::TRANSITION_MISMATCH,
518 format!(
519 "control tid={} not valid for {:?} (last={}, pending={})",
520 c.transition_id, opcode, self.last_transition_id, self.pending_transition_id
521 ),
522 );
523 return;
524 }
525 match opcode {
526 ControlOpcode::PrepareTransition => {
527 if self.pending_transition_id == c.transition_id {
531 let current_winner = self.pending_commit_sender.unwrap_or(MemberId::MAX);
532 if c.sender_id >= current_winner {
533 self.events.push(Event::Control {
537 from: c.sender_id,
538 opcode,
539 transition_id: c.transition_id,
540 request_id: c.request_id,
541 args: c.args.to_vec(),
542 });
543 return;
544 }
545 }
547 self.pending_transition_id = c.transition_id;
548 self.pending_commit_sender = Some(c.sender_id);
549 self.transition_state = TransitionState::TPrepared;
550 self.note_coordinator_activity();
552 self.execute_deadline =
554 Some(Instant::now() + Duration::from_millis(timeouts::T_EXECUTE_MAX_MS));
555 }
556 ControlOpcode::ReadyForTransition => {
557 self.transition_state = TransitionState::TReady;
558 self.prepare_deadline = None;
560 }
561 ControlOpcode::ExecuteTransition => {
562 self.execute_deadline = None;
563 self.pending_commit_sender = None;
564 self.apply_transition(c.transition_id);
565 self.note_coordinator_activity();
566 }
567 ControlOpcode::AbortTransition => {
568 self.prepare_deadline = None;
569 self.execute_deadline = None;
570 self.pending_commit_sender = None;
571 self.transition_state = TransitionState::TAborted;
572 self.pending_transition_id = 0;
573 }
574 ControlOpcode::GroupStateDigestResponse => {
575 if self.state == NodeState::Resyncing {
576 self.transition(NodeState::Active);
577 }
578 }
579 ControlOpcode::CapabilitiesAdvertise => {
580 if Self::is_coordinator_claim(&c.args) {
581 self.note_coordinator_activity();
583 if self.is_coordinator && c.sender_id < self.member_id {
587 self.is_coordinator = false;
588 }
589 self.events.push(Event::CoordinatorClaim {
590 claimant: c.sender_id,
591 });
592 }
593 }
594 _ => {}
595 }
596 self.events.push(Event::Control {
597 from: c.sender_id,
598 opcode,
599 transition_id: c.transition_id,
600 request_id: c.request_id,
601 args: c.args.to_vec(),
602 });
603 }
604
605 pub fn apply_transition(&mut self, tid: TransitionId) {
608 self.current_epoch += 1;
609 self.last_transition_id = tid;
610 self.pending_transition_id = 0;
611 self.pending_commit_sender = None;
612 self.transition_state = TransitionState::TExecuted;
613 self.out_seq.clear();
614 self.in_hw.clear();
615 self.events.push(Event::EpochAdvanced {
616 epoch: self.current_epoch,
617 transition_id: tid,
618 });
619 }
620
621 pub fn trigger_resync(&mut self) {
623 if self.state != NodeState::Resyncing {
624 self.transition(NodeState::Resyncing);
625 }
626 }
627
628 pub fn check_timeouts(&mut self) -> Vec<Event> {
635 let now = Instant::now();
636
637 if self.prepare_deadline.is_some_and(|d| now >= d) {
638 self.prepare_deadline = None;
639 self.execute_deadline = None;
640 self.pending_transition_id = 0;
641 self.transition_state = TransitionState::TAborted;
642 self.emit_err_spec(codes::PREPARE_TIMEOUT, "T_prepare_max exceeded");
643 }
644
645 if self.execute_deadline.is_some_and(|d| now >= d) {
646 self.execute_deadline = None;
647 self.emit_err_spec(codes::EXECUTE_TIMEOUT, "T_execute_max exceeded");
648 }
649
650 if self.coordinator_last_seen.is_some_and(|t| {
651 now.duration_since(t).as_millis() as u64 >= timeouts::T_COORDINATOR_GRACE_MS
652 }) {
653 self.coordinator_last_seen = None;
654 self.is_coordinator = false;
655 self.emit_err_spec(
656 codes::COORDINATOR_GONE,
657 "coordinator silence exceeded T_coordinator_grace",
658 );
659 self.events.push(Event::CoordinatorElectionNeeded);
660 }
661
662 self.drain_events()
663 }
664
665 pub fn note_coordinator_activity(&mut self) {
672 self.coordinator_last_seen = Some(Instant::now());
673 }
674
675 pub fn claim_coordinator<S: Sealer>(
686 &mut self,
687 seal: &mut S,
688 target: MemberId,
689 ) -> Result<OutboundFrame, NodeError> {
690 let args = vec![0xA1u8, 0x00, 0xF5];
692 self.is_coordinator = true;
693 self.coordinator_last_seen = Some(Instant::now());
694 self.events.push(Event::BecameCoordinator);
695 self.send_control(
696 seal,
697 target,
698 ControlOpcode::CapabilitiesAdvertise,
699 self.last_transition_id,
700 0,
701 args,
702 )
703 }
704
705 fn is_coordinator_claim(args: &[u8]) -> bool {
708 if args == [0xA1, 0x00, 0xF5] {
712 return true;
713 }
714 args.windows(2).any(|w| w == [0x00, 0xF5])
718 }
719
720 fn transition(&mut self, next: NodeState) {
721 if self.state == next {
722 return;
723 }
724 if !self.state.can_transition_to(next) {
725 let from = self.state;
726 self.state = NodeState::Failed;
727 self.events.push(Event::StateChanged {
728 from,
729 to: NodeState::Failed,
730 });
731 return;
732 }
733 let from = self.state;
734 self.state = next;
735 self.events.push(Event::StateChanged { from, to: next });
736 }
737
738 fn assert_can_send(&self) -> Result<(), NodeError> {
739 if matches!(
740 self.state,
741 NodeState::Active | NodeState::Resyncing | NodeState::EstablishingGroup
742 ) {
743 Ok(())
744 } else {
745 Err(NodeError::InvalidState(format!(
746 "cannot send in state {}",
747 self.state
748 )))
749 }
750 }
751
752 fn next_seq(&mut self, st: StreamType, sid: StreamId) -> SequenceNo {
753 let entry = self.out_seq.entry((st, sid)).or_insert(0);
754 *entry += 1;
755 *entry
756 }
757
758 fn emit_err_spec(&mut self, code: u16, reason: impl Into<String>) {
759 if let Some(spec) = ErrorSpec::lookup(code) {
760 self.emit_err_named(spec.code, spec.class, spec.retryable, spec.fatal, reason);
761 } else {
762 self.emit_err_named(code, ErrorClass::Policy, false, false, reason);
763 }
764 }
765
766 fn emit_err_named(
767 &mut self,
768 code: u16,
769 class: ErrorClass,
770 retryable: bool,
771 fatal: bool,
772 reason: impl Into<String>,
773 ) {
774 let reason = reason.into();
775 let (class, retryable, fatal) = if let Some(spec) = ErrorSpec::lookup(code) {
778 (spec.class, spec.retryable, spec.fatal)
779 } else {
780 (class, retryable, fatal)
781 };
782 let _ = ErrorObject::new(code, class, retryable, fatal, reason.clone()).to_cbor();
783 self.events.push(Event::Error {
784 code,
785 class,
786 retryable,
787 fatal,
788 reason,
789 });
790 if fatal {
791 let from = self.state;
792 self.state = NodeState::Failed;
793 self.events.push(Event::StateChanged {
794 from,
795 to: NodeState::Failed,
796 });
797 }
798 }
799}
800
801pub trait Sealer {
807 fn seal(&mut self, st: StreamType, seq: SequenceNo, pt: &[u8]) -> Result<Vec<u8>, MlsError>;
809 fn open(&mut self, st: StreamType, seq: SequenceNo, ct: &[u8]) -> Result<Vec<u8>, MlsError>;
811}
812
813impl Sealer for gbp_mls::MlsContext {
814 fn seal(&mut self, st: StreamType, seq: SequenceNo, pt: &[u8]) -> Result<Vec<u8>, MlsError> {
815 gbp_mls::MlsContext::seal(self, label_for(st), seq, pt)
816 }
817 fn open(&mut self, st: StreamType, seq: SequenceNo, ct: &[u8]) -> Result<Vec<u8>, MlsError> {
818 gbp_mls::MlsContext::open(self, label_for(st), seq, ct)
819 }
820}
821
822#[cfg(test)]
823mod tests {
824 use super::*;
825
826 struct PlainSealer;
827 impl Sealer for PlainSealer {
828 fn seal(
829 &mut self,
830 _st: StreamType,
831 _seq: SequenceNo,
832 pt: &[u8],
833 ) -> Result<Vec<u8>, MlsError> {
834 Ok(pt.to_vec())
835 }
836 fn open(
837 &mut self,
838 _st: StreamType,
839 _seq: SequenceNo,
840 ct: &[u8],
841 ) -> Result<Vec<u8>, MlsError> {
842 Ok(ct.to_vec())
843 }
844 }
845
846 fn group_id() -> GroupId {
847 let mut g = [0u8; 16];
848 g[..3].copy_from_slice(b"GBP");
849 g
850 }
851
852 #[test]
853 fn replay_window_rejects_repeat() {
854 let mut alice = GroupNode::new(1, group_id());
855 let mut bob = GroupNode::new(2, group_id());
856 alice.bootstrap_as_creator(1);
857 bob.bootstrap_as_joiner(1, 0);
858 let mut s = PlainSealer;
859 let sid = alice.member_stream_id(2);
860 let f = alice
861 .send_payload(
862 &mut s,
863 2,
864 StreamType::Text,
865 sid,
866 GbpFlags::ordered_reliable_ack(),
867 b"hi",
868 PayloadCodec::Cbor,
869 )
870 .unwrap();
871 let _ = bob.on_wire(&mut s, &f.wire).unwrap();
872 let evs = bob.on_wire(&mut s, &f.wire).unwrap();
873 assert!(evs.iter().any(|e| matches!(
874 e,
875 Event::Error {
876 code: codes::REPLAY_DETECTED,
877 ..
878 }
879 )));
880 }
881
882 #[test]
883 fn epoch_mismatch_triggers_resync() {
884 let mut alice = GroupNode::new(1, group_id());
885 let mut bob = GroupNode::new(2, group_id());
886 alice.bootstrap_as_creator(1);
887 bob.bootstrap_as_joiner(1, 0);
888 alice.current_epoch = 2;
889 let mut s = PlainSealer;
890 let sid = alice.member_stream_id(2);
891 let f = alice
892 .send_payload(
893 &mut s,
894 2,
895 StreamType::Text,
896 sid,
897 GbpFlags::ordered_reliable_ack(),
898 b"x",
899 PayloadCodec::Cbor,
900 )
901 .unwrap();
902 let _ = bob.on_wire(&mut s, &f.wire).unwrap();
903 assert_eq!(bob.state, NodeState::Resyncing);
904 }
905
906 #[test]
907 fn payload_emits_received_event() {
908 let mut alice = GroupNode::new(1, group_id());
909 let mut bob = GroupNode::new(2, group_id());
910 alice.bootstrap_as_creator(1);
911 bob.bootstrap_as_joiner(1, 0);
912 let mut s = PlainSealer;
913 let sid = alice.member_stream_id(2);
914 let f = alice
915 .send_payload(
916 &mut s,
917 2,
918 StreamType::Text,
919 sid,
920 GbpFlags::ordered_reliable_ack(),
921 b"payload",
922 PayloadCodec::Cbor,
923 )
924 .unwrap();
925 let evs = bob.on_wire(&mut s, &f.wire).unwrap();
926 let pr = evs
927 .into_iter()
928 .find_map(|e| match e {
929 Event::PayloadReceived(p) => Some(p),
930 _ => None,
931 })
932 .expect("payload");
933 assert_eq!(pr.stream_type, StreamType::Text);
934 assert_eq!(pr.plaintext, b"payload");
935 }
936
937 fn drain_errs(events: &[Event]) -> Vec<u16> {
940 events
941 .iter()
942 .filter_map(|e| match e {
943 Event::Error { code, .. } => Some(*code),
944 _ => None,
945 })
946 .collect()
947 }
948
949 fn drain_controls(events: &[Event]) -> Vec<(ControlOpcode, TransitionId)> {
950 events
951 .iter()
952 .filter_map(|e| match e {
953 Event::Control {
954 opcode,
955 transition_id,
956 ..
957 } => Some((*opcode, *transition_id)),
958 _ => None,
959 })
960 .collect()
961 }
962
963 #[test]
964 fn prepare_transition_sets_pending_on_sender_and_receiver() {
965 let mut coord = GroupNode::new(1, group_id());
966 let mut peer = GroupNode::new(2, group_id());
967 coord.bootstrap_as_creator(0);
968 peer.bootstrap_as_joiner(0, 0);
969 let mut s = PlainSealer;
970 let f = coord
972 .send_control(
973 &mut s,
974 0,
975 ControlOpcode::PrepareTransition,
976 1,
977 100,
978 b"commit-blob".to_vec(),
979 )
980 .unwrap();
981 assert_eq!(coord.pending_transition_id, 1, "sender mirrors pending");
982 assert_eq!(coord.transition_state, TransitionState::TPrepared);
983 let evs = peer.on_wire(&mut s, &f.wire).unwrap();
984 assert_eq!(peer.pending_transition_id, 1, "receiver records pending");
985 assert!(
986 drain_errs(&evs).is_empty(),
987 "no error: {:?}",
988 drain_errs(&evs)
989 );
990 let ctls = drain_controls(&evs);
991 assert_eq!(ctls, vec![(ControlOpcode::PrepareTransition, 1)]);
992 }
993
994 #[test]
995 fn ready_with_wrong_tid_is_rejected() {
996 let mut coord = GroupNode::new(1, group_id());
997 let mut peer = GroupNode::new(2, group_id());
998 coord.bootstrap_as_creator(0);
999 peer.bootstrap_as_joiner(0, 0);
1000 let mut s = PlainSealer;
1001 let f = coord
1002 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1003 .unwrap();
1004 peer.on_wire(&mut s, &f.wire).unwrap();
1005 let bogus = peer
1007 .send_control(&mut s, 1, ControlOpcode::ReadyForTransition, 7, 1, vec![])
1008 .unwrap();
1009 let evs = coord.on_wire(&mut s, &bogus.wire).unwrap();
1010 let errs = drain_errs(&evs);
1011 assert!(errs.contains(&codes::TRANSITION_MISMATCH), "got {:?}", errs);
1012 }
1013
1014 #[test]
1015 fn execute_advances_epoch_and_clears_pending() {
1016 let mut coord = GroupNode::new(1, group_id());
1017 let mut peer = GroupNode::new(2, group_id());
1018 coord.bootstrap_as_creator(0);
1019 peer.bootstrap_as_joiner(0, 0);
1020 let mut s = PlainSealer;
1021 let prep = coord
1022 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1023 .unwrap();
1024 peer.on_wire(&mut s, &prep.wire).unwrap();
1025 let exec = coord
1027 .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
1028 .unwrap();
1029 coord.apply_transition(1);
1030 let evs = peer.on_wire(&mut s, &exec.wire).unwrap();
1031 assert_eq!(coord.last_transition_id, 1);
1032 assert_eq!(coord.current_epoch, 1);
1033 assert_eq!(peer.last_transition_id, 1);
1034 assert_eq!(peer.current_epoch, 1);
1035 assert_eq!(peer.pending_transition_id, 0);
1036 assert!(evs.iter().any(|e| matches!(
1037 e,
1038 Event::EpochAdvanced {
1039 transition_id: 1,
1040 ..
1041 }
1042 )));
1043 }
1044
1045 #[test]
1046 fn abort_clears_pending_no_advance() {
1047 let mut coord = GroupNode::new(1, group_id());
1048 let mut peer = GroupNode::new(2, group_id());
1049 coord.bootstrap_as_creator(0);
1050 peer.bootstrap_as_joiner(0, 0);
1051 let mut s = PlainSealer;
1052 let prep = coord
1053 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1054 .unwrap();
1055 peer.on_wire(&mut s, &prep.wire).unwrap();
1056 let abort = coord
1057 .send_control(&mut s, 0, ControlOpcode::AbortTransition, 1, 2, vec![])
1058 .unwrap();
1059 peer.on_wire(&mut s, &abort.wire).unwrap();
1060 assert_eq!(peer.pending_transition_id, 0);
1061 assert_eq!(peer.current_epoch, 0);
1062 assert_eq!(peer.transition_state, TransitionState::TAborted);
1063 assert_eq!(coord.transition_state, TransitionState::TAborted);
1064 }
1065
1066 #[test]
1067 fn bootstrap_as_joiner_with_expected_tid_accepts_first_execute() {
1068 let mut coord = GroupNode::new(1, group_id());
1069 let mut joiner = GroupNode::new(2, group_id());
1071 coord.bootstrap_as_creator(0);
1072 joiner.bootstrap_as_joiner(0, 1);
1073 assert_eq!(joiner.pending_transition_id, 1);
1074 let mut s = PlainSealer;
1075 let _ = coord
1077 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1078 .unwrap();
1079 let exec = coord
1081 .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
1082 .unwrap();
1083 let evs = joiner.on_wire(&mut s, &exec.wire).unwrap();
1084 let errs = drain_errs(&evs);
1085 assert!(
1086 errs.is_empty(),
1087 "expected clean apply, got errors {:?}",
1088 errs
1089 );
1090 assert_eq!(joiner.last_transition_id, 1);
1091 assert_eq!(joiner.current_epoch, 1);
1092 }
1093
1094 #[test]
1097 fn claim_coordinator_sets_flag_and_emits_event() {
1098 let mut node = GroupNode::new(1, group_id());
1099 node.bootstrap_as_creator(0);
1100 node.drain_events();
1101 let mut s = PlainSealer;
1102 let _ = node.claim_coordinator(&mut s, 0).unwrap();
1103 assert!(node.is_coordinator);
1104 let evs = node.drain_events();
1105 assert!(evs.iter().any(|e| matches!(e, Event::BecameCoordinator)));
1106 }
1107
1108 #[test]
1109 fn coordinator_gone_emits_election_needed() {
1110 let mut member = GroupNode::new(2, group_id());
1111 member.bootstrap_as_joiner(0, 0);
1112 member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
1113 let evs = member.check_timeouts();
1114 assert!(
1115 evs.iter()
1116 .any(|e| matches!(e, Event::CoordinatorElectionNeeded))
1117 );
1118 assert!(!member.is_coordinator, "flag cleared on silence");
1119 }
1120
1121 #[test]
1122 fn capabilities_advertise_with_claim_resets_silence_timer() {
1123 let mut member = GroupNode::new(2, group_id());
1124 let mut coord = GroupNode::new(1, group_id());
1125 member.bootstrap_as_joiner(0, 0);
1126 coord.bootstrap_as_creator(0);
1127 let mut s = PlainSealer;
1128 let f = coord.claim_coordinator(&mut s, 2).unwrap();
1130 let evs = member.on_wire(&mut s, &f.wire).unwrap();
1132 assert!(
1133 member.coordinator_last_seen.is_some(),
1134 "silence timer reset"
1135 );
1136 assert!(
1137 evs.iter()
1138 .any(|e| matches!(e, Event::CoordinatorClaim { claimant: 1 }))
1139 );
1140 }
1141
1142 #[test]
1143 fn higher_id_yields_to_lower_claimant() {
1144 let mut node5 = GroupNode::new(5, group_id());
1146 let mut node2 = GroupNode::new(2, group_id());
1147 node5.bootstrap_as_joiner(0, 0);
1148 node2.bootstrap_as_creator(0);
1149 let mut s = PlainSealer;
1150 node5.is_coordinator = true;
1152 let f = node2.claim_coordinator(&mut s, 5).unwrap();
1154 node5.on_wire(&mut s, &f.wire).unwrap();
1155 assert!(!node5.is_coordinator, "node5 yielded to node2");
1156 }
1157
1158 #[test]
1159 fn lower_id_keeps_coordinator_against_higher_claimant() {
1160 let mut node1 = GroupNode::new(1, group_id());
1161 let mut node5 = GroupNode::new(5, group_id());
1162 node1.bootstrap_as_creator(0);
1163 node5.bootstrap_as_joiner(0, 0);
1164 let mut s = PlainSealer;
1165 node1.is_coordinator = true;
1166 let f = node5.claim_coordinator(&mut s, 1).unwrap();
1167 node1.on_wire(&mut s, &f.wire).unwrap();
1168 assert!(node1.is_coordinator, "node1 keeps role — it has lower id");
1169 }
1170
1171 #[test]
1174 fn competing_prepare_lower_member_id_wins() {
1175 let mut node = GroupNode::new(10, group_id());
1178 node.bootstrap_as_joiner(0, 0);
1179 let mut s = PlainSealer;
1180
1181 let mut sender1 = GroupNode::new(1, group_id());
1183 sender1.bootstrap_as_creator(0);
1184 let f1 = sender1
1185 .send_control(
1186 &mut s,
1187 10,
1188 ControlOpcode::PrepareTransition,
1189 1,
1190 1,
1191 b"commit-A".to_vec(),
1192 )
1193 .unwrap();
1194 node.on_wire(&mut s, &f1.wire).unwrap();
1195 assert_eq!(
1196 node.pending_commit_sender,
1197 Some(1),
1198 "member 1 is initial winner"
1199 );
1200
1201 let mut sender3 = GroupNode::new(3, group_id());
1203 sender3.bootstrap_as_creator(0);
1204 let f3 = sender3
1205 .send_control(
1206 &mut s,
1207 10,
1208 ControlOpcode::PrepareTransition,
1209 1,
1210 2,
1211 b"commit-B".to_vec(),
1212 )
1213 .unwrap();
1214 node.on_wire(&mut s, &f3.wire).unwrap();
1215 assert_eq!(node.pending_commit_sender, Some(1), "member 1 still wins");
1217 assert_eq!(node.pending_transition_id, 1);
1218 }
1219
1220 #[test]
1221 fn competing_prepare_later_lower_id_displaces_winner() {
1222 let mut node = GroupNode::new(10, group_id());
1224 node.bootstrap_as_joiner(0, 0);
1225 let mut s = PlainSealer;
1226
1227 let mut sender5 = GroupNode::new(5, group_id());
1228 sender5.bootstrap_as_creator(0);
1229 let f5 = sender5
1230 .send_control(
1231 &mut s,
1232 10,
1233 ControlOpcode::PrepareTransition,
1234 1,
1235 1,
1236 b"commit-X".to_vec(),
1237 )
1238 .unwrap();
1239 node.on_wire(&mut s, &f5.wire).unwrap();
1240 assert_eq!(node.pending_commit_sender, Some(5));
1241
1242 let mut sender2 = GroupNode::new(2, group_id());
1243 sender2.bootstrap_as_creator(0);
1244 let f2 = sender2
1245 .send_control(
1246 &mut s,
1247 10,
1248 ControlOpcode::PrepareTransition,
1249 1,
1250 2,
1251 b"commit-Y".to_vec(),
1252 )
1253 .unwrap();
1254 node.on_wire(&mut s, &f2.wire).unwrap();
1255 assert_eq!(
1256 node.pending_commit_sender,
1257 Some(2),
1258 "member 2 displaces member 5"
1259 );
1260 }
1261
1262 #[test]
1263 fn apply_transition_clears_commit_sender() {
1264 let mut coord = GroupNode::new(1, group_id());
1265 coord.bootstrap_as_creator(0);
1266 let mut s = PlainSealer;
1267 coord
1268 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1269 .unwrap();
1270 coord.apply_transition(1);
1271 assert_eq!(coord.pending_commit_sender, None);
1272 }
1273
1274 #[test]
1277 fn prepare_timeout_fires_when_deadline_exceeded() {
1278 let mut coord = GroupNode::new(1, group_id());
1279 coord.bootstrap_as_creator(0);
1280 let mut s = PlainSealer;
1281 coord
1282 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1283 .unwrap();
1284 coord.prepare_deadline = Some(Instant::now() - Duration::from_millis(1));
1286 let evs = coord.check_timeouts();
1287 assert!(
1288 evs.iter().any(|e| matches!(
1289 e,
1290 Event::Error {
1291 code: codes::PREPARE_TIMEOUT,
1292 ..
1293 }
1294 )),
1295 "expected PREPARE_TIMEOUT, got {:?}",
1296 evs
1297 );
1298 assert_eq!(
1299 coord.transition_state,
1300 TransitionState::TAborted,
1301 "transition aborted"
1302 );
1303 assert_eq!(coord.prepare_deadline, None, "deadline cleared");
1304 }
1305
1306 #[test]
1307 fn execute_timeout_fires_when_deadline_exceeded() {
1308 let mut member = GroupNode::new(2, group_id());
1309 member.bootstrap_as_joiner(0, 0);
1310 let mut s = PlainSealer;
1311 member.pending_transition_id = 1;
1313 member.transition_state = TransitionState::TPrepared;
1314 member
1315 .send_control(&mut s, 1, ControlOpcode::ReadyForTransition, 1, 1, vec![])
1316 .unwrap();
1317 member.execute_deadline = Some(Instant::now() - Duration::from_millis(1));
1319 let evs = member.check_timeouts();
1320 assert!(
1321 evs.iter().any(|e| matches!(
1322 e,
1323 Event::Error {
1324 code: codes::EXECUTE_TIMEOUT,
1325 ..
1326 }
1327 )),
1328 "expected EXECUTE_TIMEOUT, got {:?}",
1329 evs
1330 );
1331 assert_eq!(member.execute_deadline, None, "deadline cleared");
1332 }
1333
1334 #[test]
1335 fn coordinator_gone_fires_after_silence() {
1336 let mut member = GroupNode::new(2, group_id());
1337 member.bootstrap_as_joiner(0, 0);
1338 member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
1340 let evs = member.check_timeouts();
1341 assert!(
1342 evs.iter().any(|e| matches!(
1343 e,
1344 Event::Error {
1345 code: codes::COORDINATOR_GONE,
1346 ..
1347 }
1348 )),
1349 "expected COORDINATOR_GONE, got {:?}",
1350 evs
1351 );
1352 assert_eq!(member.coordinator_last_seen, None, "timer cleared");
1353 }
1354
1355 #[test]
1356 fn note_coordinator_activity_resets_silence_timer() {
1357 let mut member = GroupNode::new(2, group_id());
1358 member.bootstrap_as_joiner(0, 0);
1359 member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
1361 member.note_coordinator_activity();
1363 let evs = member.check_timeouts();
1364 assert!(
1365 !evs.iter().any(|e| matches!(
1366 e,
1367 Event::Error {
1368 code: codes::COORDINATOR_GONE,
1369 ..
1370 }
1371 )),
1372 "should NOT fire after reset"
1373 );
1374 }
1375
1376 #[test]
1377 fn execute_clears_prepare_deadline() {
1378 let mut coord = GroupNode::new(1, group_id());
1379 coord.bootstrap_as_creator(0);
1380 let mut s = PlainSealer;
1381 coord
1382 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1383 .unwrap();
1384 assert!(coord.prepare_deadline.is_some(), "deadline armed");
1385 coord
1386 .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
1387 .unwrap();
1388 assert_eq!(coord.prepare_deadline, None, "deadline cleared on EXECUTE");
1389 assert_eq!(
1390 coord.execute_deadline, None,
1391 "execute_deadline also cleared"
1392 );
1393 }
1394
1395 #[test]
1396 fn receive_prepare_arms_execute_deadline() {
1397 let mut coord = GroupNode::new(1, group_id());
1398 let mut member = GroupNode::new(2, group_id());
1399 coord.bootstrap_as_creator(0);
1400 member.bootstrap_as_joiner(0, 0);
1401 let mut s = PlainSealer;
1402 let f = coord
1403 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1404 .unwrap();
1405 member.on_wire(&mut s, &f.wire).unwrap();
1406 assert!(
1407 member.execute_deadline.is_some(),
1408 "execute_deadline armed on receiving PREPARE"
1409 );
1410 }
1411
1412 #[test]
1413 fn receive_execute_clears_execute_deadline() {
1414 let mut coord = GroupNode::new(1, group_id());
1415 let mut member = GroupNode::new(2, group_id());
1416 coord.bootstrap_as_creator(0);
1417 member.bootstrap_as_joiner(0, 0);
1418 let mut s = PlainSealer;
1419 let prep = coord
1420 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1421 .unwrap();
1422 member.on_wire(&mut s, &prep.wire).unwrap();
1423 let exec = coord
1424 .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
1425 .unwrap();
1426 member.on_wire(&mut s, &exec.wire).unwrap();
1427 assert_eq!(member.execute_deadline, None, "cleared on EXECUTE");
1428 }
1429
1430 #[test]
1431 fn no_timeout_when_deadlines_not_set() {
1432 let mut node = GroupNode::new(1, group_id());
1433 node.bootstrap_as_creator(0);
1434 node.drain_events(); let evs = node.check_timeouts();
1436 assert!(evs.is_empty(), "no events without armed deadlines");
1437 }
1438
1439 #[test]
1440 fn prepare_with_already_applied_tid_is_rejected() {
1441 let mut coord = GroupNode::new(1, group_id());
1444 coord.bootstrap_as_creator(0);
1445 let mut s = PlainSealer;
1446 let _ = coord
1447 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1448 .unwrap();
1449 coord.apply_transition(1);
1450 assert_eq!(coord.last_transition_id, 1);
1451 assert_eq!(coord.pending_transition_id, 0);
1452 let mut peer = GroupNode::new(2, group_id());
1456 peer.bootstrap_as_joiner(coord.current_epoch, 0);
1457 let stale = peer
1458 .send_control(&mut s, 1, ControlOpcode::PrepareTransition, 1, 9, vec![])
1459 .unwrap();
1460 let evs = coord.on_wire(&mut s, &stale.wire).unwrap();
1461 let errs = drain_errs(&evs);
1462 assert!(
1463 errs.contains(&codes::TRANSITION_MISMATCH),
1464 "expected TRANSITION_MISMATCH, got {:?}",
1465 errs
1466 );
1467 }
1468
1469 #[test]
1470 fn decrypt_failed_is_non_fatal() {
1471 struct OpenFailSealer;
1473 impl Sealer for OpenFailSealer {
1474 fn seal(
1475 &mut self,
1476 _: StreamType,
1477 _: SequenceNo,
1478 p: &[u8],
1479 ) -> Result<Vec<u8>, MlsError> {
1480 Ok(p.to_vec())
1481 }
1482 fn open(
1483 &mut self,
1484 _: StreamType,
1485 _: SequenceNo,
1486 _: &[u8],
1487 ) -> Result<Vec<u8>, MlsError> {
1488 Err(MlsError::Aead("simulated".into()))
1489 }
1490 }
1491 let mut alice = GroupNode::new(1, group_id());
1492 let mut bob = GroupNode::new(2, group_id());
1493 alice.bootstrap_as_creator(1);
1494 bob.bootstrap_as_joiner(1, 0);
1495 let mut s = PlainSealer;
1496 let sid = alice.member_stream_id(2);
1497 let f = alice
1498 .send_payload(
1499 &mut s,
1500 2,
1501 StreamType::Text,
1502 sid,
1503 GbpFlags::ordered_reliable_ack(),
1504 b"x",
1505 PayloadCodec::Cbor,
1506 )
1507 .unwrap();
1508 let mut fail = OpenFailSealer;
1509 let evs = bob.on_wire(&mut fail, &f.wire).unwrap();
1510 let err = evs
1511 .iter()
1512 .find_map(|e| match e {
1513 Event::Error {
1514 code,
1515 fatal,
1516 retryable,
1517 ..
1518 } => Some((*code, *fatal, *retryable)),
1519 _ => None,
1520 })
1521 .expect("error event");
1522 assert_eq!(err.0, codes::DECRYPT_FAILED);
1523 assert!(!err.1, "must be non-fatal");
1524 assert!(err.2, "must be retryable");
1525 assert_eq!(bob.state, NodeState::Active, "bob stays Active");
1526 }
1527}