Skip to main content

gbp_node/
node.rs

1//! GBP-layer group node.
2//!
3//! Responsibilities of this layer (analogous to IP):
4//!
5//! * Decode incoming CBOR frames and validate `version`, `group_id`, `epoch`
6//!   and `transition_id` per the GBP spec.
7//! * Enforce a per-`(stream_type, stream_id)` replay window via
8//!   `sequence_no`.
9//! * Open the AEAD payload through the [`Sealer`] trait (typically backed by
10//!   `gbp-mls`).
11//! * Surface decoded payloads to sub-protocols as
12//!   [`Event::PayloadReceived`]; the sub-protocol layer is responsible for
13//!   message-level semantics.
14//! * Drive the control plane: handle `EXECUTE_TRANSITION`, request resync on
15//!   `EPOCH_MISMATCH`, etc.
16//!
17//! Out of scope: parsing GTP / GAP / GSP payloads, GTP idempotency, GAP
18//! `key_phase` validation and mute-list tracking. Those concerns belong to
19//! the per-sub-protocol clients in the `gtp-protocol`, `gap-protocol` and
20//! `gsp-protocol` crates.
21
22use gbp::{CodecError, ControlMessage, ErrorObject, GbpFrame};
23use gbp_core::{
24    ControlOpcode, ErrorClass, GbpFlags, GroupId, MemberId, NodeState, PayloadCodec, SequenceNo,
25    StreamId, StreamType, TransitionId, TransitionState, codes, errors::ErrorSpec, timeouts,
26};
27use gbp_mls::{MlsError, label_for};
28use std::collections::HashMap;
29use std::time::Duration;
30#[cfg(not(target_arch = "wasm32"))]
31use std::time::Instant;
32#[cfg(target_arch = "wasm32")]
33use web_time::Instant;
34
35/// Errors raised by [`GroupNode`].
36#[derive(Debug, thiserror::Error)]
37pub enum NodeError {
38    /// Codec error.
39    #[error("codec: {0}")]
40    Codec(#[from] CodecError),
41    /// MLS / AEAD error.
42    #[error("mls: {0}")]
43    Mls(#[from] MlsError),
44    /// The node is not in a state that allows the requested operation.
45    #[error("invalid state: {0}")]
46    InvalidState(String),
47}
48
49/// A wire-ready outbound frame: the recipient and its serialised CBOR bytes.
50pub struct OutboundFrame {
51    /// Target member id.
52    pub to: MemberId,
53    /// CBOR-encoded [`GbpFrame`] bytes.
54    pub wire: Vec<u8>,
55}
56
57/// Information about a payload delivered by GBP to a sub-protocol.
58#[derive(Debug, Clone)]
59pub struct DeliveredPayload {
60    /// Stream class on which the frame arrived.
61    pub stream_type: StreamType,
62    /// Stream id from the frame (preserved so receivers can demultiplex
63    /// multiple sub-streams).
64    pub stream_id: StreamId,
65    /// Sequence number after passing the replay window.
66    pub sequence_no: SequenceNo,
67    /// Frame flag bits, copied as-is.
68    pub flags: u16,
69    /// Decrypted plaintext bytes.
70    pub plaintext: Vec<u8>,
71    /// Codec used to encode the plaintext (from the frame's `pf` field).
72    pub codec: PayloadCodec,
73}
74
75/// Events surfaced by the GBP layer.
76#[derive(Debug, Clone)]
77pub enum Event {
78    /// Node FSM changed state.
79    StateChanged {
80        /// Previous state.
81        from: NodeState,
82        /// New state.
83        to: NodeState,
84    },
85    /// Payload delivered to a sub-protocol (Text / Audio / Signal). Control
86    /// frames are handled internally and do not surface as
87    /// [`Event::PayloadReceived`].
88    PayloadReceived(DeliveredPayload),
89    /// A control plane message was received and parsed.
90    Control {
91        /// Sender member id.
92        from: MemberId,
93        /// Decoded opcode.
94        opcode: ControlOpcode,
95        /// `transition_id` carried by the message.
96        transition_id: TransitionId,
97        /// `request_id` echoed by ACK / NACK responders.
98        request_id: u32,
99        /// Opcode-specific args (CBOR or opaque bytes; e.g. the MLS Commit
100        /// embedded in `PREPARE_TRANSITION`).
101        args: Vec<u8>,
102    },
103    /// An error was raised.
104    Error {
105        /// Numeric error code.
106        code: u16,
107        /// Error class.
108        class: ErrorClass,
109        /// MAY be retried.
110        retryable: bool,
111        /// Fatal — the node is now in `FAILED`.
112        fatal: bool,
113        /// Human-readable reason.
114        reason: String,
115    },
116    /// Epoch transition has been applied locally.
117    EpochAdvanced {
118        /// New epoch.
119        epoch: u64,
120        /// `transition_id` that produced the new epoch.
121        transition_id: TransitionId,
122    },
123    /// Coordinator silence exceeded `T_coordinator_grace`. The application
124    /// should call [`GroupNode::claim_coordinator`] if this node has the
125    /// lowest `MemberId` among currently active members.
126    CoordinatorElectionNeeded,
127    /// This node successfully claimed the coordinator role (sent
128    /// `CAPABILITIES_ADVERTISE` with `coordinator_claim=true`).
129    BecameCoordinator,
130    /// A remote member broadcast a coordinator claim.
131    CoordinatorClaim {
132        /// The claiming member's id.
133        claimant: MemberId,
134    },
135}
136
137/// GBP-layer node.
138///
139/// Owns the framing, AEAD, replay window, FSM and control plane.
140/// Sub-protocol semantics live in their own crates and use this type plus a
141/// [`Sealer`] for outbound traffic and `on_wire` + the resulting events for
142/// inbound traffic.
143pub struct GroupNode {
144    /// Application-level member id.
145    pub member_id: MemberId,
146    /// Whether this node currently holds the coordinator role.
147    pub is_coordinator: bool,
148    /// 16-byte group identifier.
149    pub group_id: GroupId,
150    /// Current epoch as observed by the GBP layer (the authoritative epoch
151    /// lives in the underlying MLS group).
152    pub current_epoch: u64,
153    /// Last applied `transition_id`.
154    pub last_transition_id: TransitionId,
155    /// Pending `transition_id` (set during PREPARE / READY).
156    pub pending_transition_id: TransitionId,
157    /// Node FSM.
158    pub state: NodeState,
159    /// Transition FSM.
160    pub transition_state: TransitionState,
161
162    out_seq: HashMap<(StreamType, StreamId), SequenceNo>,
163    in_hw: HashMap<(StreamType, StreamId), SequenceNo>,
164    events: Vec<Event>,
165
166    /// MemberId of the member whose PREPARE_TRANSITION is currently pending.
167    /// Used for tie-break: if two PREPAREs arrive for the same transition_id,
168    /// the one from the lower MemberId wins (gbp_rfc §8).
169    pending_commit_sender: Option<MemberId>,
170    /// Deadline for receiving quorum READY after issuing PREPARE_TRANSITION.
171    /// Armed when coordinator sends PREPARE; fires ERR_PREPARE_TIMEOUT.
172    prepare_deadline: Option<Instant>,
173    /// Deadline for receiving EXECUTE_TRANSITION after sending READY_FOR_TRANSITION.
174    /// Armed when a member sends READY; fires ERR_EXECUTE_TIMEOUT.
175    execute_deadline: Option<Instant>,
176    /// Timestamp of last coordinator activity. When silence exceeds
177    /// T_COORDINATOR_GRACE_MS the node emits ERR_COORDINATOR_GONE.
178    coordinator_last_seen: Option<Instant>,
179}
180
181impl GroupNode {
182    /// Builds a fresh node in the `IDLE` state.
183    pub fn new(member_id: MemberId, group_id: GroupId) -> Self {
184        Self {
185            member_id,
186            group_id,
187            is_coordinator: false,
188            current_epoch: 0,
189            last_transition_id: 0,
190            pending_transition_id: 0,
191            state: NodeState::Idle,
192            transition_state: TransitionState::TIdle,
193            out_seq: HashMap::new(),
194            in_hw: HashMap::new(),
195            events: Vec::new(),
196            pending_commit_sender: None,
197            prepare_deadline: None,
198            execute_deadline: None,
199            coordinator_last_seen: None,
200        }
201    }
202
203    /// Drives the node from `IDLE` to `ACTIVE` as a creator.
204    pub fn bootstrap_as_creator(&mut self, epoch: u64) {
205        self.transition(NodeState::Connecting);
206        self.transition(NodeState::EstablishingGroup);
207        self.current_epoch = epoch;
208        self.transition(NodeState::Active);
209    }
210
211    /// Drives the node from `IDLE` to `ACTIVE` as a joiner.
212    ///
213    /// `expected_first_tid` lets the joiner pre-arm its pending transition
214    /// state so that the very next `EXECUTE_TRANSITION` (which will arrive
215    /// without a preceding PREPARE the joiner could decrypt — that PREPARE
216    /// was sealed under the pre-Welcome epoch) is accepted by
217    /// `handle_control`'s tid-validation matrix. Pass `0` if the joiner
218    /// recovered out-of-band and is already current.
219    pub fn bootstrap_as_joiner(&mut self, epoch: u64, expected_first_tid: u32) {
220        self.transition(NodeState::Connecting);
221        self.transition(NodeState::EstablishingGroup);
222        self.current_epoch = epoch;
223        if expected_first_tid > 0 {
224            self.pending_transition_id = expected_first_tid;
225            self.transition_state = TransitionState::TPrepared;
226        }
227        self.transition(NodeState::Active);
228    }
229
230    /// Drains and returns all queued events.
231    pub fn drain_events(&mut self) -> Vec<Event> {
232        std::mem::take(&mut self.events)
233    }
234
235    /// Returns a sender-unique `stream_id` within the given base class.
236    ///
237    /// This is used so that the receiver's replay window does not conflate
238    /// streams that originate from different members.
239    pub fn member_stream_id(&self, base: u32) -> StreamId {
240        debug_assert!(
241            self.member_id < 1_000_000,
242            "member_id overflow: {0}",
243            self.member_id
244        );
245        base + self.member_id * 100
246    }
247
248    /// Sends an opaque plaintext payload on the given stream.
249    ///
250    /// Used by the sub-protocol clients: each one encodes its message and
251    /// forwards the resulting bytes here together with the codec that was used.
252    /// Pass [`PayloadCodec::Cbor`] for the default encoding; it is
253    /// backward-compatible with pre-1.5 peers.
254    pub fn send_payload<S: Sealer>(
255        &mut self,
256        seal: &mut S,
257        target: MemberId,
258        stream_type: StreamType,
259        stream_id: StreamId,
260        flags: u16,
261        plaintext: &[u8],
262        codec: PayloadCodec,
263    ) -> Result<OutboundFrame, NodeError> {
264        self.assert_can_send()?;
265        let seq = self.next_seq(stream_type, stream_id);
266        let ciphertext = seal.seal(stream_type, seq, plaintext)?;
267        let frame = GbpFrame::new(
268            self.group_id,
269            self.current_epoch,
270            self.last_transition_id,
271            stream_type,
272            stream_id,
273            flags,
274            seq,
275            ciphertext,
276            codec.as_u8(),
277        );
278        Ok(OutboundFrame {
279            to: target,
280            wire: frame.to_cbor(),
281        })
282    }
283
284    /// Sends a control plane message on Stream 0. Wrapper around
285    /// [`GroupNode::send_payload`].
286    ///
287    /// Side effect: when the coordinator originates a `PREPARE_TRANSITION`,
288    /// it must locally adopt the same `pending_transition_id` so that the
289    /// inbound READY / EXECUTE validation matrix in `handle_control` lines
290    /// up. Without this, the coordinator never matches its own pending tid
291    /// against the remote READY frames it expects, and the handshake never
292    /// completes.
293    pub fn send_control<S: Sealer>(
294        &mut self,
295        seal: &mut S,
296        target: MemberId,
297        opcode: ControlOpcode,
298        transition_id: TransitionId,
299        request_id: u32,
300        args: Vec<u8>,
301    ) -> Result<OutboundFrame, NodeError> {
302        let ctl = ControlMessage::with_args(
303            opcode as u16,
304            request_id,
305            self.member_id,
306            transition_id,
307            args,
308        );
309        let mut flags = GbpFlags::ordered_reliable_system();
310        if matches!(
311            opcode,
312            ControlOpcode::PrepareTransition
313                | ControlOpcode::ReadyForTransition
314                | ControlOpcode::ExecuteTransition
315        ) {
316            flags |= GbpFlags::CRITICAL;
317        }
318        // Sender-side state mirroring (matches what `handle_control` does on
319        // the receiver side). We only update on PREPARE/EXECUTE/ABORT — READY
320        // is purely an ack carrying the existing pending tid.
321        match opcode {
322            ControlOpcode::PrepareTransition => {
323                self.pending_transition_id = transition_id;
324                self.transition_state = TransitionState::TPrepared;
325                self.prepare_deadline =
326                    Some(Instant::now() + Duration::from_millis(timeouts::T_PREPARE_MAX_MS));
327                self.execute_deadline = None;
328            }
329            ControlOpcode::ReadyForTransition => {
330                self.execute_deadline =
331                    Some(Instant::now() + Duration::from_millis(timeouts::T_EXECUTE_MAX_MS));
332            }
333            ControlOpcode::ExecuteTransition | ControlOpcode::AbortTransition => {
334                self.prepare_deadline = None;
335                self.execute_deadline = None;
336                if opcode == ControlOpcode::AbortTransition {
337                    self.pending_transition_id = 0;
338                    self.transition_state = TransitionState::TAborted;
339                }
340            }
341            _ => {}
342        }
343        let stream_id = self.member_stream_id(0);
344        self.send_payload(
345            seal,
346            target,
347            StreamType::Control,
348            stream_id,
349            flags,
350            &ctl.to_cbor(),
351            PayloadCodec::Cbor,
352        )
353    }
354
355    /// Feeds wire bytes to the node.
356    ///
357    /// Performs the §6.2 validation pipeline (version → group_id → epoch →
358    /// payload_size → transition_id → replay), opens the AEAD payload and
359    /// either:
360    /// * dispatches the parsed control message internally (for
361    ///   `StreamType::Control`), or
362    /// * surfaces an [`Event::PayloadReceived`] (for application streams).
363    ///
364    /// Returns every event that was produced as a result.
365    pub fn on_wire<S: Sealer>(
366        &mut self,
367        seal: &mut S,
368        wire: &[u8],
369    ) -> Result<Vec<Event>, NodeError> {
370        // Decode without payload-size validation — we want a malformed v!=1
371        // frame to surface as `ERR_UNSUPPORTED_VERSION`, not as
372        // `ERR_PAYLOAD_SIZE_MISMATCH`. Validation runs in deliver_frame, in
373        // the order required by §6.2.
374        let frame = match GbpFrame::decode(wire) {
375            Ok(f) => f,
376            Err(e) => {
377                self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, format!("frame decode: {e}"));
378                return Ok(self.drain_events());
379            }
380        };
381        self.deliver_frame(seal, frame)?;
382        Ok(self.drain_events())
383    }
384
385    fn deliver_frame<S: Sealer>(&mut self, seal: &mut S, frame: GbpFrame) -> Result<(), NodeError> {
386        // §6.2 order: version → group_id → epoch → payload_size →
387        // transition_id (when CRITICAL) → replay.
388        if frame.version != 1 {
389            self.emit_err_spec(codes::UNSUPPORTED_VERSION, "version != 1");
390            return Ok(());
391        }
392        if frame.group_id_array() != self.group_id {
393            self.emit_err_spec(codes::UNKNOWN_GROUP, "group_id");
394            return Ok(());
395        }
396        if frame.epoch != self.current_epoch {
397            self.emit_err_spec(
398                codes::EPOCH_MISMATCH,
399                format!("got {}, expected {}", frame.epoch, self.current_epoch),
400            );
401            self.trigger_resync();
402            return Ok(());
403        }
404        if let Err(e) = frame.validate_payload_size() {
405            self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, format!("payload size: {e}"));
406            return Ok(());
407        }
408        let flags = GbpFlags::from_bits(frame.flags);
409        let st = match frame.stream_type_typed() {
410            Ok(st) => st,
411            Err(_) => {
412                self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "unknown stream_type");
413                return Ok(());
414            }
415        };
416
417        // §6.2 transition_id ordering: CRITICAL frames on application streams
418        // MUST equal `last_transition_id`. Control-stream frames are exempt
419        // from this check and validated per-opcode inside `handle_control`,
420        // because PREPARE_TRANSITION legitimately carries `last + 1` and
421        // EXECUTE / ACK carry `pending_transition_id`.
422        if st != StreamType::Control
423            && flags.has(GbpFlags::CRITICAL)
424            && frame.transition_id != self.last_transition_id
425        {
426            self.emit_err_spec(
427                codes::TRANSITION_MISMATCH,
428                format!(
429                    "got tid={}, expected {}",
430                    frame.transition_id, self.last_transition_id
431                ),
432            );
433            return Ok(());
434        }
435
436        let key = (st, frame.stream_id);
437        let hw = self.in_hw.get(&key).copied().unwrap_or(0);
438        if frame.sequence_no <= hw {
439            self.emit_err_spec(
440                codes::REPLAY_DETECTED,
441                format!(
442                    "st={} sid={} seq={} hw={}",
443                    st, frame.stream_id, frame.sequence_no, hw
444                ),
445            );
446            return Ok(());
447        }
448        self.in_hw.insert(key, frame.sequence_no);
449
450        let plain = match seal.open(st, frame.sequence_no, &frame.encrypted_payload) {
451            Ok(p) => p,
452            Err(e) => {
453                // Non-fatal: a frame addressed under a different MLS epoch
454                // (e.g. PREPARE_TRANSITION reaching a fresh joiner that has
455                // already accepted the post-commit Welcome) cannot be
456                // decrypted, but that's expected and the node MUST keep
457                // running to receive the subsequent EXECUTE frame on the
458                // shared post-merge epoch.
459                self.emit_err_named(
460                    codes::DECRYPT_FAILED,
461                    ErrorClass::Crypto,
462                    true,  // retryable: caller may resync via digest
463                    false, // non-fatal
464                    format!("aead open: {e}"),
465                );
466                return Ok(());
467            }
468        };
469
470        match st {
471            StreamType::Control => self.handle_control(plain),
472            other => self.events.push(Event::PayloadReceived(DeliveredPayload {
473                stream_type: other,
474                stream_id: frame.stream_id,
475                sequence_no: frame.sequence_no,
476                flags: frame.flags,
477                plaintext: plain,
478                codec: frame.payload_codec(),
479            })),
480        }
481        Ok(())
482    }
483
484    fn handle_control(&mut self, plain: Vec<u8>) {
485        let c = match ControlMessage::from_cbor(&plain) {
486            Ok(c) => c,
487            Err(_) => {
488                self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "control decode");
489                return;
490            }
491        };
492        let opcode = match ControlOpcode::try_from(c.opcode) {
493            Ok(op) => op,
494            Err(_) => {
495                self.emit_err_spec(codes::STREAM_POLICY_VIOLATION, "unknown opcode");
496                return;
497            }
498        };
499        // Per-opcode TransitionID validation (§5 of gbp-control-plane).
500        let tid_ok = match opcode {
501            // PREPARE introduces last+1; receiver simply records it as pending.
502            // Re-issuing a PREPARE for an already-pending tid is allowed; a
503            // smaller-or-equal tid that is not strictly newer is rejected.
504            ControlOpcode::PrepareTransition => {
505                c.transition_id > self.last_transition_id
506                    && (self.pending_transition_id == 0
507                        || self.pending_transition_id == c.transition_id)
508            }
509            // READY / EXECUTE / ABORT must reference the pending tid.
510            ControlOpcode::ReadyForTransition
511            | ControlOpcode::ExecuteTransition
512            | ControlOpcode::AbortTransition => {
513                self.pending_transition_id != 0 && c.transition_id == self.pending_transition_id
514            }
515            // Digest / capability / ack / nack: tid is informational, no
516            // ordering constraint at the GBP layer.
517            _ => true,
518        };
519        if !tid_ok {
520            self.emit_err_spec(
521                codes::TRANSITION_MISMATCH,
522                format!(
523                    "control tid={} not valid for {:?} (last={}, pending={})",
524                    c.transition_id, opcode, self.last_transition_id, self.pending_transition_id
525                ),
526            );
527            return;
528        }
529        match opcode {
530            ControlOpcode::PrepareTransition => {
531                // Tie-break (gbp_rfc §8): if a PREPARE for the same tid already
532                // exists from another sender, keep whichever has the lower
533                // MemberId — that member's commit is the canonical winner.
534                if self.pending_transition_id == c.transition_id {
535                    let current_winner = self.pending_commit_sender.unwrap_or(MemberId::MAX);
536                    if c.sender_id >= current_winner {
537                        // Existing winner holds; discard this competing commit
538                        // but still surface the Control event so upper layers
539                        // can observe it.
540                        self.events.push(Event::Control {
541                            from: c.sender_id,
542                            opcode,
543                            transition_id: c.transition_id,
544                            request_id: c.request_id,
545                            args: c.args.to_vec(),
546                        });
547                        return;
548                    }
549                    // New sender wins — replace.
550                }
551                self.pending_transition_id = c.transition_id;
552                self.pending_commit_sender = Some(c.sender_id);
553                self.transition_state = TransitionState::TPrepared;
554                // PREPARE originates from the coordinator — record activity.
555                self.note_coordinator_activity();
556                // Arm execute deadline: member must see EXECUTE within T_execute_max.
557                self.execute_deadline =
558                    Some(Instant::now() + Duration::from_millis(timeouts::T_EXECUTE_MAX_MS));
559            }
560            ControlOpcode::ReadyForTransition => {
561                self.transition_state = TransitionState::TReady;
562                // Coordinator received a READY — clear the per-member wait.
563                self.prepare_deadline = None;
564            }
565            ControlOpcode::ExecuteTransition => {
566                self.execute_deadline = None;
567                self.pending_commit_sender = None;
568                self.apply_transition(c.transition_id);
569                self.note_coordinator_activity();
570            }
571            ControlOpcode::AbortTransition => {
572                self.prepare_deadline = None;
573                self.execute_deadline = None;
574                self.pending_commit_sender = None;
575                self.transition_state = TransitionState::TAborted;
576                self.pending_transition_id = 0;
577            }
578            ControlOpcode::GroupStateDigestResponse => {
579                if self.state == NodeState::Resyncing {
580                    self.transition(NodeState::Active);
581                }
582            }
583            ControlOpcode::CapabilitiesAdvertise => {
584                if Self::is_coordinator_claim(&c.args) {
585                    // Coordinator is alive — reset silence timer.
586                    self.note_coordinator_activity();
587                    // Collision resolution (gbp-control-plane §5.1): if we
588                    // also claimed and the remote claimant has a lower
589                    // MemberId, yield the coordinator role to them.
590                    if self.is_coordinator && c.sender_id < self.member_id {
591                        self.is_coordinator = false;
592                    }
593                    self.events.push(Event::CoordinatorClaim {
594                        claimant: c.sender_id,
595                    });
596                }
597            }
598            _ => {}
599        }
600        self.events.push(Event::Control {
601            from: c.sender_id,
602            opcode,
603            transition_id: c.transition_id,
604            request_id: c.request_id,
605            args: c.args.to_vec(),
606        });
607    }
608
609    /// Applies a new epoch (called by the coordinator after
610    /// `EXECUTE_TRANSITION`).
611    pub fn apply_transition(&mut self, tid: TransitionId) {
612        self.current_epoch += 1;
613        self.last_transition_id = tid;
614        self.pending_transition_id = 0;
615        self.pending_commit_sender = None;
616        self.transition_state = TransitionState::TExecuted;
617        self.out_seq.clear();
618        self.in_hw.clear();
619        self.events.push(Event::EpochAdvanced {
620            epoch: self.current_epoch,
621            transition_id: tid,
622        });
623    }
624
625    /// Forces the node into the `RESYNCING` state.
626    pub fn trigger_resync(&mut self) {
627        if self.state != NodeState::Resyncing {
628            self.transition(NodeState::Resyncing);
629        }
630    }
631
632    /// Checks FSM deadlines and emits timeout events if any have expired.
633    ///
634    /// Call this regularly from the application event loop (e.g. every 500 ms).
635    /// Returns the same events that would come from [`GroupNode::drain_events`];
636    /// the caller may also drain events separately — this method does not
637    /// duplicate them.
638    pub fn check_timeouts(&mut self) -> Vec<Event> {
639        let now = Instant::now();
640
641        if self.prepare_deadline.is_some_and(|d| now >= d) {
642            self.prepare_deadline = None;
643            self.execute_deadline = None;
644            self.pending_transition_id = 0;
645            self.transition_state = TransitionState::TAborted;
646            self.emit_err_spec(codes::PREPARE_TIMEOUT, "T_prepare_max exceeded");
647        }
648
649        if self.execute_deadline.is_some_and(|d| now >= d) {
650            self.execute_deadline = None;
651            self.emit_err_spec(codes::EXECUTE_TIMEOUT, "T_execute_max exceeded");
652        }
653
654        if self.coordinator_last_seen.is_some_and(|t| {
655            now.duration_since(t).as_millis() as u64 >= timeouts::T_COORDINATOR_GRACE_MS
656        }) {
657            self.coordinator_last_seen = None;
658            self.is_coordinator = false;
659            self.emit_err_spec(
660                codes::COORDINATOR_GONE,
661                "coordinator silence exceeded T_coordinator_grace",
662            );
663            self.events.push(Event::CoordinatorElectionNeeded);
664        }
665
666        self.drain_events()
667    }
668
669    /// Records that the coordinator was active right now.
670    ///
671    /// Call this whenever the node receives a frame from the current
672    /// coordinator (e.g. `PREPARE_TRANSITION`, `EXECUTE_TRANSITION`,
673    /// `CAPABILITIES_ADVERTISE` with `coordinator_claim`). Resets the
674    /// coordinator-silence timer used to detect `ERR_COORDINATOR_GONE`.
675    pub fn note_coordinator_activity(&mut self) {
676        self.coordinator_last_seen = Some(Instant::now());
677    }
678
679    /// Claims the coordinator role by broadcasting `CAPABILITIES_ADVERTISE`
680    /// with `coordinator_claim=true` (gbp-control-plane §5.1).
681    ///
682    /// Call this when [`Event::CoordinatorElectionNeeded`] fires **and** this
683    /// node has the lowest `MemberId` among currently active members. The
684    /// caller is responsible for delivering the returned frame to every group
685    /// member.
686    ///
687    /// The args payload is the minimal CBOR map `{0: true}` encoding a
688    /// coordinator claim flag.
689    pub fn claim_coordinator<S: Sealer>(
690        &mut self,
691        seal: &mut S,
692        target: MemberId,
693    ) -> Result<OutboundFrame, NodeError> {
694        // CBOR: {0: true}  →  A1 00 F5
695        let args = vec![0xA1u8, 0x00, 0xF5];
696        self.is_coordinator = true;
697        self.coordinator_last_seen = Some(Instant::now());
698        self.events.push(Event::BecameCoordinator);
699        self.send_control(
700            seal,
701            target,
702            ControlOpcode::CapabilitiesAdvertise,
703            self.last_transition_id,
704            0,
705            args,
706        )
707    }
708
709    /// Returns `true` if the raw args bytes of a `CAPABILITIES_ADVERTISE`
710    /// frame encode a coordinator claim (`{0: true}` in CBOR).
711    fn is_coordinator_claim(args: &[u8]) -> bool {
712        // Minimal CBOR map {0: true}: A1 00 F5
713        // We also accept larger maps where key 0 maps to true.
714        // Fast path: exact match on the minimal encoding.
715        if args == [0xA1, 0x00, 0xF5] {
716            return true;
717        }
718        // General path: scan for the sequence 00 F5 (uint(0) → true) inside a
719        // CBOR map. This is intentionally simple; a full CBOR parser lives in
720        // the gbp-protocol crate and is not a dependency here.
721        args.windows(2).any(|w| w == [0x00, 0xF5])
722    }
723
724    fn transition(&mut self, next: NodeState) {
725        if self.state == next {
726            return;
727        }
728        if !self.state.can_transition_to(next) {
729            let from = self.state;
730            self.state = NodeState::Failed;
731            self.events.push(Event::StateChanged {
732                from,
733                to: NodeState::Failed,
734            });
735            return;
736        }
737        let from = self.state;
738        self.state = next;
739        self.events.push(Event::StateChanged { from, to: next });
740    }
741
742    fn assert_can_send(&self) -> Result<(), NodeError> {
743        if matches!(
744            self.state,
745            NodeState::Active | NodeState::Resyncing | NodeState::EstablishingGroup
746        ) {
747            Ok(())
748        } else {
749            Err(NodeError::InvalidState(format!(
750                "cannot send in state {}",
751                self.state
752            )))
753        }
754    }
755
756    fn next_seq(&mut self, st: StreamType, sid: StreamId) -> SequenceNo {
757        let entry = self.out_seq.entry((st, sid)).or_insert(0);
758        *entry += 1;
759        *entry
760    }
761
762    fn emit_err_spec(&mut self, code: u16, reason: impl Into<String>) {
763        if let Some(spec) = ErrorSpec::lookup(code) {
764            self.emit_err_named(spec.code, spec.class, spec.retryable, spec.fatal, reason);
765        } else {
766            self.emit_err_named(code, ErrorClass::Policy, false, false, reason);
767        }
768    }
769
770    fn emit_err_named(
771        &mut self,
772        code: u16,
773        class: ErrorClass,
774        retryable: bool,
775        fatal: bool,
776        reason: impl Into<String>,
777    ) {
778        let reason = reason.into();
779        // ErrorSpec is authoritative for known codes — use its class/retryable/fatal
780        // so that the wire error always matches the registry.
781        let (class, retryable, fatal) = if let Some(spec) = ErrorSpec::lookup(code) {
782            (spec.class, spec.retryable, spec.fatal)
783        } else {
784            (class, retryable, fatal)
785        };
786        let _ = ErrorObject::new(code, class, retryable, fatal, reason.clone()).to_cbor();
787        self.events.push(Event::Error {
788            code,
789            class,
790            retryable,
791            fatal,
792            reason,
793        });
794        if fatal {
795            let from = self.state;
796            self.state = NodeState::Failed;
797            self.events.push(Event::StateChanged {
798                from,
799                to: NodeState::Failed,
800            });
801        }
802    }
803}
804
805/// Trait abstracting the AEAD seal / open operations.
806///
807/// Implemented for [`gbp_mls::MlsContext`] in this crate; tests typically
808/// implement a no-op identity sealer to exercise the FSM without standing
809/// up an MLS group.
810pub trait Sealer {
811    /// Encrypts `pt` for the given stream and per-stream sequence number.
812    fn seal(&mut self, st: StreamType, seq: SequenceNo, pt: &[u8]) -> Result<Vec<u8>, MlsError>;
813    /// Decrypts `ct` for the given stream and per-stream sequence number.
814    fn open(&mut self, st: StreamType, seq: SequenceNo, ct: &[u8]) -> Result<Vec<u8>, MlsError>;
815}
816
817impl Sealer for gbp_mls::MlsContext {
818    fn seal(&mut self, st: StreamType, seq: SequenceNo, pt: &[u8]) -> Result<Vec<u8>, MlsError> {
819        gbp_mls::MlsContext::seal(self, label_for(st), seq, pt)
820    }
821    fn open(&mut self, st: StreamType, seq: SequenceNo, ct: &[u8]) -> Result<Vec<u8>, MlsError> {
822        gbp_mls::MlsContext::open(self, label_for(st), seq, ct)
823    }
824}
825
// Unit tests for the node FSM, the replay window, the control-plane
// handshake, coordinator handover, PREPARE tie-breaking and the timer engine.
// `PlainSealer` is an identity sealer, so frames travel in the clear and no
// MLS group is needed.
#[cfg(test)]
mod tests {
    use super::*;

    // Identity sealer: `seal` and `open` return their input unchanged.
    struct PlainSealer;
    impl Sealer for PlainSealer {
        fn seal(
            &mut self,
            _st: StreamType,
            _seq: SequenceNo,
            pt: &[u8],
        ) -> Result<Vec<u8>, MlsError> {
            Ok(pt.to_vec())
        }
        fn open(
            &mut self,
            _st: StreamType,
            _seq: SequenceNo,
            ct: &[u8],
        ) -> Result<Vec<u8>, MlsError> {
            Ok(ct.to_vec())
        }
    }

    // Fixed 16-byte test group id: b"GBP" followed by zero bytes.
    fn group_id() -> GroupId {
        let mut g = [0u8; 16];
        g[..3].copy_from_slice(b"GBP");
        g
    }

    // Delivering the same wire frame twice must report REPLAY_DETECTED on
    // the second delivery.
    #[test]
    fn replay_window_rejects_repeat() {
        let mut alice = GroupNode::new(1, group_id());
        let mut bob = GroupNode::new(2, group_id());
        alice.bootstrap_as_creator(1);
        bob.bootstrap_as_joiner(1, 0);
        let mut s = PlainSealer;
        let sid = alice.member_stream_id(2);
        let f = alice
            .send_payload(
                &mut s,
                2,
                StreamType::Text,
                sid,
                GbpFlags::ordered_reliable_ack(),
                b"hi",
                PayloadCodec::Cbor,
            )
            .unwrap();
        let _ = bob.on_wire(&mut s, &f.wire).unwrap();
        let evs = bob.on_wire(&mut s, &f.wire).unwrap();
        assert!(evs.iter().any(|e| matches!(
            e,
            Event::Error {
                code: codes::REPLAY_DETECTED,
                ..
            }
        )));
    }

    // Alice's epoch is forced to 2 while Bob joined at epoch 1; receiving
    // her frame must put Bob into Resyncing.
    #[test]
    fn epoch_mismatch_triggers_resync() {
        let mut alice = GroupNode::new(1, group_id());
        let mut bob = GroupNode::new(2, group_id());
        alice.bootstrap_as_creator(1);
        bob.bootstrap_as_joiner(1, 0);
        alice.current_epoch = 2;
        let mut s = PlainSealer;
        let sid = alice.member_stream_id(2);
        let f = alice
            .send_payload(
                &mut s,
                2,
                StreamType::Text,
                sid,
                GbpFlags::ordered_reliable_ack(),
                b"x",
                PayloadCodec::Cbor,
            )
            .unwrap();
        let _ = bob.on_wire(&mut s, &f.wire).unwrap();
        assert_eq!(bob.state, NodeState::Resyncing);
    }

    // A valid payload frame surfaces as PayloadReceived carrying the original
    // stream type and plaintext.
    #[test]
    fn payload_emits_received_event() {
        let mut alice = GroupNode::new(1, group_id());
        let mut bob = GroupNode::new(2, group_id());
        alice.bootstrap_as_creator(1);
        bob.bootstrap_as_joiner(1, 0);
        let mut s = PlainSealer;
        let sid = alice.member_stream_id(2);
        let f = alice
            .send_payload(
                &mut s,
                2,
                StreamType::Text,
                sid,
                GbpFlags::ordered_reliable_ack(),
                b"payload",
                PayloadCodec::Cbor,
            )
            .unwrap();
        let evs = bob.on_wire(&mut s, &f.wire).unwrap();
        let pr = evs
            .into_iter()
            .find_map(|e| match e {
                Event::PayloadReceived(p) => Some(p),
                _ => None,
            })
            .expect("payload");
        assert_eq!(pr.stream_type, StreamType::Text);
        assert_eq!(pr.plaintext, b"payload");
    }

    // ---- Control-plane handshake -----------------------------------------

    // Collects the codes of all Error events, in order.
    fn drain_errs(events: &[Event]) -> Vec<u16> {
        events
            .iter()
            .filter_map(|e| match e {
                Event::Error { code, .. } => Some(*code),
                _ => None,
            })
            .collect()
    }

    // Collects `(opcode, transition_id)` for all Control events, in order.
    fn drain_controls(events: &[Event]) -> Vec<(ControlOpcode, TransitionId)> {
        events
            .iter()
            .filter_map(|e| match e {
                Event::Control {
                    opcode,
                    transition_id,
                    ..
                } => Some((*opcode, *transition_id)),
                _ => None,
            })
            .collect()
    }

    // Sending PREPARE records the pending tid locally; receiving it records
    // the same tid and surfaces a Control event without errors.
    #[test]
    fn prepare_transition_sets_pending_on_sender_and_receiver() {
        let mut coord = GroupNode::new(1, group_id());
        let mut peer = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        peer.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        // Coordinator sends PREPARE for tid=1
        let f = coord
            .send_control(
                &mut s,
                0,
                ControlOpcode::PrepareTransition,
                1,
                100,
                b"commit-blob".to_vec(),
            )
            .unwrap();
        assert_eq!(coord.pending_transition_id, 1, "sender mirrors pending");
        assert_eq!(coord.transition_state, TransitionState::TPrepared);
        let evs = peer.on_wire(&mut s, &f.wire).unwrap();
        assert_eq!(peer.pending_transition_id, 1, "receiver records pending");
        assert!(
            drain_errs(&evs).is_empty(),
            "no error: {:?}",
            drain_errs(&evs)
        );
        let ctls = drain_controls(&evs);
        assert_eq!(ctls, vec![(ControlOpcode::PrepareTransition, 1)]);
    }

    // A READY whose tid differs from the pending one is TRANSITION_MISMATCH.
    #[test]
    fn ready_with_wrong_tid_is_rejected() {
        let mut coord = GroupNode::new(1, group_id());
        let mut peer = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        peer.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        let f = coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        peer.on_wire(&mut s, &f.wire).unwrap();
        // Peer fakes a READY for the wrong tid
        let bogus = peer
            .send_control(&mut s, 1, ControlOpcode::ReadyForTransition, 7, 1, vec![])
            .unwrap();
        let evs = coord.on_wire(&mut s, &bogus.wire).unwrap();
        let errs = drain_errs(&evs);
        assert!(errs.contains(&codes::TRANSITION_MISMATCH), "got {:?}", errs);
    }

    // PREPARE then EXECUTE advances both sides to epoch 1 / tid 1, clears
    // the receiver's pending tid and emits EpochAdvanced.
    #[test]
    fn execute_advances_epoch_and_clears_pending() {
        let mut coord = GroupNode::new(1, group_id());
        let mut peer = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        peer.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        let prep = coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        peer.on_wire(&mut s, &prep.wire).unwrap();
        // Coordinator broadcasts EXECUTE; both sides apply (coord locally, peer via on_wire)
        let exec = coord
            .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
            .unwrap();
        coord.apply_transition(1);
        let evs = peer.on_wire(&mut s, &exec.wire).unwrap();
        assert_eq!(coord.last_transition_id, 1);
        assert_eq!(coord.current_epoch, 1);
        assert_eq!(peer.last_transition_id, 1);
        assert_eq!(peer.current_epoch, 1);
        assert_eq!(peer.pending_transition_id, 0);
        assert!(evs.iter().any(|e| matches!(
            e,
            Event::EpochAdvanced {
                transition_id: 1,
                ..
            }
        )));
    }

    // ABORT after PREPARE clears the pending tid, keeps the epoch at 0 and
    // leaves both sides in TAborted.
    #[test]
    fn abort_clears_pending_no_advance() {
        let mut coord = GroupNode::new(1, group_id());
        let mut peer = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        peer.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        let prep = coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        peer.on_wire(&mut s, &prep.wire).unwrap();
        let abort = coord
            .send_control(&mut s, 0, ControlOpcode::AbortTransition, 1, 2, vec![])
            .unwrap();
        peer.on_wire(&mut s, &abort.wire).unwrap();
        assert_eq!(peer.pending_transition_id, 0);
        assert_eq!(peer.current_epoch, 0);
        assert_eq!(peer.transition_state, TransitionState::TAborted);
        assert_eq!(coord.transition_state, TransitionState::TAborted);
    }

    // A joiner bootstrapped with expected_first_tid=1 applies EXECUTE for
    // tid 1 even though it never received the PREPARE.
    #[test]
    fn bootstrap_as_joiner_with_expected_tid_accepts_first_execute() {
        let mut coord = GroupNode::new(1, group_id());
        // Joiner pre-arms expected_first_tid=1 — typical post-Welcome state.
        let mut joiner = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        joiner.bootstrap_as_joiner(0, 1);
        assert_eq!(joiner.pending_transition_id, 1);
        let mut s = PlainSealer;
        // Coordinator must mirror its pending too — simulate by sending PREPARE
        let _ = coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        // EXECUTE should be accepted by the joiner without ever seeing PREPARE
        let exec = coord
            .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
            .unwrap();
        let evs = joiner.on_wire(&mut s, &exec.wire).unwrap();
        let errs = drain_errs(&evs);
        assert!(
            errs.is_empty(),
            "expected clean apply, got errors {:?}",
            errs
        );
        assert_eq!(joiner.last_transition_id, 1);
        assert_eq!(joiner.current_epoch, 1);
    }

    // ---- Coordinator handover (gbp-control-plane §5.1) ---------------------

    // claim_coordinator sets `is_coordinator` and queues BecameCoordinator.
    #[test]
    fn claim_coordinator_sets_flag_and_emits_event() {
        let mut node = GroupNode::new(1, group_id());
        node.bootstrap_as_creator(0);
        node.drain_events();
        let mut s = PlainSealer;
        let _ = node.claim_coordinator(&mut s, 0).unwrap();
        assert!(node.is_coordinator);
        let evs = node.drain_events();
        assert!(evs.iter().any(|e| matches!(e, Event::BecameCoordinator)));
    }

    // Coordinator silence (last seen 11 s ago) requests an election and
    // clears `is_coordinator`.
    #[test]
    fn coordinator_gone_emits_election_needed() {
        let mut member = GroupNode::new(2, group_id());
        member.bootstrap_as_joiner(0, 0);
        member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
        let evs = member.check_timeouts();
        assert!(
            evs.iter()
                .any(|e| matches!(e, Event::CoordinatorElectionNeeded))
        );
        assert!(!member.is_coordinator, "flag cleared on silence");
    }

    // Receiving a claim frame sets `coordinator_last_seen` and surfaces
    // CoordinatorClaim naming the claimant.
    #[test]
    fn capabilities_advertise_with_claim_resets_silence_timer() {
        let mut member = GroupNode::new(2, group_id());
        let mut coord = GroupNode::new(1, group_id());
        member.bootstrap_as_joiner(0, 0);
        coord.bootstrap_as_creator(0);
        let mut s = PlainSealer;
        // coord sends claim
        let f = coord.claim_coordinator(&mut s, 2).unwrap();
        // on_wire already drains events — use the returned vec.
        let evs = member.on_wire(&mut s, &f.wire).unwrap();
        assert!(
            member.coordinator_last_seen.is_some(),
            "silence timer reset"
        );
        assert!(
            evs.iter()
                .any(|e| matches!(e, Event::CoordinatorClaim { claimant: 1 }))
        );
    }

    // A sitting coordinator with a higher id yields to a lower-id claimant.
    #[test]
    fn higher_id_yields_to_lower_claimant() {
        // Node 5 claims first, then receives a claim from node 2 (lower) → yields.
        let mut node5 = GroupNode::new(5, group_id());
        let mut node2 = GroupNode::new(2, group_id());
        node5.bootstrap_as_joiner(0, 0);
        node2.bootstrap_as_creator(0);
        let mut s = PlainSealer;
        // node5 claims
        node5.is_coordinator = true;
        // node2 broadcasts claim
        let f = node2.claim_coordinator(&mut s, 5).unwrap();
        node5.on_wire(&mut s, &f.wire).unwrap();
        assert!(!node5.is_coordinator, "node5 yielded to node2");
    }

    // A sitting coordinator with a lower id keeps the role against a
    // higher-id claimant.
    #[test]
    fn lower_id_keeps_coordinator_against_higher_claimant() {
        let mut node1 = GroupNode::new(1, group_id());
        let mut node5 = GroupNode::new(5, group_id());
        node1.bootstrap_as_creator(0);
        node5.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        node1.is_coordinator = true;
        let f = node5.claim_coordinator(&mut s, 1).unwrap();
        node1.on_wire(&mut s, &f.wire).unwrap();
        assert!(node1.is_coordinator, "node1 keeps role — it has lower id");
    }

    // ---- Tie-break (gbp_rfc §8) ---------------------------------------------

    // Competing PREPAREs for the same tid: an earlier lower-id sender stays
    // the winner when a higher-id sender arrives later.
    #[test]
    fn competing_prepare_lower_member_id_wins() {
        // Two coordinators issue PREPARE for the same tid.
        // Member 1 (lower) sends first — member 3 (higher) is the loser.
        let mut node = GroupNode::new(10, group_id());
        node.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;

        // Build a PREPARE from member 1 (lower id).
        let mut sender1 = GroupNode::new(1, group_id());
        sender1.bootstrap_as_creator(0);
        let f1 = sender1
            .send_control(
                &mut s,
                10,
                ControlOpcode::PrepareTransition,
                1,
                1,
                b"commit-A".to_vec(),
            )
            .unwrap();
        node.on_wire(&mut s, &f1.wire).unwrap();
        assert_eq!(
            node.pending_commit_sender,
            Some(1),
            "member 1 is initial winner"
        );

        // Build a PREPARE from member 3 (higher id, same tid).
        let mut sender3 = GroupNode::new(3, group_id());
        sender3.bootstrap_as_creator(0);
        let f3 = sender3
            .send_control(
                &mut s,
                10,
                ControlOpcode::PrepareTransition,
                1,
                2,
                b"commit-B".to_vec(),
            )
            .unwrap();
        node.on_wire(&mut s, &f3.wire).unwrap();
        // Lower sender (1) keeps the win.
        assert_eq!(node.pending_commit_sender, Some(1), "member 1 still wins");
        assert_eq!(node.pending_transition_id, 1);
    }

    // Competing PREPAREs for the same tid: a later lower-id sender displaces
    // the earlier higher-id winner.
    #[test]
    fn competing_prepare_later_lower_id_displaces_winner() {
        // Member 5 arrives first, then member 2 (lower) — member 2 wins.
        let mut node = GroupNode::new(10, group_id());
        node.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;

        let mut sender5 = GroupNode::new(5, group_id());
        sender5.bootstrap_as_creator(0);
        let f5 = sender5
            .send_control(
                &mut s,
                10,
                ControlOpcode::PrepareTransition,
                1,
                1,
                b"commit-X".to_vec(),
            )
            .unwrap();
        node.on_wire(&mut s, &f5.wire).unwrap();
        assert_eq!(node.pending_commit_sender, Some(5));

        let mut sender2 = GroupNode::new(2, group_id());
        sender2.bootstrap_as_creator(0);
        let f2 = sender2
            .send_control(
                &mut s,
                10,
                ControlOpcode::PrepareTransition,
                1,
                2,
                b"commit-Y".to_vec(),
            )
            .unwrap();
        node.on_wire(&mut s, &f2.wire).unwrap();
        assert_eq!(
            node.pending_commit_sender,
            Some(2),
            "member 2 displaces member 5"
        );
    }

    // Applying a transition forgets which member's commit won the tie-break.
    #[test]
    fn apply_transition_clears_commit_sender() {
        let mut coord = GroupNode::new(1, group_id());
        coord.bootstrap_as_creator(0);
        let mut s = PlainSealer;
        coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        coord.apply_transition(1);
        assert_eq!(coord.pending_commit_sender, None);
    }

    // ---- Timer engine -------------------------------------------------------

    // An expired prepare deadline reports PREPARE_TIMEOUT, aborts the
    // transition and clears the deadline.
    #[test]
    fn prepare_timeout_fires_when_deadline_exceeded() {
        let mut coord = GroupNode::new(1, group_id());
        coord.bootstrap_as_creator(0);
        let mut s = PlainSealer;
        coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        // Manually backdate the deadline so it appears expired.
        coord.prepare_deadline = Some(Instant::now() - Duration::from_millis(1));
        let evs = coord.check_timeouts();
        assert!(
            evs.iter().any(|e| matches!(
                e,
                Event::Error {
                    code: codes::PREPARE_TIMEOUT,
                    ..
                }
            )),
            "expected PREPARE_TIMEOUT, got {:?}",
            evs
        );
        assert_eq!(
            coord.transition_state,
            TransitionState::TAborted,
            "transition aborted"
        );
        assert_eq!(coord.prepare_deadline, None, "deadline cleared");
    }

    // An expired execute deadline reports EXECUTE_TIMEOUT and clears the
    // deadline.
    #[test]
    fn execute_timeout_fires_when_deadline_exceeded() {
        let mut member = GroupNode::new(2, group_id());
        member.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        // Simulate READY sent → execute_deadline armed.
        member.pending_transition_id = 1;
        member.transition_state = TransitionState::TPrepared;
        member
            .send_control(&mut s, 1, ControlOpcode::ReadyForTransition, 1, 1, vec![])
            .unwrap();
        // Backdate.
        member.execute_deadline = Some(Instant::now() - Duration::from_millis(1));
        let evs = member.check_timeouts();
        assert!(
            evs.iter().any(|e| matches!(
                e,
                Event::Error {
                    code: codes::EXECUTE_TIMEOUT,
                    ..
                }
            )),
            "expected EXECUTE_TIMEOUT, got {:?}",
            evs
        );
        assert_eq!(member.execute_deadline, None, "deadline cleared");
    }

    // Coordinator silence beyond the grace period reports COORDINATOR_GONE
    // and clears `coordinator_last_seen`.
    #[test]
    fn coordinator_gone_fires_after_silence() {
        let mut member = GroupNode::new(2, group_id());
        member.bootstrap_as_joiner(0, 0);
        // Simulate coordinator was seen 11 seconds ago (> T_COORDINATOR_GRACE_MS = 10_000).
        member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
        let evs = member.check_timeouts();
        assert!(
            evs.iter().any(|e| matches!(
                e,
                Event::Error {
                    code: codes::COORDINATOR_GONE,
                    ..
                }
            )),
            "expected COORDINATOR_GONE, got {:?}",
            evs
        );
        assert_eq!(member.coordinator_last_seen, None, "timer cleared");
    }

    // note_coordinator_activity replaces a stale timestamp, so
    // COORDINATOR_GONE does not fire.
    #[test]
    fn note_coordinator_activity_resets_silence_timer() {
        let mut member = GroupNode::new(2, group_id());
        member.bootstrap_as_joiner(0, 0);
        // Old timestamp — would fire.
        member.coordinator_last_seen = Some(Instant::now() - Duration::from_millis(11_000));
        // Reset.
        member.note_coordinator_activity();
        let evs = member.check_timeouts();
        assert!(
            !evs.iter().any(|e| matches!(
                e,
                Event::Error {
                    code: codes::COORDINATOR_GONE,
                    ..
                }
            )),
            "should NOT fire after reset"
        );
    }

    // Sending PREPARE arms `prepare_deadline`; sending EXECUTE clears it and
    // leaves `execute_deadline` unset.
    #[test]
    fn execute_clears_prepare_deadline() {
        let mut coord = GroupNode::new(1, group_id());
        coord.bootstrap_as_creator(0);
        let mut s = PlainSealer;
        coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        assert!(coord.prepare_deadline.is_some(), "deadline armed");
        coord
            .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
            .unwrap();
        assert_eq!(coord.prepare_deadline, None, "deadline cleared on EXECUTE");
        assert_eq!(
            coord.execute_deadline, None,
            "execute_deadline also cleared"
        );
    }

    // Receiving PREPARE arms the receiver's `execute_deadline`.
    #[test]
    fn receive_prepare_arms_execute_deadline() {
        let mut coord = GroupNode::new(1, group_id());
        let mut member = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        member.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        let f = coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        member.on_wire(&mut s, &f.wire).unwrap();
        assert!(
            member.execute_deadline.is_some(),
            "execute_deadline armed on receiving PREPARE"
        );
    }

    // Receiving EXECUTE after PREPARE clears the receiver's `execute_deadline`.
    #[test]
    fn receive_execute_clears_execute_deadline() {
        let mut coord = GroupNode::new(1, group_id());
        let mut member = GroupNode::new(2, group_id());
        coord.bootstrap_as_creator(0);
        member.bootstrap_as_joiner(0, 0);
        let mut s = PlainSealer;
        let prep = coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        member.on_wire(&mut s, &prep.wire).unwrap();
        let exec = coord
            .send_control(&mut s, 0, ControlOpcode::ExecuteTransition, 1, 2, vec![])
            .unwrap();
        member.on_wire(&mut s, &exec.wire).unwrap();
        assert_eq!(member.execute_deadline, None, "cleared on EXECUTE");
    }

    // With no deadlines armed, check_timeouts produces no events.
    #[test]
    fn no_timeout_when_deadlines_not_set() {
        let mut node = GroupNode::new(1, group_id());
        node.bootstrap_as_creator(0);
        node.drain_events(); // clear bootstrap StateChanged events
        let evs = node.check_timeouts();
        assert!(evs.is_empty(), "no events without armed deadlines");
    }

    // A PREPARE reusing an already-applied tid is TRANSITION_MISMATCH.
    #[test]
    fn prepare_with_already_applied_tid_is_rejected() {
        // After the coordinator has fully applied tid=1, a replay or
        // late-coordinator PREPARE with the same tid must fail validation.
        let mut coord = GroupNode::new(1, group_id());
        coord.bootstrap_as_creator(0);
        let mut s = PlainSealer;
        let _ = coord
            .send_control(&mut s, 0, ControlOpcode::PrepareTransition, 1, 1, vec![])
            .unwrap();
        coord.apply_transition(1);
        assert_eq!(coord.last_transition_id, 1);
        assert_eq!(coord.pending_transition_id, 0);
        // Forge a PREPARE with the same already-applied tid (epoch matches
        // because we synthesise it locally with a peer node on the same
        // post-apply epoch).
        let mut peer = GroupNode::new(2, group_id());
        peer.bootstrap_as_joiner(coord.current_epoch, 0);
        let stale = peer
            .send_control(&mut s, 1, ControlOpcode::PrepareTransition, 1, 9, vec![])
            .unwrap();
        let evs = coord.on_wire(&mut s, &stale.wire).unwrap();
        let errs = drain_errs(&evs);
        assert!(
            errs.contains(&codes::TRANSITION_MISMATCH),
            "expected TRANSITION_MISMATCH, got {:?}",
            errs
        );
    }

    // A frame whose `open` fails yields a DECRYPT_FAILED error that is
    // non-fatal and retryable, and the receiver stays Active.
    #[test]
    fn decrypt_failed_is_non_fatal() {
        // Simulate a frame our open() can't unlock: a sealer that fails on `open`.
        struct OpenFailSealer;
        impl Sealer for OpenFailSealer {
            fn seal(
                &mut self,
                _: StreamType,
                _: SequenceNo,
                p: &[u8],
            ) -> Result<Vec<u8>, MlsError> {
                Ok(p.to_vec())
            }
            fn open(
                &mut self,
                _: StreamType,
                _: SequenceNo,
                _: &[u8],
            ) -> Result<Vec<u8>, MlsError> {
                Err(MlsError::Aead("simulated".into()))
            }
        }
        let mut alice = GroupNode::new(1, group_id());
        let mut bob = GroupNode::new(2, group_id());
        alice.bootstrap_as_creator(1);
        bob.bootstrap_as_joiner(1, 0);
        let mut s = PlainSealer;
        let sid = alice.member_stream_id(2);
        let f = alice
            .send_payload(
                &mut s,
                2,
                StreamType::Text,
                sid,
                GbpFlags::ordered_reliable_ack(),
                b"x",
                PayloadCodec::Cbor,
            )
            .unwrap();
        let mut fail = OpenFailSealer;
        let evs = bob.on_wire(&mut fail, &f.wire).unwrap();
        let err = evs
            .iter()
            .find_map(|e| match e {
                Event::Error {
                    code,
                    fatal,
                    retryable,
                    ..
                } => Some((*code, *fatal, *retryable)),
                _ => None,
            })
            .expect("error event");
        assert_eq!(err.0, codes::DECRYPT_FAILED);
        assert!(!err.1, "must be non-fatal");
        assert!(err.2, "must be retryable");
        assert_eq!(bob.state, NodeState::Active, "bob stays Active");
    }
}