running_process/broker/server/
backend_endpoint_allocator.rs1use std::collections::HashSet;
9
10use crate::broker::lifecycle::names::{backend_pipe, PipePath, PipePathError};
11use crate::broker::protocol::Endpoint;
12
/// Attempt limit that [`BackendEndpointAllocator::new`] installs: the number of
/// candidate paths a single allocation call tries before giving up with
/// [`BackendEndpointAllocatorError::CollisionExhausted`].
pub const DEFAULT_BACKEND_ENDPOINT_ATTEMPTS: usize = 16;
15
/// Hands out backend [`Endpoint`]s whose paths are derived from random bytes,
/// remembering every path it has returned or been told to reserve so that it
/// never returns the same path twice.
#[derive(Debug)]
pub struct BackendEndpointAllocator {
    /// Passed as the first argument to `backend_pipe` when building each
    /// candidate path.
    user_sid_hash: String,
    /// Copied into the `namespace_id` field of every returned [`Endpoint`].
    namespace_id: String,
    /// Upper bound on candidate paths tried per allocation call. Both
    /// constructors in this file keep it at 1 or more.
    max_attempts: usize,
    /// Paths that must not be handed out: those added through `reserve_path`
    /// plus every path already returned by a successful allocation.
    reserved_paths: HashSet<String>,
}
24
25impl BackendEndpointAllocator {
26 pub fn new(
28 user_sid_hash: impl Into<String>,
29 namespace_id: impl Into<String>,
30 ) -> Self {
31 Self {
32 user_sid_hash: user_sid_hash.into(),
33 namespace_id: namespace_id.into(),
34 max_attempts: DEFAULT_BACKEND_ENDPOINT_ATTEMPTS,
35 reserved_paths: HashSet::new(),
36 }
37 }
38
39 pub fn with_max_attempts(mut self, max_attempts: usize) -> Self {
41 self.max_attempts = max_attempts.max(1);
42 self
43 }
44
45 pub fn reserve_path(&mut self, path: impl Into<String>) {
47 self.reserved_paths.insert(path.into());
48 }
49
50 pub fn allocate(&mut self) -> Result<Endpoint, BackendEndpointAllocatorError> {
52 self.allocate_with_random128(|| {
53 let mut bytes = [0_u8; 16];
54 getrandom::fill(&mut bytes)?;
55 Ok(bytes)
56 })
57 }
58
59 pub fn allocate_with_random128<F>(
64 &mut self,
65 mut next_random128: F,
66 ) -> Result<Endpoint, BackendEndpointAllocatorError>
67 where
68 F: FnMut() -> Result<[u8; 16], BackendEndpointAllocatorError>,
69 {
70 for _ in 0..self.max_attempts {
71 let random128 = next_random128()?;
72 let path = endpoint_path(backend_pipe(&self.user_sid_hash, &random128)?)?;
73 if self.reserved_paths.insert(path.clone()) {
74 return Ok(Endpoint {
75 namespace_id: self.namespace_id.clone(),
76 path,
77 });
78 }
79 }
80
81 Err(BackendEndpointAllocatorError::CollisionExhausted {
82 attempts: self.max_attempts,
83 })
84 }
85}
86
/// Failures that can occur while allocating a backend endpoint.
#[derive(Debug, thiserror::Error)]
pub enum BackendEndpointAllocatorError {
    /// The random-byte source failed; carries a textual description of the
    /// failure (the `From<getrandom::Error>` impl stores the error's display
    /// text here).
    #[error("backend endpoint random generation failed: {0}")]
    Random(String),
    /// Building the pipe path failed; displays exactly as the wrapped
    /// [`PipePathError`].
    #[error(transparent)]
    PipePath(#[from] PipePathError),
    /// The generated pipe path had no entry for the platform this code was
    /// compiled for.
    #[error("backend pipe path did not contain the current platform variant")]
    MissingPlatformPath,
    /// Every candidate path tried in one allocation call was already reserved.
    #[error("backend endpoint allocation exhausted after {attempts} collision attempts")]
    CollisionExhausted {
        /// Number of candidate paths that were tried before giving up.
        attempts: usize,
    },
}
106
107impl From<getrandom::Error> for BackendEndpointAllocatorError {
108 fn from(value: getrandom::Error) -> Self {
109 Self::Random(value.to_string())
110 }
111}
112
113fn endpoint_path(pipe_path: PipePath) -> Result<String, BackendEndpointAllocatorError> {
114 #[cfg(windows)]
115 {
116 pipe_path
117 .windows
118 .ok_or(BackendEndpointAllocatorError::MissingPlatformPath)
119 }
120
121 #[cfg(unix)]
122 {
123 pipe_path
124 .unix
125 .map(|path| path.to_string_lossy().into_owned())
126 .ok_or(BackendEndpointAllocatorError::MissingPlatformPath)
127 }
128}