conclavelib/protocol.rs
1//! Wire frames shared between the bridge and the central server.
2//!
3//! Both peers serialize [`ProtocolMessage`] with a fixed `bincode` configuration behind a
4//! length-delimited framing (the [`ProtocolWrite`] / [`ProtocolRead`] stream extensions). Two
5//! properties are fixed here for forward-compat (DESIGN.md §13), so later additions are additive
6//! rather than breaking:
7//!
8//! - a **protocol-version field** carried in [`ProtocolMessage::Hello`] and checked with
9//! [`negotiate_version`]; peers advertising an incompatible version are rejected,
10//! - an opaque **encrypted-payload envelope + key-id** ([`Payload::Encrypted`]), so end-to-end
11//! encryption (DESIGN.md §19) can be layered in without a wire break.
12//!
13//! `ProtocolError` is the typed, wire-crossing error surfaced as a [`ProtocolMessage::Error`]
14//! frame; application glue elsewhere uses `anyhow` via the `Res` / `Void` aliases.
15
16use std::future::Future;
17
18use anyhow::Context as _;
19use serde::{Deserialize, Serialize};
20use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
21
22use crate::base::{Constant, Res, SessionPath, Visibility};
23
24/// An opaque end-to-end-encrypted payload envelope, reserved now for v2 (DESIGN.md §13, §19).
25///
26/// The server fans this out without reading it; `key_id` names the per-channel key the sender
27/// wrapped the content with. Unused in v1 — reserving it keeps E2E an additive change.
28#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
29pub struct Envelope {
30 /// Ciphertext the server relays but cannot read.
31 pub ciphertext: Vec<u8>,
32 /// Identifier of the per-channel key this ciphertext was wrapped with.
33 pub key_id: Option<String>,
34}
35
36/// A message body: plaintext in v1, or the reserved E2E [`Envelope`] in v2.
37#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
38pub enum Payload {
39 /// A plaintext UTF-8 body (v1).
40 Plain(String),
41 /// An opaque end-to-end-encrypted body (v2; the envelope is reserved now).
42 Encrypted(Envelope),
43}
44
45/// A channel as surfaced by discovery ([`ProtocolMessage::ChannelList`], DESIGN.md §6).
46///
47/// Only channels the caller is allowed to see are ever listed, so no private name leaks to a
48/// non-member; `member` marks the ones the caller already belongs to.
49#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
50pub struct ChannelInfo {
51 /// The channel name.
52 pub name: String,
53 /// The visibility tier.
54 pub visibility: Visibility,
55 /// Whether the requesting user is already a member.
56 pub member: bool,
57}
58
59/// An enrolled machine as surfaced by [`ProtocolMessage::MachineList`] (`machine list`, §5.1).
60#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
61pub struct MachineInfo {
62 /// The machine name (unique within the user).
63 pub name: String,
64 /// The machine's public key, base64-encoded.
65 pub pubkey: String,
66 /// RFC 3339 enrollment timestamp.
67 pub added_at: String,
68}
69
70/// An outstanding invite as surfaced by the channel-admin audit ([`ProtocolMessage::InviteList`]).
71#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
72pub struct InviteInfo {
73 /// The opaque token string.
74 pub token: String,
75 /// Remaining redemptions, or unlimited if absent.
76 pub uses_remaining: Option<i64>,
77 /// RFC 3339 expiry, or non-expiring if absent.
78 pub expires_at: Option<String>,
79}
80
81/// An admin / moderation operation (DESIGN.md §7), authorized server-side by user role.
82#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
83pub enum AdminOp {
84 /// Create a channel with a visibility tier.
85 CreateChannel {
86 /// Channel name.
87 name: String,
88 /// Visibility tier.
89 visibility: Visibility,
90 },
91 /// Delete a channel.
92 DeleteChannel {
93 /// Channel name.
94 name: String,
95 },
96 /// Rename a channel.
97 RenameChannel {
98 /// Current name.
99 name: String,
100 /// New name.
101 new_name: String,
102 },
103 /// Change a channel's visibility tier.
104 SetVisibility {
105 /// Channel name.
106 name: String,
107 /// New visibility tier.
108 visibility: Visibility,
109 },
110 /// Add a user to a channel's access-control list.
111 AclAdd {
112 /// Channel name.
113 channel: String,
114 /// Username to add.
115 user: String,
116 },
117 /// Remove a user from a channel's access-control list.
118 AclRemove {
119 /// Channel name.
120 channel: String,
121 /// Username to remove.
122 user: String,
123 },
124 /// Create an invite token for a channel.
125 InviteCreate {
126 /// Channel name.
127 channel: String,
128 /// Maximum redemptions, or unlimited if absent.
129 uses: Option<u32>,
130 /// Lifetime in seconds, or non-expiring if absent.
131 expires_in_secs: Option<u64>,
132 },
133 /// Revoke an invite token.
134 InviteRevoke {
135 /// The token to revoke.
136 token: String,
137 },
138 /// Kick a live session or user from a channel.
139 Kick {
140 /// Channel name.
141 channel: String,
142 /// Session path or username to kick.
143 target: String,
144 },
145 /// Ban a user from a channel.
146 Ban {
147 /// Channel name.
148 channel: String,
149 /// Username to ban.
150 user: String,
151 },
152 /// Remove a user from the server (server-admin).
153 UserRemove {
154 /// Username to remove.
155 username: String,
156 },
157 /// Revoke an enrolled machine (server-admin / self), force-dropping its live sessions.
158 MachineRemove {
159 /// Machine name to revoke.
160 name: String,
161 },
162 /// Enroll a new machine key under the authenticated user (self-service, DESIGN.md §5.1).
163 ///
164 /// Appended after `MachineRemove` so existing variant indices are unchanged (forward-compat).
165 MachineAdd {
166 /// Unique-within-user name for the new machine.
167 name: String,
168 /// The new machine's Ed25519 public key (proves possession on its own first connect).
169 pubkey: Vec<u8>,
170 },
171 /// List a channel's ACL members (channel-admin; answered with a `UserList`).
172 AclList {
173 /// Channel name.
174 channel: String,
175 },
176 /// Lift a channel ban (channel-admin; does not grant ACL membership).
177 Unban {
178 /// Channel name.
179 channel: String,
180 /// Username to unban.
181 user: String,
182 },
183 /// List a channel's banned users (channel-admin; answered with a `UserList`).
184 BanList {
185 /// Channel name.
186 channel: String,
187 },
188 /// List a channel's outstanding invites (channel-admin; answered with an `InviteList`).
189 InviteList {
190 /// Channel name.
191 channel: String,
192 },
193}
194
195/// The versioned frame exchanged between a bridge and a central server.
196///
197/// Variants are append-only across protocol versions: later milestones may add variants but must
198/// not renumber or repurpose existing ones without a version bump (see [`negotiate_version`]).
199#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
200pub enum ProtocolMessage {
201 /// Client → server on connect: advertise the protocol version and the session handle.
202 Hello {
203 /// The client's protocol version.
204 protocol_version: u32,
205 /// The session handle (`--as`, defaulting to the repo/dir name).
206 session: String,
207 },
208 /// Server → client: a random nonce for the client to sign (challenge-response).
209 Challenge {
210 /// The random nonce.
211 nonce: Vec<u8>,
212 },
213 /// Client → server: the machine public key and its signature over the nonce.
214 Auth {
215 /// The machine's Ed25519 public key.
216 pubkey: Vec<u8>,
217 /// The signature over the server's nonce.
218 signature: Vec<u8>,
219 },
220 /// Server → client: authentication succeeded; the resolved full participant path.
221 Established {
222 /// The resolved `user/machine/session` path.
223 path: SessionPath,
224 },
225 /// Client → server: claim a username and enroll this machine as its first key.
226 Register {
227 /// Username to claim.
228 username: String,
229 /// Machine name for this key.
230 machine: String,
231 /// The machine's Ed25519 public key.
232 pubkey: Vec<u8>,
233 },
234 /// Client → server: join a channel, optionally redeeming an invite token.
235 Join {
236 /// Channel name.
237 channel: String,
238 /// Invite token, if required.
239 token: Option<String>,
240 },
241 /// Client → server: leave a channel.
242 Leave {
243 /// Channel name.
244 channel: String,
245 },
246 /// Client → server: request presence, optionally scoped to one channel.
247 Who {
248 /// Channel to scope to, or all subscribed channels if absent.
249 channel: Option<String>,
250 },
251 /// Client → server: an admin / moderation operation.
252 Admin(AdminOp),
253 /// A message addressed to all sessions subscribed to a channel.
254 ChannelMsg {
255 /// Channel name.
256 channel: String,
257 /// The sender's full participant path.
258 from: SessionPath,
259 /// The message body.
260 payload: Payload,
261 },
262 /// A direct message to exactly one session path.
263 Whisper {
264 /// The sender's full participant path.
265 from: SessionPath,
266 /// The single recipient's full participant path.
267 target: SessionPath,
268 /// The message body.
269 payload: Payload,
270 },
271 /// Server → client: presence enumerated as full session paths.
272 Presence {
273 /// Channel the presence is scoped to, or server-wide if absent.
274 channel: Option<String>,
275 /// The present sessions.
276 sessions: Vec<SessionPath>,
277 },
278 /// A typed error surfaced to the peer that triggered it.
279 Error(ProtocolError),
280 // ---------------------------------------------------------------------
281 // M2 additions — appended after `Error` so every existing variant keeps
282 // its wire index (the append-only, forward-compat discipline of §13).
283 // ---------------------------------------------------------------------
284 /// Client → server: request the channels visible to the authenticated user (discovery).
285 ListChannels,
286 /// Server → client: the discovery result, already visibility-gated.
287 ChannelList {
288 /// The channels the caller may see.
289 channels: Vec<ChannelInfo>,
290 },
291 /// Server → client: a [`ProtocolMessage::Join`] succeeded; the session is now subscribed.
292 Joined {
293 /// The channel that was joined.
294 channel: String,
295 },
296 /// Server → client: a control / admin operation succeeded, with an optional human detail.
297 Ack {
298 /// A short human-readable detail (e.g. the affected name), if any.
299 detail: Option<String>,
300 },
301 /// Server → client: the token minted by an [`AdminOp::InviteCreate`].
302 InviteToken {
303 /// The opaque invite token.
304 token: String,
305 },
306 /// Client → server: liveness keepalive; refreshes presence and draws a [`ProtocolMessage::Pong`]
307 /// (the application-level realization of the §10 heartbeat, uniform across transports).
308 Ping,
309 /// Server → client: keepalive acknowledgement.
310 Pong,
311 // ---------------------------------------------------------------------
312 // M4 additions — appended (forward-compat): machine / user listing and the
313 // post-auth server-role signal that gates the bridge's admin tools.
314 // ---------------------------------------------------------------------
315 /// Server → client, immediately after [`ProtocolMessage::Established`]: the authenticated user's
316 /// server-wide role, so the bridge can gate its admin tools (DESIGN.md §7).
317 ServerInfo {
318 /// Whether the user is a server admin (on the serve-config allowlist).
319 admin: bool,
320 },
321 /// Client → server: list the machines enrolled under the authenticated user.
322 ListMachines,
323 /// Server → client: the caller's enrolled machines.
324 MachineList {
325 /// The machines under the caller's account.
326 machines: Vec<MachineInfo>,
327 },
328 /// Client → server: list the server's users (server-admin only).
329 ListUsers,
330 /// Server → client: the registered usernames (server-admin only).
331 UserList {
332 /// The registered usernames.
333 users: Vec<String>,
334 },
335 // ---------------------------------------------------------------------
336 // M10 additions — appended (forward-compat): operator-visibility listings.
337 // ---------------------------------------------------------------------
338 /// Server → client: a channel's outstanding invites (channel-admin audit, PRD-0011).
339 InviteList {
340 /// The outstanding invites.
341 invites: Vec<InviteInfo>,
342 },
343 // ---------------------------------------------------------------------
344 // M12 additions — appended (forward-compat): retained history (PRD-0013).
345 // ---------------------------------------------------------------------
346 /// Client → server: read the channel's retained history strictly after `since_ms`. The caller
347 /// must be subscribed; refusal is visibility-uniform with posting (PRD-0013 T-002).
348 ReadSince {
349 /// The channel to read.
350 channel: String,
351 /// The exclusive watermark, server-stamped epoch milliseconds (`0` = everything retained).
352 since_ms: i64,
353 },
354 /// Server → client: one page of retained history (oldest-first, capped; re-ask with the last
355 /// row's `ts_ms` to page).
356 History {
357 /// The channel the page belongs to.
358 channel: String,
359 /// The messages, oldest-first.
360 messages: Vec<HistoryMessage>,
361 },
362}
363
364/// One retained channel message as surfaced by [`ProtocolMessage::History`] (PRD-0013).
365#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
366pub struct HistoryMessage {
367 /// The sender's full session path (server-stamped at post time).
368 pub from: SessionPath,
369 /// Server-stamped receive time, epoch milliseconds — the read-since watermark unit.
370 pub ts_ms: i64,
371 /// The message envelope, verbatim as it crossed the wire.
372 pub payload: Payload,
373}
374
375/// Errors that cross the wire as a [`ProtocolMessage::Error`] frame and are matched on by the
376/// server and bridge (DESIGN.md §16). Application glue elsewhere uses `anyhow` via `Res` / `Void`.
377// The public name `ProtocolError` is fixed by DESIGN.md §13 / §22; `module_name_repetitions` is a
378// false positive against that mandated vocabulary (the sibling ratrod uses the same name).
379#[allow(clippy::module_name_repetitions)]
380#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
381pub enum ProtocolError {
382 /// The peer's protocol version is incompatible with ours.
383 #[error("incompatible protocol version: ours={ours}, theirs={theirs}")]
384 VersionMismatch {
385 /// This build's protocol version.
386 ours: u32,
387 /// The peer's advertised version.
388 theirs: u32,
389 },
390 /// A frame could not be decoded, or violated the schema.
391 #[error("malformed frame: {0}")]
392 MalformedFrame(String),
393 /// The operation was denied (authentication or authorization).
394 #[error("unauthorized: {0}")]
395 Unauthorized(String),
396 /// The named channel, session, or target does not exist / is not visible.
397 #[error("not found: {0}")]
398 NotFound(String),
399 /// An unexpected server-side error.
400 #[error("internal error: {0}")]
401 Internal(String),
402}
403
404/// Returns this build's protocol version if `theirs` is compatible, else a [`ProtocolError::VersionMismatch`].
405///
406/// v1 speaks exactly one version, so compatibility is equality; a later minor-compatible range
407/// widens this without changing the call sites.
408///
409/// # Errors
410///
411/// Returns [`ProtocolError::VersionMismatch`] when the peer's version is not compatible.
412pub fn negotiate_version(theirs: u32) -> Result<u32, ProtocolError> {
413 if theirs == Constant::PROTOCOL_VERSION {
414 Ok(Constant::PROTOCOL_VERSION)
415 } else {
416 Err(ProtocolError::VersionMismatch { ours: Constant::PROTOCOL_VERSION, theirs })
417 }
418}
419
420/// Encodes a frame to its wire bytes with the fixed codec configuration.
421///
422/// # Errors
423///
424/// Returns an error if the frame cannot be serialized.
425pub fn encode(message: &ProtocolMessage) -> Res<Vec<u8>> {
426 bincode::serde::encode_to_vec(message, bincode::config::standard()).context("failed to encode protocol frame")
427}
428
429/// Decodes a frame from its wire bytes with the fixed codec configuration.
430///
431/// # Errors
432///
433/// Returns an error if the bytes are not a valid encoded frame.
434pub fn decode(bytes: &[u8]) -> Res<ProtocolMessage> {
435 let (message, _) = bincode::serde::decode_from_slice(bytes, bincode::config::standard()).context("failed to decode protocol frame")?;
436 Ok(message)
437}
438
439/// Encodes a bare [`Payload`] envelope for at-rest retention (PRD-0013) — the same codec as the
440/// wire, so stored history is byte-identical to what crossed it (E2E ciphertext included).
441///
442/// # Errors
443///
444/// Returns an error if encoding fails.
445pub fn encode_payload(payload: &Payload) -> Res<Vec<u8>> {
446 bincode::serde::encode_to_vec(payload, bincode::config::standard()).context("failed to encode payload envelope")
447}
448
449/// Decodes a retained [`Payload`] envelope (the inverse of [`encode_payload`]).
450///
451/// # Errors
452///
453/// Returns an error if the bytes are not a valid encoded envelope.
454pub fn decode_payload(bytes: &[u8]) -> Res<Payload> {
455 let (payload, _) = bincode::serde::decode_from_slice(bytes, bincode::config::standard()).context("failed to decode payload envelope")?;
456 Ok(payload)
457}
458
459/// Length-delimited sending of protocol frames over any async writer.
460pub trait ProtocolWrite: AsyncWrite + Unpin {
461 /// Encodes `message` and writes it as a `u32`-length-prefixed frame, then flushes.
462 ///
463 /// # Errors
464 ///
465 /// Returns an error if the frame cannot be encoded, exceeds `u32` in length, or the write fails.
466 fn send_message(&mut self, message: &ProtocolMessage) -> impl Future<Output = Res<()>> {
467 async move {
468 let body = encode(message)?;
469 let len = u32::try_from(body.len()).context("protocol frame exceeds u32 length")?;
470 self.write_all(&len.to_be_bytes()).await?;
471 self.write_all(&body).await?;
472 self.flush().await?;
473 Ok(())
474 }
475 }
476}
477
478impl<T: AsyncWrite + Unpin + ?Sized> ProtocolWrite for T {}
479
480/// Length-delimited receiving of protocol frames over any async reader.
481pub trait ProtocolRead: AsyncRead + Unpin {
482 /// Reads one `u32`-length-prefixed frame and decodes it.
483 ///
484 /// # Errors
485 ///
486 /// Returns an error on EOF / read failure, a length prefix beyond [`Constant::MAX_FRAME_SIZE`],
487 /// or a body that does not decode.
488 fn recv_message(&mut self) -> impl Future<Output = Res<ProtocolMessage>> {
489 async move {
490 let mut len_buf = [0_u8; 4];
491 self.read_exact(&mut len_buf).await?;
492 let len = usize::try_from(u32::from_be_bytes(len_buf)).context("frame length overflow")?;
493
494 anyhow::ensure!(len <= Constant::MAX_FRAME_SIZE, "protocol frame of {len} bytes exceeds the {} byte cap", Constant::MAX_FRAME_SIZE);
495
496 let mut body = vec![0_u8; len];
497 self.read_exact(&mut body).await?;
498 decode(&body)
499 }
500 }
501}
502
503impl<T: AsyncRead + Unpin + ?Sized> ProtocolRead for T {}
504
#[cfg(test)]
mod tests {
    // Tests relax `unwrap_used` (house convention; DESIGN.md §22).
    #![allow(clippy::unwrap_used)]

    use super::*;
    use crate::tests::duplex;
    use pretty_assertions::assert_eq;

    /// Asserts that `message` encodes and then decodes back to an equal value.
    fn assert_round_trips(message: &ProtocolMessage) {
        let bytes = encode(message).unwrap();
        assert_eq!(&decode(&bytes).unwrap(), message);
    }

    /// A fixed session path for frames that need one.
    fn sample_path() -> SessionPath {
        SessionPath::new("aaron", "workstation", "razel")
    }

    #[test]
    fn hello_round_trips_with_version_field() {
        assert_round_trips(&ProtocolMessage::Hello {
            protocol_version: Constant::PROTOCOL_VERSION,
            session: "razel".to_owned(),
        });
    }

    #[test]
    fn channel_message_round_trips_plaintext() {
        assert_round_trips(&ProtocolMessage::ChannelMsg {
            channel: "ops".to_owned(),
            from: SessionPath::new("aaron", "workstation", "razel"),
            payload: Payload::Plain("hello, agents".to_owned()),
        });
    }

    #[test]
    fn data_frame_round_trips_the_reserved_e2e_envelope() {
        assert_round_trips(&ProtocolMessage::Whisper {
            from: SessionPath::new("aaron", "workstation", "razel"),
            target: SessionPath::new("david", "desktop", "main"),
            payload: Payload::Encrypted(Envelope {
                ciphertext: vec![0xDE, 0xAD, 0xBE, 0xEF],
                key_id: Some("channel-key-1".to_owned()),
            }),
        });
    }

    #[test]
    fn admin_op_round_trips() {
        assert_round_trips(&ProtocolMessage::Admin(AdminOp::CreateChannel {
            name: "ops".to_owned(),
            visibility: Visibility::Private,
        }));
    }

    #[test]
    fn machine_add_admin_op_round_trips() {
        assert_round_trips(&ProtocolMessage::Admin(AdminOp::MachineAdd {
            name: "sno-box".to_owned(),
            pubkey: vec![1, 2, 3, 4],
        }));
    }

    #[test]
    fn acl_list_admin_op_round_trips() {
        assert_round_trips(&ProtocolMessage::Admin(AdminOp::AclList { channel: "ops".to_owned() }));
    }

    #[test]
    fn invite_list_round_trips_op_and_response() {
        assert_round_trips(&ProtocolMessage::Admin(AdminOp::InviteList { channel: "ops".to_owned() }));
        assert_round_trips(&ProtocolMessage::InviteList {
            invites: vec![InviteInfo {
                token: "tok".to_owned(),
                uses_remaining: Some(3),
                expires_at: None,
            }],
        });
    }

    #[test]
    fn history_frames_round_trip() {
        assert_round_trips(&ProtocolMessage::ReadSince {
            channel: "ops".to_owned(),
            since_ms: 1_751_500_000_000,
        });
        assert_round_trips(&ProtocolMessage::History {
            channel: "ops".to_owned(),
            messages: vec![HistoryMessage {
                from: SessionPath::new("aaron", "workstation", "razel"),
                ts_ms: 1_751_500_000_123,
                payload: Payload::Plain("hello".to_owned()),
            }],
        });
    }

    #[test]
    fn payload_envelope_round_trips_for_retention() {
        let payload = Payload::Plain("retained".to_owned());
        let bytes = encode_payload(&payload).unwrap();
        assert_eq!(decode_payload(&bytes).unwrap(), payload);
    }

    #[test]
    fn ban_visibility_admin_ops_round_trip() {
        assert_round_trips(&ProtocolMessage::Admin(AdminOp::Unban {
            channel: "ops".to_owned(),
            user: "bob".to_owned(),
        }));
        assert_round_trips(&ProtocolMessage::Admin(AdminOp::BanList { channel: "ops".to_owned() }));
    }

    #[test]
    fn m2_response_frames_round_trip() {
        assert_round_trips(&ProtocolMessage::ListChannels);
        assert_round_trips(&ProtocolMessage::ChannelList {
            channels: vec![ChannelInfo {
                name: "ops".to_owned(),
                visibility: Visibility::Private,
                member: true,
            }],
        });
        assert_round_trips(&ProtocolMessage::Joined { channel: "ops".to_owned() });
        assert_round_trips(&ProtocolMessage::Ack { detail: Some("ops".to_owned()) });
        assert_round_trips(&ProtocolMessage::InviteToken { token: "tok-abc".to_owned() });
        assert_round_trips(&ProtocolMessage::Ping);
        assert_round_trips(&ProtocolMessage::Pong);
    }

    #[test]
    fn m4_frames_round_trip() {
        assert_round_trips(&ProtocolMessage::ServerInfo { admin: true });
        assert_round_trips(&ProtocolMessage::ListMachines);
        assert_round_trips(&ProtocolMessage::MachineList {
            machines: vec![MachineInfo {
                name: "workstation".to_owned(),
                pubkey: "PUBKEY".to_owned(),
                added_at: "2026-07-02T00:00:00Z".to_owned(),
            }],
        });
        assert_round_trips(&ProtocolMessage::ListUsers);
        assert_round_trips(&ProtocolMessage::UserList {
            users: vec!["aaron".to_owned(), "david".to_owned()],
        });
    }

    #[test]
    fn appending_variants_preserves_existing_wire_indices() {
        // The forward-compat guarantee (§13): an old variant's encoding must be byte-identical
        // after new variants are appended. Checking only `Hello` (index 0) cannot catch an
        // insertion, removal or reorder further down the enum, so every variant's leading
        // discriminant byte is pinned here. A newly appended variant gets the next index and a
        // new row; no existing row may ever change.
        let ops = || "ops".to_owned();
        let pinned = [
            (
                ProtocolMessage::Hello {
                    protocol_version: Constant::PROTOCOL_VERSION,
                    session: "razel".to_owned(),
                },
                0_u8,
            ),
            (ProtocolMessage::Challenge { nonce: Vec::new() }, 1),
            (
                ProtocolMessage::Auth {
                    pubkey: Vec::new(),
                    signature: Vec::new(),
                },
                2,
            ),
            (ProtocolMessage::Established { path: sample_path() }, 3),
            (
                ProtocolMessage::Register {
                    username: "aaron".to_owned(),
                    machine: "workstation".to_owned(),
                    pubkey: Vec::new(),
                },
                4,
            ),
            (ProtocolMessage::Join { channel: ops(), token: None }, 5),
            (ProtocolMessage::Leave { channel: ops() }, 6),
            (ProtocolMessage::Who { channel: None }, 7),
            (ProtocolMessage::Admin(AdminOp::DeleteChannel { name: ops() }), 8),
            (
                ProtocolMessage::ChannelMsg {
                    channel: ops(),
                    from: sample_path(),
                    payload: Payload::Plain(String::new()),
                },
                9,
            ),
            (
                ProtocolMessage::Whisper {
                    from: sample_path(),
                    target: sample_path(),
                    payload: Payload::Plain(String::new()),
                },
                10,
            ),
            (
                ProtocolMessage::Presence {
                    channel: None,
                    sessions: Vec::new(),
                },
                11,
            ),
            (ProtocolMessage::Error(ProtocolError::NotFound(ops())), 12),
            (ProtocolMessage::ListChannels, 13),
            (ProtocolMessage::ChannelList { channels: Vec::new() }, 14),
            (ProtocolMessage::Joined { channel: ops() }, 15),
            (ProtocolMessage::Ack { detail: None }, 16),
            (ProtocolMessage::InviteToken { token: "tok".to_owned() }, 17),
            (ProtocolMessage::Ping, 18),
            (ProtocolMessage::Pong, 19),
            (ProtocolMessage::ServerInfo { admin: false }, 20),
            (ProtocolMessage::ListMachines, 21),
            (ProtocolMessage::MachineList { machines: Vec::new() }, 22),
            (ProtocolMessage::ListUsers, 23),
            (ProtocolMessage::UserList { users: Vec::new() }, 24),
            (ProtocolMessage::InviteList { invites: Vec::new() }, 25),
            (ProtocolMessage::ReadSince { channel: ops(), since_ms: 0 }, 26),
            (
                ProtocolMessage::History {
                    channel: ops(),
                    messages: Vec::new(),
                },
                27,
            ),
        ];

        for (message, index) in &pinned {
            let bytes = encode(message).unwrap();
            assert_eq!(bytes[0], *index, "the wire index of {message:?} must not change");
        }
    }

    #[test]
    fn nested_enum_wire_indices_are_pinned() {
        // The enums nested inside frames are append-only too: `AdminOp` rides in `Admin`
        // (index 8) and `ProtocolError` in `Error` (index 12), each as the second wire byte.
        let ops = || "ops".to_owned();
        let bob = || "bob".to_owned();

        let admin_ops = [
            (
                AdminOp::CreateChannel {
                    name: ops(),
                    visibility: Visibility::Private,
                },
                0_u8,
            ),
            (AdminOp::DeleteChannel { name: ops() }, 1),
            (
                AdminOp::RenameChannel {
                    name: ops(),
                    new_name: "ops2".to_owned(),
                },
                2,
            ),
            (
                AdminOp::SetVisibility {
                    name: ops(),
                    visibility: Visibility::Private,
                },
                3,
            ),
            (AdminOp::AclAdd { channel: ops(), user: bob() }, 4),
            (AdminOp::AclRemove { channel: ops(), user: bob() }, 5),
            (
                AdminOp::InviteCreate {
                    channel: ops(),
                    uses: None,
                    expires_in_secs: None,
                },
                6,
            ),
            (AdminOp::InviteRevoke { token: "tok".to_owned() }, 7),
            (AdminOp::Kick { channel: ops(), target: bob() }, 8),
            (AdminOp::Ban { channel: ops(), user: bob() }, 9),
            (AdminOp::UserRemove { username: bob() }, 10),
            (AdminOp::MachineRemove { name: "sno-box".to_owned() }, 11),
            (
                AdminOp::MachineAdd {
                    name: "sno-box".to_owned(),
                    pubkey: Vec::new(),
                },
                12,
            ),
            (AdminOp::AclList { channel: ops() }, 13),
            (AdminOp::Unban { channel: ops(), user: bob() }, 14),
            (AdminOp::BanList { channel: ops() }, 15),
            (AdminOp::InviteList { channel: ops() }, 16),
        ];
        for (op, index) in admin_ops {
            let label = format!("{op:?}");
            let bytes = encode(&ProtocolMessage::Admin(op)).unwrap();
            assert_eq!((bytes[0], bytes[1]), (8, index), "the wire index of {label} must not change");
        }

        let errors = [
            (ProtocolError::VersionMismatch { ours: 1, theirs: 2 }, 0_u8),
            (ProtocolError::MalformedFrame(String::new()), 1),
            (ProtocolError::Unauthorized(String::new()), 2),
            (ProtocolError::NotFound(String::new()), 3),
            (ProtocolError::Internal(String::new()), 4),
        ];
        for (error, index) in errors {
            let label = format!("{error:?}");
            let bytes = encode(&ProtocolMessage::Error(error)).unwrap();
            assert_eq!((bytes[0], bytes[1]), (12, index), "the wire index of {label} must not change");
        }
    }

    #[test]
    fn payload_wire_indices_are_pinned() {
        // Payloads are also stored at rest (`encode_payload`), so their discriminants must stay
        // stable for previously retained history to keep decoding.
        let plain = encode_payload(&Payload::Plain(String::new())).unwrap();
        assert_eq!(plain[0], 0, "`Payload::Plain` must keep discriminant 0");

        let encrypted = encode_payload(&Payload::Encrypted(Envelope {
            ciphertext: Vec::new(),
            key_id: None,
        }))
        .unwrap();
        assert_eq!(encrypted[0], 1, "`Payload::Encrypted` must keep discriminant 1");
    }

    #[test]
    fn error_frame_round_trips() {
        assert_round_trips(&ProtocolMessage::Error(ProtocolError::VersionMismatch { ours: 1, theirs: 2 }));
    }

    #[tokio::test]
    async fn frames_stream_over_an_async_duplex() {
        let (mut a, mut b) = duplex();
        let sent = ProtocolMessage::Presence {
            channel: Some("ops".to_owned()),
            sessions: vec![SessionPath::new("aaron", "workstation", "razel"), SessionPath::new("david", "desktop", "main")],
        };

        a.send_message(&sent).await.unwrap();
        let got = b.recv_message().await.unwrap();

        assert_eq!(got, sent);
    }

    #[test]
    fn version_negotiation_accepts_matching_and_rejects_mismatch() {
        assert_eq!(negotiate_version(Constant::PROTOCOL_VERSION).unwrap(), Constant::PROTOCOL_VERSION);
        assert_eq!(
            negotiate_version(999),
            Err(ProtocolError::VersionMismatch {
                ours: Constant::PROTOCOL_VERSION,
                theirs: 999,
            })
        );
    }

    #[tokio::test]
    async fn recv_rejects_a_frame_larger_than_the_cap() {
        // A length prefix beyond the cap is rejected before the body is allocated.
        let oversized = u32::try_from(Constant::MAX_FRAME_SIZE + 1).unwrap();
        let framed = oversized.to_be_bytes();
        let mut reader = framed.as_slice();

        assert!(reader.recv_message().await.is_err());
    }

    #[tokio::test]
    async fn recv_rejects_a_truncated_frame() {
        // The prefix promises 8 body bytes, but the stream ends after 3.
        let mut framed = 8_u32.to_be_bytes().to_vec();
        framed.extend_from_slice(&[1, 2, 3]);
        let mut reader = framed.as_slice();

        assert!(reader.recv_message().await.is_err());
    }
}