use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Notify;
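/// Exponential backoff: the delay doubles with each consecutive error and is
/// capped at `max`.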
#[derive(Debug, Clone)]
pub struct BackoffPolicy {
base: Duration,
max: Duration,
}
impl BackoffPolicy {
pub const fn new(base: Duration, max: Duration) -> Self {
Self { base, max }
}
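    /// Delay before retrying after `error_count` consecutive errors:
    /// `base * 2^min(error_count, 31)`, saturating, capped at `max`.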
pub fn delay_for(&self, error_count: u32) -> Duration {
        #[allow(clippy::cast_possible_truncation)]
        let ms = self.base.as_millis() as u64;
Duration::from_millis(ms.saturating_mul(1u64 << error_count.min(31))).min(self.max)
}
}
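/// Default backoff for erroring partitions: 100ms, doubling up to 30s.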
const DEFAULT_BACKOFF: BackoffPolicy =
BackoffPolicy::new(Duration::from_millis(100), Duration::from_secs(30));
/// How often stale entries in `error_state` are swept out.
const SWEEP_INTERVAL: Duration = Duration::from_secs(60);
/// Error state untouched for this long is dropped by the sweep.
const ERROR_STATE_TTL: Duration = Duration::from_secs(300);
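/// Outcome of acknowledging a processed partition: either the dirty signal was
/// fully consumed, or the partition was re-dirtied while claimed and has been
/// requeued.
#[derive(Debug)]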
pub enum AckResult {
Consumed,
Redirtied,
}
struct ErrorEntry {
error_count: u32,
last_update: Instant,
}
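/// Core scheduling state; callers are responsible for locking.
///
/// Invariants: `pending` and `pending_ids` always contain the same partition
/// ids, and an id is never in `pending_ids` and `claimed` at the same time.
/// `redirtied` records claimed partitions that were dirtied again mid-flight.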
pub struct PartitionScheduler {
pending: BTreeSet<(Instant, i64)>,
pending_ids: HashSet<i64>,
claimed: HashSet<i64>,
redirtied: HashSet<i64>,
error_state: HashMap<i64, ErrorEntry>,
last_sweep: Instant,
backoff: BackoffPolicy,
}
impl PartitionScheduler {
fn new() -> Self {
Self {
pending: BTreeSet::new(),
pending_ids: HashSet::new(),
claimed: HashSet::new(),
redirtied: HashSet::new(),
error_state: HashMap::new(),
last_sweep: Instant::now(),
backoff: DEFAULT_BACKOFF,
}
}
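    /// Merge drained inbox entries into the queue. Already-pending ids are
    /// dropped; ids that are currently claimed are recorded in `redirtied`
    /// instead, so the dirty signal survives the in-flight claim.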
fn absorb(&mut self, entries: impl Iterator<Item = (i64, Instant)>) {
for (pid, dirty_since) in entries {
if self.pending_ids.contains(&pid) {
continue;
}
if self.claimed.contains(&pid) {
self.redirtied.insert(pid);
continue;
}
self.pending.insert((dirty_since, pid));
self.pending_ids.insert(pid);
}
}
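    /// At most once per `SWEEP_INTERVAL`, drop error-state entries that have
    /// not been updated within `ERROR_STATE_TTL`, so old failures stop
    /// inflating backoff.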
fn maybe_sweep_errors(&mut self, now: Instant) {
if now.duration_since(self.last_sweep) >= SWEEP_INTERVAL {
self.last_sweep = now;
self.error_state
.retain(|_, entry| now.duration_since(entry.last_update) < ERROR_STATE_TTL);
}
}
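    /// Claim the pending partition with the oldest `dirty_since` that is
    /// already due. If the earliest timestamp lies in the future (a backoff
    /// cooldown), nothing is ready and `None` is returned.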
fn pop(&mut self, now: Instant) -> Option<(i64, Instant)> {
let &(dirty_since, pid) = self.pending.first()?;
if dirty_since > now {
return None;
}
self.pending.remove(&(dirty_since, pid));
self.pending_ids.remove(&pid);
self.claimed.insert(pid);
Some((pid, dirty_since))
}
fn is_claimed(&self, pid: i64) -> bool {
self.claimed.contains(&pid)
}
fn has_ready_work(&self, now: Instant) -> bool {
self.pending
.first()
.is_some_and(|&(dirty_since, _)| dirty_since <= now)
}
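    /// Acknowledge successful processing: release the claim and clear error
    /// state. A partition re-dirtied while claimed is requeued immediately.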
fn ack_processed(&mut self, pid: i64) -> AckResult {
self.claimed.remove(&pid);
self.error_state.remove(&pid);
if self.redirtied.remove(&pid) {
self.reinsert(pid, Instant::now());
AckResult::Redirtied
} else {
AckResult::Consumed
}
}
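    /// Return an unprocessed partition to the queue at the given priority.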
fn ack_requeue(&mut self, pid: i64, dirty_since: Instant) {
self.claimed.remove(&pid);
self.redirtied.remove(&pid);
self.reinsert(pid, dirty_since);
}
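    /// Record a failure: bump the error count and requeue the partition after
    /// an exponential-backoff cooldown.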
fn ack_error(&mut self, pid: i64) {
self.claimed.remove(&pid);
self.redirtied.remove(&pid);
let now = Instant::now();
let entry = self.error_state.entry(pid).or_insert(ErrorEntry {
error_count: 0,
last_update: now,
});
entry.error_count += 1;
entry.last_update = now;
let delay = self.backoff.delay_for(entry.error_count);
self.reinsert(pid, now + delay);
}
fn reinsert(&mut self, pid: i64, dirty_since: Instant) {
self.pending.insert((dirty_since, pid));
self.pending_ids.insert(pid);
}
}
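/// Pushes for the same partition arriving within this window are coalesced.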
const INBOX_COALESCE: Duration = Duration::from_millis(10);
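/// Staging area for dirty signals, drained in batches into the scheduler by
/// `take`. Keeps the producer-side critical section short.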
struct Inbox {
queue: VecDeque<(i64, Instant)>,
last_push: HashMap<i64, Instant>,
}
impl Inbox {
fn new() -> Self {
Self {
queue: VecDeque::new(),
last_push: HashMap::new(),
}
}
fn try_push(&mut self, pid: i64, now: Instant) -> bool {
if let Some(&last) = self.last_push.get(&pid)
&& now.duration_since(last) < INBOX_COALESCE
{
return false;
}
self.last_push.insert(pid, now);
self.queue.push_back((pid, now));
true
}
fn force_push(&mut self, pid: i64, now: Instant) {
self.last_push.insert(pid, now);
self.queue.push_back((pid, now));
}
fn drain(&mut self) -> VecDeque<(i64, Instant)> {
self.last_push.clear();
std::mem::take(&mut self.queue)
}
}
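/// Thread-safe prioritizer shared between producers, which call
/// [`Self::push_dirty`], and workers, which call [`Self::take`].
///
/// A minimal usage sketch (the surrounding worker loop is illustrative, not
/// part of this module):
///
/// ```ignore
/// let sp = Arc::new(SharedPrioritizer::new());
/// sp.push_dirty(42);
/// while let Some(guard) = sp.take() {
///     // ... flush partition `guard.partition_id()` ...
///     guard.processed(); // or guard.skipped() / guard.error()
/// }
/// ```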
pub struct SharedPrioritizer {
inbox: std::sync::Mutex<Inbox>,
scheduler: std::sync::Mutex<PartitionScheduler>,
notify: Arc<Notify>,
}
impl SharedPrioritizer {
#[must_use]
pub fn new() -> Self {
Self {
inbox: std::sync::Mutex::new(Inbox::new()),
scheduler: std::sync::Mutex::new(PartitionScheduler::new()),
notify: Arc::new(Notify::new()),
}
}
pub fn notifier(&self) -> Arc<Notify> {
Arc::clone(&self.notify)
}
pub(crate) fn wake_sequencers(&self) {
self.notify.notify_one();
}
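    /// Mark a partition dirty. Safe to call from any thread; rapid duplicate
    /// pushes are coalesced in the inbox.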
pub fn push_dirty(&self, pid: i64) {
self.push_dirty_impl(pid, Instant::now());
}
#[cfg(test)]
fn push_dirty_at(&self, pid: i64, dirty_since: Instant) {
self.push_dirty_impl(pid, dirty_since);
}
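    /// A push rejected by the coalescing window is normally safe to drop:
    /// `last_push` and the queue are only cleared together on drain, so the
    /// earlier push that set `last_push` is still queued. For a currently
    /// claimed partition the signal is force-pushed anyway, so the re-dirty is
    /// recorded even inside that window.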
fn push_dirty_impl(&self, pid: i64, dirty_since: Instant) {
let mut inbox = self
.inbox
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let mut accepted = inbox.try_push(pid, dirty_since);
if !accepted {
let sched = self
.scheduler
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if sched.is_claimed(pid) {
inbox.force_push(pid, dirty_since);
accepted = true;
}
}
drop(inbox);
if accepted {
self.notify.notify_one();
}
}
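    /// Drain the inbox into the scheduler and claim the oldest ready
    /// partition. Re-notifies when more ready work remains so another waiting
    /// worker wakes up. No timer is armed for entries whose cooldown lies in
    /// the future.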
pub fn take(self: &Arc<Self>) -> Option<PartitionGuard> {
let drained = {
let mut inbox = self
.inbox
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
inbox.drain()
};
let mut sched = self
.scheduler
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
sched.absorb(drained.into_iter());
let now = Instant::now();
sched.maybe_sweep_errors(now);
let (pid, dirty_since) = sched.pop(now)?;
let has_more = sched.has_ready_work(now);
drop(sched);
if has_more {
self.notify.notify_one();
}
Some(PartitionGuard {
pid,
dirty_since,
prioritizer: Arc::clone(self),
acked: false,
})
}
}
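impl Default for SharedPrioritizer {
    fn default() -> Self {
        Self::new()
    }
}
/// RAII handle for a claimed partition. Call exactly one of
/// [`Self::processed`], [`Self::skipped`], or [`Self::error`]; a guard dropped
/// without an ack requeues its partition at the original priority.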
pub struct PartitionGuard {
pid: i64,
dirty_since: Instant,
prioritizer: Arc<SharedPrioritizer>,
acked: bool,
}
impl PartitionGuard {
pub fn partition_id(&self) -> i64 {
self.pid
}
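    /// Acknowledge successful processing; the dirty signal is consumed unless
    /// the partition was re-dirtied while claimed.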
pub fn processed(mut self) {
self.acked = true;
let mut sched = self
.prioritizer
.scheduler
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let result = sched.ack_processed(self.pid);
let should_notify =
matches!(result, AckResult::Redirtied) || sched.has_ready_work(Instant::now());
drop(sched);
if should_notify {
self.prioritizer.notify.notify_one();
}
}
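    /// Give the partition back untouched, preserving its original priority.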
pub fn skipped(mut self) {
self.acked = true;
self.requeue();
}
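    /// Report a failure; the partition is requeued after a backoff cooldown.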
pub fn error(mut self) {
self.acked = true;
let mut sched = self
.prioritizer
.scheduler
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
sched.ack_error(self.pid);
}
fn requeue(&self) {
let mut sched = self
.prioritizer
.scheduler
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
sched.ack_requeue(self.pid, self.dirty_since);
drop(sched);
self.prioritizer.notify.notify_one();
}
}
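// Safety net: a guard dropped without an explicit ack (worker panic, early
// return) requeues its partition at the original priority.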
impl Drop for PartitionGuard {
fn drop(&mut self) {
if !self.acked {
self.requeue();
}
}
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use super::*;
use std::collections::HashSet;
fn make_shared() -> Arc<SharedPrioritizer> {
Arc::new(SharedPrioritizer::new())
}
#[test]
    fn backoff_exponential_progression() {
let policy = BackoffPolicy::new(Duration::from_millis(100), Duration::from_secs(30));
assert_eq!(policy.delay_for(1), Duration::from_millis(200));
assert_eq!(policy.delay_for(2), Duration::from_millis(400));
assert_eq!(policy.delay_for(3), Duration::from_millis(800));
}
#[test]
fn backoff_cap_at_max() {
let policy = BackoffPolicy::new(Duration::from_millis(100), Duration::from_secs(30));
assert_eq!(policy.delay_for(25), Duration::from_secs(30));
}
#[test]
fn backoff_overflow_safety() {
let policy = BackoffPolicy::new(Duration::from_millis(100), Duration::from_secs(30));
assert_eq!(policy.delay_for(31), Duration::from_secs(30));
assert_eq!(policy.delay_for(u32::MAX), Duration::from_secs(30));
}
#[test]
fn sched_push_absorb_pop_roundtrip() {
let mut sched = PartitionScheduler::new();
let now = Instant::now();
sched.absorb([(42, now)].into_iter());
let (pid, ds) = sched.pop(now).unwrap();
assert_eq!(pid, 42);
assert_eq!(ds, now);
}
#[test]
fn sched_pop_returns_oldest_first() {
let mut sched = PartitionScheduler::new();
let t0 = Instant::now();
let t1 = t0 + Duration::from_millis(1);
let t2 = t0 + Duration::from_millis(2);
sched.absorb([(30, t0), (10, t1), (20, t2)].into_iter());
assert_eq!(sched.pop(t2).unwrap().0, 30);
assert_eq!(sched.pop(t2).unwrap().0, 10);
assert_eq!(sched.pop(t2).unwrap().0, 20);
}
#[test]
fn sched_pop_skips_future_timestamps() {
let mut sched = PartitionScheduler::new();
let now = Instant::now();
sched.absorb([(42, now + Duration::from_secs(10))].into_iter());
assert!(sched.pop(now).is_none());
}
#[test]
fn sched_ack_error_applies_exponential_backoff() {
let mut sched = PartitionScheduler::new();
let now = Instant::now();
sched.absorb([(42, now)].into_iter());
sched.pop(now);
sched.ack_error(42);
assert!(sched.pop(now).is_none());
assert!(sched.pop(now + Duration::from_millis(250)).is_some());
}
#[test]
fn sched_ack_error_cooldown_preserved_despite_absorb() {
let mut sched = PartitionScheduler::new();
let now = Instant::now();
sched.absorb([(42, now)].into_iter());
sched.pop(now);
sched.ack_error(42);
sched.absorb([(42, now)].into_iter());
assert!(sched.pop(now).is_none(), "cooldown must be preserved");
}
#[test]
fn sched_absorb_dedupes_against_pending_ids() {
let mut sched = PartitionScheduler::new();
let now = Instant::now();
sched.absorb([(42, now), (42, now)].into_iter());
sched.pop(now).unwrap();
assert!(sched.pop(now).is_none(), "should have only one entry");
}
#[test]
fn sched_absorb_inserts_redirtied_when_claimed() {
let mut sched = PartitionScheduler::new();
let now = Instant::now();
sched.absorb([(42, now)].into_iter());
        sched.pop(now);
        sched.absorb([(42, now)].into_iter());
        assert!(sched.redirtied.contains(&42));
}
#[test]
fn sched_ack_processed_requeues_if_redirtied() {
let mut sched = PartitionScheduler::new();
let now = Instant::now();
sched.absorb([(42, now)].into_iter());
sched.pop(now);
        sched.absorb([(42, now)].into_iter());
        assert!(matches!(sched.ack_processed(42), AckResult::Redirtied));
assert!(sched.pop(Instant::now()).is_some());
}
#[test]
fn sched_ack_processed_clears_error_state() {
let mut sched = PartitionScheduler::new();
let now = Instant::now();
sched.absorb([(42, now)].into_iter());
sched.pop(now);
        sched.ack_error(42);
        assert!(sched.error_state.contains_key(&42));
let later = Instant::now() + Duration::from_millis(300);
sched.pop(later).unwrap();
sched.ack_processed(42);
assert!(!sched.error_state.contains_key(&42));
}
#[test]
fn sched_ack_requeue_restores_original_priority() {
let mut sched = PartitionScheduler::new();
let t0 = Instant::now();
let t1 = t0 + Duration::from_millis(10);
sched.absorb([(10, t0), (20, t1)].into_iter());
let (pid, ds) = sched.pop(t1).unwrap();
assert_eq!(pid, 10);
sched.ack_requeue(pid, ds);
assert_eq!(sched.pop(t1).unwrap().0, 10);
}
#[test]
fn sched_sweep_removes_stale_error_entries() {
let mut sched = PartitionScheduler::new();
sched.error_state.insert(
42,
ErrorEntry {
error_count: 3,
                last_update: Instant::now().checked_sub(Duration::from_secs(600)).unwrap(),
},
);
sched.last_sweep = Instant::now()
.checked_sub(SWEEP_INTERVAL)
.unwrap()
.checked_sub(Duration::from_secs(1))
.unwrap();
sched.maybe_sweep_errors(Instant::now());
assert!(!sched.error_state.contains_key(&42));
}
#[test]
fn take_empty_returns_none() {
let sp = make_shared();
assert!(sp.take().is_none());
}
#[test]
fn take_returns_guard_with_correct_pid() {
let sp = make_shared();
sp.push_dirty(42);
let guard = sp.take().expect("should return a guard");
assert_eq!(guard.partition_id(), 42);
guard.processed();
}
#[test]
fn take_returns_distinct_pids() {
let sp = make_shared();
sp.push_dirty(10);
sp.push_dirty(20);
let g1 = sp.take().unwrap();
let g2 = sp.take().unwrap();
assert_ne!(g1.partition_id(), g2.partition_id());
g1.processed();
g2.processed();
}
#[test]
fn processed_consumes_signal() {
let sp = make_shared();
sp.push_dirty(42);
let guard = sp.take().unwrap();
assert_eq!(guard.partition_id(), 42);
guard.processed();
assert!(sp.take().is_none());
}
#[test]
fn processed_resets_error_state() {
let sp = make_shared();
sp.push_dirty(10);
let g = sp.take().unwrap();
g.error();
{
let mut sched = sp.scheduler.lock().unwrap();
let entry = *sched.pending.iter().find(|(_, pid)| *pid == 10).unwrap();
sched.pending.remove(&entry);
sched.pending.insert((
Instant::now().checked_sub(Duration::from_secs(1)).unwrap(),
10,
));
}
let g2 = sp.take().unwrap();
g2.processed();
let sched = sp.scheduler.lock().unwrap();
assert!(!sched.error_state.contains_key(&10));
}
#[test]
fn skipped_preserves_signal() {
let sp = make_shared();
sp.push_dirty(42);
let guard = sp.take().unwrap();
guard.skipped();
let guard2 = sp.take().expect("should reappear after skip");
assert_eq!(guard2.partition_id(), 42);
guard2.processed();
}
#[test]
fn skipped_retains_original_priority() {
let sp = make_shared();
let now = Instant::now();
let t0 = now.checked_sub(Duration::from_secs(2)).unwrap();
sp.push_dirty_at(10, t0);
sp.push_dirty_at(20, t0 + Duration::from_secs(1));
let g1 = sp.take().unwrap();
assert_eq!(g1.partition_id(), 10);
g1.skipped();
let g2 = sp.take().unwrap();
assert_eq!(
g2.partition_id(),
10,
"skipped partition should retain priority"
);
g2.processed();
let g3 = sp.take().unwrap();
assert_eq!(g3.partition_id(), 20);
g3.processed();
}
#[test]
fn error_defers_partition() {
let sp = make_shared();
sp.push_dirty(42);
let guard = sp.take().unwrap();
guard.error();
assert!(
sp.take().is_none(),
"deferred partition should not be ready"
);
let sched = sp.scheduler.lock().unwrap();
assert!(sched.pending_ids.contains(&42));
}
#[test]
fn error_cooldown_cap_at_30s() {
let sp = make_shared();
let now = Instant::now();
for _ in 0..25 {
sp.push_dirty(10);
{
                // Drain the inbox first and release its lock, so we never hold
                // the scheduler lock while taking the inbox lock; this matches
                // the lock order in push_dirty_impl (inbox before scheduler).
                let drained = sp.inbox.lock().unwrap().drain();
                let mut sched = sp.scheduler.lock().unwrap();
                for (pid, ts) in drained {
if !sched.pending_ids.contains(&pid) && !sched.claimed.contains(&pid) {
sched.pending.insert((ts, pid));
sched.pending_ids.insert(pid);
}
}
if let Some(&(ts, pid)) = sched.pending.first()
&& pid == 10
{
sched.pending.remove(&(ts, pid));
sched.pending_ids.remove(&pid);
sched
.pending
.insert((now.checked_sub(Duration::from_secs(1)).unwrap(), pid));
sched.pending_ids.insert(pid);
}
}
if let Some(g) = sp.take() {
g.error();
}
}
let sched = sp.scheduler.lock().unwrap();
let entry = sched.pending.iter().find(|(_, pid)| *pid == 10);
if let Some(&(dirty_since, _)) = entry {
assert!(
dirty_since <= now + Duration::from_secs(30) + Duration::from_millis(500),
"cooldown should be capped at 30s"
);
}
}
#[test]
fn error_healthy_partition_served_before_deferred() {
let sp = make_shared();
sp.push_dirty(42);
let g = sp.take().unwrap();
g.error();
sp.push_dirty(10);
let guard = sp.take().unwrap();
assert_eq!(
guard.partition_id(),
10,
"healthy partition should be served first"
);
guard.processed();
let sched = sp.scheduler.lock().unwrap();
assert!(sched.pending_ids.contains(&42));
}
#[test]
fn dropped_guard_preserves_signal() {
let sp = make_shared();
sp.push_dirty(42);
{
let _guard = sp.take().unwrap();
}
let guard2 = sp.take().expect("should reappear after drop");
assert_eq!(guard2.partition_id(), 42);
guard2.processed();
}
#[test]
fn push_dirty_dedup_in_pending() {
let sp = make_shared();
for _ in 0..5 {
sp.push_dirty(42);
}
let guard = sp.take().unwrap();
assert_eq!(guard.partition_id(), 42);
guard.processed();
assert!(sp.take().is_none());
}
#[test]
fn push_dirty_dedup_for_claimed() {
let sp = make_shared();
sp.push_dirty(42);
let guard = sp.take().unwrap();
assert_eq!(guard.partition_id(), 42);
sp.push_dirty(42);
assert!(sp.take().is_none(), "claimed partition should be deduped");
guard.processed();
}
#[test]
fn redirty_while_claimed_survives_another_workers_take() {
let sp = make_shared();
sp.push_dirty(42);
let guard_a = sp.take().unwrap();
assert_eq!(guard_a.partition_id(), 42);
sp.push_dirty(42);
assert!(sp.take().is_none());
guard_a.processed();
let guard = sp.take().expect(
"re-dirty signal lost: pid=42 was dropped by dedup during \
take() while claimed, then processed() removed it from claimed \
\u{2014} partition is now invisible until cold reconciler",
);
assert_eq!(guard.partition_id(), 42);
guard.processed();
}
#[test]
fn oldest_dirty_served_first() {
let sp = make_shared();
let now = Instant::now();
let t0 = now.checked_sub(Duration::from_secs(3)).unwrap();
sp.push_dirty_at(30, t0);
sp.push_dirty_at(10, t0 + Duration::from_secs(1));
sp.push_dirty_at(20, t0 + Duration::from_secs(2));
let g1 = sp.take().unwrap();
assert_eq!(g1.partition_id(), 30, "oldest dirty should be first");
g1.processed();
let g2 = sp.take().unwrap();
assert_eq!(g2.partition_id(), 10);
g2.processed();
let g3 = sp.take().unwrap();
assert_eq!(g3.partition_id(), 20);
g3.processed();
}
#[test]
fn hundred_partitions_all_served() {
let sp = make_shared();
for i in 0..100 {
sp.push_dirty(i);
}
let mut taken = Vec::new();
while let Some(g) = sp.take() {
taken.push(g.partition_id());
g.processed();
}
assert_eq!(taken.len(), 100);
let unique: HashSet<i64> = taken.iter().copied().collect();
assert_eq!(unique.len(), 100);
}
#[test]
fn coalesced_push_still_notifies() {
let sp = make_shared();
let notify = sp.notifier();
sp.push_dirty(10);
sp.push_dirty(10);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.unwrap();
rt.block_on(async {
tokio::time::timeout(Duration::from_millis(10), notify.notified())
.await
.expect("notify should have a stored permit from push_dirty");
});
}
#[test]
fn push_after_drain_not_coalesced() {
let sp = make_shared();
sp.push_dirty(10);
let g = sp.take().unwrap();
assert_eq!(g.partition_id(), 10);
g.processed();
sp.push_dirty(10);
let g2 = sp.take().unwrap();
assert_eq!(g2.partition_id(), 10);
g2.processed();
}
#[test]
fn push_dirty_during_claimed_partition_preserved() {
let sp = make_shared();
sp.push_dirty(10);
let g = sp.take().unwrap();
assert_eq!(g.partition_id(), 10);
sp.push_dirty(10);
g.processed();
let g2 = sp.take().unwrap();
assert_eq!(g2.partition_id(), 10);
g2.processed();
}
#[test]
fn multiple_partitions_interleaved_push_and_take() {
let sp = make_shared();
sp.push_dirty(1);
sp.push_dirty(2);
let g1 = sp.take().unwrap();
assert_eq!(g1.partition_id(), 1);
sp.push_dirty(3);
let g2 = sp.take().unwrap();
assert_eq!(g2.partition_id(), 2);
g1.processed();
let g3 = sp.take().unwrap();
assert_eq!(g3.partition_id(), 3);
g2.processed();
g3.processed();
assert!(sp.take().is_none());
}
#[test]
fn dropped_guard_without_ack_requeues() {
let sp = make_shared();
sp.push_dirty(10);
{
let _g = sp.take().unwrap();
}
let g = sp.take().unwrap();
assert_eq!(g.partition_id(), 10);
g.processed();
}
#[test]
fn coalesced_push_while_claimed_forces_redirty() {
let sp = make_shared();
sp.push_dirty(10);
let g = sp.take().unwrap();
assert_eq!(g.partition_id(), 10);
        // Two pushes in quick succession: the second lands inside the
        // coalescing window and is accepted only via the force_push path,
        // because partition 10 is currently claimed.
        sp.push_dirty(10);
        sp.push_dirty(10);
g.processed();
let g2 = sp.take().unwrap();
assert_eq!(g2.partition_id(), 10);
g2.processed();
}
}