use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Mutex;
use std::time::Instant;
use crate::broker::backend_handle::{BackendHandle, BackendHandleError, DaemonProcess};
use crate::broker::backend_lifecycle::identity::IdentityError;
use crate::broker::lifecycle::sid::SidError;
use crate::broker::protocol::{Endpoint, ServiceDefinition};
use super::admin::AdminSnapshot;
use super::backend_launcher::{BackendLauncher, CommandBackendLauncher};
use super::backend_registry::BackendRegistry;
use super::connection::{BrokerConnectionError, PeerCredentialPolicy};
use super::control_socket::{
serve_control_socket_connections_with_limit_policy_post_hello_and_fd_guard,
ControlSocketConnectionLimit, ControlSocketError,
};
use super::fd_pressure::FdPressureGuard;
use super::handoff_serve::{complete_negotiated_handoff, ServeHandoffContext};
use super::hello_handler::{HelloHandler, HelloHandlerError};
use super::hello_router::HelloRouter;
use super::instance::{BrokerInstanceError, BrokerInstanceKey};
use super::service_def_loader::{
service_definition_dir, ServiceDefinitionError, ServiceDefinitionLoader,
};
use super::spawn_coordinator::SpawnCoordinator;
use super::version_allow_list::{check_version_allowed, VersionPolicyBlock};
/// Configuration consumed by `serve_registered_backend` and `build_hello_handler`.
///
/// Construct it with `BrokerServeConfig::new` (bounded connection count) or
/// `BrokerServeConfig::unbounded`, then refine it with the `with_*` builders.
#[derive(Clone, Debug)]
pub struct BrokerServeConfig {
/// Control-socket path passed to the control-socket serve loop.
pub socket_path: String,
/// Service name used to look up the service definition and to probe the backend.
pub service_name: String,
/// Service version; checked against the service definition's version policy.
pub service_version: String,
/// Backend endpoint path; an empty value is rejected with
/// `BrokerServeError::EmptyBackendEndpoint`.
pub backend_endpoint: String,
/// Directory given to the `ServiceDefinitionLoader`.
pub service_definition_dir: PathBuf,
/// Connection cap; `None` maps to `ControlSocketConnectionLimit::Unbounded`.
pub max_connections: Option<NonZeroUsize>,
/// Optional handoff endpoint; when `None`, the post-hello handoff step is skipped.
pub handoff_endpoint: Option<String>,
}
/// Configuration consumed by `serve_launching_backends` and
/// `serve_launching_backends_with_launcher`.
#[derive(Clone, Debug)]
pub struct BrokerLaunchServeConfig {
/// Control-socket path passed to the control-socket serve loop.
pub socket_path: String,
/// Directory given to the `ServiceDefinitionLoader`.
pub service_definition_dir: PathBuf,
/// Connection cap; `None` maps to `ControlSocketConnectionLimit::Unbounded`.
pub max_connections: Option<NonZeroUsize>,
}
impl BrokerServeConfig {
    /// Creates a configuration limited to `max_connections` simultaneous connections.
    ///
    /// The service-definition directory defaults to `service_definition_dir()` and
    /// no handoff endpoint is configured.
    ///
    /// # Errors
    ///
    /// Returns [`BrokerServeError::InvalidMaxConnections`] when `max_connections` is zero.
    pub fn new(
        socket_path: impl Into<String>,
        service_name: impl Into<String>,
        service_version: impl Into<String>,
        backend_endpoint: impl Into<String>,
        max_connections: usize,
    ) -> Result<Self, BrokerServeError> {
        // Start from the unbounded form, then validate and attach the cap.
        let mut config =
            Self::unbounded(socket_path, service_name, service_version, backend_endpoint);
        let limit =
            NonZeroUsize::new(max_connections).ok_or(BrokerServeError::InvalidMaxConnections)?;
        config.max_connections = Some(limit);
        Ok(config)
    }

    /// Creates a configuration with no connection cap.
    ///
    /// The service-definition directory defaults to `service_definition_dir()` and
    /// no handoff endpoint is configured.
    pub fn unbounded(
        socket_path: impl Into<String>,
        service_name: impl Into<String>,
        service_version: impl Into<String>,
        backend_endpoint: impl Into<String>,
    ) -> Self {
        let socket_path = socket_path.into();
        let service_name = service_name.into();
        let service_version = service_version.into();
        let backend_endpoint = backend_endpoint.into();
        let definition_root = service_definition_dir();
        Self {
            socket_path,
            service_name,
            service_version,
            backend_endpoint,
            service_definition_dir: definition_root,
            max_connections: None,
            handoff_endpoint: None,
        }
    }

    /// Replaces the service-definition directory with `root`.
    pub fn with_service_definition_dir(self, root: impl Into<PathBuf>) -> Self {
        Self {
            service_definition_dir: root.into(),
            ..self
        }
    }

    /// Sets the handoff endpoint to `endpoint`.
    pub fn with_handoff_endpoint(self, endpoint: impl Into<String>) -> Self {
        Self {
            handoff_endpoint: Some(endpoint.into()),
            ..self
        }
    }

    /// Translates `max_connections` into the control-socket connection limit.
    pub fn connection_limit(&self) -> ControlSocketConnectionLimit {
        match self.max_connections {
            Some(limit) => ControlSocketConnectionLimit::Bounded(limit),
            None => ControlSocketConnectionLimit::Unbounded,
        }
    }
}
impl BrokerLaunchServeConfig {
    /// Creates a configuration limited to `max_connections` simultaneous connections.
    ///
    /// The service-definition directory defaults to `service_definition_dir()`.
    ///
    /// # Errors
    ///
    /// Returns [`BrokerServeError::InvalidMaxConnections`] when `max_connections` is zero.
    pub fn new(
        socket_path: impl Into<String>,
        max_connections: usize,
    ) -> Result<Self, BrokerServeError> {
        // Start from the unbounded form, then validate and attach the cap.
        let mut config = Self::unbounded(socket_path);
        let limit =
            NonZeroUsize::new(max_connections).ok_or(BrokerServeError::InvalidMaxConnections)?;
        config.max_connections = Some(limit);
        Ok(config)
    }

    /// Creates a configuration with no connection cap.
    pub fn unbounded(socket_path: impl Into<String>) -> Self {
        let socket_path = socket_path.into();
        let definition_root = service_definition_dir();
        Self {
            socket_path,
            service_definition_dir: definition_root,
            max_connections: None,
        }
    }

    /// Replaces the service-definition directory with `root`.
    pub fn with_service_definition_dir(self, root: impl Into<PathBuf>) -> Self {
        Self {
            service_definition_dir: root.into(),
            ..self
        }
    }

    /// Translates `max_connections` into the control-socket connection limit.
    pub fn connection_limit(&self) -> ControlSocketConnectionLimit {
        match self.max_connections {
            Some(limit) => ControlSocketConnectionLimit::Bounded(limit),
            None => ControlSocketConnectionLimit::Unbounded,
        }
    }
}
/// Serves the broker control socket for the single backend described by `config`.
///
/// The backend is registered through `build_registered_backend`, after which the
/// control-socket serve loop is run with the configured connection limit and a
/// peer-credential policy for the current user. When `config.handoff_endpoint`
/// is set, each post-hello callback completes a negotiated handoff to it.
///
/// # Errors
///
/// Returns any error from `build_registered_backend`,
/// [`BrokerServeError::PeerPolicyUnavailable`] when no current-user peer policy
/// can be obtained, or the error reported by the control-socket serve loop.
pub fn serve_registered_backend(config: BrokerServeConfig) -> Result<(), BrokerServeError> {
    let RegisteredServeBackend {
        loader,
        registry,
        instance,
        ..
    } = build_registered_backend(&config)?;
    // The registry is borrowed by the router, the snapshot closure and the
    // handoff callback, so it is placed behind a mutex.
    let registry = Mutex::new(registry);
    let router = HelloRouter::with_lifecycle_monitor(&loader, &registry);
    let peer_policy =
        PeerCredentialPolicy::current_user().ok_or(BrokerServeError::PeerPolicyUnavailable)?;
    let started_at = Instant::now();
    let fd_guard = FdPressureGuard::default();
    let snapshot_provider = || {
        // A poisoned lock is recovered rather than propagated: the snapshot is
        // still produced from whatever state the registry holds.
        let registry = registry
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let demoted = fd_guard.is_demoted();
        // NOTE(review): the literal `0` and `&[]` arguments are fixed here; their
        // meaning is defined by `AdminSnapshot::from_registry` — confirm there.
        AdminSnapshot::from_registry(
            instance.id(),
            started_at.elapsed(),
            !demoted,
            0,
            &registry,
            &[],
        )
        .with_fd_pressure_demoted(demoted)
    };
    serve_control_socket_connections_with_limit_policy_post_hello_and_fd_guard(
        &config.socket_path,
        &router,
        snapshot_provider,
        config.connection_limit(),
        &peer_policy,
        |stream, reply| {
            // Without a configured handoff endpoint there is nothing to do post-hello.
            let Some(handoff_endpoint) = config.handoff_endpoint.as_deref() else {
                return;
            };
            let ctx = ServeHandoffContext {
                handoff_endpoint,
                service_name: &config.service_name,
                service_version: &config.service_version,
                instance: &instance,
                registry: &registry,
            };
            complete_negotiated_handoff(&ctx, stream, reply);
        },
        &fd_guard,
    )?;
    Ok(())
}
/// Serves the control socket using a `CommandBackendLauncher` built for the current user.
///
/// # Errors
///
/// Returns the launcher construction error (converted into [`BrokerServeError`])
/// or any error from `serve_launching_backends_with_launcher`.
pub fn serve_launching_backends(config: BrokerLaunchServeConfig) -> Result<(), BrokerServeError> {
    match CommandBackendLauncher::for_current_user() {
        Ok(launcher) => serve_launching_backends_with_launcher(config, &launcher),
        Err(err) => Err(err.into()),
    }
}
/// Serves the control socket, starting from an empty backend registry and
/// routing hellos through `launcher` and a spawn coordinator.
///
/// No post-hello handoff is performed in this mode: the post-hello callback is
/// a no-op.
///
/// # Errors
///
/// Returns [`BrokerServeError::PeerPolicyUnavailable`] when no current-user peer
/// policy can be obtained, or the error reported by the control-socket serve loop.
pub fn serve_launching_backends_with_launcher(
    config: BrokerLaunchServeConfig,
    launcher: &dyn BackendLauncher,
) -> Result<(), BrokerServeError> {
    let loader = ServiceDefinitionLoader::new(&config.service_definition_dir);
    let registry = Mutex::new(BackendRegistry::new());
    let spawn_coordinator = Mutex::new(SpawnCoordinator::new());
    let router = HelloRouter::with_lifecycle_monitor(&loader, &registry)
        .with_spawn_coordinator(&spawn_coordinator)
        .with_backend_launcher(launcher);
    let peer_policy =
        PeerCredentialPolicy::current_user().ok_or(BrokerServeError::PeerPolicyUnavailable)?;
    let started_at = Instant::now();
    let fd_guard = FdPressureGuard::default();
    let snapshot_provider = || {
        // A poisoned lock is recovered rather than propagated: the snapshot is
        // still produced from whatever state the registry holds.
        let registry = registry
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let demoted = fd_guard.is_demoted();
        // This mode reports the fixed identifier "launch" instead of an instance id.
        // NOTE(review): the literal `0` and `&[]` arguments are fixed here; their
        // meaning is defined by `AdminSnapshot::from_registry` — confirm there.
        AdminSnapshot::from_registry("launch", started_at.elapsed(), !demoted, 0, &registry, &[])
            .with_fd_pressure_demoted(demoted)
    };
    serve_control_socket_connections_with_limit_policy_post_hello_and_fd_guard(
        &config.socket_path,
        &router,
        snapshot_provider,
        config.connection_limit(),
        &peer_policy,
        |_stream, _reply| {},
        &fd_guard,
    )?;
    Ok(())
}
/// Builds a `HelloHandler` bound to the backend registered for `config`.
///
/// # Errors
///
/// Returns any error from `build_registered_backend`,
/// [`BrokerServeError::RegisteredBackendMissing`] when the registry lookup for
/// the freshly inserted backend yields nothing, or the handler's own error when
/// attaching the backend fails.
pub fn build_hello_handler(config: &BrokerServeConfig) -> Result<HelloHandler, BrokerServeError> {
    let registered = build_registered_backend(config)?;
    let backend = registered
        .registry
        .registered_backend_for(
            &registered.instance,
            &registered.service_definition,
            &config.service_version,
        )
        .ok_or(BrokerServeError::RegisteredBackendMissing)?;
    Ok(HelloHandler::new().with_backend(backend)?)
}
/// Everything `build_registered_backend` produces for a single configured backend.
struct RegisteredServeBackend {
/// Loader rooted at the configured service-definition directory.
loader: ServiceDefinitionLoader,
/// Registry containing the one backend handle inserted during the build.
registry: BackendRegistry,
/// Instance key derived from the service definition; the backend is registered under it.
instance: BrokerInstanceKey,
/// Service definition looked up for the configured service name.
service_definition: ServiceDefinition,
}
fn build_registered_backend(
config: &BrokerServeConfig,
) -> Result<RegisteredServeBackend, BrokerServeError> {
if config.backend_endpoint.is_empty() {
return Err(BrokerServeError::EmptyBackendEndpoint);
}
let loader = ServiceDefinitionLoader::new(&config.service_definition_dir);
let service_definition = loader.lookup_or_reload(&config.service_name)?;
check_version_allowed(&config.service_version, &service_definition)
.map_err(BrokerServeError::VersionPolicy)?;
let instance = BrokerInstanceKey::from_service_definition(&service_definition)?;
let endpoint = Endpoint {
namespace_id: instance.id(),
path: config.backend_endpoint.clone(),
};
let daemon = DaemonProcess::current_process(endpoint.clone(), Some(30))?;
let handle = BackendHandle::probe_with_service(
config.service_name.clone(),
config.service_version.clone(),
&endpoint,
&daemon,
)?;
let mut registry = BackendRegistry::new();
registry.insert(instance.clone(), handle);
Ok(RegisteredServeBackend {
loader,
registry,
instance,
service_definition,
})
}
/// Errors returned by the broker serve entry points and configuration constructors in this file.
#[derive(Debug, thiserror::Error)]
pub enum BrokerServeError {
/// A bounded configuration constructor was given `max_connections == 0`.
#[error("max_connections must be greater than zero")]
InvalidMaxConnections,
/// `BrokerServeConfig::backend_endpoint` was empty when registering the backend.
#[error("backend endpoint must not be empty")]
EmptyBackendEndpoint,
/// Wraps a `ServiceDefinitionError`; converted automatically by `?`.
#[error(transparent)]
ServiceDefinition(#[from] ServiceDefinitionError),
/// Wraps a `BrokerInstanceError`; converted automatically by `?`.
#[error(transparent)]
BrokerInstance(#[from] BrokerInstanceError),
/// Wraps an `IdentityError`; converted automatically by `?`.
#[error(transparent)]
Identity(#[from] IdentityError),
/// Wraps a `SidError`; converted automatically by `?`.
#[error(transparent)]
Sid(#[from] SidError),
/// `check_version_allowed` rejected the configured service version.
#[error("configured service version is blocked by service-definition policy: {0:?}")]
VersionPolicy(VersionPolicyBlock),
/// Wraps a `BackendHandleError`; converted automatically by `?`.
#[error(transparent)]
BackendHandle(#[from] BackendHandleError),
/// The registry lookup in `build_hello_handler` found no backend after registration.
#[error("registered backend was missing after registry insert")]
RegisteredBackendMissing,
/// Wraps a `HelloHandlerError`; converted automatically by `?`.
#[error(transparent)]
HelloHandler(#[from] HelloHandlerError),
/// `PeerCredentialPolicy::current_user()` returned `None`.
#[error("current-user peer credential policy is unavailable")]
PeerPolicyUnavailable,
/// Wraps a `BrokerConnectionError`; converted automatically by `?`.
#[error(transparent)]
Connection(#[from] BrokerConnectionError),
/// Wraps a `ControlSocketError`; converted automatically by `?`.
#[error(transparent)]
ControlSocket(#[from] ControlSocketError),
}