use {
crate::{
Consistency,
PeerId,
groups::{
CommandError,
IndexRange,
QueryError,
QueryResultAt,
StateMachine,
Storage,
SyncContext,
raft::{role::Role, shared::Shared},
state::WorkerState,
},
primitives::{BoxPinFut, InternalFutureExt},
},
core::{
future::ready,
task::{Context, Poll},
},
std::sync::Arc,
};
// Submodules of the Raft implementation. `Role` (from `role`) is matched in
// this file on `Leader`, `Follower` and `Candidate` variants; presumably each
// of the three modules below implements the matching role — TODO confirm.
mod candidate;
mod follower;
mod leader;
// Home of the `Message` type that is re-exported below.
mod protocol;
// Home of `Role`, the value stored in `Raft::role`.
mod role;
// Home of `Shared`, the value stored in `Raft::shared`.
mod shared;
// Re-export the protocol message type, visible to the parent module only.
pub(super) use protocol::Message;
/// Raft driver for a single group.
///
/// Pairs the role this node currently plays with the state that is handed to
/// that role on every call made through this type.
///
/// * `S` is the storage backend, bounded by `Storage<M::Command>`.
/// * `M` is the replicated state machine, bounded by `StateMachine`.
pub struct Raft<S, M>
where
S: Storage<M::Command>,
M: StateMachine,
{
/// Current role; this file matches on its `Leader`, `Follower` and
/// `Candidate` variants.
role: role::Role<M>,
/// State built from the group handle, the storage and the state machine in
/// `Raft::new`, and lent to the role on each call.
shared: shared::Shared<S, M>,
}
impl<S, M> Raft<S, M>
where
    S: Storage<M::Command>,
    M: StateMachine,
{
    /// Builds a new instance from the group's worker state, a storage backend
    /// and a state machine.
    ///
    /// The shared state is constructed first; the initial role is then derived
    /// from a borrow of it via `Role::new`.
    pub fn new(group: Arc<WorkerState<M>>, storage: S, state_machine: M) -> Self {
        let shared_state = Shared::new(group, storage, state_machine);
        Self {
            // Fields are evaluated in the order written, so the borrow taken
            // by `Role::new` ends before `shared_state` is moved below.
            role: Role::new(&shared_state),
            shared: shared_state,
        }
    }

    /// Hands a protocol `message` received from peer `from` to the current
    /// role, giving it mutable access to the shared state.
    pub fn receive_protocol_message(&mut self, message: Message<M>, from: PeerId) {
        let Self { role, shared } = self;
        role.receive_protocol_message(message, from, shared);
    }

    /// Submits `commands` and returns a future resolving to the outcome.
    ///
    /// The behaviour depends on the current role:
    ///
    /// * `Leader`: the commands are enqueued right away and the returned
    ///   future is already resolved with the value produced by
    ///   `enqueue_commands`.
    /// * `Follower`: the commands are passed to the follower's
    ///   `forward_commands` (presumably sent on to the current leader; see
    ///   the follower module to confirm).
    /// * `Candidate`: the future is already resolved with
    ///   `CommandError::Offline`, which carries the untouched commands back to
    ///   the caller.
    pub fn feed(
        &mut self,
        commands: Vec<M::Command>,
    ) -> BoxPinFut<Result<IndexRange, CommandError<M>>> {
        // `role` and `shared` are distinct fields, so they can be borrowed
        // independently of each other.
        let shared = &self.shared;
        match &mut self.role {
            Role::Leader(leading) => {
                let assigned = leading.enqueue_commands(commands, shared);
                ready(Ok(assigned)).pin()
            }
            Role::Follower(following) => following.forward_commands(commands, shared).pin(),
            Role::Candidate(_) => ready(Err(CommandError::Offline(commands))).pin(),
        }
    }

    /// Runs `query` and returns a future resolving to the outcome.
    ///
    /// * A `Leader` (for any `consistency`) and a `Follower` asked for
    ///   `Consistency::Weak` answer from the local state machine; the result
    ///   is tagged with the index of the local committed position and the
    ///   returned future is already resolved.
    /// * A `Follower` asked for `Consistency::Strong` passes the query to its
    ///   `forward_query`.
    /// * A `Candidate` resolves immediately with `QueryError::Offline`, which
    ///   carries the query back to the caller.
    pub fn query(
        &mut self,
        query: M::Query,
        consistency: Consistency,
    ) -> BoxPinFut<Result<QueryResultAt<M>, QueryError<M>>> {
        match (&mut self.role, consistency) {
            (Role::Leader(_), _) | (Role::Follower(_), Consistency::Weak) => {
                // Run the query before reading the committed position, in
                // that order.
                let result = self.shared.state_machine().query(query);
                let at_position = self.shared.committed().index();
                ready(Ok(QueryResultAt {
                    result,
                    at_position,
                }))
                .pin()
            }
            (Role::Follower(following), Consistency::Strong) => {
                following.forward_query(query, &self.shared)
            }
            (Role::Candidate(_), _) => ready(Err(QueryError::Offline(query))).pin(),
        }
    }

    /// Polls the current role with mutable access to the shared state and
    /// returns whatever `Role::poll` reports.
    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        let Self { role, shared } = self;
        role.poll(cx, shared)
    }
}