use std::sync::Arc;
use std::sync::atomic::{AtomicU8, Ordering};
use omnipaxos::OmniPaxos;
use omnipaxos::storage::Storage;
use parking_lot::Mutex;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::warn;
use crate::log_entry::HighWaterCommand;
use crate::snapshot_policy::SnapshotPolicy;
use crate::state_machine::{ApplyState, drain_decided_into, maybe_snapshot};
/// Liveness value an engine starts with in `ApplyEngine::new`, before `spawn` has stored anything.
const APPLY_NEVER_SPAWNED: u8 = 0;
/// Liveness value stored by `ApplyEngine::spawn` right before the apply task is spawned.
const APPLY_ALIVE: u8 = 1;
/// Liveness value stored when an `ApplyDeathGuard` is dropped; `apply_task_died` tests for it.
const APPLY_DEAD: u8 = 2;
/// Handle bundling the apply state, the snapshot policy and the apply-task
/// liveness flag.
///
/// `policy` and `apply_liveness` sit behind `Arc`, so clones of the engine
/// share them. NOTE(review): `ApplyState` presumably shares its internals
/// across clones as well (the notifier is asserted to be shared by the tests)
/// — confirm against its definition.
#[derive(Clone)]
pub(crate) struct ApplyEngine {
/// State passed to `drain_decided_into`; also backs `high_water`,
/// `applied_barrier_seq` and `apply_notifier`.
apply_state: ApplyState,
/// Snapshot policy, locked for `rebase` in `recover` and for
/// `maybe_snapshot` in `apply_step`.
policy: Arc<Mutex<SnapshotPolicy>>,
/// One of `APPLY_NEVER_SPAWNED`, `APPLY_ALIVE` or `APPLY_DEAD`.
apply_liveness: Arc<AtomicU8>,
}
impl ApplyEngine {
    /// Creates an engine with a fresh [`ApplyState`], the supplied snapshot
    /// `policy`, and a liveness flag initialised to `APPLY_NEVER_SPAWNED`.
    pub(crate) fn new(policy: SnapshotPolicy) -> Self {
        let apply_state = ApplyState::new();
        let policy = Arc::new(Mutex::new(policy));
        let apply_liveness = Arc::new(AtomicU8::new(APPLY_NEVER_SPAWNED));
        Self {
            apply_state,
            policy,
            apply_liveness,
        }
    }

    /// Returns `true` once the liveness flag reads `APPLY_DEAD`, i.e. after
    /// the guard owned by a spawned apply task has been dropped.
    pub(crate) fn apply_task_died(&self) -> bool {
        let liveness = self.apply_liveness.load(Ordering::Acquire);
        liveness == APPLY_DEAD
    }

    /// Runs `drain_decided_into` against this engine's apply state and then
    /// rebases the snapshot policy on the resulting `cursor` value.
    ///
    /// Unlike [`Self::apply_step`], no snapshot is attempted here.
    pub(crate) fn recover<S>(
        &self,
        omnipaxos: &Arc<Mutex<OmniPaxos<HighWaterCommand, S>>>,
        cursor: &mut u64,
    ) where
        S: Storage<HighWaterCommand> + Send + 'static,
    {
        drain_decided_into(omnipaxos, cursor, &self.apply_state);
        let rebase_point = *cursor;
        self.policy.lock().rebase(rebase_point);
    }

    /// Performs one apply iteration: runs `drain_decided_into`, then hands the
    /// index it returned to `maybe_snapshot` while holding the policy lock.
    pub(crate) fn apply_step<S>(
        &self,
        omnipaxos: &Arc<Mutex<OmniPaxos<HighWaterCommand, S>>>,
        cursor: &mut u64,
    ) where
        S: Storage<HighWaterCommand> + Send + 'static,
    {
        let drained_idx = drain_decided_into(omnipaxos, cursor, &self.apply_state);
        // The policy lock is only taken after the drain has finished.
        let mut policy_guard = self.policy.lock();
        maybe_snapshot(omnipaxos, &mut policy_guard, drained_idx);
    }

    /// Forwards to [`ApplyState::high_water`].
    pub(crate) fn high_water(&self) -> u64 {
        self.apply_state.high_water()
    }

    /// Forwards to [`ApplyState::applied_barrier_seq`] for `node`.
    pub(crate) fn applied_barrier_seq(&self, node: u64) -> u64 {
        self.apply_state.applied_barrier_seq(node)
    }

    /// Forwards to [`ApplyState::apply_notifier`].
    pub(crate) fn apply_notifier(&self) -> Arc<Notify> {
        self.apply_state.apply_notifier()
    }

    /// Spawns the background apply task on the Tokio runtime and returns the
    /// [`ApplyTask`] used to stop it.
    ///
    /// The task loops on two notifications: each `apply_notify` wake-up runs
    /// one [`Self::apply_step`] under the `cursor` lock, and a wake-up on the
    /// task's private shutdown notifier ends the loop. The liveness flag is
    /// set to `APPLY_ALIVE` before spawning; an [`ApplyDeathGuard`] moved into
    /// the task flips it to `APPLY_DEAD` whenever the task's future is dropped.
    pub(crate) fn spawn<S>(
        &self,
        apply_notify: Arc<Notify>,
        omnipaxos: Arc<Mutex<OmniPaxos<HighWaterCommand, S>>>,
        cursor: Arc<Mutex<u64>>,
    ) -> ApplyTask
    where
        S: Storage<HighWaterCommand> + Send + 'static,
        <HighWaterCommand as omnipaxos::storage::Entry>::Snapshot: Send,
    {
        let shutdown = Arc::new(Notify::new());
        let stop_signal = Arc::clone(&shutdown);
        let worker = self.clone();
        self.apply_liveness.store(APPLY_ALIVE, Ordering::Release);
        let death_guard = ApplyDeathGuard {
            liveness: Arc::clone(&self.apply_liveness),
            waiters: self.apply_notifier(),
        };
        let handle = tokio::spawn(async move {
            // Owned by the task so it is dropped on every exit path.
            let _death_guard = death_guard;
            loop {
                tokio::select! {
                    _ = apply_notify.notified() => {
                        {
                            // Inner scope: the cursor guard is released before
                            // the yieldpoint below is reached.
                            let mut position = cursor.lock();
                            worker.apply_step(&omnipaxos, &mut position);
                        }
                        tsoracle_yieldpoint::yieldpoint!(
                            "standalone_host::apply_task::between_iterations"
                        );
                    }
                    _ = stop_signal.notified() => {
                        break;
                    }
                }
            }
        });
        ApplyTask { handle, shutdown }
    }
}
/// Guard that `ApplyEngine::spawn` moves into the apply task; its `Drop`
/// impl stores `APPLY_DEAD` into `liveness` and calls `notify_waiters` on
/// `waiters`.
struct ApplyDeathGuard {
/// Liveness flag shared with the engine that spawned the task.
liveness: Arc<AtomicU8>,
/// Notifier obtained from the engine's `apply_notifier()` at spawn time.
waiters: Arc<Notify>,
}
impl Drop for ApplyDeathGuard {
    /// Marks the apply task as dead and wakes every task currently waiting on
    /// the notifier.
    fn drop(&mut self) {
        let Self { liveness, waiters } = self;
        // The store comes first, so the flag already reads `APPLY_DEAD` by the
        // time waiters are woken.
        liveness.store(APPLY_DEAD, Ordering::Release);
        waiters.notify_waiters();
    }
}
/// Handle returned by `ApplyEngine::spawn`; consumed by `stop`.
pub(crate) struct ApplyTask {
/// Join handle of the spawned apply task.
handle: JoinHandle<()>,
/// Notifier the task's loop selects on to know when to break out.
shutdown: Arc<Notify>,
}
impl ApplyTask {
pub(crate) async fn stop(self) {
self.shutdown.notify_one();
if let Err(err) = self.handle.await {
warn!(error = ?err, "paxos driver apply task terminated abnormally");
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Engine built from `SnapshotPolicy::disabled()`, shared by the tests below.
    fn disabled_engine() -> ApplyEngine {
        ApplyEngine::new(SnapshotPolicy::disabled())
    }

    #[test]
    fn new_engine_starts_at_zero_high_water() {
        let high_water = disabled_engine().high_water();
        assert_eq!(high_water, 0);
    }

    #[test]
    fn applied_barrier_seq_is_zero_for_unseen_node() {
        let barrier_seq = disabled_engine().applied_barrier_seq(7);
        assert_eq!(barrier_seq, 0);
    }

    #[test]
    fn apply_notifier_is_stable_across_calls() {
        let engine = disabled_engine();
        let first = engine.apply_notifier();
        let second = engine.apply_notifier();
        assert!(Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn clone_shares_apply_state() {
        let engine = disabled_engine();
        let duplicate = engine.clone();
        let from_original = engine.apply_notifier();
        let from_duplicate = duplicate.apply_notifier();
        assert!(Arc::ptr_eq(&from_original, &from_duplicate));
    }
}