use std::sync::{Arc, Mutex};
use ::net::adapter::net::behavior::loadbalance::Strategy;
use ::net::adapter::net::compute::DaemonHostConfig;
use ::net::adapter::net::compute::{
replica_group::ReplicaGroup as CoreReplicaGroup,
replica_group::ReplicaGroupConfig as CoreReplicaGroupConfig,
};
use crate::compute::DaemonRuntime;
use crate::groups::common::{GroupHealth, MemberInfo, RequestContext};
use crate::groups::error::GroupError;
#[derive(Debug, Clone)]
pub struct ReplicaGroupConfig {
pub replica_count: u8,
pub group_seed: [u8; 32],
pub lb_strategy: Strategy,
pub host_config: DaemonHostConfig,
}
impl From<ReplicaGroupConfig> for CoreReplicaGroupConfig {
fn from(cfg: ReplicaGroupConfig) -> Self {
CoreReplicaGroupConfig {
replica_count: cfg.replica_count,
group_seed: cfg.group_seed,
lb_strategy: cfg.lb_strategy,
host_config: cfg.host_config,
}
}
}
pub struct ReplicaGroup {
inner: Arc<Mutex<CoreReplicaGroup>>,
runtime: DaemonRuntime,
kind: String,
}
impl ReplicaGroup {
pub fn spawn(
runtime: &DaemonRuntime,
kind: &str,
config: ReplicaGroupConfig,
) -> Result<Self, GroupError> {
if !runtime.is_ready_pub() {
return Err(GroupError::NotReady);
}
let factory = runtime
.factory_for_kind_pub(kind)
.map_err(|_| GroupError::FactoryNotFound(kind.to_string()))?;
let scheduler = runtime.scheduler_arc();
let registry = runtime.registry_arc();
let core =
CoreReplicaGroup::spawn(config.into(), move || (factory)(), &scheduler, ®istry)?;
Ok(Self {
inner: Arc::new(Mutex::new(core)),
runtime: runtime.clone(),
kind: kind.to_string(),
})
}
pub fn kind(&self) -> &str {
&self.kind
}
pub fn route_event(&self, ctx: &RequestContext) -> Result<u64, GroupError> {
let guard = self.inner.lock().expect("ReplicaGroup mutex poisoned");
Ok(guard.route_event(ctx)?)
}
pub fn scale_to(&self, n: u8) -> Result<(), GroupError> {
let factory = self
.runtime
.factory_for_kind_pub(&self.kind)
.map_err(|_| GroupError::FactoryNotFound(self.kind.clone()))?;
let scheduler = self.runtime.scheduler_arc();
let registry = self.runtime.registry_arc();
let mut guard = self.inner.lock().expect("ReplicaGroup mutex poisoned");
guard.scale_to(n, move || (factory)(), &scheduler, ®istry)?;
Ok(())
}
pub fn on_node_failure(&self, failed_node_id: u64) -> Result<Vec<u8>, GroupError> {
let factory = self
.runtime
.factory_for_kind_pub(&self.kind)
.map_err(|_| GroupError::FactoryNotFound(self.kind.clone()))?;
let scheduler = self.runtime.scheduler_arc();
let registry = self.runtime.registry_arc();
let mut guard = self.inner.lock().expect("ReplicaGroup mutex poisoned");
let replaced =
guard.on_node_failure(failed_node_id, move || (factory)(), &scheduler, ®istry)?;
Ok(replaced)
}
pub fn on_node_recovery(&self, recovered_node_id: u64) {
let registry = self.runtime.registry_arc();
let mut guard = self.inner.lock().expect("ReplicaGroup mutex poisoned");
guard.on_node_recovery(recovered_node_id, ®istry);
}
pub fn health(&self) -> GroupHealth {
self.inner
.lock()
.expect("ReplicaGroup mutex poisoned")
.health()
}
pub fn group_id(&self) -> u32 {
self.inner
.lock()
.expect("ReplicaGroup mutex poisoned")
.group_id()
}
pub fn replicas(&self) -> Vec<MemberInfo> {
self.inner
.lock()
.expect("ReplicaGroup mutex poisoned")
.replicas()
.to_vec()
}
pub fn replica_count(&self) -> u8 {
self.inner
.lock()
.expect("ReplicaGroup mutex poisoned")
.replica_count()
}
pub fn healthy_count(&self) -> u8 {
self.inner
.lock()
.expect("ReplicaGroup mutex poisoned")
.healthy_count()
}
}
impl std::fmt::Debug for ReplicaGroup {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let guard = self.inner.lock().expect("ReplicaGroup mutex poisoned");
f.debug_struct("ReplicaGroup")
.field("group_id", &format_args!("{:#x}", guard.group_id()))
.field("replica_count", &guard.replica_count())
.field("healthy_count", &guard.healthy_count())
.finish()
}
}