smolvm_protocol/lib.rs
1//! Protocol types for smolvm host-guest communication.
2//!
3//! This crate defines the wire protocol for vsock communication between
4//! the smolvm host and the guest agent (smolvm-agent).
5//!
6//! # Protocol Overview
7//!
8//! Communication uses JSON-encoded messages over vsock. Each message is
9//! prefixed with a 4-byte big-endian length header.
10//!
11//! ```text
//! +----------------+-------------------+
//! | Length (4 BE)  | JSON payload      |
//! +----------------+-------------------+
15//! ```
16
17#![deny(missing_docs)]
18
19use serde::{Deserialize, Serialize};
20
21pub mod guest_env;
22pub mod image_ref;
23pub mod retry;
24pub mod secrets;
25
26pub use image_ref::normalize_image_ref;
27pub use secrets::{SecretRef, SecretSourceKind};
28
29/// Serde helper for encoding `Vec<u8>` as a base64 string in JSON.
30///
31/// Without this, serde_json serializes `Vec<u8>` as a JSON array of numbers
32/// (e.g., `[104,101,108,108,111]`), which inflates binary data by ~4x.
33/// Base64 encoding reduces this to ~1.33x.
34pub mod base64_bytes {
35 use base64::{engine::general_purpose::STANDARD, Engine};
36 use serde::{Deserialize, Deserializer, Serializer};
37
38 /// Serialize `Vec<u8>` as a base64 string.
39 pub fn serialize<S: Serializer>(data: &[u8], serializer: S) -> Result<S::Ok, S::Error> {
40 serializer.serialize_str(&STANDARD.encode(data))
41 }
42
43 /// Deserialize a base64 string into `Vec<u8>`.
44 pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Vec<u8>, D::Error> {
45 let s = String::deserialize(deserializer)?;
46 STANDARD.decode(&s).map_err(serde::de::Error::custom)
47 }
48}
49
/// Version number of the wire protocol.
pub const PROTOCOL_VERSION: u32 = 1;

/// Largest frame accepted on the wire: 32 MiB. Layer exports stay under this
/// limit by using chunked streaming.
pub const MAX_FRAME_SIZE: u32 = 32 << 20;

/// Raw bytes per chunk when streaming layer data: 16 MiB, which becomes
/// roughly 21 MB once carried as base64 inside JSON.
pub const LAYER_CHUNK_SIZE: usize = 16 << 20;

/// Largest file (1 MiB) that may be sent with a single `FileWrite` message.
///
/// Anything bigger must be streamed with `FileWriteBegin` followed by
/// `FileWriteChunk` messages, so that no single frame gets close to
/// [`MAX_FRAME_SIZE`] (base64 + JSON inflation is ~1.4x).
///
/// The value leaves the single-shot frame comfortably below the frame limit
/// while keeping the low-latency fast path for small config files, scripts
/// and keys.
pub const FILE_WRITE_SINGLE_SHOT_MAX: usize = 1 << 20;

/// Payload bytes carried by each chunk of a streaming upload.
///
/// Intentionally small — the same as [`FILE_WRITE_SINGLE_SHOT_MAX`] — so the
/// encoded frame of one chunk (~1.4 MB) fits inside typical kernel
/// Unix-socket send buffers (`SO_SNDBUF` defaults on the order of
/// 200–256 KiB but can grow). With larger chunks `write_all` would spin
/// waiting for the agent to drain the socket, and any latency spike would
/// trip the 10 s write timeout with `EAGAIN` — the failure that was
/// reproduced before this limit was introduced.
///
/// Note: [`LAYER_CHUNK_SIZE`] is 16 MiB for agent→host (download) streaming.
/// That works because the host side of the socket has more headroom than
/// the guest side; uploads are the asymmetric case and need the smaller
/// chunk.
pub const FILE_WRITE_CHUNK_SIZE: usize = FILE_WRITE_SINGLE_SHOT_MAX;

/// Absolute limit (4 GiB) on one file transfer, in either direction.
///
/// Write path: the agent enforces it at `FileWriteBegin` — a request with
/// `total_size > FILE_TRANSFER_MAX_TOTAL` is rejected before any staging
/// file is created.
///
/// Read path: the host's `read_file` loop enforces it — as soon as a chunk
/// pushes the accumulated total past the cap, the call fails with an error
/// and the partial buffer is dropped. This keeps the host process from
/// running out of memory if the guest (compromised or merely buggy) streams
/// unbounded data.
///
/// 4 GiB matches the order of magnitude of the default overlay disk and the
/// `gpu_vram_mib` cap. Callers that need to move larger blobs should stage
/// them via a virtiofs mount instead of `cp`.
pub const FILE_TRANSFER_MAX_TOTAL: u64 = 4 << 30;

/// Name of the marker file, visible through virtiofs, that the agent creates
/// once it is ready to accept vsock connections.
///
/// The host polls for the file through its virtiofs mount of the guest
/// rootfs. The agent writes it (and optionally a symlink from `/oldroot/`)
/// during deferred init, just before it opens the vsock listener.
///
/// Host and agent must use the same name; defining it here prevents silent
/// drift between the two.
pub const AGENT_READY_MARKER: &str = ".smolvm-ready";
110
/// vsock port numbers shared by the host and the guest.
pub mod ports {
    /// Workload VMs: control channel.
    pub const WORKLOAD_CONTROL: u32 = 5_000;
    /// Workload VMs: log streaming.
    pub const WORKLOAD_LOGS: u32 = 5_001;
    /// Agent control port, used for OCI operations and management.
    pub const AGENT_CONTROL: u32 = 6_000;
    /// SSH agent forwarding: the host's SSH_AUTH_SOCK bridged into the guest.
    pub const SSH_AGENT: u32 = 6_001;
    /// DNS filtering proxy: the guest forwards DNS queries to the host, which
    /// filters them.
    pub const DNS_FILTER: u32 = 6_002;
}
124
/// Context IDs (CIDs) used when addressing vsock endpoints.
pub mod cid {
    /// CID of the host; always 2.
    pub const HOST: u32 = 2;
    /// CID of the guest; always 3 for the first/only guest.
    pub const GUEST: u32 = 3;
    /// Wildcard CID, used when listening. Equal to `u32::MAX`.
    pub const ANY: u32 = 0xFFFF_FFFF;
}
134
135// ============================================================================
136// Agent Protocol (OCI Operations)
137// ============================================================================
138
139/// Agent request types (for image management and OCI operations).
140#[derive(Debug, Clone, Serialize, Deserialize)]
141#[serde(tag = "method", rename_all = "snake_case")]
142pub enum AgentRequest {
143 /// Ping to check if agent is alive.
144 Ping,
145
146 /// Pull an OCI image and extract layers.
147 Pull {
148 /// Image reference (e.g., "alpine:latest", "docker.io/library/ubuntu:22.04").
149 image: String,
150 /// OCI platform to pull (e.g., "linux/arm64", "linux/amd64").
151 oci_platform: Option<String>,
152 /// Optional registry authentication credentials.
153 #[serde(default, skip_serializing_if = "Option::is_none")]
154 auth: Option<RegistryAuth>,
155 /// Proxy URL applied to the registry client (sets HTTP_PROXY and HTTPS_PROXY).
156 #[serde(default, skip_serializing_if = "Option::is_none")]
157 proxy: Option<String>,
158 /// Comma-separated NO_PROXY list of hosts/CIDRs that bypass the proxy.
159 #[serde(default, skip_serializing_if = "Option::is_none")]
160 no_proxy: Option<String>,
161 },
162
163 /// Query if an image exists locally.
164 Query {
165 /// Image reference.
166 image: String,
167 },
168
169 /// List all cached images.
170 ListImages,
171
172 /// Run garbage collection on unused layers.
173 GarbageCollect {
174 /// If true, only report what would be deleted.
175 dry_run: bool,
176 /// If true, delete all image manifests and configs first,
177 /// making all layers unreferenced so they get collected.
178 #[serde(default)]
179 purge_all: bool,
180 },
181
182 /// Prepare overlay rootfs for a workload.
183 PrepareOverlay {
184 /// Image reference.
185 image: String,
186 /// Unique workload ID for the overlay.
187 workload_id: String,
188 },
189
190 /// Clean up overlay rootfs for a workload.
191 CleanupOverlay {
192 /// Workload ID to clean up.
193 workload_id: String,
194 },
195
196 /// Format the storage disk (first-time setup).
197 FormatStorage,
198
199 /// Get storage disk status.
200 StorageStatus,
201
202 /// Test network connectivity directly from the agent (not via chroot).
203 /// Used to debug TSI networking.
204 NetworkTest {
205 /// URL to test (e.g., "http://1.1.1.1")
206 url: String,
207 },
208
209 /// Shutdown the agent.
210 Shutdown,
211
212 /// Export a layer as a tar archive.
213 ///
214 /// Used by `smolvm pack` to extract OCI layers for packaging.
215 /// The agent streams the layer tar data back via LayerData responses.
216 ExportLayer {
217 /// Image digest (sha256:...).
218 image_digest: String,
219 /// Layer index (0-based).
220 layer_index: usize,
221 },
222
223 /// Execute a command directly in the VM (not in a container).
224 ///
225 /// This runs the command in the agent's Alpine rootfs without any
226 /// container isolation. Useful for VM-level operations and debugging.
227 VmExec {
228 /// Command and arguments.
229 command: Vec<String>,
230 /// Environment variables.
231 #[serde(default)]
232 env: Vec<(String, String)>,
233 /// Working directory in the VM.
234 workdir: Option<String>,
235 /// Timeout in milliseconds.
236 #[serde(default)]
237 timeout_ms: Option<u64>,
238 /// Interactive mode - stream I/O instead of buffering.
239 #[serde(default)]
240 interactive: bool,
241 /// Allocate a pseudo-TTY for the command.
242 #[serde(default)]
243 tty: bool,
244 /// Background mode - spawn and return PID immediately without waiting.
245 #[serde(default)]
246 background: bool,
247 /// Data to pipe to the command's stdin.
248 #[serde(default)]
249 stdin_data: Option<String>,
250 },
251
252 /// Run a command in an image's rootfs.
253 ///
254 /// This prepares an overlay, chroots into it, and executes the command.
255 /// Returns stdout, stderr, and exit code when the command completes.
256 Run {
257 /// Image reference (must be pulled first).
258 image: String,
259 /// Command and arguments.
260 command: Vec<String>,
261 /// Environment variables.
262 #[serde(default)]
263 env: Vec<(String, String)>,
264 /// Working directory inside the rootfs.
265 workdir: Option<String>,
266 /// User inside the rootfs. If omitted, the OCI image default applies.
267 #[serde(default, skip_serializing_if = "Option::is_none")]
268 user: Option<String>,
269 /// Volume mounts to bind into the container.
270 /// Each tuple is (virtiofs_tag, container_path, read_only).
271 #[serde(default)]
272 mounts: Vec<(String, String, bool)>,
273 /// Timeout in milliseconds. If the command exceeds this duration,
274 /// it will be killed and return exit code 124.
275 #[serde(default)]
276 timeout_ms: Option<u64>,
277 /// Interactive mode - stream I/O instead of buffering.
278 /// When true, output is streamed via Stdout/Stderr responses,
279 /// and stdin can be sent via the Stdin request.
280 #[serde(default)]
281 interactive: bool,
282 /// Allocate a pseudo-TTY for the command.
283 /// Enables terminal features like colors, line editing, and signal handling.
284 #[serde(default)]
285 tty: bool,
286 /// Detached mode — start the container and return immediately with the
287 /// container ID. Only meaningful when `persistent_overlay_id` is set.
288 /// Returns a `Completed` response with `stdout` containing the container ID.
289 #[serde(default)]
290 detached: bool,
291 /// If set, use a persistent overlay that survives across exec sessions.
292 /// The overlay is identified by this ID (typically the machine name)
293 /// and reused on subsequent runs. If not set, an ephemeral overlay is
294 /// created and destroyed after the run.
295 #[serde(default, skip_serializing_if = "Option::is_none")]
296 persistent_overlay_id: Option<String>,
297 /// Spawn the container and return immediately with the crun PID.
298 /// The container runs detached; stdout/stderr go to /dev/null.
299 /// Incompatible with `interactive` and `tty`.
300 #[serde(default)]
301 background: bool,
302 },
303
304 /// Send stdin data to a running interactive command.
305 Stdin {
306 /// Input data to send to the command's stdin.
307 #[serde(with = "base64_bytes")]
308 data: Vec<u8>,
309 },
310
311 /// Resize the PTY window (for TTY mode).
312 Resize {
313 /// New width in columns.
314 cols: u16,
315 /// New height in rows.
316 rows: u16,
317 },
318
319 // ========================================================================
320 // File I/O
321 // ========================================================================
322 /// Write a file inside the VM in a single message.
323 ///
324 /// Use only for files up to [`FILE_WRITE_SINGLE_SHOT_MAX`]. Larger
325 /// files must stream via [`Self::FileWriteBegin`] +
326 /// [`Self::FileWriteChunk`] to avoid exceeding [`MAX_FRAME_SIZE`]
327 /// after base64 + JSON inflation.
328 FileWrite {
329 /// Absolute path in the VM filesystem.
330 path: String,
331 /// File contents.
332 #[serde(with = "base64_bytes")]
333 data: Vec<u8>,
334 /// File mode (e.g., 0o644). None = default (0644).
335 #[serde(default)]
336 mode: Option<u32>,
337 },
338
339 /// Open a streaming file upload session on this connection.
340 ///
341 /// Must be followed by one or more [`Self::FileWriteChunk`]
342 /// requests. The final chunk sets `done: true` to finalize.
343 /// Dropping the connection (or sending any non-chunk request)
344 /// before `done` aborts the session and leaves no partial file
345 /// at `path`.
346 ///
347 /// Sessions are per-connection — one session at a time.
348 FileWriteBegin {
349 /// Absolute path in the VM filesystem.
350 path: String,
351 /// File mode (e.g., 0o644). None = default (0644).
352 #[serde(default)]
353 mode: Option<u32>,
354 /// Expected total size in bytes. Rejected if it exceeds
355 /// [`FILE_TRANSFER_MAX_TOTAL`]. The agent uses this for an
356 /// early-fail check only; the actual size written is the sum
357 /// of chunk byte lengths.
358 total_size: u64,
359 },
360
361 /// Append a chunk to the currently open streaming upload.
362 /// If `done` is true, the agent fsyncs and atomically renames the
363 /// staging file onto the target path.
364 FileWriteChunk {
365 /// Chunk bytes. Typically [`FILE_WRITE_CHUNK_SIZE`] except
366 /// for the last chunk.
367 #[serde(with = "base64_bytes")]
368 data: Vec<u8>,
369 /// True on the final chunk; closes and renames the staging
370 /// file. False on intermediate chunks.
371 done: bool,
372 },
373
374 /// Read a file from the VM.
375 FileRead {
376 /// Absolute path in the VM filesystem.
377 path: String,
378 },
379}
380
381impl AgentRequest {
382 /// A log-safe one-line summary of the request.
383 ///
384 /// This string is written to the machine's console log, which is exposed
385 /// over the logs API — so it must NEVER include credential- or data-bearing
386 /// fields: registry `auth`, `env` (which can carry host-resolved secrets),
387 /// `proxy` (may embed credentials), or `data` (file/stdin bytes). Only the
388 /// variant name plus a non-secret identifier (image) is emitted.
389 ///
390 /// The match is exhaustive with no catch-all on purpose: adding a new
391 /// variant forces a compile error here, so redaction is a deliberate
392 /// decision rather than an accidental leak in some future request type.
393 pub fn log_summary(&self) -> String {
394 match self {
395 AgentRequest::Ping => "Ping".into(),
396 AgentRequest::Pull { image, .. } => format!("Pull {{ image: {image} }}"),
397 AgentRequest::Query { image, .. } => format!("Query {{ image: {image} }}"),
398 AgentRequest::ListImages => "ListImages".into(),
399 AgentRequest::GarbageCollect { .. } => "GarbageCollect".into(),
400 AgentRequest::PrepareOverlay { .. } => "PrepareOverlay".into(),
401 AgentRequest::CleanupOverlay { .. } => "CleanupOverlay".into(),
402 AgentRequest::FormatStorage => "FormatStorage".into(),
403 AgentRequest::StorageStatus => "StorageStatus".into(),
404 AgentRequest::NetworkTest { .. } => "NetworkTest".into(),
405 AgentRequest::Shutdown => "Shutdown".into(),
406 AgentRequest::ExportLayer { .. } => "ExportLayer".into(),
407 AgentRequest::VmExec { .. } => "VmExec".into(),
408 AgentRequest::Run { image, .. } => format!("Run {{ image: {image} }}"),
409 AgentRequest::Stdin { .. } => "Stdin".into(),
410 AgentRequest::Resize { .. } => "Resize".into(),
411 AgentRequest::FileWrite { .. } => "FileWrite".into(),
412 AgentRequest::FileWriteBegin { .. } => "FileWriteBegin".into(),
413 AgentRequest::FileWriteChunk { .. } => "FileWriteChunk".into(),
414 AgentRequest::FileRead { .. } => "FileRead".into(),
415 }
416 }
417}
418
419/// Agent response types.
420#[derive(Debug, Clone, Serialize, Deserialize)]
421#[serde(tag = "status", rename_all = "snake_case")]
422pub enum AgentResponse {
423 /// Operation completed successfully.
424 Ok {
425 /// Response data (varies by request type).
426 #[serde(default, skip_serializing_if = "Option::is_none")]
427 data: Option<serde_json::Value>,
428 },
429
430 /// Pong response to ping.
431 Pong {
432 /// Protocol version.
433 version: u32,
434 },
435
436 /// Progress update (for long operations like pull).
437 Progress {
438 /// Human-readable message.
439 message: String,
440 /// Completion percentage (0-100).
441 #[serde(default, skip_serializing_if = "Option::is_none")]
442 percent: Option<u8>,
443 /// Current layer being processed.
444 #[serde(default, skip_serializing_if = "Option::is_none")]
445 layer: Option<String>,
446 },
447
448 /// Operation failed.
449 Error {
450 /// Error message.
451 message: String,
452 /// Error code (for programmatic handling).
453 #[serde(default, skip_serializing_if = "Option::is_none")]
454 code: Option<String>,
455 },
456
457 /// Command execution completed (non-interactive mode).
458 Completed {
459 /// Exit code from the command.
460 exit_code: i32,
461 /// Standard output (may be truncated). `Vec<u8>` preserves binary
462 /// output (image bytes, tarballs, etc.) that would be truncated by
463 /// `String` at the first non-UTF-8 byte. Serialized as base64 JSON
464 /// string — the same format as the streaming `Stdout` variant.
465 #[serde(with = "base64_bytes")]
466 stdout: Vec<u8>,
467 /// Standard error (may be truncated).
468 #[serde(with = "base64_bytes")]
469 stderr: Vec<u8>,
470 },
471
472 /// Command started (interactive mode).
473 /// Indicates the command is running and ready to receive stdin.
474 Started,
475
476 /// Stdout data from a running command (interactive mode).
477 Stdout {
478 /// Output data.
479 #[serde(with = "base64_bytes")]
480 data: Vec<u8>,
481 },
482
483 /// Stderr data from a running command (interactive mode).
484 Stderr {
485 /// Error output data.
486 #[serde(with = "base64_bytes")]
487 data: Vec<u8>,
488 },
489
490 /// Command exited (interactive mode).
491 Exited {
492 /// Exit code from the command.
493 exit_code: i32,
494 },
495
496 /// Streaming binary-data chunk.
497 ///
498 /// Used by every streaming download direction: the agent sends
499 /// one or more `DataChunk` responses in sequence, with `done: true`
500 /// on the final chunk. Current producers: `ExportLayer` and
501 /// `FileRead`.
502 ///
503 /// Payload size per chunk should stay under
504 /// [`LAYER_CHUNK_SIZE`] so the encoded frame (~1.33× after
505 /// base64) fits inside [`MAX_FRAME_SIZE`] with JSON overhead to
506 /// spare.
507 DataChunk {
508 /// Chunk bytes. Empty allowed on the final frame (common for
509 /// EOF-on-clean-boundary cases).
510 #[serde(with = "base64_bytes")]
511 data: Vec<u8>,
512 /// True on the final chunk of the stream.
513 done: bool,
514 },
515}
516
517// ============================================================================
518// Error Code Constants
519// ============================================================================
520//
521// Standard error codes for AgentResponse::Error. Using constants ensures
522// consistency across the codebase and makes error handling more reliable.
523
/// Error code strings carried in the `code` field of `AgentResponse::Error`.
pub mod error_codes {
    /// The request payload was invalid or malformed.
    pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
    /// The requested resource does not exist.
    pub const NOT_FOUND: &str = "NOT_FOUND";
    /// An internal error occurred during the operation.
    pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";
    /// Pulling an image failed.
    pub const PULL_FAILED: &str = "PULL_FAILED";
    /// Querying an image failed.
    pub const QUERY_FAILED: &str = "QUERY_FAILED";
    /// Executing a command failed.
    pub const RUN_FAILED: &str = "RUN_FAILED";
    /// Executing a command in a container failed.
    pub const EXEC_FAILED: &str = "EXEC_FAILED";
    /// Spawning a process failed.
    pub const SPAWN_FAILED: &str = "SPAWN_FAILED";
    /// A mount operation failed.
    pub const MOUNT_FAILED: &str = "MOUNT_FAILED";
    /// A file I/O operation failed.
    pub const FILE_IO_FAILED: &str = "FILE_IO_FAILED";
    /// An overlay filesystem operation failed.
    pub const OVERLAY_FAILED: &str = "OVERLAY_FAILED";
    /// A cleanup operation failed.
    pub const CLEANUP_FAILED: &str = "CLEANUP_FAILED";
    /// Formatting the storage failed.
    pub const FORMAT_FAILED: &str = "FORMAT_FAILED";
    /// Querying the storage status failed.
    pub const STATUS_FAILED: &str = "STATUS_FAILED";
    /// A list operation failed.
    pub const LIST_FAILED: &str = "LIST_FAILED";
    /// Garbage collection failed.
    pub const GC_FAILED: &str = "GC_FAILED";
    /// Creating a container failed.
    pub const CREATE_FAILED: &str = "CREATE_FAILED";
    /// Starting a container failed.
    pub const START_FAILED: &str = "START_FAILED";
    /// Stopping a container failed.
    pub const STOP_FAILED: &str = "STOP_FAILED";
    /// Deleting a container failed.
    pub const DELETE_FAILED: &str = "DELETE_FAILED";
    /// An export operation failed.
    pub const EXPORT_FAILED: &str = "EXPORT_FAILED";
    /// Serialization failed.
    pub const SERIALIZATION_ERROR: &str = "SERIALIZATION_ERROR";
    /// The message is larger than the allowed maximum.
    pub const MESSAGE_TOO_LARGE: &str = "MESSAGE_TOO_LARGE";
    /// Waiting for a process failed.
    pub const WAIT_FAILED: &str = "WAIT_FAILED";
}
575
576impl AgentResponse {
577 /// Create an error response with the given message and code.
578 ///
579 /// # Example
580 ///
581 /// ```
582 /// use smolvm_protocol::{AgentResponse, error_codes};
583 ///
584 /// let response = AgentResponse::error("image not found", error_codes::NOT_FOUND);
585 /// ```
586 pub fn error(message: impl Into<String>, code: &str) -> Self {
587 AgentResponse::Error {
588 message: message.into(),
589 code: Some(code.to_string()),
590 }
591 }
592
593 /// Create an error response from a Result's error, with the given code.
594 ///
595 /// # Example
596 ///
597 /// ```ignore
598 /// let response = some_operation()
599 /// .map(|data| AgentResponse::ok_with_data(data))
600 /// .unwrap_or_else(|e| AgentResponse::from_err(e, error_codes::PULL_FAILED));
601 /// ```
602 pub fn from_err<E: std::fmt::Display>(err: E, code: &str) -> Self {
603 AgentResponse::Error {
604 message: err.to_string(),
605 code: Some(code.to_string()),
606 }
607 }
608
609 /// Create an Ok response with optional JSON data.
610 pub fn ok(data: Option<serde_json::Value>) -> Self {
611 AgentResponse::Ok { data }
612 }
613
614 /// Create an Ok response with JSON-serializable data.
615 ///
616 /// Returns an error response if serialization fails.
617 pub fn ok_with_data<T: serde::Serialize>(data: T) -> Self {
618 match serde_json::to_value(data) {
619 Ok(value) => AgentResponse::Ok { data: Some(value) },
620 Err(e) => AgentResponse::error(
621 format!("failed to serialize response: {}", e),
622 error_codes::SERIALIZATION_ERROR,
623 ),
624 }
625 }
626
627 /// Convert a Result into an AgentResponse.
628 ///
629 /// On success, serializes the value to JSON. On error, creates an error response.
630 ///
631 /// # Example
632 ///
633 /// ```ignore
634 /// let response = AgentResponse::from_result(
635 /// storage::pull_image(image),
636 /// error_codes::PULL_FAILED,
637 /// );
638 /// ```
639 pub fn from_result<T, E>(result: Result<T, E>, error_code: &str) -> Self
640 where
641 T: serde::Serialize,
642 E: std::fmt::Display,
643 {
644 match result {
645 Ok(data) => Self::ok_with_data(data),
646 Err(e) => Self::from_err(e, error_code),
647 }
648 }
649}
650
651/// Image information returned by Query/ListImages.
652#[derive(Debug, Clone, Serialize, Deserialize)]
653pub struct ImageInfo {
654 /// Image reference.
655 pub reference: String,
656 /// Image digest (sha256:...).
657 pub digest: String,
658 /// Image size in bytes.
659 pub size: u64,
660 /// Creation timestamp (ISO 8601).
661 pub created: Option<String>,
662 /// Platform architecture.
663 pub architecture: String,
664 /// Platform OS.
665 pub os: String,
666 /// Number of layers.
667 pub layer_count: usize,
668 /// Layer digests in order.
669 pub layers: Vec<String>,
670 /// Image entrypoint (from OCI config).
671 #[serde(default)]
672 pub entrypoint: Vec<String>,
673 /// Image default command (from OCI config).
674 #[serde(default)]
675 pub cmd: Vec<String>,
676 /// Image environment variables (from OCI config).
677 #[serde(default)]
678 pub env: Vec<String>,
679 /// Image working directory (from OCI config).
680 #[serde(default)]
681 pub workdir: Option<String>,
682 /// Image default user (from OCI config).
683 #[serde(default)]
684 pub user: Option<String>,
685}
686
687/// Overlay preparation result.
688#[derive(Debug, Clone, Serialize, Deserialize)]
689pub struct OverlayInfo {
690 /// Path to the merged overlay rootfs.
691 pub rootfs_path: String,
692 /// Path to the upper (writable) directory.
693 pub upper_path: String,
694 /// Path to the work directory.
695 pub work_path: String,
696}
697
698/// Storage status information.
699#[derive(Debug, Clone, Serialize, Deserialize)]
700pub struct StorageStatus {
701 /// Whether the storage is formatted and ready.
702 pub ready: bool,
703 /// Total size in bytes.
704 pub total_bytes: u64,
705 /// Used size in bytes.
706 pub used_bytes: u64,
707 /// Number of cached layers.
708 pub layer_count: usize,
709 /// Number of cached images.
710 pub image_count: usize,
711}
712
713/// Registry authentication credentials for pulling images.
714///
715/// `Debug` is hand-written to redact the password: this value is carried inside
716/// `AgentRequest::Pull`, and any `{:?}` of that request (e.g. a tracing span)
717/// would otherwise serialize the token verbatim into the machine's console log,
718/// which is exposed over the logs API.
719#[derive(Clone, Serialize, Deserialize)]
720pub struct RegistryAuth {
721 /// Username for authentication.
722 pub username: String,
723 /// Password or token for authentication.
724 pub password: String,
725}
726
727impl std::fmt::Debug for RegistryAuth {
728 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
729 f.debug_struct("RegistryAuth")
730 .field("username", &self.username)
731 .field("password", &"***")
732 .finish()
733 }
734}
735
736// ============================================================================
737// Workload VM Protocol (Command Execution)
738// ============================================================================
739
740/// Messages from host to workload VM.
741#[derive(Debug, Clone, Serialize, Deserialize)]
742#[serde(tag = "type", rename_all = "snake_case")]
743pub enum HostMessage {
744 /// Authentication request.
745 Auth {
746 /// Authentication token (base64).
747 token: String,
748 /// Protocol version.
749 protocol_version: u32,
750 },
751
752 /// Run a command.
753 Run {
754 /// Request ID for correlating responses.
755 request_id: u64,
756 /// Command and arguments.
757 command: Vec<String>,
758 /// Environment variables.
759 env: Vec<(String, String)>,
760 /// Working directory.
761 workdir: Option<String>,
762 },
763
764 /// Execute a command in running VM.
765 Exec {
766 /// Request ID.
767 request_id: u64,
768 /// Command and arguments.
769 command: Vec<String>,
770 /// Allocate a TTY.
771 tty: bool,
772 },
773
774 /// Send a signal to a running command.
775 Signal {
776 /// Request ID of the command.
777 request_id: u64,
778 /// Signal number.
779 signal: i32,
780 },
781
782 /// Request graceful shutdown.
783 Stop {
784 /// Timeout in milliseconds.
785 timeout_ms: u64,
786 },
787}
788
789/// Messages from workload VM to host.
790#[derive(Debug, Clone, Serialize, Deserialize)]
791#[serde(tag = "type", rename_all = "snake_case")]
792pub enum GuestMessage {
793 /// Authentication successful.
794 AuthOk,
795
796 /// Authentication failed.
797 AuthFailed,
798
799 /// VM is ready to receive commands.
800 Ready,
801
802 /// Command started.
803 Started {
804 /// Request ID.
805 request_id: u64,
806 },
807
808 /// Stdout data from command.
809 Stdout {
810 /// Request ID.
811 request_id: u64,
812 /// Output data.
813 #[serde(with = "base64_bytes")]
814 data: Vec<u8>,
815 /// Whether output was truncated.
816 truncated: bool,
817 },
818
819 /// Stderr data from command.
820 Stderr {
821 /// Request ID.
822 request_id: u64,
823 /// Output data.
824 #[serde(with = "base64_bytes")]
825 data: Vec<u8>,
826 /// Whether output was truncated.
827 truncated: bool,
828 },
829
830 /// Command exited.
831 Exit {
832 /// Request ID.
833 request_id: u64,
834 /// Exit code.
835 code: i32,
836 /// Exit reason.
837 reason: String,
838 },
839
840 /// Error occurred.
841 Error {
842 /// Request ID (if applicable).
843 request_id: Option<u64>,
844 /// Error message.
845 message: String,
846 },
847}
848
849// ============================================================================
850// Wire Format Helpers
851// ============================================================================
852
853/// Envelope that wraps any message with an optional trace ID for correlation.
854///
855/// On the wire, the trace_id is flattened into the JSON alongside the message
856/// fields: `{"trace_id":"abc123","method":"ping"}`.
857#[derive(Debug, Clone, Serialize, Deserialize)]
858pub struct Envelope<T> {
859 /// Trace ID for correlating host API requests to agent operations.
860 #[serde(skip_serializing_if = "Option::is_none", default)]
861 pub trace_id: Option<String>,
862 /// The wrapped message.
863 #[serde(flatten)]
864 pub body: T,
865}
866
867impl<T> Envelope<T> {
868 /// Create an envelope with no trace ID.
869 pub fn new(body: T) -> Self {
870 Self {
871 trace_id: None,
872 body,
873 }
874 }
875
876 /// Create an envelope with an optional trace ID.
877 pub fn with_trace_id(body: T, trace_id: Option<String>) -> Self {
878 Self { trace_id, body }
879 }
880}
881
882/// Encode a message to wire format (length-prefixed JSON).
883pub fn encode_message<T: Serialize>(msg: &T) -> Result<Vec<u8>, serde_json::Error> {
884 let json = serde_json::to_vec(msg)?;
885 let len = json.len() as u32;
886
887 let mut buf = Vec::with_capacity(4 + json.len());
888 buf.extend_from_slice(&len.to_be_bytes());
889 buf.extend_from_slice(&json);
890
891 Ok(buf)
892}
893
894/// Decode a message from wire format.
895pub fn decode_message<T: for<'de> Deserialize<'de>>(data: &[u8]) -> Result<T, DecodeError> {
896 if data.len() < 4 {
897 return Err(DecodeError::TooShort);
898 }
899
900 let len = u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
901
902 if len > MAX_FRAME_SIZE as usize {
903 return Err(DecodeError::TooLarge(len));
904 }
905
906 if data.len() < 4 + len {
907 return Err(DecodeError::Incomplete {
908 expected: len,
909 got: data.len() - 4,
910 });
911 }
912
913 serde_json::from_slice(&data[4..4 + len]).map_err(DecodeError::Json)
914}
915
916/// Error decoding a wire message.
917#[derive(Debug)]
918pub enum DecodeError {
919 /// Data too short to contain length header.
920 TooShort,
921 /// Frame size exceeds maximum.
922 TooLarge(usize),
923 /// Incomplete frame.
924 Incomplete {
925 /// Expected length.
926 expected: usize,
927 /// Actual length.
928 got: usize,
929 },
930 /// JSON parse error.
931 Json(serde_json::Error),
932}
933
934impl std::fmt::Display for DecodeError {
935 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
936 match self {
937 DecodeError::TooShort => write!(f, "data too short for length header"),
938 DecodeError::TooLarge(size) => write!(f, "frame too large: {} bytes", size),
939 DecodeError::Incomplete { expected, got } => {
940 write!(
941 f,
942 "incomplete frame: expected {} bytes, got {}",
943 expected, got
944 )
945 }
946 DecodeError::Json(e) => write!(f, "JSON decode error: {}", e),
947 }
948 }
949}
950
// NOTE: `source()` is not overridden, so it returns `None` for every
// variant; the message of the wrapped `serde_json::Error` is included by
// the `Display` impl instead.
impl std::error::Error for DecodeError {}
952
953#[cfg(test)]
954mod tests {
955 use super::*;
956
957 #[test]
958 fn test_encode_decode_roundtrip() {
959 let req = AgentRequest::Pull {
960 image: "alpine:latest".to_string(),
961 oci_platform: Some("linux/arm64".to_string()),
962 auth: None,
963 proxy: None,
964 no_proxy: None,
965 };
966
967 let encoded = encode_message(&req).unwrap();
968 let decoded: AgentRequest = decode_message(&encoded).unwrap();
969
970 let AgentRequest::Pull {
971 image,
972 oci_platform,
973 auth,
974 proxy,
975 no_proxy,
976 } = decoded
977 else {
978 panic!("expected Pull variant, got {:?}", decoded);
979 };
980 assert_eq!(image, "alpine:latest");
981 assert_eq!(oci_platform, Some("linux/arm64".to_string()));
982 assert!(auth.is_none());
983 assert!(proxy.is_none());
984 assert!(no_proxy.is_none());
985 }
986
987 #[test]
988 fn test_encode_decode_with_auth() {
989 let req = AgentRequest::Pull {
990 image: "ghcr.io/owner/repo:latest".to_string(),
991 oci_platform: None,
992 auth: Some(RegistryAuth {
993 username: "testuser".to_string(),
994 password: "testpass".to_string(),
995 }),
996 proxy: None,
997 no_proxy: None,
998 };
999
1000 let encoded = encode_message(&req).unwrap();
1001 let decoded: AgentRequest = decode_message(&encoded).unwrap();
1002
1003 let AgentRequest::Pull {
1004 image,
1005 oci_platform,
1006 auth,
1007 proxy: _,
1008 no_proxy: _,
1009 } = decoded
1010 else {
1011 panic!("expected Pull variant, got {:?}", decoded);
1012 };
1013 assert_eq!(image, "ghcr.io/owner/repo:latest");
1014 assert!(oci_platform.is_none());
1015 let auth = auth.expect("auth should be Some");
1016 assert_eq!(auth.username, "testuser");
1017 assert_eq!(auth.password, "testpass");
1018 }
1019
1020 #[test]
1021 fn test_encode_decode_with_proxy() {
1022 let req = AgentRequest::Pull {
1023 image: "alpine:latest".to_string(),
1024 oci_platform: None,
1025 auth: None,
1026 proxy: Some("http://192.168.127.254:3128".to_string()),
1027 no_proxy: Some("127.0.0.1,localhost,.internal".to_string()),
1028 };
1029
1030 let encoded = encode_message(&req).unwrap();
1031 let decoded: AgentRequest = decode_message(&encoded).unwrap();
1032
1033 let AgentRequest::Pull {
1034 proxy, no_proxy, ..
1035 } = decoded
1036 else {
1037 panic!("expected Pull variant, got {:?}", decoded);
1038 };
1039 assert_eq!(proxy.as_deref(), Some("http://192.168.127.254:3128"));
1040 assert_eq!(no_proxy.as_deref(), Some("127.0.0.1,localhost,.internal"));
1041 }
1042
1043 #[test]
1044 fn test_decode_too_short() {
1045 let data = [0u8; 2];
1046 let result: Result<AgentRequest, _> = decode_message(&data);
1047 assert!(matches!(result, Err(DecodeError::TooShort)));
1048 }
1049
1050 #[test]
1051 fn test_decode_incomplete() {
1052 let mut data = vec![0, 0, 0, 100]; // claims 100 bytes
1053 data.extend_from_slice(b"{}"); // only 2 bytes of payload
1054 let result: Result<AgentRequest, _> = decode_message(&data);
1055 assert!(matches!(result, Err(DecodeError::Incomplete { .. })));
1056 }
1057
1058 #[test]
1059 fn test_agent_request_serialization() {
1060 let req = AgentRequest::Ping;
1061 let json = serde_json::to_string(&req).unwrap();
1062 assert!(json.contains("ping"));
1063
1064 let req = AgentRequest::PrepareOverlay {
1065 image: "ubuntu:22.04".to_string(),
1066 workload_id: "wl-123".to_string(),
1067 };
1068 let json = serde_json::to_string(&req).unwrap();
1069 assert!(json.contains("prepare_overlay"));
1070 }
1071
1072 #[test]
1073 fn test_agent_response_serialization() {
1074 let resp = AgentResponse::Pong {
1075 version: PROTOCOL_VERSION,
1076 };
1077 let json = serde_json::to_string(&resp).unwrap();
1078 assert!(json.contains("pong"));
1079
1080 let resp = AgentResponse::Progress {
1081 message: "Pulling layer 1/3".to_string(),
1082 percent: Some(33),
1083 layer: Some("sha256:abc123".to_string()),
1084 };
1085 let json = serde_json::to_string(&resp).unwrap();
1086 assert!(json.contains("progress"));
1087 }
1088
1089 #[test]
1090 fn file_write_begin_roundtrips() {
1091 let req = AgentRequest::FileWriteBegin {
1092 path: "/tmp/target".into(),
1093 mode: Some(0o600),
1094 total_size: 123_456_789,
1095 };
1096 let bytes = encode_message(&req).unwrap();
1097 let back: AgentRequest = decode_message(&bytes).unwrap();
1098 match back {
1099 AgentRequest::FileWriteBegin {
1100 path,
1101 mode,
1102 total_size,
1103 } => {
1104 assert_eq!(path, "/tmp/target");
1105 assert_eq!(mode, Some(0o600));
1106 assert_eq!(total_size, 123_456_789);
1107 }
1108 _ => panic!("wrong variant"),
1109 }
1110 }
1111
1112 #[test]
1113 fn file_write_chunk_roundtrips_binary_data() {
1114 // Binary data (bytes outside UTF-8) must survive the base64
1115 // trip intact. If the encoding ever silently lossifies, this
1116 // fires.
1117 let payload: Vec<u8> = (0u8..=255).collect();
1118 let req = AgentRequest::FileWriteChunk {
1119 data: payload.clone(),
1120 done: true,
1121 };
1122 let bytes = encode_message(&req).unwrap();
1123 let back: AgentRequest = decode_message(&bytes).unwrap();
1124 match back {
1125 AgentRequest::FileWriteChunk { data, done } => {
1126 assert_eq!(data, payload);
1127 assert!(done);
1128 }
1129 _ => panic!("wrong variant"),
1130 }
1131 }
1132
1133 #[test]
1134 fn file_write_size_constants_are_frame_safe() {
1135 // Sanity: a single streaming chunk at FILE_WRITE_CHUNK_SIZE
1136 // must fit inside MAX_FRAME_SIZE after base64 (+ ~33%) and
1137 // JSON overhead. If anyone bumps CHUNK_SIZE past the limit,
1138 // this test fires before production does.
1139 let chunk_bytes = FILE_WRITE_CHUNK_SIZE as u64;
1140 let base64_bytes = chunk_bytes.div_ceil(3) * 4; // ceil(n/3)*4
1141 let json_overhead = 256u64; // method tag, done bool, quotes
1142 let total = base64_bytes + json_overhead;
1143 assert!(
1144 total < MAX_FRAME_SIZE as u64,
1145 "FILE_WRITE_CHUNK_SIZE of {} bytes would produce a frame \
1146 of ~{} bytes which exceeds MAX_FRAME_SIZE of {}",
1147 chunk_bytes,
1148 total,
1149 MAX_FRAME_SIZE
1150 );
1151
1152 // Single-shot threshold must be <= chunk size. They can be
1153 // equal (a 1 MiB file is a single shot; a 1 MiB + 1 byte
1154 // file streams as two chunks); but SINGLE_SHOT > CHUNK would
1155 // be incoherent — a file slightly over the shot threshold
1156 // would need to stream as... a single oversized chunk.
1157 assert!(FILE_WRITE_SINGLE_SHOT_MAX <= FILE_WRITE_CHUNK_SIZE);
1158 }
1159
1160 #[test]
1161 fn test_ports_constants() {
1162 assert_eq!(ports::WORKLOAD_CONTROL, 5000);
1163 assert_eq!(ports::WORKLOAD_LOGS, 5001);
1164 assert_eq!(ports::AGENT_CONTROL, 6000);
1165 assert_eq!(ports::SSH_AGENT, 6001);
1166 }
1167
1168 #[test]
1169 fn test_cid_constants() {
1170 assert_eq!(cid::HOST, 2);
1171 assert_eq!(cid::GUEST, 3);
1172 }
1173
1174 #[test]
1175 fn test_envelope_serialization_with_trace_id() {
1176 let req = AgentRequest::Ping;
1177 let envelope = Envelope::with_trace_id(&req, Some("abc123".to_string()));
1178 let json = serde_json::to_string(&envelope).unwrap();
1179
1180 // trace_id should be flattened alongside the method tag
1181 assert!(json.contains("\"trace_id\":\"abc123\""));
1182 assert!(json.contains("\"method\":\"ping\""));
1183
1184 // Deserialize back — Envelope<AgentRequest> with flatten
1185 let parsed: Envelope<AgentRequest> = serde_json::from_str(&json).unwrap();
1186 assert_eq!(parsed.trace_id.as_deref(), Some("abc123"));
1187 assert!(matches!(parsed.body, AgentRequest::Ping));
1188 }
1189
1190 #[test]
1191 fn test_envelope_without_trace_id() {
1192 let req = AgentRequest::Ping;
1193 let envelope = Envelope::new(&req);
1194 let json = serde_json::to_string(&envelope).unwrap();
1195
1196 // No trace_id field (skip_serializing_if = None)
1197 assert!(!json.contains("trace_id"));
1198 assert!(json.contains("\"method\":\"ping\""));
1199 }
1200
1201 #[test]
1202 fn test_envelope_backward_compat_bare_request() {
1203 // A bare AgentRequest (no Envelope) should fail to parse as Envelope
1204 // but succeed as bare AgentRequest — this is the agent's fallback path
1205 let bare_json = r#"{"method":"ping"}"#;
1206
1207 // Envelope parse should fail (no body field to flatten into)
1208 // Actually with flatten, this may work — let's verify
1209 let envelope_result = serde_json::from_str::<Envelope<AgentRequest>>(bare_json);
1210 let bare_result = serde_json::from_str::<AgentRequest>(bare_json);
1211
1212 // At least one must succeed for backward compat
1213 assert!(
1214 envelope_result.is_ok() || bare_result.is_ok(),
1215 "Neither Envelope nor bare parse succeeded"
1216 );
1217
1218 // Bare parse must always work
1219 assert!(bare_result.is_ok());
1220 assert!(matches!(bare_result.unwrap(), AgentRequest::Ping));
1221
1222 // If Envelope works, trace_id should be None
1223 if let Ok(env) = envelope_result {
1224 assert!(env.trace_id.is_none());
1225 }
1226 }
1227}