Skip to main content

smolvm_protocol/
lib.rs

1//! Protocol types for smolvm host-guest communication.
2//!
3//! This crate defines the wire protocol for vsock communication between
4//! the smolvm host and the guest agent (smolvm-agent).
5//!
6//! # Protocol Overview
7//!
8//! Communication uses JSON-encoded messages over vsock. Each message is
9//! prefixed with a 4-byte big-endian length header.
10//!
11//! ```text
12//! +----------------+-------------------+
13//! | Length (4 BE)  | JSON payload      |
14//! +----------------+-------------------+
15//! ```
16
17#![deny(missing_docs)]
18
19use serde::{Deserialize, Serialize};
20
21pub mod guest_env;
22pub mod image_ref;
23pub mod retry;
24
25pub use image_ref::normalize_image_ref;
26
27/// Serde helper for encoding `Vec<u8>` as a base64 string in JSON.
28///
29/// Without this, serde_json serializes `Vec<u8>` as a JSON array of numbers
30/// (e.g., `[104,101,108,108,111]`), which inflates binary data by ~4x.
31/// Base64 encoding reduces this to ~1.33x.
32pub mod base64_bytes {
33    use base64::{engine::general_purpose::STANDARD, Engine};
34    use serde::{Deserialize, Deserializer, Serializer};
35
36    /// Serialize `Vec<u8>` as a base64 string.
37    pub fn serialize<S: Serializer>(data: &[u8], serializer: S) -> Result<S::Ok, S::Error> {
38        serializer.serialize_str(&STANDARD.encode(data))
39    }
40
41    /// Deserialize a base64 string into `Vec<u8>`.
42    pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Vec<u8>, D::Error> {
43        let s = String::deserialize(deserializer)?;
44        STANDARD.decode(&s).map_err(serde::de::Error::custom)
45    }
46}
47
/// Version number of the wire protocol defined by this crate.
pub const PROTOCOL_VERSION: u32 = 1;
50
/// Maximum frame size in bytes (32 MiB - layer exports use chunked streaming).
pub const MAX_FRAME_SIZE: u32 = 32 * 1024 * 1024;
53
/// Chunk size for streaming layer data (16 MiB raw, roughly 21 MiB once
/// base64-encoded inside the JSON payload).
pub const LAYER_CHUNK_SIZE: usize = 16 * 1024 * 1024;
56
/// Largest file, in bytes (1 MiB), that is written with a single
/// `FileWrite` message. Larger files must stream via
/// `FileWriteBegin` + `FileWriteChunk` so no single frame approaches
/// [`MAX_FRAME_SIZE`] (base64 + JSON inflation is ~1.4x).
///
/// Chosen to keep the single-shot frame comfortably under the frame
/// limit while preserving the fast-path latency for small config
/// files / scripts / keys.
pub const FILE_WRITE_SINGLE_SHOT_MAX: usize = 1024 * 1024;
66
/// Payload bytes per streaming upload chunk. Deliberately small —
/// equal to [`FILE_WRITE_SINGLE_SHOT_MAX`] — so each chunk's encoded
/// frame (~1.4 MB) fits inside typical kernel Unix-socket send
/// buffers (`SO_SNDBUF` defaults on the order of 200–256 KiB but
/// can grow). Larger chunks would force `write_all` to spin waiting
/// for the agent to drain, and any latency spike trips the 10 s
/// write timeout with `EAGAIN` — the failure that was reproduced
/// before this chunk size was introduced.
///
/// Note: [`LAYER_CHUNK_SIZE`] is 16 MiB for agent→host (download)
/// streaming, which works because the host side of the socket has
/// more headroom than the guest side. Upload streaming is the
/// asymmetric case and needs a smaller chunk.
pub const FILE_WRITE_CHUNK_SIZE: usize = FILE_WRITE_SINGLE_SHOT_MAX;
81
/// Hard ceiling, in bytes (4 GiB), on a single file transfer in either
/// direction.
///
/// On the write path: enforced at `FileWriteBegin` by the agent —
/// `total_size > FILE_TRANSFER_MAX_TOTAL` is rejected before any
/// staging file is created.
///
/// On the read path: enforced by the host's `read_file` loop —
/// after the first chunk that pushes the accumulated total past the
/// cap, the call bails with an error and the partial buffer is
/// dropped. This protects the host process from OOM if the guest
/// (compromised or merely buggy) streams unbounded data.
///
/// 4 GiB matches the order-of-magnitude of the default overlay disk
/// and the `gpu_vram_mib` cap. Callers that need to move larger
/// blobs should stage via a virtiofs mount instead of `cp`.
pub const FILE_TRANSFER_MAX_TOTAL: u64 = 4 * 1024 * 1024 * 1024;
98
/// Filename of the virtiofs-visible marker file that the agent creates
/// once it is ready to accept vsock connections.
///
/// The host polls for this file through its virtiofs mount of the guest
/// rootfs. The agent writes it (and optionally a symlink from `/oldroot/`)
/// during deferred init, just before opening the vsock listener.
///
/// Both sides must agree on this name; keeping it here prevents silent drift.
pub const AGENT_READY_MARKER: &str = ".smolvm-ready";
108
/// Well-known vsock ports.
///
/// The 5000 range is used for workload VMs and the 6000 range for the
/// agent and its auxiliary host-guest bridges.
pub mod ports {
    /// Control channel for workload VMs.
    pub const WORKLOAD_CONTROL: u32 = 5000;
    /// Log streaming from workload VMs.
    pub const WORKLOAD_LOGS: u32 = 5001;
    /// Agent control port (for OCI operations and management).
    pub const AGENT_CONTROL: u32 = 6000;
    /// SSH agent forwarding (host SSH_AUTH_SOCK bridged to guest).
    pub const SSH_AGENT: u32 = 6001;
    /// DNS filtering proxy (guest forwards DNS queries to host for filtering).
    pub const DNS_FILTER: u32 = 6002;
}
122
/// vsock CID (context identifier) constants.
pub mod cid {
    /// Host CID (always 2). Same value as Linux's `VMADDR_CID_HOST`.
    pub const HOST: u32 = 2;
    /// Guest CID (always 3 for the first/only guest).
    pub const GUEST: u32 = 3;
    /// Any CID (for listening). All bits set, the same value as Linux's
    /// `VMADDR_CID_ANY`.
    pub const ANY: u32 = u32::MAX;
}
132
133// ============================================================================
134// Agent Protocol (OCI Operations)
135// ============================================================================
136
/// Agent request types (for image management and OCI operations).
///
/// Serialized as internally tagged JSON: the variant name, converted to
/// snake_case, is stored in a `method` field alongside the variant's own
/// fields (e.g. `{"method":"ping"}`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "method", rename_all = "snake_case")]
pub enum AgentRequest {
    /// Ping to check if agent is alive.
    Ping,

    /// Pull an OCI image and extract layers.
    Pull {
        /// Image reference (e.g., "alpine:latest", "docker.io/library/ubuntu:22.04").
        image: String,
        /// OCI platform to pull (e.g., "linux/arm64", "linux/amd64").
        oci_platform: Option<String>,
        /// Optional registry authentication credentials.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        auth: Option<RegistryAuth>,
        /// Proxy URL applied to the registry client (sets HTTP_PROXY and HTTPS_PROXY).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        proxy: Option<String>,
        /// Comma-separated NO_PROXY list of hosts/CIDRs that bypass the proxy.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        no_proxy: Option<String>,
    },

    /// Query if an image exists locally.
    Query {
        /// Image reference.
        image: String,
    },

    /// List all cached images.
    ListImages,

    /// Run garbage collection on unused layers.
    GarbageCollect {
        /// If true, only report what would be deleted.
        dry_run: bool,
        /// If true, delete all image manifests and configs first,
        /// making all layers unreferenced so they get collected.
        /// Defaults to false when absent from the message.
        #[serde(default)]
        purge_all: bool,
    },

    /// Prepare overlay rootfs for a workload.
    PrepareOverlay {
        /// Image reference.
        image: String,
        /// Unique workload ID for the overlay.
        workload_id: String,
    },

    /// Clean up overlay rootfs for a workload.
    CleanupOverlay {
        /// Workload ID to clean up.
        workload_id: String,
    },

    /// Format the storage disk (first-time setup).
    FormatStorage,

    /// Get storage disk status.
    StorageStatus,

    /// Test network connectivity directly from the agent (not via chroot).
    /// Used to debug TSI networking.
    NetworkTest {
        /// URL to test (e.g., "http://1.1.1.1")
        url: String,
    },

    /// Shutdown the agent.
    Shutdown,

    /// Export a layer as a tar archive.
    ///
    /// Used by `smolvm pack` to extract OCI layers for packaging.
    /// The agent streams the layer tar data back via LayerData responses.
    ExportLayer {
        /// Image digest (sha256:...).
        image_digest: String,
        /// Layer index (0-based).
        layer_index: usize,
    },

    /// Execute a command directly in the VM (not in a container).
    ///
    /// This runs the command in the agent's Alpine rootfs without any
    /// container isolation. Useful for VM-level operations and debugging.
    VmExec {
        /// Command and arguments.
        command: Vec<String>,
        /// Environment variables as (name, value) pairs.
        #[serde(default)]
        env: Vec<(String, String)>,
        /// Working directory in the VM.
        workdir: Option<String>,
        /// Timeout in milliseconds.
        #[serde(default)]
        timeout_ms: Option<u64>,
        /// Interactive mode - stream I/O instead of buffering.
        #[serde(default)]
        interactive: bool,
        /// Allocate a pseudo-TTY for the command.
        #[serde(default)]
        tty: bool,
        /// Background mode - spawn and return PID immediately without waiting.
        #[serde(default)]
        background: bool,
        /// Data to pipe to the command's stdin.
        #[serde(default)]
        stdin_data: Option<String>,
    },

    /// Run a command in an image's rootfs.
    ///
    /// This prepares an overlay, chroots into it, and executes the command.
    /// Returns stdout, stderr, and exit code when the command completes.
    Run {
        /// Image reference (must be pulled first).
        image: String,
        /// Command and arguments.
        command: Vec<String>,
        /// Environment variables as (name, value) pairs.
        #[serde(default)]
        env: Vec<(String, String)>,
        /// Working directory inside the rootfs.
        workdir: Option<String>,
        /// User inside the rootfs. If omitted, the OCI image default applies.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        user: Option<String>,
        /// Volume mounts to bind into the container.
        /// Each tuple is (virtiofs_tag, container_path, read_only).
        #[serde(default)]
        mounts: Vec<(String, String, bool)>,
        /// Timeout in milliseconds. If the command exceeds this duration,
        /// it will be killed and return exit code 124.
        #[serde(default)]
        timeout_ms: Option<u64>,
        /// Interactive mode - stream I/O instead of buffering.
        /// When true, output is streamed via Stdout/Stderr responses,
        /// and stdin can be sent via the Stdin request.
        #[serde(default)]
        interactive: bool,
        /// Allocate a pseudo-TTY for the command.
        /// Enables terminal features like colors, line editing, and signal handling.
        #[serde(default)]
        tty: bool,
        /// Detached mode — start the container and return immediately with the
        /// container ID. Only meaningful when `persistent_overlay_id` is set.
        /// Returns a `Completed` response with `stdout` containing the container ID.
        #[serde(default)]
        detached: bool,
        /// If set, use a persistent overlay that survives across exec sessions.
        /// The overlay is identified by this ID (typically the machine name)
        /// and reused on subsequent runs. If not set, an ephemeral overlay is
        /// created and destroyed after the run.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        persistent_overlay_id: Option<String>,
        /// Spawn the container and return immediately with the crun PID.
        /// The container runs detached; stdout/stderr go to /dev/null.
        /// Incompatible with `interactive` and `tty`.
        #[serde(default)]
        background: bool,
    },

    /// Send stdin data to a running interactive command.
    Stdin {
        /// Input data to send to the command's stdin (base64 string on the wire).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
    },

    /// Resize the PTY window (for TTY mode).
    Resize {
        /// New width in columns.
        cols: u16,
        /// New height in rows.
        rows: u16,
    },

    // ========================================================================
    // File I/O
    // ========================================================================
    /// Write a file inside the VM in a single message.
    ///
    /// Use only for files up to [`FILE_WRITE_SINGLE_SHOT_MAX`]. Larger
    /// files must stream via [`Self::FileWriteBegin`] +
    /// [`Self::FileWriteChunk`] to avoid exceeding [`MAX_FRAME_SIZE`]
    /// after base64 + JSON inflation.
    FileWrite {
        /// Absolute path in the VM filesystem.
        path: String,
        /// File contents (base64 string on the wire).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// File mode (e.g., 0o644). None = default (0644).
        #[serde(default)]
        mode: Option<u32>,
    },

    /// Open a streaming file upload session on this connection.
    ///
    /// Must be followed by one or more [`Self::FileWriteChunk`]
    /// requests. The final chunk sets `done: true` to finalize.
    /// Dropping the connection (or sending any non-chunk request)
    /// before `done` aborts the session and leaves no partial file
    /// at `path`.
    ///
    /// Sessions are per-connection — one session at a time.
    FileWriteBegin {
        /// Absolute path in the VM filesystem.
        path: String,
        /// File mode (e.g., 0o644). None = default (0644).
        #[serde(default)]
        mode: Option<u32>,
        /// Expected total size in bytes. Rejected if it exceeds
        /// [`FILE_TRANSFER_MAX_TOTAL`]. The agent uses this for an
        /// early-fail check only; the actual size written is the sum
        /// of chunk byte lengths.
        total_size: u64,
    },

    /// Append a chunk to the currently open streaming upload.
    /// If `done` is true, the agent fsyncs and atomically renames the
    /// staging file onto the target path.
    FileWriteChunk {
        /// Chunk bytes (base64 string on the wire). Typically
        /// [`FILE_WRITE_CHUNK_SIZE`] except for the last chunk.
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// True on the final chunk; closes and renames the staging
        /// file. False on intermediate chunks.
        done: bool,
    },

    /// Read a file from the VM.
    FileRead {
        /// Absolute path in the VM filesystem.
        path: String,
    },
}
378
379impl AgentRequest {
380    /// A log-safe one-line summary of the request.
381    ///
382    /// This string is written to the machine's console log, which is exposed
383    /// over the logs API — so it must NEVER include credential- or data-bearing
384    /// fields: registry `auth`, `env` (which can carry host-resolved secrets),
385    /// `proxy` (may embed credentials), or `data` (file/stdin bytes). Only the
386    /// variant name plus a non-secret identifier (image) is emitted.
387    ///
388    /// The match is exhaustive with no catch-all on purpose: adding a new
389    /// variant forces a compile error here, so redaction is a deliberate
390    /// decision rather than an accidental leak in some future request type.
391    pub fn log_summary(&self) -> String {
392        match self {
393            AgentRequest::Ping => "Ping".into(),
394            AgentRequest::Pull { image, .. } => format!("Pull {{ image: {image} }}"),
395            AgentRequest::Query { image, .. } => format!("Query {{ image: {image} }}"),
396            AgentRequest::ListImages => "ListImages".into(),
397            AgentRequest::GarbageCollect { .. } => "GarbageCollect".into(),
398            AgentRequest::PrepareOverlay { .. } => "PrepareOverlay".into(),
399            AgentRequest::CleanupOverlay { .. } => "CleanupOverlay".into(),
400            AgentRequest::FormatStorage => "FormatStorage".into(),
401            AgentRequest::StorageStatus => "StorageStatus".into(),
402            AgentRequest::NetworkTest { .. } => "NetworkTest".into(),
403            AgentRequest::Shutdown => "Shutdown".into(),
404            AgentRequest::ExportLayer { .. } => "ExportLayer".into(),
405            AgentRequest::VmExec { .. } => "VmExec".into(),
406            AgentRequest::Run { image, .. } => format!("Run {{ image: {image} }}"),
407            AgentRequest::Stdin { .. } => "Stdin".into(),
408            AgentRequest::Resize { .. } => "Resize".into(),
409            AgentRequest::FileWrite { .. } => "FileWrite".into(),
410            AgentRequest::FileWriteBegin { .. } => "FileWriteBegin".into(),
411            AgentRequest::FileWriteChunk { .. } => "FileWriteChunk".into(),
412            AgentRequest::FileRead { .. } => "FileRead".into(),
413        }
414    }
415}
416
/// Agent response types.
///
/// Serialized as internally tagged JSON: the variant name, converted to
/// snake_case, is stored in a `status` field alongside the variant's own
/// fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum AgentResponse {
    /// Operation completed successfully.
    Ok {
        /// Response data (varies by request type). Omitted from the JSON
        /// when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        data: Option<serde_json::Value>,
    },

    /// Pong response to ping.
    Pong {
        /// Protocol version.
        version: u32,
    },

    /// Progress update (for long operations like pull).
    Progress {
        /// Human-readable message.
        message: String,
        /// Completion percentage (0-100).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        percent: Option<u8>,
        /// Current layer being processed.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        layer: Option<String>,
    },

    /// Operation failed.
    Error {
        /// Error message.
        message: String,
        /// Error code (for programmatic handling); see [`error_codes`].
        #[serde(default, skip_serializing_if = "Option::is_none")]
        code: Option<String>,
    },

    /// Command execution completed (non-interactive mode).
    Completed {
        /// Exit code from the command.
        exit_code: i32,
        /// Standard output (may be truncated). `Vec<u8>` preserves binary
        /// output (image bytes, tarballs, etc.) that would be truncated by
        /// `String` at the first non-UTF-8 byte. Serialized as base64 JSON
        /// string — the same format as the streaming `Stdout` variant.
        #[serde(with = "base64_bytes")]
        stdout: Vec<u8>,
        /// Standard error (may be truncated). Serialized as a base64 string.
        #[serde(with = "base64_bytes")]
        stderr: Vec<u8>,
    },

    /// Command started (interactive mode).
    /// Indicates the command is running and ready to receive stdin.
    Started,

    /// Stdout data from a running command (interactive mode).
    Stdout {
        /// Output data (base64 string on the wire).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
    },

    /// Stderr data from a running command (interactive mode).
    Stderr {
        /// Error output data (base64 string on the wire).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
    },

    /// Command exited (interactive mode).
    Exited {
        /// Exit code from the command.
        exit_code: i32,
    },

    /// Streaming binary-data chunk.
    ///
    /// Used by every streaming download direction: the agent sends
    /// one or more `DataChunk` responses in sequence, with `done: true`
    /// on the final chunk. Current producers: `ExportLayer` and
    /// `FileRead`.
    ///
    /// Payload size per chunk should stay under
    /// [`LAYER_CHUNK_SIZE`] so the encoded frame (~1.33× after
    /// base64) fits inside [`MAX_FRAME_SIZE`] with JSON overhead to
    /// spare.
    DataChunk {
        /// Chunk bytes (base64 string on the wire). Empty allowed on the
        /// final frame (common for EOF-on-clean-boundary cases).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// True on the final chunk of the stream.
        done: bool,
    },
}
514
515// ============================================================================
516// Error Code Constants
517// ============================================================================
518//
519// Standard error codes for AgentResponse::Error. Using constants ensures
520// consistency across the codebase and makes error handling more reliable.
521
/// Error codes for agent responses.
///
/// Each constant is a string whose value equals its own name; it is meant
/// for the `code` field of `AgentResponse::Error`.
pub mod error_codes {
    /// Request payload was invalid or malformed.
    pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
    /// Requested resource was not found.
    pub const NOT_FOUND: &str = "NOT_FOUND";
    /// Internal error during operation.
    pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";
    /// Image pull operation failed.
    pub const PULL_FAILED: &str = "PULL_FAILED";
    /// Image query operation failed.
    pub const QUERY_FAILED: &str = "QUERY_FAILED";
    /// Command execution failed.
    pub const RUN_FAILED: &str = "RUN_FAILED";
    /// Command execution failed in container.
    pub const EXEC_FAILED: &str = "EXEC_FAILED";
    /// Process spawn failed.
    pub const SPAWN_FAILED: &str = "SPAWN_FAILED";
    /// Mount operation failed.
    pub const MOUNT_FAILED: &str = "MOUNT_FAILED";
    /// File I/O operation failed.
    pub const FILE_IO_FAILED: &str = "FILE_IO_FAILED";
    /// Overlay filesystem operation failed.
    pub const OVERLAY_FAILED: &str = "OVERLAY_FAILED";
    /// Cleanup operation failed.
    pub const CLEANUP_FAILED: &str = "CLEANUP_FAILED";
    /// Storage format operation failed.
    pub const FORMAT_FAILED: &str = "FORMAT_FAILED";
    /// Storage status query failed.
    pub const STATUS_FAILED: &str = "STATUS_FAILED";
    /// List operation failed.
    pub const LIST_FAILED: &str = "LIST_FAILED";
    /// Garbage collection failed.
    pub const GC_FAILED: &str = "GC_FAILED";
    /// Container creation failed.
    pub const CREATE_FAILED: &str = "CREATE_FAILED";
    /// Container start failed.
    pub const START_FAILED: &str = "START_FAILED";
    /// Container stop failed.
    pub const STOP_FAILED: &str = "STOP_FAILED";
    /// Container delete failed.
    pub const DELETE_FAILED: &str = "DELETE_FAILED";
    /// Export operation failed.
    pub const EXPORT_FAILED: &str = "EXPORT_FAILED";
    /// Serialization error.
    pub const SERIALIZATION_ERROR: &str = "SERIALIZATION_ERROR";
    /// Message size exceeds maximum.
    pub const MESSAGE_TOO_LARGE: &str = "MESSAGE_TOO_LARGE";
    /// Process wait operation failed.
    pub const WAIT_FAILED: &str = "WAIT_FAILED";
}
573
574impl AgentResponse {
575    /// Create an error response with the given message and code.
576    ///
577    /// # Example
578    ///
579    /// ```
580    /// use smolvm_protocol::{AgentResponse, error_codes};
581    ///
582    /// let response = AgentResponse::error("image not found", error_codes::NOT_FOUND);
583    /// ```
584    pub fn error(message: impl Into<String>, code: &str) -> Self {
585        AgentResponse::Error {
586            message: message.into(),
587            code: Some(code.to_string()),
588        }
589    }
590
591    /// Create an error response from a Result's error, with the given code.
592    ///
593    /// # Example
594    ///
595    /// ```ignore
596    /// let response = some_operation()
597    ///     .map(|data| AgentResponse::ok_with_data(data))
598    ///     .unwrap_or_else(|e| AgentResponse::from_err(e, error_codes::PULL_FAILED));
599    /// ```
600    pub fn from_err<E: std::fmt::Display>(err: E, code: &str) -> Self {
601        AgentResponse::Error {
602            message: err.to_string(),
603            code: Some(code.to_string()),
604        }
605    }
606
607    /// Create an Ok response with optional JSON data.
608    pub fn ok(data: Option<serde_json::Value>) -> Self {
609        AgentResponse::Ok { data }
610    }
611
612    /// Create an Ok response with JSON-serializable data.
613    ///
614    /// Returns an error response if serialization fails.
615    pub fn ok_with_data<T: serde::Serialize>(data: T) -> Self {
616        match serde_json::to_value(data) {
617            Ok(value) => AgentResponse::Ok { data: Some(value) },
618            Err(e) => AgentResponse::error(
619                format!("failed to serialize response: {}", e),
620                error_codes::SERIALIZATION_ERROR,
621            ),
622        }
623    }
624
625    /// Convert a Result into an AgentResponse.
626    ///
627    /// On success, serializes the value to JSON. On error, creates an error response.
628    ///
629    /// # Example
630    ///
631    /// ```ignore
632    /// let response = AgentResponse::from_result(
633    ///     storage::pull_image(image),
634    ///     error_codes::PULL_FAILED,
635    /// );
636    /// ```
637    pub fn from_result<T, E>(result: Result<T, E>, error_code: &str) -> Self
638    where
639        T: serde::Serialize,
640        E: std::fmt::Display,
641    {
642        match result {
643            Ok(data) => Self::ok_with_data(data),
644            Err(e) => Self::from_err(e, error_code),
645        }
646    }
647}
648
/// Image information returned by Query/ListImages.
///
/// The fields taken from the OCI config (`entrypoint`, `cmd`, `env`,
/// `workdir`, `user`) fall back to their empty/`None` defaults when absent
/// from the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImageInfo {
    /// Image reference.
    pub reference: String,
    /// Image digest (sha256:...).
    pub digest: String,
    /// Image size in bytes.
    pub size: u64,
    /// Creation timestamp (ISO 8601).
    pub created: Option<String>,
    /// Platform architecture.
    pub architecture: String,
    /// Platform OS.
    pub os: String,
    /// Number of layers.
    pub layer_count: usize,
    /// Layer digests in order.
    pub layers: Vec<String>,
    /// Image entrypoint (from OCI config).
    #[serde(default)]
    pub entrypoint: Vec<String>,
    /// Image default command (from OCI config).
    #[serde(default)]
    pub cmd: Vec<String>,
    /// Image environment variables (from OCI config).
    #[serde(default)]
    pub env: Vec<String>,
    /// Image working directory (from OCI config).
    #[serde(default)]
    pub workdir: Option<String>,
    /// Image default user (from OCI config).
    #[serde(default)]
    pub user: Option<String>,
}
684
/// Overlay preparation result: the three paths that make up a prepared
/// overlay rootfs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OverlayInfo {
    /// Path to the merged overlay rootfs.
    pub rootfs_path: String,
    /// Path to the upper (writable) directory.
    pub upper_path: String,
    /// Path to the work directory.
    pub work_path: String,
}
695
/// Storage status information: readiness, capacity and cache counts of the
/// storage disk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageStatus {
    /// Whether the storage is formatted and ready.
    pub ready: bool,
    /// Total size in bytes.
    pub total_bytes: u64,
    /// Used size in bytes.
    pub used_bytes: u64,
    /// Number of cached layers.
    pub layer_count: usize,
    /// Number of cached images.
    pub image_count: usize,
}
710
/// Registry authentication credentials for pulling images.
///
/// `Debug` is deliberately not derived. It is hand-written to redact the
/// password: this value is carried inside
/// `AgentRequest::Pull`, and any `{:?}` of that request (e.g. a tracing span)
/// would otherwise serialize the token verbatim into the machine's console log,
/// which is exposed over the logs API.
///
/// Note that `Serialize` still emits the password unredacted.
#[derive(Clone, Serialize, Deserialize)]
pub struct RegistryAuth {
    /// Username for authentication.
    pub username: String,
    /// Password or token for authentication.
    pub password: String,
}
724
725impl std::fmt::Debug for RegistryAuth {
726    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
727        f.debug_struct("RegistryAuth")
728            .field("username", &self.username)
729            .field("password", &"***")
730            .finish()
731    }
732}
733
734// ============================================================================
735// Workload VM Protocol (Command Execution)
736// ============================================================================
737
/// Messages from host to workload VM.
///
/// Serialized as internally tagged JSON: the snake_case variant name is
/// stored in a `type` field alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum HostMessage {
    /// Authentication request.
    Auth {
        /// Authentication token (base64).
        token: String,
        /// Protocol version.
        protocol_version: u32,
    },

    /// Run a command.
    Run {
        /// Request ID for correlating responses.
        request_id: u64,
        /// Command and arguments.
        command: Vec<String>,
        /// Environment variables as (name, value) pairs.
        env: Vec<(String, String)>,
        /// Working directory.
        workdir: Option<String>,
    },

    /// Execute a command in running VM.
    Exec {
        /// Request ID.
        request_id: u64,
        /// Command and arguments.
        command: Vec<String>,
        /// Allocate a TTY.
        tty: bool,
    },

    /// Send a signal to a running command.
    Signal {
        /// Request ID of the command.
        request_id: u64,
        /// Signal number.
        signal: i32,
    },

    /// Request graceful shutdown.
    Stop {
        /// Timeout in milliseconds.
        timeout_ms: u64,
    },
}
786
/// Messages from workload VM to host.
///
/// Serialized as internally tagged JSON: the snake_case variant name is
/// stored in a `type` field alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum GuestMessage {
    /// Authentication successful.
    AuthOk,

    /// Authentication failed.
    AuthFailed,

    /// VM is ready to receive commands.
    Ready,

    /// Command started.
    Started {
        /// Request ID.
        request_id: u64,
    },

    /// Stdout data from command.
    Stdout {
        /// Request ID.
        request_id: u64,
        /// Output data (base64 string on the wire).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// Whether output was truncated.
        truncated: bool,
    },

    /// Stderr data from command.
    Stderr {
        /// Request ID.
        request_id: u64,
        /// Output data (base64 string on the wire).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// Whether output was truncated.
        truncated: bool,
    },

    /// Command exited.
    Exit {
        /// Request ID.
        request_id: u64,
        /// Exit code.
        code: i32,
        /// Exit reason.
        reason: String,
    },

    /// Error occurred.
    Error {
        /// Request ID (if applicable).
        request_id: Option<u64>,
        /// Error message.
        message: String,
    },
}
846
847// ============================================================================
848// Wire Format Helpers
849// ============================================================================
850
/// Envelope that wraps any message with an optional trace ID for correlation.
///
/// On the wire, the wrapped message's fields are flattened into the same JSON
/// object that carries `trace_id`, so no nested `body` key appears:
/// `{"trace_id":"abc123","method":"ping"}`.
///
/// When `trace_id` is `None` the key is omitted on serialization, and a
/// missing key deserializes back to `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Envelope<T> {
    /// Trace ID for correlating host API requests to agent operations.
    // Omitted from the JSON when `None`; defaults to `None` when absent.
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub trace_id: Option<String>,
    /// The wrapped message.
    // Flattened: the message's own fields sit next to `trace_id`.
    #[serde(flatten)]
    pub body: T,
}
864
865impl<T> Envelope<T> {
866    /// Create an envelope with no trace ID.
867    pub fn new(body: T) -> Self {
868        Self {
869            trace_id: None,
870            body,
871        }
872    }
873
874    /// Create an envelope with an optional trace ID.
875    pub fn with_trace_id(body: T, trace_id: Option<String>) -> Self {
876        Self { trace_id, body }
877    }
878}
879
880/// Encode a message to wire format (length-prefixed JSON).
881pub fn encode_message<T: Serialize>(msg: &T) -> Result<Vec<u8>, serde_json::Error> {
882    let json = serde_json::to_vec(msg)?;
883    let len = json.len() as u32;
884
885    let mut buf = Vec::with_capacity(4 + json.len());
886    buf.extend_from_slice(&len.to_be_bytes());
887    buf.extend_from_slice(&json);
888
889    Ok(buf)
890}
891
892/// Decode a message from wire format.
893pub fn decode_message<T: for<'de> Deserialize<'de>>(data: &[u8]) -> Result<T, DecodeError> {
894    if data.len() < 4 {
895        return Err(DecodeError::TooShort);
896    }
897
898    let len = u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
899
900    if len > MAX_FRAME_SIZE as usize {
901        return Err(DecodeError::TooLarge(len));
902    }
903
904    if data.len() < 4 + len {
905        return Err(DecodeError::Incomplete {
906            expected: len,
907            got: data.len() - 4,
908        });
909    }
910
911    serde_json::from_slice(&data[4..4 + len]).map_err(DecodeError::Json)
912}
913
/// Error decoding a wire message.
///
/// Produced by `decode_message`; the variants are listed in the order the
/// checks are performed.
#[derive(Debug)]
pub enum DecodeError {
    /// Data too short to contain length header (fewer than 4 bytes).
    TooShort,
    /// Frame size exceeds maximum.
    ///
    /// Carries the payload length declared by the header, in bytes.
    TooLarge(usize),
    /// Incomplete frame: the buffer ends before the declared payload does.
    Incomplete {
        /// Payload length declared by the header, in bytes.
        expected: usize,
        /// Payload bytes actually present after the 4-byte header.
        got: usize,
    },
    /// JSON parse error.
    Json(serde_json::Error),
}
931
932impl std::fmt::Display for DecodeError {
933    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
934        match self {
935            DecodeError::TooShort => write!(f, "data too short for length header"),
936            DecodeError::TooLarge(size) => write!(f, "frame too large: {} bytes", size),
937            DecodeError::Incomplete { expected, got } => {
938                write!(
939                    f,
940                    "incomplete frame: expected {} bytes, got {}",
941                    expected, got
942                )
943            }
944            DecodeError::Json(e) => write!(f, "JSON decode error: {}", e),
945        }
946    }
947}
948
// No methods are overridden, so `source()` keeps its default implementation;
// the `Json` variant's inner error is surfaced through the `Display` text.
impl std::error::Error for DecodeError {}
950
#[cfg(test)]
mod tests {
    //! Unit tests for wire framing, message (de)serialization, and the
    //! protocol constants.

    use super::*;

    #[test]
    fn test_encode_decode_roundtrip() {
        let req = AgentRequest::Pull {
            image: "alpine:latest".to_string(),
            oci_platform: Some("linux/arm64".to_string()),
            auth: None,
            proxy: None,
            no_proxy: None,
        };

        let encoded = encode_message(&req).unwrap();
        let decoded: AgentRequest = decode_message(&encoded).unwrap();

        let AgentRequest::Pull {
            image,
            oci_platform,
            auth,
            proxy,
            no_proxy,
        } = decoded
        else {
            panic!("expected Pull variant, got {:?}", decoded);
        };
        assert_eq!(image, "alpine:latest");
        assert_eq!(oci_platform, Some("linux/arm64".to_string()));
        assert!(auth.is_none());
        assert!(proxy.is_none());
        assert!(no_proxy.is_none());
    }

    #[test]
    fn test_encode_decode_with_auth() {
        let req = AgentRequest::Pull {
            image: "ghcr.io/owner/repo:latest".to_string(),
            oci_platform: None,
            auth: Some(RegistryAuth {
                username: "testuser".to_string(),
                password: "testpass".to_string(),
            }),
            proxy: None,
            no_proxy: None,
        };

        let encoded = encode_message(&req).unwrap();
        let decoded: AgentRequest = decode_message(&encoded).unwrap();

        let AgentRequest::Pull {
            image,
            oci_platform,
            auth,
            proxy: _,
            no_proxy: _,
        } = decoded
        else {
            panic!("expected Pull variant, got {:?}", decoded);
        };
        assert_eq!(image, "ghcr.io/owner/repo:latest");
        assert!(oci_platform.is_none());
        let auth = auth.expect("auth should be Some");
        assert_eq!(auth.username, "testuser");
        assert_eq!(auth.password, "testpass");
    }

    #[test]
    fn test_encode_decode_with_proxy() {
        let req = AgentRequest::Pull {
            image: "alpine:latest".to_string(),
            oci_platform: None,
            auth: None,
            proxy: Some("http://192.168.127.254:3128".to_string()),
            no_proxy: Some("127.0.0.1,localhost,.internal".to_string()),
        };

        let encoded = encode_message(&req).unwrap();
        let decoded: AgentRequest = decode_message(&encoded).unwrap();

        let AgentRequest::Pull {
            proxy, no_proxy, ..
        } = decoded
        else {
            panic!("expected Pull variant, got {:?}", decoded);
        };
        assert_eq!(proxy.as_deref(), Some("http://192.168.127.254:3128"));
        assert_eq!(no_proxy.as_deref(), Some("127.0.0.1,localhost,.internal"));
    }

    #[test]
    fn test_encode_length_prefix_matches_payload() {
        // The first four bytes are the big-endian length of the JSON that
        // follows, and the rest is exactly that JSON.
        let req = AgentRequest::Ping;
        let encoded = encode_message(&req).unwrap();
        assert!(encoded.len() >= 4);

        let declared =
            u32::from_be_bytes([encoded[0], encoded[1], encoded[2], encoded[3]]) as usize;
        let payload = &encoded[4..];
        assert_eq!(declared, payload.len());
        assert_eq!(payload, serde_json::to_vec(&req).unwrap().as_slice());
    }

    #[test]
    fn test_decode_too_short() {
        let data = [0u8; 2];
        let result: Result<AgentRequest, _> = decode_message(&data);
        assert!(matches!(result, Err(DecodeError::TooShort)));
    }

    #[test]
    fn test_decode_too_large() {
        // A header that declares one byte more than the limit is rejected
        // before any payload is inspected.
        let claimed = MAX_FRAME_SIZE + 1;
        let data = claimed.to_be_bytes();
        let result: Result<AgentRequest, _> = decode_message(&data);
        assert!(matches!(
            result,
            Err(DecodeError::TooLarge(n)) if n == claimed as usize
        ));
    }

    #[test]
    fn test_decode_incomplete() {
        let mut data = vec![0, 0, 0, 100]; // claims 100 bytes
        data.extend_from_slice(b"{}"); // only 2 bytes of payload
        let result: Result<AgentRequest, _> = decode_message(&data);
        // Pin the reported sizes too: `expected` is the declared length and
        // `got` counts only the payload bytes after the header.
        assert!(matches!(
            result,
            Err(DecodeError::Incomplete {
                expected: 100,
                got: 2
            })
        ));
    }

    #[test]
    fn test_agent_request_serialization() {
        let req = AgentRequest::Ping;
        let json = serde_json::to_string(&req).unwrap();
        assert!(json.contains("ping"));

        let req = AgentRequest::PrepareOverlay {
            image: "ubuntu:22.04".to_string(),
            workload_id: "wl-123".to_string(),
        };
        let json = serde_json::to_string(&req).unwrap();
        assert!(json.contains("prepare_overlay"));
    }

    #[test]
    fn test_agent_response_serialization() {
        let resp = AgentResponse::Pong {
            version: PROTOCOL_VERSION,
        };
        let json = serde_json::to_string(&resp).unwrap();
        assert!(json.contains("pong"));

        let resp = AgentResponse::Progress {
            message: "Pulling layer 1/3".to_string(),
            percent: Some(33),
            layer: Some("sha256:abc123".to_string()),
        };
        let json = serde_json::to_string(&resp).unwrap();
        assert!(json.contains("progress"));
    }

    #[test]
    fn file_write_begin_roundtrips() {
        let req = AgentRequest::FileWriteBegin {
            path: "/tmp/target".into(),
            mode: Some(0o600),
            total_size: 123_456_789,
        };
        let bytes = encode_message(&req).unwrap();
        let back: AgentRequest = decode_message(&bytes).unwrap();
        match back {
            AgentRequest::FileWriteBegin {
                path,
                mode,
                total_size,
            } => {
                assert_eq!(path, "/tmp/target");
                assert_eq!(mode, Some(0o600));
                assert_eq!(total_size, 123_456_789);
            }
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn file_write_chunk_roundtrips_binary_data() {
        // Binary data (bytes outside UTF-8) must survive the base64
        // trip intact. If the encoding ever becomes lossy, this fires.
        let payload: Vec<u8> = (0u8..=255).collect();
        let req = AgentRequest::FileWriteChunk {
            data: payload.clone(),
            done: true,
        };
        let bytes = encode_message(&req).unwrap();
        let back: AgentRequest = decode_message(&bytes).unwrap();
        match back {
            AgentRequest::FileWriteChunk { data, done } => {
                assert_eq!(data, payload);
                assert!(done);
            }
            _ => panic!("wrong variant"),
        }
    }

    #[test]
    fn file_write_size_constants_are_frame_safe() {
        // Sanity: a single streaming chunk at FILE_WRITE_CHUNK_SIZE
        // must fit inside MAX_FRAME_SIZE after base64 (+ ~33%) and
        // JSON overhead. If anyone bumps CHUNK_SIZE past the limit,
        // this test fires before production does.
        let chunk_bytes = FILE_WRITE_CHUNK_SIZE as u64;
        let base64_bytes = chunk_bytes.div_ceil(3) * 4; // ceil(n/3)*4
        let json_overhead = 256u64; // method tag, done bool, quotes
        let total = base64_bytes + json_overhead;
        assert!(
            total < MAX_FRAME_SIZE as u64,
            "FILE_WRITE_CHUNK_SIZE of {} bytes would produce a frame \
             of ~{} bytes which exceeds MAX_FRAME_SIZE of {}",
            chunk_bytes,
            total,
            MAX_FRAME_SIZE
        );

        // Single-shot threshold must be <= chunk size. They can be
        // equal (a 1 MiB file is a single shot; a 1 MiB + 1 byte
        // file streams as two chunks); but SINGLE_SHOT > CHUNK would
        // be incoherent — a file slightly over the shot threshold
        // would need to stream as... a single oversized chunk.
        assert!(FILE_WRITE_SINGLE_SHOT_MAX <= FILE_WRITE_CHUNK_SIZE);
    }

    #[test]
    fn test_ports_constants() {
        assert_eq!(ports::WORKLOAD_CONTROL, 5000);
        assert_eq!(ports::WORKLOAD_LOGS, 5001);
        assert_eq!(ports::AGENT_CONTROL, 6000);
        assert_eq!(ports::SSH_AGENT, 6001);
    }

    #[test]
    fn test_cid_constants() {
        assert_eq!(cid::HOST, 2);
        assert_eq!(cid::GUEST, 3);
    }

    #[test]
    fn test_envelope_serialization_with_trace_id() {
        let req = AgentRequest::Ping;
        let envelope = Envelope::with_trace_id(&req, Some("abc123".to_string()));
        let json = serde_json::to_string(&envelope).unwrap();

        // trace_id should sit alongside the flattened method tag
        assert!(json.contains("\"trace_id\":\"abc123\""));
        assert!(json.contains("\"method\":\"ping\""));

        // Deserialize back — Envelope<AgentRequest> with flatten
        let parsed: Envelope<AgentRequest> = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.trace_id.as_deref(), Some("abc123"));
        assert!(matches!(parsed.body, AgentRequest::Ping));
    }

    #[test]
    fn test_envelope_without_trace_id() {
        let req = AgentRequest::Ping;
        let envelope = Envelope::new(&req);
        let json = serde_json::to_string(&envelope).unwrap();

        // No trace_id field (skip_serializing_if = None)
        assert!(!json.contains("trace_id"));
        assert!(json.contains("\"method\":\"ping\""));
    }

    #[test]
    fn test_envelope_backward_compat_bare_request() {
        // A request sent without an Envelope wrapper must keep parsing as a
        // bare AgentRequest — this is the agent's fallback path.
        let bare_json = r#"{"method":"ping"}"#;

        let bare_result = serde_json::from_str::<AgentRequest>(bare_json);
        assert!(
            matches!(bare_result, Ok(AgentRequest::Ping)),
            "bare request failed to parse as Ping: {:?}",
            bare_result
        );

        // Because `body` is flattened and `trace_id` is optional, the same
        // JSON may also parse as an Envelope. That is not required here, but
        // if it does parse, no trace ID may be invented.
        if let Ok(env) = serde_json::from_str::<Envelope<AgentRequest>>(bare_json) {
            assert!(env.trace_id.is_none());
        }
    }
}