use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Command;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use crate::broker::backend_handle::{BackendHandle, BackendHandleError, DaemonProcess};
use crate::broker::backend_lifecycle::identity::{sha256_file, IdentityError};
use crate::broker::host_identity;
use crate::broker::lifecycle::sid::{user_sid_hash, SidError};
use crate::broker::protocol::{Endpoint, ServiceDefinition};
use crate::spawn_daemon;
use super::backend_endpoint_allocator::{BackendEndpointAllocator, BackendEndpointAllocatorError};
use super::backend_registry::BackendKey;
use super::trace_context::TraceContext;
/// Environment variable that `configure_backend_command` sets to the backend key's service name.
pub const BACKEND_ENV_SERVICE_NAME: &str = "RUNNING_PROCESS_BROKER_V1_SERVICE_NAME";
/// Environment variable that `configure_backend_command` sets to the backend key's service version.
pub const BACKEND_ENV_SERVICE_VERSION: &str = "RUNNING_PROCESS_BROKER_V1_SERVICE_VERSION";
/// Environment variable that `configure_backend_command` sets to the allocated endpoint's `path`.
pub const BACKEND_ENV_ENDPOINT_PATH: &str = "RUNNING_PROCESS_BROKER_V1_BACKEND_PIPE";
/// Environment variable that `configure_backend_command` sets to the allocated endpoint's
/// `namespace_id`.
pub const BACKEND_ENV_ENDPOINT_NAMESPACE: &str = "RUNNING_PROCESS_BROKER_V1_BACKEND_NAMESPACE";
/// Environment variable that `configure_backend_command` sets to the id of the backend key's
/// instance.
pub const BACKEND_ENV_INSTANCE: &str = "RUNNING_PROCESS_BROKER_V1_INSTANCE";
/// Environment variable carrying the trace context's `traceparent`; only set when it is
/// non-empty.
pub const BACKEND_ENV_TRACEPARENT: &str = "RUNNING_PROCESS_BROKER_V1_TRACEPARENT";
/// Environment variable carrying the trace context's `tracestate`; only set when it is
/// non-empty.
pub const BACKEND_ENV_TRACESTATE: &str = "RUNNING_PROCESS_BROKER_V1_TRACESTATE";
/// Borrowed inputs handed to a [`BackendLauncher`] for a single launch.
pub struct BackendLaunchRequest<'a> {
/// Key of the backend to launch; supplies the service name, service version and instance.
pub key: &'a BackendKey,
/// Service definition whose `binary_path` and `per_version_binary_dir` select the executable.
pub service_definition: &'a ServiceDefinition,
/// Trace context whose non-empty `traceparent` / `tracestate` are forwarded to the backend's
/// environment.
pub trace_context: &'a TraceContext,
}
/// Strategy for starting a backend from a [`BackendLaunchRequest`].
///
/// Implementors must be `Send + Sync`.
pub trait BackendLauncher: Send + Sync {
/// Launches the backend described by `request` and returns a handle to it.
///
/// # Errors
///
/// Returns a [`BackendLaunchError`] when the launch fails.
fn launch(
&self,
request: &BackendLaunchRequest<'_>,
) -> Result<BackendHandle, BackendLaunchError>;
}
/// [`BackendLauncher`] that starts each backend as a process built from a
/// [`std::process::Command`] and spawned through `spawn_daemon`.
#[derive(Debug)]
pub struct CommandBackendLauncher {
// User SID hash passed to every endpoint allocator this launcher creates.
user_sid_hash: String,
// Endpoint allocators keyed by the id of the backend key's instance; entries are created
// on first use by `allocate_endpoint`.
allocators: Mutex<HashMap<String, BackendEndpointAllocator>>,
// Idle timeout recorded on each launched `DaemonProcess`; `new` initialises it to `Some(30)`.
idle_timeout_secs: Option<u32>,
}
impl CommandBackendLauncher {
pub fn for_current_user() -> Result<Self, SidError> {
Ok(Self::new(user_sid_hash()?))
}
pub fn new(user_sid_hash: impl Into<String>) -> Self {
Self {
user_sid_hash: user_sid_hash.into(),
allocators: Mutex::new(HashMap::new()),
idle_timeout_secs: Some(30),
}
}
pub fn with_idle_timeout_secs(mut self, idle_timeout_secs: Option<u32>) -> Self {
self.idle_timeout_secs = idle_timeout_secs;
self
}
fn allocate_endpoint(
&self,
request: &BackendLaunchRequest<'_>,
) -> Result<Endpoint, BackendLaunchError> {
let namespace_id = request.key.instance.id();
let mut allocators = self
.allocators
.lock()
.map_err(|_| BackendLaunchError::AllocatorPoisoned)?;
let allocator = allocators
.entry(namespace_id.clone())
.or_insert_with(|| BackendEndpointAllocator::new(&self.user_sid_hash, namespace_id));
Ok(allocator.allocate()?)
}
}
impl BackendLauncher for CommandBackendLauncher {
    /// Launches the requested backend as a daemon process and returns a probed handle.
    ///
    /// Steps, in order: allocate an endpoint, canonicalize and validate the backend
    /// binary, configure the command environment, spawn the process via `spawn_daemon`,
    /// build its `DaemonProcess` identity, and probe it with
    /// `BackendHandle::probe_with_service`.
    ///
    /// # Errors
    ///
    /// Returns the [`BackendLaunchError`] of whichever step failed. Once the process has
    /// been spawned, any later failure (identity computation or probe) triggers a
    /// best-effort kill of the child so a half-launched backend is not left running.
    fn launch(
        &self,
        request: &BackendLaunchRequest<'_>,
    ) -> Result<BackendHandle, BackendLaunchError> {
        let endpoint = self.allocate_endpoint(request)?;
        let binary_path = canonical_backend_binary(request.service_definition)?;
        let mut command = Command::new(&binary_path);
        configure_backend_command(&mut command, request, &endpoint);
        let mut child = spawn_daemon(&mut command).map_err(BackendLaunchError::Spawn)?;

        // From here on the backend process exists, so every failure must go through the
        // kill below instead of returning early with `?`.
        let launched = daemon_identity_for_spawned_process(
            child.id(),
            binary_path,
            endpoint.clone(),
            self.idle_timeout_secs,
        )
        .map_err(BackendLaunchError::Identity)
        .and_then(|daemon| {
            BackendHandle::probe_with_service(
                request.key.service_name.clone(),
                request.key.service_version.clone(),
                &endpoint,
                &daemon,
            )
            .map_err(BackendLaunchError::BackendHandle)
        });

        if launched.is_err() {
            // Best effort: the launch error is more useful to the caller than a kill failure.
            let _ = child.kill();
        }
        launched
    }
}
/// Sets the broker-to-backend environment variables on `command`.
///
/// The service name, service version, endpoint path, endpoint namespace and instance id
/// are always set. `traceparent` and `tracestate` are only set when the corresponding
/// trace-context field is non-empty.
fn configure_backend_command(
    command: &mut Command,
    request: &BackendLaunchRequest<'_>,
    endpoint: &Endpoint,
) {
    let key = request.key;
    let trace = request.trace_context;

    command.env(BACKEND_ENV_SERVICE_NAME, &key.service_name);
    command.env(BACKEND_ENV_SERVICE_VERSION, &key.service_version);
    command.env(BACKEND_ENV_ENDPOINT_PATH, &endpoint.path);
    command.env(BACKEND_ENV_ENDPOINT_NAMESPACE, &endpoint.namespace_id);
    command.env(BACKEND_ENV_INSTANCE, key.instance.id());

    let has_traceparent = !trace.traceparent.is_empty();
    if has_traceparent {
        command.env(BACKEND_ENV_TRACEPARENT, &trace.traceparent);
    }
    let has_tracestate = !trace.tracestate.is_empty();
    if has_tracestate {
        command.env(BACKEND_ENV_TRACESTATE, &trace.tracestate);
    }
}
/// Errors produced while launching a backend.
#[derive(Debug, thiserror::Error)]
pub enum BackendLaunchError {
/// The service definition's `binary_path` is empty.
#[error("backend binary_path is empty")]
EmptyBinaryPath,
/// The service definition's `per_version_binary_dir` is empty.
#[error("backend per_version_binary_dir is empty")]
EmptyPerVersionBinaryDir,
/// `std::fs::canonicalize` failed for the configured binary path (`path` is the
/// path as configured, before canonicalization).
#[error("backend binary_path {path:?} could not be canonicalized: {source}")]
CanonicalizeBinary {
path: PathBuf,
source: std::io::Error,
},
/// `std::fs::canonicalize` failed for the configured per-version directory (`path` is the
/// directory as configured, before canonicalization).
#[error("backend per_version_binary_dir {path:?} could not be canonicalized: {source}")]
CanonicalizeBinaryRoot {
path: PathBuf,
source: std::io::Error,
},
/// The canonical binary path does not start with the canonical per-version directory.
#[error("backend binary {binary:?} is outside per-version root {root:?}")]
BinaryOutsideAllowRoot {
binary: PathBuf,
root: PathBuf,
},
/// The mutex guarding the endpoint allocators was poisoned.
#[error("backend endpoint allocator state was poisoned")]
AllocatorPoisoned,
/// The endpoint allocator failed to allocate an endpoint.
#[error(transparent)]
Endpoint(#[from] BackendEndpointAllocatorError),
/// `spawn_daemon` failed to start the backend process.
#[error("backend daemon spawn failed: {0}")]
Spawn(std::io::Error),
/// Building the daemon identity failed (for example hashing the executable).
#[error(transparent)]
Identity(#[from] IdentityError),
/// Probing the freshly spawned backend failed.
#[error(transparent)]
BackendHandle(#[from] BackendHandleError),
/// Error carrying a free-form message that is displayed verbatim.
#[error("{0}")]
Launcher(String),
}
/// Resolves the backend executable to a canonical path and checks that it lies inside
/// the service definition's per-version binary directory.
///
/// # Errors
///
/// * [`BackendLaunchError::EmptyBinaryPath`] / [`BackendLaunchError::EmptyPerVersionBinaryDir`]
///   when the respective setting is empty (checked in that order, before any I/O).
/// * [`BackendLaunchError::CanonicalizeBinary`] / [`BackendLaunchError::CanonicalizeBinaryRoot`]
///   when `std::fs::canonicalize` fails for the binary or the directory.
/// * [`BackendLaunchError::BinaryOutsideAllowRoot`] when the canonical binary path does not
///   start with the canonical directory.
fn canonical_backend_binary(
    service_definition: &ServiceDefinition,
) -> Result<PathBuf, BackendLaunchError> {
    if service_definition.binary_path.is_empty() {
        return Err(BackendLaunchError::EmptyBinaryPath);
    }
    if service_definition.per_version_binary_dir.is_empty() {
        return Err(BackendLaunchError::EmptyPerVersionBinaryDir);
    }

    let configured_binary = PathBuf::from(&service_definition.binary_path);
    let binary = match std::fs::canonicalize(&configured_binary) {
        Ok(resolved) => resolved,
        Err(source) => {
            return Err(BackendLaunchError::CanonicalizeBinary {
                path: configured_binary,
                source,
            })
        }
    };

    let configured_root = PathBuf::from(&service_definition.per_version_binary_dir);
    let root = match std::fs::canonicalize(&configured_root) {
        Ok(resolved) => resolved,
        Err(source) => {
            return Err(BackendLaunchError::CanonicalizeBinaryRoot {
                path: configured_root,
                source,
            })
        }
    };

    // `Path::starts_with` compares whole components, so a sibling directory that merely
    // shares a name prefix with `root` does not pass.
    if binary.starts_with(&root) {
        Ok(binary)
    } else {
        Err(BackendLaunchError::BinaryOutsideAllowRoot { binary, root })
    }
}
/// Builds the [`DaemonProcess`] record for a backend process that was just spawned.
///
/// The record holds the given `pid`, `exe_path`, `ipc_endpoint` and `idle_timeout_secs`,
/// plus the SHA-256 of the executable, the boot id reported by
/// `host_identity::current_for_path` for the executable path, and the current wall-clock
/// time in milliseconds as the start timestamp.
///
/// # Errors
///
/// Returns the [`IdentityError`] from `sha256_file` when the executable cannot be hashed.
fn daemon_identity_for_spawned_process(
    pid: u32,
    exe_path: PathBuf,
    ipc_endpoint: Endpoint,
    idle_timeout_secs: Option<u32>,
) -> Result<DaemonProcess, IdentityError> {
    let exe_sha256 = sha256_file(&exe_path)?;
    // Everything that only needs to borrow the path is computed first, so the owned
    // `PathBuf` can be moved into the record without cloning it.
    let boot_id = host_identity::current_for_path(&exe_path).boot_id;
    Ok(DaemonProcess {
        pid,
        exe_path,
        exe_sha256,
        boot_id,
        ipc_endpoint,
        started_at_unix_ms: unix_now_ms(),
        idle_timeout_secs,
    })
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch, and saturates at
/// `u64::MAX` instead of silently wrapping if the millisecond count does not fit in a
/// `u64`.
fn unix_now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            // `as_millis` yields a `u128`; convert with a check rather than an `as` cast.
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}
#[cfg(test)]
mod tests {
    use std::ffi::OsStr;
    use crate::broker::protocol::ServiceDefinition;
    use crate::broker::server::{BackendKey, BrokerInstanceKey, TraceContext};
    use super::*;

    /// Returns the value explicitly set for `name` on `command`, if any.
    fn env_value(command: &Command, name: &str) -> Option<String> {
        let wanted = OsStr::new(name);
        command
            .get_envs()
            .filter(|(key, _)| *key == wanted)
            .find_map(|(_, value)| value.map(|value| value.to_string_lossy().into_owned()))
    }

    /// Every broker environment variable, including both trace-context values, must be
    /// present on the configured command.
    #[test]
    fn backend_command_environment_forwards_trace_context() {
        let key = BackendKey::new(BrokerInstanceKey::Shared, "zccache", "1.11.20");
        let service_definition = ServiceDefinition {
            service_name: "zccache".into(),
            binary_path: "backend".into(),
            isolation: 1,
            explicit_instance: String::new(),
            per_version_binary_dir: ".".into(),
            min_version: "1.10.0".into(),
            version_allow_list: vec!["1.11.20".into()],
            labels: Default::default(),
        };
        let trace_context = TraceContext {
            request_id: 42,
            traceparent: "00-11111111111111111111111111111111-2222222222222222-01".into(),
            tracestate: "vendor=value".into(),
        };
        let request = BackendLaunchRequest {
            key: &key,
            service_definition: &service_definition,
            trace_context: &trace_context,
        };
        let endpoint = Endpoint {
            namespace_id: "shared".into(),
            path: "backend.sock".into(),
        };

        let mut command = Command::new("backend");
        configure_backend_command(&mut command, &request, &endpoint);

        // Checked in the same order as the variables are documented above the launcher.
        let expected = [
            (BACKEND_ENV_SERVICE_NAME, "zccache"),
            (BACKEND_ENV_SERVICE_VERSION, "1.11.20"),
            (BACKEND_ENV_ENDPOINT_PATH, "backend.sock"),
            (BACKEND_ENV_ENDPOINT_NAMESPACE, "shared"),
            (BACKEND_ENV_INSTANCE, "shared"),
            (
                BACKEND_ENV_TRACEPARENT,
                "00-11111111111111111111111111111111-2222222222222222-01",
            ),
            (BACKEND_ENV_TRACESTATE, "vendor=value"),
        ];
        for &(name, want) in &expected {
            assert_eq!(env_value(&command, name).as_deref(), Some(want));
        }
    }
}