fibers_inotify 0.1.1

A futures-friendly inotify wrapper for the fibers crate.
Documentation
use std::collections::VecDeque;
use std::ffi::{CStr, CString, OsString};
use std::fs::File;
use std::io::{self, Read};
use std::marker::PhantomData;
use std::mem;
use std::os::unix::ffi::OsStringExt;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::path::{Path, PathBuf};
use fibers;
use fibers::io::poll::{EventedHandle, Interest, Register};
use fibers::sync::oneshot::Monitor;
use futures::{Async, Future, Poll, Stream};
use inotify_sys;
use libc;

use {Error, ErrorKind, EventMask, Result, WatchMask};
use mio_ext::OwnedEventedFd;

/// Event notified by [inotify].
///
/// Values of this type are produced by decoding the raw `inotify_event`
/// records read from the inotify file descriptor.
///
/// [inotify]: http://man7.org/linux/man-pages/man7/inotify.7.html
#[derive(Debug, Clone)]
pub struct InotifyEvent {
    // Watch descriptor copied from the raw event's `wd` field; crate-private.
    pub(crate) wd: WatchDecriptor,

    /// Mask describing the event.
    ///
    /// Built with `EventMask::from_bits_truncate`, so bits that `EventMask`
    /// does not define are dropped.
    pub mask: EventMask,

    /// Unique cookie associating related events.
    pub cookie: u32,

    /// The file/directory name within the watched directory.
    ///
    /// This is present only when an event is returned for a file or directory
    /// inside a watched directory.
    pub name: Option<PathBuf>,
}

/// An inotify instance exposed as a `Stream` of `InotifyEvent`s.
///
/// Created with `Inotify::new`; paths are watched with `add_watch` and
/// unwatched with `remove_watch`.
#[derive(Debug)]
pub struct Inotify {
    // `File` wrapping the descriptor returned by `inotify_init1`.
    file: File,
    // Events already decoded from a previous read but not yet handed out.
    events: VecDeque<InotifyEvent>,
    // Future used to get read-readiness notifications for the descriptor.
    read_monitor: ReadMonitor,
    // A raw-pointer marker makes the type neither `Send` nor `Sync` by default;
    // `Send` is then re-enabled explicitly below, leaving the type `!Sync`.
    _cannot_sync: PhantomData<*const ()>,
}
// NOTE(review): presumably sound because the raw pointer marker carries no data
// and the remaining fields can be moved between threads — confirm for `ReadMonitor`.
unsafe impl Send for Inotify {}
impl Inotify {
    pub fn new() -> Result<Self> {
        let flags = inotify_sys::IN_NONBLOCK;
        let fd = unsafe { inotify_sys::inotify_init1(flags) };
        if fd == -1 {
            Err(track!(Error::last_os_error()))
        } else {
            Ok(Inotify {
                file: unsafe { File::from_raw_fd(fd) },
                events: VecDeque::new(),
                read_monitor: track!(ReadMonitor::new(fd))?,
                _cannot_sync: PhantomData,
            })
        }
    }
    pub fn add_watch<P: AsRef<Path>>(
        &mut self,
        path: P,
        mask: WatchMask,
    ) -> Result<WatchDecriptor> {
        let wd = unsafe {
            let path = track!(
                CString::new(path.as_ref().to_path_buf().into_os_string().into_vec())
                    .map_err(Error::from)
            )?;
            inotify_sys::inotify_add_watch(self.file.as_raw_fd(), path.as_ptr(), mask.bits())
        };
        if wd == -1 {
            Err(track!(Error::last_os_error()))
        } else {
            Ok(WatchDecriptor(wd))
        }
    }
    pub fn remove_watch(&mut self, wd: WatchDecriptor) -> Result<()> {
        let result = unsafe { inotify_sys::inotify_rm_watch(self.file.as_raw_fd(), wd.0) };
        if result == -1 {
            Err(track!(Error::last_os_error()))
        } else {
            Ok(())
        }
    }

    /// Returns the next event, reading from the inotify descriptor only when
    /// the internal queue is empty.
    ///
    /// A single `read` into a 4096-byte buffer is issued; every event record
    /// contained in the returned bytes is decoded and queued, and the first one
    /// is returned.
    ///
    /// Returns `Ok(None)` when the read would block or yields no events.
    ///
    /// # Errors
    ///
    /// Fails if the read fails with anything other than `WouldBlock`, if a
    /// record (header or name) extends past the bytes that were read, or if the
    /// name field is not a valid NUL-terminated string. Events decoded before
    /// the failing record remain queued.
    fn read_event(&mut self) -> Result<Option<InotifyEvent>> {
        if let Some(event) = self.events.pop_front() {
            return Ok(Some(event));
        }

        let mut buf = [0u8; 4096];
        let read_size = match self.file.read(&mut buf) {
            Ok(read_size) => read_size,
            Err(e) => {
                return if e.kind() == io::ErrorKind::WouldBlock {
                    Ok(None)
                } else {
                    Err(track!(Error::from(e)))
                };
            }
        };

        let header_size = mem::size_of::<inotify_sys::inotify_event>();
        let mut offset = 0;
        while offset < read_size {
            // The whole fixed-size header must lie inside the bytes actually read
            // before it is touched (`offset < read_size`, so this cannot underflow).
            track_assert!(header_size <= read_size - offset, ErrorKind::Other);

            // SAFETY: the assertion above guarantees `header_size` initialized bytes
            // starting at `buf[offset]`. The byte buffer carries no alignment
            // guarantee for `inotify_event`, so the header is copied out with an
            // unaligned read instead of being referenced in place.
            // Assumes `inotify_event` is a plain integer-only C struct, as
            // described in inotify(7) — TODO confirm against `inotify_sys`.
            let raw_event = unsafe {
                ::std::ptr::read_unaligned(
                    buf[offset..].as_ptr() as *const inotify_sys::inotify_event
                )
            };

            let name_len = raw_event.len as usize;
            let start = offset + header_size;
            // Written as a subtraction so that a bogus `len` cannot overflow `usize`.
            track_assert!(name_len <= read_size - start, ErrorKind::Other);
            offset = start + name_len;

            let name = if name_len == 0 {
                None
            } else {
                // The name field is padded with NULs; strip all but one of them.
                // Trimming never goes below `start + 1`, so the slice stays inside
                // the name field and keeps its terminator.
                let mut end = offset;
                while end > start + 1 && buf[end - 1] == b'\0' && buf[end - 2] == b'\0' {
                    end -= 1;
                }
                let name = track!(
                    CStr::from_bytes_with_nul(&buf[start..end]).map_err(Error::from)
                )?;
                Some(PathBuf::from(OsString::from_vec(
                    name.to_bytes().to_owned(),
                )))
            };

            self.events.push_back(InotifyEvent {
                wd: WatchDecriptor(raw_event.wd),
                mask: EventMask::from_bits_truncate(raw_event.mask),
                cookie: raw_event.cookie,
                name,
            });
        }
        Ok(self.events.pop_front())
    }
}
/// Yields inotify events as they become available.
///
/// The stream as written here never produces `Ready(None)`.
impl Stream for Inotify {
    type Item = InotifyEvent;
    type Error = Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        // Drive the read monitor; only its errors matter here, the readiness
        // value itself is discarded and a read is attempted regardless.
        track!(self.read_monitor.poll())?;

        let next = track!(self.read_event())?;
        Ok(match next {
            Some(event) => Async::Ready(Some(event)),
            None => Async::NotReady,
        })
    }
}

/// Identifier of a watch registered on an `Inotify` instance.
///
/// Wraps the raw integer returned by `inotify_add_watch`; it is obtained from
/// `Inotify::add_watch` and consumed by `Inotify::remove_watch`.
// NOTE(review): the type name looks like a misspelling of "Descriptor", but it
// is part of the public interface.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct WatchDecriptor(pub(crate) libc::c_int);

/// Future that keeps a read-interest monitor installed on a file descriptor.
///
/// Its `poll` implementation in this file only ever returns `NotReady` or an
/// error.
#[derive(Debug)]
struct ReadMonitor {
    // Pending registration of the descriptor with the fiber's poller.
    register: Register<OwnedEventedFd>,
    // Handle obtained once `register` has completed; `None` until then.
    handle: Option<EventedHandle<OwnedEventedFd>>,
    // Currently installed read-interest monitor, if any.
    monitor: Option<Monitor<(), io::Error>>,
}
impl ReadMonitor {
    /// Begins registering `fd` with the poller of the current fiber context.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` error ("Not in a fiber context") when
    /// `with_current_context` yields no value.
    fn new(fd: RawFd) -> Result<Self> {
        let registration = fibers::fiber::with_current_context(|mut context| {
            context.poller().register(OwnedEventedFd(fd))
        });
        let register =
            track_assert_some!(registration, ErrorKind::Other, "Not in a fiber context");
        Ok(ReadMonitor {
            register,
            handle: None,
            monitor: None,
        })
    }
}
impl Future for ReadMonitor {
    type Item = ();
    type Error = Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            if let Some(handle) = self.handle.as_ref() {
                let is_ready = track!(self.monitor.poll().map_err(Error::from))?.is_ready();
                if is_ready {
                    self.monitor = Some(handle.monitor(Interest::Read));
                    continue;
                }
                return Ok(Async::NotReady);
            }
            if let Async::Ready(handle) = track!(self.register.poll().map_err(Error::from))? {
                self.handle = Some(handle);
                continue;
            }
            return Ok(Async::NotReady);
        }
    }
}