#![cfg_attr(feature="mio", doc="```")]
#![cfg_attr(not(feature="mio"), doc="```no_compile")]
use std::{io, mem, ptr};
use std::borrow::Cow;
use std::ffi::{CStr, CString};
use std::io::ErrorKind;
use std::fmt::{self, Debug, Formatter};
#[cfg(not(target_os="dragonflybsd"))]
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(not(any(target_os="freebsd", target_os="dragonflybsd")))]
use std::os::unix::io::{FromRawFd, IntoRawFd};
extern crate libc;
use libc::{c_int, c_uint, c_long, mode_t};
use libc::{mqd_t, mq_open, mq_send, mq_receive, mq_close, mq_unlink};
use libc::{mq_attr, mq_getattr, mq_setattr};
use libc::{O_ACCMODE, O_RDONLY, O_WRONLY, O_RDWR};
use libc::{O_CREAT, O_EXCL, O_NONBLOCK, O_CLOEXEC};
use libc::{fcntl, F_GETFD, F_SETFD, FD_CLOEXEC};
#[cfg(target_os="freebsd")]
extern "C" {
fn mq_getfd_np(mq: mqd_t) -> c_int;
}
#[cfg(feature="mio")]
extern crate mio;
#[cfg(feature="mio")]
use mio::event::Evented;
#[cfg(feature="mio")]
use mio::unix::EventedFd;
#[cfg(feature="mio")]
use mio::{Ready, Poll, PollOpt, Token};
pub fn name_from_bytes<N: AsRef<[u8]> + ?Sized>(name: &N) -> Cow<CStr> {
let name = name.as_ref();
if name.len() > 0 && name[0] == b'/' && name[name.len()-1] == b'\0' {
if let Ok(borrowed) = CStr::from_bytes_with_nul(name) {
return Cow::Borrowed(borrowed);
}
} else {
let mut owned = Vec::with_capacity(name.len()+2);
if name.first() != Some(&b'/') {
owned.push(b'/');
}
owned.extend_from_slice(name);
if name.last() == Some(&b'\0') {
owned.pop();
}
if let Ok(owned) = CString::new(owned) {
return Cow::Owned(owned);
}
}
panic!("Queue name contains interior '\0' bytes");
}
fn name_to_cstring(name: &[u8]) -> Result<CString, io::Error> {
let mut buf = Vec::with_capacity(name.len()+2);
if name.first() != Some(&b'/') {
buf.push(b'/');
}
buf.extend_from_slice(name);
CString::new(buf).map_err(|err| io::Error::from(err) )
}
#[derive(Clone,Copy, PartialEq,Eq)]
pub struct OpenOptions {
mode: c_int,
permissions: mode_t,
capacity: usize,
max_msg_len: usize,
}
impl Debug for OpenOptions {
fn fmt(&self, fmtr: &mut Formatter) -> fmt::Result {
fmtr.debug_struct("OpenOptions")
.field("read", &((self.mode & O_ACCMODE) == O_RDWR || (self.mode & O_ACCMODE) == O_RDONLY))
.field("write", &((self.mode & O_ACCMODE) == O_RDWR || (self.mode & O_ACCMODE) == O_WRONLY))
.field("create", &(self.mode & O_CREAT != 0))
.field("open", &(self.mode & O_EXCL == 0))
.field("permissions", &format_args!("{:03o}", self.permissions))
.field("capacity", &self.capacity)
.field("max_msg_len", &self.max_msg_len)
.field("nonblocking", &((self.mode & O_NONBLOCK) != 0))
.field("cloexec", &((self.mode & O_CLOEXEC) != 0))
.finish()
}
}
impl OpenOptions {
fn new(mode: c_int) -> Self {
OpenOptions {
mode: O_CLOEXEC | mode,
permissions: 0o700,
capacity: 0,
max_msg_len: 0,
}
}
pub fn readonly() -> Self {
OpenOptions::new(O_RDONLY)
}
pub fn writeonly() -> Self {
OpenOptions::new(O_WRONLY)
}
pub fn readwrite() -> Self {
OpenOptions::new(O_RDWR)
}
pub fn permissions(&mut self, permissions: u16) -> &mut Self {
self.permissions = permissions as mode_t;
return self;
}
pub fn max_msg_len(&mut self, max_msg_len: usize) -> &mut Self {
self.max_msg_len = max_msg_len;
return self;
}
pub fn capacity(&mut self, capacity: usize) -> &mut Self {
self.capacity = capacity;
return self;
}
pub fn create(&mut self) -> &mut Self {
self.mode |= O_CREAT;
self.mode &= !O_EXCL;
return self;
}
pub fn create_new(&mut self) -> &mut Self {
self.mode |= O_CREAT | O_EXCL;
return self;
}
pub fn existing(&mut self) -> &mut Self {
self.mode &= !(O_CREAT | O_EXCL);
return self;
}
pub fn nonblocking(&mut self) -> &mut Self {
self.mode |= O_NONBLOCK;
return self;
}
pub fn not_cloexec(&mut self) -> &mut Self {
self.mode &= !O_CLOEXEC;
return self;
}
pub fn open<N: AsRef<[u8]> + ?Sized>(&self, name: &N) -> Result<PosixMq, io::Error> {
name_to_cstring(name.as_ref()).and_then(|name| self.open_c(&name) )
}
pub fn open_c(&self, name: &CStr) -> Result<PosixMq, io::Error> {
PosixMq::new_c(name, self)
}
}
pub fn unlink<N: AsRef<[u8]> + ?Sized>(name: &N) -> Result<(), io::Error> {
name_to_cstring(name.as_ref()).and_then(|name| unlink_c(&name) )
}
pub fn unlink_c(name: &CStr) -> Result<(), io::Error> {
let name = name.as_ptr();
let ret = unsafe { mq_unlink(name) };
if ret != 0 {
return Err(io::Error::last_os_error());
}
Ok(())
}
#[cfg(target="x86_64-unknown-linux-gnux32")]
type AttrField = i64;
#[cfg(not(target="x86_64-unknown-linux-gnux32"))]
type AttrField = c_long;
pub struct PosixMq {
mqd: mqd_t
}
impl PosixMq {
fn new_c(name: &CStr, opts: &OpenOptions) -> Result<Self, io::Error> {
let permissions = opts.permissions as c_int;
let mut capacities = unsafe { mem::zeroed::<mq_attr>() };
let mut capacities_ptr = ptr::null_mut::<mq_attr>();
if opts.capacity != 0 || opts.max_msg_len != 0 {
capacities.mq_maxmsg = opts.capacity as AttrField;
capacities.mq_msgsize = opts.max_msg_len as AttrField;
capacities_ptr = &mut capacities as *mut mq_attr;
}
let mqd = unsafe { mq_open(name.as_ptr(), opts.mode, permissions, capacities_ptr) };
if mqd == -1isize as mqd_t {
return Err(io::Error::last_os_error());
}
let mq = PosixMq{mqd};
#[cfg(not(target_os="dragonflybsd"))]
let _ = unsafe { mq.set_cloexec(opts.mode & O_CLOEXEC != 0) };
Ok(mq)
}
pub fn open<N: AsRef<[u8]> + ?Sized>(name: &N) -> Result<Self, io::Error> {
OpenOptions::readonly().open(name)
}
pub fn create<N: AsRef<[u8]> + ?Sized>(name: &N) -> Result<Self, io::Error> {
OpenOptions::readwrite().create().open(name)
}
pub fn send(&self, priority: u32, msg: &[u8]) -> Result<(), io::Error> {
let bptr = msg.as_ptr() as *const i8;
loop { let ret = unsafe { mq_send(self.mqd, bptr, msg.len(), priority as c_uint) };
if ret == 0 {
return Ok(());
}
let err = io::Error::last_os_error();
if err.kind() != ErrorKind::Interrupted {
return Err(err)
}
}
}
pub fn receive(&self, msgbuf: &mut [u8]) -> Result<(u32, usize), io::Error> {
let bptr = msgbuf.as_mut_ptr() as *mut i8;
let mut priority = 0 as c_uint;
loop { let len = unsafe { mq_receive(self.mqd, bptr, msgbuf.len(), &mut priority) };
if len >= 0 {
return Ok((priority as u32, len as usize));
}
let err = io::Error::last_os_error();
if err.kind() != ErrorKind::Interrupted {
return Err(err)
}
}
}
pub fn attributes(&self) -> Attributes {
let mut attrs: mq_attr = unsafe { mem::zeroed() };
let ret = unsafe { mq_getattr(self.mqd, &mut attrs) };
if ret == -1 {
Attributes { max_msg_len: 0, capacity: 0, current_messages: 0, nonblocking: true }
} else {
Attributes {
max_msg_len: attrs.mq_msgsize as usize,
capacity: attrs.mq_maxmsg as usize,
current_messages: attrs.mq_curmsgs as usize,
nonblocking: (attrs.mq_flags & (O_NONBLOCK as AttrField)) != 0,
}
}
}
pub fn is_nonblocking(&self) -> bool {
self.attributes().nonblocking
}
pub fn set_nonblocking(&self, nonblocking: bool) -> Result<(), io::Error> {
let mut attrs: mq_attr = unsafe { mem::zeroed() };
attrs.mq_flags = if nonblocking {O_NONBLOCK as AttrField} else {0};
let res = unsafe { mq_setattr(self.mqd, &attrs, ptr::null_mut()) };
if res == -1 {
return Err(io::Error::last_os_error());
}
Ok(())
}
#[cfg(not(target_os="dragonflybsd"))]
pub fn is_cloexec(&self) -> bool {
let flags = unsafe { fcntl(self.as_raw_fd(), F_GETFD) };
if flags == -1 {
true
} else {
(flags & FD_CLOEXEC) != 0
}
}
#[cfg(not(target_os="dragonflybsd"))]
pub unsafe fn set_cloexec(&self, cloexec: bool) -> Result<(), io::Error> {
let prev = fcntl(self.as_raw_fd(), F_GETFD);
if prev == -1 {
return Err(io::Error::last_os_error());
}
let new = if cloexec {
prev | FD_CLOEXEC
} else {
prev & !FD_CLOEXEC
};
if new != prev {
let ret = fcntl(self.as_raw_fd(), F_SETFD, new);
if ret == -1 {
return Err(io::Error::last_os_error());
}
}
Ok(())
}
}
#[cfg(not(target_os="dragonflybsd"))]
impl AsRawFd for PosixMq {
#[cfg(not(any(target_os="freebsd", target_os="dragonflybsd")))]
fn as_raw_fd(&self) -> RawFd {
self.mqd as RawFd
}
#[cfg(target_os="freebsd")]
fn as_raw_fd(&self) -> RawFd {
unsafe { mq_getfd_np(self.mqd) as RawFd }
}
}
#[cfg(not(any(target_os="freebsd", target_os="dragonflybsd")))]
impl FromRawFd for PosixMq {
unsafe fn from_raw_fd(fd: RawFd) -> Self {
PosixMq { mqd: fd as mqd_t }
}
}
#[cfg(not(any(target_os="freebsd", target_os="dragonflybsd")))]
impl IntoRawFd for PosixMq {
fn into_raw_fd(self) -> RawFd {
let fd = self.mqd;
mem::forget(self);
return fd;
}
}
impl Debug for PosixMq {
#[cfg(any(target_os="linux", target_os="freebsd"))]
fn fmt(&self, fmtr: &mut Formatter) -> fmt::Result {
write!(fmtr, "PosixMq{{ fd: {} }}", self.as_raw_fd())
}
#[cfg(not(any(target_os="linux", target_os="freebsd")))]
fn fmt(&self, fmtr: &mut Formatter) -> fmt::Result {
write!(fmtr, "PosixMq{{}}")
}
}
impl Drop for PosixMq {
fn drop(&mut self) {
unsafe { mq_close(self.mqd) };
}
}
unsafe impl Send for PosixMq {}
#[cfg(feature="mio")]
impl Evented for PosixMq {
fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
-> Result<(), io::Error> {
EventedFd(&self.as_raw_fd()).register(poll, token, interest, opts)
}
fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt)
-> Result<(), io::Error> {
EventedFd(&self.as_raw_fd()).reregister(poll, token, interest, opts)
}
fn deregister(&self, poll: &Poll) -> Result<(), io::Error> {
EventedFd(&self.as_raw_fd()).deregister(poll)
}
}
#[derive(Clone,Copy, PartialEq,Eq, Debug)]
pub struct Attributes {
pub max_msg_len: usize,
pub capacity: usize,
pub current_messages: usize,
pub nonblocking: bool,
}