pub mod events;
pub mod state;
pub use events::{LeaderEventSender, LeaderEventStream, SendError, leader_event_channel};
pub use state::{LeadershipState, Peer};
use std::sync::Arc;
use std::time::Duration;
use omnipaxos::OmniPaxos;
use omnipaxos::messages::Message;
use omnipaxos::storage::{Entry, Storage};
use parking_lot::Mutex;
use tokio::sync::{Notify, mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::interval;
use tracing::{debug, error, warn};
use tsoracle_core::Epoch;
/// Capacity, in messages, of the bounded queue that sits between the tick loop
/// and the task that forwards messages to the `MessageSink`.
///
/// The tick loop enqueues with `try_send`, so once this many messages are
/// waiting, further messages produced by that tick are dropped instead of
/// blocking the loop.
const OUTBOUND_QUEUE_CAPACITY: usize = 1024;
/// Destination for the OmniPaxos messages a `PaxosRunner` produces.
///
/// `PaxosRunner::start` takes an `Arc` of an implementor and awaits `send`
/// once per queued message from a dedicated task, one message at a time, so
/// a `send` that is slow (or never returns) delays later messages but not the
/// tick loop itself.
#[async_trait::async_trait]
pub trait MessageSink<T: Entry>: Send + Sync + 'static {
/// Delivers a single outgoing message.
///
/// There is no return value, so an implementation has no way to report a
/// delivery failure back to the runner through this method.
async fn send(&self, message: Message<T>);
}
/// Drives one OmniPaxos instance from background Tokio tasks.
///
/// After `start`, a tick task periodically ticks the instance, queues its
/// outgoing messages, publishes the observed leadership state on the
/// leader-event channel and wakes `apply_notify` waiters; a second task
/// forwards the queued messages to a `MessageSink`.
pub struct PaxosRunner<T, S>
where
T: Entry + Send + 'static,
S: Storage<T> + Send + 'static,
{
/// Shared OmniPaxos instance; locked by the tick task on every tick and
/// handed out (as a cloned `Arc`) by `omnipaxos()`.
omnipaxos: Arc<Mutex<OmniPaxos<T, S>>>,
/// This node's id, passed to `LeadershipState::from_omnipaxos` when the
/// published state is derived.
my_node_id: u64,
/// Peer list, passed to `LeadershipState::from_omnipaxos` alongside the
/// current leader id.
peers: Vec<Peer>,
/// Period of the tick task's timer.
tick_interval: Duration,
/// Sending half of the leader-event channel; a clone moves into the tick task.
leader_sender: LeaderEventSender,
/// Receiving half of the leader-event channel; `Some` until
/// `take_leader_stream` claims it.
leader_stream: Option<LeaderEventStream>,
/// Signalled with `notify_waiters` at the end of each tick whose leadership
/// state was published successfully.
apply_notify: Arc<Notify>,
/// Join handle of the tick task; set by `start`, taken by `stop`.
handle: Option<JoinHandle<()>>,
/// Join handle of the task that forwards queued messages to the sink.
sender_handle: Option<JoinHandle<()>>,
/// One-shot sender used to ask the tick task to exit.
shutdown_tx: Option<oneshot::Sender<()>>,
}
impl<T, S> PaxosRunner<T, S>
where
T: Entry + Send + 'static,
S: Storage<T> + Send + 'static,
{
pub fn new(
omnipaxos: Arc<Mutex<OmniPaxos<T, S>>>,
my_node_id: u64,
peers: Vec<Peer>,
tick_interval: Duration,
) -> Self {
let (leader_sender, leader_stream) = leader_event_channel();
Self {
omnipaxos,
my_node_id,
peers,
tick_interval,
leader_sender,
leader_stream: Some(leader_stream),
apply_notify: Arc::new(Notify::new()),
handle: None,
sender_handle: None,
shutdown_tx: None,
}
}
#[must_use]
pub fn take_leader_stream(&mut self) -> Option<LeaderEventStream> {
self.leader_stream.take()
}
#[must_use]
pub fn apply_notify(&self) -> Arc<Notify> {
self.apply_notify.clone()
}
#[must_use]
pub fn omnipaxos(&self) -> Arc<Mutex<OmniPaxos<T, S>>> {
self.omnipaxos.clone()
}
pub fn start<Sink: MessageSink<T>>(&mut self, sink: Arc<Sink>)
where
<T as Entry>::Snapshot: Send,
{
debug_assert!(
self.handle.is_none(),
"PaxosRunner::start called while already running; call stop() first",
);
let omnipaxos = self.omnipaxos.clone();
let my_node_id = self.my_node_id;
let peers = self.peers.clone();
let tick_interval = self.tick_interval;
let leader_sender = self.leader_sender.clone();
let apply_notify = self.apply_notify.clone();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
self.shutdown_tx = Some(shutdown_tx);
let (outbound_tx, mut outbound_rx) = mpsc::channel::<Message<T>>(OUTBOUND_QUEUE_CAPACITY);
let sender_handle = tokio::spawn(async move {
while let Some(message) = outbound_rx.recv().await {
sink.send(message).await;
}
});
self.sender_handle = Some(sender_handle);
let handle = tokio::spawn(async move {
let mut ticker = interval(tick_interval);
let mut last_observed_leader: Option<u64> = None;
let mut leader_change_counter: u64 = 0;
loop {
tokio::select! {
_ = ticker.tick() => {
let outgoing = {
let mut op = omnipaxos.lock();
op.tick();
op.outgoing_messages()
};
for message in outgoing {
match outbound_tx.try_send(message) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(_)) => {
debug!(
"paxos outbound queue full; dropping message \
(resent next tick)"
);
}
Err(mpsc::error::TrySendError::Closed(_)) => {
warn!("paxos outbound sender task gone; stopping tick loop");
break;
}
}
}
let leader_pid: Option<u64> = {
let op = omnipaxos.lock();
op.get_current_leader()
};
if leader_pid != last_observed_leader {
last_observed_leader = leader_pid;
if leader_pid.is_some() {
leader_change_counter = leader_change_counter.wrapping_add(1);
}
}
let epoch = leader_pid.map(|_| Epoch(u128::from(leader_change_counter)));
let state = LeadershipState::from_omnipaxos(
my_node_id, leader_pid, epoch, &peers,
);
if let Err(err) = leader_sender.send(state.to_consensus()) {
warn!(error = %err, "leader event channel closed");
break;
}
apply_notify.notify_waiters();
}
_ = &mut shutdown_rx => {
debug!("paxos runner received shutdown");
break;
}
}
}
});
self.handle = Some(handle);
}
pub async fn stop(&mut self) {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(());
}
if let Some(handle) = self.handle.take() {
if let Err(err) = handle.await {
error!(error = ?err, "paxos runner task terminated abnormally");
}
}
if let Some(sender) = self.sender_handle.take() {
sender.abort();
let _ = sender.await;
}
}
}
impl<T, S> Drop for PaxosRunner<T, S>
where
    T: Entry + Send + 'static,
    S: Storage<T> + Send + 'static,
{
    /// Best-effort, non-blocking teardown for a runner that is dropped while
    /// its tasks may still be running.
    ///
    /// Nothing is awaited here: the tick task is only signalled and the
    /// sender task is only aborted.
    fn drop(&mut self) {
        let Self {
            shutdown_tx,
            sender_handle,
            ..
        } = self;

        // Ask the tick task to exit; the outcome of the send is irrelevant
        // during teardown, so it is discarded.
        if let Some(signal) = shutdown_tx.take() {
            let _ = signal.send(());
        }

        // The sender task may be stuck inside the sink, so cancel it outright.
        if let Some(task) = sender_handle.take() {
            task.abort();
        }
    }
}
// Unit tests for `PaxosRunner`, run against a real OmniPaxos instance that is
// backed by a stub storage and sinks that never deliver anything.
#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
use omnipaxos::ballot_leader_election::Ballot;
use omnipaxos::storage::{Snapshot, StopSign, StorageResult};
use omnipaxos::{ClusterConfig, OmniPaxosConfig, ServerConfig};
use tokio::time::sleep;
use tsoracle_consensus::LeaderState;
/// Payload-free log entry type used to instantiate OmniPaxos in tests.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct TestEntry;
impl Entry for TestEntry {
type Snapshot = TestSnapshot;
}
/// Empty snapshot type paired with `TestEntry`; snapshots are reported as unused.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct TestSnapshot;
impl Snapshot<TestEntry> for TestSnapshot {
fn create(_: &[TestEntry]) -> Self {
Self
}
fn merge(&mut self, _: Self) {}
fn use_snapshots() -> bool {
false
}
}
/// `Storage` stub that remembers only the ballot, index, snapshot and
/// stop-sign values it is given; it never stores log entries.
#[derive(Default)]
struct StubStorage {
promise: Option<Ballot>,
accepted_round: Option<Ballot>,
decided_idx: u64,
compacted_idx: u64,
snapshot: Option<TestSnapshot>,
stopsign: Option<StopSign>,
}
impl Storage<TestEntry> for StubStorage {
// Log operations accept their input, discard it and report 0 / an empty log.
fn append_entry(&mut self, _: TestEntry) -> StorageResult<u64> {
Ok(0)
}
fn append_entries(&mut self, _: Vec<TestEntry>) -> StorageResult<u64> {
Ok(0)
}
fn append_on_prefix(&mut self, _: u64, _: Vec<TestEntry>) -> StorageResult<u64> {
Ok(0)
}
fn get_entries(&self, _: u64, _: u64) -> StorageResult<Vec<TestEntry>> {
Ok(Vec::new())
}
fn get_log_len(&self) -> StorageResult<u64> {
Ok(0)
}
fn get_suffix(&self, _: u64) -> StorageResult<Vec<TestEntry>> {
Ok(Vec::new())
}
// The remaining accessors are plain getters/setters over the struct fields.
fn set_promise(&mut self, ballot: Ballot) -> StorageResult<()> {
self.promise = Some(ballot);
Ok(())
}
fn get_promise(&self) -> StorageResult<Option<Ballot>> {
Ok(self.promise)
}
fn set_accepted_round(&mut self, ballot: Ballot) -> StorageResult<()> {
self.accepted_round = Some(ballot);
Ok(())
}
fn get_accepted_round(&self) -> StorageResult<Option<Ballot>> {
Ok(self.accepted_round)
}
fn set_decided_idx(&mut self, idx: u64) -> StorageResult<()> {
self.decided_idx = idx;
Ok(())
}
fn get_decided_idx(&self) -> StorageResult<u64> {
Ok(self.decided_idx)
}
// Trimming is a no-op because no entries are ever kept.
fn trim(&mut self, _: u64) -> StorageResult<()> {
Ok(())
}
fn set_compacted_idx(&mut self, idx: u64) -> StorageResult<()> {
self.compacted_idx = idx;
Ok(())
}
fn get_compacted_idx(&self) -> StorageResult<u64> {
Ok(self.compacted_idx)
}
fn set_snapshot(&mut self, snapshot: Option<TestSnapshot>) -> StorageResult<()> {
self.snapshot = snapshot;
Ok(())
}
fn get_snapshot(&self) -> StorageResult<Option<TestSnapshot>> {
Ok(self.snapshot.clone())
}
fn set_stopsign(&mut self, stopsign: Option<StopSign>) -> StorageResult<()> {
self.stopsign = stopsign;
Ok(())
}
fn get_stopsign(&self) -> StorageResult<Option<StopSign>> {
Ok(self.stopsign.clone())
}
}
/// Sink that accepts every message and immediately discards it.
struct NoopSink;
#[async_trait::async_trait]
impl MessageSink<TestEntry> for NoopSink {
async fn send(&self, _message: Message<TestEntry>) {}
}
/// Sink whose `send` never completes; `entered` counts how many sends began.
#[derive(Default)]
struct BlockingSink {
entered: Arc<std::sync::atomic::AtomicUsize>,
}
#[async_trait::async_trait]
impl MessageSink<TestEntry> for BlockingSink {
async fn send(&self, _message: Message<TestEntry>) {
// Record the attempt first, then park forever to simulate a wedged peer.
self.entered
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
std::future::pending::<()>().await;
}
}
/// Polls `cond` every 2 ms until it returns `true` or `deadline` has elapsed.
///
/// Returns the result of one final `cond()` call made after the deadline, so
/// a condition that becomes true during the last sleep is still reported.
async fn wait_until(deadline: Duration, cond: impl Fn() -> bool) -> bool {
let start = std::time::Instant::now();
while start.elapsed() < deadline {
if cond() {
return true;
}
sleep(Duration::from_millis(2)).await;
}
cond()
}
/// Builds an OmniPaxos instance for `node_id` in a three-node cluster
/// (nodes 1, 2, 3) with otherwise default server settings and a fresh
/// `StubStorage`.
fn build_omnipaxos(node_id: u64) -> Arc<Mutex<OmniPaxos<TestEntry, StubStorage>>> {
let cluster_config = ClusterConfig {
configuration_id: 1,
nodes: vec![1, 2, 3],
flexible_quorum: None,
};
let server_config = ServerConfig {
pid: node_id,
..Default::default()
};
let op_config = OmniPaxosConfig {
cluster_config,
server_config,
};
let op = op_config
.build(StubStorage::default())
.expect("build omnipaxos");
Arc::new(Mutex::new(op))
}
/// Builds a not-yet-started runner for `node_id` with an empty peer list and
/// a 5 ms tick interval.
fn build_runner(node_id: u64) -> PaxosRunner<TestEntry, StubStorage> {
PaxosRunner::new(
build_omnipaxos(node_id),
node_id,
vec![],
Duration::from_millis(5),
)
}
/// The leader stream can be claimed exactly once.
#[tokio::test]
async fn take_leader_stream_is_once_only() {
let mut runner = build_runner(1);
assert!(runner.take_leader_stream().is_some());
assert!(runner.take_leader_stream().is_none());
}
/// `omnipaxos()` returns the same allocation that was passed to `new`.
#[tokio::test]
async fn omnipaxos_handle_is_shared() {
let omnipaxos = build_omnipaxos(1);
let runner = PaxosRunner::new(omnipaxos.clone(), 1, vec![], Duration::from_millis(5));
assert!(Arc::ptr_eq(&omnipaxos, &runner.omnipaxos()));
}
/// Every `apply_notify()` call returns a handle to the same `Notify`.
#[tokio::test]
async fn apply_notify_handle_is_shared() {
let runner = build_runner(1);
let first = runner.apply_notify();
let second = runner.apply_notify();
assert!(Arc::ptr_eq(&first, &second));
}
/// `stop()` on a runner that was never started returns without doing anything.
#[tokio::test]
async fn stop_without_start_is_noop() {
let mut runner = build_runner(1);
runner.stop().await;
}
/// With a sink that delivers nothing, the first published state is
/// `LeaderState::Unknown`, and the runner keeps running and stops cleanly.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn runner_ticks_emit_unknown_state_under_dead_network() {
let mut runner = build_runner(1);
let mut stream = runner.take_leader_stream().expect("stream").into_pin();
runner.start(Arc::new(NoopSink));
assert_eq!(stream.next().await, Some(LeaderState::Unknown));
// Let a few more 5 ms ticks run before shutting down.
sleep(Duration::from_millis(30)).await;
runner.stop().await;
}
/// A waiter on `apply_notify` is woken within 50 ms of `start`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn apply_notify_fires_on_each_tick() {
let mut runner = build_runner(1);
let notify = runner.apply_notify();
runner.start(Arc::new(NoopSink));
let woke = tokio::time::timeout(Duration::from_millis(50), notify.notified()).await;
assert!(
woke.is_ok(),
"apply_notify should fire within 50ms of starting"
);
runner.stop().await;
}
/// A sink stuck inside `send` must not prevent the tick loop from waking
/// `apply_notify` waiters.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn hung_sink_does_not_starve_apply_notify() {
let mut runner = build_runner(1);
let sink = Arc::new(BlockingSink::default());
let entered = sink.entered.clone();
let notify = runner.apply_notify();
runner.start(sink);
// First make sure a send is actually wedged, then check the notify still fires.
assert!(
wait_until(Duration::from_millis(200), || entered
.load(std::sync::atomic::Ordering::SeqCst)
>= 1)
.await,
"expected at least one send to be attempted",
);
let woke = tokio::time::timeout(Duration::from_millis(200), notify.notified()).await;
assert!(
woke.is_ok(),
"apply_notify must fire even while every send is wedged",
);
runner.stop().await;
}
/// `stop()` completes within one second even though a `send` never returns.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn hung_sink_does_not_deadlock_stop() {
let mut runner = build_runner(1);
let sink = Arc::new(BlockingSink::default());
let entered = sink.entered.clone();
runner.start(sink);
assert!(
wait_until(Duration::from_millis(200), || entered
.load(std::sync::atomic::Ordering::SeqCst)
>= 1)
.await,
"expected at least one send to be attempted before stop",
);
let stopped = tokio::time::timeout(Duration::from_secs(1), runner.stop()).await;
assert!(
stopped.is_ok(),
"stop() must complete even when a send is wedged",
);
}
/// Dropping the leader stream before `start` makes the tick task finish on
/// its own within one second.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn tick_loop_exits_when_leader_stream_dropped() {
let mut runner = build_runner(1);
let stream = runner.take_leader_stream().expect("stream");
drop(stream);
runner.start(Arc::new(NoopSink));
// Inspect the private join handle directly to observe task completion.
let exited = wait_until(Duration::from_secs(1), || {
runner
.handle
.as_ref()
.map(JoinHandle::is_finished)
.unwrap_or(false)
})
.await;
assert!(
exited,
"tick loop must exit once the leader-event stream is dropped",
);
runner.stop().await;
}
}