Skip to main content

running_process/broker/server/
serve.rs

1//! Broker serve-mode wiring for registered and launch-backed backends.
2//!
3//! Phase 4 grows the long-lived daemon incrementally. This module connects the
4//! existing service-definition loader, broker instance routing, backend
5//! registry, backend launch coordination, and framed local-socket accept loop.
6//! Tests can still request bounded runs while the CLI defaults to accepting
7//! until process exit.
8
9use std::num::NonZeroUsize;
10use std::path::PathBuf;
11use std::sync::Mutex;
12use std::time::Instant;
13
14use crate::broker::backend_handle::{BackendHandle, BackendHandleError, DaemonProcess};
15use crate::broker::backend_lifecycle::identity::IdentityError;
16use crate::broker::lifecycle::sid::SidError;
17use crate::broker::protocol::{Endpoint, ServiceDefinition};
18
19use super::admin::AdminSnapshot;
20use super::backend_launcher::{BackendLauncher, CommandBackendLauncher};
21use super::backend_registry::BackendRegistry;
22use super::connection::{BrokerConnectionError, PeerCredentialPolicy};
23use super::control_socket::{
24    serve_control_socket_connections_with_limit_and_policy, ControlSocketConnectionLimit,
25    ControlSocketError,
26};
27use super::hello_handler::{HelloHandler, HelloHandlerError};
28use super::hello_router::HelloRouter;
29use super::instance::{BrokerInstanceError, BrokerInstanceKey};
30use super::service_def_loader::{
31    service_definition_dir, ServiceDefinitionError, ServiceDefinitionLoader,
32};
33use super::spawn_coordinator::SpawnCoordinator;
34use super::version_allow_list::{check_version_allowed, VersionPolicyBlock};
35
36/// Configuration for a bounded broker serve-mode run.
37#[derive(Clone, Debug)]
38pub struct BrokerServeConfig {
39    /// Local socket path or Windows pipe name to bind.
40    pub socket_path: String,
41    /// Service definition to load.
42    pub service_name: String,
43    /// Backend version to register for Hello negotiation.
44    pub service_version: String,
45    /// Direct backend endpoint returned to negotiated clients.
46    pub backend_endpoint: String,
47    /// Directory containing `<service>.servicedef` protobuf files.
48    pub service_definition_dir: PathBuf,
49    /// Optional number of control-socket connections to accept before returning.
50    pub max_connections: Option<NonZeroUsize>,
51}
52
53/// Configuration for serve mode that launches backends on Hello miss.
54#[derive(Clone, Debug)]
55pub struct BrokerLaunchServeConfig {
56    /// Local socket path or Windows pipe name to bind.
57    pub socket_path: String,
58    /// Directory containing `<service>.servicedef` protobuf files.
59    pub service_definition_dir: PathBuf,
60    /// Optional number of control-socket connections to accept before returning.
61    pub max_connections: Option<NonZeroUsize>,
62}
63
64impl BrokerServeConfig {
65    /// Build a serve config using the platform service-definition directory.
66    pub fn new(
67        socket_path: impl Into<String>,
68        service_name: impl Into<String>,
69        service_version: impl Into<String>,
70        backend_endpoint: impl Into<String>,
71        max_connections: usize,
72    ) -> Result<Self, BrokerServeError> {
73        Ok(Self {
74            socket_path: socket_path.into(),
75            service_name: service_name.into(),
76            service_version: service_version.into(),
77            backend_endpoint: backend_endpoint.into(),
78            service_definition_dir: service_definition_dir(),
79            max_connections: Some(
80                NonZeroUsize::new(max_connections)
81                    .ok_or(BrokerServeError::InvalidMaxConnections)?,
82            ),
83        })
84    }
85
86    /// Build an unbounded serve config using the platform service-definition
87    /// directory.
88    pub fn unbounded(
89        socket_path: impl Into<String>,
90        service_name: impl Into<String>,
91        service_version: impl Into<String>,
92        backend_endpoint: impl Into<String>,
93    ) -> Self {
94        Self {
95            socket_path: socket_path.into(),
96            service_name: service_name.into(),
97            service_version: service_version.into(),
98            backend_endpoint: backend_endpoint.into(),
99            service_definition_dir: service_definition_dir(),
100            max_connections: None,
101        }
102    }
103
104    /// Override the service-definition directory.
105    pub fn with_service_definition_dir(mut self, root: impl Into<PathBuf>) -> Self {
106        self.service_definition_dir = root.into();
107        self
108    }
109
110    /// Return the configured accept-loop connection limit.
111    pub fn connection_limit(&self) -> ControlSocketConnectionLimit {
112        self.max_connections.map_or(
113            ControlSocketConnectionLimit::Unbounded,
114            ControlSocketConnectionLimit::Bounded,
115        )
116    }
117}
118
119impl BrokerLaunchServeConfig {
120    /// Build a launch-backed serve config using the platform
121    /// service-definition directory.
122    pub fn new(
123        socket_path: impl Into<String>,
124        max_connections: usize,
125    ) -> Result<Self, BrokerServeError> {
126        Ok(Self {
127            socket_path: socket_path.into(),
128            service_definition_dir: service_definition_dir(),
129            max_connections: Some(
130                NonZeroUsize::new(max_connections)
131                    .ok_or(BrokerServeError::InvalidMaxConnections)?,
132            ),
133        })
134    }
135
136    /// Build an unbounded launch-backed serve config using the platform
137    /// service-definition directory.
138    pub fn unbounded(socket_path: impl Into<String>) -> Self {
139        Self {
140            socket_path: socket_path.into(),
141            service_definition_dir: service_definition_dir(),
142            max_connections: None,
143        }
144    }
145
146    /// Override the service-definition directory.
147    pub fn with_service_definition_dir(mut self, root: impl Into<PathBuf>) -> Self {
148        self.service_definition_dir = root.into();
149        self
150    }
151
152    /// Return the configured accept-loop connection limit.
153    pub fn connection_limit(&self) -> ControlSocketConnectionLimit {
154        self.max_connections.map_or(
155            ControlSocketConnectionLimit::Unbounded,
156            ControlSocketConnectionLimit::Bounded,
157        )
158    }
159}
160
161/// Serve a bounded number of broker Hello connections.
162pub fn serve_registered_backend(config: BrokerServeConfig) -> Result<(), BrokerServeError> {
163    let RegisteredServeBackend {
164        loader,
165        registry,
166        instance,
167        ..
168    } = build_registered_backend(&config)?;
169    let registry = Mutex::new(registry);
170    let router = HelloRouter::with_lifecycle_monitor(&loader, &registry);
171    let peer_policy =
172        PeerCredentialPolicy::current_user().ok_or(BrokerServeError::PeerPolicyUnavailable)?;
173    let started_at = Instant::now();
174    let snapshot_provider = || {
175        let registry = registry
176            .lock()
177            .unwrap_or_else(|poisoned| poisoned.into_inner());
178        AdminSnapshot::from_registry(instance.id(), started_at.elapsed(), true, 0, &registry, &[])
179    };
180    serve_control_socket_connections_with_limit_and_policy(
181        &config.socket_path,
182        &router,
183        snapshot_provider,
184        config.connection_limit(),
185        &peer_policy,
186    )?;
187    Ok(())
188}
189
190/// Serve a bounded number of broker Hello connections, launching backends on
191/// verified registry misses.
192pub fn serve_launching_backends(config: BrokerLaunchServeConfig) -> Result<(), BrokerServeError> {
193    let launcher = CommandBackendLauncher::for_current_user()?;
194    serve_launching_backends_with_launcher(config, &launcher)
195}
196
197/// Testable launch-backed serve mode with an injected launcher.
198pub fn serve_launching_backends_with_launcher(
199    config: BrokerLaunchServeConfig,
200    launcher: &dyn BackendLauncher,
201) -> Result<(), BrokerServeError> {
202    let loader = ServiceDefinitionLoader::new(&config.service_definition_dir);
203    let registry = Mutex::new(BackendRegistry::new());
204    let spawn_coordinator = Mutex::new(SpawnCoordinator::new());
205    let router = HelloRouter::with_lifecycle_monitor(&loader, &registry)
206        .with_spawn_coordinator(&spawn_coordinator)
207        .with_backend_launcher(launcher);
208    let peer_policy =
209        PeerCredentialPolicy::current_user().ok_or(BrokerServeError::PeerPolicyUnavailable)?;
210    let started_at = Instant::now();
211    let snapshot_provider = || {
212        let registry = registry
213            .lock()
214            .unwrap_or_else(|poisoned| poisoned.into_inner());
215        AdminSnapshot::from_registry("launch", started_at.elapsed(), true, 0, &registry, &[])
216    };
217    serve_control_socket_connections_with_limit_and_policy(
218        &config.socket_path,
219        &router,
220        snapshot_provider,
221        config.connection_limit(),
222        &peer_policy,
223    )?;
224    Ok(())
225}
226
227/// Build a Hello handler from one service definition and backend endpoint.
228pub fn build_hello_handler(config: &BrokerServeConfig) -> Result<HelloHandler, BrokerServeError> {
229    let registered = build_registered_backend(config)?;
230    let backend = registered
231        .registry
232        .registered_backend_for(
233            &registered.instance,
234            &registered.service_definition,
235            &config.service_version,
236        )
237        .ok_or(BrokerServeError::RegisteredBackendMissing)?;
238
239    Ok(HelloHandler::new().with_backend(backend)?)
240}
241
242struct RegisteredServeBackend {
243    loader: ServiceDefinitionLoader,
244    registry: BackendRegistry,
245    instance: BrokerInstanceKey,
246    service_definition: ServiceDefinition,
247}
248
249fn build_registered_backend(
250    config: &BrokerServeConfig,
251) -> Result<RegisteredServeBackend, BrokerServeError> {
252    if config.backend_endpoint.is_empty() {
253        return Err(BrokerServeError::EmptyBackendEndpoint);
254    }
255
256    let loader = ServiceDefinitionLoader::new(&config.service_definition_dir);
257    let service_definition = loader.lookup_or_reload(&config.service_name)?;
258    check_version_allowed(&config.service_version, &service_definition)
259        .map_err(BrokerServeError::VersionPolicy)?;
260
261    let instance = BrokerInstanceKey::from_service_definition(&service_definition)?;
262    let endpoint = Endpoint {
263        namespace_id: instance.id(),
264        path: config.backend_endpoint.clone(),
265    };
266    let daemon = DaemonProcess::current_process(endpoint.clone(), Some(30))?;
267    let handle = BackendHandle::probe_with_service(
268        config.service_name.clone(),
269        config.service_version.clone(),
270        &endpoint,
271        &daemon,
272    )?;
273
274    let mut registry = BackendRegistry::new();
275    registry.insert(instance.clone(), handle);
276
277    Ok(RegisteredServeBackend {
278        loader,
279        registry,
280        instance,
281        service_definition,
282    })
283}
284
285/// Errors raised while wiring or serving the bounded broker.
286#[derive(Debug, thiserror::Error)]
287pub enum BrokerServeError {
288    /// The connection bound must be non-zero.
289    #[error("max_connections must be greater than zero")]
290    InvalidMaxConnections,
291    /// The configured backend endpoint is empty.
292    #[error("backend endpoint must not be empty")]
293    EmptyBackendEndpoint,
294    /// Service-definition load or validation failed.
295    #[error(transparent)]
296    ServiceDefinition(#[from] ServiceDefinitionError),
297    /// Service isolation could not be mapped to a broker instance.
298    #[error(transparent)]
299    BrokerInstance(#[from] BrokerInstanceError),
300    /// Current process identity could not be recorded for the configured backend.
301    #[error(transparent)]
302    Identity(#[from] IdentityError),
303    /// Current user SID hash could not be computed for backend endpoint allocation.
304    #[error(transparent)]
305    Sid(#[from] SidError),
306    /// Configured backend version is blocked by the service definition.
307    #[error("configured service version is blocked by service-definition policy: {0:?}")]
308    VersionPolicy(VersionPolicyBlock),
309    /// Backend identity verification failed.
310    #[error(transparent)]
311    BackendHandle(#[from] BackendHandleError),
312    /// Registry lookup failed after inserting the configured backend.
313    #[error("registered backend was missing after registry insert")]
314    RegisteredBackendMissing,
315    /// Hello handler construction failed.
316    #[error(transparent)]
317    HelloHandler(#[from] HelloHandlerError),
318    /// The platform current-user peer policy could not be constructed.
319    #[error("current-user peer credential policy is unavailable")]
320    PeerPolicyUnavailable,
321    /// Local-socket serving failed.
322    #[error(transparent)]
323    Connection(#[from] BrokerConnectionError),
324    /// Shared control-socket serving failed.
325    #[error(transparent)]
326    ControlSocket(#[from] ControlSocketError),
327}