nix/
mqueue.rs

1//! Posix Message Queue functions
2//!
3//! # Example
4//!
5// no_run because a kernel module may be required.
6//! ```no_run
7//! # use std::ffi::CString;
8//! # use nix::mqueue::*;
9//! use nix::sys::stat::Mode;
10//!
11//! const MSG_SIZE: mq_attr_member_t = 32;
12//! let mq_name= "/a_nix_test_queue";
13//!
14//! let oflag0 = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
15//! let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;
16//! let mqd0 = mq_open(mq_name, oflag0, mode, None).unwrap();
17//! let msg_to_send = b"msg_1";
18//! mq_send(&mqd0, msg_to_send, 1).unwrap();
19//!
20//! let oflag1 = MQ_OFlag::O_CREAT | MQ_OFlag::O_RDONLY;
21//! let mqd1 = mq_open(mq_name, oflag1, mode, None).unwrap();
22//! let mut buf = [0u8; 32];
23//! let mut prio = 0u32;
24//! let len = mq_receive(&mqd1, &mut buf, &mut prio).unwrap();
25//! assert_eq!(prio, 1);
26//! assert_eq!(msg_to_send, &buf[0..len]);
27//!
28//! mq_close(mqd1).unwrap();
29//! mq_close(mqd0).unwrap();
30//! ```
31//! [Further reading and details on the C API](https://man7.org/linux/man-pages/man7/mq_overview.7.html)
32
33use crate::errno::Errno;
34use crate::NixPath;
35use crate::Result;
36
37use crate::sys::stat::Mode;
38use libc::{self, mqd_t, size_t};
39use std::mem;
40#[cfg(any(
41    target_os = "linux",
42    target_os = "netbsd",
43    target_os = "dragonfly"
44))]
45use std::os::unix::io::{
46    AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd,
47};
48
libc_bitflags! {
    /// Flags that control how a message queue is opened.
    ///
    /// Used with [`mq_open`]. `O_NONBLOCK` is also the flag toggled by
    /// [`mq_set_nonblock`] and [`mq_remove_nonblock`].
    pub struct MQ_OFlag: libc::c_int {
        /// Open the message queue for receiving messages.
        O_RDONLY;
        /// Open the queue for sending messages.
        O_WRONLY;
        /// Open the queue for both receiving and sending messages.
        O_RDWR;
        /// Create a message queue.
        O_CREAT;
        /// If set along with `O_CREAT`, `mq_open` will fail if the message
        /// queue name exists.
        O_EXCL;
        /// `mq_send` and `mq_receive` should fail with `EAGAIN` rather than
        /// wait for resources that are not currently available.
        O_NONBLOCK;
        /// Set the close-on-exec flag for the message queue descriptor.
        O_CLOEXEC;
    }
}
70
/// Message queue attributes, as returned by [`mq_getattr`] and
/// [`mq_setattr`], and optionally supplied to [`mq_setattr`] and
/// [`mq_open`].
///
/// Individual members are read through the accessor methods
/// ([`MqAttr::flags`], [`MqAttr::maxmsg`], [`MqAttr::msgsize`] and
/// [`MqAttr::curmsgs`]).
#[repr(C)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct MqAttr {
    // The raw C structure that is handed to, and filled in by, the libc
    // message queue calls.
    mq_attr: libc::mq_attr,
}
78
/// Identifies an open POSIX Message Queue
///
/// Values of this type are produced by [`mq_open`] and consumed by
/// [`mq_close`].
// A safer wrapper around libc::mqd_t, which is a pointer on some platforms
// Deliberately is not Clone to prevent use-after-close scenarios; the
// `missing_copy_implementations` lint is silenced for the same reason.
#[repr(transparent)]
#[derive(Debug)]
#[allow(missing_copy_implementations)]
pub struct MqdT(mqd_t);
86
// x32 compatibility: on x86_64 with 32-bit pointers the attribute members
// are 64 bits wide, so `c_long` cannot be used there.
// See https://sourceware.org/bugzilla/show_bug.cgi?id=21279
/// Integer type of each message queue attribute member (x32 variant)
#[cfg(all(target_arch = "x86_64", target_pointer_width = "32"))]
pub type mq_attr_member_t = i64;
/// Integer type of each message queue attribute member
#[cfg(not(all(target_arch = "x86_64", target_pointer_width = "32")))]
pub type mq_attr_member_t = libc::c_long;
95
96impl MqAttr {
97    /// Create a new message queue attribute
98    ///
99    /// # Arguments
100    ///
101    /// - `mq_flags`:   Either `0` or `O_NONBLOCK`.
102    /// - `mq_maxmsg`:  Maximum number of messages on the queue.
103    /// - `mq_msgsize`: Maximum message size in bytes.
104    /// - `mq_curmsgs`: Number of messages currently in the queue.
105    pub fn new(
106        mq_flags: mq_attr_member_t,
107        mq_maxmsg: mq_attr_member_t,
108        mq_msgsize: mq_attr_member_t,
109        mq_curmsgs: mq_attr_member_t,
110    ) -> MqAttr {
111        let mut attr = mem::MaybeUninit::<libc::mq_attr>::uninit();
112        unsafe {
113            let p = attr.as_mut_ptr();
114            (*p).mq_flags = mq_flags;
115            (*p).mq_maxmsg = mq_maxmsg;
116            (*p).mq_msgsize = mq_msgsize;
117            (*p).mq_curmsgs = mq_curmsgs;
118            MqAttr {
119                mq_attr: attr.assume_init(),
120            }
121        }
122    }
123
124    /// The current flags, either `0` or `O_NONBLOCK`.
125    pub const fn flags(&self) -> mq_attr_member_t {
126        self.mq_attr.mq_flags
127    }
128
129    /// The max number of messages that can be held by the queue
130    pub const fn maxmsg(&self) -> mq_attr_member_t {
131        self.mq_attr.mq_maxmsg
132    }
133
134    /// The maximum size of each message (in bytes)
135    pub const fn msgsize(&self) -> mq_attr_member_t {
136        self.mq_attr.mq_msgsize
137    }
138
139    /// The number of messages currently held in the queue
140    pub const fn curmsgs(&self) -> mq_attr_member_t {
141        self.mq_attr.mq_curmsgs
142    }
143}
144
145/// Open a message queue
146///
147/// See also [`mq_open(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_open.html)
148// The mode.bits() cast is only lossless on some OSes
149#[allow(clippy::cast_lossless)]
150pub fn mq_open<P>(
151    name: &P,
152    oflag: MQ_OFlag,
153    mode: Mode,
154    attr: Option<&MqAttr>,
155) -> Result<MqdT>
156where
157    P: ?Sized + NixPath,
158{
159    let res = name.with_nix_path(|cstr| match attr {
160        Some(mq_attr) => unsafe {
161            libc::mq_open(
162                cstr.as_ptr(),
163                oflag.bits(),
164                mode.bits() as libc::c_int,
165                &mq_attr.mq_attr as *const libc::mq_attr,
166            )
167        },
168        None => unsafe { libc::mq_open(cstr.as_ptr(), oflag.bits()) },
169    })?;
170
171    Errno::result(res).map(MqdT)
172}
173
174/// Remove a message queue
175///
176/// See also [`mq_unlink(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_unlink.html)
177pub fn mq_unlink<P>(name: &P) -> Result<()>
178where
179    P: ?Sized + NixPath,
180{
181    let res =
182        name.with_nix_path(|cstr| unsafe { libc::mq_unlink(cstr.as_ptr()) })?;
183    Errno::result(res).map(drop)
184}
185
186/// Close a message queue
187///
188/// See also [`mq_close(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_close.html)
189pub fn mq_close(mqdes: MqdT) -> Result<()> {
190    let res = unsafe { libc::mq_close(mqdes.0) };
191    Errno::result(res).map(drop)
192}
193
194/// Receive a message from a message queue
195///
196/// See also [`mq_receive(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_receive.html)
197pub fn mq_receive(
198    mqdes: &MqdT,
199    message: &mut [u8],
200    msg_prio: &mut u32,
201) -> Result<usize> {
202    let len = message.len() as size_t;
203    let res = unsafe {
204        libc::mq_receive(
205            mqdes.0,
206            message.as_mut_ptr().cast(),
207            len,
208            msg_prio as *mut u32,
209        )
210    };
211    Errno::result(res).map(|r| r as usize)
212}
213
214feature! {
215    #![feature = "time"]
216    use crate::sys::time::TimeSpec;
217    /// Receive a message from a message queue with a timeout
218    ///
219    /// See also ['mq_timedreceive(2)'](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_receive.html)
220    pub fn mq_timedreceive(
221        mqdes: &MqdT,
222        message: &mut [u8],
223        msg_prio: &mut u32,
224        abstime: &TimeSpec,
225    ) -> Result<usize> {
226        let len = message.len() as size_t;
227        let res = unsafe {
228            libc::mq_timedreceive(
229                mqdes.0,
230                message.as_mut_ptr().cast(),
231                len,
232                msg_prio as *mut u32,
233                abstime.as_ref(),
234            )
235        };
236        Errno::result(res).map(|r| r as usize)
237    }
238}
239
240/// Send a message to a message queue
241///
242/// See also [`mq_send(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_send.html)
243pub fn mq_send(mqdes: &MqdT, message: &[u8], msq_prio: u32) -> Result<()> {
244    let res = unsafe {
245        libc::mq_send(mqdes.0, message.as_ptr().cast(), message.len(), msq_prio)
246    };
247    Errno::result(res).map(drop)
248}
249
250/// Get message queue attributes
251///
252/// See also [`mq_getattr(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_getattr.html)
253pub fn mq_getattr(mqd: &MqdT) -> Result<MqAttr> {
254    let mut attr = mem::MaybeUninit::<libc::mq_attr>::uninit();
255    let res = unsafe { libc::mq_getattr(mqd.0, attr.as_mut_ptr()) };
256    Errno::result(res).map(|_| unsafe {
257        MqAttr {
258            mq_attr: attr.assume_init(),
259        }
260    })
261}
262
263/// Set the attributes of the message queue. Only `O_NONBLOCK` can be set,
264/// everything else will be ignored. Returns the old attributes.
265///
266/// It is recommend to use the `mq_set_nonblock()` and `mq_remove_nonblock()`
267/// convenience functions as they are easier to use.
268///
269/// [Further reading](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_setattr.html)
270pub fn mq_setattr(mqd: &MqdT, newattr: &MqAttr) -> Result<MqAttr> {
271    let mut attr = mem::MaybeUninit::<libc::mq_attr>::uninit();
272    let res = unsafe {
273        libc::mq_setattr(
274            mqd.0,
275            &newattr.mq_attr as *const libc::mq_attr,
276            attr.as_mut_ptr(),
277        )
278    };
279    Errno::result(res).map(|_| unsafe {
280        MqAttr {
281            mq_attr: attr.assume_init(),
282        }
283    })
284}
285
286/// Convenience function.
287/// Sets the `O_NONBLOCK` attribute for a given message queue descriptor
288/// Returns the old attributes
289#[allow(clippy::useless_conversion)] // Not useless on all OSes
290pub fn mq_set_nonblock(mqd: &MqdT) -> Result<MqAttr> {
291    let oldattr = mq_getattr(mqd)?;
292    let newattr = MqAttr::new(
293        mq_attr_member_t::from(MQ_OFlag::O_NONBLOCK.bits()),
294        oldattr.mq_attr.mq_maxmsg,
295        oldattr.mq_attr.mq_msgsize,
296        oldattr.mq_attr.mq_curmsgs,
297    );
298    mq_setattr(mqd, &newattr)
299}
300
301/// Convenience function.
302/// Removes `O_NONBLOCK` attribute for a given message queue descriptor
303/// Returns the old attributes
304pub fn mq_remove_nonblock(mqd: &MqdT) -> Result<MqAttr> {
305    let oldattr = mq_getattr(mqd)?;
306    let newattr = MqAttr::new(
307        0,
308        oldattr.mq_attr.mq_maxmsg,
309        oldattr.mq_attr.mq_msgsize,
310        oldattr.mq_attr.mq_curmsgs,
311    );
312    mq_setattr(mqd, &newattr)
313}
314
315#[cfg(any(target_os = "linux", target_os = "netbsd", target_os = "dragonfly"))]
316impl AsFd for MqdT {
317    /// Borrow the underlying message queue descriptor.
318    fn as_fd(&self) -> BorrowedFd {
319        // SAFETY: [MqdT] will only contain a valid fd by construction.
320        unsafe { BorrowedFd::borrow_raw(self.0) }
321    }
322}
323
324#[cfg(any(target_os = "linux", target_os = "netbsd", target_os = "dragonfly"))]
325impl AsRawFd for MqdT {
326    /// Return the underlying message queue descriptor.
327    ///
328    /// Returned descriptor is a "shallow copy" of the descriptor, so it refers
329    ///  to the same underlying kernel object as `self`.
330    fn as_raw_fd(&self) -> RawFd {
331        self.0
332    }
333}
334
335#[cfg(any(target_os = "linux", target_os = "netbsd", target_os = "dragonfly"))]
336impl FromRawFd for MqdT {
337    /// Construct an [MqdT] from [RawFd].
338    ///
339    /// # Safety
340    /// The `fd` given must be a valid and open file descriptor for a message
341    ///  queue.
342    unsafe fn from_raw_fd(fd: RawFd) -> MqdT {
343        MqdT(fd)
344    }
345}
346
347#[cfg(any(target_os = "linux", target_os = "netbsd", target_os = "dragonfly"))]
348impl IntoRawFd for MqdT {
349    /// Consume this [MqdT] and return a [RawFd].
350    fn into_raw_fd(self) -> RawFd {
351        self.0
352    }
353}