liminal_server/server/connection/services_cluster.rs
1//! SRV-005: the shared (optionally clustered) channel supervisor used by the
2//! server's connection services.
3//!
4//! Extracted from `services.rs` to keep that file within the 500-line limit. A
5//! single [`ChannelCluster`] holds the one [`ChannelSupervisor`] every configured
6//! channel runs on, plus the [`ClusterResolver`] shared with that supervisor's
7//! scheduler when clustering is configured. Building it is the only place the
8//! choice between a clustered and a plain supervisor is made.
9
10use std::sync::Arc;
11
12use liminal::channel::{ChannelRestartPolicy, ChannelSupervisor};
13
14use crate::ServerError;
15use crate::cluster::discovery::{ClusterResolver, as_resolver};
16use crate::config::types::ClusterConfig;
17
18/// The shared channel supervisor backing every configured channel (SRV-005).
19///
20/// Plus the cluster resolver when clustering is configured. All channels run on
21/// this ONE supervisor's scheduler so they share the clustered distribution
22/// transport; the resolver is the SAME instance handed to that scheduler, kept
23/// here so the cluster can dial seeds and learn peer names on it.
24#[derive(Clone, Debug)]
25pub struct ChannelCluster {
26 supervisor: ChannelSupervisor,
27 resolver: Option<Arc<ClusterResolver>>,
28}
29
30impl ChannelCluster {
31 /// The shared channel supervisor.
32 #[must_use]
33 pub const fn supervisor(&self) -> &ChannelSupervisor {
34 &self.supervisor
35 }
36
37 /// The cluster resolver, present only when clustering is configured.
38 #[must_use]
39 pub const fn resolver(&self) -> Option<&Arc<ClusterResolver>> {
40 self.resolver.as_ref()
41 }
42}
43
44/// The channel restart policy used for server-hosted channels. Matches the
45/// library default (one-for-one with a bounded budget) and is passed explicitly
46/// because the distribution-enabled constructor takes a policy.
47fn server_channel_policy() -> ChannelRestartPolicy {
48 ChannelRestartPolicy::default()
49}
50
51/// Builds the shared channel supervisor (SRV-005).
52///
53/// When `cluster` is `Some`, the supervisor's scheduler is distribution-enabled
54/// with the configured node identity and cookie, and a [`ClusterResolver`] is
55/// created and shared between the scheduler and the returned [`ChannelCluster`].
56/// When `None`, an ordinary non-clustered supervisor is built.
57///
58/// # Errors
59/// Returns [`ServerError`] when the underlying scheduler cannot start.
60pub fn build_channel_cluster(
61 cluster: Option<&ClusterConfig>,
62) -> Result<ChannelCluster, ServerError> {
63 let Some(cluster) = cluster else {
64 let supervisor =
65 ChannelSupervisor::with_policy(server_channel_policy()).map_err(|error| {
66 ServerError::ConfigValidation {
67 message: format!("failed to start channel supervisor: {error}"),
68 }
69 })?;
70 return Ok(ChannelCluster {
71 supervisor,
72 resolver: None,
73 });
74 };
75 let resolver = Arc::new(ClusterResolver::new());
76 let supervisor = ChannelSupervisor::with_distribution(
77 cluster.node_name.clone(),
78 node_creation(),
79 cluster.cookie.clone(),
80 as_resolver(Arc::clone(&resolver)),
81 server_channel_policy(),
82 )
83 .map_err(|error| ServerError::ClusterJoin {
84 message: format!("failed to start clustered channel supervisor: {error}"),
85 })?;
86 Ok(ChannelCluster {
87 supervisor,
88 resolver: Some(resolver),
89 })
90}
91
/// A non-zero node-incarnation ("creation") value for beamr distribution.
///
/// Derived from the wall-clock time at process start, so that two incarnations
/// of the same node name on one host get distinct creations. It is never zero,
/// because beamr reserves 0 for "unknown creation". A clock that reads before
/// the Unix epoch is treated as the epoch itself, which yields the floor value 1.
fn node_creation() -> u32 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    creation_from_elapsed(since_epoch)
}

/// Folds a time-since-epoch into a non-zero, 31-bit creation value.
///
/// Uses millisecond (not whole-second) resolution, so a node that crashes and
/// is restarted within the same second still gets a new creation. At this
/// resolution the low 31 bits repeat only after about 24.8 days. Any two
/// incarnations started at least 1 ms apart within that window (with a clock
/// that was not stepped backwards) therefore receive different values.
fn creation_from_elapsed(since_epoch: std::time::Duration) -> u32 {
    // Keep 31 bits so the value stays positive if read as a signed 32-bit
    // integer. NOTE(review): confirm beamr's exact width requirement.
    const CREATION_MASK: u128 = 0x7fff_ffff;
    let folded = since_epoch.as_millis() & CREATION_MASK;
    // The mask bounds `folded` to 31 bits, so this conversion cannot fail; the
    // fallback exists only to avoid a panic path and is itself non-zero.
    let creation = u32::try_from(folded).unwrap_or(1);
    // A fold that lands exactly on 0 is bumped to 1 (0 means "unknown").
    creation.max(1)
}