use std::sync::Arc;
use liminal::channel::{ChannelRestartPolicy, ChannelSupervisor};
use crate::ServerError;
use crate::cluster::discovery::{ClusterResolver, as_resolver};
use crate::config::types::ClusterConfig;
/// Pairs the server's channel supervisor with the resolver used for clustering, if any.
///
/// Values are produced by `build_channel_cluster`; the fields are private and exposed
/// through the read-only accessors on this type.
#[derive(Clone, Debug)]
pub struct ChannelCluster {
/// Supervisor for the server's channels, built either standalone or with distribution.
supervisor: ChannelSupervisor,
/// Shared resolver handed to the supervisor when it was built with distribution;
/// `None` when the server was built without a cluster configuration.
resolver: Option<Arc<ClusterResolver>>,
}
impl ChannelCluster {
    /// Borrows the channel supervisor held by this handle.
    #[must_use]
    pub const fn supervisor(&self) -> &ChannelSupervisor {
        let Self { supervisor, .. } = self;
        supervisor
    }

    /// Borrows the shared cluster resolver, or returns `None` when this handle
    /// carries no resolver.
    #[must_use]
    pub const fn resolver(&self) -> Option<&Arc<ClusterResolver>> {
        Option::as_ref(&self.resolver)
    }
}
/// Returns the restart policy used for every channel supervisor built in this module.
///
/// Both the standalone and the clustered construction paths in `build_channel_cluster`
/// obtain their policy here, so the two paths always agree. At present the policy is
/// simply `ChannelRestartPolicy::default()`.
fn server_channel_policy() -> ChannelRestartPolicy {
ChannelRestartPolicy::default()
}
pub fn build_channel_cluster(
cluster: Option<&ClusterConfig>,
) -> Result<ChannelCluster, ServerError> {
let Some(cluster) = cluster else {
let supervisor =
ChannelSupervisor::with_policy(server_channel_policy()).map_err(|error| {
ServerError::ConfigValidation {
message: format!("failed to start channel supervisor: {error}"),
}
})?;
return Ok(ChannelCluster {
supervisor,
resolver: None,
});
};
let resolver = Arc::new(ClusterResolver::new());
let supervisor = ChannelSupervisor::with_distribution(
cluster.node_name.clone(),
node_creation(),
cluster.cookie.clone(),
as_resolver(Arc::clone(&resolver)),
server_channel_policy(),
)
.map_err(|error| ServerError::ClusterJoin {
message: format!("failed to start clustered channel supervisor: {error}"),
})?;
Ok(ChannelCluster {
supervisor,
resolver: Some(resolver),
})
}
/// Derives a non-zero, 31-bit number from the current wall-clock time.
///
/// The result is the number of whole seconds since the Unix epoch with everything above
/// bit 30 discarded, bumped to `1` if that leaves zero. It is therefore always in
/// `1..=0x7fff_ffff`. A clock that reads earlier than the epoch counts as zero seconds
/// and so yields `1`.
///
/// NOTE(review): this is passed as the second argument to
/// `ChannelSupervisor::with_distribution`; presumably it identifies this incarnation of
/// the node — confirm against the `liminal` documentation.
fn node_creation() -> u32 {
    // Upper bound of the result; doubles as the mask that clears bit 31 and above.
    const CREATION_MAX: u32 = 0x7fff_ffff;

    let elapsed_secs = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };

    // The masked value always fits in a `u32`, so the fallback is never taken in practice.
    let truncated = u32::try_from(elapsed_secs & u64::from(CREATION_MAX)).unwrap_or(CREATION_MAX);
    if truncated == 0 { 1 } else { truncated }
}