Skip to main content

gbp_node/
node.rs

1//! GBP-layer group node.
2//!
3//! Responsibilities of this layer (analogous to IP):
4//!
5//! * Decode incoming CBOR frames and validate `version`, `group_id`, `epoch`
6//!   and `transition_id` per the GBP spec.
7//! * Enforce a per-`(stream_type, stream_id)` replay window via
8//!   `sequence_no`.
9//! * Open the AEAD payload through the [`Sealer`] trait (typically backed by
10//!   `gbp-mls`).
11//! * Surface decoded payloads to sub-protocols as
12//!   [`Event::PayloadReceived`]; the sub-protocol layer is responsible for
13//!   message-level semantics.
14//! * Drive the control plane: handle `EXECUTE_TRANSITION`, request resync on
15//!   `EPOCH_MISMATCH`, etc.
16//!
17//! Out of scope: parsing GTP / GAP / GSP payloads, GTP idempotency, GAP
18//! `key_phase` validation and mute-list tracking. Those concerns belong to
19//! the per-sub-protocol clients in the `gtp-protocol`, `gap-protocol` and
20//! `gsp-protocol` crates.
21
22use gbp::{CodecError, ControlMessage, ErrorObject, GbpFrame};
23use gbp_core::{
24    ControlOpcode, ErrorClass, GbpFlags, GroupId, MemberId, NodeState, PayloadCodec, SequenceNo,
25    StreamId, StreamType, TransitionId, TransitionState, codes, errors::ErrorSpec, timeouts,
26};
27use gbp_mls::{MlsError, label_for};
28use std::collections::HashMap;
29use std::time::{Duration, Instant};
30
31/// Errors raised by [`GroupNode`].
32#[derive(Debug, thiserror::Error)]
33pub enum NodeError {
34    /// Codec error.
35    #[error("codec: {0}")]
36    Codec(#[from] CodecError),
37    /// MLS / AEAD error.
38    #[error("mls: {0}")]
39    Mls(#[from] MlsError),
40    /// The node is not in a state that allows the requested operation.
41    #[error("invalid state: {0}")]
42    InvalidState(String),
43}
44
45/// A wire-ready outbound frame: the recipient and its serialised CBOR bytes.
46pub struct OutboundFrame {
47    /// Target member id.
48    pub to: MemberId,
49    /// CBOR-encoded [`GbpFrame`] bytes.
50    pub wire: Vec<u8>,
51}
52
53/// Information about a payload delivered by GBP to a sub-protocol.
54#[derive(Debug, Clone)]
55pub struct DeliveredPayload {
56    /// Stream class on which the frame arrived.
57    pub stream_type: StreamType,
58    /// Stream id from the frame (preserved so receivers can demultiplex
59    /// multiple sub-streams).
60    pub stream_id: StreamId,
61    /// Sequence number after passing the replay window.
62    pub sequence_no: SequenceNo,
63    /// Frame flag bits, copied as-is.
64    pub flags: u16,
65    /// Decrypted plaintext bytes.
66    pub plaintext: Vec<u8>,
67    /// Codec used to encode the plaintext (from the frame's `pf` field).
68    pub codec: PayloadCodec,
69}
70
71/// Events surfaced by the GBP layer.
72#[derive(Debug, Clone)]
73pub enum Event {
74    /// Node FSM changed state.
75    StateChanged {
76        /// Previous state.
77        from: NodeState,
78        /// New state.
79        to: NodeState,
80    },
81    /// Payload delivered to a sub-protocol (Text / Audio / Signal). Control
82    /// frames are handled internally and do not surface as
83    /// [`Event::PayloadReceived`].
84    PayloadReceived(DeliveredPayload),
85    /// A control plane message was received and parsed.
86    Control {
87        /// Sender member id.
88        from: MemberId,
89        /// Decoded opcode.
90        opcode: ControlOpcode,
91        /// `transition_id` carried by the message.
92        transition_id: TransitionId,
93        /// `request_id` echoed by ACK / NACK responders.
94        request_id: u32,
95        /// Opcode-specific args (CBOR or opaque bytes; e.g. the MLS Commit
96        /// embedded in `PREPARE_TRANSITION`).
97        args: Vec<u8>,
98    },
99    /// An error was raised.
100    Error {
101        /// Numeric error code.
102        code: u16,
103        /// Error class.
104        class: ErrorClass,
105        /// MAY be retried.
106        retryable: bool,
107        /// Fatal — the node is now in `FAILED`.
108        fatal: bool,
109        /// Human-readable reason.
110        reason: String,
111    },
112    /// Epoch transition has been applied locally.
113    EpochAdvanced {
114        /// New epoch.
115        epoch: u64,
116        /// `transition_id` that produced the new epoch.
117        transition_id: TransitionId,
118    },
119    /// Coordinator silence exceeded `T_coordinator_grace`. The application
120    /// should call [`GroupNode::claim_coordinator`] if this node has the
121    /// lowest `MemberId` among currently active members.
122    CoordinatorElectionNeeded,
123    /// This node successfully claimed the coordinator role (sent
124    /// `CAPABILITIES_ADVERTISE` with `coordinator_claim=true`).
125    BecameCoordinator,
126    /// A remote member broadcast a coordinator claim.
127    CoordinatorClaim {
128        /// The claiming member's id.
129        claimant: MemberId,
130    },
131}
132
133/// GBP-layer node.
134///
135/// Owns the framing, AEAD, replay window, FSM and control plane.
136/// Sub-protocol semantics live in their own crates and use this type plus a
137/// [`Sealer`] for outbound traffic and `on_wire` + the resulting events for
138/// inbound traffic.
139pub struct GroupNode {
140    /// Application-level member id.
141    pub member_id: MemberId,
142    /// Whether this node currently holds the coordinator role.
143    pub is_coordinator: bool,
144    /// 16-byte group identifier.
145    pub group_id: GroupId,
146    /// Current epoch as observed by the GBP layer (the authoritative epoch
147    /// lives in the underlying MLS group).
148    pub current_epoch: u64,
149    /// Last applied `transition_id`.
150    pub last_transition_id: TransitionId,
151    /// Pending `transition_id` (set during PREPARE / READY).
152    pub pending_transition_id: TransitionId,
153    /// Node FSM.
154    pub state: NodeState,
155    /// Transition FSM.
156    pub transition_state: TransitionState,
157
158    out_seq: HashMap<(StreamType, StreamId), SequenceNo>,
159    in_hw: HashMap<(StreamType, StreamId), SequenceNo>,
160    events: Vec<Event>,
161
162    /// MemberId of the member whose PREPARE_TRANSITION is currently pending.
163    /// Used for tie-break: if two PREPAREs arrive for the same transition_id,
164    /// the one from the lower MemberId wins (gbp_rfc §8).
165    pending_commit_sender: Option<MemberId>,
166    /// Deadline for receiving quorum READY after issuing PREPARE_TRANSITION.
167    /// Armed when coordinator sends PREPARE; fires ERR_PREPARE_TIMEOUT.
168    prepare_deadline: Option<Instant>,
169    /// Deadline for receiving EXECUTE_TRANSITION after sending READY_FOR_TRANSITION.
170    /// Armed when a member sends READY; fires ERR_EXECUTE_TIMEOUT.
171    execute_deadline: Option<Instant>,
172    /// Timestamp of last coordinator activity. When silence exceeds
173    /// T_COORDINATOR_GRACE_MS the node emits ERR_COORDINATOR_GONE.
174    coordinator_last_seen: Option<Instant>,
175}
176
177impl GroupNode {
178    /// Builds a fresh node in the `IDLE` state.
179    pub fn new(member_id: MemberId, group_id: GroupId) -> Self {
180        Self {
181            member_id,
182            group_id,
183            is_coordinator: false,
184            current_epoch: 0,
185            last_transition_id: 0,
186            pending_transition_id: 0,
187            state: NodeState::Idle,
188            transition_state: TransitionState::TIdle,
189            out_seq: HashMap::new(),
190            in_hw: HashMap::new(),
191            events: Vec::new(),
192            pending_commit_sender: None,
193            prepare_deadline: None,
194            execute_deadline: None,
195            coordinator_last_seen: None,
196        }
197    }
198
199    /// Drives the node from `IDLE` to `ACTIVE` as a creator.
200    pub fn bootstrap_as_creator(&mut self, epoch: u64) {
201        self.transition(NodeState::Connecting);
202        self.transition(NodeState::EstablishingGroup);
203        self.current_epoch = epoch;
204        self.transition(NodeState::Active);
205    }
206
207    /// Drives the node from `IDLE` to `ACTIVE` as a joiner.
208    ///
209    /// `expected_first_tid` lets the joiner pre-arm its pending transition
210    /// state so that the very next `EXECUTE_TRANSITION` (which will arrive
211    /// without a preceding PREPARE the joiner could decrypt — that PREPARE
212    /// was sealed under the pre-Welcome epoch) is accepted by
213    /// `handle_control`'s tid-validation matrix. Pass `0` if the joiner
214    /// recovered out-of-band and is already current.
215    pub fn bootstrap_as_joiner(&mut self, epoch: u64, expected_first_tid: u32) {
216        self.transition(NodeState::Connecting);
217        self.transition(NodeState::EstablishingGroup);
218        self.current_epoch = epoch;
219        if expected_first_tid > 0 {
220            self.pending_transition_id = expected_first_tid;
221            self.transition_state = TransitionState::TPrepared;
222        }
223        self.transition(NodeState::Active);
224    }
225
226    /// Drains and returns all queued events.
227    pub fn drain_events(&mut self) -> Vec<Event> {
228        std::mem::take(&mut self.events)
229    }
230
231    /// Returns a sender-unique `stream_id` within the given base class.
232    ///
233    /// This is used so that the receiver's replay window does not conflate
234    /// streams that originate from different members.
235    pub fn member_stream_id(&self, base: u32) -> StreamId {
236        debug_assert!(
237            self.member_id < 1_000_000,
238            "member_id overflow: {0}",
239            self.member_id
240        );
241        base + self.member_id * 100
242    }
243
244    /// Sends an opaque plaintext payload on the given stream.
245    ///
246    /// Used by the sub-protocol clients: each one encodes its message and
247    /// forwards the resulting bytes here together with the codec that was used.
248    /// Pass [`PayloadCodec::Cbor`] for the default encoding; it is
249    /// backward-compatible with pre-1.5 peers.
250    pub fn send_payload<S: Sealer>(
251        &mut self,
252        seal: &mut S,
253        target: MemberId,
254        stream_type: StreamType,
255        stream_id: StreamId,
256        flags: u16,
257        plaintext: &[u8],
258        codec: PayloadCodec,
259    ) -> Result<OutboundFrame, NodeError> {
260        self.assert_can_send()?;
261        let seq = self.next_seq(stream_type, stream_id);
262        let ciphertext = seal.seal(stream_type, seq, plaintext)?;
263        let frame = GbpFrame::new(
264            self.group_id,
265            self.current_epoch,
266            self.last_transition_id,
267            stream_type,
268            stream_id,
269            flags,
270            seq,
271            ciphertext,
272            codec.as_u8(),
273        );
274        Ok(OutboundFrame {
275            to: target,
276            wire: frame.to_cbor(),
277        })
278    }
279
280    /// Sends a control plane message on Stream 0. Wrapper around
281    /// [`GroupNode::send_payload`].
282    ///
283    /// Side effect: when the coordinator originates a `PREPARE_TRANSITION`,
284    /// it must locally adopt the same `pending_transition_id` so that the
285    /// inbound READY / EXECUTE validation matrix in `handle_control` lines
286    /// up. Without this, the coordinator never matches its own pending tid
287    /// against the remote READY frames it expects, and the handshake never
288    /// completes.
289    pub fn send_control<S: Sealer>(
290        &mut self,
291        seal: &mut S,
292        target: MemberId,
293        opcode: ControlOpcode,
294        transition_id: TransitionId,
295        request_id: u32,
296        args: Vec<u8>,
297    ) -> Result<OutboundFrame, NodeError> {
298        let ctl = ControlMessage::with_args(
299            opcode as u16,
300            request_id,
301            self.member_id,
302            transition_id,
303            args,
304        );
305        let mut flags = GbpFlags::ordered_reliable_system();
306        if matches!(
307            opcode,
308            ControlOpcode::PrepareTransition
309                | ControlOpcode::ReadyForTransition
310                | ControlOpcode::ExecuteTransition
311        ) {
312            flags |= GbpFlags::CRITICAL;
313        }
314        // Sender-side state mirroring (matches what `handle_control` does on
315        // the receiver side). We only update on PREPARE/EXECUTE/ABORT — READY
316        // is purely an ack carrying the existing pending tid.
317        match opcode {
318            ControlOpcode::PrepareTransition => {
319                self.pending_transition_id = transition_id;
320                self.transition_state = TransitionState::TPrepared;
321                self.prepare_deadline =
322                    Some(Instant::now() + Duration::from_millis(timeouts::T_PREPARE_MAX_MS));
323                self.execute_deadline = None;
324            }
325            ControlOpcode::ReadyForTransition => {
326                self.execute_deadline =
327                    Some(Instant::now() + Duration::from_millis(timeouts::T_EXECUTE_MAX_MS));
328            }
329            ControlOpcode::ExecuteTransition | ControlOpcode::AbortTransition => {
330                self.prepare_deadline = None;
331                self.execute_deadline = None;
332                if opcode == ControlOpcode::AbortTransition {
333                    self.pending_transition_id = 0;
334                    self.transition_state = TransitionState::TAborted;
335                }
336            }
337            _ => {}
338        }
339        let stream_id = self.member_stream_id(0);
340        self.send_payload(
341            seal,
342            target,
343            StreamType::Control,
344            stream_id,
345            flags,
346            &ctl.to_cbor(),
347            PayloadCodec::Cbor,
348        )
349    }
350
351    /// Feeds wire bytes to the node.
352    ///
353    /// Performs the §6.2 validation pipeline (version → group_id → epoch →
354    /// payload_size → transition_id → replay), opens the AEAD payload and
355    /// either:
356    /// * dispatches the parsed control message internally (for
357    ///   `StreamType::Control`), or
358    /// * surfaces an [`Event::PayloadReceived`] (for application streams).
359    ///
360    /// Returns every event that was produced as a result.
361    pub fn on_wire<S: Sealer>(
362        &mut self,
363        seal: &mut S,
364        wire: &[u8],
365    ) -> Result<Vec<Event>, NodeError> {
366        // Decode without payload-size validation — we want a malformed v!=1
367        // frame to surface as `ERR_UNSUPPORTED_VERSION`, not as
368        // `ERR_PAYLOAD_SIZE_MISMATCH`. Validation runs in deliver_frame, in
369        // the order required by §6.2.
370        let frame = match GbpFrame::decode(wire) {
371            Ok(f) => f,
372            Err(e) => {
373                self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, format!("frame decode: {e}"));
374                return Ok(self.drain_events());
375            }
376        };
377        self.deliver_frame(seal, frame)?;
378        Ok(self.drain_events())
379    }
380
381    fn deliver_frame<S: Sealer>(&mut self, seal: &mut S, frame: GbpFrame) -> Result<(), NodeError> {
382        // §6.2 order: version → group_id → epoch → payload_size →
383        // transition_id (when CRITICAL) → replay.
384        if frame.version != 1 {
385            self.emit_err_spec(codes::UNSUPPORTED_VERSION, "version != 1");
386            return Ok(());
387        }
388        if frame.group_id_array() != self.group_id {
389            self.emit_err_spec(codes::UNKNOWN_GROUP, "group_id");
390            return Ok(());
391        }
392        if frame.epoch != self.current_epoch {
393            self.emit_err_spec(
394                codes::EPOCH_MISMATCH,
395                format!("got {}, expected {}", frame.epoch, self.current_epoch),
396            );
397            self.trigger_resync();
398            return Ok(());
399        }
400        if let Err(e) = frame.validate_payload_size() {
401            self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, format!("payload size: {e}"));
402            return Ok(());
403        }
404        let flags = GbpFlags::from_bits(frame.flags);
405        let st = match frame.stream_type_typed() {
406            Ok(st) => st,
407            Err(_) => {
408                self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "unknown stream_type");
409                return Ok(());
410            }
411        };
412
413        // §6.2 transition_id ordering: CRITICAL frames on application streams
414        // MUST equal `last_transition_id`. Control-stream frames are exempt
415        // from this check and validated per-opcode inside `handle_control`,
416        // because PREPARE_TRANSITION legitimately carries `last + 1` and
417        // EXECUTE / ACK carry `pending_transition_id`.
418        if st != StreamType::Control
419            && flags.has(GbpFlags::CRITICAL)
420            && frame.transition_id != self.last_transition_id
421        {
422            self.emit_err_spec(
423                codes::TRANSITION_MISMATCH,
424                format!(
425                    "got tid={}, expected {}",
426                    frame.transition_id, self.last_transition_id
427                ),
428            );
429            return Ok(());
430        }
431
432        let key = (st, frame.stream_id);
433        let hw = self.in_hw.get(&key).copied().unwrap_or(0);
434        if frame.sequence_no <= hw {
435            self.emit_err_spec(
436                codes::REPLAY_DETECTED,
437                format!(
438                    "st={} sid={} seq={} hw={}",
439                    st, frame.stream_id, frame.sequence_no, hw
440                ),
441            );
442            return Ok(());
443        }
444        self.in_hw.insert(key, frame.sequence_no);
445
446        let plain = match seal.open(st, frame.sequence_no, &frame.encrypted_payload) {
447            Ok(p) => p,
448            Err(e) => {
449                // Non-fatal: a frame addressed under a different MLS epoch
450                // (e.g. PREPARE_TRANSITION reaching a fresh joiner that has
451                // already accepted the post-commit Welcome) cannot be
452                // decrypted, but that's expected and the node MUST keep
453                // running to receive the subsequent EXECUTE frame on the
454                // shared post-merge epoch.
455                self.emit_err_named(
456                    codes::DECRYPT_FAILED,
457                    ErrorClass::Crypto,
458                    true,  // retryable: caller may resync via digest
459                    false, // non-fatal
460                    format!("aead open: {e}"),
461                );
462                return Ok(());
463            }
464        };
465
466        match st {
467            StreamType::Control => self.handle_control(plain),
468            other => self.events.push(Event::PayloadReceived(DeliveredPayload {
469                stream_type: other,
470                stream_id: frame.stream_id,
471                sequence_no: frame.sequence_no,
472                flags: frame.flags,
473                plaintext: plain,
474                codec: frame.payload_codec(),
475            })),
476        }
477        Ok(())
478    }
479
480    fn handle_control(&mut self, plain: Vec<u8>) {
481        let c = match ControlMessage::from_cbor(&plain) {
482            Ok(c) => c,
483            Err(_) => {
484                self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "control decode");
485                return;
486            }
487        };
488        let opcode = match ControlOpcode::try_from(c.opcode) {
489            Ok(op) => op,
490            Err(_) => {
491                self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "unknown opcode");
492                return;
493            }
494        };
495        // Per-opcode TransitionID validation (§5 of gbp-control-plane).
496        let tid_ok = match opcode {
497            // PREPARE introduces last+1; receiver simply records it as pending.
498            // Re-issuing a PREPARE for an already-pending tid is allowed; a
499            // smaller-or-equal tid that is not strictly newer is rejected.
500            ControlOpcode::PrepareTransition => {
501                c.transition_id > self.last_transition_id
502                    && (self.pending_transition_id == 0
503                        || self.pending_transition_id == c.transition_id)
504            }
505            // READY / EXECUTE / ABORT must reference the pending tid.
506            ControlOpcode::ReadyForTransition
507            | ControlOpcode::ExecuteTransition
508            | ControlOpcode::AbortTransition => {
509                self.pending_transition_id != 0 && c.transition_id == self.pending_transition_id
510            }
511            // Digest / capability / ack / nack: tid is informational, no
512            // ordering constraint at the GBP layer.
513            _ => true,
514        };
515        if !tid_ok {
516            self.emit_err_spec(
517                codes::TRANSITION_MISMATCH,
518                format!(
519                    "control tid={} not valid for {:?} (last={}, pending={})",
520                    c.transition_id, opcode, self.last_transition_id, self.pending_transition_id
521                ),
522            );
523            return;
524        }
525        match opcode {
526            ControlOpcode::PrepareTransition => {
527                // Tie-break (gbp_rfc §8): if a PREPARE for the same tid already
528                // exists from another sender, keep whichever has the lower
529                // MemberId — that member's commit is the canonical winner.
530                if self.pending_transition_id == c.transition_id {
531                    let current_winner = self.pending_commit_sender.unwrap_or(MemberId::MAX);
532                    if c.sender_id >= current_winner {
533                        // Existing winner holds; discard this competing commit
534                        // but still surface the Control event so upper layers
535                        // can observe it.
536                        self.events.push(Event::Control {
537                            from: c.sender_id,
538                            opcode,
539                            transition_id: c.transition_id,
540                            request_id: c.request_id,
541                            args: c.args.to_vec(),
542                        });
543                        return;
544                    }
545                    // New sender wins — replace.
546                }
547                self.pending_transition_id = c.transition_id;
548                self.pending_commit_sender = Some(c.sender_id);
549                self.transition_state = TransitionState::TPrepared;
550                // PREPARE originates from the coordinator — record activity.
551                self.note_coordinator_activity();
552                // Arm execute deadline: member must see EXECUTE within T_execute_max.
553                self.execute_deadline =
554                    Some(Instant::now() + Duration::from_millis(timeouts::T_EXECUTE_MAX_MS));
555            }
556            ControlOpcode::ReadyForTransition => {
557                self.transition_state = TransitionState::TReady;
558                // Coordinator received a READY — clear the per-member wait.
559                self.prepare_deadline = None;
560            }
561            ControlOpcode::ExecuteTransition => {
562                self.execute_deadline = None;
563                self.pending_commit_sender = None;
564                self.apply_transition(c.transition_id);
565                self.note_coordinator_activity();
566            }
567            ControlOpcode::AbortTransition => {
568                self.prepare_deadline = None;
569                self.execute_deadline = None;
570                self.pending_commit_sender = None;
571                self.transition_state = TransitionState::TAborted;
572                self.pending_transition_id = 0;
573            }
574            ControlOpcode::GroupStateDigestResponse => {
575                if self.state == NodeState::Resyncing {
576                    self.transition(NodeState::Active);
577                }
578            }
579            ControlOpcode::CapabilitiesAdvertise => {
580                if Self::is_coordinator_claim(&c.args) {
581                    // Coordinator is alive — reset silence timer.
582                    self.note_coordinator_activity();
583                    // Collision resolution (gbp-control-plane §5.1): if we
584                    // also claimed and the remote claimant has a lower
585                    // MemberId, yield the coordinator role to them.
586                    if self.is_coordinator && c.sender_id < self.member_id {
587                        self.is_coordinator = false;
588                    }
589                    self.events.push(Event::CoordinatorClaim {
590                        claimant: c.sender_id,
591                    });
592                }
593            }
594            _ => {}
595        }
596        self.events.push(Event::Control {
597            from: c.sender_id,
598            opcode,
599            transition_id: c.transition_id,
600            request_id: c.request_id,
601            args: c.args.to_vec(),
602        });
603    }
604
605    /// Applies a new epoch (called by the coordinator after
606    /// `EXECUTE_TRANSITION`).
607    pub fn apply_transition(&mut self, tid: TransitionId) {
608        self.current_epoch += 1;
609        self.last_transition_id = tid;
610        self.pending_transition_id = 0;
611        self.pending_commit_sender = None;
612        self.transition_state = TransitionState::TExecuted;
613        self.out_seq.clear();
614        self.in_hw.clear();
615        self.events.push(Event::EpochAdvanced {
616            epoch: self.current_epoch,
617            transition_id: tid,
618        });
619    }
620
621    /// Forces the node into the `RESYNCING` state.
622    pub fn trigger_resync(&mut self) {
623        if self.state != NodeState::Resyncing {
624            self.transition(NodeState::Resyncing);
625        }
626    }
627
628    /// Checks FSM deadlines and emits timeout events if any have expired.
629    ///
630    /// Call this regularly from the application event loop (e.g. every 500 ms).
631    /// Returns the same events that would come from [`GroupNode::drain_events`];
632    /// the caller may also drain events separately — this method does not
633    /// duplicate them.
634    pub fn check_timeouts(&mut self) -> Vec<Event> {
635        let now = Instant::now();
636
637        if self.prepare_deadline.is_some_and(|d| now >= d) {
638            self.prepare_deadline = None;
639            self.execute_deadline = None;
640            self.pending_transition_id = 0;
641            self.transition_state = TransitionState::TAborted;
642            self.emit_err_spec(codes::PREPARE_TIMEOUT, "T_prepare_max exceeded");
643        }
644
645        if self.execute_deadline.is_some_and(|d| now >= d) {
646            self.execute_deadline = None;
647            self.emit_err_spec(codes::EXECUTE_TIMEOUT, "T_execute_max exceeded");
648        }
649
650        if self.coordinator_last_seen.is_some_and(|t| {
651            now.duration_since(t).as_millis() as u64 >= timeouts::T_COORDINATOR_GRACE_MS
652        }) {
653            self.coordinator_last_seen = None;
654            self.is_coordinator = false;
655            self.emit_err_spec(
656                codes::COORDINATOR_GONE,
657                "coordinator silence exceeded T_coordinator_grace",
658            );
659            self.events.push(Event::CoordinatorElectionNeeded);
660        }
661
662        self.drain_events()
663    }
664
665    /// Records that the coordinator was active right now.
666    ///
667    /// Call this whenever the node receives a frame from the current
668    /// coordinator (e.g. `PREPARE_TRANSITION`, `EXECUTE_TRANSITION`,
669    /// `CAPABILITIES_ADVERTISE` with `coordinator_claim`). Resets the
670    /// coordinator-silence timer used to detect `ERR_COORDINATOR_GONE`.
671    pub fn note_coordinator_activity(&mut self) {
672        self.coordinator_last_seen = Some(Instant::now());
673    }
674
675    /// Claims the coordinator role by broadcasting `CAPABILITIES_ADVERTISE`
676    /// with `coordinator_claim=true` (gbp-control-plane §5.1).
677    ///
678    /// Call this when [`Event::CoordinatorElectionNeeded`] fires **and** this
679    /// node has the lowest `MemberId` among currently active members. The
680    /// caller is responsible for delivering the returned frame to every group
681    /// member.
682    ///
683    /// The args payload is the minimal CBOR map `{0: true}` encoding a
684    /// coordinator claim flag.
685    pub fn claim_coordinator<S: Sealer>(
686        &mut self,
687        seal: &mut S,
688        target: MemberId,
689    ) -> Result<OutboundFrame, NodeError> {
690        // CBOR: {0: true}  →  A1 00 F5
691        let args = vec![0xA1u8, 0x00, 0xF5];
692        self.is_coordinator = true;
693        self.coordinator_last_seen = Some(Instant::now());
694        self.events.push(Event::BecameCoordinator);
695        self.send_control(
696            seal,
697            target,
698            ControlOpcode::CapabilitiesAdvertise,
699            self.last_transition_id,
700            0,
701            args,
702        )
703    }
704
705    /// Returns `true` if the raw args bytes of a `CAPABILITIES_ADVERTISE`
706    /// frame encode a coordinator claim (`{0: true}` in CBOR).
707    fn is_coordinator_claim(args: &[u8]) -> bool {
708        // Minimal CBOR map {0: true}: A1 00 F5
709        // We also accept larger maps where key 0 maps to true.
710        // Fast path: exact match on the minimal encoding.
711        if args == [0xA1, 0x00, 0xF5] {
712            return true;
713        }
714        // General path: scan for the sequence 00 F5 (uint(0) → true) inside a
715        // CBOR map. This is intentionally simple; a full CBOR parser lives in
716        // the gbp-protocol crate and is not a dependency here.
717        args.windows(2).any(|w| w == [0x00, 0xF5])
718    }
719
720    fn transition(&mut self, next: NodeState) {
721        if self.state == next {
722            return;
723        }
724        if !self.state.can_transition_to(next) {
725            let from = self.state;
726            self.state = NodeState::Failed;
727            self.events.push(Event::StateChanged {
728                from,
729                to: NodeState::Failed,
730            });
731            return;
732        }
733        let from = self.state;
734        self.state = next;
735        self.events.push(Event::StateChanged { from, to: next });
736    }
737
738    fn assert_can_send(&self) -> Result<(), NodeError> {
739        if matches!(
740            self.state,
741            NodeState::Active | NodeState::Resyncing | NodeState::EstablishingGroup
742        ) {
743            Ok(())
744        } else {
745            Err(NodeError::InvalidState(format!(
746                "cannot send in state {}",
747                self.state
748            )))
749        }
750    }
751
752    fn next_seq(&mut self, st: StreamType, sid: StreamId) -> SequenceNo {
753        let entry = self.out_seq.entry((st, sid)).or_insert(0);
754        *entry += 1;
755        *entry
756    }
757
758    fn emit_err_spec(&mut self, code: u16, reason: impl Into<String>) {
759        if let Some(spec) = ErrorSpec::lookup(code) {
760            self.emit_err_named(spec.code, spec.class, spec.retryable, spec.fatal, reason);
761        } else {
762            self.emit_err_named(code, ErrorClass::Policy, false, false, reason);
763        }
764    }
765
766    fn emit_err_named(
767        &mut self,
768        code: u16,
769        class: ErrorClass,
770        retryable: bool,
771        fatal: bool,
772        reason: impl Into<String>,
773    ) {
774        let reason = reason.into();
775        // ErrorSpec is authoritative for known codes — use its class/retryable/fatal
776        // so that the wire error always matches the registry.
777        let (class, retryable, fatal) = if let Some(spec) = ErrorSpec::lookup(code) {
778            (spec.class, spec.retryable, spec.fatal)
779        } else {
780            (class, retryable, fatal)
781        };
782        let _ = ErrorObject::new(code, class, retryable, fatal, reason.clone()).to_cbor();
783        self.events.push(Event::Error {
784            code,
785            class,
786            retryable,
787            fatal,
788            reason,
789        });
790        if fatal {
791            let from = self.state;
792            self.state = NodeState::Failed;
793            self.events.push(Event::StateChanged {
794                from,
795                to: NodeState::Failed,
796            });
797        }
798    }
799}
800
801/// Trait abstracting the AEAD seal / open operations.
802///
803/// Implemented for [`gbp_mls::MlsContext`] in this crate; tests typically
804/// implement a no-op identity sealer to exercise the FSM without standing
805/// up an MLS group.
806pub trait Sealer {
807    /// Encrypts `pt` for the given stream and per-stream sequence number.
808    fn seal(&mut self, st: StreamType, seq: SequenceNo, pt: &[u8]) -> Result<Vec<u8>, MlsError>;
809    /// Decrypts `ct` for the given stream and per-stream sequence number.
810    fn open(&mut self, st: StreamType, seq: SequenceNo, ct: &[u8]) -> Result<Vec<u8>, MlsError>;
811}
812
813impl Sealer for gbp_mls::MlsContext {
814    fn seal(&mut self, st: StreamType, seq: SequenceNo, pt: &[u8]) -> Result<Vec<u8>, MlsError> {
815        gbp_mls::MlsContext::seal(self, label_for(st), seq, pt)
816    }
817    fn open(&mut self, st: StreamType, seq: SequenceNo, ct: &[u8]) -> Result<Vec<u8>, MlsError> {
818        gbp_mls::MlsContext::open(self, label_for(st), seq, ct)
819    }
820}
821
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Fixtures -----------------------------------------------------------

    /// Identity sealer: the "ciphertext" is the plaintext, so the node FSM can
    /// be exercised without an MLS group.
    struct PlainSealer;

    impl Sealer for PlainSealer {
        fn seal(
            &mut self,
            _st: StreamType,
            _seq: SequenceNo,
            pt: &[u8],
        ) -> Result<Vec<u8>, MlsError> {
            Ok(Vec::from(pt))
        }

        fn open(
            &mut self,
            _st: StreamType,
            _seq: SequenceNo,
            ct: &[u8],
        ) -> Result<Vec<u8>, MlsError> {
            Ok(Vec::from(ct))
        }
    }

    /// Group id shared by every test: ASCII `GBP` followed by zero bytes.
    fn group_id() -> GroupId {
        let mut id = [0u8; 16];
        id[..3].copy_from_slice(b"GBP");
        id
    }

    /// Member 1 bootstrapped as creator and member 2 as joiner, both with
    /// epoch argument 1.
    fn alice_and_bob() -> (GroupNode, GroupNode) {
        let mut alice = GroupNode::new(1, group_id());
        let mut bob = GroupNode::new(2, group_id());
        alice.bootstrap_as_creator(1);
        bob.bootstrap_as_joiner(1, 0);
        (alice, bob)
    }

    /// Member 1 bootstrapped as creator and member 2 as joiner, both with
    /// epoch argument 0.
    fn coord_and_peer() -> (GroupNode, GroupNode) {
        let mut coord = GroupNode::new(1, group_id());
        let mut peer = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        peer.bootstrap_as_joiner(0, 0);
        (coord, peer)
    }

    /// Sends `body` from `sender` to member 2 as a text payload and returns
    /// the encoded frame bytes.
    fn text_frame(sender: &mut GroupNode, sealer: &mut PlainSealer, body: &[u8]) -> Vec<u8> {
        let sid = sender.member_stream_id(2);
        sender
            .send_payload(
                sealer,
                2,
                StreamType::Text,
                sid,
                GbpFlags::ordered_reliable_ack(),
                body,
                PayloadCodec::Cbor,
            )
            .unwrap()
            .wire
    }

    /// Sends the PREPARE used by most tests (`0, PrepareTransition, 1, 1`,
    /// empty args) and returns the encoded frame bytes.
    fn prepare_tid1(node: &mut GroupNode, sealer: &mut PlainSealer) -> Vec<u8> {
        node.send_control(sealer, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap()
            .wire
    }

    /// Sends the EXECUTE used by most tests (`0, ExecuteTransition, 1, 2`,
    /// empty args) and returns the encoded frame bytes.
    fn execute_tid1(node: &mut GroupNode, sealer: &mut PlainSealer) -> Vec<u8> {
        node.send_control(sealer, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
            .unwrap()
            .wire
    }

    /// Error codes of all `Event::Error` entries, in order.
    fn drain_errs(events: &[Event]) -> Vec<u16> {
        let mut found = Vec::new();
        for ev in events {
            if let Event::Error { code, .. } = ev {
                found.push(*code);
            }
        }
        found
    }

    /// `(opcode, transition_id)` of all `Event::Control` entries, in order.
    fn drain_controls(events: &[Event]) -> Vec<(ControlOpcode, TransitionId)> {
        let mut found = Vec::new();
        for ev in events {
            if let Event::Control {
                opcode,
                transition_id,
                ..
            } = ev
            {
                found.push((*opcode, *transition_id));
            }
        }
        found
    }

    /// True when `events` contains an `Event::Error` with the given code.
    fn has_err(events: &[Event], code: u16) -> bool {
        drain_errs(events).contains(&code)
    }

    // ---- Data plane ---------------------------------------------------------

    #[test]
    fn replay_window_rejects_repeat() {
        let (mut alice, mut bob) = alice_and_bob();
        let mut sealer = PlainSealer;
        let wire = text_frame(&mut alice, &mut sealer, b"hi");
        // Deliver the same frame twice; the second delivery must be flagged.
        let _ = bob.on_wire(&mut sealer, &wire).unwrap();
        let events = bob.on_wire(&mut sealer, &wire).unwrap();
        assert!(has_err(&events, codes::REPLAY_DETECTED));
    }

    #[test]
    fn epoch_mismatch_triggers_resync() {
        let (mut alice, mut bob) = alice_and_bob();
        // Put the sender on a different epoch than the receiver.
        alice.current_epoch = 2;
        let mut sealer = PlainSealer;
        let wire = text_frame(&mut alice, &mut sealer, b"x");
        let _ = bob.on_wire(&mut sealer, &wire).unwrap();
        assert_eq!(bob.state, NodeState::Resyncing);
    }

    #[test]
    fn payload_emits_received_event() {
        let (mut alice, mut bob) = alice_and_bob();
        let mut sealer = PlainSealer;
        let wire = text_frame(&mut alice, &mut sealer, b"payload");
        let events = bob.on_wire(&mut sealer, &wire).unwrap();
        let received = events
            .into_iter()
            .find_map(|ev| {
                if let Event::PayloadReceived(p) = ev {
                    Some(p)
                } else {
                    None
                }
            })
            .expect("payload");
        assert_eq!(received.stream_type, StreamType::Text);
        assert_eq!(received.plaintext, b"payload");
    }

    // ---- Control-plane handshake -----------------------------------------

    #[test]
    fn prepare_transition_sets_pending_on_sender_and_receiver() {
        let (mut coord, mut peer) = coord_and_peer();
        let mut sealer = PlainSealer;
        // Coordinator sends PREPARE for tid=1.
        let frame = coord
            .send_control(
                &mut sealer,
                0,
                ControlOpcode::PrepareTransition,
                1,
                100,
                b"commit-blob".to_vec(),
            )
            .unwrap();
        assert_eq!(coord.pending_transition_id, 1, "sender mirrors pending");
        assert_eq!(coord.transition_state, TransitionState::TPrepared);

        let events = peer.on_wire(&mut sealer, &frame.wire).unwrap();
        assert_eq!(peer.pending_transition_id, 1, "receiver records pending");
        let errs = drain_errs(&events);
        assert!(errs.is_empty(), "no error: {:?}", errs);
        assert_eq!(
            drain_controls(&events),
            vec![(ControlOpcode::PrepareTransition, 1)]
        );
    }

    #[test]
    fn ready_with_wrong_tid_is_rejected() {
        let (mut coord, mut peer) = coord_and_peer();
        let mut sealer = PlainSealer;
        let prep = prepare_tid1(&mut coord, &mut sealer);
        peer.on_wire(&mut sealer, &prep).unwrap();
        // Peer answers with a READY that names tid 7 instead of the pending tid 1.
        let bogus = peer
            .send_control(
                &mut sealer,
                1,
                ControlOpcode::ReadyForTransition,
                7,
                1,
                vec![],
            )
            .unwrap();
        let events = coord.on_wire(&mut sealer, &bogus.wire).unwrap();
        let errs = drain_errs(&events);
        assert!(errs.contains(&codes::TRANSITION_MISMATCH), "got {:?}", errs);
    }

    #[test]
    fn execute_advances_epoch_and_clears_pending() {
        let (mut coord, mut peer) = coord_and_peer();
        let mut sealer = PlainSealer;
        let prep = prepare_tid1(&mut coord, &mut sealer);
        peer.on_wire(&mut sealer, &prep).unwrap();
        // Coordinator broadcasts EXECUTE; it applies locally, the peer applies
        // when the frame arrives through on_wire.
        let exec = execute_tid1(&mut coord, &mut sealer);
        coord.apply_transition(1);
        let events = peer.on_wire(&mut sealer, &exec).unwrap();

        assert_eq!(coord.last_transition_id, 1);
        assert_eq!(coord.current_epoch, 1);
        assert_eq!(peer.last_transition_id, 1);
        assert_eq!(peer.current_epoch, 1);
        assert_eq!(peer.pending_transition_id, 0);
        let advanced = events.iter().any(|ev| {
            matches!(
                ev,
                Event::EpochAdvanced {
                    transition_id: 1,
                    ..
                }
            )
        });
        assert!(advanced);
    }

    #[test]
    fn abort_clears_pending_no_advance() {
        let (mut coord, mut peer) = coord_and_peer();
        let mut sealer = PlainSealer;
        let prep = prepare_tid1(&mut coord, &mut sealer);
        peer.on_wire(&mut sealer, &prep).unwrap();
        let abort = coord
            .send_control(
                &mut sealer,
                0,
                ControlOpcode::AbortTransition,
                1,
                2,
                vec![],
            )
            .unwrap();
        peer.on_wire(&mut sealer, &abort.wire).unwrap();

        assert_eq!(peer.pending_transition_id, 0);
        assert_eq!(peer.current_epoch, 0);
        assert_eq!(peer.transition_state, TransitionState::TAborted);
        assert_eq!(coord.transition_state, TransitionState::TAborted);
    }

    #[test]
    fn bootstrap_as_joiner_with_expected_tid_accepts_first_execute() {
        let mut coord = GroupNode::new(1, group_id());
        // The joiner pre-arms expected_first_tid=1 — typical post-Welcome state.
        let mut joiner = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        joiner.bootstrap_as_joiner(0, 1);
        assert_eq!(joiner.pending_transition_id, 1);

        let mut sealer = PlainSealer;
        // The coordinator must mirror its own pending tid; sending PREPARE does
        // that. The frame itself is never delivered to the joiner.
        prepare_tid1(&mut coord, &mut sealer);
        // EXECUTE should be accepted by the joiner without ever seeing PREPARE.
        let exec = execute_tid1(&mut coord, &mut sealer);
        let events = joiner.on_wire(&mut sealer, &exec).unwrap();
        let errs = drain_errs(&events);
        assert!(
            errs.is_empty(),
            "expected clean apply, got errors {:?}",
            errs
        );
        assert_eq!(joiner.last_transition_id, 1);
        assert_eq!(joiner.current_epoch, 1);
    }

    // ---- Coordinator handover (gbp-control-plane §5.1) ---------------------

    #[test]
    fn claim_coordinator_sets_flag_and_emits_event() {
        let mut node = GroupNode::new(1, group_id());
        node.bootstrap_as_creator(0);
        // Discard whatever bootstrap queued so only claim events remain.
        node.drain_events();
        let mut sealer = PlainSealer;
        let _ = node.claim_coordinator(&mut sealer, 0).unwrap();
        assert!(node.is_coordinator);
        let events = node.drain_events();
        let became = events
            .iter()
            .any(|ev| matches!(ev, Event::BecameCoordinator));
        assert!(became);
    }

    #[test]
    fn coordinator_gone_emits_election_needed() {
        let mut member = GroupNode::new(2, group_id());
        member.bootstrap_as_joiner(0, 0);
        // Backdate the last coordinator sighting by 11 seconds.
        member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
        let events = member.check_timeouts();
        let election_needed = events
            .iter()
            .any(|ev| matches!(ev, Event::CoordinatorElectionNeeded));
        assert!(election_needed);
        assert!(!member.is_coordinator, "flag cleared on silence");
    }

    #[test]
    fn capabilities_advertise_with_claim_resets_silence_timer() {
        let (mut coord, mut member) = coord_and_peer();
        let mut sealer = PlainSealer;
        // The coordinator sends its claim.
        let claim = coord.claim_coordinator(&mut sealer, 2).unwrap();
        // on_wire already drains events — use the returned vec.
        let events = member.on_wire(&mut sealer, &claim.wire).unwrap();
        assert!(
            member.coordinator_last_seen.is_some(),
            "silence timer reset"
        );
        let saw_claim = events
            .iter()
            .any(|ev| matches!(ev, Event::CoordinatorClaim { claimant: 1 }));
        assert!(saw_claim);
    }

    #[test]
    fn higher_id_yields_to_lower_claimant() {
        // Node 5 holds the role, then receives a claim from node 2 (lower) → yields.
        let mut node5 = GroupNode::new(5, group_id());
        let mut node2 = GroupNode::new(2, group_id());
        node5.bootstrap_as_joiner(0, 0);
        node2.bootstrap_as_creator(0);
        let mut sealer = PlainSealer;
        // Mark node5 as the current coordinator.
        node5.is_coordinator = true;
        // node2 sends its claim.
        let claim = node2.claim_coordinator(&mut sealer, 5).unwrap();
        node5.on_wire(&mut sealer, &claim.wire).unwrap();
        assert!(!node5.is_coordinator, "node5 yielded to node2");
    }

    #[test]
    fn lower_id_keeps_coordinator_against_higher_claimant() {
        let mut node1 = GroupNode::new(1, group_id());
        let mut node5 = GroupNode::new(5, group_id());
        node1.bootstrap_as_creator(0);
        node5.bootstrap_as_joiner(0, 0);
        let mut sealer = PlainSealer;
        node1.is_coordinator = true;
        let claim = node5.claim_coordinator(&mut sealer, 1).unwrap();
        node1.on_wire(&mut sealer, &claim.wire).unwrap();
        assert!(node1.is_coordinator, "node1 keeps role — it has lower id");
    }

    // ---- Tie-break (gbp_rfc §8) ---------------------------------------------

    #[test]
    fn competing_prepare_lower_member_id_wins() {
        // Two senders issue PREPARE for the same tid. Member 1 (lower id) is
        // delivered first; member 3 (higher id) must not displace it.
        let mut node = GroupNode::new(10, group_id());
        node.bootstrap_as_joiner(0, 0);
        let mut sealer = PlainSealer;

        // PREPARE from member 1.
        let mut sender1 = GroupNode::new(1, group_id());
        sender1.bootstrap_as_creator(0);
        let from1 = sender1
            .send_control(
                &mut sealer,
                10,
                ControlOpcode::PrepareTransition,
                1,
                1,
                b"commit-A".to_vec(),
            )
            .unwrap();
        node.on_wire(&mut sealer, &from1.wire).unwrap();
        assert_eq!(
            node.pending_commit_sender,
            Some(1),
            "member 1 is initial winner"
        );

        // PREPARE from member 3, same tid.
        let mut sender3 = GroupNode::new(3, group_id());
        sender3.bootstrap_as_creator(0);
        let from3 = sender3
            .send_control(
                &mut sealer,
                10,
                ControlOpcode::PrepareTransition,
                1,
                2,
                b"commit-B".to_vec(),
            )
            .unwrap();
        node.on_wire(&mut sealer, &from3.wire).unwrap();
        // The lower sender keeps the win.
        assert_eq!(node.pending_commit_sender, Some(1), "member 1 still wins");
        assert_eq!(node.pending_transition_id, 1);
    }

    #[test]
    fn competing_prepare_later_lower_id_displaces_winner() {
        // Member 5 is delivered first, then member 2 (lower id) — member 2 wins.
        let mut node = GroupNode::new(10, group_id());
        node.bootstrap_as_joiner(0, 0);
        let mut sealer = PlainSealer;

        let mut sender5 = GroupNode::new(5, group_id());
        sender5.bootstrap_as_creator(0);
        let from5 = sender5
            .send_control(
                &mut sealer,
                10,
                ControlOpcode::PrepareTransition,
                1,
                1,
                b"commit-X".to_vec(),
            )
            .unwrap();
        node.on_wire(&mut sealer, &from5.wire).unwrap();
        assert_eq!(node.pending_commit_sender, Some(5));

        let mut sender2 = GroupNode::new(2, group_id());
        sender2.bootstrap_as_creator(0);
        let from2 = sender2
            .send_control(
                &mut sealer,
                10,
                ControlOpcode::PrepareTransition,
                1,
                2,
                b"commit-Y".to_vec(),
            )
            .unwrap();
        node.on_wire(&mut sealer, &from2.wire).unwrap();
        assert_eq!(
            node.pending_commit_sender,
            Some(2),
            "member 2 displaces member 5"
        );
    }

    #[test]
    fn apply_transition_clears_commit_sender() {
        let mut coord = GroupNode::new(1, group_id());
        coord.bootstrap_as_creator(0);
        let mut sealer = PlainSealer;
        prepare_tid1(&mut coord, &mut sealer);
        coord.apply_transition(1);
        assert_eq!(coord.pending_commit_sender, None);
    }

    // ---- Timer engine -------------------------------------------------------

    #[test]
    fn prepare_timeout_fires_when_deadline_exceeded() {
        let mut coord = GroupNode::new(1, group_id());
        coord.bootstrap_as_creator(0);
        let mut sealer = PlainSealer;
        prepare_tid1(&mut coord, &mut sealer);
        // Backdate the deadline by hand so it is already expired.
        coord.prepare_deadline = Some(Instant::now() - Duration::from_millis(1));
        let evs = coord.check_timeouts();
        assert!(
            has_err(&evs, codes::PREPARE_TIMEOUT),
            "expected PREPARE_TIMEOUT, got {:?}",
            evs
        );
        assert_eq!(
            coord.transition_state,
            TransitionState::TAborted,
            "transition aborted"
        );
        assert_eq!(coord.prepare_deadline, None, "deadline cleared");
    }

    #[test]
    fn execute_timeout_fires_when_deadline_exceeded() {
        let mut member = GroupNode::new(2, group_id());
        member.bootstrap_as_joiner(0, 0);
        let mut sealer = PlainSealer;
        // Simulate READY sent → execute_deadline armed.
        member.pending_transition_id = 1;
        member.transition_state = TransitionState::TPrepared;
        member
            .send_control(
                &mut sealer,
                1,
                ControlOpcode::ReadyForTransition,
                1,
                1,
                vec![],
            )
            .unwrap();
        // Backdate the deadline so it is already expired.
        member.execute_deadline = Some(Instant::now() - Duration::from_millis(1));
        let evs = member.check_timeouts();
        assert!(
            has_err(&evs, codes::EXECUTE_TIMEOUT),
            "expected EXECUTE_TIMEOUT, got {:?}",
            evs
        );
        assert_eq!(member.execute_deadline, None, "deadline cleared");
    }

    #[test]
    fn coordinator_gone_fires_after_silence() {
        let mut member = GroupNode::new(2, group_id());
        member.bootstrap_as_joiner(0, 0);
        // Coordinator last seen 11 seconds ago (> T_COORDINATOR_GRACE_MS = 10_000).
        member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
        let evs = member.check_timeouts();
        assert!(
            has_err(&evs, codes::COORDINATOR_GONE),
            "expected COORDINATOR_GONE, got {:?}",
            evs
        );
        assert_eq!(member.coordinator_last_seen, None, "timer cleared");
    }

    #[test]
    fn note_coordinator_activity_resets_silence_timer() {
        let mut member = GroupNode::new(2, group_id());
        member.bootstrap_as_joiner(0, 0);
        // A stale timestamp that would fire on its own…
        member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
        // …is refreshed before the timers are checked.
        member.note_coordinator_activity();
        let evs = member.check_timeouts();
        assert!(
            !has_err(&evs, codes::COORDINATOR_GONE),
            "should NOT fire after reset"
        );
    }

    #[test]
    fn execute_clears_prepare_deadline() {
        let mut coord = GroupNode::new(1, group_id());
        coord.bootstrap_as_creator(0);
        let mut sealer = PlainSealer;
        prepare_tid1(&mut coord, &mut sealer);
        assert!(coord.prepare_deadline.is_some(), "deadline armed");
        execute_tid1(&mut coord, &mut sealer);
        assert_eq!(coord.prepare_deadline, None, "deadline cleared on EXECUTE");
        assert_eq!(
            coord.execute_deadline, None,
            "execute_deadline also cleared"
        );
    }

    #[test]
    fn receive_prepare_arms_execute_deadline() {
        let (mut coord, mut member) = coord_and_peer();
        let mut sealer = PlainSealer;
        let prep = prepare_tid1(&mut coord, &mut sealer);
        member.on_wire(&mut sealer, &prep).unwrap();
        assert!(
            member.execute_deadline.is_some(),
            "execute_deadline armed on receiving PREPARE"
        );
    }

    #[test]
    fn receive_execute_clears_execute_deadline() {
        let (mut coord, mut member) = coord_and_peer();
        let mut sealer = PlainSealer;
        let prep = prepare_tid1(&mut coord, &mut sealer);
        member.on_wire(&mut sealer, &prep).unwrap();
        let exec = execute_tid1(&mut coord, &mut sealer);
        member.on_wire(&mut sealer, &exec).unwrap();
        assert_eq!(member.execute_deadline, None, "cleared on EXECUTE");
    }

    #[test]
    fn no_timeout_when_deadlines_not_set() {
        let mut node = GroupNode::new(1, group_id());
        node.bootstrap_as_creator(0);
        node.drain_events(); // drop whatever bootstrap queued
        let evs = node.check_timeouts();
        assert!(evs.is_empty(), "no events without armed deadlines");
    }

    #[test]
    fn prepare_with_already_applied_tid_is_rejected() {
        // Once the coordinator has fully applied tid=1, a replayed or late
        // PREPARE carrying the same tid must fail validation.
        let mut coord = GroupNode::new(1, group_id());
        coord.bootstrap_as_creator(0);
        let mut sealer = PlainSealer;
        prepare_tid1(&mut coord, &mut sealer);
        coord.apply_transition(1);
        assert_eq!(coord.last_transition_id, 1);
        assert_eq!(coord.pending_transition_id, 0);

        // Forge a PREPARE with the already-applied tid. The epoch matches
        // because the peer is bootstrapped on the coordinator's post-apply epoch.
        let mut peer = GroupNode::new(2, group_id());
        peer.bootstrap_as_joiner(coord.current_epoch, 0);
        let stale = peer
            .send_control(
                &mut sealer,
                1,
                ControlOpcode::PrepareTransition,
                1,
                9,
                vec![],
            )
            .unwrap();
        let events = coord.on_wire(&mut sealer, &stale.wire).unwrap();
        let errs = drain_errs(&events);
        assert!(
            errs.contains(&codes::TRANSITION_MISMATCH),
            "expected TRANSITION_MISMATCH, got {:?}",
            errs
        );
    }

    #[test]
    fn decrypt_failed_is_non_fatal() {
        /// Seals like `PlainSealer` but always fails to open, standing in for
        /// a frame the receiver cannot decrypt.
        struct OpenFailSealer;

        impl Sealer for OpenFailSealer {
            fn seal(
                &mut self,
                _: StreamType,
                _: SequenceNo,
                p: &[u8],
            ) -> Result<Vec<u8>, MlsError> {
                Ok(Vec::from(p))
            }

            fn open(
                &mut self,
                _: StreamType,
                _: SequenceNo,
                _: &[u8],
            ) -> Result<Vec<u8>, MlsError> {
                Err(MlsError::Aead("simulated".into()))
            }
        }

        let (mut alice, mut bob) = alice_and_bob();
        let mut sealer = PlainSealer;
        let wire = text_frame(&mut alice, &mut sealer, b"x");

        let mut failing = OpenFailSealer;
        let events = bob.on_wire(&mut failing, &wire).unwrap();
        // Inspect the first error event that was reported.
        let (code, fatal, retryable) = events
            .iter()
            .find_map(|ev| match ev {
                Event::Error {
                    code,
                    fatal,
                    retryable,
                    ..
                } => Some((*code, *fatal, *retryable)),
                _ => None,
            })
            .expect("error event");
        assert_eq!(code, codes::DECRYPT_FAILED);
        assert!(!fatal, "must be non-fatal");
        assert!(retryable, "must be retryable");
        assert_eq!(bob.state, NodeState::Active, "bob stays Active");
    }
}