Skip to main content

liminal_server/server/
runtime.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use crate::ServerError;
5use crate::cluster::{self, ClusterHandle};
6use crate::config::file::load_config;
7use crate::config::types::ClusterConfig;
8use crate::health::{ReadinessState, SharedReadinessState, start_health_server};
9use crate::server::connection::ConnectionSupervisor;
10use crate::server::connection::services::{ChannelCluster, LiminalConnectionServices};
11use crate::server::listener::ServerListener;
12use crate::server::shutdown::{ShutdownHandle, register_signal_handlers, run_shutdown_sequence};
13
14/// Starts the server deployment wrapper for the supplied configuration path.
15///
16/// # Errors
17///
18/// Returns [`ServerError`] when a later server lifecycle phase fails.
19pub fn run(config_path: &Path) -> Result<(), ServerError> {
20    if config_path.as_os_str().is_empty() {
21        return Err(ServerError::ConfigLoad {
22            message: "configuration path is empty".to_owned(),
23        });
24    }
25
26    let config = load_config(config_path)?;
27
28    let readiness = SharedReadinessState::new(ReadinessState::default());
29    let health_server = start_health_server(config.health_listen_address, readiness.clone())?;
30    let shutdown_handle = ShutdownHandle::new();
31    let signal_registration = register_signal_handlers(shutdown_handle.clone())?;
32
33    // Build the library-backed services once so we can reach the shared (possibly
34    // clustered) channel supervisor before the connection supervisor takes
35    // ownership of the services as a trait object.
36    let services = Arc::new(LiminalConnectionServices::from_config(&config)?);
37    let channel_cluster = services.channel_cluster().clone();
38    let connection_supervisor = ConnectionSupervisor::with_services(services)?;
39
40    // SRV-005: start clustering on the channel-supervisor scheduler when a
41    // [cluster] section is configured. The returned handle owns the inbound
42    // distribution listener and the membership poll loop; it must outlive the
43    // server and is torn down in the shutdown sequence below.
44    let cluster_handle = match config.cluster.as_ref() {
45        Some(cluster_config) => Some(start_cluster(&channel_cluster, cluster_config)?),
46        None => None,
47    };
48
49    let mut listener = ServerListener::bind(&config, connection_supervisor)?;
50    readiness.set_config_loaded(true);
51    readiness.set_listener_bound(true);
52    readiness.set_cluster_configured(config.cluster.is_some());
53
54    tracing::debug!(
55        config_path = %config_path.display(),
56        listen_address = %config.listen_address,
57        health_listen_address = %health_server.local_addr(),
58        "liminal server configuration validated"
59    );
60
61    tracing::info!(
62        listen_address = %listener.local_addr(),
63        health_listen_address = %health_server.local_addr(),
64        "liminal server started"
65    );
66
67    shutdown_handle.wait();
68    readiness.set_listener_bound(false);
69
70    // Tear the cluster down before draining connections: stop accepting peer
71    // links and halt the membership poll loop. Each node shuts down independently
72    // (no cluster-wide coordinated shutdown — that boundary belongs to SRV-004).
73    if let Some(mut cluster_handle) = cluster_handle {
74        cluster_handle.shutdown();
75    }
76
77    let supervisor = listener.supervisor();
78    let shutdown_result = run_shutdown_sequence(&mut listener, &supervisor, config.drain_timeout());
79    drop(signal_registration);
80    health_server.shutdown()?;
81    shutdown_result
82}
83
84/// Starts clustering on the shared channel supervisor's scheduler (SRV-005).
85///
86/// Installs the cluster `sync` as the supervisor's [`ClusterObserver`] so channel
87/// subscribe/unsubscribe/publish events drive process-group membership and
88/// cross-node fan-out.
89fn start_cluster(
90    channel_cluster: &ChannelCluster,
91    cluster_config: &ClusterConfig,
92) -> Result<ClusterHandle, ServerError> {
93    let resolver = channel_cluster
94        .resolver()
95        .cloned()
96        .ok_or_else(|| ServerError::ClusterJoin {
97            message: "clustering configured but channel supervisor has no distribution resolver"
98                .to_owned(),
99        })?;
100    let scheduler = channel_cluster.supervisor().scheduler();
101    let supervisor = channel_cluster.supervisor().clone();
102    cluster::start(&scheduler, resolver, cluster_config, move |sync| {
103        supervisor.install_observer(Arc::new(sync));
104    })
105}