running_process/broker/server/
serve.rs1use std::num::NonZeroUsize;
10use std::path::PathBuf;
11use std::sync::Mutex;
12use std::time::Instant;
13
14use crate::broker::backend_handle::{BackendHandle, BackendHandleError, DaemonProcess};
15use crate::broker::backend_lifecycle::identity::IdentityError;
16use crate::broker::lifecycle::sid::SidError;
17use crate::broker::protocol::{Endpoint, ServiceDefinition};
18
19use super::admin::AdminSnapshot;
20use super::backend_launcher::{BackendLauncher, CommandBackendLauncher};
21use super::backend_registry::BackendRegistry;
22use super::connection::{BrokerConnectionError, PeerCredentialPolicy};
23use super::control_socket::{
24 serve_control_socket_connections_with_limit_and_policy, ControlSocketConnectionLimit,
25 ControlSocketError,
26};
27use super::hello_handler::{HelloHandler, HelloHandlerError};
28use super::hello_router::HelloRouter;
29use super::instance::{BrokerInstanceError, BrokerInstanceKey};
30use super::service_def_loader::{
31 service_definition_dir, ServiceDefinitionError, ServiceDefinitionLoader,
32};
33use super::spawn_coordinator::SpawnCoordinator;
34use super::version_allow_list::{check_version_allowed, VersionPolicyBlock};
35
/// Settings for serving a control socket in front of one named backend.
///
/// Consumed by `serve_registered_backend` and `build_hello_handler`.
#[derive(Debug, Clone)]
pub struct BrokerServeConfig {
    /// Control-socket path handed to the control-socket serve loop.
    pub socket_path: String,
    /// Service name used to look up the service definition and to probe the backend.
    pub service_name: String,
    /// Service version checked against the service definition's version policy.
    pub service_version: String,
    /// Backend endpoint path; rejected when empty while the backend is being registered.
    pub backend_endpoint: String,
    /// Directory handed to the `ServiceDefinitionLoader`.
    pub service_definition_dir: PathBuf,
    /// Connection cap; `None` maps to an unbounded connection limit.
    pub max_connections: Option<NonZeroUsize>,
}
52
/// Settings for serving a control socket whose router is wired to a backend launcher.
///
/// Consumed by `serve_launching_backends` and
/// `serve_launching_backends_with_launcher`.
#[derive(Debug, Clone)]
pub struct BrokerLaunchServeConfig {
    /// Control-socket path handed to the control-socket serve loop.
    pub socket_path: String,
    /// Directory handed to the `ServiceDefinitionLoader`.
    pub service_definition_dir: PathBuf,
    /// Connection cap; `None` maps to an unbounded connection limit.
    pub max_connections: Option<NonZeroUsize>,
}
63
64impl BrokerServeConfig {
65 pub fn new(
67 socket_path: impl Into<String>,
68 service_name: impl Into<String>,
69 service_version: impl Into<String>,
70 backend_endpoint: impl Into<String>,
71 max_connections: usize,
72 ) -> Result<Self, BrokerServeError> {
73 Ok(Self {
74 socket_path: socket_path.into(),
75 service_name: service_name.into(),
76 service_version: service_version.into(),
77 backend_endpoint: backend_endpoint.into(),
78 service_definition_dir: service_definition_dir(),
79 max_connections: Some(
80 NonZeroUsize::new(max_connections)
81 .ok_or(BrokerServeError::InvalidMaxConnections)?,
82 ),
83 })
84 }
85
86 pub fn unbounded(
89 socket_path: impl Into<String>,
90 service_name: impl Into<String>,
91 service_version: impl Into<String>,
92 backend_endpoint: impl Into<String>,
93 ) -> Self {
94 Self {
95 socket_path: socket_path.into(),
96 service_name: service_name.into(),
97 service_version: service_version.into(),
98 backend_endpoint: backend_endpoint.into(),
99 service_definition_dir: service_definition_dir(),
100 max_connections: None,
101 }
102 }
103
104 pub fn with_service_definition_dir(mut self, root: impl Into<PathBuf>) -> Self {
106 self.service_definition_dir = root.into();
107 self
108 }
109
110 pub fn connection_limit(&self) -> ControlSocketConnectionLimit {
112 self.max_connections.map_or(
113 ControlSocketConnectionLimit::Unbounded,
114 ControlSocketConnectionLimit::Bounded,
115 )
116 }
117}
118
119impl BrokerLaunchServeConfig {
120 pub fn new(
123 socket_path: impl Into<String>,
124 max_connections: usize,
125 ) -> Result<Self, BrokerServeError> {
126 Ok(Self {
127 socket_path: socket_path.into(),
128 service_definition_dir: service_definition_dir(),
129 max_connections: Some(
130 NonZeroUsize::new(max_connections)
131 .ok_or(BrokerServeError::InvalidMaxConnections)?,
132 ),
133 })
134 }
135
136 pub fn unbounded(socket_path: impl Into<String>) -> Self {
139 Self {
140 socket_path: socket_path.into(),
141 service_definition_dir: service_definition_dir(),
142 max_connections: None,
143 }
144 }
145
146 pub fn with_service_definition_dir(mut self, root: impl Into<PathBuf>) -> Self {
148 self.service_definition_dir = root.into();
149 self
150 }
151
152 pub fn connection_limit(&self) -> ControlSocketConnectionLimit {
154 self.max_connections.map_or(
155 ControlSocketConnectionLimit::Unbounded,
156 ControlSocketConnectionLimit::Bounded,
157 )
158 }
159}
160
161pub fn serve_registered_backend(config: BrokerServeConfig) -> Result<(), BrokerServeError> {
163 let RegisteredServeBackend {
164 loader,
165 registry,
166 instance,
167 ..
168 } = build_registered_backend(&config)?;
169 let registry = Mutex::new(registry);
170 let router = HelloRouter::with_lifecycle_monitor(&loader, ®istry);
171 let peer_policy =
172 PeerCredentialPolicy::current_user().ok_or(BrokerServeError::PeerPolicyUnavailable)?;
173 let started_at = Instant::now();
174 let snapshot_provider = || {
175 let registry = registry
176 .lock()
177 .unwrap_or_else(|poisoned| poisoned.into_inner());
178 AdminSnapshot::from_registry(instance.id(), started_at.elapsed(), true, 0, ®istry, &[])
179 };
180 serve_control_socket_connections_with_limit_and_policy(
181 &config.socket_path,
182 &router,
183 snapshot_provider,
184 config.connection_limit(),
185 &peer_policy,
186 )?;
187 Ok(())
188}
189
190pub fn serve_launching_backends(config: BrokerLaunchServeConfig) -> Result<(), BrokerServeError> {
193 let launcher = CommandBackendLauncher::for_current_user()?;
194 serve_launching_backends_with_launcher(config, &launcher)
195}
196
197pub fn serve_launching_backends_with_launcher(
199 config: BrokerLaunchServeConfig,
200 launcher: &dyn BackendLauncher,
201) -> Result<(), BrokerServeError> {
202 let loader = ServiceDefinitionLoader::new(&config.service_definition_dir);
203 let registry = Mutex::new(BackendRegistry::new());
204 let spawn_coordinator = Mutex::new(SpawnCoordinator::new());
205 let router = HelloRouter::with_lifecycle_monitor(&loader, ®istry)
206 .with_spawn_coordinator(&spawn_coordinator)
207 .with_backend_launcher(launcher);
208 let peer_policy =
209 PeerCredentialPolicy::current_user().ok_or(BrokerServeError::PeerPolicyUnavailable)?;
210 let started_at = Instant::now();
211 let snapshot_provider = || {
212 let registry = registry
213 .lock()
214 .unwrap_or_else(|poisoned| poisoned.into_inner());
215 AdminSnapshot::from_registry("launch", started_at.elapsed(), true, 0, ®istry, &[])
216 };
217 serve_control_socket_connections_with_limit_and_policy(
218 &config.socket_path,
219 &router,
220 snapshot_provider,
221 config.connection_limit(),
222 &peer_policy,
223 )?;
224 Ok(())
225}
226
227pub fn build_hello_handler(config: &BrokerServeConfig) -> Result<HelloHandler, BrokerServeError> {
229 let registered = build_registered_backend(config)?;
230 let backend = registered
231 .registry
232 .registered_backend_for(
233 ®istered.instance,
234 ®istered.service_definition,
235 &config.service_version,
236 )
237 .ok_or(BrokerServeError::RegisteredBackendMissing)?;
238
239 Ok(HelloHandler::new().with_backend(backend)?)
240}
241
242struct RegisteredServeBackend {
243 loader: ServiceDefinitionLoader,
244 registry: BackendRegistry,
245 instance: BrokerInstanceKey,
246 service_definition: ServiceDefinition,
247}
248
249fn build_registered_backend(
250 config: &BrokerServeConfig,
251) -> Result<RegisteredServeBackend, BrokerServeError> {
252 if config.backend_endpoint.is_empty() {
253 return Err(BrokerServeError::EmptyBackendEndpoint);
254 }
255
256 let loader = ServiceDefinitionLoader::new(&config.service_definition_dir);
257 let service_definition = loader.lookup_or_reload(&config.service_name)?;
258 check_version_allowed(&config.service_version, &service_definition)
259 .map_err(BrokerServeError::VersionPolicy)?;
260
261 let instance = BrokerInstanceKey::from_service_definition(&service_definition)?;
262 let endpoint = Endpoint {
263 namespace_id: instance.id(),
264 path: config.backend_endpoint.clone(),
265 };
266 let daemon = DaemonProcess::current_process(endpoint.clone(), Some(30))?;
267 let handle = BackendHandle::probe_with_service(
268 config.service_name.clone(),
269 config.service_version.clone(),
270 &endpoint,
271 &daemon,
272 )?;
273
274 let mut registry = BackendRegistry::new();
275 registry.insert(instance.clone(), handle);
276
277 Ok(RegisteredServeBackend {
278 loader,
279 registry,
280 instance,
281 service_definition,
282 })
283}
284
285#[derive(Debug, thiserror::Error)]
287pub enum BrokerServeError {
288 #[error("max_connections must be greater than zero")]
290 InvalidMaxConnections,
291 #[error("backend endpoint must not be empty")]
293 EmptyBackendEndpoint,
294 #[error(transparent)]
296 ServiceDefinition(#[from] ServiceDefinitionError),
297 #[error(transparent)]
299 BrokerInstance(#[from] BrokerInstanceError),
300 #[error(transparent)]
302 Identity(#[from] IdentityError),
303 #[error(transparent)]
305 Sid(#[from] SidError),
306 #[error("configured service version is blocked by service-definition policy: {0:?}")]
308 VersionPolicy(VersionPolicyBlock),
309 #[error(transparent)]
311 BackendHandle(#[from] BackendHandleError),
312 #[error("registered backend was missing after registry insert")]
314 RegisteredBackendMissing,
315 #[error(transparent)]
317 HelloHandler(#[from] HelloHandlerError),
318 #[error("current-user peer credential policy is unavailable")]
320 PeerPolicyUnavailable,
321 #[error(transparent)]
323 Connection(#[from] BrokerConnectionError),
324 #[error(transparent)]
326 ControlSocket(#[from] ControlSocketError),
327}