conclavelib/protocol.rs
1//! Wire frames shared between the bridge and the central server.
2//!
3//! Both peers serialize [`ProtocolMessage`] with a fixed `bincode` configuration behind a
4//! length-delimited framing (the [`ProtocolWrite`] / [`ProtocolRead`] stream extensions). Two
5//! properties are fixed here for forward-compat (DESIGN.md §13), so later additions are additive
6//! rather than breaking:
7//!
8//! - a **protocol-version field** carried in [`ProtocolMessage::Hello`] and checked with
9//! [`negotiate_version`]; peers advertising an incompatible version are rejected,
10//! - an opaque **encrypted-payload envelope + key-id** ([`Payload::Encrypted`]), so end-to-end
11//! encryption (DESIGN.md §19) can be layered in without a wire break.
12//!
13//! `ProtocolError` is the typed, wire-crossing error surfaced as a [`ProtocolMessage::Error`]
14//! frame; application glue elsewhere uses `anyhow` via the `Res` / `Void` aliases.
15
16use std::future::Future;
17
18use anyhow::Context as _;
19use serde::{Deserialize, Serialize};
20use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
21
22use crate::base::{Constant, Res, SessionPath, Visibility};
23
24/// An opaque end-to-end-encrypted payload envelope, reserved now for v2 (DESIGN.md §13, §19).
25///
26/// The server fans this out without reading it; `key_id` names the per-channel key the sender
27/// wrapped the content with. Unused in v1 — reserving it keeps E2E an additive change.
28#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
29pub struct Envelope {
30 /// Ciphertext the server relays but cannot read.
31 pub ciphertext: Vec<u8>,
32 /// Identifier of the per-channel key this ciphertext was wrapped with.
33 pub key_id: Option<String>,
34}
35
36/// A message body: plaintext in v1, or the reserved E2E [`Envelope`] in v2.
37#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
38pub enum Payload {
39 /// A plaintext UTF-8 body (v1).
40 Plain(String),
41 /// An opaque end-to-end-encrypted body (v2; the envelope is reserved now).
42 Encrypted(Envelope),
43}
44
45/// A channel as surfaced by discovery ([`ProtocolMessage::ChannelList`], DESIGN.md §6).
46///
47/// Only channels the caller is allowed to see are ever listed, so no private name leaks to a
48/// non-member; `member` marks the ones the caller already belongs to.
49#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
50pub struct ChannelInfo {
51 /// The channel name.
52 pub name: String,
53 /// The visibility tier.
54 pub visibility: Visibility,
55 /// Whether the requesting user is already a member.
56 pub member: bool,
57}
58
59/// An enrolled machine as surfaced by [`ProtocolMessage::MachineList`] (`machine list`, §5.1).
60#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
61pub struct MachineInfo {
62 /// The machine name (unique within the user).
63 pub name: String,
64 /// The machine's public key, base64-encoded.
65 pub pubkey: String,
66 /// RFC 3339 enrollment timestamp.
67 pub added_at: String,
68}
69
70/// An admin / moderation operation (DESIGN.md §7), authorized server-side by user role.
71#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
72pub enum AdminOp {
73 /// Create a channel with a visibility tier.
74 CreateChannel {
75 /// Channel name.
76 name: String,
77 /// Visibility tier.
78 visibility: Visibility,
79 },
80 /// Delete a channel.
81 DeleteChannel {
82 /// Channel name.
83 name: String,
84 },
85 /// Rename a channel.
86 RenameChannel {
87 /// Current name.
88 name: String,
89 /// New name.
90 new_name: String,
91 },
92 /// Change a channel's visibility tier.
93 SetVisibility {
94 /// Channel name.
95 name: String,
96 /// New visibility tier.
97 visibility: Visibility,
98 },
99 /// Add a user to a channel's access-control list.
100 AclAdd {
101 /// Channel name.
102 channel: String,
103 /// Username to add.
104 user: String,
105 },
106 /// Remove a user from a channel's access-control list.
107 AclRemove {
108 /// Channel name.
109 channel: String,
110 /// Username to remove.
111 user: String,
112 },
113 /// Create an invite token for a channel.
114 InviteCreate {
115 /// Channel name.
116 channel: String,
117 /// Maximum redemptions, or unlimited if absent.
118 uses: Option<u32>,
119 /// Lifetime in seconds, or non-expiring if absent.
120 expires_in_secs: Option<u64>,
121 },
122 /// Revoke an invite token.
123 InviteRevoke {
124 /// The token to revoke.
125 token: String,
126 },
127 /// Kick a live session or user from a channel.
128 Kick {
129 /// Channel name.
130 channel: String,
131 /// Session path or username to kick.
132 target: String,
133 },
134 /// Ban a user from a channel.
135 Ban {
136 /// Channel name.
137 channel: String,
138 /// Username to ban.
139 user: String,
140 },
141 /// Remove a user from the server (server-admin).
142 UserRemove {
143 /// Username to remove.
144 username: String,
145 },
146 /// Revoke an enrolled machine (server-admin / self), force-dropping its live sessions.
147 MachineRemove {
148 /// Machine name to revoke.
149 name: String,
150 },
151 /// Enroll a new machine key under the authenticated user (self-service, DESIGN.md §5.1).
152 ///
153 /// Appended after `MachineRemove` so existing variant indices are unchanged (forward-compat).
154 MachineAdd {
155 /// Unique-within-user name for the new machine.
156 name: String,
157 /// The new machine's Ed25519 public key (proves possession on its own first connect).
158 pubkey: Vec<u8>,
159 },
160}
161
162/// The versioned frame exchanged between a bridge and a central server.
163///
164/// Variants are append-only across protocol versions: later milestones may add variants but must
165/// not renumber or repurpose existing ones without a version bump (see [`negotiate_version`]).
166#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
167pub enum ProtocolMessage {
168 /// Client → server on connect: advertise the protocol version and the session handle.
169 Hello {
170 /// The client's protocol version.
171 protocol_version: u32,
172 /// The session handle (`--as`, defaulting to the repo/dir name).
173 session: String,
174 },
175 /// Server → client: a random nonce for the client to sign (challenge-response).
176 Challenge {
177 /// The random nonce.
178 nonce: Vec<u8>,
179 },
180 /// Client → server: the machine public key and its signature over the nonce.
181 Auth {
182 /// The machine's Ed25519 public key.
183 pubkey: Vec<u8>,
184 /// The signature over the server's nonce.
185 signature: Vec<u8>,
186 },
187 /// Server → client: authentication succeeded; the resolved full participant path.
188 Established {
189 /// The resolved `user/machine/session` path.
190 path: SessionPath,
191 },
192 /// Client → server: claim a username and enroll this machine as its first key.
193 Register {
194 /// Username to claim.
195 username: String,
196 /// Machine name for this key.
197 machine: String,
198 /// The machine's Ed25519 public key.
199 pubkey: Vec<u8>,
200 },
201 /// Client → server: join a channel, optionally redeeming an invite token.
202 Join {
203 /// Channel name.
204 channel: String,
205 /// Invite token, if required.
206 token: Option<String>,
207 },
208 /// Client → server: leave a channel.
209 Leave {
210 /// Channel name.
211 channel: String,
212 },
213 /// Client → server: request presence, optionally scoped to one channel.
214 Who {
215 /// Channel to scope to, or all subscribed channels if absent.
216 channel: Option<String>,
217 },
218 /// Client → server: an admin / moderation operation.
219 Admin(AdminOp),
220 /// A message addressed to all sessions subscribed to a channel.
221 ChannelMsg {
222 /// Channel name.
223 channel: String,
224 /// The sender's full participant path.
225 from: SessionPath,
226 /// The message body.
227 payload: Payload,
228 },
229 /// A direct message to exactly one session path.
230 Whisper {
231 /// The sender's full participant path.
232 from: SessionPath,
233 /// The single recipient's full participant path.
234 target: SessionPath,
235 /// The message body.
236 payload: Payload,
237 },
238 /// Server → client: presence enumerated as full session paths.
239 Presence {
240 /// Channel the presence is scoped to, or server-wide if absent.
241 channel: Option<String>,
242 /// The present sessions.
243 sessions: Vec<SessionPath>,
244 },
245 /// A typed error surfaced to the peer that triggered it.
246 Error(ProtocolError),
247 // ---------------------------------------------------------------------
248 // M2 additions — appended after `Error` so every existing variant keeps
249 // its wire index (the append-only, forward-compat discipline of §13).
250 // ---------------------------------------------------------------------
251 /// Client → server: request the channels visible to the authenticated user (discovery).
252 ListChannels,
253 /// Server → client: the discovery result, already visibility-gated.
254 ChannelList {
255 /// The channels the caller may see.
256 channels: Vec<ChannelInfo>,
257 },
258 /// Server → client: a [`ProtocolMessage::Join`] succeeded; the session is now subscribed.
259 Joined {
260 /// The channel that was joined.
261 channel: String,
262 },
263 /// Server → client: a control / admin operation succeeded, with an optional human detail.
264 Ack {
265 /// A short human-readable detail (e.g. the affected name), if any.
266 detail: Option<String>,
267 },
268 /// Server → client: the token minted by an [`AdminOp::InviteCreate`].
269 InviteToken {
270 /// The opaque invite token.
271 token: String,
272 },
273 /// Client → server: liveness keepalive; refreshes presence and draws a [`ProtocolMessage::Pong`]
274 /// (the application-level realization of the §10 heartbeat, uniform across transports).
275 Ping,
276 /// Server → client: keepalive acknowledgement.
277 Pong,
278 // ---------------------------------------------------------------------
279 // M4 additions — appended (forward-compat): machine / user listing and the
280 // post-auth server-role signal that gates the bridge's admin tools.
281 // ---------------------------------------------------------------------
282 /// Server → client, immediately after [`ProtocolMessage::Established`]: the authenticated user's
283 /// server-wide role, so the bridge can gate its admin tools (DESIGN.md §7).
284 ServerInfo {
285 /// Whether the user is a server admin (on the serve-config allowlist).
286 admin: bool,
287 },
288 /// Client → server: list the machines enrolled under the authenticated user.
289 ListMachines,
290 /// Server → client: the caller's enrolled machines.
291 MachineList {
292 /// The machines under the caller's account.
293 machines: Vec<MachineInfo>,
294 },
295 /// Client → server: list the server's users (server-admin only).
296 ListUsers,
297 /// Server → client: the registered usernames (server-admin only).
298 UserList {
299 /// The registered usernames.
300 users: Vec<String>,
301 },
302}
303
304/// Errors that cross the wire as a [`ProtocolMessage::Error`] frame and are matched on by the
305/// server and bridge (DESIGN.md §16). Application glue elsewhere uses `anyhow` via `Res` / `Void`.
306// The public name `ProtocolError` is fixed by DESIGN.md §13 / §22; `module_name_repetitions` is a
307// false positive against that mandated vocabulary (the sibling ratrod uses the same name).
308#[allow(clippy::module_name_repetitions)]
309#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
310pub enum ProtocolError {
311 /// The peer's protocol version is incompatible with ours.
312 #[error("incompatible protocol version: ours={ours}, theirs={theirs}")]
313 VersionMismatch {
314 /// This build's protocol version.
315 ours: u32,
316 /// The peer's advertised version.
317 theirs: u32,
318 },
319 /// A frame could not be decoded, or violated the schema.
320 #[error("malformed frame: {0}")]
321 MalformedFrame(String),
322 /// The operation was denied (authentication or authorization).
323 #[error("unauthorized: {0}")]
324 Unauthorized(String),
325 /// The named channel, session, or target does not exist / is not visible.
326 #[error("not found: {0}")]
327 NotFound(String),
328 /// An unexpected server-side error.
329 #[error("internal error: {0}")]
330 Internal(String),
331}
332
333/// Returns this build's protocol version if `theirs` is compatible, else a [`ProtocolError::VersionMismatch`].
334///
335/// v1 speaks exactly one version, so compatibility is equality; a later minor-compatible range
336/// widens this without changing the call sites.
337///
338/// # Errors
339///
340/// Returns [`ProtocolError::VersionMismatch`] when the peer's version is not compatible.
341pub fn negotiate_version(theirs: u32) -> Result<u32, ProtocolError> {
342 if theirs == Constant::PROTOCOL_VERSION {
343 Ok(Constant::PROTOCOL_VERSION)
344 } else {
345 Err(ProtocolError::VersionMismatch { ours: Constant::PROTOCOL_VERSION, theirs })
346 }
347}
348
349/// Encodes a frame to its wire bytes with the fixed codec configuration.
350///
351/// # Errors
352///
353/// Returns an error if the frame cannot be serialized.
354pub fn encode(message: &ProtocolMessage) -> Res<Vec<u8>> {
355 bincode::serde::encode_to_vec(message, bincode::config::standard()).context("failed to encode protocol frame")
356}
357
358/// Decodes a frame from its wire bytes with the fixed codec configuration.
359///
360/// # Errors
361///
362/// Returns an error if the bytes are not a valid encoded frame.
363pub fn decode(bytes: &[u8]) -> Res<ProtocolMessage> {
364 let (message, _) = bincode::serde::decode_from_slice(bytes, bincode::config::standard()).context("failed to decode protocol frame")?;
365 Ok(message)
366}
367
368/// Length-delimited sending of protocol frames over any async writer.
369pub trait ProtocolWrite: AsyncWrite + Unpin {
370 /// Encodes `message` and writes it as a `u32`-length-prefixed frame, then flushes.
371 ///
372 /// # Errors
373 ///
374 /// Returns an error if the frame cannot be encoded, exceeds `u32` in length, or the write fails.
375 fn send_message(&mut self, message: &ProtocolMessage) -> impl Future<Output = Res<()>> {
376 async move {
377 let body = encode(message)?;
378 let len = u32::try_from(body.len()).context("protocol frame exceeds u32 length")?;
379 self.write_all(&len.to_be_bytes()).await?;
380 self.write_all(&body).await?;
381 self.flush().await?;
382 Ok(())
383 }
384 }
385}
386
387impl<T: AsyncWrite + Unpin + ?Sized> ProtocolWrite for T {}
388
389/// Length-delimited receiving of protocol frames over any async reader.
390pub trait ProtocolRead: AsyncRead + Unpin {
391 /// Reads one `u32`-length-prefixed frame and decodes it.
392 ///
393 /// # Errors
394 ///
395 /// Returns an error on EOF / read failure, a length prefix beyond [`Constant::MAX_FRAME_SIZE`],
396 /// or a body that does not decode.
397 fn recv_message(&mut self) -> impl Future<Output = Res<ProtocolMessage>> {
398 async move {
399 let mut len_buf = [0_u8; 4];
400 self.read_exact(&mut len_buf).await?;
401 let len = usize::try_from(u32::from_be_bytes(len_buf)).context("frame length overflow")?;
402
403 anyhow::ensure!(len <= Constant::MAX_FRAME_SIZE, "protocol frame of {len} bytes exceeds the {} byte cap", Constant::MAX_FRAME_SIZE);
404
405 let mut body = vec![0_u8; len];
406 self.read_exact(&mut body).await?;
407 decode(&body)
408 }
409 }
410}
411
412impl<T: AsyncRead + Unpin + ?Sized> ProtocolRead for T {}
413
414#[cfg(test)]
415mod tests {
416 // Tests relax `unwrap_used` (house convention; DESIGN.md §22).
417 #![allow(clippy::unwrap_used)]
418
419 use super::*;
420 use crate::tests::duplex;
421 use pretty_assertions::assert_eq;
422
423 fn assert_round_trips(message: &ProtocolMessage) {
424 let bytes = encode(message).unwrap();
425 assert_eq!(&decode(&bytes).unwrap(), message);
426 }
427
428 #[test]
429 fn hello_round_trips_with_version_field() {
430 assert_round_trips(&ProtocolMessage::Hello {
431 protocol_version: Constant::PROTOCOL_VERSION,
432 session: "razel".to_owned(),
433 });
434 }
435
436 #[test]
437 fn channel_message_round_trips_plaintext() {
438 assert_round_trips(&ProtocolMessage::ChannelMsg {
439 channel: "ops".to_owned(),
440 from: SessionPath::new("aaron", "workstation", "razel"),
441 payload: Payload::Plain("hello, agents".to_owned()),
442 });
443 }
444
445 #[test]
446 fn data_frame_round_trips_the_reserved_e2e_envelope() {
447 assert_round_trips(&ProtocolMessage::Whisper {
448 from: SessionPath::new("aaron", "workstation", "razel"),
449 target: SessionPath::new("david", "desktop", "main"),
450 payload: Payload::Encrypted(Envelope {
451 ciphertext: vec![0xDE, 0xAD, 0xBE, 0xEF],
452 key_id: Some("channel-key-1".to_owned()),
453 }),
454 });
455 }
456
457 #[test]
458 fn admin_op_round_trips() {
459 assert_round_trips(&ProtocolMessage::Admin(AdminOp::CreateChannel {
460 name: "ops".to_owned(),
461 visibility: Visibility::Private,
462 }));
463 }
464
465 #[test]
466 fn machine_add_admin_op_round_trips() {
467 assert_round_trips(&ProtocolMessage::Admin(AdminOp::MachineAdd {
468 name: "sno-box".to_owned(),
469 pubkey: vec![1, 2, 3, 4],
470 }));
471 }
472
473 #[test]
474 fn m2_response_frames_round_trip() {
475 assert_round_trips(&ProtocolMessage::ListChannels);
476 assert_round_trips(&ProtocolMessage::ChannelList {
477 channels: vec![ChannelInfo {
478 name: "ops".to_owned(),
479 visibility: Visibility::Private,
480 member: true,
481 }],
482 });
483 assert_round_trips(&ProtocolMessage::Joined { channel: "ops".to_owned() });
484 assert_round_trips(&ProtocolMessage::Ack { detail: Some("ops".to_owned()) });
485 assert_round_trips(&ProtocolMessage::InviteToken { token: "tok-abc".to_owned() });
486 assert_round_trips(&ProtocolMessage::Ping);
487 assert_round_trips(&ProtocolMessage::Pong);
488 }
489
490 #[test]
491 fn m4_frames_round_trip() {
492 assert_round_trips(&ProtocolMessage::ServerInfo { admin: true });
493 assert_round_trips(&ProtocolMessage::ListMachines);
494 assert_round_trips(&ProtocolMessage::MachineList {
495 machines: vec![MachineInfo {
496 name: "workstation".to_owned(),
497 pubkey: "PUBKEY".to_owned(),
498 added_at: "2026-07-02T00:00:00Z".to_owned(),
499 }],
500 });
501 assert_round_trips(&ProtocolMessage::ListUsers);
502 assert_round_trips(&ProtocolMessage::UserList {
503 users: vec!["aaron".to_owned(), "david".to_owned()],
504 });
505 }
506
507 #[test]
508 fn appending_variants_preserves_existing_wire_indices() {
509 // The forward-compat guarantee (§13): an old variant's encoding must be byte-identical
510 // after new variants are appended. `Hello` is the first variant (index 0) and must still
511 // start with a 0 discriminant byte.
512 let hello = ProtocolMessage::Hello {
513 protocol_version: Constant::PROTOCOL_VERSION,
514 session: "razel".to_owned(),
515 };
516 assert_eq!(encode(&hello).unwrap()[0], 0, "the first variant's discriminant must remain 0");
517 }
518
519 #[test]
520 fn error_frame_round_trips() {
521 assert_round_trips(&ProtocolMessage::Error(ProtocolError::VersionMismatch { ours: 1, theirs: 2 }));
522 }
523
524 #[tokio::test]
525 async fn frames_stream_over_an_async_duplex() {
526 let (mut a, mut b) = duplex();
527 let sent = ProtocolMessage::Presence {
528 channel: Some("ops".to_owned()),
529 sessions: vec![SessionPath::new("aaron", "workstation", "razel"), SessionPath::new("david", "desktop", "main")],
530 };
531
532 a.send_message(&sent).await.unwrap();
533 let got = b.recv_message().await.unwrap();
534
535 assert_eq!(got, sent);
536 }
537
538 #[test]
539 fn version_negotiation_accepts_matching_and_rejects_mismatch() {
540 assert_eq!(negotiate_version(Constant::PROTOCOL_VERSION).unwrap(), Constant::PROTOCOL_VERSION);
541 assert_eq!(
542 negotiate_version(999),
543 Err(ProtocolError::VersionMismatch {
544 ours: Constant::PROTOCOL_VERSION,
545 theirs: 999,
546 })
547 );
548 }
549
550 #[tokio::test]
551 async fn recv_rejects_a_frame_larger_than_the_cap() {
552 // A length prefix beyond the cap is rejected before the body is allocated.
553 let oversized = u32::try_from(Constant::MAX_FRAME_SIZE + 1).unwrap();
554 let framed = oversized.to_be_bytes();
555 let mut reader = framed.as_slice();
556
557 assert!(reader.recv_message().await.is_err());
558 }
559}