use std::error::Error;
use std::fmt;
pub use crate::generated_protocol::v1::*;
impl Copy for crate::generated_protocol::v1::GuestFilesystemOperation {}
impl Copy for crate::generated_protocol::v1::RootFilesystemMode {}
impl Copy for crate::generated_protocol::v1::WasmPermissionTier {}
#[allow(clippy::derivable_impls)]
impl Default for crate::generated_protocol::v1::RootFilesystemEntryKind {
fn default() -> Self {
Self::File
}
}
impl Default for crate::generated_protocol::v1::RootFilesystemEntry {
fn default() -> Self {
Self {
path: String::new(),
kind: crate::generated_protocol::v1::RootFilesystemEntryKind::File,
mode: None,
uid: None,
gid: None,
content: None,
encoding: None,
target: None,
executable: false,
}
}
}
#[allow(clippy::derivable_impls)]
impl Default for crate::generated_protocol::v1::RootFilesystemMode {
fn default() -> Self {
Self::Ephemeral
}
}
#[allow(clippy::derivable_impls)]
impl Default for crate::generated_protocol::v1::RootFilesystemDescriptor {
fn default() -> Self {
Self {
mode: crate::generated_protocol::v1::RootFilesystemMode::default(),
disable_default_base_layer: false,
lowers: Vec::new(),
bootstrap_entries: Vec::new(),
}
}
}
impl crate::generated_protocol::v1::PermissionsPolicy {
pub fn deny_all() -> Self {
use crate::generated_protocol::v1::{
FsPermissionScope, PatternPermissionScope, PermissionMode,
};
Self {
fs: Some(FsPermissionScope::PermissionMode(PermissionMode::Deny)),
network: Some(PatternPermissionScope::PermissionMode(PermissionMode::Deny)),
child_process: Some(PatternPermissionScope::PermissionMode(PermissionMode::Deny)),
process: Some(PatternPermissionScope::PermissionMode(PermissionMode::Deny)),
env: Some(PatternPermissionScope::PermissionMode(PermissionMode::Deny)),
binding: Some(PatternPermissionScope::PermissionMode(PermissionMode::Deny)),
}
}
pub fn allow_all() -> Self {
use crate::generated_protocol::v1::{
FsPermissionScope, PatternPermissionScope, PermissionMode,
};
Self {
fs: Some(FsPermissionScope::PermissionMode(PermissionMode::Allow)),
network: Some(PatternPermissionScope::PermissionMode(
PermissionMode::Allow,
)),
child_process: Some(PatternPermissionScope::PermissionMode(
PermissionMode::Allow,
)),
process: Some(PatternPermissionScope::PermissionMode(
PermissionMode::Allow,
)),
env: Some(PatternPermissionScope::PermissionMode(
PermissionMode::Allow,
)),
binding: Some(PatternPermissionScope::PermissionMode(
PermissionMode::Allow,
)),
}
}
}
impl Default for crate::generated_protocol::v1::PermissionsPolicy {
fn default() -> Self {
Self::allow_all()
}
}
impl crate::generated_protocol::v1::CreateVmRequest {
pub fn json_config(
runtime: crate::generated_protocol::v1::GuestRuntimeKind,
config: secure_exec_vm_config::CreateVmConfig,
) -> Self {
Self {
runtime,
config: serde_json::to_string(&config).expect("serialize create VM config"),
}
}
pub fn legacy_test_config(
runtime: crate::generated_protocol::v1::GuestRuntimeKind,
metadata: std::collections::HashMap<String, String>,
root_filesystem: crate::generated_protocol::v1::RootFilesystemDescriptor,
permissions: Option<crate::generated_protocol::v1::PermissionsPolicy>,
) -> Self {
let metadata: std::collections::BTreeMap<_, _> = metadata.into_iter().collect();
let mut config = secure_exec_vm_config::CreateVmConfig {
cwd: metadata.get("cwd").cloned(),
env: legacy_env_config(&metadata),
root_filesystem: legacy_root_filesystem_config(root_filesystem),
permissions: permissions.map(legacy_permissions_config),
limits: legacy_limits_config(&metadata),
dns: legacy_dns_config(&metadata),
native_root: legacy_native_root_config(&metadata),
listen: legacy_listen_config(&metadata),
..Default::default()
};
config.loopback_exempt_ports = legacy_loopback_exempt_ports(&config.env);
Self::json_config(runtime, config)
}
}
fn legacy_env_config(
metadata: &std::collections::BTreeMap<String, String>,
) -> std::collections::BTreeMap<String, String> {
metadata
.iter()
.filter_map(|(key, value)| {
key.strip_prefix("env.")
.map(|name| (name.to_string(), value.clone()))
})
.collect()
}
fn legacy_root_filesystem_config(
descriptor: crate::generated_protocol::v1::RootFilesystemDescriptor,
) -> secure_exec_vm_config::RootFilesystemConfig {
secure_exec_vm_config::RootFilesystemConfig {
mode: match descriptor.mode {
crate::generated_protocol::v1::RootFilesystemMode::Ephemeral => {
secure_exec_vm_config::RootFilesystemMode::Ephemeral
}
crate::generated_protocol::v1::RootFilesystemMode::ReadOnly => {
secure_exec_vm_config::RootFilesystemMode::ReadOnly
}
},
disable_default_base_layer: descriptor.disable_default_base_layer,
lowers: descriptor
.lowers
.into_iter()
.map(legacy_root_lower_config)
.collect(),
bootstrap_entries: descriptor
.bootstrap_entries
.into_iter()
.map(legacy_root_entry_config)
.collect(),
}
}
fn legacy_root_lower_config(
lower: crate::generated_protocol::v1::RootFilesystemLowerDescriptor,
) -> secure_exec_vm_config::RootFilesystemLowerDescriptor {
match lower {
crate::generated_protocol::v1::RootFilesystemLowerDescriptor::SnapshotRootFilesystemLower(
snapshot,
) => secure_exec_vm_config::RootFilesystemLowerDescriptor::Snapshot {
entries: snapshot
.entries
.into_iter()
.map(legacy_root_entry_config)
.collect(),
},
crate::generated_protocol::v1::RootFilesystemLowerDescriptor::BundledBaseFilesystemLower => {
secure_exec_vm_config::RootFilesystemLowerDescriptor::BundledBaseFilesystem
}
}
}
fn legacy_root_entry_config(
entry: crate::generated_protocol::v1::RootFilesystemEntry,
) -> secure_exec_vm_config::RootFilesystemEntry {
secure_exec_vm_config::RootFilesystemEntry {
path: entry.path,
kind: match entry.kind {
crate::generated_protocol::v1::RootFilesystemEntryKind::File => {
secure_exec_vm_config::RootFilesystemEntryKind::File
}
crate::generated_protocol::v1::RootFilesystemEntryKind::Directory => {
secure_exec_vm_config::RootFilesystemEntryKind::Directory
}
crate::generated_protocol::v1::RootFilesystemEntryKind::Symlink => {
secure_exec_vm_config::RootFilesystemEntryKind::Symlink
}
},
mode: entry.mode,
uid: entry.uid,
gid: entry.gid,
content: entry.content,
encoding: entry.encoding.map(|encoding| match encoding {
crate::generated_protocol::v1::RootFilesystemEntryEncoding::Utf8 => {
secure_exec_vm_config::RootFilesystemEntryEncoding::Utf8
}
crate::generated_protocol::v1::RootFilesystemEntryEncoding::Base64 => {
secure_exec_vm_config::RootFilesystemEntryEncoding::Base64
}
}),
target: entry.target,
executable: entry.executable,
}
}
fn legacy_permissions_config(
permissions: crate::generated_protocol::v1::PermissionsPolicy,
) -> secure_exec_vm_config::PermissionsPolicy {
secure_exec_vm_config::PermissionsPolicy {
fs: permissions.fs.map(legacy_fs_permission_scope_config),
network: permissions
.network
.map(legacy_pattern_permission_scope_config),
child_process: permissions
.child_process
.map(legacy_pattern_permission_scope_config),
process: permissions
.process
.map(legacy_pattern_permission_scope_config),
env: permissions.env.map(legacy_pattern_permission_scope_config),
binding: permissions
.binding
.map(legacy_pattern_permission_scope_config),
}
}
fn legacy_permission_mode_config(
mode: crate::generated_protocol::v1::PermissionMode,
) -> secure_exec_vm_config::PermissionMode {
match mode {
crate::generated_protocol::v1::PermissionMode::Allow => {
secure_exec_vm_config::PermissionMode::Allow
}
crate::generated_protocol::v1::PermissionMode::Ask => {
secure_exec_vm_config::PermissionMode::Ask
}
crate::generated_protocol::v1::PermissionMode::Deny => {
secure_exec_vm_config::PermissionMode::Deny
}
}
}
fn legacy_fs_permission_scope_config(
scope: crate::generated_protocol::v1::FsPermissionScope,
) -> secure_exec_vm_config::FsPermissionScope {
match scope {
crate::generated_protocol::v1::FsPermissionScope::PermissionMode(mode) => {
secure_exec_vm_config::FsPermissionScope::Mode(legacy_permission_mode_config(mode))
}
crate::generated_protocol::v1::FsPermissionScope::FsPermissionRuleSet(rules) => {
secure_exec_vm_config::FsPermissionScope::Rules(
secure_exec_vm_config::FsPermissionRuleSet {
default: rules.default.map(legacy_permission_mode_config),
rules: rules
.rules
.into_iter()
.map(|rule| secure_exec_vm_config::FsPermissionRule {
mode: legacy_permission_mode_config(rule.mode),
operations: rule.operations,
paths: rule.paths,
})
.collect(),
},
)
}
}
}
fn legacy_pattern_permission_scope_config(
scope: crate::generated_protocol::v1::PatternPermissionScope,
) -> secure_exec_vm_config::PatternPermissionScope {
match scope {
crate::generated_protocol::v1::PatternPermissionScope::PermissionMode(mode) => {
secure_exec_vm_config::PatternPermissionScope::Mode(legacy_permission_mode_config(mode))
}
crate::generated_protocol::v1::PatternPermissionScope::PatternPermissionRuleSet(rules) => {
secure_exec_vm_config::PatternPermissionScope::Rules(
secure_exec_vm_config::PatternPermissionRuleSet {
default: rules.default.map(legacy_permission_mode_config),
rules: rules
.rules
.into_iter()
.map(|rule| secure_exec_vm_config::PatternPermissionRule {
mode: legacy_permission_mode_config(rule.mode),
operations: rule.operations,
patterns: rule.patterns,
})
.collect(),
},
)
}
}
}
fn legacy_dns_config(
metadata: &std::collections::BTreeMap<String, String>,
) -> Option<secure_exec_vm_config::VmDnsConfig> {
let mut dns = secure_exec_vm_config::VmDnsConfig::default();
if let Some(value) = metadata.get("network.dns.servers") {
dns.name_servers = value
.split(',')
.map(str::trim)
.filter(|entry| !entry.is_empty())
.map(str::to_string)
.collect();
}
for (key, value) in metadata {
let Some(hostname) = key.strip_prefix("network.dns.override.") else {
continue;
};
dns.overrides.insert(
hostname.to_string(),
value
.split(',')
.map(str::trim)
.filter(|entry| !entry.is_empty())
.map(str::to_string)
.collect(),
);
}
if dns.name_servers.is_empty() && dns.overrides.is_empty() {
None
} else {
Some(dns)
}
}
fn legacy_native_root_config(
metadata: &std::collections::BTreeMap<String, String>,
) -> Option<secure_exec_vm_config::NativeRootFilesystemConfig> {
let id = metadata.get("rootFilesystem.nativePlugin.id")?;
let config = metadata
.get("rootFilesystem.nativePlugin.config")
.map(|value| serde_json::from_str(value).expect("parse native root plugin config"))
.unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
let read_only = metadata
.get("rootFilesystem.nativePlugin.readOnly")
.map(|value| value.parse::<bool>().expect("parse native root readOnly"))
.unwrap_or(false);
Some(secure_exec_vm_config::NativeRootFilesystemConfig {
plugin: secure_exec_vm_config::MountPluginDescriptor {
id: id.clone(),
config,
},
read_only,
})
}
fn legacy_listen_config(
metadata: &std::collections::BTreeMap<String, String>,
) -> Option<secure_exec_vm_config::VmListenPolicyConfig> {
let listen = secure_exec_vm_config::VmListenPolicyConfig {
port_min: metadata
.get("network.listen.port_min")
.map(|value| value.parse::<u16>().expect("parse network.listen.port_min")),
port_max: metadata
.get("network.listen.port_max")
.map(|value| value.parse::<u16>().expect("parse network.listen.port_max")),
allow_privileged: metadata
.get("network.listen.allow_privileged")
.map(|value| {
value
.parse::<bool>()
.expect("parse network.listen.allow_privileged")
}),
};
if listen.port_min.is_none() && listen.port_max.is_none() && listen.allow_privileged.is_none() {
None
} else {
Some(listen)
}
}
fn legacy_loopback_exempt_ports(env: &std::collections::BTreeMap<String, String>) -> Vec<u16> {
let Some(value) = env.get("AGENTOS_LOOPBACK_EXEMPT_PORTS") else {
return Vec::new();
};
serde_json::from_str::<Vec<serde_json::Value>>(value)
.unwrap_or_default()
.into_iter()
.filter_map(|value| match value {
serde_json::Value::Number(number) => number.as_u64(),
serde_json::Value::String(value) => value.parse::<u64>().ok(),
_ => None,
})
.filter_map(|port| u16::try_from(port).ok())
.collect()
}
fn legacy_limits_config(
metadata: &std::collections::BTreeMap<String, String>,
) -> Option<secure_exec_vm_config::VmLimitsConfig> {
let resources = secure_exec_vm_config::ResourceLimitsConfig {
cpu_count: legacy_u64(metadata, "resource.cpu_count"),
max_processes: legacy_u64(metadata, "resource.max_processes"),
max_open_fds: legacy_u64(metadata, "resource.max_open_fds"),
max_pipes: legacy_u64(metadata, "resource.max_pipes"),
max_ptys: legacy_u64(metadata, "resource.max_ptys"),
max_sockets: legacy_u64(metadata, "resource.max_sockets"),
max_connections: legacy_u64(metadata, "resource.max_connections"),
max_socket_buffered_bytes: legacy_u64(metadata, "resource.max_socket_buffered_bytes"),
max_socket_datagram_queue_len: legacy_u64(
metadata,
"resource.max_socket_datagram_queue_len",
),
max_filesystem_bytes: legacy_u64(metadata, "resource.max_filesystem_bytes"),
max_inode_count: legacy_u64(metadata, "resource.max_inode_count"),
max_blocking_read_ms: legacy_u64(metadata, "resource.max_blocking_read_ms"),
max_pread_bytes: legacy_u64(metadata, "resource.max_pread_bytes"),
max_fd_write_bytes: legacy_u64(metadata, "resource.max_fd_write_bytes"),
max_process_argv_bytes: legacy_u64(metadata, "resource.max_process_argv_bytes"),
max_process_env_bytes: legacy_u64(metadata, "resource.max_process_env_bytes"),
max_readdir_entries: legacy_u64(metadata, "resource.max_readdir_entries"),
max_wasm_fuel: legacy_u64(metadata, "resource.max_wasm_fuel"),
max_wasm_memory_bytes: legacy_u64(metadata, "resource.max_wasm_memory_bytes"),
max_wasm_stack_bytes: legacy_u64(metadata, "resource.max_wasm_stack_bytes"),
};
let http = secure_exec_vm_config::HttpLimitsConfig {
max_fetch_response_bytes: legacy_u64(metadata, "limits.http.max_fetch_response_bytes"),
};
let tools = secure_exec_vm_config::ToolLimitsConfig {
default_tool_timeout_ms: legacy_u64(metadata, "limits.tools.default_tool_timeout_ms"),
max_tool_timeout_ms: legacy_u64(metadata, "limits.tools.max_tool_timeout_ms"),
max_registered_toolkits: legacy_u64(metadata, "limits.tools.max_registered_toolkits"),
max_registered_tools_per_vm: legacy_u64(
metadata,
"limits.tools.max_registered_tools_per_vm",
),
max_tools_per_toolkit: legacy_u64(metadata, "limits.tools.max_tools_per_toolkit"),
max_tool_schema_bytes: legacy_u64(metadata, "limits.tools.max_tool_schema_bytes"),
max_tool_examples_per_tool: legacy_u64(metadata, "limits.tools.max_tool_examples_per_tool"),
max_tool_example_input_bytes: legacy_u64(
metadata,
"limits.tools.max_tool_example_input_bytes",
),
};
let plugins = secure_exec_vm_config::PluginLimitsConfig {
max_persisted_manifest_bytes: legacy_u64(
metadata,
"limits.plugins.max_persisted_manifest_bytes",
),
max_persisted_manifest_file_bytes: legacy_u64(
metadata,
"limits.plugins.max_persisted_manifest_file_bytes",
),
};
let acp = secure_exec_vm_config::AcpLimitsConfig {
max_read_line_bytes: legacy_u64(metadata, "limits.acp.max_read_line_bytes"),
stdout_buffer_byte_limit: legacy_u64(metadata, "limits.acp.stdout_buffer_byte_limit"),
};
let js_runtime = secure_exec_vm_config::JsRuntimeLimitsConfig {
v8_heap_limit_mb: legacy_u64(metadata, "limits.js_runtime.v8_heap_limit_mb"),
sync_rpc_wait_timeout_ms: legacy_u64(
metadata,
"limits.js_runtime.sync_rpc_wait_timeout_ms",
),
captured_output_limit_bytes: legacy_u64(
metadata,
"limits.js_runtime.captured_output_limit_bytes",
),
stdin_buffer_limit_bytes: legacy_u64(
metadata,
"limits.js_runtime.stdin_buffer_limit_bytes",
),
event_payload_limit_bytes: legacy_u64(
metadata,
"limits.js_runtime.event_payload_limit_bytes",
),
v8_ipc_max_frame_bytes: legacy_u64(metadata, "limits.js_runtime.v8_ipc_max_frame_bytes"),
};
let python = secure_exec_vm_config::PythonLimitsConfig {
output_buffer_max_bytes: legacy_u64(metadata, "limits.python.output_buffer_max_bytes"),
execution_timeout_ms: legacy_u64(metadata, "limits.python.execution_timeout_ms"),
max_old_space_mb: legacy_u64(metadata, "limits.python.max_old_space_mb"),
vfs_rpc_timeout_ms: legacy_u64(metadata, "limits.python.vfs_rpc_timeout_ms"),
};
let wasm = secure_exec_vm_config::WasmLimitsConfig {
max_module_file_bytes: legacy_u64(metadata, "limits.wasm.max_module_file_bytes"),
captured_output_limit_bytes: legacy_u64(
metadata,
"limits.wasm.captured_output_limit_bytes",
),
sync_read_limit_bytes: legacy_u64(metadata, "limits.wasm.sync_read_limit_bytes"),
};
let config = secure_exec_vm_config::VmLimitsConfig {
resources: legacy_has_resource_limits(&resources).then_some(resources),
http: http.max_fetch_response_bytes.is_some().then_some(http),
tools: legacy_has_tool_limits(&tools).then_some(tools),
plugins: legacy_has_plugin_limits(&plugins).then_some(plugins),
acp: legacy_has_acp_limits(&acp).then_some(acp),
js_runtime: legacy_has_js_runtime_limits(&js_runtime).then_some(js_runtime),
python: legacy_has_python_limits(&python).then_some(python),
wasm: legacy_has_wasm_limits(&wasm).then_some(wasm),
};
if config.resources.is_none()
&& config.http.is_none()
&& config.tools.is_none()
&& config.plugins.is_none()
&& config.acp.is_none()
&& config.js_runtime.is_none()
&& config.python.is_none()
&& config.wasm.is_none()
{
None
} else {
Some(config)
}
}
fn legacy_u64(metadata: &std::collections::BTreeMap<String, String>, key: &str) -> Option<u64> {
metadata.get(key).map(|value| {
value
.parse::<u64>()
.unwrap_or_else(|error| panic!("parse {key}: {error}"))
})
}
fn legacy_has_resource_limits(config: &secure_exec_vm_config::ResourceLimitsConfig) -> bool {
config.cpu_count.is_some()
|| config.max_processes.is_some()
|| config.max_open_fds.is_some()
|| config.max_pipes.is_some()
|| config.max_ptys.is_some()
|| config.max_sockets.is_some()
|| config.max_connections.is_some()
|| config.max_socket_buffered_bytes.is_some()
|| config.max_socket_datagram_queue_len.is_some()
|| config.max_filesystem_bytes.is_some()
|| config.max_inode_count.is_some()
|| config.max_blocking_read_ms.is_some()
|| config.max_pread_bytes.is_some()
|| config.max_fd_write_bytes.is_some()
|| config.max_process_argv_bytes.is_some()
|| config.max_process_env_bytes.is_some()
|| config.max_readdir_entries.is_some()
|| config.max_wasm_fuel.is_some()
|| config.max_wasm_memory_bytes.is_some()
|| config.max_wasm_stack_bytes.is_some()
}
fn legacy_has_tool_limits(config: &secure_exec_vm_config::ToolLimitsConfig) -> bool {
config.default_tool_timeout_ms.is_some()
|| config.max_tool_timeout_ms.is_some()
|| config.max_registered_toolkits.is_some()
|| config.max_registered_tools_per_vm.is_some()
|| config.max_tools_per_toolkit.is_some()
|| config.max_tool_schema_bytes.is_some()
|| config.max_tool_examples_per_tool.is_some()
|| config.max_tool_example_input_bytes.is_some()
}
fn legacy_has_plugin_limits(config: &secure_exec_vm_config::PluginLimitsConfig) -> bool {
config.max_persisted_manifest_bytes.is_some()
|| config.max_persisted_manifest_file_bytes.is_some()
}
fn legacy_has_acp_limits(config: &secure_exec_vm_config::AcpLimitsConfig) -> bool {
config.max_read_line_bytes.is_some() || config.stdout_buffer_byte_limit.is_some()
}
fn legacy_has_js_runtime_limits(config: &secure_exec_vm_config::JsRuntimeLimitsConfig) -> bool {
config.v8_heap_limit_mb.is_some()
|| config.sync_rpc_wait_timeout_ms.is_some()
|| config.captured_output_limit_bytes.is_some()
|| config.stdin_buffer_limit_bytes.is_some()
|| config.event_payload_limit_bytes.is_some()
|| config.v8_ipc_max_frame_bytes.is_some()
}
fn legacy_has_python_limits(config: &secure_exec_vm_config::PythonLimitsConfig) -> bool {
config.output_buffer_max_bytes.is_some()
|| config.execution_timeout_ms.is_some()
|| config.max_old_space_mb.is_some()
|| config.vfs_rpc_timeout_ms.is_some()
}
fn legacy_has_wasm_limits(config: &secure_exec_vm_config::WasmLimitsConfig) -> bool {
config.max_module_file_bytes.is_some()
|| config.captured_output_limit_bytes.is_some()
|| config.sync_read_limit_bytes.is_some()
}
impl crate::generated_protocol::v1::OwnershipScope {
pub fn connection(connection_id: impl Into<String>) -> Self {
Self::ConnectionOwnership(crate::generated_protocol::v1::ConnectionOwnership {
connection_id: connection_id.into(),
})
}
pub fn session(connection_id: impl Into<String>, session_id: impl Into<String>) -> Self {
Self::SessionOwnership(crate::generated_protocol::v1::SessionOwnership {
connection_id: connection_id.into(),
session_id: session_id.into(),
})
}
pub fn vm(
connection_id: impl Into<String>,
session_id: impl Into<String>,
vm_id: impl Into<String>,
) -> Self {
Self::VmOwnership(crate::generated_protocol::v1::VmOwnership {
connection_id: connection_id.into(),
session_id: session_id.into(),
vm_id: vm_id.into(),
})
}
}
pub const PROTOCOL_NAME: &str = "secure-exec-sidecar";
pub const PROTOCOL_VERSION: u16 = 7;
pub const DEFAULT_MAX_FRAME_BYTES: usize = 1024 * 1024;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProtocolCodecError {
TruncatedFrame {
actual: usize,
},
LengthPrefixMismatch {
declared: usize,
actual: usize,
},
FrameTooLarge {
size: usize,
max: usize,
},
UnsupportedSchema {
name: String,
version: u16,
},
InvalidRequestId,
InvalidRequestDirection {
request_id: RequestId,
expected: RequestDirection,
},
EmptyOwnershipField {
field: &'static str,
},
EmptyAuthToken,
InvalidOwnershipScope {
required: OwnershipRequirement,
actual: OwnershipRequirement,
},
SerializeFailure(String),
DeserializeFailure(String),
}
impl fmt::Display for ProtocolCodecError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::TruncatedFrame { actual } => {
write!(
f,
"protocol frame is truncated: only {actual} bytes provided"
)
}
Self::LengthPrefixMismatch { declared, actual } => write!(
f,
"protocol frame length prefix mismatch: declared {declared} bytes, got {actual}",
),
Self::FrameTooLarge { size, max } => {
write!(f, "protocol frame is {size} bytes, limit is {max}")
}
Self::UnsupportedSchema { name, version } => write!(
f,
"unsupported protocol schema {name}@{version}; expected {PROTOCOL_NAME}@{PROTOCOL_VERSION}",
),
Self::InvalidRequestId => write!(f, "protocol request identifiers must be non-zero"),
Self::InvalidRequestDirection {
request_id,
expected,
} => write!(f, "protocol request id {request_id} must be {expected}",),
Self::EmptyOwnershipField { field } => {
write!(f, "protocol ownership field `{field}` cannot be empty")
}
Self::EmptyAuthToken => {
write!(f, "authenticate requests require a non-empty auth token")
}
Self::InvalidOwnershipScope { required, actual } => write!(
f,
"protocol frame requires {required} ownership but carried {actual}",
),
Self::SerializeFailure(message) => {
write!(f, "protocol frame serialization failed: {message}")
}
Self::DeserializeFailure(message) => {
write!(f, "protocol frame deserialization failed: {message}")
}
}
}
}
impl Error for ProtocolCodecError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OwnershipRequirement {
Any,
Connection,
Session,
Vm,
SessionOrVm,
}
impl fmt::Display for OwnershipRequirement {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Any => write!(f, "any"),
Self::Connection => write!(f, "connection"),
Self::Session => write!(f, "session"),
Self::Vm => write!(f, "vm"),
Self::SessionOrVm => write!(f, "session-or-vm"),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestDirection {
Host,
Sidecar,
}
impl fmt::Display for RequestDirection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Host => write!(f, "positive"),
Self::Sidecar => write!(f, "negative"),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WireDispatchResult {
pub response: ResponseFrame,
pub events: Vec<EventFrame>,
}
#[derive(Debug, Clone)]
pub struct WireFrameCodec {
max_frame_bytes: usize,
}
impl WireFrameCodec {
pub fn new(max_frame_bytes: usize) -> Self {
Self { max_frame_bytes }
}
pub fn max_frame_bytes(&self) -> usize {
self.max_frame_bytes
}
pub fn encode(&self, frame: &ProtocolFrame) -> Result<Vec<u8>, ProtocolCodecError> {
validate_frame(frame)?;
let payload = serde_bare::to_vec(frame)
.map_err(|error| ProtocolCodecError::SerializeFailure(error.to_string()))?;
if payload.len() > self.max_frame_bytes {
return Err(ProtocolCodecError::FrameTooLarge {
size: payload.len(),
max: self.max_frame_bytes,
});
}
let length =
u32::try_from(payload.len()).map_err(|_| ProtocolCodecError::FrameTooLarge {
size: payload.len(),
max: u32::MAX as usize,
})?;
let mut encoded = Vec::with_capacity(4 + payload.len());
encoded.extend_from_slice(&length.to_be_bytes());
encoded.extend_from_slice(&payload);
Ok(encoded)
}
pub fn decode(&self, bytes: &[u8]) -> Result<ProtocolFrame, ProtocolCodecError> {
let payload = self.checked_payload(bytes)?;
let frame = serde_bare::from_slice(payload)
.map_err(|error| ProtocolCodecError::DeserializeFailure(error.to_string()))?;
validate_frame(&frame)?;
Ok(frame)
}
fn checked_payload<'a>(&self, bytes: &'a [u8]) -> Result<&'a [u8], ProtocolCodecError> {
if bytes.len() < 4 {
return Err(ProtocolCodecError::TruncatedFrame {
actual: bytes.len(),
});
}
let declared =
u32::from_be_bytes(bytes[..4].try_into().expect("length prefix is four bytes"))
as usize;
if declared > self.max_frame_bytes {
return Err(ProtocolCodecError::FrameTooLarge {
size: declared,
max: self.max_frame_bytes,
});
}
let actual = bytes.len() - 4;
if declared != actual {
return Err(ProtocolCodecError::LengthPrefixMismatch { declared, actual });
}
Ok(&bytes[4..])
}
}
impl Default for WireFrameCodec {
fn default() -> Self {
Self::new(DEFAULT_MAX_FRAME_BYTES)
}
}
pub fn protocol_schema() -> ProtocolSchema {
ProtocolSchema::current()
}
impl ProtocolSchema {
pub fn current() -> Self {
Self {
name: PROTOCOL_NAME.to_string(),
version: PROTOCOL_VERSION,
}
}
}
impl Default for ProtocolSchema {
fn default() -> Self {
Self::current()
}
}
pub(crate) fn request_frame_to_compat(
request: RequestFrame,
) -> Result<crate::protocol::RequestFrame, ProtocolCodecError> {
match crate::protocol::from_generated_protocol_frame(ProtocolFrame::RequestFrame(request))? {
crate::protocol::ProtocolFrame::Request(request) => Ok(request),
crate::protocol::ProtocolFrame::Response(_)
| crate::protocol::ProtocolFrame::Event(_)
| crate::protocol::ProtocolFrame::SidecarRequest(_)
| crate::protocol::ProtocolFrame::SidecarResponse(_) => {
Err(ProtocolCodecError::DeserializeFailure(String::from(
"wire request frame converted to non-request compatibility frame",
)))
}
}
}
pub(crate) fn ownership_scope_to_compat(
ownership: OwnershipScope,
) -> crate::protocol::OwnershipScope {
crate::protocol::from_generated_ownership_scope(ownership)
}
pub(crate) fn request_payload_to_compat(
ownership: &crate::protocol::OwnershipScope,
payload: RequestPayload,
) -> Result<crate::protocol::RequestPayload, ProtocolCodecError> {
match crate::protocol::from_generated_protocol_frame(ProtocolFrame::RequestFrame(
RequestFrame {
schema: protocol_schema(),
request_id: 1,
ownership: crate::protocol::to_generated_ownership_scope(ownership),
payload,
},
))? {
crate::protocol::ProtocolFrame::Request(request) => Ok(request.payload),
crate::protocol::ProtocolFrame::Response(_)
| crate::protocol::ProtocolFrame::Event(_)
| crate::protocol::ProtocolFrame::SidecarRequest(_)
| crate::protocol::ProtocolFrame::SidecarResponse(_) => {
Err(ProtocolCodecError::DeserializeFailure(String::from(
"wire request payload converted to non-request compatibility frame",
)))
}
}
}
pub(crate) fn response_payload_from_compat(
ownership: &crate::protocol::OwnershipScope,
payload: crate::protocol::ResponsePayload,
) -> Result<ResponsePayload, ProtocolCodecError> {
match crate::protocol::to_generated_protocol_frame(&crate::protocol::ProtocolFrame::Response(
crate::protocol::ResponseFrame::new(1, ownership.clone(), payload),
))? {
ProtocolFrame::ResponseFrame(response) => Ok(response.payload),
ProtocolFrame::RequestFrame(_)
| ProtocolFrame::EventFrame(_)
| ProtocolFrame::SidecarRequestFrame(_)
| ProtocolFrame::SidecarResponseFrame(_) => Err(ProtocolCodecError::SerializeFailure(
String::from("compatibility response payload converted to non-response wire frame"),
)),
}
}
pub(crate) fn event_frame_from_compat(
event: crate::protocol::EventFrame,
) -> Result<EventFrame, ProtocolCodecError> {
match crate::protocol::to_generated_protocol_frame(&crate::protocol::ProtocolFrame::Event(
event,
))? {
ProtocolFrame::EventFrame(event) => Ok(event),
ProtocolFrame::RequestFrame(_)
| ProtocolFrame::ResponseFrame(_)
| ProtocolFrame::SidecarRequestFrame(_)
| ProtocolFrame::SidecarResponseFrame(_) => Err(ProtocolCodecError::SerializeFailure(
String::from("compatibility event converted to non-event wire frame"),
)),
}
}
pub(crate) fn event_frame_to_compat(
event: EventFrame,
) -> Result<crate::protocol::EventFrame, ProtocolCodecError> {
match crate::protocol::from_generated_protocol_frame(ProtocolFrame::EventFrame(event))? {
crate::protocol::ProtocolFrame::Event(event) => Ok(event),
crate::protocol::ProtocolFrame::Request(_)
| crate::protocol::ProtocolFrame::Response(_)
| crate::protocol::ProtocolFrame::SidecarRequest(_)
| crate::protocol::ProtocolFrame::SidecarResponse(_) => {
Err(ProtocolCodecError::DeserializeFailure(String::from(
"wire event converted to non-event compatibility frame",
)))
}
}
}
pub(crate) fn sidecar_request_frame_from_compat(
request: crate::protocol::SidecarRequestFrame,
) -> Result<SidecarRequestFrame, ProtocolCodecError> {
match crate::protocol::to_generated_protocol_frame(
&crate::protocol::ProtocolFrame::SidecarRequest(request),
)? {
ProtocolFrame::SidecarRequestFrame(request) => Ok(request),
ProtocolFrame::RequestFrame(_)
| ProtocolFrame::ResponseFrame(_)
| ProtocolFrame::EventFrame(_)
| ProtocolFrame::SidecarResponseFrame(_) => {
Err(ProtocolCodecError::SerializeFailure(String::from(
"compatibility sidecar request converted to non-sidecar-request wire frame",
)))
}
}
}
pub(crate) fn sidecar_request_payload_to_compat(
ownership: &crate::protocol::OwnershipScope,
payload: SidecarRequestPayload,
) -> Result<crate::protocol::SidecarRequestPayload, ProtocolCodecError> {
match crate::protocol::from_generated_protocol_frame(ProtocolFrame::SidecarRequestFrame(
SidecarRequestFrame {
schema: protocol_schema(),
request_id: -1,
ownership: crate::protocol::to_generated_ownership_scope(ownership),
payload,
},
))? {
crate::protocol::ProtocolFrame::SidecarRequest(request) => Ok(request.payload),
crate::protocol::ProtocolFrame::Request(_)
| crate::protocol::ProtocolFrame::Response(_)
| crate::protocol::ProtocolFrame::Event(_)
| crate::protocol::ProtocolFrame::SidecarResponse(_) => {
Err(ProtocolCodecError::DeserializeFailure(String::from(
"wire sidecar request payload converted to non-sidecar-request compatibility frame",
)))
}
}
}
pub(crate) fn sidecar_response_frame_to_compat(
response: SidecarResponseFrame,
) -> Result<crate::protocol::SidecarResponseFrame, ProtocolCodecError> {
match crate::protocol::from_generated_protocol_frame(ProtocolFrame::SidecarResponseFrame(
response,
))? {
crate::protocol::ProtocolFrame::SidecarResponse(response) => Ok(response),
crate::protocol::ProtocolFrame::Request(_)
| crate::protocol::ProtocolFrame::Response(_)
| crate::protocol::ProtocolFrame::Event(_)
| crate::protocol::ProtocolFrame::SidecarRequest(_) => {
Err(ProtocolCodecError::DeserializeFailure(String::from(
"wire sidecar response converted to non-sidecar-response compatibility frame",
)))
}
}
}
pub(crate) fn sidecar_response_frame_from_compat(
response: crate::protocol::SidecarResponseFrame,
) -> Result<SidecarResponseFrame, ProtocolCodecError> {
match crate::protocol::to_generated_protocol_frame(
&crate::protocol::ProtocolFrame::SidecarResponse(response),
)? {
ProtocolFrame::SidecarResponseFrame(response) => Ok(response),
ProtocolFrame::RequestFrame(_)
| ProtocolFrame::ResponseFrame(_)
| ProtocolFrame::EventFrame(_)
| ProtocolFrame::SidecarRequestFrame(_) => {
Err(ProtocolCodecError::SerializeFailure(String::from(
"compatibility sidecar response converted to non-sidecar-response wire frame",
)))
}
}
}
pub(crate) fn dispatch_result_from_compat(
result: crate::state::DispatchResult,
) -> Result<WireDispatchResult, ProtocolCodecError> {
let response = match crate::protocol::to_generated_protocol_frame(
&crate::protocol::ProtocolFrame::Response(result.response),
)? {
ProtocolFrame::ResponseFrame(response) => response,
ProtocolFrame::RequestFrame(_)
| ProtocolFrame::EventFrame(_)
| ProtocolFrame::SidecarRequestFrame(_)
| ProtocolFrame::SidecarResponseFrame(_) => {
return Err(ProtocolCodecError::SerializeFailure(String::from(
"compatibility dispatch response converted to non-response wire frame",
)));
}
};
let events = result
.events
.into_iter()
.map(|event| {
match crate::protocol::to_generated_protocol_frame(
&crate::protocol::ProtocolFrame::Event(event),
)? {
ProtocolFrame::EventFrame(event) => Ok(event),
ProtocolFrame::RequestFrame(_)
| ProtocolFrame::ResponseFrame(_)
| ProtocolFrame::SidecarRequestFrame(_)
| ProtocolFrame::SidecarResponseFrame(_) => {
Err(ProtocolCodecError::SerializeFailure(String::from(
"compatibility dispatch event converted to non-event wire frame",
)))
}
}
})
.collect::<Result<Vec<_>, _>>()?;
Ok(WireDispatchResult { response, events })
}
fn validate_frame(frame: &ProtocolFrame) -> Result<(), ProtocolCodecError> {
match frame {
ProtocolFrame::RequestFrame(frame) => {
validate_schema(&frame.schema)?;
validate_request_id(frame.request_id)
}
ProtocolFrame::ResponseFrame(frame) => {
validate_schema(&frame.schema)?;
validate_request_id(frame.request_id)
}
ProtocolFrame::EventFrame(frame) => validate_schema(&frame.schema),
ProtocolFrame::SidecarRequestFrame(frame) => {
validate_schema(&frame.schema)?;
validate_request_id(frame.request_id)
}
ProtocolFrame::SidecarResponseFrame(frame) => {
validate_schema(&frame.schema)?;
validate_request_id(frame.request_id)
}
}
}
fn validate_schema(schema: &ProtocolSchema) -> Result<(), ProtocolCodecError> {
if schema.name != PROTOCOL_NAME || schema.version != PROTOCOL_VERSION {
return Err(ProtocolCodecError::UnsupportedSchema {
name: schema.name.clone(),
version: schema.version,
});
}
Ok(())
}
fn validate_request_id(request_id: RequestId) -> Result<(), ProtocolCodecError> {
if request_id == 0 {
return Err(ProtocolCodecError::InvalidRequestId);
}
Ok(())
}