ipc_rs/
lib.rs

1//! This crate facilitates easy interprocess communication through the SysV IPC protocol
2//! and Serde. Beware that there is an arbitrary message size limit set at 8KB.
3//!
4//! Through some `PhantomData` and Higher-Ranked Trait Bounds magic, MessageQueue is pretty
5//! smart when it comes to type (de)serialization. Intentionally, MessageQueues are limited
6//! to one data type, which is defined when you create a new queue.
7
8// TODO implement a queue iterator
9// TODO semaphores, shared memory
10#![feature(associated_type_defaults)]
11#![deny(missing_docs)]
12extern crate serde_cbor;
13extern crate serde;
14extern crate libc;
15extern crate nix;
16
17use serde::{Deserialize, Serialize};
18use nix::errno::errno;
19use nix::errno::Errno;
20
21use libc::{
22	msgget,
23	msgctl,
24	msqid_ds,
25};
26
27use std::ptr;
28//use std::mem;
29use std::convert::From;
30use std::marker::PhantomData;
31use std::borrow::{Borrow, BorrowMut};
32
33pub mod raw;
34use raw::*;
35
/// An enum containing all possible IPC errors.
///
/// The type implements [`std::fmt::Display`] and [`std::error::Error`],
/// so it can be propagated with `?` into `Box<dyn Error>` and printed
/// in a human-readable form.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IpcError {
	/// Returned, when it wasn't possible to
	/// deserialize bytes into the desired types.
	/// That mostly occurs if MessageQueue is defined
	/// with the wrong type or the input got truncated
	FailedToDeserialize,
	/// Returned when an attempt was made to use
	/// a queue before the `init()` call was made
	QueueIsUninitialized,
	/// Returned, if it was impossible to read the `Message`
	/// structure. Occurs if you made the raw pointer too
	/// early and the underlying data got dropped already
	CouldntReadMessage,
	/// When the queue already exists, but `IpcFlags::Exclusive`
	/// and `IpcFlags::CreateKey` were both specified
	QueueAlreadyExists,
	/// Occurs if it isn't possible to serialize a struct
	/// into bytes. Shouldn't normally occur, might indicate
	/// a bug in the CBOR library
	FailedToSerialize,
	/// The queue doesn't exist. Reported by `init()` when
	/// `msgget()` fails with `ENOENT` (no queue for the key and
	/// creation was not requested) and by `delete()` when the
	/// queue has already been removed
	QueueDoesntExist,
	/// The Queue has been removed (might be because the
	/// system ran out of memory)
	QueueWasRemoved,
	/// A signal was received
	SignalReceived,
	/// The message is invalid, occurs if the message struct
	/// does not follow the mtype-mtext format
	InvalidMessage,
	/// Returned when an invalid command was given
	/// to `msgctl()`
	InvalidCommand,
	/// The message is bigger than either the system limit
	/// or the set limit
	MessageTooBig,
	/// Invalid struct
	///
	/// This error is returned when
	/// `msgctl()` was called with an invalid
	/// pointer to a struct, which would be
	/// either `msqid_ds` or `msginfo`.
	InvalidStruct,
	/// There are too many `MessageQueue`s already
	/// (shouldn't occur, the limit is pretty big)
	TooManyQueues,
	/// Access was denied, you are trying to read a queue
	/// that doesn't belong to you or your process
	AccessDenied,
	/// The queue is full. This isn't really an error,
	/// it is what a nonblocking `send()` call reports
	/// (`EAGAIN`) when there is no room for the message
	QueueFull,
	/// There is no message. This isn't an error,
	/// per se, but the intended return value of
	/// nonblocking `recv()` calls
	NoMessage,
	/// The system does not have enough memory to
	/// complete the operation (`ENOMEM`)
	NoMemory,

	/// We know it was an error, but it was
	/// something non-standard. The payload is
	/// the raw `errno` value
	UnknownErrorValue(i32),
	/// one of the standard functions returned
	/// a value it should never return.
	/// (for example `msgsnd()` returning 5)
	UnknownReturnValue(i32),
}

impl std::fmt::Display for IpcError {
	/// Writes a short, human-readable description of the error.
	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
		match *self {
			IpcError::FailedToDeserialize => f.write_str("failed to deserialize the message"),
			IpcError::QueueIsUninitialized => f.write_str("the queue has not been initialized"),
			IpcError::CouldntReadMessage => f.write_str("could not read the message structure"),
			IpcError::QueueAlreadyExists => f.write_str("the queue already exists"),
			IpcError::FailedToSerialize => f.write_str("failed to serialize the message"),
			IpcError::QueueDoesntExist => f.write_str("the queue does not exist"),
			IpcError::QueueWasRemoved => f.write_str("the queue was removed"),
			IpcError::SignalReceived => f.write_str("a signal was received"),
			IpcError::InvalidMessage => f.write_str("the message is invalid"),
			IpcError::InvalidCommand => f.write_str("an invalid command was given to msgctl()"),
			IpcError::MessageTooBig => f.write_str("the message is too big"),
			IpcError::InvalidStruct => f.write_str("an invalid struct pointer was given to msgctl()"),
			IpcError::TooManyQueues => f.write_str("there are too many message queues"),
			IpcError::AccessDenied => f.write_str("access denied"),
			IpcError::QueueFull => f.write_str("the queue is full"),
			IpcError::NoMessage => f.write_str("there is no message"),
			IpcError::NoMemory => f.write_str("not enough memory"),
			IpcError::UnknownErrorValue(code) => write!(f, "unknown errno value: {}", code),
			IpcError::UnknownReturnValue(value) => write!(f, "unexpected return value: {}", value),
		}
	}
}

impl std::error::Error for IpcError {}
107
/// A helper enum for describing
/// a message queue access mode
///
/// Note that the creator of a message queue
/// bypasses permission mode and what's
/// described here applies to the owner
/// of the message queue (owner != creator).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
	/// Allows complete access to anyone
	Public,
	/// Allows complete access to only
	/// the owner's group and the owner
	/// (and the creator)
	Group,
	/// Allows complete access to only
	/// the owner (and the creator)
	Private,
	/// Custom modes. Only the nine permission
	/// bits are meaningful, so pass numbers
	/// `<= 0o777`; anything above is discarded
	/// when the mode is converted to `i32`.
	Custom(i32),
}

impl From<Mode> for i32 {
	/// Converts a mode into the permission bits
	/// handed to the kernel. This conversion
	/// can never fail.
	///
	/// `Mode::Custom` values are masked to their
	/// 9 least significant bits, so that a stray
	/// higher bit cannot be mistaken for one of
	/// the queue creation flags the mode is
	/// OR-ed with.
	fn from(mode: Mode) -> i32 {
		match mode {
			Mode::Public => 0o666,
			Mode::Group => 0o660,
			Mode::Private => 0o600,
			// Keep only rwxrwxrwx; higher bits belong to the flags.
			Mode::Custom(x) => x & 0o777,
		}
	}
}
150
/// The main message queue type.
/// It holds basic information about a given message queue
/// as well as type data about the content that passes
/// through it.
///
/// The `PhantomData` marker ensures that the queue
/// is locked to (de)serializing a single type.
///
/// MessageQueue is quite liberal about the types
/// it accepts. If you are only ever going to send
/// a type, it just requires that the type is
/// `Serialize`.
///
/// If the queue is only ever going to be receiving
/// data, it requires the associated type to be
/// `Deserialize`.
///
/// This allows you to spare some precious bytes.
///
/// Note that `MessageQueue` reports all errors
/// properly, so they should be handled, lest you
/// wish to shoot your leg off.
///
/// ## General Usage Example
///
/// Before usage a `MessageQueue` needs to be initialized
/// through the use of the [`MessageQueue::init()`]
/// method. Failure to do so results in the queue
/// refusing to work.
///
/// ```no_run
/// # extern crate ipc_rs;
/// # use ipc_rs::IpcError;
/// use ipc_rs::MessageQueue;
///
/// # fn main() -> Result<(), IpcError> {
/// let my_key = 1234;
/// let queue = MessageQueue::<String>::new(my_key)
/// 	.create()
/// 	.async()
/// 	.init()?;
///
/// queue.send("hello world".to_string(), 24)
/// 	.expect("failed to send a message");
/// # Ok(())
/// # }
/// ```
pub struct MessageQueue<T> {
	/// The actual ID of the underlying SysV message
	/// queue. This value is 'unassigned' (`new()` sets
	/// it to `-1`) until a call to the `init()` method
	/// is made.
	pub id: i32,
	/// This is the key that was given when the
	/// `MessageQueue` was created, make sure to use
	/// the same key on both sides of the barricade
	/// to ensure a proper 'connection' is established
	pub key: i32,
	/// The bit flags used to create a new queue,
	/// see [`IpcFlags`] for more info.
	pub mask: i32,
	/// The bit flags used when sending/receiving a
	/// message, they for example affect whether data
	/// gets truncated or whether the calls to `send()`
	/// and `recv()` are blocking or not.
	pub message_mask: i32,
	/// Mode bits, these are an equivalent to those
	/// one encounters when working with files in Unix
	/// systems, therefore, the 9 least significant bits
	/// follow this pattern:
	///
	/// ```text
	/// rwxrwxrwx
	/// |_||_||_|
	///  │  │  │
	///  │  │  └── others
	///  │  └── owner's user group
	///  └── owner
	/// ```
	///
	/// Currently, the execute bits are ignored, so you
	/// needn't worry about them. Therefore, to allow
	/// full access to anyone, mode should be set to
	/// `0666` aka `0b110_110_110`.
	///
	/// Similarly, to make the queue `private` one would
	/// use `0600` aka `0b110_000_000`.
	pub mode: i32,
	/// When set, the queue is deleted as soon as
	/// this value is dropped.
	auto_kill: bool,
	/// Whether the queue is currently considered usable;
	/// every operation checks this flag first.
	initialized: bool,
	/// Zero-sized marker tying the queue to the
	/// single message type `T`.
	types: PhantomData<T>,
}
242
/// This struct represents a message that is inserted into
/// a message queue on every [`MessageQueue::send()`] call.
///
/// It follows the SysV message recipe of having only two
/// fields, namely:
///
/// * `mtype: i64` - which is the type of a message, it can
///   be used for filtering within queues and should never
///   be a negative integer. u64 isn't used here, however,
///   because of the kernel's anticipated internal representation
/// * `mtext` - which is where the data of the message are stored.
///   The kernel doesn't care about what `mtext` is so long
///   as it is not a pointer (because pointers are a recipe
///   for trouble when passing the interprocess boundary).
///   Therefore it can be either a struct or an array. Here,
///   a fixed-size byte array is used.
///   NOTE(review): the array declared below is 65536 bytes
///   (64 KiB), while this comment used to describe an 8 KiB
///   buffer matching the default message size limit — confirm
///   which size is intended.
///   In the future, functionality to affect the limit shall
///   be exposed and bigger messages will be allowed
///
/// Messages are required to be #[repr(C)] to avoid unexpected
/// surprises.
///
/// Finally, due to the size of a Message (an 8-byte `mtype`
/// plus 65536 bytes of `mtext`), it is unwise to store them
/// on the stack. With an 8 MiB stack (the default cited for
/// Arch x86_64) there is room for fewer than 128 of them.
/// Use Box instead.
#[repr(C)]
pub struct Message {
	/// This should be a positive integer.
	/// For normal usage, it is inconsequential,
	/// but you may want to use it for filtering.
	///
	/// In fact, if you are looking for messages
	/// with a specific type, the `msgtyp` parameter
	/// of [`msgrcv()`] might be of use to you.
	///
	/// Check out its documentation for more info.
	pub mtype: i64,
	/// This is a simple byte array. The 'standard'
	/// allows for mtext to be either a structure
	/// or an array. For the purposes of `ipc-rs`,
	/// array is the better choice.
	///
	/// Currently, the data is stored as CBOR, the
	/// more efficient byte JSON. Check out the
	/// documentation of `serde_cbor`.
	pub mtext: [u8; 65536],
}
292
293impl<T> Drop for MessageQueue<T> {
294	/// Does nothing unless auto_kill is specified,
295	/// in which case it deletes the associated queue
296	fn drop(&mut self) {
297		if self.auto_kill {
298			let _ = self.delete(); // We don't really care about failures here
299		}
300	}
301}
302
303impl<T> MessageQueue<T> {
304	/// Allow the creation of a new message queue
305	pub fn create(mut self) -> Self {
306		self.mask |= IpcFlags::CreateKey as i32;
307		self
308	}
309
310	/// Enforce the operation at hand. If `create()`
311	/// is also used, `init()` will fail if the create
312	/// already exist.
313	pub fn exclusive(mut self) -> Self {
314		self.mask |= IpcFlags::Exclusive as i32;
315		self
316	}
317
318	/// Adds the NoWait flag to message_mask to make
319	/// the calls to `send()` and `recv()` non-blocking.
320	/// When there is no message to be received, `recv()`
321	/// returns [`IpcError::NoMessage`] and similarly,
322	/// when a message can't be sent because the queue is
323	/// full, a nonblocking `send()` returns [`IpcError::QueueFull`]
324	pub fn async(mut self) -> Self {
325		self.message_mask |= IpcFlags::NoWait as i32;
326		self
327	}
328
329	/// Sets the mode of a given message queue.
330	/// See [`Mode`] for more information
331	pub fn mode(mut self, mode: Mode) -> Self {
332		self.mode = mode.into();
333		self
334	}
335
336	/// Automatically deletes removes a queue when it
337	/// goes out of scope. That basically boils down
338	/// to `self.delete()` being called during Drop
339	pub fn auto_kill(mut self, kill: bool) -> Self {
340		self.auto_kill = kill;
341		self
342	}
343
344	/// Deletes a queue through `msgctl()`
345	pub fn delete(&mut self) -> Result<(), IpcError> {
346		if !self.initialized {
347			return Err(IpcError::QueueIsUninitialized);
348		}
349
350		let res = unsafe {
351			msgctl(self.id, ControlCommands::DeleteQueue as i32, ptr::null::<msqid_ds>() as *mut msqid_ds)
352		};
353
354		match res {
355			-1 => match Errno::from_i32(errno()) {
356			    Errno::EPERM  => Err(IpcError::AccessDenied),
357			    Errno::EACCES => Err(IpcError::AccessDenied),
358			    Errno::EFAULT => Err(IpcError::InvalidStruct),
359			    Errno::EINVAL => Err(IpcError::InvalidCommand),
360			    Errno::EIDRM  => Err(IpcError::QueueDoesntExist),
361			    _ => Err(IpcError::UnknownErrorValue(errno())),
362			}
363			_ => { self.initialized = false; Ok(()) }
364		}
365	}
366
367	/// Initializes a MessageQueue with the key
368	/// `self.key`, proper modes and mask
369	pub fn init(mut self) -> Result<Self, IpcError> {
370		self.initialized = true;
371		self.id = unsafe { msgget(self.key, self.mask | self.mode) };
372
373		match self.id {
374			-1 => match Errno::from_i32(errno()) {
375				Errno::EEXIST => Err(IpcError::QueueAlreadyExists),
376				Errno::ENOENT => Err(IpcError::QueueDoesntExist),
377				Errno::ENOSPC => Err(IpcError::TooManyQueues),
378				Errno::EACCES => Err(IpcError::AccessDenied),
379				Errno::ENOMEM => Err(IpcError::NoMemory),
380				_ => Err(IpcError::UnknownErrorValue(errno())),
381			}
382			_  => Ok(self),
383		}
384	}
385
386	/// Defines a new `MessageQueue`
387	/// In the future, it will be possible to use more types
388	/// of keys (which would be translated to i32 behind the
389	/// scenes automatically)
390	pub fn new(key: i32) -> Self {
391		MessageQueue {
392			id: -1,
393			key,
394			mask: 0,
395			message_mask: 0,
396			mode: 0o666,
397			initialized: false,
398			auto_kill: false,
399			types: PhantomData
400		}
401	}
402}
403
404impl<'a, T> MessageQueue<T> where T: Serialize {
405	/// Sends a new message, or tries to (in case of non-blocking calls).
406	/// If the queue is full, `IpcError::QueueFull` is returned
407	pub fn send<I>(&self, src: T, mtype: I) -> Result<(), IpcError> where I: Into<i64> {
408		if !self.initialized {
409			return Err(IpcError::QueueIsUninitialized);
410		}
411
412		let mut message = Box::new(Message {
413			mtype: mtype.into(),
414			mtext: [0; 65536],
415		});
416		let bytes = match serde_cbor::ser::to_vec(&src) {
417			Ok(b) => b,
418			Err(_) => return Err(IpcError::FailedToSerialize),
419		};
420
421		bytes
422			.iter()
423			.enumerate()
424			.for_each(|(i, x)| message.mtext[i] = *x);
425
426		let res = unsafe {
427			msgsnd(self.id, message.borrow() as *const Message, bytes.len(), 0)
428		};
429
430		match res {
431			-1 => match Errno::from_i32(errno()) {
432				Errno::EFAULT => Err(IpcError::CouldntReadMessage),
433				Errno::EIDRM  => Err(IpcError::QueueWasRemoved),
434				Errno::EINTR  => Err(IpcError::SignalReceived),
435				Errno::EINVAL => Err(IpcError::InvalidMessage),
436				Errno::E2BIG  => Err(IpcError::MessageTooBig),
437				Errno::EACCES => Err(IpcError::AccessDenied),
438				Errno::ENOMSG => Err(IpcError::NoMessage),
439				Errno::EAGAIN => Err(IpcError::QueueFull),
440				Errno::ENOMEM => Err(IpcError::NoMemory),
441				_ => Err(IpcError::UnknownErrorValue(errno())),
442			}
443			0 => Ok(()),
444			x => Err(IpcError::UnknownReturnValue(x as i32)),
445		}
446	}
447}
448
449impl<'a, T> MessageQueue<T> where for<'de> T: Deserialize<'de> {
450	/// Returns a message without removing it from the message
451	/// queue. Use `recv()` if you want to consume the message
452	pub fn peek(&self) -> Result<T, IpcError> {
453		if !self.initialized {
454			return Err(IpcError::QueueIsUninitialized);
455		}
456
457		let mut message: Box<Message> = Box::new(Message {
458			mtype: 0,
459			mtext: [0; 65536],
460		});
461
462		let size = unsafe { msgrcv(self.id, message.borrow_mut() as *mut Message, 65536, 0, IpcFlags::MsgCopy as i32 | self.message_mask) };
463
464		if size >= 0 {
465			match serde_cbor::from_slice(&message.mtext[..size as usize]) {
466				Ok(r) => Ok(r),
467				Err(_) => Err(IpcError::FailedToDeserialize),
468			}
469		}
470		else {
471			match Errno::from_i32(errno()) {
472				Errno::EFAULT => Err(IpcError::CouldntReadMessage),
473				Errno::EIDRM  => Err(IpcError::QueueWasRemoved),
474				Errno::EINTR  => Err(IpcError::SignalReceived),
475				Errno::EINVAL => Err(IpcError::InvalidMessage),
476				Errno::E2BIG  => Err(IpcError::MessageTooBig),
477				Errno::EACCES => Err(IpcError::AccessDenied),
478				Errno::ENOMSG => Err(IpcError::NoMessage),
479				Errno::EAGAIN => Err(IpcError::QueueFull),
480				Errno::ENOMEM => Err(IpcError::NoMemory),
481				_ => Err(IpcError::UnknownErrorValue(errno())),
482			}
483		}
484	}
485
486	/// Receives a message, consuming it. If no message is
487	/// to be received, `recv()` either blocks or returns
488	/// [`IpcError::NoMemory`]
489	pub fn recv(&self) -> Result<T, IpcError> {
490		if !self.initialized {
491			return Err(IpcError::QueueIsUninitialized);
492		}
493
494		let mut message: Box<Message> = Box::new(Message {
495			mtype: 0,
496			mtext: [0; 65536],
497		}); // spooky scary stuff
498
499		let size = unsafe { msgrcv(self.id, message.borrow_mut() as *mut Message, 65536, 0, self.message_mask) };
500
501		if size >= 0 {
502			match serde_cbor::from_slice(&message.mtext[..size as usize]) {
503				Ok(r) => Ok(r),
504				Err(_) => Err(IpcError::FailedToDeserialize),
505			}
506		}
507		else {
508			match Errno::from_i32(errno()) {
509				Errno::EFAULT => Err(IpcError::CouldntReadMessage),
510				Errno::EIDRM  => Err(IpcError::QueueWasRemoved),
511				Errno::EINTR  => Err(IpcError::SignalReceived),
512				Errno::EINVAL => Err(IpcError::InvalidMessage),
513				Errno::E2BIG  => Err(IpcError::MessageTooBig),
514				Errno::EACCES => Err(IpcError::AccessDenied),
515				Errno::ENOMSG => Err(IpcError::NoMessage),
516				Errno::EAGAIN => Err(IpcError::QueueFull),
517				Errno::ENOMEM => Err(IpcError::NoMemory),
518				_ => Err(IpcError::UnknownErrorValue(errno())),
519			}
520		}
521	}
522}
523
524#[cfg(test)]
525mod tests {
526	use ::MessageQueue;
527	use ::IpcError;
528
529	#[test]
530	fn send_message() {
531		let queue = MessageQueue::new(1234).init().unwrap();
532		let res = queue.send("kalinka", 25);
533		println!("{:?}", res);
534		assert!(res.is_ok());
535	}
536
537	#[test]
538	fn recv_message() {
539		let queue = MessageQueue::<String>::new(1234).init().unwrap();
540		let res = queue.recv();
541		println!("{:?}", res);
542		assert!(res.is_ok());
543	}
544
545	#[test]
546	fn nonblocking() {
547		let queue = MessageQueue::<()>::new(745965545)
548			.async()
549			.init()
550			.unwrap();
551
552		println!("{}", queue.mask);
553		assert_eq!(Err(IpcError::NoMessage), queue.recv())
554	}
555}