pub mod builder;
mod commits;
mod core;
mod handle;
mod info;
mod inner;
mod req_handler;
mod shell;
mod shutdown;
mod snapshot;
mod state_keeper;
mod status;
use futures::channel::mpsc;
use futures::future::BoxFuture;
use crate::append::AppendArgs;
use crate::applicable::ApplicableTo;
use crate::buffer::Buffer;
use crate::error::Disoriented;
use crate::error::PollError;
use crate::error::ShutDown;
use crate::error::ShutDownOr;
use crate::invocation;
use crate::invocation::Invocation;
use crate::retry::RetryPolicy;
#[cfg(feature = "tracer")]
use crate::tracer::Tracer;
use crate::voting::IndiscriminateVoter;
use crate::Event;
pub use self::core::Core;
pub use builder::NodeBuilder;
pub use commits::Commit;
pub use handle::NodeHandle;
pub use info::NodeInfo;
pub use req_handler::RequestHandler;
pub use shell::EnterOnPoll;
pub use shell::EnterOnPollOwned;
pub use shell::EnteredShell;
pub use shell::EnteredShellRef;
pub use shell::Sendable;
pub use shell::Shell;
pub use shell::Unsendable;
pub use shutdown::DefaultShutdown;
pub use shutdown::Shutdown;
pub use snapshot::Snapshot;
pub use state_keeper::Task;
pub use status::NodeStatus;
use state_keeper::StateKeeperKit;
pub type AbstainOf<N> = invocation::AbstainOf<InvocationOf<N>>;
pub type CommunicationErrorOf<N> = invocation::CommunicationErrorOf<InvocationOf<N>>;
pub type ContextOf<N> = invocation::ContextOf<InvocationOf<N>>;
pub type CoordNumOf<N> = invocation::CoordNumOf<InvocationOf<N>>;
pub type EffectOf<N> = invocation::EffectOf<InvocationOf<N>>;
pub type EjectionOf<N> = invocation::EjectionOf<InvocationOf<N>>;
pub type FrozenStateOf<N> = invocation::FrozenStateOf<InvocationOf<N>>;
pub type InvocationOf<N> = <N as Node>::Invocation;
pub type LogEntryOf<N> = invocation::LogEntryOf<InvocationOf<N>>;
pub type LogEntryIdOf<N> = invocation::LogEntryIdOf<InvocationOf<N>>;
pub type NayOf<N> = invocation::NayOf<InvocationOf<N>>;
pub type NodeOf<N> = invocation::NodeOf<InvocationOf<N>>;
pub type NodeIdOf<N> = invocation::NodeIdOf<InvocationOf<N>>;
pub type OutcomeOf<N> = invocation::OutcomeOf<InvocationOf<N>>;
pub type RoundNumOf<N> = invocation::RoundNumOf<InvocationOf<N>>;
pub type ShutdownOf<N> = <N as Node>::Shutdown;
pub type StateOf<N> = invocation::StateOf<InvocationOf<N>>;
pub type YeaOf<N> = invocation::YeaOf<InvocationOf<N>>;
pub type AcceptanceFor<N> = invocation::AcceptanceFor<InvocationOf<N>>;
pub type AppendResultFor<N, A, R> = Result<CommitFor<N, A>, <R as RetryPolicy>::StaticError>;
pub type ImplAppendResultFor<N, A, R> =
Result<CommitFor<N, A>, ShutDownOr<<R as RetryPolicy>::Error>>;
pub type CommitFor<N, A = LogEntryOf<N>> = invocation::CommitFor<InvocationOf<N>, A>;
pub type ConflictFor<N> = invocation::ConflictFor<InvocationOf<N>>;
pub type EventFor<N> = Event<InvocationOf<N>>;
pub type HandleFor<N> = NodeHandle<InvocationOf<N>>;
pub type IndiscriminateVoterFor<N> =
IndiscriminateVoter<StateOf<N>, RoundNumOf<N>, CoordNumOf<N>, AbstainOf<N>, YeaOf<N>, NayOf<N>>;
pub type PromiseFor<N> = invocation::PromiseFor<InvocationOf<N>>;
pub type RequestHandlerFor<N> = RequestHandler<InvocationOf<N>>;
pub type SnapshotFor<N> = invocation::SnapshotFor<InvocationOf<N>>;
pub type VoteFor<N> = invocation::VoteFor<InvocationOf<N>>;
pub trait Node: Sized {
type Invocation: Invocation;
type Shutdown: Shutdown<Invocation = Self::Invocation>;
fn id(&self) -> NodeIdOf<Self>;
fn status(&self) -> NodeStatus;
fn participation(&self) -> Participation<RoundNumOf<Self>>;
fn poll_events(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<EventFor<Self>>;
fn next_event(&mut self) -> NextEvent<'_, Self> {
NextEvent(self)
}
fn events(&mut self) -> Events<'_, Self> {
Events(self)
}
fn handle(&self) -> NodeHandle<Self::Invocation>;
fn prepare_snapshot(&self) -> BoxFuture<'static, SnapshotFor<Self>>;
fn affirm_snapshot(
&self,
snapshot: SnapshotFor<Self>,
) -> BoxFuture<'static, Result<(), crate::error::AffirmSnapshotError>>;
fn install_snapshot(
&self,
snapshot: SnapshotFor<Self>,
) -> BoxFuture<'static, Result<(), crate::error::InstallSnapshotError>>;
fn read_stale<F, T>(&self, f: F) -> BoxFuture<'_, Result<T, Disoriented>>
where
F: FnOnce(&StateOf<Self>) -> T + Send + 'static,
T: Send + 'static;
fn read_stale_infallibly<F, T>(&self, f: F) -> BoxFuture<'_, T>
where
F: FnOnce(Option<&StateOf<Self>>) -> T + Send + 'static,
T: Send + 'static;
fn read_stale_scoped<'read, F, T>(&self, f: F) -> BoxFuture<'read, Result<T, Disoriented>>
where
F: FnOnce(&StateOf<Self>) -> T + Send + 'read,
T: Send + 'static;
fn read_stale_scoped_infallibly<'read, F, T>(&self, f: F) -> BoxFuture<'read, T>
where
F: FnOnce(Option<&StateOf<Self>>) -> T + Send + 'read,
T: Send + 'static;
fn append<A, P, R>(
&mut self,
applicable: A,
args: P,
) -> BoxFuture<'static, AppendResultFor<Self, A, R>>
where
A: ApplicableTo<StateOf<Self>> + 'static,
P: Into<AppendArgs<Self::Invocation, R>>,
R: RetryPolicy<Invocation = Self::Invocation>,
R::StaticError: From<ShutDownOr<R::Error>>;
fn shut_down(self) -> Self::Shutdown;
}
pub trait NodeImpl: Node {
fn append_impl<A, P, R>(
&mut self,
applicable: A,
args: P,
) -> BoxFuture<'static, ImplAppendResultFor<Self, A, R>>
where
A: ApplicableTo<StateOf<Self>> + 'static,
P: Into<AppendArgs<Self::Invocation, R>>,
R: RetryPolicy<Invocation = Self::Invocation>;
fn await_commit_of(
&mut self,
log_entry_id: LogEntryIdOf<Self>,
) -> BoxFuture<'static, Result<CommitFor<Self>, ShutDown>>;
fn eject(&mut self, reason: EjectionOf<Self>) -> BoxFuture<'static, Result<bool, ShutDown>>;
fn poll(
&mut self,
round_num: RoundNumOf<Self>,
additional_nodes: Vec<NodeOf<Self>>,
) -> BoxFuture<'static, Result<bool, PollError<Self::Invocation>>>;
}
pub trait DelegatingNodeImpl: Node {
type Delegate: NodeImpl<Invocation = Self::Invocation>;
fn delegate(&mut self) -> &mut Self::Delegate;
}
impl<D: DelegatingNodeImpl> NodeImpl for D {
fn append_impl<A, P, R>(
&mut self,
applicable: A,
args: P,
) -> BoxFuture<'static, ImplAppendResultFor<Self, A, R>>
where
A: ApplicableTo<StateOf<Self>> + 'static,
P: Into<AppendArgs<Self::Invocation, R>>,
R: RetryPolicy<Invocation = Self::Invocation>,
{
self.delegate().append_impl(applicable, args)
}
fn await_commit_of(
&mut self,
log_entry_id: LogEntryIdOf<Self>,
) -> BoxFuture<'static, Result<CommitFor<Self>, ShutDown>> {
self.delegate().await_commit_of(log_entry_id)
}
fn eject(&mut self, reason: EjectionOf<Self>) -> BoxFuture<'static, Result<bool, ShutDown>> {
self.delegate().eject(reason)
}
fn poll(
&mut self,
round_num: RoundNumOf<Self>,
additional_nodes: Vec<NodeOf<Self>>,
) -> BoxFuture<'static, Result<bool, PollError<Self::Invocation>>> {
self.delegate().poll(round_num, additional_nodes)
}
}
pub struct NextEvent<'a, N: ?Sized>(&'a mut N);
impl<'a, N> std::future::Future for NextEvent<'a, N>
where
N: Node,
{
type Output = EventFor<N>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.0.poll_events(cx)
}
}
pub struct Events<'a, N: ?Sized>(&'a mut N);
impl<'a, N> futures::stream::Stream for Events<'a, N>
where
N: Node,
{
type Item = EventFor<N>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_events(cx).map(Some)
}
}
pub trait Admin {
fn force_active(&self) -> BoxFuture<'static, Result<bool, ShutDown>>;
}
pub(crate) struct SpawnArgs<I, V, B>
where
I: Invocation,
B: Buffer<
RoundNum = invocation::RoundNumOf<I>,
CoordNum = invocation::CoordNumOf<I>,
Entry = invocation::LogEntryOf<I>,
>,
{
pub context: invocation::ContextOf<I>,
pub node_id: invocation::NodeIdOf<I>,
pub voter: V,
pub snapshot: invocation::SnapshotFor<I>,
pub buffer: B,
#[cfg(feature = "tracer")]
pub tracer: Option<Box<dyn Tracer<I>>>,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Participation<R> {
Active,
PartiallyActive(R),
Passive,
}
pub struct NodeKit<I: Invocation> {
state_keeper: StateKeeperKit<I>,
sender: mpsc::Sender<handle::RequestAndResponseSender<I>>,
receiver: mpsc::Receiver<handle::RequestAndResponseSender<I>>,
}
impl<I: Invocation> NodeKit<I> {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel(16);
Self {
state_keeper: StateKeeperKit::new(),
sender,
receiver,
}
}
pub fn handle(&self) -> NodeHandle<I> {
NodeHandle::new(self.sender.clone(), self.state_keeper.handle())
}
}
impl<I: Invocation> Default for NodeKit<I> {
fn default() -> Self {
Self::new()
}
}