pacifica-rs 1.0.0

rust implementation of PacificA: Replication in Log-Based Distributed Storage Systems
use crate::core::fsm::StateMachineCaller;
use crate::core::lifecycle::{Component, Lifecycle, LoopHandler, ReplicaComponent};
use crate::core::log::LogManager;
use crate::core::replica_group_agent::ReplicaGroupAgent;
use crate::core::state::append_entries_handler::AppendEntriesHandler;
use crate::core::task_sender::TaskSender;
use crate::core::{CoreNotification, ResultSender};
use crate::error::{HigherTermError, LifeCycleError, PacificaError};
use crate::rpc::message::{AppendEntriesRequest, AppendEntriesResponse, ReplicaRecoverRequest, ReplicaRecoverResponse};
use crate::rpc::{ReplicaClient, RpcOption};
use crate::runtime::{MpscUnboundedReceiver, TypeConfigExt};
use crate::type_config::alias::{MpscUnboundedReceiverOf, OneshotReceiverOf};
use crate::util::{send_result, Leased, RepeatedTask, RepeatedTimer};
use crate::{ReplicaOption, TypeConfig};
use futures::FutureExt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use crate::fsm::StateMachine;

pub(crate) struct CandidateState<C, FSM>
where
    C: TypeConfig,
    FSM: StateMachine<C>,
{
    recover_timer: RepeatedTimer<C>,
    append_entries_handler: AppendEntriesHandler<C, FSM>,
    core_notification: Arc<CoreNotification<C>>,
    recovering: Arc<AtomicBool>,
    tx_task: TaskSender<C, Task<C>>,
    work_handler: Mutex<Option<WorkHandler<C>>>,
}

impl<C, FSM> CandidateState<C, FSM>
where
    C: TypeConfig,
    FSM: StateMachine<C>,
{
    pub fn new(
        replica_client: Arc<C::ReplicaClient>,
        log_manager: Arc<ReplicaComponent<C, LogManager<C>>>,
        fsm_caller: Arc<ReplicaComponent<C, StateMachineCaller<C, FSM>>>,
        replica_group_agent: Arc<ReplicaComponent<C, ReplicaGroupAgent<C>>>,
        core_notification: Arc<CoreNotification<C>>,
        replica_option: Arc<ReplicaOption>,
    ) -> Self {
        let recover_interval = replica_option.recover_interval();
        let (tx_task, rx_task) = C::mpsc_unbounded();
        let recovering = Arc::new(AtomicBool::new(false));
        let recover_task: RecoverTask<C> = RecoverTask::new(recovering.clone(), TaskSender::new(tx_task.clone()));
        let recover_timer = RepeatedTimer::new(recover_task, recover_interval, false);
        let grace_period_timeout = replica_option.grace_period_timeout();
        let grace_period = Leased::new(C::now(), grace_period_timeout.clone());
        let grace_period = Arc::new(RwLock::new(grace_period));
        let append_entries_handler = AppendEntriesHandler::new(
            grace_period,
            log_manager.clone(),
            fsm_caller.clone(),
            replica_group_agent.clone(),
            core_notification.clone(),
            replica_option.clone(),
        );


        let work_handler = WorkHandler::new(
            replica_group_agent.clone(),
            replica_client.clone(),
            core_notification.clone(),
            recovering.clone(),
            replica_option.clone(),
            rx_task,
        );

        Self {
            append_entries_handler,
            core_notification,
            recover_timer,
            recovering,
            tx_task: TaskSender::new(tx_task),
            work_handler: Mutex::new(Some(work_handler)),
        }
    }

    pub(crate) async fn recover(&self) -> Result<(), PacificaError<C>> {
        let (result_sender, rx) = C::oneshot();
        self.tx_task.send(Task::Recover {
            callback: result_sender,
        })?;
        let result: Result<(), PacificaError<C>> = rx.await?;
        result
    }

    pub(crate) async fn handle_append_entries_request(
        &self,
        request: AppendEntriesRequest<C>,
    ) -> Result<AppendEntriesResponse, PacificaError<C>> {
        self.append_entries_handler.handle_append_entries_request(request).await
    }
}

impl<C, FSM> Lifecycle<C> for CandidateState<C, FSM>
where
    C: TypeConfig,
    FSM: StateMachine<C>,
{
    async fn startup(&self) -> Result<(), LifeCycleError> {
        self.recover_timer.turn_on();

        Ok(())
    }

    async fn shutdown(&self) -> Result<(), LifeCycleError> {
        let _ = self.recover_timer.shutdown();
        //
        Ok(())
    }
}

struct RecoverTask<C>
where
    C: TypeConfig,
{
    recovering: Arc<AtomicBool>,
    tx_task: TaskSender<C, Task<C>>,
}

impl<C> RecoverTask<C>
where
    C: TypeConfig,
{
    fn new(recovering: Arc<AtomicBool>, tx_task: TaskSender<C, Task<C>>) -> RecoverTask<C> {
        RecoverTask { recovering, tx_task }
    }

    async fn recover(&self) -> Result<(), PacificaError<C>> {
        if self.recovering.load(Ordering::SeqCst) {
            return Ok(());
        }
        let (result_sender, rx) = C::oneshot();
        self.tx_task.send(Task::Recover {
            callback: result_sender,
        })?;
        let result: Result<(), PacificaError<C>> = rx.await?;
        result
    }
}

impl<C> RepeatedTask for RecoverTask<C>
where
    C: TypeConfig,
{
    async fn execute(&mut self) {
        let result = self.recover().await;
        if let Err(e) = result {
            tracing::error!("Recover failed, error: {}", e);
        }
    }
}

pub(crate) struct WorkHandler<C>
where
    C: TypeConfig,
{
    replica_group_agent: Arc<ReplicaComponent<C, ReplicaGroupAgent<C>>>,
    replica_client: Arc<C::ReplicaClient>,
    core_notification: Arc<CoreNotification<C>>,
    recovering: Arc<AtomicBool>,
    replica_option: Arc<ReplicaOption>,
    rx_task: MpscUnboundedReceiverOf<C, Task<C>>,
}
impl<C> WorkHandler<C>
where
    C: TypeConfig,
{
    fn new(
        replica_group_agent: Arc<ReplicaComponent<C, ReplicaGroupAgent<C>>>,
        replica_client: Arc<C::ReplicaClient>,
        core_notification: Arc<CoreNotification<C>>,
        recovering: Arc<AtomicBool>,
        replica_option: Arc<ReplicaOption>,
        rx_task: MpscUnboundedReceiverOf<C, Task<C>>,
    ) -> WorkHandler<C> {
        WorkHandler {
            replica_group_agent,
            replica_client,
            core_notification,
            recovering,
            replica_option,
            rx_task,
        }
    }

    async fn handle_task(&mut self, task: Task<C>) -> Result<(), LifeCycleError> {
        match task {
            Task::Recover { callback } => {
                let result = self.handle_recover().await;
                let _ = send_result::<C, (), PacificaError<C>>(callback, result);
            }
        }
        Ok(())
    }

    async fn handle_recover(&mut self) -> Result<(), PacificaError<C>> {
        self.check_and_do_recover().await
    }

    async fn check_and_do_recover(&mut self) -> Result<(), PacificaError<C>> {
        self.recovering.store(true, Ordering::SeqCst);
        let result = self.do_recover().await;
        self.recovering.store(false, Ordering::SeqCst);
        result
    }

    async fn do_recover(&self) -> Result<(), PacificaError<C>> {
        let replica_group = self.replica_group_agent.force_refresh_get().await?;
        let version = replica_group.version();
        let term = replica_group.term();
        let primary_id = replica_group.primary_id();
        let recover_id = self.replica_group_agent.current_id();
        let request = ReplicaRecoverRequest::new(term, version, recover_id);
        let mut rpc_option = RpcOption::default();
        rpc_option.timeout = self.replica_option.recover_timeout();
        let recover_response = self
            .replica_client
            .replica_recover(primary_id, request, rpc_option)
            .await
            .map_err(|e| PacificaError::RpcClientError(e))?;
        match recover_response {
            ReplicaRecoverResponse::Success => {
                let _ = self.core_notification.core_state_change();
                Ok(())
            }
            ReplicaRecoverResponse::HigherTerm { term } => {
                let _ = self.core_notification.higher_term(term);
                Err(PacificaError::HigherTermError(HigherTermError::new(term)))
            }
        }
    }
}

impl<C> LoopHandler<C> for WorkHandler<C>
where
    C: TypeConfig,
{
    async fn run_loop(mut self, mut rx_shutdown: OneshotReceiverOf<C, ()>) -> Result<(), LifeCycleError> {
        loop {
            futures::select_biased! {
            _ = (&mut rx_shutdown).fuse() => {
                        tracing::info!("CandidateState received shutdown signal.");
                        break;
                }
            task_msg = self.rx_task.recv().fuse() => {
                    match task_msg {
                        Some(task) => {
                            self.handle_task(task).await?
                        }
                        None => {
                            tracing::warn!("received unexpected task message.");
                            break;
                        }
                    }
                }
            }
        }
        Ok(())
    }
}

impl<C, FSM> Component<C> for CandidateState<C, FSM>
where
    C: TypeConfig,
    FSM: StateMachine<C>,
{
    type LoopHandler = WorkHandler<C>;

    fn new_loop_handler(&self) -> Option<Self::LoopHandler> {
        self.work_handler.lock().unwrap().take()
    }
}

enum Task<C>
where
    C: TypeConfig,
{
    Recover {
        callback: ResultSender<C, (), PacificaError<C>>,
    },
}

#[derive(Debug, Copy, Clone)]
enum RecoverState {
    Recovering,
    Recovered,
    UnRecover,
}