use std::sync::{Arc, Mutex};
use ::net::adapter::net::compute::DaemonHostConfig;
use ::net::adapter::net::compute::{
standby_group::StandbyGroup as CoreStandbyGroup,
standby_group::StandbyGroupConfig as CoreStandbyGroupConfig,
};
use ::net::adapter::net::state::causal::CausalEvent;
use crate::compute::{DaemonRuntime, ObserverHandle};
use crate::groups::common::{GroupHealth, MemberInfo, MemberRole};
use crate::groups::error::GroupError;
#[derive(Debug, Clone)]
pub struct StandbyGroupConfig {
pub member_count: u8,
pub group_seed: [u8; 32],
pub host_config: DaemonHostConfig,
}
impl From<StandbyGroupConfig> for CoreStandbyGroupConfig {
fn from(cfg: StandbyGroupConfig) -> Self {
CoreStandbyGroupConfig {
member_count: cfg.member_count,
group_seed: cfg.group_seed,
host_config: cfg.host_config,
}
}
}
pub struct StandbyGroup {
observer_handle: Mutex<Option<ObserverHandle>>,
inner: Arc<Mutex<CoreStandbyGroup>>,
runtime: DaemonRuntime,
kind: String,
}
fn make_buffer_observer(inner: &Arc<Mutex<CoreStandbyGroup>>) -> crate::compute::DeliverObserver {
let weak = Arc::downgrade(inner);
Arc::new(move |event: &CausalEvent| {
if let Some(core) = weak.upgrade() {
if let Ok(mut guard) = core.lock() {
guard.on_event_delivered(event.clone());
}
}
})
}
impl StandbyGroup {
pub fn spawn(
runtime: &DaemonRuntime,
kind: &str,
config: StandbyGroupConfig,
) -> Result<Self, GroupError> {
if !runtime.is_ready_pub() {
return Err(GroupError::NotReady);
}
let factory = runtime
.factory_for_kind_pub(kind)
.map_err(|_| GroupError::FactoryNotFound(kind.to_string()))?;
let scheduler = runtime.scheduler_arc();
let registry = runtime.registry_arc();
let core =
CoreStandbyGroup::spawn(config.into(), move || (factory)(), &scheduler, ®istry)?;
let inner = Arc::new(Mutex::new(core));
let active_origin = inner
.lock()
.expect("StandbyGroup mutex poisoned")
.active_origin();
let observer = make_buffer_observer(&inner);
let handle = runtime.register_deliver_observer(active_origin, observer);
Ok(Self {
observer_handle: Mutex::new(Some(handle)),
inner,
runtime: runtime.clone(),
kind: kind.to_string(),
})
}
fn rebind_observer(&self, new_origin: u64) {
let observer = make_buffer_observer(&self.inner);
let new_handle = self.runtime.register_deliver_observer(new_origin, observer);
let mut slot = self
.observer_handle
.lock()
.expect("StandbyGroup observer-handle mutex poisoned");
*slot = Some(new_handle);
}
pub fn kind(&self) -> &str {
&self.kind
}
pub fn active_origin(&self) -> u64 {
self.inner
.lock()
.expect("StandbyGroup mutex poisoned")
.active_origin()
}
#[doc(hidden)]
pub fn on_event_delivered(&self, event: CausalEvent) {
self.inner
.lock()
.expect("StandbyGroup mutex poisoned")
.on_event_delivered(event);
}
pub fn sync_standbys(&self) -> Result<u64, GroupError> {
let registry = self.runtime.registry_arc();
let mut guard = self.inner.lock().expect("StandbyGroup mutex poisoned");
Ok(guard.sync_standbys(®istry)?)
}
pub fn promote(&self) -> Result<u64, GroupError> {
let factory = self
.runtime
.factory_for_kind_pub(&self.kind)
.map_err(|_| GroupError::FactoryNotFound(self.kind.clone()))?;
let scheduler = self.runtime.scheduler_arc();
let registry = self.runtime.registry_arc();
let new_origin = {
let mut guard = self.inner.lock().expect("StandbyGroup mutex poisoned");
guard.promote(move || (factory)(), ®istry, &scheduler)?
};
self.rebind_observer(new_origin);
Ok(new_origin)
}
pub fn on_node_failure(&self, failed_node_id: u64) -> Result<Option<u64>, GroupError> {
let factory = self
.runtime
.factory_for_kind_pub(&self.kind)
.map_err(|_| GroupError::FactoryNotFound(self.kind.clone()))?;
let scheduler = self.runtime.scheduler_arc();
let registry = self.runtime.registry_arc();
let result = {
let mut guard = self.inner.lock().expect("StandbyGroup mutex poisoned");
guard.on_node_failure(failed_node_id, move || (factory)(), &scheduler, ®istry)?
};
if let Some(new_origin) = result {
self.rebind_observer(new_origin);
}
Ok(result)
}
pub fn on_node_recovery(&self, recovered_node_id: u64) {
let registry = self.runtime.registry_arc();
let mut guard = self.inner.lock().expect("StandbyGroup mutex poisoned");
guard.on_node_recovery(recovered_node_id, ®istry);
}
pub fn health(&self) -> GroupHealth {
self.inner
.lock()
.expect("StandbyGroup mutex poisoned")
.health()
}
pub fn active_healthy(&self) -> bool {
self.inner
.lock()
.expect("StandbyGroup mutex poisoned")
.active_healthy()
}
pub fn active_index(&self) -> u8 {
self.inner
.lock()
.expect("StandbyGroup mutex poisoned")
.active_index()
}
pub fn member_role(&self, index: u8) -> Option<MemberRole> {
self.inner
.lock()
.expect("StandbyGroup mutex poisoned")
.member_role(index)
}
pub fn synced_through(&self, index: u8) -> Option<u64> {
self.inner
.lock()
.expect("StandbyGroup mutex poisoned")
.synced_through(index)
}
pub fn buffered_event_count(&self) -> usize {
self.inner
.lock()
.expect("StandbyGroup mutex poisoned")
.buffered_event_count()
}
pub fn group_id(&self) -> u32 {
self.inner
.lock()
.expect("StandbyGroup mutex poisoned")
.group_id()
}
pub fn members(&self) -> Vec<MemberInfo> {
self.inner
.lock()
.expect("StandbyGroup mutex poisoned")
.members()
.to_vec()
}
pub fn member_count(&self) -> u8 {
self.inner
.lock()
.expect("StandbyGroup mutex poisoned")
.member_count()
}
pub fn standby_count(&self) -> u8 {
self.inner
.lock()
.expect("StandbyGroup mutex poisoned")
.standby_count()
}
}
impl std::fmt::Debug for StandbyGroup {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let guard = self.inner.lock().expect("StandbyGroup mutex poisoned");
f.debug_struct("StandbyGroup")
.field("group_id", &format_args!("{:#x}", guard.group_id()))
.field("active_index", &guard.active_index())
.field("member_count", &guard.member_count())
.field("buffered_events", &guard.buffered_event_count())
.finish()
}
}