running_process/broker/server/
backend_endpoint_allocator.rs1use std::collections::HashSet;
9
10use crate::broker::lifecycle::names::{backend_pipe, PipePath, PipePathError};
11use crate::broker::protocol::Endpoint;
12
13pub const DEFAULT_BACKEND_ENDPOINT_ATTEMPTS: usize = 16;
15
16#[derive(Debug)]
18pub struct BackendEndpointAllocator {
19 user_sid_hash: String,
20 namespace_id: String,
21 max_attempts: usize,
22 reserved_paths: HashSet<String>,
23}
24
25impl BackendEndpointAllocator {
26 pub fn new(user_sid_hash: impl Into<String>, namespace_id: impl Into<String>) -> Self {
28 Self {
29 user_sid_hash: user_sid_hash.into(),
30 namespace_id: namespace_id.into(),
31 max_attempts: DEFAULT_BACKEND_ENDPOINT_ATTEMPTS,
32 reserved_paths: HashSet::new(),
33 }
34 }
35
36 pub fn with_max_attempts(mut self, max_attempts: usize) -> Self {
38 self.max_attempts = max_attempts.max(1);
39 self
40 }
41
42 pub fn reserve_path(&mut self, path: impl Into<String>) {
44 self.reserved_paths.insert(path.into());
45 }
46
47 pub fn allocate(&mut self) -> Result<Endpoint, BackendEndpointAllocatorError> {
49 self.allocate_with_random128(|| {
50 let mut bytes = [0_u8; 16];
51 getrandom::fill(&mut bytes)?;
52 Ok(bytes)
53 })
54 }
55
56 pub fn allocate_with_random128<F>(
61 &mut self,
62 mut next_random128: F,
63 ) -> Result<Endpoint, BackendEndpointAllocatorError>
64 where
65 F: FnMut() -> Result<[u8; 16], BackendEndpointAllocatorError>,
66 {
67 for _ in 0..self.max_attempts {
68 let random128 = next_random128()?;
69 let path = endpoint_path(backend_pipe(&self.user_sid_hash, &random128)?)?;
70 if self.reserved_paths.insert(path.clone()) {
71 return Ok(Endpoint {
72 namespace_id: self.namespace_id.clone(),
73 path,
74 });
75 }
76 }
77
78 Err(BackendEndpointAllocatorError::CollisionExhausted {
79 attempts: self.max_attempts,
80 })
81 }
82}
83
84#[derive(Debug, thiserror::Error)]
86pub enum BackendEndpointAllocatorError {
87 #[error("backend endpoint random generation failed: {0}")]
89 Random(String),
90 #[error(transparent)]
92 PipePath(#[from] PipePathError),
93 #[error("backend pipe path did not contain the current platform variant")]
95 MissingPlatformPath,
96 #[error("backend endpoint allocation exhausted after {attempts} collision attempts")]
98 CollisionExhausted {
99 attempts: usize,
101 },
102}
103
104impl From<getrandom::Error> for BackendEndpointAllocatorError {
105 fn from(value: getrandom::Error) -> Self {
106 Self::Random(value.to_string())
107 }
108}
109
110fn endpoint_path(pipe_path: PipePath) -> Result<String, BackendEndpointAllocatorError> {
111 #[cfg(windows)]
112 {
113 pipe_path
114 .windows
115 .ok_or(BackendEndpointAllocatorError::MissingPlatformPath)
116 }
117
118 #[cfg(unix)]
119 {
120 pipe_path
121 .unix
122 .map(|path| path.to_string_lossy().into_owned())
123 .ok_or(BackendEndpointAllocatorError::MissingPlatformPath)
124 }
125}