soft-cycle 0.2.0

Async controller for coordinating soft restarts and graceful shutdowns with shared listeners
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
#![doc = include_str!("../README.md")]
#![cfg_attr(docsrs, feature(doc_cfg))]

pub mod payload;

use payload::AtomicPrimitive;
#[doc(inline)]
pub use payload::Payload;

use std::{
    future::Future,
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicU8, AtomicU32, Ordering},
    },
    task::{Context, Poll},
};

use tokio::sync::{Notify, futures::OwnedNotified};

/// Future that completes when a notification from the associated controller is observed.
///
/// Returned by [`SoftCycleController::listener`]. Resolves to `Ok(payload)` when a notification
/// is observed. If the controller is already notified when the listener is created or first
/// polled, it may complete immediately with that payload. If multiple notify/clear cycles
/// occur after the listener is created, it returns one of those payloads (no guarantee of
/// returning the earliest or latest). See the [crate-level documentation](crate) for full
/// completion guarantees.
///
/// The `Future` implementation in this file only ever produces `Ok(..)`; the `Err(())` arm of
/// the output type is never constructed by `poll`.
pub struct SoftCycleListener<'a, T: Payload> {
    /// Wake-up future created from the controller's `Notify` when the listener is built.
    /// It is polled first on every `poll` call.
    notify: OwnedNotified,
    /// Controller that created this listener; consulted for the current status and used to
    /// read the payload once a notification is observed.
    controller: &'a SoftCycleController<T>,
}

impl<T: Payload> Future for SoftCycleListener<'_, T> {
    type Output = Result<T, ()>;

    /// Polls the wake-up future first and then, only if that is still pending, falls back to
    /// checking the controller status directly. Resolves to `Ok(payload)` in either case.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: this is a pin projection onto the `notify` field. The field is only reached
        // through this pinned reference and is never moved out of `self`, and the struct is
        // declared without `#[repr(packed)]`.
        // NOTE(review): soundness also relies on `SoftCycleListener` having no `Drop` impl
        // that moves `notify`; none is visible in this file.
        let wakeup = unsafe { self.as_mut().map_unchecked_mut(|this| &mut this.notify) };

        // CORRECTNESS: a ready `OwnedNotified` means some `try_notify` call has already
        // written its payload, and that write must be visible to the read below.
        let woken = wakeup.poll(cx).is_ready();

        // When the wake-up future is still pending there are two possibilities:
        //
        // - the listener was created after `try_notify` ran, so the wake-up was missed and
        //   the payload has to be returned right away; or
        // - no notification has happened yet.
        //
        // Looking at the status covers the first case (required for correctness) and, in the
        // second case, is a harmless speculative attempt to finish early should `try_notify`
        // slip in between the poll above and this check. The `||` short-circuits, so the
        // status is only loaded when the wake-up future was not ready.
        if woken || self.controller.is_notified() {
            Poll::Ready(Ok(self.controller.read_payload()))
        } else {
            Poll::Pending
        }
    }
}

/// Controller status: 0 = not notified.
///
/// Initial state set by `new`. `try_notify` can only start from this state, and a successful
/// `try_clear` ends in it.
const STATUS_NOT_NOTIFIED: u8 = 0;
/// Controller status: 1 = storing payload.
///
/// Transient state owned by a successful `try_notify` while it advances the sequence counter
/// and writes the payload; both `try_notify` and `try_clear` fail while it is set.
const STATUS_STORING_PAYLOAD: u8 = 1;
/// Controller status: 2 = notified.
///
/// Published by `try_notify` after the payload is stored. This is the only state from which
/// `try_clear` can succeed and the only one for which `is_notified` returns `true`.
const STATUS_NOTIFIED: u8 = 2;
/// Controller status: 3 = clearing.
///
/// Transient state owned by a successful `try_clear` while it reads the sequence counter,
/// before it stores `STATUS_NOT_NOTIFIED` again.
const STATUS_CLEARING: u8 = 3;

/// Coordination controller for soft restarts and graceful shutdowns.
///
/// `SoftCycleController` exposes a tiny async-friendly coordination protocol:
///
/// - producers call [`try_notify`](Self::try_notify) to publish a notification with a payload and
///   notify waiters;
/// - producers call [`try_clear`](Self::try_clear) to move back to the non-notified state;
/// - consumers create a [`SoftCycleListener`] via [`listener`](Self::listener) to wait for
///   notifications with payloads.
///
/// The payload type defaults to `()` for callers that only need the signal itself.
///
/// See the [crate-level documentation](crate) for detailed documentation and usage examples.
pub struct SoftCycleController<T: Payload = ()> {
    /// Notify used to signal listeners. Kept in an `Arc` because each listener takes its own
    /// clone to build an `OwnedNotified`.
    notify: Arc<Notify>,

    /// Next notify sequence number, i.e. the value the next successful `try_notify` returns.
    /// Starts at 0 and is advanced by one per successful `try_notify`.
    next_notify_sequence: AtomicU32,

    /// Controller status; always one of the `STATUS_*` constants.
    status: AtomicU8,

    /// Atomic payload slot; always holds a valid value. Readers may see a newer
    /// payload than the notification they observed, but never invalid data.
    payload: <T as Payload>::UnderlyingAtomic,
}

impl<T: Payload> SoftCycleController<T> {
    /// Creates a new [`SoftCycleController`].
    ///
    /// The controller starts in the not-notified state, with the sequence counter at 0 and
    /// the payload slot initialized through `UnderlyingAtomic::new_default()`.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        Self {
            notify: Arc::new(Notify::new()),
            next_notify_sequence: AtomicU32::new(0),
            status: AtomicU8::new(STATUS_NOT_NOTIFIED),
            payload: <T as Payload>::UnderlyingAtomic::new_default(),
        }
    }

    /// Attempts to notify. On success returns `Ok(sequence_number)` where the sequence number
    /// starts from 0 and increases by one per successful call (wrapping around after
    /// `u32::MAX`). On failure returns `Err(payload)` with the payload unchanged. Never blocks.
    ///
    /// The call fails whenever the controller is not in the not-notified state: it is already
    /// notified, or another `try_notify` / `try_clear` call is in the middle of its transition.
    #[must_use = "Caller must check if the operation was successful"]
    pub fn try_notify(&self, payload: T) -> Result<u32, T> {
        // Claim the transition; only one caller can move NOT_NOTIFIED -> STORING_PAYLOAD.
        let claimed = self.status.compare_exchange(
            STATUS_NOT_NOTIFIED,
            STATUS_STORING_PAYLOAD,
            Ordering::AcqRel,
            Ordering::Relaxed,
        );
        if claimed.is_err() {
            return Err(payload);
        }

        let sequence_number = self.next_notify_sequence.fetch_add(1, Ordering::AcqRel);
        // The payload is written before the status is published as NOTIFIED (Release), so a
        // reader that observes NOTIFIED (Acquire) also observes this payload or a newer one.
        self.payload.store(payload.into());
        self.status.store(STATUS_NOTIFIED, Ordering::Release);
        // Wake listeners only after the state is fully published.
        self.notify.notify_waiters();
        Ok(sequence_number)
    }

    /// Clears the notified state. On success returns `Ok(sequence_number)` for the notification
    /// sequence number that was cleared. On failure (not currently notified, or another
    /// `try_notify` / `try_clear` call is mid-transition) returns `Err(())`.
    /// Never blocks.
    ///
    /// Clearing after a notify does not prevent listeners already waiting from receiving the notification.
    #[allow(clippy::result_unit_err)]
    pub fn try_clear(&self) -> Result<u32, ()> {
        // Claim the transition; only one caller can move NOTIFIED -> CLEARING.
        let claimed = self.status.compare_exchange(
            STATUS_NOTIFIED,
            STATUS_CLEARING,
            Ordering::AcqRel,
            Ordering::Relaxed,
        );
        if claimed.is_err() {
            return Err(());
        }

        // While the status is CLEARING no `try_notify` can succeed, so the counter is stable
        // and the notification being cleared is `next - 1`. The counter is advanced with a
        // wrapping `fetch_add`, so it can be 0 here once `u32::MAX` notifications have been
        // issued; `wrapping_sub` maps that back to `u32::MAX`, the value that notify returned
        // (a saturating subtraction would incorrectly report 0).
        let sequence_number = self
            .next_notify_sequence
            .load(Ordering::Acquire)
            .wrapping_sub(1);
        self.status.store(STATUS_NOT_NOTIFIED, Ordering::Release);
        Ok(sequence_number)
    }

    /// Returns a listener that resolves when a notification is observed, with `Ok(payload)`.
    /// If already notified, it may complete in a finite number of polls; if not yet
    /// notified, it completes in a finite number of polls after the next [`try_notify`](Self::try_notify).
    /// After multiple notify/clear cycles, the listener returns one of the payloads (no
    /// guarantee of earliest or latest). See the [crate-level documentation](crate).
    #[must_use = "Caller must await the listener to receive the signal"]
    pub fn listener<'a>(&'a self) -> SoftCycleListener<'a, T> {
        SoftCycleListener {
            // Each listener owns its own clone of the `Arc<Notify>` through `OwnedNotified`.
            notify: self.notify.clone().notified_owned(),
            controller: self,
        }
    }

    /// Returns true if the controller is in the notified state.
    ///
    /// The transient storing-payload and clearing states count as not notified.
    fn is_notified(&self) -> bool {
        self.status.load(Ordering::Acquire) == STATUS_NOTIFIED
    }

    /// Reads payload atomically. It always returns a valid value because even
    /// if [`try_notify`](Self::try_notify) was never called, the payload is initialized to the
    /// default value.
    fn read_payload(&self) -> T {
        let inner = self.payload.load();
        T::from(inner)
    }
}

/// Tests for the guarantees exposed by `SoftCycleController` and `SoftCycleListener`:
/// sequence-number semantics, non-blocking producers, listener completion and multi-round
/// behavior.
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use std::time::Instant;

    use super::*;

    // --- A. Global order / return value semantics ---

    /// Guarantee: try_notify success returns increasing sequence numbers from 0 upward.
    #[tokio::test]
    async fn guarantee_a_try_notify_sn_from_zero() {
        let ctrl = SoftCycleController::<u32>::new();
        assert_eq!(ctrl.try_notify(10), Ok(0));
        assert_eq!(ctrl.try_clear(), Ok(0));
        assert_eq!(ctrl.try_notify(20), Ok(1));
        assert_eq!(ctrl.try_clear(), Ok(1));
        assert_eq!(ctrl.try_notify(30), Ok(2));
    }

    /// Guarantee: try_clear when not notified returns Err(()).
    #[tokio::test]
    async fn guarantee_a_try_clear_fails_when_not_notified() {
        let ctrl = SoftCycleController::<u32>::new();
        assert_eq!(ctrl.try_clear(), Err(()));
        assert_eq!(ctrl.try_notify(1), Ok(0));
        assert_eq!(ctrl.try_clear(), Ok(0));
        assert_eq!(ctrl.try_clear(), Err(()));
    }

    /// Guarantee: try_clear success returns the sequence number that was cleared; sequence numbers
    /// are consistent across notify/clear sequence.
    #[tokio::test]
    async fn guarantee_a_sn_sequence_notify_clear_interleaved() {
        let ctrl = SoftCycleController::<u32>::new();
        assert_eq!(ctrl.try_notify(100), Ok(0));
        assert_eq!(ctrl.try_clear(), Ok(0));
        assert_eq!(ctrl.try_notify(200), Ok(1));
        assert_eq!(ctrl.try_clear(), Ok(1));
        assert_eq!(ctrl.try_notify(300), Ok(2));
    }

    // --- B. Non-blocking (try_notify and try_clear are synchronous) ---

    /// Guarantee: try_clear completes immediately even with many listeners (no reader-count barrier).
    #[tokio::test]
    async fn guarantee_b_try_clear_nonblocking_many_listeners() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        let mut listener_handles = Vec::new();
        for _ in 0..100 {
            let c = ctrl.clone();
            listener_handles.push(tokio::spawn(async move { c.listener().await }));
        }
        assert_eq!(ctrl.try_notify(1), Ok(0));
        let deadline = Duration::from_millis(100);
        let clear_done = tokio::time::timeout(deadline, async {
            let _ = ctrl.try_clear();
        });
        clear_done.await.expect("try_clear must not block");
        // The clear above already succeeded, so a second clear must be rejected.
        assert_eq!(ctrl.try_clear(), Err(()));
        // Second round so that listeners which missed the first notification still complete.
        ctrl.try_notify(2).ok();
        let mut got = 0;
        for h in listener_handles {
            if let Ok(Ok(v)) = tokio::time::timeout(Duration::from_secs(2), h).await {
                assert!(v == Ok(1) || v == Ok(2));
                got += 1;
            }
        }
        assert!(got > 0, "at least one listener should get a value");
    }

    /// Guarantee: try_notify completes synchronously even with many listeners already waiting.
    #[tokio::test]
    async fn guarantee_b_try_notify_nonblocking_many_listeners() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        for _ in 0..50 {
            let c = ctrl.clone();
            tokio::spawn(async move {
                let _ = c.listener().await;
            });
        }
        // Give the spawned listeners a chance to start waiting before notifying.
        tokio::time::sleep(Duration::from_millis(20)).await;
        let start = Instant::now();
        let res = ctrl.try_notify(1);
        assert!(res.is_ok(), "try_notify must succeed");
        assert!(
            start.elapsed() < Duration::from_millis(50),
            "try_notify must not block"
        );
    }

    // --- C. Listener completion ---

    /// Guarantee: listener created after notify, before clear, completes in finite polls (here, immediately).
    #[tokio::test]
    async fn guarantee_c_listener_created_while_notified_completes() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        assert_eq!(ctrl.try_notify(42), Ok(0));
        let v = ctrl.listener().await;
        assert_eq!(v, Ok(42));
    }

    /// Guarantee: listener created after clear, before next notify, completes in finite polls after next try_notify.
    #[tokio::test]
    async fn guarantee_c_listener_created_before_notify_completes_after_notify() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        let c = ctrl.clone();
        let listener_task = tokio::spawn(async move { c.listener().await });
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(ctrl.try_notify(7), Ok(0));
        let r = tokio::time::timeout(Duration::from_secs(1), listener_task)
            .await
            .expect("listener must complete within timeout")
            .expect("task must not panic");
        assert_eq!(r, Ok(7));
    }

    // --- D. Multi-round: listener returns one of the payloads (no guarantee of earliest/latest) ---

    /// Guarantee: after multiple notify/clear cycles, listener returns one of the payloads.
    #[tokio::test]
    async fn guarantee_d_listener_returns_one_payload_after_multi_round() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        let c = ctrl.clone();
        let listener_task = tokio::spawn(async move { c.listener().await });
        assert!(ctrl.try_notify(1).is_ok());
        let _ = ctrl.try_clear();
        assert!(ctrl.try_notify(2).is_ok());
        let r = listener_task.await.unwrap();
        let allowed = [Ok(1), Ok(2)];
        assert!(
            allowed.contains(&r),
            "listener must return one of the payloads, got {:?}",
            r
        );
    }

    /// A reader awaiting 20 listeners in a row while the writer runs 10 notify/clear rounds
    /// (payloads 0..10) followed by a final notify(99) only ever collects payloads that were
    /// actually published, and collects at least one.
    #[tokio::test]
    async fn concurrent_multi_round_collects_subset_of_payloads() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        let mut seen = Vec::new();
        let reader = ctrl.clone();
        let reader_handle = tokio::spawn(async move {
            for _ in 0..20 {
                let v = reader.listener().await;
                if let Ok(x) = v {
                    seen.push(x);
                }
            }
            seen
        });
        for i in 0..10u32 {
            assert!(ctrl.try_notify(i).is_ok());
            tokio::time::sleep(Duration::from_millis(2)).await;
            let _ = ctrl.try_clear();
            tokio::time::sleep(Duration::from_millis(2)).await;
        }
        // Leave the controller notified so the reader can finish its remaining iterations.
        ctrl.try_notify(99).ok();
        let collected = reader_handle.await.unwrap();
        assert!(!collected.is_empty());
        assert!(collected.iter().all(|&x| (0..10).contains(&x) || x == 99));
    }

    /// Stress: a writer performs 400 clear/notify rounds (payloads 1..=400, 30 ms apart) while a
    /// reader awaits 3000 listeners; every observed payload must be one the writer published.
    #[tokio::test]
    async fn stress_many_cycles_and_listeners() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        let writer = ctrl.clone();
        let writer_handle = tokio::spawn(async move {
            for i in 1..=400u32 {
                // The first clear fails (nothing notified yet); later ones reset the prior round.
                let _ = writer.try_clear();
                if writer.try_notify(i).is_ok() {
                    tokio::time::sleep(Duration::from_millis(30)).await;
                } else {
                    panic!("notify failed");
                }
            }
        });
        let reader = ctrl.clone();
        let reader_handle = tokio::spawn(async move {
            for _ in 0..3000 {
                if let Ok(v) = reader.listener().await {
                    assert!((1..=400).contains(&v));
                    tokio::time::sleep(Duration::from_millis(3)).await;
                }
            }
        });
        let _ = tokio::join!(writer_handle, reader_handle);
    }

    // --- Regression: concurrent try_notify / try_clear sequence number semantics ---

    /// Guarantee: under concurrent try_notify/try_clear, every successful try_notify returns a
    /// unique sequence number (0, 1, 2, ...), and every successful try_clear returns a sequence
    /// number that was previously returned by try_notify.
    ///
    /// NOTE(review): the spawned tasks never await, so with the default `#[tokio::test]`
    /// runtime they likely run one after another rather than in parallel; confirm whether a
    /// multi-thread flavor was intended.
    #[tokio::test]
    async fn regression_concurrent_try_notify_try_clear_sequence_numbers_unique_and_consistent() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        let mut notify_seqs: Vec<u32> = Vec::new();
        let mut clear_seqs: Vec<u32> = Vec::new();
        let mut handles = Vec::new();
        for _ in 0..8 {
            let c = ctrl.clone();
            let h = tokio::spawn(async move {
                let mut my_notify = Vec::new();
                let mut my_clear = Vec::new();
                for i in 0..20u32 {
                    if let Ok(seq) = c.try_notify(i) {
                        my_notify.push(seq);
                    }
                    if let Ok(seq) = c.try_clear() {
                        my_clear.push(seq);
                    }
                }
                (my_notify, my_clear)
            });
            handles.push(h);
        }
        for h in handles {
            let (n, cl) = h.await.unwrap();
            notify_seqs.extend(n);
            clear_seqs.extend(cl);
        }
        notify_seqs.sort_unstable();
        clear_seqs.sort_unstable();
        // try_notify sequence numbers must be unique and contiguous from 0.
        let unique: std::collections::HashSet<u32> = notify_seqs.iter().copied().collect();
        assert_eq!(
            unique.len(),
            notify_seqs.len(),
            "every try_notify Ok(seq) must be unique"
        );
        // The list is sorted, so "contiguous from 0" means position i holds the value i.
        for (expected, &seq) in (0u32..).zip(&notify_seqs) {
            assert_eq!(
                seq, expected,
                "sequence numbers must be contiguous from 0, missing {}",
                expected
            );
        }
        // Every cleared sequence must have been notified.
        for &cleared in &clear_seqs {
            assert!(
                unique.contains(&cleared),
                "try_clear returned seq {} which was not returned by try_notify",
                cleared
            );
        }
    }

    /// Guarantee: under concurrent contention, try_clear success returns the sequence number
    /// of the notification that was cleared (documented semantics).
    #[tokio::test]
    async fn regression_concurrent_try_clear_returns_notification_sequence() {
        let ctrl = Arc::new(SoftCycleController::<u32>::new());
        let ctrl2 = ctrl.clone();
        let notifier = tokio::spawn(async move {
            for i in 0u32..50 {
                if ctrl2.try_notify(100 + i).is_ok() {
                    tokio::time::sleep(Duration::from_millis(1)).await;
                }
            }
        });
        let clearer = tokio::spawn(async move {
            let mut cleared = Vec::new();
            for _ in 0..60 {
                if let Ok(seq) = ctrl.try_clear() {
                    cleared.push(seq);
                }
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
            cleared
        });
        let _ = notifier.await;
        let cleared = clearer.await.unwrap();
        // Every try_clear Ok(seq) must be a sequence number that was returned by try_notify.
        // At most 50 notifies are attempted, so valid sequence numbers are below 50.
        for &s in &cleared {
            assert!(s < 50, "cleared seq {} must be from a prior notify", s);
        }
    }
}

// The `global` module is only compiled when the `global_instance` feature is enabled; on
// docs.rs builds (`docsrs` cfg) the item is additionally annotated with that feature requirement.
#[cfg(feature = "global_instance")]
#[cfg_attr(docsrs, doc(cfg(feature = "global_instance")))]
mod global;

// Re-export every public item of `global` at the crate root under the same feature gate.
#[cfg(feature = "global_instance")]
#[cfg_attr(docsrs, doc(cfg(feature = "global_instance")))]
pub use global::*;