use std::fmt::Debug;
use display_more::DisplayResultExt;
use maplit::btreemap;
use openraft_macros::since;
use crate::ChangeMembers;
use crate::LogIdOptionExt;
use crate::OptionalSend;
use crate::RaftMetrics;
use crate::RaftTypeConfig;
use crate::core::raft_msg::RaftMsg;
use crate::core::replication_lag;
use crate::errors::Fatal;
use crate::errors::InitializeError;
use crate::impls::ProgressResponder;
use crate::membership::IntoNodes;
use crate::raft::ClientWriteResult;
use crate::raft::raft_inner::RaftInner;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::OneshotReceiverOf;
#[since(version = "0.10.0")]
pub(crate) struct ManagementApi<'a, C>
where C: RaftTypeConfig
{
inner: &'a RaftInner<C>,
}
impl<'a, C> ManagementApi<'a, C>
where C: RaftTypeConfig
{
pub(in crate::raft) fn new(inner: &'a RaftInner<C>) -> Self {
Self { inner }
}
#[since(version = "0.10.0")]
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) async fn initialize<T>(&self, members: T) -> Result<Result<(), InitializeError<C>>, Fatal<C>>
where T: IntoNodes<C::NodeId, C::Node> + Debug {
let (tx, rx) = C::oneshot();
self.inner
.call_core(
RaftMsg::Initialize {
members: members.into_nodes(),
tx,
},
rx,
)
.await
}
#[since(version = "0.10.0")]
#[tracing::instrument(level = "info", skip_all)]
pub(crate) async fn change_membership(
&self,
members: impl Into<ChangeMembers<C::NodeId, C::Node>>,
retain: bool,
) -> Result<ClientWriteResult<C>, Fatal<C>> {
let changes: ChangeMembers<C::NodeId, C::Node> = members.into();
tracing::info!(
"change_membership: start to commit joint config: changes: {:?}, retain: {}",
changes,
retain
);
let (tx, rx) = new_responder_pair::<C, _>();
tracing::debug!("change_membership: start",);
let client_write_result = self
.inner
.call_core(
RaftMsg::ChangeMembership {
changes: changes.clone(),
retain,
tx,
},
rx,
)
.await?;
tracing::debug!(
"change_membership: client_write_result: {}",
client_write_result.display()
);
let resp = match client_write_result {
Ok(x) => x,
Err(e) => {
tracing::error!("the first step error: {}", e);
return Ok(Err(e));
}
};
tracing::debug!("res of first step: {}", resp);
let (log_id, joint) = (&resp.log_id, resp.membership.clone().unwrap());
if joint.get_joint_config().len() == 1 {
return Ok(Ok(resp));
}
tracing::debug!("committed a joint config: {} {:?}", log_id, joint);
tracing::debug!("the second step is to change to uniform config: {:?}", changes);
let (tx, rx) = new_responder_pair::<C, _>();
let changes = ChangeMembers::AddVoterIds(Default::default());
let client_write_result = self.inner.call_core(RaftMsg::ChangeMembership { changes, retain, tx }, rx).await?;
tracing::info!(
"result of second step of change_membership: {}",
client_write_result.display()
);
if let Err(e) = &client_write_result {
tracing::error!("the second step error: {}", e);
}
Ok(client_write_result)
}
#[since(version = "0.10.0")]
#[tracing::instrument(level = "debug", skip(self, id), fields(target=display(&id)))]
pub(crate) async fn add_learner(
&self,
id: C::NodeId,
node: C::Node,
blocking: bool,
) -> Result<ClientWriteResult<C>, Fatal<C>> {
let (tx, rx) = new_responder_pair::<C, _>();
let msg = RaftMsg::ChangeMembership {
changes: ChangeMembers::AddNodes(btreemap! {id.clone()=>node}),
retain: true,
tx,
};
let client_write_result = self.inner.call_core(msg, rx).await?;
let resp = match client_write_result {
Ok(x) => x,
Err(e) => return Ok(Err(e)),
};
if !blocking {
return Ok(Ok(resp));
}
if self.inner.id == id {
return Ok(Ok(resp));
}
let membership_log_id = &resp.log_id;
let wait_res = self
.inner
.wait(None)
.metrics(
|metrics| match self.check_replication_upto_date(metrics, &id, Some(membership_log_id)) {
Ok(_matching) => true,
Err(_) => false,
},
"wait new learner to become line-rate",
)
.await;
tracing::info!(
"waiting for replication to new learner: wait_res: {}",
wait_res.display()
);
Ok(Ok(resp))
}
#[since(version = "0.10.0")]
fn check_replication_upto_date(
&self,
metrics: &RaftMetrics<C>,
node_id: &C::NodeId,
membership_log_id: Option<&LogIdOf<C>>,
) -> Result<Option<LogIdOf<C>>, ()> {
if metrics.membership_config.log_id().as_ref() < membership_log_id {
return Err(());
}
if metrics.membership_config.membership().get_node(node_id).is_none() {
return Ok(None);
}
let repl = match &metrics.replication {
None => {
return Ok(None);
}
Some(x) => x,
};
let replication_metrics = repl;
let target_metrics = match replication_metrics.get(node_id) {
None => {
return Err(());
}
Some(x) => x,
};
let matched = target_metrics.clone();
let distance = replication_lag(&matched.index(), &metrics.last_log_index);
if distance <= self.inner.config.replication_lag_threshold {
return Ok(matched);
}
Err(())
}
}
fn new_responder_pair<C, T>() -> (ProgressResponder<C, T>, OneshotReceiverOf<C, T>)
where
C: RaftTypeConfig,
T: OptionalSend,
{
let (tx, _commit_rx, complete_rx) = ProgressResponder::new();
(tx, complete_rx)
}