1use gbp::{CodecError, ControlMessage, ErrorObject, GbpFrame};
23use gbp_core::{
24 ControlOpcode, ErrorClass, GbpFlags, GroupId, MemberId, NodeState, PayloadCodec, SequenceNo,
25 StreamId, StreamType, TransitionId, TransitionState, codes, errors::ErrorSpec, timeouts,
26};
27use gbp_mls::{MlsError, label_for};
28use std::collections::HashMap;
29use std::time::Duration;
30#[cfg(not(target_arch = "wasm32"))]
31use std::time::Instant;
32#[cfg(target_arch = "wasm32")]
33use web_time::Instant;
34
/// Errors returned by the fallible operations of [`GroupNode`].
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
    /// A codec-layer failure, converted automatically from [`CodecError`].
    #[error("codec: {0}")]
    Codec(#[from] CodecError),
    /// A failure reported by the MLS layer (for example by a [`Sealer`]),
    /// converted automatically from [`MlsError`].
    #[error("mls: {0}")]
    Mls(#[from] MlsError),
    /// The node's current state does not permit the requested operation;
    /// the string describes the offending state.
    #[error("invalid state: {0}")]
    InvalidState(String),
}
48
49pub struct OutboundFrame {
51 pub to: MemberId,
53 pub wire: Vec<u8>,
55}
56
/// A decrypted payload from a non-control stream, as surfaced by
/// [`Event::PayloadReceived`].
#[derive(Debug, Clone)]
pub struct DeliveredPayload {
    /// Stream type the frame arrived on.
    pub stream_type: StreamType,
    /// Stream identifier copied from the frame.
    pub stream_id: StreamId,
    /// Sequence number copied from the frame.
    pub sequence_no: SequenceNo,
    /// Raw flag bits copied from the frame.
    pub flags: u16,
    /// Payload bytes returned by the sealer's `open`.
    pub plaintext: Vec<u8>,
    /// Payload codec reported by the frame.
    pub codec: PayloadCodec,
}
74
/// Notifications queued by [`GroupNode`] and handed to the caller when the
/// event queue is drained.
#[derive(Debug, Clone)]
pub enum Event {
    /// The node moved from one [`NodeState`] to another.
    StateChanged {
        /// State before the change.
        from: NodeState,
        /// State after the change.
        to: NodeState,
    },
    /// A payload on a non-control stream was accepted and decrypted.
    PayloadReceived(DeliveredPayload),
    /// A control message was decoded and passed the transition-id check.
    Control {
        /// Sender id carried inside the control message.
        from: MemberId,
        /// Decoded opcode.
        opcode: ControlOpcode,
        /// Transition id carried inside the control message.
        transition_id: TransitionId,
        /// Request id carried inside the control message.
        request_id: u32,
        /// Opcode-specific argument bytes.
        args: Vec<u8>,
    },
    /// An error was detected while processing input or timers.
    Error {
        /// Numeric error code.
        code: u16,
        /// Error class.
        class: ErrorClass,
        /// Whether the error is flagged as retryable.
        retryable: bool,
        /// Whether the error is fatal; a fatal error also moves the node to
        /// `NodeState::Failed`.
        fatal: bool,
        /// Human-readable detail.
        reason: String,
    },
    /// A transition was applied and the epoch counter was incremented.
    EpochAdvanced {
        /// Epoch after the increment.
        epoch: u64,
        /// Transition that was applied.
        transition_id: TransitionId,
    },
    /// The coordinator has been silent for at least the grace period.
    CoordinatorElectionNeeded,
    /// This node claimed the coordinator role.
    BecameCoordinator,
    /// A peer advertised a coordinator claim.
    CoordinatorClaim {
        /// Member id of the peer making the claim.
        claimant: MemberId,
    },
}
136
/// Per-member protocol state machine: tracks epoch and transition state,
/// per-stream sequence numbers, timers, and a queue of pending [`Event`]s.
pub struct GroupNode {
    /// Identifier of this member; used as the sender id of control messages.
    pub member_id: MemberId,
    /// Whether this node currently considers itself the coordinator.
    pub is_coordinator: bool,
    /// Group this node belongs to; incoming frames must carry the same id.
    pub group_id: GroupId,
    /// Epoch stamped on outgoing frames and required on incoming ones.
    pub current_epoch: u64,
    /// Id of the most recently applied transition (0 initially).
    pub last_transition_id: TransitionId,
    /// Id of the transition currently in flight; 0 means none.
    pub pending_transition_id: TransitionId,
    /// Lifecycle state of the node.
    pub state: NodeState,
    /// Progress of the in-flight transition.
    pub transition_state: TransitionState,

    /// Last sequence number issued per (stream type, stream id).
    out_seq: HashMap<(StreamType, StreamId), SequenceNo>,
    /// Highest sequence number recorded per (stream type, stream id) for
    /// replay detection.
    in_hw: HashMap<(StreamType, StreamId), SequenceNo>,
    /// Events queued since the last drain.
    events: Vec<Event>,

    /// Sender of the received PrepareTransition currently recorded as pending.
    pending_commit_sender: Option<MemberId>,
    /// Armed when this node sends PrepareTransition; expiry aborts the
    /// pending transition.
    prepare_deadline: Option<Instant>,
    /// Armed when this node sends ReadyForTransition or receives
    /// PrepareTransition; expiry emits an execute-timeout error.
    execute_deadline: Option<Instant>,
    /// Time coordinator activity was last noted; `None` disables the silence
    /// check.
    coordinator_last_seen: Option<Instant>,
}
180
181impl GroupNode {
182 pub fn new(member_id: MemberId, group_id: GroupId) -> Self {
184 Self {
185 member_id,
186 group_id,
187 is_coordinator: false,
188 current_epoch: 0,
189 last_transition_id: 0,
190 pending_transition_id: 0,
191 state: NodeState::Idle,
192 transition_state: TransitionState::TIdle,
193 out_seq: HashMap::new(),
194 in_hw: HashMap::new(),
195 events: Vec::new(),
196 pending_commit_sender: None,
197 prepare_deadline: None,
198 execute_deadline: None,
199 coordinator_last_seen: None,
200 }
201 }
202
203 pub fn bootstrap_as_creator(&mut self, epoch: u64) {
205 self.transition(NodeState::Connecting);
206 self.transition(NodeState::EstablishingGroup);
207 self.current_epoch = epoch;
208 self.transition(NodeState::Active);
209 }
210
211 pub fn bootstrap_as_joiner(&mut self, epoch: u64, expected_first_tid: u32) {
220 self.transition(NodeState::Connecting);
221 self.transition(NodeState::EstablishingGroup);
222 self.current_epoch = epoch;
223 if expected_first_tid > 0 {
224 self.pending_transition_id = expected_first_tid;
225 self.transition_state = TransitionState::TPrepared;
226 }
227 self.transition(NodeState::Active);
228 }
229
230 pub fn drain_events(&mut self) -> Vec<Event> {
232 std::mem::take(&mut self.events)
233 }
234
235 pub fn member_stream_id(&self, base: u32) -> StreamId {
240 debug_assert!(
241 self.member_id < 1_000_000,
242 "member_id overflow: {0}",
243 self.member_id
244 );
245 base + self.member_id * 100
246 }
247
248 pub fn send_payload<S: Sealer>(
255 &mut self,
256 seal: &mut S,
257 target: MemberId,
258 stream_type: StreamType,
259 stream_id: StreamId,
260 flags: u16,
261 plaintext: &[u8],
262 codec: PayloadCodec,
263 ) -> Result<OutboundFrame, NodeError> {
264 self.assert_can_send()?;
265 let seq = self.next_seq(stream_type, stream_id);
266 let ciphertext = seal.seal(stream_type, seq, plaintext)?;
267 let frame = GbpFrame::new(
268 self.group_id,
269 self.current_epoch,
270 self.last_transition_id,
271 stream_type,
272 stream_id,
273 flags,
274 seq,
275 ciphertext,
276 codec.as_u8(),
277 );
278 Ok(OutboundFrame {
279 to: target,
280 wire: frame.to_cbor(),
281 })
282 }
283
284 pub fn send_control<S: Sealer>(
294 &mut self,
295 seal: &mut S,
296 target: MemberId,
297 opcode: ControlOpcode,
298 transition_id: TransitionId,
299 request_id: u32,
300 args: Vec<u8>,
301 ) -> Result<OutboundFrame, NodeError> {
302 let ctl = ControlMessage::with_args(
303 opcode as u16,
304 request_id,
305 self.member_id,
306 transition_id,
307 args,
308 );
309 let mut flags = GbpFlags::ordered_reliable_system();
310 if matches!(
311 opcode,
312 ControlOpcode::PrepareTransition
313 | ControlOpcode::ReadyForTransition
314 | ControlOpcode::ExecuteTransition
315 ) {
316 flags |= GbpFlags::CRITICAL;
317 }
318 match opcode {
322 ControlOpcode::PrepareTransition => {
323 self.pending_transition_id = transition_id;
324 self.transition_state = TransitionState::TPrepared;
325 self.prepare_deadline =
326 Some(Instant::now() + Duration::from_millis(timeouts::T_PREPARE_MAX_MS));
327 self.execute_deadline = None;
328 }
329 ControlOpcode::ReadyForTransition => {
330 self.execute_deadline =
331 Some(Instant::now() + Duration::from_millis(timeouts::T_EXECUTE_MAX_MS));
332 }
333 ControlOpcode::ExecuteTransition | ControlOpcode::AbortTransition => {
334 self.prepare_deadline = None;
335 self.execute_deadline = None;
336 if opcode == ControlOpcode::AbortTransition {
337 self.pending_transition_id = 0;
338 self.transition_state = TransitionState::TAborted;
339 }
340 }
341 _ => {}
342 }
343 let stream_id = self.member_stream_id(0);
344 self.send_payload(
345 seal,
346 target,
347 StreamType::Control,
348 stream_id,
349 flags,
350 &ctl.to_cbor(),
351 PayloadCodec::Cbor,
352 )
353 }
354
355 pub fn on_wire<S: Sealer>(
366 &mut self,
367 seal: &mut S,
368 wire: &[u8],
369 ) -> Result<Vec<Event>, NodeError> {
370 let frame = match GbpFrame::decode(wire) {
375 Ok(f) => f,
376 Err(e) => {
377 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, format!("frame decode: {e}"));
378 return Ok(self.drain_events());
379 }
380 };
381 self.deliver_frame(seal, frame)?;
382 Ok(self.drain_events())
383 }
384
385 fn deliver_frame<S: Sealer>(&mut self, seal: &mut S, frame: GbpFrame) -> Result<(), NodeError> {
386 if frame.version != 1 {
389 self.emit_err_spec(codes::UNSUPPORTED_VERSION, "version != 1");
390 return Ok(());
391 }
392 if frame.group_id_array() != self.group_id {
393 self.emit_err_spec(codes::UNKNOWN_GROUP, "group_id");
394 return Ok(());
395 }
396 if frame.epoch != self.current_epoch {
397 self.emit_err_spec(
398 codes::EPOCH_MISMATCH,
399 format!("got {}, expected {}", frame.epoch, self.current_epoch),
400 );
401 self.trigger_resync();
402 return Ok(());
403 }
404 if let Err(e) = frame.validate_payload_size() {
405 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, format!("payload size: {e}"));
406 return Ok(());
407 }
408 let flags = GbpFlags::from_bits(frame.flags);
409 let st = match frame.stream_type_typed() {
410 Ok(st) => st,
411 Err(_) => {
412 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "unknown stream_type");
413 return Ok(());
414 }
415 };
416
417 if st != StreamType::Control
423 && flags.has(GbpFlags::CRITICAL)
424 && frame.transition_id != self.last_transition_id
425 {
426 self.emit_err_spec(
427 codes::TRANSITION_MISMATCH,
428 format!(
429 "got tid={}, expected {}",
430 frame.transition_id, self.last_transition_id
431 ),
432 );
433 return Ok(());
434 }
435
436 let key = (st, frame.stream_id);
437 let hw = self.in_hw.get(&key).copied().unwrap_or(0);
438 if frame.sequence_no <= hw {
439 self.emit_err_spec(
440 codes::REPLAY_DETECTED,
441 format!(
442 "st={} sid={} seq={} hw={}",
443 st, frame.stream_id, frame.sequence_no, hw
444 ),
445 );
446 return Ok(());
447 }
448 self.in_hw.insert(key, frame.sequence_no);
449
450 let plain = match seal.open(st, frame.sequence_no, &frame.encrypted_payload) {
451 Ok(p) => p,
452 Err(e) => {
453 self.emit_err_named(
460 codes::DECRYPT_FAILED,
461 ErrorClass::Crypto,
462 true, false, format!("aead open: {e}"),
465 );
466 return Ok(());
467 }
468 };
469
470 match st {
471 StreamType::Control => self.handle_control(plain),
472 other => self.events.push(Event::PayloadReceived(DeliveredPayload {
473 stream_type: other,
474 stream_id: frame.stream_id,
475 sequence_no: frame.sequence_no,
476 flags: frame.flags,
477 plaintext: plain,
478 codec: frame.payload_codec(),
479 })),
480 }
481 Ok(())
482 }
483
484 fn handle_control(&mut self, plain: Vec<u8>) {
485 let c = match ControlMessage::from_cbor(&plain) {
486 Ok(c) => c,
487 Err(_) => {
488 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "control decode");
489 return;
490 }
491 };
492 let opcode = match ControlOpcode::try_from(c.opcode) {
493 Ok(op) => op,
494 Err(_) => {
495 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "unknown opcode");
496 return;
497 }
498 };
499 let tid_ok = match opcode {
501 ControlOpcode::PrepareTransition => {
505 c.transition_id > self.last_transition_id
506 && (self.pending_transition_id == 0
507 || self.pending_transition_id == c.transition_id)
508 }
509 ControlOpcode::ReadyForTransition
511 | ControlOpcode::ExecuteTransition
512 | ControlOpcode::AbortTransition => {
513 self.pending_transition_id != 0 && c.transition_id == self.pending_transition_id
514 }
515 _ => true,
518 };
519 if !tid_ok {
520 self.emit_err_spec(
521 codes::TRANSITION_MISMATCH,
522 format!(
523 "control tid={} not valid for {:?} (last={}, pending={})",
524 c.transition_id, opcode, self.last_transition_id, self.pending_transition_id
525 ),
526 );
527 return;
528 }
529 match opcode {
530 ControlOpcode::PrepareTransition => {
531 if self.pending_transition_id == c.transition_id {
535 let current_winner = self.pending_commit_sender.unwrap_or(MemberId::MAX);
536 if c.sender_id >= current_winner {
537 self.events.push(Event::Control {
541 from: c.sender_id,
542 opcode,
543 transition_id: c.transition_id,
544 request_id: c.request_id,
545 args: c.args.to_vec(),
546 });
547 return;
548 }
549 }
551 self.pending_transition_id = c.transition_id;
552 self.pending_commit_sender = Some(c.sender_id);
553 self.transition_state = TransitionState::TPrepared;
554 self.note_coordinator_activity();
556 self.execute_deadline =
558 Some(Instant::now() + Duration::from_millis(timeouts::T_EXECUTE_MAX_MS));
559 }
560 ControlOpcode::ReadyForTransition => {
561 self.transition_state = TransitionState::TReady;
562 self.prepare_deadline = None;
564 }
565 ControlOpcode::ExecuteTransition => {
566 self.execute_deadline = None;
567 self.pending_commit_sender = None;
568 self.apply_transition(c.transition_id);
569 self.note_coordinator_activity();
570 }
571 ControlOpcode::AbortTransition => {
572 self.prepare_deadline = None;
573 self.execute_deadline = None;
574 self.pending_commit_sender = None;
575 self.transition_state = TransitionState::TAborted;
576 self.pending_transition_id = 0;
577 }
578 ControlOpcode::GroupStateDigestResponse => {
579 if self.state == NodeState::Resyncing {
580 self.transition(NodeState::Active);
581 }
582 }
583 ControlOpcode::CapabilitiesAdvertise => {
584 if Self::is_coordinator_claim(&c.args) {
585 self.note_coordinator_activity();
587 if self.is_coordinator && c.sender_id < self.member_id {
591 self.is_coordinator = false;
592 }
593 self.events.push(Event::CoordinatorClaim {
594 claimant: c.sender_id,
595 });
596 }
597 }
598 _ => {}
599 }
600 self.events.push(Event::Control {
601 from: c.sender_id,
602 opcode,
603 transition_id: c.transition_id,
604 request_id: c.request_id,
605 args: c.args.to_vec(),
606 });
607 }
608
609 pub fn apply_transition(&mut self, tid: TransitionId) {
612 self.current_epoch += 1;
613 self.last_transition_id = tid;
614 self.pending_transition_id = 0;
615 self.pending_commit_sender = None;
616 self.transition_state = TransitionState::TExecuted;
617 self.out_seq.clear();
618 self.in_hw.clear();
619 self.events.push(Event::EpochAdvanced {
620 epoch: self.current_epoch,
621 transition_id: tid,
622 });
623 }
624
625 pub fn trigger_resync(&mut self) {
627 if self.state != NodeState::Resyncing {
628 self.transition(NodeState::Resyncing);
629 }
630 }
631
632 pub fn check_timeouts(&mut self) -> Vec<Event> {
639 let now = Instant::now();
640
641 if self.prepare_deadline.is_some_and(|d| now >= d) {
642 self.prepare_deadline = None;
643 self.execute_deadline = None;
644 self.pending_transition_id = 0;
645 self.transition_state = TransitionState::TAborted;
646 self.emit_err_spec(codes::PREPARE_TIMEOUT, "T_prepare_max exceeded");
647 }
648
649 if self.execute_deadline.is_some_and(|d| now >= d) {
650 self.execute_deadline = None;
651 self.emit_err_spec(codes::EXECUTE_TIMEOUT, "T_execute_max exceeded");
652 }
653
654 if self.coordinator_last_seen.is_some_and(|t| {
655 now.duration_since(t).as_millis() as u64 >= timeouts::T_COORDINATOR_GRACE_MS
656 }) {
657 self.coordinator_last_seen = None;
658 self.is_coordinator = false;
659 self.emit_err_spec(
660 codes::COORDINATOR_GONE,
661 "coordinator silence exceeded T_coordinator_grace",
662 );
663 self.events.push(Event::CoordinatorElectionNeeded);
664 }
665
666 self.drain_events()
667 }
668
669 pub fn note_coordinator_activity(&mut self) {
676 self.coordinator_last_seen = Some(Instant::now());
677 }
678
679 pub fn claim_coordinator<S: Sealer>(
690 &mut self,
691 seal: &mut S,
692 target: MemberId,
693 ) -> Result<OutboundFrame, NodeError> {
694 let args = vec![0xA1u8, 0x00, 0xF5];
696 self.is_coordinator = true;
697 self.coordinator_last_seen = Some(Instant::now());
698 self.events.push(Event::BecameCoordinator);
699 self.send_control(
700 seal,
701 target,
702 ControlOpcode::CapabilitiesAdvertise,
703 self.last_transition_id,
704 0,
705 args,
706 )
707 }
708
/// Reports whether `args` contains the byte `0x00` immediately followed by
/// `0xF5` at any position.
///
/// The exact three-byte claim `A1 00 F5` contains that pair, so it is
/// covered by the same scan.
fn is_coordinator_claim(args: &[u8]) -> bool {
    args.iter()
        .zip(args.iter().skip(1))
        .any(|(&first, &second)| first == 0x00 && second == 0xF5)
}
723
724 fn transition(&mut self, next: NodeState) {
725 if self.state == next {
726 return;
727 }
728 if !self.state.can_transition_to(next) {
729 let from = self.state;
730 self.state = NodeState::Failed;
731 self.events.push(Event::StateChanged {
732 from,
733 to: NodeState::Failed,
734 });
735 return;
736 }
737 let from = self.state;
738 self.state = next;
739 self.events.push(Event::StateChanged { from, to: next });
740 }
741
742 fn assert_can_send(&self) -> Result<(), NodeError> {
743 if matches!(
744 self.state,
745 NodeState::Active | NodeState::Resyncing | NodeState::EstablishingGroup
746 ) {
747 Ok(())
748 } else {
749 Err(NodeError::InvalidState(format!(
750 "cannot send in state {}",
751 self.state
752 )))
753 }
754 }
755
756 fn next_seq(&mut self, st: StreamType, sid: StreamId) -> SequenceNo {
757 let entry = self.out_seq.entry((st, sid)).or_insert(0);
758 *entry += 1;
759 *entry
760 }
761
762 fn emit_err_spec(&mut self, code: u16, reason: impl Into<String>) {
763 if let Some(spec) = ErrorSpec::lookup(code) {
764 self.emit_err_named(spec.code, spec.class, spec.retryable, spec.fatal, reason);
765 } else {
766 self.emit_err_named(code, ErrorClass::Policy, false, false, reason);
767 }
768 }
769
770 fn emit_err_named(
771 &mut self,
772 code: u16,
773 class: ErrorClass,
774 retryable: bool,
775 fatal: bool,
776 reason: impl Into<String>,
777 ) {
778 let reason = reason.into();
779 let (class, retryable, fatal) = if let Some(spec) = ErrorSpec::lookup(code) {
782 (spec.class, spec.retryable, spec.fatal)
783 } else {
784 (class, retryable, fatal)
785 };
786 let _ = ErrorObject::new(code, class, retryable, fatal, reason.clone()).to_cbor();
787 self.events.push(Event::Error {
788 code,
789 class,
790 retryable,
791 fatal,
792 reason,
793 });
794 if fatal {
795 let from = self.state;
796 self.state = NodeState::Failed;
797 self.events.push(Event::StateChanged {
798 from,
799 to: NodeState::Failed,
800 });
801 }
802 }
803}
804
/// Payload protection used by [`GroupNode`]: `seal` is applied to outgoing
/// plaintext and `open` to incoming payload bytes.
///
/// Both operations receive the stream type and sequence number of the frame
/// being processed.
pub trait Sealer {
    /// Protects `pt` for a frame on stream type `st` with sequence number
    /// `seq` and returns the bytes to place in the frame.
    fn seal(&mut self, st: StreamType, seq: SequenceNo, pt: &[u8]) -> Result<Vec<u8>, MlsError>;
    /// Recovers the plaintext from payload bytes `ct` received on stream
    /// type `st` with sequence number `seq`.
    fn open(&mut self, st: StreamType, seq: SequenceNo, ct: &[u8]) -> Result<Vec<u8>, MlsError>;
}
816
817impl Sealer for gbp_mls::MlsContext {
818 fn seal(&mut self, st: StreamType, seq: SequenceNo, pt: &[u8]) -> Result<Vec<u8>, MlsError> {
819 gbp_mls::MlsContext::seal(self, label_for(st), seq, pt)
820 }
821 fn open(&mut self, st: StreamType, seq: SequenceNo, ct: &[u8]) -> Result<Vec<u8>, MlsError> {
822 gbp_mls::MlsContext::open(self, label_for(st), seq, ct)
823 }
824}
825
826#[cfg(test)]
827mod tests {
828 use super::*;
829
830 struct PlainSealer;
831 impl Sealer for PlainSealer {
832 fn seal(
833 &mut self,
834 _st: StreamType,
835 _seq: SequenceNo,
836 pt: &[u8],
837 ) -> Result<Vec<u8>, MlsError> {
838 Ok(pt.to_vec())
839 }
840 fn open(
841 &mut self,
842 _st: StreamType,
843 _seq: SequenceNo,
844 ct: &[u8],
845 ) -> Result<Vec<u8>, MlsError> {
846 Ok(ct.to_vec())
847 }
848 }
849
850 fn group_id() -> GroupId {
851 let mut g = [0u8; 16];
852 g[..3].copy_from_slice(b"GBP");
853 g
854 }
855
/// Delivering the same frame twice must report `REPLAY_DETECTED` on the
/// second delivery.
#[test]
fn replay_window_rejects_repeat() {
    let mut sealer = PlainSealer;
    let mut sender = GroupNode::new(1, group_id());
    let mut receiver = GroupNode::new(2, group_id());
    sender.bootstrap_as_creator(1);
    receiver.bootstrap_as_joiner(1, 0);

    let stream = sender.member_stream_id(2);
    let frame = sender
        .send_payload(
            &mut sealer,
            2,
            StreamType::Text,
            stream,
            GbpFlags::ordered_reliable_ack(),
            b"hi",
            PayloadCodec::Cbor,
        )
        .unwrap();

    // First delivery is accepted; only the second one is inspected.
    let _ = receiver.on_wire(&mut sealer, &frame.wire).unwrap();
    let second = receiver.on_wire(&mut sealer, &frame.wire).unwrap();

    let saw_replay = second.iter().any(|event| {
        matches!(
            event,
            Event::Error {
                code: codes::REPLAY_DETECTED,
                ..
            }
        )
    });
    assert!(saw_replay);
}
885
/// A frame stamped with a different epoch must push the receiver into
/// `Resyncing`.
#[test]
fn epoch_mismatch_triggers_resync() {
    let mut sealer = PlainSealer;
    let mut ahead = GroupNode::new(1, group_id());
    let mut behind = GroupNode::new(2, group_id());
    ahead.bootstrap_as_creator(1);
    behind.bootstrap_as_joiner(1, 0);

    // Put the sender one epoch ahead of the receiver.
    ahead.current_epoch = 2;

    let stream = ahead.member_stream_id(2);
    let frame = ahead
        .send_payload(
            &mut sealer,
            2,
            StreamType::Text,
            stream,
            GbpFlags::ordered_reliable_ack(),
            b"x",
            PayloadCodec::Cbor,
        )
        .unwrap();

    let _ = behind.on_wire(&mut sealer, &frame.wire).unwrap();
    assert_eq!(behind.state, NodeState::Resyncing);
}
909
/// A valid text frame must surface as `Event::PayloadReceived` carrying the
/// original stream type and plaintext.
#[test]
fn payload_emits_received_event() {
    let mut sealer = PlainSealer;
    let mut sender = GroupNode::new(1, group_id());
    let mut receiver = GroupNode::new(2, group_id());
    sender.bootstrap_as_creator(1);
    receiver.bootstrap_as_joiner(1, 0);

    let stream = sender.member_stream_id(2);
    let frame = sender
        .send_payload(
            &mut sealer,
            2,
            StreamType::Text,
            stream,
            GbpFlags::ordered_reliable_ack(),
            b"payload",
            PayloadCodec::Cbor,
        )
        .unwrap();

    let events = receiver.on_wire(&mut sealer, &frame.wire).unwrap();

    // Take the first delivered payload, if any.
    let mut delivered = None;
    for event in events {
        if let Event::PayloadReceived(payload) = event {
            delivered = Some(payload);
            break;
        }
    }
    let delivered = delivered.expect("payload");

    assert_eq!(delivered.stream_type, StreamType::Text);
    assert_eq!(delivered.plaintext, b"payload");
}
940
941 fn drain_errs(events: &[Event]) -> Vec<u16> {
944 events
945 .iter()
946 .filter_map(|e| match e {
947 Event::Error { code, .. } => Some(*code),
948 _ => None,
949 })
950 .collect()
951 }
952
953 fn drain_controls(events: &[Event]) -> Vec<(ControlOpcode, TransitionId)> {
954 events
955 .iter()
956 .filter_map(|e| match e {
957 Event::Control {
958 opcode,
959 transition_id,
960 ..
961 } => Some((*opcode, *transition_id)),
962 _ => None,
963 })
964 .collect()
965 }
966
/// Sending PrepareTransition records the pending transition on the sender,
/// and receiving it records the same on the peer without any error.
#[test]
fn prepare_transition_sets_pending_on_sender_and_receiver() {
    let mut sealer = PlainSealer;
    let mut coord = GroupNode::new(1, group_id());
    let mut peer = GroupNode::new(2, group_id());
    coord.bootstrap_as_creator(0);
    peer.bootstrap_as_joiner(0, 0);

    let prepare = coord
        .send_control(
            &mut sealer,
            0,
            ControlOpcode::PrepareTransition,
            1,
            100,
            b"commit-blob".to_vec(),
        )
        .unwrap();
    assert_eq!(coord.pending_transition_id, 1, "sender mirrors pending");
    assert_eq!(coord.transition_state, TransitionState::TPrepared);

    let events = peer.on_wire(&mut sealer, &prepare.wire).unwrap();
    assert_eq!(peer.pending_transition_id, 1, "receiver records pending");

    let errors = drain_errs(&events);
    assert!(errors.is_empty(), "no error: {:?}", errors);

    let controls = drain_controls(&events);
    assert_eq!(controls, vec![(ControlOpcode::PrepareTransition, 1)]);
}
997
/// A ReadyForTransition naming a transition other than the pending one must
/// be rejected with `TRANSITION_MISMATCH`.
#[test]
fn ready_with_wrong_tid_is_rejected() {
    let mut sealer = PlainSealer;
    let mut coord = GroupNode::new(1, group_id());
    let mut peer = GroupNode::new(2, group_id());
    coord.bootstrap_as_creator(0);
    peer.bootstrap_as_joiner(0, 0);

    let prepare = coord
        .send_control(
            &mut sealer,
            0,
            ControlOpcode::PrepareTransition,
            1,
            1,
            Vec::new(),
        )
        .unwrap();
    peer.on_wire(&mut sealer, &prepare.wire).unwrap();

    // The pending transition is 1; the reply names 7.
    let mismatched = peer
        .send_control(
            &mut sealer,
            1,
            ControlOpcode::ReadyForTransition,
            7,
            1,
            Vec::new(),
        )
        .unwrap();
    let events = coord.on_wire(&mut sealer, &mismatched.wire).unwrap();

    let errors = drain_errs(&events);
    assert!(
        errors.contains(&codes::TRANSITION_MISMATCH),
        "got {:?}",
        errors
    );
}
1017
/// Prepare followed by Execute must advance the epoch and the last applied
/// transition on both sides and clear the peer's pending transition.
#[test]
fn execute_advances_epoch_and_clears_pending() {
    let mut sealer = PlainSealer;
    let mut coord = GroupNode::new(1, group_id());
    let mut peer = GroupNode::new(2, group_id());
    coord.bootstrap_as_creator(0);
    peer.bootstrap_as_joiner(0, 0);

    let prepare = coord
        .send_control(
            &mut sealer,
            0,
            ControlOpcode::PrepareTransition,
            1,
            1,
            Vec::new(),
        )
        .unwrap();
    peer.on_wire(&mut sealer, &prepare.wire).unwrap();

    // The execute frame is built before the coordinator applies the
    // transition, so it is still stamped with the old epoch.
    let execute = coord
        .send_control(
            &mut sealer,
            0,
            ControlOpcode::ExecuteTransition,
            1,
            2,
            Vec::new(),
        )
        .unwrap();
    coord.apply_transition(1);
    let events = peer.on_wire(&mut sealer, &execute.wire).unwrap();

    assert_eq!(coord.last_transition_id, 1);
    assert_eq!(coord.current_epoch, 1);
    assert_eq!(peer.last_transition_id, 1);
    assert_eq!(peer.current_epoch, 1);
    assert_eq!(peer.pending_transition_id, 0);

    let epoch_advanced = events.iter().any(|event| {
        matches!(
            event,
            Event::EpochAdvanced {
                transition_id: 1,
                ..
            }
        )
    });
    assert!(epoch_advanced);
}
1048
/// Prepare followed by Abort must clear the pending transition without
/// advancing the epoch, and mark the transition aborted on both sides.
#[test]
fn abort_clears_pending_no_advance() {
    let mut sealer = PlainSealer;
    let mut coord = GroupNode::new(1, group_id());
    let mut peer = GroupNode::new(2, group_id());
    coord.bootstrap_as_creator(0);
    peer.bootstrap_as_joiner(0, 0);

    let prepare = coord
        .send_control(
            &mut sealer,
            0,
            ControlOpcode::PrepareTransition,
            1,
            1,
            Vec::new(),
        )
        .unwrap();
    peer.on_wire(&mut sealer, &prepare.wire).unwrap();

    let abort = coord
        .send_control(
            &mut sealer,
            0,
            ControlOpcode::AbortTransition,
            1,
            2,
            Vec::new(),
        )
        .unwrap();
    peer.on_wire(&mut sealer, &abort.wire).unwrap();

    assert_eq!(peer.pending_transition_id, 0);
    assert_eq!(peer.current_epoch, 0);
    assert_eq!(peer.transition_state, TransitionState::TAborted);
    assert_eq!(coord.transition_state, TransitionState::TAborted);
}
1069
/// A joiner bootstrapped with an expected first transition id must accept
/// the matching Execute even though it never received the Prepare.
#[test]
fn bootstrap_as_joiner_with_expected_tid_accepts_first_execute() {
    let mut sealer = PlainSealer;
    let mut coord = GroupNode::new(1, group_id());
    let mut joiner = GroupNode::new(2, group_id());
    coord.bootstrap_as_creator(0);
    joiner.bootstrap_as_joiner(0, 1);
    assert_eq!(joiner.pending_transition_id, 1);

    // The Prepare is sent but deliberately not delivered to the joiner.
    let _ = coord
        .send_control(
            &mut sealer,
            0,
            ControlOpcode::PrepareTransition,
            1,
            1,
            Vec::new(),
        )
        .unwrap();
    let execute = coord
        .send_control(
            &mut sealer,
            0,
            ControlOpcode::ExecuteTransition,
            1,
            2,
            Vec::new(),
        )
        .unwrap();

    let events = joiner.on_wire(&mut sealer, &execute.wire).unwrap();
    let errors = drain_errs(&events);
    assert!(
        errors.is_empty(),
        "expected clean apply, got errors {:?}",
        errors
    );
    assert_eq!(joiner.last_transition_id, 1);
    assert_eq!(joiner.current_epoch, 1);
}
1097
/// Claiming the coordinator role must set the local flag and queue
/// `Event::BecameCoordinator`.
#[test]
fn claim_coordinator_sets_flag_and_emits_event() {
    let mut sealer = PlainSealer;
    let mut node = GroupNode::new(1, group_id());
    node.bootstrap_as_creator(0);
    // Discard the state-change events produced by bootstrapping.
    node.drain_events();

    let _ = node.claim_coordinator(&mut sealer, 0).unwrap();
    assert!(node.is_coordinator);

    let events = node.drain_events();
    let became_coordinator = events
        .iter()
        .any(|event| matches!(event, Event::BecameCoordinator));
    assert!(became_coordinator);
}
1111
/// Once the coordinator has been silent for longer than the grace period,
/// `check_timeouts` must request an election and clear the coordinator flag.
#[test]
fn coordinator_gone_emits_election_needed() {
    let mut member = GroupNode::new(2, group_id());
    member.bootstrap_as_joiner(0, 0);

    // Derive the silence from the configured grace period (plus a margin)
    // so the test keeps exercising the timeout if the constant changes.
    let silence = Duration::from_millis(timeouts::T_COORDINATOR_GRACE_MS + 1_000);
    member.coordinator_last_seen = Some(Instant::now() - silence);

    let evs = member.check_timeouts();
    assert!(
        evs.iter()
            .any(|e| matches!(e, Event::CoordinatorElectionNeeded))
    );
    assert!(!member.is_coordinator, "flag cleared on silence");
}
1124
/// Receiving a coordinator claim must start the silence timer and surface
/// an `Event::CoordinatorClaim` naming the claimant.
#[test]
fn capabilities_advertise_with_claim_resets_silence_timer() {
    let mut sealer = PlainSealer;
    let mut member = GroupNode::new(2, group_id());
    let mut coord = GroupNode::new(1, group_id());
    member.bootstrap_as_joiner(0, 0);
    coord.bootstrap_as_creator(0);

    let claim = coord.claim_coordinator(&mut sealer, 2).unwrap();
    let events = member.on_wire(&mut sealer, &claim.wire).unwrap();

    assert!(
        member.coordinator_last_seen.is_some(),
        "silence timer reset"
    );
    let claim_reported = events
        .iter()
        .any(|event| matches!(event, Event::CoordinatorClaim { claimant: 1 }));
    assert!(claim_reported);
}
1145
/// A coordinator must give up the role when a member with a lower id
/// advertises a claim.
#[test]
fn higher_id_yields_to_lower_claimant() {
    let mut sealer = PlainSealer;
    let mut incumbent = GroupNode::new(5, group_id());
    let mut challenger = GroupNode::new(2, group_id());
    incumbent.bootstrap_as_joiner(0, 0);
    challenger.bootstrap_as_creator(0);
    incumbent.is_coordinator = true;

    let claim = challenger.claim_coordinator(&mut sealer, 5).unwrap();
    incumbent.on_wire(&mut sealer, &claim.wire).unwrap();

    assert!(!incumbent.is_coordinator, "node5 yielded to node2");
}
1161
/// A coordinator must keep the role when the claimant has a higher id.
#[test]
fn lower_id_keeps_coordinator_against_higher_claimant() {
    let mut sealer = PlainSealer;
    let mut incumbent = GroupNode::new(1, group_id());
    let mut challenger = GroupNode::new(5, group_id());
    incumbent.bootstrap_as_creator(0);
    challenger.bootstrap_as_joiner(0, 0);
    incumbent.is_coordinator = true;

    let claim = challenger.claim_coordinator(&mut sealer, 1).unwrap();
    incumbent.on_wire(&mut sealer, &claim.wire).unwrap();

    assert!(
        incumbent.is_coordinator,
        "node1 keeps role — it has lower id"
    );
}
1174
/// When two members prepare the same transition, a later prepare from a
/// higher member id must not displace the recorded sender.
#[test]
fn competing_prepare_lower_member_id_wins() {
    let mut sealer = PlainSealer;
    let mut observer = GroupNode::new(10, group_id());
    observer.bootstrap_as_joiner(0, 0);

    // Member 1 prepares first.
    let mut low = GroupNode::new(1, group_id());
    low.bootstrap_as_creator(0);
    let from_low = low
        .send_control(
            &mut sealer,
            10,
            ControlOpcode::PrepareTransition,
            1,
            1,
            b"commit-A".to_vec(),
        )
        .unwrap();
    observer.on_wire(&mut sealer, &from_low.wire).unwrap();
    assert_eq!(
        observer.pending_commit_sender,
        Some(1),
        "member 1 is initial winner"
    );

    // Member 3 then prepares the same transition.
    let mut high = GroupNode::new(3, group_id());
    high.bootstrap_as_creator(0);
    let from_high = high
        .send_control(
            &mut sealer,
            10,
            ControlOpcode::PrepareTransition,
            1,
            2,
            b"commit-B".to_vec(),
        )
        .unwrap();
    observer.on_wire(&mut sealer, &from_high.wire).unwrap();

    assert_eq!(
        observer.pending_commit_sender,
        Some(1),
        "member 1 still wins"
    );
    assert_eq!(observer.pending_transition_id, 1);
}
1223
/// When two members prepare the same transition, a later prepare from a
/// lower member id must replace the recorded sender.
#[test]
fn competing_prepare_later_lower_id_displaces_winner() {
    let mut sealer = PlainSealer;
    let mut observer = GroupNode::new(10, group_id());
    observer.bootstrap_as_joiner(0, 0);

    // Member 5 prepares first.
    let mut high = GroupNode::new(5, group_id());
    high.bootstrap_as_creator(0);
    let from_high = high
        .send_control(
            &mut sealer,
            10,
            ControlOpcode::PrepareTransition,
            1,
            1,
            b"commit-X".to_vec(),
        )
        .unwrap();
    observer.on_wire(&mut sealer, &from_high.wire).unwrap();
    assert_eq!(observer.pending_commit_sender, Some(5));

    // Member 2 then prepares the same transition.
    let mut low = GroupNode::new(2, group_id());
    low.bootstrap_as_creator(0);
    let from_low = low
        .send_control(
            &mut sealer,
            10,
            ControlOpcode::PrepareTransition,
            1,
            2,
            b"commit-Y".to_vec(),
        )
        .unwrap();
    observer.on_wire(&mut sealer, &from_low.wire).unwrap();

    assert_eq!(
        observer.pending_commit_sender,
        Some(2),
        "member 2 displaces member 5"
    );
}
1265
/// After a transition is applied no commit sender may remain recorded.
#[test]
fn apply_transition_clears_commit_sender() {
    let mut sealer = PlainSealer;
    let mut coord = GroupNode::new(1, group_id());
    coord.bootstrap_as_creator(0);

    let sent = coord.send_control(
        &mut sealer,
        0,
        ControlOpcode::PrepareTransition,
        1,
        1,
        Vec::new(),
    );
    sent.unwrap();

    coord.apply_transition(1);
    assert_eq!(coord.pending_commit_sender, None);
}
1277
/// An expired prepare deadline must report `PREPARE_TIMEOUT`, abort the
/// transition and disarm the deadline.
#[test]
fn prepare_timeout_fires_when_deadline_exceeded() {
    let mut sealer = PlainSealer;
    let mut coord = GroupNode::new(1, group_id());
    coord.bootstrap_as_creator(0);
    coord
        .send_control(
            &mut sealer,
            0,
            ControlOpcode::PrepareTransition,
            1,
            1,
            Vec::new(),
        )
        .unwrap();

    // Move the armed deadline into the past.
    let just_past = Instant::now() - Duration::from_millis(1);
    coord.prepare_deadline = Some(just_past);

    let events = coord.check_timeouts();
    let timed_out = events.iter().any(|event| {
        matches!(
            event,
            Event::Error {
                code: codes::PREPARE_TIMEOUT,
                ..
            }
        )
    });
    assert!(timed_out, "expected PREPARE_TIMEOUT, got {:?}", events);
    assert_eq!(
        coord.transition_state,
        TransitionState::TAborted,
        "transition aborted"
    );
    assert_eq!(coord.prepare_deadline, None, "deadline cleared");
}
1309
/// An expired execute deadline must report `EXECUTE_TIMEOUT` and disarm the
/// deadline.
#[test]
fn execute_timeout_fires_when_deadline_exceeded() {
    let mut sealer = PlainSealer;
    let mut member = GroupNode::new(2, group_id());
    member.bootstrap_as_joiner(0, 0);

    // Pretend a Prepare for transition 1 was already received.
    member.pending_transition_id = 1;
    member.transition_state = TransitionState::TPrepared;
    member
        .send_control(
            &mut sealer,
            1,
            ControlOpcode::ReadyForTransition,
            1,
            1,
            Vec::new(),
        )
        .unwrap();

    // Move the armed deadline into the past.
    let just_past = Instant::now() - Duration::from_millis(1);
    member.execute_deadline = Some(just_past);

    let events = member.check_timeouts();
    let timed_out = events.iter().any(|event| {
        matches!(
            event,
            Event::Error {
                code: codes::EXECUTE_TIMEOUT,
                ..
            }
        )
    });
    assert!(timed_out, "expected EXECUTE_TIMEOUT, got {:?}", events);
    assert_eq!(member.execute_deadline, None, "deadline cleared");
}
1337
/// Once the coordinator has been silent for longer than the grace period,
/// `check_timeouts` must report `COORDINATOR_GONE` and clear the timer.
#[test]
fn coordinator_gone_fires_after_silence() {
    let mut member = GroupNode::new(2, group_id());
    member.bootstrap_as_joiner(0, 0);

    // Derive the silence from the configured grace period (plus a margin)
    // so the test keeps exercising the timeout if the constant changes.
    let silence = Duration::from_millis(timeouts::T_COORDINATOR_GRACE_MS + 1_000);
    member.coordinator_last_seen = Some(Instant::now() - silence);

    let evs = member.check_timeouts();
    assert!(
        evs.iter().any(|e| matches!(
            e,
            Event::Error {
                code: codes::COORDINATOR_GONE,
                ..
            }
        )),
        "expected COORDINATOR_GONE, got {:?}",
        evs
    );
    assert_eq!(member.coordinator_last_seen, None, "timer cleared");
}
1358
/// Noting coordinator activity must restart the silence timer, so a stale
/// timestamp that would have fired no longer does.
#[test]
fn note_coordinator_activity_resets_silence_timer() {
    let mut member = GroupNode::new(2, group_id());
    member.bootstrap_as_joiner(0, 0);

    // Start from a timestamp that is guaranteed to be past the configured
    // grace period, whatever its value, so the reset is what prevents the
    // timeout.
    let silence = Duration::from_millis(timeouts::T_COORDINATOR_GRACE_MS + 1_000);
    member.coordinator_last_seen = Some(Instant::now() - silence);

    member.note_coordinator_activity();

    let evs = member.check_timeouts();
    assert!(
        !evs.iter().any(|e| matches!(
            e,
            Event::Error {
                code: codes::COORDINATOR_GONE,
                ..
            }
        )),
        "should NOT fire after reset"
    );
}
1379
/// Sending Execute must disarm both the prepare and the execute deadline on
/// the sender.
#[test]
fn execute_clears_prepare_deadline() {
    let mut sealer = PlainSealer;
    let mut coord = GroupNode::new(1, group_id());
    coord.bootstrap_as_creator(0);

    coord
        .send_control(
            &mut sealer,
            0,
            ControlOpcode::PrepareTransition,
            1,
            1,
            Vec::new(),
        )
        .unwrap();
    assert!(coord.prepare_deadline.is_some(), "deadline armed");

    coord
        .send_control(
            &mut sealer,
            0,
            ControlOpcode::ExecuteTransition,
            1,
            2,
            Vec::new(),
        )
        .unwrap();
    assert_eq!(coord.prepare_deadline, None, "deadline cleared on EXECUTE");
    assert_eq!(
        coord.execute_deadline, None,
        "execute_deadline also cleared"
    );
}
1398
/// A joiner that receives a PREPARE frame over the wire must end up with
/// `execute_deadline` set.
#[test]
fn receive_prepare_arms_execute_deadline() {
    let mut sealer = PlainSealer;

    let mut creator = GroupNode::new(1, group_id());
    creator.bootstrap_as_creator(0);
    let mut joiner = GroupNode::new(2, group_id());
    joiner.bootstrap_as_joiner(0, 0);

    let prepare = creator
        .send_control(
            &mut sealer,
            0,
            ControlOpcode::PrepareTransition,
            1,
            1,
            Vec::new(),
        )
        .unwrap();
    joiner.on_wire(&mut sealer, &prepare.wire).unwrap();

    let armed = joiner.execute_deadline.is_some();
    assert!(armed, "execute_deadline armed on receiving PREPARE");
}
1415
/// After a joiner has received PREPARE and then EXECUTE for the same
/// transition, its `execute_deadline` must be `None`.
#[test]
fn receive_execute_clears_execute_deadline() {
    let mut sealer = PlainSealer;

    let mut creator = GroupNode::new(1, group_id());
    creator.bootstrap_as_creator(0);
    let mut joiner = GroupNode::new(2, group_id());
    joiner.bootstrap_as_joiner(0, 0);

    // Send and deliver PREPARE (request 1), then EXECUTE (request 2), in order.
    let steps = [
        (ControlOpcode::PrepareTransition, 1),
        (ControlOpcode::ExecuteTransition, 2),
    ];
    for (opcode, request_id) in steps {
        let frame = creator
            .send_control(&mut sealer, 0, opcode, 1, request_id, Vec::new())
            .unwrap();
        joiner.on_wire(&mut sealer, &frame.wire).unwrap();
    }

    assert_eq!(joiner.execute_deadline, None, "cleared on EXECUTE");
}
1433
/// A freshly bootstrapped creator whose event queue has been drained must
/// get no events back from `check_timeouts`.
#[test]
fn no_timeout_when_deadlines_not_set() {
    let mut creator = GroupNode::new(1, group_id());
    creator.bootstrap_as_creator(0);

    // Discard whatever bootstrap queued so only the timeout sweep is observed.
    node_drain(&mut creator);

    let fired = creator.check_timeouts();
    assert!(fired.is_empty(), "no events without armed deadlines");

    /// Drains the node's pending events and throws the result away.
    fn node_drain(node: &mut GroupNode) {
        node.drain_events();
    }
}
1442
/// Once the receiver has applied transition 1, an incoming PREPARE that
/// reuses that id must produce a `TRANSITION_MISMATCH` error event.
#[test]
fn prepare_with_already_applied_tid_is_rejected() {
    let mut sealer = PlainSealer;

    // Receiver: prepare and apply transition 1 locally.
    let mut creator = GroupNode::new(1, group_id());
    creator.bootstrap_as_creator(0);
    creator
        .send_control(
            &mut sealer,
            0,
            ControlOpcode::PrepareTransition,
            1,
            1,
            Vec::new(),
        )
        .unwrap();
    creator.apply_transition(1);
    assert_eq!(creator.last_transition_id, 1);
    assert_eq!(creator.pending_transition_id, 0);

    // Sender: a second node joined at the receiver's current epoch emits a
    // PREPARE addressed to member 1 that repeats the applied id.
    let mut joiner = GroupNode::new(2, group_id());
    joiner.bootstrap_as_joiner(creator.current_epoch, 0);
    let stale_prepare = joiner
        .send_control(
            &mut sealer,
            1,
            ControlOpcode::PrepareTransition,
            1,
            9,
            Vec::new(),
        )
        .unwrap();

    let events = creator.on_wire(&mut sealer, &stale_prepare.wire).unwrap();
    let error_codes = drain_errs(&events);
    let rejected = error_codes.contains(&codes::TRANSITION_MISMATCH);
    assert!(
        rejected,
        "expected TRANSITION_MISMATCH, got {error_codes:?}"
    );
}
1472
/// When the receiver's sealer fails to `open` an inbound payload, `on_wire`
/// must still return `Ok` and carry a `DECRYPT_FAILED` error event that is
/// non-fatal and retryable, with the receiver remaining `Active`.
#[test]
fn decrypt_failed_is_non_fatal() {
    /// Sealer stub: `seal` passes bytes through unchanged, `open` always fails.
    struct OpenFailSealer;

    impl Sealer for OpenFailSealer {
        fn seal(
            &mut self,
            _stream: StreamType,
            _seq: SequenceNo,
            plaintext: &[u8],
        ) -> Result<Vec<u8>, MlsError> {
            Ok(plaintext.to_vec())
        }

        fn open(
            &mut self,
            _stream: StreamType,
            _seq: SequenceNo,
            _ciphertext: &[u8],
        ) -> Result<Vec<u8>, MlsError> {
            Err(MlsError::Aead("simulated".into()))
        }
    }

    let mut sender = GroupNode::new(1, group_id());
    let mut receiver = GroupNode::new(2, group_id());
    sender.bootstrap_as_creator(1);
    receiver.bootstrap_as_joiner(1, 0);

    // The sender seals with the pass-through sealer.
    let mut plain = PlainSealer;
    let stream_id = sender.member_stream_id(2);
    let frame = sender
        .send_payload(
            &mut plain,
            2,
            StreamType::Text,
            stream_id,
            GbpFlags::ordered_reliable_ack(),
            b"x",
            PayloadCodec::Cbor,
        )
        .unwrap();

    // The receiver opens with the always-failing sealer.
    let mut failing = OpenFailSealer;
    let events = receiver.on_wire(&mut failing, &frame.wire).unwrap();

    // Pick out the first error event and inspect its classification.
    let (code, fatal, retryable) = events
        .iter()
        .find_map(|ev| {
            if let Event::Error {
                code,
                fatal,
                retryable,
                ..
            } = ev
            {
                Some((*code, *fatal, *retryable))
            } else {
                None
            }
        })
        .expect("error event");

    assert_eq!(code, codes::DECRYPT_FAILED);
    assert!(!fatal, "must be non-fatal");
    assert!(retryable, "must be retryable");
    assert_eq!(receiver.state, NodeState::Active, "bob stays Active");
}
1531}