smolvm_protocol/lib.rs
1//! Protocol types for smolvm host-guest communication.
2//!
3//! This crate defines the wire protocol for vsock communication between
4//! the smolvm host and the guest agent (smolvm-agent).
5//!
6//! # Protocol Overview
7//!
8//! Communication uses JSON-encoded messages over vsock. Each message is
9//! prefixed with a 4-byte big-endian length header.
10//!
11//! ```text
12//! +----------------+-------------------+
13//! | Length (4 BE) | JSON payload |
14//! +----------------+-------------------+
15//! ```
16
17#![deny(missing_docs)]
18
19use serde::{Deserialize, Serialize};
20
21pub mod guest_env;
22pub mod image_ref;
23pub mod retry;
24
25pub use image_ref::normalize_image_ref;
26
27/// Serde helper for encoding `Vec<u8>` as a base64 string in JSON.
28///
29/// Without this, serde_json serializes `Vec<u8>` as a JSON array of numbers
30/// (e.g., `[104,101,108,108,111]`), which inflates binary data by ~4x.
31/// Base64 encoding reduces this to ~1.33x.
32pub mod base64_bytes {
33 use base64::{engine::general_purpose::STANDARD, Engine};
34 use serde::{Deserialize, Deserializer, Serializer};
35
36 /// Serialize `Vec<u8>` as a base64 string.
37 pub fn serialize<S: Serializer>(data: &[u8], serializer: S) -> Result<S::Ok, S::Error> {
38 serializer.serialize_str(&STANDARD.encode(data))
39 }
40
41 /// Deserialize a base64 string into `Vec<u8>`.
42 pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Vec<u8>, D::Error> {
43 let s = String::deserialize(deserializer)?;
44 STANDARD.decode(&s).map_err(serde::de::Error::custom)
45 }
46}
47
/// Version number of the wire protocol defined in this crate.
pub const PROTOCOL_VERSION: u32 = 1;

/// Largest payload, in bytes, that one length-prefixed frame may carry (32 MiB).
///
/// [`decode_message`] rejects anything larger. Bulk transfers such as layer
/// exports are therefore streamed in chunks.
pub const MAX_FRAME_SIZE: u32 = 32 << 20;

/// Raw bytes per chunk when streaming layer data (16 MiB).
///
/// After base64 this becomes roughly 21 MiB of JSON, which is still below
/// [`MAX_FRAME_SIZE`].
pub const LAYER_CHUNK_SIZE: usize = 16 << 20;
56
/// Files at or below this size are written with a single `FileWrite`
/// message. Larger files must stream via `FileWriteBegin` +
/// `FileWriteChunk`, so that no single frame approaches
/// [`MAX_FRAME_SIZE`] (base64 + JSON inflation is ~1.4x).
///
/// The value keeps the single-shot frame comfortably under the frame
/// limit. It also preserves fast-path latency for small config files,
/// scripts and keys.
pub const FILE_WRITE_SINGLE_SHOT_MAX: usize = 1024 * 1024;

/// Payload bytes per streaming upload chunk.
///
/// Deliberately small (equal to [`FILE_WRITE_SINGLE_SHOT_MAX`]), so each
/// chunk's encoded frame stays around 1.4 MB. Even that is several times a
/// typical kernel Unix-socket send buffer: `SO_SNDBUF` defaults are on the
/// order of 200–256 KiB, though the buffer can grow. As a result, every chunk
/// already requires the agent to drain the socket while `write_all` is in
/// progress.
///
/// Larger chunks make each `write_all` wait proportionally longer for the
/// agent. Any latency spike can then trip the 10 s write timeout with
/// `EAGAIN`. That failure was reproduced with larger chunks before this size
/// was chosen.
///
/// Note: [`LAYER_CHUNK_SIZE`] is 16 MiB for agent→host (download) streaming.
/// That works because the host side of the socket has more headroom than
/// the guest side. Upload streaming is the asymmetric case and needs a
/// smaller chunk.
pub const FILE_WRITE_CHUNK_SIZE: usize = FILE_WRITE_SINGLE_SHOT_MAX;

/// Hard ceiling on a single file transfer in either direction.
///
/// Write path: the agent enforces it at `FileWriteBegin`. A request with
/// `total_size > FILE_TRANSFER_MAX_TOTAL` is rejected before any staging
/// file is created.
///
/// Read path: the host's `read_file` loop enforces it. As soon as a chunk
/// pushes the accumulated total past the cap, the call bails with an error
/// and drops the partial buffer. This protects the host process from OOM
/// if the guest (compromised or merely buggy) streams unbounded data.
///
/// 4 GiB matches the order of magnitude of the default overlay disk and
/// the `gpu_vram_mib` cap. Callers that need to move larger blobs should
/// stage them via a virtiofs mount instead of `cp`.
pub const FILE_TRANSFER_MAX_TOTAL: u64 = 4 * 1024 * 1024 * 1024;
98
/// Name of the marker file the agent creates once it is ready to accept
/// vsock connections.
///
/// The host detects readiness by polling for this file through its virtiofs
/// mount of the guest rootfs. The agent creates it during deferred init,
/// right before it opens the vsock listener, and may also add a symlink
/// from `/oldroot/`.
///
/// Host and agent must use the same name. Defining it once in this shared
/// crate keeps the two sides from drifting apart.
pub const AGENT_READY_MARKER: &str = ".smolvm-ready";

/// Well-known vsock ports.
pub mod ports {
    // --- Workload VMs ---

    /// Control channel for workload VMs.
    pub const WORKLOAD_CONTROL: u32 = 5_000;
    /// Log streaming from workload VMs.
    pub const WORKLOAD_LOGS: u32 = 5_001;

    // --- Agent services ---

    /// Agent control port (OCI operations and management).
    pub const AGENT_CONTROL: u32 = 6_000;
    /// SSH agent forwarding: the host's `SSH_AUTH_SOCK` bridged into the guest.
    pub const SSH_AGENT: u32 = 6_001;
    /// DNS filtering proxy: the guest forwards DNS queries to the host for filtering.
    pub const DNS_FILTER: u32 = 6_002;
}

/// vsock context IDs (CIDs).
pub mod cid {
    /// The host's CID, which is always 2.
    pub const HOST: u32 = 2;
    /// The guest's CID, which is always 3 for the first (and only) guest.
    pub const GUEST: u32 = 3;
    /// Wildcard CID used when listening.
    ///
    /// All bits are set (`u32::MAX`). This is `VMADDR_CID_ANY` (`-1U`) in the
    /// Linux vsock API.
    pub const ANY: u32 = !0;
}
132
133// ============================================================================
134// Agent Protocol (OCI Operations)
135// ============================================================================
136
/// Agent request types (for image management and OCI operations).
///
/// Serialized as an internally tagged JSON object. The variant name, in
/// `snake_case`, goes under the `method` key next to the variant's fields,
/// e.g. `{"method":"ping"}` or `{"method":"query","image":"alpine"}`.
///
/// The derived `Debug` output prints every field verbatim. That includes
/// `env`, `proxy`, `stdin_data` and file/stdin `data`. Use
/// [`AgentRequest::log_summary`] rather than `{:?}` for anything that ends up
/// in logs. Registry credentials are the one exception: [`RegistryAuth`]
/// redacts its password in `Debug`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "method", rename_all = "snake_case")]
pub enum AgentRequest {
    /// Ping to check if agent is alive.
    Ping,

    /// Pull an OCI image and extract layers.
    Pull {
        /// Image reference (e.g., "alpine:latest", "docker.io/library/ubuntu:22.04").
        image: String,
        /// OCI platform to pull (e.g., "linux/arm64", "linux/amd64").
        /// NOTE(review): `None` presumably lets the agent pick its native
        /// platform — confirm in the agent.
        oci_platform: Option<String>,
        /// Optional registry authentication credentials.
        /// Omitted from the JSON when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        auth: Option<RegistryAuth>,
        /// Proxy URL applied to the registry client (sets HTTP_PROXY and HTTPS_PROXY).
        /// May embed credentials, so never log it.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        proxy: Option<String>,
        /// Comma-separated NO_PROXY list of hosts/CIDRs that bypass the proxy.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        no_proxy: Option<String>,
    },

    /// Query if an image exists locally.
    Query {
        /// Image reference.
        image: String,
    },

    /// List all cached images.
    ListImages,

    /// Run garbage collection on unused layers.
    GarbageCollect {
        /// If true, only report what would be deleted.
        dry_run: bool,
        /// If true, delete all image manifests and configs first,
        /// making all layers unreferenced so they get collected.
        /// Defaults to `false` when absent from the JSON.
        #[serde(default)]
        purge_all: bool,
    },

    /// Prepare overlay rootfs for a workload.
    PrepareOverlay {
        /// Image reference.
        image: String,
        /// Unique workload ID for the overlay.
        workload_id: String,
    },

    /// Clean up overlay rootfs for a workload.
    CleanupOverlay {
        /// Workload ID to clean up.
        workload_id: String,
    },

    /// Format the storage disk (first-time setup).
    FormatStorage,

    /// Get storage disk status.
    StorageStatus,

    /// Test network connectivity directly from the agent (not via chroot).
    /// Used to debug TSI networking.
    NetworkTest {
        /// URL to test (e.g., "http://1.1.1.1").
        url: String,
    },

    /// Shutdown the agent.
    Shutdown,

    /// Export a layer as a tar archive.
    ///
    /// Used by `smolvm pack` to extract OCI layers for packaging.
    /// The agent streams the layer tar data back as a sequence of
    /// `AgentResponse::DataChunk` responses.
    ExportLayer {
        /// Image digest (sha256:...).
        image_digest: String,
        /// Layer index (0-based).
        layer_index: usize,
    },

    /// Execute a command directly in the VM (not in a container).
    ///
    /// This runs the command in the agent's Alpine rootfs without any
    /// container isolation. Useful for VM-level operations and debugging.
    VmExec {
        /// Command and arguments.
        command: Vec<String>,
        /// Environment variables as (name, value) pairs.
        /// May carry secrets, so never log it.
        #[serde(default)]
        env: Vec<(String, String)>,
        /// Working directory in the VM.
        workdir: Option<String>,
        /// Timeout in milliseconds.
        #[serde(default)]
        timeout_ms: Option<u64>,
        /// Interactive mode - stream I/O instead of buffering.
        #[serde(default)]
        interactive: bool,
        /// Allocate a pseudo-TTY for the command.
        #[serde(default)]
        tty: bool,
        /// Background mode - spawn and return PID immediately without waiting.
        #[serde(default)]
        background: bool,
        /// Data to pipe to the command's stdin.
        /// Unlike `Stdin::data`, this is a UTF-8 `String`, not base64-encoded bytes.
        #[serde(default)]
        stdin_data: Option<String>,
    },

    /// Run a command in an image's rootfs.
    ///
    /// This prepares an overlay, chroots into it, and executes the command.
    /// Returns stdout, stderr, and exit code when the command completes.
    Run {
        /// Image reference (must be pulled first).
        image: String,
        /// Command and arguments.
        command: Vec<String>,
        /// Environment variables as (name, value) pairs.
        /// May carry secrets, so never log it.
        #[serde(default)]
        env: Vec<(String, String)>,
        /// Working directory inside the rootfs.
        workdir: Option<String>,
        /// User inside the rootfs. If omitted, the OCI image default applies.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        user: Option<String>,
        /// Volume mounts to bind into the container.
        /// Each tuple is (virtiofs_tag, container_path, read_only).
        #[serde(default)]
        mounts: Vec<(String, String, bool)>,
        /// Timeout in milliseconds. If the command exceeds this duration,
        /// it will be killed and return exit code 124.
        #[serde(default)]
        timeout_ms: Option<u64>,
        /// Interactive mode - stream I/O instead of buffering.
        /// When true, output is streamed via Stdout/Stderr responses,
        /// and stdin can be sent via the Stdin request.
        #[serde(default)]
        interactive: bool,
        /// Allocate a pseudo-TTY for the command.
        /// Enables terminal features like colors, line editing, and signal handling.
        #[serde(default)]
        tty: bool,
        /// Detached mode — start the container and return immediately with the
        /// container ID. Only meaningful when `persistent_overlay_id` is set.
        /// Returns a `Completed` response with `stdout` containing the container ID.
        #[serde(default)]
        detached: bool,
        /// If set, use a persistent overlay that survives across exec sessions.
        /// The overlay is identified by this ID (typically the machine name)
        /// and reused on subsequent runs. If not set, an ephemeral overlay is
        /// created and destroyed after the run.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        persistent_overlay_id: Option<String>,
        /// Spawn the container and return immediately with the crun PID.
        /// The container runs detached; stdout/stderr go to /dev/null.
        /// Incompatible with `interactive` and `tty`.
        #[serde(default)]
        background: bool,
    },

    /// Send stdin data to a running interactive command.
    Stdin {
        /// Input data to send to the command's stdin (base64 string on the wire).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
    },

    /// Resize the PTY window (for TTY mode).
    Resize {
        /// New width in columns.
        cols: u16,
        /// New height in rows.
        rows: u16,
    },

    // ========================================================================
    // File I/O
    // ========================================================================
    /// Write a file inside the VM in a single message.
    ///
    /// Use only for files up to [`FILE_WRITE_SINGLE_SHOT_MAX`]. Larger
    /// files must stream via [`Self::FileWriteBegin`] +
    /// [`Self::FileWriteChunk`] to avoid exceeding [`MAX_FRAME_SIZE`]
    /// after base64 + JSON inflation.
    FileWrite {
        /// Absolute path in the VM filesystem.
        path: String,
        /// File contents (base64 string on the wire).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// File mode (e.g., 0o644). None = default (0644).
        #[serde(default)]
        mode: Option<u32>,
    },

    /// Open a streaming file upload session on this connection.
    ///
    /// Must be followed by one or more [`Self::FileWriteChunk`]
    /// requests. The final chunk sets `done: true` to finalize.
    /// Dropping the connection (or sending any non-chunk request)
    /// before `done` aborts the session and leaves no partial file
    /// at `path`.
    ///
    /// Sessions are per-connection — one session at a time.
    FileWriteBegin {
        /// Absolute path in the VM filesystem.
        path: String,
        /// File mode (e.g., 0o644). None = default (0644).
        #[serde(default)]
        mode: Option<u32>,
        /// Expected total size in bytes. Rejected if it exceeds
        /// [`FILE_TRANSFER_MAX_TOTAL`]. The agent uses this for an
        /// early-fail check only; the actual size written is the sum
        /// of chunk byte lengths.
        total_size: u64,
    },

    /// Append a chunk to the currently open streaming upload.
    /// If `done` is true, the agent fsyncs and atomically renames the
    /// staging file onto the target path.
    FileWriteChunk {
        /// Chunk bytes (base64 string on the wire). Typically
        /// [`FILE_WRITE_CHUNK_SIZE`] except for the last chunk.
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// True on the final chunk; closes and renames the staging
        /// file. False on intermediate chunks.
        done: bool,
    },

    /// Read a file from the VM.
    ///
    /// The contents come back as a sequence of `AgentResponse::DataChunk`
    /// responses.
    FileRead {
        /// Absolute path in the VM filesystem.
        path: String,
    },
}
378
379impl AgentRequest {
380 /// A log-safe one-line summary of the request.
381 ///
382 /// This string is written to the machine's console log, which is exposed
383 /// over the logs API — so it must NEVER include credential- or data-bearing
384 /// fields: registry `auth`, `env` (which can carry host-resolved secrets),
385 /// `proxy` (may embed credentials), or `data` (file/stdin bytes). Only the
386 /// variant name plus a non-secret identifier (image) is emitted.
387 ///
388 /// The match is exhaustive with no catch-all on purpose: adding a new
389 /// variant forces a compile error here, so redaction is a deliberate
390 /// decision rather than an accidental leak in some future request type.
391 pub fn log_summary(&self) -> String {
392 match self {
393 AgentRequest::Ping => "Ping".into(),
394 AgentRequest::Pull { image, .. } => format!("Pull {{ image: {image} }}"),
395 AgentRequest::Query { image, .. } => format!("Query {{ image: {image} }}"),
396 AgentRequest::ListImages => "ListImages".into(),
397 AgentRequest::GarbageCollect { .. } => "GarbageCollect".into(),
398 AgentRequest::PrepareOverlay { .. } => "PrepareOverlay".into(),
399 AgentRequest::CleanupOverlay { .. } => "CleanupOverlay".into(),
400 AgentRequest::FormatStorage => "FormatStorage".into(),
401 AgentRequest::StorageStatus => "StorageStatus".into(),
402 AgentRequest::NetworkTest { .. } => "NetworkTest".into(),
403 AgentRequest::Shutdown => "Shutdown".into(),
404 AgentRequest::ExportLayer { .. } => "ExportLayer".into(),
405 AgentRequest::VmExec { .. } => "VmExec".into(),
406 AgentRequest::Run { image, .. } => format!("Run {{ image: {image} }}"),
407 AgentRequest::Stdin { .. } => "Stdin".into(),
408 AgentRequest::Resize { .. } => "Resize".into(),
409 AgentRequest::FileWrite { .. } => "FileWrite".into(),
410 AgentRequest::FileWriteBegin { .. } => "FileWriteBegin".into(),
411 AgentRequest::FileWriteChunk { .. } => "FileWriteChunk".into(),
412 AgentRequest::FileRead { .. } => "FileRead".into(),
413 }
414 }
415}
416
/// Agent response types.
///
/// Serialized as an internally tagged JSON object. The variant name, in
/// `snake_case`, goes under the `status` key next to the variant's fields,
/// e.g. `{"status":"pong","version":1}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum AgentResponse {
    /// Operation completed successfully.
    Ok {
        /// Response data (varies by request type). Omitted from the JSON when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        data: Option<serde_json::Value>,
    },

    /// Pong response to ping.
    Pong {
        /// Protocol version.
        version: u32,
    },

    /// Progress update (for long operations like pull).
    Progress {
        /// Human-readable message.
        message: String,
        /// Completion percentage (0-100). The `u8` type itself does not
        /// enforce the upper bound.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        percent: Option<u8>,
        /// Current layer being processed.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        layer: Option<String>,
    },

    /// Operation failed.
    Error {
        /// Error message.
        message: String,
        /// Error code for programmatic handling. Normally one of the
        /// constants in [`error_codes`].
        #[serde(default, skip_serializing_if = "Option::is_none")]
        code: Option<String>,
    },

    /// Command execution completed (non-interactive mode).
    Completed {
        /// Exit code from the command.
        exit_code: i32,
        /// Standard output (may be truncated). `Vec<u8>` preserves binary
        /// output (image bytes, tarballs, etc.) that would be truncated by
        /// `String` at the first non-UTF-8 byte. Serialized as base64 JSON
        /// string — the same format as the streaming `Stdout` variant.
        #[serde(with = "base64_bytes")]
        stdout: Vec<u8>,
        /// Standard error (may be truncated). Also serialized as a base64 string.
        #[serde(with = "base64_bytes")]
        stderr: Vec<u8>,
    },

    /// Command started (interactive mode).
    /// Indicates the command is running and ready to receive stdin.
    Started,

    /// Stdout data from a running command (interactive mode).
    Stdout {
        /// Output data (base64 string on the wire).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
    },

    /// Stderr data from a running command (interactive mode).
    Stderr {
        /// Error output data (base64 string on the wire).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
    },

    /// Command exited (interactive mode).
    Exited {
        /// Exit code from the command.
        exit_code: i32,
    },

    /// Streaming binary-data chunk.
    ///
    /// Used by every streaming download direction: the agent sends
    /// one or more `DataChunk` responses in sequence, with `done: true`
    /// on the final chunk. Current producers: `ExportLayer` and
    /// `FileRead`.
    ///
    /// Payload size per chunk should stay under
    /// [`LAYER_CHUNK_SIZE`] so the encoded frame (~1.33× after
    /// base64) fits inside [`MAX_FRAME_SIZE`] with JSON overhead to
    /// spare.
    DataChunk {
        /// Chunk bytes (base64 string on the wire). Empty allowed on the
        /// final frame (common for EOF-on-clean-boundary cases).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// True on the final chunk of the stream.
        done: bool,
    },
}
514
515// ============================================================================
516// Error Code Constants
517// ============================================================================
518//
519// Standard error codes for AgentResponse::Error. Using constants ensures
520// consistency across the codebase and makes error handling more reliable.
521
/// Machine-readable codes carried in the `code` field of
/// [`AgentResponse::Error`].
///
/// Each constant's value is identical to its name, so the codes read
/// naturally on the wire.
pub mod error_codes {
    // --- Generic / protocol-level ---

    /// Request payload was invalid or malformed.
    pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
    /// Requested resource was not found.
    pub const NOT_FOUND: &str = "NOT_FOUND";
    /// Internal error during an operation.
    pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";
    /// A value could not be serialized.
    pub const SERIALIZATION_ERROR: &str = "SERIALIZATION_ERROR";
    /// Message size exceeds the maximum.
    pub const MESSAGE_TOO_LARGE: &str = "MESSAGE_TOO_LARGE";

    // --- Images ---

    /// Image pull failed.
    pub const PULL_FAILED: &str = "PULL_FAILED";
    /// Image query failed.
    pub const QUERY_FAILED: &str = "QUERY_FAILED";
    /// Listing images failed.
    pub const LIST_FAILED: &str = "LIST_FAILED";
    /// Garbage collection failed.
    pub const GC_FAILED: &str = "GC_FAILED";
    /// Layer export failed.
    pub const EXPORT_FAILED: &str = "EXPORT_FAILED";

    // --- Storage and filesystems ---

    /// Formatting the storage disk failed.
    pub const FORMAT_FAILED: &str = "FORMAT_FAILED";
    /// Storage status query failed.
    pub const STATUS_FAILED: &str = "STATUS_FAILED";
    /// A mount operation failed.
    pub const MOUNT_FAILED: &str = "MOUNT_FAILED";
    /// An overlay filesystem operation failed.
    pub const OVERLAY_FAILED: &str = "OVERLAY_FAILED";
    /// A cleanup operation failed.
    pub const CLEANUP_FAILED: &str = "CLEANUP_FAILED";
    /// A file I/O operation failed.
    pub const FILE_IO_FAILED: &str = "FILE_IO_FAILED";

    // --- Processes ---

    /// Command execution failed.
    pub const RUN_FAILED: &str = "RUN_FAILED";
    /// Command execution inside a container failed.
    pub const EXEC_FAILED: &str = "EXEC_FAILED";
    /// Spawning a process failed.
    pub const SPAWN_FAILED: &str = "SPAWN_FAILED";
    /// Waiting on a process failed.
    pub const WAIT_FAILED: &str = "WAIT_FAILED";

    // --- Container lifecycle ---

    /// Container creation failed.
    pub const CREATE_FAILED: &str = "CREATE_FAILED";
    /// Container start failed.
    pub const START_FAILED: &str = "START_FAILED";
    /// Container stop failed.
    pub const STOP_FAILED: &str = "STOP_FAILED";
    /// Container delete failed.
    pub const DELETE_FAILED: &str = "DELETE_FAILED";
}
573
574impl AgentResponse {
575 /// Create an error response with the given message and code.
576 ///
577 /// # Example
578 ///
579 /// ```
580 /// use smolvm_protocol::{AgentResponse, error_codes};
581 ///
582 /// let response = AgentResponse::error("image not found", error_codes::NOT_FOUND);
583 /// ```
584 pub fn error(message: impl Into<String>, code: &str) -> Self {
585 AgentResponse::Error {
586 message: message.into(),
587 code: Some(code.to_string()),
588 }
589 }
590
591 /// Create an error response from a Result's error, with the given code.
592 ///
593 /// # Example
594 ///
595 /// ```ignore
596 /// let response = some_operation()
597 /// .map(|data| AgentResponse::ok_with_data(data))
598 /// .unwrap_or_else(|e| AgentResponse::from_err(e, error_codes::PULL_FAILED));
599 /// ```
600 pub fn from_err<E: std::fmt::Display>(err: E, code: &str) -> Self {
601 AgentResponse::Error {
602 message: err.to_string(),
603 code: Some(code.to_string()),
604 }
605 }
606
607 /// Create an Ok response with optional JSON data.
608 pub fn ok(data: Option<serde_json::Value>) -> Self {
609 AgentResponse::Ok { data }
610 }
611
612 /// Create an Ok response with JSON-serializable data.
613 ///
614 /// Returns an error response if serialization fails.
615 pub fn ok_with_data<T: serde::Serialize>(data: T) -> Self {
616 match serde_json::to_value(data) {
617 Ok(value) => AgentResponse::Ok { data: Some(value) },
618 Err(e) => AgentResponse::error(
619 format!("failed to serialize response: {}", e),
620 error_codes::SERIALIZATION_ERROR,
621 ),
622 }
623 }
624
625 /// Convert a Result into an AgentResponse.
626 ///
627 /// On success, serializes the value to JSON. On error, creates an error response.
628 ///
629 /// # Example
630 ///
631 /// ```ignore
632 /// let response = AgentResponse::from_result(
633 /// storage::pull_image(image),
634 /// error_codes::PULL_FAILED,
635 /// );
636 /// ```
637 pub fn from_result<T, E>(result: Result<T, E>, error_code: &str) -> Self
638 where
639 T: serde::Serialize,
640 E: std::fmt::Display,
641 {
642 match result {
643 Ok(data) => Self::ok_with_data(data),
644 Err(e) => Self::from_err(e, error_code),
645 }
646 }
647}
648
/// Image information returned by Query/ListImages.
///
/// Fields serialize in declaration order. The OCI-config fields marked
/// `#[serde(default)]` fall back to empty / `None` when absent from the JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImageInfo {
    /// Image reference.
    pub reference: String,
    /// Image digest (sha256:...).
    pub digest: String,
    /// Image size in bytes.
    pub size: u64,
    /// Creation timestamp (ISO 8601), if known.
    pub created: Option<String>,
    /// Platform architecture.
    pub architecture: String,
    /// Platform OS.
    pub os: String,
    /// Number of layers.
    /// NOTE(review): presumably equal to `layers.len()`; the type does not enforce it.
    pub layer_count: usize,
    /// Layer digests in order.
    pub layers: Vec<String>,
    /// Image entrypoint (from OCI config).
    #[serde(default)]
    pub entrypoint: Vec<String>,
    /// Image default command (from OCI config).
    #[serde(default)]
    pub cmd: Vec<String>,
    /// Image environment variables (from OCI config).
    #[serde(default)]
    pub env: Vec<String>,
    /// Image working directory (from OCI config).
    #[serde(default)]
    pub workdir: Option<String>,
    /// Image default user (from OCI config).
    #[serde(default)]
    pub user: Option<String>,
}
684
/// Overlay preparation result.
///
/// NOTE(review): presumably the `data` payload of the `Ok` response to
/// `AgentRequest::PrepareOverlay` — confirm in the agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OverlayInfo {
    /// Path to the merged overlay rootfs.
    pub rootfs_path: String,
    /// Path to the upper (writable) directory.
    pub upper_path: String,
    /// Path to the work directory.
    pub work_path: String,
}
695
/// Storage status information.
///
/// NOTE(review): presumably the `data` payload of the `Ok` response to
/// `AgentRequest::StorageStatus` — confirm in the agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageStatus {
    /// Whether the storage is formatted and ready.
    pub ready: bool,
    /// Total size in bytes.
    pub total_bytes: u64,
    /// Used size in bytes.
    pub used_bytes: u64,
    /// Number of cached layers.
    pub layer_count: usize,
    /// Number of cached images.
    pub image_count: usize,
}
710
711/// Registry authentication credentials for pulling images.
712///
713/// `Debug` is hand-written to redact the password: this value is carried inside
714/// `AgentRequest::Pull`, and any `{:?}` of that request (e.g. a tracing span)
715/// would otherwise serialize the token verbatim into the machine's console log,
716/// which is exposed over the logs API.
717#[derive(Clone, Serialize, Deserialize)]
718pub struct RegistryAuth {
719 /// Username for authentication.
720 pub username: String,
721 /// Password or token for authentication.
722 pub password: String,
723}
724
725impl std::fmt::Debug for RegistryAuth {
726 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
727 f.debug_struct("RegistryAuth")
728 .field("username", &self.username)
729 .field("password", &"***")
730 .finish()
731 }
732}
733
734// ============================================================================
735// Workload VM Protocol (Command Execution)
736// ============================================================================
737
738/// Messages from host to workload VM.
739#[derive(Debug, Clone, Serialize, Deserialize)]
740#[serde(tag = "type", rename_all = "snake_case")]
741pub enum HostMessage {
742 /// Authentication request.
743 Auth {
744 /// Authentication token (base64).
745 token: String,
746 /// Protocol version.
747 protocol_version: u32,
748 },
749
750 /// Run a command.
751 Run {
752 /// Request ID for correlating responses.
753 request_id: u64,
754 /// Command and arguments.
755 command: Vec<String>,
756 /// Environment variables.
757 env: Vec<(String, String)>,
758 /// Working directory.
759 workdir: Option<String>,
760 },
761
762 /// Execute a command in running VM.
763 Exec {
764 /// Request ID.
765 request_id: u64,
766 /// Command and arguments.
767 command: Vec<String>,
768 /// Allocate a TTY.
769 tty: bool,
770 },
771
772 /// Send a signal to a running command.
773 Signal {
774 /// Request ID of the command.
775 request_id: u64,
776 /// Signal number.
777 signal: i32,
778 },
779
780 /// Request graceful shutdown.
781 Stop {
782 /// Timeout in milliseconds.
783 timeout_ms: u64,
784 },
785}
786
/// Messages from workload VM to host.
///
/// Serialized as an internally tagged JSON object. The variant name, in
/// `snake_case`, goes under the `type` key, e.g. `{"type":"auth_ok"}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum GuestMessage {
    /// Authentication successful.
    AuthOk,

    /// Authentication failed.
    AuthFailed,

    /// VM is ready to receive commands.
    Ready,

    /// Command started.
    Started {
        /// Request ID.
        request_id: u64,
    },

    /// Stdout data from command.
    Stdout {
        /// Request ID.
        request_id: u64,
        /// Output data (base64 string on the wire).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// Whether output was truncated.
        truncated: bool,
    },

    /// Stderr data from command.
    Stderr {
        /// Request ID.
        request_id: u64,
        /// Output data (base64 string on the wire).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// Whether output was truncated.
        truncated: bool,
    },

    /// Command exited.
    Exit {
        /// Request ID.
        request_id: u64,
        /// Exit code.
        code: i32,
        /// Exit reason.
        reason: String,
    },

    /// Error occurred.
    Error {
        /// Request ID (if applicable); `None` for errors not tied to a request.
        request_id: Option<u64>,
        /// Error message.
        message: String,
    },
}
846
847// ============================================================================
848// Wire Format Helpers
849// ============================================================================
850
/// Envelope that wraps any message with an optional trace ID for correlation.
///
/// On the wire, the trace_id is flattened into the JSON alongside the message
/// fields: `{"trace_id":"abc123","method":"ping"}`.
///
/// When `trace_id` is `None` it is omitted from the output. When it is
/// absent on input it decodes as `None`, so messages without a trace ID
/// still parse. Because `body` is flattened, it must serialize as a JSON
/// object. The internally tagged enums in this crate do.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Envelope<T> {
    /// Trace ID for correlating host API requests to agent operations.
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub trace_id: Option<String>,
    /// The wrapped message; its fields sit at the top level of the JSON object.
    #[serde(flatten)]
    pub body: T,
}
864
865impl<T> Envelope<T> {
866 /// Create an envelope with no trace ID.
867 pub fn new(body: T) -> Self {
868 Self {
869 trace_id: None,
870 body,
871 }
872 }
873
874 /// Create an envelope with an optional trace ID.
875 pub fn with_trace_id(body: T, trace_id: Option<String>) -> Self {
876 Self { trace_id, body }
877 }
878}
879
880/// Encode a message to wire format (length-prefixed JSON).
881pub fn encode_message<T: Serialize>(msg: &T) -> Result<Vec<u8>, serde_json::Error> {
882 let json = serde_json::to_vec(msg)?;
883 let len = json.len() as u32;
884
885 let mut buf = Vec::with_capacity(4 + json.len());
886 buf.extend_from_slice(&len.to_be_bytes());
887 buf.extend_from_slice(&json);
888
889 Ok(buf)
890}
891
892/// Decode a message from wire format.
893pub fn decode_message<T: for<'de> Deserialize<'de>>(data: &[u8]) -> Result<T, DecodeError> {
894 if data.len() < 4 {
895 return Err(DecodeError::TooShort);
896 }
897
898 let len = u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
899
900 if len > MAX_FRAME_SIZE as usize {
901 return Err(DecodeError::TooLarge(len));
902 }
903
904 if data.len() < 4 + len {
905 return Err(DecodeError::Incomplete {
906 expected: len,
907 got: data.len() - 4,
908 });
909 }
910
911 serde_json::from_slice(&data[4..4 + len]).map_err(DecodeError::Json)
912}
913
/// Error decoding a wire message.
///
/// Returned by [`decode_message`].
#[derive(Debug)]
pub enum DecodeError {
    /// Fewer than 4 bytes were supplied, so the length header is missing.
    TooShort,
    /// The length header declared a payload larger than [`MAX_FRAME_SIZE`].
    /// Carries the declared length in bytes.
    TooLarge(usize),
    /// The buffer holds fewer payload bytes than the header declared.
    Incomplete {
        /// Payload length declared by the header.
        expected: usize,
        /// Payload bytes actually present after the header.
        got: usize,
    },
    /// The payload was not valid JSON for the requested type.
    Json(serde_json::Error),
}
931
932impl std::fmt::Display for DecodeError {
933 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
934 match self {
935 DecodeError::TooShort => write!(f, "data too short for length header"),
936 DecodeError::TooLarge(size) => write!(f, "frame too large: {} bytes", size),
937 DecodeError::Incomplete { expected, got } => {
938 write!(
939 f,
940 "incomplete frame: expected {} bytes, got {}",
941 expected, got
942 )
943 }
944 DecodeError::Json(e) => write!(f, "JSON decode error: {}", e),
945 }
946 }
947}
948
949impl std::error::Error for DecodeError {}
950
951#[cfg(test)]
952mod tests {
953 use super::*;
954
955 #[test]
956 fn test_encode_decode_roundtrip() {
957 let req = AgentRequest::Pull {
958 image: "alpine:latest".to_string(),
959 oci_platform: Some("linux/arm64".to_string()),
960 auth: None,
961 proxy: None,
962 no_proxy: None,
963 };
964
965 let encoded = encode_message(&req).unwrap();
966 let decoded: AgentRequest = decode_message(&encoded).unwrap();
967
968 let AgentRequest::Pull {
969 image,
970 oci_platform,
971 auth,
972 proxy,
973 no_proxy,
974 } = decoded
975 else {
976 panic!("expected Pull variant, got {:?}", decoded);
977 };
978 assert_eq!(image, "alpine:latest");
979 assert_eq!(oci_platform, Some("linux/arm64".to_string()));
980 assert!(auth.is_none());
981 assert!(proxy.is_none());
982 assert!(no_proxy.is_none());
983 }
984
985 #[test]
986 fn test_encode_decode_with_auth() {
987 let req = AgentRequest::Pull {
988 image: "ghcr.io/owner/repo:latest".to_string(),
989 oci_platform: None,
990 auth: Some(RegistryAuth {
991 username: "testuser".to_string(),
992 password: "testpass".to_string(),
993 }),
994 proxy: None,
995 no_proxy: None,
996 };
997
998 let encoded = encode_message(&req).unwrap();
999 let decoded: AgentRequest = decode_message(&encoded).unwrap();
1000
1001 let AgentRequest::Pull {
1002 image,
1003 oci_platform,
1004 auth,
1005 proxy: _,
1006 no_proxy: _,
1007 } = decoded
1008 else {
1009 panic!("expected Pull variant, got {:?}", decoded);
1010 };
1011 assert_eq!(image, "ghcr.io/owner/repo:latest");
1012 assert!(oci_platform.is_none());
1013 let auth = auth.expect("auth should be Some");
1014 assert_eq!(auth.username, "testuser");
1015 assert_eq!(auth.password, "testpass");
1016 }
1017
1018 #[test]
1019 fn test_encode_decode_with_proxy() {
1020 let req = AgentRequest::Pull {
1021 image: "alpine:latest".to_string(),
1022 oci_platform: None,
1023 auth: None,
1024 proxy: Some("http://192.168.127.254:3128".to_string()),
1025 no_proxy: Some("127.0.0.1,localhost,.internal".to_string()),
1026 };
1027
1028 let encoded = encode_message(&req).unwrap();
1029 let decoded: AgentRequest = decode_message(&encoded).unwrap();
1030
1031 let AgentRequest::Pull {
1032 proxy, no_proxy, ..
1033 } = decoded
1034 else {
1035 panic!("expected Pull variant, got {:?}", decoded);
1036 };
1037 assert_eq!(proxy.as_deref(), Some("http://192.168.127.254:3128"));
1038 assert_eq!(no_proxy.as_deref(), Some("127.0.0.1,localhost,.internal"));
1039 }
1040
1041 #[test]
1042 fn test_decode_too_short() {
1043 let data = [0u8; 2];
1044 let result: Result<AgentRequest, _> = decode_message(&data);
1045 assert!(matches!(result, Err(DecodeError::TooShort)));
1046 }
1047
1048 #[test]
1049 fn test_decode_incomplete() {
1050 let mut data = vec![0, 0, 0, 100]; // claims 100 bytes
1051 data.extend_from_slice(b"{}"); // only 2 bytes of payload
1052 let result: Result<AgentRequest, _> = decode_message(&data);
1053 assert!(matches!(result, Err(DecodeError::Incomplete { .. })));
1054 }
1055
1056 #[test]
1057 fn test_agent_request_serialization() {
1058 let req = AgentRequest::Ping;
1059 let json = serde_json::to_string(&req).unwrap();
1060 assert!(json.contains("ping"));
1061
1062 let req = AgentRequest::PrepareOverlay {
1063 image: "ubuntu:22.04".to_string(),
1064 workload_id: "wl-123".to_string(),
1065 };
1066 let json = serde_json::to_string(&req).unwrap();
1067 assert!(json.contains("prepare_overlay"));
1068 }
1069
1070 #[test]
1071 fn test_agent_response_serialization() {
1072 let resp = AgentResponse::Pong {
1073 version: PROTOCOL_VERSION,
1074 };
1075 let json = serde_json::to_string(&resp).unwrap();
1076 assert!(json.contains("pong"));
1077
1078 let resp = AgentResponse::Progress {
1079 message: "Pulling layer 1/3".to_string(),
1080 percent: Some(33),
1081 layer: Some("sha256:abc123".to_string()),
1082 };
1083 let json = serde_json::to_string(&resp).unwrap();
1084 assert!(json.contains("progress"));
1085 }
1086
1087 #[test]
1088 fn file_write_begin_roundtrips() {
1089 let req = AgentRequest::FileWriteBegin {
1090 path: "/tmp/target".into(),
1091 mode: Some(0o600),
1092 total_size: 123_456_789,
1093 };
1094 let bytes = encode_message(&req).unwrap();
1095 let back: AgentRequest = decode_message(&bytes).unwrap();
1096 match back {
1097 AgentRequest::FileWriteBegin {
1098 path,
1099 mode,
1100 total_size,
1101 } => {
1102 assert_eq!(path, "/tmp/target");
1103 assert_eq!(mode, Some(0o600));
1104 assert_eq!(total_size, 123_456_789);
1105 }
1106 _ => panic!("wrong variant"),
1107 }
1108 }
1109
1110 #[test]
1111 fn file_write_chunk_roundtrips_binary_data() {
1112 // Binary data (bytes outside UTF-8) must survive the base64
1113 // trip intact. If the encoding ever silently lossifies, this
1114 // fires.
1115 let payload: Vec<u8> = (0u8..=255).collect();
1116 let req = AgentRequest::FileWriteChunk {
1117 data: payload.clone(),
1118 done: true,
1119 };
1120 let bytes = encode_message(&req).unwrap();
1121 let back: AgentRequest = decode_message(&bytes).unwrap();
1122 match back {
1123 AgentRequest::FileWriteChunk { data, done } => {
1124 assert_eq!(data, payload);
1125 assert!(done);
1126 }
1127 _ => panic!("wrong variant"),
1128 }
1129 }
1130
1131 #[test]
1132 fn file_write_size_constants_are_frame_safe() {
1133 // Sanity: a single streaming chunk at FILE_WRITE_CHUNK_SIZE
1134 // must fit inside MAX_FRAME_SIZE after base64 (+ ~33%) and
1135 // JSON overhead. If anyone bumps CHUNK_SIZE past the limit,
1136 // this test fires before production does.
1137 let chunk_bytes = FILE_WRITE_CHUNK_SIZE as u64;
1138 let base64_bytes = chunk_bytes.div_ceil(3) * 4; // ceil(n/3)*4
1139 let json_overhead = 256u64; // method tag, done bool, quotes
1140 let total = base64_bytes + json_overhead;
1141 assert!(
1142 total < MAX_FRAME_SIZE as u64,
1143 "FILE_WRITE_CHUNK_SIZE of {} bytes would produce a frame \
1144 of ~{} bytes which exceeds MAX_FRAME_SIZE of {}",
1145 chunk_bytes,
1146 total,
1147 MAX_FRAME_SIZE
1148 );
1149
1150 // Single-shot threshold must be <= chunk size. They can be
1151 // equal (a 1 MiB file is a single shot; a 1 MiB + 1 byte
1152 // file streams as two chunks); but SINGLE_SHOT > CHUNK would
1153 // be incoherent — a file slightly over the shot threshold
1154 // would need to stream as... a single oversized chunk.
1155 assert!(FILE_WRITE_SINGLE_SHOT_MAX <= FILE_WRITE_CHUNK_SIZE);
1156 }
1157
1158 #[test]
1159 fn test_ports_constants() {
1160 assert_eq!(ports::WORKLOAD_CONTROL, 5000);
1161 assert_eq!(ports::WORKLOAD_LOGS, 5001);
1162 assert_eq!(ports::AGENT_CONTROL, 6000);
1163 assert_eq!(ports::SSH_AGENT, 6001);
1164 }
1165
1166 #[test]
1167 fn test_cid_constants() {
1168 assert_eq!(cid::HOST, 2);
1169 assert_eq!(cid::GUEST, 3);
1170 }
1171
1172 #[test]
1173 fn test_envelope_serialization_with_trace_id() {
1174 let req = AgentRequest::Ping;
1175 let envelope = Envelope::with_trace_id(&req, Some("abc123".to_string()));
1176 let json = serde_json::to_string(&envelope).unwrap();
1177
1178 // trace_id should be flattened alongside the method tag
1179 assert!(json.contains("\"trace_id\":\"abc123\""));
1180 assert!(json.contains("\"method\":\"ping\""));
1181
1182 // Deserialize back — Envelope<AgentRequest> with flatten
1183 let parsed: Envelope<AgentRequest> = serde_json::from_str(&json).unwrap();
1184 assert_eq!(parsed.trace_id.as_deref(), Some("abc123"));
1185 assert!(matches!(parsed.body, AgentRequest::Ping));
1186 }
1187
1188 #[test]
1189 fn test_envelope_without_trace_id() {
1190 let req = AgentRequest::Ping;
1191 let envelope = Envelope::new(&req);
1192 let json = serde_json::to_string(&envelope).unwrap();
1193
1194 // No trace_id field (skip_serializing_if = None)
1195 assert!(!json.contains("trace_id"));
1196 assert!(json.contains("\"method\":\"ping\""));
1197 }
1198
1199 #[test]
1200 fn test_envelope_backward_compat_bare_request() {
1201 // A bare AgentRequest (no Envelope) should fail to parse as Envelope
1202 // but succeed as bare AgentRequest — this is the agent's fallback path
1203 let bare_json = r#"{"method":"ping"}"#;
1204
1205 // Envelope parse should fail (no body field to flatten into)
1206 // Actually with flatten, this may work — let's verify
1207 let envelope_result = serde_json::from_str::<Envelope<AgentRequest>>(bare_json);
1208 let bare_result = serde_json::from_str::<AgentRequest>(bare_json);
1209
1210 // At least one must succeed for backward compat
1211 assert!(
1212 envelope_result.is_ok() || bare_result.is_ok(),
1213 "Neither Envelope nor bare parse succeeded"
1214 );
1215
1216 // Bare parse must always work
1217 assert!(bare_result.is_ok());
1218 assert!(matches!(bare_result.unwrap(), AgentRequest::Ping));
1219
1220 // If Envelope works, trace_id should be None
1221 if let Ok(env) = envelope_result {
1222 assert!(env.trace_id.is_none());
1223 }
1224 }
1225}