1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3
4pub mod payload;
5
6use payload::AtomicPrimitive;
7#[doc(inline)]
8pub use payload::Payload;
9
10use std::{
11 future::Future,
12 pin::Pin,
13 sync::{
14 Arc,
15 atomic::{AtomicU8, AtomicU32, Ordering},
16 },
17 task::{Context, Poll},
18};
19
20use tokio::sync::{Notify, futures::OwnedNotified};
21
22pub struct SoftCycleListener<'a, T: Payload> {
31 notify: OwnedNotified,
32 controller: &'a SoftCycleController<T>,
33}
34
35impl<T: Payload> Future for SoftCycleListener<'_, T> {
36 type Output = Result<T, ()>;
37
38 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
39 let notify_pin = unsafe { self.as_mut().map_unchecked_mut(|s| &mut s.notify) };
40
41 match notify_pin.poll(cx) {
42 Poll::Pending => {}
43 Poll::Ready(()) => return Poll::Ready(Ok(self.controller.read_payload())),
47 }
48
49 if self.controller.is_notified() {
61 return Poll::Ready(Ok(self.controller.read_payload()));
62 }
63
64 Poll::Pending
65 }
66}
67
/// No notification is pending; `try_notify` may claim the cycle.
const STATUS_NOT_NOTIFIED: u8 = 0;

/// A `try_notify` call has claimed the cycle and is publishing its payload.
const STATUS_STORING_PAYLOAD: u8 = 1;

/// The payload is published; listeners complete and `try_clear` may claim the cycle.
const STATUS_NOTIFIED: u8 = 2;

/// A `try_clear` call has claimed the cycle and is resetting it.
const STATUS_CLEARING: u8 = 3;
76
77pub struct SoftCycleController<T: Payload = ()> {
89 notify: Arc<Notify>,
91
92 next_notify_sequence: AtomicU32,
94
95 status: AtomicU8,
97
98 payload: <T as Payload>::UnderlyingAtomic,
101}
102
103impl<T: Payload> SoftCycleController<T> {
104 #[allow(clippy::new_without_default)]
106 pub fn new() -> Self {
107 Self {
108 notify: Arc::new(Notify::new()),
109 next_notify_sequence: AtomicU32::new(0),
110 status: AtomicU8::new(STATUS_NOT_NOTIFIED),
111 payload: <T as Payload>::UnderlyingAtomic::new_default(),
112 }
113 }
114
115 #[must_use = "Caller must check if the operation was successful"]
119 pub fn try_notify(&self, payload: T) -> Result<u32, T> {
120 match self.status.compare_exchange(
121 STATUS_NOT_NOTIFIED,
122 STATUS_STORING_PAYLOAD,
123 Ordering::AcqRel,
124 Ordering::Relaxed,
125 ) {
126 Ok(_) => {
127 let sequence_number = self.next_notify_sequence.fetch_add(1, Ordering::AcqRel);
128 self.payload.store(payload.into());
129 self.status.store(STATUS_NOTIFIED, Ordering::Release);
130 self.notify.notify_waiters();
131 Ok(sequence_number)
132 }
133 Err(_) => Err(payload),
134 }
135 }
136
137 #[allow(clippy::result_unit_err)]
143 pub fn try_clear(&self) -> Result<u32, ()> {
144 match self.status.compare_exchange(
145 STATUS_NOTIFIED,
146 STATUS_CLEARING,
147 Ordering::AcqRel,
148 Ordering::Relaxed,
149 ) {
150 Ok(_) => {
151 let sequence_number = self
152 .next_notify_sequence
153 .load(Ordering::Acquire)
154 .saturating_sub(1);
155 self.status.store(STATUS_NOT_NOTIFIED, Ordering::Release);
156 Ok(sequence_number)
157 }
158 Err(_) => Err(()),
159 }
160 }
161
162 #[must_use = "Caller must await the listener to receive the signal"]
168 pub fn listener<'a>(&'a self) -> SoftCycleListener<'a, T> {
169 SoftCycleListener {
170 notify: self.notify.clone().notified_owned(),
171 controller: self,
172 }
173 }
174
175 fn is_notified(&self) -> bool {
177 self.status.load(Ordering::Acquire) == STATUS_NOTIFIED
178 }
179
180 fn read_payload(&self) -> T {
184 let inner = self.payload.load();
185 T::from(inner)
186 }
187}
188
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::time::{Duration, Instant};

    use super::*;

    /// Sequence numbers returned by `try_notify` start at 0 and increase by one
    /// per successful notification; `try_clear` echoes the cleared one.
    #[tokio::test]
    async fn guarantee_a_try_notify_sn_from_zero() {
        let ctrl = SoftCycleController::<u32>::new();
        assert_eq!(ctrl.try_notify(10), Ok(0));
        assert_eq!(ctrl.try_clear(), Ok(0));
        assert_eq!(ctrl.try_notify(20), Ok(1));
        assert_eq!(ctrl.try_clear(), Ok(1));
        assert_eq!(ctrl.try_notify(30), Ok(2));
    }

    /// `try_clear` fails unless the controller is currently notified.
    #[tokio::test]
    async fn guarantee_a_try_clear_fails_when_not_notified() {
        let ctrl = SoftCycleController::<u32>::new();
        assert_eq!(ctrl.try_clear(), Err(()));
        assert_eq!(ctrl.try_notify(1), Ok(0));
        assert_eq!(ctrl.try_clear(), Ok(0));
        assert_eq!(ctrl.try_clear(), Err(()));
    }

    /// Interleaved notify/clear rounds keep notify and clear sequence numbers paired.
    #[tokio::test]
    async fn guarantee_a_sn_sequence_notify_clear_interleaved() {
        let ctrl = SoftCycleController::<u32>::new();
        assert_eq!(ctrl.try_notify(100), Ok(0));
        assert_eq!(ctrl.try_clear(), Ok(0));
        assert_eq!(ctrl.try_notify(200), Ok(1));
        assert_eq!(ctrl.try_clear(), Ok(1));
        assert_eq!(ctrl.try_notify(300), Ok(2));
    }

    /// `try_clear` returns promptly even with many listener tasks outstanding,
    /// and listeners still observe one of the published payloads.
    #[tokio::test]
    async fn guarantee_b_try_clear_nonblocking_many_listeners() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        let listener_handles: Vec<_> = (0..100)
            .map(|_| {
                let c = Arc::clone(&ctrl);
                tokio::spawn(async move { c.listener().await })
            })
            .collect();

        assert_eq!(ctrl.try_notify(1), Ok(0));
        let deadline = Duration::from_millis(100);
        tokio::time::timeout(deadline, async {
            let _ = ctrl.try_clear();
        })
        .await
        .expect("try_clear must not block");
        assert_eq!(ctrl.try_clear(), Err(()));
        ctrl.try_notify(2).ok();

        let mut got = 0;
        for handle in listener_handles {
            if let Ok(Ok(v)) = tokio::time::timeout(Duration::from_secs(2), handle).await {
                assert!(v == Ok(1) || v == Ok(2));
                got += 1;
            }
        }
        assert!(got > 0, "at least one listener should get a value");
    }

    /// `try_notify` succeeds and returns quickly with many listeners waiting.
    #[tokio::test]
    async fn guarantee_b_try_notify_nonblocking_many_listeners() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        for _ in 0..50 {
            let c = Arc::clone(&ctrl);
            tokio::spawn(async move {
                let _ = c.listener().await;
            });
        }
        // Give the spawned listeners a chance to start waiting.
        tokio::time::sleep(Duration::from_millis(20)).await;

        let start = Instant::now();
        let res = ctrl.try_notify(1);
        assert!(res.is_ok(), "try_notify must succeed");
        assert!(
            start.elapsed() < Duration::from_millis(50),
            "try_notify must not block"
        );
    }

    /// A listener created while the controller is already notified completes
    /// with the current payload.
    #[tokio::test]
    async fn guarantee_c_listener_created_while_notified_completes() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        assert_eq!(ctrl.try_notify(42), Ok(0));
        let v = ctrl.listener().await;
        assert_eq!(v, Ok(42));
    }

    /// A listener that is already waiting completes once `try_notify` is called.
    #[tokio::test]
    async fn guarantee_c_listener_created_before_notify_completes_after_notify() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        let c = Arc::clone(&ctrl);
        let listener_task = tokio::spawn(async move { c.listener().await });
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(ctrl.try_notify(7), Ok(0));
        let r = tokio::time::timeout(Duration::from_secs(1), listener_task)
            .await
            .expect("listener must complete within timeout")
            .expect("task must not panic");
        assert_eq!(r, Ok(7));
    }

    /// Across several notify/clear rounds a listener yields exactly one of the
    /// payloads that were published.
    #[tokio::test]
    async fn guarantee_d_listener_returns_one_payload_after_multi_round() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        let c = Arc::clone(&ctrl);
        let listener_task = tokio::spawn(async move { c.listener().await });
        assert!(ctrl.try_notify(1).is_ok());
        let _ = ctrl.try_clear();
        assert!(ctrl.try_notify(2).is_ok());
        let r = listener_task.await.unwrap();
        let allowed = [Ok(1), Ok(2)];
        assert!(
            allowed.contains(&r),
            "listener must return one of the payloads, got {:?}",
            r
        );
    }

    /// A reader that repeatedly listens while the writer cycles only ever sees
    /// payloads the writer actually published.
    #[tokio::test]
    async fn concurrent_multi_round_collects_subset_of_payloads() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        let reader = Arc::clone(&ctrl);
        let reader_handle = tokio::spawn(async move {
            let mut seen = Vec::new();
            for _ in 0..20 {
                if let Ok(x) = reader.listener().await {
                    seen.push(x);
                }
            }
            seen
        });

        for i in 0..10u32 {
            assert!(ctrl.try_notify(i).is_ok());
            tokio::time::sleep(Duration::from_millis(2)).await;
            let _ = ctrl.try_clear();
            tokio::time::sleep(Duration::from_millis(2)).await;
        }
        // Leave the controller notified so the reader can always finish.
        ctrl.try_notify(99).ok();

        let collected = reader_handle.await.unwrap();
        assert!(!collected.is_empty());
        assert!(collected.iter().all(|&x| (0..10).contains(&x) || x == 99));
    }

    /// Long-running writer/reader pair; every payload a listener observes must
    /// be one the writer published.
    #[tokio::test]
    async fn stress_many_cycles_and_listeners() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());

        let writer = Arc::clone(&ctrl);
        let writer_handle = tokio::spawn(async move {
            for i in 1..=400u32 {
                let _ = writer.try_clear();
                if writer.try_notify(i).is_ok() {
                    tokio::time::sleep(Duration::from_millis(30)).await;
                } else {
                    panic!("notify failed");
                }
            }
        });

        let reader = Arc::clone(&ctrl);
        let reader_handle = tokio::spawn(async move {
            for _ in 0..3000 {
                if let Ok(v) = reader.listener().await {
                    assert!(0 < v && v <= 400);
                    tokio::time::sleep(Duration::from_millis(3)).await;
                }
            }
        });

        // A panic inside a spawned task only surfaces through its `JoinHandle`.
        // The results must be checked, otherwise the assertions above could
        // never fail this test.
        let (writer_result, reader_result) = tokio::join!(writer_handle, reader_handle);
        writer_result.expect("writer task panicked");
        reader_result.expect("reader task panicked");
    }

    /// Sequence numbers handed out across many tasks are unique and contiguous
    /// from 0, and every cleared sequence number was previously issued.
    #[tokio::test]
    async fn regression_concurrent_try_notify_try_clear_sequence_numbers_unique_and_consistent() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let c = Arc::clone(&ctrl);
                tokio::spawn(async move {
                    let mut my_notify = Vec::new();
                    let mut my_clear = Vec::new();
                    for i in 0..20u32 {
                        if let Ok(seq) = c.try_notify(i) {
                            my_notify.push(seq);
                        }
                        if let Ok(seq) = c.try_clear() {
                            my_clear.push(seq);
                        }
                    }
                    (my_notify, my_clear)
                })
            })
            .collect();

        let mut notify_seqs: Vec<u32> = Vec::new();
        let mut clear_seqs: Vec<u32> = Vec::new();
        for handle in handles {
            let (notified, cleared) = handle.await.unwrap();
            notify_seqs.extend(notified);
            clear_seqs.extend(cleared);
        }

        let issued = notify_seqs.len();
        let unique: HashSet<u32> = notify_seqs.iter().copied().collect();
        assert_eq!(
            unique.len(),
            issued,
            "every try_notify Ok(seq) must be unique"
        );

        let issued = u32::try_from(issued).expect("notification count fits in u32");
        for seq in 0..issued {
            assert!(
                unique.contains(&seq),
                "sequence numbers must be contiguous from 0, missing {}",
                seq
            );
        }
        for &cleared in &clear_seqs {
            assert!(
                unique.contains(&cleared),
                "try_clear returned seq {} which was not returned by try_notify",
                cleared
            );
        }
    }

    /// With a notifier and a clearer running side by side, every cleared
    /// sequence number stays within the range the notifier could have issued.
    #[tokio::test]
    async fn regression_concurrent_try_clear_returns_notification_sequence() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());

        let notifier_ctrl = Arc::clone(&ctrl);
        let notifier = tokio::spawn(async move {
            for i in 0u32..50 {
                if notifier_ctrl.try_notify(100 + i).is_ok() {
                    tokio::time::sleep(Duration::from_millis(1)).await;
                }
            }
        });

        let clearer = tokio::spawn(async move {
            let mut cleared = Vec::new();
            for _ in 0..60 {
                if let Ok(seq) = ctrl.try_clear() {
                    cleared.push(seq);
                }
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            cleared
        });

        // Surface a panic in the notifier instead of silently discarding it.
        notifier.await.expect("notifier task panicked");
        let cleared = clearer.await.unwrap();
        for &s in &cleared {
            assert!(s < 50, "cleared seq {} must be from a prior notify", s);
        }
    }
}
463
464#[cfg(feature = "global_instance")]
465#[cfg_attr(docsrs, doc(cfg(feature = "global_instance")))]
466mod global;
467
468#[cfg(feature = "global_instance")]
469#[cfg_attr(docsrs, doc(cfg(feature = "global_instance")))]
470pub use global::*;