Skip to main content

running_process/broker/server/
backend_launcher.rs

1//! Backend launch abstraction for Hello registry misses.
2//!
3//! The router owns admission control and registry insertion. Launchers own the
4//! platform-specific act of starting or discovering a backend and returning a
5//! verified [`BackendHandle`].
6
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::process::Command;
10use std::sync::Mutex;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13use crate::broker::backend_handle::{BackendHandle, BackendHandleError, DaemonProcess};
14use crate::broker::backend_lifecycle::identity::{sha256_file, IdentityError};
15use crate::broker::host_identity;
16use crate::broker::lifecycle::sid::{user_sid_hash, SidError};
17use crate::broker::protocol::{Endpoint, ServiceDefinition};
18use crate::spawn_daemon;
19
20use super::backend_endpoint_allocator::{BackendEndpointAllocator, BackendEndpointAllocatorError};
21use super::backend_registry::BackendKey;
22use super::trace_context::TraceContext;
23
/// Environment variable containing the logical service name for a launched
/// backend.
///
/// `configure_backend_command` sets it from the `service_name` of the backend
/// key being launched.
pub const BACKEND_ENV_SERVICE_NAME: &str = "RUNNING_PROCESS_BROKER_V1_SERVICE_NAME";
/// Environment variable containing the negotiated service version.
///
/// `configure_backend_command` sets it from the `service_version` of the
/// backend key being launched.
pub const BACKEND_ENV_SERVICE_VERSION: &str = "RUNNING_PROCESS_BROKER_V1_SERVICE_VERSION";
/// Environment variable containing the backend IPC endpoint path.
///
/// The value is the `path` of the allocated [`Endpoint`]. Note that the
/// variable itself is spelled `..._BACKEND_PIPE`, not `..._ENDPOINT_PATH`.
pub const BACKEND_ENV_ENDPOINT_PATH: &str = "RUNNING_PROCESS_BROKER_V1_BACKEND_PIPE";
/// Environment variable containing the backend endpoint namespace.
///
/// The value is the `namespace_id` of the allocated [`Endpoint`].
pub const BACKEND_ENV_ENDPOINT_NAMESPACE: &str = "RUNNING_PROCESS_BROKER_V1_BACKEND_NAMESPACE";
/// Environment variable containing the broker instance id.
///
/// The value is `request.key.instance.id()` for the launch request.
pub const BACKEND_ENV_INSTANCE: &str = "RUNNING_PROCESS_BROKER_V1_INSTANCE";
/// Environment variable containing the incoming W3C traceparent value.
///
/// Only set when the request's `traceparent` is non-empty.
pub const BACKEND_ENV_TRACEPARENT: &str = "RUNNING_PROCESS_BROKER_V1_TRACEPARENT";
/// Environment variable containing the incoming W3C tracestate value.
///
/// Only set when the request's `tracestate` is non-empty.
pub const BACKEND_ENV_TRACESTATE: &str = "RUNNING_PROCESS_BROKER_V1_TRACESTATE";
39
/// Inputs supplied to a backend launcher after Hello validation and budget
/// admission.
///
/// Every field is borrowed for `'a`: the request owns none of its inputs and
/// is only a view handed to [`BackendLauncher::launch`].
pub struct BackendLaunchRequest<'a> {
    /// Backend key being launched.
    ///
    /// Supplies the service name, service version, and broker instance that
    /// the launcher forwards to the backend.
    pub key: &'a BackendKey,
    /// Service definition that authorized the requested backend.
    ///
    /// [`CommandBackendLauncher`] reads `binary_path` and
    /// `per_version_binary_dir` from it to locate and validate the binary.
    pub service_definition: &'a ServiceDefinition,
    /// Trace context from the Hello frame that triggered this launch.
    pub trace_context: &'a TraceContext,
}
50
/// Launches or discovers one backend and returns a verified handle.
///
/// The `Send + Sync` supertrait bound lets a single launcher value be shared
/// between threads.
pub trait BackendLauncher: Send + Sync {
    /// Launch the requested backend.
    ///
    /// # Errors
    ///
    /// Returns a [`BackendLaunchError`] describing why no verified
    /// [`BackendHandle`] could be produced for `request`.
    fn launch(
        &self,
        request: &BackendLaunchRequest<'_>,
    ) -> Result<BackendHandle, BackendLaunchError>;
}
59
/// Command-based backend launcher.
///
/// This launcher allocates the canonical v1 backend endpoint, starts
/// `ServiceDefinition.binary_path` as a detached daemon, passes the selected
/// endpoint through environment variables, and verifies the spawned process
/// identity before returning a [`BackendHandle`].
#[derive(Debug)]
pub struct CommandBackendLauncher {
    // User SID hash handed to every `BackendEndpointAllocator` this launcher
    // creates.
    user_sid_hash: String,
    // Endpoint allocators keyed by broker instance id
    // (`request.key.instance.id()`); an entry is created on the first launch
    // for that instance. The mutex is needed because `launch` takes `&self`.
    allocators: Mutex<HashMap<String, BackendEndpointAllocator>>,
    // Idle timeout copied into the `DaemonProcess` identity of each spawned
    // backend. `new` initializes it to `Some(30)`.
    idle_timeout_secs: Option<u32>,
}
72
73impl CommandBackendLauncher {
74    /// Build a launcher for the current user.
75    pub fn for_current_user() -> Result<Self, SidError> {
76        Ok(Self::new(user_sid_hash()?))
77    }
78
79    /// Build a launcher with an explicit 16-hex user SID hash.
80    pub fn new(user_sid_hash: impl Into<String>) -> Self {
81        Self {
82            user_sid_hash: user_sid_hash.into(),
83            allocators: Mutex::new(HashMap::new()),
84            idle_timeout_secs: Some(30),
85        }
86    }
87
88    /// Override the idle timeout recorded in the verified daemon identity.
89    pub fn with_idle_timeout_secs(mut self, idle_timeout_secs: Option<u32>) -> Self {
90        self.idle_timeout_secs = idle_timeout_secs;
91        self
92    }
93
94    fn allocate_endpoint(
95        &self,
96        request: &BackendLaunchRequest<'_>,
97    ) -> Result<Endpoint, BackendLaunchError> {
98        let namespace_id = request.key.instance.id();
99        let mut allocators = self
100            .allocators
101            .lock()
102            .map_err(|_| BackendLaunchError::AllocatorPoisoned)?;
103        let allocator = allocators
104            .entry(namespace_id.clone())
105            .or_insert_with(|| BackendEndpointAllocator::new(&self.user_sid_hash, namespace_id));
106        Ok(allocator.allocate()?)
107    }
108}
109
110impl BackendLauncher for CommandBackendLauncher {
111    fn launch(
112        &self,
113        request: &BackendLaunchRequest<'_>,
114    ) -> Result<BackendHandle, BackendLaunchError> {
115        let endpoint = self.allocate_endpoint(request)?;
116        let binary_path = canonical_backend_binary(request.service_definition)?;
117        let mut command = Command::new(&binary_path);
118        configure_backend_command(&mut command, request, &endpoint);
119
120        let mut child = spawn_daemon(&mut command).map_err(BackendLaunchError::Spawn)?;
121        let daemon = daemon_identity_for_spawned_process(
122            child.id(),
123            binary_path,
124            endpoint.clone(),
125            self.idle_timeout_secs,
126        )?;
127
128        match BackendHandle::probe_with_service(
129            request.key.service_name.clone(),
130            request.key.service_version.clone(),
131            &endpoint,
132            &daemon,
133        ) {
134            Ok(handle) => Ok(handle),
135            Err(err) => {
136                let _ = child.kill();
137                Err(BackendLaunchError::BackendHandle(err))
138            }
139        }
140    }
141}
142
143fn configure_backend_command(
144    command: &mut Command,
145    request: &BackendLaunchRequest<'_>,
146    endpoint: &Endpoint,
147) {
148    command
149        .env(BACKEND_ENV_SERVICE_NAME, &request.key.service_name)
150        .env(BACKEND_ENV_SERVICE_VERSION, &request.key.service_version)
151        .env(BACKEND_ENV_ENDPOINT_PATH, &endpoint.path)
152        .env(BACKEND_ENV_ENDPOINT_NAMESPACE, &endpoint.namespace_id)
153        .env(BACKEND_ENV_INSTANCE, request.key.instance.id());
154
155    if !request.trace_context.traceparent.is_empty() {
156        command.env(BACKEND_ENV_TRACEPARENT, &request.trace_context.traceparent);
157    }
158    if !request.trace_context.tracestate.is_empty() {
159        command.env(BACKEND_ENV_TRACESTATE, &request.trace_context.tracestate);
160    }
161}
162
/// Errors raised while launching a backend.
///
/// The `Endpoint`, `Identity`, and `BackendHandle` variants are `#[from]`
/// conversions, so the wrapped error types can be propagated with `?`.
#[derive(Debug, thiserror::Error)]
pub enum BackendLaunchError {
    /// The service definition did not include a backend binary path.
    #[error("backend binary_path is empty")]
    EmptyBinaryPath,
    /// The service definition did not include the per-version allow-list root.
    #[error("backend per_version_binary_dir is empty")]
    EmptyPerVersionBinaryDir,
    /// The backend binary path could not be canonicalized.
    #[error("backend binary_path {path:?} could not be canonicalized: {source}")]
    CanonicalizeBinary {
        /// Path that failed canonicalization, as given in the service
        /// definition.
        path: PathBuf,
        /// Filesystem error returned by `std::fs::canonicalize`.
        source: std::io::Error,
    },
    /// The backend allow-list root could not be canonicalized.
    #[error("backend per_version_binary_dir {path:?} could not be canonicalized: {source}")]
    CanonicalizeBinaryRoot {
        /// Root path that failed canonicalization, as given in the service
        /// definition.
        path: PathBuf,
        /// Filesystem error returned by `std::fs::canonicalize`.
        source: std::io::Error,
    },
    /// The binary was outside the configured per-version allow-list root.
    #[error("backend binary {binary:?} is outside per-version root {root:?}")]
    BinaryOutsideAllowRoot {
        /// Canonical backend binary path.
        binary: PathBuf,
        /// Canonical allow-list root.
        root: PathBuf,
    },
    /// Endpoint allocator state was poisoned, i.e. locking the allocator
    /// mutex failed.
    #[error("backend endpoint allocator state was poisoned")]
    AllocatorPoisoned,
    /// Canonical endpoint allocation failed.
    #[error(transparent)]
    Endpoint(#[from] BackendEndpointAllocatorError),
    /// Detached process creation failed.
    #[error("backend daemon spawn failed: {0}")]
    Spawn(std::io::Error),
    /// Spawned daemon identity construction failed.
    #[error(transparent)]
    Identity(#[from] IdentityError),
    /// Spawned daemon verification failed.
    #[error(transparent)]
    BackendHandle(#[from] BackendHandleError),
    /// Test or custom launcher failure, carrying a free-form message.
    #[error("{0}")]
    Launcher(String),
}
215
216fn canonical_backend_binary(
217    service_definition: &ServiceDefinition,
218) -> Result<PathBuf, BackendLaunchError> {
219    if service_definition.binary_path.is_empty() {
220        return Err(BackendLaunchError::EmptyBinaryPath);
221    }
222    if service_definition.per_version_binary_dir.is_empty() {
223        return Err(BackendLaunchError::EmptyPerVersionBinaryDir);
224    }
225
226    let binary = PathBuf::from(&service_definition.binary_path);
227    let binary = std::fs::canonicalize(&binary).map_err(|source| {
228        BackendLaunchError::CanonicalizeBinary {
229            path: binary,
230            source,
231        }
232    })?;
233
234    let root = PathBuf::from(&service_definition.per_version_binary_dir);
235    let root = std::fs::canonicalize(&root)
236        .map_err(|source| BackendLaunchError::CanonicalizeBinaryRoot { path: root, source })?;
237
238    if !binary.starts_with(&root) {
239        return Err(BackendLaunchError::BinaryOutsideAllowRoot { binary, root });
240    }
241
242    Ok(binary)
243}
244
245fn daemon_identity_for_spawned_process(
246    pid: u32,
247    exe_path: PathBuf,
248    ipc_endpoint: Endpoint,
249    idle_timeout_secs: Option<u32>,
250) -> Result<DaemonProcess, IdentityError> {
251    let exe_sha256 = sha256_file(&exe_path)?;
252    Ok(DaemonProcess {
253        pid,
254        exe_path: exe_path.clone(),
255        exe_sha256,
256        boot_id: host_identity::current_for_path(&exe_path).boot_id,
257        ipc_endpoint,
258        started_at_unix_ms: unix_now_ms(),
259        idle_timeout_secs,
260    })
261}
262
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch. The
/// result saturates at `u64::MAX` instead of wrapping.
fn unix_now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // Assembled from the `u64` seconds and `u32` sub-second parts so that
        // no narrowing cast from `u128` is involved.
        Ok(elapsed) => elapsed
            .as_secs()
            .saturating_mul(1_000)
            .saturating_add(u64::from(elapsed.subsec_millis())),
        Err(_) => 0,
    }
}
269
#[cfg(test)]
mod tests {
    use std::ffi::OsStr;

    use crate::broker::protocol::ServiceDefinition;
    use crate::broker::server::{BackendKey, BrokerInstanceKey, TraceContext};

    use super::*;

    /// Look up the value explicitly configured for `name` on `command`.
    ///
    /// Yields `None` when the variable was never configured or was configured
    /// for removal.
    fn env_value(command: &Command, name: &str) -> Option<String> {
        command
            .get_envs()
            .find(|(key, _)| *key == OsStr::new(name))
            .and_then(|(_, value)| value)
            .map(|value| value.to_string_lossy().into_owned())
    }

    /// Every launch parameter, including both trace headers, must reach the
    /// backend command's environment.
    #[test]
    fn backend_command_environment_forwards_trace_context() {
        let key = BackendKey::new(BrokerInstanceKey::Shared, "zccache", "1.11.20");
        let service_definition = ServiceDefinition {
            service_name: "zccache".into(),
            binary_path: "backend".into(),
            isolation: 1,
            explicit_instance: String::new(),
            per_version_binary_dir: ".".into(),
            min_version: "1.10.0".into(),
            version_allow_list: vec!["1.11.20".into()],
            labels: Default::default(),
        };
        let trace_context = TraceContext {
            request_id: 42,
            traceparent: "00-11111111111111111111111111111111-2222222222222222-01".into(),
            tracestate: "vendor=value".into(),
        };
        let request = BackendLaunchRequest {
            key: &key,
            service_definition: &service_definition,
            trace_context: &trace_context,
        };
        let endpoint = Endpoint {
            namespace_id: "shared".into(),
            path: "backend.sock".into(),
        };
        let mut command = Command::new("backend");

        configure_backend_command(&mut command, &request, &endpoint);

        let expected = [
            (BACKEND_ENV_SERVICE_NAME, "zccache"),
            (BACKEND_ENV_SERVICE_VERSION, "1.11.20"),
            (BACKEND_ENV_ENDPOINT_PATH, "backend.sock"),
            (BACKEND_ENV_ENDPOINT_NAMESPACE, "shared"),
            (BACKEND_ENV_INSTANCE, "shared"),
            (
                BACKEND_ENV_TRACEPARENT,
                "00-11111111111111111111111111111111-2222222222222222-01",
            ),
            (BACKEND_ENV_TRACESTATE, "vendor=value"),
        ];
        for (name, value) in expected {
            assert_eq!(
                env_value(&command, name).as_deref(),
                Some(value),
                "unexpected value for {}",
                name
            );
        }
    }
}