Struct PosixMq

Source
pub struct PosixMq { /* private fields */ }
Expand description

A descriptor for an open posix message queue.

Message queues can be sent to and / or received from depending on the options it was opened with.

The descriptor is closed when this struct is dropped.

See the documentation in the crate root for examples, portability notes and OS details.

Implementations§

Source§

impl PosixMq

Source

pub fn open<N: AsRef<[u8]> + ?Sized>(name: &N) -> Result<Self, Error>

Open an existing message queue in read-write mode.

See OpenOptions::open() for details and possible errors.

Source

pub fn create<N: AsRef<[u8]> + ?Sized>(name: &N) -> Result<Self, Error>

Open a message queue in read-write mode, creating it if it doesn’t exists.

See OpenOptions::open() for details and possible errors.

Examples found in repository?
examples/limits.rs (line 81)
28fn main() {
29    println!("{}:", std::env::consts::OS);
30
31    let mq = posixmq::OpenOptions::readwrite()
32        .create_new()
33        .open("/default_capacities")
34        .expect("Cannot create new posix message queue /default_capacities");
35    posixmq::remove_queue("/default_capacities")
36        .expect("Cannot delete posix message queue /default_capacities");
37
38    let attrs = mq.attributes().expect("Cannot get attributes for queue");
39    println!("default queue capacity:         {}", attrs.capacity);
40    println!("default maximum message length: {}", attrs.max_msg_len);
41
42    println!("max message priority:           {}", btry(0xff_ff_ff_ff, |priority| {
43        mq.send(priority as u32, b"b")
44            .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45            .is_ok()
46    }));
47    println!("allows empty messages:          {}", mq.send(0, b"").is_ok());
48    drop(mq);
49
50    println!("max queue capacity:             {}", btry(1_000_000, |capacity| {
51        posixmq::OpenOptions::readwrite()
52            .capacity(capacity)
53            .max_msg_len(1)
54            .create_new()
55            .open("/max_capacity")
56            .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57            .is_ok()
58    }));
59
60    println!("max message length:             {}", btry(1_000_000, |length| {
61        posixmq::OpenOptions::readwrite()
62            .max_msg_len(length)
63            .capacity(1)
64            .create_new()
65            .open("/max_length")
66            .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67            .is_ok()
68    }));
69
70    println!("max equal:                      {}", btry(1_000_000, |equal| {
71        posixmq::OpenOptions::readwrite()
72            .max_msg_len(equal)
73            .capacity(equal)
74            .create_new()
75            .open("/max_equal")
76            .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77            .is_ok()
78    }));
79
80    println!("allows just \"/\":                {}", {
81        let result = posixmq::PosixMq::create("/");
82        let _ = posixmq::remove_queue("/");
83        result.is_ok()
84    });
85    println!("enforces name rules:            {}", {
86        let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87        let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88        let _ = posixmq::remove_queue_c(noslash);
89        result.is_err()
90    });
91}
Source

pub fn send(&self, priority: u32, msg: &[u8]) -> Result<(), Error>

Add a message to the queue.

For maximum portability, avoid using priorities >= 32 or sending zero-length messages.

§Errors
  • Queue is full and opened in nonblocking mode (EAGAIN) => ErrorKind::WouldBlock
  • Message is too big for the queue (EMSGSIZE) => ErrorKind::Other
  • Message is zero-length and the OS doesn’t allow this (EMSGSIZE) => ErrorKind::Other
  • Priority is too high (EINVAL) => ErrorKind::InvalidInput
  • Queue is opened in read-only mode (EBADF) => ErrorKind::Other
  • Possibly other => ErrorKind::Other
Examples found in repository?
examples/send.rs (line 31)
9fn main() {
10    let args = args_os().skip(1).collect::<Vec<_>>();
11    if args.len() % 2 == 0 {
12        eprintln!("Usage: cargo run --example send /queue-name [priority message] ...");
13        return;
14    }
15
16    // open the message queue
17    let mq = posixmq::OpenOptions::writeonly()
18        .create()
19        .mode(0o666) // FIXME set umask to actually make it sendable for others
20        .open(args[0].as_bytes())
21        .expect("opening failed");
22
23    if args.len() == 1 {
24        // read from stdin
25        for line in stdin().lock().lines() {
26            let line = line.expect("input must be UTF-8, sorry");
27            let line = line.trim_start();
28            let num_end = line.find(' ').expect("no space in line");
29            let priority = line[..num_end].parse::<u32>().expect("priority is not a number");
30            let msg = line[num_end+1..].trim_start();
31            mq.send(priority, msg.as_bytes()).expect("sending failed");
32        }
33    } else {
34        // read from the command line
35        for i in (1..args.len()).step_by(2) {
36            let priority = args[i].to_str()
37                .and_then(|s| s.parse::<u32>().ok() )
38                .expect("priority is not a number");
39            let msg = args[i+1].as_bytes();
40            mq.send(priority, msg).expect("sending failed");
41        }
42    }
43}
More examples
Hide additional examples
examples/sort.rs (line 35)
10fn main() {
11    let args = args_os()
12        .skip(1)
13        .map(|osstring| osstring.into_vec() )
14        .collect::<Vec<Vec<u8>>>();
15    if args.is_empty() {
16        return; // nothing to do
17    }
18
19    let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21    let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23    // create queue with the necessary permissions and open it
24    let mq = posixmq::OpenOptions::readwrite()
25        .nonblocking() // use WouldBlock to detect that the queue is empty
26        .create_new()
27        .mode(0o000) // only affects future attempts at opening it
28        .capacity(args.len())
29        .max_msg_len(recv_buf.len())
30        .open_c(name)
31        .expect("opening queue failed");
32
33    // write arguments to the queue
34    for arg in args {
35        mq.send(arg.len() as u32, &arg).expect("sending failed");
36    }
37
38    // read until the queue is empty
39    let stdout = stdout();
40    let mut stdout = stdout.lock();
41    loop {
42        match mq.recv(&mut recv_buf) {
43            Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44            Err(e) => panic!("receiving failed: {}", e),
45            Ok((_priority, len)) => {
46                stdout.write_all(&recv_buf[..len])
47                    .and_then(|()| stdout.write_all(b"\n") )
48                    .expect("writing to stdout failed");
49            }
50        }
51    }
52
53    // clean up
54    posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
examples/limits.rs (line 43)
28fn main() {
29    println!("{}:", std::env::consts::OS);
30
31    let mq = posixmq::OpenOptions::readwrite()
32        .create_new()
33        .open("/default_capacities")
34        .expect("Cannot create new posix message queue /default_capacities");
35    posixmq::remove_queue("/default_capacities")
36        .expect("Cannot delete posix message queue /default_capacities");
37
38    let attrs = mq.attributes().expect("Cannot get attributes for queue");
39    println!("default queue capacity:         {}", attrs.capacity);
40    println!("default maximum message length: {}", attrs.max_msg_len);
41
42    println!("max message priority:           {}", btry(0xff_ff_ff_ff, |priority| {
43        mq.send(priority as u32, b"b")
44            .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45            .is_ok()
46    }));
47    println!("allows empty messages:          {}", mq.send(0, b"").is_ok());
48    drop(mq);
49
50    println!("max queue capacity:             {}", btry(1_000_000, |capacity| {
51        posixmq::OpenOptions::readwrite()
52            .capacity(capacity)
53            .max_msg_len(1)
54            .create_new()
55            .open("/max_capacity")
56            .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57            .is_ok()
58    }));
59
60    println!("max message length:             {}", btry(1_000_000, |length| {
61        posixmq::OpenOptions::readwrite()
62            .max_msg_len(length)
63            .capacity(1)
64            .create_new()
65            .open("/max_length")
66            .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67            .is_ok()
68    }));
69
70    println!("max equal:                      {}", btry(1_000_000, |equal| {
71        posixmq::OpenOptions::readwrite()
72            .max_msg_len(equal)
73            .capacity(equal)
74            .create_new()
75            .open("/max_equal")
76            .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77            .is_ok()
78    }));
79
80    println!("allows just \"/\":                {}", {
81        let result = posixmq::PosixMq::create("/");
82        let _ = posixmq::remove_queue("/");
83        result.is_ok()
84    });
85    println!("enforces name rules:            {}", {
86        let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87        let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88        let _ = posixmq::remove_queue_c(noslash);
89        result.is_err()
90    });
91}
examples/merge.rs (line 66)
7fn main() {
8    use std::env::args;
9    use std::io::ErrorKind;
10
11    use mio::{Poll, Events, Interest, Token};
12
13    let mut queues = args().skip(1).collect::<Vec<_>>();
14    let dst = queues.pop().expect("arguments required");
15    
16    // open source queues
17    let mut src = Vec::new();
18    for name in queues {
19        match posixmq::OpenOptions::readonly().nonblocking().open(&name) {
20            Ok(mq) => src.push((mq, name)),
21            Err(e) => panic!("Cannot open {:?} for receiving: {}", name, e),
22        }
23    }
24
25    // open destination queue
26    let mut dst = match posixmq::OpenOptions::writeonly().nonblocking().create().open(&dst) {
27        Ok(mq) => (mq, dst),
28        Err(e) => panic!("Cannot open or create {:?} for sending: {}", dst, e),
29    };
30
31    let mut poll = Poll::new().expect("Cannot create selector");
32    poll.registry()
33        .register(&mut dst.0, Token(0), Interest::WRITABLE)
34        .expect("registering destination failed");
35    for (i, &mut(ref mut src, _)) in src.iter_mut().enumerate() {
36        poll.registry()
37            .register(src, Token(i+1), Interest::READABLE)
38            .expect("registering a source failed");
39    }
40
41    let mut unsent = Vec::<(u32, Box<[u8]>, &str)>::new();
42    let mut buf = [0; 8192];
43    let mut events = Events::with_capacity(1024);
44    loop {
45        poll.poll(&mut events, None).expect("Cannot poll selector");
46
47        // receive all available messages from queues that are ready
48        for event in events.iter() {
49            if event.token() == Token(0) {
50                // dst; will try to send below even without this event
51                continue;
52            }
53
54            let &(ref mq, ref name) = &src[event.token().0-1];
55            loop {
56                match mq.recv(&mut buf) {
57                    Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
58                    Err(e) => panic!("Error receiving from {}: {}", name, e),
59                    Ok((priority, len)) => unsent.push((priority, Box::from(&buf[..len]), name)),
60                }
61            }
62        }
63
64        // send as many messages as possible
65        while let Some(&(priority, ref msg, ref src)) = unsent.last() {
66            if let Err(e) = dst.0.send(priority, msg) {
67                if e.kind() == ErrorKind::WouldBlock {
68                    break;
69                }
70                panic!("Error sending to {}: {}", dst.1, e);
71            }
72            println!("message of priority {} with {} bytes from {} sent to {}", 
73                priority, msg.len(), src, dst.1
74            );
75            let _ = unsent.pop();
76        }
77    }
78}
Source

pub fn recv(&self, msgbuf: &mut [u8]) -> Result<(u32, usize), Error>

Take the message with the highest priority from the queue.

The buffer must be at least as big as the maximum message length.

§Errors
  • Queue is empty and opened in nonblocking mode (EAGAIN) => ErrorKind::WouldBlock
  • The receive buffer is smaller than the queue’s maximum message size (EMSGSIZE) => ErrorKind::Other
  • Queue is opened in write-only mode (EBADF) => ErrorKind::Other
  • Possibly other => ErrorKind::Other
Examples found in repository?
examples/sort.rs (line 42)
10fn main() {
11    let args = args_os()
12        .skip(1)
13        .map(|osstring| osstring.into_vec() )
14        .collect::<Vec<Vec<u8>>>();
15    if args.is_empty() {
16        return; // nothing to do
17    }
18
19    let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21    let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23    // create queue with the necessary permissions and open it
24    let mq = posixmq::OpenOptions::readwrite()
25        .nonblocking() // use WouldBlock to detect that the queue is empty
26        .create_new()
27        .mode(0o000) // only affects future attempts at opening it
28        .capacity(args.len())
29        .max_msg_len(recv_buf.len())
30        .open_c(name)
31        .expect("opening queue failed");
32
33    // write arguments to the queue
34    for arg in args {
35        mq.send(arg.len() as u32, &arg).expect("sending failed");
36    }
37
38    // read until the queue is empty
39    let stdout = stdout();
40    let mut stdout = stdout.lock();
41    loop {
42        match mq.recv(&mut recv_buf) {
43            Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44            Err(e) => panic!("receiving failed: {}", e),
45            Ok((_priority, len)) => {
46                stdout.write_all(&recv_buf[..len])
47                    .and_then(|()| stdout.write_all(b"\n") )
48                    .expect("writing to stdout failed");
49            }
50        }
51    }
52
53    // clean up
54    posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
More examples
Hide additional examples
examples/limits.rs (line 44)
28fn main() {
29    println!("{}:", std::env::consts::OS);
30
31    let mq = posixmq::OpenOptions::readwrite()
32        .create_new()
33        .open("/default_capacities")
34        .expect("Cannot create new posix message queue /default_capacities");
35    posixmq::remove_queue("/default_capacities")
36        .expect("Cannot delete posix message queue /default_capacities");
37
38    let attrs = mq.attributes().expect("Cannot get attributes for queue");
39    println!("default queue capacity:         {}", attrs.capacity);
40    println!("default maximum message length: {}", attrs.max_msg_len);
41
42    println!("max message priority:           {}", btry(0xff_ff_ff_ff, |priority| {
43        mq.send(priority as u32, b"b")
44            .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45            .is_ok()
46    }));
47    println!("allows empty messages:          {}", mq.send(0, b"").is_ok());
48    drop(mq);
49
50    println!("max queue capacity:             {}", btry(1_000_000, |capacity| {
51        posixmq::OpenOptions::readwrite()
52            .capacity(capacity)
53            .max_msg_len(1)
54            .create_new()
55            .open("/max_capacity")
56            .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57            .is_ok()
58    }));
59
60    println!("max message length:             {}", btry(1_000_000, |length| {
61        posixmq::OpenOptions::readwrite()
62            .max_msg_len(length)
63            .capacity(1)
64            .create_new()
65            .open("/max_length")
66            .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67            .is_ok()
68    }));
69
70    println!("max equal:                      {}", btry(1_000_000, |equal| {
71        posixmq::OpenOptions::readwrite()
72            .max_msg_len(equal)
73            .capacity(equal)
74            .create_new()
75            .open("/max_equal")
76            .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77            .is_ok()
78    }));
79
80    println!("allows just \"/\":                {}", {
81        let result = posixmq::PosixMq::create("/");
82        let _ = posixmq::remove_queue("/");
83        result.is_ok()
84    });
85    println!("enforces name rules:            {}", {
86        let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87        let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88        let _ = posixmq::remove_queue_c(noslash);
89        result.is_err()
90    });
91}
examples/merge.rs (line 56)
7fn main() {
8    use std::env::args;
9    use std::io::ErrorKind;
10
11    use mio::{Poll, Events, Interest, Token};
12
13    let mut queues = args().skip(1).collect::<Vec<_>>();
14    let dst = queues.pop().expect("arguments required");
15    
16    // open source queues
17    let mut src = Vec::new();
18    for name in queues {
19        match posixmq::OpenOptions::readonly().nonblocking().open(&name) {
20            Ok(mq) => src.push((mq, name)),
21            Err(e) => panic!("Cannot open {:?} for receiving: {}", name, e),
22        }
23    }
24
25    // open destination queue
26    let mut dst = match posixmq::OpenOptions::writeonly().nonblocking().create().open(&dst) {
27        Ok(mq) => (mq, dst),
28        Err(e) => panic!("Cannot open or create {:?} for sending: {}", dst, e),
29    };
30
31    let mut poll = Poll::new().expect("Cannot create selector");
32    poll.registry()
33        .register(&mut dst.0, Token(0), Interest::WRITABLE)
34        .expect("registering destination failed");
35    for (i, &mut(ref mut src, _)) in src.iter_mut().enumerate() {
36        poll.registry()
37            .register(src, Token(i+1), Interest::READABLE)
38            .expect("registering a source failed");
39    }
40
41    let mut unsent = Vec::<(u32, Box<[u8]>, &str)>::new();
42    let mut buf = [0; 8192];
43    let mut events = Events::with_capacity(1024);
44    loop {
45        poll.poll(&mut events, None).expect("Cannot poll selector");
46
47        // receive all available messages from queues that are ready
48        for event in events.iter() {
49            if event.token() == Token(0) {
50                // dst; will try to send below even without this event
51                continue;
52            }
53
54            let &(ref mq, ref name) = &src[event.token().0-1];
55            loop {
56                match mq.recv(&mut buf) {
57                    Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
58                    Err(e) => panic!("Error receiving from {}: {}", name, e),
59                    Ok((priority, len)) => unsent.push((priority, Box::from(&buf[..len]), name)),
60                }
61            }
62        }
63
64        // send as many messages as possible
65        while let Some(&(priority, ref msg, ref src)) = unsent.last() {
66            if let Err(e) = dst.0.send(priority, msg) {
67                if e.kind() == ErrorKind::WouldBlock {
68                    break;
69                }
70                panic!("Error sending to {}: {}", dst.1, e);
71            }
72            println!("message of priority {} with {} bytes from {} sent to {}", 
73                priority, msg.len(), src, dst.1
74            );
75            let _ = unsent.pop();
76        }
77    }
78}
Source

pub fn iter<'a>(&'a self) -> Iter<'a>

Returns an Iterator which calls recv() repeatedly with an appropriately sized buffer.

If the message queue is opened in non-blocking mode the iterator can be used to drain the queue. Otherwise it will block and never end.

Source

pub fn send_timeout( &self, priority: u32, msg: &[u8], timeout: Duration, ) -> Result<(), Error>

Add a message to the queue or cancel if it’s still full after a given duration.

Returns immediately if opened in nonblocking mode, and the timeout has no effect.

For maximum portability, avoid using priorities >= 32 or sending zero-length messages.

§Errors
  • Timeout expired (ETIMEDOUT) => ErrorKind::TimedOut
  • Message is too big for the queue (EMSGSIZE) => ErrorKind::Other
  • OS doesn’t allow empty messages (EMSGSIZE) => ErrorKind::Other
  • Priority is too high (EINVAL) => ErrorKind::InvalidInput
  • Queue is full and opened in nonblocking mode (EAGAIN) => ErrorKind::WouldBlock
  • Queue is opened in write-only mode (EBADF) => ErrorKind::Other
  • Timeout is too long / not representable => ErrorKind::InvalidInput
  • Possibly other => ErrorKind::Other
Source

pub fn send_deadline( &self, priority: u32, msg: &[u8], deadline: SystemTime, ) -> Result<(), Error>

Add a message to the queue or cancel if the queue is still full at a certain point in time.

Returns immediately if opened in nonblocking mode, and the timeout has no effect.
The deadline is a SystemTime because the queues are intended for inter-process commonication, and Instant might be process-specific.

For maximum portability, avoid using priorities >= 32 or sending zero-length messages.

§Errors
  • Deadline reached (ETIMEDOUT) => ErrorKind::TimedOut
  • Message is too big for the queue (EMSGSIZE) => ErrorKind::Other
  • OS doesn’t allow empty messages (EMSGSIZE) => ErrorKind::Other
  • Priority is too high (EINVAL) => ErrorKind::InvalidInput
  • Queue is full and opened in nonblocking mode (EAGAIN) => ErrorKind::WouldBlock
  • Queue is opened in write-only mode (EBADF) => ErrorKind::Other
  • Possibly other => ErrorKind::Other
Source

pub fn recv_timeout( &self, msgbuf: &mut [u8], timeout: Duration, ) -> Result<(u32, usize), Error>

Take the message with the highest priority from the queue or cancel if the queue still empty after a given duration.

Returns immediately if opened in nonblocking mode, and the timeout has no effect.

§Errors
  • Timeout expired (ETIMEDOUT) => ErrorKind::TimedOut
  • The receive buffer is smaller than the queue’s maximum message size (EMSGSIZE) => ErrorKind::Other
  • Queue is empty and opened in nonblocking mode (EAGAIN) => ErrorKind::WouldBlock
  • Queue is opened in read-only mode (EBADF) => ErrorKind::Other
  • Timeout is too long / not representable => ErrorKind::InvalidInput
  • Possibly other => ErrorKind::Other
Examples found in repository?
examples/receive.rs (line 35)
11fn main() {
12    let mut args = args_os().skip(1);
13    let name = args.next().expect("argument required");
14    let timeout = args.next().map(|arg| {
15        let arg = arg.into_string().expect("invalid timeout");
16        if arg == "nonblocking" || arg == "drain" || arg == "available" || arg == "empty" {
17            None
18        } else if arg.ends_with("ms") {
19            Some(Duration::from_millis(arg[..arg.len()-2].parse().expect("invalid duration")))
20        } else if arg.ends_with("s") {
21            Some(Duration::from_secs(arg[..arg.len()-1].parse().expect("invalid duration")))
22        } else {
23            Some(Duration::from_secs(arg.parse().expect("invalid duration")))
24        }
25    });
26
27    let mut opts = posixmq::OpenOptions::readonly();
28    if timeout == Some(None) {
29        opts = *opts.nonblocking();
30    }
31    let mq = opts.open(name.as_bytes()).expect("opening failed");
32
33    if let Some(Some(timeout)) = timeout {
34        let mut buf = vec![0; mq.attributes().unwrap_or_default().max_msg_len];
35        while let Ok((priority, len)) = mq.recv_timeout(&mut buf, timeout) {
36            print!("{:3}\t", priority);
37            stdout().write_all(&buf[..len]).expect("writing to stdout failed");
38            println!();
39        }
40    } else {
41        for (priority, msg) in mq {
42            print!("{:3}\t", priority);
43            stdout().write_all(&msg).expect("writing to stdout failed");
44            println!();
45        }
46    }
47}
Source

pub fn recv_deadline( &self, msgbuf: &mut [u8], deadline: SystemTime, ) -> Result<(u32, usize), Error>

Take the message with the highest priority from the queue or cancel if the queue is still empty at a point in time.

Returns immediately if opened in nonblocking mode, and the timeout has no effect.
The deadline is a SystemTime because the queues are intended for inter-process commonication, and Instant might be process-specific.

§Errors
  • Deadline reached (ETIMEDOUT) => ErrorKind::TimedOut
  • The receive buffer is smaller than the queue’s maximum message size (EMSGSIZE) => ErrorKind::Other
  • Queue is empty and opened in nonblocking mode (EAGAIN) => ErrorKind::WouldBlock
  • Queue is opened in read-only mode (EBADF) => ErrorKind::Other
  • Possibly other => ErrorKind::Other
Source

pub fn attributes(&self) -> Result<Attributes, Error>

Get information about the state of the message queue.

§Errors

Retrieving these attributes should only fail if the underlying descriptor has been closed or is not a message queue.

On operating systems where the descriptor is a pointer, such as on FreeBSD and Illumos, such bugs will enable undefined behavior and this call will dereference freed or uninitialized memory.
(That doesn’t make this function unsafe though - PosixMq::from_raw_mqd() and mq_close() are.)

While a send() or recv() ran in place of this call would also have failed immediately and therefore not blocked, The descriptor might have become used for another queue when a later send() or recv() is performed. The descriptor might then be in blocking mode.

§Examples
let mq = posixmq::OpenOptions::readwrite()
    .create_new()
    .max_msg_len(100)
    .capacity(3)
    .open("/with_custom_capacity")
    .expect("create queue");
let attrs = mq.attributes().expect("get attributes for queue");
assert_eq!(attrs.max_msg_len, 100);
assert_eq!(attrs.capacity, 3);
assert_eq!(attrs.current_messages, 0);
assert!(!attrs.nonblocking);

Ignore the error:

(Will only happen with buggy code (incorrect usage of from_raw_fd() or similar)).

let attrs = bad.attributes().unwrap_or_default();
assert_eq!(attrs.max_msg_len, 0);
assert_eq!(attrs.capacity, 0);
assert_eq!(attrs.current_messages, 0);
assert!(!attrs.nonblocking);
Examples found in repository?
examples/receive.rs (line 34)
11fn main() {
12    let mut args = args_os().skip(1);
13    let name = args.next().expect("argument required");
14    let timeout = args.next().map(|arg| {
15        let arg = arg.into_string().expect("invalid timeout");
16        if arg == "nonblocking" || arg == "drain" || arg == "available" || arg == "empty" {
17            None
18        } else if arg.ends_with("ms") {
19            Some(Duration::from_millis(arg[..arg.len()-2].parse().expect("invalid duration")))
20        } else if arg.ends_with("s") {
21            Some(Duration::from_secs(arg[..arg.len()-1].parse().expect("invalid duration")))
22        } else {
23            Some(Duration::from_secs(arg.parse().expect("invalid duration")))
24        }
25    });
26
27    let mut opts = posixmq::OpenOptions::readonly();
28    if timeout == Some(None) {
29        opts = *opts.nonblocking();
30    }
31    let mq = opts.open(name.as_bytes()).expect("opening failed");
32
33    if let Some(Some(timeout)) = timeout {
34        let mut buf = vec![0; mq.attributes().unwrap_or_default().max_msg_len];
35        while let Ok((priority, len)) = mq.recv_timeout(&mut buf, timeout) {
36            print!("{:3}\t", priority);
37            stdout().write_all(&buf[..len]).expect("writing to stdout failed");
38            println!();
39        }
40    } else {
41        for (priority, msg) in mq {
42            print!("{:3}\t", priority);
43            stdout().write_all(&msg).expect("writing to stdout failed");
44            println!();
45        }
46    }
47}
More examples
Hide additional examples
examples/limits.rs (line 38)
28fn main() {
29    println!("{}:", std::env::consts::OS);
30
31    let mq = posixmq::OpenOptions::readwrite()
32        .create_new()
33        .open("/default_capacities")
34        .expect("Cannot create new posix message queue /default_capacities");
35    posixmq::remove_queue("/default_capacities")
36        .expect("Cannot delete posix message queue /default_capacities");
37
38    let attrs = mq.attributes().expect("Cannot get attributes for queue");
39    println!("default queue capacity:         {}", attrs.capacity);
40    println!("default maximum message length: {}", attrs.max_msg_len);
41
42    println!("max message priority:           {}", btry(0xff_ff_ff_ff, |priority| {
43        mq.send(priority as u32, b"b")
44            .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45            .is_ok()
46    }));
47    println!("allows empty messages:          {}", mq.send(0, b"").is_ok());
48    drop(mq);
49
50    println!("max queue capacity:             {}", btry(1_000_000, |capacity| {
51        posixmq::OpenOptions::readwrite()
52            .capacity(capacity)
53            .max_msg_len(1)
54            .create_new()
55            .open("/max_capacity")
56            .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57            .is_ok()
58    }));
59
60    println!("max message length:             {}", btry(1_000_000, |length| {
61        posixmq::OpenOptions::readwrite()
62            .max_msg_len(length)
63            .capacity(1)
64            .create_new()
65            .open("/max_length")
66            .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67            .is_ok()
68    }));
69
70    println!("max equal:                      {}", btry(1_000_000, |equal| {
71        posixmq::OpenOptions::readwrite()
72            .max_msg_len(equal)
73            .capacity(equal)
74            .create_new()
75            .open("/max_equal")
76            .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77            .is_ok()
78    }));
79
80    println!("allows just \"/\":                {}", {
81        let result = posixmq::PosixMq::create("/");
82        let _ = posixmq::remove_queue("/");
83        result.is_ok()
84    });
85    println!("enforces name rules:            {}", {
86        let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87        let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88        let _ = posixmq::remove_queue_c(noslash);
89        result.is_err()
90    });
91}
Source

pub fn is_nonblocking(&self) -> Result<bool, Error>

Check whether this descriptor is in nonblocking mode.

§Errors

Should only fail as result of buggy code that either created this descriptor from something that is not a queue, or has already closed the underlying descriptor.
(This function will not silently succeed if the fd points to anything other than a queue (for example a socket), as this function is a wrapper around [attributes()][#method.attributes].)
To ignore failure, one can write .is_nonblocking().unwrap_or(false).

§An error doesn’t guarantee that any further send() or recv() wont block.

While a send() or recv() ran in place of this call would also have failed immediately and therefore not blocked, the descriptor might have become used for another queue when a later send() or recv() is performed. The descriptor might then be in blocking mode.

Source

pub fn set_nonblocking(&self, nonblocking: bool) -> Result<(), Error>

Enable or disable nonblocking mode for this descriptor.

This can also be set when opening the message queue, with OpenOptions::nonblocking().

§Errors

Setting nonblocking mode should only fail due to incorrect usage of from_raw_fd() or as_raw_fd(), see the documentation on attributes() for details.

Source

pub fn try_clone(&self) -> Result<Self, Error>

Create a new descriptor for the same message queue.

The new descriptor will have close-on-exec set.

This function is not available on FreeBSD, Illumos or Solaris.

Source

pub fn is_cloexec(&self) -> Result<bool, Error>

Check whether this descriptor will be closed if the process execs into another program.

Posix message queues are closed on exec by default, but this can be changed with set_cloexec().

This function is not available on Illumos, Solaris or VxWorks.

§Errors

Retrieving this flag should only fail if the descriptor is already closed.
In that case it will obviously not be open after execing, so treating errors as true should be safe.

§Examples
let queue = posixmq::PosixMq::create("is_cloexec").expect("open queue");
assert!(queue.is_cloexec().unwrap_or(true));
Source

pub fn set_cloexec(&self, cloexec: bool) -> Result<(), Error>

Change close-on-exec for this descriptor.

It is on by default, so this method should only be called when one wants the descriptor to remain open afte execing.

This function is not available on Illumos, Solaris or VxWorks.

§Errors

This function should only fail if the underlying file descriptor has been closed (due to incorrect usage of from_raw_fd() or similar), and not reused for something else yet.

Source

pub unsafe fn from_raw_mqd(mqd: mqd_t) -> Self

Create a PosixMq from an already opened message queue descriptor.

This function should only be used for ffi or if calling mq_open() directly for some reason.
Use from_raw_fd() instead if the surrounding code requires mqd_t to be a file descriptor.

§Safety

On some operating systems mqd_t is a pointer, which means that the safety of most other methods depend on it being correct.

Source

pub fn as_raw_mqd(&self) -> mqd_t

Get the raw message queue descriptor.

This function should only be used for passing to ffi code or to access portable features not exposed by this wrapper (such as calling mq_notify() or not automatically retrying on EINTR / ErrorKind::Interrupted when sending or receiving).

If you need a file descriptor, use as_raw_fd() instead for increased portability. (as_raw_fd() can sometimes retrieve an underlying file descriptor even if mqd_t is not an int.)

Source

pub fn into_raw_mqd(self) -> mqd_t

Convert this wrapper into the raw message queue descriptor without closing it.

This function should only be used for ffi; If you need a file descriptor use into_raw_fd() instead.

Trait Implementations§

Source§

impl AsRawFd for PosixMq

Get an underlying file descriptor for the message queue.

If you just need the raw mqd_t, use as_raw_mqd() instead for increased portability.

This impl is not available on Illumos, Solaris or VxWorks.

Source§

fn as_raw_fd(&self) -> RawFd

Extracts the raw file descriptor. Read more
Source§

impl Debug for PosixMq

Source§

fn fmt(&self, fmtr: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Drop for PosixMq

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more
Source§

impl Evented for PosixMq

Allow receiving event notifications through mio (version 0.6).

This impl requires the mio_06 feature to be enabled:

[dependencies]
posixmq = {version="1.0", features=["mio_06"]}

Remember to open the queue in non-blocking mode. (with OpenOptions.noblocking())

Source§

fn register( &self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt, ) -> Result<(), Error>

Register self with the given Poll instance. Read more
Source§

fn reregister( &self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt, ) -> Result<(), Error>

Re-register self with the given Poll instance. Read more
Source§

fn deregister(&self, poll: &Poll) -> Result<(), Error>

Deregister self from the given Poll instance Read more
Source§

impl FromRawFd for PosixMq

Create a PosixMq wrapper from a raw file descriptor.

Note that the message queue will be closed when the returned PosixMq goes out of scope / is dropped.

This impl is not available on FreeBSD, Illumos or Solaris; If you got a mqd_t in a portable fashion (from FFI code or by calling mq_open() yourself for some reason), use from_raw_mqd() instead.

Source§

unsafe fn from_raw_fd(fd: RawFd) -> Self

Constructs a new instance of Self from the given raw file descriptor. Read more
Source§

impl<'a> IntoIterator for &'a PosixMq

Source§

type Item = (u32, Vec<u8>)

The type of the elements being iterated over.
Source§

type IntoIter = Iter<'a>

Which kind of iterator are we turning this into?
Source§

fn into_iter(self) -> Iter<'a>

Creates an iterator from a value. Read more
Source§

impl IntoIterator for PosixMq

Source§

type Item = (u32, Vec<u8>)

The type of the elements being iterated over.
Source§

type IntoIter = IntoIter

Which kind of iterator are we turning this into?
Source§

fn into_iter(self) -> IntoIter

Creates an iterator from a value. Read more
Source§

impl IntoRawFd for PosixMq

Convert the PosixMq into a raw file descriptor without closing the message queue.

This impl is not available on FreeBSD, Illumos or Solaris. If you need to transfer ownership to FFI code accepting a mqd_t, use into_raw_mqd() instead.

Source§

fn into_raw_fd(self) -> RawFd

Consumes this object, returning the raw underlying file descriptor. Read more
Source§

impl Source for &PosixMq

Allow receiving event notifications through mio (version 0.7).

This impl requires the mio_07 feature to be enabled:

[dependencies]
posixmq = {version="1.0", features=["mio_07"]}

Due to a long-lived bug in cargo this will currently enable the os_reactor feature of mio. This is not intended, and can change in the future.

You probably want to make the queue non-blocking: Either use OpenOptions.noblocking() when preparing to open the queue, or call set_nonblocking(true).

Source§

fn register( &mut self, registry: &Registry, token: Token, interest: Interest, ) -> Result<(), Error>

Register self with the given Registry instance. Read more
Source§

fn reregister( &mut self, registry: &Registry, token: Token, interest: Interest, ) -> Result<(), Error>

Re-register self with the given Registry instance. Read more
Source§

fn deregister(&mut self, registry: &Registry) -> Result<(), Error>

Deregister self from the given Registry instance. Read more
Source§

impl Source for PosixMq

Source§

fn register( &mut self, registry: &Registry, token: Token, interest: Interest, ) -> Result<(), Error>

Register self with the given Registry instance. Read more
Source§

fn reregister( &mut self, registry: &Registry, token: Token, interest: Interest, ) -> Result<(), Error>

Re-register self with the given Registry instance. Read more
Source§

fn deregister(&mut self, registry: &Registry) -> Result<(), Error>

Deregister self from the given Registry instance. Read more
Source§

impl Send for PosixMq

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.