futures_signals/signal/
channel.rs

1use super::Signal;
2use std::pin::Pin;
3use std::marker::Unpin;
4use std::sync::{Arc, Weak};
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::task::{Poll, Context, Waker};
7use crate::atomic::AtomicOption;
8
9
10#[derive(Debug)]
11struct Inner<A> {
12    value: AtomicOption<A>,
13    // TODO use AtomicWaker ?
14    waker: AtomicOption<Waker>,
15    senders: AtomicUsize,
16}
17
18impl<A> Inner<A> {
19    #[inline]
20    fn has_senders(&self) -> bool {
21        // This will be 0 if the channel was closed
22        self.senders.load(Ordering::Acquire) != 0
23    }
24
25    #[inline]
26    fn add_sender(&self) {
27        self.senders.fetch_add(1, Ordering::AcqRel);
28    }
29
30    /// Returns `true` if it no longer has any senders
31    #[inline]
32    fn remove_sender(&self) -> bool {
33        self.senders.fetch_sub(1, Ordering::AcqRel) == 1
34    }
35
36    #[inline]
37    fn remove_all_senders(&self) {
38        self.senders.store(0, Ordering::Release);
39    }
40
41    fn notify(&self) {
42        if let Some(waker) = self.waker.take() {
43            waker.wake();
44        }
45    }
46}
47
48
49#[derive(Debug)]
50pub struct Sender<A> {
51    inner: Weak<Inner<A>>,
52}
53
54impl<A> Sender<A> {
55    pub fn send(&self, value: A) -> Result<(), A> {
56        if let Some(inner) = self.inner.upgrade() {
57            if inner.has_senders() {
58                inner.value.store(Some(value));
59                inner.notify();
60                Ok(())
61
62            } else {
63                Err(value)
64            }
65
66        } else {
67            Err(value)
68        }
69    }
70
71    pub fn close(&self) {
72        if let Some(inner) = self.inner.upgrade() {
73            if inner.has_senders() {
74                inner.remove_all_senders();
75                inner.notify();
76            }
77        }
78    }
79}
80
81impl<A> Clone for Sender<A> {
82    fn clone(&self) -> Self {
83        if let Some(inner) = self.inner.upgrade() {
84            // This might be `false` if the `close` method was called
85            if inner.has_senders() {
86                inner.add_sender();
87            }
88        }
89
90        Self {
91            inner: self.inner.clone(),
92        }
93    }
94}
95
96impl<A> Drop for Sender<A> {
97    fn drop(&mut self) {
98        if let Some(inner) = self.inner.upgrade() {
99            // This might be `false` if the `close` method was called
100            if inner.has_senders() {
101                // If there aren't any more senders...
102                if inner.remove_sender() {
103                    inner.notify();
104                }
105            }
106        }
107    }
108}
109
110
111#[derive(Debug)]
112#[must_use = "Signals do nothing unless polled"]
113pub struct Receiver<A> {
114    inner: Arc<Inner<A>>,
115}
116
117impl<A> Unpin for Receiver<A> {}
118
119impl<A> Signal for Receiver<A> {
120    type Item = A;
121
122    #[inline]
123    fn poll_change(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
124        match self.inner.value.take() {
125            None => {
126                if self.inner.has_senders() {
127                    self.inner.waker.store(Some(cx.waker().clone()));
128                    Poll::Pending
129
130                } else {
131                    Poll::Ready(None)
132                }
133            },
134
135            a => Poll::Ready(a),
136        }
137    }
138}
139
140pub fn channel<A>(initial_value: A) -> (Sender<A>, Receiver<A>) {
141    let inner = Arc::new(Inner {
142        value: AtomicOption::new(Some(initial_value)),
143        waker: AtomicOption::new(None),
144        senders: AtomicUsize::new(1),
145    });
146
147    let sender = Sender {
148        inner: Arc::downgrade(&inner),
149    };
150
151    let receiver = Receiver {
152        inner,
153    };
154
155    (sender, receiver)
156}