mod replication_handler;
pub use replication_handler::*;
#[cfg(test)]
pub mod replication_handler_test;
use std::collections::HashMap;
use std::sync::Arc;
use d_engine_proto::client::ClientResponse;
use d_engine_proto::common::Entry;
use d_engine_proto::common::EntryPayload;
use d_engine_proto::server::replication::AppendEntriesRequest;
use d_engine_proto::server::replication::AppendEntriesResponse;
use d_engine_proto::server::replication::ConflictResult;
use d_engine_proto::server::replication::SuccessResult;
use dashmap::DashMap;
#[cfg(any(test, feature = "__test_support"))]
use mockall::automock;
use tonic::Status;
use tonic::async_trait;
use super::LeaderStateSnapshot;
use super::StateSnapshot;
use crate::AppendResults;
use crate::MaybeCloneOneshotSender;
use crate::Result;
use crate::TypeConfig;
use crate::alias::ROF;
/// A request made of entry payloads, bundled with the channels used to report
/// its outcome.
///
/// Every sender in `senders` carries a
/// `std::result::Result<ClientResponse, Status>`, i.e. either a client
/// response or a tonic status.
#[derive(Debug)]
pub struct RaftRequestWithSignal {
    /// Identifier attached to this request.
    // NOTE(review): the field is marked `allow(unused)`; presumably it is kept
    // for diagnostics/tracing only — confirm before relying on or removing it.
    #[allow(unused)]
    pub id: String,

    /// Entry payloads carried by this request.
    pub payloads: Vec<EntryPayload>,

    /// Channels on which the result of the request is delivered.
    // NOTE(review): presumably one sender per payload — verify against callers.
    pub senders: Vec<MaybeCloneOneshotSender<std::result::Result<ClientResponse, Status>>>,

    /// Flag consulted by the request handler.
    // NOTE(review): by its name, presumably asks for the reply to be deferred
    // until the entries have been applied — TODO confirm.
    pub wait_for_apply_event: bool,
}
/// An `AppendEntriesResponse` paired with an optional commit index update.
///
/// This is the success value returned by
/// `ReplicationCore::handle_append_entries`.
#[derive(Debug)]
pub struct AppendResponseWithUpdates {
    /// The append-entries response message itself.
    pub response: AppendEntriesResponse,

    /// Commit index update accompanying the response, if there is one.
    // NOTE(review): presumably `Some(idx)` tells the caller to advance its
    // commit index to `idx`, and `None` means "leave it as is" — confirm.
    pub commit_index_update: Option<u64>,
}
/// Core replication operations, generic over the crate's `TypeConfig`.
///
/// Implementors must be `Send + Sync + 'static`. A mock implementation is
/// generated through `mockall::automock` when compiling tests or when the
/// `__test_support` feature is enabled.
#[cfg_attr(any(test, feature = "__test_support"), automock)]
#[async_trait]
pub trait ReplicationCore<T>: Send + Sync + 'static
where
    T: TypeConfig,
{
    /// Handles a batch of entry payloads and reports the outcome as
    /// `AppendResults`.
    ///
    /// # Arguments
    /// * `entry_payloads` - payloads included in this batch.
    /// * `state_snapshot` - snapshot of the node state, passed by value.
    /// * `leader_state_snapshot` - snapshot of the leader-specific state,
    ///   passed by value.
    /// * `cluster_metadata` - cluster metadata to consult.
    /// * `ctx` - the Raft context for `T`.
    ///
    /// NOTE(review): the leader snapshot parameter suggests this is only
    /// called on the leader — confirm against the implementation.
    async fn handle_raft_request_in_batch(
        &self,
        entry_payloads: Vec<EntryPayload>,
        state_snapshot: StateSnapshot,
        leader_state_snapshot: LeaderStateSnapshot,
        cluster_metadata: &crate::raft_role::ClusterMetadata,
        ctx: &crate::RaftContext<T>,
    ) -> Result<AppendResults>;

    /// Turns a peer's `SuccessResult` into a `PeerUpdate`.
    ///
    /// # Arguments
    /// * `peer_id` - id of the peer that replied.
    /// * `peer_term` - term associated with the peer's reply.
    /// * `success_result` - the success payload of the reply.
    /// * `leader_term` - the leader's term.
    ///
    /// NOTE(review): how `peer_term` and `leader_term` are compared, and which
    /// conditions yield `Err`, is defined by the implementor — verify there.
    fn handle_success_response(
        &self,
        peer_id: u32,
        peer_term: u64,
        success_result: SuccessResult,
        leader_term: u64,
    ) -> Result<crate::PeerUpdate>;

    /// Turns a peer's `ConflictResult` into a `PeerUpdate`.
    ///
    /// # Arguments
    /// * `peer_id` - id of the peer that replied.
    /// * `conflict_result` - the conflict payload of the reply.
    /// * `raft_log` - shared handle to the Raft log.
    /// * `current_next_index` - the next index currently tracked for the peer.
    ///
    /// NOTE(review): presumably the resulting update moves the peer's next
    /// index backwards — confirm against the implementation.
    fn handle_conflict_response(
        &self,
        peer_id: u32,
        conflict_result: ConflictResult,
        raft_log: &Arc<ROF<T>>,
        current_next_index: u64,
    ) -> Result<crate::PeerUpdate>;

    /// Decides whether a follower's commit index should change.
    ///
    /// This is an associated function: it takes no `self` and works only on
    /// the three indices passed in.
    ///
    /// # Arguments
    /// * `my_commit_index` - the follower's current commit index.
    /// * `last_raft_log_id` - id of the last entry in the follower's log.
    /// * `leader_commit_index` - commit index announced by the leader.
    ///
    /// NOTE(review): presumably returns `Some(new_index)` when an update is
    /// due and `None` otherwise — the exact rule lives in the implementor.
    fn if_update_commit_index_as_follower(
        my_commit_index: u64,
        last_raft_log_id: u64,
        leader_commit_index: u64,
    ) -> Option<u64>;

    /// Collects entries to send out, returned as a map from `u32` key to a
    /// list of entries.
    ///
    /// # Arguments
    /// * `new_entries` - the newly created entries.
    /// * `leader_last_index_before_inserting_new_entries` - the leader's last
    ///   log index before `new_entries` were inserted.
    /// * `max_legacy_entries_per_peer` - upper bound applied per peer.
    /// * `peer_next_indices` - `u32` -> `u64` map of next indices.
    /// * `raft_log` - shared handle to the Raft log.
    ///
    /// NOTE(review): the keys of both maps are presumably peer ids, and
    /// "legacy" presumably refers to entries already in the log that a peer is
    /// still missing — confirm against the implementation.
    fn retrieve_to_be_synced_logs_for_peers(
        &self,
        new_entries: Vec<Entry>,
        leader_last_index_before_inserting_new_entries: u64,
        max_legacy_entries_per_peer: u64,
        peer_next_indices: &HashMap<u32, u64>,
        raft_log: &Arc<ROF<T>>,
    ) -> DashMap<u32, Vec<Entry>>;

    /// Handles an incoming `AppendEntriesRequest`.
    ///
    /// On success, yields the response together with an optional commit index
    /// update (see `AppendResponseWithUpdates`).
    ///
    /// # Arguments
    /// * `request` - the append-entries request to handle.
    /// * `state_snapshot` - snapshot of the node state.
    /// * `raft_log` - shared handle to the Raft log.
    async fn handle_append_entries(
        &self,
        request: AppendEntriesRequest,
        state_snapshot: &StateSnapshot,
        raft_log: &Arc<ROF<T>>,
    ) -> Result<AppendResponseWithUpdates>;

    /// Checks an `AppendEntriesRequest` and answers with an
    /// `AppendEntriesResponse`.
    ///
    /// The outcome is expressed only through the returned response; there is
    /// no `Result` wrapper.
    ///
    /// # Arguments
    /// * `my_term` - this node's current term.
    /// * `request` - the request under inspection.
    /// * `raft_log` - shared handle to the Raft log.
    ///
    /// NOTE(review): which checks are performed (term, previous log entry,
    /// ...) is up to the implementor — verify there.
    fn check_append_entries_request_is_legal(
        &self,
        my_term: u64,
        request: &AppendEntriesRequest,
        raft_log: &Arc<ROF<T>>,
    ) -> AppendEntriesResponse;
}