extern crate libc;
extern crate nix;
use error::Error;
use libc::mqd_t;
use nix::mqueue;
use nix::sys::stat;
use std::ffi::CString;
use std::fs::File;
use std::io::Read;
use std::ops::Drop;
use std::string::ToString;
pub mod error;
#[cfg(test)]
mod tests;
/// Name of a POSIX message queue, stored as a C string.
///
/// Values are normally obtained through [`Name::new`], which checks the
/// format of the name before wrapping it.
#[derive(Clone, Debug, PartialEq)]
pub struct Name(CString);
impl Name {
pub fn new<S: ToString>(s: S) -> Result<Self, Error> {
let string = s.to_string();
if !string.starts_with('/') {
return Err(Error::InvalidQueueName("Queue name must start with '/'"));
}
if string.len() == 1 {
return Err(Error::InvalidQueueName(
"Queue name must be a slash followed by one or more characters",
));
}
if string.len() > 255 {
return Err(Error::InvalidQueueName(
"Queue name must not exceed 255 characters",
));
}
if string.matches('/').count() > 1 {
return Err(Error::InvalidQueueName(
"Queue name can not contain more than one slash",
));
}
Ok(Name(CString::new(string).unwrap()))
}
}
/// A message exchanged through a [`Queue`]: a byte payload plus a priority.
#[derive(Clone, Debug, PartialEq)]
pub struct Message {
    /// Raw payload bytes.
    pub data: Vec<u8>,
    /// Priority value handed to / returned by the queue alongside the payload.
    pub priority: u32,
}
/// Handle to an open message queue.
///
/// Besides the descriptor itself, the handle remembers the queue's name and
/// the capacity limits recorded when it was created or opened.
#[derive(Debug)]
pub struct Queue {
    /// Name the queue was created or opened with.
    name: Name,
    /// Descriptor returned by `mq_open`.
    queue_descriptor: mqd_t,
    /// Maximum number of messages that may be pending in the queue.
    max_pending: i64,
    /// Maximum size of a single message, in bytes.
    max_size: usize,
}
impl Queue {
pub fn create(name: Name, max_pending: i64, max_size: i64) -> Result<Queue, Error> {
if max_pending > read_i64_from_file(MSG_MAX)? {
return Err(Error::MaximumMessageCountExceeded());
}
if max_size > read_i64_from_file(MSGSIZE_MAX)? {
return Err(Error::MaximumMessageSizeExceeded());
}
let oflags = {
let mut flags = mqueue::MQ_OFlag::empty();
flags.toggle(mqueue::MQ_OFlag::O_RDWR);
flags.toggle(mqueue::MQ_OFlag::O_CREAT);
flags.toggle(mqueue::MQ_OFlag::O_EXCL);
flags
};
let attr = mqueue::MqAttr::new(0, max_pending, max_size, 0);
let queue_descriptor = mqueue::mq_open(&name.0, oflags, default_mode(), Some(&attr))?;
Ok(Queue {
name,
queue_descriptor,
max_pending,
max_size: max_size as usize,
})
}
pub fn open(name: Name) -> Result<Queue, Error> {
let oflags = mqueue::MQ_OFlag::O_RDWR;
let queue_descriptor = mqueue::mq_open(&name.0, oflags, default_mode(), None)?;
let attr = mq_getattr(queue_descriptor)?;
Ok(Queue {
name,
queue_descriptor,
max_pending: attr.mq_maxmsg,
max_size: attr.mq_msgsize as usize,
})
}
pub fn open_or_create(name: Name) -> Result<Queue, Error> {
let oflags = {
let mut flags = mqueue::MQ_OFlag::empty();
flags.toggle(mqueue::MQ_OFlag::O_RDWR);
flags.toggle(mqueue::MQ_OFlag::O_CREAT);
flags
};
let default_pending = read_i64_from_file(MSG_DEFAULT)?;
let default_size = read_i64_from_file(MSGSIZE_DEFAULT)?;
let attr = mqueue::MqAttr::new(0, default_pending, default_size, 0);
let queue_descriptor = mqueue::mq_open(&name.0, oflags, default_mode(), Some(&attr))?;
let actual_attr = mq_getattr(queue_descriptor)?;
Ok(Queue {
name,
queue_descriptor,
max_pending: actual_attr.mq_maxmsg,
max_size: actual_attr.mq_msgsize as usize,
})
}
pub fn delete(self) -> Result<(), Error> {
mqueue::mq_unlink(&self.name.0)?;
drop(self);
Ok(())
}
pub fn send(&self, msg: &Message) -> Result<(), Error> {
if msg.data.len() > self.max_size as usize {
return Err(Error::MessageSizeExceeded());
}
mqueue::mq_send(self.queue_descriptor, msg.data.as_ref(), msg.priority)
.map_err(|e| e.into())
}
pub fn receive(&self) -> Result<Message, Error> {
let mut data: Vec<u8> = vec![0; self.max_size as usize];
let mut priority: u32 = 0;
let msg_size = mqueue::mq_receive(self.queue_descriptor, data.as_mut(), &mut priority)?;
data.truncate(msg_size);
Ok(Message { data, priority })
}
pub fn max_pending(&self) -> i64 {
self.max_pending
}
pub fn max_size(&self) -> usize {
self.max_size
}
}
/// Closes the queue descriptor when the handle goes out of scope.
impl Drop for Queue {
    fn drop(&mut self) {
        // A destructor cannot report failure, so the result of closing is
        // deliberately discarded.
        let _ = mqueue::mq_close(self.queue_descriptor);
    }
}
/// Permission bits passed to `mq_open`: read and write for the owning user.
fn default_mode() -> stat::Mode {
    stat::Mode::S_IRUSR | stat::Mode::S_IWUSR
}
/// procfs file read for the default maximum number of pending messages.
const MSG_DEFAULT: &str = "/proc/sys/fs/mqueue/msg_default";
/// procfs file read for the upper limit on the number of pending messages.
const MSG_MAX: &str = "/proc/sys/fs/mqueue/msg_max";
/// procfs file read for the default maximum message size.
const MSGSIZE_DEFAULT: &str = "/proc/sys/fs/mqueue/msgsize_default";
/// procfs file read for the upper limit on the message size.
const MSGSIZE_MAX: &str = "/proc/sys/fs/mqueue/msgsize_max";
/// Reads the file at `name` and parses its whitespace-trimmed content as an
/// `i64`.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or read, or if its content
/// is not a valid integer.
fn read_i64_from_file(name: &str) -> Result<i64, Error> {
    // `File::open` accepts `&str` directly; no owned copy of the path is needed.
    let mut file = File::open(name)?;
    let mut content = String::new();
    file.read_to_string(&mut content)?;
    let value: i64 = content.trim().parse()?;
    Ok(value)
}
/// Fetches the attributes of the queue behind `mqd` by calling
/// `libc::mq_getattr` directly.
///
/// # Errors
///
/// Returns the errno-derived error when the C call reports failure.
fn mq_getattr(mqd: mqd_t) -> Result<libc::mq_attr, Error> {
    let mut raw = std::mem::MaybeUninit::<libc::mq_attr>::uninit();
    // SAFETY: `raw.as_mut_ptr()` points to writable storage large enough for
    // one `mq_attr`, which is what `mq_getattr` writes into.
    let status = unsafe { libc::mq_getattr(mqd, raw.as_mut_ptr()) };
    match nix::errno::Errno::result(status) {
        // SAFETY: this arm is only reached when the call reported success;
        // per mq_getattr(3) the structure has then been filled in.
        Ok(_) => Ok(unsafe { raw.assume_init() }),
        Err(errno) => Err(errno.into()),
    }
}