1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
// Copyright 2015 Nathan Sizemore <nathanrsizemore@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
// If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.


extern crate libc;
extern crate time;
extern crate simple_slab;
#[macro_use] extern crate bitflags;


use std::ops::Add;
use std::sync::Mutex;
use std::io::{self, Error};
use std::os::unix::io::{RawFd, AsRawFd};

use time::{Duration, PreciseTime};
use simple_slab::Slab;
pub use interest::Interest;

mod interest;


bitflags! {
    /// Operation selectors for `epoll_ctl`, taken directly from the `libc` constants.
    ///
    /// Within this file each constant is used on its own: its `.bits()` value is handed to the
    /// private `ctl` helper as the `op` argument.
    flags ControlOptions: i32 {
        /// Used by `add_interest` to register a file descriptor.
        const EPOLL_CTL_ADD = libc::EPOLL_CTL_ADD,
        /// Used by `mod_interest` to change the event mask/data of a registered descriptor.
        const EPOLL_CTL_MOD = libc::EPOLL_CTL_MOD,
        /// Used by `del_interest` to deregister a file descriptor.
        const EPOLL_CTL_DEL = libc::EPOLL_CTL_DEL
    }
}

bitflags! {
    /// Event mask understood by epoll, expressed as a set of `u32` bit flags.
    ///
    /// The same type is used both for the events an `Interest` asks for and for the events
    /// that `wait` reports back.
    pub flags Events: u32 {
        /// Sets the Edge Triggered behavior for the associated file descriptor.
        ///
        /// The default behavior for epoll is Level Triggered.
        const EPOLLET      = libc::EPOLLET as u32,
        /// The associated file is available for read operations.
        const EPOLLIN      = libc::EPOLLIN as u32,
        /// Error condition happened on the associated file descriptor.
        ///
        /// `wait` will always wait for this event; it is not necessary to set it in events.
        const EPOLLERR     = libc::EPOLLERR as u32,
        /// Hang up happened on the associated file descriptor.
        ///
        /// `wait` will always wait for this event; it is not necessary to set it in events.
        /// Note that when reading from a channel such as a pipe or a stream socket, this event
        /// merely indicates that the peer closed its end of the channel. Subsequent reads from
        /// the channel will return 0 (end of file) only after all outstanding data in the
        /// channel has been consumed.
        const EPOLLHUP     = libc::EPOLLHUP as u32,
        /// The associated file is available for write operations.
        const EPOLLOUT     = libc::EPOLLOUT as u32,
        /// There is urgent data available for read operations.
        const EPOLLPRI     = libc::EPOLLPRI as u32,
        /// Stream socket peer closed connection, or shut down writing half of connection.
        ///
        /// This flag is especially useful for writing simple code to detect peer shutdown when
        /// using Edge Triggered monitoring.
        const EPOLLRDHUP   = libc::EPOLLRDHUP as u32,
        /// If `EPOLLONESHOT` and `EPOLLET` are clear and the process has the `CAP_BLOCK_SUSPEND`
        /// capability, ensure that the system does not enter "suspend" or "hibernate" while this
        /// event is pending or being processed.
        ///
        /// The event is considered as being "processed" from the time when it is returned by
        /// a call to `wait` until the next call to `wait` on the same `EpollInstance`
        /// descriptor, the closure of that file descriptor, the removal of the event file
        /// descriptor with `EPOLL_CTL_DEL`, or the clearing of `EPOLLWAKEUP` for the event file
        /// descriptor with `EPOLL_CTL_MOD`.
        const EPOLLWAKEUP  = libc::EPOLLWAKEUP as u32,
        /// Sets the one-shot behavior for the associated file descriptor.
        ///
        /// This means that after an event is pulled out with `wait` the associated file
        /// descriptor is internally disabled and no other events will be reported by the epoll
        /// interface.  The user must call `ctl` with `EPOLL_CTL_MOD` to rearm the file
        /// descriptor with a new event mask.
        const EPOLLONESHOT = libc::EPOLLONESHOT as u32
    }
}


/// Thread safe abstraction around the returned `fd` from `libc::epoll_create(1)`
///
/// Besides the descriptor itself, the instance keeps a list of every registered `Interest`
/// and two running statistics that are exposed through `events` and `wait_time`.
///
/// NOTE(review): no `Drop` impl is visible in this file, so the epoll descriptor does not
/// appear to be closed when an instance goes out of scope — confirm whether that is intended.
pub struct EpollInstance {
    /// The epoll file descriptor created in `new`.
    fd: libc::c_int,
    /// Interests registered through `add_interest`, guarded by a mutex.
    interest_mutex: Mutex<Slab<Interest>>,
    /// Number of events returned by `epoll_wait` so far; starts at zero.
    events: u64,
    /// Wait-time counter reported by `wait_time`; starts at `Duration::zero()`.
    wait: Duration
}

impl EpollInstance {

    /// Creates a new `EpollInstance`.
    ///
    /// ## Notes
    /// * `FD_CLOEXEC` flag is set on the underlying fd returned.
    pub fn new() -> io::Result<EpollInstance> {
        let epfd = unsafe {
            let fd = try!(cvt(libc::epoll_create(1)));
            let mut flags = try!(cvt(libc::fcntl(fd, libc::F_GETFD)));
            flags |= libc::FD_CLOEXEC;
            try!(cvt(libc::fcntl(fd, libc::F_SETFD, flags)));
            fd
        };

        Ok(EpollInstance {
            fd: epfd,
            interest_mutex: Mutex::new(Slab::<Interest>::new()),
            events: 0,
            wait: Duration::zero()
        })
    }

    /// Register an initial `Interest` with this instance.
    ///
    /// ## Panics
    ///
    /// Panics if the interior Mutex has been poisoned.
    pub fn add_interest(&mut self, interest: Interest) -> io::Result<()> {
        let mut event_mask = libc::epoll_event {
            events: interest.events().bits() as u32,
            u64: interest.data()
        };
        try!(ctl(self.fd,
                 EPOLL_CTL_ADD.bits(),
                 interest.as_raw_fd(),
                 &mut event_mask as *mut libc::epoll_event));

        let mut list = self.interest_mutex.lock().unwrap();
        (*list).insert(interest);

        Ok(())
    }

    /// Modify the original `Interest`, identified by its `RawFd`, to the passed
    /// interest's events and data fields.
    ///
    /// ## Panics
    ///
    /// Panics if the interior Mutex has been poisoned.
    pub fn mod_interest(&mut self, interest: &Interest) -> io::Result<()> {
        let mut event_mask = libc::epoll_event {
            events: interest.events().bits() as u32,
            u64: interest.data()
        };
        try!(ctl(self.fd,
                 EPOLL_CTL_MOD.bits(),
                 interest.as_raw_fd(),
                 &mut event_mask as *mut libc::epoll_event));

        let mut list = self.interest_mutex.lock().unwrap();
        for ref mut _interest in (*list).iter_mut() {
            if _interest.as_raw_fd() == interest.as_raw_fd() {
                _interest.set_events(interest.events());
                _interest.set_data(interest.data());
                break;
            }
        }

        Ok(())
    }

    /// Remove the passed `Interest`, identified by its `RawFd`, from this instance.
    ///
    /// ## Panics
    ///
    /// Panics if the interior Mutex has been poisoned.
    pub fn del_interest(&mut self, interest: &Interest) -> io::Result<()> {
        // In kernel versions before 2.6.9, the EPOLL_CTL_DEL operation required a non-null
        // pointer in event, even though this argument is ignored.
        let mut event_mask = libc::epoll_event {
            events: 0u32,
            u64: 0u64
        };
        try!(ctl(self.fd,
                 EPOLL_CTL_DEL.bits(),
                 interest.as_raw_fd(),
                 &mut event_mask as *mut libc::epoll_event));

        let mut offset: usize = 0;
        let mut list = self.interest_mutex.lock().unwrap();
        for ref mut _interest in (*list).iter() {
            if _interest.as_raw_fd() == interest.as_raw_fd() {
                break;
            }
            offset += 1
        }
        (*list).remove(offset);

        Ok(())
    }

    /// Waits for events on this instance for at most `timeout` milliseconds and returns at most
    /// `max_returned` `Interests`.
    ///
    /// ## Notes
    ///
    /// * If `timeout` is negative, it will block until an event is received.
    /// * `max_returned` must be greater than zero.
    ///
    /// ## Panics
    ///
    /// Panics if the interior Mutex has been poisoned.
    pub fn wait(&mut self, timeout: i32, max_returned: usize) -> io::Result<Vec<Interest>> {
        let timeout = if timeout < -1 { -1 } else { timeout };
        let mut ret_buf = Vec::<Interest>::with_capacity(max_returned);
        let mut buf = Vec::<libc::epoll_event>::with_capacity(max_returned);

        let (start, end) = unsafe {
            let start = PreciseTime::now();
            let num_events = try!(cvt(libc::epoll_wait(self.fd,
                                                       buf.as_mut_ptr(),
                                                       max_returned as i32,
                                                       timeout))) as usize;
            let end = PreciseTime::now();
            buf.set_len(num_events);
            (start, end)
        };

        self.events += buf.len() as u64;
        self.wait.add(start.to(end));

        let mut list = self.interest_mutex.lock().unwrap();
        for ref event in buf.iter() {
            for ref mut interest in (*list).iter_mut() {
                if interest.data() == event.u64 {
                    (*interest.events_mut()) = Events::from_bits(event.events).unwrap();
                    ret_buf.push(interest.clone());
                    break;
                }
            }
        }

        Ok((ret_buf))
    }

    /// Returns the total number of events reported.
    ///
    /// This is the internal counter that `wait` increases by the number of events received
    /// from each `epoll_wait` call; it starts at zero for a new instance.
    pub fn events(&self) -> u64 {
        self.events
    }

    /// Reports the instance's wait-time counter as a `std::time::Duration`.
    ///
    /// ## Panics
    ///
    /// Panics if the stored duration cannot be converted by `to_std` (the conversion is
    /// fallible and its result is unwrapped).
    pub fn wait_time(&self) -> std::time::Duration {
        let blocked = &self.wait;
        let converted = blocked.to_std();
        converted.unwrap()
    }
}

/// Exposes the underlying epoll file descriptor.
///
/// The descriptor is returned as a plain copy of the stored value; ownership stays with the
/// `EpollInstance`.
impl AsRawFd for EpollInstance {
    fn as_raw_fd(&self) -> RawFd { self.fd }
}

// Manually assert that an `EpollInstance` may be moved to and shared between threads.
//
// The interest list is only reached through `interest_mutex`; the remaining fields are a raw
// descriptor, a counter and a duration.
// NOTE(review): these impls are presumably needed because `Slab<Interest>` is not automatically
// `Send`/`Sync` — confirm against `simple_slab` and `Interest` that this is sound.
unsafe impl Send for EpollInstance {}
unsafe impl Sync for EpollInstance {}



fn ctl(epfd: libc::c_int,
       op: libc::c_int,
       fd: libc::c_int,
       event: *mut libc::epoll_event)
       -> io::Result<()>
{
    unsafe { try!(cvt(libc::epoll_ctl(epfd, op, fd, event))) };
    Ok(())
}

/// Converts a C-style status code into an `io::Result`.
///
/// Non-negative values are passed through unchanged as `Ok`. A negative value is turned into
/// `Err` carrying `Error::last_os_error()`, so this must be called right after the failing
/// libc call, before anything else can overwrite the OS error.
fn cvt(result: libc::c_int) -> io::Result<libc::c_int> {
    match result {
        code if code >= 0 => Ok(code),
        _ => Err(Error::last_os_error()),
    }
}