use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
use crate::core::fsm::{CommitResultBatch, StateMachineCaller};
use crate::core::lifecycle::{ReplicaComponent};
use crate::core::log::LogManager;
use crate::core::operation::Operation;
use crate::core::replica_group_agent::ReplicaGroupAgent;
use crate::core::state::candidate_state::CandidateState;
use crate::core::state::primary_state::PrimaryState;
use crate::core::state::secondary_state::SecondaryState;
use crate::core::state::stateless_state::StatelessState;
use crate::core::{CoreNotification, Lifecycle};
use crate::error::{LifeCycleError, PacificaError, ReplicaStateError};
use crate::rpc::message::{
AppendEntriesRequest, AppendEntriesResponse,
ReplicaRecoverRequest, ReplicaRecoverResponse, TransferPrimaryRequest, TransferPrimaryResponse,
};
use crate::{ReplicaId, ReplicaOption, ReplicaState, TypeConfig};
use std::sync::Arc;
use std::time::Duration;
use crate::core::snapshot::SnapshotExecutor;
use crate::fsm::StateMachine;
mod append_entries_handler;
mod candidate_state;
mod primary_state;
mod secondary_state;
mod stateless_state;
pub(crate) enum CoreState<C, FSM>
where
C: TypeConfig,
FSM: StateMachine<C>,
{
Primary {
state: ReplicaComponent<C, PrimaryState<C, FSM>>,
},
Secondary {
state: ReplicaComponent<C, SecondaryState<C, FSM>>,
},
Candidate {
state: ReplicaComponent<C, CandidateState<C, FSM>>,
},
Stateless {
state: ReplicaComponent<C, StatelessState<C>>,
},
Shutdown,
}
impl<C, FSM> CoreState<C, FSM>
where
C: TypeConfig,
FSM: StateMachine<C>,
{
pub(crate) fn new_primary(
fsm_caller: Arc<ReplicaComponent<C, StateMachineCaller<C, FSM>>>,
log_manager: Arc<ReplicaComponent<C, LogManager<C>>>,
replica_group_agent: Arc<ReplicaComponent<C, ReplicaGroupAgent<C>>>,
snapshot_executor: Arc<ReplicaComponent<C, SnapshotExecutor<C, FSM>>>,
core_notification: Arc<CoreNotification<C>>,
replica_client: Arc<C::ReplicaClient>,
replica_option: Arc<ReplicaOption>,
) -> CoreState <C, FSM> {
let next_log_index = log_manager.get_last_log_index() + 1;
let primary_state = PrimaryState::new(
next_log_index,
fsm_caller,
log_manager,
snapshot_executor,
replica_group_agent,
core_notification,
replica_client,
replica_option,
);
CoreState::Primary {
state: ReplicaComponent::new(primary_state),
}
}
pub(crate) fn new_secondary(
fsm_caller: Arc<ReplicaComponent<C, StateMachineCaller<C, FSM>>>,
log_manager: Arc<ReplicaComponent<C, LogManager<C>>>,
replica_group_agent: Arc<ReplicaComponent<C, ReplicaGroupAgent<C>>>,
core_notification: Arc<CoreNotification<C>>,
replica_option: Arc<ReplicaOption>,
) -> CoreState <C, FSM> {
let state = SecondaryState::new(
fsm_caller,
log_manager,
replica_group_agent,
core_notification,
replica_option,
);
CoreState::Secondary {
state: ReplicaComponent::new(state),
}
}
pub(crate) fn new_candidate(
fsm_caller: Arc<ReplicaComponent<C, StateMachineCaller<C, FSM>>>,
log_manager: Arc<ReplicaComponent<C, LogManager<C>>>,
replica_group_agent: Arc<ReplicaComponent<C, ReplicaGroupAgent<C>>>,
replica_client: Arc<C::ReplicaClient>,
core_notification: Arc<CoreNotification<C>>,
replica_option: Arc<ReplicaOption>,
) -> CoreState <C, FSM> {
let state = CandidateState::new(
replica_client,
log_manager,
fsm_caller,
replica_group_agent,
core_notification,
replica_option,
);
CoreState::Candidate {
state: ReplicaComponent::new(state),
}
}
pub(crate) fn new_stateless(
replica_group_agent: Arc<ReplicaComponent<C, ReplicaGroupAgent<C>>>,
core_notification: Arc<CoreNotification<C>>,
replica_option: Arc<ReplicaOption>,
) -> CoreState <C, FSM> {
let state = StatelessState::new(replica_group_agent, core_notification, replica_option);
CoreState::Stateless {
state: ReplicaComponent::new(state),
}
}
pub(crate) fn is_primary(&self) -> bool {
match self {
CoreState::Primary { .. } => true,
_ => false,
}
}
pub(crate) fn is_secondary(&self) -> bool {
match self {
CoreState::Secondary { .. } => true,
_ => false,
}
}
pub(crate) fn is_candidate(&self) -> bool {
match self {
CoreState::Candidate { .. } => true,
_ => false,
}
}
pub(crate) fn get_replica_state(&self) -> ReplicaState {
let state = match self {
CoreState::Primary { state: _ } => ReplicaState::Primary,
CoreState::Secondary { state: _ } => ReplicaState::Secondary,
CoreState::Candidate { state: _ }=> ReplicaState::Candidate,
CoreState::Shutdown => ReplicaState::Shutdown,
CoreState::Stateless { state: _ }=> ReplicaState::Stateless,
};
state
}
pub(crate) fn commit_operation(&self, operation: Operation<C>) -> Result<(), CommitOperationError<C>> {
match self {
CoreState::Primary { state: primary } => {
primary.commit(operation)?;
Ok(())
}
_ => {
let error = PacificaError::ReplicaStateError(ReplicaStateError::primary_but_not(
self.get_replica_state()
));
Err(CommitOperationError::new(operation, error))
}
}
}
pub(crate) fn send_commit_result(&self, result: CommitResultBatch<C>) -> Result<(), PacificaError<C>> {
match self {
CoreState::Primary { state: primary } => {
primary.send_commit_result(result)?;
Ok(())
}
_ => {
let error = PacificaError::ReplicaStateError(ReplicaStateError::primary_but_not(
self.get_replica_state(),
));
tracing::warn!("send_commit_result, occurred an error: {}", error);
Err(error)
}
}
}
pub(crate) async fn transfer_primary(&self, new_primary: ReplicaId<C::NodeId>, timeout: Duration) -> Result<(), PacificaError<C>> {
match self {
CoreState::Primary { state } => state.transfer_primary(new_primary, timeout).await,
_ => Err(PacificaError::ReplicaStateError(ReplicaStateError::primary_but_not(
self.get_replica_state(),
))),
}
}
pub(crate) async fn replica_recover(&self) -> Result<(), PacificaError<C>> {
match self {
CoreState::Candidate { state } => state.recover().await,
_ => Err(PacificaError::ReplicaStateError(ReplicaStateError::candidate_but_not(
self.get_replica_state(),
))),
}
}
pub(crate) async fn handle_append_entries_request(
&self,
request: AppendEntriesRequest<C>,
) -> Result<AppendEntriesResponse, PacificaError<C>> {
match self {
CoreState::Secondary { state } => state.handle_append_entries_request(request).await,
CoreState::Candidate { state } => state.handle_append_entries_request(request).await,
_ => {
let error = PacificaError::ReplicaStateError(ReplicaStateError::secondary_or_candidate_but_not(
self.get_replica_state(),
));
tracing::warn!("handle append_entries_request, occurred an error: {}", error);
Err(error)
}
}
}
pub(crate) async fn handle_replica_recover_request(
&self,
request: ReplicaRecoverRequest<C>,
) -> Result<ReplicaRecoverResponse, PacificaError<C>> {
match self {
CoreState::Primary { state: primary } => primary.replica_recover(request).await,
_ => {
let error =
PacificaError::ReplicaStateError(ReplicaStateError::primary_but_not(self.get_replica_state()));
tracing::warn!("handle replica_recover_request, occurred an error: {}", error);
Err(error)
}
}
}
pub(crate) async fn handle_transfer_primary_request(
&self,
request: TransferPrimaryRequest<C>,
) -> Result<TransferPrimaryResponse, PacificaError<C>> {
match self {
CoreState::Secondary { state } => state.handle_transfer_primary_request(request).await,
_ => {
let error =
PacificaError::ReplicaStateError(ReplicaStateError::secondary_but_not(self.get_replica_state()));
tracing::warn!("handle transfer_primary_request, occurred an error: {}", error);
Err(error)
}
}
}
}
impl<C, FSM> Lifecycle<C> for CoreState<C, FSM>
where
C: TypeConfig,
FSM: StateMachine<C>,
{
async fn startup(&self) -> Result<(), LifeCycleError> {
match self {
CoreState::Primary { state: primary } => {
primary.startup().await?;
primary.reconciliation().await
},
CoreState::Secondary { state } => state.startup().await,
CoreState::Candidate { state } => state.startup().await,
CoreState::Stateless { state } => state.startup().await,
CoreState::Shutdown => Ok(()),
}
}
async fn shutdown(&self) -> Result<(), LifeCycleError> {
match self {
CoreState::Primary { state: primary } => primary.shutdown().await,
CoreState::Secondary { state } => state.shutdown().await,
CoreState::Candidate { state } => state.shutdown().await,
CoreState::Stateless { state } => state.shutdown().await,
CoreState::Shutdown => Ok(()),
}
}
}
pub(crate) struct CommitOperationError<C>
where
C: TypeConfig,
{
pub operation: Operation<C>,
pub error: PacificaError<C>,
}
impl<C> CommitOperationError<C>
where
C: TypeConfig,
{
pub(crate) fn new(operation: Operation<C>,
error: PacificaError<C>, ) -> CommitOperationError<C> {
CommitOperationError {
operation,
error,
}
}
}
impl<C> Debug for CommitOperationError<C>
where
C: TypeConfig,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "An Error=({}) occurred while commit operation({})", self.error, self.operation)
}
}
impl<C> Display for CommitOperationError<C>
where
C: TypeConfig,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl<C> Error for CommitOperationError<C>
where C: TypeConfig{
}