pacifica-rs 1.0.0

rust implementation of PacificA: Replication in Log-Based Distributed Storage Systems
use futures::FutureExt;
use crate::core::lifecycle::{Component, LoopHandler};
use crate::core::replica_group_agent::ReplicaGroupAgent;
use crate::core::{CoreNotification, Lifecycle, ReplicaComponent, ResultSender, TaskSender};
use crate::error::{LifeCycleError, PacificaError};
use crate::runtime::{MpscUnboundedReceiver, TypeConfigExt};
use crate::type_config::alias::{MpscUnboundedReceiverOf, OneshotReceiverOf};
use crate::util::{send_result, RepeatedTask, RepeatedTimer};
use crate::{ReplicaOption, ReplicaState, TypeConfig};
use std::sync::{Arc, Mutex};

pub(crate) struct StatelessState<C>
where
    C: TypeConfig,
{
    state_check_timer: RepeatedTimer<C>,

    work_handler: Mutex<Option<WorkHandler<C>>>,
    tx_task: TaskSender<C, Task<C>>,
}

impl<C> StatelessState<C>
where
    C: TypeConfig,
{
    pub(crate) fn new(
        replica_group_agent: Arc<ReplicaComponent<C, ReplicaGroupAgent<C>>>,
        core_notification: Arc<CoreNotification<C>>,
        replica_option: Arc<ReplicaOption>,
    ) -> StatelessState<C> {
        let state_check_interval = replica_option.recover_interval();
        let (tx_task, rx_task) = C::mpsc_unbounded();
        let state_checker: StatelessChecker<C> = StatelessChecker::new(TaskSender::new(tx_task.clone()));
        let state_check_timer = RepeatedTimer::new(state_checker, state_check_interval, false);

        let work_handler = WorkHandler::new(replica_group_agent, core_notification, rx_task);

        StatelessState {
            state_check_timer,
            work_handler: Mutex::new(Some(work_handler)),
            tx_task: TaskSender::new(tx_task),
        }
    }
}

enum Task<C>
where
    C: TypeConfig,
{
    Check {
        callback: ResultSender<C, (), PacificaError<C>>,
    },
}

impl<C> Lifecycle<C> for StatelessState<C>
where
    C: TypeConfig,
{
    async fn startup(&self) -> Result<(), LifeCycleError> {
        self.state_check_timer.turn_on();
        Ok(())
    }

    async fn shutdown(&self) -> Result<(), LifeCycleError> {
        self.state_check_timer.shutdown();
        Ok(())
    }
}

struct StatelessChecker<C>
where
    C: TypeConfig,
{
    tx_task: TaskSender<C, Task<C>>,
}

impl<C> StatelessChecker<C>
where
    C: TypeConfig,
{
    fn new(tx_task: TaskSender<C, Task<C>>) -> StatelessChecker<C> {
        StatelessChecker { tx_task }
    }

    async fn check_state(&self) -> Result<(), PacificaError<C>> {
        let (tx, rx) = C::oneshot();
        self.tx_task.send(Task::Check { callback: tx })?;
        let _ = rx.await?;
        Ok(())
    }
}

impl<C> RepeatedTask for StatelessChecker<C>
where
    C: TypeConfig,
{
    async fn execute(&mut self) {
        let _ = self.check_state().await;
    }
}

pub(crate) struct WorkHandler<C>
where
    C: TypeConfig,
{
    replica_group_agent: Arc<ReplicaComponent<C, ReplicaGroupAgent<C>>>,
    core_notification: Arc<CoreNotification<C>>,
    rx_task: MpscUnboundedReceiverOf<C, Task<C>>,
}

impl<C> WorkHandler<C>
where
    C: TypeConfig,
{
    fn new(
        replica_group_agent: Arc<ReplicaComponent<C, ReplicaGroupAgent<C>>>,
        core_notification: Arc<CoreNotification<C>>,
        rx_task: MpscUnboundedReceiverOf<C, Task<C>>,
    ) -> WorkHandler<C> {
        WorkHandler {
            replica_group_agent,
            core_notification,
            rx_task,
        }
    }
}

impl<C> WorkHandler<C>
where
    C: TypeConfig,
{
    async fn handle_task(&mut self, task: Task<C>) -> Result<(), LifeCycleError> {
        match task {
            Task::Check { callback } => {
                let result = self.handle_check().await;
                let _ = send_result::<C, (), PacificaError<C>>(callback, result);
            }
        }

        Ok(())
    }

    async fn handle_check(&mut self) -> Result<(), PacificaError<C>> {
        let _ = self.replica_group_agent.force_refresh_get().await;
        let new_state = self.replica_group_agent.get_self_state().await;
        if ReplicaState::Stateless != new_state {
            let _ = self.core_notification.core_state_change();
        }
        Ok(())
    }
}

impl<C> LoopHandler<C> for WorkHandler<C>
where
    C: TypeConfig,
{
    async fn run_loop(mut self, mut rx_shutdown: OneshotReceiverOf<C, ()>) -> Result<(), LifeCycleError> {
        loop {
            futures::select_biased! {
            _ = (&mut rx_shutdown).fuse() => {
                        tracing::debug!("StatelessState received shutdown signal.");
                        break;
                }
            task_msg = self.rx_task.recv().fuse() => {
                    match task_msg {
                        Some(task) => {
                            self.handle_task(task).await?
                        }
                        None => {
                            tracing::warn!("received unexpected task message.");
                            break;
                        }
                    }
                }

            }
        }
        Ok(())
    }
}

impl<C> Component<C> for StatelessState<C>
where
    C: TypeConfig,
{
    type LoopHandler = WorkHandler<C>;

    fn new_loop_handler(&self) -> Option<Self::LoopHandler> {
        self.work_handler.lock().unwrap().take()
    }
}