liminal_server/server/
runtime.rs1use std::path::Path;
2use std::sync::Arc;
3
4use crate::ServerError;
5use crate::cluster::{self, ClusterHandle};
6use crate::config::file::load_config;
7use crate::config::types::ClusterConfig;
8use crate::health::{ReadinessState, SharedReadinessState, start_health_server};
9use crate::server::connection::ConnectionSupervisor;
10use crate::server::connection::services::{ChannelCluster, LiminalConnectionServices};
11use crate::server::listener::ServerListener;
12use crate::server::shutdown::{ShutdownHandle, register_signal_handlers, run_shutdown_sequence};
13
14pub fn run(config_path: &Path) -> Result<(), ServerError> {
20 if config_path.as_os_str().is_empty() {
21 return Err(ServerError::ConfigLoad {
22 message: "configuration path is empty".to_owned(),
23 });
24 }
25
26 let config = load_config(config_path)?;
27
28 let readiness = SharedReadinessState::new(ReadinessState::default());
29 let health_server = start_health_server(config.health_listen_address, readiness.clone())?;
30 let shutdown_handle = ShutdownHandle::new();
31 let signal_registration = register_signal_handlers(shutdown_handle.clone())?;
32
33 let services = Arc::new(LiminalConnectionServices::from_config(&config)?);
37 let channel_cluster = services.channel_cluster().clone();
38 let connection_supervisor = ConnectionSupervisor::with_services(services)?;
39
40 let cluster_handle = match config.cluster.as_ref() {
45 Some(cluster_config) => Some(start_cluster(&channel_cluster, cluster_config)?),
46 None => None,
47 };
48
49 let mut listener = ServerListener::bind(&config, connection_supervisor)?;
50 readiness.set_config_loaded(true);
51 readiness.set_listener_bound(true);
52 readiness.set_cluster_configured(config.cluster.is_some());
53
54 tracing::debug!(
55 config_path = %config_path.display(),
56 listen_address = %config.listen_address,
57 health_listen_address = %health_server.local_addr(),
58 "liminal server configuration validated"
59 );
60
61 tracing::info!(
62 listen_address = %listener.local_addr(),
63 health_listen_address = %health_server.local_addr(),
64 "liminal server started"
65 );
66
67 shutdown_handle.wait();
68 readiness.set_listener_bound(false);
69
70 if let Some(mut cluster_handle) = cluster_handle {
74 cluster_handle.shutdown();
75 }
76
77 let supervisor = listener.supervisor();
78 let shutdown_result = run_shutdown_sequence(&mut listener, &supervisor, config.drain_timeout());
79 drop(signal_registration);
80 health_server.shutdown()?;
81 shutdown_result
82}
83
84fn start_cluster(
90 channel_cluster: &ChannelCluster,
91 cluster_config: &ClusterConfig,
92) -> Result<ClusterHandle, ServerError> {
93 let resolver = channel_cluster
94 .resolver()
95 .cloned()
96 .ok_or_else(|| ServerError::ClusterJoin {
97 message: "clustering configured but channel supervisor has no distribution resolver"
98 .to_owned(),
99 })?;
100 let scheduler = channel_cluster.supervisor().scheduler();
101 let supervisor = channel_cluster.supervisor().clone();
102 cluster::start(&scheduler, resolver, cluster_config, move |sync| {
103 supervisor.install_observer(Arc::new(sync));
104 })
105}