futures_signals/signal/
channel.rs1use super::Signal;
2use std::pin::Pin;
3use std::marker::Unpin;
4use std::sync::{Arc, Weak};
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::task::{Poll, Context, Waker};
7use crate::atomic::AtomicOption;
8
9
10#[derive(Debug)]
11struct Inner<A> {
12 value: AtomicOption<A>,
13 waker: AtomicOption<Waker>,
15 senders: AtomicUsize,
16}
17
18impl<A> Inner<A> {
19 #[inline]
20 fn has_senders(&self) -> bool {
21 self.senders.load(Ordering::Acquire) != 0
23 }
24
25 #[inline]
26 fn add_sender(&self) {
27 self.senders.fetch_add(1, Ordering::AcqRel);
28 }
29
30 #[inline]
32 fn remove_sender(&self) -> bool {
33 self.senders.fetch_sub(1, Ordering::AcqRel) == 1
34 }
35
36 #[inline]
37 fn remove_all_senders(&self) {
38 self.senders.store(0, Ordering::Release);
39 }
40
41 fn notify(&self) {
42 if let Some(waker) = self.waker.take() {
43 waker.wake();
44 }
45 }
46}
47
48
49#[derive(Debug)]
50pub struct Sender<A> {
51 inner: Weak<Inner<A>>,
52}
53
54impl<A> Sender<A> {
55 pub fn send(&self, value: A) -> Result<(), A> {
56 if let Some(inner) = self.inner.upgrade() {
57 if inner.has_senders() {
58 inner.value.store(Some(value));
59 inner.notify();
60 Ok(())
61
62 } else {
63 Err(value)
64 }
65
66 } else {
67 Err(value)
68 }
69 }
70
71 pub fn close(&self) {
72 if let Some(inner) = self.inner.upgrade() {
73 if inner.has_senders() {
74 inner.remove_all_senders();
75 inner.notify();
76 }
77 }
78 }
79}
80
81impl<A> Clone for Sender<A> {
82 fn clone(&self) -> Self {
83 if let Some(inner) = self.inner.upgrade() {
84 if inner.has_senders() {
86 inner.add_sender();
87 }
88 }
89
90 Self {
91 inner: self.inner.clone(),
92 }
93 }
94}
95
96impl<A> Drop for Sender<A> {
97 fn drop(&mut self) {
98 if let Some(inner) = self.inner.upgrade() {
99 if inner.has_senders() {
101 if inner.remove_sender() {
103 inner.notify();
104 }
105 }
106 }
107 }
108}
109
110
111#[derive(Debug)]
112#[must_use = "Signals do nothing unless polled"]
113pub struct Receiver<A> {
114 inner: Arc<Inner<A>>,
115}
116
117impl<A> Unpin for Receiver<A> {}
118
119impl<A> Signal for Receiver<A> {
120 type Item = A;
121
122 #[inline]
123 fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
124 match self.inner.value.take() {
125 None => {
126 if self.inner.has_senders() {
127 self.inner.waker.store(Some(cx.waker().clone()));
128 Poll::Pending
129
130 } else {
131 Poll::Ready(None)
132 }
133 },
134
135 a => Poll::Ready(a),
136 }
137 }
138}
139
140pub fn channel<A>(initial_value: A) -> (Sender<A>, Receiver<A>) {
141 let inner = Arc::new(Inner {
142 value: AtomicOption::new(Some(initial_value)),
143 waker: AtomicOption::new(None),
144 senders: AtomicUsize::new(1),
145 });
146
147 let sender = Sender {
148 inner: Arc::downgrade(&inner),
149 };
150
151 let receiver = Receiver {
152 inner,
153 };
154
155 (sender, receiver)
156}