1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use std::{
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::Duration,
};
use futures::{
task::{Context, Poll},
Stream,
};
use crate::Result;
use super::{
filter::EventFilter, poll_internal, read_internal, sys::Waker, Event, InternalEvent,
INTERNAL_EVENT_READER,
};
/// Handle that yields terminal events through the `futures` [`Stream`] trait
/// (see the `Stream` implementation for this type).
///
/// Build one with `EventStream::new()` or `EventStream::default()`.
#[derive(Debug)]
pub struct EventStream {
/// Waker obtained from `INTERNAL_EVENT_READER` at construction time. The
/// `Drop` implementation calls `wake()` on it right after raising the
/// shutdown flag.
poll_internal_waker: Waker,
/// `true` while a helper thread started by `poll_next` is considered
/// running: `poll_next` flips it from `false` to `true` before spawning,
/// and the thread stores `false` again just before it wakes the task.
stream_wake_thread_spawned: Arc<AtomicBool>,
/// Shutdown request for the helper thread: `Drop` stores `true`, the
/// thread checks it after each `poll_internal` call, and `poll_next`
/// resets it to `false` before spawning a new thread.
stream_wake_thread_should_shutdown: Arc<AtomicBool>,
}
impl Default for EventStream {
    /// Builds an `EventStream` with no helper thread running.
    ///
    /// The waker is fetched from the global `INTERNAL_EVENT_READER` (through
    /// its `write()` accessor); both atomic flags start out as `false`.
    fn default() -> Self {
        let poll_internal_waker = INTERNAL_EVENT_READER.write().waker();
        let thread_spawned = Arc::new(AtomicBool::new(false));
        let thread_should_shutdown = Arc::new(AtomicBool::new(false));

        Self {
            poll_internal_waker,
            stream_wake_thread_spawned: thread_spawned,
            stream_wake_thread_should_shutdown: thread_should_shutdown,
        }
    }
}
impl EventStream {
pub fn new() -> EventStream {
EventStream::default()
}
}
impl Stream for EventStream {
    type Item = Result<Event>;

    /// Attempts to pull the next event without blocking the calling task.
    ///
    /// `poll_internal` is first called with a zero-length timeout:
    ///
    /// * `Ok(true)`: `read_internal` is called and its event (or error) is
    ///   returned as `Poll::Ready(Some(..))`.
    /// * `Ok(false)`: unless a helper thread is already marked as running,
    ///   one is spawned that calls `poll_internal(None, ..)` in a loop and
    ///   wakes the task once that call reports `Ok(true)` or the shutdown flag
    ///   is set. `Poll::Pending` is returned.
    /// * `Err(e)`: the error is returned as `Poll::Ready(Some(Err(e)))`.
    ///
    /// This implementation never returns `Poll::Ready(None)`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match poll_internal(Some(Duration::from_secs(0)), &EventFilter) {
            Ok(true) => match read_internal(&EventFilter) {
                Ok(InternalEvent::Event(event)) => Poll::Ready(Some(Ok(event))),
                Err(e) => Poll::Ready(Some(Err(e))),
                // Only compiled on unix; presumably `InternalEvent` has extra
                // variants there that `EventFilter` never lets through.
                // TODO confirm against the `InternalEvent` / `EventFilter` definitions.
                #[cfg(unix)]
                _ => unreachable!(),
            },
            Ok(false) => {
                // Atomically claim the "thread spawned" flag. `compare_exchange`
                // replaces the deprecated `compare_and_swap`; it returns `Ok`
                // exactly when the flag was `false` and has now been set to
                // `true`, i.e. when this call is the one that must spawn.
                if self
                    .stream_wake_thread_spawned
                    .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
                    .is_ok()
                {
                    // NOTE(review): the waker is captured only when the thread
                    // is spawned. If the stream is polled again with a
                    // different waker while the thread is still alive, that
                    // newer waker is not the one that gets woken.
                    let stream_waker = cx.waker().clone();
                    let stream_wake_thread_spawned = self.stream_wake_thread_spawned.clone();
                    let stream_wake_thread_should_shutdown =
                        self.stream_wake_thread_should_shutdown.clone();

                    // Clear any stale shutdown request before the thread starts.
                    stream_wake_thread_should_shutdown.store(false, Ordering::SeqCst);

                    thread::spawn(move || {
                        loop {
                            // `None` timeout: presumably waits until an event
                            // arrives or the internal waker fires.
                            // NOTE(review): an `Err` result is not treated as
                            // terminal here; the loop simply retries.
                            if let Ok(true) = poll_internal(None, &EventFilter) {
                                break;
                            }

                            if stream_wake_thread_should_shutdown.load(Ordering::SeqCst) {
                                break;
                            }
                        }

                        // Release the flag before waking so the next
                        // `poll_next` is able to spawn a fresh thread.
                        stream_wake_thread_spawned.store(false, Ordering::SeqCst);
                        stream_waker.wake();
                    });
                }

                Poll::Pending
            }
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    }
}
impl Drop for EventStream {
fn drop(&mut self) {
self.stream_wake_thread_should_shutdown
.store(true, Ordering::SeqCst);
let _ = self.poll_internal_waker.wake();
}
}