mod builder;
pub use builder::*;
mod leader_notifier;
pub(crate) use leader_notifier::*;
#[doc(hidden)]
mod type_config;
use tracing::info;
#[doc(hidden)]
pub use type_config::*;
#[cfg(test)]
mod builder_test;
#[cfg(test)]
mod node_test;
#[cfg(test)]
mod test_helpers;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use d_engine_core::Membership;
use d_engine_core::Raft;
use d_engine_core::RaftEvent;
use d_engine_core::RaftNodeConfig;
use d_engine_core::Result;
use d_engine_core::TypeConfig;
use d_engine_core::alias::MOF;
#[cfg(feature = "watch")]
use d_engine_core::watch::WatchRegistry;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::watch;
pub struct Node<T>
where
T: TypeConfig,
{
pub(crate) node_id: u32,
pub(crate) raft_core: Arc<Mutex<Raft<T>>>,
pub(crate) membership: Arc<MOF<T>>,
pub(crate) event_tx: mpsc::Sender<RaftEvent>,
pub(crate) cmd_tx: mpsc::UnboundedSender<d_engine_core::ClientCmd>,
pub(crate) ready: AtomicBool,
pub(crate) rpc_ready_tx: watch::Sender<bool>,
pub(crate) leader_notifier: LeaderNotifier,
pub node_config: Arc<RaftNodeConfig>,
#[cfg(feature = "watch")]
pub(crate) watch_registry: Option<Arc<WatchRegistry>>,
#[cfg(feature = "watch")]
pub(crate) _watch_dispatcher_handle: Option<tokio::task::JoinHandle<()>>,
pub(crate) _sm_worker_handle: Option<tokio::task::JoinHandle<()>>,
pub(crate) _commit_handler_handle: Option<tokio::task::JoinHandle<()>>,
pub(crate) _lease_cleanup_handle: Option<tokio::task::JoinHandle<()>>,
pub(crate) shutdown_signal: watch::Receiver<()>,
}
impl<T> Debug for Node<T>
where
T: TypeConfig,
{
fn fmt(
&self,
f: &mut std::fmt::Formatter<'_>,
) -> std::fmt::Result {
f.debug_struct("Node").field("node_id", &self.node_id).finish()
}
}
impl<T> Node<T>
where
T: TypeConfig,
{
pub async fn run(&self) -> Result<()> {
let mut shutdown_signal = self.shutdown_signal.clone();
shutdown_signal.borrow_and_update();
if self.node_config.is_learner() {
self.run_as_learner(&mut shutdown_signal).await?;
} else {
self.run_as_voter(&mut shutdown_signal).await?;
}
self.start_raft_loop().await
}
async fn run_as_learner(
&self,
shutdown: &mut watch::Receiver<()>,
) -> Result<()> {
info!("Learner node bootstrap initiated");
self.set_rpc_ready(true);
self.warmup_with_shutdown(shutdown).await?;
let raft = self.raft_core.lock().await;
info!(%self.node_config.cluster.node_id, "Learner joining cluster");
raft.join_cluster().await?;
drop(raft);
Ok(())
}
async fn run_as_voter(
&self,
shutdown: &mut watch::Receiver<()>,
) -> Result<()> {
info!("Voter node bootstrap initiated");
tokio::select! {
result = self.membership.check_cluster_is_ready() => result?,
_ = shutdown.changed() => {
info!("Shutdown during cluster ready check");
return Ok(());
}
}
self.set_rpc_ready(true);
self.warmup_with_shutdown(shutdown).await
}
async fn warmup_with_shutdown(
&self,
shutdown: &mut watch::Receiver<()>,
) -> Result<()> {
tokio::select! {
result = self.membership.pre_warm_connections() => result?,
_ = shutdown.changed() => {
info!("Shutdown during connection warmup");
return Ok(());
}
}
Ok(())
}
async fn start_raft_loop(&self) -> Result<()> {
let mut raft = self.raft_core.lock().await;
raft.run().await
}
pub fn set_rpc_ready(
&self,
is_ready: bool,
) {
info!("Set node RPC server ready: {}", is_ready);
self.ready.store(is_ready, Ordering::SeqCst);
let _ = self.rpc_ready_tx.send(is_ready);
}
pub fn is_rpc_ready(&self) -> bool {
self.ready.load(Ordering::Acquire)
}
pub fn ready_notifier(&self) -> watch::Receiver<bool> {
self.rpc_ready_tx.subscribe()
}
pub fn leader_change_notifier(&self) -> watch::Receiver<Option<crate::LeaderInfo>> {
self.leader_notifier.subscribe()
}
pub fn from_raft(
raft: Raft<T>,
shutdown_signal: watch::Receiver<()>,
) -> Self {
let event_tx = raft.event_sender();
let node_config = raft.ctx.node_config();
let membership = raft.ctx.membership();
let node_id = raft.node_id;
let (rpc_ready_tx, _rpc_ready_rx) = watch::channel(false);
let leader_notifier = LeaderNotifier::new();
let (cmd_tx, _cmd_rx) = mpsc::unbounded_channel();
Node {
node_id,
raft_core: Arc::new(Mutex::new(raft)),
membership,
event_tx,
cmd_tx,
ready: AtomicBool::new(false),
rpc_ready_tx,
leader_notifier,
node_config,
#[cfg(feature = "watch")]
watch_registry: None,
#[cfg(feature = "watch")]
_watch_dispatcher_handle: None,
_sm_worker_handle: None,
_commit_handler_handle: None,
_lease_cleanup_handle: None,
shutdown_signal,
}
}
pub fn node_id(&self) -> u32 {
self.node_id
}
}