ipc_rs/lib.rs
1//! This crate facilitates easy interprocess communication through the SysV IPC protocol
2//! and Serde. Beware that there is an arbitrary message size limit set at 8KB.
3//!
4//! Through some `PhantomData` and Higher-Ranked Trait Bounds magic, MessageQueue is pretty
5//! smart when it comes to type (de)serialization. Intentionally, MessageQueues are limited
6//! to one data type, which is defined when you create a new queue.
7
8// TODO implement a queue iterator
9// TODO semaphores, shared memory
10#![feature(associated_type_defaults)]
11#![deny(missing_docs)]
12extern crate serde_cbor;
13extern crate serde;
14extern crate libc;
15extern crate nix;
16
17use serde::{Deserialize, Serialize};
18use nix::errno::errno;
19use nix::errno::Errno;
20
21use libc::{
22 msgget,
23 msgctl,
24 msqid_ds,
25};
26
27use std::ptr;
28//use std::mem;
29use std::convert::From;
30use std::marker::PhantomData;
31use std::borrow::{Borrow, BorrowMut};
32
33pub mod raw;
34use raw::*;
35
36/// An enum containing all possible IPC errors
37#[derive(Debug, PartialEq, Eq)]
38pub enum IpcError {
39 /// Returned, when it wasn't possible to
40 /// deserialize bytes into the desired types.
41 /// That mostly occurs if MessageQueue is defined
42 /// with the wrong type or the input got truncated
43 FailedToDeserialize,
44 /// Returned when an attempt was made to use
45 /// a queue before the `init()` call was made
46 QueueIsUninitialized,
47 /// Returned, if it was impossible to read the `Message`
48 /// structure. Occurs if you made the raw pointer too
49 /// early and the underlying data got dropped already
50 CouldntReadMessage,
51 /// When the queue already exists, but IpcFlags::Exclusive
52 /// and `IpcFlags::CreateKey` were both specified
53 QueueAlreadyExists,
54 /// Occurs if it isn't possible to serialize a struct
55 /// into bytes. Shouldn't normally occur, might indicate
56 /// a bug in the CBOR library
57 FailedToSerialize,
58 /// `IpcFlags::Exclusive` was specified, but queue
59 /// doesn't exist
60 QueueDoesntExist,
61 /// The Queue has been removed (might be because the
62 /// system ran out of memory)
63 QueueWasRemoved,
64 /// A signal was received
65 SignalReceived,
66 /// The message is invalid, occurs if the message struct
67 /// does not follow the mtype-mtext forma
68 InvalidMessage,
69 /// Returned when an invalid command was given
70 /// to `msgctl()`
71 InvalidCommand,
72 /// The message is bigger than either the system limit
73 /// or the set limit
74 MessageTooBig,
75 /// Invalid struct
76 ///
77 /// This error is returned when
78 /// `msgctl()` was called with a invalid
79 /// pointer to a struct, which would be
80 /// either `msqid_ds or msginfo`.test
81 InvalidStruct,
82 /// There are too many `MessageQueue`s already
83 /// (shouldn't occur, the limit is pretty big)
84 TooManyQueues,
85 /// Access was denied, you are trying to read a queue
86 /// that doesn't belong to you or your process
87 AccessDenied,
88 /// The queue is full, 'nuff said
89 QueueFull,
90 /// There is no message. This isn't an error,
91 /// per se, but the intended return value of
92 /// nonblocking `recv()` calls
93 NoMessage,
94 /// There is not enough space left in the queue.
95 /// This isn't really an error either, it is what
96 /// is returned by a nonblocking `send()` call
97 NoMemory,
98
99 /// We know it was an error, but it was
100 /// something non-standard
101 UnknownErrorValue(i32),
102 /// one of the standard functions returned
103 /// a value it should never return.
104 /// (for example `msgsnd()` returning 5)
105 UnknownReturnValue(i32),
106}
107
108/// A helper enum for describing
109/// a message queue access mode
110///
111/// Note that the creator of a message queue
112/// bypasses permission mode and what's
113/// described here applies to the owner
114/// of the message queue (owner != creator).
115#[derive(Debug, Clone, Copy)]
116pub enum Mode {
117 /// Allows complete access to anyone
118 Public,
119 /// Allows complete access to only
120 /// the owner's group and the owner
121 /// (and the creator)
122 Group,
123 /// Allows complete access to only
124 /// the owner (and the creator)
125 Private,
126 /// Custom modes. Please, do try
127 /// to make sure that you only
128 /// pass numbers >= 0777
129 Custom(i32),
130}
131
132impl From<Mode> for i32 {
133 /// Allows conversion of modes to
134 /// and from `i32`. This conversion
135 /// can never fail, but there is a
136 /// chance that numbers 'longer' than
137 /// 9 bits might interfere with flags.
138 ///
139 /// Therefore, only use custom mode
140 /// when absolutely necessary.
141 fn from(mode: Mode) -> i32 {
142 match mode {
143 Mode::Public => 0o666,
144 Mode::Group => 0o660,
145 Mode::Private => 0o600,
146 Mode::Custom(x) => x,
147 }
148 }
149}
150
151/// The main message queue type.
152/// It holds basic information about a given message queue
153/// as well as type data about the content that passes
154/// through it.
155///
156/// The `PhantomData` marker ensures that the queue
157/// is locked to (de)serializing a single tyoe.
158///
159/// MessageQueue is quite liberal about the types
160/// it accepts. If you are only ever going to send
161/// a type, it just requires that the type is
162/// `Serialize`.
163///
164/// If the queue is only ever going to be receiving
165/// data, it requires the associated type to be
166/// `Deserialize`.
167///
168/// This allows you to spare some precious bytes.
169///
170/// Note that `MessageQueue` reports all errors
171/// properly, so they should be handled, lest you
172/// wish to shoot your leg off.
173///
174/// ## General Usage Example
175///
176/// Before usage a `MessageQueue` needs to be initialized
177/// through the use of the [`MessageQueue::init()`]
178/// method. Failure to do so results in the queue
179/// refusing to work.
180///
181/// ```no_run
182/// # extern crate ipc_rs;
183/// # use ipc_rs::IpcError;
184/// use ipc_rs::MessageQueue;
185///
186/// # fn main() -> Result<(), IpcError> {
187/// let my_key = 1234;
188/// let queue = MessageQueue::<String>::new(my_key)
189/// .create()
190/// .async()
191/// .init()?;
192///
193/// queue.send("hello world".to_string(), 24)
194/// .expect("failed to send a message");
195/// # Ok(())
196/// # }
197/// ```
198pub struct MessageQueue<T> {
199 /// The actual ID of the underlying SysV message
200 /// queue. This value is 'unassigned' until a call
201 /// to the `init()` method is made.
202 pub id: i32,
203 /// This is the key that was given when the
204 /// `MessageQueue` was created, make sure to use
205 /// the same key on both sides of the barricade
206 /// to ensure proper 'connection' is estabilised
207 pub key: i32,
208 /// The bit flags used to create a new queue,
209 /// see [`IpcFlags`] for more info.
210 pub mask: i32,
211 /// The bit flags used when sending/receiving a
212 /// message, they for example affect whether data
213 /// gets truncated or whether the calls to `send()`
214 /// and `recv()` are blocking or not.
215 pub message_mask: i32,
216 /// Mode bits, these are an equivalent to those
217 /// one encounters when working with files in Unix
218 /// systems, therefore, the 9 least significant bits
219 /// follow this pattern:
220 ///
221 /// ```text
222 /// rwxrwxrwx
223 /// |_||_||_|
224 /// │ │ │
225 /// │ │ └── others
226 /// │ └── owner's user group
227 /// └── owner
228 /// ```
229 ///
230 /// Currently, the execute bits are ignored, so you
231 /// needn't worry about them. Therefore, to allow
232 /// full access to anyone, mode should be set to
233 /// `0666` aka `0b110_110_110`.
234 ///
235 /// Similarly, to make the queue `private` one would
236 /// use `0600` aka `0b110_000_000`. `
237 pub mode: i32,
238 auto_kill: bool,
239 initialized: bool,
240 types: PhantomData<T>,
241}
242
243/// This struct represents a message that is inserted into
244/// a message queue on every [`MessageQueue::send()`] call.
245///
246/// It follows the SysV message recipe of having only two
247/// fields, namely:
248///
249/// * `mtype: i64` - which is the type of a message, it can
250/// be used for filtering within queues and should never
251/// be a negative integer. u64 isn't used here, however,
252/// because of the kernel's anticipated internal representation
253/// * `mtext` - which is where the data of the message are stored.
254/// The kernel doesn't care about what `mtext` is so long
255/// as it is not a pointer (because pointers are a recipe
256/// for trouble when passing the interprocess boundary).
257/// Therefore it can be either a struct or an array. Here,
258/// an array of 8K bytes was chosen to allow the maximum
259/// versatility within the default message size limit (8KiB).
260/// In the future, functionality to affect the limit shall
261/// be exposed and bigger messages will be allowed
262///
263/// Messages are required to be #[repr(C)] to avoid unexpected
264/// surprises.
265///
266/// Finally, due to the size of a Message, it is unwise to
267/// store them on the stack. On Arch x86_64, the default stack
268/// size is 8mb, which is just enough for less than a thousand
269/// messages. Use Box instead.
270#[repr(C)]
271pub struct Message {
272 /// This should be a positive integer.
273 /// For normal usage, it is inconsequential,
274 /// but you may want to use it for filtering.
275 ///
276 /// In fact, if you are looking for messages
277 /// with a specific type, the `msgtyp` parameter
278 /// of [`msgrcv()`] might be of use to you.
279 ///
280 /// Check out its documentation for more info.
281 pub mtype: i64,
282 /// This is a simple byte array. The 'standard'
283 /// allows for mtext to be either a structure
284 /// or an array. For the purposes of `ipc-rs`,
285 /// array is the better choice.
286 ///
287 /// Currently, the data is stored as CBOR, the
288 /// more efficient byte JSON. Check out the
289 /// documentation of `serde_cbor`.
290 pub mtext: [u8; 65536],
291}
292
293impl<T> Drop for MessageQueue<T> {
294 /// Does nothing unless auto_kill is specified,
295 /// in which case it deletes the associated queue
296 fn drop(&mut self) {
297 if self.auto_kill {
298 let _ = self.delete(); // We don't really care about failures here
299 }
300 }
301}
302
303impl<T> MessageQueue<T> {
304 /// Allow the creation of a new message queue
305 pub fn create(mut self) -> Self {
306 self.mask |= IpcFlags::CreateKey as i32;
307 self
308 }
309
310 /// Enforce the operation at hand. If `create()`
311 /// is also used, `init()` will fail if the create
312 /// already exist.
313 pub fn exclusive(mut self) -> Self {
314 self.mask |= IpcFlags::Exclusive as i32;
315 self
316 }
317
318 /// Adds the NoWait flag to message_mask to make
319 /// the calls to `send()` and `recv()` non-blocking.
320 /// When there is no message to be received, `recv()`
321 /// returns [`IpcError::NoMessage`] and similarly,
322 /// when a message can't be sent because the queue is
323 /// full, a nonblocking `send()` returns [`IpcError::QueueFull`]
324 pub fn async(mut self) -> Self {
325 self.message_mask |= IpcFlags::NoWait as i32;
326 self
327 }
328
329 /// Sets the mode of a given message queue.
330 /// See [`Mode`] for more information
331 pub fn mode(mut self, mode: Mode) -> Self {
332 self.mode = mode.into();
333 self
334 }
335
336 /// Automatically deletes removes a queue when it
337 /// goes out of scope. That basically boils down
338 /// to `self.delete()` being called during Drop
339 pub fn auto_kill(mut self, kill: bool) -> Self {
340 self.auto_kill = kill;
341 self
342 }
343
344 /// Deletes a queue through `msgctl()`
345 pub fn delete(&mut self) -> Result<(), IpcError> {
346 if !self.initialized {
347 return Err(IpcError::QueueIsUninitialized);
348 }
349
350 let res = unsafe {
351 msgctl(self.id, ControlCommands::DeleteQueue as i32, ptr::null::<msqid_ds>() as *mut msqid_ds)
352 };
353
354 match res {
355 -1 => match Errno::from_i32(errno()) {
356 Errno::EPERM => Err(IpcError::AccessDenied),
357 Errno::EACCES => Err(IpcError::AccessDenied),
358 Errno::EFAULT => Err(IpcError::InvalidStruct),
359 Errno::EINVAL => Err(IpcError::InvalidCommand),
360 Errno::EIDRM => Err(IpcError::QueueDoesntExist),
361 _ => Err(IpcError::UnknownErrorValue(errno())),
362 }
363 _ => { self.initialized = false; Ok(()) }
364 }
365 }
366
367 /// Initializes a MessageQueue with the key
368 /// `self.key`, proper modes and mask
369 pub fn init(mut self) -> Result<Self, IpcError> {
370 self.initialized = true;
371 self.id = unsafe { msgget(self.key, self.mask | self.mode) };
372
373 match self.id {
374 -1 => match Errno::from_i32(errno()) {
375 Errno::EEXIST => Err(IpcError::QueueAlreadyExists),
376 Errno::ENOENT => Err(IpcError::QueueDoesntExist),
377 Errno::ENOSPC => Err(IpcError::TooManyQueues),
378 Errno::EACCES => Err(IpcError::AccessDenied),
379 Errno::ENOMEM => Err(IpcError::NoMemory),
380 _ => Err(IpcError::UnknownErrorValue(errno())),
381 }
382 _ => Ok(self),
383 }
384 }
385
386 /// Defines a new `MessageQueue`
387 /// In the future, it will be possible to use more types
388 /// of keys (which would be translated to i32 behind the
389 /// scenes automatically)
390 pub fn new(key: i32) -> Self {
391 MessageQueue {
392 id: -1,
393 key,
394 mask: 0,
395 message_mask: 0,
396 mode: 0o666,
397 initialized: false,
398 auto_kill: false,
399 types: PhantomData
400 }
401 }
402}
403
404impl<'a, T> MessageQueue<T> where T: Serialize {
405 /// Sends a new message, or tries to (in case of non-blocking calls).
406 /// If the queue is full, `IpcError::QueueFull` is returned
407 pub fn send<I>(&self, src: T, mtype: I) -> Result<(), IpcError> where I: Into<i64> {
408 if !self.initialized {
409 return Err(IpcError::QueueIsUninitialized);
410 }
411
412 let mut message = Box::new(Message {
413 mtype: mtype.into(),
414 mtext: [0; 65536],
415 });
416 let bytes = match serde_cbor::ser::to_vec(&src) {
417 Ok(b) => b,
418 Err(_) => return Err(IpcError::FailedToSerialize),
419 };
420
421 bytes
422 .iter()
423 .enumerate()
424 .for_each(|(i, x)| message.mtext[i] = *x);
425
426 let res = unsafe {
427 msgsnd(self.id, message.borrow() as *const Message, bytes.len(), 0)
428 };
429
430 match res {
431 -1 => match Errno::from_i32(errno()) {
432 Errno::EFAULT => Err(IpcError::CouldntReadMessage),
433 Errno::EIDRM => Err(IpcError::QueueWasRemoved),
434 Errno::EINTR => Err(IpcError::SignalReceived),
435 Errno::EINVAL => Err(IpcError::InvalidMessage),
436 Errno::E2BIG => Err(IpcError::MessageTooBig),
437 Errno::EACCES => Err(IpcError::AccessDenied),
438 Errno::ENOMSG => Err(IpcError::NoMessage),
439 Errno::EAGAIN => Err(IpcError::QueueFull),
440 Errno::ENOMEM => Err(IpcError::NoMemory),
441 _ => Err(IpcError::UnknownErrorValue(errno())),
442 }
443 0 => Ok(()),
444 x => Err(IpcError::UnknownReturnValue(x as i32)),
445 }
446 }
447}
448
449impl<'a, T> MessageQueue<T> where for<'de> T: Deserialize<'de> {
450 /// Returns a message without removing it from the message
451 /// queue. Use `recv()` if you want to consume the message
452 pub fn peek(&self) -> Result<T, IpcError> {
453 if !self.initialized {
454 return Err(IpcError::QueueIsUninitialized);
455 }
456
457 let mut message: Box<Message> = Box::new(Message {
458 mtype: 0,
459 mtext: [0; 65536],
460 });
461
462 let size = unsafe { msgrcv(self.id, message.borrow_mut() as *mut Message, 65536, 0, IpcFlags::MsgCopy as i32 | self.message_mask) };
463
464 if size >= 0 {
465 match serde_cbor::from_slice(&message.mtext[..size as usize]) {
466 Ok(r) => Ok(r),
467 Err(_) => Err(IpcError::FailedToDeserialize),
468 }
469 }
470 else {
471 match Errno::from_i32(errno()) {
472 Errno::EFAULT => Err(IpcError::CouldntReadMessage),
473 Errno::EIDRM => Err(IpcError::QueueWasRemoved),
474 Errno::EINTR => Err(IpcError::SignalReceived),
475 Errno::EINVAL => Err(IpcError::InvalidMessage),
476 Errno::E2BIG => Err(IpcError::MessageTooBig),
477 Errno::EACCES => Err(IpcError::AccessDenied),
478 Errno::ENOMSG => Err(IpcError::NoMessage),
479 Errno::EAGAIN => Err(IpcError::QueueFull),
480 Errno::ENOMEM => Err(IpcError::NoMemory),
481 _ => Err(IpcError::UnknownErrorValue(errno())),
482 }
483 }
484 }
485
486 /// Receives a message, consuming it. If no message is
487 /// to be received, `recv()` either blocks or returns
488 /// [`IpcError::NoMemory`]
489 pub fn recv(&self) -> Result<T, IpcError> {
490 if !self.initialized {
491 return Err(IpcError::QueueIsUninitialized);
492 }
493
494 let mut message: Box<Message> = Box::new(Message {
495 mtype: 0,
496 mtext: [0; 65536],
497 }); // spooky scary stuff
498
499 let size = unsafe { msgrcv(self.id, message.borrow_mut() as *mut Message, 65536, 0, self.message_mask) };
500
501 if size >= 0 {
502 match serde_cbor::from_slice(&message.mtext[..size as usize]) {
503 Ok(r) => Ok(r),
504 Err(_) => Err(IpcError::FailedToDeserialize),
505 }
506 }
507 else {
508 match Errno::from_i32(errno()) {
509 Errno::EFAULT => Err(IpcError::CouldntReadMessage),
510 Errno::EIDRM => Err(IpcError::QueueWasRemoved),
511 Errno::EINTR => Err(IpcError::SignalReceived),
512 Errno::EINVAL => Err(IpcError::InvalidMessage),
513 Errno::E2BIG => Err(IpcError::MessageTooBig),
514 Errno::EACCES => Err(IpcError::AccessDenied),
515 Errno::ENOMSG => Err(IpcError::NoMessage),
516 Errno::EAGAIN => Err(IpcError::QueueFull),
517 Errno::ENOMEM => Err(IpcError::NoMemory),
518 _ => Err(IpcError::UnknownErrorValue(errno())),
519 }
520 }
521 }
522}
523
524#[cfg(test)]
525mod tests {
526 use ::MessageQueue;
527 use ::IpcError;
528
529 #[test]
530 fn send_message() {
531 let queue = MessageQueue::new(1234).init().unwrap();
532 let res = queue.send("kalinka", 25);
533 println!("{:?}", res);
534 assert!(res.is_ok());
535 }
536
537 #[test]
538 fn recv_message() {
539 let queue = MessageQueue::<String>::new(1234).init().unwrap();
540 let res = queue.recv();
541 println!("{:?}", res);
542 assert!(res.is_ok());
543 }
544
545 #[test]
546 fn nonblocking() {
547 let queue = MessageQueue::<()>::new(745965545)
548 .async()
549 .init()
550 .unwrap();
551
552 println!("{}", queue.mask);
553 assert_eq!(Err(IpcError::NoMessage), queue.recv())
554 }
555}