use std::io::{BufRead, BufReader, Read, Write};
use std::os::unix::net::UnixStream;
use std::time::Duration;
use anyhow::{Context, Result, bail};
use serde::{Deserialize, Serialize};
pub const GUEST_CID: u32 = 3;
pub const GUEST_AGENT_PORT: u32 = 52;
pub const DEFAULT_TIMEOUT_SECS: u64 = 10;
const MAX_FRAME_SIZE: usize = 256 * 1024;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GuestRequest {
WorkerStatus,
SleepPrep { drain_timeout_secs: u64 },
Wake,
Ping,
IntegrationStatus,
CheckpointIntegrations { integrations: Vec<String> },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GuestResponse {
WorkerStatus {
status: String,
last_busy_at: Option<String>,
},
SleepPrepAck {
success: bool,
detail: Option<String>,
},
WakeAck { success: bool },
Pong,
Error { message: String },
IntegrationStatusReport {
integrations: Vec<crate::integrations::IntegrationStateReport>,
},
CheckpointResult {
success: bool,
failed: Vec<String>,
detail: Option<String>,
},
}
pub const HOST_BOUND_PORT: u32 = 53;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HostBoundRequest {
WakeInstance {
tenant_id: String,
pool_id: String,
instance_id: String,
},
QueryInstanceStatus {
tenant_id: String,
pool_id: String,
instance_id: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HostBoundResponse {
WakeResult {
success: bool,
detail: Option<String>,
},
InstanceStatus {
status: String,
guest_ip: Option<String>,
},
Error { message: String },
}
pub fn read_frame<T: serde::de::DeserializeOwned>(stream: &mut UnixStream) -> Result<T> {
let mut len_buf = [0u8; 4];
stream
.read_exact(&mut len_buf)
.with_context(|| "Failed to read frame length")?;
let frame_len = u32::from_be_bytes(len_buf) as usize;
if frame_len > MAX_FRAME_SIZE {
bail!(
"Frame too large: {} bytes (max {})",
frame_len,
MAX_FRAME_SIZE
);
}
let mut buf = vec![0u8; frame_len];
stream
.read_exact(&mut buf)
.with_context(|| "Failed to read frame body")?;
serde_json::from_slice(&buf).with_context(|| "Failed to deserialize frame")
}
pub fn write_frame<T: Serialize>(stream: &mut UnixStream, value: &T) -> Result<()> {
let data = serde_json::to_vec(value).with_context(|| "Failed to serialize frame")?;
let len = (data.len() as u32).to_be_bytes();
stream
.write_all(&len)
.with_context(|| "Failed to write frame length")?;
stream
.write_all(&data)
.with_context(|| "Failed to write frame body")?;
stream.flush()?;
Ok(())
}
pub fn vsock_uds_path(instance_dir: &str) -> String {
format!("{}/runtime/v.sock", instance_dir)
}
fn connect_to(uds_path: &str, timeout_secs: u64) -> Result<UnixStream> {
let timeout = Duration::from_secs(timeout_secs);
let stream = UnixStream::connect(uds_path)
.with_context(|| format!("Failed to connect to vsock UDS at {}", uds_path))?;
stream.set_read_timeout(Some(timeout))?;
stream.set_write_timeout(Some(timeout))?;
let mut stream = stream;
writeln!(stream, "CONNECT {}", GUEST_AGENT_PORT).with_context(|| "Failed to send CONNECT")?;
stream.flush()?;
let mut reader = BufReader::new(&stream);
let mut response_line = String::new();
reader
.read_line(&mut response_line)
.with_context(|| "Failed to read CONNECT response")?;
if !response_line.starts_with("OK ") {
bail!(
"Vsock CONNECT failed: expected 'OK {}', got '{}'",
GUEST_AGENT_PORT,
response_line.trim()
);
}
Ok(stream)
}
fn connect(instance_dir: &str, timeout_secs: u64) -> Result<UnixStream> {
connect_to(&vsock_uds_path(instance_dir), timeout_secs)
}
fn send_request(stream: &mut UnixStream, req: &GuestRequest) -> Result<GuestResponse> {
let data = serde_json::to_vec(req).with_context(|| "Failed to serialize request")?;
let len = (data.len() as u32).to_be_bytes();
stream
.write_all(&len)
.with_context(|| "Failed to write frame length")?;
stream
.write_all(&data)
.with_context(|| "Failed to write frame body")?;
stream.flush()?;
let mut len_buf = [0u8; 4];
stream
.read_exact(&mut len_buf)
.with_context(|| "Failed to read response length")?;
let resp_len = u32::from_be_bytes(len_buf) as usize;
if resp_len > MAX_FRAME_SIZE {
bail!(
"Response frame too large: {} bytes (max {})",
resp_len,
MAX_FRAME_SIZE
);
}
let mut buf = vec![0u8; resp_len];
stream
.read_exact(&mut buf)
.with_context(|| "Failed to read response body")?;
serde_json::from_slice(&buf).with_context(|| "Failed to deserialize response")
}
pub fn query_worker_status(instance_dir: &str) -> Result<GuestResponse> {
let mut stream = connect(instance_dir, DEFAULT_TIMEOUT_SECS)?;
send_request(&mut stream, &GuestRequest::WorkerStatus)
}
pub fn request_sleep_prep(instance_dir: &str, drain_timeout_secs: u64) -> Result<bool> {
let mut stream = connect(instance_dir, drain_timeout_secs)?;
let resp = send_request(&mut stream, &GuestRequest::SleepPrep { drain_timeout_secs })?;
match resp {
GuestResponse::SleepPrepAck { success, .. } => Ok(success),
GuestResponse::Error { message } => {
bail!("Guest sleep prep error: {}", message);
}
_ => bail!("Unexpected response to SleepPrep"),
}
}
pub fn signal_wake(instance_dir: &str) -> Result<bool> {
let mut stream = connect(instance_dir, DEFAULT_TIMEOUT_SECS)?;
let resp = send_request(&mut stream, &GuestRequest::Wake)?;
match resp {
GuestResponse::WakeAck { success } => Ok(success),
GuestResponse::Error { message } => {
bail!("Guest wake error: {}", message);
}
_ => bail!("Unexpected response to Wake"),
}
}
pub fn ping(instance_dir: &str) -> Result<bool> {
let mut stream = connect(instance_dir, DEFAULT_TIMEOUT_SECS)?;
let resp = send_request(&mut stream, &GuestRequest::Ping)?;
Ok(matches!(resp, GuestResponse::Pong))
}
pub fn query_integration_status(
instance_dir: &str,
) -> Result<Vec<crate::integrations::IntegrationStateReport>> {
let mut stream = connect(instance_dir, DEFAULT_TIMEOUT_SECS)?;
let resp = send_request(&mut stream, &GuestRequest::IntegrationStatus)?;
match resp {
GuestResponse::IntegrationStatusReport { integrations } => Ok(integrations),
GuestResponse::Error { message } => {
bail!("Guest integration status error: {}", message);
}
_ => bail!("Unexpected response to IntegrationStatus"),
}
}
pub fn checkpoint_integrations(
instance_dir: &str,
integrations: Vec<String>,
timeout_secs: u64,
) -> Result<bool> {
let mut stream = connect(instance_dir, timeout_secs)?;
let resp = send_request(
&mut stream,
&GuestRequest::CheckpointIntegrations { integrations },
)?;
match resp {
GuestResponse::CheckpointResult { success, .. } => Ok(success),
GuestResponse::Error { message } => {
bail!("Guest checkpoint error: {}", message);
}
_ => bail!("Unexpected response to CheckpointIntegrations"),
}
}
pub fn ping_at(vsock_uds_path: &str) -> Result<bool> {
let mut stream = connect_to(vsock_uds_path, DEFAULT_TIMEOUT_SECS)?;
let resp = send_request(&mut stream, &GuestRequest::Ping)?;
Ok(matches!(resp, GuestResponse::Pong))
}
pub fn query_worker_status_at(vsock_uds_path: &str) -> Result<GuestResponse> {
let mut stream = connect_to(vsock_uds_path, DEFAULT_TIMEOUT_SECS)?;
send_request(&mut stream, &GuestRequest::WorkerStatus)
}
pub fn query_integration_status_at(
vsock_uds_path: &str,
) -> Result<Vec<crate::integrations::IntegrationStateReport>> {
let mut stream = connect_to(vsock_uds_path, DEFAULT_TIMEOUT_SECS)?;
let resp = send_request(&mut stream, &GuestRequest::IntegrationStatus)?;
match resp {
GuestResponse::IntegrationStatusReport { integrations } => Ok(integrations),
GuestResponse::Error { message } => {
bail!("Guest integration status error: {}", message);
}
_ => bail!("Unexpected response to IntegrationStatus"),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_guest_request_roundtrip() {
let variants: Vec<GuestRequest> = vec![
GuestRequest::WorkerStatus,
GuestRequest::SleepPrep {
drain_timeout_secs: 30,
},
GuestRequest::Wake,
GuestRequest::Ping,
GuestRequest::IntegrationStatus,
GuestRequest::CheckpointIntegrations {
integrations: vec!["whatsapp".to_string(), "telegram".to_string()],
},
];
for req in &variants {
let json = serde_json::to_string(req).unwrap();
let parsed: GuestRequest = serde_json::from_str(&json).unwrap();
let json2 = serde_json::to_string(&parsed).unwrap();
assert_eq!(json, json2);
}
}
#[test]
fn test_guest_response_roundtrip() {
use crate::integrations::{IntegrationStateReport, IntegrationStatus};
let variants: Vec<GuestResponse> = vec![
GuestResponse::WorkerStatus {
status: "idle".to_string(),
last_busy_at: Some("2025-01-01T00:00:00Z".to_string()),
},
GuestResponse::SleepPrepAck {
success: true,
detail: Some("flushed".to_string()),
},
GuestResponse::WakeAck { success: true },
GuestResponse::Pong,
GuestResponse::Error {
message: "oops".to_string(),
},
GuestResponse::IntegrationStatusReport {
integrations: vec![IntegrationStateReport {
name: "whatsapp".to_string(),
status: IntegrationStatus::Active,
last_checkpoint_at: Some("2025-06-01T12:00:00Z".to_string()),
state_size_bytes: 8192,
health: None,
}],
},
GuestResponse::CheckpointResult {
success: true,
failed: vec![],
detail: Some("all checkpointed".to_string()),
},
];
for resp in &variants {
let json = serde_json::to_string(resp).unwrap();
let parsed: GuestResponse = serde_json::from_str(&json).unwrap();
let json2 = serde_json::to_string(&parsed).unwrap();
assert_eq!(json, json2);
}
}
#[test]
fn test_vsock_uds_path() {
assert_eq!(
vsock_uds_path("/var/lib/mvm/tenants/acme/pools/workers/instances/i-abc"),
"/var/lib/mvm/tenants/acme/pools/workers/instances/i-abc/runtime/v.sock"
);
}
#[test]
fn test_guest_request_sleep_prep_fields() {
let req = GuestRequest::SleepPrep {
drain_timeout_secs: 45,
};
let json = serde_json::to_string(&req).unwrap();
assert!(json.contains("45"));
assert!(json.contains("SleepPrep"));
}
#[test]
fn test_guest_response_worker_status_fields() {
let resp = GuestResponse::WorkerStatus {
status: "busy".to_string(),
last_busy_at: None,
};
let json = serde_json::to_string(&resp).unwrap();
assert!(json.contains("\"status\":\"busy\""));
}
#[test]
fn test_constants() {
assert_eq!(GUEST_CID, 3);
assert_eq!(GUEST_AGENT_PORT, 52);
assert_eq!(DEFAULT_TIMEOUT_SECS, 10);
}
#[test]
fn test_max_frame_size() {
assert_eq!(MAX_FRAME_SIZE, 256 * 1024);
}
#[test]
fn test_checkpoint_request_serde() {
let req = GuestRequest::CheckpointIntegrations {
integrations: vec!["whatsapp".to_string(), "signal".to_string()],
};
let json = serde_json::to_string(&req).unwrap();
assert!(json.contains("CheckpointIntegrations"));
assert!(json.contains("whatsapp"));
assert!(json.contains("signal"));
let parsed: GuestRequest = serde_json::from_str(&json).unwrap();
let json2 = serde_json::to_string(&parsed).unwrap();
assert_eq!(json, json2);
}
#[test]
fn test_host_bound_request_roundtrip() {
let variants: Vec<HostBoundRequest> = vec![
HostBoundRequest::WakeInstance {
tenant_id: "alice".to_string(),
pool_id: "workers".to_string(),
instance_id: "i-abc123".to_string(),
},
HostBoundRequest::QueryInstanceStatus {
tenant_id: "alice".to_string(),
pool_id: "workers".to_string(),
instance_id: "i-abc123".to_string(),
},
];
for req in &variants {
let json = serde_json::to_string(req).unwrap();
let parsed: HostBoundRequest = serde_json::from_str(&json).unwrap();
let json2 = serde_json::to_string(&parsed).unwrap();
assert_eq!(json, json2);
}
}
#[test]
fn test_host_bound_response_roundtrip() {
let variants: Vec<HostBoundResponse> = vec![
HostBoundResponse::WakeResult {
success: true,
detail: Some("woke i-abc123".to_string()),
},
HostBoundResponse::InstanceStatus {
status: "Running".to_string(),
guest_ip: Some("10.240.1.5".to_string()),
},
HostBoundResponse::Error {
message: "instance not found".to_string(),
},
];
for resp in &variants {
let json = serde_json::to_string(resp).unwrap();
let parsed: HostBoundResponse = serde_json::from_str(&json).unwrap();
let json2 = serde_json::to_string(&parsed).unwrap();
assert_eq!(json, json2);
}
}
#[test]
fn test_ping_at_nonexistent_path() {
let result = ping_at("/nonexistent/v.sock");
assert!(result.is_err());
}
#[test]
fn test_query_worker_status_at_nonexistent_path() {
let result = query_worker_status_at("/nonexistent/v.sock");
assert!(result.is_err());
}
#[test]
fn test_query_integration_status_at_nonexistent_path() {
let result = query_integration_status_at("/nonexistent/v.sock");
assert!(result.is_err());
}
#[test]
fn test_host_bound_port_constant() {
assert_eq!(HOST_BOUND_PORT, 53);
}
#[test]
fn test_checkpoint_result_failure() {
let resp = GuestResponse::CheckpointResult {
success: false,
failed: vec!["whatsapp".to_string()],
detail: Some("session locked".to_string()),
};
let json = serde_json::to_string(&resp).unwrap();
let parsed: GuestResponse = serde_json::from_str(&json).unwrap();
match parsed {
GuestResponse::CheckpointResult {
success, failed, ..
} => {
assert!(!success);
assert_eq!(failed, vec!["whatsapp"]);
}
_ => panic!("wrong variant"),
}
}
}