1use std::collections::HashMap;
10use std::sync::Mutex;
11use std::time::{Duration, Instant};
12
13use crate::broker::protocol::{
14 hello_reply::Result as HelloReplyResult, ErrorCode, Frame, HelloReply, Refused,
15 ServiceDefinition, PROTOCOL_VERSION,
16};
17use crate::broker::server::{
18 check_version_allowed, BackendKey, BackendLaunchRequest, BackendLauncher, BackendRegistry,
19 BrokerInstanceKey, HelloHandler, HelloHandlerError, HelloRequest, PeerIdentity,
20 RegisteredBackend, ServiceDefinitionError, ServiceDefinitionLoader, SpawnBeginError,
21 SpawnCoordinator, SpawnOutcome, TraceContext, VersionPolicyBlock,
22};
23
24#[derive(Clone, Copy)]
26pub struct HelloRouter<'a> {
27 service_definitions: &'a ServiceDefinitionLoader,
28 backends: BackendRegistryView<'a>,
29 spawn_coordinator: Option<&'a Mutex<SpawnCoordinator>>,
30 backend_launcher: Option<&'a dyn BackendLauncher>,
31}
32
33#[derive(Clone, Copy)]
34enum BackendRegistryView<'a> {
35 Static(&'a BackendRegistry),
36 Live(&'a Mutex<BackendRegistry>),
37}
38
39impl<'a> HelloRouter<'a> {
40 pub fn new(
42 service_definitions: &'a ServiceDefinitionLoader,
43 backends: &'a BackendRegistry,
44 ) -> Self {
45 Self {
46 service_definitions,
47 backends: BackendRegistryView::Static(backends),
48 spawn_coordinator: None,
49 backend_launcher: None,
50 }
51 }
52
53 pub fn with_lifecycle_monitor(
56 service_definitions: &'a ServiceDefinitionLoader,
57 backends: &'a Mutex<BackendRegistry>,
58 ) -> Self {
59 Self {
60 service_definitions,
61 backends: BackendRegistryView::Live(backends),
62 spawn_coordinator: None,
63 backend_launcher: None,
64 }
65 }
66
67 pub fn with_spawn_coordinator(
69 mut self,
70 spawn_coordinator: &'a Mutex<SpawnCoordinator>,
71 ) -> Self {
72 self.spawn_coordinator = Some(spawn_coordinator);
73 self
74 }
75
76 pub fn with_backend_launcher(mut self, backend_launcher: &'a dyn BackendLauncher) -> Self {
78 self.backend_launcher = Some(backend_launcher);
79 self
80 }
81
82 pub fn handle_frame(&self, frame: Frame, peer: PeerIdentity) -> HelloReply {
84 match HelloRequest::decode(frame, peer) {
85 Ok(request) => self.handle_request(&request),
86 Err(refused) => refused_reply(refused),
87 }
88 }
89
90 pub fn handle_request(&self, request: &HelloRequest) -> HelloReply {
92 match self.route_request(request) {
93 Ok(registered) => match HelloHandler::new().with_backend(registered) {
94 Ok(handler) => handler.handle_request(request),
95 Err(err) => refused_reply(refused_from_handler_error(err)),
96 },
97 Err(refused) => refused_reply(refused),
98 }
99 }
100
101 fn route_request(&self, request: &HelloRequest) -> Result<RegisteredBackend, Refused> {
102 let service_definition = self
103 .service_definitions
104 .lookup_or_reload(&request.hello.service_name)
105 .map_err(refused_from_service_definition_error)?;
106
107 if let Err(block) =
108 check_version_allowed(&request.hello.wanted_version, &service_definition)
109 {
110 return Err(refused_from_version_policy(block));
111 }
112
113 let instance =
114 BrokerInstanceKey::from_service_definition(&service_definition).map_err(|err| {
115 refused(
116 ErrorCode::ErrorInternal,
117 format!("service isolation could not be resolved: {err}"),
118 0,
119 )
120 })?;
121
122 if let Some(registered) = self.registered_backend_for(
123 &instance,
124 &service_definition,
125 &request.hello.wanted_version,
126 ) {
127 return Ok(registered);
128 }
129
130 let key = BackendKey::new(
131 instance,
132 request.hello.service_name.clone(),
133 request.hello.wanted_version.clone(),
134 );
135 let trace_context = request.trace_context();
136 self.launch_backend(&key, &service_definition, &trace_context)
137 }
138
139 fn registered_backend_for(
140 &self,
141 instance: &BrokerInstanceKey,
142 service_definition: &ServiceDefinition,
143 service_version: &str,
144 ) -> Option<RegisteredBackend> {
145 match self.backends {
146 BackendRegistryView::Static(registry) => {
147 registry.registered_backend_for(instance, service_definition, service_version)
148 }
149 BackendRegistryView::Live(registry) => {
150 let mut registry = registry
151 .lock()
152 .unwrap_or_else(|poisoned| poisoned.into_inner());
153 let _removed = registry.prune_stale();
154 registry.registered_backend_for(instance, service_definition, service_version)
155 }
156 }
157 }
158
159 fn launch_backend(
160 &self,
161 key: &BackendKey,
162 service_definition: &ServiceDefinition,
163 trace_context: &TraceContext,
164 ) -> Result<RegisteredBackend, Refused> {
165 self.begin_spawn(key.clone())?;
166
167 let Some(backend_launcher) = self.backend_launcher else {
168 self.finish_spawn(key, SpawnOutcome::Failed);
169 return Err(refused(
170 ErrorCode::ErrorBackendSpawnFailed,
171 "backend is not registered",
172 1_000,
173 ));
174 };
175
176 let request = BackendLaunchRequest {
177 key,
178 service_definition,
179 trace_context,
180 };
181 match backend_launcher.launch(&request) {
182 Ok(handle) => match self.register_launched_backend(key, service_definition, handle) {
183 Ok(registered) => {
184 self.finish_spawn(key, SpawnOutcome::Success);
185 Ok(registered)
186 }
187 Err(refused) => {
188 self.finish_spawn(key, SpawnOutcome::Failed);
189 Err(refused)
190 }
191 },
192 Err(err) => {
193 self.finish_spawn(key, SpawnOutcome::Failed);
194 Err(refused(
195 ErrorCode::ErrorBackendSpawnFailed,
196 format!("backend spawn failed: {err}"),
197 1_000,
198 ))
199 }
200 }
201 }
202
203 fn begin_spawn(&self, key: BackendKey) -> Result<(), Refused> {
204 let Some(spawn_coordinator) = self.spawn_coordinator else {
205 return Ok(());
206 };
207
208 let now = Instant::now();
209 let mut coordinator = spawn_coordinator
210 .lock()
211 .unwrap_or_else(|poisoned| poisoned.into_inner());
212 match coordinator.try_begin(key.clone(), now) {
213 Ok(_) => Ok(()),
214 Err(SpawnBeginError::AlreadyInProgress) => Err(refused(
215 ErrorCode::ErrorRateLimited,
216 "backend spawn already in progress",
217 1_000,
218 )),
219 Err(SpawnBeginError::BudgetExhausted { retry_after, .. }) => Err(refused(
220 ErrorCode::ErrorRateLimited,
221 "backend spawn budget exhausted",
222 duration_to_retry_ms(retry_after),
223 )),
224 }
225 }
226
227 fn finish_spawn(&self, key: &BackendKey, outcome: SpawnOutcome) {
228 let Some(spawn_coordinator) = self.spawn_coordinator else {
229 return;
230 };
231
232 let mut coordinator = spawn_coordinator
233 .lock()
234 .unwrap_or_else(|poisoned| poisoned.into_inner());
235 coordinator.finish(key, outcome, Instant::now());
236 }
237
238 fn register_launched_backend(
239 &self,
240 key: &BackendKey,
241 service_definition: &ServiceDefinition,
242 handle: crate::broker::backend_handle::BackendHandle,
243 ) -> Result<RegisteredBackend, Refused> {
244 if handle.service_name != key.service_name || handle.service_version != key.service_version
245 {
246 return Err(refused(
247 ErrorCode::ErrorInternal,
248 "launched backend identity did not match request",
249 0,
250 ));
251 }
252
253 let registered = RegisteredBackend {
254 service_definition: service_definition.clone(),
255 daemon_version: handle.service_version.clone(),
256 backend_pipe: handle.daemon_process.ipc_endpoint.path.clone(),
257 server_capabilities: 0,
258 };
259
260 if let BackendRegistryView::Live(registry) = self.backends {
261 let mut registry = registry
262 .lock()
263 .unwrap_or_else(|poisoned| poisoned.into_inner());
264 registry.insert(key.instance.clone(), handle);
265 }
266
267 Ok(registered)
268 }
269}
270
271fn refused_from_service_definition_error(error: ServiceDefinitionError) -> Refused {
272 match error {
273 ServiceDefinitionError::InvalidName(_) => {
274 refused(ErrorCode::ErrorPeerRejected, "invalid service_name", 0)
275 }
276 ServiceDefinitionError::Io(err) if err.kind() == std::io::ErrorKind::NotFound => refused(
277 ErrorCode::ErrorServiceUnknown,
278 "service definition was not found",
279 0,
280 ),
281 other => refused(
282 ErrorCode::ErrorServiceUnknown,
283 format!("service definition could not be loaded: {other}"),
284 0,
285 ),
286 }
287}
288
289fn refused_from_version_policy(block: VersionPolicyBlock) -> Refused {
290 match block {
291 VersionPolicyBlock::BelowMinVersion => refused(
292 ErrorCode::ErrorVersionBlocked,
293 "wanted_version is below min_version",
294 30_000,
295 ),
296 VersionPolicyBlock::OutsideAllowList => refused(
297 ErrorCode::ErrorVersionBlocked,
298 "wanted_version is not in version_allow_list",
299 30_000,
300 ),
301 }
302}
303
304fn refused_from_handler_error(error: HelloHandlerError) -> Refused {
305 refused(
306 ErrorCode::ErrorInternal,
307 format!("registered backend could not be installed: {error}"),
308 0,
309 )
310}
311
312fn refused(code: ErrorCode, reason: impl Into<String>, retry_after_ms: u64) -> Refused {
313 Refused {
314 reason: reason.into(),
315 daemon_min_protocol: PROTOCOL_VERSION,
316 daemon_max_protocol: PROTOCOL_VERSION,
317 code: code as i32,
318 details: HashMap::new(),
319 retry_after_ms,
320 }
321}
322
/// Converts a retry delay into whole milliseconds for the wire format.
///
/// The result is never zero (sub-millisecond delays round up to 1) and
/// saturates at `u64::MAX` when the delay does not fit in a `u64`.
fn duration_to_retry_ms(duration: Duration) -> u64 {
    match u64::try_from(duration.as_millis()) {
        Ok(0) => 1,
        Ok(whole_ms) => whole_ms,
        Err(_) => u64::MAX,
    }
}
327
328fn refused_reply(refused: Refused) -> HelloReply {
329 HelloReply {
330 result: Some(HelloReplyResult::Refused(refused)),
331 }
332}
333
#[cfg(test)]
mod tests {
    use std::fs;

    use prost::Message;

    use crate::broker::backend_handle::{BackendHandle, DaemonProcess};
    use crate::broker::protocol::{
        BrokerIsolation, Endpoint, FrameKind, Hello, PayloadEncoding, ServiceDefinition,
    };
    use crate::broker::server::{
        ensure_service_definition_dir, service_definition_path, PeerIdentity,
    };

    use super::*;

    /// Service definition for `zccache` whose allow list admits the version
    /// requested by [`request`].
    fn service_definition() -> ServiceDefinition {
        let exe = std::env::current_exe().unwrap();
        let dir = exe.parent().unwrap().to_path_buf();
        ServiceDefinition {
            service_name: "zccache".into(),
            binary_path: exe.to_string_lossy().into_owned(),
            isolation: BrokerIsolation::SharedBroker as i32,
            explicit_instance: String::new(),
            per_version_binary_dir: dir.to_string_lossy().into_owned(),
            min_version: "1.10.0".into(),
            version_allow_list: vec!["1.11.20".into()],
            labels: Default::default(),
        }
    }

    /// Writes `definition` as the `zccache` definition under
    /// `<tmp>/services`. The returned guard keeps the directory alive.
    fn service_dir_with_definition(definition: &ServiceDefinition) -> tempfile::TempDir {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().join("services");
        ensure_service_definition_dir(&root).unwrap();
        fs::write(
            service_definition_path(&root, "zccache").unwrap(),
            definition.encode_to_vec(),
        )
        .unwrap();
        tmp
    }

    /// Decoded hello request asking for `zccache` version `1.11.20`.
    fn request() -> HelloRequest {
        let hello = Hello {
            client_min_protocol: 1,
            client_max_protocol: 1,
            service_name: "zccache".into(),
            wanted_version: "1.11.20".into(),
            client_version: "zccache-cli/1.11.20".into(),
            client_capabilities: 0,
            auth_token: Vec::new(),
            request_id: "req-live-prune".into(),
            connection_id: 0,
            peer_pid: 0,
            client_lib_name: "running-process".into(),
            client_lib_version: env!("CARGO_PKG_VERSION").into(),
            peer_attestation_nonce: Vec::new(),
            capability_token: Vec::new(),
            client_keepalive_secs: 60,
        };
        HelloRequest {
            frame: Frame {
                envelope_version: 1,
                kind: FrameKind::Request as i32,
                payload_protocol: 0,
                payload: hello.encode_to_vec(),
                request_id: 1,
                payload_encoding: PayloadEncoding::None as i32,
                deadline_unix_ms: 0,
                traceparent: String::new(),
                tracestate: String::new(),
            },
            hello,
            peer: PeerIdentity {
                pid: 0,
                uid_or_sid: "test-peer".into(),
            },
        }
    }

    /// Backend handle matching the request's service and version, with its pid
    /// overwritten.
    fn stale_backend_handle() -> BackendHandle {
        let endpoint = Endpoint {
            namespace_id: "shared".into(),
            path: "rpb-v1-test-stale-backend".into(),
        };
        let mut daemon = DaemonProcess::current_process(endpoint, Some(30)).unwrap();
        // NOTE(review): presumably no live process has this pid, which is what
        // makes `prune_stale` treat the entry as stale — confirm against the
        // registry's liveness check.
        daemon.pid = u32::MAX;
        BackendHandle {
            service_name: "zccache".into(),
            service_version: "1.11.20".into(),
            daemon_process: daemon,
            #[cfg(unix)]
            pid_handle: None,
            #[cfg(windows)]
            process_handle: None,
        }
    }

    #[test]
    fn live_registry_prunes_stale_backend_before_routing() {
        let definition = service_definition();
        let tmp = service_dir_with_definition(&definition);
        let loader = ServiceDefinitionLoader::new(tmp.path().join("services"));
        let mut registry = BackendRegistry::new();
        registry.insert(BrokerInstanceKey::Shared, stale_backend_handle());
        let registry = Mutex::new(registry);
        let router = HelloRouter::with_lifecycle_monitor(&loader, &registry);

        let reply = router.handle_request(&request());

        // The stale entry must have been pruned during routing.
        assert!(registry.lock().unwrap().is_empty());
        // No launcher is configured, so the fallback launch path refuses with
        // a spawn failure instead of negotiating with the stale backend.
        match reply.result.unwrap() {
            HelloReplyResult::Refused(refused) => {
                assert_eq!(
                    ErrorCode::try_from(refused.code).unwrap(),
                    ErrorCode::ErrorBackendSpawnFailed
                );
            }
            HelloReplyResult::Negotiated(negotiated) => {
                panic!("stale backend must not negotiate: {negotiated:?}")
            }
        }
    }
}