use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use async_trait::async_trait;
use omnipaxos::messages::Message;
use omnipaxos::storage::Storage;
use omnipaxos::{OmniPaxos, ProposeErr};
use parking_lot::Mutex;
use tsoracle_consensus::{AdvancePayload, ConsensusError};
use tsoracle_paxos_toolkit::lifecycle::{LeaderEventSubscriber, MessageSink, PaxosRunner, TsoPeer};
use crate::apply::{ApplyEngine, ApplyTask};
use crate::host::PaxosHighWaterHost;
use crate::log_entry::HighWaterCommand;
use crate::snapshot_policy::SnapshotPolicy;
/// A [`PaxosHighWaterHost`] that owns both its Paxos driver ([`PaxosRunner`])
/// and its apply loop ([`ApplyEngine`]) rather than delegating them to an
/// embedding application.
pub struct StandaloneHost<S: Storage<HighWaterCommand> + Send + 'static> {
    /// Shared OmniPaxos replica; also handed to the runner and the apply task.
    omnipaxos: Arc<Mutex<OmniPaxos<HighWaterCommand, S>>>,
    /// This node's id; every barrier appended by this host is tagged with it.
    my_node_id: u64,
    /// Last barrier nonce minted by this host; the next one is this value + 1.
    barrier_seq: AtomicU64,
    /// Drives ticks and message exchange for the replica.
    runner: PaxosRunner<HighWaterCommand, S>,
    /// Leader-event subscriber taken from the runner; can be taken out once.
    leader_subscriber: Mutex<Option<LeaderEventSubscriber>>,
    /// Folds decided entries and exposes the resulting high water.
    engine: ApplyEngine,
    /// Background apply task, present only while the host is started.
    task: Option<ApplyTask>,
    /// Log position the apply engine has reached; shared with the apply task.
    apply_cursor: Arc<Mutex<u64>>,
    /// Upper bound on how long a single barrier wait may take.
    barrier_timeout: Duration,
}
/// Barrier-wait timeout used when the builder is not given an explicit one (5 s).
pub const DEFAULT_BARRIER_TIMEOUT: Duration = Duration::from_millis(5_000);
impl<S> StandaloneHost<S>
where
    S: Storage<HighWaterCommand> + Send + 'static,
    <HighWaterCommand as omnipaxos::storage::Entry>::Snapshot: Send,
{
    /// Builds a host around an existing OmniPaxos handle without starting it.
    ///
    /// Construction does the following:
    /// - lets the apply engine recover from the handle via `ApplyEngine::recover`;
    /// - seeds the apply cursor with the position recovery reached;
    /// - seeds the barrier-nonce counter from
    ///   `state_machine::max_logged_barrier_seq` for this node, so newly minted
    ///   nonces do not repeat one already present in the log.
    ///
    /// Call [`StandaloneHost::start`] to run in the background, or drive the
    /// host manually with [`StandaloneHost::step`].
    pub fn new(
        omnipaxos: Arc<Mutex<OmniPaxos<HighWaterCommand, S>>>,
        my_node_id: u64,
        peers: Vec<TsoPeer>,
        tick_interval: Duration,
        policy: SnapshotPolicy,
        barrier_timeout: Duration,
    ) -> Self {
        let mut runner =
            PaxosRunner::new(Arc::clone(&omnipaxos), my_node_id, peers, tick_interval);
        let subscriber = runner.take_leader_subscriber();

        // Recovery advances `cursor` in place; whatever it reaches becomes the
        // starting point for subsequent apply steps.
        let engine = ApplyEngine::new(policy);
        let mut cursor: u64 = 0;
        engine.recover(&omnipaxos, &mut cursor);

        let seed_seq = crate::state_machine::max_logged_barrier_seq(&omnipaxos, my_node_id);

        Self {
            barrier_seq: AtomicU64::new(seed_seq),
            leader_subscriber: Mutex::new(subscriber),
            apply_cursor: Arc::new(Mutex::new(cursor)),
            task: None,
            omnipaxos,
            my_node_id,
            runner,
            engine,
            barrier_timeout,
        }
    }

    /// Moves the leader-event subscriber out of the host; later calls return `None`.
    pub fn take_leader_subscriber(&self) -> Option<LeaderEventSubscriber> {
        let mut slot = self.leader_subscriber.lock();
        slot.take()
    }

    /// Returns another reference to the shared OmniPaxos handle.
    pub fn omnipaxos_handle(&self) -> Arc<Mutex<OmniPaxos<HighWaterCommand, S>>> {
        Arc::clone(&self.omnipaxos)
    }

    /// Starts the runner with `sink` and spawns the background apply task.
    ///
    /// # Errors
    /// Returns [`AlreadyRunning`] if an apply task already exists (the runner
    /// is then not touched) or if the runner refuses to start.
    pub fn start<Sink: MessageSink<HighWaterCommand>>(
        &mut self,
        sink: Arc<Sink>,
    ) -> Result<(), AlreadyRunning> {
        // `||` short-circuits, so the runner is only started when no task exists.
        if self.task.is_some() || self.runner.start(sink).is_err() {
            return Err(AlreadyRunning);
        }
        let notify = self.runner.apply_notify();
        let handle = Arc::clone(&self.omnipaxos);
        let cursor = Arc::clone(&self.apply_cursor);
        self.task = Some(self.engine.spawn(notify, handle, cursor));
        Ok(())
    }

    /// Stops the background apply task (if one is running), then the runner.
    pub async fn stop(&mut self) {
        if let Some(apply_task) = self.task.take() {
            apply_task.stop().await;
        }
        self.runner.stop().await;
    }

    /// High water as currently folded by the apply engine (no barrier round-trip).
    pub fn current_value(&self) -> u64 {
        self.engine.high_water()
    }

    /// Manual driving: one runner tick followed by one inline apply step.
    /// Returns the messages produced by the tick.
    pub fn step(&self) -> Vec<Message<HighWaterCommand>> {
        let messages = self.tick_only();
        self.apply_once();
        messages
    }

    /// Manual driving: one runner tick without applying anything.
    pub fn tick_only(&self) -> Vec<Message<HighWaterCommand>> {
        self.runner.tick_once()
    }

    /// Runs one apply step inline, advancing the shared apply cursor.
    pub fn apply_once(&self) {
        let mut guard = self.apply_cursor.lock();
        self.engine.apply_step(&self.omnipaxos, &mut guard);
    }

    /// Feeds one incoming Paxos message to the runner.
    pub fn deliver(&self, message: Message<HighWaterCommand>) {
        self.runner.handle_incoming(message);
    }

    /// Waits until this node's barrier `seq` has been folded by the apply
    /// engine. If `floor` is given, it also waits until the high water is at
    /// least `floor`. Returns the high water observed at that point.
    ///
    /// # Errors
    /// - `PermanentDriver(ApplyTaskGone)` if the apply task has died.
    /// - `TransientDriver(BarrierWaitTimeout)` if `barrier_timeout` elapses first.
    async fn await_barrier(&self, seq: u64, floor: Option<u64>) -> Result<u64, ConsensusError> {
        let apply_signal = self.engine.apply_notifier();
        let wait_for_fold = async {
            loop {
                // Register for the next notification *before* inspecting state,
                // so a notification fired between the check and the await is
                // not lost.
                let wakeup = apply_signal.notified();
                tokio::pin!(wakeup);
                wakeup.as_mut().enable();

                let barrier_applied = self.engine.applied_barrier_seq(self.my_node_id) >= seq;
                let floor_reached = match floor {
                    None => true,
                    Some(at_least) => self.engine.high_water() >= at_least,
                };
                if barrier_applied && floor_reached {
                    return Ok(self.engine.high_water());
                }
                if self.engine.apply_task_died() {
                    return Err(ConsensusError::PermanentDriver(Box::new(ApplyTaskGone)));
                }
                wakeup.await;
            }
        };
        tokio::time::timeout(self.barrier_timeout, wait_for_fold)
            .await
            .unwrap_or_else(|_elapsed| {
                Err(ConsensusError::TransientDriver(Box::new(BarrierWaitTimeout(
                    self.barrier_timeout,
                ))))
            })
    }
}
/// Step-by-step constructor for [`StandaloneHost`]; obtain one via
/// [`StandaloneHost::builder`].
pub struct StandaloneHostBuilder<S: Storage<HighWaterCommand> + Send + 'static> {
    /// Required: the OmniPaxos handle to drive.
    omnipaxos: Option<Arc<Mutex<OmniPaxos<HighWaterCommand, S>>>>,
    /// Required: this node's id.
    my_node_id: Option<u64>,
    /// Peers handed to the runner.
    peers: Vec<TsoPeer>,
    /// Interval handed to the runner.
    tick_interval: Duration,
    /// Snapshot policy handed to the apply engine.
    policy: SnapshotPolicy,
    /// Upper bound for each barrier wait.
    barrier_timeout: Duration,
}
impl<S: Storage<HighWaterCommand> + Send + 'static> Default for StandaloneHostBuilder<S> {
    /// Creates a builder with the following defaults:
    /// - no handle and no node id (both must be supplied before `build`);
    /// - no peers;
    /// - a 20 ms tick interval;
    /// - snapshots disabled;
    /// - [`DEFAULT_BARRIER_TIMEOUT`].
    fn default() -> Self {
        let tick_interval = Duration::from_millis(20);
        Self {
            tick_interval,
            barrier_timeout: DEFAULT_BARRIER_TIMEOUT,
            policy: SnapshotPolicy::disabled(),
            peers: Vec::new(),
            my_node_id: None,
            omnipaxos: None,
        }
    }
}
impl<S> StandaloneHostBuilder<S>
where
    S: Storage<HighWaterCommand> + Send + 'static,
    <HighWaterCommand as omnipaxos::storage::Entry>::Snapshot: Send,
{
    /// Sets the OmniPaxos handle the host will drive (required).
    pub fn omnipaxos(self, omnipaxos: Arc<Mutex<OmniPaxos<HighWaterCommand, S>>>) -> Self {
        Self {
            omnipaxos: Some(omnipaxos),
            ..self
        }
    }

    /// Sets this node's id (required).
    pub fn my_node_id(self, node_id: u64) -> Self {
        Self {
            my_node_id: Some(node_id),
            ..self
        }
    }

    /// Replaces the peer list handed to the runner.
    pub fn peers(self, peers: Vec<TsoPeer>) -> Self {
        Self { peers, ..self }
    }

    /// Overrides the runner tick interval.
    pub fn tick_interval(self, tick_interval: Duration) -> Self {
        Self {
            tick_interval,
            ..self
        }
    }

    /// Overrides the apply engine's snapshot policy.
    pub fn snapshot_policy(self, policy: SnapshotPolicy) -> Self {
        Self { policy, ..self }
    }

    /// Overrides how long a single barrier wait may take.
    pub fn barrier_timeout(self, barrier_timeout: Duration) -> Self {
        Self {
            barrier_timeout,
            ..self
        }
    }

    /// Consumes the builder and constructs the host via [`StandaloneHost::new`].
    ///
    /// # Errors
    /// [`BuilderError::MissingOmnipaxos`] if no handle was set (checked first),
    /// otherwise [`BuilderError::MissingNodeId`] if no node id was set.
    pub fn build(self) -> Result<StandaloneHost<S>, BuilderError> {
        let Self {
            omnipaxos,
            my_node_id,
            peers,
            tick_interval,
            policy,
            barrier_timeout,
        } = self;
        let omnipaxos = omnipaxos.ok_or(BuilderError::MissingOmnipaxos)?;
        let my_node_id = my_node_id.ok_or(BuilderError::MissingNodeId)?;
        Ok(StandaloneHost::new(
            omnipaxos,
            my_node_id,
            peers,
            tick_interval,
            policy,
            barrier_timeout,
        ))
    }
}
impl<S> StandaloneHost<S>
where
    S: Storage<HighWaterCommand> + Send + 'static,
    <HighWaterCommand as omnipaxos::storage::Entry>::Snapshot: Send,
{
    /// Returns a [`StandaloneHostBuilder`] populated with its defaults.
    #[must_use]
    pub fn builder() -> StandaloneHostBuilder<S> {
        <StandaloneHostBuilder<S> as Default>::default()
    }
}
/// Why [`StandaloneHostBuilder::build`] could not produce a host.
#[derive(Debug, thiserror::Error)]
pub enum BuilderError {
    /// `omnipaxos(..)` was never called on the builder.
    #[error("omnipaxos handle is required")]
    MissingOmnipaxos,
    /// `my_node_id(..)` was never called on the builder.
    #[error("my_node_id is required")]
    MissingNodeId,
}
/// Returned by [`StandaloneHost::start`] when the apply task or the runner is
/// already running.
#[derive(Debug, thiserror::Error)]
#[error("StandaloneHost::start called while already running")]
pub struct AlreadyRunning;
#[async_trait]
impl<S> PaxosHighWaterHost for StandaloneHost<S>
where
    S: Storage<HighWaterCommand> + Send + 'static,
    <HighWaterCommand as omnipaxos::storage::Entry>::Snapshot: Send,
{
    type Entry = HighWaterCommand;
    type Storage = S;

    /// Returns another reference to the shared OmniPaxos handle.
    fn omnipaxos(&self) -> Arc<Mutex<OmniPaxos<HighWaterCommand, S>>> {
        self.omnipaxos.clone()
    }

    /// Appends a fresh barrier tagged with this node's id, waits until the
    /// apply engine has folded it, and returns the high water at that point.
    ///
    /// # Errors
    /// - `PermanentDriver` if OmniPaxos rejects the append, or if the apply
    ///   task has died.
    /// - `TransientDriver` if the barrier is not folded within the host's
    ///   barrier timeout.
    async fn current_high_water(&self) -> Result<u64, ConsensusError> {
        // Mint the nonce and append the barrier under ONE acquisition of the
        // OmniPaxos lock, so concurrent callers append their barriers locally
        // in the same order they minted seqs.
        //
        // When the nonce was minted before taking the lock, seq N+1 could
        // reach the log ahead of seq N. If `applied_barrier_seq` reports the
        // last (rather than the maximum) applied seq, the N+1 waiter could then
        // sit until timeout.
        let seq = {
            let mut omnipaxos = self.omnipaxos.lock();
            let seq = self.barrier_seq.fetch_add(1, Ordering::SeqCst) + 1;
            omnipaxos
                .append(HighWaterCommand::Barrier {
                    node: self.my_node_id,
                    seq,
                })
                .map_err(|err| classify_append_error(err, ProposedCommand::Barrier))?;
            seq
        }; // lock guard dropped here, before any await point
        tsoracle_yieldpoint::yieldpoint!(
            "standalone_host::current_high_water::after_append_before_await"
        );
        self.await_barrier(seq, None).await
    }

    /// Appends `Advance { at_least }` followed by a barrier, then waits until
    /// the barrier is folded and the high water has reached `at_least`.
    /// Returns the high water observed at that point.
    ///
    /// # Errors
    /// Same classification as `current_high_water`. If the barrier append is
    /// rejected after the advance was accepted, the advance stays in the log
    /// and the error is still returned.
    async fn submit_advance(&self, at_least: u64) -> Result<u64, ConsensusError> {
        // Both appends, and the minting of the barrier nonce, happen under a
        // single lock acquisition. This keeps the advance and its barrier
        // adjacent with respect to other users of this handle, and keeps
        // barrier seqs in log order (see `current_high_water`).
        let seq = {
            let mut omnipaxos = self.omnipaxos.lock();
            omnipaxos
                .append(HighWaterCommand::Advance(AdvancePayload { at_least }))
                .map_err(|err| classify_append_error(err, ProposedCommand::Advance))?;
            let seq = self.barrier_seq.fetch_add(1, Ordering::SeqCst) + 1;
            omnipaxos
                .append(HighWaterCommand::Barrier {
                    node: self.my_node_id,
                    seq,
                })
                .map_err(|err| classify_append_error(err, ProposedCommand::Barrier))?;
            seq
        }; // lock guard dropped here, before any await point
        tsoracle_yieldpoint::yieldpoint!(
            "standalone_host::submit_advance::after_append_before_await"
        );
        self.await_barrier(seq, Some(at_least)).await
    }
}
/// Kind of command an append attempt carried; used to label rejection errors.
#[derive(Debug, Clone, Copy)]
enum ProposedCommand {
    Advance,
    Barrier,
}

impl std::fmt::Display for ProposedCommand {
    /// Writes the lowercase command name: `advance` or `barrier`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Advance => "advance",
            Self::Barrier => "barrier",
        };
        f.write_str(label)
    }
}
/// Why OmniPaxos refused an append, together with the command it refused.
#[derive(Debug, thiserror::Error)]
enum AppendRejected {
    /// The append hit `ProposeErr::PendingReconfigEntry`.
    #[error("{command} append rejected: configuration stopped by a pending reconfiguration")]
    ConfigurationStopped {
        /// The command whose append was refused.
        command: ProposedCommand,
    },
    /// The append hit a reconfiguration-proposal error, which a plain
    /// append is not expected to produce.
    #[error("{command} append rejected: unexpected reconfiguration-proposal error")]
    UnexpectedReconfiguration {
        /// The command whose append was refused.
        command: ProposedCommand,
    },
}
/// Maps an OmniPaxos append rejection to a [`ConsensusError`].
///
/// Every variant is classified as `PermanentDriver`. The boxed source records
/// which `command` was refused and why.
fn classify_append_error(
    err: ProposeErr<HighWaterCommand>,
    command: ProposedCommand,
) -> ConsensusError {
    let rejection = match err {
        ProposeErr::PendingReconfigConfig(..) | ProposeErr::ConfigError(..) => {
            AppendRejected::UnexpectedReconfiguration { command }
        }
        ProposeErr::PendingReconfigEntry(_) => AppendRejected::ConfigurationStopped { command },
    };
    ConsensusError::PermanentDriver(Box::new(rejection))
}
/// Source error for a barrier wait that exceeded the contained timeout.
#[derive(Debug, thiserror::Error)]
#[error("barrier wait timed out after {0:?}")]
struct BarrierWaitTimeout(Duration);
/// Source error reported when the apply engine says its apply task has died,
/// so a pending barrier wait cannot complete.
#[derive(Debug, thiserror::Error)]
#[error("apply task is gone; barrier can never be folded")]
struct ApplyTaskGone;
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-only check: `StandaloneHost::builder()` must type-check for any
    /// storage meeting the host's bounds. Never called, hence `dead_code`.
    #[allow(dead_code)]
    fn assert_builder_api_compiles<S>()
    where
        S: Storage<HighWaterCommand> + Send + 'static,
        <HighWaterCommand as omnipaxos::storage::Entry>::Snapshot: Send,
    {
        let _ = StandaloneHost::<S>::builder();
    }

    /// Checks that a host built over a replica whose log is already decided
    /// starts its apply cursor at the decided index instead of 0.
    ///
    /// Fixture: a three-node in-memory cluster elects a leader, and three
    /// `Advance` entries are decided on every node. A host is then constructed
    /// over the leader's handle.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn apply_cursor_is_seeded_from_recovered_decided_suffix() {
        use omnipaxos::{ClusterConfig, OmniPaxosConfig, ServerConfig};
        use std::time::Duration;
        use tokio::sync::mpsc;
        use tsoracle_paxos_toolkit::test_fakes::mem_network::MemNetwork;
        use tsoracle_paxos_toolkit::test_fakes::mem_storage::MemStorage;
        type Handle = Arc<Mutex<OmniPaxos<HighWaterCommand, MemStorage<HighWaterCommand>>>>;
        let network: Arc<MemNetwork<HighWaterCommand>> = Arc::new(MemNetwork::new());
        let node_ids = vec![1u64, 2, 3];
        let cluster_config = ClusterConfig {
            configuration_id: 1,
            nodes: node_ids.clone(),
            flexible_quorum: None,
        };
        let mut handles: Vec<(u64, Handle)> = Vec::new();
        let mut inboxes: Vec<(u64, mpsc::Receiver<Message<HighWaterCommand>>)> = Vec::new();
        // One replica per node, each over fresh in-memory storage. Registering
        // with the shared network yields the node's inbox.
        for &node_id in &node_ids {
            // Small tick timeouts keep election/resend quick under manual ticking.
            let server_config = ServerConfig {
                pid: node_id,
                election_tick_timeout: 5,
                resend_message_tick_timeout: 5,
                ..Default::default()
            };
            let config = OmniPaxosConfig {
                cluster_config: cluster_config.clone(),
                server_config,
            };
            let omnipaxos = config
                .build(MemStorage::<HighWaterCommand>::new())
                .expect("build omnipaxos");
            inboxes.push((node_id, network.register(node_id)));
            handles.push((node_id, Arc::new(Mutex::new(omnipaxos))));
        }
        // Hand-rolled driver. Each round:
        //   1. ticks every replica and collects its outgoing messages;
        //   2. hands those messages to the network (presumably routing them
        //      into the recipient's inbox);
        //   3. drains every inbox into the replica that owns it.
        // Returns once `predicate` holds; panics after `max_ticks` rounds.
        let mut drive_until = |predicate: &dyn Fn() -> bool, max_ticks: usize| {
            for _ in 0..max_ticks {
                let mut outgoing = Vec::new();
                for (_, handle) in &handles {
                    let mut omnipaxos = handle.lock();
                    omnipaxos.tick();
                    outgoing.extend(omnipaxos.outgoing_messages());
                }
                for message in outgoing {
                    network.deliver_now(message);
                }
                for (node_id, inbox) in &mut inboxes {
                    while let Ok(message) = inbox.try_recv() {
                        let handle = &handles
                            .iter()
                            .find(|(id, _)| id == node_id)
                            .expect("node present")
                            .1;
                        handle.lock().handle_incoming(message);
                    }
                }
                if predicate() {
                    return;
                }
            }
            panic!("predicate did not hold within {max_ticks} ticks");
        };
        // Leader as reported by the first replica that reports one, if any.
        let leader_id = || {
            handles
                .iter()
                .find_map(|(_, handle)| handle.lock().get_current_leader())
        };
        drive_until(&|| leader_id().is_some(), 500);
        let leader = leader_id().expect("leader elected");
        let leader_handle = handles
            .iter()
            .find(|(id, _)| *id == leader)
            .expect("leader present")
            .1
            .clone();
        // Propose three advances directly on the leader. The lock lives only in
        // this scope, so it is released before `drive_until` locks every
        // handle again.
        {
            let mut omnipaxos = leader_handle.lock();
            for at_least in [10u64, 20, 30] {
                omnipaxos
                    .append(HighWaterCommand::Advance(AdvancePayload { at_least }))
                    .expect("append on leader");
            }
        }
        // Drive until every replica has decided at least those three entries.
        drive_until(
            &|| {
                handles
                    .iter()
                    .all(|(_, handle)| handle.lock().get_decided_idx() >= 3)
            },
            500,
        );
        let recovered_decided = leader_handle.lock().get_decided_idx();
        assert!(
            recovered_decided >= 3,
            "fixture must produce a non-empty decided log",
        );
        // `new` runs recovery. The host is never started, so the cursor
        // reflects recovery alone.
        let host = StandaloneHost::new(
            leader_handle.clone(),
            leader,
            Vec::new(),
            Duration::from_millis(2),
            SnapshotPolicy::disabled(),
            DEFAULT_BARRIER_TIMEOUT,
        );
        assert_eq!(
            *host.apply_cursor.lock(),
            recovered_decided,
            "apply cursor must be seeded at the recovered decided index, not re-drained from 0",
        );
    }

    /// Checks that the barrier-nonce seed covers this node's barrier even
    /// when it sits beyond the decided index.
    ///
    /// Fixture: the storage log holds `[Advance(100), Barrier{seq: 9}]`, but
    /// the persisted decided index is 1, so the barrier lies past the decided
    /// prefix. The host's `barrier_seq` seed must still be at least 9, so the
    /// next minted nonce cannot repeat it.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn barrier_seq_seed_survives_a_lost_decided_suffix() {
        use omnipaxos::ballot_leader_election::Ballot;
        use omnipaxos::storage::Storage as _;
        use omnipaxos::{ClusterConfig, OmniPaxosConfig, ServerConfig};
        use tsoracle_paxos_toolkit::test_fakes::mem_storage::MemStorage;
        const MY_NODE: u64 = 1;
        const DURABLE_BARRIER_SEQ: u64 = 9;
        // Stage storage directly (bypassing consensus): a promise, the two
        // log entries, and a decided index that stops before the barrier.
        let mut storage = MemStorage::<HighWaterCommand>::new();
        storage
            .set_promise(Ballot::with(1, 1, 0, MY_NODE))
            .expect("set promise");
        storage
            .append_entries(vec![
                HighWaterCommand::Advance(AdvancePayload { at_least: 100 }),
                HighWaterCommand::Barrier {
                    node: MY_NODE,
                    seq: DURABLE_BARRIER_SEQ,
                },
            ])
            .expect("append durable log");
        storage
            .set_decided_idx(1)
            .expect("persist stale decided idx");
        let cluster_config = ClusterConfig {
            configuration_id: 1,
            nodes: vec![1, 2, 3],
            flexible_quorum: None,
        };
        let server_config = ServerConfig {
            pid: MY_NODE,
            ..Default::default()
        };
        let omnipaxos = OmniPaxosConfig {
            cluster_config,
            server_config,
        }
        .build(storage)
        .expect("build omnipaxos over staged storage");
        let handle = Arc::new(Mutex::new(omnipaxos));
        // Guard the fixture itself: the barrier must really be past decided_idx.
        assert_eq!(
            handle.lock().get_decided_idx(),
            1,
            "fixture must keep the barrier past the recovered decided_idx",
        );
        let host = StandaloneHost::new(
            handle,
            MY_NODE,
            Vec::new(),
            Duration::from_millis(2),
            SnapshotPolicy::disabled(),
            DEFAULT_BARRIER_TIMEOUT,
        );
        let seed = host.barrier_seq.load(Ordering::SeqCst);
        assert!(
            seed >= DURABLE_BARRIER_SEQ,
            "seed {seed} must cover the durable barrier seq {DURABLE_BARRIER_SEQ}, \
             so the next minted nonce ({}) cannot collide with a recovered barrier",
            seed + 1,
        );
    }

    /// `PendingReconfigEntry` must map to a permanent driver error.
    #[test]
    fn pending_reconfig_entry_classifies_as_permanent_driver() {
        let err = omnipaxos::ProposeErr::PendingReconfigEntry(HighWaterCommand::Barrier {
            node: 1,
            seq: 1,
        });
        assert!(
            matches!(
                classify_append_error(err, ProposedCommand::Barrier),
                ConsensusError::PermanentDriver(_)
            ),
            "pending-reconfiguration append rejection must be PermanentDriver",
        );
    }

    /// Reconfiguration-proposal errors (here `PendingReconfigConfig`) must also
    /// map to a permanent driver error.
    #[test]
    fn reconfiguration_proposal_errors_classify_as_permanent_driver() {
        use omnipaxos::ClusterConfig;
        let config = ClusterConfig {
            configuration_id: 1,
            nodes: vec![1, 2, 3],
            flexible_quorum: None,
        };
        let err = omnipaxos::ProposeErr::PendingReconfigConfig(config, None);
        assert!(
            matches!(
                classify_append_error(err, ProposedCommand::Advance),
                ConsensusError::PermanentDriver(_)
            ),
            "unexpected reconfiguration-proposal rejection must be PermanentDriver",
        );
    }

    /// The rendered error must name both the command and the rejection reason.
    /// This assumes `ConsensusError`'s `Display` surfaces the boxed source's
    /// message — TODO confirm against `tsoracle_consensus`.
    #[test]
    fn classified_append_error_preserves_command_and_reason() {
        let err = omnipaxos::ProposeErr::PendingReconfigEntry(HighWaterCommand::Advance(
            AdvancePayload { at_least: 7 },
        ));
        let classified = classify_append_error(err, ProposedCommand::Advance);
        let message = classified.to_string();
        assert!(
            message.contains("advance"),
            "message must name the rejected command, got: {message}",
        );
        assert!(
            message.contains("reconfigur"),
            "message must name the rejection reason, got: {message}",
        );
    }

    /// `build` without an OmniPaxos handle reports `MissingOmnipaxos`.
    #[test]
    fn builder_missing_omnipaxos_errors() {
        use tsoracle_paxos_toolkit::test_fakes::mem_storage::MemStorage;
        let result: Result<StandaloneHost<MemStorage<HighWaterCommand>>, _> =
            StandaloneHost::builder().my_node_id(1).build();
        assert!(matches!(result, Err(BuilderError::MissingOmnipaxos)));
    }

    /// `build` with a handle but no node id reports `MissingNodeId`.
    #[test]
    fn builder_missing_node_id_errors() {
        use omnipaxos::{ClusterConfig, OmniPaxosConfig, ServerConfig};
        use tsoracle_paxos_toolkit::test_fakes::mem_storage::MemStorage;
        let cluster_config = ClusterConfig {
            configuration_id: 1,
            nodes: vec![1, 2, 3],
            flexible_quorum: None,
        };
        let server_config = ServerConfig {
            pid: 1,
            ..Default::default()
        };
        let config = OmniPaxosConfig {
            cluster_config,
            server_config,
        };
        let omnipaxos = config
            .build(MemStorage::<HighWaterCommand>::new())
            .expect("build");
        let arc = Arc::new(Mutex::new(omnipaxos));
        let result = StandaloneHost::builder().omnipaxos(arc).build();
        assert!(matches!(result, Err(BuilderError::MissingNodeId)));
    }
}