Crate posixmq[][src]

Posix message queue wrapper with optional mio integration.

Posix message queues are like pipes, but message-oriented which makes them safe to read by multiple processes. Messages are sorted based on an additional priority parameter. Queues are not placed in the normal file system, but uses a separate, flat namespace. Normal file permissions still apply though. For a longer introduction, see man mq_overview or man mq.

They are not all that useful, as only Linux and some BSDs implement them, and even there you might be limited to creating queues with a capacity of no more than 10 messages at a time.

Examples

Send a couple messages:

use posixmq::PosixMq;

// open the message queue if it exists, or create it if it doesn't.
// names should start with a slash and have no more slashes.
let mq = PosixMq::create("/hello_posixmq").unwrap();
mq.send(0, b"message").unwrap();
// messages with equal priority will be received in order
mq.send(0, b"queue").unwrap();
// but this message has higher priority and will be received first
mq.send(10, b"Hello,").unwrap();

and receive them:

use posixmq::PosixMq;

// open the queue read-only, or fail if it doesn't exist.
let mq = PosixMq::open("/hello_posixmq").unwrap();
// delete the message queue when you don't need to open it again.
// otherwise it will remain until the system is rebooted, consuming
posixmq::remove_queue("/hello_posixmq").unwrap();

// the receive buffer must be at least as big as the biggest possible
// message, or you will not be allowed to receive anything.
let mut buf = vec![0; mq.attributes().unwrap().max_msg_len];
assert_eq!(mq.recv(&mut buf).unwrap(), (10, "Hello,".len()));
assert_eq!(mq.recv(&mut buf).unwrap(), (0, "message".len()));
assert_eq!(mq.recv(&mut buf).unwrap(), (0, "queue".len()));
assert_eq!(&buf[..5], b"queue");

// check that there are no more messages
assert_eq!(mq.attributes().unwrap().current_messages, 0);
// note that acting on this value is race-prone. A better way to do this
// would be to switch our descriptor to non-blocking mode, and check for
// an error of type `ErrorKind::WouldBlock`.

With mio (and features = ["mio_07"] in Cargo.toml):

// set up queue
let mut receiver = posixmq::OpenOptions::readonly()
    .nonblocking()
    .capacity(3)
    .max_msg_len(100)
    .create_new()
    .open("/mio")
    .unwrap();

// send something from another thread (or process)
let sender = thread::spawn(move|| {
    let sender = posixmq::OpenOptions::writeonly().open("/mio").unwrap();
    posixmq::remove_queue("/mio").unwrap();
    sender.send(0, b"async").unwrap();
});

// set up mio and register
let mut poll = Poll::new().unwrap();
poll.registry().register(&mut receiver, Token(0), Interest::READABLE).unwrap();
let mut events = Events::with_capacity(10);

poll.poll(&mut events, None).unwrap();
for event in &events {
    if event.token() == Token(0) {
        loop {
            let mut buf = [0; 100];
            match receiver.recv(&mut buf) {
                Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
                Err(e) => panic!("Error receiving message: {}", e),
                Ok((priority, len)) => {
                    assert_eq!(priority, 0);
                    assert_eq!(&buf[..len], b"async");
                }
            }
        }
    }
}

sender.join().unwrap();

See the examples/ directory for more.

Portability

While the p in POSIX stands for Portable, that is not a fitting description of their message queues; Support is spotty even among *nix OSes. Windows, macOS, OpenBSD, Android, ios, Rumprun, Fuchsia and Emscripten doesn't support posix message queues at all.

Compatible operating systems and features

 LinuxFreeBSD 11+NetBSDDragonFly BSDIllumosSolarisVxWorks
core featuresYesYesYesYesYesYesYes
mio Source & EventedYesYesunusableYesNoNoNo
FromRawFd+IntoRawFd+try_clone()YesNoYesYesNoNoNo
AsRawFd+set_cloexec()YesYesYesYesNoNoNo
Tested?Manually+CIManually+CIManuallyManuallyManually (on OmniOSce)Cross-checked on CINo

This library will fail to compile if the target OS doesn't have posix message queues.

Feature explanations:

  • FromRawFd+IntoRawFd+try_clone(): For theese to work, the inner mqd_t type must be an int/RawFd typedef, and known to represent a file descriptor.
    These impls are only available on OSes where this is known to be the case, to increase the likelyhood that the core features will compile on an unknown OS.
  • AsRawFd+set_cloexec(): Similar to FromRawFd and IntoRawFd, but FreeBSD 11+ has a function which lets one get a file descriptor from a mqd_t.
    Changing or querying close-on-exec requires AsRawFd, and is only only meaningful on operating systems that have the concept of exec().
    is_cloexec() is always present and returns true on OSes where close-on-exec cannot be disabled or one cannot exec(). (posix message queue descriptors should have close-on-exec set by default).
  • mio Source & Evented: The impls require both AsRawFd and that mio compiles on the OS. This does not guarantee that the event notification mechanism used by mio supports posix message queues though. (registering fails on NetBSD)

On Linux, message queues and their permissions can be viewed in /dev/mqueue/. The kernel can be compiled to not support posix message queues, so it's not guaranteed to always work. (such as on Android)

On FreeBSD, the kernel module responsible for posix message queues is not loaded by default; Run kldload mqueuefs as root to enable it. To list queues, the file system must additionally be mounted first: mount -t mqueuefs null $somewhere.
Versions before 11 do not have the function used to get a file descriptor, so this library will not compile there.

On NetBSD, re-opening message queues multiple times can eventually make all further opens fail. This does not affect programs that open a single queue once.
The mio integration compiles, but registering message queues with mio fails.
Because NetBSD ignores cloexec when opening or cloning descriptors, there is a race condition with other threads exec'ing before this library can enable close-on-exec for the descriptor.

DragonFly BSD doesn't set cloexec when opening either, but does when cloning.

OS-dependent restrictions and default values

Not even limiting oneself to the core features is enough to guarantee portability!

 LinuxFreeBSDNetBSDDragonFly BSDIllumos
max priority3276763313131
default capacity10103232128
default max_msg_len819210249929921024
max capacity10*100512512No limit
max max_msg_len8192*163841638416384No limit
allows empty messagesYesYesNoNoYes
enforces name rulesYesYesNoNoYes
allows "/.", "/.." and "/"NoNoYesYesYes

On Linux the listed size limits only apply to unprivileged processes. As root there instead appears to be a combined limit on memory usage of the form capacity*(max_msg_len+k), but is several times higher than 10*8192.

Differences from the C API

  • send(), recv() and the timed equivalents tries again when EINTR / ErrorKind::Interrupted is returned. (Consistent with how std does IO)
  • open() and all other methods which take AsRef<[u8]> prepends '/' to the name if missing. (They have to copy the name anyway, to append a terminating '\0') Use open_c() and remove_queue_c() if you need to interact with queues on NetBSD or DragonFly that doesn't have a leading '/'.

Minimum supported Rust version

The minimum supported Rust version for posixmq 1.0.z releases is 1.31.1.
Later 1.y.0 releases might increase this. Until rustup has builds for DragonFly BSD and Illumos, the minimum version will not be increased past what is available in the repositories for those operating systems.

Structs

Attributes

Contains information about the capacities and state of a posix message queue.

IntoIter

An Iterator that recv()s messages from an owned PosixMq.

Iter

An Iterator that calls recv() on a borrowed PosixMq.

OpenOptions

Flags and parameters which control how a PosixMq message queue is opened or created.

PosixMq

A descriptor for an open posix message queue.

Functions

remove_queue

Delete a posix message queue.

remove_queue_c

Delete a posix message queue, without inspecting name or allocating.