Skip to main content

conclavelib/
protocol.rs

1//! Wire frames shared between the bridge and the central server.
2//!
3//! Both peers serialize [`ProtocolMessage`] with a fixed `bincode` configuration behind a
4//! length-delimited framing (the [`ProtocolWrite`] / [`ProtocolRead`] stream extensions). Two
5//! properties are fixed here for forward-compat (DESIGN.md §13), so later additions are additive
6//! rather than breaking:
7//!
8//! - a **protocol-version field** carried in [`ProtocolMessage::Hello`] and checked with
9//!   [`negotiate_version`]; peers advertising an incompatible version are rejected,
10//! - an opaque **encrypted-payload envelope + key-id** ([`Payload::Encrypted`]), so end-to-end
11//!   encryption (DESIGN.md §19) can be layered in without a wire break.
12//!
13//! `ProtocolError` is the typed, wire-crossing error surfaced as a [`ProtocolMessage::Error`]
14//! frame; application glue elsewhere uses `anyhow` via the `Res` / `Void` aliases.
15
16use std::future::Future;
17
18use anyhow::Context as _;
19use serde::{Deserialize, Serialize};
20use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
21
22use crate::base::{Constant, Res, SessionPath, Visibility};
23
24/// An opaque end-to-end-encrypted payload envelope, reserved now for v2 (DESIGN.md §13, §19).
25///
26/// The server fans this out without reading it; `key_id` names the per-channel key the sender
27/// wrapped the content with. Unused in v1 — reserving it keeps E2E an additive change.
28#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
29pub struct Envelope {
30    /// Ciphertext the server relays but cannot read.
31    pub ciphertext: Vec<u8>,
32    /// Identifier of the per-channel key this ciphertext was wrapped with.
33    pub key_id: Option<String>,
34}
35
36/// A message body: plaintext in v1, or the reserved E2E [`Envelope`] in v2.
37#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
38pub enum Payload {
39    /// A plaintext UTF-8 body (v1).
40    Plain(String),
41    /// An opaque end-to-end-encrypted body (v2; the envelope is reserved now).
42    Encrypted(Envelope),
43}
44
45/// A channel as surfaced by discovery ([`ProtocolMessage::ChannelList`], DESIGN.md §6).
46///
47/// Only channels the caller is allowed to see are ever listed, so no private name leaks to a
48/// non-member; `member` marks the ones the caller already belongs to.
49#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
50pub struct ChannelInfo {
51    /// The channel name.
52    pub name: String,
53    /// The visibility tier.
54    pub visibility: Visibility,
55    /// Whether the requesting user is already a member.
56    pub member: bool,
57}
58
59/// An enrolled machine as surfaced by [`ProtocolMessage::MachineList`] (`machine list`, §5.1).
60#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
61pub struct MachineInfo {
62    /// The machine name (unique within the user).
63    pub name: String,
64    /// The machine's public key, base64-encoded.
65    pub pubkey: String,
66    /// RFC 3339 enrollment timestamp.
67    pub added_at: String,
68}
69
70/// An outstanding invite as surfaced by the channel-admin audit ([`ProtocolMessage::InviteList`]).
71#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
72pub struct InviteInfo {
73    /// The opaque token string.
74    pub token: String,
75    /// Remaining redemptions, or unlimited if absent.
76    pub uses_remaining: Option<i64>,
77    /// RFC 3339 expiry, or non-expiring if absent.
78    pub expires_at: Option<String>,
79}
80
81/// An admin / moderation operation (DESIGN.md §7), authorized server-side by user role.
82#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
83pub enum AdminOp {
84    /// Create a channel with a visibility tier.
85    CreateChannel {
86        /// Channel name.
87        name: String,
88        /// Visibility tier.
89        visibility: Visibility,
90    },
91    /// Delete a channel.
92    DeleteChannel {
93        /// Channel name.
94        name: String,
95    },
96    /// Rename a channel.
97    RenameChannel {
98        /// Current name.
99        name: String,
100        /// New name.
101        new_name: String,
102    },
103    /// Change a channel's visibility tier.
104    SetVisibility {
105        /// Channel name.
106        name: String,
107        /// New visibility tier.
108        visibility: Visibility,
109    },
110    /// Add a user to a channel's access-control list.
111    AclAdd {
112        /// Channel name.
113        channel: String,
114        /// Username to add.
115        user: String,
116    },
117    /// Remove a user from a channel's access-control list.
118    AclRemove {
119        /// Channel name.
120        channel: String,
121        /// Username to remove.
122        user: String,
123    },
124    /// Create an invite token for a channel.
125    InviteCreate {
126        /// Channel name.
127        channel: String,
128        /// Maximum redemptions, or unlimited if absent.
129        uses: Option<u32>,
130        /// Lifetime in seconds, or non-expiring if absent.
131        expires_in_secs: Option<u64>,
132    },
133    /// Revoke an invite token.
134    InviteRevoke {
135        /// The token to revoke.
136        token: String,
137    },
138    /// Kick a live session or user from a channel.
139    Kick {
140        /// Channel name.
141        channel: String,
142        /// Session path or username to kick.
143        target: String,
144    },
145    /// Ban a user from a channel.
146    Ban {
147        /// Channel name.
148        channel: String,
149        /// Username to ban.
150        user: String,
151    },
152    /// Remove a user from the server (server-admin).
153    UserRemove {
154        /// Username to remove.
155        username: String,
156    },
157    /// Revoke an enrolled machine (server-admin / self), force-dropping its live sessions.
158    MachineRemove {
159        /// Machine name to revoke.
160        name: String,
161    },
162    /// Enroll a new machine key under the authenticated user (self-service, DESIGN.md §5.1).
163    ///
164    /// Appended after `MachineRemove` so existing variant indices are unchanged (forward-compat).
165    MachineAdd {
166        /// Unique-within-user name for the new machine.
167        name: String,
168        /// The new machine's Ed25519 public key (proves possession on its own first connect).
169        pubkey: Vec<u8>,
170    },
171    /// List a channel's ACL members (channel-admin; answered with a `UserList`).
172    AclList {
173        /// Channel name.
174        channel: String,
175    },
176    /// Lift a channel ban (channel-admin; does not grant ACL membership).
177    Unban {
178        /// Channel name.
179        channel: String,
180        /// Username to unban.
181        user: String,
182    },
183    /// List a channel's banned users (channel-admin; answered with a `UserList`).
184    BanList {
185        /// Channel name.
186        channel: String,
187    },
188    /// List a channel's outstanding invites (channel-admin; answered with an `InviteList`).
189    InviteList {
190        /// Channel name.
191        channel: String,
192    },
193}
194
195impl AdminOp {
196    /// The operation's name for telemetry (PRD-0014): never the contents — invite tokens and
197    /// public keys must not reach logs.
198    #[must_use]
199    pub fn name(&self) -> &'static str {
200        match self {
201            Self::CreateChannel { .. } => "create_channel",
202            Self::DeleteChannel { .. } => "delete_channel",
203            Self::RenameChannel { .. } => "rename_channel",
204            Self::SetVisibility { .. } => "set_visibility",
205            Self::AclAdd { .. } => "acl_add",
206            Self::AclRemove { .. } => "acl_remove",
207            Self::InviteCreate { .. } => "invite_create",
208            Self::InviteRevoke { .. } => "invite_revoke",
209            Self::Kick { .. } => "kick",
210            Self::Ban { .. } => "ban",
211            Self::UserRemove { .. } => "user_remove",
212            Self::MachineRemove { .. } => "machine_remove",
213            Self::MachineAdd { .. } => "machine_add",
214            Self::AclList { .. } => "acl_list",
215            Self::Unban { .. } => "unban",
216            Self::BanList { .. } => "ban_list",
217            Self::InviteList { .. } => "invite_list",
218        }
219    }
220}
221
222/// The versioned frame exchanged between a bridge and a central server.
223///
224/// Variants are append-only across protocol versions: later milestones may add variants but must
225/// not renumber or repurpose existing ones without a version bump (see [`negotiate_version`]).
226#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
227pub enum ProtocolMessage {
228    /// Client → server on connect: advertise the protocol version and the session handle.
229    Hello {
230        /// The client's protocol version.
231        protocol_version: u32,
232        /// The session handle (`--as`, defaulting to the repo/dir name).
233        session: String,
234    },
235    /// Server → client: a random nonce for the client to sign (challenge-response).
236    Challenge {
237        /// The random nonce.
238        nonce: Vec<u8>,
239    },
240    /// Client → server: the machine public key and its signature over the nonce.
241    Auth {
242        /// The machine's Ed25519 public key.
243        pubkey: Vec<u8>,
244        /// The signature over the server's nonce.
245        signature: Vec<u8>,
246    },
247    /// Server → client: authentication succeeded; the resolved full participant path.
248    Established {
249        /// The resolved `user/machine/session` path.
250        path: SessionPath,
251    },
252    /// Client → server: claim a username and enroll this machine as its first key.
253    Register {
254        /// Username to claim.
255        username: String,
256        /// Machine name for this key.
257        machine: String,
258        /// The machine's Ed25519 public key.
259        pubkey: Vec<u8>,
260    },
261    /// Client → server: join a channel, optionally redeeming an invite token.
262    Join {
263        /// Channel name.
264        channel: String,
265        /// Invite token, if required.
266        token: Option<String>,
267    },
268    /// Client → server: leave a channel.
269    Leave {
270        /// Channel name.
271        channel: String,
272    },
273    /// Client → server: request presence, optionally scoped to one channel.
274    Who {
275        /// Channel to scope to, or all subscribed channels if absent.
276        channel: Option<String>,
277    },
278    /// Client → server: an admin / moderation operation.
279    Admin(AdminOp),
280    /// A message addressed to all sessions subscribed to a channel.
281    ChannelMsg {
282        /// Channel name.
283        channel: String,
284        /// The sender's full participant path.
285        from: SessionPath,
286        /// The message body.
287        payload: Payload,
288    },
289    /// A direct message to exactly one session path.
290    Whisper {
291        /// The sender's full participant path.
292        from: SessionPath,
293        /// The single recipient's full participant path.
294        target: SessionPath,
295        /// The message body.
296        payload: Payload,
297    },
298    /// Server → client: presence enumerated as full session paths.
299    Presence {
300        /// Channel the presence is scoped to, or server-wide if absent.
301        channel: Option<String>,
302        /// The present sessions.
303        sessions: Vec<SessionPath>,
304    },
305    /// A typed error surfaced to the peer that triggered it.
306    Error(ProtocolError),
307    // ---------------------------------------------------------------------
308    // M2 additions — appended after `Error` so every existing variant keeps
309    // its wire index (the append-only, forward-compat discipline of §13).
310    // ---------------------------------------------------------------------
311    /// Client → server: request the channels visible to the authenticated user (discovery).
312    ListChannels,
313    /// Server → client: the discovery result, already visibility-gated.
314    ChannelList {
315        /// The channels the caller may see.
316        channels: Vec<ChannelInfo>,
317    },
318    /// Server → client: a [`ProtocolMessage::Join`] succeeded; the session is now subscribed.
319    Joined {
320        /// The channel that was joined.
321        channel: String,
322    },
323    /// Server → client: a control / admin operation succeeded, with an optional human detail.
324    Ack {
325        /// A short human-readable detail (e.g. the affected name), if any.
326        detail: Option<String>,
327    },
328    /// Server → client: the token minted by an [`AdminOp::InviteCreate`].
329    InviteToken {
330        /// The opaque invite token.
331        token: String,
332    },
333    /// Client → server: liveness keepalive; refreshes presence and draws a [`ProtocolMessage::Pong`]
334    /// (the application-level realization of the §10 heartbeat, uniform across transports).
335    Ping,
336    /// Server → client: keepalive acknowledgement.
337    Pong,
338    // ---------------------------------------------------------------------
339    // M4 additions — appended (forward-compat): machine / user listing and the
340    // post-auth server-role signal that gates the bridge's admin tools.
341    // ---------------------------------------------------------------------
342    /// Server → client, immediately after [`ProtocolMessage::Established`]: the authenticated user's
343    /// server-wide role, so the bridge can gate its admin tools (DESIGN.md §7).
344    ServerInfo {
345        /// Whether the user is a server admin (on the serve-config allowlist).
346        admin: bool,
347    },
348    /// Client → server: list the machines enrolled under the authenticated user.
349    ListMachines,
350    /// Server → client: the caller's enrolled machines.
351    MachineList {
352        /// The machines under the caller's account.
353        machines: Vec<MachineInfo>,
354    },
355    /// Client → server: list the server's users (server-admin only).
356    ListUsers,
357    /// Server → client: the registered usernames (server-admin only).
358    UserList {
359        /// The registered usernames.
360        users: Vec<String>,
361    },
362    // ---------------------------------------------------------------------
363    // M10 additions — appended (forward-compat): operator-visibility listings.
364    // ---------------------------------------------------------------------
365    /// Server → client: a channel's outstanding invites (channel-admin audit, PRD-0011).
366    InviteList {
367        /// The outstanding invites.
368        invites: Vec<InviteInfo>,
369    },
370    // ---------------------------------------------------------------------
371    // M12 additions — appended (forward-compat): retained history (PRD-0013).
372    // ---------------------------------------------------------------------
373    /// Client → server: read the channel's retained history strictly after `since_ms`. The caller
374    /// must be subscribed; refusal is visibility-uniform with posting (PRD-0013 T-002).
375    ReadSince {
376        /// The channel to read.
377        channel: String,
378        /// The exclusive watermark, server-stamped epoch milliseconds (`0` = everything retained).
379        since_ms: i64,
380    },
381    /// Server → client: one page of retained history (oldest-first, capped; re-ask with the last
382    /// row's `ts_ms` to page).
383    History {
384        /// The channel the page belongs to.
385        channel: String,
386        /// The messages, oldest-first.
387        messages: Vec<HistoryMessage>,
388    },
389}
390
391impl ProtocolMessage {
392    /// The frame's name for telemetry (PRD-0014): the kind only — never field contents, so
393    /// message bodies, tokens, and key material cannot reach logs.
394    #[must_use]
395    pub fn kind(&self) -> &'static str {
396        match self {
397            Self::Hello { .. } => "hello",
398            Self::Challenge { .. } => "challenge",
399            Self::Auth { .. } => "auth",
400            Self::Established { .. } => "established",
401            Self::Register { .. } => "register",
402            Self::Join { .. } => "join",
403            Self::Leave { .. } => "leave",
404            Self::Who { .. } => "who",
405            Self::Admin(op) => op.name(),
406            Self::ChannelMsg { .. } => "channel_msg",
407            Self::Whisper { .. } => "whisper",
408            Self::Presence { .. } => "presence",
409            Self::Error(_) => "error",
410            Self::ListChannels => "list_channels",
411            Self::ChannelList { .. } => "channel_list",
412            Self::Joined { .. } => "joined",
413            Self::Ack { .. } => "ack",
414            Self::InviteToken { .. } => "invite_token",
415            Self::Ping => "ping",
416            Self::Pong => "pong",
417            Self::ServerInfo { .. } => "server_info",
418            Self::ListMachines => "list_machines",
419            Self::MachineList { .. } => "machine_list",
420            Self::ListUsers => "list_users",
421            Self::UserList { .. } => "user_list",
422            Self::InviteList { .. } => "invite_list",
423            Self::ReadSince { .. } => "read_since",
424            Self::History { .. } => "history",
425        }
426    }
427}
428
429/// One retained channel message as surfaced by [`ProtocolMessage::History`] (PRD-0013).
430#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
431pub struct HistoryMessage {
432    /// The sender's full session path (server-stamped at post time).
433    pub from: SessionPath,
434    /// Server-stamped receive time, epoch milliseconds — the read-since watermark unit.
435    pub ts_ms: i64,
436    /// The message envelope, verbatim as it crossed the wire.
437    pub payload: Payload,
438}
439
440/// Errors that cross the wire as a [`ProtocolMessage::Error`] frame and are matched on by the
441/// server and bridge (DESIGN.md §16). Application glue elsewhere uses `anyhow` via `Res` / `Void`.
442// The public name `ProtocolError` is fixed by DESIGN.md §13 / §22; `module_name_repetitions` is a
443// false positive against that mandated vocabulary (the sibling ratrod uses the same name).
444#[allow(clippy::module_name_repetitions)]
445#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
446pub enum ProtocolError {
447    /// The peer's protocol version is incompatible with ours.
448    #[error("incompatible protocol version: ours={ours}, theirs={theirs}")]
449    VersionMismatch {
450        /// This build's protocol version.
451        ours: u32,
452        /// The peer's advertised version.
453        theirs: u32,
454    },
455    /// A frame could not be decoded, or violated the schema.
456    #[error("malformed frame: {0}")]
457    MalformedFrame(String),
458    /// The operation was denied (authentication or authorization).
459    #[error("unauthorized: {0}")]
460    Unauthorized(String),
461    /// The named channel, session, or target does not exist / is not visible.
462    #[error("not found: {0}")]
463    NotFound(String),
464    /// An unexpected server-side error.
465    #[error("internal error: {0}")]
466    Internal(String),
467}
468
469/// Returns this build's protocol version if `theirs` is compatible, else a [`ProtocolError::VersionMismatch`].
470///
471/// v1 speaks exactly one version, so compatibility is equality; a later minor-compatible range
472/// widens this without changing the call sites.
473///
474/// # Errors
475///
476/// Returns [`ProtocolError::VersionMismatch`] when the peer's version is not compatible.
477pub fn negotiate_version(theirs: u32) -> Result<u32, ProtocolError> {
478    if theirs == Constant::PROTOCOL_VERSION {
479        Ok(Constant::PROTOCOL_VERSION)
480    } else {
481        Err(ProtocolError::VersionMismatch { ours: Constant::PROTOCOL_VERSION, theirs })
482    }
483}
484
485/// Encodes a frame to its wire bytes with the fixed codec configuration.
486///
487/// # Errors
488///
489/// Returns an error if the frame cannot be serialized.
490pub fn encode(message: &ProtocolMessage) -> Res<Vec<u8>> {
491    bincode::serde::encode_to_vec(message, bincode::config::standard()).context("failed to encode protocol frame")
492}
493
494/// Decodes a frame from its wire bytes with the fixed codec configuration.
495///
496/// # Errors
497///
498/// Returns an error if the bytes are not a valid encoded frame.
499pub fn decode(bytes: &[u8]) -> Res<ProtocolMessage> {
500    let (message, _) = bincode::serde::decode_from_slice(bytes, bincode::config::standard()).context("failed to decode protocol frame")?;
501    Ok(message)
502}
503
504/// Encodes a bare [`Payload`] envelope for at-rest retention (PRD-0013) — the same codec as the
505/// wire, so stored history is byte-identical to what crossed it (E2E ciphertext included).
506///
507/// # Errors
508///
509/// Returns an error if encoding fails.
510pub fn encode_payload(payload: &Payload) -> Res<Vec<u8>> {
511    bincode::serde::encode_to_vec(payload, bincode::config::standard()).context("failed to encode payload envelope")
512}
513
514/// Decodes a retained [`Payload`] envelope (the inverse of [`encode_payload`]).
515///
516/// # Errors
517///
518/// Returns an error if the bytes are not a valid encoded envelope.
519pub fn decode_payload(bytes: &[u8]) -> Res<Payload> {
520    let (payload, _) = bincode::serde::decode_from_slice(bytes, bincode::config::standard()).context("failed to decode payload envelope")?;
521    Ok(payload)
522}
523
524/// Length-delimited sending of protocol frames over any async writer.
525pub trait ProtocolWrite: AsyncWrite + Unpin {
526    /// Encodes `message` and writes it as a `u32`-length-prefixed frame, then flushes.
527    ///
528    /// # Errors
529    ///
530    /// Returns an error if the frame cannot be encoded, exceeds `u32` in length, or the write fails.
531    fn send_message(&mut self, message: &ProtocolMessage) -> impl Future<Output = Res<()>> {
532        async move {
533            let body = encode(message)?;
534            let len = u32::try_from(body.len()).context("protocol frame exceeds u32 length")?;
535            self.write_all(&len.to_be_bytes()).await?;
536            self.write_all(&body).await?;
537            self.flush().await?;
538            Ok(())
539        }
540    }
541}
542
543impl<T: AsyncWrite + Unpin + ?Sized> ProtocolWrite for T {}
544
545/// Length-delimited receiving of protocol frames over any async reader.
546pub trait ProtocolRead: AsyncRead + Unpin {
547    /// Reads one `u32`-length-prefixed frame and decodes it.
548    ///
549    /// # Errors
550    ///
551    /// Returns an error on EOF / read failure, a length prefix beyond [`Constant::MAX_FRAME_SIZE`],
552    /// or a body that does not decode.
553    fn recv_message(&mut self) -> impl Future<Output = Res<ProtocolMessage>> {
554        async move {
555            let mut len_buf = [0_u8; 4];
556            self.read_exact(&mut len_buf).await?;
557            let len = usize::try_from(u32::from_be_bytes(len_buf)).context("frame length overflow")?;
558
559            anyhow::ensure!(len <= Constant::MAX_FRAME_SIZE, "protocol frame of {len} bytes exceeds the {} byte cap", Constant::MAX_FRAME_SIZE);
560
561            let mut body = vec![0_u8; len];
562            self.read_exact(&mut body).await?;
563            decode(&body)
564        }
565    }
566}
567
568impl<T: AsyncRead + Unpin + ?Sized> ProtocolRead for T {}
569
570#[cfg(test)]
571mod tests {
572    // Tests relax `unwrap_used` (house convention; DESIGN.md §22).
573    #![allow(clippy::unwrap_used)]
574
575    use super::*;
576    use crate::tests::duplex;
577    use pretty_assertions::assert_eq;
578
579    fn assert_round_trips(message: &ProtocolMessage) {
580        let bytes = encode(message).unwrap();
581        assert_eq!(&decode(&bytes).unwrap(), message);
582    }
583
584    #[test]
585    fn hello_round_trips_with_version_field() {
586        assert_round_trips(&ProtocolMessage::Hello {
587            protocol_version: Constant::PROTOCOL_VERSION,
588            session: "razel".to_owned(),
589        });
590    }
591
592    #[test]
593    fn channel_message_round_trips_plaintext() {
594        assert_round_trips(&ProtocolMessage::ChannelMsg {
595            channel: "ops".to_owned(),
596            from: SessionPath::new("aaron", "workstation", "razel"),
597            payload: Payload::Plain("hello, agents".to_owned()),
598        });
599    }
600
601    #[test]
602    fn data_frame_round_trips_the_reserved_e2e_envelope() {
603        assert_round_trips(&ProtocolMessage::Whisper {
604            from: SessionPath::new("aaron", "workstation", "razel"),
605            target: SessionPath::new("david", "desktop", "main"),
606            payload: Payload::Encrypted(Envelope {
607                ciphertext: vec![0xDE, 0xAD, 0xBE, 0xEF],
608                key_id: Some("channel-key-1".to_owned()),
609            }),
610        });
611    }
612
613    #[test]
614    fn admin_op_round_trips() {
615        assert_round_trips(&ProtocolMessage::Admin(AdminOp::CreateChannel {
616            name: "ops".to_owned(),
617            visibility: Visibility::Private,
618        }));
619    }
620
621    #[test]
622    fn machine_add_admin_op_round_trips() {
623        assert_round_trips(&ProtocolMessage::Admin(AdminOp::MachineAdd {
624            name: "sno-box".to_owned(),
625            pubkey: vec![1, 2, 3, 4],
626        }));
627    }
628
629    #[test]
630    fn acl_list_admin_op_round_trips() {
631        assert_round_trips(&ProtocolMessage::Admin(AdminOp::AclList { channel: "ops".to_owned() }));
632    }
633
634    #[test]
635    fn invite_list_round_trips_op_and_response() {
636        assert_round_trips(&ProtocolMessage::Admin(AdminOp::InviteList { channel: "ops".to_owned() }));
637        assert_round_trips(&ProtocolMessage::InviteList {
638            invites: vec![InviteInfo {
639                token: "tok".to_owned(),
640                uses_remaining: Some(3),
641                expires_at: None,
642            }],
643        });
644    }
645
646    #[test]
647    fn history_frames_round_trip() {
648        assert_round_trips(&ProtocolMessage::ReadSince {
649            channel: "ops".to_owned(),
650            since_ms: 1_751_500_000_000,
651        });
652        assert_round_trips(&ProtocolMessage::History {
653            channel: "ops".to_owned(),
654            messages: vec![HistoryMessage {
655                from: SessionPath::new("aaron", "workstation", "razel"),
656                ts_ms: 1_751_500_000_123,
657                payload: Payload::Plain("hello".to_owned()),
658            }],
659        });
660    }
661
662    #[test]
663    fn payload_envelope_round_trips_for_retention() {
664        let payload = Payload::Plain("retained".to_owned());
665        let bytes = encode_payload(&payload).unwrap();
666        assert_eq!(decode_payload(&bytes).unwrap(), payload);
667    }
668
669    #[test]
670    fn ban_visibility_admin_ops_round_trip() {
671        assert_round_trips(&ProtocolMessage::Admin(AdminOp::Unban {
672            channel: "ops".to_owned(),
673            user: "bob".to_owned(),
674        }));
675        assert_round_trips(&ProtocolMessage::Admin(AdminOp::BanList { channel: "ops".to_owned() }));
676    }
677
678    #[test]
679    fn m2_response_frames_round_trip() {
680        assert_round_trips(&ProtocolMessage::ListChannels);
681        assert_round_trips(&ProtocolMessage::ChannelList {
682            channels: vec![ChannelInfo {
683                name: "ops".to_owned(),
684                visibility: Visibility::Private,
685                member: true,
686            }],
687        });
688        assert_round_trips(&ProtocolMessage::Joined { channel: "ops".to_owned() });
689        assert_round_trips(&ProtocolMessage::Ack { detail: Some("ops".to_owned()) });
690        assert_round_trips(&ProtocolMessage::InviteToken { token: "tok-abc".to_owned() });
691        assert_round_trips(&ProtocolMessage::Ping);
692        assert_round_trips(&ProtocolMessage::Pong);
693    }
694
695    #[test]
696    fn m4_frames_round_trip() {
697        assert_round_trips(&ProtocolMessage::ServerInfo { admin: true });
698        assert_round_trips(&ProtocolMessage::ListMachines);
699        assert_round_trips(&ProtocolMessage::MachineList {
700            machines: vec![MachineInfo {
701                name: "workstation".to_owned(),
702                pubkey: "PUBKEY".to_owned(),
703                added_at: "2026-07-02T00:00:00Z".to_owned(),
704            }],
705        });
706        assert_round_trips(&ProtocolMessage::ListUsers);
707        assert_round_trips(&ProtocolMessage::UserList {
708            users: vec!["aaron".to_owned(), "david".to_owned()],
709        });
710    }
711
712    #[test]
713    fn appending_variants_preserves_existing_wire_indices() {
714        // The forward-compat guarantee (§13): an old variant's encoding must be byte-identical
715        // after new variants are appended. `Hello` is the first variant (index 0) and must still
716        // start with a 0 discriminant byte.
717        let hello = ProtocolMessage::Hello {
718            protocol_version: Constant::PROTOCOL_VERSION,
719            session: "razel".to_owned(),
720        };
721        assert_eq!(encode(&hello).unwrap()[0], 0, "the first variant's discriminant must remain 0");
722    }
723
724    #[test]
725    fn error_frame_round_trips() {
726        assert_round_trips(&ProtocolMessage::Error(ProtocolError::VersionMismatch { ours: 1, theirs: 2 }));
727    }
728
729    #[tokio::test]
730    async fn frames_stream_over_an_async_duplex() {
731        let (mut a, mut b) = duplex();
732        let sent = ProtocolMessage::Presence {
733            channel: Some("ops".to_owned()),
734            sessions: vec![SessionPath::new("aaron", "workstation", "razel"), SessionPath::new("david", "desktop", "main")],
735        };
736
737        a.send_message(&sent).await.unwrap();
738        let got = b.recv_message().await.unwrap();
739
740        assert_eq!(got, sent);
741    }
742
743    #[test]
744    fn version_negotiation_accepts_matching_and_rejects_mismatch() {
745        assert_eq!(negotiate_version(Constant::PROTOCOL_VERSION).unwrap(), Constant::PROTOCOL_VERSION);
746        assert_eq!(
747            negotiate_version(999),
748            Err(ProtocolError::VersionMismatch {
749                ours: Constant::PROTOCOL_VERSION,
750                theirs: 999,
751            })
752        );
753    }
754
755    #[tokio::test]
756    async fn recv_rejects_a_frame_larger_than_the_cap() {
757        // A length prefix beyond the cap is rejected before the body is allocated.
758        let oversized = u32::try_from(Constant::MAX_FRAME_SIZE + 1).unwrap();
759        let framed = oversized.to_be_bytes();
760        let mut reader = framed.as_slice();
761
762        assert!(reader.recv_message().await.is_err());
763    }
764}