#![feature(associated_type_defaults)]
#![deny(missing_docs)]
extern crate serde_cbor;
extern crate serde;
extern crate libc;
extern crate nix;
use serde::{Deserialize, Serialize};
use nix::errno::errno;
use nix::errno::Errno;
use libc::{
msgget,
msgctl,
msqid_ds,
};
use std::ptr;
use std::convert::From;
use std::marker::PhantomData;
use std::borrow::{Borrow, BorrowMut};
pub mod raw;
use raw::*;
#[derive(Debug, PartialEq, Eq)]
pub enum IpcError {
FailedToDeserialize,
QueueIsUninitialized,
CouldntReadMessage,
QueueAlreadyExists,
FailedToSerialize,
QueueDoesntExist,
QueueWasRemoved,
SignalReceived,
InvalidMessage,
InvalidCommand,
MessageTooBig,
InvalidStruct,
TooManyQueues,
AccessDenied,
QueueFull,
NoMessage,
NoMemory,
UnknownErrorValue(i32),
UnknownReturnValue(i32),
}
#[derive(Debug, Clone, Copy)]
pub enum Mode {
Public,
Group,
Private,
Custom(i32),
}
impl From<Mode> for i32 {
fn from(mode: Mode) -> i32 {
match mode {
Mode::Public => 0o666,
Mode::Group => 0o660,
Mode::Private => 0o600,
Mode::Custom(x) => x,
}
}
}
pub struct MessageQueue<T> {
pub id: i32,
pub key: i32,
pub mask: i32,
pub message_mask: i32,
pub mode: i32,
auto_kill: bool,
initialized: bool,
types: PhantomData<T>,
}
#[repr(C)]
pub struct Message {
pub mtype: i64,
pub mtext: [u8; 65536],
}
impl<T> Drop for MessageQueue<T> {
fn drop(&mut self) {
if self.auto_kill {
let _ = self.delete(); }
}
}
impl<T> MessageQueue<T> {
pub fn create(mut self) -> Self {
self.mask |= IpcFlags::CreateKey as i32;
self
}
pub fn exclusive(mut self) -> Self {
self.mask |= IpcFlags::Exclusive as i32;
self
}
pub fn async(mut self) -> Self {
self.message_mask |= IpcFlags::NoWait as i32;
self
}
pub fn mode(mut self, mode: Mode) -> Self {
self.mode = mode.into();
self
}
pub fn auto_kill(mut self, kill: bool) -> Self {
self.auto_kill = kill;
self
}
pub fn delete(&mut self) -> Result<(), IpcError> {
if !self.initialized {
return Err(IpcError::QueueIsUninitialized);
}
let res = unsafe {
msgctl(self.id, ControlCommands::DeleteQueue as i32, ptr::null::<msqid_ds>() as *mut msqid_ds)
};
match res {
-1 => match Errno::from_i32(errno()) {
Errno::EPERM => Err(IpcError::AccessDenied),
Errno::EACCES => Err(IpcError::AccessDenied),
Errno::EFAULT => Err(IpcError::InvalidStruct),
Errno::EINVAL => Err(IpcError::InvalidCommand),
Errno::EIDRM => Err(IpcError::QueueDoesntExist),
_ => Err(IpcError::UnknownErrorValue(errno())),
}
_ => { self.initialized = false; Ok(()) }
}
}
pub fn init(mut self) -> Result<Self, IpcError> {
self.initialized = true;
self.id = unsafe { msgget(self.key, self.mask | self.mode) };
match self.id {
-1 => match Errno::from_i32(errno()) {
Errno::EEXIST => Err(IpcError::QueueAlreadyExists),
Errno::ENOENT => Err(IpcError::QueueDoesntExist),
Errno::ENOSPC => Err(IpcError::TooManyQueues),
Errno::EACCES => Err(IpcError::AccessDenied),
Errno::ENOMEM => Err(IpcError::NoMemory),
_ => Err(IpcError::UnknownErrorValue(errno())),
}
_ => Ok(self),
}
}
pub fn new(key: i32) -> Self {
MessageQueue {
id: -1,
key,
mask: 0,
message_mask: 0,
mode: 0o666,
initialized: false,
auto_kill: false,
types: PhantomData
}
}
}
impl<'a, T> MessageQueue<T> where T: Serialize {
pub fn send<I>(&self, src: T, mtype: I) -> Result<(), IpcError> where I: Into<i64> {
if !self.initialized {
return Err(IpcError::QueueIsUninitialized);
}
let mut message = Box::new(Message {
mtype: mtype.into(),
mtext: [0; 65536],
});
let bytes = match serde_cbor::ser::to_vec(&src) {
Ok(b) => b,
Err(_) => return Err(IpcError::FailedToSerialize),
};
bytes
.iter()
.enumerate()
.for_each(|(i, x)| message.mtext[i] = *x);
let res = unsafe {
msgsnd(self.id, message.borrow() as *const Message, bytes.len(), 0)
};
match res {
-1 => match Errno::from_i32(errno()) {
Errno::EFAULT => Err(IpcError::CouldntReadMessage),
Errno::EIDRM => Err(IpcError::QueueWasRemoved),
Errno::EINTR => Err(IpcError::SignalReceived),
Errno::EINVAL => Err(IpcError::InvalidMessage),
Errno::E2BIG => Err(IpcError::MessageTooBig),
Errno::EACCES => Err(IpcError::AccessDenied),
Errno::ENOMSG => Err(IpcError::NoMessage),
Errno::EAGAIN => Err(IpcError::QueueFull),
Errno::ENOMEM => Err(IpcError::NoMemory),
_ => Err(IpcError::UnknownErrorValue(errno())),
}
0 => Ok(()),
x => Err(IpcError::UnknownReturnValue(x as i32)),
}
}
}
impl<'a, T> MessageQueue<T> where for<'de> T: Deserialize<'de> {
pub fn peek(&self) -> Result<T, IpcError> {
if !self.initialized {
return Err(IpcError::QueueIsUninitialized);
}
let mut message: Box<Message> = Box::new(Message {
mtype: 0,
mtext: [0; 65536],
});
let size = unsafe { msgrcv(self.id, message.borrow_mut() as *mut Message, 65536, 0, IpcFlags::MsgCopy as i32 | self.message_mask) };
if size >= 0 {
match serde_cbor::from_slice(&message.mtext[..size as usize]) {
Ok(r) => Ok(r),
Err(_) => Err(IpcError::FailedToDeserialize),
}
}
else {
match Errno::from_i32(errno()) {
Errno::EFAULT => Err(IpcError::CouldntReadMessage),
Errno::EIDRM => Err(IpcError::QueueWasRemoved),
Errno::EINTR => Err(IpcError::SignalReceived),
Errno::EINVAL => Err(IpcError::InvalidMessage),
Errno::E2BIG => Err(IpcError::MessageTooBig),
Errno::EACCES => Err(IpcError::AccessDenied),
Errno::ENOMSG => Err(IpcError::NoMessage),
Errno::EAGAIN => Err(IpcError::QueueFull),
Errno::ENOMEM => Err(IpcError::NoMemory),
_ => Err(IpcError::UnknownErrorValue(errno())),
}
}
}
pub fn recv(&self) -> Result<T, IpcError> {
if !self.initialized {
return Err(IpcError::QueueIsUninitialized);
}
let mut message: Box<Message> = Box::new(Message {
mtype: 0,
mtext: [0; 65536],
});
let size = unsafe { msgrcv(self.id, message.borrow_mut() as *mut Message, 65536, 0, self.message_mask) };
if size >= 0 {
match serde_cbor::from_slice(&message.mtext[..size as usize]) {
Ok(r) => Ok(r),
Err(_) => Err(IpcError::FailedToDeserialize),
}
}
else {
match Errno::from_i32(errno()) {
Errno::EFAULT => Err(IpcError::CouldntReadMessage),
Errno::EIDRM => Err(IpcError::QueueWasRemoved),
Errno::EINTR => Err(IpcError::SignalReceived),
Errno::EINVAL => Err(IpcError::InvalidMessage),
Errno::E2BIG => Err(IpcError::MessageTooBig),
Errno::EACCES => Err(IpcError::AccessDenied),
Errno::ENOMSG => Err(IpcError::NoMessage),
Errno::EAGAIN => Err(IpcError::QueueFull),
Errno::ENOMEM => Err(IpcError::NoMemory),
_ => Err(IpcError::UnknownErrorValue(errno())),
}
}
}
}
#[cfg(test)]
mod tests {
use ::MessageQueue;
use ::IpcError;
#[test]
fn send_message() {
let queue = MessageQueue::new(1234).init().unwrap();
let res = queue.send("kalinka", 25);
println!("{:?}", res);
assert!(res.is_ok());
}
#[test]
fn recv_message() {
let queue = MessageQueue::<String>::new(1234).init().unwrap();
let res = queue.recv();
println!("{:?}", res);
assert!(res.is_ok());
}
#[test]
fn nonblocking() {
let queue = MessageQueue::<()>::new(745965545)
.async()
.init()
.unwrap();
println!("{}", queue.mask);
assert_eq!(Err(IpcError::NoMessage), queue.recv())
}
}