running_process/broker/server/
backend_launcher.rs1use std::collections::HashMap;
8use std::path::PathBuf;
9use std::process::Command;
10use std::sync::Mutex;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13use crate::broker::backend_handle::{BackendHandle, BackendHandleError, DaemonProcess};
14use crate::broker::backend_lifecycle::identity::{sha256_file, IdentityError};
15use crate::broker::host_identity;
16use crate::broker::lifecycle::sid::{user_sid_hash, SidError};
17use crate::broker::protocol::{Endpoint, ServiceDefinition};
18use crate::spawn_daemon;
19
20use super::backend_endpoint_allocator::{BackendEndpointAllocator, BackendEndpointAllocatorError};
21use super::backend_registry::BackendKey;
22use super::trace_context::TraceContext;
23
24pub const BACKEND_ENV_SERVICE_NAME: &str = "RUNNING_PROCESS_BROKER_V1_SERVICE_NAME";
27pub const BACKEND_ENV_SERVICE_VERSION: &str = "RUNNING_PROCESS_BROKER_V1_SERVICE_VERSION";
29pub const BACKEND_ENV_ENDPOINT_PATH: &str = "RUNNING_PROCESS_BROKER_V1_BACKEND_PIPE";
31pub const BACKEND_ENV_ENDPOINT_NAMESPACE: &str = "RUNNING_PROCESS_BROKER_V1_BACKEND_NAMESPACE";
33pub const BACKEND_ENV_INSTANCE: &str = "RUNNING_PROCESS_BROKER_V1_INSTANCE";
35pub const BACKEND_ENV_TRACEPARENT: &str = "RUNNING_PROCESS_BROKER_V1_TRACEPARENT";
37pub const BACKEND_ENV_TRACESTATE: &str = "RUNNING_PROCESS_BROKER_V1_TRACESTATE";
39
40pub struct BackendLaunchRequest<'a> {
43 pub key: &'a BackendKey,
45 pub service_definition: &'a ServiceDefinition,
47 pub trace_context: &'a TraceContext,
49}
50
51pub trait BackendLauncher: Send + Sync {
53 fn launch(
55 &self,
56 request: &BackendLaunchRequest<'_>,
57 ) -> Result<BackendHandle, BackendLaunchError>;
58}
59
60#[derive(Debug)]
67pub struct CommandBackendLauncher {
68 user_sid_hash: String,
69 allocators: Mutex<HashMap<String, BackendEndpointAllocator>>,
70 idle_timeout_secs: Option<u32>,
71}
72
73impl CommandBackendLauncher {
74 pub fn for_current_user() -> Result<Self, SidError> {
76 Ok(Self::new(user_sid_hash()?))
77 }
78
79 pub fn new(user_sid_hash: impl Into<String>) -> Self {
81 Self {
82 user_sid_hash: user_sid_hash.into(),
83 allocators: Mutex::new(HashMap::new()),
84 idle_timeout_secs: Some(30),
85 }
86 }
87
88 pub fn with_idle_timeout_secs(mut self, idle_timeout_secs: Option<u32>) -> Self {
90 self.idle_timeout_secs = idle_timeout_secs;
91 self
92 }
93
94 fn allocate_endpoint(
95 &self,
96 request: &BackendLaunchRequest<'_>,
97 ) -> Result<Endpoint, BackendLaunchError> {
98 let namespace_id = request.key.instance.id();
99 let mut allocators = self
100 .allocators
101 .lock()
102 .map_err(|_| BackendLaunchError::AllocatorPoisoned)?;
103 let allocator = allocators
104 .entry(namespace_id.clone())
105 .or_insert_with(|| BackendEndpointAllocator::new(&self.user_sid_hash, namespace_id));
106 Ok(allocator.allocate()?)
107 }
108}
109
110impl BackendLauncher for CommandBackendLauncher {
111 fn launch(
112 &self,
113 request: &BackendLaunchRequest<'_>,
114 ) -> Result<BackendHandle, BackendLaunchError> {
115 let endpoint = self.allocate_endpoint(request)?;
116 let binary_path = canonical_backend_binary(request.service_definition)?;
117 let mut command = Command::new(&binary_path);
118 configure_backend_command(&mut command, request, &endpoint);
119
120 let mut child = spawn_daemon(&mut command).map_err(BackendLaunchError::Spawn)?;
121 let daemon = daemon_identity_for_spawned_process(
122 child.id(),
123 binary_path,
124 endpoint.clone(),
125 self.idle_timeout_secs,
126 )?;
127
128 match BackendHandle::probe_with_service(
129 request.key.service_name.clone(),
130 request.key.service_version.clone(),
131 &endpoint,
132 &daemon,
133 ) {
134 Ok(handle) => Ok(handle),
135 Err(err) => {
136 let _ = child.kill();
137 Err(BackendLaunchError::BackendHandle(err))
138 }
139 }
140 }
141}
142
143fn configure_backend_command(
144 command: &mut Command,
145 request: &BackendLaunchRequest<'_>,
146 endpoint: &Endpoint,
147) {
148 command
149 .env(BACKEND_ENV_SERVICE_NAME, &request.key.service_name)
150 .env(BACKEND_ENV_SERVICE_VERSION, &request.key.service_version)
151 .env(BACKEND_ENV_ENDPOINT_PATH, &endpoint.path)
152 .env(BACKEND_ENV_ENDPOINT_NAMESPACE, &endpoint.namespace_id)
153 .env(BACKEND_ENV_INSTANCE, request.key.instance.id());
154
155 if !request.trace_context.traceparent.is_empty() {
156 command.env(BACKEND_ENV_TRACEPARENT, &request.trace_context.traceparent);
157 }
158 if !request.trace_context.tracestate.is_empty() {
159 command.env(BACKEND_ENV_TRACESTATE, &request.trace_context.tracestate);
160 }
161}
162
163#[derive(Debug, thiserror::Error)]
165pub enum BackendLaunchError {
166 #[error("backend binary_path is empty")]
168 EmptyBinaryPath,
169 #[error("backend per_version_binary_dir is empty")]
171 EmptyPerVersionBinaryDir,
172 #[error("backend binary_path {path:?} could not be canonicalized: {source}")]
174 CanonicalizeBinary {
175 path: PathBuf,
177 source: std::io::Error,
179 },
180 #[error("backend per_version_binary_dir {path:?} could not be canonicalized: {source}")]
182 CanonicalizeBinaryRoot {
183 path: PathBuf,
185 source: std::io::Error,
187 },
188 #[error("backend binary {binary:?} is outside per-version root {root:?}")]
190 BinaryOutsideAllowRoot {
191 binary: PathBuf,
193 root: PathBuf,
195 },
196 #[error("backend endpoint allocator state was poisoned")]
198 AllocatorPoisoned,
199 #[error(transparent)]
201 Endpoint(#[from] BackendEndpointAllocatorError),
202 #[error("backend daemon spawn failed: {0}")]
204 Spawn(std::io::Error),
205 #[error(transparent)]
207 Identity(#[from] IdentityError),
208 #[error(transparent)]
210 BackendHandle(#[from] BackendHandleError),
211 #[error("{0}")]
213 Launcher(String),
214}
215
216fn canonical_backend_binary(
217 service_definition: &ServiceDefinition,
218) -> Result<PathBuf, BackendLaunchError> {
219 if service_definition.binary_path.is_empty() {
220 return Err(BackendLaunchError::EmptyBinaryPath);
221 }
222 if service_definition.per_version_binary_dir.is_empty() {
223 return Err(BackendLaunchError::EmptyPerVersionBinaryDir);
224 }
225
226 let binary = PathBuf::from(&service_definition.binary_path);
227 let binary = std::fs::canonicalize(&binary).map_err(|source| {
228 BackendLaunchError::CanonicalizeBinary {
229 path: binary,
230 source,
231 }
232 })?;
233
234 let root = PathBuf::from(&service_definition.per_version_binary_dir);
235 let root = std::fs::canonicalize(&root)
236 .map_err(|source| BackendLaunchError::CanonicalizeBinaryRoot { path: root, source })?;
237
238 if !binary.starts_with(&root) {
239 return Err(BackendLaunchError::BinaryOutsideAllowRoot { binary, root });
240 }
241
242 Ok(binary)
243}
244
245fn daemon_identity_for_spawned_process(
246 pid: u32,
247 exe_path: PathBuf,
248 ipc_endpoint: Endpoint,
249 idle_timeout_secs: Option<u32>,
250) -> Result<DaemonProcess, IdentityError> {
251 let exe_sha256 = sha256_file(&exe_path)?;
252 Ok(DaemonProcess {
253 pid,
254 exe_path: exe_path.clone(),
255 exe_sha256,
256 boot_id: host_identity::current_for_path(&exe_path).boot_id,
257 ipc_endpoint,
258 started_at_unix_ms: unix_now_ms(),
259 idle_timeout_secs,
260 })
261}
262
263fn unix_now_ms() -> u64 {
264 SystemTime::now()
265 .duration_since(UNIX_EPOCH)
266 .map(|duration| duration.as_millis() as u64)
267 .unwrap_or(0)
268}
269
270#[cfg(test)]
271mod tests {
272 use std::ffi::OsStr;
273
274 use crate::broker::protocol::ServiceDefinition;
275 use crate::broker::server::{BackendKey, BrokerInstanceKey, TraceContext};
276
277 use super::*;
278
279 fn env_value(command: &Command, name: &str) -> Option<String> {
280 command.get_envs().find_map(|(key, value)| {
281 if key == OsStr::new(name) {
282 value.map(|value| value.to_string_lossy().into_owned())
283 } else {
284 None
285 }
286 })
287 }
288
289 #[test]
290 fn backend_command_environment_forwards_trace_context() {
291 let key = BackendKey::new(BrokerInstanceKey::Shared, "zccache", "1.11.20");
292 let service_definition = ServiceDefinition {
293 service_name: "zccache".into(),
294 binary_path: "backend".into(),
295 isolation: 1,
296 explicit_instance: String::new(),
297 per_version_binary_dir: ".".into(),
298 min_version: "1.10.0".into(),
299 version_allow_list: vec!["1.11.20".into()],
300 labels: Default::default(),
301 };
302 let trace_context = TraceContext {
303 request_id: 42,
304 traceparent: "00-11111111111111111111111111111111-2222222222222222-01".into(),
305 tracestate: "vendor=value".into(),
306 };
307 let request = BackendLaunchRequest {
308 key: &key,
309 service_definition: &service_definition,
310 trace_context: &trace_context,
311 };
312 let endpoint = Endpoint {
313 namespace_id: "shared".into(),
314 path: "backend.sock".into(),
315 };
316 let mut command = Command::new("backend");
317
318 configure_backend_command(&mut command, &request, &endpoint);
319
320 assert_eq!(
321 env_value(&command, BACKEND_ENV_SERVICE_NAME).as_deref(),
322 Some("zccache")
323 );
324 assert_eq!(
325 env_value(&command, BACKEND_ENV_SERVICE_VERSION).as_deref(),
326 Some("1.11.20")
327 );
328 assert_eq!(
329 env_value(&command, BACKEND_ENV_ENDPOINT_PATH).as_deref(),
330 Some("backend.sock")
331 );
332 assert_eq!(
333 env_value(&command, BACKEND_ENV_ENDPOINT_NAMESPACE).as_deref(),
334 Some("shared")
335 );
336 assert_eq!(
337 env_value(&command, BACKEND_ENV_INSTANCE).as_deref(),
338 Some("shared")
339 );
340 assert_eq!(
341 env_value(&command, BACKEND_ENV_TRACEPARENT).as_deref(),
342 Some("00-11111111111111111111111111111111-2222222222222222-01")
343 );
344 assert_eq!(
345 env_value(&command, BACKEND_ENV_TRACESTATE).as_deref(),
346 Some("vendor=value")
347 );
348 }
349}