use futures::FutureExt;
use crate::core::fsm::StateMachineCaller;
use crate::core::lifecycle::{Component, Lifecycle, LoopHandler, ReplicaComponent};
use crate::core::log::{LogManager};
use crate::core::replica_group_agent::ReplicaGroupAgent;
use crate::core::state::append_entries_handler::AppendEntriesHandler;
use crate::core::{CoreNotification, ResultSender, TaskSender};
use crate::error::{LifeCycleError, PacificaError};
use crate::rpc::message::{
AppendEntriesRequest, AppendEntriesResponse, TransferPrimaryRequest, TransferPrimaryResponse,
};
use crate::runtime::{MpscUnboundedReceiver, TypeConfigExt};
use crate::type_config::alias::{InstantOf, MpscUnboundedReceiverOf, OneshotReceiverOf};
use crate::util::{send_result, Leased, RepeatedTask, RepeatedTimer};
use crate::{ReplicaOption, TypeConfig};
use std::sync::{Arc, Mutex, RwLock};
use crate::fsm::StateMachine;
pub(crate) struct SecondaryState<C, FSM>
where
C: TypeConfig,
FSM: StateMachine<C>,
{
grace_period: Arc<RwLock<Leased<InstantOf<C>>>>,
grace_period_timer: RepeatedTimer<C>,
replica_group_agent: Arc<ReplicaComponent<C, ReplicaGroupAgent<C>>>,
work_handler: Mutex<Option<WorkHandler<C, FSM>>>,
tx_task: TaskSender<C, Task<C>>,
}
impl<C, FSM> SecondaryState<C, FSM>
where
C: TypeConfig,
FSM: StateMachine<C>,
{
pub(crate) fn new(
fsm_caller: Arc<ReplicaComponent<C, StateMachineCaller<C, FSM>>>,
log_manager: Arc<ReplicaComponent<C, LogManager<C>>>,
replica_group_agent: Arc<ReplicaComponent<C, ReplicaGroupAgent<C>>>,
core_notification: Arc<CoreNotification<C>>,
replica_option: Arc<ReplicaOption>,
) -> SecondaryState<C, FSM> {
let grace_period_timeout = replica_option.grace_period_timeout();
let grace_period = Leased::new(C::now(), grace_period_timeout.clone());
let grace_period = Arc::new(RwLock::new(grace_period));
let (tx_task, rx_task) = C::mpsc_unbounded();
let grace_tx_task = TaskSender::new(tx_task.clone());
let grace_period_checker: GracePeriodChecker<C> = GracePeriodChecker::new(grace_period.clone(), grace_tx_task);
let grace_period_timer = RepeatedTimer::new(grace_period_checker, grace_period_timeout, false);
let append_entries_handler = AppendEntriesHandler::new(
grace_period.clone(),
log_manager.clone(),
fsm_caller.clone(),
replica_group_agent.clone(),
core_notification.clone(),
replica_option.clone(),
);
let work_handler = WorkHandler::new(
replica_group_agent.clone(),
append_entries_handler,
core_notification.clone(),
rx_task,
);
Self {
grace_period,
grace_period_timer,
replica_group_agent,
work_handler: Mutex::new(Some(work_handler)),
tx_task: TaskSender::new(tx_task),
}
}
pub(crate) fn get_grace_period(&self) -> Leased<InstantOf<C>> {
self.grace_period.read().unwrap().clone()
}
pub(crate) fn is_grace_period_expired(&self) -> bool {
self.get_grace_period().is_expired(C::now())
}
pub(crate) async fn elect_self(&self) -> Result<(), PacificaError<C>> {
let (tx, rx) = C::oneshot();
self.tx_task.send(Task::ElectSelf { callback: tx })?;
let result: Result<(), PacificaError<C>> = rx.await?;
result
}
pub(crate) async fn handle_append_entries_request(
&self,
request: AppendEntriesRequest<C>,
) -> Result<AppendEntriesResponse, PacificaError<C>> {
let (callback, rx) = C::oneshot();
self.tx_task.send(Task::AppendEntries { request, callback })?;
let result: Result<AppendEntriesResponse, PacificaError<C>> = rx.await?;
result
}
pub(crate) async fn handle_transfer_primary_request(
&self,
request: TransferPrimaryRequest<C>,
) -> Result<TransferPrimaryResponse, PacificaError<C>> {
let replica_group = self.replica_group_agent.get_replica_group().await?;
let cur_term = replica_group.term();
if request.term < cur_term {
return Ok(TransferPrimaryResponse::higher_term(cur_term));
}
self.elect_self().await?;
Ok(TransferPrimaryResponse::Success)
}
}
impl<C, FSM> Lifecycle<C> for SecondaryState<C, FSM>
where
C: TypeConfig,
FSM: StateMachine<C>,
{
async fn startup(&self) -> Result<(), LifeCycleError> {
self.grace_period_timer.turn_on();
Ok(())
}
async fn shutdown(&self) -> Result<(), LifeCycleError> {
self.grace_period_timer.shutdown();
Ok(())
}
}
struct GracePeriodChecker<C>
where
C: TypeConfig,
{
grace_period: Arc<RwLock<Leased<InstantOf<C>>>>,
tx_task: TaskSender<C, Task<C>>,
}
impl<C> GracePeriodChecker<C>
where
C: TypeConfig,
{
fn new(grace_period: Arc<RwLock<Leased<InstantOf<C>>>>, tx_task: TaskSender<C, Task<C>>) -> GracePeriodChecker<C> {
GracePeriodChecker { grace_period, tx_task }
}
fn is_grace_period_expired(&self) -> bool {
self.grace_period.read().unwrap().is_expired(C::now())
}
async fn handle_grace_period_check(&self) -> Result<(), PacificaError<C>> {
if self.is_grace_period_expired() {
let (tx, rx) = C::oneshot();
self.tx_task.send(Task::ElectSelf { callback: tx })?;
let result: Result<(), PacificaError<C>> = rx.await?;
return result;
}
Ok(())
}
}
impl<C> RepeatedTask for GracePeriodChecker<C>
where
C: TypeConfig,
{
async fn execute(&mut self) {
let result = self.handle_grace_period_check().await;
if let Err(e) = result {
tracing::error!("Failed to handle grace period check. err: {}", e);
}
}
}
pub(crate) struct WorkHandler<C, FSM>
where
C: TypeConfig,
FSM: StateMachine<C>,
{
replica_group_agent: Arc<ReplicaComponent<C, ReplicaGroupAgent<C>>>,
append_entries_handler: AppendEntriesHandler<C, FSM>,
core_notification: Arc<CoreNotification<C>>,
rx_task: MpscUnboundedReceiverOf<C, Task<C>>,
}
impl<C, FSM> WorkHandler<C, FSM>
where
C: TypeConfig,
FSM: StateMachine<C>,
{
fn new(
replica_group_agent: Arc<ReplicaComponent<C, ReplicaGroupAgent<C>>>,
append_entries_handler: AppendEntriesHandler<C, FSM>,
core_notification: Arc<CoreNotification<C>>,
rx_task: MpscUnboundedReceiverOf<C, Task<C>>,
) -> WorkHandler<C, FSM> {
WorkHandler {
replica_group_agent,
append_entries_handler,
core_notification,
rx_task,
}
}
async fn handle_task(&mut self, task: Task<C>) -> Result<(), LifeCycleError> {
match task {
Task::AppendEntries { request, callback } => {
let result = self.handle_append_entries_request(request).await;
let _ = send_result::<C, AppendEntriesResponse, PacificaError<C>>(callback, result);
}
Task::ElectSelf { callback } => {
let result = self.handle_elect_self().await;
let _ = send_result::<C, (), PacificaError<C>>(callback, result);
}
}
Ok(())
}
async fn handle_elect_self(&self) -> Result<(), PacificaError<C>> {
self.replica_group_agent.elect_self().await?;
self.core_notification.core_state_change()?;
Ok(())
}
async fn handle_append_entries_request(
&self,
request: AppendEntriesRequest<C>,
) -> Result<AppendEntriesResponse, PacificaError<C>> {
self.append_entries_handler.handle_append_entries_request(request).await
}
}
impl<C, FSM> LoopHandler<C> for WorkHandler<C, FSM>
where
C: TypeConfig,
FSM: StateMachine<C>,
{
async fn run_loop(mut self, mut rx_shutdown: OneshotReceiverOf<C, ()>) -> Result<(), LifeCycleError> {
loop {
futures::select_biased! {
_ = (&mut rx_shutdown).fuse() => {
tracing::info!("SecondaryState received shutdown signal.");
break;
}
task_msg = self.rx_task.recv().fuse() => {
match task_msg {
Some(task) => {
self.handle_task(task).await?
}
None => {
tracing::warn!("received unexpected task message.");
break;
}
}
}
}
}
Ok(())
}
}
impl<C, FSM> Component<C> for SecondaryState<C, FSM>
where
C: TypeConfig,
FSM: StateMachine<C>,
{
type LoopHandler = WorkHandler<C, FSM>;
fn new_loop_handler(&self) -> Option<Self::LoopHandler> {
self.work_handler.lock().unwrap().take()
}
}
enum Task<C>
where
C: TypeConfig,
{
AppendEntries {
request: AppendEntriesRequest<C>,
callback: ResultSender<C, AppendEntriesResponse, PacificaError<C>>,
},
ElectSelf {
callback: ResultSender<C, (), PacificaError<C>>,
},
}