Skip to main content

running_process/broker/server/
hello_router.rs

//! Service-definition-backed Hello routing.
//!
//! `HelloHandler` owns deterministic validation and in-memory negotiation.
//! `HelloRouter` adds the broker-facing lookup layer. For each request it looks
//! up (reloading if needed) the service definition, enforces its version
//! policy, resolves the trust-domain instance, and queries the backend registry.
//! On a registry miss it can launch a backend, optionally throttled by a spawn
//! coordinator. Final reply construction is delegated to `HelloHandler`.
8
9use std::collections::HashMap;
10use std::sync::Mutex;
11use std::time::{Duration, Instant};
12
13use crate::broker::protocol::{
14    hello_reply::Result as HelloReplyResult, ErrorCode, Frame, HelloReply, Refused,
15    ServiceDefinition,
16};
17use crate::broker::server::{
18    check_version_allowed, BackendKey, BackendLaunchRequest, BackendLauncher, BackendRegistry,
19    BrokerInstanceKey, HelloHandler, HelloHandlerError, HelloRequest, PeerIdentity,
20    RegisteredBackend, ServiceDefinitionError, ServiceDefinitionLoader, SpawnBeginError,
21    SpawnCoordinator, SpawnOutcome, TraceContext, VersionPolicyBlock,
22};
23
24const PROTOCOL_VERSION: u32 = 1;
25
26/// Routes decoded Hello requests through service definitions and backend state.
27#[derive(Clone, Copy)]
28pub struct HelloRouter<'a> {
29    service_definitions: &'a ServiceDefinitionLoader,
30    backends: BackendRegistryView<'a>,
31    spawn_coordinator: Option<&'a Mutex<SpawnCoordinator>>,
32    backend_launcher: Option<&'a dyn BackendLauncher>,
33}
34
35#[derive(Clone, Copy)]
36enum BackendRegistryView<'a> {
37    Static(&'a BackendRegistry),
38    Live(&'a Mutex<BackendRegistry>),
39}
40
41impl<'a> HelloRouter<'a> {
42    /// Create a router over immutable broker state.
43    pub fn new(
44        service_definitions: &'a ServiceDefinitionLoader,
45        backends: &'a BackendRegistry,
46    ) -> Self {
47        Self {
48            service_definitions,
49            backends: BackendRegistryView::Static(backends),
50            spawn_coordinator: None,
51            backend_launcher: None,
52        }
53    }
54
55    /// Create a router over live broker state that prunes stale backend handles
56    /// before each registry lookup.
57    pub fn with_lifecycle_monitor(
58        service_definitions: &'a ServiceDefinitionLoader,
59        backends: &'a Mutex<BackendRegistry>,
60    ) -> Self {
61        Self {
62            service_definitions,
63            backends: BackendRegistryView::Live(backends),
64            spawn_coordinator: None,
65            backend_launcher: None,
66        }
67    }
68
69    /// Attach spawn-budget coordination for backend registry misses.
70    pub fn with_spawn_coordinator(
71        mut self,
72        spawn_coordinator: &'a Mutex<SpawnCoordinator>,
73    ) -> Self {
74        self.spawn_coordinator = Some(spawn_coordinator);
75        self
76    }
77
78    /// Attach a launcher used to satisfy verified backend registry misses.
79    pub fn with_backend_launcher(mut self, backend_launcher: &'a dyn BackendLauncher) -> Self {
80        self.backend_launcher = Some(backend_launcher);
81        self
82    }
83
84    /// Decode and route a framed Hello request.
85    pub fn handle_frame(&self, frame: Frame, peer: PeerIdentity) -> HelloReply {
86        match HelloRequest::decode(frame, peer) {
87            Ok(request) => self.handle_request(&request),
88            Err(refused) => refused_reply(refused),
89        }
90    }
91
92    /// Route a decoded Hello request.
93    pub fn handle_request(&self, request: &HelloRequest) -> HelloReply {
94        match self.route_request(request) {
95            Ok(registered) => match HelloHandler::new().with_backend(registered) {
96                Ok(handler) => handler.handle_request(request),
97                Err(err) => refused_reply(refused_from_handler_error(err)),
98            },
99            Err(refused) => refused_reply(refused),
100        }
101    }
102
103    fn route_request(&self, request: &HelloRequest) -> Result<RegisteredBackend, Refused> {
104        let service_definition = self
105            .service_definitions
106            .lookup_or_reload(&request.hello.service_name)
107            .map_err(refused_from_service_definition_error)?;
108
109        if let Err(block) =
110            check_version_allowed(&request.hello.wanted_version, &service_definition)
111        {
112            return Err(refused_from_version_policy(block));
113        }
114
115        let instance =
116            BrokerInstanceKey::from_service_definition(&service_definition).map_err(|err| {
117                refused(
118                    ErrorCode::ErrorInternal,
119                    format!("service isolation could not be resolved: {err}"),
120                    0,
121                )
122            })?;
123
124        if let Some(registered) = self.registered_backend_for(
125            &instance,
126            &service_definition,
127            &request.hello.wanted_version,
128        ) {
129            return Ok(registered);
130        }
131
132        let key = BackendKey::new(
133            instance,
134            request.hello.service_name.clone(),
135            request.hello.wanted_version.clone(),
136        );
137        let trace_context = request.trace_context();
138        self.launch_backend(&key, &service_definition, &trace_context)
139    }
140
141    fn registered_backend_for(
142        &self,
143        instance: &BrokerInstanceKey,
144        service_definition: &ServiceDefinition,
145        service_version: &str,
146    ) -> Option<RegisteredBackend> {
147        match self.backends {
148            BackendRegistryView::Static(registry) => {
149                registry.registered_backend_for(instance, service_definition, service_version)
150            }
151            BackendRegistryView::Live(registry) => {
152                let mut registry = registry
153                    .lock()
154                    .unwrap_or_else(|poisoned| poisoned.into_inner());
155                let _removed = registry.prune_stale();
156                registry.registered_backend_for(instance, service_definition, service_version)
157            }
158        }
159    }
160
161    fn launch_backend(
162        &self,
163        key: &BackendKey,
164        service_definition: &ServiceDefinition,
165        trace_context: &TraceContext,
166    ) -> Result<RegisteredBackend, Refused> {
167        self.begin_spawn(key.clone())?;
168
169        let Some(backend_launcher) = self.backend_launcher else {
170            self.finish_spawn(key, SpawnOutcome::Failed);
171            return Err(refused(
172                ErrorCode::ErrorBackendSpawnFailed,
173                "backend is not registered",
174                1_000,
175            ));
176        };
177
178        let request = BackendLaunchRequest {
179            key,
180            service_definition,
181            trace_context,
182        };
183        match backend_launcher.launch(&request) {
184            Ok(handle) => match self.register_launched_backend(key, service_definition, handle) {
185                Ok(registered) => {
186                    self.finish_spawn(key, SpawnOutcome::Success);
187                    Ok(registered)
188                }
189                Err(refused) => {
190                    self.finish_spawn(key, SpawnOutcome::Failed);
191                    Err(refused)
192                }
193            },
194            Err(err) => {
195                self.finish_spawn(key, SpawnOutcome::Failed);
196                Err(refused(
197                    ErrorCode::ErrorBackendSpawnFailed,
198                    format!("backend spawn failed: {err}"),
199                    1_000,
200                ))
201            }
202        }
203    }
204
205    fn begin_spawn(&self, key: BackendKey) -> Result<(), Refused> {
206        let Some(spawn_coordinator) = self.spawn_coordinator else {
207            return Ok(());
208        };
209
210        let now = Instant::now();
211        let mut coordinator = spawn_coordinator
212            .lock()
213            .unwrap_or_else(|poisoned| poisoned.into_inner());
214        match coordinator.try_begin(key.clone(), now) {
215            Ok(_) => Ok(()),
216            Err(SpawnBeginError::AlreadyInProgress) => Err(refused(
217                ErrorCode::ErrorRateLimited,
218                "backend spawn already in progress",
219                1_000,
220            )),
221            Err(SpawnBeginError::BudgetExhausted { retry_after, .. }) => Err(refused(
222                ErrorCode::ErrorRateLimited,
223                "backend spawn budget exhausted",
224                duration_to_retry_ms(retry_after),
225            )),
226        }
227    }
228
229    fn finish_spawn(&self, key: &BackendKey, outcome: SpawnOutcome) {
230        let Some(spawn_coordinator) = self.spawn_coordinator else {
231            return;
232        };
233
234        let mut coordinator = spawn_coordinator
235            .lock()
236            .unwrap_or_else(|poisoned| poisoned.into_inner());
237        coordinator.finish(key, outcome, Instant::now());
238    }
239
240    fn register_launched_backend(
241        &self,
242        key: &BackendKey,
243        service_definition: &ServiceDefinition,
244        handle: crate::broker::backend_handle::BackendHandle,
245    ) -> Result<RegisteredBackend, Refused> {
246        if handle.service_name != key.service_name || handle.service_version != key.service_version
247        {
248            return Err(refused(
249                ErrorCode::ErrorInternal,
250                "launched backend identity did not match request",
251                0,
252            ));
253        }
254
255        let registered = RegisteredBackend {
256            service_definition: service_definition.clone(),
257            daemon_version: handle.service_version.clone(),
258            backend_pipe: handle.daemon_process.ipc_endpoint.path.clone(),
259            server_capabilities: 0,
260        };
261
262        if let BackendRegistryView::Live(registry) = self.backends {
263            let mut registry = registry
264                .lock()
265                .unwrap_or_else(|poisoned| poisoned.into_inner());
266            registry.insert(key.instance.clone(), handle);
267        }
268
269        Ok(registered)
270    }
271}
272
273fn refused_from_service_definition_error(error: ServiceDefinitionError) -> Refused {
274    match error {
275        ServiceDefinitionError::InvalidName(_) => {
276            refused(ErrorCode::ErrorPeerRejected, "invalid service_name", 0)
277        }
278        ServiceDefinitionError::Io(err) if err.kind() == std::io::ErrorKind::NotFound => refused(
279            ErrorCode::ErrorServiceUnknown,
280            "service definition was not found",
281            0,
282        ),
283        other => refused(
284            ErrorCode::ErrorServiceUnknown,
285            format!("service definition could not be loaded: {other}"),
286            0,
287        ),
288    }
289}
290
291fn refused_from_version_policy(block: VersionPolicyBlock) -> Refused {
292    match block {
293        VersionPolicyBlock::BelowMinVersion => refused(
294            ErrorCode::ErrorVersionBlocked,
295            "wanted_version is below min_version",
296            30_000,
297        ),
298        VersionPolicyBlock::OutsideAllowList => refused(
299            ErrorCode::ErrorVersionBlocked,
300            "wanted_version is not in version_allow_list",
301            30_000,
302        ),
303    }
304}
305
306fn refused_from_handler_error(error: HelloHandlerError) -> Refused {
307    refused(
308        ErrorCode::ErrorInternal,
309        format!("registered backend could not be installed: {error}"),
310        0,
311    )
312}
313
314fn refused(code: ErrorCode, reason: impl Into<String>, retry_after_ms: u64) -> Refused {
315    Refused {
316        reason: reason.into(),
317        daemon_min_protocol: PROTOCOL_VERSION,
318        daemon_max_protocol: PROTOCOL_VERSION,
319        code: code as i32,
320        details: HashMap::new(),
321        retry_after_ms,
322    }
323}
324
/// Convert a spawn-budget wait into a `retry_after_ms` hint.
///
/// The conversion follows three rules:
/// - It rounds *up* to the next whole millisecond, so the hint is never
///   shorter than the actual wait (e.g. 1.5 ms becomes 2 ms, not 1 ms).
/// - It never returns 0.
/// - It saturates at `u64::MAX` for durations too long to express.
fn duration_to_retry_ms(duration: Duration) -> u64 {
    // `as_millis` truncates; add one when there is a sub-millisecond remainder.
    let has_remainder = duration.subsec_nanos() % 1_000_000 != 0;
    let millis = duration.as_millis() + u128::from(has_remainder);
    u64::try_from(millis.max(1)).unwrap_or(u64::MAX)
}
329
330fn refused_reply(refused: Refused) -> HelloReply {
331    HelloReply {
332        result: Some(HelloReplyResult::Refused(refused)),
333    }
334}
335
#[cfg(test)]
mod tests {
    use std::fs;

    use prost::Message;

    use crate::broker::backend_handle::{BackendHandle, DaemonProcess};
    use crate::broker::protocol::{
        BrokerIsolation, Endpoint, FrameKind, Hello, PayloadEncoding, ServiceDefinition,
    };
    use crate::broker::server::{
        ensure_service_definition_dir, service_definition_path, PeerIdentity,
    };

    use super::*;

    /// Shared-broker `zccache` definition pointing at the test binary itself.
    fn service_definition() -> ServiceDefinition {
        let exe = std::env::current_exe().unwrap();
        let dir = exe.parent().unwrap().to_path_buf();
        ServiceDefinition {
            service_name: "zccache".into(),
            binary_path: exe.to_string_lossy().into_owned(),
            isolation: BrokerIsolation::SharedBroker as i32,
            explicit_instance: String::new(),
            per_version_binary_dir: dir.to_string_lossy().into_owned(),
            min_version: "1.10.0".into(),
            version_allow_list: vec!["1.11.20".into()],
            labels: Default::default(),
        }
    }

    /// Write `definition` into a fresh temporary `services` directory.
    fn service_dir_with_definition(definition: &ServiceDefinition) -> tempfile::TempDir {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().join("services");
        ensure_service_definition_dir(&root).unwrap();
        fs::write(
            service_definition_path(&root, "zccache").unwrap(),
            definition.encode_to_vec(),
        )
        .unwrap();
        tmp
    }

    /// A decoded Hello asking for `zccache` 1.11.20.
    fn request() -> HelloRequest {
        let hello = Hello {
            client_min_protocol: 1,
            client_max_protocol: 1,
            service_name: "zccache".into(),
            wanted_version: "1.11.20".into(),
            client_version: "zccache-cli/1.11.20".into(),
            client_capabilities: 0,
            auth_token: Vec::new(),
            request_id: "req-live-prune".into(),
            connection_id: 0,
            peer_pid: 0,
            client_lib_name: "running-process".into(),
            client_lib_version: env!("CARGO_PKG_VERSION").into(),
            peer_attestation_nonce: Vec::new(),
            capability_token: Vec::new(),
            client_keepalive_secs: 60,
        };
        HelloRequest {
            frame: Frame {
                envelope_version: 1,
                kind: FrameKind::Request as i32,
                payload_protocol: 0,
                payload: hello.encode_to_vec(),
                request_id: 1,
                payload_encoding: PayloadEncoding::None as i32,
                deadline_unix_ms: 0,
                traceparent: String::new(),
                tracestate: String::new(),
            },
            hello,
            peer: PeerIdentity {
                pid: 0,
                uid_or_sid: "test-peer".into(),
            },
        }
    }

    /// A backend handle whose pid (`u32::MAX`) should not refer to a live process.
    fn stale_backend_handle() -> BackendHandle {
        let endpoint = Endpoint {
            namespace_id: "shared".into(),
            path: "rpb-v1-test-stale-backend".into(),
        };
        let mut daemon = DaemonProcess::current_process(endpoint, Some(30)).unwrap();
        daemon.pid = u32::MAX;
        BackendHandle {
            service_name: "zccache".into(),
            service_version: "1.11.20".into(),
            daemon_process: daemon,
            #[cfg(unix)]
            pid_handle: None,
            #[cfg(windows)]
            process_handle: None,
        }
    }

    #[test]
    fn live_registry_prunes_stale_backend_before_routing() {
        let definition = service_definition();
        let tmp = service_dir_with_definition(&definition);
        let loader = ServiceDefinitionLoader::new(tmp.path().join("services"));
        let mut registry = BackendRegistry::new();
        registry.insert(BrokerInstanceKey::Shared, stale_backend_handle());
        let registry = Mutex::new(registry);
        let router = HelloRouter::with_lifecycle_monitor(&loader, &registry);

        let reply = router.handle_request(&request());

        assert!(registry.lock().unwrap().is_empty());
        match reply.result.unwrap() {
            HelloReplyResult::Refused(refused) => {
                assert_eq!(
                    ErrorCode::try_from(refused.code).unwrap(),
                    ErrorCode::ErrorBackendSpawnFailed
                );
            }
            HelloReplyResult::Negotiated(negotiated) => {
                panic!("stale backend must not negotiate: {negotiated:?}")
            }
        }
    }

    #[test]
    fn version_policy_refusals_are_version_blocked_with_retry_hint() {
        for block in [
            VersionPolicyBlock::BelowMinVersion,
            VersionPolicyBlock::OutsideAllowList,
        ] {
            let refusal = refused_from_version_policy(block);
            assert_eq!(
                ErrorCode::try_from(refusal.code).unwrap(),
                ErrorCode::ErrorVersionBlocked
            );
            assert_eq!(refusal.retry_after_ms, 30_000);
        }
    }

    #[test]
    fn refused_reply_wraps_refusal_with_protocol_range() {
        let reply = refused_reply(refused(ErrorCode::ErrorInternal, "boom", 7));
        match reply.result.unwrap() {
            HelloReplyResult::Refused(refusal) => {
                assert_eq!(
                    ErrorCode::try_from(refusal.code).unwrap(),
                    ErrorCode::ErrorInternal
                );
                assert_eq!(refusal.reason, "boom");
                assert_eq!(refusal.retry_after_ms, 7);
                assert_eq!(refusal.daemon_min_protocol, PROTOCOL_VERSION);
                assert_eq!(refusal.daemon_max_protocol, PROTOCOL_VERSION);
                assert!(refusal.details.is_empty());
            }
            HelloReplyResult::Negotiated(negotiated) => {
                panic!("refused_reply must not negotiate: {negotiated:?}")
            }
        }
    }

    #[test]
    fn retry_hint_is_positive_and_saturates() {
        assert_eq!(duration_to_retry_ms(Duration::ZERO), 1);
        assert_eq!(duration_to_retry_ms(Duration::from_millis(250)), 250);
        assert_eq!(duration_to_retry_ms(Duration::MAX), u64::MAX);
    }
}