1use std::{
3 fmt,
4 sync::{
5 atomic::{AtomicBool, AtomicUsize, Ordering},
6 Arc,
7 },
8};
9
10use tokio::sync::watch;
11
12#[derive(Clone, Copy, Debug, PartialEq, Eq)]
20pub enum SupervisorReadiness {
21 Pending,
23 Ready,
25}
26
27#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
35pub enum ReadinessMode {
36 #[default]
38 Immediate,
39 Explicit,
42}
43
44#[derive(Clone)]
50pub struct ReadySignal {
51 inner: ReadySignalInner,
52}
53
54impl ReadySignal {
55 pub(crate) fn immediate() -> Self {
56 Self {
57 inner: ReadySignalInner::Immediate,
58 }
59 }
60
61 pub(crate) fn explicit(tracker: ReadinessTracker, index: usize) -> Self {
62 Self {
63 inner: ReadySignalInner::Explicit { tracker, index },
64 }
65 }
66
67 pub fn mark_ready(&self) {
70 match &self.inner {
71 ReadySignalInner::Immediate => {},
72 ReadySignalInner::Explicit { tracker, index } => {
73 tracker.mark_ready(*index);
74 },
75 }
76 }
77}
78
79impl fmt::Debug for ReadySignal {
80 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
81 match &self.inner {
82 ReadySignalInner::Immediate => formatter.write_str("ReadySignal::Immediate"),
83 ReadySignalInner::Explicit { index, .. } => formatter
84 .debug_struct("ReadySignal::Explicit")
85 .field("index", index)
86 .finish_non_exhaustive(),
87 }
88 }
89}
90
91#[derive(Clone)]
92enum ReadySignalInner {
93 Immediate,
94 Explicit {
95 tracker: ReadinessTracker,
96 index: usize,
97 },
98}
99
100#[derive(Clone)]
101pub(crate) struct ReadinessTracker {
102 inner: Arc<ReadinessInner>,
103}
104
105impl ReadinessTracker {
106 pub(crate) fn new(modes: impl IntoIterator<Item = ReadinessMode>) -> Self {
107 let mut pending = 0;
108 let members = modes
109 .into_iter()
110 .map(|mode| match mode {
111 ReadinessMode::Immediate => MemberReadiness::Immediate,
112 ReadinessMode::Explicit => {
113 pending += 1;
114 MemberReadiness::Explicit {
115 ready: AtomicBool::new(false),
116 }
117 },
118 })
119 .collect::<Vec<_>>();
120 let state = readiness_state(pending);
121 let (sender, _) = watch::channel(state);
122
123 Self {
124 inner: Arc::new(ReadinessInner {
125 members,
126 pending: AtomicUsize::new(pending),
127 sender,
128 }),
129 }
130 }
131
132 pub(crate) fn signal(&self, index: usize, mode: ReadinessMode) -> ReadySignal {
133 match mode {
134 ReadinessMode::Immediate => ReadySignal::immediate(),
135 ReadinessMode::Explicit => ReadySignal::explicit(self.clone(), index),
136 }
137 }
138
139 pub(crate) fn subscribe(&self) -> watch::Receiver<SupervisorReadiness> {
140 self.inner.sender.subscribe()
141 }
142
143 pub(crate) fn state(&self) -> SupervisorReadiness {
144 readiness_state(self.inner.pending.load(Ordering::Acquire))
145 }
146
147 pub(crate) fn is_ready(&self, index: usize) -> bool {
148 self.inner
149 .members
150 .get(index)
151 .is_some_and(MemberReadiness::is_ready)
152 }
153
154 pub(crate) fn mark_ready(&self, index: usize) {
155 let Some(member) = self.inner.members.get(index) else {
156 return;
157 };
158
159 if member.mark_ready() {
160 let previous = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
161 if previous == 1 {
162 let _ = self.inner.sender.send(SupervisorReadiness::Ready);
163 }
164 }
165 }
166}
167
168struct ReadinessInner {
169 members: Vec<MemberReadiness>,
170 pending: AtomicUsize,
171 sender: watch::Sender<SupervisorReadiness>,
172}
173
174enum MemberReadiness {
175 Immediate,
176 Explicit { ready: AtomicBool },
177}
178
179impl MemberReadiness {
180 pub(crate) fn is_ready(&self) -> bool {
181 match self {
182 Self::Immediate => true,
183 Self::Explicit { ready } => ready.load(Ordering::Acquire),
184 }
185 }
186
187 pub(crate) fn mark_ready(&self) -> bool {
188 match self {
189 Self::Immediate => false,
190 Self::Explicit { ready } => ready
191 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
192 .is_ok(),
193 }
194 }
195}
196
197fn readiness_state(pending: usize) -> SupervisorReadiness {
198 if pending == 0 {
199 SupervisorReadiness::Ready
200 } else {
201 SupervisorReadiness::Pending
202 }
203}