1use gbp::{CodecError, ControlMessage, ErrorObject, GbpFrame};
23use gbp_core::{
24 ControlOpcode, ErrorClass, GbpFlags, GroupId, MemberId, NodeState, SequenceNo, StreamId,
25 StreamType, TransitionId, TransitionState, codes, errors::ErrorSpec, timeouts,
26};
27use gbp_mls::{MlsError, label_for};
28use std::collections::HashMap;
29use std::time::{Duration, Instant};
30
31#[derive(Debug, thiserror::Error)]
33pub enum NodeError {
34 #[error("codec: {0}")]
36 Codec(#[from] CodecError),
37 #[error("mls: {0}")]
39 Mls(#[from] MlsError),
40 #[error("invalid state: {0}")]
42 InvalidState(String),
43}
44
45pub struct OutboundFrame {
47 pub to: MemberId,
49 pub wire: Vec<u8>,
51}
52
53#[derive(Debug, Clone)]
55pub struct DeliveredPayload {
56 pub stream_type: StreamType,
58 pub stream_id: StreamId,
61 pub sequence_no: SequenceNo,
63 pub flags: u16,
65 pub plaintext: Vec<u8>,
67}
68
69#[derive(Debug, Clone)]
71pub enum Event {
72 StateChanged {
74 from: NodeState,
76 to: NodeState,
78 },
79 PayloadReceived(DeliveredPayload),
83 Control {
85 from: MemberId,
87 opcode: ControlOpcode,
89 transition_id: TransitionId,
91 request_id: u32,
93 args: Vec<u8>,
96 },
97 Error {
99 code: u16,
101 class: ErrorClass,
103 retryable: bool,
105 fatal: bool,
107 reason: String,
109 },
110 EpochAdvanced {
112 epoch: u64,
114 transition_id: TransitionId,
116 },
117 CoordinatorElectionNeeded,
121 BecameCoordinator,
124 CoordinatorClaim {
126 claimant: MemberId,
128 },
129}
130
131pub struct GroupNode {
138 pub member_id: MemberId,
140 pub is_coordinator: bool,
142 pub group_id: GroupId,
144 pub current_epoch: u64,
147 pub last_transition_id: TransitionId,
149 pub pending_transition_id: TransitionId,
151 pub state: NodeState,
153 pub transition_state: TransitionState,
155
156 out_seq: HashMap<(StreamType, StreamId), SequenceNo>,
157 in_hw: HashMap<(StreamType, StreamId), SequenceNo>,
158 events: Vec<Event>,
159
160 pending_commit_sender: Option<MemberId>,
164 prepare_deadline: Option<Instant>,
167 execute_deadline: Option<Instant>,
170 coordinator_last_seen: Option<Instant>,
173}
174
175impl GroupNode {
176 pub fn new(member_id: MemberId, group_id: GroupId) -> Self {
178 Self {
179 member_id,
180 group_id,
181 is_coordinator: false,
182 current_epoch: 0,
183 last_transition_id: 0,
184 pending_transition_id: 0,
185 state: NodeState::Idle,
186 transition_state: TransitionState::TIdle,
187 out_seq: HashMap::new(),
188 in_hw: HashMap::new(),
189 events: Vec::new(),
190 pending_commit_sender: None,
191 prepare_deadline: None,
192 execute_deadline: None,
193 coordinator_last_seen: None,
194 }
195 }
196
197 pub fn bootstrap_as_creator(&mut self, epoch: u64) {
199 self.transition(NodeState::Connecting);
200 self.transition(NodeState::EstablishingGroup);
201 self.current_epoch = epoch;
202 self.transition(NodeState::Active);
203 }
204
205 pub fn bootstrap_as_joiner(&mut self, epoch: u64, expected_first_tid: u32) {
214 self.transition(NodeState::Connecting);
215 self.transition(NodeState::EstablishingGroup);
216 self.current_epoch = epoch;
217 if expected_first_tid > 0 {
218 self.pending_transition_id = expected_first_tid;
219 self.transition_state = TransitionState::TPrepared;
220 }
221 self.transition(NodeState::Active);
222 }
223
224 pub fn drain_events(&mut self) -> Vec<Event> {
226 std::mem::take(&mut self.events)
227 }
228
229 pub fn member_stream_id(&self, base: u32) -> StreamId {
234 debug_assert!(
235 self.member_id < 1_000_000,
236 "member_id overflow: {0}",
237 self.member_id
238 );
239 base + self.member_id * 100
240 }
241
242 pub fn send_payload<S: Sealer>(
247 &mut self,
248 seal: &mut S,
249 target: MemberId,
250 stream_type: StreamType,
251 stream_id: StreamId,
252 flags: u16,
253 plaintext: &[u8],
254 ) -> Result<OutboundFrame, NodeError> {
255 self.assert_can_send()?;
256 let seq = self.next_seq(stream_type, stream_id);
257 let ciphertext = seal.seal(stream_type, seq, plaintext)?;
258 let frame = GbpFrame::new(
259 self.group_id,
260 self.current_epoch,
261 self.last_transition_id,
262 stream_type,
263 stream_id,
264 flags,
265 seq,
266 ciphertext,
267 );
268 Ok(OutboundFrame {
269 to: target,
270 wire: frame.to_cbor(),
271 })
272 }
273
274 pub fn send_control<S: Sealer>(
284 &mut self,
285 seal: &mut S,
286 target: MemberId,
287 opcode: ControlOpcode,
288 transition_id: TransitionId,
289 request_id: u32,
290 args: Vec<u8>,
291 ) -> Result<OutboundFrame, NodeError> {
292 let ctl = ControlMessage::with_args(
293 opcode as u16,
294 request_id,
295 self.member_id,
296 transition_id,
297 args,
298 );
299 let mut flags = GbpFlags::ordered_reliable_system();
300 if matches!(
301 opcode,
302 ControlOpcode::PrepareTransition
303 | ControlOpcode::ReadyForTransition
304 | ControlOpcode::ExecuteTransition
305 ) {
306 flags |= GbpFlags::CRITICAL;
307 }
308 match opcode {
312 ControlOpcode::PrepareTransition => {
313 self.pending_transition_id = transition_id;
314 self.transition_state = TransitionState::TPrepared;
315 self.prepare_deadline =
316 Some(Instant::now() + Duration::from_millis(timeouts::T_PREPARE_MAX_MS));
317 self.execute_deadline = None;
318 }
319 ControlOpcode::ReadyForTransition => {
320 self.execute_deadline =
321 Some(Instant::now() + Duration::from_millis(timeouts::T_EXECUTE_MAX_MS));
322 }
323 ControlOpcode::ExecuteTransition | ControlOpcode::AbortTransition => {
324 self.prepare_deadline = None;
325 self.execute_deadline = None;
326 if opcode == ControlOpcode::AbortTransition {
327 self.pending_transition_id = 0;
328 self.transition_state = TransitionState::TAborted;
329 }
330 }
331 _ => {}
332 }
333 let stream_id = self.member_stream_id(0);
334 self.send_payload(
335 seal,
336 target,
337 StreamType::Control,
338 stream_id,
339 flags,
340 &ctl.to_cbor(),
341 )
342 }
343
344 pub fn on_wire<S: Sealer>(
355 &mut self,
356 seal: &mut S,
357 wire: &[u8],
358 ) -> Result<Vec<Event>, NodeError> {
359 let frame = match GbpFrame::decode(wire) {
364 Ok(f) => f,
365 Err(e) => {
366 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, format!("frame decode: {e}"));
367 return Ok(self.drain_events());
368 }
369 };
370 self.deliver_frame(seal, frame)?;
371 Ok(self.drain_events())
372 }
373
374 fn deliver_frame<S: Sealer>(&mut self, seal: &mut S, frame: GbpFrame) -> Result<(), NodeError> {
375 if frame.version != 1 {
378 self.emit_err_spec(codes::UNSUPPORTED_VERSION, "version != 1");
379 return Ok(());
380 }
381 if frame.group_id_array() != self.group_id {
382 self.emit_err_spec(codes::UNKNOWN_GROUP, "group_id");
383 return Ok(());
384 }
385 if frame.epoch != self.current_epoch {
386 self.emit_err_spec(
387 codes::EPOCH_MISMATCH,
388 format!("got {}, expected {}", frame.epoch, self.current_epoch),
389 );
390 self.trigger_resync();
391 return Ok(());
392 }
393 if let Err(e) = frame.validate_payload_size() {
394 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, format!("payload size: {e}"));
395 return Ok(());
396 }
397 let flags = GbpFlags::from_bits(frame.flags);
398 let st = match frame.stream_type_typed() {
399 Ok(st) => st,
400 Err(_) => {
401 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "unknown stream_type");
402 return Ok(());
403 }
404 };
405
406 if st != StreamType::Control
412 && flags.has(GbpFlags::CRITICAL)
413 && frame.transition_id != self.last_transition_id
414 {
415 self.emit_err_spec(
416 codes::TRANSITION_MISMATCH,
417 format!(
418 "got tid={}, expected {}",
419 frame.transition_id, self.last_transition_id
420 ),
421 );
422 return Ok(());
423 }
424
425 let key = (st, frame.stream_id);
426 let hw = self.in_hw.get(&key).copied().unwrap_or(0);
427 if frame.sequence_no <= hw {
428 self.emit_err_spec(
429 codes::REPLAY_DETECTED,
430 format!(
431 "st={} sid={} seq={} hw={}",
432 st, frame.stream_id, frame.sequence_no, hw
433 ),
434 );
435 return Ok(());
436 }
437 self.in_hw.insert(key, frame.sequence_no);
438
439 let plain = match seal.open(st, frame.sequence_no, &frame.encrypted_payload) {
440 Ok(p) => p,
441 Err(e) => {
442 self.emit_err_named(
449 codes::DECRYPT_FAILED,
450 ErrorClass::Crypto,
451 true, false, format!("aead open: {e}"),
454 );
455 return Ok(());
456 }
457 };
458
459 match st {
460 StreamType::Control => self.handle_control(plain),
461 other => self.events.push(Event::PayloadReceived(DeliveredPayload {
462 stream_type: other,
463 stream_id: frame.stream_id,
464 sequence_no: frame.sequence_no,
465 flags: frame.flags,
466 plaintext: plain,
467 })),
468 }
469 Ok(())
470 }
471
472 fn handle_control(&mut self, plain: Vec<u8>) {
473 let c = match ControlMessage::from_cbor(&plain) {
474 Ok(c) => c,
475 Err(_) => {
476 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "control decode");
477 return;
478 }
479 };
480 let opcode = match ControlOpcode::try_from(c.opcode) {
481 Ok(op) => op,
482 Err(_) => {
483 self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "unknown opcode");
484 return;
485 }
486 };
487 let tid_ok = match opcode {
489 ControlOpcode::PrepareTransition => {
493 c.transition_id > self.last_transition_id
494 && (self.pending_transition_id == 0
495 || self.pending_transition_id == c.transition_id)
496 }
497 ControlOpcode::ReadyForTransition
499 | ControlOpcode::ExecuteTransition
500 | ControlOpcode::AbortTransition => {
501 self.pending_transition_id != 0 && c.transition_id == self.pending_transition_id
502 }
503 _ => true,
506 };
507 if !tid_ok {
508 self.emit_err_spec(
509 codes::TRANSITION_MISMATCH,
510 format!(
511 "control tid={} not valid for {:?} (last={}, pending={})",
512 c.transition_id, opcode, self.last_transition_id, self.pending_transition_id
513 ),
514 );
515 return;
516 }
517 match opcode {
518 ControlOpcode::PrepareTransition => {
519 if self.pending_transition_id == c.transition_id {
523 let current_winner = self.pending_commit_sender.unwrap_or(MemberId::MAX);
524 if c.sender_id >= current_winner {
525 self.events.push(Event::Control {
529 from: c.sender_id,
530 opcode,
531 transition_id: c.transition_id,
532 request_id: c.request_id,
533 args: c.args.to_vec(),
534 });
535 return;
536 }
537 }
539 self.pending_transition_id = c.transition_id;
540 self.pending_commit_sender = Some(c.sender_id);
541 self.transition_state = TransitionState::TPrepared;
542 self.note_coordinator_activity();
544 self.execute_deadline =
546 Some(Instant::now() + Duration::from_millis(timeouts::T_EXECUTE_MAX_MS));
547 }
548 ControlOpcode::ReadyForTransition => {
549 self.transition_state = TransitionState::TReady;
550 self.prepare_deadline = None;
552 }
553 ControlOpcode::ExecuteTransition => {
554 self.execute_deadline = None;
555 self.pending_commit_sender = None;
556 self.apply_transition(c.transition_id);
557 self.note_coordinator_activity();
558 }
559 ControlOpcode::AbortTransition => {
560 self.prepare_deadline = None;
561 self.execute_deadline = None;
562 self.pending_commit_sender = None;
563 self.transition_state = TransitionState::TAborted;
564 self.pending_transition_id = 0;
565 }
566 ControlOpcode::GroupStateDigestResponse => {
567 if self.state == NodeState::Resyncing {
568 self.transition(NodeState::Active);
569 }
570 }
571 ControlOpcode::CapabilitiesAdvertise => {
572 if Self::is_coordinator_claim(&c.args) {
573 self.note_coordinator_activity();
575 if self.is_coordinator && c.sender_id < self.member_id {
579 self.is_coordinator = false;
580 }
581 self.events.push(Event::CoordinatorClaim {
582 claimant: c.sender_id,
583 });
584 }
585 }
586 _ => {}
587 }
588 self.events.push(Event::Control {
589 from: c.sender_id,
590 opcode,
591 transition_id: c.transition_id,
592 request_id: c.request_id,
593 args: c.args.to_vec(),
594 });
595 }
596
597 pub fn apply_transition(&mut self, tid: TransitionId) {
600 self.current_epoch += 1;
601 self.last_transition_id = tid;
602 self.pending_transition_id = 0;
603 self.pending_commit_sender = None;
604 self.transition_state = TransitionState::TExecuted;
605 self.out_seq.clear();
606 self.in_hw.clear();
607 self.events.push(Event::EpochAdvanced {
608 epoch: self.current_epoch,
609 transition_id: tid,
610 });
611 }
612
613 pub fn trigger_resync(&mut self) {
615 if self.state != NodeState::Resyncing {
616 self.transition(NodeState::Resyncing);
617 }
618 }
619
620 pub fn check_timeouts(&mut self) -> Vec<Event> {
627 let now = Instant::now();
628
629 if self.prepare_deadline.is_some_and(|d| now >= d) {
630 self.prepare_deadline = None;
631 self.execute_deadline = None;
632 self.pending_transition_id = 0;
633 self.transition_state = TransitionState::TAborted;
634 self.emit_err_spec(codes::PREPARE_TIMEOUT, "T_prepare_max exceeded");
635 }
636
637 if self.execute_deadline.is_some_and(|d| now >= d) {
638 self.execute_deadline = None;
639 self.emit_err_spec(codes::EXECUTE_TIMEOUT, "T_execute_max exceeded");
640 }
641
642 if self.coordinator_last_seen.is_some_and(|t| {
643 now.duration_since(t).as_millis() as u64 >= timeouts::T_COORDINATOR_GRACE_MS
644 }) {
645 self.coordinator_last_seen = None;
646 self.is_coordinator = false;
647 self.emit_err_spec(
648 codes::COORDINATOR_GONE,
649 "coordinator silence exceeded T_coordinator_grace",
650 );
651 self.events.push(Event::CoordinatorElectionNeeded);
652 }
653
654 self.drain_events()
655 }
656
657 pub fn note_coordinator_activity(&mut self) {
664 self.coordinator_last_seen = Some(Instant::now());
665 }
666
667 pub fn claim_coordinator<S: Sealer>(
678 &mut self,
679 seal: &mut S,
680 target: MemberId,
681 ) -> Result<OutboundFrame, NodeError> {
682 let args = vec![0xA1u8, 0x00, 0xF5];
684 self.is_coordinator = true;
685 self.coordinator_last_seen = Some(Instant::now());
686 self.events.push(Event::BecameCoordinator);
687 self.send_control(
688 seal,
689 target,
690 ControlOpcode::CapabilitiesAdvertise,
691 self.last_transition_id,
692 0,
693 args,
694 )
695 }
696
697 fn is_coordinator_claim(args: &[u8]) -> bool {
700 if args == [0xA1, 0x00, 0xF5] {
704 return true;
705 }
706 args.windows(2).any(|w| w == [0x00, 0xF5])
710 }
711
712 fn transition(&mut self, next: NodeState) {
713 if self.state == next {
714 return;
715 }
716 if !self.state.can_transition_to(next) {
717 let from = self.state;
718 self.state = NodeState::Failed;
719 self.events.push(Event::StateChanged {
720 from,
721 to: NodeState::Failed,
722 });
723 return;
724 }
725 let from = self.state;
726 self.state = next;
727 self.events.push(Event::StateChanged { from, to: next });
728 }
729
730 fn assert_can_send(&self) -> Result<(), NodeError> {
731 if matches!(
732 self.state,
733 NodeState::Active | NodeState::Resyncing | NodeState::EstablishingGroup
734 ) {
735 Ok(())
736 } else {
737 Err(NodeError::InvalidState(format!(
738 "cannot send in state {}",
739 self.state
740 )))
741 }
742 }
743
744 fn next_seq(&mut self, st: StreamType, sid: StreamId) -> SequenceNo {
745 let entry = self.out_seq.entry((st, sid)).or_insert(0);
746 *entry += 1;
747 *entry
748 }
749
750 fn emit_err_spec(&mut self, code: u16, reason: impl Into<String>) {
751 if let Some(spec) = ErrorSpec::lookup(code) {
752 self.emit_err_named(spec.code, spec.class, spec.retryable, spec.fatal, reason);
753 } else {
754 self.emit_err_named(code, ErrorClass::Policy, false, false, reason);
755 }
756 }
757
758 fn emit_err_named(
759 &mut self,
760 code: u16,
761 class: ErrorClass,
762 retryable: bool,
763 fatal: bool,
764 reason: impl Into<String>,
765 ) {
766 let reason = reason.into();
767 let (class, retryable, fatal) = if let Some(spec) = ErrorSpec::lookup(code) {
770 (spec.class, spec.retryable, spec.fatal)
771 } else {
772 (class, retryable, fatal)
773 };
774 let _ = ErrorObject::new(code, class, retryable, fatal, reason.clone()).to_cbor();
775 self.events.push(Event::Error {
776 code,
777 class,
778 retryable,
779 fatal,
780 reason,
781 });
782 if fatal {
783 let from = self.state;
784 self.state = NodeState::Failed;
785 self.events.push(Event::StateChanged {
786 from,
787 to: NodeState::Failed,
788 });
789 }
790 }
791}
792
793pub trait Sealer {
799 fn seal(&mut self, st: StreamType, seq: SequenceNo, pt: &[u8]) -> Result<Vec<u8>, MlsError>;
801 fn open(&mut self, st: StreamType, seq: SequenceNo, ct: &[u8]) -> Result<Vec<u8>, MlsError>;
803}
804
805impl Sealer for gbp_mls::MlsContext {
806 fn seal(&mut self, st: StreamType, seq: SequenceNo, pt: &[u8]) -> Result<Vec<u8>, MlsError> {
807 gbp_mls::MlsContext::seal(self, label_for(st), seq, pt)
808 }
809 fn open(&mut self, st: StreamType, seq: SequenceNo, ct: &[u8]) -> Result<Vec<u8>, MlsError> {
810 gbp_mls::MlsContext::open(self, label_for(st), seq, ct)
811 }
812}
813
814#[cfg(test)]
815mod tests {
816 use super::*;
817
818 struct PlainSealer;
819 impl Sealer for PlainSealer {
820 fn seal(
821 &mut self,
822 _st: StreamType,
823 _seq: SequenceNo,
824 pt: &[u8],
825 ) -> Result<Vec<u8>, MlsError> {
826 Ok(pt.to_vec())
827 }
828 fn open(
829 &mut self,
830 _st: StreamType,
831 _seq: SequenceNo,
832 ct: &[u8],
833 ) -> Result<Vec<u8>, MlsError> {
834 Ok(ct.to_vec())
835 }
836 }
837
838 fn group_id() -> GroupId {
839 let mut g = [0u8; 16];
840 g[..3].copy_from_slice(b"GBP");
841 g
842 }
843
844 #[test]
845 fn replay_window_rejects_repeat() {
846 let mut alice = GroupNode::new(1, group_id());
847 let mut bob = GroupNode::new(2, group_id());
848 alice.bootstrap_as_creator(1);
849 bob.bootstrap_as_joiner(1, 0);
850 let mut s = PlainSealer;
851 let sid = alice.member_stream_id(2);
852 let f = alice
853 .send_payload(
854 &mut s,
855 2,
856 StreamType::Text,
857 sid,
858 GbpFlags::ordered_reliable_ack(),
859 b"hi",
860 )
861 .unwrap();
862 let _ = bob.on_wire(&mut s, &f.wire).unwrap();
863 let evs = bob.on_wire(&mut s, &f.wire).unwrap();
864 assert!(evs.iter().any(|e| matches!(
865 e,
866 Event::Error {
867 code: codes::REPLAY_DETECTED,
868 ..
869 }
870 )));
871 }
872
873 #[test]
874 fn epoch_mismatch_triggers_resync() {
875 let mut alice = GroupNode::new(1, group_id());
876 let mut bob = GroupNode::new(2, group_id());
877 alice.bootstrap_as_creator(1);
878 bob.bootstrap_as_joiner(1, 0);
879 alice.current_epoch = 2;
880 let mut s = PlainSealer;
881 let sid = alice.member_stream_id(2);
882 let f = alice
883 .send_payload(
884 &mut s,
885 2,
886 StreamType::Text,
887 sid,
888 GbpFlags::ordered_reliable_ack(),
889 b"x",
890 )
891 .unwrap();
892 let _ = bob.on_wire(&mut s, &f.wire).unwrap();
893 assert_eq!(bob.state, NodeState::Resyncing);
894 }
895
896 #[test]
897 fn payload_emits_received_event() {
898 let mut alice = GroupNode::new(1, group_id());
899 let mut bob = GroupNode::new(2, group_id());
900 alice.bootstrap_as_creator(1);
901 bob.bootstrap_as_joiner(1, 0);
902 let mut s = PlainSealer;
903 let sid = alice.member_stream_id(2);
904 let f = alice
905 .send_payload(
906 &mut s,
907 2,
908 StreamType::Text,
909 sid,
910 GbpFlags::ordered_reliable_ack(),
911 b"payload",
912 )
913 .unwrap();
914 let evs = bob.on_wire(&mut s, &f.wire).unwrap();
915 let pr = evs
916 .into_iter()
917 .find_map(|e| match e {
918 Event::PayloadReceived(p) => Some(p),
919 _ => None,
920 })
921 .expect("payload");
922 assert_eq!(pr.stream_type, StreamType::Text);
923 assert_eq!(pr.plaintext, b"payload");
924 }
925
926 fn drain_errs(events: &[Event]) -> Vec<u16> {
929 events
930 .iter()
931 .filter_map(|e| match e {
932 Event::Error { code, .. } => Some(*code),
933 _ => None,
934 })
935 .collect()
936 }
937
938 fn drain_controls(events: &[Event]) -> Vec<(ControlOpcode, TransitionId)> {
939 events
940 .iter()
941 .filter_map(|e| match e {
942 Event::Control {
943 opcode,
944 transition_id,
945 ..
946 } => Some((*opcode, *transition_id)),
947 _ => None,
948 })
949 .collect()
950 }
951
952 #[test]
953 fn prepare_transition_sets_pending_on_sender_and_receiver() {
954 let mut coord = GroupNode::new(1, group_id());
955 let mut peer = GroupNode::new(2, group_id());
956 coord.bootstrap_as_creator(0);
957 peer.bootstrap_as_joiner(0, 0);
958 let mut s = PlainSealer;
959 let f = coord
961 .send_control(
962 &mut s,
963 0,
964 ControlOpcode::PrepareTransition,
965 1,
966 100,
967 b"commit-blob".to_vec(),
968 )
969 .unwrap();
970 assert_eq!(coord.pending_transition_id, 1, "sender mirrors pending");
971 assert_eq!(coord.transition_state, TransitionState::TPrepared);
972 let evs = peer.on_wire(&mut s, &f.wire).unwrap();
973 assert_eq!(peer.pending_transition_id, 1, "receiver records pending");
974 assert!(
975 drain_errs(&evs).is_empty(),
976 "no error: {:?}",
977 drain_errs(&evs)
978 );
979 let ctls = drain_controls(&evs);
980 assert_eq!(ctls, vec![(ControlOpcode::PrepareTransition, 1)]);
981 }
982
983 #[test]
984 fn ready_with_wrong_tid_is_rejected() {
985 let mut coord = GroupNode::new(1, group_id());
986 let mut peer = GroupNode::new(2, group_id());
987 coord.bootstrap_as_creator(0);
988 peer.bootstrap_as_joiner(0, 0);
989 let mut s = PlainSealer;
990 let f = coord
991 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
992 .unwrap();
993 peer.on_wire(&mut s, &f.wire).unwrap();
994 let bogus = peer
996 .send_control(&mut s, 1, ControlOpcode::ReadyForTransition, 7, 1, vec![])
997 .unwrap();
998 let evs = coord.on_wire(&mut s, &bogus.wire).unwrap();
999 let errs = drain_errs(&evs);
1000 assert!(errs.contains(&codes::TRANSITION_MISMATCH), "got {:?}", errs);
1001 }
1002
1003 #[test]
1004 fn execute_advances_epoch_and_clears_pending() {
1005 let mut coord = GroupNode::new(1, group_id());
1006 let mut peer = GroupNode::new(2, group_id());
1007 coord.bootstrap_as_creator(0);
1008 peer.bootstrap_as_joiner(0, 0);
1009 let mut s = PlainSealer;
1010 let prep = coord
1011 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1012 .unwrap();
1013 peer.on_wire(&mut s, &prep.wire).unwrap();
1014 let exec = coord
1016 .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
1017 .unwrap();
1018 coord.apply_transition(1);
1019 let evs = peer.on_wire(&mut s, &exec.wire).unwrap();
1020 assert_eq!(coord.last_transition_id, 1);
1021 assert_eq!(coord.current_epoch, 1);
1022 assert_eq!(peer.last_transition_id, 1);
1023 assert_eq!(peer.current_epoch, 1);
1024 assert_eq!(peer.pending_transition_id, 0);
1025 assert!(evs.iter().any(|e| matches!(
1026 e,
1027 Event::EpochAdvanced {
1028 transition_id: 1,
1029 ..
1030 }
1031 )));
1032 }
1033
1034 #[test]
1035 fn abort_clears_pending_no_advance() {
1036 let mut coord = GroupNode::new(1, group_id());
1037 let mut peer = GroupNode::new(2, group_id());
1038 coord.bootstrap_as_creator(0);
1039 peer.bootstrap_as_joiner(0, 0);
1040 let mut s = PlainSealer;
1041 let prep = coord
1042 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1043 .unwrap();
1044 peer.on_wire(&mut s, &prep.wire).unwrap();
1045 let abort = coord
1046 .send_control(&mut s, 0, ControlOpcode::AbortTransition, 1, 2, vec![])
1047 .unwrap();
1048 peer.on_wire(&mut s, &abort.wire).unwrap();
1049 assert_eq!(peer.pending_transition_id, 0);
1050 assert_eq!(peer.current_epoch, 0);
1051 assert_eq!(peer.transition_state, TransitionState::TAborted);
1052 assert_eq!(coord.transition_state, TransitionState::TAborted);
1053 }
1054
1055 #[test]
1056 fn bootstrap_as_joiner_with_expected_tid_accepts_first_execute() {
1057 let mut coord = GroupNode::new(1, group_id());
1058 let mut joiner = GroupNode::new(2, group_id());
1060 coord.bootstrap_as_creator(0);
1061 joiner.bootstrap_as_joiner(0, 1);
1062 assert_eq!(joiner.pending_transition_id, 1);
1063 let mut s = PlainSealer;
1064 let _ = coord
1066 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1067 .unwrap();
1068 let exec = coord
1070 .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
1071 .unwrap();
1072 let evs = joiner.on_wire(&mut s, &exec.wire).unwrap();
1073 let errs = drain_errs(&evs);
1074 assert!(
1075 errs.is_empty(),
1076 "expected clean apply, got errors {:?}",
1077 errs
1078 );
1079 assert_eq!(joiner.last_transition_id, 1);
1080 assert_eq!(joiner.current_epoch, 1);
1081 }
1082
1083 #[test]
1086 fn claim_coordinator_sets_flag_and_emits_event() {
1087 let mut node = GroupNode::new(1, group_id());
1088 node.bootstrap_as_creator(0);
1089 node.drain_events();
1090 let mut s = PlainSealer;
1091 let _ = node.claim_coordinator(&mut s, 0).unwrap();
1092 assert!(node.is_coordinator);
1093 let evs = node.drain_events();
1094 assert!(evs.iter().any(|e| matches!(e, Event::BecameCoordinator)));
1095 }
1096
1097 #[test]
1098 fn coordinator_gone_emits_election_needed() {
1099 let mut member = GroupNode::new(2, group_id());
1100 member.bootstrap_as_joiner(0, 0);
1101 member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
1102 let evs = member.check_timeouts();
1103 assert!(
1104 evs.iter()
1105 .any(|e| matches!(e, Event::CoordinatorElectionNeeded))
1106 );
1107 assert!(!member.is_coordinator, "flag cleared on silence");
1108 }
1109
1110 #[test]
1111 fn capabilities_advertise_with_claim_resets_silence_timer() {
1112 let mut member = GroupNode::new(2, group_id());
1113 let mut coord = GroupNode::new(1, group_id());
1114 member.bootstrap_as_joiner(0, 0);
1115 coord.bootstrap_as_creator(0);
1116 let mut s = PlainSealer;
1117 let f = coord.claim_coordinator(&mut s, 2).unwrap();
1119 let evs = member.on_wire(&mut s, &f.wire).unwrap();
1121 assert!(
1122 member.coordinator_last_seen.is_some(),
1123 "silence timer reset"
1124 );
1125 assert!(
1126 evs.iter()
1127 .any(|e| matches!(e, Event::CoordinatorClaim { claimant: 1 }))
1128 );
1129 }
1130
1131 #[test]
1132 fn higher_id_yields_to_lower_claimant() {
1133 let mut node5 = GroupNode::new(5, group_id());
1135 let mut node2 = GroupNode::new(2, group_id());
1136 node5.bootstrap_as_joiner(0, 0);
1137 node2.bootstrap_as_creator(0);
1138 let mut s = PlainSealer;
1139 node5.is_coordinator = true;
1141 let f = node2.claim_coordinator(&mut s, 5).unwrap();
1143 node5.on_wire(&mut s, &f.wire).unwrap();
1144 assert!(!node5.is_coordinator, "node5 yielded to node2");
1145 }
1146
1147 #[test]
1148 fn lower_id_keeps_coordinator_against_higher_claimant() {
1149 let mut node1 = GroupNode::new(1, group_id());
1150 let mut node5 = GroupNode::new(5, group_id());
1151 node1.bootstrap_as_creator(0);
1152 node5.bootstrap_as_joiner(0, 0);
1153 let mut s = PlainSealer;
1154 node1.is_coordinator = true;
1155 let f = node5.claim_coordinator(&mut s, 1).unwrap();
1156 node1.on_wire(&mut s, &f.wire).unwrap();
1157 assert!(node1.is_coordinator, "node1 keeps role — it has lower id");
1158 }
1159
1160 #[test]
1163 fn competing_prepare_lower_member_id_wins() {
1164 let mut node = GroupNode::new(10, group_id());
1167 node.bootstrap_as_joiner(0, 0);
1168 let mut s = PlainSealer;
1169
1170 let mut sender1 = GroupNode::new(1, group_id());
1172 sender1.bootstrap_as_creator(0);
1173 let f1 = sender1
1174 .send_control(
1175 &mut s,
1176 10,
1177 ControlOpcode::PrepareTransition,
1178 1,
1179 1,
1180 b"commit-A".to_vec(),
1181 )
1182 .unwrap();
1183 node.on_wire(&mut s, &f1.wire).unwrap();
1184 assert_eq!(
1185 node.pending_commit_sender,
1186 Some(1),
1187 "member 1 is initial winner"
1188 );
1189
1190 let mut sender3 = GroupNode::new(3, group_id());
1192 sender3.bootstrap_as_creator(0);
1193 let f3 = sender3
1194 .send_control(
1195 &mut s,
1196 10,
1197 ControlOpcode::PrepareTransition,
1198 1,
1199 2,
1200 b"commit-B".to_vec(),
1201 )
1202 .unwrap();
1203 node.on_wire(&mut s, &f3.wire).unwrap();
1204 assert_eq!(node.pending_commit_sender, Some(1), "member 1 still wins");
1206 assert_eq!(node.pending_transition_id, 1);
1207 }
1208
1209 #[test]
1210 fn competing_prepare_later_lower_id_displaces_winner() {
1211 let mut node = GroupNode::new(10, group_id());
1213 node.bootstrap_as_joiner(0, 0);
1214 let mut s = PlainSealer;
1215
1216 let mut sender5 = GroupNode::new(5, group_id());
1217 sender5.bootstrap_as_creator(0);
1218 let f5 = sender5
1219 .send_control(
1220 &mut s,
1221 10,
1222 ControlOpcode::PrepareTransition,
1223 1,
1224 1,
1225 b"commit-X".to_vec(),
1226 )
1227 .unwrap();
1228 node.on_wire(&mut s, &f5.wire).unwrap();
1229 assert_eq!(node.pending_commit_sender, Some(5));
1230
1231 let mut sender2 = GroupNode::new(2, group_id());
1232 sender2.bootstrap_as_creator(0);
1233 let f2 = sender2
1234 .send_control(
1235 &mut s,
1236 10,
1237 ControlOpcode::PrepareTransition,
1238 1,
1239 2,
1240 b"commit-Y".to_vec(),
1241 )
1242 .unwrap();
1243 node.on_wire(&mut s, &f2.wire).unwrap();
1244 assert_eq!(
1245 node.pending_commit_sender,
1246 Some(2),
1247 "member 2 displaces member 5"
1248 );
1249 }
1250
1251 #[test]
1252 fn apply_transition_clears_commit_sender() {
1253 let mut coord = GroupNode::new(1, group_id());
1254 coord.bootstrap_as_creator(0);
1255 let mut s = PlainSealer;
1256 coord
1257 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1258 .unwrap();
1259 coord.apply_transition(1);
1260 assert_eq!(coord.pending_commit_sender, None);
1261 }
1262
1263 #[test]
1266 fn prepare_timeout_fires_when_deadline_exceeded() {
1267 let mut coord = GroupNode::new(1, group_id());
1268 coord.bootstrap_as_creator(0);
1269 let mut s = PlainSealer;
1270 coord
1271 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1272 .unwrap();
1273 coord.prepare_deadline = Some(Instant::now() - Duration::from_millis(1));
1275 let evs = coord.check_timeouts();
1276 assert!(
1277 evs.iter().any(|e| matches!(
1278 e,
1279 Event::Error {
1280 code: codes::PREPARE_TIMEOUT,
1281 ..
1282 }
1283 )),
1284 "expected PREPARE_TIMEOUT, got {:?}",
1285 evs
1286 );
1287 assert_eq!(
1288 coord.transition_state,
1289 TransitionState::TAborted,
1290 "transition aborted"
1291 );
1292 assert_eq!(coord.prepare_deadline, None, "deadline cleared");
1293 }
1294
1295 #[test]
1296 fn execute_timeout_fires_when_deadline_exceeded() {
1297 let mut member = GroupNode::new(2, group_id());
1298 member.bootstrap_as_joiner(0, 0);
1299 let mut s = PlainSealer;
1300 member.pending_transition_id = 1;
1302 member.transition_state = TransitionState::TPrepared;
1303 member
1304 .send_control(&mut s, 1, ControlOpcode::ReadyForTransition, 1, 1, vec![])
1305 .unwrap();
1306 member.execute_deadline = Some(Instant::now() - Duration::from_millis(1));
1308 let evs = member.check_timeouts();
1309 assert!(
1310 evs.iter().any(|e| matches!(
1311 e,
1312 Event::Error {
1313 code: codes::EXECUTE_TIMEOUT,
1314 ..
1315 }
1316 )),
1317 "expected EXECUTE_TIMEOUT, got {:?}",
1318 evs
1319 );
1320 assert_eq!(member.execute_deadline, None, "deadline cleared");
1321 }
1322
1323 #[test]
1324 fn coordinator_gone_fires_after_silence() {
1325 let mut member = GroupNode::new(2, group_id());
1326 member.bootstrap_as_joiner(0, 0);
1327 member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
1329 let evs = member.check_timeouts();
1330 assert!(
1331 evs.iter().any(|e| matches!(
1332 e,
1333 Event::Error {
1334 code: codes::COORDINATOR_GONE,
1335 ..
1336 }
1337 )),
1338 "expected COORDINATOR_GONE, got {:?}",
1339 evs
1340 );
1341 assert_eq!(member.coordinator_last_seen, None, "timer cleared");
1342 }
1343
1344 #[test]
1345 fn note_coordinator_activity_resets_silence_timer() {
1346 let mut member = GroupNode::new(2, group_id());
1347 member.bootstrap_as_joiner(0, 0);
1348 member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
1350 member.note_coordinator_activity();
1352 let evs = member.check_timeouts();
1353 assert!(
1354 !evs.iter().any(|e| matches!(
1355 e,
1356 Event::Error {
1357 code: codes::COORDINATOR_GONE,
1358 ..
1359 }
1360 )),
1361 "should NOT fire after reset"
1362 );
1363 }
1364
1365 #[test]
1366 fn execute_clears_prepare_deadline() {
1367 let mut coord = GroupNode::new(1, group_id());
1368 coord.bootstrap_as_creator(0);
1369 let mut s = PlainSealer;
1370 coord
1371 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1372 .unwrap();
1373 assert!(coord.prepare_deadline.is_some(), "deadline armed");
1374 coord
1375 .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
1376 .unwrap();
1377 assert_eq!(coord.prepare_deadline, None, "deadline cleared on EXECUTE");
1378 assert_eq!(
1379 coord.execute_deadline, None,
1380 "execute_deadline also cleared"
1381 );
1382 }
1383
1384 #[test]
1385 fn receive_prepare_arms_execute_deadline() {
1386 let mut coord = GroupNode::new(1, group_id());
1387 let mut member = GroupNode::new(2, group_id());
1388 coord.bootstrap_as_creator(0);
1389 member.bootstrap_as_joiner(0, 0);
1390 let mut s = PlainSealer;
1391 let f = coord
1392 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1393 .unwrap();
1394 member.on_wire(&mut s, &f.wire).unwrap();
1395 assert!(
1396 member.execute_deadline.is_some(),
1397 "execute_deadline armed on receiving PREPARE"
1398 );
1399 }
1400
1401 #[test]
1402 fn receive_execute_clears_execute_deadline() {
1403 let mut coord = GroupNode::new(1, group_id());
1404 let mut member = GroupNode::new(2, group_id());
1405 coord.bootstrap_as_creator(0);
1406 member.bootstrap_as_joiner(0, 0);
1407 let mut s = PlainSealer;
1408 let prep = coord
1409 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1410 .unwrap();
1411 member.on_wire(&mut s, &prep.wire).unwrap();
1412 let exec = coord
1413 .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
1414 .unwrap();
1415 member.on_wire(&mut s, &exec.wire).unwrap();
1416 assert_eq!(member.execute_deadline, None, "cleared on EXECUTE");
1417 }
1418
1419 #[test]
1420 fn no_timeout_when_deadlines_not_set() {
1421 let mut node = GroupNode::new(1, group_id());
1422 node.bootstrap_as_creator(0);
1423 node.drain_events(); let evs = node.check_timeouts();
1425 assert!(evs.is_empty(), "no events without armed deadlines");
1426 }
1427
1428 #[test]
1429 fn prepare_with_already_applied_tid_is_rejected() {
1430 let mut coord = GroupNode::new(1, group_id());
1433 coord.bootstrap_as_creator(0);
1434 let mut s = PlainSealer;
1435 let _ = coord
1436 .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
1437 .unwrap();
1438 coord.apply_transition(1);
1439 assert_eq!(coord.last_transition_id, 1);
1440 assert_eq!(coord.pending_transition_id, 0);
1441 let mut peer = GroupNode::new(2, group_id());
1445 peer.bootstrap_as_joiner(coord.current_epoch, 0);
1446 let stale = peer
1447 .send_control(&mut s, 1, ControlOpcode::PrepareTransition, 1, 9, vec![])
1448 .unwrap();
1449 let evs = coord.on_wire(&mut s, &stale.wire).unwrap();
1450 let errs = drain_errs(&evs);
1451 assert!(
1452 errs.contains(&codes::TRANSITION_MISMATCH),
1453 "expected TRANSITION_MISMATCH, got {:?}",
1454 errs
1455 );
1456 }
1457
1458 #[test]
1459 fn decrypt_failed_is_non_fatal() {
1460 struct OpenFailSealer;
1462 impl Sealer for OpenFailSealer {
1463 fn seal(
1464 &mut self,
1465 _: StreamType,
1466 _: SequenceNo,
1467 p: &[u8],
1468 ) -> Result<Vec<u8>, MlsError> {
1469 Ok(p.to_vec())
1470 }
1471 fn open(
1472 &mut self,
1473 _: StreamType,
1474 _: SequenceNo,
1475 _: &[u8],
1476 ) -> Result<Vec<u8>, MlsError> {
1477 Err(MlsError::Aead("simulated".into()))
1478 }
1479 }
1480 let mut alice = GroupNode::new(1, group_id());
1481 let mut bob = GroupNode::new(2, group_id());
1482 alice.bootstrap_as_creator(1);
1483 bob.bootstrap_as_joiner(1, 0);
1484 let mut s = PlainSealer;
1485 let sid = alice.member_stream_id(2);
1486 let f = alice
1487 .send_payload(
1488 &mut s,
1489 2,
1490 StreamType::Text,
1491 sid,
1492 GbpFlags::ordered_reliable_ack(),
1493 b"x",
1494 )
1495 .unwrap();
1496 let mut fail = OpenFailSealer;
1497 let evs = bob.on_wire(&mut fail, &f.wire).unwrap();
1498 let err = evs
1499 .iter()
1500 .find_map(|e| match e {
1501 Event::Error {
1502 code,
1503 fatal,
1504 retryable,
1505 ..
1506 } => Some((*code, *fatal, *retryable)),
1507 _ => None,
1508 })
1509 .expect("error event");
1510 assert_eq!(err.0, codes::DECRYPT_FAILED);
1511 assert!(!err.1, "must be non-fatal");
1512 assert!(err.2, "must be retryable");
1513 assert_eq!(bob.state, NodeState::Active, "bob stays Active");
1514 }
1515}