1use std::collections::HashMap;
10use std::sync::Mutex;
11use std::time::{Duration, Instant};
12
13use crate::broker::protocol::{
14 hello_reply::Result as HelloReplyResult, ErrorCode, Frame, HelloReply, Refused,
15 ServiceDefinition,
16};
17use crate::broker::server::{
18 check_version_allowed, BackendKey, BackendLaunchRequest, BackendLauncher, BackendRegistry,
19 BrokerInstanceKey, HelloHandler, HelloHandlerError, HelloRequest, PeerIdentity,
20 RegisteredBackend, ServiceDefinitionError, ServiceDefinitionLoader, SpawnBeginError,
21 SpawnCoordinator, SpawnOutcome, TraceContext, VersionPolicyBlock,
22};
23
/// Protocol version advertised in every `Refused` built by [`refused`], used
/// as both `daemon_min_protocol` and `daemon_max_protocol`.
const PROTOCOL_VERSION: u32 = 1;
25
26#[derive(Clone, Copy)]
28pub struct HelloRouter<'a> {
29 service_definitions: &'a ServiceDefinitionLoader,
30 backends: BackendRegistryView<'a>,
31 spawn_coordinator: Option<&'a Mutex<SpawnCoordinator>>,
32 backend_launcher: Option<&'a dyn BackendLauncher>,
33}
34
35#[derive(Clone, Copy)]
36enum BackendRegistryView<'a> {
37 Static(&'a BackendRegistry),
38 Live(&'a Mutex<BackendRegistry>),
39}
40
41impl<'a> HelloRouter<'a> {
42 pub fn new(
44 service_definitions: &'a ServiceDefinitionLoader,
45 backends: &'a BackendRegistry,
46 ) -> Self {
47 Self {
48 service_definitions,
49 backends: BackendRegistryView::Static(backends),
50 spawn_coordinator: None,
51 backend_launcher: None,
52 }
53 }
54
55 pub fn with_lifecycle_monitor(
58 service_definitions: &'a ServiceDefinitionLoader,
59 backends: &'a Mutex<BackendRegistry>,
60 ) -> Self {
61 Self {
62 service_definitions,
63 backends: BackendRegistryView::Live(backends),
64 spawn_coordinator: None,
65 backend_launcher: None,
66 }
67 }
68
69 pub fn with_spawn_coordinator(
71 mut self,
72 spawn_coordinator: &'a Mutex<SpawnCoordinator>,
73 ) -> Self {
74 self.spawn_coordinator = Some(spawn_coordinator);
75 self
76 }
77
78 pub fn with_backend_launcher(mut self, backend_launcher: &'a dyn BackendLauncher) -> Self {
80 self.backend_launcher = Some(backend_launcher);
81 self
82 }
83
84 pub fn handle_frame(&self, frame: Frame, peer: PeerIdentity) -> HelloReply {
86 match HelloRequest::decode(frame, peer) {
87 Ok(request) => self.handle_request(&request),
88 Err(refused) => refused_reply(refused),
89 }
90 }
91
92 pub fn handle_request(&self, request: &HelloRequest) -> HelloReply {
94 match self.route_request(request) {
95 Ok(registered) => match HelloHandler::new().with_backend(registered) {
96 Ok(handler) => handler.handle_request(request),
97 Err(err) => refused_reply(refused_from_handler_error(err)),
98 },
99 Err(refused) => refused_reply(refused),
100 }
101 }
102
103 fn route_request(&self, request: &HelloRequest) -> Result<RegisteredBackend, Refused> {
104 let service_definition = self
105 .service_definitions
106 .lookup_or_reload(&request.hello.service_name)
107 .map_err(refused_from_service_definition_error)?;
108
109 if let Err(block) =
110 check_version_allowed(&request.hello.wanted_version, &service_definition)
111 {
112 return Err(refused_from_version_policy(block));
113 }
114
115 let instance =
116 BrokerInstanceKey::from_service_definition(&service_definition).map_err(|err| {
117 refused(
118 ErrorCode::ErrorInternal,
119 format!("service isolation could not be resolved: {err}"),
120 0,
121 )
122 })?;
123
124 if let Some(registered) = self.registered_backend_for(
125 &instance,
126 &service_definition,
127 &request.hello.wanted_version,
128 ) {
129 return Ok(registered);
130 }
131
132 let key = BackendKey::new(
133 instance,
134 request.hello.service_name.clone(),
135 request.hello.wanted_version.clone(),
136 );
137 let trace_context = request.trace_context();
138 self.launch_backend(&key, &service_definition, &trace_context)
139 }
140
141 fn registered_backend_for(
142 &self,
143 instance: &BrokerInstanceKey,
144 service_definition: &ServiceDefinition,
145 service_version: &str,
146 ) -> Option<RegisteredBackend> {
147 match self.backends {
148 BackendRegistryView::Static(registry) => {
149 registry.registered_backend_for(instance, service_definition, service_version)
150 }
151 BackendRegistryView::Live(registry) => {
152 let mut registry = registry
153 .lock()
154 .unwrap_or_else(|poisoned| poisoned.into_inner());
155 let _removed = registry.prune_stale();
156 registry.registered_backend_for(instance, service_definition, service_version)
157 }
158 }
159 }
160
161 fn launch_backend(
162 &self,
163 key: &BackendKey,
164 service_definition: &ServiceDefinition,
165 trace_context: &TraceContext,
166 ) -> Result<RegisteredBackend, Refused> {
167 self.begin_spawn(key.clone())?;
168
169 let Some(backend_launcher) = self.backend_launcher else {
170 self.finish_spawn(key, SpawnOutcome::Failed);
171 return Err(refused(
172 ErrorCode::ErrorBackendSpawnFailed,
173 "backend is not registered",
174 1_000,
175 ));
176 };
177
178 let request = BackendLaunchRequest {
179 key,
180 service_definition,
181 trace_context,
182 };
183 match backend_launcher.launch(&request) {
184 Ok(handle) => match self.register_launched_backend(key, service_definition, handle) {
185 Ok(registered) => {
186 self.finish_spawn(key, SpawnOutcome::Success);
187 Ok(registered)
188 }
189 Err(refused) => {
190 self.finish_spawn(key, SpawnOutcome::Failed);
191 Err(refused)
192 }
193 },
194 Err(err) => {
195 self.finish_spawn(key, SpawnOutcome::Failed);
196 Err(refused(
197 ErrorCode::ErrorBackendSpawnFailed,
198 format!("backend spawn failed: {err}"),
199 1_000,
200 ))
201 }
202 }
203 }
204
205 fn begin_spawn(&self, key: BackendKey) -> Result<(), Refused> {
206 let Some(spawn_coordinator) = self.spawn_coordinator else {
207 return Ok(());
208 };
209
210 let now = Instant::now();
211 let mut coordinator = spawn_coordinator
212 .lock()
213 .unwrap_or_else(|poisoned| poisoned.into_inner());
214 match coordinator.try_begin(key.clone(), now) {
215 Ok(_) => Ok(()),
216 Err(SpawnBeginError::AlreadyInProgress) => Err(refused(
217 ErrorCode::ErrorRateLimited,
218 "backend spawn already in progress",
219 1_000,
220 )),
221 Err(SpawnBeginError::BudgetExhausted { retry_after, .. }) => Err(refused(
222 ErrorCode::ErrorRateLimited,
223 "backend spawn budget exhausted",
224 duration_to_retry_ms(retry_after),
225 )),
226 }
227 }
228
229 fn finish_spawn(&self, key: &BackendKey, outcome: SpawnOutcome) {
230 let Some(spawn_coordinator) = self.spawn_coordinator else {
231 return;
232 };
233
234 let mut coordinator = spawn_coordinator
235 .lock()
236 .unwrap_or_else(|poisoned| poisoned.into_inner());
237 coordinator.finish(key, outcome, Instant::now());
238 }
239
240 fn register_launched_backend(
241 &self,
242 key: &BackendKey,
243 service_definition: &ServiceDefinition,
244 handle: crate::broker::backend_handle::BackendHandle,
245 ) -> Result<RegisteredBackend, Refused> {
246 if handle.service_name != key.service_name || handle.service_version != key.service_version
247 {
248 return Err(refused(
249 ErrorCode::ErrorInternal,
250 "launched backend identity did not match request",
251 0,
252 ));
253 }
254
255 let registered = RegisteredBackend {
256 service_definition: service_definition.clone(),
257 daemon_version: handle.service_version.clone(),
258 backend_pipe: handle.daemon_process.ipc_endpoint.path.clone(),
259 server_capabilities: 0,
260 };
261
262 if let BackendRegistryView::Live(registry) = self.backends {
263 let mut registry = registry
264 .lock()
265 .unwrap_or_else(|poisoned| poisoned.into_inner());
266 registry.insert(key.instance.clone(), handle);
267 }
268
269 Ok(registered)
270 }
271}
272
273fn refused_from_service_definition_error(error: ServiceDefinitionError) -> Refused {
274 match error {
275 ServiceDefinitionError::InvalidName(_) => {
276 refused(ErrorCode::ErrorPeerRejected, "invalid service_name", 0)
277 }
278 ServiceDefinitionError::Io(err) if err.kind() == std::io::ErrorKind::NotFound => refused(
279 ErrorCode::ErrorServiceUnknown,
280 "service definition was not found",
281 0,
282 ),
283 other => refused(
284 ErrorCode::ErrorServiceUnknown,
285 format!("service definition could not be loaded: {other}"),
286 0,
287 ),
288 }
289}
290
291fn refused_from_version_policy(block: VersionPolicyBlock) -> Refused {
292 match block {
293 VersionPolicyBlock::BelowMinVersion => refused(
294 ErrorCode::ErrorVersionBlocked,
295 "wanted_version is below min_version",
296 30_000,
297 ),
298 VersionPolicyBlock::OutsideAllowList => refused(
299 ErrorCode::ErrorVersionBlocked,
300 "wanted_version is not in version_allow_list",
301 30_000,
302 ),
303 }
304}
305
306fn refused_from_handler_error(error: HelloHandlerError) -> Refused {
307 refused(
308 ErrorCode::ErrorInternal,
309 format!("registered backend could not be installed: {error}"),
310 0,
311 )
312}
313
314fn refused(code: ErrorCode, reason: impl Into<String>, retry_after_ms: u64) -> Refused {
315 Refused {
316 reason: reason.into(),
317 daemon_min_protocol: PROTOCOL_VERSION,
318 daemon_max_protocol: PROTOCOL_VERSION,
319 code: code as i32,
320 details: HashMap::new(),
321 retry_after_ms,
322 }
323}
324
/// Converts a retry delay to whole milliseconds for `Refused::retry_after_ms`.
///
/// Any sub-millisecond remainder is rounded *up*, so a client honouring the
/// hint never retries before `duration` has elapsed. The result is at least
/// 1 ms, and saturates at `u64::MAX` for durations that do not fit.
fn duration_to_retry_ms(duration: Duration) -> u64 {
    let whole_millis = duration.as_millis();
    // `as_millis` truncates; bump by one when there is a leftover fraction.
    let has_fraction = duration.subsec_nanos() % 1_000_000 != 0;
    let millis = (whole_millis + u128::from(has_fraction)).max(1);
    u64::try_from(millis).unwrap_or(u64::MAX)
}
329
330fn refused_reply(refused: Refused) -> HelloReply {
331 HelloReply {
332 result: Some(HelloReplyResult::Refused(refused)),
333 }
334}
335
#[cfg(test)]
mod tests {
    use std::fs;

    use prost::Message;

    use crate::broker::backend_handle::{BackendHandle, DaemonProcess};
    use crate::broker::protocol::{
        BrokerIsolation, Endpoint, FrameKind, Hello, PayloadEncoding, ServiceDefinition,
    };
    use crate::broker::server::{
        ensure_service_definition_dir, service_definition_path, PeerIdentity,
    };

    use super::*;

    /// Shared-broker `zccache` definition whose binary is the current test
    /// executable.
    ///
    /// `min_version` and `version_allow_list` admit `1.11.20`, the version
    /// requested by [`request`].
    fn service_definition() -> ServiceDefinition {
        let exe = std::env::current_exe().unwrap();
        let dir = exe.parent().unwrap().to_path_buf();
        ServiceDefinition {
            service_name: "zccache".into(),
            binary_path: exe.to_string_lossy().into_owned(),
            isolation: BrokerIsolation::SharedBroker as i32,
            explicit_instance: String::new(),
            per_version_binary_dir: dir.to_string_lossy().into_owned(),
            min_version: "1.10.0".into(),
            version_allow_list: vec!["1.11.20".into()],
            labels: Default::default(),
        }
    }

    /// Writes `definition`, protobuf-encoded, as the `zccache` entry of a
    /// `services` directory inside a fresh temp dir.
    ///
    /// Callers must keep the returned `TempDir` alive while the directory is
    /// in use; tempfile removes it when the `TempDir` is dropped.
    fn service_dir_with_definition(definition: &ServiceDefinition) -> tempfile::TempDir {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().join("services");
        ensure_service_definition_dir(&root).unwrap();
        fs::write(
            service_definition_path(&root, "zccache").unwrap(),
            definition.encode_to_vec(),
        )
        .unwrap();
        tmp
    }

    /// Decoded Hello request for `zccache` 1.11.20 from a synthetic peer.
    fn request() -> HelloRequest {
        let hello = Hello {
            client_min_protocol: 1,
            client_max_protocol: 1,
            service_name: "zccache".into(),
            wanted_version: "1.11.20".into(),
            client_version: "zccache-cli/1.11.20".into(),
            client_capabilities: 0,
            auth_token: Vec::new(),
            request_id: "req-live-prune".into(),
            connection_id: 0,
            peer_pid: 0,
            client_lib_name: "running-process".into(),
            client_lib_version: env!("CARGO_PKG_VERSION").into(),
            peer_attestation_nonce: Vec::new(),
            capability_token: Vec::new(),
            client_keepalive_secs: 60,
        };
        HelloRequest {
            frame: Frame {
                envelope_version: 1,
                kind: FrameKind::Request as i32,
                payload_protocol: 0,
                payload: hello.encode_to_vec(),
                request_id: 1,
                payload_encoding: PayloadEncoding::None as i32,
                deadline_unix_ms: 0,
                traceparent: String::new(),
                tracestate: String::new(),
            },
            hello,
            peer: PeerIdentity {
                pid: 0,
                uid_or_sid: "test-peer".into(),
            },
        }
    }

    /// Registry entry for `zccache` 1.11.20 whose pid is overwritten with
    /// `u32::MAX`.
    ///
    /// The pid presumably does not belong to a live process, which is what
    /// makes `prune_stale` drop the entry.
    fn stale_backend_handle() -> BackendHandle {
        let endpoint = Endpoint {
            namespace_id: "shared".into(),
            path: "rpb-v1-test-stale-backend".into(),
        };
        let mut daemon = DaemonProcess::current_process(endpoint, Some(30)).unwrap();
        daemon.pid = u32::MAX;
        BackendHandle {
            service_name: "zccache".into(),
            service_version: "1.11.20".into(),
            daemon_process: daemon,
            #[cfg(unix)]
            pid_handle: None,
            #[cfg(windows)]
            process_handle: None,
        }
    }

    /// A stale backend must be pruned before routing. The request then falls
    /// through to the launch path, and because no launcher is attached it is
    /// refused with `ErrorBackendSpawnFailed`.
    #[test]
    fn live_registry_prunes_stale_backend_before_routing() {
        let definition = service_definition();
        let tmp = service_dir_with_definition(&definition);
        let loader = ServiceDefinitionLoader::new(tmp.path().join("services"));
        let mut registry = BackendRegistry::new();
        registry.insert(BrokerInstanceKey::Shared, stale_backend_handle());
        let registry = Mutex::new(registry);
        let router = HelloRouter::with_lifecycle_monitor(&loader, &registry);

        let reply = router.handle_request(&request());

        assert!(registry.lock().unwrap().is_empty());
        match reply.result.unwrap() {
            HelloReplyResult::Refused(refused) => {
                assert_eq!(
                    ErrorCode::try_from(refused.code).unwrap(),
                    ErrorCode::ErrorBackendSpawnFailed
                );
            }
            HelloReplyResult::Negotiated(negotiated) => {
                panic!("stale backend must not negotiate: {negotiated:?}")
            }
        }
    }
}