#![cfg_attr(feature="mio_07", doc="```")]
#![cfg_attr(not(feature="mio_07"), doc="```compile_fail")]
#![allow(clippy::needless_return, clippy::redundant_closure, clippy::needless_lifetimes)] #![allow(clippy::range_plus_one)] #![allow(clippy::cast_lossless)]
use std::{io, mem, ptr};
use std::ffi::CStr;
use std::io::ErrorKind;
use std::fmt::{self, Debug, Formatter};
#[cfg(any(
target_os="linux", target_os="freebsd",
target_os="netbsd", target_os="dragonfly",
))]
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(any(target_os="linux", target_os="netbsd", target_os="dragonfly"))]
use std::os::unix::io::{FromRawFd, IntoRawFd};
use std::time::{Duration, SystemTime};
extern crate libc;
use libc::{c_int, c_uint, c_char};
#[cfg(not(all(target_arch="x86_64", target_os="linux", target_pointer_width="32")))]
use libc::c_long;
use libc::{mqd_t, mq_open, mq_close, mq_unlink, mq_send, mq_receive};
use libc::{mq_attr, mq_getattr, mq_setattr};
use libc::{timespec, time_t, mq_timedsend, mq_timedreceive};
#[cfg(target_os="freebsd")]
use libc::mq_getfd_np;
use libc::{mode_t, O_ACCMODE, O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_NONBLOCK};
#[cfg(any(
target_os="linux", target_os="freebsd",
target_os="netbsd", target_os="dragonfly",
))]
use libc::{fcntl, F_GETFD, FD_CLOEXEC, ioctl, FIOCLEX, FIONCLEX};
#[cfg(any(target_os="linux", target_os="netbsd", target_os="dragonfly"))]
use libc::F_DUPFD_CLOEXEC;
#[cfg(feature="mio_06")]
extern crate mio_06;
#[cfg(feature="mio_06")]
use mio_06::{event::Evented, unix::EventedFd, Ready, Poll, PollOpt};
#[cfg(feature="mio_07")]
extern crate mio_07;
#[cfg(feature="mio_07")]
use mio_07::{event::Source, unix::SourceFd, Registry, Interest};
const CSTR_BUF_SIZE: usize = 48;
fn with_name_as_cstr<F: FnOnce(&CStr)->Result<R,io::Error>, R>(mut name: &[u8], f: F)
-> Result<R,io::Error> {
if name.first() == Some(&b'/') {
name = &name[1..];
}
let mut longbuf: Box<[u8]>;
let mut shortbuf: [u8; CSTR_BUF_SIZE];
let c_bytes = if name.len() + 2 <= CSTR_BUF_SIZE {
shortbuf = [0; CSTR_BUF_SIZE];
&mut shortbuf[..name.len()+2]
} else {
longbuf = vec![0; name.len()+2].into_boxed_slice();
&mut longbuf
};
c_bytes[0] = b'/';
c_bytes[1..name.len()+1].copy_from_slice(name);
match CStr::from_bytes_with_nul(c_bytes) {
Ok(name) => f(name),
Err(_) => Err(io::Error::new(ErrorKind::InvalidInput, "contains nul byte"))
}
}
#[derive(Clone,Copy, PartialEq,Eq)]
pub struct OpenOptions {
flags: c_int,
mode: mode_t,
capacity: usize,
max_msg_len: usize,
}
impl Debug for OpenOptions {
fn fmt(&self, fmtr: &mut Formatter) -> fmt::Result {
fmtr.debug_struct("OpenOptions")
.field(
"read",
&((self.flags & O_ACCMODE) == O_RDWR || (self.flags & O_ACCMODE) == O_RDONLY)
)
.field(
"write",
&((self.flags & O_ACCMODE) == O_RDWR || (self.flags & O_ACCMODE) == O_WRONLY)
)
.field("create", &(self.flags & O_CREAT != 0))
.field("open", &(self.flags & O_EXCL == 0))
.field("mode", &format_args!("{:03o}", self.mode))
.field("capacity", &self.capacity)
.field("max_msg_len", &self.max_msg_len)
.field("nonblocking", &((self.flags & O_NONBLOCK) != 0))
.finish()
}
}
impl OpenOptions {
fn new(flags: c_int) -> Self {
OpenOptions {
flags,
mode: 0o600,
capacity: 0,
max_msg_len: 0,
}
}
pub fn readonly() -> Self {
OpenOptions::new(O_RDONLY)
}
pub fn writeonly() -> Self {
OpenOptions::new(O_WRONLY)
}
pub fn readwrite() -> Self {
OpenOptions::new(O_RDWR)
}
pub fn mode(&mut self, mode: u32) -> &mut Self {
self.mode = mode as mode_t;
return self;
}
pub fn max_msg_len(&mut self, max_msg_len: usize) -> &mut Self {
self.max_msg_len = max_msg_len;
return self;
}
pub fn capacity(&mut self, capacity: usize) -> &mut Self {
self.capacity = capacity;
return self;
}
pub fn create(&mut self) -> &mut Self {
self.flags |= O_CREAT;
self.flags &= !O_EXCL;
return self;
}
pub fn create_new(&mut self) -> &mut Self {
self.flags |= O_CREAT | O_EXCL;
return self;
}
pub fn existing(&mut self) -> &mut Self {
self.flags &= !(O_CREAT | O_EXCL);
return self;
}
pub fn nonblocking(&mut self) -> &mut Self {
self.flags |= O_NONBLOCK;
return self;
}
pub fn open<N: AsRef<[u8]> + ?Sized>(&self, name: &N) -> Result<PosixMq, io::Error> {
pub fn open_slice(opts: &OpenOptions, name: &[u8]) -> Result<PosixMq, io::Error> {
with_name_as_cstr(name, |name| opts.open_c(&name) )
}
open_slice(self, name.as_ref())
}
pub fn open_c(&self, name: &CStr) -> Result<PosixMq, io::Error> {
let opts = self;
let permissions = opts.mode as c_int;
let mut capacities = unsafe { mem::zeroed::<mq_attr>() };
let capacities_ptr = if opts.capacity != 0 || opts.max_msg_len != 0 {
capacities.mq_maxmsg = opts.capacity as KernelLong;
capacities.mq_msgsize = opts.max_msg_len as KernelLong;
&mut capacities as *mut mq_attr
} else {
ptr::null_mut::<mq_attr>()
};
let mqd = unsafe { mq_open(name.as_ptr(), opts.flags, permissions, capacities_ptr) };
if mqd == -1isize as mqd_t {
return Err(io::Error::last_os_error());
}
let mq = PosixMq{mqd};
#[cfg(any(target_os="netbsd", target_os="dragonfly"))]
mq.set_cloexec(true)?;
Ok(mq)
}
}
pub fn remove_queue<N: AsRef<[u8]> + ?Sized>(name: &N) -> Result<(), io::Error> {
fn remove_queue_slice(name: &[u8]) -> Result<(), io::Error> {
with_name_as_cstr(name, |name| remove_queue_c(&name) )
}
remove_queue_slice(name.as_ref())
}
pub fn remove_queue_c(name: &CStr) -> Result<(), io::Error> {
let name = name.as_ptr();
let ret = unsafe { mq_unlink(name) };
if ret != 0 {
return Err(io::Error::last_os_error());
}
Ok(())
}
#[cfg(all(target_arch="x86_64", target_os="linux", target_pointer_width="32"))]
type KernelLong = i64;
#[cfg(not(all(target_arch="x86_64", target_os="linux", target_pointer_width="32")))]
type KernelLong = c_long;
#[derive(Clone,Copy, PartialEq,Eq, Default)]
pub struct Attributes {
pub max_msg_len: usize,
pub capacity: usize,
pub current_messages: usize,
pub nonblocking: bool,
_private: ()
}
impl Debug for Attributes {
fn fmt(&self, fmtr: &mut fmt::Formatter) -> fmt::Result {
fmtr.debug_struct("Attributes")
.field("max_msg_len", &self.max_msg_len)
.field("capacity", &self.capacity)
.field("current_messages", &self.current_messages)
.field("nonblocking", &self.nonblocking)
.finish()
}
}
macro_rules! retry_if_interrupted {($call:expr) => {{
loop { let ret = $call;
if ret != -1 {
break ret;
}
let err = io::Error::last_os_error();
if err.kind() != ErrorKind::Interrupted {
return Err(err)
}
}
}}}
fn deadline_to_realtime(deadline: SystemTime) -> Result<timespec, timespec> {
fn new_timespec(secs: time_t, nsecs: KernelLong) -> timespec {
let mut ts: timespec = unsafe { mem::zeroed() };
ts.tv_sec = secs;
ts.tv_nsec = nsecs;
return ts;
}
match deadline.duration_since(SystemTime::UNIX_EPOCH) {
Ok(expires) if expires.as_secs() > time_t::max_value() as u64
=> Err(new_timespec(time_t::max_value(), 0)),
Ok(expires)
=> Ok(new_timespec(expires.as_secs() as time_t, expires.subsec_nanos() as KernelLong)),
Err(ref earlier) if earlier.duration() > Duration::new(time_t::max_value() as u64 + 1, 0)
=> Err(new_timespec(time_t::min_value()+1, 0)), Err(ref earlier) if earlier.duration().subsec_nanos() == 0
=> Ok(new_timespec(-(earlier.duration().as_secs() as time_t), 0)),
Err(earlier) => {
let before = earlier.duration();
let secs = -(before.as_secs() as time_t) - 1;
let nsecs = 1_000_000_000 - before.subsec_nanos() as KernelLong;
Ok(new_timespec(secs, nsecs))
}
}
}
fn timeout_to_realtime(timeout: Duration) -> Result<timespec, io::Error> {
if let Ok(now) = deadline_to_realtime(SystemTime::now()) {
let mut expires = now;
expires.tv_sec = expires.tv_sec.wrapping_add(timeout.as_secs() as time_t);
expires.tv_nsec += timeout.subsec_nanos() as KernelLong;
const NANO: KernelLong = 1_000_000_000;
expires.tv_sec = expires.tv_sec.wrapping_add(expires.tv_nsec / NANO);
expires.tv_nsec %= NANO;
if timeout.as_secs() > time_t::max_value() as u64 || expires.tv_sec < now.tv_sec {
Err(io::Error::new(ErrorKind::InvalidInput, "timeout is too long"))
} else {
Ok(expires)
}
} else {
Err(io::Error::new(ErrorKind::Other, "system time is not representable"))
}
}
pub struct PosixMq {
mqd: mqd_t
}
impl PosixMq {
pub fn open<N: AsRef<[u8]> + ?Sized>(name: &N) -> Result<Self, io::Error> {
OpenOptions::readwrite().open(name)
}
pub fn create<N: AsRef<[u8]> + ?Sized>(name: &N) -> Result<Self, io::Error> {
OpenOptions::readwrite().create().open(name)
}
pub fn send(&self, priority: u32, msg: &[u8]) -> Result<(), io::Error> {
let mptr = msg.as_ptr() as *const c_char;
retry_if_interrupted!(unsafe { mq_send(self.mqd, mptr, msg.len(), priority as c_uint) });
Ok(())
}
pub fn recv(&self, msgbuf: &mut [u8]) -> Result<(u32, usize), io::Error> {
let bptr = msgbuf.as_mut_ptr() as *mut c_char;
let mut priority = 0 as c_uint;
let len = retry_if_interrupted!(
unsafe { mq_receive(self.mqd, bptr, msgbuf.len(), &mut priority) }
);
Ok((priority as u32, len as usize))
}
pub fn iter<'a>(&'a self) -> Iter<'a> {
self.into_iter()
}
fn timedsend(&self, priority: u32, msg: &[u8], deadline: ×pec)
-> Result<(), io::Error> {
let mptr = msg.as_ptr() as *const c_char;
retry_if_interrupted!(unsafe {
mq_timedsend(self.mqd, mptr, msg.len(), priority as c_uint, deadline)
});
Ok(())
}
pub fn send_timeout(&self, priority: u32, msg: &[u8], timeout: Duration)
-> Result<(), io::Error> {
timeout_to_realtime(timeout).and_then(|expires| self.timedsend(priority, msg, &expires) )
}
pub fn send_deadline(&self, priority: u32, msg: &[u8], deadline: SystemTime)
-> Result<(), io::Error> {
match deadline_to_realtime(deadline) {
Ok(expires) => self.timedsend(priority, msg, &expires),
Err(_) => Err(io::Error::new(ErrorKind::InvalidInput, "deadline is not representable"))
}
}
fn timedreceive(&self, msgbuf: &mut[u8], deadline: ×pec)
-> Result<(u32, usize), io::Error> {
let bptr = msgbuf.as_mut_ptr() as *mut c_char;
let mut priority: c_uint = 0;
let len = retry_if_interrupted!(
unsafe { mq_timedreceive(self.mqd, bptr, msgbuf.len(), &mut priority, deadline) }
);
Ok((priority as u32, len as usize))
}
pub fn recv_timeout(&self, msgbuf: &mut[u8], timeout: Duration)
-> Result<(u32, usize), io::Error> {
timeout_to_realtime(timeout).and_then(|expires| self.timedreceive(msgbuf, &expires) )
}
pub fn recv_deadline(&self, msgbuf: &mut[u8], deadline: SystemTime)
-> Result<(u32, usize), io::Error> {
match deadline_to_realtime(deadline) {
Ok(expires) => self.timedreceive(msgbuf, &expires),
Err(_) => Err(io::Error::new(ErrorKind::InvalidInput, "deadline is not representable"))
}
}
#[cfg_attr(
any(target_os="linux", target_os="android", target_os="netbsd", target_os="dragonfly"),
doc="```"
)]
#[cfg_attr(
not(any(target_os="linux", target_os="android", target_os="netbsd", target_os="dragonfly")),
doc="```no_compile"
)]
pub fn attributes(&self) -> Result<Attributes, io::Error> {
let mut attrs: mq_attr = unsafe { mem::zeroed() };
if unsafe { mq_getattr(self.mqd, &mut attrs) } == -1 {
Err(io::Error::last_os_error())
} else {
Ok(Attributes {
max_msg_len: attrs.mq_msgsize as usize,
capacity: attrs.mq_maxmsg as usize,
current_messages: attrs.mq_curmsgs as usize,
nonblocking: (attrs.mq_flags & (O_NONBLOCK as KernelLong)) != 0,
_private: ()
})
}
}
pub fn is_nonblocking(&self) -> Result<bool, io::Error> {
match self.attributes() {
Ok(attrs) => Ok(attrs.nonblocking),
Err(e) => Err(e),
}
}
pub fn set_nonblocking(&self, nonblocking: bool) -> Result<(), io::Error> {
let mut attrs: mq_attr = unsafe { mem::zeroed() };
attrs.mq_flags = if nonblocking {O_NONBLOCK as KernelLong} else {0};
let res = unsafe { mq_setattr(self.mqd, &attrs, ptr::null_mut()) };
if res == -1 {
return Err(io::Error::last_os_error());
}
Ok(())
}
#[cfg(any(target_os="linux", target_os="dragonfly", target_os="netbsd"))]
pub fn try_clone(&self) -> Result<Self, io::Error> {
let mq = match unsafe { fcntl(self.mqd, F_DUPFD_CLOEXEC, 0) } {
-1 => return Err(io::Error::last_os_error()),
fd => PosixMq{mqd: fd},
};
#[cfg(target_os="netbsd")]
mq.set_cloexec(true)?;
Ok(mq)
}
pub fn is_cloexec(&self) -> Result<bool, io::Error> {
#[cfg(any(
target_os="linux", target_os="freebsd",
target_os="netbsd", target_os="dragonfly",
))]
match unsafe { fcntl(self.as_raw_fd(), F_GETFD) } {
-1 => Err(io::Error::last_os_error()),
flags => Ok((flags & FD_CLOEXEC) != 0),
}
#[cfg(not(any(
target_os="linux", target_os="freebsd",
target_os="netbsd", target_os="dragonfly",
)))]
Err(io::Error::new(
ErrorKind::Other,
"close-on-exec information is not available"
))
}
#[cfg(any(
target_os="linux", target_os="freebsd",
target_os="netbsd", target_os="dragonfly",
))]
pub fn set_cloexec(&self, cloexec: bool) -> Result<(), io::Error> {
let op = if cloexec {FIOCLEX} else {FIONCLEX};
match unsafe { ioctl(self.as_raw_fd(), op) } {
-1 => Err(io::Error::last_os_error()),
_ => Ok(())
}
}
pub unsafe fn from_raw_mqd(mqd: mqd_t) -> Self {
PosixMq{mqd}
}
pub fn as_raw_mqd(&self) -> mqd_t {
self.mqd
}
pub fn into_raw_mqd(self) -> mqd_t {
let mqd = self.mqd;
mem::forget(self);
return mqd;
}
}
#[cfg(any(
target_os="linux", target_os="freebsd",
target_os="netbsd", target_os="dragonfly",
))]
impl AsRawFd for PosixMq {
#[cfg(not(target_os="freebsd"))]
fn as_raw_fd(&self) -> RawFd {
self.mqd
}
#[cfg(target_os="freebsd")]
fn as_raw_fd(&self) -> RawFd {
unsafe { mq_getfd_np(self.mqd) }
}
}
#[cfg(any(target_os="linux", target_os="netbsd", target_os="dragonfly"))]
impl FromRawFd for PosixMq {
unsafe fn from_raw_fd(fd: RawFd) -> Self {
PosixMq{mqd: fd}
}
}
#[cfg(any(target_os="linux", target_os="netbsd", target_os="dragonfly"))]
impl IntoRawFd for PosixMq {
fn into_raw_fd(self) -> RawFd {
let fd = self.mqd;
mem::forget(self);
return fd;
}
}
impl IntoIterator for PosixMq {
type Item = (u32, Vec<u8>);
type IntoIter = IntoIter;
fn into_iter(self) -> IntoIter {
IntoIter {
max_msg_len: match self.attributes() {
Ok(attrs) => attrs.max_msg_len,
Err(_) => 0,
},
mq: self,
}
}
}
impl<'a> IntoIterator for &'a PosixMq {
type Item = (u32, Vec<u8>);
type IntoIter = Iter<'a>;
fn into_iter(self) -> Iter<'a> {
Iter {
max_msg_len: match self.attributes() {
Ok(attrs) => attrs.max_msg_len,
Err(_) => 0,
},
mq: self,
}
}
}
impl Debug for PosixMq {
fn fmt(&self, fmtr: &mut Formatter) -> fmt::Result {
let mut representation = fmtr.debug_struct("PosixMq");
#[cfg(not(any(
target_os="linux", target_os="netbsd", target_os="dragonfly",
)))]
representation.field("mqd", &self.mqd);
#[cfg(any(
target_os="linux", target_os="freebsd",
target_os="netbsd", target_os="dragonfly",
))]
representation.field("fd", &self.as_raw_fd());
return representation.finish();
}
}
impl Drop for PosixMq {
fn drop(&mut self) {
unsafe { mq_close(self.mqd) };
}
}
unsafe impl Send for PosixMq {}
#[cfg(any(target_os="freebsd", target_os="illumos", target_os="solaris"))]
unsafe impl Sync for PosixMq {}
#[cfg(feature="mio_06")]
impl Evented for PosixMq {
fn register(&self, poll: &Poll, token: mio_06::Token, interest: Ready, opts: PollOpt)
-> Result<(), io::Error> {
EventedFd(&self.as_raw_fd()).register(poll, token, interest, opts)
}
fn reregister(&self, poll: &Poll, token: mio_06::Token, interest: Ready, opts: PollOpt)
-> Result<(), io::Error> {
EventedFd(&self.as_raw_fd()).reregister(poll, token, interest, opts)
}
fn deregister(&self, poll: &Poll) -> Result<(), io::Error> {
EventedFd(&self.as_raw_fd()).deregister(poll)
}
}
#[cfg(feature="mio_07")]
impl Source for &PosixMq {
fn register(&mut self, registry: &Registry, token: mio_07::Token, interest: Interest)
-> Result<(), io::Error> {
SourceFd(&self.as_raw_fd()).register(registry, token, interest)
}
fn reregister(&mut self, registry: &Registry, token: mio_07::Token, interest: Interest)
-> Result<(), io::Error> {
SourceFd(&self.as_raw_fd()).reregister(registry, token, interest)
}
fn deregister(&mut self, registry: &Registry) -> Result<(), io::Error> {
SourceFd(&self.as_raw_fd()).deregister(registry)
}
}
#[cfg(feature="mio_07")]
impl Source for PosixMq {
fn register(&mut self, registry: &Registry, token: mio_07::Token, interest: Interest)
-> Result<(), io::Error> {
{&mut &*self}.register(registry, token, interest)
}
fn reregister(&mut self, registry: &Registry, token: mio_07::Token, interest: Interest)
-> Result<(), io::Error> {
{&mut &*self}.reregister(registry, token, interest)
}
fn deregister(&mut self, registry: &Registry) -> Result<(), io::Error> {
{&mut &*self}.deregister(registry)
}
}
#[derive(Clone)]
pub struct Iter<'a> {
mq: &'a PosixMq,
max_msg_len: usize,
}
impl<'a> Iterator for Iter<'a> {
type Item = (u32, Vec<u8>);
fn next(&mut self) -> Option<(u32, Vec<u8>)> {
let mut buf = vec![0; self.max_msg_len];
match self.mq.recv(&mut buf) {
Err(ref e) if e.kind() == ErrorKind::WouldBlock => None,
Err(e) => panic!("Cannot receive from posix message queue: {}", e),
Ok((priority, len)) => {
buf.truncate(len);
Some((priority, buf))
}
}
}
}
pub struct IntoIter {
mq: PosixMq,
max_msg_len: usize,
}
impl Iterator for IntoIter {
type Item = (u32, Vec<u8>);
fn next(&mut self) -> Option<(u32, Vec<u8>)> {
Iter{mq: &self.mq, max_msg_len: self.max_msg_len}.next()
}
}
#[cfg(debug_assertions)]
mod doctest_md_files {
macro_rules! mdfile {($content:expr, $(#[$meta:meta])* $attach_to:ident) => {
#[doc=$content]
#[allow(unused)]
$(#[$meta])* enum $attach_to {}
}}
mdfile!{include_str!("README.md"), Readme}
}