use std::time::Duration;
use parking_lot::Mutex;
use tokio::sync::{
Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard,
watch,
};
use tsoracle_core::{
Allocator, CommitOutcome, CoreError, Epoch, PeerEndpoint, PhysicalMs, WindowGrant,
};
use crate::server::ServingState;
/// Shared serving-side state: the allocator, the published [`ServingState`],
/// and the two async primitives used to coordinate window extensions.
pub(crate) struct ServingCore {
/// The `tsoracle_core` allocator behind a synchronous (`parking_lot`) mutex.
/// In the code visible here it is only ever locked inside non-async scopes.
allocator: Mutex<Allocator>,
/// Sending half of the watch channel carrying the current [`ServingState`];
/// receivers are handed out by `subscribe`.
state_tx: watch::Sender<ServingState>,
/// Async mutex whose guard is stored in an [`ExtensionSlot`] for as long as
/// that slot is alive.
extension_lock: AsyncMutex<()>,
/// Async read/write gate: the read side is taken by
/// `ExtensionSlot::drain_barrier`, the write side by `drain_barrier_write`.
extension_gate: RwLock<()>,
/// Window-ahead duration supplied to `new`; read by
/// `current_max_safe_physical_ms`.
window_ahead: Duration,
}
impl ServingCore {
    /// Builds a core that starts out `NotServing` with no leader hint and a
    /// freshly constructed [`Allocator`].
    ///
    /// `window_ahead` is stored as-is and later consumed by
    /// [`Self::current_max_safe_physical_ms`].
    pub(crate) fn new(window_ahead: Duration) -> Self {
        // The initial receiver is discarded on purpose; consumers obtain
        // their own through `subscribe`.
        let (state_tx, _) = watch::channel(ServingState::NotServing {
            leader_endpoint: None,
            leader_epoch: None,
        });
        Self {
            allocator: Mutex::new(Allocator::new()),
            state_tx,
            extension_lock: AsyncMutex::new(()),
            extension_gate: RwLock::new(()),
            window_ahead,
        }
    }

    /// Returns the allocator's committed high-water mark minus
    /// `window_ahead + 1` milliseconds, saturating at zero.
    ///
    /// Returns `0` when the allocator reports no committed high-water mark.
    pub(crate) fn current_max_safe_physical_ms(&self) -> u64 {
        let Some(persisted) = self.allocator.lock().committed_high_water() else {
            return 0;
        };
        // `Duration::as_millis` yields a `u128`. Clamp instead of truncating
        // with `as`, and add the extra millisecond with saturation so an
        // enormous `window_ahead` can neither wrap nor panic on overflow.
        let window_ms = u64::try_from(self.window_ahead.as_millis()).unwrap_or(u64::MAX);
        persisted.saturating_sub(window_ms.saturating_add(1))
    }

    /// Returns the epoch currently reported by the allocator, if any.
    pub(crate) fn current_epoch(&self) -> Option<Epoch> {
        self.allocator.lock().epoch()
    }

    /// Creates a new receiver for [`ServingState`] changes.
    pub(crate) fn subscribe(&self) -> watch::Receiver<ServingState> {
        self.state_tx.subscribe()
    }

    /// Returns a clone of the most recently published [`ServingState`].
    pub(crate) fn serving_state(&self) -> ServingState {
        self.state_tx.borrow().clone()
    }

    /// Returns `true` when the published state is `ServingState::Serving`.
    ///
    /// Unlike [`Self::serving_state`] this inspects the value in place and
    /// does not clone it.
    pub(crate) fn is_serving(&self) -> bool {
        matches!(&*self.state_tx.borrow(), ServingState::Serving)
    }

    /// Publishes `ServingState::Serving`.
    pub(crate) fn publish_serving(&self) {
        // `send_replace` stores the value even when no receiver exists; the
        // previous state it returns is intentionally dropped.
        self.state_tx.send_replace(ServingState::Serving);
    }

    /// Publishes `ServingState::NotServing` carrying the given leader hints.
    ///
    /// This touches only the published state; the allocator is left alone.
    pub(crate) fn publish_not_serving(
        &self,
        leader_endpoint: Option<PeerEndpoint>,
        leader_epoch: Option<Epoch>,
    ) {
        self.state_tx.send_replace(ServingState::NotServing {
            leader_endpoint,
            leader_epoch,
        });
    }

    /// Steps the allocator down and then publishes `NotServing` with the
    /// given leader hints.
    ///
    /// The allocator is stepped down first; its lock is released before the
    /// state is published. Neither `extension_lock` nor `extension_gate` is
    /// acquired here.
    pub(crate) fn step_down(
        &self,
        leader_endpoint: Option<PeerEndpoint>,
        leader_epoch: Option<Epoch>,
    ) {
        self.allocator.lock().step_down();
        self.publish_not_serving(leader_endpoint, leader_epoch);
    }

    /// Equivalent to [`Self::step_down`] with no leader endpoint and no
    /// leader epoch.
    pub(crate) fn enter_fencing(&self) {
        self.step_down(None, None);
    }

    /// Forwards to `Allocator::try_grant` while holding the allocator lock.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the allocator returns.
    pub(crate) fn try_grant(&self, now_ms: u64, count: u32) -> Result<WindowGrant, CoreError> {
        self.allocator.lock().try_grant(now_ms, count)
    }

    /// Validates both bounds as [`PhysicalMs`] and hands them, together with
    /// `epoch`, to `Allocator::become_leader`.
    ///
    /// # Errors
    ///
    /// Fails if either bound is rejected by `PhysicalMs::try_new`
    /// (`serving_floor` is checked first) or if `become_leader` fails. The
    /// allocator lock is not taken when validation fails.
    pub(crate) fn seed_on_leadership_gained(
        &self,
        serving_floor: u64,
        committed_ceiling: u64,
        epoch: Epoch,
    ) -> Result<(), CoreError> {
        let serving_floor = PhysicalMs::try_new(serving_floor)?;
        let committed_ceiling = PhysicalMs::try_new(committed_ceiling)?;
        self.allocator
            .lock()
            .become_leader(serving_floor, committed_ceiling, epoch)
    }

    /// Validates `actual` as [`PhysicalMs`] and forwards it with `epoch` to
    /// `Allocator::try_commit_window_extension`.
    ///
    /// # Errors
    ///
    /// Fails if `actual` is rejected by `PhysicalMs::try_new` or if the
    /// allocator rejects the commit.
    pub(crate) fn commit_extension(
        &self,
        actual: u64,
        epoch: Epoch,
    ) -> Result<CommitOutcome, CoreError> {
        let actual = PhysicalMs::try_new(actual)?;
        self.allocator
            .lock()
            .try_commit_window_extension(actual, epoch)
    }

    /// Waits for `extension_lock` and returns an [`ExtensionSlot`] that keeps
    /// the lock held until the slot is dropped.
    pub(crate) async fn extension_slot(&self) -> ExtensionSlot<'_> {
        let lock = self.extension_lock.lock().await;
        ExtensionSlot {
            _lock: lock,
            core: self,
        }
    }

    /// Waits for and returns the write side of `extension_gate`.
    ///
    /// The write guard cannot be obtained while any read guard handed out by
    /// `ExtensionSlot::drain_barrier` is still alive.
    pub(crate) async fn drain_barrier_write(&self) -> RwLockWriteGuard<'_, ()> {
        self.extension_gate.write().await
    }
}
/// Handle returned by `ServingCore::extension_slot`; it keeps the core's
/// `extension_lock` held for as long as the value is alive.
pub(crate) struct ExtensionSlot<'a> {
/// Guard for `ServingCore::extension_lock`. Never read; stored only so the
/// lock is released when the slot is dropped.
_lock: AsyncMutexGuard<'a, ()>,
/// The core this slot was created from.
core: &'a ServingCore,
}
impl<'a> ExtensionSlot<'a> {
pub(crate) fn would_grant(&self, now_ms: u64, count: u32) -> bool {
self.core.allocator.lock().would_grant(now_ms, count)
}
pub(crate) fn prepare_extension(
&self,
now_ms: u64,
window_ahead_ms: u64,
) -> Result<(u64, Epoch), CoreError> {
let allocator = self.core.allocator.lock();
let Some(epoch) = allocator.epoch() else {
return Err(CoreError::NotLeader);
};
let now_ms = PhysicalMs::try_new(now_ms)?;
let requested = allocator
.try_prepare_window_extension(now_ms, window_ahead_ms)?
.get();
Ok((requested, epoch))
}
pub(crate) async fn drain_barrier(&self) -> RwLockReadGuard<'a, ()> {
let core = self.core;
core.extension_gate.read().await
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Window-ahead value shared by every test in this module.
    const WINDOW_AHEAD: Duration = Duration::from_secs(3);

    /// A core straight out of `ServingCore::new`.
    fn fresh_core() -> ServingCore {
        ServingCore::new(WINDOW_AHEAD)
    }

    /// A core that has been seeded as leader at `Epoch(3)` with floor 1_000
    /// and ceiling 5_000.
    fn seeded_core() -> ServingCore {
        let core = fresh_core();
        core.seed_on_leadership_gained(1_000, 5_000, Epoch(3))
            .expect("seed must succeed (ceiling >= floor)");
        core
    }

    #[test]
    fn step_down_clears_and_publishes_not_serving_with_hint() {
        let core = fresh_core();
        // Nobody has subscribed, so the checks below also show that the state
        // is stored without any receiver.
        assert_eq!(core.state_tx.receiver_count(), 0);

        let hint = PeerEndpoint::try_from("new-leader:9000").unwrap();
        core.step_down(Some(hint.clone()), Some(Epoch(7)));

        let ServingState::NotServing {
            leader_endpoint,
            leader_epoch,
        } = core.serving_state()
        else {
            panic!("expected NotServing after step_down");
        };
        assert_eq!(leader_endpoint, Some(hint));
        assert_eq!(leader_epoch, Some(Epoch(7)));
    }

    #[tokio::test]
    async fn step_down_does_not_take_the_gate() {
        let core = fresh_core();
        // Hold both the extension slot and a read barrier while stepping
        // down; `step_down` is synchronous and must complete regardless.
        let slot = core.extension_slot().await;
        let _read_guard = slot.drain_barrier().await;

        core.step_down(None, None);

        let published = core.serving_state();
        assert!(matches!(published, ServingState::NotServing { .. }));
    }

    #[tokio::test]
    async fn write_barrier_excludes_held_read_barrier() {
        let core = fresh_core();
        let slot = core.extension_slot().await;
        let read_guard = slot.drain_barrier().await;

        assert!(
            core.extension_gate.try_write().is_err(),
            "write barrier must be blocked while a read barrier is held"
        );

        drop(read_guard);
        drop(slot);

        assert!(
            core.extension_gate.try_write().is_ok(),
            "write barrier must be free once the read barrier drains"
        );
    }

    #[tokio::test]
    async fn prepare_extension_without_leadership_is_not_leader() {
        let core = fresh_core();
        let slot = core.extension_slot().await;

        let prepared = slot.prepare_extension(1, 1_000);
        assert!(matches!(prepared, Err(CoreError::NotLeader)));
        assert!(!slot.would_grant(1, 1));
    }

    #[test]
    fn is_serving_tracks_state_in_lockstep_with_serving_state() {
        let core = fresh_core();

        // Initial state.
        assert!(!core.is_serving(), "fresh core is NotServing");
        assert!(matches!(
            core.serving_state(),
            ServingState::NotServing { .. }
        ));

        // After publishing Serving.
        core.publish_serving();
        assert!(
            core.is_serving(),
            "publish_serving must flip is_serving true"
        );
        assert!(matches!(core.serving_state(), ServingState::Serving));

        // After stepping down again.
        let next_leader = PeerEndpoint::try_from("leader:9000").unwrap();
        core.step_down(Some(next_leader), Some(Epoch(1)));
        assert!(!core.is_serving(), "step_down must flip is_serving false");
    }

    #[test]
    fn try_grant_without_leadership_is_not_leader() {
        let outcome = fresh_core().try_grant(1, 1);
        assert!(matches!(outcome, Err(CoreError::NotLeader)));
    }

    #[test]
    fn seed_then_try_grant_succeeds_at_seeded_epoch() {
        let core = seeded_core();
        let grant = core.try_grant(1_000, 1).expect("grant must succeed");
        assert_eq!(grant.epoch(), Epoch(3));
    }

    #[test]
    fn enter_fencing_clears_allocator_then_publishes_not_serving() {
        let core = seeded_core();
        assert!(core.try_grant(1_000, 1).is_ok(), "seeded core must grant");

        core.enter_fencing();

        assert!(
            matches!(
                core.serving_state(),
                ServingState::NotServing {
                    leader_endpoint: None,
                    leader_epoch: None,
                }
            ),
            "enter_fencing must publish NotServing with no leader hint"
        );
        assert!(
            matches!(core.try_grant(1_000, 1), Err(CoreError::NotLeader)),
            "enter_fencing must clear the allocator"
        );
    }
}