1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use core::sync::atomic::Ordering::Relaxed;
use futures::{task::AtomicWaker, Stream};
use std::{
fmt::Debug,
hash::Hash,
pin::Pin,
sync::{atomic::AtomicBool, Arc, Weak},
task::{Context, Poll},
};
#[derive(Debug)]
pub(crate) struct Sender(Arc<Inner>);
#[derive(Debug)]
pub(crate) struct Receiver(Weak<Inner>);
#[derive(Debug, Default)]
struct Inner {
waker: AtomicWaker,
set: AtomicBool,
}
impl Drop for Inner {
fn drop(&mut self) {
// Sender holds a strong reference to Inner, and Receiver holds a weak reference to Inner,
// so this will run when the Sender is dropped.
//
// The Receiver is usually owned by a spawned async task that is always waiting on the next
// value from its Stream. While it's waiting, it continues owning all the data it has
// captured. That data will not be dropped until the the stream ends.
//
// If we don't wake the waker a final time here, the spawned task will continue waiting for
// a final message from the Receiver that never arrives, because the waker never wakes it
// up again. So we wake the waker a final time, which tries to upgrade the Receiver, which
// fails, which causes the stream to yield Poll::Ready(None), ending the stream, and
// therefore ending the task, and therefore dropping all data that the stream has
// captured, avoiding a memory leak.
self.waker.wake();
}
}
pub fn channel() -> (Sender, Receiver) {
let inner = Arc::new(Inner {
waker: AtomicWaker::new(),
set: AtomicBool::new(false),
});
let rx = Arc::downgrade(&inner);
(Sender(inner), Receiver(rx))
}
impl Sender {
pub fn notify(&mut self) {
self.0.set.store(true, Relaxed);
self.0.waker.wake();
}
}
impl Stream for Receiver {
type Item = ();
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Some(inner) = self.0.upgrade() {
inner.waker.register(cx.waker());
if inner.set.swap(false, Relaxed) {
Poll::Ready(Some(()))
} else {
Poll::Pending
}
} else {
Poll::Ready(None)
}
}
}
impl Hash for Sender {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
Arc::as_ptr(&self.0).hash(state)
}
}
impl PartialEq for Sender {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.0, &other.0)
}
}
impl Eq for Sender {}
impl Hash for Receiver {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
Weak::as_ptr(&self.0).hash(state)
}
}
impl PartialEq for Receiver {
fn eq(&self, other: &Self) -> bool {
Weak::ptr_eq(&self.0, &other.0)
}
}
impl Eq for Receiver {}