use std::{
fmt,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
};
use tokio::sync::watch;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SupervisorReadiness {
Pending,
Ready,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ReadinessMode {
#[default]
Immediate,
Explicit,
}
#[derive(Clone)]
pub struct ReadySignal {
inner: ReadySignalInner,
}
impl ReadySignal {
pub(crate) fn immediate() -> Self {
Self {
inner: ReadySignalInner::Immediate,
}
}
pub(crate) fn explicit(tracker: ReadinessTracker, index: usize) -> Self {
Self {
inner: ReadySignalInner::Explicit { tracker, index },
}
}
pub fn mark_ready(&self) {
match &self.inner {
ReadySignalInner::Immediate => {},
ReadySignalInner::Explicit { tracker, index } => {
tracker.mark_ready(*index);
},
}
}
}
impl fmt::Debug for ReadySignal {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.inner {
ReadySignalInner::Immediate => formatter.write_str("ReadySignal::Immediate"),
ReadySignalInner::Explicit { index, .. } => formatter
.debug_struct("ReadySignal::Explicit")
.field("index", index)
.finish_non_exhaustive(),
}
}
}
#[derive(Clone)]
enum ReadySignalInner {
Immediate,
Explicit {
tracker: ReadinessTracker,
index: usize,
},
}
#[derive(Clone)]
pub(crate) struct ReadinessTracker {
inner: Arc<ReadinessInner>,
}
impl ReadinessTracker {
pub(crate) fn new(modes: impl IntoIterator<Item = ReadinessMode>) -> Self {
let mut pending = 0;
let members = modes
.into_iter()
.map(|mode| match mode {
ReadinessMode::Immediate => MemberReadiness::Immediate,
ReadinessMode::Explicit => {
pending += 1;
MemberReadiness::Explicit {
ready: AtomicBool::new(false),
}
},
})
.collect::<Vec<_>>();
let state = readiness_state(pending);
let (sender, _) = watch::channel(state);
Self {
inner: Arc::new(ReadinessInner {
members,
pending: AtomicUsize::new(pending),
sender,
}),
}
}
pub(crate) fn signal(&self, index: usize, mode: ReadinessMode) -> ReadySignal {
match mode {
ReadinessMode::Immediate => ReadySignal::immediate(),
ReadinessMode::Explicit => ReadySignal::explicit(self.clone(), index),
}
}
pub(crate) fn subscribe(&self) -> watch::Receiver<SupervisorReadiness> {
self.inner.sender.subscribe()
}
pub(crate) fn state(&self) -> SupervisorReadiness {
readiness_state(self.inner.pending.load(Ordering::Acquire))
}
pub(crate) fn is_ready(&self, index: usize) -> bool {
self.inner
.members
.get(index)
.is_some_and(MemberReadiness::is_ready)
}
pub(crate) fn mark_ready(&self, index: usize) {
let Some(member) = self.inner.members.get(index) else {
return;
};
if member.mark_ready() {
let previous = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
if previous == 1 {
let _ = self.inner.sender.send(SupervisorReadiness::Ready);
}
}
}
}
struct ReadinessInner {
members: Vec<MemberReadiness>,
pending: AtomicUsize,
sender: watch::Sender<SupervisorReadiness>,
}
enum MemberReadiness {
Immediate,
Explicit { ready: AtomicBool },
}
impl MemberReadiness {
pub(crate) fn is_ready(&self) -> bool {
match self {
Self::Immediate => true,
Self::Explicit { ready } => ready.load(Ordering::Acquire),
}
}
pub(crate) fn mark_ready(&self) -> bool {
match self {
Self::Immediate => false,
Self::Explicit { ready } => ready
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok(),
}
}
}
fn readiness_state(pending: usize) -> SupervisorReadiness {
if pending == 0 {
SupervisorReadiness::Ready
} else {
SupervisorReadiness::Pending
}
}