mod backoff_consumer;
pub(crate) mod backoff_state;
pub(crate) mod event_watcher;
pub(crate) mod inflight_append;
pub(crate) mod inflight_append_queue;
pub(crate) mod payload;
pub(crate) mod replicate;
pub(crate) mod replication_context;
pub(crate) mod replication_handle;
pub(crate) mod replication_progress;
mod replication_session_id;
pub(crate) mod response;
pub(crate) mod snapshot_transmitter;
pub(crate) mod snapshot_transmitter_handle;
pub(crate) mod stream_context;
pub(crate) mod stream_state;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use display_more::DisplayOptionExt;
use futures_util::FutureExt;
use futures_util::StreamExt;
use payload::Payload;
use replication_progress::ReplicationProgress;
pub(crate) use replication_session_id::ReplicationSessionId;
pub(crate) use response::Progress;
use response::ReplicationResult;
use stream_state::StreamState;
use tracing::Instrument;
use crate::RaftNetworkFactory;
use crate::RaftTypeConfig;
use crate::async_runtime::Mutex;
use crate::async_runtime::watch::WatchReceiver;
use crate::base::BoxStream;
use crate::core::notification::Notification;
use crate::display_ext::display_instant::DisplayInstantExt;
use crate::errors::RPCError;
use crate::errors::ReplicationClosed;
use crate::log_id_range::LogIdRange;
use crate::network::NetBackoff;
use crate::network::NetStreamAppend;
use crate::network::RPCOption;
use crate::progress::inflight_id::InflightId;
use crate::raft::AppendEntriesRequest;
use crate::raft::StreamAppendError;
use crate::raft::StreamAppendResult;
use crate::raft_state::IOId;
use crate::replication::backoff_state::BackoffState;
use crate::replication::event_watcher::EventWatcher;
use crate::replication::inflight_append_queue::InflightAppendQueue;
use crate::replication::replication_context::ReplicationContext;
use crate::replication::stream_context::StreamContext;
use crate::storage::RaftLogStorage;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::MutexOf;
use crate::type_config::async_runtime::mpsc::MpscSender;
use crate::vote::RaftVote;
/// State of one replication task: it repeatedly opens an append-entries stream to
/// `replication_context.target` and reports the outcome through
/// `replication_context.tx_notify`.
///
/// An instance is built by `spawn()` and consumed by `main()`.
pub(crate) struct ReplicationCore<C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
{
/// Per-replication context; this file reads `target`, `stream_id`, `leader_vote`,
/// `config`, `cancel_rx` and `tx_notify` from it.
replication_context: ReplicationContext<C>,
/// State shared with the request stream: `main()` writes the payload into it before
/// opening a stream, and `next_append_request()` locks it to produce each request.
stream_state: Arc<MutexOf<C, StreamState<C, LS>>>,
/// Watch receivers used in this file: `io_accepted_rx`, `replicate_rx`, `committed_rx`.
event_watcher: EventWatcher<C>,
/// Payload for the next stream; when `None`, `main()` waits in `drain_events()`.
next_action: Option<Payload<C>>,
/// Id attached to `ReplicationProgress` notifications; when `None`, RPC errors are
/// logged but not sent.
inflight_id: Option<InflightId>,
/// Network connection to the target; taken out (left as `None`) when `main()` starts.
network: Option<N::Network>,
/// Locally tracked progress: `local_committed` and `remote_matched`.
replication_progress: ReplicationProgress<C>,
/// Backoff bookkeeping updated from RPC results; a consumer of it is handed to
/// `StreamState` in `spawn()`.
backoff_state: BackoffState,
}
impl<C, N, LS> ReplicationCore<C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
{
#[tracing::instrument(level = "trace", skip_all, fields(context=display(&replication_context)
))]
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
pub(crate) fn spawn(
replication_context: ReplicationContext<C>,
progress: ReplicationProgress<C>,
network: N::Network,
log_reader: LS::LogReader,
event_watcher: EventWatcher<C>,
span: tracing::Span,
) -> JoinHandleOf<C, Result<(), ReplicationClosed>> {
tracing::debug!(
"spawn replication: session_id={}, target={}, committed={}, matching={}",
replication_context.stream_id,
replication_context.target,
progress.local_committed.display(),
progress.remote_matched.display()
);
let backoff_state = BackoffState::new();
let this = Self {
replication_context: replication_context.clone(),
stream_state: Arc::new(MutexOf::<C, _>::new(StreamState {
replication_context,
event_watcher: event_watcher.clone(),
log_reader,
payload: None,
inflight_id: None,
leader_committed: None,
backoff_consumer: backoff_state.consumer(),
})),
inflight_id: None,
event_watcher,
network: Some(network),
replication_progress: progress,
backoff_state,
next_action: None,
};
C::spawn(this.main().instrument(span))
}
/// Turn `stream_context` into a boxed stream of append-entries requests.
///
/// Each item is produced by `next_append_request`; the stream ends when that
/// function returns `None`.
fn new_request_stream(stream_context: StreamContext<C, LS>) -> BoxStream<'static, AppendEntriesRequest<C>> {
    Box::pin(futures_util::stream::unfold(stream_context, Self::next_append_request))
}
/// Produce the next request of the stream, in the `(item, state)` shape expected by
/// `futures_util::stream::unfold`.
///
/// Returns `None` (ending the stream) when the shared stream state yields no request.
/// Otherwise the request's last log id is pushed onto the inflight queue before the
/// request is handed out.
async fn next_append_request(
    stream_context: StreamContext<C, LS>,
) -> Option<(AppendEntriesRequest<C>, StreamContext<C, LS>)> {
    let mut guard = stream_context.stream_state.as_ref().lock().await;
    let maybe_request = guard.next_request().await;
    // Release the lock before touching the inflight queue or returning.
    drop(guard);

    let request = maybe_request?;
    stream_context.inflight_append_queue.push(request.last_log_id());

    Some((request, stream_context))
}
/// Main loop of the replication task: each iteration opens one append stream to the
/// target and processes its responses.
///
/// The loop has no `break`, so this function never returns `Ok`; it ends only with
/// `Err(ReplicationClosed)` when the task is canceled, the leader changed, or
/// `drain_events()` fails.
///
/// # Panics
///
/// Panics if `self.network` is `None`; `spawn()` always sets it to `Some`.
async fn main(mut self) -> Result<(), ReplicationClosed> {
let mut network = self.network.take().unwrap();
self.next_action = None;
self.inflight_id = None;
loop {
tracing::debug!(
"ReplicationCore: new stream start, next_action = {:?}",
self.next_action
);
// Non-blocking cancellation check: quit only if `changed()` has already resolved
// with an error. A pending future or an `Ok` result keeps the loop running.
let canceled = self.replication_context.cancel_rx.changed().now_or_never();
if canceled.map(|x| x.is_err()) == Some(true) {
tracing::info!("ReplicationCore: canceled, quit");
return Err(ReplicationClosed::new("canceled"));
}
// Quit if the leader of the latest accepted IO is not the leader this replication
// was created for.
let accepted_io: IOId<C> = self.event_watcher.io_accepted_rx.borrow_watched().clone();
let current_leader = accepted_io.leader_id().clone();
let belonging_leader = self.replication_context.leader_vote.leader_id().clone();
if current_leader != belonging_leader {
tracing::info!(
"ReplicationCore: Leader changed from {} to {}, quit replication",
belonging_leader,
current_leader
);
return Err(ReplicationClosed::new("Leader changed"));
}
// Hand the network's backoff provider to the backoff state; what `reconcile` does
// with it is defined in `BackoffState`.
self.backoff_state.reconcile(|| network.backoff());
if self.next_action.is_none() {
// Nothing to send: wait until an event provides the next action.
self.drain_events().await?;
} else {
// There is leftover work: replace it if the watched replicate data now carries a
// different inflight id.
let new_data = self.event_watcher.replicate_rx.borrow_watched().clone();
if self.inflight_id != Some(new_data.inflight_id) {
tracing::info!(
"ReplicationCore replaced current data with inflight id {:?} with new {}",
self.inflight_id,
new_data.inflight_id
);
self.inflight_id = Some(new_data.inflight_id);
self.next_action = Some(new_data.payload);
}
}
// `next_action` is `Some` here: either it already was, or `drain_events()` set it.
// Taking it leaves `next_action` as `None` for the error paths below.
let mut payload = self.next_action.take().unwrap();
{
// Publish what to send to the state read by the request stream; the lock is
// released at the end of this block, before the stream is opened.
let mut stream_state = self.stream_state.lock().await;
stream_state.payload = Some(payload.clone());
stream_state.inflight_id = self.inflight_id;
stream_state.leader_committed = self.event_watcher.committed_rx.borrow_watched().clone()
}
// A fresh inflight queue per stream, shared by the request stream (which pushes)
// and the response handler (which drains acknowledged entries).
let inflight_queue = InflightAppendQueue::new();
let stream_context = StreamContext {
stream_state: self.stream_state.clone(),
inflight_append_queue: inflight_queue.clone(),
};
let req_strm = Self::new_request_stream(stream_context);
// The RPC timeout is the configured heartbeat interval, interpreted as milliseconds.
let rpc_timeout = Duration::from_millis(self.replication_context.config.heartbeat_interval);
let option = RPCOption::new(rpc_timeout);
let resp_strm_res = network.stream_append(req_strm, option).await;
let resp_strm = match resp_strm_res {
Ok(resp_strm) => resp_strm,
Err(rpc_err) => {
// Failed to open the stream: record the error for backoff, report it, and start
// over. `next_action` is `None` now, so the next iteration waits in `drain_events()`.
self.backoff_state.on_error(rpc_err.backoff_rank());
self.send_progress_error(rpc_err, "initiate-stream-replication").await;
continue;
}
};
let res = self.handle_response_stream(resp_strm, inflight_queue).await;
if res.is_ok() {
// The stream ended without error: advance the payload to what the target has
// matched and keep it for the next stream unless its length is `Some(0)`.
// On error nothing is kept, so the next iteration waits for a new event.
payload.update_matching(self.replication_progress.remote_matched.clone());
if payload.len() != Some(0) {
self.next_action = Some(payload);
} else {
self.inflight_id = None;
}
}
}
}
/// Consume the response stream of one append stream and forward each outcome.
///
/// - A successful response drains acknowledged entries from `inflight_queue`, emits a
///   heartbeat notification when that yields a sending time, and reports the new
///   matching log id.
/// - An RPC error is reported via `send_progress_error` and ends processing with
///   `Err("RPCError")`.
/// - A conflict or a higher vote is forwarded as a notification and ends processing
///   with `Err("AppendError")`.
///
/// Returns `Ok(())` when the stream ends without any of the errors above.
async fn handle_response_stream<'s>(
    &mut self,
    resp_strm: BoxStream<'s, Result<StreamAppendResult<C>, RPCError<C>>>,
    inflight_queue: InflightAppendQueue<C>,
) -> Result<(), &'static str> {
    let mut responses = std::pin::pin!(resp_strm);

    loop {
        let Some(rpc_res) = responses.next().await else {
            // The stream is exhausted without an error.
            return Ok(());
        };

        tracing::debug!("AppendEntries RPC response: {:?}", rpc_res);
        self.backoff_state.observe(&rpc_res);

        match rpc_res {
            Err(rpc_err) => {
                self.send_progress_error(rpc_err, "stream-replication").await;
                return Err("RPCError");
            }
            Ok(Ok(matching)) => {
                let acked_sending_time = inflight_queue.drain_acked(&matching);
                if let Some(sending_time) = acked_sending_time {
                    self.notify_heartbeat_progress(sending_time).await;
                }

                self.replication_progress.remote_matched = matching.clone();
                self.notify_progress(ReplicationResult(Ok(matching))).await;
            }
            Ok(Err(StreamAppendError::Conflict(conflict_log_id))) => {
                self.notify_progress(ReplicationResult(Err(conflict_log_id))).await;
                return Err("AppendError");
            }
            Ok(Err(StreamAppendError::HigherVote(higher))) => {
                let notification = Notification::HigherVote {
                    target: self.replication_context.target.clone(),
                    higher,
                    leader_vote: self.replication_context.leader_vote.clone(),
                };
                // Best effort: a failed send is ignored.
                self.replication_context.tx_notify.send(notification).await.ok();
                return Err("AppendError");
            }
        }
    }
}
/// Log an RPC error and, when an inflight id is set, report it through `tx_notify`.
///
/// - `err`: the RPC error; it is sent as its `Display` string.
/// - `when`: a short description of the phase in which the error occurred; used only
///   in the log message.
///
/// Without an inflight id only the warning is logged. A failed send is ignored.
async fn send_progress_error(&mut self, err: RPCError<C>, when: impl fmt::Display) {
    tracing::warn!(
        "ReplicationCore recv RPCError: {}, when:({}); sending error to RaftCore",
        err,
        when
    );

    if self.inflight_id.is_some() {
        let notification = Notification::ReplicationProgress {
            progress: Progress {
                target: self.replication_context.target.clone(),
                result: Err(err.to_string()),
            },
            inflight_id: self.inflight_id,
        };
        self.replication_context.tx_notify.send(notification).await.ok();
    }
}
/// Send a `HeartbeatProgress` notification for this stream and target, carrying
/// `sending_time`.
///
/// A failed send is ignored.
async fn notify_heartbeat_progress(&mut self, sending_time: InstantOf<C>) {
    tracing::debug!("ReplicationCore notify_heartbeat_progress: {}", sending_time.display());

    let heartbeat = Notification::HeartbeatProgress {
        stream_id: self.replication_context.stream_id,
        target: self.replication_context.target.clone(),
        sending_time,
    };
    self.replication_context.tx_notify.send(heartbeat).await.ok();
}
/// Record and report the result of a replication response.
///
/// - On `Ok(matching)`, `replication_progress.remote_matched` is updated to `matching`.
///   If `matching` is `None`, nothing is sent.
/// - On `Err(conflict)`, local progress is left untouched and the conflict is sent.
///
/// The notification carries the current `inflight_id`. A failed send is ignored.
async fn notify_progress(&mut self, replication_result: ReplicationResult<C>) {
    tracing::debug!(
        "{}: target={}, curr_matching={}, result={}, inflight_id={}",
        func_name!(),
        self.replication_context.target,
        self.replication_progress.remote_matched.display(),
        replication_result,
        self.inflight_id.display()
    );

    if let Ok(matching) = &replication_result.0 {
        self.replication_progress.remote_matched = matching.clone();
        if matching.is_none() {
            return;
        }
    }

    self.replication_context
        .tx_notify
        .send(Notification::ReplicationProgress {
            progress: Progress {
                target: self.replication_context.target.clone(),
                // `replication_result` is owned and not used afterwards: move it
                // instead of cloning.
                result: Ok(replication_result),
            },
            inflight_id: self.inflight_id,
        })
        .await
        .ok();
}
/// Wait for the next event and turn it into `next_action`.
///
/// Waits for whichever of the two watch channels changes first:
///
/// - `replicate_rx`: the watched data's inflight id and payload become the current
///   `inflight_id` and `next_action`.
/// - `committed_rx`: `local_committed` is updated, `inflight_id` is cleared, and
///   `next_action` becomes a `LogIdRange` payload whose both bounds are the current
///   `remote_matched`.
///
/// # Errors
///
/// Returns `ReplicationClosed` if waiting on the selected channel fails.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn drain_events(&mut self) -> Result<(), ReplicationClosed> {
    tracing::debug!("drain_events");

    let replicate_changed = self.event_watcher.replicate_rx.changed();
    let committed_changed = self.event_watcher.committed_rx.changed();

    futures_util::select! {
        changed = replicate_changed.fuse() => {
            if changed.is_err() {
                return Err(ReplicationClosed::new("replicate_rx closed"));
            }

            let latest = self.event_watcher.replicate_rx.borrow_watched().clone();
            self.inflight_id = Some(latest.inflight_id);
            self.next_action = Some(latest.payload);
        }
        changed = committed_changed.fuse() => {
            if changed.is_err() {
                return Err(ReplicationClosed::new("committed_rx closed"));
            }

            let leader_committed = self.event_watcher.committed_rx.borrow_watched().clone();
            self.replication_progress.local_committed = leader_committed;

            // Both bounds of the range are the currently matched log id.
            let matched = self.replication_progress.remote_matched.clone();
            let payload = Payload::LogIdRange {
                log_id_range: LogIdRange::new(matched.clone(), matched),
            };

            self.inflight_id = None;
            self.next_action = Some(payload);
        }
    };

    tracing::debug!(
        "drain_events set: inflight_id={:?}, next_action={:?}",
        self.inflight_id,
        self.next_action
    );
    Ok(())
}
}