compio_fs/
pipe.rs

1//! Unix pipe types.
2
3use std::{
4    future::Future,
5    io,
6    os::fd::{FromRawFd, IntoRawFd},
7    path::Path,
8};
9
10use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
11use compio_driver::{
12    AsRawFd, ToSharedFd, impl_raw_fd,
13    op::{BufResultExt, Recv, RecvManaged, RecvVectored, ResultTakeBuffer, Send, SendVectored},
14    syscall,
15};
16use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
17use compio_runtime::{BorrowedBuffer, BufferPool};
18
19use crate::File;
20
21/// Creates a pair of anonymous pipe.
22///
23/// ```
24/// use compio_fs::pipe::anonymous;
25/// use compio_io::{AsyncReadExt, AsyncWriteExt};
26///
27/// # compio_runtime::Runtime::new().unwrap().block_on(async {
28/// let (mut rx, mut tx) = anonymous().unwrap();
29///
30/// tx.write_all("Hello world!").await.unwrap();
31/// let (_, buf) = rx.read_exact(Vec::with_capacity(12)).await.unwrap();
32/// assert_eq!(&buf, b"Hello world!");
33/// # });
34/// ```
35pub fn anonymous() -> io::Result<(Receiver, Sender)> {
36    let (receiver, sender) = os_pipe::pipe()?;
37    let receiver = Receiver::from_file(File::from_std(unsafe {
38        std::fs::File::from_raw_fd(receiver.into_raw_fd())
39    })?)?;
40    let sender = Sender::from_file(File::from_std(unsafe {
41        std::fs::File::from_raw_fd(sender.into_raw_fd())
42    })?)?;
43    Ok((receiver, sender))
44}
45
46/// Options and flags which can be used to configure how a FIFO file is opened.
47///
48/// This builder allows configuring how to create a pipe end from a FIFO file.
49/// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
50/// then chain calls to methods to set each option, then call either
51/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
52/// are trying to open. This will give you a [`io::Result`] with a pipe end
53/// inside that you can further operate on.
54///
55/// [`new`]: OpenOptions::new
56/// [`open_receiver`]: OpenOptions::open_receiver
57/// [`open_sender`]: OpenOptions::open_sender
58///
59/// # Examples
60///
61/// Opening a pair of pipe ends from a FIFO file:
62///
63/// ```no_run
64/// use compio_fs::pipe;
65///
66/// const FIFO_NAME: &str = "path/to/a/fifo";
67///
68/// # async fn dox() -> std::io::Result<()> {
69/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
70/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME).await?;
71/// # Ok(())
72/// # }
73/// ```
74///
75/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
76///
77/// ```ignore
78/// use compio_fs::pipe;
79/// use nix::{sys::stat::Mode, unistd::mkfifo};
80///
81/// // Our program has exclusive access to this path.
82/// const FIFO_NAME: &str = "path/to/a/new/fifo";
83///
84/// # async fn dox() -> std::io::Result<()> {
85/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
86/// let tx = pipe::OpenOptions::new()
87///     .read_write(true)
88///     .unchecked(true)
89///     .open_sender(FIFO_NAME)?;
90/// # Ok(())
91/// # }
92/// ```
93#[derive(Clone, Debug)]
94pub struct OpenOptions {
95    #[cfg(target_os = "linux")]
96    read_write: bool,
97    unchecked: bool,
98}
99
100impl OpenOptions {
101    /// Creates a blank new set of options ready for configuration.
102    ///
103    /// All options are initially set to `false`.
104    pub fn new() -> OpenOptions {
105        OpenOptions {
106            #[cfg(target_os = "linux")]
107            read_write: false,
108            unchecked: false,
109        }
110    }
111
112    /// Sets the option for read-write access.
113    ///
114    /// This option, when true, will indicate that a FIFO file will be opened
115    /// in read-write access mode. This operation is not defined by the POSIX
116    /// standard and is only guaranteed to work on Linux.
117    ///
118    /// # Examples
119    ///
120    /// Opening a [`Sender`] even if there are no open reading ends:
121    ///
122    /// ```
123    /// use compio_fs::pipe;
124    ///
125    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
126    /// let tx = pipe::OpenOptions::new()
127    ///     .read_write(true)
128    ///     .open_sender("path/to/a/fifo")
129    ///     .await;
130    /// # });
131    /// ```
132    ///
133    /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
134    /// fail with [`UnexpectedEof`] during reading if all writing ends of the
135    /// pipe close the FIFO file.
136    ///
137    /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
138    ///
139    /// ```
140    /// use compio_fs::pipe;
141    ///
142    /// # compio_runtime::Runtime::new().unwrap().block_on(async {
143    /// let tx = pipe::OpenOptions::new()
144    ///     .read_write(true)
145    ///     .open_receiver("path/to/a/fifo")
146    ///     .await;
147    /// # });
148    /// ```
149    #[cfg(target_os = "linux")]
150    #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
151    pub fn read_write(&mut self, value: bool) -> &mut Self {
152        self.read_write = value;
153        self
154    }
155
156    /// Sets the option to skip the check for FIFO file type.
157    ///
158    /// By default, [`open_receiver`] and [`open_sender`] functions will check
159    /// if the opened file is a FIFO file. Set this option to `true` if you are
160    /// sure the file is a FIFO file.
161    ///
162    /// [`open_receiver`]: OpenOptions::open_receiver
163    /// [`open_sender`]: OpenOptions::open_sender
164    ///
165    /// # Examples
166    ///
167    /// ```no_run
168    /// use compio_fs::pipe;
169    /// use nix::{sys::stat::Mode, unistd::mkfifo};
170    ///
171    /// // Our program has exclusive access to this path.
172    /// const FIFO_NAME: &str = "path/to/a/new/fifo";
173    ///
174    /// # async fn dox() -> std::io::Result<()> {
175    /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
176    /// let rx = pipe::OpenOptions::new()
177    ///     .unchecked(true)
178    ///     .open_receiver(FIFO_NAME)
179    ///     .await?;
180    /// # Ok(())
181    /// # }
182    /// ```
183    pub fn unchecked(&mut self, value: bool) -> &mut Self {
184        self.unchecked = value;
185        self
186    }
187
188    /// Creates a [`Receiver`] from a FIFO file with the options specified by
189    /// `self`.
190    ///
191    /// This function will open the FIFO file at the specified path, possibly
192    /// check if it is a pipe, and associate the pipe with the default event
193    /// loop for reading.
194    ///
195    /// # Errors
196    ///
197    /// If the file type check fails, this function will fail with
198    /// `io::ErrorKind::InvalidInput`. This function may also fail with
199    /// other standard OS errors.
200    pub async fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
201        let file = self.open(path.as_ref(), PipeEnd::Receiver).await?;
202        Receiver::from_file(file)
203    }
204
205    /// Creates a [`Sender`] from a FIFO file with the options specified by
206    /// `self`.
207    ///
208    /// This function will open the FIFO file at the specified path, possibly
209    /// check if it is a pipe, and associate the pipe with the default event
210    /// loop for writing.
211    ///
212    /// # Errors
213    ///
214    /// If the file type check fails, this function will fail with
215    /// `io::ErrorKind::InvalidInput`. If the file is not opened in
216    /// read-write access mode and the file is not currently open for
217    /// reading, this function will fail with `ENXIO`. This function may
218    /// also fail with other standard OS errors.
219    pub async fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
220        let file = self.open(path.as_ref(), PipeEnd::Sender).await?;
221        Sender::from_file(file)
222    }
223
224    async fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
225        let mut options = crate::OpenOptions::new();
226        options
227            .read(pipe_end == PipeEnd::Receiver)
228            .write(pipe_end == PipeEnd::Sender);
229
230        #[cfg(target_os = "linux")]
231        if self.read_write {
232            options.read(true).write(true);
233        }
234
235        let file = options.open(path).await?;
236
237        if !self.unchecked && !is_fifo(&file).await? {
238            return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
239        }
240
241        Ok(file)
242    }
243}
244
245impl Default for OpenOptions {
246    fn default() -> OpenOptions {
247        OpenOptions::new()
248    }
249}
250
251#[derive(Clone, Copy, PartialEq, Eq, Debug)]
252enum PipeEnd {
253    Sender,
254    Receiver,
255}
256
257/// Writing end of a Unix pipe.
258///
259/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
260///
261/// Opening a named pipe for writing involves a few steps.
262/// Call to [`OpenOptions::open_sender`] might fail with an error indicating
263/// different things:
264///
265/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
266/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
267/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
268///   Sleep for a while and try again.
269/// * Other OS errors not specific to opening FIFO files.
270///
271/// Opening a `Sender` from a FIFO file should look like this:
272///
273/// ```no_run
274/// use std::time::Duration;
275///
276/// use compio_fs::pipe;
277/// use compio_runtime::time;
278///
279/// const FIFO_NAME: &str = "path/to/a/fifo";
280///
281/// # async fn dox() -> std::io::Result<()> {
282/// // Wait for a reader to open the file.
283/// let tx = loop {
284///     match pipe::OpenOptions::new().open_sender(FIFO_NAME).await {
285///         Ok(tx) => break tx,
286///         Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {}
287///         Err(e) => return Err(e.into()),
288///     }
289///
290///     time::sleep(Duration::from_millis(50)).await;
291/// };
292/// # Ok(())
293/// # }
294/// ```
295///
296/// On Linux, it is possible to create a `Sender` without waiting in a sleeping
297/// loop. This is done by opening a named pipe in read-write access mode with
298/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
299/// both a writing end and a reading end, and the latter allows to open a FIFO
300/// without [`ENXIO`] error since the pipe is open for reading as well.
301///
302/// `Sender` cannot be used to read from a pipe, so in practice the read access
303/// is only used when a FIFO is opened. However, using a `Sender` in read-write
304/// mode **may lead to lost data**, because written data will be dropped by the
305/// system as soon as all pipe ends are closed. To avoid lost data you have to
306/// make sure that a reading end has been opened before dropping a `Sender`.
307///
308/// Note that using read-write access mode with FIFO files is not defined by
309/// the POSIX standard and it is only guaranteed to work on Linux.
310///
311/// ```ignore
312/// use compio_fs::pipe;
313/// use compio_io::AsyncWriteExt;
314///
315/// const FIFO_NAME: &str = "path/to/a/fifo";
316///
317/// # async fn dox() {
318/// let mut tx = pipe::OpenOptions::new()
319///     .read_write(true)
320///     .open_sender(FIFO_NAME)
321///     .unwrap();
322///
323/// // Asynchronously write to the pipe before a reader.
324/// tx.write_all("hello world").await.unwrap();
325/// # }
326/// ```
327///
328/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
329#[derive(Debug, Clone)]
330pub struct Sender {
331    file: File,
332}
333
334impl Sender {
335    pub(crate) fn from_file(file: File) -> io::Result<Sender> {
336        set_nonblocking(&file)?;
337        Ok(Sender { file })
338    }
339
340    /// Close the pipe. If the returned future is dropped before polling, the
341    /// pipe won't be closed.
342    pub fn close(self) -> impl Future<Output = io::Result<()>> {
343        self.file.close()
344    }
345}
346
347impl AsyncWrite for Sender {
348    #[inline]
349    async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
350        (&*self).write(buf).await
351    }
352
353    #[inline]
354    async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
355        (&*self).write_vectored(buf).await
356    }
357
358    #[inline]
359    async fn flush(&mut self) -> io::Result<()> {
360        (&*self).flush().await
361    }
362
363    #[inline]
364    async fn shutdown(&mut self) -> io::Result<()> {
365        (&*self).shutdown().await
366    }
367}
368
369impl AsyncWrite for &Sender {
370    async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
371        let fd = self.to_shared_fd();
372        let op = Send::new(fd, buffer);
373        compio_runtime::submit(op).await.into_inner()
374    }
375
376    async fn write_vectored<T: IoVectoredBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
377        let fd = self.to_shared_fd();
378        let op = SendVectored::new(fd, buffer);
379        compio_runtime::submit(op).await.into_inner()
380    }
381
382    #[inline]
383    async fn flush(&mut self) -> io::Result<()> {
384        Ok(())
385    }
386
387    #[inline]
388    async fn shutdown(&mut self) -> io::Result<()> {
389        Ok(())
390    }
391}
392
393impl_raw_fd!(Sender, std::fs::File, file, file);
394
395/// Reading end of a Unix pipe.
396///
397/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
398///
399/// # Examples
400///
401/// Receiving messages from a named pipe in a loop:
402///
403/// ```no_run
404/// use std::io;
405///
406/// use compio_buf::BufResult;
407/// use compio_fs::pipe;
408/// use compio_io::AsyncReadExt;
409///
410/// const FIFO_NAME: &str = "path/to/a/fifo";
411///
412/// # async fn dox() -> io::Result<()> {
413/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
414/// loop {
415///     let mut msg = Vec::with_capacity(256);
416///     let BufResult(res, msg) = rx.read_exact(msg).await;
417///     match res {
418///         Ok(_) => { /* handle the message */ }
419///         Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
420///             // Writing end has been closed, we should reopen the pipe.
421///             rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
422///         }
423///         Err(e) => return Err(e.into()),
424///     }
425/// }
426/// # }
427/// ```
428///
429/// On Linux, you can use a `Receiver` in read-write access mode to implement
430/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
431/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
432/// when the writing end is closed. This way, a `Receiver` can asynchronously
433/// wait for the next writer to open the pipe.
434///
435/// You should not use functions waiting for EOF such as [`read_to_end`] with
436/// a `Receiver` in read-write access mode, since it **may wait forever**.
437/// `Receiver` in this mode also holds an open writing end, which prevents
438/// receiving EOF.
439///
440/// To set the read-write access mode you can use `OpenOptions::read_write`.
441/// Note that using read-write access mode with FIFO files is not defined by
442/// the POSIX standard and it is only guaranteed to work on Linux.
443///
444/// ```ignore
445/// use compio_fs::pipe;
446/// use compio_io::AsyncReadExt;
447///
448/// const FIFO_NAME: &str = "path/to/a/fifo";
449///
450/// # async fn dox() {
451/// let mut rx = pipe::OpenOptions::new()
452///     .read_write(true)
453///     .open_receiver(FIFO_NAME)
454///     .unwrap();
455/// loop {
456///     let mut msg = Vec::with_capacity(256);
457///     rx.read_exact(msg).await.unwrap();
458///     // handle the message
459/// }
460/// # }
461/// ```
462///
463/// [`read_to_end`]: compio_io::AsyncReadExt::read_to_end
464#[derive(Debug, Clone)]
465pub struct Receiver {
466    file: File,
467}
468
469impl Receiver {
470    pub(crate) fn from_file(file: File) -> io::Result<Receiver> {
471        set_nonblocking(&file)?;
472        Ok(Receiver { file })
473    }
474
475    /// Close the pipe. If the returned future is dropped before polling, the
476    /// pipe won't be closed.
477    pub fn close(self) -> impl Future<Output = io::Result<()>> {
478        self.file.close()
479    }
480}
481
482impl AsyncRead for Receiver {
483    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
484        (&*self).read(buf).await
485    }
486
487    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
488        (&*self).read_vectored(buf).await
489    }
490}
491
492impl AsyncRead for &Receiver {
493    async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
494        let fd = self.to_shared_fd();
495        let op = Recv::new(fd, buffer);
496        compio_runtime::submit(op).await.into_inner().map_advanced()
497    }
498
499    async fn read_vectored<V: IoVectoredBufMut>(&mut self, buffer: V) -> BufResult<usize, V> {
500        let fd = self.to_shared_fd();
501        let op = RecvVectored::new(fd, buffer);
502        compio_runtime::submit(op).await.into_inner().map_advanced()
503    }
504}
505
506impl AsyncReadManaged for Receiver {
507    type Buffer<'a> = BorrowedBuffer<'a>;
508    type BufferPool = BufferPool;
509
510    async fn read_managed<'a>(
511        &mut self,
512        buffer_pool: &'a Self::BufferPool,
513        len: usize,
514    ) -> io::Result<Self::Buffer<'a>> {
515        (&*self).read_managed(buffer_pool, len).await
516    }
517}
518
519impl AsyncReadManaged for &Receiver {
520    type Buffer<'a> = BorrowedBuffer<'a>;
521    type BufferPool = BufferPool;
522
523    async fn read_managed<'a>(
524        &mut self,
525        buffer_pool: &'a Self::BufferPool,
526        len: usize,
527    ) -> io::Result<Self::Buffer<'a>> {
528        let fd = self.to_shared_fd();
529        let buffer_pool = buffer_pool.try_inner()?;
530        let op = RecvManaged::new(fd, buffer_pool, len)?;
531        compio_runtime::submit_with_flags(op)
532            .await
533            .take_buffer(buffer_pool)
534    }
535}
536
537impl_raw_fd!(Receiver, std::fs::File, file, file);
538
539/// Checks if file is a FIFO
540async fn is_fifo(file: &File) -> io::Result<bool> {
541    use std::os::unix::prelude::FileTypeExt;
542
543    Ok(file.metadata().await?.file_type().is_fifo())
544}
545
546/// Sets file's flags with O_NONBLOCK by fcntl.
547fn set_nonblocking(file: &impl AsRawFd) -> io::Result<()> {
548    if compio_runtime::Runtime::with_current(|r| r.driver_type()).is_polling() {
549        let fd = file.as_raw_fd();
550        let current_flags = syscall!(libc::fcntl(fd, libc::F_GETFL))?;
551        let flags = current_flags | libc::O_NONBLOCK;
552        if flags != current_flags {
553            syscall!(libc::fcntl(fd, libc::F_SETFL, flags))?;
554        }
555    }
556    Ok(())
557}