use std::sync::Arc;
use std::time::Duration;
use display_more::DisplayOptionExt;
use futures_util::Stream;
use openraft_macros::since;
use crate::LogIdOptionExt;
use crate::LogIndexOptionExt;
use crate::OptionalSend;
use crate::RaftMetrics;
use crate::RaftTypeConfig;
use crate::core::raft_msg::RaftMsg;
use crate::core::raft_msg::external_command::ExternalCommand;
use crate::errors::Fatal;
#[cfg(doc)]
use crate::errors::into_raft_result::IntoRaftResult;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::SnapshotResponse;
use crate::raft::TransferLeaderRequest;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft::raft_inner::RaftInner;
use crate::raft::stream_append;
use crate::raft::stream_append::StreamAppendResult;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::SnapshotDataOf;
use crate::type_config::alias::SnapshotOf;
use crate::type_config::alias::VoteOf;
use crate::vote::raft_vote::RaftVoteExt;
#[since(version = "0.10.0")]
pub(crate) struct ProtocolApi<C>
where C: RaftTypeConfig
{
inner: Arc<RaftInner<C>>,
}
impl<C> ProtocolApi<C>
where C: RaftTypeConfig
{
pub(in crate::raft) fn new(inner: Arc<RaftInner<C>>) -> Self {
Self { inner }
}
#[since(version = "0.10.0")]
#[tracing::instrument(level = "debug", skip(self, rpc))]
pub(crate) async fn vote(&self, rpc: VoteRequest<C>) -> Result<VoteResponse<C>, Fatal<C>> {
tracing::info!("Raft::vote(): rpc: {}", rpc);
let (tx, rx) = C::oneshot();
self.inner.call_core(RaftMsg::RequestVote { rpc, tx }, rx).await
}
#[since(version = "0.10.0")]
#[tracing::instrument(level = "debug", skip(self, rpc))]
pub(crate) async fn append_entries(
&self,
rpc: AppendEntriesRequest<C>,
) -> Result<AppendEntriesResponse<C>, Fatal<C>> {
tracing::debug!("Raft::append_entries: rpc: {}", rpc);
let (tx, rx) = C::oneshot();
let stream_result: StreamAppendResult<C> = self.inner.call_core(RaftMsg::AppendEntries { rpc, tx }, rx).await?;
Ok(AppendEntriesResponse::from(stream_result))
}
#[since(version = "0.10.0")]
pub(crate) fn stream_append<S>(
self,
stream: S,
) -> impl Stream<Item = Result<StreamAppendResult<C>, Fatal<C>>> + OptionalSend + 'static
where
S: Stream<Item = AppendEntriesRequest<C>> + OptionalSend + 'static,
{
stream_append::stream_append(self.inner, stream)
}
#[since(version = "0.10.0")]
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn get_snapshot(&self) -> Result<Option<SnapshotOf<C>>, Fatal<C>> {
tracing::debug!("Raft::get_snapshot()");
let (tx, rx) = C::oneshot();
let cmd = ExternalCommand::GetSnapshot { tx };
self.inner.call_core(RaftMsg::ExternalCommand { cmd }, rx).await
}
#[since(version = "0.10.0")]
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn begin_receiving_snapshot(&self) -> Result<SnapshotDataOf<C>, Fatal<C>> {
tracing::info!("Raft::begin_receiving_snapshot()");
let (tx, rx) = C::oneshot();
self.inner.call_core(RaftMsg::GetSnapshotReceiver { tx }, rx).await
}
#[since(version = "0.10.0")]
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn install_full_snapshot(
&self,
vote: VoteOf<C>,
snapshot: SnapshotOf<C>,
) -> Result<SnapshotResponse<C>, Fatal<C>> {
tracing::info!("Raft::install_full_snapshot()");
let (tx, rx) = C::oneshot();
self.inner.call_core(RaftMsg::InstallSnapshot { vote, snapshot, tx }, rx).await
}
#[since(version = "0.10.0")]
pub(crate) async fn handle_transfer_leader(&self, req: TransferLeaderRequest<C>) -> Result<(), Fatal<C>> {
if &req.to_node_id == self.inner.id() {
self.ensure_log_flushed_for_transfer_leader(&req).await?;
}
let raft_msg = RaftMsg::HandleTransferLeader {
from: req.from_leader,
to: req.to_node_id,
};
self.inner.send_msg(raft_msg).await?;
Ok(())
}
async fn ensure_log_flushed_for_transfer_leader(&self, req: &TransferLeaderRequest<C>) -> Result<(), Fatal<C>> {
let ok = |m: &RaftMetrics<C>| {
req.from_leader() == &m.vote && m.last_log_index.next_index() >= req.last_log_id().next_index()
};
#[allow(clippy::neg_cmp_op_on_partial_ord)]
let fail = |m: &RaftMetrics<C>| !(req.from_leader.as_ref_vote() >= m.vote.as_ref_vote());
let timeout = Some(Duration::from_millis(self.inner.config().election_timeout_min));
let metrics_res =
self.inner.wait(timeout).metrics(|st| ok(st) || fail(st), "transfer_leader await flushed log").await;
match metrics_res {
Ok(metrics) => {
if fail(&metrics) {
tracing::warn!(
"Vote changed, give up Leader-transfer; expected vote: {}, metrics: {}",
req.from_leader,
metrics
);
return Ok(());
}
tracing::info!(
"Leader-transfer condition satisfied, submit Leader-transfer message; \
expected: (vote: {}, flushed_log: {})",
req.from_leader,
req.last_log_id.display(),
);
}
Err(err) => {
tracing::warn!(
"Leader-transfer condition fail to satisfy, still submit Leader-transfer; \
expected: (vote: {}; flushed_log: {}), error: {}",
req.from_leader,
req.last_log_id.display(),
err
);
}
};
Ok(())
}
}