Crate posixmq

Posix message queue wrapper with optional mio integration.

Posix message queues are like pipes, but message-oriented, which makes them safe to read from multiple processes. Messages are sorted based on an additional priority parameter. Queues are not placed in the normal file system, but use a separate, flat namespace. Normal file permissions still apply though. For a longer introduction, see man mq_overview or man mq.

They are not all that useful, as only Linux and some BSDs implement them, and even there you might be limited to creating queues with a capacity of no more than 10 messages at a time.

Examples

Send a couple messages:

use posixmq::PosixMq;

// open the message queue if it exists, or create it if it doesn't.
// names should start with a slash and have no more slashes.
let mq = PosixMq::create("/hello_posixmq").unwrap();
mq.send(0, b"message").unwrap();
// messages with equal priority will be received in order
mq.send(0, b"queue").unwrap();
// but this message has higher priority and will be received first
mq.send(10, b"Hello,").unwrap();

and receive them:

use posixmq::PosixMq;

// open the queue read-only, or fail if it doesn't exist.
let mq = PosixMq::open("/hello_posixmq").unwrap();
// delete the message queue when you don't need to open it again.
// otherwise it will remain until the system is rebooted.
posixmq::unlink("/hello_posixmq").unwrap();

// the receive buffer must be at least as big as the biggest possible message,
// or you will not be allowed to receive anything.
let mut buf = vec![0; mq.attributes().max_msg_len];
assert_eq!(mq.receive(&mut buf).unwrap(), (10, "Hello,".len()));
assert_eq!(mq.receive(&mut buf).unwrap(), (0, "message".len()));
assert_eq!(mq.receive(&mut buf).unwrap(), (0, "queue".len()));
assert_eq!(&buf[..5], b"queue");

// check that there are no more messages
assert_eq!(mq.attributes().current_messages, 0);
// note that acting on this value is race-prone. A better way to do this
// would be to switch our descriptor to non-blocking mode, and check for
// an error of type `ErrorKind::WouldBlock`.
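
The non-blocking approach mentioned in the last comment could look roughly like the sketch below. To keep it free of dependencies, the queue's receive call is passed in as a closure. The drain helper and its signature are hypothetical and not part of posixmq, and u32 as the priority type is an assumption; the (priority, length) return shape is taken from the example above.

```rust
use std::io::{self, ErrorKind};

/// Receives messages until the queue reports that it is empty.
///
/// `receive` stands in for a call such as `|buf| mq.receive(buf)` on a queue
/// that was opened in non-blocking mode (hypothetical helper, not crate API).
fn drain<F>(mut receive: F, max_msg_len: usize) -> io::Result<Vec<(u32, Vec<u8>)>>
where
    F: FnMut(&mut [u8]) -> io::Result<(u32, usize)>,
{
    // The buffer must be able to hold the largest possible message.
    let mut buf = vec![0u8; max_msg_len];
    let mut messages = Vec::new();
    loop {
        match receive(&mut buf) {
            // Copy out only the bytes that belong to this message.
            Ok((priority, len)) => messages.push((priority, buf[..len].to_vec())),
            // An empty non-blocking queue is signalled by WouldBlock: stop here.
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => return Ok(messages),
            // Any other error is passed on to the caller.
            Err(e) => return Err(e),
        }
    }
}
```

With a queue opened through OpenOptions::readonly().nonblocking(), the call would presumably be drain(|buf| mq.receive(buf), mq.attributes().max_msg_len). Unlike checking current_messages, this does not race with other readers: each message is either received or the loop ends.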

With mio (and features = ["mio"]):

use std::io::ErrorKind;
use std::thread;
use mio::{Events, Poll, PollOpt, Ready, Token};

// set up queue
let receiver = posixmq::OpenOptions::readonly()
    .nonblocking()
    .capacity(3)
    .max_msg_len(100)
    .create_new()
    .open("/mio")
    .unwrap();

// send something from another thread (or process)
let sender = thread::spawn(move || {
    let sender = posixmq::OpenOptions::writeonly().open("/mio").unwrap();
    posixmq::unlink("/mio").unwrap();
    sender.send(0, b"async").unwrap();
});

// set up mio and register
let poll = Poll::new().unwrap();
poll.register(&receiver, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
let mut events = Events::with_capacity(10);

poll.poll(&mut events, None).unwrap();
for event in &events {
    if event.token() == Token(0) {
        loop {
            let mut buf = [0; 100];
            match receiver.receive(&mut buf) {
                Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
                Err(e) => panic!("Error receiving message: {}", e),
                Ok((priority, len)) => {
                    assert_eq!(priority, 0);
                    assert_eq!(&buf[..len], b"async");
                }
            }
        }
    }
}

sender.join().unwrap();

See the examples/ directory for more.

Portability

While the P in POSIX stands for Portable, that is not a fitting description of its message queues; support is spotty even among *nix OSes. Windows, macOS, OpenBSD, Android, iOS, Rumprun and Emscripten don't support posix message queues at all.

Compatible operating systems and features

| | Linux | FreeBSD 11+ | NetBSD | DragonFlyBSD | Illumos | Solaris | Fuchsia |
|---|---|---|---|---|---|---|---|
| core features | Yes | Yes | Yes | Yes | coming | coming | Yes |
| mio Evented | Yes | Yes | unusable | Yes | No | No | No |
| Sync | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| FromRawFd+IntoRawFd | Yes | No | Yes | Yes | No | No | Yes |
| AsRawFd | Yes | Yes | Yes | Yes | No | No | Yes |
| is_cloexec(), set_cloexec() | Yes | Yes | Yes | Yes | No | No | Yes |
| Tested? | Yes, CI | Yes, CI | Manually | Manually | Manually | No | Cross-compiles |

This library will fail to compile if the target OS doesn't support posix message queues at all.

Feature explanations:

  • FromRawFd+IntoRawFd: For this to compile, the inner mqd_t type must be an int typedef, and bad things might happen if it doesn't represent a file descriptor. These impls are currently on by default and only disabled when known not to work.
  • AsRawFd: similar to FromRawFd and IntoRawFd, but FreeBSD 11+ has a function which lets one get a file descriptor for an mqd_t. This is required for querying or changing cloexec, and also for reliably setting it.
  • mio Evented: The impl requires both AsRawFd and that mio compiles. This does not guarantee that the polling mechanism used by mio supports posix message queues though.

On Linux, message queues and their permissions can be viewed in /dev/mqueue/. The kernel can be compiled without support for posix message queues (such as on Android), so they are not guaranteed to always work.
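
Because the queues show up as ordinary directory entries there, they can be listed with the standard library alone. The following is a minimal sketch, not something the crate provides: list_queues is a hypothetical helper, and the directory is a parameter in case the mqueue file system is mounted somewhere other than /dev/mqueue.

```rust
use std::fs;
use std::io;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;

/// Lists the entries of `dir` as (queue name, permission bits) pairs.
/// On Linux `dir` would normally be "/dev/mqueue" (hypothetical helper).
fn list_queues(dir: &Path) -> io::Result<Vec<(String, u32)>> {
    let mut queues = Vec::new();
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        // Entries are listed without the leading slash used when opening a queue.
        let name = format!("/{}", entry.file_name().to_string_lossy());
        // Keep only the permission bits (rwx for user, group and others).
        let mode = entry.metadata()?.permissions().mode() & 0o777;
        queues.push((name, mode));
    }
    // Sort by name so the result does not depend on directory order.
    queues.sort();
    Ok(queues)
}
```

Calling list_queues(Path::new("/dev/mqueue")) returns an error if the directory is missing, which is also a cheap hint that the file system is not mounted there.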

On FreeBSD, the kernel module responsible for posix message queues is not loaded by default; run kldload mqueuefs as root to enable it. To list queues, the file system must additionally be mounted first: mount -t mqueuefs null $somewhere. Versions before 11 do not have the function used to get a file descriptor, so this library will not compile there.

On NetBSD, re-opening message queues multiple times can eventually make all further opens fail. This does not affect programs that open a single queue once.
The mio integration compiles, but registering message queues fails.

On Illumos and Solaris, the libc crate doesn't have the necessary functions or types at the moment, so this library won't compile. Once a libc version with those is released, the core features will become usable.

OS-dependent restrictions and default values

Not even limiting oneself to the core features is enough to guarantee portability!

| | Linux | FreeBSD | NetBSD | DragonFlyBSD | Illumos |
|---|---|---|---|---|---|
| max priority | 32767 | 63 | 31 | 31 | 31 |
| default capacity | 10 | 10 | 32 | 32 | 128 |
| default max_msg_len | 8192 | 1024 | 992 | 992 | 1024 |
| max capacity | 10* | 100 | 512 | 512 | No limit |
| max max_msg_len | 8192* | 16384 | 16384 | 16384 | No limit |
| allows empty messages | Yes | Yes | No | No | Yes |
| enforces name rules | Yes | Yes | No | No | Yes |
| allows "/.", "/.." and "/" | No | No | Yes | Yes | Yes |

* On Linux, the listed size limits only apply to unprivileged processes. As root, there instead appears to be a combined limit on memory usage of the form capacity*(max_msg_len+k), but it is several times higher than 10*8192.
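
One way to stay within the table above is to check names, queue sizes and messages against the most restrictive value in each row. This is only a sketch: the constants and function names are made up for illustration and are not part of posixmq, and the limits are copied from the table, so they may not match every OS release.

```rust
// Most restrictive values from the table above (assumed to be inclusive limits).
const PORTABLE_MAX_PRIORITY: u32 = 31; // NetBSD, DragonFlyBSD, Illumos
const PORTABLE_MAX_CAPACITY: usize = 10; // Linux, unprivileged
const PORTABLE_MAX_MSG_LEN: usize = 8192; // Linux, unprivileged

/// True if every listed OS should accept `name` as the OS sees it:
/// one leading slash, no further slashes, and not "/", "/." or "/..".
fn is_portable_name(name: &str) -> bool {
    if !name.starts_with('/') {
        return false;
    }
    let rest = &name[1..];
    !rest.is_empty() && !rest.contains('/') && rest != "." && rest != ".."
}

/// True if a queue with these settings can be created by an unprivileged
/// process on every listed OS.
fn is_portable_queue(capacity: usize, max_msg_len: usize) -> bool {
    capacity <= PORTABLE_MAX_CAPACITY && max_msg_len <= PORTABLE_MAX_MSG_LEN
}

/// True if the priority is valid everywhere and the message is not empty,
/// since NetBSD and DragonFlyBSD reject empty messages.
fn is_portable_message(priority: u32, msg: &[u8]) -> bool {
    priority <= PORTABLE_MAX_PRIORITY && !msg.is_empty()
}
```

Under these assumptions, is_portable_name("/hello_posixmq") holds while "/a/b" and "/.." are rejected, and is_portable_message(32, b"x") fails even though Linux and FreeBSD would accept that priority.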

Differences from the C API

  • send() and receive() try again when EINTR / ErrorKind::Interrupted is returned. (Consistent with normal Rust io)
  • Descriptors are by default opened with O_CLOEXEC. (Consistent with normal Rust io)
  • open() and all other methods which take AsRef<[u8]> prepend '/' to the name if missing. (They allocate anyway, to append a terminating '\0')
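
The first and last points can be illustrated with plain standard-library code. This is a sketch of the described behaviour, not the crate's actual implementation: retry_on_interrupt and queue_name are hypothetical names chosen for this example.

```rust
use std::ffi::{CString, NulError};
use std::io::{self, ErrorKind};

/// Repeats `op` for as long as it fails with ErrorKind::Interrupted (EINTR).
fn retry_on_interrupt<T, F>(mut op: F) -> io::Result<T>
where
    F: FnMut() -> io::Result<T>,
{
    loop {
        match op() {
            // Interrupted by a signal: simply try the call again.
            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
            Ok(value) => return Ok(value),
        }
    }
}

/// Builds a NUL-terminated queue name, adding a leading '/' if it is missing.
fn queue_name(name: &[u8]) -> Result<CString, NulError> {
    // Reserve room for the optional slash and the terminator up front.
    let mut bytes = Vec::with_capacity(name.len() + 2);
    if name.first() != Some(&b'/') {
        bytes.push(b'/');
    }
    bytes.extend_from_slice(name);
    // CString::new appends the terminating '\0' and rejects interior NUL bytes.
    CString::new(bytes)
}
```

Since building the C string needs an allocation in any case, adding the slash in the same buffer costs nothing extra, which matches the reasoning given in the last point.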

Minimum Rust version

The minimum supported Rust version is 1.31.
While the crate might currently compile on older versions, a minor release can break this. Until rustup has builds for DragonFlyBSD and Illumos, this crate will never require a newer Rust version than what is available in the DragonFlyBSD or Joyent repositories.

Missing and planned features

  • mq_timedsend() and mq_timedreceive() wrappers.
  • Iterator-implementing struct that calls receive()
  • Listing queues and their owners using OS-specific interfaces (such as /dev/mqueue/ on Linux)
  • tmpfile equivalent
  • Querying and possibly changing limits and default values
  • Struct that deletes the message queue when dropped
  • Test or check more platforms on CI
  • Support more OSes?
  • mq_notify()?

Please open an issue if you want any of them.

Structs

Attributes

Contains information about the capacities and state of a posix message queue.

OpenOptions

Flags and parameters which control how a PosixMq message queue is opened or created.

PosixMq

A descriptor for an open posix message queue.

Functions

name_from_bytes

Helper function for converting a str or byte slice into a C string without allocating when possible.

unlink

Delete a posix message queue.

unlink_c

Delete a posix message queue, without inspecting name or allocating.