1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#[cfg(target_os = "linux")]
mod eventfd {
    use std::fs::File;
    use std::io::{self, Read, Write};
    use std::os::unix::io::FromRawFd;

    use crate::event::EventedId;
    use crate::poll::{Interests, PollOption};
    use crate::sys::Selector;

    /// Awakener backed by `eventfd`.
    ///
    /// `eventfd` is effectively a 64 bit counter. All writes must be of 8
    /// bytes (64 bits) and are converted (native endian) into a 64 bit
    /// unsigned integer and added to the count. Reads must also be 8 bytes and
    /// reset the count to 0, returning the count.
    #[derive(Debug)]
    pub struct Awakener {
        /// The eventfd file descriptor, owned by (and closed with) this `File`.
        fd: File,
    }

    impl Awakener {
        /// Create a new `Awakener` and register it with `selector` under `id`.
        ///
        /// The eventfd is created non-blocking and close-on-exec, and is
        /// registered with readable interest and edge triggers.
        ///
        /// # Errors
        ///
        /// Returns the OS error if the eventfd can't be created, or the error
        /// returned by `selector.register`. In the latter case the freshly
        /// created file descriptor is closed before returning.
        pub fn new(selector: &Selector, id: EventedId) -> io::Result<Awakener> {
            // SAFETY: `eventfd` only takes plain integer arguments; the
            // returned value is checked for the error sentinel below.
            let fd = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) };
            if fd == -1 {
                return Err(io::Error::last_os_error());
            }

            // Take ownership of the descriptor *before* registering it, so it
            // is closed (via `File`'s `Drop`) if the registration fails rather
            // than being leaked.
            //
            // SAFETY: `fd` was just returned by a successful `eventfd` call and
            // nothing else owns it.
            let file = unsafe { File::from_raw_fd(fd) };

            selector.register(fd, id, Interests::READABLE, PollOption::Edge)?;
            Ok(Awakener { fd: file })
        }

        /// Create another `Awakener` that refers to the same eventfd object by
        /// duplicating the underlying file descriptor.
        pub fn try_clone(&self) -> io::Result<Awakener> {
            self.fd.try_clone().map(|fd| Awakener { fd })
        }

        /// Wake up the poller by adding 1 to the eventfd counter.
        ///
        /// # Errors
        ///
        /// Returns any error from writing to (or, when the counter is full,
        /// reading from) the eventfd, other than `WouldBlock`.
        pub fn wake(&self) -> io::Result<()> {
            // eventfd expects exactly 8 bytes holding a native endian `u64`.
            let buf: [u8; 8] = 1u64.to_ne_bytes();
            match (&self.fd).write(&buf) {
                Ok(_) => Ok(()),
                Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
                    // Writing only blocks if the counter is going to overflow.
                    // So we'll reset the counter to 0 and wake it again.
                    self.reset()?;
                    self.wake()
                },
                Err(err) => Err(err),
            }
        }

        /// Reset the eventfd object, only need to call this if `wake` fails.
        fn reset(&self) -> io::Result<()> {
            let mut buf: [u8; 8] = [0; 8];
            match (&self.fd).read(&mut buf) {
                Ok(_) => Ok(()),
                // If the `Awakener` hasn't been awoken yet this will return a
                // `WouldBlock` error which we can safely ignore.
                Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()),
                Err(err) => Err(err),
            }
        }
    }
}
#[cfg(target_os = "linux")]
pub use self::eventfd::Awakener;
#[cfg(any(target_os = "freebsd", target_os = "macos",
          target_os = "netbsd", target_os = "openbsd"))]
mod kqueue {
    use std::io;

    use crate::event::EventedId;
    use crate::sys::Selector;

    /// Awakener backed by kqueue user space notifications (`EVFILT_USER`).
    ///
    /// The implementation is fairly simple. First the kqueue must be set up to
    /// receive awakener events, which is done by calling
    /// `Selector.setup_awakener`. The awakener also needs its own access to
    /// the kqueue, so it holds a clone of the `Selector`. Waking is then a
    /// matter of calling `Selector.wake` with the awakener's id.
    #[derive(Debug)]
    pub struct Awakener {
        /// Clone of the selector this awakener wakes.
        selector: Selector,
        /// Id passed to `Selector.setup_awakener` and `Selector.wake`.
        id: EventedId,
    }

    impl Awakener {
        /// Create a new `Awakener` for `selector`, using `id` for its events.
        ///
        /// Clones the selector and then calls `setup_awakener(id)` on the
        /// clone; an error from either step is returned as is.
        pub fn new(selector: &Selector, id: EventedId) -> io::Result<Awakener> {
            let selector = selector.try_clone()?;
            selector.setup_awakener(id)?;
            Ok(Awakener { selector, id })
        }

        /// Create another `Awakener` with the same id, backed by a fresh clone
        /// of the selector.
        pub fn try_clone(&self) -> io::Result<Awakener> {
            let selector = self.selector.try_clone()?;
            Ok(Awakener { selector, id: self.id })
        }

        /// Wake up the poller by calling `Selector.wake` with this awakener's
        /// id.
        pub fn wake(&self) -> io::Result<()> {
            let id = self.id;
            self.selector.wake(id)
        }
    }
}
#[cfg(any(target_os = "freebsd", target_os = "macos",
target_os = "netbsd", target_os = "openbsd"))]
pub use self::kqueue::Awakener;