Skip to main content

liminal_server/server/connection/
services_cluster.rs

//! SRV-005: the shared (optionally clustered) channel supervisor used by the
//! server's connection services.
//!
//! Extracted from `services.rs` to keep that file within the 500-line limit. A
//! single [`ChannelCluster`] holds the one [`ChannelSupervisor`] that every
//! configured channel runs on, plus the [`ClusterResolver`] shared with that
//! supervisor's scheduler when clustering is configured. Building it is the only
//! place where the choice between a clustered and a plain supervisor is made.
9
10use std::sync::Arc;
11
12use liminal::channel::{ChannelRestartPolicy, ChannelSupervisor};
13
14use crate::ServerError;
15use crate::cluster::discovery::{ClusterResolver, as_resolver};
16use crate::config::types::ClusterConfig;
17
18/// The shared channel supervisor backing every configured channel (SRV-005).
19///
20/// Plus the cluster resolver when clustering is configured. All channels run on
21/// this ONE supervisor's scheduler so they share the clustered distribution
22/// transport; the resolver is the SAME instance handed to that scheduler, kept
23/// here so the cluster can dial seeds and learn peer names on it.
24#[derive(Clone, Debug)]
25pub struct ChannelCluster {
26    supervisor: ChannelSupervisor,
27    resolver: Option<Arc<ClusterResolver>>,
28}
29
30impl ChannelCluster {
31    /// The shared channel supervisor.
32    #[must_use]
33    pub const fn supervisor(&self) -> &ChannelSupervisor {
34        &self.supervisor
35    }
36
37    /// The cluster resolver, present only when clustering is configured.
38    #[must_use]
39    pub const fn resolver(&self) -> Option<&Arc<ClusterResolver>> {
40        self.resolver.as_ref()
41    }
42}
43
44/// The channel restart policy used for server-hosted channels. Matches the
45/// library default (one-for-one with a bounded budget) and is passed explicitly
46/// because the distribution-enabled constructor takes a policy.
47fn server_channel_policy() -> ChannelRestartPolicy {
48    ChannelRestartPolicy::default()
49}
50
51/// Builds the shared channel supervisor (SRV-005).
52///
53/// When `cluster` is `Some`, the supervisor's scheduler is distribution-enabled
54/// with the configured node identity and cookie, and a [`ClusterResolver`] is
55/// created and shared between the scheduler and the returned [`ChannelCluster`].
56/// When `None`, an ordinary non-clustered supervisor is built.
57///
58/// # Errors
59/// Returns [`ServerError`] when the underlying scheduler cannot start.
60pub fn build_channel_cluster(
61    cluster: Option<&ClusterConfig>,
62) -> Result<ChannelCluster, ServerError> {
63    let Some(cluster) = cluster else {
64        let supervisor =
65            ChannelSupervisor::with_policy(server_channel_policy()).map_err(|error| {
66                ServerError::ConfigValidation {
67                    message: format!("failed to start channel supervisor: {error}"),
68                }
69            })?;
70        return Ok(ChannelCluster {
71            supervisor,
72            resolver: None,
73        });
74    };
75    let resolver = Arc::new(ClusterResolver::new());
76    let supervisor = ChannelSupervisor::with_distribution(
77        cluster.node_name.clone(),
78        node_creation(),
79        cluster.cookie.clone(),
80        as_resolver(Arc::clone(&resolver)),
81        server_channel_policy(),
82    )
83    .map_err(|error| ServerError::ClusterJoin {
84        message: format!("failed to start clustered channel supervisor: {error}"),
85    })?;
86    Ok(ChannelCluster {
87        supervisor,
88        resolver: Some(resolver),
89    })
90}
91
/// A non-zero node-incarnation ("creation") value for beamr distribution.
///
/// Derived from the wall-clock time at which this function is called, mixed
/// with the current process id. This gives two incarnations of the same node
/// name on one host distinct creations, even when they start within the same
/// second.
///
/// The result is always in `1..=0x7fff_ffff`. It is never zero, because beamr
/// reserves 0 for "unknown creation", and it never has the top bit set.
fn node_creation() -> u32 {
    // A clock set before the epoch degrades to a zero duration rather than
    // failing; the PID and the final `max(1)` still keep the value usable.
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    // Whole seconds alone collide for restarts within the same second, so fold
    // in the sub-second nanoseconds and the PID as well. The exact bits do not
    // matter, only that successive incarnations differ and none is zero.
    let mixed = since_epoch.as_secs()
        ^ u64::from(since_epoch.subsec_nanos())
        ^ (u64::from(std::process::id()) << 7);
    // After masking to 31 bits the value always fits in a `u32`, so the
    // fallback is unreachable; it only avoids a truncating `as` cast.
    let folded = u32::try_from(mixed & 0x7fff_ffff).unwrap_or(1);
    folded.max(1)
}