Skip to main content

running_process/broker/server/
hello_router.rs

1//! Service-definition-backed Hello routing.
2//!
3//! `HelloHandler` owns deterministic validation and in-memory negotiation.
4//! `HelloRouter` adds the broker-facing lookup layer: reload the service
5//! definition for each request, resolve the trust-domain instance, query the
6//! backend registry, and then delegate the final reply construction to
7//! `HelloHandler`.
8
9use std::collections::HashMap;
10use std::sync::Mutex;
11use std::time::{Duration, Instant};
12
13use crate::broker::protocol::{
14    hello_reply::Result as HelloReplyResult, ErrorCode, Frame, HelloReply, Refused,
15    ServiceDefinition, PROTOCOL_VERSION,
16};
17use crate::broker::server::{
18    check_version_allowed, BackendKey, BackendLaunchRequest, BackendLauncher, BackendRegistry,
19    BrokerInstanceKey, HelloHandler, HelloHandlerError, HelloRequest, PeerIdentity,
20    RegisteredBackend, ServiceDefinitionError, ServiceDefinitionLoader, SpawnBeginError,
21    SpawnCoordinator, SpawnOutcome, TraceContext, VersionPolicyBlock,
22};
23
24/// Routes decoded Hello requests through service definitions and backend state.
25#[derive(Clone, Copy)]
26pub struct HelloRouter<'a> {
27    service_definitions: &'a ServiceDefinitionLoader,
28    backends: BackendRegistryView<'a>,
29    spawn_coordinator: Option<&'a Mutex<SpawnCoordinator>>,
30    backend_launcher: Option<&'a dyn BackendLauncher>,
31}
32
33#[derive(Clone, Copy)]
34enum BackendRegistryView<'a> {
35    Static(&'a BackendRegistry),
36    Live(&'a Mutex<BackendRegistry>),
37}
38
39impl<'a> HelloRouter<'a> {
40    /// Create a router over immutable broker state.
41    pub fn new(
42        service_definitions: &'a ServiceDefinitionLoader,
43        backends: &'a BackendRegistry,
44    ) -> Self {
45        Self {
46            service_definitions,
47            backends: BackendRegistryView::Static(backends),
48            spawn_coordinator: None,
49            backend_launcher: None,
50        }
51    }
52
53    /// Create a router over live broker state that prunes stale backend handles
54    /// before each registry lookup.
55    pub fn with_lifecycle_monitor(
56        service_definitions: &'a ServiceDefinitionLoader,
57        backends: &'a Mutex<BackendRegistry>,
58    ) -> Self {
59        Self {
60            service_definitions,
61            backends: BackendRegistryView::Live(backends),
62            spawn_coordinator: None,
63            backend_launcher: None,
64        }
65    }
66
67    /// Attach spawn-budget coordination for backend registry misses.
68    pub fn with_spawn_coordinator(
69        mut self,
70        spawn_coordinator: &'a Mutex<SpawnCoordinator>,
71    ) -> Self {
72        self.spawn_coordinator = Some(spawn_coordinator);
73        self
74    }
75
76    /// Attach a launcher used to satisfy verified backend registry misses.
77    pub fn with_backend_launcher(mut self, backend_launcher: &'a dyn BackendLauncher) -> Self {
78        self.backend_launcher = Some(backend_launcher);
79        self
80    }
81
82    /// Decode and route a framed Hello request.
83    pub fn handle_frame(&self, frame: Frame, peer: PeerIdentity) -> HelloReply {
84        match HelloRequest::decode(frame, peer) {
85            Ok(request) => self.handle_request(&request),
86            Err(refused) => refused_reply(refused),
87        }
88    }
89
90    /// Route a decoded Hello request.
91    pub fn handle_request(&self, request: &HelloRequest) -> HelloReply {
92        match self.route_request(request) {
93            Ok(registered) => match HelloHandler::new().with_backend(registered) {
94                Ok(handler) => handler.handle_request(request),
95                Err(err) => refused_reply(refused_from_handler_error(err)),
96            },
97            Err(refused) => refused_reply(refused),
98        }
99    }
100
101    fn route_request(&self, request: &HelloRequest) -> Result<RegisteredBackend, Refused> {
102        let service_definition = self
103            .service_definitions
104            .lookup_or_reload(&request.hello.service_name)
105            .map_err(refused_from_service_definition_error)?;
106
107        if let Err(block) =
108            check_version_allowed(&request.hello.wanted_version, &service_definition)
109        {
110            return Err(refused_from_version_policy(block));
111        }
112
113        let instance =
114            BrokerInstanceKey::from_service_definition(&service_definition).map_err(|err| {
115                refused(
116                    ErrorCode::ErrorInternal,
117                    format!("service isolation could not be resolved: {err}"),
118                    0,
119                )
120            })?;
121
122        if let Some(registered) = self.registered_backend_for(
123            &instance,
124            &service_definition,
125            &request.hello.wanted_version,
126        ) {
127            return Ok(registered);
128        }
129
130        let key = BackendKey::new(
131            instance,
132            request.hello.service_name.clone(),
133            request.hello.wanted_version.clone(),
134        );
135        let trace_context = request.trace_context();
136        self.launch_backend(&key, &service_definition, &trace_context)
137    }
138
139    fn registered_backend_for(
140        &self,
141        instance: &BrokerInstanceKey,
142        service_definition: &ServiceDefinition,
143        service_version: &str,
144    ) -> Option<RegisteredBackend> {
145        match self.backends {
146            BackendRegistryView::Static(registry) => {
147                registry.registered_backend_for(instance, service_definition, service_version)
148            }
149            BackendRegistryView::Live(registry) => {
150                let mut registry = registry
151                    .lock()
152                    .unwrap_or_else(|poisoned| poisoned.into_inner());
153                let _removed = registry.prune_stale();
154                registry.registered_backend_for(instance, service_definition, service_version)
155            }
156        }
157    }
158
159    fn launch_backend(
160        &self,
161        key: &BackendKey,
162        service_definition: &ServiceDefinition,
163        trace_context: &TraceContext,
164    ) -> Result<RegisteredBackend, Refused> {
165        self.begin_spawn(key.clone())?;
166
167        let Some(backend_launcher) = self.backend_launcher else {
168            self.finish_spawn(key, SpawnOutcome::Failed);
169            return Err(refused(
170                ErrorCode::ErrorBackendSpawnFailed,
171                "backend is not registered",
172                1_000,
173            ));
174        };
175
176        let request = BackendLaunchRequest {
177            key,
178            service_definition,
179            trace_context,
180        };
181        match backend_launcher.launch(&request) {
182            Ok(handle) => match self.register_launched_backend(key, service_definition, handle) {
183                Ok(registered) => {
184                    self.finish_spawn(key, SpawnOutcome::Success);
185                    Ok(registered)
186                }
187                Err(refused) => {
188                    self.finish_spawn(key, SpawnOutcome::Failed);
189                    Err(refused)
190                }
191            },
192            Err(err) => {
193                self.finish_spawn(key, SpawnOutcome::Failed);
194                Err(refused(
195                    ErrorCode::ErrorBackendSpawnFailed,
196                    format!("backend spawn failed: {err}"),
197                    1_000,
198                ))
199            }
200        }
201    }
202
203    fn begin_spawn(&self, key: BackendKey) -> Result<(), Refused> {
204        let Some(spawn_coordinator) = self.spawn_coordinator else {
205            return Ok(());
206        };
207
208        let now = Instant::now();
209        let mut coordinator = spawn_coordinator
210            .lock()
211            .unwrap_or_else(|poisoned| poisoned.into_inner());
212        match coordinator.try_begin(key.clone(), now) {
213            Ok(_) => Ok(()),
214            Err(SpawnBeginError::AlreadyInProgress) => Err(refused(
215                ErrorCode::ErrorRateLimited,
216                "backend spawn already in progress",
217                1_000,
218            )),
219            Err(SpawnBeginError::BudgetExhausted { retry_after, .. }) => Err(refused(
220                ErrorCode::ErrorRateLimited,
221                "backend spawn budget exhausted",
222                duration_to_retry_ms(retry_after),
223            )),
224        }
225    }
226
227    fn finish_spawn(&self, key: &BackendKey, outcome: SpawnOutcome) {
228        let Some(spawn_coordinator) = self.spawn_coordinator else {
229            return;
230        };
231
232        let mut coordinator = spawn_coordinator
233            .lock()
234            .unwrap_or_else(|poisoned| poisoned.into_inner());
235        coordinator.finish(key, outcome, Instant::now());
236    }
237
238    fn register_launched_backend(
239        &self,
240        key: &BackendKey,
241        service_definition: &ServiceDefinition,
242        handle: crate::broker::backend_handle::BackendHandle,
243    ) -> Result<RegisteredBackend, Refused> {
244        if handle.service_name != key.service_name || handle.service_version != key.service_version
245        {
246            return Err(refused(
247                ErrorCode::ErrorInternal,
248                "launched backend identity did not match request",
249                0,
250            ));
251        }
252
253        let registered = RegisteredBackend {
254            service_definition: service_definition.clone(),
255            daemon_version: handle.service_version.clone(),
256            backend_pipe: handle.daemon_process.ipc_endpoint.path.clone(),
257            server_capabilities: 0,
258        };
259
260        if let BackendRegistryView::Live(registry) = self.backends {
261            let mut registry = registry
262                .lock()
263                .unwrap_or_else(|poisoned| poisoned.into_inner());
264            registry.insert(key.instance.clone(), handle);
265        }
266
267        Ok(registered)
268    }
269}
270
271fn refused_from_service_definition_error(error: ServiceDefinitionError) -> Refused {
272    match error {
273        ServiceDefinitionError::InvalidName(_) => {
274            refused(ErrorCode::ErrorPeerRejected, "invalid service_name", 0)
275        }
276        ServiceDefinitionError::Io(err) if err.kind() == std::io::ErrorKind::NotFound => refused(
277            ErrorCode::ErrorServiceUnknown,
278            "service definition was not found",
279            0,
280        ),
281        other => refused(
282            ErrorCode::ErrorServiceUnknown,
283            format!("service definition could not be loaded: {other}"),
284            0,
285        ),
286    }
287}
288
289fn refused_from_version_policy(block: VersionPolicyBlock) -> Refused {
290    match block {
291        VersionPolicyBlock::BelowMinVersion => refused(
292            ErrorCode::ErrorVersionBlocked,
293            "wanted_version is below min_version",
294            30_000,
295        ),
296        VersionPolicyBlock::OutsideAllowList => refused(
297            ErrorCode::ErrorVersionBlocked,
298            "wanted_version is not in version_allow_list",
299            30_000,
300        ),
301    }
302}
303
304fn refused_from_handler_error(error: HelloHandlerError) -> Refused {
305    refused(
306        ErrorCode::ErrorInternal,
307        format!("registered backend could not be installed: {error}"),
308        0,
309    )
310}
311
312fn refused(code: ErrorCode, reason: impl Into<String>, retry_after_ms: u64) -> Refused {
313    Refused {
314        reason: reason.into(),
315        daemon_min_protocol: PROTOCOL_VERSION,
316        daemon_max_protocol: PROTOCOL_VERSION,
317        code: code as i32,
318        details: HashMap::new(),
319        retry_after_ms,
320    }
321}
322
/// Convert a retry delay to whole milliseconds for the wire.
///
/// Sub-millisecond (including zero) delays round up to 1 so the hint is never
/// 0; delays too large for `u64` saturate at `u64::MAX`.
fn duration_to_retry_ms(duration: Duration) -> u64 {
    match u64::try_from(duration.as_millis()) {
        Ok(0) => 1,
        Ok(whole_millis) => whole_millis,
        Err(_) => u64::MAX,
    }
}
327
328fn refused_reply(refused: Refused) -> HelloReply {
329    HelloReply {
330        result: Some(HelloReplyResult::Refused(refused)),
331    }
332}
333
#[cfg(test)]
mod tests {
    use std::fs;

    use prost::Message;

    use crate::broker::backend_handle::{BackendHandle, DaemonProcess};
    use crate::broker::protocol::{
        BrokerIsolation, Endpoint, FrameKind, Hello, PayloadEncoding, ServiceDefinition,
    };
    use crate::broker::server::{
        ensure_service_definition_dir, service_definition_path, PeerIdentity,
    };

    use super::*;

    /// Shared-broker definition for "zccache" that points at the test binary
    /// itself and allows exactly version 1.11.20.
    fn service_definition() -> ServiceDefinition {
        let test_binary = std::env::current_exe().unwrap();
        let binary_dir = test_binary.parent().unwrap().to_path_buf();
        ServiceDefinition {
            service_name: "zccache".into(),
            binary_path: test_binary.to_string_lossy().into_owned(),
            isolation: BrokerIsolation::SharedBroker as i32,
            explicit_instance: String::new(),
            per_version_binary_dir: binary_dir.to_string_lossy().into_owned(),
            min_version: "1.10.0".into(),
            version_allow_list: vec!["1.11.20".into()],
            labels: Default::default(),
        }
    }

    /// Write `definition` as the "zccache" entry under `<tmp>/services`.
    ///
    /// The returned guard owns the temporary directory; keep it alive for as
    /// long as the loader needs the file.
    fn service_dir_with_definition(definition: &ServiceDefinition) -> tempfile::TempDir {
        let scratch = tempfile::tempdir().unwrap();
        let services_root = scratch.path().join("services");
        ensure_service_definition_dir(&services_root).unwrap();

        let definition_file = service_definition_path(&services_root, "zccache").unwrap();
        fs::write(definition_file, definition.encode_to_vec()).unwrap();
        scratch
    }

    /// Hello request for zccache 1.11.20, matching `service_definition()`.
    fn request() -> HelloRequest {
        let hello = Hello {
            client_min_protocol: 1,
            client_max_protocol: 1,
            service_name: "zccache".into(),
            wanted_version: "1.11.20".into(),
            client_version: "zccache-cli/1.11.20".into(),
            client_capabilities: 0,
            auth_token: Vec::new(),
            request_id: "req-live-prune".into(),
            connection_id: 0,
            peer_pid: 0,
            client_lib_name: "running-process".into(),
            client_lib_version: env!("CARGO_PKG_VERSION").into(),
            peer_attestation_nonce: Vec::new(),
            capability_token: Vec::new(),
            client_keepalive_secs: 60,
        };
        // Encode before `hello` is moved into the request below.
        let payload = hello.encode_to_vec();
        let frame = Frame {
            envelope_version: 1,
            kind: FrameKind::Request as i32,
            payload_protocol: 0,
            payload,
            request_id: 1,
            payload_encoding: PayloadEncoding::None as i32,
            deadline_unix_ms: 0,
            traceparent: String::new(),
            tracestate: String::new(),
        };
        let peer = PeerIdentity {
            pid: 0,
            uid_or_sid: "test-peer".into(),
        };
        HelloRequest { frame, hello, peer }
    }

    /// Backend handle whose daemon pid is overwritten with `u32::MAX`.
    ///
    /// NOTE(review): presumably no live process has that pid, which is what
    /// makes `prune_stale` drop the handle — confirm against the registry.
    fn stale_backend_handle() -> BackendHandle {
        let endpoint = Endpoint {
            namespace_id: "shared".into(),
            path: "rpb-v1-test-stale-backend".into(),
        };
        let mut daemon_process = DaemonProcess::current_process(endpoint, Some(30)).unwrap();
        daemon_process.pid = u32::MAX;
        BackendHandle {
            service_name: "zccache".into(),
            service_version: "1.11.20".into(),
            daemon_process,
            #[cfg(unix)]
            pid_handle: None,
            #[cfg(windows)]
            process_handle: None,
        }
    }

    /// A stale handle in a live registry is pruned before lookup, so routing
    /// misses and — with no launcher attached — refuses with a spawn failure.
    #[test]
    fn live_registry_prunes_stale_backend_before_routing() {
        let definition = service_definition();
        let tmp = service_dir_with_definition(&definition);
        let loader = ServiceDefinitionLoader::new(tmp.path().join("services"));

        let mut seeded = BackendRegistry::new();
        seeded.insert(BrokerInstanceKey::Shared, stale_backend_handle());
        let registry = Mutex::new(seeded);

        let router = HelloRouter::with_lifecycle_monitor(&loader, &registry);
        let reply = router.handle_request(&request());

        assert!(registry.lock().unwrap().is_empty());
        let refusal = match reply.result.unwrap() {
            HelloReplyResult::Refused(refused) => refused,
            HelloReplyResult::Negotiated(negotiated) => {
                panic!("stale backend must not negotiate: {negotiated:?}")
            }
        };
        assert_eq!(
            ErrorCode::try_from(refusal.code).unwrap(),
            ErrorCode::ErrorBackendSpawnFailed
        );
    }
}