Skip to main content

microsandbox_protocol/
message.rs

1//! Message envelope and type definitions for the agent protocol.
2
3use serde::{Deserialize, Serialize, de::DeserializeOwned};
4
5use crate::error::ProtocolResult;
6
7//--------------------------------------------------------------------------------------------------
8// Constants
9//--------------------------------------------------------------------------------------------------
10
/// Protocol generation spoken by this build.
///
/// Stamped into the `v` field of every `Message` created through
/// `Message::new` / `Message::with_payload`, and never lower than any
/// `MessageType::min_protocol_version`.
pub const PROTOCOL_VERSION: u8 = 5;

/// Frame flag (bit 0): no further messages follow for this correlation ID.
///
/// Carried by terminal message types: `CoreError`, `ExecExited`, `ExecFailed`,
/// `FsResponse`, `TcpClosed`, and `TcpFailed`.
pub const FLAG_TERMINAL: u8 = 0x01;

/// Frame flag (bit 1): this message opens a new session.
///
/// Carried by session-initiating message types: `ExecRequest`, `FsRequest`,
/// and `TcpConnect`.
pub const FLAG_SESSION_START: u8 = 0x02;

/// Frame flag (bit 2): this message asks the sandbox to shut down.
///
/// Carried only by `Shutdown`. The sandbox-process relay keys off this bit to
/// escalate the drain (SIGTERM → SIGKILL) when the guest does not exit on its own.
pub const FLAG_SHUTDOWN: u8 = 0x04;

/// Number of header bytes between the length prefix and the CBOR payload.
///
/// The header is `[id: u32 BE][flags: u8]`, i.e. 4 + 1 = 5 bytes.
pub const FRAME_HEADER_SIZE: usize = std::mem::size_of::<u32>() + std::mem::size_of::<u8>();
33
34//--------------------------------------------------------------------------------------------------
35// Types
36//--------------------------------------------------------------------------------------------------
37
/// The message envelope sent over the wire.
///
/// Each message contains a version, type, correlation ID, flags, and a CBOR payload.
///
/// Wire format: `[len: u32 BE][id: u32 BE][flags: u8][CBOR(v, t, p)]`
///
/// The `id` and `flags` fields live in the binary frame header (outside CBOR)
/// so that relay intermediaries can route frames without CBOR parsing.
///
/// Only `v`, `t`, and `p` go through serde. Because `id` and `flags` are
/// `#[serde(skip)]`, a `Message` decoded from the CBOR body alone has both set
/// to `0`. The framing layer is expected to fill them in from the frame header
/// (NOTE(review): confirm against the codec that reads frames).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
    /// Protocol generation, echoed into the frame.
    ///
    /// This is the single protocol version axis (see `VERSIONING.md`), the same
    /// number negotiated once at the handshake — not a second, message-local
    /// version. It is carried here so a frame is self-describing for debugging
    /// and telemetry; behavior is gated on the negotiated generation, not on
    /// reading this field per message.
    pub v: u8,

    /// Message type, encoded in CBOR as its wire string (e.g. `"core.exec.request"`).
    pub t: MessageType,

    /// Correlation ID used to associate requests with responses and
    /// to identify exec sessions.
    ///
    /// Serialized in the binary frame header, not in CBOR. Defaults to `0`
    /// when the struct is deserialized, because serde skips it.
    #[serde(skip)]
    pub id: u32,

    /// Frame flags computed from the message type (see `MessageType::flags`).
    ///
    /// Serialized in the binary frame header, not in CBOR. Defaults to `0`
    /// when the struct is deserialized, because serde skips it.
    #[serde(skip)]
    pub flags: u8,

    /// The CBOR-encoded payload bytes.
    ///
    /// `serde_bytes` makes this a single CBOR byte string rather than an
    /// array of integers.
    #[serde(with = "serde_bytes")]
    pub p: Vec<u8>,
}
77
/// Identifies the type of a protocol message.
///
/// The `#[strum(serialize = ...)]` attribute on each variant is the single
/// source for its wire string: [`as_str`](Self::as_str) and
/// [`from_wire_str`](Self::from_wire_str) are derived from it, and
/// [`strum::IntoEnumIterator`] yields every variant for exhaustive iteration
/// (the schema snapshot) without a hand-maintained list.
///
/// The manual `Serialize`/`Deserialize` impls encode a variant as that wire
/// string. As a result, renaming a Rust variant does not change the wire
/// format, but editing a `serialize` string is a breaking protocol change.
/// Per-variant frame flags come from [`flags`](Self::flags), and the protocol
/// generation that introduced each variant comes from
/// [`min_protocol_version`](Self::min_protocol_version).
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    Hash,
    strum::IntoStaticStr,
    strum::EnumString,
    strum::EnumIter,
)]
pub enum MessageType {
    /// Guest agent is ready.
    #[strum(serialize = "core.ready")]
    Ready,

    /// Guest reports init context before user mounts.
    #[strum(serialize = "core.init.resolved")]
    InitResolved,

    /// Host acknowledges init-context setup.
    #[strum(serialize = "core.init.ack")]
    InitAck,

    /// Host requests shutdown. Carries `FLAG_SHUTDOWN`.
    #[strum(serialize = "core.shutdown")]
    Shutdown,

    /// Host relay reports that one SDK client disconnected.
    #[strum(serialize = "core.relay.client.disconnected")]
    RelayClientDisconnected,

    /// Host asks the guest to synchronize `CLOCK_REALTIME`.
    #[strum(serialize = "core.clock.sync")]
    ClockSync,

    /// Peer reports a recoverable protocol-level error.
    ///
    /// "Recoverable" refers to the connection. For its own correlation ID this
    /// message is terminal (it carries `FLAG_TERMINAL`).
    #[strum(serialize = "core.error")]
    CoreError,

    /// Host requests command execution. Opens a session (`FLAG_SESSION_START`).
    #[strum(serialize = "core.exec.request")]
    ExecRequest,

    /// Guest confirms command started.
    #[strum(serialize = "core.exec.started")]
    ExecStarted,

    /// Host sends stdin data.
    #[strum(serialize = "core.exec.stdin")]
    ExecStdin,

    /// Guest reports that a prior `ExecStdin` write to the child's
    /// stdin failed (e.g. the child closed its read end). Non-terminal:
    /// the session continues and may still produce stdout/stderr and
    /// an exit code.
    #[strum(serialize = "core.exec.stdin.error")]
    ExecStdinError,

    /// Guest sends stdout data.
    #[strum(serialize = "core.exec.stdout")]
    ExecStdout,

    /// Guest sends stderr data.
    #[strum(serialize = "core.exec.stderr")]
    ExecStderr,

    /// Guest reports command exit. Terminal (`FLAG_TERMINAL`).
    #[strum(serialize = "core.exec.exited")]
    ExecExited,

    /// Guest reports command failed to spawn (binary not found,
    /// permission denied, etc.). Distinct from `ExecExited` —
    /// `ExecFailed` means the user code never ran. Terminal.
    #[strum(serialize = "core.exec.failed")]
    ExecFailed,

    /// Host requests PTY resize.
    #[strum(serialize = "core.exec.resize")]
    ExecResize,

    /// Host sends signal to process.
    #[strum(serialize = "core.exec.signal")]
    ExecSignal,

    /// Host requests a filesystem operation. Opens a session
    /// (`FLAG_SESSION_START`). Requires protocol generation 2.
    #[strum(serialize = "core.fs.request")]
    FsRequest,

    /// Guest sends a terminal filesystem response.
    #[strum(serialize = "core.fs.response")]
    FsResponse,

    /// Streaming file data chunk (bidirectional).
    #[strum(serialize = "core.fs.data")]
    FsData,

    /// Host requests a TCP connection from inside the guest. Opens a session
    /// (`FLAG_SESSION_START`). Requires protocol generation 4.
    #[strum(serialize = "core.tcp.connect")]
    TcpConnect,

    /// Guest confirms that a TCP connection was opened.
    #[strum(serialize = "core.tcp.connected")]
    TcpConnected,

    /// TCP stream data chunk (bidirectional).
    #[strum(serialize = "core.tcp.data")]
    TcpData,

    /// One TCP stream side has closed its write half.
    #[strum(serialize = "core.tcp.eof")]
    TcpEof,

    /// Host requests a TCP session close.
    #[strum(serialize = "core.tcp.close")]
    TcpClose,

    /// Guest reports that a TCP session is closed. Terminal.
    #[strum(serialize = "core.tcp.closed")]
    TcpClosed,

    /// Guest reports that a TCP session failed. Terminal.
    #[strum(serialize = "core.tcp.failed")]
    TcpFailed,
}
210
211//--------------------------------------------------------------------------------------------------
212// Methods
213//--------------------------------------------------------------------------------------------------
214
215impl Message {
216    /// Creates a new message with the current protocol version and raw payload bytes.
217    pub fn new(t: MessageType, id: u32, p: Vec<u8>) -> Self {
218        let flags = t.flags();
219        Self {
220            v: PROTOCOL_VERSION,
221            t,
222            id,
223            flags,
224            p,
225        }
226    }
227
228    /// Creates a new message by serializing the given payload to CBOR.
229    pub fn with_payload<T: Serialize>(
230        t: MessageType,
231        id: u32,
232        payload: &T,
233    ) -> ProtocolResult<Self> {
234        let mut p = Vec::new();
235        ciborium::into_writer(payload, &mut p)?;
236        let flags = t.flags();
237        Ok(Self {
238            v: PROTOCOL_VERSION,
239            t,
240            id,
241            flags,
242            p,
243        })
244    }
245
246    /// Deserializes the payload bytes into the given type.
247    pub fn payload<T: DeserializeOwned>(&self) -> ProtocolResult<T> {
248        Ok(ciborium::from_reader(&self.p[..])?)
249    }
250}
251
252impl MessageType {
253    /// Computes the frame flags byte for this message type.
254    pub fn flags(&self) -> u8 {
255        match self {
256            Self::CoreError
257            | Self::ExecExited
258            | Self::ExecFailed
259            | Self::FsResponse
260            | Self::TcpClosed
261            | Self::TcpFailed => FLAG_TERMINAL,
262            Self::ExecRequest | Self::FsRequest | Self::TcpConnect => FLAG_SESSION_START,
263            Self::Shutdown => FLAG_SHUTDOWN,
264            _ => 0,
265        }
266    }
267
268    /// The protocol generation that introduced this message type.
269    ///
270    /// A per-type label on the single protocol generation axis (see
271    /// `VERSIONING.md`), not a separate version counter. The send path gates on
272    /// it: a type whose generation exceeds the peer's negotiated generation is
273    /// rejected locally with a typed error instead of being sent to a peer that
274    /// cannot handle it, so only that one feature fails rather than the session.
275    ///
276    /// Core and exec types belong to the generation-1 baseline; they work on
277    /// every runtime we still talk to, including the pre-0.5 legacy one.
278    /// Filesystem streaming did not exist in the pre-0.5 legacy protocol
279    /// (generation 1), so the `Fs*` types require generation 2 or newer.
280    /// TCP forwarding was introduced in generation 4. `core.error` was
281    /// introduced in generation 5.
282    ///
283    /// There is deliberately no wildcard arm: adding a new `MessageType` must
284    /// force a conscious choice of the generation that introduced it (and a
285    /// matching `PROTOCOL_VERSION` bump). Message types are append-only — never
286    /// lower or re-purpose an existing value.
287    pub fn min_protocol_version(&self) -> u8 {
288        match self {
289            Self::Ready
290            | Self::InitResolved
291            | Self::InitAck
292            | Self::Shutdown
293            | Self::RelayClientDisconnected
294            | Self::ClockSync
295            | Self::ExecRequest
296            | Self::ExecStarted
297            | Self::ExecStdin
298            | Self::ExecStdinError
299            | Self::ExecStdout
300            | Self::ExecStderr
301            | Self::ExecExited
302            | Self::ExecFailed
303            | Self::ExecResize
304            | Self::ExecSignal => 1,
305            Self::FsRequest | Self::FsResponse | Self::FsData => 2,
306            Self::CoreError => 5,
307            Self::TcpConnect
308            | Self::TcpConnected
309            | Self::TcpData
310            | Self::TcpEof
311            | Self::TcpClose
312            | Self::TcpClosed
313            | Self::TcpFailed => 4,
314        }
315    }
316
317    /// Whether a peer that speaks `peer_generation` is new enough to handle this
318    /// message type.
319    ///
320    /// The shared version-compatibility primitive for both directions. The host
321    /// gates its sends on it (`AgentClient::ensure_version_compat`); the guest
322    /// can gate a guest-initiated message the same way, reading the peer's
323    /// generation from the `v` field of the request that established the session.
324    /// See `VERSIONING.md`.
325    pub fn is_available_at(&self, peer_generation: u8) -> bool {
326        self.min_protocol_version() <= peer_generation
327    }
328
329    /// Returns the wire string representation.
330    ///
331    /// Backed by the per-variant `#[strum(serialize = ...)]` attribute, the
332    /// single source of truth for wire strings.
333    pub fn as_str(&self) -> &'static str {
334        (*self).into()
335    }
336
337    /// Parses a wire string into a message type, the inverse of
338    /// [`as_str`](Self::as_str). Returns `None` for an unknown string.
339    pub fn from_wire_str(s: &str) -> Option<Self> {
340        s.parse().ok()
341    }
342}
343
344//--------------------------------------------------------------------------------------------------
345// Trait Implementations
346//--------------------------------------------------------------------------------------------------
347
348impl Serialize for MessageType {
349    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
350    where
351        S: serde::Serializer,
352    {
353        serializer.serialize_str(self.as_str())
354    }
355}
356
357impl<'de> Deserialize<'de> for MessageType {
358    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
359    where
360        D: serde::Deserializer<'de>,
361    {
362        let s = String::deserialize(deserializer)?;
363        Self::from_wire_str(&s)
364            .ok_or_else(|| serde::de::Error::custom(format!("unknown message type: {s}")))
365    }
366}
367
368//--------------------------------------------------------------------------------------------------
369// Tests
370//--------------------------------------------------------------------------------------------------
371
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use strum::IntoEnumIterator;

    use super::*;

    #[test]
    fn test_message_type_roundtrip() {
        // Expected wire strings are spelled out by hand on purpose: editing a
        // `#[strum(serialize = ...)]` attribute is a wire-breaking change and
        // must fail here rather than silently round-trip.
        let types = [
            (MessageType::Ready, "core.ready"),
            (MessageType::InitResolved, "core.init.resolved"),
            (MessageType::InitAck, "core.init.ack"),
            (MessageType::Shutdown, "core.shutdown"),
            (
                MessageType::RelayClientDisconnected,
                "core.relay.client.disconnected",
            ),
            (MessageType::ClockSync, "core.clock.sync"),
            (MessageType::CoreError, "core.error"),
            (MessageType::ExecRequest, "core.exec.request"),
            (MessageType::ExecStarted, "core.exec.started"),
            (MessageType::ExecStdin, "core.exec.stdin"),
            (MessageType::ExecStdinError, "core.exec.stdin.error"),
            (MessageType::ExecStdout, "core.exec.stdout"),
            (MessageType::ExecStderr, "core.exec.stderr"),
            (MessageType::ExecExited, "core.exec.exited"),
            (MessageType::ExecFailed, "core.exec.failed"),
            (MessageType::ExecResize, "core.exec.resize"),
            (MessageType::ExecSignal, "core.exec.signal"),
            (MessageType::FsRequest, "core.fs.request"),
            (MessageType::FsResponse, "core.fs.response"),
            (MessageType::FsData, "core.fs.data"),
            (MessageType::TcpConnect, "core.tcp.connect"),
            (MessageType::TcpConnected, "core.tcp.connected"),
            (MessageType::TcpData, "core.tcp.data"),
            (MessageType::TcpEof, "core.tcp.eof"),
            (MessageType::TcpClose, "core.tcp.close"),
            (MessageType::TcpClosed, "core.tcp.closed"),
            (MessageType::TcpFailed, "core.tcp.failed"),
        ];

        // The hand-written table must cover every variant exactly once, so a
        // newly added variant cannot skip wire-string coverage.
        let listed: HashSet<MessageType> = types.iter().map(|(mt, _)| *mt).collect();
        let all: HashSet<MessageType> = MessageType::iter().collect();
        assert_eq!(listed, all, "wire-string table is missing a MessageType variant");
        assert_eq!(types.len(), all.len(), "wire-string table lists a variant twice");

        for (mt, expected_str) in &types {
            assert_eq!(mt.as_str(), *expected_str);
            assert_eq!(MessageType::from_wire_str(expected_str).unwrap(), *mt);
        }
    }

    #[test]
    fn test_message_type_serde_roundtrip() {
        for mt in MessageType::iter() {
            let mut buf = Vec::new();
            ciborium::into_writer(&mt, &mut buf).unwrap();

            // On the wire a message type is a plain CBOR text string.
            let as_text: String = ciborium::from_reader(&buf[..]).unwrap();
            assert_eq!(as_text, mt.as_str());

            let decoded: MessageType = ciborium::from_reader(&buf[..]).unwrap();
            assert_eq!(decoded, mt);
        }
    }

    #[test]
    fn test_unknown_message_type() {
        assert!(MessageType::from_wire_str("core.unknown").is_none());

        // Deserialization must reject unknown strings rather than guess a variant.
        let mut buf = Vec::new();
        ciborium::into_writer("core.unknown", &mut buf).unwrap();
        let decoded: Result<MessageType, _> = ciborium::from_reader(&buf[..]);
        assert!(decoded.is_err());
    }

    #[test]
    fn test_message_with_payload_roundtrip() {
        use crate::exec::ExecExited;

        let msg =
            Message::with_payload(MessageType::ExecExited, 7, &ExecExited { code: 42 }).unwrap();

        assert_eq!(msg.v, PROTOCOL_VERSION);
        assert_eq!(msg.t, MessageType::ExecExited);
        assert_eq!(msg.id, 7);
        assert_eq!(msg.flags, FLAG_TERMINAL);

        let payload: ExecExited = msg.payload().unwrap();
        assert_eq!(payload.code, 42);
    }

    #[test]
    fn test_message_type_flags() {
        assert_eq!(MessageType::CoreError.flags(), FLAG_TERMINAL);
        assert_eq!(MessageType::ExecExited.flags(), FLAG_TERMINAL);
        assert_eq!(MessageType::ExecFailed.flags(), FLAG_TERMINAL);
        assert_eq!(MessageType::FsResponse.flags(), FLAG_TERMINAL);
        assert_eq!(MessageType::TcpClosed.flags(), FLAG_TERMINAL);
        assert_eq!(MessageType::TcpFailed.flags(), FLAG_TERMINAL);
        assert_eq!(MessageType::ExecRequest.flags(), FLAG_SESSION_START);
        assert_eq!(MessageType::FsRequest.flags(), FLAG_SESSION_START);
        assert_eq!(MessageType::TcpConnect.flags(), FLAG_SESSION_START);
        assert_eq!(MessageType::Shutdown.flags(), FLAG_SHUTDOWN);
        assert_eq!(MessageType::Ready.flags(), 0);
        assert_eq!(MessageType::InitResolved.flags(), 0);
        assert_eq!(MessageType::InitAck.flags(), 0);
        assert_eq!(MessageType::RelayClientDisconnected.flags(), 0);
        assert_eq!(MessageType::ClockSync.flags(), 0);
        assert_eq!(MessageType::ExecStarted.flags(), 0);
        assert_eq!(MessageType::ExecStdin.flags(), 0);
        assert_eq!(MessageType::ExecStdinError.flags(), 0);
        assert_eq!(MessageType::ExecStdout.flags(), 0);
        assert_eq!(MessageType::ExecStderr.flags(), 0);
        assert_eq!(MessageType::ExecResize.flags(), 0);
        assert_eq!(MessageType::ExecSignal.flags(), 0);
        assert_eq!(MessageType::FsData.flags(), 0);
        assert_eq!(MessageType::TcpConnected.flags(), 0);
        assert_eq!(MessageType::TcpData.flags(), 0);
        assert_eq!(MessageType::TcpEof.flags(), 0);
        assert_eq!(MessageType::TcpClose.flags(), 0);

        // No type may carry more than one frame flag.
        for mt in MessageType::iter() {
            assert!(
                mt.flags().count_ones() <= 1,
                "{mt:?} carries more than one frame flag"
            );
        }
    }

    #[test]
    fn test_additive_fields_keep_old_and_new_compatible() {
        // The core backward-compatibility guarantee from VERSIONING.md: a new,
        // always-optional field is safe in both directions across a version skew.
        use serde::{Deserialize, Serialize};

        // A payload as it existed at an older generation.
        #[derive(Serialize, Deserialize)]
        struct Old {
            a: u32,
            b: u32,
        }

        // The same payload after a later generation added `c` (optional).
        #[derive(Serialize, Deserialize, Debug, PartialEq)]
        struct New {
            a: u32,
            b: u32,
            #[serde(default)]
            c: u32,
        }

        // New sender -> old receiver: the unknown `c` is ignored, not an error.
        let mut new_bytes = Vec::new();
        ciborium::into_writer(&New { a: 1, b: 2, c: 3 }, &mut new_bytes).unwrap();
        let as_old: Old = ciborium::from_reader(&new_bytes[..]).unwrap();
        assert_eq!((as_old.a, as_old.b), (1, 2));

        // Old sender -> new receiver: the missing `c` falls back to its default.
        let mut old_bytes = Vec::new();
        ciborium::into_writer(&Old { a: 1, b: 2 }, &mut old_bytes).unwrap();
        let as_new: New = ciborium::from_reader(&old_bytes[..]).unwrap();
        assert_eq!(as_new, New { a: 1, b: 2, c: 0 });
    }

    #[test]
    fn test_is_available_at() {
        // Exec is in the generation-1 baseline: available to every peer.
        assert!(MessageType::ExecRequest.is_available_at(1));
        assert!(MessageType::ExecRequest.is_available_at(2));
        assert!(MessageType::ExecRequest.is_available_at(PROTOCOL_VERSION));
        // Filesystem requires generation 2: unavailable to a legacy (gen 1) peer.
        assert!(!MessageType::FsRequest.is_available_at(1));
        assert!(MessageType::FsRequest.is_available_at(2));
        assert!(MessageType::FsRequest.is_available_at(PROTOCOL_VERSION));
        // TCP forwarding arrived in generation 4.
        assert!(!MessageType::TcpConnect.is_available_at(3));
        assert!(MessageType::TcpConnect.is_available_at(4));
        // `core.error` arrived in generation 5.
        assert!(!MessageType::CoreError.is_available_at(4));
        assert!(MessageType::CoreError.is_available_at(5));
    }

    #[test]
    fn test_min_protocol_version_per_type() {
        // Core and exec types are the generation-1 baseline: usable on every
        // runtime we still talk to, including the pre-0.5 legacy one.
        let baseline = [
            MessageType::Ready,
            MessageType::InitResolved,
            MessageType::InitAck,
            MessageType::Shutdown,
            MessageType::RelayClientDisconnected,
            MessageType::ClockSync,
            MessageType::ExecRequest,
            MessageType::ExecStarted,
            MessageType::ExecStdin,
            MessageType::ExecStdinError,
            MessageType::ExecStdout,
            MessageType::ExecStderr,
            MessageType::ExecExited,
            MessageType::ExecFailed,
            MessageType::ExecResize,
            MessageType::ExecSignal,
        ];
        for mt in &baseline {
            assert_eq!(mt.min_protocol_version(), 1, "{mt:?} should be v1 baseline");
        }

        // Filesystem streaming did not exist in the pre-0.5 legacy protocol, so
        // these require a post-legacy generation.
        for mt in [
            MessageType::FsRequest,
            MessageType::FsResponse,
            MessageType::FsData,
        ] {
            assert_eq!(mt.min_protocol_version(), 2, "{mt:?} should require gen 2");
        }

        // TCP forwarding was introduced in generation 4.
        for mt in [
            MessageType::TcpConnect,
            MessageType::TcpConnected,
            MessageType::TcpData,
            MessageType::TcpEof,
            MessageType::TcpClose,
            MessageType::TcpClosed,
            MessageType::TcpFailed,
        ] {
            assert_eq!(mt.min_protocol_version(), 4, "{mt:?} should require gen 4");
        }

        // `core.error` was introduced in generation 5.
        assert_eq!(MessageType::CoreError.min_protocol_version(), 5);

        // Every current type must be sendable to a current peer.
        for mt in MessageType::iter() {
            assert!(
                mt.is_available_at(PROTOCOL_VERSION),
                "{mt:?} requires a generation newer than PROTOCOL_VERSION"
            );
        }
    }

    #[test]
    fn test_message_new_computes_flags() {
        let msg = Message::new(MessageType::ExecRequest, 1, Vec::new());
        assert_eq!(msg.v, PROTOCOL_VERSION);
        assert_eq!(msg.flags, FLAG_SESSION_START);

        let msg = Message::new(MessageType::ExecStdout, 1, Vec::new());
        assert_eq!(msg.flags, 0);

        let msg = Message::new(MessageType::Shutdown, 0, Vec::new());
        assert_eq!(msg.flags, FLAG_SHUTDOWN);

        let msg = Message::new(MessageType::CoreError, 3, Vec::new());
        assert_eq!(msg.flags, FLAG_TERMINAL);
    }
}
600}