lambda_runtime_api_client/body/
watch.rs

1//! Body::channel utilities. Extracted from Hyper under MIT license.
2//! <https://github.com/hyperium/hyper/blob/master/LICENSE>
3
4//! An SPSC broadcast channel.
5//!
6//! - The value can only be a `usize`.
7//! - The consumer is only notified if the value is different.
8//! - The value `0` is reserved for closed.
9
10use futures_util::task::AtomicWaker;
11use std::{
12    sync::{
13        atomic::{AtomicUsize, Ordering},
14        Arc,
15    },
16    task,
17};
18
/// The payload carried by the channel; a plain machine word so it fits in one atomic.
type Value = usize;

/// Sentinel stored in the channel once the sending half has been dropped.
///
/// Because this value is reserved, it must not be used as a regular payload.
pub(crate) const CLOSED: Value = 0;
22
23pub(crate) fn channel(initial: Value) -> (Sender, Receiver) {
24    debug_assert!(initial != CLOSED, "watch::channel initial state of 0 is reserved");
25
26    let shared = Arc::new(Shared {
27        value: AtomicUsize::new(initial),
28        waker: AtomicWaker::new(),
29    });
30
31    (Sender { shared: shared.clone() }, Receiver { shared })
32}
33
34pub(crate) struct Sender {
35    shared: Arc<Shared>,
36}
37
38pub(crate) struct Receiver {
39    shared: Arc<Shared>,
40}
41
42struct Shared {
43    value: AtomicUsize,
44    waker: AtomicWaker,
45}
46
47impl Sender {
48    pub(crate) fn send(&mut self, value: Value) {
49        if self.shared.value.swap(value, Ordering::SeqCst) != value {
50            self.shared.waker.wake();
51        }
52    }
53}
54
55impl Drop for Sender {
56    fn drop(&mut self) {
57        self.send(CLOSED);
58    }
59}
60
61impl Receiver {
62    pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value {
63        self.shared.waker.register(cx.waker());
64        self.shared.value.load(Ordering::SeqCst)
65    }
66
67    #[allow(unused)]
68    pub(crate) fn peek(&self) -> Value {
69        self.shared.value.load(Ordering::Relaxed)
70    }
71}