smolvm_protocol/lib.rs
1//! Protocol types for smolvm host-guest communication.
2//!
3//! This crate defines the wire protocol for vsock communication between
4//! the smolvm host and the guest agent (smolvm-agent).
5//!
6//! # Protocol Overview
7//!
//! Communication uses JSON-encoded messages over vsock. Each message is
//! prefixed with a 4-byte big-endian header holding the length of the JSON
//! payload in bytes (the header does not count itself).
10//!
11//! ```text
12//! +----------------+-------------------+
13//! | Length (4 BE) | JSON payload |
14//! +----------------+-------------------+
15//! ```
16
17#![deny(missing_docs)]
18
19use serde::{Deserialize, Serialize};
20
21pub mod retry;
22
23/// Serde helper for encoding `Vec<u8>` as a base64 string in JSON.
24///
25/// Without this, serde_json serializes `Vec<u8>` as a JSON array of numbers
26/// (e.g., `[104,101,108,108,111]`), which inflates binary data by ~4x.
27/// Base64 encoding reduces this to ~1.33x.
28pub mod base64_bytes {
29 use base64::{engine::general_purpose::STANDARD, Engine};
30 use serde::{Deserialize, Deserializer, Serializer};
31
32 /// Serialize `Vec<u8>` as a base64 string.
33 pub fn serialize<S: Serializer>(data: &[u8], serializer: S) -> Result<S::Ok, S::Error> {
34 serializer.serialize_str(&STANDARD.encode(data))
35 }
36
37 /// Deserialize a base64 string into `Vec<u8>`.
38 pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Vec<u8>, D::Error> {
39 let s = String::deserialize(deserializer)?;
40 STANDARD.decode(&s).map_err(serde::de::Error::custom)
41 }
42}
43
/// Protocol version.
pub const PROTOCOL_VERSION: u32 = 1;

/// Maximum frame payload size in bytes (32 MiB).
///
/// Limits the JSON payload that follows the 4-byte length header;
/// [`decode_message`] rejects any frame whose header announces more than
/// this. Bulk transfers (layer exports, file reads and writes) stay under
/// it by streaming in chunks.
pub const MAX_FRAME_SIZE: u32 = 32 * 1024 * 1024;

/// Raw bytes per chunk when streaming layer data: 16 MiB, which becomes
/// about 21.3 MiB once base64-encoded inside a JSON frame — comfortably
/// under [`MAX_FRAME_SIZE`].
pub const LAYER_CHUNK_SIZE: usize = 16 * 1024 * 1024;

/// Files at or below this size are written with a single `FileWrite`
/// message. Larger files must stream via `FileWriteBegin` +
/// `FileWriteChunk` so no single frame approaches [`MAX_FRAME_SIZE`]
/// (base64 + JSON inflation is roughly 1.33–1.4x).
///
/// Chosen to keep the single-shot frame far below the frame limit while
/// preserving the fast-path latency for small config files, scripts, and
/// keys.
pub const FILE_WRITE_SINGLE_SHOT_MAX: usize = 1024 * 1024;

/// Payload bytes per streaming upload chunk, deliberately equal to
/// [`FILE_WRITE_SINGLE_SHOT_MAX`].
///
/// Each chunk's encoded frame is therefore ~1.4 MB, versus ~21 MB for a
/// [`LAYER_CHUNK_SIZE`]-sized chunk. Unix-socket send buffers
/// (`SO_SNDBUF`) default to only a few hundred KiB (on the order of
/// 200–256 KiB), so `write_all` on a frame must repeatedly wait for the
/// agent to drain the socket. With multi-megabyte frames, a latency spike
/// on the agent side was observed to trip the 10 s write timeout with
/// `EAGAIN`. Keeping frames small bounds how long any single `write_all`
/// can stall.
///
/// [`LAYER_CHUNK_SIZE`] can stay at 16 MiB because it is used for
/// agent→host (download) streaming, where the host side of the socket has
/// more headroom than the guest side. Upload streaming is the asymmetric
/// case and needs a smaller chunk.
pub const FILE_WRITE_CHUNK_SIZE: usize = FILE_WRITE_SINGLE_SHOT_MAX;

/// Hard ceiling on a single file transfer in either direction (4 GiB).
///
/// On the write path: enforced at `FileWriteBegin` by the agent —
/// `total_size > FILE_TRANSFER_MAX_TOTAL` is rejected before any
/// staging file is created.
///
/// On the read path: enforced by the host's `read_file` loop —
/// after the first chunk that pushes the accumulated total past the
/// cap, the call bails with an error and the partial buffer is
/// dropped. This protects the host process from OOM if the guest
/// (compromised or merely buggy) streams unbounded data.
///
/// 4 GiB matches the order of magnitude of the default overlay disk
/// and the `gpu_vram_mib` cap. Callers that need to move larger
/// blobs should stage via a virtiofs mount instead of `cp`.
pub const FILE_TRANSFER_MAX_TOTAL: u64 = 4 * 1024 * 1024 * 1024;

/// Allowance for the JSON bytes a chunk frame adds around its base64
/// payload (tag, `done` flag, quotes, braces). Used only by the
/// compile-time frame-size checks below.
const CHUNK_FRAME_JSON_OVERHEAD: usize = 1024;

// Compile-time guards: if any size constant above is changed so that a
// chunk frame could exceed MAX_FRAME_SIZE (base64 is ceil(n / 3) * 4 bytes),
// the crate stops compiling instead of failing at runtime.
const _: () = assert!(
    FILE_WRITE_SINGLE_SHOT_MAX <= FILE_WRITE_CHUNK_SIZE,
    "FILE_WRITE_SINGLE_SHOT_MAX must not exceed FILE_WRITE_CHUNK_SIZE"
);
const _: () = assert!(
    FILE_WRITE_CHUNK_SIZE.div_ceil(3) * 4 + CHUNK_FRAME_JSON_OVERHEAD <= MAX_FRAME_SIZE as usize,
    "an encoded FILE_WRITE_CHUNK_SIZE chunk must fit inside MAX_FRAME_SIZE"
);
const _: () = assert!(
    LAYER_CHUNK_SIZE.div_ceil(3) * 4 + CHUNK_FRAME_JSON_OVERHEAD <= MAX_FRAME_SIZE as usize,
    "an encoded LAYER_CHUNK_SIZE chunk must fit inside MAX_FRAME_SIZE"
);
94
/// Fixed vsock port numbers shared by the host and the guest.
pub mod ports {
    // --- Workload VM channels (5000 range) ---

    /// Command/control channel of a workload VM.
    pub const WORKLOAD_CONTROL: u32 = 5_000;
    /// Log stream coming out of a workload VM.
    pub const WORKLOAD_LOGS: u32 = 5_001;

    // --- Agent services (6000 range) ---

    /// Agent control port, used for OCI operations and management.
    pub const AGENT_CONTROL: u32 = 6_000;
    /// SSH agent forwarding: the host's `SSH_AUTH_SOCK` bridged into the guest.
    pub const SSH_AGENT: u32 = 6_001;
    /// DNS filtering proxy: the guest forwards DNS queries to the host for filtering.
    pub const DNS_FILTER: u32 = 6_002;
}
108
/// Well-known vsock context IDs (CIDs).
pub mod cid {
    /// CID of the host end of the vsock connection (always 2).
    pub const HOST: u32 = 2;
    /// CID of the guest (always 3 for the first and only guest).
    pub const GUEST: u32 = 3;
    /// Wildcard CID for listening: all 32 bits set, i.e. `u32::MAX`
    /// (the value Linux names `VMADDR_CID_ANY`).
    pub const ANY: u32 = 0xFFFF_FFFF;
}
118
119// ============================================================================
120// Agent Protocol (OCI Operations)
121// ============================================================================
122
/// Requests sent from the host to the guest agent.
///
/// Covers image management, overlay/storage setup, command execution, and
/// file transfer. Serialized as an internally tagged JSON object: the
/// snake_case variant name goes under the `"method"` key next to the
/// variant's fields, e.g. `{"method":"query","image":"alpine:latest"}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "method", rename_all = "snake_case")]
pub enum AgentRequest {
    /// Ping to check whether the agent is alive; answered with
    /// [`AgentResponse::Pong`].
    Ping,

    /// Pull an OCI image and extract its layers.
    Pull {
        /// Image reference (e.g., "alpine:latest", "docker.io/library/ubuntu:22.04").
        image: String,
        /// OCI platform to pull (e.g., "linux/arm64", "linux/amd64").
        oci_platform: Option<String>,
        /// Optional registry authentication credentials. Omitted from the
        /// JSON when `None`; treated as `None` when absent.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        auth: Option<RegistryAuth>,
    },

    /// Query whether an image exists locally.
    Query {
        /// Image reference.
        image: String,
    },

    /// List all cached images.
    ListImages,

    /// Run garbage collection on unused layers.
    GarbageCollect {
        /// If true, only report what would be deleted.
        dry_run: bool,
        /// If true, delete all image manifests and configs first, making
        /// every layer unreferenced so that all of them get collected.
        /// Defaults to `false` when absent.
        #[serde(default)]
        purge_all: bool,
    },

    /// Prepare an overlay rootfs for a workload.
    PrepareOverlay {
        /// Image reference.
        image: String,
        /// Unique workload ID for the overlay.
        workload_id: String,
    },

    /// Clean up the overlay rootfs of a workload.
    CleanupOverlay {
        /// Workload ID to clean up.
        workload_id: String,
    },

    /// Format the storage disk (first-time setup).
    FormatStorage,

    /// Get the storage disk status.
    StorageStatus,

    /// Test network connectivity directly from the agent (not via chroot).
    /// Used to debug TSI networking.
    NetworkTest {
        /// URL to test (e.g., "http://1.1.1.1").
        url: String,
    },

    /// Shut down the agent.
    Shutdown,

    /// Export one image layer as a tar archive.
    ///
    /// Used by `smolvm pack` to extract OCI layers for packaging.
    /// The agent streams the layer's tar bytes back as a sequence of
    /// [`AgentResponse::DataChunk`] responses, the last one with `done: true`.
    ExportLayer {
        /// Image digest (sha256:...).
        image_digest: String,
        /// Layer index (0-based).
        layer_index: usize,
    },

    /// Execute a command directly in the VM (not in a container).
    ///
    /// This runs the command in the agent's Alpine rootfs without any
    /// container isolation. Useful for VM-level operations and debugging.
    VmExec {
        /// Command and arguments.
        command: Vec<String>,
        /// Environment variables as `(name, value)` pairs. Empty when absent.
        #[serde(default)]
        env: Vec<(String, String)>,
        /// Working directory in the VM.
        workdir: Option<String>,
        /// Timeout in milliseconds. `None` when absent.
        #[serde(default)]
        timeout_ms: Option<u64>,
        /// Interactive mode - stream I/O instead of buffering.
        #[serde(default)]
        interactive: bool,
        /// Allocate a pseudo-TTY for the command.
        #[serde(default)]
        tty: bool,
        /// Background mode - spawn and return the PID immediately without waiting.
        #[serde(default)]
        background: bool,
    },

    /// Run a command in an image's rootfs.
    ///
    /// This prepares an overlay, chroots into it, and executes the command.
    /// In non-interactive mode the result (stdout, stderr, exit code) comes
    /// back as [`AgentResponse::Completed`] when the command finishes.
    Run {
        /// Image reference (must be pulled first).
        image: String,
        /// Command and arguments.
        command: Vec<String>,
        /// Environment variables as `(name, value)` pairs. Empty when absent.
        #[serde(default)]
        env: Vec<(String, String)>,
        /// Working directory inside the rootfs.
        workdir: Option<String>,
        /// User inside the rootfs. If omitted, the OCI image default applies.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        user: Option<String>,
        /// Volume mounts to bind into the container.
        /// Each tuple is (virtiofs_tag, container_path, read_only).
        #[serde(default)]
        mounts: Vec<(String, String, bool)>,
        /// Timeout in milliseconds. If the command exceeds this duration,
        /// it will be killed and return exit code 124.
        #[serde(default)]
        timeout_ms: Option<u64>,
        /// Interactive mode - stream I/O instead of buffering.
        /// When true, output is streamed via Stdout/Stderr responses,
        /// and stdin can be sent via the Stdin request.
        #[serde(default)]
        interactive: bool,
        /// Allocate a pseudo-TTY for the command.
        /// Enables terminal features like colors, line editing, and signal handling.
        #[serde(default)]
        tty: bool,
        /// Detached mode — start the container and return immediately with the
        /// container ID. Only meaningful when `persistent_overlay_id` is set.
        /// Returns a `Completed` response with `stdout` containing the container ID.
        #[serde(default)]
        detached: bool,
        /// If set, use a persistent overlay that survives across exec sessions.
        /// The overlay is identified by this ID (typically the machine name)
        /// and reused on subsequent runs. If not set, an ephemeral overlay is
        /// created and destroyed after the run.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        persistent_overlay_id: Option<String>,
        /// Spawn the container and return immediately with the crun PID.
        /// The container runs detached; stdout/stderr go to /dev/null.
        /// Incompatible with `interactive` and `tty`.
        /// NOTE(review): overlaps with `detached` (which returns a container
        /// ID instead of a PID) — confirm which callers should use which.
        #[serde(default)]
        background: bool,
    },

    /// Send stdin data to a running interactive command.
    Stdin {
        /// Input data to send to the command's stdin (base64 in JSON).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
    },

    /// Resize the PTY window (for TTY mode).
    Resize {
        /// New width in columns.
        cols: u16,
        /// New height in rows.
        rows: u16,
    },

    // ========================================================================
    // File I/O
    // ========================================================================
    /// Write a file inside the VM in a single message.
    ///
    /// Use only for files up to [`FILE_WRITE_SINGLE_SHOT_MAX`]. Larger
    /// files must stream via [`Self::FileWriteBegin`] +
    /// [`Self::FileWriteChunk`] to avoid exceeding [`MAX_FRAME_SIZE`]
    /// after base64 + JSON inflation.
    FileWrite {
        /// Absolute path in the VM filesystem.
        path: String,
        /// File contents (base64 in JSON).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// File mode (e.g., 0o644). None = default (0644).
        #[serde(default)]
        mode: Option<u32>,
    },

    /// Open a streaming file upload session on this connection.
    ///
    /// Must be followed by one or more [`Self::FileWriteChunk`]
    /// requests. The final chunk sets `done: true` to finalize.
    /// Dropping the connection (or sending any non-chunk request)
    /// before `done` aborts the session and leaves no partial file
    /// at `path`.
    ///
    /// Sessions are per-connection — one session at a time.
    FileWriteBegin {
        /// Absolute path in the VM filesystem.
        path: String,
        /// File mode (e.g., 0o644). None = default (0644).
        #[serde(default)]
        mode: Option<u32>,
        /// Expected total size in bytes. Rejected if it exceeds
        /// [`FILE_TRANSFER_MAX_TOTAL`]. The agent uses this for an
        /// early-fail check only; the actual size written is the sum
        /// of chunk byte lengths.
        total_size: u64,
    },

    /// Append a chunk to the currently open streaming upload.
    /// If `done` is true, the agent fsyncs and atomically renames the
    /// staging file onto the target path.
    FileWriteChunk {
        /// Chunk bytes (base64 in JSON). Typically [`FILE_WRITE_CHUNK_SIZE`]
        /// except for the last chunk.
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// True on the final chunk; closes and renames the staging
        /// file. False on intermediate chunks.
        done: bool,
    },

    /// Read a file from the VM.
    ///
    /// The contents come back as a sequence of
    /// [`AgentResponse::DataChunk`] responses, the last one with `done: true`.
    FileRead {
        /// Absolute path in the VM filesystem.
        path: String,
    },
}
355
/// Responses sent from the guest agent back to the host.
///
/// Serialized as an internally tagged JSON object: the snake_case variant
/// name goes under the `"status"` key next to the variant's fields, e.g.
/// `{"status":"pong","version":1}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum AgentResponse {
    /// Operation completed successfully.
    Ok {
        /// Response data (varies by request type). Omitted from the JSON
        /// when `None`; treated as `None` when absent.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        data: Option<serde_json::Value>,
    },

    /// Reply to [`AgentRequest::Ping`].
    Pong {
        /// Protocol version spoken by the agent.
        version: u32,
    },

    /// Progress update (for long operations like pull).
    Progress {
        /// Human-readable message.
        message: String,
        /// Completion percentage (0-100). The `u8` type does not itself
        /// enforce the upper bound. Omitted from the JSON when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        percent: Option<u8>,
        /// Current layer being processed. Omitted from the JSON when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        layer: Option<String>,
    },

    /// Operation failed.
    Error {
        /// Error message.
        message: String,
        /// Error code for programmatic handling; normally one of the
        /// constants in [`error_codes`]. Omitted from the JSON when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        code: Option<String>,
    },

    /// Command execution completed (non-interactive mode).
    Completed {
        /// Exit code from the command.
        exit_code: i32,
        /// Standard output (may be truncated). `Vec<u8>` preserves binary
        /// output (image bytes, tarballs, etc.) that a `String` could not
        /// hold past the first non-UTF-8 byte. Serialized as a base64 JSON
        /// string — the same format as the streaming `Stdout` variant.
        #[serde(with = "base64_bytes")]
        stdout: Vec<u8>,
        /// Standard error (may be truncated; base64 in JSON).
        #[serde(with = "base64_bytes")]
        stderr: Vec<u8>,
    },

    /// Command started (interactive mode).
    /// Indicates the command is running and ready to receive stdin.
    Started,

    /// Stdout data from a running command (interactive mode).
    Stdout {
        /// Output data (base64 in JSON).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
    },

    /// Stderr data from a running command (interactive mode).
    Stderr {
        /// Error output data (base64 in JSON).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
    },

    /// Command exited (interactive mode).
    Exited {
        /// Exit code from the command.
        exit_code: i32,
    },

    /// Streaming binary-data chunk.
    ///
    /// Used by every streaming download direction: the agent sends
    /// one or more `DataChunk` responses in sequence, with `done: true`
    /// on the final chunk. Current producers: `ExportLayer` and
    /// `FileRead`.
    ///
    /// Payload size per chunk should stay under
    /// [`LAYER_CHUNK_SIZE`] so the encoded frame (~1.33× after
    /// base64) fits inside [`MAX_FRAME_SIZE`] with JSON overhead to
    /// spare.
    DataChunk {
        /// Chunk bytes (base64 in JSON). Empty allowed on the final frame
        /// (common for EOF-on-clean-boundary cases).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// True on the final chunk of the stream.
        done: bool,
    },
}
453
454// ============================================================================
455// Error Code Constants
456// ============================================================================
457//
458// Standard error codes for AgentResponse::Error. Using constants ensures
459// consistency across the codebase and makes error handling more reliable.
460
/// Machine-readable codes carried in the `code` field of an agent error
/// response. Each constant's value is its own name as a string.
pub mod error_codes {
    // --- General ---

    /// The request payload was invalid or malformed.
    pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
    /// The requested resource does not exist.
    pub const NOT_FOUND: &str = "NOT_FOUND";
    /// An unexpected internal failure occurred.
    pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";

    // --- Image management ---

    /// Pulling an image failed.
    pub const PULL_FAILED: &str = "PULL_FAILED";
    /// Querying for an image failed.
    pub const QUERY_FAILED: &str = "QUERY_FAILED";
    /// Listing images failed.
    pub const LIST_FAILED: &str = "LIST_FAILED";
    /// Garbage collection failed.
    pub const GC_FAILED: &str = "GC_FAILED";
    /// Exporting data failed.
    pub const EXPORT_FAILED: &str = "EXPORT_FAILED";

    // --- Process execution ---

    /// Running a command failed.
    pub const RUN_FAILED: &str = "RUN_FAILED";
    /// Executing a command inside a container failed.
    pub const EXEC_FAILED: &str = "EXEC_FAILED";
    /// Spawning a process failed.
    pub const SPAWN_FAILED: &str = "SPAWN_FAILED";
    /// Waiting on a process failed.
    pub const WAIT_FAILED: &str = "WAIT_FAILED";

    // --- Filesystem and storage ---

    /// A mount operation failed.
    pub const MOUNT_FAILED: &str = "MOUNT_FAILED";
    /// A file I/O operation failed.
    pub const FILE_IO_FAILED: &str = "FILE_IO_FAILED";
    /// An overlay filesystem operation failed.
    pub const OVERLAY_FAILED: &str = "OVERLAY_FAILED";
    /// A cleanup operation failed.
    pub const CLEANUP_FAILED: &str = "CLEANUP_FAILED";
    /// Formatting the storage disk failed.
    pub const FORMAT_FAILED: &str = "FORMAT_FAILED";
    /// Querying the storage status failed.
    pub const STATUS_FAILED: &str = "STATUS_FAILED";

    // --- Container lifecycle ---

    /// Creating a container failed.
    pub const CREATE_FAILED: &str = "CREATE_FAILED";
    /// Starting a container failed.
    pub const START_FAILED: &str = "START_FAILED";
    /// Stopping a container failed.
    pub const STOP_FAILED: &str = "STOP_FAILED";
    /// Deleting a container failed.
    pub const DELETE_FAILED: &str = "DELETE_FAILED";

    // --- Wire / transport ---

    /// Serializing a value failed.
    pub const SERIALIZATION_ERROR: &str = "SERIALIZATION_ERROR";
    /// A message exceeded the maximum allowed size.
    pub const MESSAGE_TOO_LARGE: &str = "MESSAGE_TOO_LARGE";
}
512
513impl AgentResponse {
514 /// Create an error response with the given message and code.
515 ///
516 /// # Example
517 ///
518 /// ```
519 /// use smolvm_protocol::{AgentResponse, error_codes};
520 ///
521 /// let response = AgentResponse::error("image not found", error_codes::NOT_FOUND);
522 /// ```
523 pub fn error(message: impl Into<String>, code: &str) -> Self {
524 AgentResponse::Error {
525 message: message.into(),
526 code: Some(code.to_string()),
527 }
528 }
529
530 /// Create an error response from a Result's error, with the given code.
531 ///
532 /// # Example
533 ///
534 /// ```ignore
535 /// let response = some_operation()
536 /// .map(|data| AgentResponse::ok_with_data(data))
537 /// .unwrap_or_else(|e| AgentResponse::from_err(e, error_codes::PULL_FAILED));
538 /// ```
539 pub fn from_err<E: std::fmt::Display>(err: E, code: &str) -> Self {
540 AgentResponse::Error {
541 message: err.to_string(),
542 code: Some(code.to_string()),
543 }
544 }
545
546 /// Create an Ok response with optional JSON data.
547 pub fn ok(data: Option<serde_json::Value>) -> Self {
548 AgentResponse::Ok { data }
549 }
550
551 /// Create an Ok response with JSON-serializable data.
552 ///
553 /// Returns an error response if serialization fails.
554 pub fn ok_with_data<T: serde::Serialize>(data: T) -> Self {
555 match serde_json::to_value(data) {
556 Ok(value) => AgentResponse::Ok { data: Some(value) },
557 Err(e) => AgentResponse::error(
558 format!("failed to serialize response: {}", e),
559 error_codes::SERIALIZATION_ERROR,
560 ),
561 }
562 }
563
564 /// Convert a Result into an AgentResponse.
565 ///
566 /// On success, serializes the value to JSON. On error, creates an error response.
567 ///
568 /// # Example
569 ///
570 /// ```ignore
571 /// let response = AgentResponse::from_result(
572 /// storage::pull_image(image),
573 /// error_codes::PULL_FAILED,
574 /// );
575 /// ```
576 pub fn from_result<T, E>(result: Result<T, E>, error_code: &str) -> Self
577 where
578 T: serde::Serialize,
579 E: std::fmt::Display,
580 {
581 match result {
582 Ok(data) => Self::ok_with_data(data),
583 Err(e) => Self::from_err(e, error_code),
584 }
585 }
586}
587
/// Image information returned by `Query` / `ListImages` requests.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImageInfo {
    /// Image reference.
    pub reference: String,
    /// Image digest (sha256:...).
    pub digest: String,
    /// Image size in bytes.
    pub size: u64,
    /// Creation timestamp (ISO 8601), if known.
    pub created: Option<String>,
    /// Platform architecture.
    pub architecture: String,
    /// Platform OS.
    pub os: String,
    /// Number of layers (presumably always equal to `layers.len()` —
    /// confirm with the producer).
    pub layer_count: usize,
    /// Layer digests in order.
    pub layers: Vec<String>,
    /// Image entrypoint (from OCI config). Empty when absent from the JSON.
    #[serde(default)]
    pub entrypoint: Vec<String>,
    /// Image default command (from OCI config). Empty when absent from the JSON.
    #[serde(default)]
    pub cmd: Vec<String>,
    /// Image environment variables (from OCI config). Empty when absent
    /// from the JSON.
    #[serde(default)]
    pub env: Vec<String>,
    /// Image working directory (from OCI config). `None` when absent.
    #[serde(default)]
    pub workdir: Option<String>,
    /// Image default user (from OCI config). `None` when absent.
    #[serde(default)]
    pub user: Option<String>,
}
623
/// Overlay preparation result (presumably the payload returned for a
/// `PrepareOverlay` request — confirm with the agent handler).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OverlayInfo {
    /// Path to the merged overlay rootfs.
    pub rootfs_path: String,
    /// Path to the upper (writable) directory.
    pub upper_path: String,
    /// Path to the overlay work directory.
    pub work_path: String,
}
634
/// Storage status information (presumably the payload returned for a
/// `StorageStatus` request — confirm with the agent handler).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageStatus {
    /// Whether the storage is formatted and ready.
    pub ready: bool,
    /// Total size in bytes.
    pub total_bytes: u64,
    /// Used size in bytes.
    pub used_bytes: u64,
    /// Number of cached layers.
    pub layer_count: usize,
    /// Number of cached images.
    pub image_count: usize,
}
649
650/// Registry authentication credentials for pulling images.
651#[derive(Debug, Clone, Serialize, Deserialize)]
652pub struct RegistryAuth {
653 /// Username for authentication.
654 pub username: String,
655 /// Password or token for authentication.
656 pub password: String,
657}
658
659// ============================================================================
660// Workload VM Protocol (Command Execution)
661// ============================================================================
662
/// Messages from the host to a workload VM.
///
/// Serialized as an internally tagged JSON object: the snake_case variant
/// name goes under the `"type"` key next to the variant's fields, e.g.
/// `{"type":"stop","timeout_ms":5000}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum HostMessage {
    /// Authentication request.
    ///
    /// NOTE(review): the derived `Debug` prints `token` verbatim; avoid
    /// debug-logging this variant, or give it a redacting `Debug` impl.
    Auth {
        /// Authentication token (base64).
        token: String,
        /// Protocol version spoken by the host (presumably
        /// [`PROTOCOL_VERSION`] — confirm).
        protocol_version: u32,
    },

    /// Run a command.
    Run {
        /// Request ID for correlating the guest's responses to this command.
        request_id: u64,
        /// Command and arguments.
        command: Vec<String>,
        /// Environment variables as `(name, value)` pairs.
        env: Vec<(String, String)>,
        /// Working directory.
        workdir: Option<String>,
    },

    /// Execute a command in the running VM.
    Exec {
        /// Request ID.
        request_id: u64,
        /// Command and arguments.
        command: Vec<String>,
        /// Allocate a TTY.
        tty: bool,
    },

    /// Send a signal to a running command.
    Signal {
        /// Request ID of the target command.
        request_id: u64,
        /// Signal number (presumably a POSIX signal number — confirm).
        signal: i32,
    },

    /// Request graceful shutdown.
    Stop {
        /// Timeout in milliseconds.
        timeout_ms: u64,
    },
}
711
/// Messages from a workload VM to the host.
///
/// Serialized as an internally tagged JSON object: the snake_case variant
/// name goes under the `"type"` key next to the variant's fields, e.g.
/// `{"type":"started","request_id":7}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum GuestMessage {
    /// Authentication successful.
    AuthOk,

    /// Authentication failed.
    AuthFailed,

    /// VM is ready to receive commands.
    Ready,

    /// Command started.
    Started {
        /// Request ID of the command that started.
        request_id: u64,
    },

    /// Stdout data from a command.
    Stdout {
        /// Request ID of the producing command.
        request_id: u64,
        /// Output data (base64 in JSON).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// Whether output was truncated.
        truncated: bool,
    },

    /// Stderr data from a command.
    Stderr {
        /// Request ID of the producing command.
        request_id: u64,
        /// Output data (base64 in JSON).
        #[serde(with = "base64_bytes")]
        data: Vec<u8>,
        /// Whether output was truncated.
        truncated: bool,
    },

    /// Command exited.
    Exit {
        /// Request ID of the command that exited.
        request_id: u64,
        /// Exit code.
        code: i32,
        /// Exit reason (free-form text).
        reason: String,
    },

    /// An error occurred.
    Error {
        /// Request ID, or `None` when the error is not tied to a specific command.
        request_id: Option<u64>,
        /// Error message.
        message: String,
    },
}
771
772// ============================================================================
773// Wire Format Helpers
774// ============================================================================
775
/// Envelope that wraps any message with an optional trace ID for correlation.
///
/// On the wire, the trace_id is flattened into the JSON alongside the message
/// fields: `{"trace_id":"abc123","method":"ping"}`. When `trace_id` is
/// `None` the key is omitted entirely, so an untraced envelope serializes
/// exactly like the bare message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Envelope<T> {
    /// Trace ID for correlating host API requests to agent operations.
    /// Omitted when `None`; treated as `None` when absent from the JSON.
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub trace_id: Option<String>,
    /// The wrapped message; its fields are flattened into the envelope's
    /// JSON object.
    #[serde(flatten)]
    pub body: T,
}
789
790impl<T> Envelope<T> {
791 /// Create an envelope with no trace ID.
792 pub fn new(body: T) -> Self {
793 Self {
794 trace_id: None,
795 body,
796 }
797 }
798
799 /// Create an envelope with an optional trace ID.
800 pub fn with_trace_id(body: T, trace_id: Option<String>) -> Self {
801 Self { trace_id, body }
802 }
803}
804
805/// Encode a message to wire format (length-prefixed JSON).
806pub fn encode_message<T: Serialize>(msg: &T) -> Result<Vec<u8>, serde_json::Error> {
807 let json = serde_json::to_vec(msg)?;
808 let len = json.len() as u32;
809
810 let mut buf = Vec::with_capacity(4 + json.len());
811 buf.extend_from_slice(&len.to_be_bytes());
812 buf.extend_from_slice(&json);
813
814 Ok(buf)
815}
816
817/// Decode a message from wire format.
818pub fn decode_message<T: for<'de> Deserialize<'de>>(data: &[u8]) -> Result<T, DecodeError> {
819 if data.len() < 4 {
820 return Err(DecodeError::TooShort);
821 }
822
823 let len = u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
824
825 if len > MAX_FRAME_SIZE as usize {
826 return Err(DecodeError::TooLarge(len));
827 }
828
829 if data.len() < 4 + len {
830 return Err(DecodeError::Incomplete {
831 expected: len,
832 got: data.len() - 4,
833 });
834 }
835
836 serde_json::from_slice(&data[4..4 + len]).map_err(DecodeError::Json)
837}
838
/// Error returned by [`decode_message`] when a wire frame cannot be decoded.
#[derive(Debug)]
pub enum DecodeError {
    /// Fewer than 4 bytes were supplied, so the length header is missing.
    TooShort,
    /// The header announced a payload longer than [`MAX_FRAME_SIZE`];
    /// carries the announced length in bytes.
    TooLarge(usize),
    /// Fewer payload bytes were supplied than the header announced.
    Incomplete {
        /// Payload length announced by the header (excluding the 4 header bytes).
        expected: usize,
        /// Payload bytes actually present after the header.
        got: usize,
    },
    /// The payload was not valid JSON for the requested type.
    Json(serde_json::Error),
}
856
857impl std::fmt::Display for DecodeError {
858 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
859 match self {
860 DecodeError::TooShort => write!(f, "data too short for length header"),
861 DecodeError::TooLarge(size) => write!(f, "frame too large: {} bytes", size),
862 DecodeError::Incomplete { expected, got } => {
863 write!(
864 f,
865 "incomplete frame: expected {} bytes, got {}",
866 expected, got
867 )
868 }
869 DecodeError::Json(e) => write!(f, "JSON decode error: {}", e),
870 }
871 }
872}
873
// Uses only the trait's default methods, so `source()` returns `None` for
// every variant. The `Json` variant's underlying serde_json message is
// already included in the `Display` output instead.
impl std::error::Error for DecodeError {}
875
876#[cfg(test)]
877mod tests {
878 use super::*;
879
880 #[test]
881 fn test_encode_decode_roundtrip() {
882 let req = AgentRequest::Pull {
883 image: "alpine:latest".to_string(),
884 oci_platform: Some("linux/arm64".to_string()),
885 auth: None,
886 };
887
888 let encoded = encode_message(&req).unwrap();
889 let decoded: AgentRequest = decode_message(&encoded).unwrap();
890
891 let AgentRequest::Pull {
892 image,
893 oci_platform,
894 auth,
895 } = decoded
896 else {
897 panic!("expected Pull variant, got {:?}", decoded);
898 };
899 assert_eq!(image, "alpine:latest");
900 assert_eq!(oci_platform, Some("linux/arm64".to_string()));
901 assert!(auth.is_none());
902 }
903
904 #[test]
905 fn test_encode_decode_with_auth() {
906 let req = AgentRequest::Pull {
907 image: "ghcr.io/owner/repo:latest".to_string(),
908 oci_platform: None,
909 auth: Some(RegistryAuth {
910 username: "testuser".to_string(),
911 password: "testpass".to_string(),
912 }),
913 };
914
915 let encoded = encode_message(&req).unwrap();
916 let decoded: AgentRequest = decode_message(&encoded).unwrap();
917
918 let AgentRequest::Pull {
919 image,
920 oci_platform,
921 auth,
922 } = decoded
923 else {
924 panic!("expected Pull variant, got {:?}", decoded);
925 };
926 assert_eq!(image, "ghcr.io/owner/repo:latest");
927 assert!(oci_platform.is_none());
928 let auth = auth.expect("auth should be Some");
929 assert_eq!(auth.username, "testuser");
930 assert_eq!(auth.password, "testpass");
931 }
932
933 #[test]
934 fn test_decode_too_short() {
935 let data = [0u8; 2];
936 let result: Result<AgentRequest, _> = decode_message(&data);
937 assert!(matches!(result, Err(DecodeError::TooShort)));
938 }
939
940 #[test]
941 fn test_decode_incomplete() {
942 let mut data = vec![0, 0, 0, 100]; // claims 100 bytes
943 data.extend_from_slice(b"{}"); // only 2 bytes of payload
944 let result: Result<AgentRequest, _> = decode_message(&data);
945 assert!(matches!(result, Err(DecodeError::Incomplete { .. })));
946 }
947
948 #[test]
949 fn test_agent_request_serialization() {
950 let req = AgentRequest::Ping;
951 let json = serde_json::to_string(&req).unwrap();
952 assert!(json.contains("ping"));
953
954 let req = AgentRequest::PrepareOverlay {
955 image: "ubuntu:22.04".to_string(),
956 workload_id: "wl-123".to_string(),
957 };
958 let json = serde_json::to_string(&req).unwrap();
959 assert!(json.contains("prepare_overlay"));
960 }
961
962 #[test]
963 fn test_agent_response_serialization() {
964 let resp = AgentResponse::Pong {
965 version: PROTOCOL_VERSION,
966 };
967 let json = serde_json::to_string(&resp).unwrap();
968 assert!(json.contains("pong"));
969
970 let resp = AgentResponse::Progress {
971 message: "Pulling layer 1/3".to_string(),
972 percent: Some(33),
973 layer: Some("sha256:abc123".to_string()),
974 };
975 let json = serde_json::to_string(&resp).unwrap();
976 assert!(json.contains("progress"));
977 }
978
979 #[test]
980 fn file_write_begin_roundtrips() {
981 let req = AgentRequest::FileWriteBegin {
982 path: "/tmp/target".into(),
983 mode: Some(0o600),
984 total_size: 123_456_789,
985 };
986 let bytes = encode_message(&req).unwrap();
987 let back: AgentRequest = decode_message(&bytes).unwrap();
988 match back {
989 AgentRequest::FileWriteBegin {
990 path,
991 mode,
992 total_size,
993 } => {
994 assert_eq!(path, "/tmp/target");
995 assert_eq!(mode, Some(0o600));
996 assert_eq!(total_size, 123_456_789);
997 }
998 _ => panic!("wrong variant"),
999 }
1000 }
1001
1002 #[test]
1003 fn file_write_chunk_roundtrips_binary_data() {
1004 // Binary data (bytes outside UTF-8) must survive the base64
1005 // trip intact. If the encoding ever silently lossifies, this
1006 // fires.
1007 let payload: Vec<u8> = (0u8..=255).collect();
1008 let req = AgentRequest::FileWriteChunk {
1009 data: payload.clone(),
1010 done: true,
1011 };
1012 let bytes = encode_message(&req).unwrap();
1013 let back: AgentRequest = decode_message(&bytes).unwrap();
1014 match back {
1015 AgentRequest::FileWriteChunk { data, done } => {
1016 assert_eq!(data, payload);
1017 assert!(done);
1018 }
1019 _ => panic!("wrong variant"),
1020 }
1021 }
1022
1023 #[test]
1024 fn file_write_size_constants_are_frame_safe() {
1025 // Sanity: a single streaming chunk at FILE_WRITE_CHUNK_SIZE
1026 // must fit inside MAX_FRAME_SIZE after base64 (+ ~33%) and
1027 // JSON overhead. If anyone bumps CHUNK_SIZE past the limit,
1028 // this test fires before production does.
1029 let chunk_bytes = FILE_WRITE_CHUNK_SIZE as u64;
1030 let base64_bytes = chunk_bytes.div_ceil(3) * 4; // ceil(n/3)*4
1031 let json_overhead = 256u64; // method tag, done bool, quotes
1032 let total = base64_bytes + json_overhead;
1033 assert!(
1034 total < MAX_FRAME_SIZE as u64,
1035 "FILE_WRITE_CHUNK_SIZE of {} bytes would produce a frame \
1036 of ~{} bytes which exceeds MAX_FRAME_SIZE of {}",
1037 chunk_bytes,
1038 total,
1039 MAX_FRAME_SIZE
1040 );
1041
1042 // Single-shot threshold must be <= chunk size. They can be
1043 // equal (a 1 MiB file is a single shot; a 1 MiB + 1 byte
1044 // file streams as two chunks); but SINGLE_SHOT > CHUNK would
1045 // be incoherent — a file slightly over the shot threshold
1046 // would need to stream as... a single oversized chunk.
1047 assert!(FILE_WRITE_SINGLE_SHOT_MAX <= FILE_WRITE_CHUNK_SIZE);
1048 }
1049
1050 #[test]
1051 fn test_ports_constants() {
1052 assert_eq!(ports::WORKLOAD_CONTROL, 5000);
1053 assert_eq!(ports::WORKLOAD_LOGS, 5001);
1054 assert_eq!(ports::AGENT_CONTROL, 6000);
1055 assert_eq!(ports::SSH_AGENT, 6001);
1056 }
1057
1058 #[test]
1059 fn test_cid_constants() {
1060 assert_eq!(cid::HOST, 2);
1061 assert_eq!(cid::GUEST, 3);
1062 }
1063
1064 #[test]
1065 fn test_envelope_serialization_with_trace_id() {
1066 let req = AgentRequest::Ping;
1067 let envelope = Envelope::with_trace_id(&req, Some("abc123".to_string()));
1068 let json = serde_json::to_string(&envelope).unwrap();
1069
1070 // trace_id should be flattened alongside the method tag
1071 assert!(json.contains("\"trace_id\":\"abc123\""));
1072 assert!(json.contains("\"method\":\"ping\""));
1073
1074 // Deserialize back — Envelope<AgentRequest> with flatten
1075 let parsed: Envelope<AgentRequest> = serde_json::from_str(&json).unwrap();
1076 assert_eq!(parsed.trace_id.as_deref(), Some("abc123"));
1077 assert!(matches!(parsed.body, AgentRequest::Ping));
1078 }
1079
1080 #[test]
1081 fn test_envelope_without_trace_id() {
1082 let req = AgentRequest::Ping;
1083 let envelope = Envelope::new(&req);
1084 let json = serde_json::to_string(&envelope).unwrap();
1085
1086 // No trace_id field (skip_serializing_if = None)
1087 assert!(!json.contains("trace_id"));
1088 assert!(json.contains("\"method\":\"ping\""));
1089 }
1090
1091 #[test]
1092 fn test_envelope_backward_compat_bare_request() {
1093 // A bare AgentRequest (no Envelope) should fail to parse as Envelope
1094 // but succeed as bare AgentRequest — this is the agent's fallback path
1095 let bare_json = r#"{"method":"ping"}"#;
1096
1097 // Envelope parse should fail (no body field to flatten into)
1098 // Actually with flatten, this may work — let's verify
1099 let envelope_result = serde_json::from_str::<Envelope<AgentRequest>>(bare_json);
1100 let bare_result = serde_json::from_str::<AgentRequest>(bare_json);
1101
1102 // At least one must succeed for backward compat
1103 assert!(
1104 envelope_result.is_ok() || bare_result.is_ok(),
1105 "Neither Envelope nor bare parse succeeded"
1106 );
1107
1108 // Bare parse must always work
1109 assert!(bare_result.is_ok());
1110 assert!(matches!(bare_result.unwrap(), AgentRequest::Ping));
1111
1112 // If Envelope works, trace_id should be None
1113 if let Ok(env) = envelope_result {
1114 assert!(env.trace_id.is_none());
1115 }
1116 }
1117}