use display_more::DisplayOptionExt;
use futures_util::TryStreamExt;
use tracing::Instrument;
use crate::RaftLogReader;
use crate::RaftSnapshotBuilder;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::async_runtime::MpscReceiver;
use crate::async_runtime::OneshotSender;
use crate::core::ApplyResult;
use crate::core::notification::Notification;
use crate::core::sm::Command;
use crate::core::sm::CommandResult;
use crate::core::sm::Response;
use crate::core::sm::handle::Handle;
use crate::entry::RaftEntry;
use crate::errors::StorageIOResult;
use crate::raft::responder::core_responder::CoreResponder;
#[cfg(doc)]
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::storage::v2::entry_responder::EntryResponderBuilder;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::MpscReceiverOf;
use crate::type_config::alias::MpscSenderOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::SnapshotOf;
use crate::type_config::async_runtime::mpsc::MpscSender;
/// Executes [`Command`]s against a [`RaftStateMachine`] inside a spawned task.
///
/// Commands arrive over `cmd_rx`; results are reported as [`Notification`]s through `resp_tx`.
pub(crate) struct Worker<C, SM, LR>
where
C: RaftTypeConfig,
SM: RaftStateMachine<C>,
LR: RaftLogReader<C>,
{
/// The state machine implementation that received commands are executed against.
state_machine: SM,
/// Reads the log entries that are applied to the state machine.
///
/// NOTE(review): presumably obtained from the [`RaftLogStorage`] implementation; confirm.
log_reader: LR,
/// Receiving half of the command channel; the sending half lives in the [`Handle`]
/// returned by `spawn`.
cmd_rx: MpscReceiverOf<C, Command<C, SM>>,
/// Sends the result of executed commands back as [`Notification`]s.
resp_tx: MpscSenderOf<C, Notification<C>>,
}
impl<C, SM, LR> Worker<C, SM, LR>
where
C: RaftTypeConfig,
SM: RaftStateMachine<C>,
LR: RaftLogReader<C>,
{
pub(crate) fn spawn(
state_machine: SM,
log_reader: LR,
resp_tx: MpscSenderOf<C, Notification<C>>,
state_machine_channel_size: usize,
span: tracing::Span,
) -> Handle<C, SM> {
let (cmd_tx, cmd_rx) = C::mpsc(state_machine_channel_size);
let worker = Worker {
state_machine,
log_reader,
cmd_rx,
resp_tx,
};
let join_handle = worker.do_spawn(span);
Handle { cmd_tx, join_handle }
}
/// Consumes the worker and runs `worker_loop` in a task spawned through `C::spawn`,
/// instrumented with `span`.
///
/// If the loop ends with a storage error, the error is logged and then reported through
/// `resp_tx` as a failed [`CommandResult`]. The outcome of sending that notification is
/// ignored.
fn do_spawn(mut self, span: tracing::Span) -> JoinHandleOf<C, ()> {
    let task = async move {
        // A clean exit needs no further action.
        let Err(err) = self.worker_loop().await else {
            return;
        };

        tracing::error!("{} while execute state machine command", err,);

        let failure = Notification::StateMachine {
            command_result: CommandResult { result: Err(err) },
        };
        self.resp_tx.send(failure).await.ok();
    };

    C::spawn(task.instrument(span))
}
/// Receives commands from `cmd_rx` and executes them one at a time, in arrival order.
///
/// Returns `Ok(())` once the command channel is closed.
///
/// # Errors
///
/// Returns the [`StorageError`] produced by a failing `GetSnapshot`, `InstallFullSnapshot`,
/// `BeginReceivingSnapshot` or `Apply` command; the loop stops at the first such error.
#[tracing::instrument(level = "debug", skip_all)]
async fn worker_loop(&mut self) -> Result<(), StorageError<C>> {
    while let Some(cmd) = self.cmd_rx.recv().await {
        tracing::debug!("{}: received command: {:?}", func_name!(), cmd);

        match cmd {
            Command::BuildSnapshot => {
                tracing::info!("{}: build snapshot", func_name!());
                let notifier = self.resp_tx.clone();
                self.build_snapshot(notifier).await;
            }

            Command::GetSnapshot { tx } => {
                tracing::info!("{}: get snapshot", func_name!());
                self.get_snapshot(tx).await?;
            }

            Command::InstallFullSnapshot { log_io_id, snapshot } => {
                tracing::info!("{}: install complete snapshot", func_name!());

                // Keep a copy of the metadata: the snapshot payload is moved into the
                // state machine, while the metadata is still needed for the error context,
                // the log line and the response.
                let meta = snapshot.meta.clone();
                let installed = self.state_machine.install_snapshot(&meta, snapshot.snapshot).await;
                installed.sto_write_snapshot(Some(meta.signature()))?;

                tracing::info!("Done install complete snapshot, meta: {}", meta);

                let response = Response::InstallSnapshot((log_io_id, Some(meta)));
                let notification = Notification::sm(CommandResult::new(Ok(response)));
                self.resp_tx.send(notification).await.ok();
            }

            Command::BeginReceivingSnapshot { tx } => {
                tracing::info!("{}: BeginReceivingSnapshot", func_name!());
                let receiving = self.state_machine.begin_receiving_snapshot().await;
                let snapshot_data = receiving.sto_write_snapshot(None)?;
                tx.send(snapshot_data).ok();
            }

            Command::Apply {
                first,
                last,
                client_resp_channels,
            } => {
                let applied = self.apply(first, last, client_resp_channels).await?;
                let notification = Notification::sm(CommandResult::new(Ok(Response::Apply(applied))));
                self.resp_tx.send(notification).await.ok();
            }

            Command::ExternalFunc { func } => {
                tracing::debug!("{}: run user defined ExternalFunc", func_name!());
                func(&mut self.state_machine).await;
            }
        }
    }

    // `recv()` returned `None`: the command channel is closed.
    tracing::info!("{}: rx closed, state machine worker quit", func_name!());
    Ok(())
}
/// Applies the log entries from `first` to `last` (inclusive) to the state machine.
///
/// Entries are read as a stream from `log_reader` over the index range
/// `first.index()..last.index() + 1`. Each entry is paired with the next responder from
/// `client_resp_channels` when that responder's index equals the entry's index, and with
/// `None` otherwise. The paired stream is then passed to the state machine's `apply`.
///
/// On success the returned [`ApplyResult`] carries the index range and `last` as the last
/// applied log id.
///
/// # Errors
///
/// Returns a [`StorageError`] if the state machine's `apply` fails.
#[tracing::instrument(level = "debug", skip_all)]
async fn apply(
    &mut self,
    first: LogIdOf<C>,
    last: LogIdOf<C>,
    client_resp_channels: Vec<(u64, CoreResponder<C>)>,
) -> Result<ApplyResult<C>, StorageError<C>> {
    let since = first.index();
    let end = last.index() + 1;

    // Debug builds record the index of the most recent entry pulled from the stream so it
    // can be compared with the requested range once applying has finished.
    #[cfg(debug_assertions)]
    let got_last_index = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
    #[cfg(debug_assertions)]
    let last_apply = got_last_index.clone();

    let entries = self.log_reader.entries_stream(since..end).await;
    let mut pending_responders = client_resp_channels.into_iter().peekable();

    let entries_with_responders = entries.map_ok(move |entry| {
        let log_index = entry.index();

        // Consume the next responder only when its index equals this entry's index.
        let responder = pending_responders.next_if(|(idx, _)| *idx == log_index).map(|(_, r)| r);
        let item = EntryResponderBuilder { entry, responder };

        #[cfg(debug_assertions)]
        last_apply.store(log_index, std::sync::atomic::Ordering::Relaxed);

        tracing::debug!("Applying entry to state machine: {}", item);
        item.into_parts()
    });

    self.state_machine.apply(Box::pin(entries_with_responders)).await.sto_apply(last.clone())?;

    #[cfg(debug_assertions)]
    {
        assert_eq!(end - 1, got_last_index.load(std::sync::atomic::Ordering::Relaxed));
    }

    Ok(ApplyResult {
        since,
        end,
        last_applied: last,
    })
}
/// Asks the state machine for a snapshot builder and, if one is returned, builds the
/// snapshot in a separately spawned task, so this method does not wait for the build.
///
/// The outcome is reported through `resp_tx` as `Response::BuildSnapshotDone`: `None` when
/// the state machine returns no builder, otherwise the metadata of the built snapshot, or
/// the storage error from building it. The outcome of sending the notification is ignored.
#[tracing::instrument(level = "info", skip_all)]
async fn build_snapshot(&mut self, resp_tx: MpscSenderOf<C, Notification<C>>) {
    let maybe_builder = self.state_machine.try_create_snapshot_builder(false).await;

    match maybe_builder {
        None => {
            tracing::info!("{}: snapshot building is refused by state machine", func_name!());
            let refused = CommandResult::new(Ok(Response::BuildSnapshotDone(None)));
            resp_tx.send(Notification::sm(refused)).await.ok();
        }
        Some(mut snapshot_builder) => {
            // The build runs in its own task; its result is delivered via `resp_tx`.
            let _handle = C::spawn(async move {
                let built = snapshot_builder.build_snapshot().await.sto_write_snapshot(None);
                let outcome = built.map(|snap| Response::BuildSnapshotDone(Some(snap.meta)));
                let notification = Notification::sm(CommandResult::new(outcome));
                resp_tx.send(notification).await.ok();
            });
            tracing::info!("{}: returning; spawned building snapshot task", func_name!());
        }
    }
}
/// Reads the state machine's current snapshot and sends it through `tx`.
///
/// The result of the send is ignored.
///
/// # Errors
///
/// Returns a [`StorageError`] if reading the current snapshot fails; nothing is sent through
/// `tx` in that case.
#[tracing::instrument(level = "info", skip_all)]
async fn get_snapshot(&mut self, tx: OneshotSenderOf<C, Option<SnapshotOf<C>>>) -> Result<(), StorageError<C>> {
    tracing::info!("{}", func_name!());

    let current = self.state_machine.get_current_snapshot().await.sto_read_snapshot(None)?;

    // Only the metadata is logged; the snapshot itself is moved into the channel below.
    let meta = current.as_ref().map(|snap| &snap.meta);
    tracing::info!("sending back snapshot: meta: {}", meta.display());

    tx.send(current).ok();
    Ok(())
}
}