Skip to main content

smolvm_protocol/
lib.rs

//! Protocol types for smolvm host-guest communication.
//!
//! This crate defines the wire protocol for vsock communication between
//! the smolvm host and the guest agent (smolvm-agent).
//!
//! # Protocol Overview
//!
//! Communication uses JSON-encoded messages over vsock. Each message is
//! prefixed with a 4-byte big-endian length header.
//!
//! ```text
//! +----------------+-------------------+
//! | Length (4 BE)  | JSON payload      |
//! +----------------+-------------------+
//! ```
16
17#![deny(missing_docs)]
18
19use serde::{Deserialize, Serialize};
20
21pub mod retry;
22
23/// Serde helper for encoding `Vec<u8>` as a base64 string in JSON.
24///
25/// Without this, serde_json serializes `Vec<u8>` as a JSON array of numbers
26/// (e.g., `[104,101,108,108,111]`), which inflates binary data by ~4x.
27/// Base64 encoding reduces this to ~1.33x.
28pub mod base64_bytes {
29    use base64::{engine::general_purpose::STANDARD, Engine};
30    use serde::{Deserialize, Deserializer, Serializer};
31
32    /// Serialize `Vec<u8>` as a base64 string.
33    pub fn serialize<S: Serializer>(data: &[u8], serializer: S) -> Result<S::Ok, S::Error> {
34        serializer.serialize_str(&STANDARD.encode(data))
35    }
36
37    /// Deserialize a base64 string into `Vec<u8>`.
38    pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Vec<u8>, D::Error> {
39        let s = String::deserialize(deserializer)?;
40        STANDARD.decode(&s).map_err(serde::de::Error::custom)
41    }
42}
43
/// Version number of the wire protocol.
pub const PROTOCOL_VERSION: u32 = 1;

/// Upper bound on the size of one frame: 32 MiB.
///
/// Layer exports stay under this limit by streaming in chunks.
pub const MAX_FRAME_SIZE: u32 = 32 << 20;

/// Raw bytes per chunk when streaming layer data: 16 MiB, which is about
/// 21 MiB once encoded as base64 inside JSON.
pub const LAYER_CHUNK_SIZE: usize = 16 << 20;
52
/// Largest file (in bytes) that may be sent as one `FileWrite` message.
///
/// Anything bigger has to be streamed with `FileWriteBegin` followed by
/// `FileWriteChunk`, so that no individual frame gets close to
/// [`MAX_FRAME_SIZE`] (base64 plus JSON inflates the payload by ~1.4x).
///
/// The value leaves the single-shot frame comfortably below the frame
/// limit while keeping the low-latency fast path for small config
/// files, scripts and keys.
pub const FILE_WRITE_SINGLE_SHOT_MAX: usize = 1 << 20;

/// Number of payload bytes carried by each streaming upload chunk.
///
/// Kept deliberately small — the same value as
/// [`FILE_WRITE_SINGLE_SHOT_MAX`] — so that the encoded frame for one
/// chunk (~1.4 MB) is sized with kernel Unix-socket send buffers in mind
/// (`SO_SNDBUF` defaults are on the order of 200–256 KiB, although the
/// buffer can grow). With larger chunks `write_all` ends up spinning
/// while it waits for the agent to drain the socket, and any latency
/// spike then trips the 10 s write timeout with `EAGAIN`. That failure
/// was reproduced in practice before this value was introduced.
///
/// Note: [`LAYER_CHUNK_SIZE`] is 16 MiB for agent→host (download)
/// streaming. That works because the host side of the socket has more
/// headroom than the guest side; upload streaming is the asymmetric
/// case and needs the smaller chunk.
pub const FILE_WRITE_CHUNK_SIZE: usize = FILE_WRITE_SINGLE_SHOT_MAX;

/// Absolute limit, in bytes, on one file transfer in either direction.
///
/// Write path: the agent enforces it at `FileWriteBegin` and rejects
/// `total_size > FILE_TRANSFER_MAX_TOTAL` before creating any staging
/// file.
///
/// Read path: the host's `read_file` loop enforces it. As soon as a
/// chunk pushes the accumulated total beyond the cap, the call fails
/// with an error and the partial buffer is dropped, so a guest that is
/// compromised (or just buggy) cannot drive the host process out of
/// memory by streaming without end.
///
/// 4 GiB is in the same order of magnitude as the default overlay disk
/// and the `gpu_vram_mib` cap. Callers that have to move larger blobs
/// should stage them through a virtiofs mount rather than `cp`.
pub const FILE_TRANSFER_MAX_TOTAL: u64 = 4 << 30;
94
/// vsock port numbers with a fixed, well-known meaning.
pub mod ports {
    /// Workload VM control channel.
    pub const WORKLOAD_CONTROL: u32 = 5000;
    /// Workload VM log stream.
    pub const WORKLOAD_LOGS: u32 = 5001;
    /// Agent control channel (OCI operations and management).
    pub const AGENT_CONTROL: u32 = 6000;
    /// SSH agent forwarding: the host's `SSH_AUTH_SOCK` bridged into the guest.
    pub const SSH_AGENT: u32 = 6001;
    /// DNS filtering proxy: the guest forwards DNS queries to the host for filtering.
    pub const DNS_FILTER: u32 = 6002;
}
108
/// Context identifiers (CIDs) used on vsock.
pub mod cid {
    /// CID of the host (always 2).
    pub const HOST: u32 = 2;
    /// CID of the guest (always 3 for the first/only guest).
    pub const GUEST: u32 = 3;
    /// Wildcard CID, used when listening.
    pub const ANY: u32 = u32::MAX;
}
118
119// ============================================================================
120// Agent Protocol (OCI Operations)
121// ============================================================================
122
123/// Agent request types (for image management and OCI operations).
124#[derive(Debug, Clone, Serialize, Deserialize)]
125#[serde(tag = "method", rename_all = "snake_case")]
126pub enum AgentRequest {
127    /// Ping to check if agent is alive.
128    Ping,
129
130    /// Pull an OCI image and extract layers.
131    Pull {
132        /// Image reference (e.g., "alpine:latest", "docker.io/library/ubuntu:22.04").
133        image: String,
134        /// OCI platform to pull (e.g., "linux/arm64", "linux/amd64").
135        oci_platform: Option<String>,
136        /// Optional registry authentication credentials.
137        #[serde(default, skip_serializing_if = "Option::is_none")]
138        auth: Option<RegistryAuth>,
139    },
140
141    /// Query if an image exists locally.
142    Query {
143        /// Image reference.
144        image: String,
145    },
146
147    /// List all cached images.
148    ListImages,
149
150    /// Run garbage collection on unused layers.
151    GarbageCollect {
152        /// If true, only report what would be deleted.
153        dry_run: bool,
154        /// If true, delete all image manifests and configs first,
155        /// making all layers unreferenced so they get collected.
156        #[serde(default)]
157        purge_all: bool,
158    },
159
160    /// Prepare overlay rootfs for a workload.
161    PrepareOverlay {
162        /// Image reference.
163        image: String,
164        /// Unique workload ID for the overlay.
165        workload_id: String,
166    },
167
168    /// Clean up overlay rootfs for a workload.
169    CleanupOverlay {
170        /// Workload ID to clean up.
171        workload_id: String,
172    },
173
174    /// Format the storage disk (first-time setup).
175    FormatStorage,
176
177    /// Get storage disk status.
178    StorageStatus,
179
180    /// Test network connectivity directly from the agent (not via chroot).
181    /// Used to debug TSI networking.
182    NetworkTest {
183        /// URL to test (e.g., "http://1.1.1.1")
184        url: String,
185    },
186
187    /// Shutdown the agent.
188    Shutdown,
189
190    /// Export a layer as a tar archive.
191    ///
192    /// Used by `smolvm pack` to extract OCI layers for packaging.
193    /// The agent streams the layer tar data back via LayerData responses.
194    ExportLayer {
195        /// Image digest (sha256:...).
196        image_digest: String,
197        /// Layer index (0-based).
198        layer_index: usize,
199    },
200
201    /// Execute a command directly in the VM (not in a container).
202    ///
203    /// This runs the command in the agent's Alpine rootfs without any
204    /// container isolation. Useful for VM-level operations and debugging.
205    VmExec {
206        /// Command and arguments.
207        command: Vec<String>,
208        /// Environment variables.
209        #[serde(default)]
210        env: Vec<(String, String)>,
211        /// Working directory in the VM.
212        workdir: Option<String>,
213        /// Timeout in milliseconds.
214        #[serde(default)]
215        timeout_ms: Option<u64>,
216        /// Interactive mode - stream I/O instead of buffering.
217        #[serde(default)]
218        interactive: bool,
219        /// Allocate a pseudo-TTY for the command.
220        #[serde(default)]
221        tty: bool,
222        /// Background mode - spawn and return PID immediately without waiting.
223        #[serde(default)]
224        background: bool,
225    },
226
227    /// Run a command in an image's rootfs.
228    ///
229    /// This prepares an overlay, chroots into it, and executes the command.
230    /// Returns stdout, stderr, and exit code when the command completes.
231    Run {
232        /// Image reference (must be pulled first).
233        image: String,
234        /// Command and arguments.
235        command: Vec<String>,
236        /// Environment variables.
237        #[serde(default)]
238        env: Vec<(String, String)>,
239        /// Working directory inside the rootfs.
240        workdir: Option<String>,
241        /// User inside the rootfs. If omitted, the OCI image default applies.
242        #[serde(default, skip_serializing_if = "Option::is_none")]
243        user: Option<String>,
244        /// Volume mounts to bind into the container.
245        /// Each tuple is (virtiofs_tag, container_path, read_only).
246        #[serde(default)]
247        mounts: Vec<(String, String, bool)>,
248        /// Timeout in milliseconds. If the command exceeds this duration,
249        /// it will be killed and return exit code 124.
250        #[serde(default)]
251        timeout_ms: Option<u64>,
252        /// Interactive mode - stream I/O instead of buffering.
253        /// When true, output is streamed via Stdout/Stderr responses,
254        /// and stdin can be sent via the Stdin request.
255        #[serde(default)]
256        interactive: bool,
257        /// Allocate a pseudo-TTY for the command.
258        /// Enables terminal features like colors, line editing, and signal handling.
259        #[serde(default)]
260        tty: bool,
261        /// Detached mode — start the container and return immediately with the
262        /// container ID. Only meaningful when `persistent_overlay_id` is set.
263        /// Returns a `Completed` response with `stdout` containing the container ID.
264        #[serde(default)]
265        detached: bool,
266        /// If set, use a persistent overlay that survives across exec sessions.
267        /// The overlay is identified by this ID (typically the machine name)
268        /// and reused on subsequent runs. If not set, an ephemeral overlay is
269        /// created and destroyed after the run.
270        #[serde(default, skip_serializing_if = "Option::is_none")]
271        persistent_overlay_id: Option<String>,
272        /// Spawn the container and return immediately with the crun PID.
273        /// The container runs detached; stdout/stderr go to /dev/null.
274        /// Incompatible with `interactive` and `tty`.
275        #[serde(default)]
276        background: bool,
277    },
278
279    /// Send stdin data to a running interactive command.
280    Stdin {
281        /// Input data to send to the command's stdin.
282        #[serde(with = "base64_bytes")]
283        data: Vec<u8>,
284    },
285
286    /// Resize the PTY window (for TTY mode).
287    Resize {
288        /// New width in columns.
289        cols: u16,
290        /// New height in rows.
291        rows: u16,
292    },
293
294    // ========================================================================
295    // File I/O
296    // ========================================================================
297    /// Write a file inside the VM in a single message.
298    ///
299    /// Use only for files up to [`FILE_WRITE_SINGLE_SHOT_MAX`]. Larger
300    /// files must stream via [`Self::FileWriteBegin`] +
301    /// [`Self::FileWriteChunk`] to avoid exceeding [`MAX_FRAME_SIZE`]
302    /// after base64 + JSON inflation.
303    FileWrite {
304        /// Absolute path in the VM filesystem.
305        path: String,
306        /// File contents.
307        #[serde(with = "base64_bytes")]
308        data: Vec<u8>,
309        /// File mode (e.g., 0o644). None = default (0644).
310        #[serde(default)]
311        mode: Option<u32>,
312    },
313
314    /// Open a streaming file upload session on this connection.
315    ///
316    /// Must be followed by one or more [`Self::FileWriteChunk`]
317    /// requests. The final chunk sets `done: true` to finalize.
318    /// Dropping the connection (or sending any non-chunk request)
319    /// before `done` aborts the session and leaves no partial file
320    /// at `path`.
321    ///
322    /// Sessions are per-connection — one session at a time.
323    FileWriteBegin {
324        /// Absolute path in the VM filesystem.
325        path: String,
326        /// File mode (e.g., 0o644). None = default (0644).
327        #[serde(default)]
328        mode: Option<u32>,
329        /// Expected total size in bytes. Rejected if it exceeds
330        /// [`FILE_TRANSFER_MAX_TOTAL`]. The agent uses this for an
331        /// early-fail check only; the actual size written is the sum
332        /// of chunk byte lengths.
333        total_size: u64,
334    },
335
336    /// Append a chunk to the currently open streaming upload.
337    /// If `done` is true, the agent fsyncs and atomically renames the
338    /// staging file onto the target path.
339    FileWriteChunk {
340        /// Chunk bytes. Typically [`FILE_WRITE_CHUNK_SIZE`] except
341        /// for the last chunk.
342        #[serde(with = "base64_bytes")]
343        data: Vec<u8>,
344        /// True on the final chunk; closes and renames the staging
345        /// file. False on intermediate chunks.
346        done: bool,
347    },
348
349    /// Read a file from the VM.
350    FileRead {
351        /// Absolute path in the VM filesystem.
352        path: String,
353    },
354}
355
356/// Agent response types.
357#[derive(Debug, Clone, Serialize, Deserialize)]
358#[serde(tag = "status", rename_all = "snake_case")]
359pub enum AgentResponse {
360    /// Operation completed successfully.
361    Ok {
362        /// Response data (varies by request type).
363        #[serde(default, skip_serializing_if = "Option::is_none")]
364        data: Option<serde_json::Value>,
365    },
366
367    /// Pong response to ping.
368    Pong {
369        /// Protocol version.
370        version: u32,
371    },
372
373    /// Progress update (for long operations like pull).
374    Progress {
375        /// Human-readable message.
376        message: String,
377        /// Completion percentage (0-100).
378        #[serde(default, skip_serializing_if = "Option::is_none")]
379        percent: Option<u8>,
380        /// Current layer being processed.
381        #[serde(default, skip_serializing_if = "Option::is_none")]
382        layer: Option<String>,
383    },
384
385    /// Operation failed.
386    Error {
387        /// Error message.
388        message: String,
389        /// Error code (for programmatic handling).
390        #[serde(default, skip_serializing_if = "Option::is_none")]
391        code: Option<String>,
392    },
393
394    /// Command execution completed (non-interactive mode).
395    Completed {
396        /// Exit code from the command.
397        exit_code: i32,
398        /// Standard output (may be truncated). `Vec<u8>` preserves binary
399        /// output (image bytes, tarballs, etc.) that would be truncated by
400        /// `String` at the first non-UTF-8 byte. Serialized as base64 JSON
401        /// string — the same format as the streaming `Stdout` variant.
402        #[serde(with = "base64_bytes")]
403        stdout: Vec<u8>,
404        /// Standard error (may be truncated).
405        #[serde(with = "base64_bytes")]
406        stderr: Vec<u8>,
407    },
408
409    /// Command started (interactive mode).
410    /// Indicates the command is running and ready to receive stdin.
411    Started,
412
413    /// Stdout data from a running command (interactive mode).
414    Stdout {
415        /// Output data.
416        #[serde(with = "base64_bytes")]
417        data: Vec<u8>,
418    },
419
420    /// Stderr data from a running command (interactive mode).
421    Stderr {
422        /// Error output data.
423        #[serde(with = "base64_bytes")]
424        data: Vec<u8>,
425    },
426
427    /// Command exited (interactive mode).
428    Exited {
429        /// Exit code from the command.
430        exit_code: i32,
431    },
432
433    /// Streaming binary-data chunk.
434    ///
435    /// Used by every streaming download direction: the agent sends
436    /// one or more `DataChunk` responses in sequence, with `done: true`
437    /// on the final chunk. Current producers: `ExportLayer` and
438    /// `FileRead`.
439    ///
440    /// Payload size per chunk should stay under
441    /// [`LAYER_CHUNK_SIZE`] so the encoded frame (~1.33× after
442    /// base64) fits inside [`MAX_FRAME_SIZE`] with JSON overhead to
443    /// spare.
444    DataChunk {
445        /// Chunk bytes. Empty allowed on the final frame (common for
446        /// EOF-on-clean-boundary cases).
447        #[serde(with = "base64_bytes")]
448        data: Vec<u8>,
449        /// True on the final chunk of the stream.
450        done: bool,
451    },
452}
453
454// ============================================================================
455// Error Code Constants
456// ============================================================================
457//
458// Standard error codes for AgentResponse::Error. Using constants ensures
459// consistency across the codebase and makes error handling more reliable.
460
/// String codes carried in the `code` field of agent error responses.
pub mod error_codes {
    /// The request payload was invalid or malformed.
    pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
    /// The requested resource does not exist.
    pub const NOT_FOUND: &str = "NOT_FOUND";
    /// An internal error occurred during the operation.
    pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";
    /// Pulling an image failed.
    pub const PULL_FAILED: &str = "PULL_FAILED";
    /// Querying an image failed.
    pub const QUERY_FAILED: &str = "QUERY_FAILED";
    /// Executing a command failed.
    pub const RUN_FAILED: &str = "RUN_FAILED";
    /// Executing a command in a container failed.
    pub const EXEC_FAILED: &str = "EXEC_FAILED";
    /// Spawning a process failed.
    pub const SPAWN_FAILED: &str = "SPAWN_FAILED";
    /// A mount operation failed.
    pub const MOUNT_FAILED: &str = "MOUNT_FAILED";
    /// A file I/O operation failed.
    pub const FILE_IO_FAILED: &str = "FILE_IO_FAILED";
    /// An overlay filesystem operation failed.
    pub const OVERLAY_FAILED: &str = "OVERLAY_FAILED";
    /// A cleanup operation failed.
    pub const CLEANUP_FAILED: &str = "CLEANUP_FAILED";
    /// Formatting the storage failed.
    pub const FORMAT_FAILED: &str = "FORMAT_FAILED";
    /// Querying the storage status failed.
    pub const STATUS_FAILED: &str = "STATUS_FAILED";
    /// A list operation failed.
    pub const LIST_FAILED: &str = "LIST_FAILED";
    /// Garbage collection failed.
    pub const GC_FAILED: &str = "GC_FAILED";
    /// Creating a container failed.
    pub const CREATE_FAILED: &str = "CREATE_FAILED";
    /// Starting a container failed.
    pub const START_FAILED: &str = "START_FAILED";
    /// Stopping a container failed.
    pub const STOP_FAILED: &str = "STOP_FAILED";
    /// Deleting a container failed.
    pub const DELETE_FAILED: &str = "DELETE_FAILED";
    /// An export operation failed.
    pub const EXPORT_FAILED: &str = "EXPORT_FAILED";
    /// A value could not be serialized.
    pub const SERIALIZATION_ERROR: &str = "SERIALIZATION_ERROR";
    /// The message is larger than the maximum allowed size.
    pub const MESSAGE_TOO_LARGE: &str = "MESSAGE_TOO_LARGE";
    /// Waiting on a process failed.
    pub const WAIT_FAILED: &str = "WAIT_FAILED";
}
512
513impl AgentResponse {
514    /// Create an error response with the given message and code.
515    ///
516    /// # Example
517    ///
518    /// ```
519    /// use smolvm_protocol::{AgentResponse, error_codes};
520    ///
521    /// let response = AgentResponse::error("image not found", error_codes::NOT_FOUND);
522    /// ```
523    pub fn error(message: impl Into<String>, code: &str) -> Self {
524        AgentResponse::Error {
525            message: message.into(),
526            code: Some(code.to_string()),
527        }
528    }
529
530    /// Create an error response from a Result's error, with the given code.
531    ///
532    /// # Example
533    ///
534    /// ```ignore
535    /// let response = some_operation()
536    ///     .map(|data| AgentResponse::ok_with_data(data))
537    ///     .unwrap_or_else(|e| AgentResponse::from_err(e, error_codes::PULL_FAILED));
538    /// ```
539    pub fn from_err<E: std::fmt::Display>(err: E, code: &str) -> Self {
540        AgentResponse::Error {
541            message: err.to_string(),
542            code: Some(code.to_string()),
543        }
544    }
545
546    /// Create an Ok response with optional JSON data.
547    pub fn ok(data: Option<serde_json::Value>) -> Self {
548        AgentResponse::Ok { data }
549    }
550
551    /// Create an Ok response with JSON-serializable data.
552    ///
553    /// Returns an error response if serialization fails.
554    pub fn ok_with_data<T: serde::Serialize>(data: T) -> Self {
555        match serde_json::to_value(data) {
556            Ok(value) => AgentResponse::Ok { data: Some(value) },
557            Err(e) => AgentResponse::error(
558                format!("failed to serialize response: {}", e),
559                error_codes::SERIALIZATION_ERROR,
560            ),
561        }
562    }
563
564    /// Convert a Result into an AgentResponse.
565    ///
566    /// On success, serializes the value to JSON. On error, creates an error response.
567    ///
568    /// # Example
569    ///
570    /// ```ignore
571    /// let response = AgentResponse::from_result(
572    ///     storage::pull_image(image),
573    ///     error_codes::PULL_FAILED,
574    /// );
575    /// ```
576    pub fn from_result<T, E>(result: Result<T, E>, error_code: &str) -> Self
577    where
578        T: serde::Serialize,
579        E: std::fmt::Display,
580    {
581        match result {
582            Ok(data) => Self::ok_with_data(data),
583            Err(e) => Self::from_err(e, error_code),
584        }
585    }
586}
587
588/// Image information returned by Query/ListImages.
589#[derive(Debug, Clone, Serialize, Deserialize)]
590pub struct ImageInfo {
591    /// Image reference.
592    pub reference: String,
593    /// Image digest (sha256:...).
594    pub digest: String,
595    /// Image size in bytes.
596    pub size: u64,
597    /// Creation timestamp (ISO 8601).
598    pub created: Option<String>,
599    /// Platform architecture.
600    pub architecture: String,
601    /// Platform OS.
602    pub os: String,
603    /// Number of layers.
604    pub layer_count: usize,
605    /// Layer digests in order.
606    pub layers: Vec<String>,
607    /// Image entrypoint (from OCI config).
608    #[serde(default)]
609    pub entrypoint: Vec<String>,
610    /// Image default command (from OCI config).
611    #[serde(default)]
612    pub cmd: Vec<String>,
613    /// Image environment variables (from OCI config).
614    #[serde(default)]
615    pub env: Vec<String>,
616    /// Image working directory (from OCI config).
617    #[serde(default)]
618    pub workdir: Option<String>,
619    /// Image default user (from OCI config).
620    #[serde(default)]
621    pub user: Option<String>,
622}
623
624/// Overlay preparation result.
625#[derive(Debug, Clone, Serialize, Deserialize)]
626pub struct OverlayInfo {
627    /// Path to the merged overlay rootfs.
628    pub rootfs_path: String,
629    /// Path to the upper (writable) directory.
630    pub upper_path: String,
631    /// Path to the work directory.
632    pub work_path: String,
633}
634
635/// Storage status information.
636#[derive(Debug, Clone, Serialize, Deserialize)]
637pub struct StorageStatus {
638    /// Whether the storage is formatted and ready.
639    pub ready: bool,
640    /// Total size in bytes.
641    pub total_bytes: u64,
642    /// Used size in bytes.
643    pub used_bytes: u64,
644    /// Number of cached layers.
645    pub layer_count: usize,
646    /// Number of cached images.
647    pub image_count: usize,
648}
649
650/// Registry authentication credentials for pulling images.
651#[derive(Debug, Clone, Serialize, Deserialize)]
652pub struct RegistryAuth {
653    /// Username for authentication.
654    pub username: String,
655    /// Password or token for authentication.
656    pub password: String,
657}
658
659// ============================================================================
660// Workload VM Protocol (Command Execution)
661// ============================================================================
662
663/// Messages from host to workload VM.
664#[derive(Debug, Clone, Serialize, Deserialize)]
665#[serde(tag = "type", rename_all = "snake_case")]
666pub enum HostMessage {
667    /// Authentication request.
668    Auth {
669        /// Authentication token (base64).
670        token: String,
671        /// Protocol version.
672        protocol_version: u32,
673    },
674
675    /// Run a command.
676    Run {
677        /// Request ID for correlating responses.
678        request_id: u64,
679        /// Command and arguments.
680        command: Vec<String>,
681        /// Environment variables.
682        env: Vec<(String, String)>,
683        /// Working directory.
684        workdir: Option<String>,
685    },
686
687    /// Execute a command in running VM.
688    Exec {
689        /// Request ID.
690        request_id: u64,
691        /// Command and arguments.
692        command: Vec<String>,
693        /// Allocate a TTY.
694        tty: bool,
695    },
696
697    /// Send a signal to a running command.
698    Signal {
699        /// Request ID of the command.
700        request_id: u64,
701        /// Signal number.
702        signal: i32,
703    },
704
705    /// Request graceful shutdown.
706    Stop {
707        /// Timeout in milliseconds.
708        timeout_ms: u64,
709    },
710}
711
712/// Messages from workload VM to host.
713#[derive(Debug, Clone, Serialize, Deserialize)]
714#[serde(tag = "type", rename_all = "snake_case")]
715pub enum GuestMessage {
716    /// Authentication successful.
717    AuthOk,
718
719    /// Authentication failed.
720    AuthFailed,
721
722    /// VM is ready to receive commands.
723    Ready,
724
725    /// Command started.
726    Started {
727        /// Request ID.
728        request_id: u64,
729    },
730
731    /// Stdout data from command.
732    Stdout {
733        /// Request ID.
734        request_id: u64,
735        /// Output data.
736        #[serde(with = "base64_bytes")]
737        data: Vec<u8>,
738        /// Whether output was truncated.
739        truncated: bool,
740    },
741
742    /// Stderr data from command.
743    Stderr {
744        /// Request ID.
745        request_id: u64,
746        /// Output data.
747        #[serde(with = "base64_bytes")]
748        data: Vec<u8>,
749        /// Whether output was truncated.
750        truncated: bool,
751    },
752
753    /// Command exited.
754    Exit {
755        /// Request ID.
756        request_id: u64,
757        /// Exit code.
758        code: i32,
759        /// Exit reason.
760        reason: String,
761    },
762
763    /// Error occurred.
764    Error {
765        /// Request ID (if applicable).
766        request_id: Option<u64>,
767        /// Error message.
768        message: String,
769    },
770}
771
772// ============================================================================
773// Wire Format Helpers
774// ============================================================================
775
776/// Envelope that wraps any message with an optional trace ID for correlation.
777///
778/// On the wire, the trace_id is flattened into the JSON alongside the message
779/// fields: `{"trace_id":"abc123","method":"ping"}`.
780#[derive(Debug, Clone, Serialize, Deserialize)]
781pub struct Envelope<T> {
782    /// Trace ID for correlating host API requests to agent operations.
783    #[serde(skip_serializing_if = "Option::is_none", default)]
784    pub trace_id: Option<String>,
785    /// The wrapped message.
786    #[serde(flatten)]
787    pub body: T,
788}
789
790impl<T> Envelope<T> {
791    /// Create an envelope with no trace ID.
792    pub fn new(body: T) -> Self {
793        Self {
794            trace_id: None,
795            body,
796        }
797    }
798
799    /// Create an envelope with an optional trace ID.
800    pub fn with_trace_id(body: T, trace_id: Option<String>) -> Self {
801        Self { trace_id, body }
802    }
803}
804
805/// Encode a message to wire format (length-prefixed JSON).
806pub fn encode_message<T: Serialize>(msg: &T) -> Result<Vec<u8>, serde_json::Error> {
807    let json = serde_json::to_vec(msg)?;
808    let len = json.len() as u32;
809
810    let mut buf = Vec::with_capacity(4 + json.len());
811    buf.extend_from_slice(&len.to_be_bytes());
812    buf.extend_from_slice(&json);
813
814    Ok(buf)
815}
816
817/// Decode a message from wire format.
818pub fn decode_message<T: for<'de> Deserialize<'de>>(data: &[u8]) -> Result<T, DecodeError> {
819    if data.len() < 4 {
820        return Err(DecodeError::TooShort);
821    }
822
823    let len = u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
824
825    if len > MAX_FRAME_SIZE as usize {
826        return Err(DecodeError::TooLarge(len));
827    }
828
829    if data.len() < 4 + len {
830        return Err(DecodeError::Incomplete {
831            expected: len,
832            got: data.len() - 4,
833        });
834    }
835
836    serde_json::from_slice(&data[4..4 + len]).map_err(DecodeError::Json)
837}
838
839/// Error decoding a wire message.
840#[derive(Debug)]
841pub enum DecodeError {
842    /// Data too short to contain length header.
843    TooShort,
844    /// Frame size exceeds maximum.
845    TooLarge(usize),
846    /// Incomplete frame.
847    Incomplete {
848        /// Expected length.
849        expected: usize,
850        /// Actual length.
851        got: usize,
852    },
853    /// JSON parse error.
854    Json(serde_json::Error),
855}
856
857impl std::fmt::Display for DecodeError {
858    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
859        match self {
860            DecodeError::TooShort => write!(f, "data too short for length header"),
861            DecodeError::TooLarge(size) => write!(f, "frame too large: {} bytes", size),
862            DecodeError::Incomplete { expected, got } => {
863                write!(
864                    f,
865                    "incomplete frame: expected {} bytes, got {}",
866                    expected, got
867                )
868            }
869            DecodeError::Json(e) => write!(f, "JSON decode error: {}", e),
870        }
871    }
872}
873
874impl std::error::Error for DecodeError {}
875
876#[cfg(test)]
877mod tests {
878    use super::*;
879
880    #[test]
881    fn test_encode_decode_roundtrip() {
882        let req = AgentRequest::Pull {
883            image: "alpine:latest".to_string(),
884            oci_platform: Some("linux/arm64".to_string()),
885            auth: None,
886        };
887
888        let encoded = encode_message(&req).unwrap();
889        let decoded: AgentRequest = decode_message(&encoded).unwrap();
890
891        let AgentRequest::Pull {
892            image,
893            oci_platform,
894            auth,
895        } = decoded
896        else {
897            panic!("expected Pull variant, got {:?}", decoded);
898        };
899        assert_eq!(image, "alpine:latest");
900        assert_eq!(oci_platform, Some("linux/arm64".to_string()));
901        assert!(auth.is_none());
902    }
903
904    #[test]
905    fn test_encode_decode_with_auth() {
906        let req = AgentRequest::Pull {
907            image: "ghcr.io/owner/repo:latest".to_string(),
908            oci_platform: None,
909            auth: Some(RegistryAuth {
910                username: "testuser".to_string(),
911                password: "testpass".to_string(),
912            }),
913        };
914
915        let encoded = encode_message(&req).unwrap();
916        let decoded: AgentRequest = decode_message(&encoded).unwrap();
917
918        let AgentRequest::Pull {
919            image,
920            oci_platform,
921            auth,
922        } = decoded
923        else {
924            panic!("expected Pull variant, got {:?}", decoded);
925        };
926        assert_eq!(image, "ghcr.io/owner/repo:latest");
927        assert!(oci_platform.is_none());
928        let auth = auth.expect("auth should be Some");
929        assert_eq!(auth.username, "testuser");
930        assert_eq!(auth.password, "testpass");
931    }
932
933    #[test]
934    fn test_decode_too_short() {
935        let data = [0u8; 2];
936        let result: Result<AgentRequest, _> = decode_message(&data);
937        assert!(matches!(result, Err(DecodeError::TooShort)));
938    }
939
940    #[test]
941    fn test_decode_incomplete() {
942        let mut data = vec![0, 0, 0, 100]; // claims 100 bytes
943        data.extend_from_slice(b"{}"); // only 2 bytes of payload
944        let result: Result<AgentRequest, _> = decode_message(&data);
945        assert!(matches!(result, Err(DecodeError::Incomplete { .. })));
946    }
947
948    #[test]
949    fn test_agent_request_serialization() {
950        let req = AgentRequest::Ping;
951        let json = serde_json::to_string(&req).unwrap();
952        assert!(json.contains("ping"));
953
954        let req = AgentRequest::PrepareOverlay {
955            image: "ubuntu:22.04".to_string(),
956            workload_id: "wl-123".to_string(),
957        };
958        let json = serde_json::to_string(&req).unwrap();
959        assert!(json.contains("prepare_overlay"));
960    }
961
962    #[test]
963    fn test_agent_response_serialization() {
964        let resp = AgentResponse::Pong {
965            version: PROTOCOL_VERSION,
966        };
967        let json = serde_json::to_string(&resp).unwrap();
968        assert!(json.contains("pong"));
969
970        let resp = AgentResponse::Progress {
971            message: "Pulling layer 1/3".to_string(),
972            percent: Some(33),
973            layer: Some("sha256:abc123".to_string()),
974        };
975        let json = serde_json::to_string(&resp).unwrap();
976        assert!(json.contains("progress"));
977    }
978
979    #[test]
980    fn file_write_begin_roundtrips() {
981        let req = AgentRequest::FileWriteBegin {
982            path: "/tmp/target".into(),
983            mode: Some(0o600),
984            total_size: 123_456_789,
985        };
986        let bytes = encode_message(&req).unwrap();
987        let back: AgentRequest = decode_message(&bytes).unwrap();
988        match back {
989            AgentRequest::FileWriteBegin {
990                path,
991                mode,
992                total_size,
993            } => {
994                assert_eq!(path, "/tmp/target");
995                assert_eq!(mode, Some(0o600));
996                assert_eq!(total_size, 123_456_789);
997            }
998            _ => panic!("wrong variant"),
999        }
1000    }
1001
1002    #[test]
1003    fn file_write_chunk_roundtrips_binary_data() {
1004        // Binary data (bytes outside UTF-8) must survive the base64
1005        // trip intact. If the encoding ever silently lossifies, this
1006        // fires.
1007        let payload: Vec<u8> = (0u8..=255).collect();
1008        let req = AgentRequest::FileWriteChunk {
1009            data: payload.clone(),
1010            done: true,
1011        };
1012        let bytes = encode_message(&req).unwrap();
1013        let back: AgentRequest = decode_message(&bytes).unwrap();
1014        match back {
1015            AgentRequest::FileWriteChunk { data, done } => {
1016                assert_eq!(data, payload);
1017                assert!(done);
1018            }
1019            _ => panic!("wrong variant"),
1020        }
1021    }
1022
1023    #[test]
1024    fn file_write_size_constants_are_frame_safe() {
1025        // Sanity: a single streaming chunk at FILE_WRITE_CHUNK_SIZE
1026        // must fit inside MAX_FRAME_SIZE after base64 (+ ~33%) and
1027        // JSON overhead. If anyone bumps CHUNK_SIZE past the limit,
1028        // this test fires before production does.
1029        let chunk_bytes = FILE_WRITE_CHUNK_SIZE as u64;
1030        let base64_bytes = chunk_bytes.div_ceil(3) * 4; // ceil(n/3)*4
1031        let json_overhead = 256u64; // method tag, done bool, quotes
1032        let total = base64_bytes + json_overhead;
1033        assert!(
1034            total < MAX_FRAME_SIZE as u64,
1035            "FILE_WRITE_CHUNK_SIZE of {} bytes would produce a frame \
1036             of ~{} bytes which exceeds MAX_FRAME_SIZE of {}",
1037            chunk_bytes,
1038            total,
1039            MAX_FRAME_SIZE
1040        );
1041
1042        // Single-shot threshold must be <= chunk size. They can be
1043        // equal (a 1 MiB file is a single shot; a 1 MiB + 1 byte
1044        // file streams as two chunks); but SINGLE_SHOT > CHUNK would
1045        // be incoherent — a file slightly over the shot threshold
1046        // would need to stream as... a single oversized chunk.
1047        assert!(FILE_WRITE_SINGLE_SHOT_MAX <= FILE_WRITE_CHUNK_SIZE);
1048    }
1049
1050    #[test]
1051    fn test_ports_constants() {
1052        assert_eq!(ports::WORKLOAD_CONTROL, 5000);
1053        assert_eq!(ports::WORKLOAD_LOGS, 5001);
1054        assert_eq!(ports::AGENT_CONTROL, 6000);
1055        assert_eq!(ports::SSH_AGENT, 6001);
1056    }
1057
1058    #[test]
1059    fn test_cid_constants() {
1060        assert_eq!(cid::HOST, 2);
1061        assert_eq!(cid::GUEST, 3);
1062    }
1063
1064    #[test]
1065    fn test_envelope_serialization_with_trace_id() {
1066        let req = AgentRequest::Ping;
1067        let envelope = Envelope::with_trace_id(&req, Some("abc123".to_string()));
1068        let json = serde_json::to_string(&envelope).unwrap();
1069
1070        // trace_id should be flattened alongside the method tag
1071        assert!(json.contains("\"trace_id\":\"abc123\""));
1072        assert!(json.contains("\"method\":\"ping\""));
1073
1074        // Deserialize back — Envelope<AgentRequest> with flatten
1075        let parsed: Envelope<AgentRequest> = serde_json::from_str(&json).unwrap();
1076        assert_eq!(parsed.trace_id.as_deref(), Some("abc123"));
1077        assert!(matches!(parsed.body, AgentRequest::Ping));
1078    }
1079
1080    #[test]
1081    fn test_envelope_without_trace_id() {
1082        let req = AgentRequest::Ping;
1083        let envelope = Envelope::new(&req);
1084        let json = serde_json::to_string(&envelope).unwrap();
1085
1086        // No trace_id field (skip_serializing_if = None)
1087        assert!(!json.contains("trace_id"));
1088        assert!(json.contains("\"method\":\"ping\""));
1089    }
1090
1091    #[test]
1092    fn test_envelope_backward_compat_bare_request() {
1093        // A bare AgentRequest (no Envelope) should fail to parse as Envelope
1094        // but succeed as bare AgentRequest — this is the agent's fallback path
1095        let bare_json = r#"{"method":"ping"}"#;
1096
1097        // Envelope parse should fail (no body field to flatten into)
1098        // Actually with flatten, this may work — let's verify
1099        let envelope_result = serde_json::from_str::<Envelope<AgentRequest>>(bare_json);
1100        let bare_result = serde_json::from_str::<AgentRequest>(bare_json);
1101
1102        // At least one must succeed for backward compat
1103        assert!(
1104            envelope_result.is_ok() || bare_result.is_ok(),
1105            "Neither Envelope nor bare parse succeeded"
1106        );
1107
1108        // Bare parse must always work
1109        assert!(bare_result.is_ok());
1110        assert!(matches!(bare_result.unwrap(), AgentRequest::Ping));
1111
1112        // If Envelope works, trace_id should be None
1113        if let Ok(env) = envelope_result {
1114            assert!(env.trace_id.is_none());
1115        }
1116    }
1117}