Struct OpenOptions

Source
pub struct OpenOptions { /* private fields */ }
Expand description

Flags and parameters which control how a PosixMq message queue is opened or created.

Implementations§

Source§

impl OpenOptions

Source

pub fn readonly() -> Self

Open message queue for receiving only.

Examples found in repository?
examples/receive.rs (line 27)
11fn main() {
12    let mut args = args_os().skip(1);
13    let name = args.next().expect("argument required");
14    let timeout = args.next().map(|arg| {
15        let arg = arg.into_string().expect("invalid timeout");
16        if arg == "nonblocking" || arg == "drain" || arg == "available" || arg == "empty" {
17            None
18        } else if arg.ends_with("ms") {
19            Some(Duration::from_millis(arg[..arg.len()-2].parse().expect("invalid duration")))
20        } else if arg.ends_with("s") {
21            Some(Duration::from_secs(arg[..arg.len()-1].parse().expect("invalid duration")))
22        } else {
23            Some(Duration::from_secs(arg.parse().expect("invalid duration")))
24        }
25    });
26
27    let mut opts = posixmq::OpenOptions::readonly();
28    if timeout == Some(None) {
29        opts = *opts.nonblocking();
30    }
31    let mq = opts.open(name.as_bytes()).expect("opening failed");
32
33    if let Some(Some(timeout)) = timeout {
34        let mut buf = vec![0; mq.attributes().unwrap_or_default().max_msg_len];
35        while let Ok((priority, len)) = mq.recv_timeout(&mut buf, timeout) {
36            print!("{:3}\t", priority);
37            stdout().write_all(&buf[..len]).expect("writing to stdout failed");
38            println!();
39        }
40    } else {
41        for (priority, msg) in mq {
42            print!("{:3}\t", priority);
43            stdout().write_all(&msg).expect("writing to stdout failed");
44            println!();
45        }
46    }
47}
More examples
Hide additional examples
examples/merge.rs (line 19)
7fn main() {
8    use std::env::args;
9    use std::io::ErrorKind;
10
11    use mio::{Poll, Events, Interest, Token};
12
13    let mut queues = args().skip(1).collect::<Vec<_>>();
14    let dst = queues.pop().expect("arguments required");
15    
16    // open source queues
17    let mut src = Vec::new();
18    for name in queues {
19        match posixmq::OpenOptions::readonly().nonblocking().open(&name) {
20            Ok(mq) => src.push((mq, name)),
21            Err(e) => panic!("Cannot open {:?} for receiving: {}", name, e),
22        }
23    }
24
25    // open destination queue
26    let mut dst = match posixmq::OpenOptions::writeonly().nonblocking().create().open(&dst) {
27        Ok(mq) => (mq, dst),
28        Err(e) => panic!("Cannot open or create {:?} for sending: {}", dst, e),
29    };
30
31    let mut poll = Poll::new().expect("Cannot create selector");
32    poll.registry()
33        .register(&mut dst.0, Token(0), Interest::WRITABLE)
34        .expect("registering destination failed");
35    for (i, &mut(ref mut src, _)) in src.iter_mut().enumerate() {
36        poll.registry()
37            .register(src, Token(i+1), Interest::READABLE)
38            .expect("registering a source failed");
39    }
40
41    let mut unsent = Vec::<(u32, Box<[u8]>, &str)>::new();
42    let mut buf = [0; 8192];
43    let mut events = Events::with_capacity(1024);
44    loop {
45        poll.poll(&mut events, None).expect("Cannot poll selector");
46
47        // receive all available messages from queues that are ready
48        for event in events.iter() {
49            if event.token() == Token(0) {
50                // dst; will try to send below even without this event
51                continue;
52            }
53
54            let &(ref mq, ref name) = &src[event.token().0-1];
55            loop {
56                match mq.recv(&mut buf) {
57                    Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
58                    Err(e) => panic!("Error receiving from {}: {}", name, e),
59                    Ok((priority, len)) => unsent.push((priority, Box::from(&buf[..len]), name)),
60                }
61            }
62        }
63
64        // send as many messages as possible
65        while let Some(&(priority, ref msg, ref src)) = unsent.last() {
66            if let Err(e) = dst.0.send(priority, msg) {
67                if e.kind() == ErrorKind::WouldBlock {
68                    break;
69                }
70                panic!("Error sending to {}: {}", dst.1, e);
71            }
72            println!("message of priority {} with {} bytes from {} sent to {}", 
73                priority, msg.len(), src, dst.1
74            );
75            let _ = unsent.pop();
76        }
77    }
78}
Source

pub fn writeonly() -> Self

Open message queue for sending only.

Examples found in repository?
examples/send.rs (line 17)
9fn main() {
10    let args = args_os().skip(1).collect::<Vec<_>>();
11    if args.len() % 2 == 0 {
12        eprintln!("Usage: cargo run --example send /queue-name [priority message] ...");
13        return;
14    }
15
16    // open the message queue
17    let mq = posixmq::OpenOptions::writeonly()
18        .create()
19        .mode(0o666) // FIXME set umask to actually make it sendable for others
20        .open(args[0].as_bytes())
21        .expect("opening failed");
22
23    if args.len() == 1 {
24        // read from stdin
25        for line in stdin().lock().lines() {
26            let line = line.expect("input must be UTF-8, sorry");
27            let line = line.trim_start();
28            let num_end = line.find(' ').expect("no space in line");
29            let priority = line[..num_end].parse::<u32>().expect("priority is not a number");
30            let msg = line[num_end+1..].trim_start();
31            mq.send(priority, msg.as_bytes()).expect("sending failed");
32        }
33    } else {
34        // read from the command line
35        for i in (1..args.len()).step_by(2) {
36            let priority = args[i].to_str()
37                .and_then(|s| s.parse::<u32>().ok() )
38                .expect("priority is not a number");
39            let msg = args[i+1].as_bytes();
40            mq.send(priority, msg).expect("sending failed");
41        }
42    }
43}
More examples
Hide additional examples
examples/merge.rs (line 26)
7fn main() {
8    use std::env::args;
9    use std::io::ErrorKind;
10
11    use mio::{Poll, Events, Interest, Token};
12
13    let mut queues = args().skip(1).collect::<Vec<_>>();
14    let dst = queues.pop().expect("arguments required");
15    
16    // open source queues
17    let mut src = Vec::new();
18    for name in queues {
19        match posixmq::OpenOptions::readonly().nonblocking().open(&name) {
20            Ok(mq) => src.push((mq, name)),
21            Err(e) => panic!("Cannot open {:?} for receiving: {}", name, e),
22        }
23    }
24
25    // open destination queue
26    let mut dst = match posixmq::OpenOptions::writeonly().nonblocking().create().open(&dst) {
27        Ok(mq) => (mq, dst),
28        Err(e) => panic!("Cannot open or create {:?} for sending: {}", dst, e),
29    };
30
31    let mut poll = Poll::new().expect("Cannot create selector");
32    poll.registry()
33        .register(&mut dst.0, Token(0), Interest::WRITABLE)
34        .expect("registering destination failed");
35    for (i, &mut(ref mut src, _)) in src.iter_mut().enumerate() {
36        poll.registry()
37            .register(src, Token(i+1), Interest::READABLE)
38            .expect("registering a source failed");
39    }
40
41    let mut unsent = Vec::<(u32, Box<[u8]>, &str)>::new();
42    let mut buf = [0; 8192];
43    let mut events = Events::with_capacity(1024);
44    loop {
45        poll.poll(&mut events, None).expect("Cannot poll selector");
46
47        // receive all available messages from queues that are ready
48        for event in events.iter() {
49            if event.token() == Token(0) {
50                // dst; will try to send below even without this event
51                continue;
52            }
53
54            let &(ref mq, ref name) = &src[event.token().0-1];
55            loop {
56                match mq.recv(&mut buf) {
57                    Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
58                    Err(e) => panic!("Error receiving from {}: {}", name, e),
59                    Ok((priority, len)) => unsent.push((priority, Box::from(&buf[..len]), name)),
60                }
61            }
62        }
63
64        // send as many messages as possible
65        while let Some(&(priority, ref msg, ref src)) = unsent.last() {
66            if let Err(e) = dst.0.send(priority, msg) {
67                if e.kind() == ErrorKind::WouldBlock {
68                    break;
69                }
70                panic!("Error sending to {}: {}", dst.1, e);
71            }
72            println!("message of priority {} with {} bytes from {} sent to {}", 
73                priority, msg.len(), src, dst.1
74            );
75            let _ = unsent.pop();
76        }
77    }
78}
Source

pub fn readwrite() -> Self

Open message queue both for sending and receiving.

Examples found in repository?
examples/sort.rs (line 24)
10fn main() {
11    let args = args_os()
12        .skip(1)
13        .map(|osstring| osstring.into_vec() )
14        .collect::<Vec<Vec<u8>>>();
15    if args.is_empty() {
16        return; // nothing to do
17    }
18
19    let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21    let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23    // create queue with the necessary permissions and open it
24    let mq = posixmq::OpenOptions::readwrite()
25        .nonblocking() // use WouldBlock to detect that the queue is empty
26        .create_new()
27        .mode(0o000) // only affects future attempts at opening it
28        .capacity(args.len())
29        .max_msg_len(recv_buf.len())
30        .open_c(name)
31        .expect("opening queue failed");
32
33    // write arguments to the queue
34    for arg in args {
35        mq.send(arg.len() as u32, &arg).expect("sending failed");
36    }
37
38    // read until the queue is empty
39    let stdout = stdout();
40    let mut stdout = stdout.lock();
41    loop {
42        match mq.recv(&mut recv_buf) {
43            Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44            Err(e) => panic!("receiving failed: {}", e),
45            Ok((_priority, len)) => {
46                stdout.write_all(&recv_buf[..len])
47                    .and_then(|()| stdout.write_all(b"\n") )
48                    .expect("writing to stdout failed");
49            }
50        }
51    }
52
53    // clean up
54    posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
More examples
Hide additional examples
examples/limits.rs (line 31)
28fn main() {
29    println!("{}:", std::env::consts::OS);
30
31    let mq = posixmq::OpenOptions::readwrite()
32        .create_new()
33        .open("/default_capacities")
34        .expect("Cannot create new posix message queue /default_capacities");
35    posixmq::remove_queue("/default_capacities")
36        .expect("Cannot delete posix message queue /default_capacities");
37
38    let attrs = mq.attributes().expect("Cannot get attributes for queue");
39    println!("default queue capacity:         {}", attrs.capacity);
40    println!("default maximum message length: {}", attrs.max_msg_len);
41
42    println!("max message priority:           {}", btry(0xff_ff_ff_ff, |priority| {
43        mq.send(priority as u32, b"b")
44            .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45            .is_ok()
46    }));
47    println!("allows empty messages:          {}", mq.send(0, b"").is_ok());
48    drop(mq);
49
50    println!("max queue capacity:             {}", btry(1_000_000, |capacity| {
51        posixmq::OpenOptions::readwrite()
52            .capacity(capacity)
53            .max_msg_len(1)
54            .create_new()
55            .open("/max_capacity")
56            .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57            .is_ok()
58    }));
59
60    println!("max message length:             {}", btry(1_000_000, |length| {
61        posixmq::OpenOptions::readwrite()
62            .max_msg_len(length)
63            .capacity(1)
64            .create_new()
65            .open("/max_length")
66            .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67            .is_ok()
68    }));
69
70    println!("max equal:                      {}", btry(1_000_000, |equal| {
71        posixmq::OpenOptions::readwrite()
72            .max_msg_len(equal)
73            .capacity(equal)
74            .create_new()
75            .open("/max_equal")
76            .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77            .is_ok()
78    }));
79
80    println!("allows just \"/\":                {}", {
81        let result = posixmq::PosixMq::create("/");
82        let _ = posixmq::remove_queue("/");
83        result.is_ok()
84    });
85    println!("enforces name rules:            {}", {
86        let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87        let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88        let _ = posixmq::remove_queue_c(noslash);
89        result.is_err()
90    });
91}
Source

pub fn mode(&mut self, mode: u32) -> &mut Self

Set permissions to create the queue with.

Some bits might be cleared by the process’s umask when creating the queue, and unknown bits are ignored.

This field is ignored if the queue already exists or is not going to be created. If this method is not called, queues are created with mode 0o600 (octal: read and write permission for the owner only).

Examples found in repository?
examples/send.rs (line 19)
9fn main() {
10    let args = args_os().skip(1).collect::<Vec<_>>();
11    if args.len() % 2 == 0 {
12        eprintln!("Usage: cargo run --example send /queue-name [priority message] ...");
13        return;
14    }
15
16    // open the message queue
17    let mq = posixmq::OpenOptions::writeonly()
18        .create()
19        .mode(0o666) // FIXME set umask to actually make it sendable for others
20        .open(args[0].as_bytes())
21        .expect("opening failed");
22
23    if args.len() == 1 {
24        // read from stdin
25        for line in stdin().lock().lines() {
26            let line = line.expect("input must be UTF-8, sorry");
27            let line = line.trim_start();
28            let num_end = line.find(' ').expect("no space in line");
29            let priority = line[..num_end].parse::<u32>().expect("priority is not a number");
30            let msg = line[num_end+1..].trim_start();
31            mq.send(priority, msg.as_bytes()).expect("sending failed");
32        }
33    } else {
34        // read from the command line
35        for i in (1..args.len()).step_by(2) {
36            let priority = args[i].to_str()
37                .and_then(|s| s.parse::<u32>().ok() )
38                .expect("priority is not a number");
39            let msg = args[i+1].as_bytes();
40            mq.send(priority, msg).expect("sending failed");
41        }
42    }
43}
More examples
Hide additional examples
examples/sort.rs (line 27)
10fn main() {
11    let args = args_os()
12        .skip(1)
13        .map(|osstring| osstring.into_vec() )
14        .collect::<Vec<Vec<u8>>>();
15    if args.is_empty() {
16        return; // nothing to do
17    }
18
19    let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21    let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23    // create queue with the necessary permissions and open it
24    let mq = posixmq::OpenOptions::readwrite()
25        .nonblocking() // use WouldBlock to detect that the queue is empty
26        .create_new()
27        .mode(0o000) // only affects future attempts at opening it
28        .capacity(args.len())
29        .max_msg_len(recv_buf.len())
30        .open_c(name)
31        .expect("opening queue failed");
32
33    // write arguments to the queue
34    for arg in args {
35        mq.send(arg.len() as u32, &arg).expect("sending failed");
36    }
37
38    // read until the queue is empty
39    let stdout = stdout();
40    let mut stdout = stdout.lock();
41    loop {
42        match mq.recv(&mut recv_buf) {
43            Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44            Err(e) => panic!("receiving failed: {}", e),
45            Ok((_priority, len)) => {
46                stdout.write_all(&recv_buf[..len])
47                    .and_then(|()| stdout.write_all(b"\n") )
48                    .expect("writing to stdout failed");
49            }
50        }
51    }
52
53    // clean up
54    posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
Source

pub fn max_msg_len(&mut self, max_msg_len: usize) -> &mut Self

Set the maximum size of each message.

recv() will fail if given a buffer smaller than this value.

If max_msg_len and capacity are both zero (or not set), the queue will be created with a maximum message length and capacity decided by the operating system.
If this value is specified, capacity should also be specified, or opening the message queue might fail.

Examples found in repository?
examples/sort.rs (line 29)
10fn main() {
11    let args = args_os()
12        .skip(1)
13        .map(|osstring| osstring.into_vec() )
14        .collect::<Vec<Vec<u8>>>();
15    if args.is_empty() {
16        return; // nothing to do
17    }
18
19    let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21    let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23    // create queue with the necessary permissions and open it
24    let mq = posixmq::OpenOptions::readwrite()
25        .nonblocking() // use WouldBlock to detect that the queue is empty
26        .create_new()
27        .mode(0o000) // only affects future attempts at opening it
28        .capacity(args.len())
29        .max_msg_len(recv_buf.len())
30        .open_c(name)
31        .expect("opening queue failed");
32
33    // write arguments to the queue
34    for arg in args {
35        mq.send(arg.len() as u32, &arg).expect("sending failed");
36    }
37
38    // read until the queue is empty
39    let stdout = stdout();
40    let mut stdout = stdout.lock();
41    loop {
42        match mq.recv(&mut recv_buf) {
43            Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44            Err(e) => panic!("receiving failed: {}", e),
45            Ok((_priority, len)) => {
46                stdout.write_all(&recv_buf[..len])
47                    .and_then(|()| stdout.write_all(b"\n") )
48                    .expect("writing to stdout failed");
49            }
50        }
51    }
52
53    // clean up
54    posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
More examples
Hide additional examples
examples/limits.rs (line 53)
28fn main() {
29    println!("{}:", std::env::consts::OS);
30
31    let mq = posixmq::OpenOptions::readwrite()
32        .create_new()
33        .open("/default_capacities")
34        .expect("Cannot create new posix message queue /default_capacities");
35    posixmq::remove_queue("/default_capacities")
36        .expect("Cannot delete posix message queue /default_capacities");
37
38    let attrs = mq.attributes().expect("Cannot get attributes for queue");
39    println!("default queue capacity:         {}", attrs.capacity);
40    println!("default maximum message length: {}", attrs.max_msg_len);
41
42    println!("max message priority:           {}", btry(0xff_ff_ff_ff, |priority| {
43        mq.send(priority as u32, b"b")
44            .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45            .is_ok()
46    }));
47    println!("allows empty messages:          {}", mq.send(0, b"").is_ok());
48    drop(mq);
49
50    println!("max queue capacity:             {}", btry(1_000_000, |capacity| {
51        posixmq::OpenOptions::readwrite()
52            .capacity(capacity)
53            .max_msg_len(1)
54            .create_new()
55            .open("/max_capacity")
56            .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57            .is_ok()
58    }));
59
60    println!("max message length:             {}", btry(1_000_000, |length| {
61        posixmq::OpenOptions::readwrite()
62            .max_msg_len(length)
63            .capacity(1)
64            .create_new()
65            .open("/max_length")
66            .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67            .is_ok()
68    }));
69
70    println!("max equal:                      {}", btry(1_000_000, |equal| {
71        posixmq::OpenOptions::readwrite()
72            .max_msg_len(equal)
73            .capacity(equal)
74            .create_new()
75            .open("/max_equal")
76            .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77            .is_ok()
78    }));
79
80    println!("allows just \"/\":                {}", {
81        let result = posixmq::PosixMq::create("/");
82        let _ = posixmq::remove_queue("/");
83        result.is_ok()
84    });
85    println!("enforces name rules:            {}", {
86        let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87        let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88        let _ = posixmq::remove_queue_c(noslash);
89        result.is_err()
90    });
91}
Source

pub fn capacity(&mut self, capacity: usize) -> &mut Self

Set the maximum number of messages in the queue.

When the queue is full, further send()s will either block or, if the queue was opened in nonblocking mode, fail with an error of kind ErrorKind::WouldBlock.

If both capacity and max_msg_len are zero (or not set), the queue will be created with a maximum message length and capacity decided by the operating system.
If this value is specified, max_msg_len should also be specified, or opening the message queue might fail.

Examples found in repository?
examples/sort.rs (line 28)
10fn main() {
11    let args = args_os()
12        .skip(1)
13        .map(|osstring| osstring.into_vec() )
14        .collect::<Vec<Vec<u8>>>();
15    if args.is_empty() {
16        return; // nothing to do
17    }
18
19    let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21    let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23    // create queue with the necessary permissions and open it
24    let mq = posixmq::OpenOptions::readwrite()
25        .nonblocking() // use WouldBlock to detect that the queue is empty
26        .create_new()
27        .mode(0o000) // only affects future attempts at opening it
28        .capacity(args.len())
29        .max_msg_len(recv_buf.len())
30        .open_c(name)
31        .expect("opening queue failed");
32
33    // write arguments to the queue
34    for arg in args {
35        mq.send(arg.len() as u32, &arg).expect("sending failed");
36    }
37
38    // read until the queue is empty
39    let stdout = stdout();
40    let mut stdout = stdout.lock();
41    loop {
42        match mq.recv(&mut recv_buf) {
43            Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44            Err(e) => panic!("receiving failed: {}", e),
45            Ok((_priority, len)) => {
46                stdout.write_all(&recv_buf[..len])
47                    .and_then(|()| stdout.write_all(b"\n") )
48                    .expect("writing to stdout failed");
49            }
50        }
51    }
52
53    // clean up
54    posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
More examples
Hide additional examples
examples/limits.rs (line 52)
28fn main() {
29    println!("{}:", std::env::consts::OS);
30
31    let mq = posixmq::OpenOptions::readwrite()
32        .create_new()
33        .open("/default_capacities")
34        .expect("Cannot create new posix message queue /default_capacities");
35    posixmq::remove_queue("/default_capacities")
36        .expect("Cannot delete posix message queue /default_capacities");
37
38    let attrs = mq.attributes().expect("Cannot get attributes for queue");
39    println!("default queue capacity:         {}", attrs.capacity);
40    println!("default maximum message length: {}", attrs.max_msg_len);
41
42    println!("max message priority:           {}", btry(0xff_ff_ff_ff, |priority| {
43        mq.send(priority as u32, b"b")
44            .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45            .is_ok()
46    }));
47    println!("allows empty messages:          {}", mq.send(0, b"").is_ok());
48    drop(mq);
49
50    println!("max queue capacity:             {}", btry(1_000_000, |capacity| {
51        posixmq::OpenOptions::readwrite()
52            .capacity(capacity)
53            .max_msg_len(1)
54            .create_new()
55            .open("/max_capacity")
56            .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57            .is_ok()
58    }));
59
60    println!("max message length:             {}", btry(1_000_000, |length| {
61        posixmq::OpenOptions::readwrite()
62            .max_msg_len(length)
63            .capacity(1)
64            .create_new()
65            .open("/max_length")
66            .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67            .is_ok()
68    }));
69
70    println!("max equal:                      {}", btry(1_000_000, |equal| {
71        posixmq::OpenOptions::readwrite()
72            .max_msg_len(equal)
73            .capacity(equal)
74            .create_new()
75            .open("/max_equal")
76            .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77            .is_ok()
78    }));
79
80    println!("allows just \"/\":                {}", {
81        let result = posixmq::PosixMq::create("/");
82        let _ = posixmq::remove_queue("/");
83        result.is_ok()
84    });
85    println!("enforces name rules:            {}", {
86        let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87        let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88        let _ = posixmq::remove_queue_c(noslash);
89        result.is_err()
90    });
91}
Source

pub fn create(&mut self) -> &mut Self

Create the message queue if it doesn’t already exist.

Examples found in repository?
examples/send.rs (line 18)
9fn main() {
10    let args = args_os().skip(1).collect::<Vec<_>>();
11    if args.len() % 2 == 0 {
12        eprintln!("Usage: cargo run --example send /queue-name [priority message] ...");
13        return;
14    }
15
16    // open the message queue
17    let mq = posixmq::OpenOptions::writeonly()
18        .create()
19        .mode(0o666) // FIXME set umask to actually make it sendable for others
20        .open(args[0].as_bytes())
21        .expect("opening failed");
22
23    if args.len() == 1 {
24        // read from stdin
25        for line in stdin().lock().lines() {
26            let line = line.expect("input must be UTF-8, sorry");
27            let line = line.trim_start();
28            let num_end = line.find(' ').expect("no space in line");
29            let priority = line[..num_end].parse::<u32>().expect("priority is not a number");
30            let msg = line[num_end+1..].trim_start();
31            mq.send(priority, msg.as_bytes()).expect("sending failed");
32        }
33    } else {
34        // read from the command line
35        for i in (1..args.len()).step_by(2) {
36            let priority = args[i].to_str()
37                .and_then(|s| s.parse::<u32>().ok() )
38                .expect("priority is not a number");
39            let msg = args[i+1].as_bytes();
40            mq.send(priority, msg).expect("sending failed");
41        }
42    }
43}
More examples
Hide additional examples
examples/limits.rs (line 87)
28fn main() {
29    println!("{}:", std::env::consts::OS);
30
31    let mq = posixmq::OpenOptions::readwrite()
32        .create_new()
33        .open("/default_capacities")
34        .expect("Cannot create new posix message queue /default_capacities");
35    posixmq::remove_queue("/default_capacities")
36        .expect("Cannot delete posix message queue /default_capacities");
37
38    let attrs = mq.attributes().expect("Cannot get attributes for queue");
39    println!("default queue capacity:         {}", attrs.capacity);
40    println!("default maximum message length: {}", attrs.max_msg_len);
41
42    println!("max message priority:           {}", btry(0xff_ff_ff_ff, |priority| {
43        mq.send(priority as u32, b"b")
44            .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45            .is_ok()
46    }));
47    println!("allows empty messages:          {}", mq.send(0, b"").is_ok());
48    drop(mq);
49
50    println!("max queue capacity:             {}", btry(1_000_000, |capacity| {
51        posixmq::OpenOptions::readwrite()
52            .capacity(capacity)
53            .max_msg_len(1)
54            .create_new()
55            .open("/max_capacity")
56            .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57            .is_ok()
58    }));
59
60    println!("max message length:             {}", btry(1_000_000, |length| {
61        posixmq::OpenOptions::readwrite()
62            .max_msg_len(length)
63            .capacity(1)
64            .create_new()
65            .open("/max_length")
66            .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67            .is_ok()
68    }));
69
70    println!("max equal:                      {}", btry(1_000_000, |equal| {
71        posixmq::OpenOptions::readwrite()
72            .max_msg_len(equal)
73            .capacity(equal)
74            .create_new()
75            .open("/max_equal")
76            .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77            .is_ok()
78    }));
79
80    println!("allows just \"/\":                {}", {
81        let result = posixmq::PosixMq::create("/");
82        let _ = posixmq::remove_queue("/");
83        result.is_ok()
84    });
85    println!("enforces name rules:            {}", {
86        let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87        let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88        let _ = posixmq::remove_queue_c(noslash);
89        result.is_err()
90    });
91}
examples/merge.rs (line 26)
7fn main() {
8    use std::env::args;
9    use std::io::ErrorKind;
10
11    use mio::{Poll, Events, Interest, Token};
12
13    let mut queues = args().skip(1).collect::<Vec<_>>();
14    let dst = queues.pop().expect("arguments required");
15    
16    // open source queues
17    let mut src = Vec::new();
18    for name in queues {
19        match posixmq::OpenOptions::readonly().nonblocking().open(&name) {
20            Ok(mq) => src.push((mq, name)),
21            Err(e) => panic!("Cannot open {:?} for receiving: {}", name, e),
22        }
23    }
24
25    // open destination queue
26    let mut dst = match posixmq::OpenOptions::writeonly().nonblocking().create().open(&dst) {
27        Ok(mq) => (mq, dst),
28        Err(e) => panic!("Cannot open or create {:?} for sending: {}", dst, e),
29    };
30
31    let mut poll = Poll::new().expect("Cannot create selector");
32    poll.registry()
33        .register(&mut dst.0, Token(0), Interest::WRITABLE)
34        .expect("registering destination failed");
35    for (i, &mut(ref mut src, _)) in src.iter_mut().enumerate() {
36        poll.registry()
37            .register(src, Token(i+1), Interest::READABLE)
38            .expect("registering a source failed");
39    }
40
41    let mut unsent = Vec::<(u32, Box<[u8]>, &str)>::new();
42    let mut buf = [0; 8192];
43    let mut events = Events::with_capacity(1024);
44    loop {
45        poll.poll(&mut events, None).expect("Cannot poll selector");
46
47        // receive all available messages from queues that are ready
48        for event in events.iter() {
49            if event.token() == Token(0) {
50                // dst; will try to send below even without this event
51                continue;
52            }
53
54            let &(ref mq, ref name) = &src[event.token().0-1];
55            loop {
56                match mq.recv(&mut buf) {
57                    Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
58                    Err(e) => panic!("Error receiving from {}: {}", name, e),
59                    Ok((priority, len)) => unsent.push((priority, Box::from(&buf[..len]), name)),
60                }
61            }
62        }
63
64        // send as many messages as possible
65        while let Some(&(priority, ref msg, ref src)) = unsent.last() {
66            if let Err(e) = dst.0.send(priority, msg) {
67                if e.kind() == ErrorKind::WouldBlock {
68                    break;
69                }
70                panic!("Error sending to {}: {}", dst.1, e);
71            }
72            println!("message of priority {} with {} bytes from {} sent to {}", 
73                priority, msg.len(), src, dst.1
74            );
75            let _ = unsent.pop();
76        }
77    }
78}
Source

pub fn create_new(&mut self) -> &mut Self

Create a new queue, failing if the queue already exists.

Examples found in repository?
examples/sort.rs (line 26)
10fn main() {
11    let args = args_os()
12        .skip(1)
13        .map(|osstring| osstring.into_vec() )
14        .collect::<Vec<Vec<u8>>>();
15    if args.is_empty() {
16        return; // nothing to do
17    }
18
19    let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21    let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23    // create queue with the necessary permissions and open it
24    let mq = posixmq::OpenOptions::readwrite()
25        .nonblocking() // use WouldBlock to detect that the queue is empty
26        .create_new()
27        .mode(0o000) // only affects future attempts at opening it
28        .capacity(args.len())
29        .max_msg_len(recv_buf.len())
30        .open_c(name)
31        .expect("opening queue failed");
32
33    // write arguments to the queue
34    for arg in args {
35        mq.send(arg.len() as u32, &arg).expect("sending failed");
36    }
37
38    // read until the queue is empty
39    let stdout = stdout();
40    let mut stdout = stdout.lock();
41    loop {
42        match mq.recv(&mut recv_buf) {
43            Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44            Err(e) => panic!("receiving failed: {}", e),
45            Ok((_priority, len)) => {
46                stdout.write_all(&recv_buf[..len])
47                    .and_then(|()| stdout.write_all(b"\n") )
48                    .expect("writing to stdout failed");
49            }
50        }
51    }
52
53    // clean up
54    posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
More examples
Hide additional examples
examples/limits.rs (line 32)
28fn main() {
29    println!("{}:", std::env::consts::OS);
30
31    let mq = posixmq::OpenOptions::readwrite()
32        .create_new()
33        .open("/default_capacities")
34        .expect("Cannot create new posix message queue /default_capacities");
35    posixmq::remove_queue("/default_capacities")
36        .expect("Cannot delete posix message queue /default_capacities");
37
38    let attrs = mq.attributes().expect("Cannot get attributes for queue");
39    println!("default queue capacity:         {}", attrs.capacity);
40    println!("default maximum message length: {}", attrs.max_msg_len);
41
42    println!("max message priority:           {}", btry(0xff_ff_ff_ff, |priority| {
43        mq.send(priority as u32, b"b")
44            .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45            .is_ok()
46    }));
47    println!("allows empty messages:          {}", mq.send(0, b"").is_ok());
48    drop(mq);
49
50    println!("max queue capacity:             {}", btry(1_000_000, |capacity| {
51        posixmq::OpenOptions::readwrite()
52            .capacity(capacity)
53            .max_msg_len(1)
54            .create_new()
55            .open("/max_capacity")
56            .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57            .is_ok()
58    }));
59
60    println!("max message length:             {}", btry(1_000_000, |length| {
61        posixmq::OpenOptions::readwrite()
62            .max_msg_len(length)
63            .capacity(1)
64            .create_new()
65            .open("/max_length")
66            .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67            .is_ok()
68    }));
69
70    println!("max equal:                      {}", btry(1_000_000, |equal| {
71        posixmq::OpenOptions::readwrite()
72            .max_msg_len(equal)
73            .capacity(equal)
74            .create_new()
75            .open("/max_equal")
76            .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77            .is_ok()
78    }));
79
80    println!("allows just \"/\":                {}", {
81        let result = posixmq::PosixMq::create("/");
82        let _ = posixmq::remove_queue("/");
83        result.is_ok()
84    });
85    println!("enforces name rules:            {}", {
86        let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87        let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88        let _ = posixmq::remove_queue_c(noslash);
89        result.is_err()
90    });
91}
Source

pub fn existing(&mut self) -> &mut Self

Require the queue to already exist, failing if it doesn’t.

Source

pub fn nonblocking(&mut self) -> &mut Self

Open the message queue in non-blocking mode.

This must be done if you want to use the message queue with mio.

Examples found in repository?
examples/receive.rs (line 29)
11fn main() {
12    let mut args = args_os().skip(1);
13    let name = args.next().expect("argument required");
14    let timeout = args.next().map(|arg| {
15        let arg = arg.into_string().expect("invalid timeout");
16        if arg == "nonblocking" || arg == "drain" || arg == "available" || arg == "empty" {
17            None
18        } else if arg.ends_with("ms") {
19            Some(Duration::from_millis(arg[..arg.len()-2].parse().expect("invalid duration")))
20        } else if arg.ends_with("s") {
21            Some(Duration::from_secs(arg[..arg.len()-1].parse().expect("invalid duration")))
22        } else {
23            Some(Duration::from_secs(arg.parse().expect("invalid duration")))
24        }
25    });
26
27    let mut opts = posixmq::OpenOptions::readonly();
28    if timeout == Some(None) {
29        opts = *opts.nonblocking();
30    }
31    let mq = opts.open(name.as_bytes()).expect("opening failed");
32
33    if let Some(Some(timeout)) = timeout {
34        let mut buf = vec![0; mq.attributes().unwrap_or_default().max_msg_len];
35        while let Ok((priority, len)) = mq.recv_timeout(&mut buf, timeout) {
36            print!("{:3}\t", priority);
37            stdout().write_all(&buf[..len]).expect("writing to stdout failed");
38            println!();
39        }
40    } else {
41        for (priority, msg) in mq {
42            print!("{:3}\t", priority);
43            stdout().write_all(&msg).expect("writing to stdout failed");
44            println!();
45        }
46    }
47}
More examples
Hide additional examples
examples/sort.rs (line 25)
10fn main() {
11    let args = args_os()
12        .skip(1)
13        .map(|osstring| osstring.into_vec() )
14        .collect::<Vec<Vec<u8>>>();
15    if args.is_empty() {
16        return; // nothing to do
17    }
18
19    let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21    let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23    // create queue with the necessary permissions and open it
24    let mq = posixmq::OpenOptions::readwrite()
25        .nonblocking() // use WouldBlock to detect that the queue is empty
26        .create_new()
27        .mode(0o000) // only affects future attempts at opening it
28        .capacity(args.len())
29        .max_msg_len(recv_buf.len())
30        .open_c(name)
31        .expect("opening queue failed");
32
33    // write arguments to the queue
34    for arg in args {
35        mq.send(arg.len() as u32, &arg).expect("sending failed");
36    }
37
38    // read until the queue is empty
39    let stdout = stdout();
40    let mut stdout = stdout.lock();
41    loop {
42        match mq.recv(&mut recv_buf) {
43            Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44            Err(e) => panic!("receiving failed: {}", e),
45            Ok((_priority, len)) => {
46                stdout.write_all(&recv_buf[..len])
47                    .and_then(|()| stdout.write_all(b"\n") )
48                    .expect("writing to stdout failed");
49            }
50        }
51    }
52
53    // clean up
54    posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
examples/merge.rs (line 19)
7fn main() {
8    use std::env::args;
9    use std::io::ErrorKind;
10
11    use mio::{Poll, Events, Interest, Token};
12
13    let mut queues = args().skip(1).collect::<Vec<_>>();
14    let dst = queues.pop().expect("arguments required");
15    
16    // open source queues
17    let mut src = Vec::new();
18    for name in queues {
19        match posixmq::OpenOptions::readonly().nonblocking().open(&name) {
20            Ok(mq) => src.push((mq, name)),
21            Err(e) => panic!("Cannot open {:?} for receiving: {}", name, e),
22        }
23    }
24
25    // open destination queue
26    let mut dst = match posixmq::OpenOptions::writeonly().nonblocking().create().open(&dst) {
27        Ok(mq) => (mq, dst),
28        Err(e) => panic!("Cannot open or create {:?} for sending: {}", dst, e),
29    };
30
31    let mut poll = Poll::new().expect("Cannot create selector");
32    poll.registry()
33        .register(&mut dst.0, Token(0), Interest::WRITABLE)
34        .expect("registering destination failed");
35    for (i, &mut(ref mut src, _)) in src.iter_mut().enumerate() {
36        poll.registry()
37            .register(src, Token(i+1), Interest::READABLE)
38            .expect("registering a source failed");
39    }
40
41    let mut unsent = Vec::<(u32, Box<[u8]>, &str)>::new();
42    let mut buf = [0; 8192];
43    let mut events = Events::with_capacity(1024);
44    loop {
45        poll.poll(&mut events, None).expect("Cannot poll selector");
46
47        // receive all available messages from queues that are ready
48        for event in events.iter() {
49            if event.token() == Token(0) {
50                // dst; will try to send below even without this event
51                continue;
52            }
53
54            let &(ref mq, ref name) = &src[event.token().0-1];
55            loop {
56                match mq.recv(&mut buf) {
57                    Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
58                    Err(e) => panic!("Error receiving from {}: {}", name, e),
59                    Ok((priority, len)) => unsent.push((priority, Box::from(&buf[..len]), name)),
60                }
61            }
62        }
63
64        // send as many messages as possible
65        while let Some(&(priority, ref msg, ref src)) = unsent.last() {
66            if let Err(e) = dst.0.send(priority, msg) {
67                if e.kind() == ErrorKind::WouldBlock {
68                    break;
69                }
70                panic!("Error sending to {}: {}", dst.1, e);
71            }
72            println!("message of priority {} with {} bytes from {} sent to {}", 
73                priority, msg.len(), src, dst.1
74            );
75            let _ = unsent.pop();
76        }
77    }
78}
Source

pub fn open<N: AsRef<[u8]> + ?Sized>(&self, name: &N) -> Result<PosixMq, Error>

Open a queue with the specified options.

If the name doesn’t start with a ‘/’, one will be prepended.

§Errors
  • Queue doesn’t exist (ENOENT) => ErrorKind::NotFound
  • Name is just “/” (ENOENT) or is empty => ErrorKind::NotFound
  • Queue already exists (EEXIST) => ErrorKind::AlreadyExists
  • Not permitted to open in this mode (EACCES) => ErrorKind::PermissionDenied
  • More than one ‘/’ in name (EACCES) => ErrorKind::PermissionDenied
  • Invalid capacities (EINVAL) => ErrorKind::InvalidInput
  • Capacities too high (EMFILE) => ErrorKind::Other
  • Posix message queues are disabled (ENOSYS) => ErrorKind::Other
  • Name contains ‘\0’ => ErrorKind::InvalidInput
  • Name is too long (ENAMETOOLONG) => ErrorKind::Other
  • Unlikely (ENFILE, EMFILE, ENOMEM, ENOSPC) => ErrorKind::Other
  • Possibly other
Examples found in repository?
examples/send.rs (line 20)
9fn main() {
10    let args = args_os().skip(1).collect::<Vec<_>>();
11    if args.len() % 2 == 0 {
12        eprintln!("Usage: cargo run --example send /queue-name [priority message] ...");
13        return;
14    }
15
16    // open the message queue
17    let mq = posixmq::OpenOptions::writeonly()
18        .create()
19        .mode(0o666) // FIXME set umask to actually make it sendable for others
20        .open(args[0].as_bytes())
21        .expect("opening failed");
22
23    if args.len() == 1 {
24        // read from stdin
25        for line in stdin().lock().lines() {
26            let line = line.expect("input must be UTF-8, sorry");
27            let line = line.trim_start();
28            let num_end = line.find(' ').expect("no space in line");
29            let priority = line[..num_end].parse::<u32>().expect("priority is not a number");
30            let msg = line[num_end+1..].trim_start();
31            mq.send(priority, msg.as_bytes()).expect("sending failed");
32        }
33    } else {
34        // read from the command line
35        for i in (1..args.len()).step_by(2) {
36            let priority = args[i].to_str()
37                .and_then(|s| s.parse::<u32>().ok() )
38                .expect("priority is not a number");
39            let msg = args[i+1].as_bytes();
40            mq.send(priority, msg).expect("sending failed");
41        }
42    }
43}
More examples
Hide additional examples
examples/receive.rs (line 31)
11fn main() {
12    let mut args = args_os().skip(1);
13    let name = args.next().expect("argument required");
14    let timeout = args.next().map(|arg| {
15        let arg = arg.into_string().expect("invalid timeout");
16        if arg == "nonblocking" || arg == "drain" || arg == "available" || arg == "empty" {
17            None
18        } else if arg.ends_with("ms") {
19            Some(Duration::from_millis(arg[..arg.len()-2].parse().expect("invalid duration")))
20        } else if arg.ends_with("s") {
21            Some(Duration::from_secs(arg[..arg.len()-1].parse().expect("invalid duration")))
22        } else {
23            Some(Duration::from_secs(arg.parse().expect("invalid duration")))
24        }
25    });
26
27    let mut opts = posixmq::OpenOptions::readonly();
28    if timeout == Some(None) {
29        opts = *opts.nonblocking();
30    }
31    let mq = opts.open(name.as_bytes()).expect("opening failed");
32
33    if let Some(Some(timeout)) = timeout {
34        let mut buf = vec![0; mq.attributes().unwrap_or_default().max_msg_len];
35        while let Ok((priority, len)) = mq.recv_timeout(&mut buf, timeout) {
36            print!("{:3}\t", priority);
37            stdout().write_all(&buf[..len]).expect("writing to stdout failed");
38            println!();
39        }
40    } else {
41        for (priority, msg) in mq {
42            print!("{:3}\t", priority);
43            stdout().write_all(&msg).expect("writing to stdout failed");
44            println!();
45        }
46    }
47}
examples/limits.rs (line 33)
28fn main() {
29    println!("{}:", std::env::consts::OS);
30
31    let mq = posixmq::OpenOptions::readwrite()
32        .create_new()
33        .open("/default_capacities")
34        .expect("Cannot create new posix message queue /default_capacities");
35    posixmq::remove_queue("/default_capacities")
36        .expect("Cannot delete posix message queue /default_capacities");
37
38    let attrs = mq.attributes().expect("Cannot get attributes for queue");
39    println!("default queue capacity:         {}", attrs.capacity);
40    println!("default maximum message length: {}", attrs.max_msg_len);
41
42    println!("max message priority:           {}", btry(0xff_ff_ff_ff, |priority| {
43        mq.send(priority as u32, b"b")
44            .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45            .is_ok()
46    }));
47    println!("allows empty messages:          {}", mq.send(0, b"").is_ok());
48    drop(mq);
49
50    println!("max queue capacity:             {}", btry(1_000_000, |capacity| {
51        posixmq::OpenOptions::readwrite()
52            .capacity(capacity)
53            .max_msg_len(1)
54            .create_new()
55            .open("/max_capacity")
56            .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57            .is_ok()
58    }));
59
60    println!("max message length:             {}", btry(1_000_000, |length| {
61        posixmq::OpenOptions::readwrite()
62            .max_msg_len(length)
63            .capacity(1)
64            .create_new()
65            .open("/max_length")
66            .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67            .is_ok()
68    }));
69
70    println!("max equal:                      {}", btry(1_000_000, |equal| {
71        posixmq::OpenOptions::readwrite()
72            .max_msg_len(equal)
73            .capacity(equal)
74            .create_new()
75            .open("/max_equal")
76            .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77            .is_ok()
78    }));
79
80    println!("allows just \"/\":                {}", {
81        let result = posixmq::PosixMq::create("/");
82        let _ = posixmq::remove_queue("/");
83        result.is_ok()
84    });
85    println!("enforces name rules:            {}", {
86        let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87        let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88        let _ = posixmq::remove_queue_c(noslash);
89        result.is_err()
90    });
91}
examples/merge.rs (line 19)
7fn main() {
8    use std::env::args;
9    use std::io::ErrorKind;
10
11    use mio::{Poll, Events, Interest, Token};
12
13    let mut queues = args().skip(1).collect::<Vec<_>>();
14    let dst = queues.pop().expect("arguments required");
15    
16    // open source queues
17    let mut src = Vec::new();
18    for name in queues {
19        match posixmq::OpenOptions::readonly().nonblocking().open(&name) {
20            Ok(mq) => src.push((mq, name)),
21            Err(e) => panic!("Cannot open {:?} for receiving: {}", name, e),
22        }
23    }
24
25    // open destination queue
26    let mut dst = match posixmq::OpenOptions::writeonly().nonblocking().create().open(&dst) {
27        Ok(mq) => (mq, dst),
28        Err(e) => panic!("Cannot open or create {:?} for sending: {}", dst, e),
29    };
30
31    let mut poll = Poll::new().expect("Cannot create selector");
32    poll.registry()
33        .register(&mut dst.0, Token(0), Interest::WRITABLE)
34        .expect("registering destination failed");
35    for (i, &mut(ref mut src, _)) in src.iter_mut().enumerate() {
36        poll.registry()
37            .register(src, Token(i+1), Interest::READABLE)
38            .expect("registering a source failed");
39    }
40
41    let mut unsent = Vec::<(u32, Box<[u8]>, &str)>::new();
42    let mut buf = [0; 8192];
43    let mut events = Events::with_capacity(1024);
44    loop {
45        poll.poll(&mut events, None).expect("Cannot poll selector");
46
47        // receive all available messages from queues that are ready
48        for event in events.iter() {
49            if event.token() == Token(0) {
50                // dst; will try to send below even without this event
51                continue;
52            }
53
54            let &(ref mq, ref name) = &src[event.token().0-1];
55            loop {
56                match mq.recv(&mut buf) {
57                    Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
58                    Err(e) => panic!("Error receiving from {}: {}", name, e),
59                    Ok((priority, len)) => unsent.push((priority, Box::from(&buf[..len]), name)),
60                }
61            }
62        }
63
64        // send as many messages as possible
65        while let Some(&(priority, ref msg, ref src)) = unsent.last() {
66            if let Err(e) = dst.0.send(priority, msg) {
67                if e.kind() == ErrorKind::WouldBlock {
68                    break;
69                }
70                panic!("Error sending to {}: {}", dst.1, e);
71            }
72            println!("message of priority {} with {} bytes from {} sent to {}", 
73                priority, msg.len(), src, dst.1
74            );
75            let _ = unsent.pop();
76        }
77    }
78}
Source

pub fn open_c(&self, name: &CStr) -> Result<PosixMq, Error>

Open a queue with the specified options and without inspecting name or allocating.

On NetBSD, this can be used to access message queues with names that don’t start with a ‘/’.

§Errors
  • Queue doesn’t exist (ENOENT) => ErrorKind::NotFound
  • Name is just “/” (ENOENT) => ErrorKind::NotFound
  • Queue already exists (EEXIST) => ErrorKind::AlreadyExists
  • Not permitted to open in this mode (EACCES) => ErrorKind::PermissionDenied
  • More than one ‘/’ in name (EACCES) => ErrorKind::PermissionDenied
  • Invalid capacities (EINVAL) => ErrorKind::InvalidInput
  • Posix message queues are disabled (ENOSYS) => ErrorKind::Other
  • Name is empty (EINVAL) => ErrorKind::InvalidInput
  • Name is too long (ENAMETOOLONG) => ErrorKind::Other
  • Unlikely (ENFILE, EMFILE, ENOMEM, ENOSPC) => ErrorKind::Other
  • Possibly other
Examples found in repository?
examples/sort.rs (line 30)
10fn main() {
11    let args = args_os()
12        .skip(1)
13        .map(|osstring| osstring.into_vec() )
14        .collect::<Vec<Vec<u8>>>();
15    if args.is_empty() {
16        return; // nothing to do
17    }
18
19    let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21    let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23    // create queue with the necessary permissions and open it
24    let mq = posixmq::OpenOptions::readwrite()
25        .nonblocking() // use WouldBlock to detect that the queue is empty
26        .create_new()
27        .mode(0o000) // only affects future attempts at opening it
28        .capacity(args.len())
29        .max_msg_len(recv_buf.len())
30        .open_c(name)
31        .expect("opening queue failed");
32
33    // write arguments to the queue
34    for arg in args {
35        mq.send(arg.len() as u32, &arg).expect("sending failed");
36    }
37
38    // read until the queue is empty
39    let stdout = stdout();
40    let mut stdout = stdout.lock();
41    loop {
42        match mq.recv(&mut recv_buf) {
43            Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44            Err(e) => panic!("receiving failed: {}", e),
45            Ok((_priority, len)) => {
46                stdout.write_all(&recv_buf[..len])
47                    .and_then(|()| stdout.write_all(b"\n") )
48                    .expect("writing to stdout failed");
49            }
50        }
51    }
52
53    // clean up
54    posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
More examples
Hide additional examples
examples/limits.rs (line 87)
28fn main() {
29    println!("{}:", std::env::consts::OS);
30
31    let mq = posixmq::OpenOptions::readwrite()
32        .create_new()
33        .open("/default_capacities")
34        .expect("Cannot create new posix message queue /default_capacities");
35    posixmq::remove_queue("/default_capacities")
36        .expect("Cannot delete posix message queue /default_capacities");
37
38    let attrs = mq.attributes().expect("Cannot get attributes for queue");
39    println!("default queue capacity:         {}", attrs.capacity);
40    println!("default maximum message length: {}", attrs.max_msg_len);
41
42    println!("max message priority:           {}", btry(0xff_ff_ff_ff, |priority| {
43        mq.send(priority as u32, b"b")
44            .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45            .is_ok()
46    }));
47    println!("allows empty messages:          {}", mq.send(0, b"").is_ok());
48    drop(mq);
49
50    println!("max queue capacity:             {}", btry(1_000_000, |capacity| {
51        posixmq::OpenOptions::readwrite()
52            .capacity(capacity)
53            .max_msg_len(1)
54            .create_new()
55            .open("/max_capacity")
56            .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57            .is_ok()
58    }));
59
60    println!("max message length:             {}", btry(1_000_000, |length| {
61        posixmq::OpenOptions::readwrite()
62            .max_msg_len(length)
63            .capacity(1)
64            .create_new()
65            .open("/max_length")
66            .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67            .is_ok()
68    }));
69
70    println!("max equal:                      {}", btry(1_000_000, |equal| {
71        posixmq::OpenOptions::readwrite()
72            .max_msg_len(equal)
73            .capacity(equal)
74            .create_new()
75            .open("/max_equal")
76            .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77            .is_ok()
78    }));
79
80    println!("allows just \"/\":                {}", {
81        let result = posixmq::PosixMq::create("/");
82        let _ = posixmq::remove_queue("/");
83        result.is_ok()
84    });
85    println!("enforces name rules:            {}", {
86        let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87        let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88        let _ = posixmq::remove_queue_c(noslash);
89        result.is_err()
90    });
91}

Trait Implementations§

Source§

impl Clone for OpenOptions

Source§

fn clone(&self) -> OpenOptions

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for OpenOptions

Source§

fn fmt(&self, fmtr: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl PartialEq for OpenOptions

Source§

fn eq(&self, other: &OpenOptions) -> bool

Tests for self and other values to be equal, and is used by ==.
1.0.0 · Source§

fn ne(&self, other: &Rhs) -> bool

Tests for !=. The default implementation is almost always sufficient, and should not be overridden without very good reason.
Source§

impl Copy for OpenOptions

Source§

impl Eq for OpenOptions

Source§

impl StructuralPartialEq for OpenOptions

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.