use core::pin::Pin;
use core::task::{Context, Poll};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use tsoracle_consensus::{ConsensusDriver, ConsensusError, LeaderState};
use tsoracle_core::Epoch;
use tsoracle_paxos_toolkit::lifecycle::LeaderEventSubscriber;
use crate::host::PaxosHighWaterHost;
use crate::type_config::encode_epoch;
/// [`ConsensusDriver`] implementation backed by a [`PaxosHighWaterHost`].
///
/// The host supplies the OmniPaxos handle (`omnipaxos()`) and the high-water
/// load/advance operations. Leadership events come from the
/// [`LeaderEventSubscriber`]. At most one leadership stream may be live at a
/// time; the two generation counters below enforce that.
pub struct PaxosDriver<H: PaxosHighWaterHost> {
    /// Host providing the OmniPaxos handle and the high-water reads/writes.
    host: H,
    /// Source of raw leadership events; subscribed once per leadership stream.
    leader_subscriber: LeaderEventSubscriber,
    /// Wrapping counter that hands out stream generations; `0` is never handed out.
    next_generation: AtomicU64,
    /// Generation of the live leadership stream, or `0` when no stream is live.
    /// Shared with each stream's lease so the lease can release the slot on drop.
    active_generation: Arc<AtomicU64>,
}
impl<H: PaxosHighWaterHost> PaxosDriver<H> {
    /// Creates a driver around `host` that draws leadership events from
    /// `leader_subscriber`.
    ///
    /// Generations are handed out starting at `1`. The active slot starts at `0`,
    /// meaning no leadership stream is live yet.
    pub fn new(host: H, leader_subscriber: LeaderEventSubscriber) -> Self {
        let next_generation = AtomicU64::new(1);
        let active_generation = Arc::new(AtomicU64::new(0));
        Self {
            host,
            leader_subscriber,
            next_generation,
            active_generation,
        }
    }

    /// Borrows the host this driver was constructed with.
    pub fn host(&self) -> &H {
        &self.host
    }

    /// Hands out a fresh, non-zero stream generation.
    ///
    /// `fetch_add` wraps on overflow. `0` is reserved as the "no live stream"
    /// sentinel in `active_generation`, so a wrapped `0` is discarded and the
    /// next value is drawn instead.
    fn mint_generation(&self) -> u64 {
        let mut candidate = self.next_generation.fetch_add(1, Ordering::Relaxed);
        while candidate == 0 {
            candidate = self.next_generation.fetch_add(1, Ordering::Relaxed);
        }
        candidate
    }
}
#[async_trait]
impl<H: PaxosHighWaterHost> ConsensusDriver for PaxosDriver<H> {
    /// Opens this driver's single leadership stream.
    ///
    /// A fresh generation is minted and installed into the active slot only if
    /// the slot is free (`0`). If another stream already holds the slot:
    /// - the rejection is logged at error level,
    /// - it is counted when the `metrics` feature is enabled,
    /// - a stream that ends immediately is returned.
    ///
    /// On success, every `Leader` event has its epoch replaced by the OmniPaxos
    /// promise read when the event is mapped. Other events pass through
    /// unchanged. The returned stream carries a lease that frees the slot once
    /// the stream is dropped.
    fn leadership_events(&self) -> Pin<Box<dyn Stream<Item = LeaderState> + Send>> {
        let my_generation = self.mint_generation();
        let claim = self.active_generation.compare_exchange(
            0,
            my_generation,
            Ordering::AcqRel,
            Ordering::Acquire,
        );
        match claim {
            Ok(_) => {
                let lease = StreamLease {
                    active: Arc::clone(&self.active_generation),
                    generation: my_generation,
                };
                let raw_events = self.leader_subscriber.subscribe();
                let omnipaxos = self.host.omnipaxos();
                let restamped = raw_events.map(move |state| {
                    if matches!(state, LeaderState::Leader { .. }) {
                        // Report the epoch of the promise held right now rather than
                        // whatever epoch the raw event carried.
                        let promise = omnipaxos.lock().get_promise();
                        LeaderState::Leader {
                            epoch: encode_epoch(promise),
                        }
                    } else {
                        state
                    }
                });
                Box::pin(LeasedStream {
                    _lease: lease,
                    inner: Box::pin(restamped),
                })
            }
            Err(active_generation) => {
                #[cfg(feature = "metrics")]
                metrics::counter!("tsoracle.leadership_stream.rejected.total").increment(1);
                tracing::error!(
                    rejected_generation = my_generation,
                    active_generation,
                    "PaxosDriver::leadership_events called while a leadership stream is already \
                     active; refusing the concurrent second subscription and returning a \
                     fail-closed empty stream. Build exactly one Server per consensus driver, and \
                     fully stop a server (drop its WatchGuard) before starting a replacement."
                );
                Box::pin(futures::stream::empty())
            }
        }
    }

    /// Returns the host's current high-water value.
    async fn load_high_water(&self) -> Result<u64, ConsensusError> {
        self.host.current_high_water().await
    }

    /// Submits a high-water advance to at least `at_least`, fenced by `epoch`.
    ///
    /// The steps, in order:
    /// 1. Out-of-range values are rejected first, via
    ///    `reject_out_of_range_advance`.
    /// 2. The caller's `epoch` is compared with the epoch encoded from the
    ///    current OmniPaxos promise. A mismatch returns
    ///    [`ConsensusError::Fenced`] without submitting anything.
    /// 3. Otherwise the advance is delegated to the host.
    async fn persist_high_water(&self, at_least: u64, epoch: Epoch) -> Result<u64, ConsensusError> {
        tsoracle_consensus::reject_out_of_range_advance(at_least)?;
        // Scoped so neither the handle nor the lock guard lives across the `.await`.
        let current_epoch = {
            let omnipaxos = self.host.omnipaxos();
            let promise = omnipaxos.lock().get_promise();
            encode_epoch(promise)
        };
        if epoch == current_epoch {
            self.host.submit_advance(at_least).await
        } else {
            Err(ConsensusError::Fenced {
                expected: epoch,
                current: current_epoch,
            })
        }
    }
}
/// RAII claim on the driver's active-generation slot.
///
/// Dropping the lease hands the slot back, but only if the slot still holds
/// this lease's generation.
struct StreamLease {
    /// Slot shared with the driver; `0` means no stream is live.
    active: Arc<AtomicU64>,
    /// Generation this lease owns in `active`.
    generation: u64,
}

impl Drop for StreamLease {
    fn drop(&mut self) {
        const FREE_SLOT: u64 = 0;
        // Use compare-and-swap instead of a plain store. If a newer generation
        // has taken the slot, a stale lease must not clear it. That failure is an
        // expected outcome, so the result is intentionally discarded.
        let _ = self.active.compare_exchange(
            self.generation,
            FREE_SLOT,
            Ordering::Release,
            Ordering::Relaxed,
        );
    }
}
/// Leadership stream that keeps its [`StreamLease`] alive for as long as the
/// stream itself exists.
///
/// Field order is load-bearing. Rust drops struct fields in declaration order,
/// so `inner` (the underlying subscription) is torn down *before* `_lease`
/// releases the active-generation slot. That way the slot never reads as free
/// while the previous subscription is still alive.
struct LeasedStream {
    /// The wrapped leadership event stream.
    inner: Pin<Box<dyn Stream<Item = LeaderState> + Send>>,
    /// Held only for its `Drop`, which releases the slot when the stream goes away.
    _lease: StreamLease,
}

impl Stream for LeasedStream {
    type Item = LeaderState;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Both fields are `Unpin`, so `LeasedStream` is `Unpin` and `inner` can be
        // reached through `Pin`'s `DerefMut`.
        self.inner.as_mut().poll_next(cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Pure pass-through wrapper: the lease neither adds nor removes items.
        self.inner.size_hint()
    }
}
// Unit tests for `PaxosDriver`. They run against `StubHost`, which wraps a single
// locally built OmniPaxos instance and stubs out the high-water operations.
#[cfg(test)]
mod tests {
use super::*;
use crate::log_entry::HighWaterCommand;
use omnipaxos::OmniPaxos;
use parking_lot::Mutex;
use std::sync::Arc;
use tsoracle_paxos_toolkit::lifecycle::leader_event_channel;
use tsoracle_paxos_toolkit::test_fakes::mem_storage::MemStorage;
// Installs a TRACE-level `tracing_subscriber` fmt subscriber that writes through the
// test writer. `try_init` errors if a global subscriber is already installed; that
// error is ignored, so any number of tests may call this.
fn enable_tracing() {
use tracing_subscriber::filter::LevelFilter;
let _ = tracing_subscriber::fmt()
.with_max_level(LevelFilter::TRACE)
.with_test_writer()
.try_init();
}
// Test host. It exposes a real OmniPaxos handle, so epochs come from a real
// `get_promise`. `current_high_water` always returns 0 and `submit_advance`
// echoes its argument.
struct StubHost {
omnipaxos: Arc<Mutex<OmniPaxos<HighWaterCommand, MemStorage<HighWaterCommand>>>>,
}
impl StubHost {
// Builds the OmniPaxos instance for pid 1 in a cluster of nodes 1, 2, 3
// (configuration_id 1, no flexible quorum), backed by `MemStorage`.
fn new() -> Self {
use omnipaxos::{ClusterConfig, OmniPaxosConfig, ServerConfig};
let cluster_config = ClusterConfig {
configuration_id: 1,
nodes: vec![1, 2, 3],
flexible_quorum: None,
};
let server_config = ServerConfig {
pid: 1,
..Default::default()
};
let config = OmniPaxosConfig {
cluster_config,
server_config,
};
let omnipaxos = config
.build(MemStorage::<HighWaterCommand>::new())
.expect("build");
Self {
omnipaxos: Arc::new(Mutex::new(omnipaxos)),
}
}
}
#[async_trait]
impl PaxosHighWaterHost for StubHost {
type Entry = HighWaterCommand;
type Storage = MemStorage<HighWaterCommand>;
// Hands out a clone of the shared handle, so callers can compare it by pointer identity.
fn omnipaxos(
&self,
) -> Arc<Mutex<OmniPaxos<HighWaterCommand, MemStorage<HighWaterCommand>>>> {
self.omnipaxos.clone()
}
// Stub: the high water is always 0.
async fn current_high_water(&self) -> Result<u64, ConsensusError> {
Ok(0)
}
// Stub: accepts every advance and reports exactly the requested value.
async fn submit_advance(&self, at_least: u64) -> Result<u64, ConsensusError> {
Ok(at_least)
}
}
// `host()` must return the exact host passed to `new`. Identity is checked through
// the shared OmniPaxos `Arc`.
#[tokio::test]
async fn host_accessor_returns_the_wrapped_host() {
let host = StubHost::new();
let omnipaxos_handle = host.omnipaxos();
let (_sender, subscriber) = leader_event_channel();
let driver = PaxosDriver::new(host, subscriber);
assert!(
Arc::ptr_eq(&driver.host().omnipaxos(), &omnipaxos_handle),
"host() must return the same host the driver was constructed with",
);
}
// An epoch that differs from the host's current promise is rejected as `Fenced`.
// The error reports the caller's epoch as `expected` and the live epoch as `current`.
// NOTE(review): assumes 0xDEAD_BEEF never equals the encoded promise of a freshly built instance.
#[tokio::test]
async fn persist_with_stale_epoch_returns_fenced() {
let host = StubHost::new();
let (_sender, subscriber) = leader_event_channel();
let driver = PaxosDriver::new(host, subscriber);
let stale_epoch = Epoch(0xDEAD_BEEF);
let result = driver.persist_high_water(42, stale_epoch).await;
match result {
Err(ConsensusError::Fenced { expected, current }) => {
assert_eq!(expected, stale_epoch);
assert_ne!(current, stale_epoch);
}
other => panic!("expected Fenced, got {other:?}"),
}
}
// Range validation happens before submission. With the correct epoch,
// `PHYSICAL_MS_MAX + 1` is rejected as `PermanentDriver`, while
// `PHYSICAL_MS_MAX` itself still goes through.
#[tokio::test]
async fn persist_rejects_out_of_range_before_append() {
use tsoracle_core::PHYSICAL_MS_MAX;
let host = StubHost::new();
let current_ballot = host.omnipaxos().lock().get_promise();
let current_epoch = encode_epoch(current_ballot);
let (_sender, subscriber) = leader_event_channel();
let driver = PaxosDriver::new(host, subscriber);
let err = driver
.persist_high_water(PHYSICAL_MS_MAX + 1, current_epoch)
.await
.expect_err("an out-of-range advance must be rejected, not appended");
assert!(
matches!(err, ConsensusError::PermanentDriver(_)),
"out-of-range advance must classify as PermanentDriver, got {err:?}"
);
assert_eq!(
driver
.persist_high_water(PHYSICAL_MS_MAX, current_epoch)
.await
.expect("the maximum in-range value must persist"),
PHYSICAL_MS_MAX
);
}
// With the current epoch, the request is forwarded to `submit_advance`, which the stub echoes.
#[tokio::test]
async fn persist_with_matching_epoch_calls_submit_advance() {
let host = StubHost::new();
let current_ballot = host.omnipaxos().lock().get_promise();
let current_epoch = encode_epoch(current_ballot);
let (_sender, subscriber) = leader_event_channel();
let driver = PaxosDriver::new(host, subscriber);
let result = driver.persist_high_water(99, current_epoch).await;
assert_eq!(result.unwrap(), 99);
}
// Dropping a leadership stream frees the lease slot, so a later subscription gets a
// live stream again. The Leader state is sent before either subscription exists, so
// the test relies on the channel delivering already-sent state to late subscribers.
#[tokio::test]
async fn leadership_events_resubscribes_after_drop() {
let host = StubHost::new();
let (sender, subscriber) = leader_event_channel();
let driver = PaxosDriver::new(host, subscriber);
sender
.send(LeaderState::Leader { epoch: Epoch(7) })
.unwrap();
let mut first = driver.leadership_events();
assert!(
matches!(first.next().await, Some(LeaderState::Leader { .. })),
"first subscription must yield the current leader state",
);
drop(first);
let mut second = driver.leadership_events();
assert!(
matches!(second.next().await, Some(LeaderState::Leader { .. })),
"a sequential re-subscription after drop must re-derive a live stream",
);
}
// While the first stream is alive, a second call gets a stream that ends immediately,
// and the first stream keeps yielding events.
#[tokio::test]
async fn leadership_events_second_concurrent_subscription_fails_closed() {
enable_tracing();
let host = StubHost::new();
let (sender, subscriber) = leader_event_channel();
let driver = PaxosDriver::new(host, subscriber);
sender
.send(LeaderState::Leader { epoch: Epoch(7) })
.unwrap();
let mut first = driver.leadership_events();
let mut second = driver.leadership_events();
assert!(
second.next().await.is_none(),
"a concurrent second subscription must fail closed with an empty stream",
);
assert!(
matches!(first.next().await, Some(LeaderState::Leader { .. })),
"the live first stream must keep yielding after a rejected second subscription",
);
}
// Three sequential subscribe/drop cycles must each get a live stream, i.e. every
// lease drop actually frees the slot for the next generation.
#[tokio::test]
async fn leadership_events_lease_round_trips_across_generations() {
let host = StubHost::new();
let (sender, subscriber) = leader_event_channel();
let driver = PaxosDriver::new(host, subscriber);
sender
.send(LeaderState::Leader { epoch: Epoch(7) })
.unwrap();
for _ in 0..3 {
let mut stream = driver.leadership_events();
assert!(
matches!(stream.next().await, Some(LeaderState::Leader { .. })),
"each sequential generation must re-derive a live stream",
);
drop(stream);
}
}
// Forces the generation counter to `u64::MAX`. The first (immediately dropped) call
// consumes `u64::MAX` and wraps the counter to 0. The next call must skip 0, so the
// live stream holds a non-zero generation in the slot and a concurrent subscription
// is still rejected.
#[tokio::test]
async fn leadership_events_never_mints_the_zero_sentinel_on_counter_wrap() {
let host = StubHost::new();
let (sender, subscriber) = leader_event_channel();
let driver = PaxosDriver::new(host, subscriber);
sender
.send(LeaderState::Leader { epoch: Epoch(7) })
.unwrap();
driver.next_generation.store(u64::MAX, Ordering::SeqCst);
drop(driver.leadership_events());
let mut first = driver.leadership_events();
assert!(
matches!(first.next().await, Some(LeaderState::Leader { .. })),
"the post-wrap subscription must still yield a live stream",
);
assert_ne!(
driver.active_generation.load(Ordering::SeqCst),
0,
"a live stream must occupy the lease slot; minting the 0 sentinel \
would leave it visibly free",
);
let mut second = driver.leadership_events();
assert!(
second.next().await.is_none(),
"a concurrent second subscription after counter wrap must fail closed",
);
}
// `load_high_water` delegates to the host; the stub reports 0.
#[tokio::test]
async fn load_high_water_delegates_to_host() {
let host = StubHost::new();
let (_sender, subscriber) = leader_event_channel();
let driver = PaxosDriver::new(host, subscriber);
assert_eq!(driver.load_high_water().await.unwrap(), 0);
}
// Synchronous check of `StreamLease::drop`. A lease whose generation does not match
// the slot leaves it untouched; the matching lease resets it to 0.
#[test]
fn stream_lease_drop_frees_slot_only_for_its_own_generation() {
let slot = Arc::new(AtomicU64::new(0));
slot.store(9, Ordering::SeqCst);
let stale = StreamLease {
active: Arc::clone(&slot),
generation: 5,
};
drop(stale);
assert_eq!(
slot.load(Ordering::SeqCst),
9,
"a stale generation's Drop must not free a slot held by a newer generation",
);
let rightful = StreamLease {
active: Arc::clone(&slot),
generation: 9,
};
drop(rightful);
assert_eq!(
slot.load(Ordering::SeqCst),
0,
"the owning generation's Drop must release the slot",
);
}
// Built only with the `metrics` feature. Verifies the rejection counter using a
// `DebuggingRecorder` installed as the local recorder around each call.
#[cfg(feature = "metrics")]
mod rejection_metrics {
use super::*;
use metrics_util::{
MetricKind,
debugging::{DebugValue, DebuggingRecorder},
};
// One entry of a `DebuggingRecorder` snapshot: key, unit, description, value.
type RecordedMetric = (
metrics_util::CompositeKey,
Option<metrics::Unit>,
Option<metrics::SharedString>,
DebugValue,
);
// Returns the value of the counter named `name` in `snapshot`, or 0 if it was never recorded.
fn counter(snapshot: &[RecordedMetric], name: &str) -> u64 {
for (composite, _u, _d, value) in snapshot {
if composite.kind() == MetricKind::Counter && composite.key().name() == name {
if let DebugValue::Counter(n) = value {
return *n;
}
}
}
0
}
// Only the rejected concurrent subscription bumps the counter. The initial
// subscription, and a clean re-subscription after both streams are dropped,
// leave it unchanged.
#[tokio::test]
async fn rejected_concurrent_subscription_increments_counter() {
const REJECTED: &str = "tsoracle.leadership_stream.rejected.total";
let host = StubHost::new();
let (sender, subscriber) = leader_event_channel();
let driver = PaxosDriver::new(host, subscriber);
sender
.send(LeaderState::Leader { epoch: Epoch(7) })
.unwrap();
let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();
let first = metrics::with_local_recorder(&recorder, || driver.leadership_events());
assert_eq!(
counter(&snapshotter.snapshot().into_vec(), REJECTED),
0,
"a lone live subscription must not count as a rejection",
);
let second = metrics::with_local_recorder(&recorder, || driver.leadership_events());
assert_eq!(
counter(&snapshotter.snapshot().into_vec(), REJECTED),
1,
"a fail-closed rejection must increment the counter",
);
drop(first);
drop(second);
let _resubscribed =
metrics::with_local_recorder(&recorder, || driver.leadership_events());
assert_eq!(
counter(&snapshotter.snapshot().into_vec(), REJECTED),
1,
"a clean sequential re-subscription must not be counted as a rejection",
);
}
}
}