lambda_runtime_api_client/body/
watch.rs1use futures_util::task::AtomicWaker;
11use std::{
12 sync::{
13 atomic::{AtomicUsize, Ordering},
14 Arc,
15 },
16 task,
17};
18
/// The payload type carried by the watch channel: a plain `usize`.
type Value = usize;
20
/// Reserved channel value meaning "closed".
///
/// The `Sender` stores this value when it is dropped, and `channel` rejects it
/// (in debug builds) as an initial value.
pub(crate) const CLOSED: usize = 0;
22
23pub(crate) fn channel(initial: Value) -> (Sender, Receiver) {
24 debug_assert!(initial != CLOSED, "watch::channel initial state of 0 is reserved");
25
26 let shared = Arc::new(Shared {
27 value: AtomicUsize::new(initial),
28 waker: AtomicWaker::new(),
29 });
30
31 (Sender { shared: shared.clone() }, Receiver { shared })
32}
33
34pub(crate) struct Sender {
35 shared: Arc<Shared>,
36}
37
38pub(crate) struct Receiver {
39 shared: Arc<Shared>,
40}
41
42struct Shared {
43 value: AtomicUsize,
44 waker: AtomicWaker,
45}
46
47impl Sender {
48 pub(crate) fn send(&mut self, value: Value) {
49 if self.shared.value.swap(value, Ordering::SeqCst) != value {
50 self.shared.waker.wake();
51 }
52 }
53}
54
55impl Drop for Sender {
56 fn drop(&mut self) {
57 self.send(CLOSED);
58 }
59}
60
61impl Receiver {
62 pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value {
63 self.shared.waker.register(cx.waker());
64 self.shared.value.load(Ordering::SeqCst)
65 }
66
67 #[allow(unused)]
68 pub(crate) fn peek(&self) -> Value {
69 self.shared.value.load(Ordering::Relaxed)
70 }
71}