use crate::core::fsm::StateMachineCaller;
use crate::core::lifecycle::{Component, Lifecycle, LoopHandler, ReplicaComponent};
use crate::core::log::LogManager;
use crate::core::snapshot::task::Task;
use crate::core::task_sender::TaskSender;
use crate::error::{LifeCycleError, PacificaError};
use crate::rpc::message::{InstallSnapshotRequest, InstallSnapshotResponse};
use crate::rpc::RpcServiceError;
use crate::runtime::{MpscUnboundedReceiver, TypeConfigExt};
use crate::storage::{GetFileRequest, GetFileResponse, GetFileService, SnapshotDownloader, SnapshotStorage, StorageError};
use crate::type_config::alias::{
MpscUnboundedReceiverOf, OneshotReceiverOf, SnapshotReaderOf,
SnapshotStorageOf,
};
use crate::util::{send_result, AutoClose, RepeatedTask, RepeatedTimer};
use crate::{LogId, ReplicaId, ReplicaOption, TypeConfig};
use anyerror::AnyError;
use futures::FutureExt;
use std::sync::{Arc, Mutex, RwLock};
use tracing::{Level, Span};
use tracing_futures::Instrument;
use crate::fsm::StateMachine;
/// Entry point for snapshot operations.
///
/// The executor only enqueues `Task` messages on `tx_task` and waits for the
/// reply. The receiving end of that channel is owned by the `WorkHandler`
/// that `new` builds and stores in `work_handler` until `new_loop_handler`
/// takes it out.
pub(crate) struct SnapshotExecutor<C: TypeConfig, FSM: StateMachine<C>> {
    /// Snapshot storage; a clone of this `Arc` is handed to the work handler.
    snapshot_storage: Arc<RwLock<SnapshotStorageOf<C>>>,
    /// Repeated timer wrapping a `SnapshotSaver`; turned on in `startup`.
    snapshot_timer: RepeatedTimer<C>,
    /// The work handler waiting to be taken; `None` once it has been handed out.
    work_handler: Mutex<Option<WorkHandler<C, FSM>>>,
    /// Sending side of the task channel.
    tx_task: TaskSender<C, Task<C>>,
    /// Span supplied by the caller of `new`.
    span: Span,
}
impl<C, FSM> SnapshotExecutor<C, FSM>
where
C: TypeConfig,
FSM: StateMachine<C>,
{
pub(crate) fn new(
snapshot_storage: SnapshotStorageOf<C>,
log_manager: Arc<ReplicaComponent<C, LogManager<C>>>,
fsm_caller: Arc<ReplicaComponent<C, StateMachineCaller<C, FSM>>>,
replica_option: Arc<ReplicaOption>,
span: Span,
) -> Self {
let snapshot_storage = Arc::new(RwLock::new(snapshot_storage));
let (tx_task, rx_task) = C::mpsc_unbounded();
let snapshot_saver: SnapshotSaver<C> = SnapshotSaver::new(
TaskSender::new(tx_task.clone())
);
let snapshot_timer = RepeatedTimer::new(snapshot_saver, replica_option.snapshot_save_interval(), false);
let wrk_span = tracing::span!(
parent: &span,
Level::DEBUG,
"WorkHandler",
);
let work_handler = WorkHandler::new(
snapshot_storage.clone(),
log_manager,
fsm_caller,
replica_option,
rx_task,
wrk_span,
);
SnapshotExecutor {
snapshot_storage,
snapshot_timer,
work_handler: Mutex::new(Some(work_handler)),
tx_task: TaskSender::new(tx_task),
span
}
}
pub(crate) async fn load_snapshot(&self) -> Result<LogId, PacificaError<C>> {
let (callback, rx_result) = C::oneshot();
self.tx_task.send(Task::SnapshotLoad { callback })?;
let result: Result<LogId, PacificaError<C>> = rx_result.await?;
result
}
pub(crate) async fn save_snapshot(&self) -> Result<LogId, PacificaError<C>> {
let (callback, rx) = C::oneshot();
self.tx_task.send(Task::SnapshotSave { callback })?;
let result: Result<LogId, PacificaError<C>> = rx.await?;
result
}
pub(crate) async fn install_snapshot(
&self,
request: InstallSnapshotRequest<C>,
) -> Result<InstallSnapshotResponse, PacificaError<C>> {
let (callback, rx) = C::oneshot();
self.tx_task.send(Task::InstallSnapshot { request, callback })?;
let result: Result<InstallSnapshotResponse, PacificaError<C>> = rx.await?;
result
}
pub(crate) async fn open_snapshot_reader(
&self,
) -> Result<Option<AutoClose<SnapshotReaderOf<C>>>, StorageError> {
let snapshot_reader = self.snapshot_storage.write().unwrap().open_reader().map_err(|e| StorageError::open_reader(e))?;
let snapshot_reader = snapshot_reader.map(|reader| AutoClose::new(reader));
Ok(snapshot_reader)
}
}
/// Task run by the snapshot timer: asks the work handler to save a snapshot.
struct SnapshotSaver<C: TypeConfig> {
    /// Sender used to enqueue `Task::SnapshotSave` requests.
    tx_task: TaskSender<C, Task<C>>,
}
impl<C> SnapshotSaver<C>
where
    C: TypeConfig,
{
    /// Wraps the given task sender.
    fn new(tx_task: TaskSender<C, Task<C>>) -> SnapshotSaver<C> {
        Self { tx_task }
    }

    /// Enqueues a `Task::SnapshotSave` and waits for the reply.
    ///
    /// # Errors
    /// Fails if the task cannot be sent, if the reply channel fails, or with
    /// whatever error the receiver of the task reports.
    async fn do_snapshot_save(&self) -> Result<LogId, PacificaError<C>> {
        let (callback, reply) = C::oneshot();
        self.tx_task.send(Task::SnapshotSave { callback })?;
        reply.await?
    }
}
impl<C> RepeatedTask for SnapshotSaver<C>
where
C: TypeConfig,
{
async fn execute(&mut self) {
let result = self.do_snapshot_save().await;
match result {
Ok(log_id) => {
tracing::debug!("do snapshot save success. log_id: {}", log_id);
}
Err(e) => {
tracing::error!("do snapshot save failed. error: {}", e);
}
}
}
}
/// Receiver side of the snapshot task channel: owns the state needed to
/// load, save and install snapshots.
pub(crate) struct WorkHandler<C: TypeConfig, FSM: StateMachine<C>> {
    /// Snapshot storage shared with the `SnapshotExecutor`.
    snapshot_storage: Arc<RwLock<SnapshotStorageOf<C>>>,
    /// Notified via `on_snapshot` after a snapshot has been saved or loaded.
    log_manager: Arc<ReplicaComponent<C, LogManager<C>>>,
    /// Used to save/load snapshots and to read the committed log index.
    fsm_caller: Arc<ReplicaComponent<C, StateMachineCaller<C, FSM>>>,
    /// Replica options passed in at construction.
    replica_option: Arc<ReplicaOption>,
    /// Log id of the most recent successful snapshot; starts at `LogId::default()`.
    last_snapshot_log_id: LogId,
    /// Incoming snapshot tasks.
    rx_task: MpscUnboundedReceiverOf<C, Task<C>>,
    /// Span the run loop is instrumented with.
    span: Span,
}
impl<C, FSM> WorkHandler<C, FSM>
where
    C: TypeConfig,
    FSM: StateMachine<C>,
{
    /// Creates a handler whose `last_snapshot_log_id` starts at `LogId::default()`.
    fn new(
        snapshot_storage: Arc<RwLock<C::SnapshotStorage>>,
        log_manager: Arc<ReplicaComponent<C, LogManager<C>>>,
        fsm_caller: Arc<ReplicaComponent<C, StateMachineCaller<C, FSM>>>,
        replica_option: Arc<ReplicaOption>,
        rx_task: MpscUnboundedReceiverOf<C, Task<C>>,
        span: Span,
    ) -> WorkHandler<C, FSM> {
        WorkHandler {
            snapshot_storage,
            log_manager,
            fsm_caller,
            replica_option,
            last_snapshot_log_id: LogId::default(),
            rx_task,
            span,
        }
    }

    /// Executes one task and sends its result through the task's callback.
    ///
    /// A failure to deliver the result is deliberately ignored, so this
    /// currently always returns `Ok(())`. Save tasks run with a log-index
    /// margin of `0`.
    async fn handle_task(&mut self, task: Task<C>) -> Result<(), LifeCycleError> {
        match task {
            Task::SnapshotLoad { callback } => {
                let result = self.do_snapshot_load().await;
                let _ = send_result::<C, LogId, PacificaError<C>>(callback, result);
            }
            Task::SnapshotSave { callback } => {
                let result = self.do_snapshot_save(0).await;
                let _ = send_result::<C, LogId, PacificaError<C>>(callback, result);
            }
            Task::InstallSnapshot { request, callback } => {
                let result = self.do_snapshot_install(request).await;
                let _ = send_result::<C, InstallSnapshotResponse, PacificaError<C>>(callback, result);
            }
        }
        Ok(())
    }

    /// Saves a snapshot if the committed log index is more than
    /// `log_index_margin` entries ahead of the last snapshot.
    ///
    /// Returns the log id of the new snapshot, or the last snapshot's log id
    /// when no snapshot was needed.
    ///
    /// # Errors
    /// Fails if the snapshot writer cannot be opened, if the state machine
    /// caller fails to save, or if the log manager rejects the notification.
    ///
    /// # Panics
    /// Panics if the storage lock is poisoned.
    async fn do_snapshot_save(&mut self, log_index_margin: usize) -> Result<LogId, PacificaError<C>> {
        let committed_log_index = self.fsm_caller.get_committed_log_index();
        // Saturate instead of using `-`: if the last snapshot is ahead of the
        // committed index, a plain unsigned subtraction would panic in debug
        // builds and wrap to a huge distance in release builds.
        let distance = committed_log_index.saturating_sub(self.last_snapshot_log_id.index);
        if distance <= log_index_margin {
            return Ok(self.last_snapshot_log_id.clone());
        }
        // The write guard is a temporary released at the end of this statement,
        // i.e. before any `.await` below.
        let writer = self
            .snapshot_storage
            .write()
            .unwrap()
            .open_writer()
            .map_err(|e| StorageError::open_writer(e))?;
        let writer = AutoClose::new(writer);
        let snapshot_log_id = self.fsm_caller.on_snapshot_save(writer).await?;
        self.on_snapshot_success(snapshot_log_id.clone()).await?;
        tracing::info!("success to save snapshot. log_id: {}", snapshot_log_id);
        Ok(snapshot_log_id)
    }

    /// Loads the snapshot currently offered by the storage, if any.
    ///
    /// When `open_reader` yields a reader, it is passed to the state machine
    /// caller and the resulting log id is recorded and returned. When it
    /// yields `None`, the current `last_snapshot_log_id` is returned unchanged.
    ///
    /// # Errors
    /// Fails if the reader cannot be opened, if the state machine caller fails
    /// to load, or if the log manager rejects the notification.
    ///
    /// # Panics
    /// Panics if the storage lock is poisoned.
    async fn do_snapshot_load(&mut self) -> Result<LogId, PacificaError<C>> {
        let snapshot_reader = {
            // Scope the guard so it is dropped before the first `.await`.
            let mut storage = self.snapshot_storage.write().unwrap();
            storage.open_reader()
        };
        let snapshot_reader = snapshot_reader.map_err(|e| StorageError::open_reader(e))?;
        if let Some(snapshot_reader) = snapshot_reader {
            let snapshot_log_id = self.fsm_caller.on_snapshot_load(AutoClose::new(snapshot_reader)).await?;
            tracing::info!("success to load snapshot. log_id: {}", snapshot_log_id);
            self.on_snapshot_success(snapshot_log_id.clone()).await?;
            return Ok(snapshot_log_id);
        };
        Ok(self.last_snapshot_log_id.clone())
    }

    /// Records `snapshot_log_id` as the last snapshot and notifies the log manager.
    ///
    /// The local field is updated first, so it keeps the new value even if the
    /// log manager call returns an error.
    async fn on_snapshot_success(&mut self, snapshot_log_id: LogId) -> Result<(), PacificaError<C>> {
        self.last_snapshot_log_id = snapshot_log_id.clone();
        self.log_manager.on_snapshot(snapshot_log_id).await?;
        Ok(())
    }

    /// Installs the snapshot described by `request`.
    ///
    /// A request whose `snapshot_log_id` is not newer than the last snapshot
    /// is answered with `Success` without doing anything. Otherwise the
    /// snapshot is downloaded from `request.primary_id` and then loaded.
    ///
    /// # Errors
    /// Propagates download and load failures.
    ///
    /// # Panics
    /// Panics if the log id obtained from loading differs from
    /// `request.snapshot_log_id`.
    async fn do_snapshot_install(
        &mut self,
        request: InstallSnapshotRequest<C>,
    ) -> Result<InstallSnapshotResponse, PacificaError<C>> {
        if request.snapshot_log_id <= self.last_snapshot_log_id {
            return Ok(InstallSnapshotResponse::Success);
        }
        let target_id = request.primary_id.clone();
        self.do_snapshot_download(target_id, request.read_id).await?;
        let log_id = self.do_snapshot_load().await?;
        tracing::info!("success to install snapshot. log_id: {}", log_id);
        assert_eq!(log_id, request.snapshot_log_id);
        Ok(InstallSnapshotResponse::Success)
    }

    /// Opens a downloader for `download_id` on `target_id` and runs it to completion.
    ///
    /// # Errors
    /// Failures from opening the downloader and from downloading are both
    /// mapped through `StorageError::download_snapshot`.
    ///
    /// # Panics
    /// Panics if the storage lock is poisoned.
    async fn do_snapshot_download(
        &mut self,
        target_id: ReplicaId<C::NodeId>,
        download_id: usize,
    ) -> Result<(), StorageError> {
        let downloader = {
            // Scope the guard so it is dropped before the download is awaited.
            let mut snapshot_storage = self.snapshot_storage.write().unwrap();
            snapshot_storage.open_downloader(target_id, download_id)
        };
        let mut downloader = downloader.map_err(|e| StorageError::download_snapshot(download_id, e))?;
        downloader
            .download()
            .await
            .map_err(|e| StorageError::download_snapshot(download_id, e))?;
        Ok(())
    }
}
impl<C, FSM> LoopHandler<C> for WorkHandler<C, FSM>
where
    C: TypeConfig,
    FSM: StateMachine<C>,
{
    /// Processes tasks until a shutdown signal arrives or the task channel
    /// yields `None`.
    ///
    /// The shutdown receiver is listed first in the biased select. Task
    /// failures are logged and do not stop the loop. The whole loop runs
    /// inside the handler's own span.
    #[tracing::instrument(level = "debug", skip_all, err)]
    async fn run_loop(mut self, mut rx_shutdown: OneshotReceiverOf<C, ()>) -> Result<(), LifeCycleError> {
        let loop_span = self.span.clone();
        let event_loop = async move {
            tracing::debug!("starting...");
            loop {
                futures::select_biased! {
                    _ = (&mut rx_shutdown).fuse() => {
                        tracing::info!("received shutdown signal.");
                        break;
                    }
                    incoming = self.rx_task.recv().fuse() => {
                        if let Some(task) = incoming {
                            if let Err(e) = self.handle_task(task).await {
                                tracing::error!("SnapshotExecutor failed to handle task. {}", e);
                            }
                        } else {
                            tracing::warn!("received unexpected task message.");
                            break;
                        }
                    }
                }
            }
            Ok(())
        };
        event_loop.instrument(loop_span).await
    }
}
impl<C, FSM> Lifecycle<C> for SnapshotExecutor<C, FSM>
where
    C: TypeConfig,
    FSM: StateMachine<C>,
{
    /// Loads the snapshot first and only then turns the snapshot timer on.
    ///
    /// # Errors
    /// A load failure is wrapped in `LifeCycleError::StartupError`; the timer
    /// is not turned on in that case.
    #[tracing::instrument(level = "debug", skip(self), err)]
    async fn startup(&self) -> Result<(), LifeCycleError> {
        if let Err(e) = self.load_snapshot().await {
            return Err(LifeCycleError::StartupError(AnyError::new(&e)));
        }
        self.snapshot_timer.turn_on();
        Ok(())
    }

    /// Shuts the snapshot timer down; whatever the timer returns is discarded.
    #[tracing::instrument(level = "debug", skip(self), err)]
    async fn shutdown(&self) -> Result<(), LifeCycleError> {
        let _ = self.snapshot_timer.shutdown();
        Ok(())
    }
}
impl<C, FSM> Component<C> for SnapshotExecutor<C, FSM>
where
    C: TypeConfig,
    FSM: StateMachine<C>,
{
    type LoopHandler = WorkHandler<C, FSM>;

    /// Hands out the work handler built in `new`.
    ///
    /// The handler is moved out of its slot, so only the first call returns
    /// `Some`; later calls return `None`.
    ///
    /// # Panics
    /// Panics if the `work_handler` mutex is poisoned.
    fn new_loop_handler(&self) -> Option<Self::LoopHandler> {
        let mut slot = self.work_handler.lock().unwrap();
        slot.take()
    }
}
impl<C, FSM> GetFileService<C> for SnapshotExecutor<C, FSM>
where
    C: TypeConfig,
    FSM: StateMachine<C>,
{
    /// Forwards a get-file request to the file service obtained from the
    /// snapshot storage.
    ///
    /// # Errors
    /// If the storage cannot provide a file service, the error's string form
    /// is wrapped via `RpcServiceError::storage_error`. Errors from the file
    /// service itself are returned as-is.
    ///
    /// # Panics
    /// Panics if the storage lock is poisoned.
    #[inline]
    async fn handle_get_file_request(&self, request: GetFileRequest) -> Result<GetFileResponse, RpcServiceError> {
        // The read guard is a temporary of this statement, so it is released
        // before the request is awaited below.
        let file_service = self
            .snapshot_storage
            .read()
            .unwrap()
            .file_service()
            .map_err(|e| RpcServiceError::storage_error(e.to_string()))?;
        file_service.handle_get_file_request(request).await
    }
}