running_process/broker/server/
serve.rs1use std::num::NonZeroUsize;
10use std::path::PathBuf;
11use std::sync::Mutex;
12use std::time::Instant;
13
14use crate::broker::backend_handle::{BackendHandle, BackendHandleError, DaemonProcess};
15use crate::broker::backend_lifecycle::identity::IdentityError;
16use crate::broker::lifecycle::sid::SidError;
17use crate::broker::protocol::{Endpoint, ServiceDefinition};
18
19use super::admin::AdminSnapshot;
20use super::backend_launcher::{BackendLauncher, CommandBackendLauncher};
21use super::backend_registry::BackendRegistry;
22use super::connection::{BrokerConnectionError, PeerCredentialPolicy};
23use super::control_socket::{
24 serve_control_socket_connections_with_limit_policy_post_hello_and_fd_guard,
25 ControlSocketConnectionLimit, ControlSocketError,
26};
27use super::fd_pressure::FdPressureGuard;
28use super::handoff_serve::{complete_negotiated_handoff, ServeHandoffContext};
29use super::hello_handler::{HelloHandler, HelloHandlerError};
30use super::hello_router::HelloRouter;
31use super::instance::{BrokerInstanceError, BrokerInstanceKey};
32use super::service_def_loader::{
33 service_definition_dir, ServiceDefinitionError, ServiceDefinitionLoader,
34};
35use super::spawn_coordinator::SpawnCoordinator;
36use super::version_allow_list::{check_version_allowed, VersionPolicyBlock};
37
/// Settings for serving one already-running backend through the broker control socket.
///
/// Consumed by `serve_registered_backend` and `build_hello_handler`.
#[derive(Debug, Clone)]
pub struct BrokerServeConfig {
    /// Path handed to the control-socket serve call as the socket to listen on.
    pub socket_path: String,
    /// Name used to look up the service definition and to probe the backend.
    pub service_name: String,
    /// Version checked against the service definition's version policy and used for the probe.
    pub service_version: String,
    /// Backend endpoint path; an empty string is rejected when the backend is registered.
    pub backend_endpoint: String,
    /// Root directory given to the `ServiceDefinitionLoader`.
    pub service_definition_dir: PathBuf,
    /// Connection cap; `None` maps to an unbounded control-socket limit.
    pub max_connections: Option<NonZeroUsize>,
    /// Optional handoff endpoint; when `None` the post-hello hook does nothing.
    pub handoff_endpoint: Option<String>,
}
59
/// Settings for the launching serve mode, where the registry starts out empty.
///
/// Consumed by `serve_launching_backends` and `serve_launching_backends_with_launcher`.
#[derive(Debug, Clone)]
pub struct BrokerLaunchServeConfig {
    /// Path handed to the control-socket serve call as the socket to listen on.
    pub socket_path: String,
    /// Root directory given to the `ServiceDefinitionLoader`.
    pub service_definition_dir: PathBuf,
    /// Connection cap; `None` maps to an unbounded control-socket limit.
    pub max_connections: Option<NonZeroUsize>,
}
70
71impl BrokerServeConfig {
72 pub fn new(
74 socket_path: impl Into<String>,
75 service_name: impl Into<String>,
76 service_version: impl Into<String>,
77 backend_endpoint: impl Into<String>,
78 max_connections: usize,
79 ) -> Result<Self, BrokerServeError> {
80 Ok(Self {
81 socket_path: socket_path.into(),
82 service_name: service_name.into(),
83 service_version: service_version.into(),
84 backend_endpoint: backend_endpoint.into(),
85 service_definition_dir: service_definition_dir(),
86 max_connections: Some(
87 NonZeroUsize::new(max_connections)
88 .ok_or(BrokerServeError::InvalidMaxConnections)?,
89 ),
90 handoff_endpoint: None,
91 })
92 }
93
94 pub fn unbounded(
97 socket_path: impl Into<String>,
98 service_name: impl Into<String>,
99 service_version: impl Into<String>,
100 backend_endpoint: impl Into<String>,
101 ) -> Self {
102 Self {
103 socket_path: socket_path.into(),
104 service_name: service_name.into(),
105 service_version: service_version.into(),
106 backend_endpoint: backend_endpoint.into(),
107 service_definition_dir: service_definition_dir(),
108 max_connections: None,
109 handoff_endpoint: None,
110 }
111 }
112
113 pub fn with_service_definition_dir(mut self, root: impl Into<PathBuf>) -> Self {
115 self.service_definition_dir = root.into();
116 self
117 }
118
119 pub fn with_handoff_endpoint(mut self, endpoint: impl Into<String>) -> Self {
122 self.handoff_endpoint = Some(endpoint.into());
123 self
124 }
125
126 pub fn connection_limit(&self) -> ControlSocketConnectionLimit {
128 self.max_connections.map_or(
129 ControlSocketConnectionLimit::Unbounded,
130 ControlSocketConnectionLimit::Bounded,
131 )
132 }
133}
134
135impl BrokerLaunchServeConfig {
136 pub fn new(
139 socket_path: impl Into<String>,
140 max_connections: usize,
141 ) -> Result<Self, BrokerServeError> {
142 Ok(Self {
143 socket_path: socket_path.into(),
144 service_definition_dir: service_definition_dir(),
145 max_connections: Some(
146 NonZeroUsize::new(max_connections)
147 .ok_or(BrokerServeError::InvalidMaxConnections)?,
148 ),
149 })
150 }
151
152 pub fn unbounded(socket_path: impl Into<String>) -> Self {
155 Self {
156 socket_path: socket_path.into(),
157 service_definition_dir: service_definition_dir(),
158 max_connections: None,
159 }
160 }
161
162 pub fn with_service_definition_dir(mut self, root: impl Into<PathBuf>) -> Self {
164 self.service_definition_dir = root.into();
165 self
166 }
167
168 pub fn connection_limit(&self) -> ControlSocketConnectionLimit {
170 self.max_connections.map_or(
171 ControlSocketConnectionLimit::Unbounded,
172 ControlSocketConnectionLimit::Bounded,
173 )
174 }
175}
176
177pub fn serve_registered_backend(config: BrokerServeConfig) -> Result<(), BrokerServeError> {
179 let RegisteredServeBackend {
180 loader,
181 registry,
182 instance,
183 ..
184 } = build_registered_backend(&config)?;
185 let registry = Mutex::new(registry);
186 let router = HelloRouter::with_lifecycle_monitor(&loader, ®istry);
187 let peer_policy =
188 PeerCredentialPolicy::current_user().ok_or(BrokerServeError::PeerPolicyUnavailable)?;
189 let started_at = Instant::now();
190 let fd_guard = FdPressureGuard::default();
191 let snapshot_provider = || {
192 let registry = registry
193 .lock()
194 .unwrap_or_else(|poisoned| poisoned.into_inner());
195 let demoted = fd_guard.is_demoted();
196 AdminSnapshot::from_registry(
197 instance.id(),
198 started_at.elapsed(),
199 !demoted,
200 0,
201 ®istry,
202 &[],
203 )
204 .with_fd_pressure_demoted(demoted)
205 };
206 serve_control_socket_connections_with_limit_policy_post_hello_and_fd_guard(
207 &config.socket_path,
208 &router,
209 snapshot_provider,
210 config.connection_limit(),
211 &peer_policy,
212 |stream, reply| {
213 let Some(handoff_endpoint) = config.handoff_endpoint.as_deref() else {
215 return;
216 };
217 let ctx = ServeHandoffContext {
218 handoff_endpoint,
219 service_name: &config.service_name,
220 service_version: &config.service_version,
221 instance: &instance,
222 registry: ®istry,
223 };
224 complete_negotiated_handoff(&ctx, stream, reply);
225 },
226 &fd_guard,
227 )?;
228 Ok(())
229}
230
231pub fn serve_launching_backends(config: BrokerLaunchServeConfig) -> Result<(), BrokerServeError> {
234 let launcher = CommandBackendLauncher::for_current_user()?;
235 serve_launching_backends_with_launcher(config, &launcher)
236}
237
238pub fn serve_launching_backends_with_launcher(
240 config: BrokerLaunchServeConfig,
241 launcher: &dyn BackendLauncher,
242) -> Result<(), BrokerServeError> {
243 let loader = ServiceDefinitionLoader::new(&config.service_definition_dir);
244 let registry = Mutex::new(BackendRegistry::new());
245 let spawn_coordinator = Mutex::new(SpawnCoordinator::new());
246 let router = HelloRouter::with_lifecycle_monitor(&loader, ®istry)
247 .with_spawn_coordinator(&spawn_coordinator)
248 .with_backend_launcher(launcher);
249 let peer_policy =
250 PeerCredentialPolicy::current_user().ok_or(BrokerServeError::PeerPolicyUnavailable)?;
251 let started_at = Instant::now();
252 let fd_guard = FdPressureGuard::default();
253 let snapshot_provider = || {
254 let registry = registry
255 .lock()
256 .unwrap_or_else(|poisoned| poisoned.into_inner());
257 let demoted = fd_guard.is_demoted();
258 AdminSnapshot::from_registry("launch", started_at.elapsed(), !demoted, 0, ®istry, &[])
259 .with_fd_pressure_demoted(demoted)
260 };
261 serve_control_socket_connections_with_limit_policy_post_hello_and_fd_guard(
262 &config.socket_path,
263 &router,
264 snapshot_provider,
265 config.connection_limit(),
266 &peer_policy,
267 |_stream, _reply| {},
268 &fd_guard,
269 )?;
270 Ok(())
271}
272
273pub fn build_hello_handler(config: &BrokerServeConfig) -> Result<HelloHandler, BrokerServeError> {
275 let registered = build_registered_backend(config)?;
276 let backend = registered
277 .registry
278 .registered_backend_for(
279 ®istered.instance,
280 ®istered.service_definition,
281 &config.service_version,
282 )
283 .ok_or(BrokerServeError::RegisteredBackendMissing)?;
284
285 Ok(HelloHandler::new().with_backend(backend)?)
286}
287
288struct RegisteredServeBackend {
289 loader: ServiceDefinitionLoader,
290 registry: BackendRegistry,
291 instance: BrokerInstanceKey,
292 service_definition: ServiceDefinition,
293}
294
295fn build_registered_backend(
296 config: &BrokerServeConfig,
297) -> Result<RegisteredServeBackend, BrokerServeError> {
298 if config.backend_endpoint.is_empty() {
299 return Err(BrokerServeError::EmptyBackendEndpoint);
300 }
301
302 let loader = ServiceDefinitionLoader::new(&config.service_definition_dir);
303 let service_definition = loader.lookup_or_reload(&config.service_name)?;
304 check_version_allowed(&config.service_version, &service_definition)
305 .map_err(BrokerServeError::VersionPolicy)?;
306
307 let instance = BrokerInstanceKey::from_service_definition(&service_definition)?;
308 let endpoint = Endpoint {
309 namespace_id: instance.id(),
310 path: config.backend_endpoint.clone(),
311 };
312 let daemon = DaemonProcess::current_process(endpoint.clone(), Some(30))?;
313 let handle = BackendHandle::probe_with_service(
314 config.service_name.clone(),
315 config.service_version.clone(),
316 &endpoint,
317 &daemon,
318 )?;
319
320 let mut registry = BackendRegistry::new();
321 registry.insert(instance.clone(), handle);
322
323 Ok(RegisteredServeBackend {
324 loader,
325 registry,
326 instance,
327 service_definition,
328 })
329}
330
331#[derive(Debug, thiserror::Error)]
333pub enum BrokerServeError {
334 #[error("max_connections must be greater than zero")]
336 InvalidMaxConnections,
337 #[error("backend endpoint must not be empty")]
339 EmptyBackendEndpoint,
340 #[error(transparent)]
342 ServiceDefinition(#[from] ServiceDefinitionError),
343 #[error(transparent)]
345 BrokerInstance(#[from] BrokerInstanceError),
346 #[error(transparent)]
348 Identity(#[from] IdentityError),
349 #[error(transparent)]
351 Sid(#[from] SidError),
352 #[error("configured service version is blocked by service-definition policy: {0:?}")]
354 VersionPolicy(VersionPolicyBlock),
355 #[error(transparent)]
357 BackendHandle(#[from] BackendHandleError),
358 #[error("registered backend was missing after registry insert")]
360 RegisteredBackendMissing,
361 #[error(transparent)]
363 HelloHandler(#[from] HelloHandlerError),
364 #[error("current-user peer credential policy is unavailable")]
366 PeerPolicyUnavailable,
367 #[error(transparent)]
369 Connection(#[from] BrokerConnectionError),
370 #[error(transparent)]
372 ControlSocket(#[from] ControlSocketError),
373}