1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
use std::os::unix::io::AsRawFd;
use std::io::{Result, Error};
use std::fmt::Debug;
use event::Event;
use timer::Timer;
use channel::{channel, sync_channel, Sender, SyncSender, Receiver};
#[cfg(any(target_os = "linux", target_os = "android"))]
use epoll::KernelRegistrar;
#[cfg(any(target_os = "bitrig", target_os = "dragonfly",
target_os = "freebsd", target_os = "ios", target_os = "macos",
target_os = "netbsd", target_os = "openbsd"))]
pub use kqueue::KernelRegistrar;
/// An abstraction for registering file descriptors with a kernel poller.
///
/// A Registrar is tied to a Poller of the same type, and registers sockets and unique IDs for
/// those sockets as userdata that can be waited on by the poller. A Registrar should only be
/// retrieved via a call to `Poller::get_registrar(&self)`, and not created on its own.
#[derive(Debug, Clone)]
pub struct Registrar {
/// Platform-specific registrar (imported from `epoll` or `kqueue` depending on the target OS)
/// that every method on `Registrar` delegates to.
inner: KernelRegistrar
}
impl Registrar {
    /// Wrap a platform-specific `KernelRegistrar`.
    ///
    /// This method is public only so it can be used directly by the Poller. Do not use it.
    #[doc(hidden)]
    pub fn new(inner: KernelRegistrar) -> Registrar {
        Registrar {
            inner: inner
        }
    }

    /// Register a socket for a given event type with a Poller and return its unique ID.
    ///
    /// # Errors
    ///
    /// Note that if the sock type is not pollable, then an error will be returned.
    pub fn register<T: AsRawFd>(&self, sock: &T, event: Event) -> Result<usize> {
        self.inner.register(sock, event).map_err(Error::from)
    }

    /// Reregister a socket with a Poller.
    ///
    /// `id` is the ID under which the socket is reregistered and `event` is the event type to
    /// reregister it for. Any failure reported by the kernel registrar is converted into a
    /// `std::io::Error`.
    pub fn reregister<T: AsRawFd>(&self, id: usize, sock: &T, event: Event) -> Result<()> {
        self.inner.reregister(id, sock, event).map_err(Error::from)
    }

    /// Remove a socket from a Poller.
    ///
    /// Note that ownership of the socket is taken here. Sockets should only be deregistered when
    /// the caller is done with them.
    ///
    /// # Errors
    ///
    /// Will return an error if the socket is not present in the poller when using epoll. Returns
    /// no error with kqueue.
    pub fn deregister<T: AsRawFd>(&self, sock: T) -> Result<()> {
        self.inner.deregister(sock).map_err(Error::from)
    }

    /// Set a timeout in ms that fires once.
    ///
    /// Note that this timeout may be delivered late due to the time taken between the calls to
    /// `Poller::wait()` exceeding the timeout, but it will never be delivered early.
    ///
    /// # Errors
    ///
    /// An error will be returned if the maximum number of file descriptors is already registered
    /// with the kernel poller.
    pub fn set_timeout(&self, timeout: usize) -> Result<Timer> {
        self.inner.set_timeout(timeout).map_err(Error::from)
    }

    /// Set a recurring timeout in ms.
    ///
    /// A notification with the returned id will be sent at the given interval. The timeout can be
    /// cancelled with a call to `cancel_timeout()`.
    ///
    /// Note that if `Poller::wait()` is not called in a loop, these timeouts, as well as other
    /// notifications, will not be delivered. Timeouts may be delivered late, due to the time taken
    /// between calls to `Poller::wait()` exceeding the timeout, but will never be delivered early.
    ///
    /// # Errors
    ///
    /// An error will be returned if the maximum number of file descriptors is already registered
    /// with the kernel poller.
    pub fn set_interval(&self, interval: usize) -> Result<Timer> {
        self.inner.set_interval(interval).map_err(Error::from)
    }

    /// Cancel a recurring timeout.
    ///
    /// Note that there may be timeouts in flight already that were not yet cancelled, so it is
    /// possible that you may receive notifications after the timeout was cancelled. This can be
    /// mitigated by keeping track of live timers and only processing timeout events for known live
    /// timers.
    ///
    /// # Errors
    ///
    /// An error will be returned if the timer is not registered with the kernel poller.
    pub fn cancel_timeout(&self, timer: Timer) -> Result<()> {
        self.inner.cancel_timeout(timer).map_err(Error::from)
    }

    /// Create an asynchronous mpsc channel where the Receiver is registered with the kernel
    /// poller.
    ///
    /// Each new Receiver gets registered using a user space event mechanism (either eventfd or
    /// kevent depending upon OS). When a send occurs the kernel notification mechanism (a syscall
    /// on a file descriptor) will be issued to alert the kernel poller to wake up and issue a
    /// notification for the Receiver. However, since syscalls are expensive, an optimization is
    /// made where if the kernel poller is already set to awaken, or currently processing events, a
    /// new syscall will not be made.
    ///
    /// Standard rust mpsc channels are used internally and have non-blocking semantics. Note that
    /// the return type is different since the Receiver is being registered with the kernel poller
    /// and this can fail.
    ///
    /// When a Receiver is dropped it will become unregistered.
    pub fn channel<T: Debug>(&self) -> Result<(Sender<T>, Receiver<T>)> {
        // The channel constructor takes a registrar by value, so it is handed its own clone.
        channel(self.inner.clone())
    }

    /// Create a synchronous mpsc channel where the Receiver is registered with the kernel poller.
    ///
    /// Each new Receiver gets registered using a user space event mechanism (either eventfd or
    /// kevent depending upon OS). When a send occurs the kernel notification mechanism (a syscall
    /// on a file descriptor) will be issued to alert the kernel poller to wake up and issue a
    /// notification for the Receiver. However, since syscalls are expensive, an optimization is
    /// made where if the kernel poller is already set to awaken, or currently processing events, a
    /// new syscall will not be made.
    ///
    /// Standard rust synchronous mpsc channels are used internally and block when the queue is
    /// full, as given by the `bound` passed to this constructor. Note that the return type is
    /// different since the Receiver is being registered with the kernel poller and this can fail.
    ///
    /// When a Receiver is dropped it will become unregistered.
    pub fn sync_channel<T: Debug>(&self, bound: usize) -> Result<(SyncSender<T>, Receiver<T>)> {
        // The channel constructor takes a registrar by value, so it is handed its own clone.
        sync_channel(self.inner.clone(), bound)
    }
}