Skip to main content

running_process/broker/server/
serve.rs

1//! Broker serve-mode wiring for registered and launch-backed backends.
2//!
3//! Phase 4 grows the long-lived daemon incrementally. This module connects the
4//! existing service-definition loader, broker instance routing, backend
5//! registry, backend launch coordination, and framed local-socket accept loop.
6//! Tests can still request bounded runs while the CLI defaults to accepting
7//! until process exit.
8
9use std::num::NonZeroUsize;
10use std::path::PathBuf;
11use std::sync::Mutex;
12use std::time::Instant;
13
14use crate::broker::backend_handle::{BackendHandle, BackendHandleError, DaemonProcess};
15use crate::broker::backend_lifecycle::identity::IdentityError;
16use crate::broker::lifecycle::sid::SidError;
17use crate::broker::protocol::{Endpoint, ServiceDefinition};
18
19use super::admin::AdminSnapshot;
20use super::backend_launcher::{BackendLauncher, CommandBackendLauncher};
21use super::backend_registry::BackendRegistry;
22use super::connection::{BrokerConnectionError, PeerCredentialPolicy};
23use super::control_socket::{
24    serve_control_socket_connections_with_limit_policy_post_hello_and_fd_guard,
25    ControlSocketConnectionLimit, ControlSocketError,
26};
27use super::fd_pressure::FdPressureGuard;
28use super::handoff_serve::{complete_negotiated_handoff, ServeHandoffContext};
29use super::hello_handler::{HelloHandler, HelloHandlerError};
30use super::hello_router::HelloRouter;
31use super::instance::{BrokerInstanceError, BrokerInstanceKey};
32use super::service_def_loader::{
33    service_definition_dir, ServiceDefinitionError, ServiceDefinitionLoader,
34};
35use super::spawn_coordinator::SpawnCoordinator;
36use super::version_allow_list::{check_version_allowed, VersionPolicyBlock};
37
/// Settings for serve mode backed by a single, pre-registered backend.
///
/// `max_connections` decides whether the accept loop is bounded or runs
/// without a connection limit.
#[derive(Debug, Clone)]
pub struct BrokerServeConfig {
    /// Where the control socket binds: a local socket path, or a pipe name on
    /// Windows.
    pub socket_path: String,
    /// Name of the service whose definition is loaded.
    pub service_name: String,
    /// Version of the backend registered for Hello negotiation.
    pub service_version: String,
    /// Direct backend endpoint handed back to negotiated clients.
    pub backend_endpoint: String,
    /// Directory holding the `<service>.servicedef` protobuf files.
    pub service_definition_dir: PathBuf,
    /// Number of control-socket connections to accept before returning;
    /// `None` leaves the accept loop unbounded.
    pub max_connections: Option<NonZeroUsize>,
    /// Backend handoff endpoint for the Phase 6 handle-passing optimization
    /// (#387). Handoff is opt-in: with `None` (the default) it is disabled
    /// and negotiated clients always reconnect through `backend_endpoint`,
    /// matching the Phase 6 gate in `docs/v1-rollout-policy.md`.
    pub handoff_endpoint: Option<String>,
}
59
/// Settings for the serve mode that launches backends on a Hello miss.
#[derive(Debug, Clone)]
pub struct BrokerLaunchServeConfig {
    /// Where the control socket binds: a local socket path, or a pipe name on
    /// Windows.
    pub socket_path: String,
    /// Directory holding the `<service>.servicedef` protobuf files.
    pub service_definition_dir: PathBuf,
    /// Number of control-socket connections to accept before returning;
    /// `None` leaves the accept loop unbounded.
    pub max_connections: Option<NonZeroUsize>,
}
70
71impl BrokerServeConfig {
72    /// Build a serve config using the platform service-definition directory.
73    pub fn new(
74        socket_path: impl Into<String>,
75        service_name: impl Into<String>,
76        service_version: impl Into<String>,
77        backend_endpoint: impl Into<String>,
78        max_connections: usize,
79    ) -> Result<Self, BrokerServeError> {
80        Ok(Self {
81            socket_path: socket_path.into(),
82            service_name: service_name.into(),
83            service_version: service_version.into(),
84            backend_endpoint: backend_endpoint.into(),
85            service_definition_dir: service_definition_dir(),
86            max_connections: Some(
87                NonZeroUsize::new(max_connections)
88                    .ok_or(BrokerServeError::InvalidMaxConnections)?,
89            ),
90            handoff_endpoint: None,
91        })
92    }
93
94    /// Build an unbounded serve config using the platform service-definition
95    /// directory.
96    pub fn unbounded(
97        socket_path: impl Into<String>,
98        service_name: impl Into<String>,
99        service_version: impl Into<String>,
100        backend_endpoint: impl Into<String>,
101    ) -> Self {
102        Self {
103            socket_path: socket_path.into(),
104            service_name: service_name.into(),
105            service_version: service_version.into(),
106            backend_endpoint: backend_endpoint.into(),
107            service_definition_dir: service_definition_dir(),
108            max_connections: None,
109            handoff_endpoint: None,
110        }
111    }
112
113    /// Override the service-definition directory.
114    pub fn with_service_definition_dir(mut self, root: impl Into<PathBuf>) -> Self {
115        self.service_definition_dir = root.into();
116        self
117    }
118
119    /// Opt in to the Phase 6 handle-passing handoff by configuring the
120    /// backend handoff endpoint the broker dials after negotiation (#387).
121    pub fn with_handoff_endpoint(mut self, endpoint: impl Into<String>) -> Self {
122        self.handoff_endpoint = Some(endpoint.into());
123        self
124    }
125
126    /// Return the configured accept-loop connection limit.
127    pub fn connection_limit(&self) -> ControlSocketConnectionLimit {
128        self.max_connections.map_or(
129            ControlSocketConnectionLimit::Unbounded,
130            ControlSocketConnectionLimit::Bounded,
131        )
132    }
133}
134
135impl BrokerLaunchServeConfig {
136    /// Build a launch-backed serve config using the platform
137    /// service-definition directory.
138    pub fn new(
139        socket_path: impl Into<String>,
140        max_connections: usize,
141    ) -> Result<Self, BrokerServeError> {
142        Ok(Self {
143            socket_path: socket_path.into(),
144            service_definition_dir: service_definition_dir(),
145            max_connections: Some(
146                NonZeroUsize::new(max_connections)
147                    .ok_or(BrokerServeError::InvalidMaxConnections)?,
148            ),
149        })
150    }
151
152    /// Build an unbounded launch-backed serve config using the platform
153    /// service-definition directory.
154    pub fn unbounded(socket_path: impl Into<String>) -> Self {
155        Self {
156            socket_path: socket_path.into(),
157            service_definition_dir: service_definition_dir(),
158            max_connections: None,
159        }
160    }
161
162    /// Override the service-definition directory.
163    pub fn with_service_definition_dir(mut self, root: impl Into<PathBuf>) -> Self {
164        self.service_definition_dir = root.into();
165        self
166    }
167
168    /// Return the configured accept-loop connection limit.
169    pub fn connection_limit(&self) -> ControlSocketConnectionLimit {
170        self.max_connections.map_or(
171            ControlSocketConnectionLimit::Unbounded,
172            ControlSocketConnectionLimit::Bounded,
173        )
174    }
175}
176
177/// Serve a bounded number of broker Hello connections.
178pub fn serve_registered_backend(config: BrokerServeConfig) -> Result<(), BrokerServeError> {
179    let RegisteredServeBackend {
180        loader,
181        registry,
182        instance,
183        ..
184    } = build_registered_backend(&config)?;
185    let registry = Mutex::new(registry);
186    let router = HelloRouter::with_lifecycle_monitor(&loader, &registry);
187    let peer_policy =
188        PeerCredentialPolicy::current_user().ok_or(BrokerServeError::PeerPolicyUnavailable)?;
189    let started_at = Instant::now();
190    let fd_guard = FdPressureGuard::default();
191    let snapshot_provider = || {
192        let registry = registry
193            .lock()
194            .unwrap_or_else(|poisoned| poisoned.into_inner());
195        let demoted = fd_guard.is_demoted();
196        AdminSnapshot::from_registry(
197            instance.id(),
198            started_at.elapsed(),
199            !demoted,
200            0,
201            &registry,
202            &[],
203        )
204        .with_fd_pressure_demoted(demoted)
205    };
206    serve_control_socket_connections_with_limit_policy_post_hello_and_fd_guard(
207        &config.socket_path,
208        &router,
209        snapshot_provider,
210        config.connection_limit(),
211        &peer_policy,
212        |stream, reply| {
213            // Off by default: no handoff endpoint means no handoff attempt.
214            let Some(handoff_endpoint) = config.handoff_endpoint.as_deref() else {
215                return;
216            };
217            let ctx = ServeHandoffContext {
218                handoff_endpoint,
219                service_name: &config.service_name,
220                service_version: &config.service_version,
221                instance: &instance,
222                registry: &registry,
223            };
224            complete_negotiated_handoff(&ctx, stream, reply);
225        },
226        &fd_guard,
227    )?;
228    Ok(())
229}
230
231/// Serve a bounded number of broker Hello connections, launching backends on
232/// verified registry misses.
233pub fn serve_launching_backends(config: BrokerLaunchServeConfig) -> Result<(), BrokerServeError> {
234    let launcher = CommandBackendLauncher::for_current_user()?;
235    serve_launching_backends_with_launcher(config, &launcher)
236}
237
238/// Testable launch-backed serve mode with an injected launcher.
239pub fn serve_launching_backends_with_launcher(
240    config: BrokerLaunchServeConfig,
241    launcher: &dyn BackendLauncher,
242) -> Result<(), BrokerServeError> {
243    let loader = ServiceDefinitionLoader::new(&config.service_definition_dir);
244    let registry = Mutex::new(BackendRegistry::new());
245    let spawn_coordinator = Mutex::new(SpawnCoordinator::new());
246    let router = HelloRouter::with_lifecycle_monitor(&loader, &registry)
247        .with_spawn_coordinator(&spawn_coordinator)
248        .with_backend_launcher(launcher);
249    let peer_policy =
250        PeerCredentialPolicy::current_user().ok_or(BrokerServeError::PeerPolicyUnavailable)?;
251    let started_at = Instant::now();
252    let fd_guard = FdPressureGuard::default();
253    let snapshot_provider = || {
254        let registry = registry
255            .lock()
256            .unwrap_or_else(|poisoned| poisoned.into_inner());
257        let demoted = fd_guard.is_demoted();
258        AdminSnapshot::from_registry("launch", started_at.elapsed(), !demoted, 0, &registry, &[])
259            .with_fd_pressure_demoted(demoted)
260    };
261    serve_control_socket_connections_with_limit_policy_post_hello_and_fd_guard(
262        &config.socket_path,
263        &router,
264        snapshot_provider,
265        config.connection_limit(),
266        &peer_policy,
267        |_stream, _reply| {},
268        &fd_guard,
269    )?;
270    Ok(())
271}
272
273/// Build a Hello handler from one service definition and backend endpoint.
274pub fn build_hello_handler(config: &BrokerServeConfig) -> Result<HelloHandler, BrokerServeError> {
275    let registered = build_registered_backend(config)?;
276    let backend = registered
277        .registry
278        .registered_backend_for(
279            &registered.instance,
280            &registered.service_definition,
281            &config.service_version,
282        )
283        .ok_or(BrokerServeError::RegisteredBackendMissing)?;
284
285    Ok(HelloHandler::new().with_backend(backend)?)
286}
287
/// Everything `build_registered_backend` produces for one configured backend.
struct RegisteredServeBackend {
    /// Loader rooted at the configured service-definition directory.
    loader: ServiceDefinitionLoader,
    /// Registry holding the single probed backend handle.
    registry: BackendRegistry,
    /// Broker instance key derived from the service definition.
    instance: BrokerInstanceKey,
    /// The service definition that was loaded for the configured service.
    service_definition: ServiceDefinition,
}
294
295fn build_registered_backend(
296    config: &BrokerServeConfig,
297) -> Result<RegisteredServeBackend, BrokerServeError> {
298    if config.backend_endpoint.is_empty() {
299        return Err(BrokerServeError::EmptyBackendEndpoint);
300    }
301
302    let loader = ServiceDefinitionLoader::new(&config.service_definition_dir);
303    let service_definition = loader.lookup_or_reload(&config.service_name)?;
304    check_version_allowed(&config.service_version, &service_definition)
305        .map_err(BrokerServeError::VersionPolicy)?;
306
307    let instance = BrokerInstanceKey::from_service_definition(&service_definition)?;
308    let endpoint = Endpoint {
309        namespace_id: instance.id(),
310        path: config.backend_endpoint.clone(),
311    };
312    let daemon = DaemonProcess::current_process(endpoint.clone(), Some(30))?;
313    let handle = BackendHandle::probe_with_service(
314        config.service_name.clone(),
315        config.service_version.clone(),
316        &endpoint,
317        &daemon,
318    )?;
319
320    let mut registry = BackendRegistry::new();
321    registry.insert(instance.clone(), handle);
322
323    Ok(RegisteredServeBackend {
324        loader,
325        registry,
326        instance,
327        service_definition,
328    })
329}
330
/// Errors raised while wiring or running broker serve mode, whether the
/// accept loop is bounded or unbounded.
#[derive(Debug, thiserror::Error)]
pub enum BrokerServeError {
    /// A bounded config was requested with a connection limit of zero.
    #[error("max_connections must be greater than zero")]
    InvalidMaxConnections,
    /// The configured backend endpoint is the empty string.
    #[error("backend endpoint must not be empty")]
    EmptyBackendEndpoint,
    /// Service-definition load or validation failed.
    #[error(transparent)]
    ServiceDefinition(#[from] ServiceDefinitionError),
    /// Service isolation could not be mapped to a broker instance.
    #[error(transparent)]
    BrokerInstance(#[from] BrokerInstanceError),
    /// Current process identity could not be recorded for the configured backend.
    #[error(transparent)]
    Identity(#[from] IdentityError),
    /// Current user SID hash could not be computed for backend endpoint allocation.
    #[error(transparent)]
    Sid(#[from] SidError),
    /// Configured backend version is blocked by the service definition; the
    /// payload carries the policy block that rejected it.
    #[error("configured service version is blocked by service-definition policy: {0:?}")]
    VersionPolicy(VersionPolicyBlock),
    /// Backend identity verification failed.
    #[error(transparent)]
    BackendHandle(#[from] BackendHandleError),
    /// Registry lookup failed after inserting the configured backend.
    #[error("registered backend was missing after registry insert")]
    RegisteredBackendMissing,
    /// Hello handler construction failed.
    #[error(transparent)]
    HelloHandler(#[from] HelloHandlerError),
    /// The platform current-user peer policy could not be constructed.
    #[error("current-user peer credential policy is unavailable")]
    PeerPolicyUnavailable,
    /// Local-socket serving failed.
    #[error(transparent)]
    Connection(#[from] BrokerConnectionError),
    /// Shared control-socket serving failed.
    #[error(transparent)]
    ControlSocket(#[from] ControlSocketError),
}