Skip to main content

gbp_node/
node.rs

1//! GBP-layer group node.
2//!
3//! Responsibilities of this layer (analogous to IP):
4//!
5//! * Decode incoming CBOR frames and validate `version`, `group_id`, `epoch`
6//!   and `transition_id` per the GBP spec.
7//! * Enforce a per-`(stream_type, stream_id)` replay window via
8//!   `sequence_no`.
9//! * Open the AEAD payload through the [`Sealer`] trait (typically backed by
10//!   `gbp-mls`).
11//! * Surface decoded payloads to sub-protocols as
12//!   [`Event::PayloadReceived`]; the sub-protocol layer is responsible for
13//!   message-level semantics.
14//! * Drive the control plane: handle `EXECUTE_TRANSITION`, request resync on
15//!   `EPOCH_MISMATCH`, etc.
16//!
17//! Out of scope: parsing GTP / GAP / GSP payloads, GTP idempotency, GAP
18//! `key_phase` validation and mute-list tracking. Those concerns belong to
19//! the per-sub-protocol clients in the `gtp-protocol`, `gap-protocol` and
20//! `gsp-protocol` crates.
21
22use gbp::{CodecError, ControlMessage, ErrorObject, GbpFrame};
23use gbp_core::{
24    ControlOpcode, ErrorClass, GbpFlags, GroupId, MemberId, NodeState, SequenceNo, StreamId,
25    StreamType, TransitionId, TransitionState, codes, errors::ErrorSpec, timeouts,
26};
27use gbp_mls::{MlsError, label_for};
28use std::collections::HashMap;
29use std::time::{Duration, Instant};
30
31/// Errors raised by [`GroupNode`].
32#[derive(Debug, thiserror::Error)]
33pub enum NodeError {
34    /// Codec error.
35    #[error("codec: {0}")]
36    Codec(#[from] CodecError),
37    /// MLS / AEAD error.
38    #[error("mls: {0}")]
39    Mls(#[from] MlsError),
40    /// The node is not in a state that allows the requested operation.
41    #[error("invalid state: {0}")]
42    InvalidState(String),
43}
44
45/// A wire-ready outbound frame: the recipient and its serialised CBOR bytes.
46pub struct OutboundFrame {
47    /// Target member id.
48    pub to: MemberId,
49    /// CBOR-encoded [`GbpFrame`] bytes.
50    pub wire: Vec<u8>,
51}
52
53/// Information about a payload delivered by GBP to a sub-protocol.
54#[derive(Debug, Clone)]
55pub struct DeliveredPayload {
56    /// Stream class on which the frame arrived.
57    pub stream_type: StreamType,
58    /// Stream id from the frame (preserved so receivers can demultiplex
59    /// multiple sub-streams).
60    pub stream_id: StreamId,
61    /// Sequence number after passing the replay window.
62    pub sequence_no: SequenceNo,
63    /// Frame flag bits, copied as-is.
64    pub flags: u16,
65    /// Decrypted plaintext bytes.
66    pub plaintext: Vec<u8>,
67}
68
69/// Events surfaced by the GBP layer.
70#[derive(Debug, Clone)]
71pub enum Event {
72    /// Node FSM changed state.
73    StateChanged {
74        /// Previous state.
75        from: NodeState,
76        /// New state.
77        to: NodeState,
78    },
79    /// Payload delivered to a sub-protocol (Text / Audio / Signal). Control
80    /// frames are handled internally and do not surface as
81    /// [`Event::PayloadReceived`].
82    PayloadReceived(DeliveredPayload),
83    /// A control plane message was received and parsed.
84    Control {
85        /// Sender member id.
86        from: MemberId,
87        /// Decoded opcode.
88        opcode: ControlOpcode,
89        /// `transition_id` carried by the message.
90        transition_id: TransitionId,
91        /// `request_id` echoed by ACK / NACK responders.
92        request_id: u32,
93        /// Opcode-specific args (CBOR or opaque bytes; e.g. the MLS Commit
94        /// embedded in `PREPARE_TRANSITION`).
95        args: Vec<u8>,
96    },
97    /// An error was raised.
98    Error {
99        /// Numeric error code.
100        code: u16,
101        /// Error class.
102        class: ErrorClass,
103        /// MAY be retried.
104        retryable: bool,
105        /// Fatal — the node is now in `FAILED`.
106        fatal: bool,
107        /// Human-readable reason.
108        reason: String,
109    },
110    /// Epoch transition has been applied locally.
111    EpochAdvanced {
112        /// New epoch.
113        epoch: u64,
114        /// `transition_id` that produced the new epoch.
115        transition_id: TransitionId,
116    },
117    /// Coordinator silence exceeded `T_coordinator_grace`. The application
118    /// should call [`GroupNode::claim_coordinator`] if this node has the
119    /// lowest `MemberId` among currently active members.
120    CoordinatorElectionNeeded,
121    /// This node successfully claimed the coordinator role (sent
122    /// `CAPABILITIES_ADVERTISE` with `coordinator_claim=true`).
123    BecameCoordinator,
124    /// A remote member broadcast a coordinator claim.
125    CoordinatorClaim {
126        /// The claiming member's id.
127        claimant: MemberId,
128    },
129}
130
131/// GBP-layer node.
132///
133/// Owns the framing, AEAD, replay window, FSM and control plane.
134/// Sub-protocol semantics live in their own crates and use this type plus a
135/// [`Sealer`] for outbound traffic and `on_wire` + the resulting events for
136/// inbound traffic.
137pub struct GroupNode {
138    /// Application-level member id.
139    pub member_id: MemberId,
140    /// Whether this node currently holds the coordinator role.
141    pub is_coordinator: bool,
142    /// 16-byte group identifier.
143    pub group_id: GroupId,
144    /// Current epoch as observed by the GBP layer (the authoritative epoch
145    /// lives in the underlying MLS group).
146    pub current_epoch: u64,
147    /// Last applied `transition_id`.
148    pub last_transition_id: TransitionId,
149    /// Pending `transition_id` (set during PREPARE / READY).
150    pub pending_transition_id: TransitionId,
151    /// Node FSM.
152    pub state: NodeState,
153    /// Transition FSM.
154    pub transition_state: TransitionState,
155
156    out_seq: HashMap<(StreamType, StreamId), SequenceNo>,
157    in_hw: HashMap<(StreamType, StreamId), SequenceNo>,
158    events: Vec<Event>,
159
160    /// MemberId of the member whose PREPARE_TRANSITION is currently pending.
161    /// Used for tie-break: if two PREPAREs arrive for the same transition_id,
162    /// the one from the lower MemberId wins (gbp_rfc §8).
163    pending_commit_sender: Option<MemberId>,
164    /// Deadline for receiving quorum READY after issuing PREPARE_TRANSITION.
165    /// Armed when coordinator sends PREPARE; fires ERR_PREPARE_TIMEOUT.
166    prepare_deadline: Option<Instant>,
167    /// Deadline for receiving EXECUTE_TRANSITION after sending READY_FOR_TRANSITION.
168    /// Armed when a member sends READY; fires ERR_EXECUTE_TIMEOUT.
169    execute_deadline: Option<Instant>,
170    /// Timestamp of last coordinator activity. When silence exceeds
171    /// T_COORDINATOR_GRACE_MS the node emits ERR_COORDINATOR_GONE.
172    coordinator_last_seen: Option<Instant>,
173}
174
175impl GroupNode {
176    /// Builds a fresh node in the `IDLE` state.
177    pub fn new(member_id: MemberId, group_id: GroupId) -> Self {
178        Self {
179            member_id,
180            group_id,
181            is_coordinator: false,
182            current_epoch: 0,
183            last_transition_id: 0,
184            pending_transition_id: 0,
185            state: NodeState::Idle,
186            transition_state: TransitionState::TIdle,
187            out_seq: HashMap::new(),
188            in_hw: HashMap::new(),
189            events: Vec::new(),
190            pending_commit_sender: None,
191            prepare_deadline: None,
192            execute_deadline: None,
193            coordinator_last_seen: None,
194        }
195    }
196
197    /// Drives the node from `IDLE` to `ACTIVE` as a creator.
198    pub fn bootstrap_as_creator(&mut self, epoch: u64) {
199        self.transition(NodeState::Connecting);
200        self.transition(NodeState::EstablishingGroup);
201        self.current_epoch = epoch;
202        self.transition(NodeState::Active);
203    }
204
205    /// Drives the node from `IDLE` to `ACTIVE` as a joiner.
206    ///
207    /// `expected_first_tid` lets the joiner pre-arm its pending transition
208    /// state so that the very next `EXECUTE_TRANSITION` (which will arrive
209    /// without a preceding PREPARE the joiner could decrypt — that PREPARE
210    /// was sealed under the pre-Welcome epoch) is accepted by
211    /// `handle_control`'s tid-validation matrix. Pass `0` if the joiner
212    /// recovered out-of-band and is already current.
213    pub fn bootstrap_as_joiner(&mut self, epoch: u64, expected_first_tid: u32) {
214        self.transition(NodeState::Connecting);
215        self.transition(NodeState::EstablishingGroup);
216        self.current_epoch = epoch;
217        if expected_first_tid > 0 {
218            self.pending_transition_id = expected_first_tid;
219            self.transition_state = TransitionState::TPrepared;
220        }
221        self.transition(NodeState::Active);
222    }
223
224    /// Drains and returns all queued events.
225    pub fn drain_events(&mut self) -> Vec<Event> {
226        std::mem::take(&mut self.events)
227    }
228
229    /// Returns a sender-unique `stream_id` within the given base class.
230    ///
231    /// This is used so that the receiver's replay window does not conflate
232    /// streams that originate from different members.
233    pub fn member_stream_id(&self, base: u32) -> StreamId {
234        debug_assert!(
235            self.member_id < 1_000_000,
236            "member_id overflow: {0}",
237            self.member_id
238        );
239        base + self.member_id * 100
240    }
241
242    /// Sends an opaque plaintext payload on the given stream.
243    ///
244    /// Used by the sub-protocol clients: each one CBOR-encodes its message
245    /// and forwards the resulting bytes here.
246    pub fn send_payload<S: Sealer>(
247        &mut self,
248        seal: &mut S,
249        target: MemberId,
250        stream_type: StreamType,
251        stream_id: StreamId,
252        flags: u16,
253        plaintext: &[u8],
254    ) -> Result<OutboundFrame, NodeError> {
255        self.assert_can_send()?;
256        let seq = self.next_seq(stream_type, stream_id);
257        let ciphertext = seal.seal(stream_type, seq, plaintext)?;
258        let frame = GbpFrame::new(
259            self.group_id,
260            self.current_epoch,
261            self.last_transition_id,
262            stream_type,
263            stream_id,
264            flags,
265            seq,
266            ciphertext,
267        );
268        Ok(OutboundFrame {
269            to: target,
270            wire: frame.to_cbor(),
271        })
272    }
273
274    /// Sends a control plane message on Stream 0. Wrapper around
275    /// [`GroupNode::send_payload`].
276    ///
277    /// Side effect: when the coordinator originates a `PREPARE_TRANSITION`,
278    /// it must locally adopt the same `pending_transition_id` so that the
279    /// inbound READY / EXECUTE validation matrix in `handle_control` lines
280    /// up. Without this, the coordinator never matches its own pending tid
281    /// against the remote READY frames it expects, and the handshake never
282    /// completes.
283    pub fn send_control<S: Sealer>(
284        &mut self,
285        seal: &mut S,
286        target: MemberId,
287        opcode: ControlOpcode,
288        transition_id: TransitionId,
289        request_id: u32,
290        args: Vec<u8>,
291    ) -> Result<OutboundFrame, NodeError> {
292        let ctl = ControlMessage::with_args(
293            opcode as u16,
294            request_id,
295            self.member_id,
296            transition_id,
297            args,
298        );
299        let mut flags = GbpFlags::ordered_reliable_system();
300        if matches!(
301            opcode,
302            ControlOpcode::PrepareTransition
303                | ControlOpcode::ReadyForTransition
304                | ControlOpcode::ExecuteTransition
305        ) {
306            flags |= GbpFlags::CRITICAL;
307        }
308        // Sender-side state mirroring (matches what `handle_control` does on
309        // the receiver side). We only update on PREPARE/EXECUTE/ABORT — READY
310        // is purely an ack carrying the existing pending tid.
311        match opcode {
312            ControlOpcode::PrepareTransition => {
313                self.pending_transition_id = transition_id;
314                self.transition_state = TransitionState::TPrepared;
315                self.prepare_deadline =
316                    Some(Instant::now() + Duration::from_millis(timeouts::T_PREPARE_MAX_MS));
317                self.execute_deadline = None;
318            }
319            ControlOpcode::ReadyForTransition => {
320                self.execute_deadline =
321                    Some(Instant::now() + Duration::from_millis(timeouts::T_EXECUTE_MAX_MS));
322            }
323            ControlOpcode::ExecuteTransition | ControlOpcode::AbortTransition => {
324                self.prepare_deadline = None;
325                self.execute_deadline = None;
326                if opcode == ControlOpcode::AbortTransition {
327                    self.pending_transition_id = 0;
328                    self.transition_state = TransitionState::TAborted;
329                }
330            }
331            _ => {}
332        }
333        let stream_id = self.member_stream_id(0);
334        self.send_payload(
335            seal,
336            target,
337            StreamType::Control,
338            stream_id,
339            flags,
340            &ctl.to_cbor(),
341        )
342    }
343
344    /// Feeds wire bytes to the node.
345    ///
346    /// Performs the §6.2 validation pipeline (version → group_id → epoch →
347    /// payload_size → transition_id → replay), opens the AEAD payload and
348    /// either:
349    /// * dispatches the parsed control message internally (for
350    ///   `StreamType::Control`), or
351    /// * surfaces an [`Event::PayloadReceived`] (for application streams).
352    ///
353    /// Returns every event that was produced as a result.
354    pub fn on_wire<S: Sealer>(
355        &mut self,
356        seal: &mut S,
357        wire: &[u8],
358    ) -> Result<Vec<Event>, NodeError> {
359        // Decode without payload-size validation — we want a malformed v!=1
360        // frame to surface as `ERR_UNSUPPORTED_VERSION`, not as
361        // `ERR_PAYLOAD_SIZE_MISMATCH`. Validation runs in deliver_frame, in
362        // the order required by §6.2.
363        let frame = match GbpFrame::decode(wire) {
364            Ok(f) => f,
365            Err(e) => {
366                self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, format!("frame decode: {e}"));
367                return Ok(self.drain_events());
368            }
369        };
370        self.deliver_frame(seal, frame)?;
371        Ok(self.drain_events())
372    }
373
374    fn deliver_frame<S: Sealer>(&mut self, seal: &mut S, frame: GbpFrame) -> Result<(), NodeError> {
375        // §6.2 order: version → group_id → epoch → payload_size →
376        // transition_id (when CRITICAL) → replay.
377        if frame.version != 1 {
378            self.emit_err_spec(codes::UNSUPPORTED_VERSION, "version != 1");
379            return Ok(());
380        }
381        if frame.group_id_array() != self.group_id {
382            self.emit_err_spec(codes::UNKNOWN_GROUP, "group_id");
383            return Ok(());
384        }
385        if frame.epoch != self.current_epoch {
386            self.emit_err_spec(
387                codes::EPOCH_MISMATCH,
388                format!("got {}, expected {}", frame.epoch, self.current_epoch),
389            );
390            self.trigger_resync();
391            return Ok(());
392        }
393        if let Err(e) = frame.validate_payload_size() {
394            self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, format!("payload size: {e}"));
395            return Ok(());
396        }
397        let flags = GbpFlags::from_bits(frame.flags);
398        let st = match frame.stream_type_typed() {
399            Ok(st) => st,
400            Err(_) => {
401                self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "unknown stream_type");
402                return Ok(());
403            }
404        };
405
406        // §6.2 transition_id ordering: CRITICAL frames on application streams
407        // MUST equal `last_transition_id`. Control-stream frames are exempt
408        // from this check and validated per-opcode inside `handle_control`,
409        // because PREPARE_TRANSITION legitimately carries `last + 1` and
410        // EXECUTE / ACK carry `pending_transition_id`.
411        if st != StreamType::Control
412            && flags.has(GbpFlags::CRITICAL)
413            && frame.transition_id != self.last_transition_id
414        {
415            self.emit_err_spec(
416                codes::TRANSITION_MISMATCH,
417                format!(
418                    "got tid={}, expected {}",
419                    frame.transition_id, self.last_transition_id
420                ),
421            );
422            return Ok(());
423        }
424
425        let key = (st, frame.stream_id);
426        let hw = self.in_hw.get(&key).copied().unwrap_or(0);
427        if frame.sequence_no <= hw {
428            self.emit_err_spec(
429                codes::REPLAY_DETECTED,
430                format!(
431                    "st={} sid={} seq={} hw={}",
432                    st, frame.stream_id, frame.sequence_no, hw
433                ),
434            );
435            return Ok(());
436        }
437        self.in_hw.insert(key, frame.sequence_no);
438
439        let plain = match seal.open(st, frame.sequence_no, &frame.encrypted_payload) {
440            Ok(p) => p,
441            Err(e) => {
442                // Non-fatal: a frame addressed under a different MLS epoch
443                // (e.g. PREPARE_TRANSITION reaching a fresh joiner that has
444                // already accepted the post-commit Welcome) cannot be
445                // decrypted, but that's expected and the node MUST keep
446                // running to receive the subsequent EXECUTE frame on the
447                // shared post-merge epoch.
448                self.emit_err_named(
449                    codes::DECRYPT_FAILED,
450                    ErrorClass::Crypto,
451                    true,  // retryable: caller may resync via digest
452                    false, // non-fatal
453                    format!("aead open: {e}"),
454                );
455                return Ok(());
456            }
457        };
458
459        match st {
460            StreamType::Control => self.handle_control(plain),
461            other => self.events.push(Event::PayloadReceived(DeliveredPayload {
462                stream_type: other,
463                stream_id: frame.stream_id,
464                sequence_no: frame.sequence_no,
465                flags: frame.flags,
466                plaintext: plain,
467            })),
468        }
469        Ok(())
470    }
471
472    fn handle_control(&mut self, plain: Vec<u8>) {
473        let c = match ControlMessage::from_cbor(&plain) {
474            Ok(c) => c,
475            Err(_) => {
476                self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "control decode");
477                return;
478            }
479        };
480        let opcode = match ControlOpcode::try_from(c.opcode) {
481            Ok(op) => op,
482            Err(_) => {
483                self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "unknown opcode");
484                return;
485            }
486        };
487        // Per-opcode TransitionID validation (§5 of gbp-control-plane).
488        let tid_ok = match opcode {
489            // PREPARE introduces last+1; receiver simply records it as pending.
490            // Re-issuing a PREPARE for an already-pending tid is allowed; a
491            // smaller-or-equal tid that is not strictly newer is rejected.
492            ControlOpcode::PrepareTransition => {
493                c.transition_id > self.last_transition_id
494                    && (self.pending_transition_id == 0
495                        || self.pending_transition_id == c.transition_id)
496            }
497            // READY / EXECUTE / ABORT must reference the pending tid.
498            ControlOpcode::ReadyForTransition
499            | ControlOpcode::ExecuteTransition
500            | ControlOpcode::AbortTransition => {
501                self.pending_transition_id != 0 && c.transition_id == self.pending_transition_id
502            }
503            // Digest / capability / ack / nack: tid is informational, no
504            // ordering constraint at the GBP layer.
505            _ => true,
506        };
507        if !tid_ok {
508            self.emit_err_spec(
509                codes::TRANSITION_MISMATCH,
510                format!(
511                    "control tid={} not valid for {:?} (last={}, pending={})",
512                    c.transition_id, opcode, self.last_transition_id, self.pending_transition_id
513                ),
514            );
515            return;
516        }
517        match opcode {
518            ControlOpcode::PrepareTransition => {
519                // Tie-break (gbp_rfc §8): if a PREPARE for the same tid already
520                // exists from another sender, keep whichever has the lower
521                // MemberId — that member's commit is the canonical winner.
522                if self.pending_transition_id == c.transition_id {
523                    let current_winner = self.pending_commit_sender.unwrap_or(MemberId::MAX);
524                    if c.sender_id >= current_winner {
525                        // Existing winner holds; discard this competing commit
526                        // but still surface the Control event so upper layers
527                        // can observe it.
528                        self.events.push(Event::Control {
529                            from: c.sender_id,
530                            opcode,
531                            transition_id: c.transition_id,
532                            request_id: c.request_id,
533                            args: c.args.to_vec(),
534                        });
535                        return;
536                    }
537                    // New sender wins — replace.
538                }
539                self.pending_transition_id = c.transition_id;
540                self.pending_commit_sender = Some(c.sender_id);
541                self.transition_state = TransitionState::TPrepared;
542                // PREPARE originates from the coordinator — record activity.
543                self.note_coordinator_activity();
544                // Arm execute deadline: member must see EXECUTE within T_execute_max.
545                self.execute_deadline =
546                    Some(Instant::now() + Duration::from_millis(timeouts::T_EXECUTE_MAX_MS));
547            }
548            ControlOpcode::ReadyForTransition => {
549                self.transition_state = TransitionState::TReady;
550                // Coordinator received a READY — clear the per-member wait.
551                self.prepare_deadline = None;
552            }
553            ControlOpcode::ExecuteTransition => {
554                self.execute_deadline = None;
555                self.pending_commit_sender = None;
556                self.apply_transition(c.transition_id);
557                self.note_coordinator_activity();
558            }
559            ControlOpcode::AbortTransition => {
560                self.prepare_deadline = None;
561                self.execute_deadline = None;
562                self.pending_commit_sender = None;
563                self.transition_state = TransitionState::TAborted;
564                self.pending_transition_id = 0;
565            }
566            ControlOpcode::GroupStateDigestResponse => {
567                if self.state == NodeState::Resyncing {
568                    self.transition(NodeState::Active);
569                }
570            }
571            ControlOpcode::CapabilitiesAdvertise => {
572                if Self::is_coordinator_claim(&c.args) {
573                    // Coordinator is alive — reset silence timer.
574                    self.note_coordinator_activity();
575                    // Collision resolution (gbp-control-plane §5.1): if we
576                    // also claimed and the remote claimant has a lower
577                    // MemberId, yield the coordinator role to them.
578                    if self.is_coordinator && c.sender_id < self.member_id {
579                        self.is_coordinator = false;
580                    }
581                    self.events.push(Event::CoordinatorClaim {
582                        claimant: c.sender_id,
583                    });
584                }
585            }
586            _ => {}
587        }
588        self.events.push(Event::Control {
589            from: c.sender_id,
590            opcode,
591            transition_id: c.transition_id,
592            request_id: c.request_id,
593            args: c.args.to_vec(),
594        });
595    }
596
597    /// Applies a new epoch (called by the coordinator after
598    /// `EXECUTE_TRANSITION`).
599    pub fn apply_transition(&mut self, tid: TransitionId) {
600        self.current_epoch += 1;
601        self.last_transition_id = tid;
602        self.pending_transition_id = 0;
603        self.pending_commit_sender = None;
604        self.transition_state = TransitionState::TExecuted;
605        self.out_seq.clear();
606        self.in_hw.clear();
607        self.events.push(Event::EpochAdvanced {
608            epoch: self.current_epoch,
609            transition_id: tid,
610        });
611    }
612
613    /// Forces the node into the `RESYNCING` state.
614    pub fn trigger_resync(&mut self) {
615        if self.state != NodeState::Resyncing {
616            self.transition(NodeState::Resyncing);
617        }
618    }
619
620    /// Checks FSM deadlines and emits timeout events if any have expired.
621    ///
622    /// Call this regularly from the application event loop (e.g. every 500 ms).
623    /// Returns the same events that would come from [`GroupNode::drain_events`];
624    /// the caller may also drain events separately — this method does not
625    /// duplicate them.
626    pub fn check_timeouts(&mut self) -> Vec<Event> {
627        let now = Instant::now();
628
629        if self.prepare_deadline.is_some_and(|d| now >= d) {
630            self.prepare_deadline = None;
631            self.execute_deadline = None;
632            self.pending_transition_id = 0;
633            self.transition_state = TransitionState::TAborted;
634            self.emit_err_spec(codes::PREPARE_TIMEOUT, "T_prepare_max exceeded");
635        }
636
637        if self.execute_deadline.is_some_and(|d| now >= d) {
638            self.execute_deadline = None;
639            self.emit_err_spec(codes::EXECUTE_TIMEOUT, "T_execute_max exceeded");
640        }
641
642        if self.coordinator_last_seen.is_some_and(|t| {
643            now.duration_since(t).as_millis() as u64 >= timeouts::T_COORDINATOR_GRACE_MS
644        }) {
645            self.coordinator_last_seen = None;
646            self.is_coordinator = false;
647            self.emit_err_spec(
648                codes::COORDINATOR_GONE,
649                "coordinator silence exceeded T_coordinator_grace",
650            );
651            self.events.push(Event::CoordinatorElectionNeeded);
652        }
653
654        self.drain_events()
655    }
656
657    /// Records that the coordinator was active right now.
658    ///
659    /// Call this whenever the node receives a frame from the current
660    /// coordinator (e.g. `PREPARE_TRANSITION`, `EXECUTE_TRANSITION`,
661    /// `CAPABILITIES_ADVERTISE` with `coordinator_claim`). Resets the
662    /// coordinator-silence timer used to detect `ERR_COORDINATOR_GONE`.
663    pub fn note_coordinator_activity(&mut self) {
664        self.coordinator_last_seen = Some(Instant::now());
665    }
666
667    /// Claims the coordinator role by broadcasting `CAPABILITIES_ADVERTISE`
668    /// with `coordinator_claim=true` (gbp-control-plane §5.1).
669    ///
670    /// Call this when [`Event::CoordinatorElectionNeeded`] fires **and** this
671    /// node has the lowest `MemberId` among currently active members. The
672    /// caller is responsible for delivering the returned frame to every group
673    /// member.
674    ///
675    /// The args payload is the minimal CBOR map `{0: true}` encoding a
676    /// coordinator claim flag.
677    pub fn claim_coordinator<S: Sealer>(
678        &mut self,
679        seal: &mut S,
680        target: MemberId,
681    ) -> Result<OutboundFrame, NodeError> {
682        // CBOR: {0: true}  →  A1 00 F5
683        let args = vec![0xA1u8, 0x00, 0xF5];
684        self.is_coordinator = true;
685        self.coordinator_last_seen = Some(Instant::now());
686        self.events.push(Event::BecameCoordinator);
687        self.send_control(
688            seal,
689            target,
690            ControlOpcode::CapabilitiesAdvertise,
691            self.last_transition_id,
692            0,
693            args,
694        )
695    }
696
697    /// Returns `true` if the raw args bytes of a `CAPABILITIES_ADVERTISE`
698    /// frame encode a coordinator claim (`{0: true}` in CBOR).
699    fn is_coordinator_claim(args: &[u8]) -> bool {
700        // Minimal CBOR map {0: true}: A1 00 F5
701        // We also accept larger maps where key 0 maps to true.
702        // Fast path: exact match on the minimal encoding.
703        if args == [0xA1, 0x00, 0xF5] {
704            return true;
705        }
706        // General path: scan for the sequence 00 F5 (uint(0) → true) inside a
707        // CBOR map. This is intentionally simple; a full CBOR parser lives in
708        // the gbp-protocol crate and is not a dependency here.
709        args.windows(2).any(|w| w == [0x00, 0xF5])
710    }
711
712    fn transition(&mut self, next: NodeState) {
713        if self.state == next {
714            return;
715        }
716        if !self.state.can_transition_to(next) {
717            let from = self.state;
718            self.state = NodeState::Failed;
719            self.events.push(Event::StateChanged {
720                from,
721                to: NodeState::Failed,
722            });
723            return;
724        }
725        let from = self.state;
726        self.state = next;
727        self.events.push(Event::StateChanged { from, to: next });
728    }
729
730    fn assert_can_send(&self) -> Result<(), NodeError> {
731        if matches!(
732            self.state,
733            NodeState::Active | NodeState::Resyncing | NodeState::EstablishingGroup
734        ) {
735            Ok(())
736        } else {
737            Err(NodeError::InvalidState(format!(
738                "cannot send in state {}",
739                self.state
740            )))
741        }
742    }
743
744    fn next_seq(&mut self, st: StreamType, sid: StreamId) -> SequenceNo {
745        let entry = self.out_seq.entry((st, sid)).or_insert(0);
746        *entry += 1;
747        *entry
748    }
749
750    fn emit_err_spec(&mut self, code: u16, reason: impl Into<String>) {
751        if let Some(spec) = ErrorSpec::lookup(code) {
752            self.emit_err_named(spec.code, spec.class, spec.retryable, spec.fatal, reason);
753        } else {
754            self.emit_err_named(code, ErrorClass::Policy, false, false, reason);
755        }
756    }
757
758    fn emit_err_named(
759        &mut self,
760        code: u16,
761        class: ErrorClass,
762        retryable: bool,
763        fatal: bool,
764        reason: impl Into<String>,
765    ) {
766        let reason = reason.into();
767        // ErrorSpec is authoritative for known codes — use its class/retryable/fatal
768        // so that the wire error always matches the registry.
769        let (class, retryable, fatal) = if let Some(spec) = ErrorSpec::lookup(code) {
770            (spec.class, spec.retryable, spec.fatal)
771        } else {
772            (class, retryable, fatal)
773        };
774        let _ = ErrorObject::new(code, class, retryable, fatal, reason.clone()).to_cbor();
775        self.events.push(Event::Error {
776            code,
777            class,
778            retryable,
779            fatal,
780            reason,
781        });
782        if fatal {
783            let from = self.state;
784            self.state = NodeState::Failed;
785            self.events.push(Event::StateChanged {
786                from,
787                to: NodeState::Failed,
788            });
789        }
790    }
791}
792
/// Trait abstracting the AEAD seal / open operations.
///
/// Per the module docs, [`GroupNode`] opens incoming AEAD payloads through this
/// trait. Both operations are keyed by the stream type and the per-stream
/// sequence number.
///
/// Implemented for [`gbp_mls::MlsContext`] in this crate; tests typically
/// implement a no-op identity sealer to exercise the FSM without standing
/// up an MLS group.
pub trait Sealer {
    /// Encrypts `pt` for the given stream type and per-stream sequence number.
    ///
    /// # Errors
    ///
    /// Returns an [`MlsError`] if the underlying seal operation fails.
    fn seal(&mut self, st: StreamType, seq: SequenceNo, pt: &[u8]) -> Result<Vec<u8>, MlsError>;
    /// Decrypts `ct` for the given stream type and per-stream sequence number.
    ///
    /// NOTE(review): presumably `st` and `seq` must equal the values used when
    /// `ct` was sealed; the MLS-backed impl forwards both to
    /// `gbp_mls::MlsContext::open` — confirm against `gbp-mls`.
    ///
    /// # Errors
    ///
    /// Returns an [`MlsError`] if the ciphertext cannot be opened.
    fn open(&mut self, st: StreamType, seq: SequenceNo, ct: &[u8]) -> Result<Vec<u8>, MlsError>;
}
804
805impl Sealer for gbp_mls::MlsContext {
806    fn seal(&mut self, st: StreamType, seq: SequenceNo, pt: &[u8]) -> Result<Vec<u8>, MlsError> {
807        gbp_mls::MlsContext::seal(self, label_for(st), seq, pt)
808    }
809    fn open(&mut self, st: StreamType, seq: SequenceNo, ct: &[u8]) -> Result<Vec<u8>, MlsError> {
810        gbp_mls::MlsContext::open(self, label_for(st), seq, ct)
811    }
812}
813
// Unit tests for `GroupNode`: replay window, epoch resync, payload delivery,
// the PREPARE / READY / EXECUTE / ABORT handshake, coordinator handover,
// PREPARE tie-breaking and the timer engine. Tests use an identity sealer so
// no MLS group has to be stood up.
#[cfg(test)]
mod tests {
    use super::*;

    /// Identity [`Sealer`]: `seal` and `open` both return their input unchanged,
    /// so frames round-trip without any cryptography.
    struct PlainSealer;
    impl Sealer for PlainSealer {
        fn seal(
            &mut self,
            _st: StreamType,
            _seq: SequenceNo,
            pt: &[u8],
        ) -> Result<Vec<u8>, MlsError> {
            Ok(pt.to_vec())
        }
        fn open(
            &mut self,
            _st: StreamType,
            _seq: SequenceNo,
            ct: &[u8],
        ) -> Result<Vec<u8>, MlsError> {
            Ok(ct.to_vec())
        }
    }

    /// Fixed test group id: the bytes `b"GBP"` followed by 13 zero bytes.
    fn group_id() -> GroupId {
        let mut g = [0u8; 16];
        g[..3].copy_from_slice(b"GBP");
        g
    }

    // Delivering the exact same wire frame twice must be flagged by the
    // receiver's replay window on the second delivery.
    #[test]
    fn replay_window_rejects_repeat() {
        let mut alice = GroupNode::new(1, group_id());
        let mut bob = GroupNode::new(2, group_id());
        alice.bootstrap_as_creator(1);
        bob.bootstrap_as_joiner(1, 0);
        let mut s = PlainSealer;
        let sid = alice.member_stream_id(2);
        let f = alice
            .send_payload(
                &mut s,
                2,
                StreamType::Text,
                sid,
                GbpFlags::ordered_reliable_ack(),
                b"hi",
            )
            .unwrap();
        let _ = bob.on_wire(&mut s, &f.wire).unwrap();
        let evs = bob.on_wire(&mut s, &f.wire).unwrap();
        assert!(evs.iter().any(|e| matches!(
            e,
            Event::Error {
                code: codes::REPLAY_DETECTED,
                ..
            }
        )));
    }

    // Alice's epoch is bumped past Bob's; a frame stamped with the newer epoch
    // must push Bob into `Resyncing`.
    #[test]
    fn epoch_mismatch_triggers_resync() {
        let mut alice = GroupNode::new(1, group_id());
        let mut bob = GroupNode::new(2, group_id());
        alice.bootstrap_as_creator(1);
        bob.bootstrap_as_joiner(1, 0);
        alice.current_epoch = 2;
        let mut s = PlainSealer;
        let sid = alice.member_stream_id(2);
        let f = alice
            .send_payload(
                &mut s,
                2,
                StreamType::Text,
                sid,
                GbpFlags::ordered_reliable_ack(),
                b"x",
            )
            .unwrap();
        let _ = bob.on_wire(&mut s, &f.wire).unwrap();
        assert_eq!(bob.state, NodeState::Resyncing);
    }

    // A valid payload frame surfaces as `Event::PayloadReceived` with the
    // original stream type and plaintext.
    #[test]
    fn payload_emits_received_event() {
        let mut alice = GroupNode::new(1, group_id());
        let mut bob = GroupNode::new(2, group_id());
        alice.bootstrap_as_creator(1);
        bob.bootstrap_as_joiner(1, 0);
        let mut s = PlainSealer;
        let sid = alice.member_stream_id(2);
        let f = alice
            .send_payload(
                &mut s,
                2,
                StreamType::Text,
                sid,
                GbpFlags::ordered_reliable_ack(),
                b"payload",
            )
            .unwrap();
        let evs = bob.on_wire(&mut s, &f.wire).unwrap();
        let pr = evs
            .into_iter()
            .find_map(|e| match e {
                Event::PayloadReceived(p) => Some(p),
                _ => None,
            })
            .expect("payload");
        assert_eq!(pr.stream_type, StreamType::Text);
        assert_eq!(pr.plaintext, b"payload");
    }

    // ---- Control-plane handshake -----------------------------------------

    /// Collects the code of every `Event::Error` in `events`, in order.
    /// Despite the name, this only reads `events`; nothing is removed.
    fn drain_errs(events: &[Event]) -> Vec<u16> {
        events
            .iter()
            .filter_map(|e| match e {
                Event::Error { code, .. } => Some(*code),
                _ => None,
            })
            .collect()
    }

    /// Collects `(opcode, transition_id)` for every `Event::Control` in
    /// `events`, in order. Like `drain_errs`, this only reads `events`.
    fn drain_controls(events: &[Event]) -> Vec<(ControlOpcode, TransitionId)> {
        events
            .iter()
            .filter_map(|e| match e {
                Event::Control {
                    opcode,
                    transition_id,
                    ..
                } => Some((*opcode, *transition_id)),
                _ => None,
            })
            .collect()
    }

    // Sending PREPARE records the pending tid on the sender; receiving it
    // records the same tid on the peer and surfaces a Control event.
    #[test]
    fn prepare_transition_sets_pending_on_sender_and_receiver() {
        let mut coord = GroupNode::new(1, group_id());
        let mut peer = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        peer.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        // Coordinator sends PREPARE for tid=1
        let f = coord
            .send_control(
                &mut s,
                0,
                ControlOpcode::PrepareTransition,
                1,
                100,
                b"commit-blob".to_vec(),
            )
            .unwrap();
        assert_eq!(coord.pending_transition_id, 1, "sender mirrors pending");
        assert_eq!(coord.transition_state, TransitionState::TPrepared);
        let evs = peer.on_wire(&mut s, &f.wire).unwrap();
        assert_eq!(peer.pending_transition_id, 1, "receiver records pending");
        assert!(
            drain_errs(&evs).is_empty(),
            "no error: {:?}",
            drain_errs(&evs)
        );
        let ctls = drain_controls(&evs);
        assert_eq!(ctls, vec![(ControlOpcode::PrepareTransition, 1)]);
    }

    // READY quoting a tid other than the pending one must be reported as
    // TRANSITION_MISMATCH by the coordinator.
    #[test]
    fn ready_with_wrong_tid_is_rejected() {
        let mut coord = GroupNode::new(1, group_id());
        let mut peer = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        peer.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        let f = coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        peer.on_wire(&mut s, &f.wire).unwrap();
        // Peer fakes a READY for the wrong tid
        let bogus = peer
            .send_control(&mut s, 1, ControlOpcode::ReadyForTransition, 7, 1, vec![])
            .unwrap();
        let evs = coord.on_wire(&mut s, &bogus.wire).unwrap();
        let errs = drain_errs(&evs);
        assert!(errs.contains(&codes::TRANSITION_MISMATCH), "got {:?}", errs);
    }

    // PREPARE then EXECUTE: both sides end on epoch 1 with tid 1 applied and
    // nothing pending; the receiver emits EpochAdvanced.
    #[test]
    fn execute_advances_epoch_and_clears_pending() {
        let mut coord = GroupNode::new(1, group_id());
        let mut peer = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        peer.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        let prep = coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        peer.on_wire(&mut s, &prep.wire).unwrap();
        // Coordinator broadcasts EXECUTE; both sides apply (coord locally, peer via on_wire)
        let exec = coord
            .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
            .unwrap();
        coord.apply_transition(1);
        let evs = peer.on_wire(&mut s, &exec.wire).unwrap();
        assert_eq!(coord.last_transition_id, 1);
        assert_eq!(coord.current_epoch, 1);
        assert_eq!(peer.last_transition_id, 1);
        assert_eq!(peer.current_epoch, 1);
        assert_eq!(peer.pending_transition_id, 0);
        assert!(evs.iter().any(|e| matches!(
            e,
            Event::EpochAdvanced {
                transition_id: 1,
                ..
            }
        )));
    }

    // PREPARE then ABORT: pending is cleared, the epoch stays put, and both
    // sides end in TAborted.
    #[test]
    fn abort_clears_pending_no_advance() {
        let mut coord = GroupNode::new(1, group_id());
        let mut peer = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        peer.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        let prep = coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        peer.on_wire(&mut s, &prep.wire).unwrap();
        let abort = coord
            .send_control(&mut s, 0, ControlOpcode::AbortTransition, 1, 2, vec![])
            .unwrap();
        peer.on_wire(&mut s, &abort.wire).unwrap();
        assert_eq!(peer.pending_transition_id, 0);
        assert_eq!(peer.current_epoch, 0);
        assert_eq!(peer.transition_state, TransitionState::TAborted);
        assert_eq!(coord.transition_state, TransitionState::TAborted);
    }

    // A joiner bootstrapped with an expected first tid accepts EXECUTE for
    // that tid without ever having received the PREPARE.
    #[test]
    fn bootstrap_as_joiner_with_expected_tid_accepts_first_execute() {
        let mut coord = GroupNode::new(1, group_id());
        // Joiner pre-arms expected_first_tid=1 — typical post-Welcome state.
        let mut joiner = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        joiner.bootstrap_as_joiner(0, 1);
        assert_eq!(joiner.pending_transition_id, 1);
        let mut s = PlainSealer;
        // Coordinator must mirror its pending too — simulate by sending PREPARE
        // (the frame is intentionally not delivered to the joiner).
        let _ = coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        // EXECUTE should be accepted by the joiner without ever seeing PREPARE
        let exec = coord
            .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
            .unwrap();
        let evs = joiner.on_wire(&mut s, &exec.wire).unwrap();
        let errs = drain_errs(&evs);
        assert!(
            errs.is_empty(),
            "expected clean apply, got errors {:?}",
            errs
        );
        assert_eq!(joiner.last_transition_id, 1);
        assert_eq!(joiner.current_epoch, 1);
    }

    // ---- Coordinator handover (gbp-control-plane §5.1) ---------------------

    // Claiming the coordinator role sets the local flag and emits
    // BecameCoordinator (bootstrap events are drained first).
    #[test]
    fn claim_coordinator_sets_flag_and_emits_event() {
        let mut node = GroupNode::new(1, group_id());
        node.bootstrap_as_creator(0);
        node.drain_events();
        let mut s = PlainSealer;
        let _ = node.claim_coordinator(&mut s, 0).unwrap();
        assert!(node.is_coordinator);
        let evs = node.drain_events();
        assert!(evs.iter().any(|e| matches!(e, Event::BecameCoordinator)));
    }

    // A coordinator last seen 11 s ago triggers CoordinatorElectionNeeded and
    // clears the local coordinator flag.
    #[test]
    fn coordinator_gone_emits_election_needed() {
        let mut member = GroupNode::new(2, group_id());
        member.bootstrap_as_joiner(0, 0);
        // NOTE(review): 11 s is assumed to exceed the coordinator grace period
        // (T_COORDINATOR_GRACE_MS) — TODO confirm against `timeouts`. Also,
        // `Instant - Duration` may panic if the platform clock cannot represent
        // that instant (e.g. shortly after boot); `Instant::checked_sub` avoids it.
        member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
        let evs = member.check_timeouts();
        assert!(
            evs.iter()
                .any(|e| matches!(e, Event::CoordinatorElectionNeeded))
        );
        assert!(!member.is_coordinator, "flag cleared on silence");
    }

    // Receiving a coordinator claim (re)arms the member's silence timer and
    // surfaces a CoordinatorClaim event naming the claimant.
    #[test]
    fn capabilities_advertise_with_claim_resets_silence_timer() {
        let mut member = GroupNode::new(2, group_id());
        let mut coord = GroupNode::new(1, group_id());
        member.bootstrap_as_joiner(0, 0);
        coord.bootstrap_as_creator(0);
        let mut s = PlainSealer;
        // coord sends claim
        let f = coord.claim_coordinator(&mut s, 2).unwrap();
        // on_wire already drains events — use the returned vec.
        let evs = member.on_wire(&mut s, &f.wire).unwrap();
        assert!(
            member.coordinator_last_seen.is_some(),
            "silence timer reset"
        );
        assert!(
            evs.iter()
                .any(|e| matches!(e, Event::CoordinatorClaim { claimant: 1 }))
        );
    }

    #[test]
    fn higher_id_yields_to_lower_claimant() {
        // Node 5 claims first, then receives a claim from node 2 (lower) → yields.
        let mut node5 = GroupNode::new(5, group_id());
        let mut node2 = GroupNode::new(2, group_id());
        node5.bootstrap_as_joiner(0, 0);
        node2.bootstrap_as_creator(0);
        let mut s = PlainSealer;
        // node5 claims (simulated by setting the flag directly; no frame is sent)
        node5.is_coordinator = true;
        // node2 broadcasts claim
        let f = node2.claim_coordinator(&mut s, 5).unwrap();
        node5.on_wire(&mut s, &f.wire).unwrap();
        assert!(!node5.is_coordinator, "node5 yielded to node2");
    }

    // The mirror case: the current coordinator has the lower id, so a claim
    // from a higher id is ignored.
    #[test]
    fn lower_id_keeps_coordinator_against_higher_claimant() {
        let mut node1 = GroupNode::new(1, group_id());
        let mut node5 = GroupNode::new(5, group_id());
        node1.bootstrap_as_creator(0);
        node5.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        node1.is_coordinator = true;
        let f = node5.claim_coordinator(&mut s, 1).unwrap();
        node1.on_wire(&mut s, &f.wire).unwrap();
        assert!(node1.is_coordinator, "node1 keeps role — it has lower id");
    }

    // ---- Tie-break (gbp_rfc §8) ---------------------------------------------

    #[test]
    fn competing_prepare_lower_member_id_wins() {
        // Two coordinators issue PREPARE for the same tid.
        // Member 1 (lower) sends first — member 3 (higher) is the loser.
        let mut node = GroupNode::new(10, group_id());
        node.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;

        // Build a PREPARE from member 1 (lower id).
        let mut sender1 = GroupNode::new(1, group_id());
        sender1.bootstrap_as_creator(0);
        let f1 = sender1
            .send_control(
                &mut s,
                10,
                ControlOpcode::PrepareTransition,
                1,
                1,
                b"commit-A".to_vec(),
            )
            .unwrap();
        node.on_wire(&mut s, &f1.wire).unwrap();
        assert_eq!(
            node.pending_commit_sender,
            Some(1),
            "member 1 is initial winner"
        );

        // Build a PREPARE from member 3 (higher id, same tid).
        let mut sender3 = GroupNode::new(3, group_id());
        sender3.bootstrap_as_creator(0);
        let f3 = sender3
            .send_control(
                &mut s,
                10,
                ControlOpcode::PrepareTransition,
                1,
                2,
                b"commit-B".to_vec(),
            )
            .unwrap();
        node.on_wire(&mut s, &f3.wire).unwrap();
        // Lower sender (1) keeps the win.
        assert_eq!(node.pending_commit_sender, Some(1), "member 1 still wins");
        assert_eq!(node.pending_transition_id, 1);
    }

    #[test]
    fn competing_prepare_later_lower_id_displaces_winner() {
        // Member 5 arrives first, then member 2 (lower) — member 2 wins.
        let mut node = GroupNode::new(10, group_id());
        node.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;

        let mut sender5 = GroupNode::new(5, group_id());
        sender5.bootstrap_as_creator(0);
        let f5 = sender5
            .send_control(
                &mut s,
                10,
                ControlOpcode::PrepareTransition,
                1,
                1,
                b"commit-X".to_vec(),
            )
            .unwrap();
        node.on_wire(&mut s, &f5.wire).unwrap();
        assert_eq!(node.pending_commit_sender, Some(5));

        let mut sender2 = GroupNode::new(2, group_id());
        sender2.bootstrap_as_creator(0);
        let f2 = sender2
            .send_control(
                &mut s,
                10,
                ControlOpcode::PrepareTransition,
                1,
                2,
                b"commit-Y".to_vec(),
            )
            .unwrap();
        node.on_wire(&mut s, &f2.wire).unwrap();
        assert_eq!(
            node.pending_commit_sender,
            Some(2),
            "member 2 displaces member 5"
        );
    }

    // Applying a transition forgets which member's PREPARE won the tie-break.
    #[test]
    fn apply_transition_clears_commit_sender() {
        let mut coord = GroupNode::new(1, group_id());
        coord.bootstrap_as_creator(0);
        let mut s = PlainSealer;
        coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        coord.apply_transition(1);
        assert_eq!(coord.pending_commit_sender, None);
    }

    // ---- Timer engine -------------------------------------------------------

    // An expired PREPARE deadline yields PREPARE_TIMEOUT, aborts the
    // transition and disarms the deadline.
    #[test]
    fn prepare_timeout_fires_when_deadline_exceeded() {
        let mut coord = GroupNode::new(1, group_id());
        coord.bootstrap_as_creator(0);
        let mut s = PlainSealer;
        coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        // Manually backdate the deadline so it appears expired.
        coord.prepare_deadline = Some(Instant::now() - Duration::from_millis(1));
        let evs = coord.check_timeouts();
        assert!(
            evs.iter().any(|e| matches!(
                e,
                Event::Error {
                    code: codes::PREPARE_TIMEOUT,
                    ..
                }
            )),
            "expected PREPARE_TIMEOUT, got {:?}",
            evs
        );
        assert_eq!(
            coord.transition_state,
            TransitionState::TAborted,
            "transition aborted"
        );
        assert_eq!(coord.prepare_deadline, None, "deadline cleared");
    }

    // An expired EXECUTE deadline (armed by sending READY) yields
    // EXECUTE_TIMEOUT and disarms the deadline.
    #[test]
    fn execute_timeout_fires_when_deadline_exceeded() {
        let mut member = GroupNode::new(2, group_id());
        member.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        // Simulate READY sent → execute_deadline armed.
        member.pending_transition_id = 1;
        member.transition_state = TransitionState::TPrepared;
        member
            .send_control(&mut s, 1, ControlOpcode::ReadyForTransition, 1, 1, vec![])
            .unwrap();
        // Backdate.
        member.execute_deadline = Some(Instant::now() - Duration::from_millis(1));
        let evs = member.check_timeouts();
        assert!(
            evs.iter().any(|e| matches!(
                e,
                Event::Error {
                    code: codes::EXECUTE_TIMEOUT,
                    ..
                }
            )),
            "expected EXECUTE_TIMEOUT, got {:?}",
            evs
        );
        assert_eq!(member.execute_deadline, None, "deadline cleared");
    }

    // Same silence scenario as `coordinator_gone_emits_election_needed`, but
    // checks the COORDINATOR_GONE error and that the timer is disarmed.
    #[test]
    fn coordinator_gone_fires_after_silence() {
        let mut member = GroupNode::new(2, group_id());
        member.bootstrap_as_joiner(0, 0);
        // Simulate coordinator was seen 11 seconds ago (> T_COORDINATOR_GRACE_MS = 10_000).
        member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
        let evs = member.check_timeouts();
        assert!(
            evs.iter().any(|e| matches!(
                e,
                Event::Error {
                    code: codes::COORDINATOR_GONE,
                    ..
                }
            )),
            "expected COORDINATOR_GONE, got {:?}",
            evs
        );
        assert_eq!(member.coordinator_last_seen, None, "timer cleared");
    }

    // Noting coordinator activity replaces a stale timestamp, so the silence
    // check no longer fires.
    #[test]
    fn note_coordinator_activity_resets_silence_timer() {
        let mut member = GroupNode::new(2, group_id());
        member.bootstrap_as_joiner(0, 0);
        // Old timestamp — would fire.
        member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
        // Reset.
        member.note_coordinator_activity();
        let evs = member.check_timeouts();
        assert!(
            !evs.iter().any(|e| matches!(
                e,
                Event::Error {
                    code: codes::COORDINATOR_GONE,
                    ..
                }
            )),
            "should NOT fire after reset"
        );
    }

    // Sending EXECUTE disarms both the PREPARE and EXECUTE deadlines on the
    // coordinator.
    #[test]
    fn execute_clears_prepare_deadline() {
        let mut coord = GroupNode::new(1, group_id());
        coord.bootstrap_as_creator(0);
        let mut s = PlainSealer;
        coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        assert!(coord.prepare_deadline.is_some(), "deadline armed");
        coord
            .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
            .unwrap();
        assert_eq!(coord.prepare_deadline, None, "deadline cleared on EXECUTE");
        assert_eq!(
            coord.execute_deadline, None,
            "execute_deadline also cleared"
        );
    }

    // Receiving PREPARE arms the member's EXECUTE deadline.
    #[test]
    fn receive_prepare_arms_execute_deadline() {
        let mut coord = GroupNode::new(1, group_id());
        let mut member = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        member.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        let f = coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        member.on_wire(&mut s, &f.wire).unwrap();
        assert!(
            member.execute_deadline.is_some(),
            "execute_deadline armed on receiving PREPARE"
        );
    }

    // Receiving EXECUTE disarms the member's EXECUTE deadline.
    #[test]
    fn receive_execute_clears_execute_deadline() {
        let mut coord = GroupNode::new(1, group_id());
        let mut member = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        member.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        let prep = coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        member.on_wire(&mut s, &prep.wire).unwrap();
        let exec = coord
            .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
            .unwrap();
        member.on_wire(&mut s, &exec.wire).unwrap();
        assert_eq!(member.execute_deadline, None, "cleared on EXECUTE");
    }

    // With no deadlines armed, the timer check produces no events at all.
    #[test]
    fn no_timeout_when_deadlines_not_set() {
        let mut node = GroupNode::new(1, group_id());
        node.bootstrap_as_creator(0);
        node.drain_events(); // clear bootstrap StateChanged events
        let evs = node.check_timeouts();
        assert!(evs.is_empty(), "no events without armed deadlines");
    }

    #[test]
    fn prepare_with_already_applied_tid_is_rejected() {
        // After the coordinator has fully applied tid=1, a replay or
        // late-coordinator PREPARE with the same tid must fail validation.
        let mut coord = GroupNode::new(1, group_id());
        coord.bootstrap_as_creator(0);
        let mut s = PlainSealer;
        let _ = coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        coord.apply_transition(1);
        assert_eq!(coord.last_transition_id, 1);
        assert_eq!(coord.pending_transition_id, 0);
        // Forge a PREPARE with the same already-applied tid (epoch matches
        // because we synthesise it locally with a peer node on the same
        // post-apply epoch).
        let mut peer = GroupNode::new(2, group_id());
        peer.bootstrap_as_joiner(coord.current_epoch, 0);
        let stale = peer
            .send_control(&mut s, 1, ControlOpcode::PrepareTransition, 1, 9, vec![])
            .unwrap();
        let evs = coord.on_wire(&mut s, &stale.wire).unwrap();
        let errs = drain_errs(&evs);
        assert!(
            errs.contains(&codes::TRANSITION_MISMATCH),
            "expected TRANSITION_MISMATCH, got {:?}",
            errs
        );
    }

    #[test]
    fn decrypt_failed_is_non_fatal() {
        // Simulate a frame our open() can't unlock: a sealer that fails on `open`.
        struct OpenFailSealer;
        impl Sealer for OpenFailSealer {
            fn seal(
                &mut self,
                _: StreamType,
                _: SequenceNo,
                p: &[u8],
            ) -> Result<Vec<u8>, MlsError> {
                Ok(p.to_vec())
            }
            fn open(
                &mut self,
                _: StreamType,
                _: SequenceNo,
                _: &[u8],
            ) -> Result<Vec<u8>, MlsError> {
                Err(MlsError::Aead("simulated".into()))
            }
        }
        let mut alice = GroupNode::new(1, group_id());
        let mut bob = GroupNode::new(2, group_id());
        alice.bootstrap_as_creator(1);
        bob.bootstrap_as_joiner(1, 0);
        let mut s = PlainSealer;
        let sid = alice.member_stream_id(2);
        // Alice seals with the identity sealer; Bob then opens with the failing one.
        let f = alice
            .send_payload(
                &mut s,
                2,
                StreamType::Text,
                sid,
                GbpFlags::ordered_reliable_ack(),
                b"x",
            )
            .unwrap();
        let mut fail = OpenFailSealer;
        let evs = bob.on_wire(&mut fail, &f.wire).unwrap();
        // Inspect the first Error event: (code, fatal, retryable).
        let err = evs
            .iter()
            .find_map(|e| match e {
                Event::Error {
                    code,
                    fatal,
                    retryable,
                    ..
                } => Some((*code, *fatal, *retryable)),
                _ => None,
            })
            .expect("error event");
        assert_eq!(err.0, codes::DECRYPT_FAILED);
        assert!(!err.1, "must be non-fatal");
        assert!(err.2, "must be retryable");
        assert_eq!(bob.state, NodeState::Active, "bob stays Active");
    }
}