pub struct OpenOptions { /* private fields */ }
Expand description
Flags and parameters which control how a PosixMq
message queue is opened or created.
Implementations§
Source§impl OpenOptions
impl OpenOptions
Sourcepub fn readonly() -> Self
pub fn readonly() -> Self
Open message queue for receiving only.
Examples found in repository?
11fn main() {
12 let mut args = args_os().skip(1);
13 let name = args.next().expect("argument required");
14 let timeout = args.next().map(|arg| {
15 let arg = arg.into_string().expect("invalid timeout");
16 if arg == "nonblocking" || arg == "drain" || arg == "available" || arg == "empty" {
17 None
18 } else if arg.ends_with("ms") {
19 Some(Duration::from_millis(arg[..arg.len()-2].parse().expect("invalid duration")))
20 } else if arg.ends_with("s") {
21 Some(Duration::from_secs(arg[..arg.len()-1].parse().expect("invalid duration")))
22 } else {
23 Some(Duration::from_secs(arg.parse().expect("invalid duration")))
24 }
25 });
26
27 let mut opts = posixmq::OpenOptions::readonly();
28 if timeout == Some(None) {
29 opts = *opts.nonblocking();
30 }
31 let mq = opts.open(name.as_bytes()).expect("opening failed");
32
33 if let Some(Some(timeout)) = timeout {
34 let mut buf = vec![0; mq.attributes().unwrap_or_default().max_msg_len];
35 while let Ok((priority, len)) = mq.recv_timeout(&mut buf, timeout) {
36 print!("{:3}\t", priority);
37 stdout().write_all(&buf[..len]).expect("writing to stdout failed");
38 println!();
39 }
40 } else {
41 for (priority, msg) in mq {
42 print!("{:3}\t", priority);
43 stdout().write_all(&msg).expect("writing to stdout failed");
44 println!();
45 }
46 }
47}
More examples
7fn main() {
8 use std::env::args;
9 use std::io::ErrorKind;
10
11 use mio::{Poll, Events, Interest, Token};
12
13 let mut queues = args().skip(1).collect::<Vec<_>>();
14 let dst = queues.pop().expect("arguments required");
15
16 // open source queues
17 let mut src = Vec::new();
18 for name in queues {
19 match posixmq::OpenOptions::readonly().nonblocking().open(&name) {
20 Ok(mq) => src.push((mq, name)),
21 Err(e) => panic!("Cannot open {:?} for receiving: {}", name, e),
22 }
23 }
24
25 // open destination queue
26 let mut dst = match posixmq::OpenOptions::writeonly().nonblocking().create().open(&dst) {
27 Ok(mq) => (mq, dst),
28 Err(e) => panic!("Cannot open or create {:?} for sending: {}", dst, e),
29 };
30
31 let mut poll = Poll::new().expect("Cannot create selector");
32 poll.registry()
33 .register(&mut dst.0, Token(0), Interest::WRITABLE)
34 .expect("registering destination failed");
35 for (i, &mut(ref mut src, _)) in src.iter_mut().enumerate() {
36 poll.registry()
37 .register(src, Token(i+1), Interest::READABLE)
38 .expect("registering a source failed");
39 }
40
41 let mut unsent = Vec::<(u32, Box<[u8]>, &str)>::new();
42 let mut buf = [0; 8192];
43 let mut events = Events::with_capacity(1024);
44 loop {
45 poll.poll(&mut events, None).expect("Cannot poll selector");
46
47 // receive all available messages from queues that are ready
48 for event in events.iter() {
49 if event.token() == Token(0) {
50 // dst; will try to send below even without this event
51 continue;
52 }
53
54 let &(ref mq, ref name) = &src[event.token().0-1];
55 loop {
56 match mq.recv(&mut buf) {
57 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
58 Err(e) => panic!("Error receiving from {}: {}", name, e),
59 Ok((priority, len)) => unsent.push((priority, Box::from(&buf[..len]), name)),
60 }
61 }
62 }
63
64 // send as many messages as possible
65 while let Some(&(priority, ref msg, ref src)) = unsent.last() {
66 if let Err(e) = dst.0.send(priority, msg) {
67 if e.kind() == ErrorKind::WouldBlock {
68 break;
69 }
70 panic!("Error sending to {}: {}", dst.1, e);
71 }
72 println!("message of priority {} with {} bytes from {} sent to {}",
73 priority, msg.len(), src, dst.1
74 );
75 let _ = unsent.pop();
76 }
77 }
78}
Sourcepub fn writeonly() -> Self
pub fn writeonly() -> Self
Open message queue for sending only.
Examples found in repository?
9fn main() {
10 let args = args_os().skip(1).collect::<Vec<_>>();
11 if args.len() % 2 == 0 {
12 eprintln!("Usage: cargo run --example send /queue-name [priority message] ...");
13 return;
14 }
15
16 // open the message queue
17 let mq = posixmq::OpenOptions::writeonly()
18 .create()
19 .mode(0o666) // FIXME set umask to actually make it sendable for others
20 .open(args[0].as_bytes())
21 .expect("opening failed");
22
23 if args.len() == 1 {
24 // read from stdin
25 for line in stdin().lock().lines() {
26 let line = line.expect("input must be UTF-8, sorry");
27 let line = line.trim_start();
28 let num_end = line.find(' ').expect("no space in line");
29 let priority = line[..num_end].parse::<u32>().expect("priority is not a number");
30 let msg = line[num_end+1..].trim_start();
31 mq.send(priority, msg.as_bytes()).expect("sending failed");
32 }
33 } else {
34 // read from the command line
35 for i in (1..args.len()).step_by(2) {
36 let priority = args[i].to_str()
37 .and_then(|s| s.parse::<u32>().ok() )
38 .expect("priority is not a number");
39 let msg = args[i+1].as_bytes();
40 mq.send(priority, msg).expect("sending failed");
41 }
42 }
43}
More examples
7fn main() {
8 use std::env::args;
9 use std::io::ErrorKind;
10
11 use mio::{Poll, Events, Interest, Token};
12
13 let mut queues = args().skip(1).collect::<Vec<_>>();
14 let dst = queues.pop().expect("arguments required");
15
16 // open source queues
17 let mut src = Vec::new();
18 for name in queues {
19 match posixmq::OpenOptions::readonly().nonblocking().open(&name) {
20 Ok(mq) => src.push((mq, name)),
21 Err(e) => panic!("Cannot open {:?} for receiving: {}", name, e),
22 }
23 }
24
25 // open destination queue
26 let mut dst = match posixmq::OpenOptions::writeonly().nonblocking().create().open(&dst) {
27 Ok(mq) => (mq, dst),
28 Err(e) => panic!("Cannot open or create {:?} for sending: {}", dst, e),
29 };
30
31 let mut poll = Poll::new().expect("Cannot create selector");
32 poll.registry()
33 .register(&mut dst.0, Token(0), Interest::WRITABLE)
34 .expect("registering destination failed");
35 for (i, &mut(ref mut src, _)) in src.iter_mut().enumerate() {
36 poll.registry()
37 .register(src, Token(i+1), Interest::READABLE)
38 .expect("registering a source failed");
39 }
40
41 let mut unsent = Vec::<(u32, Box<[u8]>, &str)>::new();
42 let mut buf = [0; 8192];
43 let mut events = Events::with_capacity(1024);
44 loop {
45 poll.poll(&mut events, None).expect("Cannot poll selector");
46
47 // receive all available messages from queues that are ready
48 for event in events.iter() {
49 if event.token() == Token(0) {
50 // dst; will try to send below even without this event
51 continue;
52 }
53
54 let &(ref mq, ref name) = &src[event.token().0-1];
55 loop {
56 match mq.recv(&mut buf) {
57 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
58 Err(e) => panic!("Error receiving from {}: {}", name, e),
59 Ok((priority, len)) => unsent.push((priority, Box::from(&buf[..len]), name)),
60 }
61 }
62 }
63
64 // send as many messages as possible
65 while let Some(&(priority, ref msg, ref src)) = unsent.last() {
66 if let Err(e) = dst.0.send(priority, msg) {
67 if e.kind() == ErrorKind::WouldBlock {
68 break;
69 }
70 panic!("Error sending to {}: {}", dst.1, e);
71 }
72 println!("message of priority {} with {} bytes from {} sent to {}",
73 priority, msg.len(), src, dst.1
74 );
75 let _ = unsent.pop();
76 }
77 }
78}
Sourcepub fn readwrite() -> Self
pub fn readwrite() -> Self
Open message queue both for sending and receiving.
Examples found in repository?
10fn main() {
11 let args = args_os()
12 .skip(1)
13 .map(|osstring| osstring.into_vec() )
14 .collect::<Vec<Vec<u8>>>();
15 if args.is_empty() {
16 return; // nothing to do
17 }
18
19 let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21 let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23 // create queue with the necessary permissions and open it
24 let mq = posixmq::OpenOptions::readwrite()
25 .nonblocking() // use WouldBlock to detect that the queue is empty
26 .create_new()
27 .mode(0o000) // only affects future attempts at opening it
28 .capacity(args.len())
29 .max_msg_len(recv_buf.len())
30 .open_c(name)
31 .expect("opening queue failed");
32
33 // write arguments to the queue
34 for arg in args {
35 mq.send(arg.len() as u32, &arg).expect("sending failed");
36 }
37
38 // read until the queue is empty
39 let stdout = stdout();
40 let mut stdout = stdout.lock();
41 loop {
42 match mq.recv(&mut recv_buf) {
43 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44 Err(e) => panic!("receiving failed: {}", e),
45 Ok((_priority, len)) => {
46 stdout.write_all(&recv_buf[..len])
47 .and_then(|()| stdout.write_all(b"\n") )
48 .expect("writing to stdout failed");
49 }
50 }
51 }
52
53 // clean up
54 posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
More examples
28fn main() {
29 println!("{}:", std::env::consts::OS);
30
31 let mq = posixmq::OpenOptions::readwrite()
32 .create_new()
33 .open("/default_capacities")
34 .expect("Cannot create new posix message queue /default_capacities");
35 posixmq::remove_queue("/default_capacities")
36 .expect("Cannot delete posix message queue /default_capacities");
37
38 let attrs = mq.attributes().expect("Cannot get attributes for queue");
39 println!("default queue capacity: {}", attrs.capacity);
40 println!("default maximum message length: {}", attrs.max_msg_len);
41
42 println!("max message priority: {}", btry(0xff_ff_ff_ff, |priority| {
43 mq.send(priority as u32, b"b")
44 .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45 .is_ok()
46 }));
47 println!("allows empty messages: {}", mq.send(0, b"").is_ok());
48 drop(mq);
49
50 println!("max queue capacity: {}", btry(1_000_000, |capacity| {
51 posixmq::OpenOptions::readwrite()
52 .capacity(capacity)
53 .max_msg_len(1)
54 .create_new()
55 .open("/max_capacity")
56 .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57 .is_ok()
58 }));
59
60 println!("max message length: {}", btry(1_000_000, |length| {
61 posixmq::OpenOptions::readwrite()
62 .max_msg_len(length)
63 .capacity(1)
64 .create_new()
65 .open("/max_length")
66 .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67 .is_ok()
68 }));
69
70 println!("max equal: {}", btry(1_000_000, |equal| {
71 posixmq::OpenOptions::readwrite()
72 .max_msg_len(equal)
73 .capacity(equal)
74 .create_new()
75 .open("/max_equal")
76 .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77 .is_ok()
78 }));
79
80 println!("allows just \"/\": {}", {
81 let result = posixmq::PosixMq::create("/");
82 let _ = posixmq::remove_queue("/");
83 result.is_ok()
84 });
85 println!("enforces name rules: {}", {
86 let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87 let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88 let _ = posixmq::remove_queue_c(noslash);
89 result.is_err()
90 });
91}
Sourcepub fn mode(&mut self, mode: u32) -> &mut Self
pub fn mode(&mut self, mode: u32) -> &mut Self
Set permissions to create the queue with.
Some bits might be cleared by the process’s umask when creating the queue, and unknown bits are ignored.
This field is ignored if the queue already exists or should not be created. If this method is not called, queues are created with mode 600.
Examples found in repository?
9fn main() {
10 let args = args_os().skip(1).collect::<Vec<_>>();
11 if args.len() % 2 == 0 {
12 eprintln!("Usage: cargo run --example send /queue-name [priority message] ...");
13 return;
14 }
15
16 // open the message queue
17 let mq = posixmq::OpenOptions::writeonly()
18 .create()
19 .mode(0o666) // FIXME set umask to actually make it sendable for others
20 .open(args[0].as_bytes())
21 .expect("opening failed");
22
23 if args.len() == 1 {
24 // read from stdin
25 for line in stdin().lock().lines() {
26 let line = line.expect("input must be UTF-8, sorry");
27 let line = line.trim_start();
28 let num_end = line.find(' ').expect("no space in line");
29 let priority = line[..num_end].parse::<u32>().expect("priority is not a number");
30 let msg = line[num_end+1..].trim_start();
31 mq.send(priority, msg.as_bytes()).expect("sending failed");
32 }
33 } else {
34 // read from the command line
35 for i in (1..args.len()).step_by(2) {
36 let priority = args[i].to_str()
37 .and_then(|s| s.parse::<u32>().ok() )
38 .expect("priority is not a number");
39 let msg = args[i+1].as_bytes();
40 mq.send(priority, msg).expect("sending failed");
41 }
42 }
43}
More examples
10fn main() {
11 let args = args_os()
12 .skip(1)
13 .map(|osstring| osstring.into_vec() )
14 .collect::<Vec<Vec<u8>>>();
15 if args.is_empty() {
16 return; // nothing to do
17 }
18
19 let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21 let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23 // create queue with the necessary permissions and open it
24 let mq = posixmq::OpenOptions::readwrite()
25 .nonblocking() // use WouldBlock to detect that the queue is empty
26 .create_new()
27 .mode(0o000) // only affects future attempts at opening it
28 .capacity(args.len())
29 .max_msg_len(recv_buf.len())
30 .open_c(name)
31 .expect("opening queue failed");
32
33 // write arguments to the queue
34 for arg in args {
35 mq.send(arg.len() as u32, &arg).expect("sending failed");
36 }
37
38 // read until the queue is empty
39 let stdout = stdout();
40 let mut stdout = stdout.lock();
41 loop {
42 match mq.recv(&mut recv_buf) {
43 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44 Err(e) => panic!("receiving failed: {}", e),
45 Ok((_priority, len)) => {
46 stdout.write_all(&recv_buf[..len])
47 .and_then(|()| stdout.write_all(b"\n") )
48 .expect("writing to stdout failed");
49 }
50 }
51 }
52
53 // clean up
54 posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
Sourcepub fn max_msg_len(&mut self, max_msg_len: usize) -> &mut Self
pub fn max_msg_len(&mut self, max_msg_len: usize) -> &mut Self
Set the maximum size of each message.
recv()
will fail if given a buffer smaller than this value.
If max_msg_len and capacity are both zero (or not set), the queue
will be created with a maximum length and capacity decided by the
operating system.
If this value is specified, capacity should also be, or opening the
message queue might fail.
Examples found in repository?
10fn main() {
11 let args = args_os()
12 .skip(1)
13 .map(|osstring| osstring.into_vec() )
14 .collect::<Vec<Vec<u8>>>();
15 if args.is_empty() {
16 return; // nothing to do
17 }
18
19 let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21 let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23 // create queue with the necessary permissions and open it
24 let mq = posixmq::OpenOptions::readwrite()
25 .nonblocking() // use WouldBlock to detect that the queue is empty
26 .create_new()
27 .mode(0o000) // only affects future attempts at opening it
28 .capacity(args.len())
29 .max_msg_len(recv_buf.len())
30 .open_c(name)
31 .expect("opening queue failed");
32
33 // write arguments to the queue
34 for arg in args {
35 mq.send(arg.len() as u32, &arg).expect("sending failed");
36 }
37
38 // read until the queue is empty
39 let stdout = stdout();
40 let mut stdout = stdout.lock();
41 loop {
42 match mq.recv(&mut recv_buf) {
43 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44 Err(e) => panic!("receiving failed: {}", e),
45 Ok((_priority, len)) => {
46 stdout.write_all(&recv_buf[..len])
47 .and_then(|()| stdout.write_all(b"\n") )
48 .expect("writing to stdout failed");
49 }
50 }
51 }
52
53 // clean up
54 posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
More examples
28fn main() {
29 println!("{}:", std::env::consts::OS);
30
31 let mq = posixmq::OpenOptions::readwrite()
32 .create_new()
33 .open("/default_capacities")
34 .expect("Cannot create new posix message queue /default_capacities");
35 posixmq::remove_queue("/default_capacities")
36 .expect("Cannot delete posix message queue /default_capacities");
37
38 let attrs = mq.attributes().expect("Cannot get attributes for queue");
39 println!("default queue capacity: {}", attrs.capacity);
40 println!("default maximum message length: {}", attrs.max_msg_len);
41
42 println!("max message priority: {}", btry(0xff_ff_ff_ff, |priority| {
43 mq.send(priority as u32, b"b")
44 .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45 .is_ok()
46 }));
47 println!("allows empty messages: {}", mq.send(0, b"").is_ok());
48 drop(mq);
49
50 println!("max queue capacity: {}", btry(1_000_000, |capacity| {
51 posixmq::OpenOptions::readwrite()
52 .capacity(capacity)
53 .max_msg_len(1)
54 .create_new()
55 .open("/max_capacity")
56 .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57 .is_ok()
58 }));
59
60 println!("max message length: {}", btry(1_000_000, |length| {
61 posixmq::OpenOptions::readwrite()
62 .max_msg_len(length)
63 .capacity(1)
64 .create_new()
65 .open("/max_length")
66 .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67 .is_ok()
68 }));
69
70 println!("max equal: {}", btry(1_000_000, |equal| {
71 posixmq::OpenOptions::readwrite()
72 .max_msg_len(equal)
73 .capacity(equal)
74 .create_new()
75 .open("/max_equal")
76 .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77 .is_ok()
78 }));
79
80 println!("allows just \"/\": {}", {
81 let result = posixmq::PosixMq::create("/");
82 let _ = posixmq::remove_queue("/");
83 result.is_ok()
84 });
85 println!("enforces name rules: {}", {
86 let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87 let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88 let _ = posixmq::remove_queue_c(noslash);
89 result.is_err()
90 });
91}
Sourcepub fn capacity(&mut self, capacity: usize) -> &mut Self
pub fn capacity(&mut self, capacity: usize) -> &mut Self
Set the maximum number of messages in the queue.
When the queue is full, further send()
s will either block
or fail with an error of type ErrorKind::WouldBlock
.
If both capacity and max_msg_len are zero (or not set), the queue
will be created with a maximum length and capacity decided by the
operating system.
If this value is specified, max_msg_len should also be, or opening the
message queue might fail.
Examples found in repository?
10fn main() {
11 let args = args_os()
12 .skip(1)
13 .map(|osstring| osstring.into_vec() )
14 .collect::<Vec<Vec<u8>>>();
15 if args.is_empty() {
16 return; // nothing to do
17 }
18
19 let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21 let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23 // create queue with the necessary permissions and open it
24 let mq = posixmq::OpenOptions::readwrite()
25 .nonblocking() // use WouldBlock to detect that the queue is empty
26 .create_new()
27 .mode(0o000) // only affects future attempts at opening it
28 .capacity(args.len())
29 .max_msg_len(recv_buf.len())
30 .open_c(name)
31 .expect("opening queue failed");
32
33 // write arguments to the queue
34 for arg in args {
35 mq.send(arg.len() as u32, &arg).expect("sending failed");
36 }
37
38 // read until the queue is empty
39 let stdout = stdout();
40 let mut stdout = stdout.lock();
41 loop {
42 match mq.recv(&mut recv_buf) {
43 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44 Err(e) => panic!("receiving failed: {}", e),
45 Ok((_priority, len)) => {
46 stdout.write_all(&recv_buf[..len])
47 .and_then(|()| stdout.write_all(b"\n") )
48 .expect("writing to stdout failed");
49 }
50 }
51 }
52
53 // clean up
54 posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
More examples
28fn main() {
29 println!("{}:", std::env::consts::OS);
30
31 let mq = posixmq::OpenOptions::readwrite()
32 .create_new()
33 .open("/default_capacities")
34 .expect("Cannot create new posix message queue /default_capacities");
35 posixmq::remove_queue("/default_capacities")
36 .expect("Cannot delete posix message queue /default_capacities");
37
38 let attrs = mq.attributes().expect("Cannot get attributes for queue");
39 println!("default queue capacity: {}", attrs.capacity);
40 println!("default maximum message length: {}", attrs.max_msg_len);
41
42 println!("max message priority: {}", btry(0xff_ff_ff_ff, |priority| {
43 mq.send(priority as u32, b"b")
44 .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45 .is_ok()
46 }));
47 println!("allows empty messages: {}", mq.send(0, b"").is_ok());
48 drop(mq);
49
50 println!("max queue capacity: {}", btry(1_000_000, |capacity| {
51 posixmq::OpenOptions::readwrite()
52 .capacity(capacity)
53 .max_msg_len(1)
54 .create_new()
55 .open("/max_capacity")
56 .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57 .is_ok()
58 }));
59
60 println!("max message length: {}", btry(1_000_000, |length| {
61 posixmq::OpenOptions::readwrite()
62 .max_msg_len(length)
63 .capacity(1)
64 .create_new()
65 .open("/max_length")
66 .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67 .is_ok()
68 }));
69
70 println!("max equal: {}", btry(1_000_000, |equal| {
71 posixmq::OpenOptions::readwrite()
72 .max_msg_len(equal)
73 .capacity(equal)
74 .create_new()
75 .open("/max_equal")
76 .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77 .is_ok()
78 }));
79
80 println!("allows just \"/\": {}", {
81 let result = posixmq::PosixMq::create("/");
82 let _ = posixmq::remove_queue("/");
83 result.is_ok()
84 });
85 println!("enforces name rules: {}", {
86 let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87 let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88 let _ = posixmq::remove_queue_c(noslash);
89 result.is_err()
90 });
91}
Sourcepub fn create(&mut self) -> &mut Self
pub fn create(&mut self) -> &mut Self
Create message queue if it doesn’t exist.
Examples found in repository?
9fn main() {
10 let args = args_os().skip(1).collect::<Vec<_>>();
11 if args.len() % 2 == 0 {
12 eprintln!("Usage: cargo run --example send /queue-name [priority message] ...");
13 return;
14 }
15
16 // open the message queue
17 let mq = posixmq::OpenOptions::writeonly()
18 .create()
19 .mode(0o666) // FIXME set umask to actually make it sendable for others
20 .open(args[0].as_bytes())
21 .expect("opening failed");
22
23 if args.len() == 1 {
24 // read from stdin
25 for line in stdin().lock().lines() {
26 let line = line.expect("input must be UTF-8, sorry");
27 let line = line.trim_start();
28 let num_end = line.find(' ').expect("no space in line");
29 let priority = line[..num_end].parse::<u32>().expect("priority is not a number");
30 let msg = line[num_end+1..].trim_start();
31 mq.send(priority, msg.as_bytes()).expect("sending failed");
32 }
33 } else {
34 // read from the command line
35 for i in (1..args.len()).step_by(2) {
36 let priority = args[i].to_str()
37 .and_then(|s| s.parse::<u32>().ok() )
38 .expect("priority is not a number");
39 let msg = args[i+1].as_bytes();
40 mq.send(priority, msg).expect("sending failed");
41 }
42 }
43}
More examples
28fn main() {
29 println!("{}:", std::env::consts::OS);
30
31 let mq = posixmq::OpenOptions::readwrite()
32 .create_new()
33 .open("/default_capacities")
34 .expect("Cannot create new posix message queue /default_capacities");
35 posixmq::remove_queue("/default_capacities")
36 .expect("Cannot delete posix message queue /default_capacities");
37
38 let attrs = mq.attributes().expect("Cannot get attributes for queue");
39 println!("default queue capacity: {}", attrs.capacity);
40 println!("default maximum message length: {}", attrs.max_msg_len);
41
42 println!("max message priority: {}", btry(0xff_ff_ff_ff, |priority| {
43 mq.send(priority as u32, b"b")
44 .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45 .is_ok()
46 }));
47 println!("allows empty messages: {}", mq.send(0, b"").is_ok());
48 drop(mq);
49
50 println!("max queue capacity: {}", btry(1_000_000, |capacity| {
51 posixmq::OpenOptions::readwrite()
52 .capacity(capacity)
53 .max_msg_len(1)
54 .create_new()
55 .open("/max_capacity")
56 .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57 .is_ok()
58 }));
59
60 println!("max message length: {}", btry(1_000_000, |length| {
61 posixmq::OpenOptions::readwrite()
62 .max_msg_len(length)
63 .capacity(1)
64 .create_new()
65 .open("/max_length")
66 .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67 .is_ok()
68 }));
69
70 println!("max equal: {}", btry(1_000_000, |equal| {
71 posixmq::OpenOptions::readwrite()
72 .max_msg_len(equal)
73 .capacity(equal)
74 .create_new()
75 .open("/max_equal")
76 .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77 .is_ok()
78 }));
79
80 println!("allows just \"/\": {}", {
81 let result = posixmq::PosixMq::create("/");
82 let _ = posixmq::remove_queue("/");
83 result.is_ok()
84 });
85 println!("enforces name rules: {}", {
86 let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87 let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88 let _ = posixmq::remove_queue_c(noslash);
89 result.is_err()
90 });
91}
7fn main() {
8 use std::env::args;
9 use std::io::ErrorKind;
10
11 use mio::{Poll, Events, Interest, Token};
12
13 let mut queues = args().skip(1).collect::<Vec<_>>();
14 let dst = queues.pop().expect("arguments required");
15
16 // open source queues
17 let mut src = Vec::new();
18 for name in queues {
19 match posixmq::OpenOptions::readonly().nonblocking().open(&name) {
20 Ok(mq) => src.push((mq, name)),
21 Err(e) => panic!("Cannot open {:?} for receiving: {}", name, e),
22 }
23 }
24
25 // open destination queue
26 let mut dst = match posixmq::OpenOptions::writeonly().nonblocking().create().open(&dst) {
27 Ok(mq) => (mq, dst),
28 Err(e) => panic!("Cannot open or create {:?} for sending: {}", dst, e),
29 };
30
31 let mut poll = Poll::new().expect("Cannot create selector");
32 poll.registry()
33 .register(&mut dst.0, Token(0), Interest::WRITABLE)
34 .expect("registering destination failed");
35 for (i, &mut(ref mut src, _)) in src.iter_mut().enumerate() {
36 poll.registry()
37 .register(src, Token(i+1), Interest::READABLE)
38 .expect("registering a source failed");
39 }
40
41 let mut unsent = Vec::<(u32, Box<[u8]>, &str)>::new();
42 let mut buf = [0; 8192];
43 let mut events = Events::with_capacity(1024);
44 loop {
45 poll.poll(&mut events, None).expect("Cannot poll selector");
46
47 // receive all available messages from queues that are ready
48 for event in events.iter() {
49 if event.token() == Token(0) {
50 // dst; will try to send below even without this event
51 continue;
52 }
53
54 let &(ref mq, ref name) = &src[event.token().0-1];
55 loop {
56 match mq.recv(&mut buf) {
57 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
58 Err(e) => panic!("Error receiving from {}: {}", name, e),
59 Ok((priority, len)) => unsent.push((priority, Box::from(&buf[..len]), name)),
60 }
61 }
62 }
63
64 // send as many messages as possible
65 while let Some(&(priority, ref msg, ref src)) = unsent.last() {
66 if let Err(e) = dst.0.send(priority, msg) {
67 if e.kind() == ErrorKind::WouldBlock {
68 break;
69 }
70 panic!("Error sending to {}: {}", dst.1, e);
71 }
72 println!("message of priority {} with {} bytes from {} sent to {}",
73 priority, msg.len(), src, dst.1
74 );
75 let _ = unsent.pop();
76 }
77 }
78}
Sourcepub fn create_new(&mut self) -> &mut Self
pub fn create_new(&mut self) -> &mut Self
Create a new queue, failing if the queue already exists.
Examples found in repository?
10fn main() {
11 let args = args_os()
12 .skip(1)
13 .map(|osstring| osstring.into_vec() )
14 .collect::<Vec<Vec<u8>>>();
15 if args.is_empty() {
16 return; // nothing to do
17 }
18
19 let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21 let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23 // create queue with the necessary permissions and open it
24 let mq = posixmq::OpenOptions::readwrite()
25 .nonblocking() // use WouldBlock to detect that the queue is empty
26 .create_new()
27 .mode(0o000) // only affects future attempts at opening it
28 .capacity(args.len())
29 .max_msg_len(recv_buf.len())
30 .open_c(name)
31 .expect("opening queue failed");
32
33 // write arguments to the queue
34 for arg in args {
35 mq.send(arg.len() as u32, &arg).expect("sending failed");
36 }
37
38 // read until the queue is empty
39 let stdout = stdout();
40 let mut stdout = stdout.lock();
41 loop {
42 match mq.recv(&mut recv_buf) {
43 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44 Err(e) => panic!("receiving failed: {}", e),
45 Ok((_priority, len)) => {
46 stdout.write_all(&recv_buf[..len])
47 .and_then(|()| stdout.write_all(b"\n") )
48 .expect("writing to stdout failed");
49 }
50 }
51 }
52
53 // clean up
54 posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
More examples
28fn main() {
29 println!("{}:", std::env::consts::OS);
30
31 let mq = posixmq::OpenOptions::readwrite()
32 .create_new()
33 .open("/default_capacities")
34 .expect("Cannot create new posix message queue /default_capacities");
35 posixmq::remove_queue("/default_capacities")
36 .expect("Cannot delete posix message queue /default_capacities");
37
38 let attrs = mq.attributes().expect("Cannot get attributes for queue");
39 println!("default queue capacity: {}", attrs.capacity);
40 println!("default maximum message length: {}", attrs.max_msg_len);
41
42 println!("max message priority: {}", btry(0xff_ff_ff_ff, |priority| {
43 mq.send(priority as u32, b"b")
44 .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45 .is_ok()
46 }));
47 println!("allows empty messages: {}", mq.send(0, b"").is_ok());
48 drop(mq);
49
50 println!("max queue capacity: {}", btry(1_000_000, |capacity| {
51 posixmq::OpenOptions::readwrite()
52 .capacity(capacity)
53 .max_msg_len(1)
54 .create_new()
55 .open("/max_capacity")
56 .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57 .is_ok()
58 }));
59
60 println!("max message length: {}", btry(1_000_000, |length| {
61 posixmq::OpenOptions::readwrite()
62 .max_msg_len(length)
63 .capacity(1)
64 .create_new()
65 .open("/max_length")
66 .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67 .is_ok()
68 }));
69
70 println!("max equal: {}", btry(1_000_000, |equal| {
71 posixmq::OpenOptions::readwrite()
72 .max_msg_len(equal)
73 .capacity(equal)
74 .create_new()
75 .open("/max_equal")
76 .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77 .is_ok()
78 }));
79
80 println!("allows just \"/\": {}", {
81 let result = posixmq::PosixMq::create("/");
82 let _ = posixmq::remove_queue("/");
83 result.is_ok()
84 });
85 println!("enforces name rules: {}", {
86 let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87 let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88 let _ = posixmq::remove_queue_c(noslash);
89 result.is_err()
90 });
91}
Sourcepub fn existing(&mut self) -> &mut Self
pub fn existing(&mut self) -> &mut Self
Require the queue to already exist, failing if it doesn’t.
Sourcepub fn nonblocking(&mut self) -> &mut Self
pub fn nonblocking(&mut self) -> &mut Self
Open the message queue in non-blocking mode.
This must be done if you want to use the message queue with mio.
Examples found in repository?
11fn main() {
12 let mut args = args_os().skip(1);
13 let name = args.next().expect("argument required");
14 let timeout = args.next().map(|arg| {
15 let arg = arg.into_string().expect("invalid timeout");
16 if arg == "nonblocking" || arg == "drain" || arg == "available" || arg == "empty" {
17 None
18 } else if arg.ends_with("ms") {
19 Some(Duration::from_millis(arg[..arg.len()-2].parse().expect("invalid duration")))
20 } else if arg.ends_with("s") {
21 Some(Duration::from_secs(arg[..arg.len()-1].parse().expect("invalid duration")))
22 } else {
23 Some(Duration::from_secs(arg.parse().expect("invalid duration")))
24 }
25 });
26
27 let mut opts = posixmq::OpenOptions::readonly();
28 if timeout == Some(None) {
29 opts = *opts.nonblocking();
30 }
31 let mq = opts.open(name.as_bytes()).expect("opening failed");
32
33 if let Some(Some(timeout)) = timeout {
34 let mut buf = vec![0; mq.attributes().unwrap_or_default().max_msg_len];
35 while let Ok((priority, len)) = mq.recv_timeout(&mut buf, timeout) {
36 print!("{:3}\t", priority);
37 stdout().write_all(&buf[..len]).expect("writing to stdout failed");
38 println!();
39 }
40 } else {
41 for (priority, msg) in mq {
42 print!("{:3}\t", priority);
43 stdout().write_all(&msg).expect("writing to stdout failed");
44 println!();
45 }
46 }
47}
More examples
10fn main() {
11 let args = args_os()
12 .skip(1)
13 .map(|osstring| osstring.into_vec() )
14 .collect::<Vec<Vec<u8>>>();
15 if args.is_empty() {
16 return; // nothing to do
17 }
18
19 let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21 let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23 // create queue with the necessary permissions and open it
24 let mq = posixmq::OpenOptions::readwrite()
25 .nonblocking() // use WouldBlock to detect that the queue is empty
26 .create_new()
27 .mode(0o000) // only affects future attempts at opening it
28 .capacity(args.len())
29 .max_msg_len(recv_buf.len())
30 .open_c(name)
31 .expect("opening queue failed");
32
33 // write arguments to the queue
34 for arg in args {
35 mq.send(arg.len() as u32, &arg).expect("sending failed");
36 }
37
38 // read until the queue is empty
39 let stdout = stdout();
40 let mut stdout = stdout.lock();
41 loop {
42 match mq.recv(&mut recv_buf) {
43 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44 Err(e) => panic!("receiving failed: {}", e),
45 Ok((_priority, len)) => {
46 stdout.write_all(&recv_buf[..len])
47 .and_then(|()| stdout.write_all(b"\n") )
48 .expect("writing to stdout failed");
49 }
50 }
51 }
52
53 // clean up
54 posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
7fn main() {
8 use std::env::args;
9 use std::io::ErrorKind;
10
11 use mio::{Poll, Events, Interest, Token};
12
13 let mut queues = args().skip(1).collect::<Vec<_>>();
14 let dst = queues.pop().expect("arguments required");
15
16 // open source queues
17 let mut src = Vec::new();
18 for name in queues {
19 match posixmq::OpenOptions::readonly().nonblocking().open(&name) {
20 Ok(mq) => src.push((mq, name)),
21 Err(e) => panic!("Cannot open {:?} for receiving: {}", name, e),
22 }
23 }
24
25 // open destination queue
26 let mut dst = match posixmq::OpenOptions::writeonly().nonblocking().create().open(&dst) {
27 Ok(mq) => (mq, dst),
28 Err(e) => panic!("Cannot open or create {:?} for sending: {}", dst, e),
29 };
30
31 let mut poll = Poll::new().expect("Cannot create selector");
32 poll.registry()
33 .register(&mut dst.0, Token(0), Interest::WRITABLE)
34 .expect("registering destination failed");
35 for (i, &mut(ref mut src, _)) in src.iter_mut().enumerate() {
36 poll.registry()
37 .register(src, Token(i+1), Interest::READABLE)
38 .expect("registering a source failed");
39 }
40
41 let mut unsent = Vec::<(u32, Box<[u8]>, &str)>::new();
42 let mut buf = [0; 8192];
43 let mut events = Events::with_capacity(1024);
44 loop {
45 poll.poll(&mut events, None).expect("Cannot poll selector");
46
47 // receive all available messages from queues that are ready
48 for event in events.iter() {
49 if event.token() == Token(0) {
50 // dst; will try to send below even without this event
51 continue;
52 }
53
54 let &(ref mq, ref name) = &src[event.token().0-1];
55 loop {
56 match mq.recv(&mut buf) {
57 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
58 Err(e) => panic!("Error receiving from {}: {}", name, e),
59 Ok((priority, len)) => unsent.push((priority, Box::from(&buf[..len]), name)),
60 }
61 }
62 }
63
64 // send as many messages as possible
65 while let Some(&(priority, ref msg, ref src)) = unsent.last() {
66 if let Err(e) = dst.0.send(priority, msg) {
67 if e.kind() == ErrorKind::WouldBlock {
68 break;
69 }
70 panic!("Error sending to {}: {}", dst.1, e);
71 }
72 println!("message of priority {} with {} bytes from {} sent to {}",
73 priority, msg.len(), src, dst.1
74 );
75 let _ = unsent.pop();
76 }
77 }
78}
Sourcepub fn open<N: AsRef<[u8]> + ?Sized>(&self, name: &N) -> Result<PosixMq, Error>
pub fn open<N: AsRef<[u8]> + ?Sized>(&self, name: &N) -> Result<PosixMq, Error>
Open a queue with the specified options.
If the name doesn’t start with a ‘/’, one will be prepended.
§Errors
- Queue doesn’t exist (ENOENT) =>
ErrorKind::NotFound
- Name is just “/” (ENOENT) or is empty =>
ErrorKind::NotFound
- Queue already exists (EEXISTS) =>
ErrorKind::AlreadyExists
- Not permitted to open in this mode (EACCESS) =>
ErrorKind::PermissionDenied
- More than one ‘/’ in name (EACCESS) =>
ErrorKind::PermissionDenied
- Invalid capacities (EINVAL) =>
ErrorKind::InvalidInput
- Capacities too high (EMFILE) =>
ErrorKind::Other
- Posix message queues are disabled (ENOSYS) =>
ErrorKind::Other
- Name contains ‘\0’ =>
ErrorKind::InvalidInput
- Name is too long (ENAMETOOLONG) =>
ErrorKind::Other
- Unlikely (ENFILE, EMFILE, ENOMEM, ENOSPC) =>
ErrorKind::Other
- Possibly other
Examples found in repository?
9fn main() {
10 let args = args_os().skip(1).collect::<Vec<_>>();
11 if args.len() % 2 == 0 {
12 eprintln!("Usage: cargo run --example send /queue-name [priority message] ...");
13 return;
14 }
15
16 // open the message queue
17 let mq = posixmq::OpenOptions::writeonly()
18 .create()
19 .mode(0o666) // FIXME set umask to actually make it sendable for others
20 .open(args[0].as_bytes())
21 .expect("opening failed");
22
23 if args.len() == 1 {
24 // read from stdin
25 for line in stdin().lock().lines() {
26 let line = line.expect("input must be UTF-8, sorry");
27 let line = line.trim_start();
28 let num_end = line.find(' ').expect("no space in line");
29 let priority = line[..num_end].parse::<u32>().expect("priority is not a number");
30 let msg = line[num_end+1..].trim_start();
31 mq.send(priority, msg.as_bytes()).expect("sending failed");
32 }
33 } else {
34 // read from the command line
35 for i in (1..args.len()).step_by(2) {
36 let priority = args[i].to_str()
37 .and_then(|s| s.parse::<u32>().ok() )
38 .expect("priority is not a number");
39 let msg = args[i+1].as_bytes();
40 mq.send(priority, msg).expect("sending failed");
41 }
42 }
43}
More examples
11fn main() {
12 let mut args = args_os().skip(1);
13 let name = args.next().expect("argument required");
14 let timeout = args.next().map(|arg| {
15 let arg = arg.into_string().expect("invalid timeout");
16 if arg == "nonblocking" || arg == "drain" || arg == "available" || arg == "empty" {
17 None
18 } else if arg.ends_with("ms") {
19 Some(Duration::from_millis(arg[..arg.len()-2].parse().expect("invalid duration")))
20 } else if arg.ends_with("s") {
21 Some(Duration::from_secs(arg[..arg.len()-1].parse().expect("invalid duration")))
22 } else {
23 Some(Duration::from_secs(arg.parse().expect("invalid duration")))
24 }
25 });
26
27 let mut opts = posixmq::OpenOptions::readonly();
28 if timeout == Some(None) {
29 opts = *opts.nonblocking();
30 }
31 let mq = opts.open(name.as_bytes()).expect("opening failed");
32
33 if let Some(Some(timeout)) = timeout {
34 let mut buf = vec![0; mq.attributes().unwrap_or_default().max_msg_len];
35 while let Ok((priority, len)) = mq.recv_timeout(&mut buf, timeout) {
36 print!("{:3}\t", priority);
37 stdout().write_all(&buf[..len]).expect("writing to stdout failed");
38 println!();
39 }
40 } else {
41 for (priority, msg) in mq {
42 print!("{:3}\t", priority);
43 stdout().write_all(&msg).expect("writing to stdout failed");
44 println!();
45 }
46 }
47}
28fn main() {
29 println!("{}:", std::env::consts::OS);
30
31 let mq = posixmq::OpenOptions::readwrite()
32 .create_new()
33 .open("/default_capacities")
34 .expect("Cannot create new posix message queue /default_capacities");
35 posixmq::remove_queue("/default_capacities")
36 .expect("Cannot delete posix message queue /default_capacities");
37
38 let attrs = mq.attributes().expect("Cannot get attributes for queue");
39 println!("default queue capacity: {}", attrs.capacity);
40 println!("default maximum message length: {}", attrs.max_msg_len);
41
42 println!("max message priority: {}", btry(0xff_ff_ff_ff, |priority| {
43 mq.send(priority as u32, b"b")
44 .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45 .is_ok()
46 }));
47 println!("allows empty messages: {}", mq.send(0, b"").is_ok());
48 drop(mq);
49
50 println!("max queue capacity: {}", btry(1_000_000, |capacity| {
51 posixmq::OpenOptions::readwrite()
52 .capacity(capacity)
53 .max_msg_len(1)
54 .create_new()
55 .open("/max_capacity")
56 .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57 .is_ok()
58 }));
59
60 println!("max message length: {}", btry(1_000_000, |length| {
61 posixmq::OpenOptions::readwrite()
62 .max_msg_len(length)
63 .capacity(1)
64 .create_new()
65 .open("/max_length")
66 .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67 .is_ok()
68 }));
69
70 println!("max equal: {}", btry(1_000_000, |equal| {
71 posixmq::OpenOptions::readwrite()
72 .max_msg_len(equal)
73 .capacity(equal)
74 .create_new()
75 .open("/max_equal")
76 .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77 .is_ok()
78 }));
79
80 println!("allows just \"/\": {}", {
81 let result = posixmq::PosixMq::create("/");
82 let _ = posixmq::remove_queue("/");
83 result.is_ok()
84 });
85 println!("enforces name rules: {}", {
86 let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87 let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88 let _ = posixmq::remove_queue_c(noslash);
89 result.is_err()
90 });
91}
7fn main() {
8 use std::env::args;
9 use std::io::ErrorKind;
10
11 use mio::{Poll, Events, Interest, Token};
12
13 let mut queues = args().skip(1).collect::<Vec<_>>();
14 let dst = queues.pop().expect("arguments required");
15
16 // open source queues
17 let mut src = Vec::new();
18 for name in queues {
19 match posixmq::OpenOptions::readonly().nonblocking().open(&name) {
20 Ok(mq) => src.push((mq, name)),
21 Err(e) => panic!("Cannot open {:?} for receiving: {}", name, e),
22 }
23 }
24
25 // open destination queue
26 let mut dst = match posixmq::OpenOptions::writeonly().nonblocking().create().open(&dst) {
27 Ok(mq) => (mq, dst),
28 Err(e) => panic!("Cannot open or create {:?} for sending: {}", dst, e),
29 };
30
31 let mut poll = Poll::new().expect("Cannot create selector");
32 poll.registry()
33 .register(&mut dst.0, Token(0), Interest::WRITABLE)
34 .expect("registering destination failed");
35 for (i, &mut(ref mut src, _)) in src.iter_mut().enumerate() {
36 poll.registry()
37 .register(src, Token(i+1), Interest::READABLE)
38 .expect("registering a source failed");
39 }
40
41 let mut unsent = Vec::<(u32, Box<[u8]>, &str)>::new();
42 let mut buf = [0; 8192];
43 let mut events = Events::with_capacity(1024);
44 loop {
45 poll.poll(&mut events, None).expect("Cannot poll selector");
46
47 // receive all available messages from queues that are ready
48 for event in events.iter() {
49 if event.token() == Token(0) {
50 // dst; will try to send below even without this event
51 continue;
52 }
53
54 let &(ref mq, ref name) = &src[event.token().0-1];
55 loop {
56 match mq.recv(&mut buf) {
57 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
58 Err(e) => panic!("Error receiving from {}: {}", name, e),
59 Ok((priority, len)) => unsent.push((priority, Box::from(&buf[..len]), name)),
60 }
61 }
62 }
63
64 // send as many messages as possible
65 while let Some(&(priority, ref msg, ref src)) = unsent.last() {
66 if let Err(e) = dst.0.send(priority, msg) {
67 if e.kind() == ErrorKind::WouldBlock {
68 break;
69 }
70 panic!("Error sending to {}: {}", dst.1, e);
71 }
72 println!("message of priority {} with {} bytes from {} sent to {}",
73 priority, msg.len(), src, dst.1
74 );
75 let _ = unsent.pop();
76 }
77 }
78}
Sourcepub fn open_c(&self, name: &CStr) -> Result<PosixMq, Error>
pub fn open_c(&self, name: &CStr) -> Result<PosixMq, Error>
Open a queue with the specified options and without inspecting name
or allocating.
This can on NetBSD be used to access message queues with names that doesn’t start with a ‘/’.
§Errors
- Queue doesn’t exist (ENOENT) =>
ErrorKind::NotFound
- Name is just “/” (ENOENT) =>
ErrorKind::NotFound
- Queue already exists (EEXISTS) =>
ErrorKind::AlreadyExists
- Not permitted to open in this mode (EACCESS) =>
ErrorKind::PermissionDenied
- More than one ‘/’ in name (EACCESS) =>
ErrorKind::PermissionDenied
- Invalid capacities (EINVAL) =>
ErrorKind::InvalidInput
- Posix message queues are disabled (ENOSYS) =>
ErrorKind::Other
- Name is empty (EINVAL) =>
ErrorKind::InvalidInput
- Name is too long (ENAMETOOLONG) =>
ErrorKind::Other
- Unlikely (ENFILE, EMFILE, ENOMEM, ENOSPC) =>
ErrorKind::Other
- Possibly other
Examples found in repository?
10fn main() {
11 let args = args_os()
12 .skip(1)
13 .map(|osstring| osstring.into_vec() )
14 .collect::<Vec<Vec<u8>>>();
15 if args.is_empty() {
16 return; // nothing to do
17 }
18
19 let mut recv_buf = vec![0; args.iter().map(Vec::len).max().unwrap()];
20
21 let name = CStr::from_bytes_with_nul(b"/sort\0").unwrap();
22
23 // create queue with the necessary permissions and open it
24 let mq = posixmq::OpenOptions::readwrite()
25 .nonblocking() // use WouldBlock to detect that the queue is empty
26 .create_new()
27 .mode(0o000) // only affects future attempts at opening it
28 .capacity(args.len())
29 .max_msg_len(recv_buf.len())
30 .open_c(name)
31 .expect("opening queue failed");
32
33 // write arguments to the queue
34 for arg in args {
35 mq.send(arg.len() as u32, &arg).expect("sending failed");
36 }
37
38 // read until the queue is empty
39 let stdout = stdout();
40 let mut stdout = stdout.lock();
41 loop {
42 match mq.recv(&mut recv_buf) {
43 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
44 Err(e) => panic!("receiving failed: {}", e),
45 Ok((_priority, len)) => {
46 stdout.write_all(&recv_buf[..len])
47 .and_then(|()| stdout.write_all(b"\n") )
48 .expect("writing to stdout failed");
49 }
50 }
51 }
52
53 // clean up
54 posixmq::remove_queue_c(name).expect("deleting queue failed")
55}
More examples
28fn main() {
29 println!("{}:", std::env::consts::OS);
30
31 let mq = posixmq::OpenOptions::readwrite()
32 .create_new()
33 .open("/default_capacities")
34 .expect("Cannot create new posix message queue /default_capacities");
35 posixmq::remove_queue("/default_capacities")
36 .expect("Cannot delete posix message queue /default_capacities");
37
38 let attrs = mq.attributes().expect("Cannot get attributes for queue");
39 println!("default queue capacity: {}", attrs.capacity);
40 println!("default maximum message length: {}", attrs.max_msg_len);
41
42 println!("max message priority: {}", btry(0xff_ff_ff_ff, |priority| {
43 mq.send(priority as u32, b"b")
44 .map(|_| mq.recv(&mut vec![0; attrs.max_msg_len]).unwrap() )
45 .is_ok()
46 }));
47 println!("allows empty messages: {}", mq.send(0, b"").is_ok());
48 drop(mq);
49
50 println!("max queue capacity: {}", btry(1_000_000, |capacity| {
51 posixmq::OpenOptions::readwrite()
52 .capacity(capacity)
53 .max_msg_len(1)
54 .create_new()
55 .open("/max_capacity")
56 .map(|_| posixmq::remove_queue("/max_capacity").unwrap() )
57 .is_ok()
58 }));
59
60 println!("max message length: {}", btry(1_000_000, |length| {
61 posixmq::OpenOptions::readwrite()
62 .max_msg_len(length)
63 .capacity(1)
64 .create_new()
65 .open("/max_length")
66 .map(|_| posixmq::remove_queue("/max_length").unwrap() )
67 .is_ok()
68 }));
69
70 println!("max equal: {}", btry(1_000_000, |equal| {
71 posixmq::OpenOptions::readwrite()
72 .max_msg_len(equal)
73 .capacity(equal)
74 .create_new()
75 .open("/max_equal")
76 .map(|_| posixmq::remove_queue("/max_equal").unwrap() )
77 .is_ok()
78 }));
79
80 println!("allows just \"/\": {}", {
81 let result = posixmq::PosixMq::create("/");
82 let _ = posixmq::remove_queue("/");
83 result.is_ok()
84 });
85 println!("enforces name rules: {}", {
86 let noslash = std::ffi::CStr::from_bytes_with_nul(b"noslash\0").unwrap();
87 let result = posixmq::OpenOptions::readwrite().create().open_c(noslash);
88 let _ = posixmq::remove_queue_c(noslash);
89 result.is_err()
90 });
91}
Trait Implementations§
Source§impl Clone for OpenOptions
impl Clone for OpenOptions
Source§fn clone(&self) -> OpenOptions
fn clone(&self) -> OpenOptions
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source
. Read more