Skip to main content

conclavelib/
protocol.rs

1//! Wire frames shared between the bridge and the central server.
2//!
3//! Both peers serialize [`ProtocolMessage`] with a fixed `bincode` configuration behind a
4//! length-delimited framing (the [`ProtocolWrite`] / [`ProtocolRead`] stream extensions). Two
5//! properties are fixed here for forward-compat (DESIGN.md §13), so later additions are additive
6//! rather than breaking:
7//!
8//! - a **protocol-version field** carried in [`ProtocolMessage::Hello`] and checked with
9//!   [`negotiate_version`]; peers advertising an incompatible version are rejected,
10//! - an opaque **encrypted-payload envelope + key-id** ([`Payload::Encrypted`]), so end-to-end
11//!   encryption (DESIGN.md §19) can be layered in without a wire break.
12//!
13//! `ProtocolError` is the typed, wire-crossing error surfaced as a [`ProtocolMessage::Error`]
14//! frame; application glue elsewhere uses `anyhow` via the `Res` / `Void` aliases.
15
16use std::future::Future;
17
18use anyhow::Context as _;
19use serde::{Deserialize, Serialize};
20use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
21
22use crate::base::{Constant, Res, SessionPath, Visibility};
23
24/// An opaque end-to-end-encrypted payload envelope, reserved now for v2 (DESIGN.md §13, §19).
25///
26/// The server fans this out without reading it; `key_id` names the per-channel key the sender
27/// wrapped the content with. Unused in v1 — reserving it keeps E2E an additive change.
28#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
29pub struct Envelope {
30    /// Ciphertext the server relays but cannot read.
31    pub ciphertext: Vec<u8>,
32    /// Identifier of the per-channel key this ciphertext was wrapped with.
33    pub key_id: Option<String>,
34}
35
36/// A message body: plaintext in v1, or the reserved E2E [`Envelope`] in v2.
37#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
38pub enum Payload {
39    /// A plaintext UTF-8 body (v1).
40    Plain(String),
41    /// An opaque end-to-end-encrypted body (v2; the envelope is reserved now).
42    Encrypted(Envelope),
43}
44
45/// A channel as surfaced by discovery ([`ProtocolMessage::ChannelList`], DESIGN.md §6).
46///
47/// Only channels the caller is allowed to see are ever listed, so no private name leaks to a
48/// non-member; `member` marks the ones the caller already belongs to.
49#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
50pub struct ChannelInfo {
51    /// The channel name.
52    pub name: String,
53    /// The visibility tier.
54    pub visibility: Visibility,
55    /// Whether the requesting user is already a member.
56    pub member: bool,
57}
58
59/// An enrolled machine as surfaced by [`ProtocolMessage::MachineList`] (`machine list`, §5.1).
60#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
61pub struct MachineInfo {
62    /// The machine name (unique within the user).
63    pub name: String,
64    /// The machine's public key, base64-encoded.
65    pub pubkey: String,
66    /// RFC 3339 enrollment timestamp.
67    pub added_at: String,
68}
69
70/// An admin / moderation operation (DESIGN.md §7), authorized server-side by user role.
71#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
72pub enum AdminOp {
73    /// Create a channel with a visibility tier.
74    CreateChannel {
75        /// Channel name.
76        name: String,
77        /// Visibility tier.
78        visibility: Visibility,
79    },
80    /// Delete a channel.
81    DeleteChannel {
82        /// Channel name.
83        name: String,
84    },
85    /// Rename a channel.
86    RenameChannel {
87        /// Current name.
88        name: String,
89        /// New name.
90        new_name: String,
91    },
92    /// Change a channel's visibility tier.
93    SetVisibility {
94        /// Channel name.
95        name: String,
96        /// New visibility tier.
97        visibility: Visibility,
98    },
99    /// Add a user to a channel's access-control list.
100    AclAdd {
101        /// Channel name.
102        channel: String,
103        /// Username to add.
104        user: String,
105    },
106    /// Remove a user from a channel's access-control list.
107    AclRemove {
108        /// Channel name.
109        channel: String,
110        /// Username to remove.
111        user: String,
112    },
113    /// Create an invite token for a channel.
114    InviteCreate {
115        /// Channel name.
116        channel: String,
117        /// Maximum redemptions, or unlimited if absent.
118        uses: Option<u32>,
119        /// Lifetime in seconds, or non-expiring if absent.
120        expires_in_secs: Option<u64>,
121    },
122    /// Revoke an invite token.
123    InviteRevoke {
124        /// The token to revoke.
125        token: String,
126    },
127    /// Kick a live session or user from a channel.
128    Kick {
129        /// Channel name.
130        channel: String,
131        /// Session path or username to kick.
132        target: String,
133    },
134    /// Ban a user from a channel.
135    Ban {
136        /// Channel name.
137        channel: String,
138        /// Username to ban.
139        user: String,
140    },
141    /// Remove a user from the server (server-admin).
142    UserRemove {
143        /// Username to remove.
144        username: String,
145    },
146    /// Revoke an enrolled machine (server-admin / self), force-dropping its live sessions.
147    MachineRemove {
148        /// Machine name to revoke.
149        name: String,
150    },
151    /// Enroll a new machine key under the authenticated user (self-service, DESIGN.md §5.1).
152    ///
153    /// Appended after `MachineRemove` so existing variant indices are unchanged (forward-compat).
154    MachineAdd {
155        /// Unique-within-user name for the new machine.
156        name: String,
157        /// The new machine's Ed25519 public key (proves possession on its own first connect).
158        pubkey: Vec<u8>,
159    },
160}
161
162/// The versioned frame exchanged between a bridge and a central server.
163///
164/// Variants are append-only across protocol versions: later milestones may add variants but must
165/// not renumber or repurpose existing ones without a version bump (see [`negotiate_version`]).
166#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
167pub enum ProtocolMessage {
168    /// Client → server on connect: advertise the protocol version and the session handle.
169    Hello {
170        /// The client's protocol version.
171        protocol_version: u32,
172        /// The session handle (`--as`, defaulting to the repo/dir name).
173        session: String,
174    },
175    /// Server → client: a random nonce for the client to sign (challenge-response).
176    Challenge {
177        /// The random nonce.
178        nonce: Vec<u8>,
179    },
180    /// Client → server: the machine public key and its signature over the nonce.
181    Auth {
182        /// The machine's Ed25519 public key.
183        pubkey: Vec<u8>,
184        /// The signature over the server's nonce.
185        signature: Vec<u8>,
186    },
187    /// Server → client: authentication succeeded; the resolved full participant path.
188    Established {
189        /// The resolved `user/machine/session` path.
190        path: SessionPath,
191    },
192    /// Client → server: claim a username and enroll this machine as its first key.
193    Register {
194        /// Username to claim.
195        username: String,
196        /// Machine name for this key.
197        machine: String,
198        /// The machine's Ed25519 public key.
199        pubkey: Vec<u8>,
200    },
201    /// Client → server: join a channel, optionally redeeming an invite token.
202    Join {
203        /// Channel name.
204        channel: String,
205        /// Invite token, if required.
206        token: Option<String>,
207    },
208    /// Client → server: leave a channel.
209    Leave {
210        /// Channel name.
211        channel: String,
212    },
213    /// Client → server: request presence, optionally scoped to one channel.
214    Who {
215        /// Channel to scope to, or all subscribed channels if absent.
216        channel: Option<String>,
217    },
218    /// Client → server: an admin / moderation operation.
219    Admin(AdminOp),
220    /// A message addressed to all sessions subscribed to a channel.
221    ChannelMsg {
222        /// Channel name.
223        channel: String,
224        /// The sender's full participant path.
225        from: SessionPath,
226        /// The message body.
227        payload: Payload,
228    },
229    /// A direct message to exactly one session path.
230    Whisper {
231        /// The sender's full participant path.
232        from: SessionPath,
233        /// The single recipient's full participant path.
234        target: SessionPath,
235        /// The message body.
236        payload: Payload,
237    },
238    /// Server → client: presence enumerated as full session paths.
239    Presence {
240        /// Channel the presence is scoped to, or server-wide if absent.
241        channel: Option<String>,
242        /// The present sessions.
243        sessions: Vec<SessionPath>,
244    },
245    /// A typed error surfaced to the peer that triggered it.
246    Error(ProtocolError),
247    // ---------------------------------------------------------------------
248    // M2 additions — appended after `Error` so every existing variant keeps
249    // its wire index (the append-only, forward-compat discipline of §13).
250    // ---------------------------------------------------------------------
251    /// Client → server: request the channels visible to the authenticated user (discovery).
252    ListChannels,
253    /// Server → client: the discovery result, already visibility-gated.
254    ChannelList {
255        /// The channels the caller may see.
256        channels: Vec<ChannelInfo>,
257    },
258    /// Server → client: a [`ProtocolMessage::Join`] succeeded; the session is now subscribed.
259    Joined {
260        /// The channel that was joined.
261        channel: String,
262    },
263    /// Server → client: a control / admin operation succeeded, with an optional human detail.
264    Ack {
265        /// A short human-readable detail (e.g. the affected name), if any.
266        detail: Option<String>,
267    },
268    /// Server → client: the token minted by an [`AdminOp::InviteCreate`].
269    InviteToken {
270        /// The opaque invite token.
271        token: String,
272    },
273    /// Client → server: liveness keepalive; refreshes presence and draws a [`ProtocolMessage::Pong`]
274    /// (the application-level realization of the §10 heartbeat, uniform across transports).
275    Ping,
276    /// Server → client: keepalive acknowledgement.
277    Pong,
278    // ---------------------------------------------------------------------
279    // M4 additions — appended (forward-compat): machine / user listing and the
280    // post-auth server-role signal that gates the bridge's admin tools.
281    // ---------------------------------------------------------------------
282    /// Server → client, immediately after [`ProtocolMessage::Established`]: the authenticated user's
283    /// server-wide role, so the bridge can gate its admin tools (DESIGN.md §7).
284    ServerInfo {
285        /// Whether the user is a server admin (on the serve-config allowlist).
286        admin: bool,
287    },
288    /// Client → server: list the machines enrolled under the authenticated user.
289    ListMachines,
290    /// Server → client: the caller's enrolled machines.
291    MachineList {
292        /// The machines under the caller's account.
293        machines: Vec<MachineInfo>,
294    },
295    /// Client → server: list the server's users (server-admin only).
296    ListUsers,
297    /// Server → client: the registered usernames (server-admin only).
298    UserList {
299        /// The registered usernames.
300        users: Vec<String>,
301    },
302}
303
304/// Errors that cross the wire as a [`ProtocolMessage::Error`] frame and are matched on by the
305/// server and bridge (DESIGN.md §16). Application glue elsewhere uses `anyhow` via `Res` / `Void`.
306// The public name `ProtocolError` is fixed by DESIGN.md §13 / §22; `module_name_repetitions` is a
307// false positive against that mandated vocabulary (the sibling ratrod uses the same name).
308#[allow(clippy::module_name_repetitions)]
309#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
310pub enum ProtocolError {
311    /// The peer's protocol version is incompatible with ours.
312    #[error("incompatible protocol version: ours={ours}, theirs={theirs}")]
313    VersionMismatch {
314        /// This build's protocol version.
315        ours: u32,
316        /// The peer's advertised version.
317        theirs: u32,
318    },
319    /// A frame could not be decoded, or violated the schema.
320    #[error("malformed frame: {0}")]
321    MalformedFrame(String),
322    /// The operation was denied (authentication or authorization).
323    #[error("unauthorized: {0}")]
324    Unauthorized(String),
325    /// The named channel, session, or target does not exist / is not visible.
326    #[error("not found: {0}")]
327    NotFound(String),
328    /// An unexpected server-side error.
329    #[error("internal error: {0}")]
330    Internal(String),
331}
332
333/// Returns this build's protocol version if `theirs` is compatible, else a [`ProtocolError::VersionMismatch`].
334///
335/// v1 speaks exactly one version, so compatibility is equality; a later minor-compatible range
336/// widens this without changing the call sites.
337///
338/// # Errors
339///
340/// Returns [`ProtocolError::VersionMismatch`] when the peer's version is not compatible.
341pub fn negotiate_version(theirs: u32) -> Result<u32, ProtocolError> {
342    if theirs == Constant::PROTOCOL_VERSION {
343        Ok(Constant::PROTOCOL_VERSION)
344    } else {
345        Err(ProtocolError::VersionMismatch { ours: Constant::PROTOCOL_VERSION, theirs })
346    }
347}
348
349/// Encodes a frame to its wire bytes with the fixed codec configuration.
350///
351/// # Errors
352///
353/// Returns an error if the frame cannot be serialized.
354pub fn encode(message: &ProtocolMessage) -> Res<Vec<u8>> {
355    bincode::serde::encode_to_vec(message, bincode::config::standard()).context("failed to encode protocol frame")
356}
357
358/// Decodes a frame from its wire bytes with the fixed codec configuration.
359///
360/// # Errors
361///
362/// Returns an error if the bytes are not a valid encoded frame.
363pub fn decode(bytes: &[u8]) -> Res<ProtocolMessage> {
364    let (message, _) = bincode::serde::decode_from_slice(bytes, bincode::config::standard()).context("failed to decode protocol frame")?;
365    Ok(message)
366}
367
368/// Length-delimited sending of protocol frames over any async writer.
369pub trait ProtocolWrite: AsyncWrite + Unpin {
370    /// Encodes `message` and writes it as a `u32`-length-prefixed frame, then flushes.
371    ///
372    /// # Errors
373    ///
374    /// Returns an error if the frame cannot be encoded, exceeds `u32` in length, or the write fails.
375    fn send_message(&mut self, message: &ProtocolMessage) -> impl Future<Output = Res<()>> {
376        async move {
377            let body = encode(message)?;
378            let len = u32::try_from(body.len()).context("protocol frame exceeds u32 length")?;
379            self.write_all(&len.to_be_bytes()).await?;
380            self.write_all(&body).await?;
381            self.flush().await?;
382            Ok(())
383        }
384    }
385}
386
387impl<T: AsyncWrite + Unpin + ?Sized> ProtocolWrite for T {}
388
389/// Length-delimited receiving of protocol frames over any async reader.
390pub trait ProtocolRead: AsyncRead + Unpin {
391    /// Reads one `u32`-length-prefixed frame and decodes it.
392    ///
393    /// # Errors
394    ///
395    /// Returns an error on EOF / read failure, a length prefix beyond [`Constant::MAX_FRAME_SIZE`],
396    /// or a body that does not decode.
397    fn recv_message(&mut self) -> impl Future<Output = Res<ProtocolMessage>> {
398        async move {
399            let mut len_buf = [0_u8; 4];
400            self.read_exact(&mut len_buf).await?;
401            let len = usize::try_from(u32::from_be_bytes(len_buf)).context("frame length overflow")?;
402
403            anyhow::ensure!(len <= Constant::MAX_FRAME_SIZE, "protocol frame of {len} bytes exceeds the {} byte cap", Constant::MAX_FRAME_SIZE);
404
405            let mut body = vec![0_u8; len];
406            self.read_exact(&mut body).await?;
407            decode(&body)
408        }
409    }
410}
411
412impl<T: AsyncRead + Unpin + ?Sized> ProtocolRead for T {}
413
#[cfg(test)]
mod tests {
    // House convention (DESIGN.md §22): `unwrap_used` is relaxed inside tests.
    #![allow(clippy::unwrap_used)]

    use super::*;
    use crate::tests::duplex;
    use pretty_assertions::assert_eq;

    /// Encodes `message`, decodes the resulting bytes, and asserts the frame survived unchanged.
    fn assert_round_trips(message: &ProtocolMessage) {
        let wire = encode(message).unwrap();
        let decoded = decode(&wire).unwrap();
        assert_eq!(&decoded, message);
    }

    #[test]
    fn hello_round_trips_with_version_field() {
        let hello = ProtocolMessage::Hello {
            protocol_version: Constant::PROTOCOL_VERSION,
            session: "razel".to_owned(),
        };
        assert_round_trips(&hello);
    }

    #[test]
    fn channel_message_round_trips_plaintext() {
        let frame = ProtocolMessage::ChannelMsg {
            channel: "ops".to_owned(),
            from: SessionPath::new("aaron", "workstation", "razel"),
            payload: Payload::Plain("hello, agents".to_owned()),
        };
        assert_round_trips(&frame);
    }

    #[test]
    fn data_frame_round_trips_the_reserved_e2e_envelope() {
        let envelope = Envelope {
            ciphertext: vec![0xDE, 0xAD, 0xBE, 0xEF],
            key_id: Some("channel-key-1".to_owned()),
        };
        let frame = ProtocolMessage::Whisper {
            from: SessionPath::new("aaron", "workstation", "razel"),
            target: SessionPath::new("david", "desktop", "main"),
            payload: Payload::Encrypted(envelope),
        };
        assert_round_trips(&frame);
    }

    #[test]
    fn admin_op_round_trips() {
        let op = AdminOp::CreateChannel {
            name: "ops".to_owned(),
            visibility: Visibility::Private,
        };
        assert_round_trips(&ProtocolMessage::Admin(op));
    }

    #[test]
    fn machine_add_admin_op_round_trips() {
        let op = AdminOp::MachineAdd {
            name: "sno-box".to_owned(),
            pubkey: vec![1, 2, 3, 4],
        };
        assert_round_trips(&ProtocolMessage::Admin(op));
    }

    #[test]
    fn m2_response_frames_round_trip() {
        let frames = [
            ProtocolMessage::ListChannels,
            ProtocolMessage::ChannelList {
                channels: vec![ChannelInfo {
                    name: "ops".to_owned(),
                    visibility: Visibility::Private,
                    member: true,
                }],
            },
            ProtocolMessage::Joined { channel: "ops".to_owned() },
            ProtocolMessage::Ack { detail: Some("ops".to_owned()) },
            ProtocolMessage::InviteToken { token: "tok-abc".to_owned() },
            ProtocolMessage::Ping,
            ProtocolMessage::Pong,
        ];

        for frame in &frames {
            assert_round_trips(frame);
        }
    }

    #[test]
    fn m4_frames_round_trip() {
        let frames = [
            ProtocolMessage::ServerInfo { admin: true },
            ProtocolMessage::ListMachines,
            ProtocolMessage::MachineList {
                machines: vec![MachineInfo {
                    name: "workstation".to_owned(),
                    pubkey: "PUBKEY".to_owned(),
                    added_at: "2026-07-02T00:00:00Z".to_owned(),
                }],
            },
            ProtocolMessage::ListUsers,
            ProtocolMessage::UserList {
                users: vec!["aaron".to_owned(), "david".to_owned()],
            },
        ];

        for frame in &frames {
            assert_round_trips(frame);
        }
    }

    #[test]
    fn appending_variants_preserves_existing_wire_indices() {
        // Forward-compat guarantee (§13): appending new variants must leave the encoding of an
        // old variant byte-identical. `Hello` is declared first (index 0), so its encoding must
        // still begin with a 0 discriminant byte.
        let hello = ProtocolMessage::Hello {
            protocol_version: Constant::PROTOCOL_VERSION,
            session: "razel".to_owned(),
        };
        let wire = encode(&hello).unwrap();
        assert_eq!(wire[0], 0, "the first variant's discriminant must remain 0");
    }

    #[test]
    fn error_frame_round_trips() {
        let error = ProtocolError::VersionMismatch { ours: 1, theirs: 2 };
        assert_round_trips(&ProtocolMessage::Error(error));
    }

    #[tokio::test]
    async fn frames_stream_over_an_async_duplex() {
        let (mut writer, mut reader) = duplex();
        let sessions = vec![
            SessionPath::new("aaron", "workstation", "razel"),
            SessionPath::new("david", "desktop", "main"),
        ];
        let sent = ProtocolMessage::Presence {
            channel: Some("ops".to_owned()),
            sessions,
        };

        writer.send_message(&sent).await.unwrap();
        let received = reader.recv_message().await.unwrap();

        assert_eq!(received, sent);
    }

    #[test]
    fn version_negotiation_accepts_matching_and_rejects_mismatch() {
        let ours = Constant::PROTOCOL_VERSION;
        assert_eq!(negotiate_version(ours).unwrap(), ours);

        let rejected = negotiate_version(999);
        assert_eq!(rejected, Err(ProtocolError::VersionMismatch { ours, theirs: 999 }));
    }

    #[tokio::test]
    async fn recv_rejects_a_frame_larger_than_the_cap() {
        // Only the length prefix is supplied: a declared length beyond the cap must be rejected
        // before any body is allocated or read.
        let declared = u32::try_from(Constant::MAX_FRAME_SIZE + 1).unwrap();
        let prefix = declared.to_be_bytes();
        let mut reader = prefix.as_slice();

        let outcome = reader.recv_message().await;
        assert!(outcome.is_err());
    }
}