compio_fs/pipe.rs
1//! Unix pipe types.
2
3use std::{
4 future::Future,
5 io,
6 os::fd::{FromRawFd, IntoRawFd},
7 path::Path,
8};
9
10use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
11use compio_driver::{
12 AsRawFd, ToSharedFd, impl_raw_fd,
13 op::{BufResultExt, Recv, RecvManaged, RecvVectored, ResultTakeBuffer, Send, SendVectored},
14 syscall,
15};
16use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite};
17use compio_runtime::{BorrowedBuffer, BufferPool};
18
19use crate::File;
20
21/// Creates a pair of anonymous pipe.
22///
23/// ```
24/// use compio_fs::pipe::anonymous;
25/// use compio_io::{AsyncReadExt, AsyncWriteExt};
26///
27/// # compio_runtime::Runtime::new().unwrap().block_on(async {
28/// let (mut rx, mut tx) = anonymous().unwrap();
29///
30/// tx.write_all("Hello world!").await.unwrap();
31/// let (_, buf) = rx.read_exact(Vec::with_capacity(12)).await.unwrap();
32/// assert_eq!(&buf, b"Hello world!");
33/// # });
34/// ```
35pub fn anonymous() -> io::Result<(Receiver, Sender)> {
36 let (receiver, sender) = os_pipe::pipe()?;
37 let receiver = Receiver::from_file(File::from_std(unsafe {
38 std::fs::File::from_raw_fd(receiver.into_raw_fd())
39 })?)?;
40 let sender = Sender::from_file(File::from_std(unsafe {
41 std::fs::File::from_raw_fd(sender.into_raw_fd())
42 })?)?;
43 Ok((receiver, sender))
44}
45
46/// Options and flags which can be used to configure how a FIFO file is opened.
47///
48/// This builder allows configuring how to create a pipe end from a FIFO file.
49/// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
50/// then chain calls to methods to set each option, then call either
51/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
52/// are trying to open. This will give you a [`io::Result`] with a pipe end
53/// inside that you can further operate on.
54///
55/// [`new`]: OpenOptions::new
56/// [`open_receiver`]: OpenOptions::open_receiver
57/// [`open_sender`]: OpenOptions::open_sender
58///
59/// # Examples
60///
61/// Opening a pair of pipe ends from a FIFO file:
62///
63/// ```no_run
64/// use compio_fs::pipe;
65///
66/// const FIFO_NAME: &str = "path/to/a/fifo";
67///
68/// # async fn dox() -> std::io::Result<()> {
69/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
70/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME).await?;
71/// # Ok(())
72/// # }
73/// ```
74///
75/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
76///
77/// ```ignore
78/// use compio_fs::pipe;
79/// use nix::{sys::stat::Mode, unistd::mkfifo};
80///
81/// // Our program has exclusive access to this path.
82/// const FIFO_NAME: &str = "path/to/a/new/fifo";
83///
84/// # async fn dox() -> std::io::Result<()> {
85/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
86/// let tx = pipe::OpenOptions::new()
87/// .read_write(true)
88/// .unchecked(true)
89/// .open_sender(FIFO_NAME)?;
90/// # Ok(())
91/// # }
92/// ```
93#[derive(Clone, Debug)]
94pub struct OpenOptions {
95 #[cfg(target_os = "linux")]
96 read_write: bool,
97 unchecked: bool,
98}
99
100impl OpenOptions {
101 /// Creates a blank new set of options ready for configuration.
102 ///
103 /// All options are initially set to `false`.
104 pub fn new() -> OpenOptions {
105 OpenOptions {
106 #[cfg(target_os = "linux")]
107 read_write: false,
108 unchecked: false,
109 }
110 }
111
112 /// Sets the option for read-write access.
113 ///
114 /// This option, when true, will indicate that a FIFO file will be opened
115 /// in read-write access mode. This operation is not defined by the POSIX
116 /// standard and is only guaranteed to work on Linux.
117 ///
118 /// # Examples
119 ///
120 /// Opening a [`Sender`] even if there are no open reading ends:
121 ///
122 /// ```
123 /// use compio_fs::pipe;
124 ///
125 /// # compio_runtime::Runtime::new().unwrap().block_on(async {
126 /// let tx = pipe::OpenOptions::new()
127 /// .read_write(true)
128 /// .open_sender("path/to/a/fifo")
129 /// .await;
130 /// # });
131 /// ```
132 ///
133 /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
134 /// fail with [`UnexpectedEof`] during reading if all writing ends of the
135 /// pipe close the FIFO file.
136 ///
137 /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
138 ///
139 /// ```
140 /// use compio_fs::pipe;
141 ///
142 /// # compio_runtime::Runtime::new().unwrap().block_on(async {
143 /// let tx = pipe::OpenOptions::new()
144 /// .read_write(true)
145 /// .open_receiver("path/to/a/fifo")
146 /// .await;
147 /// # });
148 /// ```
149 #[cfg(target_os = "linux")]
150 #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
151 pub fn read_write(&mut self, value: bool) -> &mut Self {
152 self.read_write = value;
153 self
154 }
155
156 /// Sets the option to skip the check for FIFO file type.
157 ///
158 /// By default, [`open_receiver`] and [`open_sender`] functions will check
159 /// if the opened file is a FIFO file. Set this option to `true` if you are
160 /// sure the file is a FIFO file.
161 ///
162 /// [`open_receiver`]: OpenOptions::open_receiver
163 /// [`open_sender`]: OpenOptions::open_sender
164 ///
165 /// # Examples
166 ///
167 /// ```no_run
168 /// use compio_fs::pipe;
169 /// use nix::{sys::stat::Mode, unistd::mkfifo};
170 ///
171 /// // Our program has exclusive access to this path.
172 /// const FIFO_NAME: &str = "path/to/a/new/fifo";
173 ///
174 /// # async fn dox() -> std::io::Result<()> {
175 /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
176 /// let rx = pipe::OpenOptions::new()
177 /// .unchecked(true)
178 /// .open_receiver(FIFO_NAME)
179 /// .await?;
180 /// # Ok(())
181 /// # }
182 /// ```
183 pub fn unchecked(&mut self, value: bool) -> &mut Self {
184 self.unchecked = value;
185 self
186 }
187
188 /// Creates a [`Receiver`] from a FIFO file with the options specified by
189 /// `self`.
190 ///
191 /// This function will open the FIFO file at the specified path, possibly
192 /// check if it is a pipe, and associate the pipe with the default event
193 /// loop for reading.
194 ///
195 /// # Errors
196 ///
197 /// If the file type check fails, this function will fail with
198 /// `io::ErrorKind::InvalidInput`. This function may also fail with
199 /// other standard OS errors.
200 pub async fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
201 let file = self.open(path.as_ref(), PipeEnd::Receiver).await?;
202 Receiver::from_file(file)
203 }
204
205 /// Creates a [`Sender`] from a FIFO file with the options specified by
206 /// `self`.
207 ///
208 /// This function will open the FIFO file at the specified path, possibly
209 /// check if it is a pipe, and associate the pipe with the default event
210 /// loop for writing.
211 ///
212 /// # Errors
213 ///
214 /// If the file type check fails, this function will fail with
215 /// `io::ErrorKind::InvalidInput`. If the file is not opened in
216 /// read-write access mode and the file is not currently open for
217 /// reading, this function will fail with `ENXIO`. This function may
218 /// also fail with other standard OS errors.
219 pub async fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
220 let file = self.open(path.as_ref(), PipeEnd::Sender).await?;
221 Sender::from_file(file)
222 }
223
224 async fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
225 let mut options = crate::OpenOptions::new();
226 options
227 .read(pipe_end == PipeEnd::Receiver)
228 .write(pipe_end == PipeEnd::Sender);
229
230 #[cfg(target_os = "linux")]
231 if self.read_write {
232 options.read(true).write(true);
233 }
234
235 let file = options.open(path).await?;
236
237 if !self.unchecked && !is_fifo(&file).await? {
238 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
239 }
240
241 Ok(file)
242 }
243}
244
245impl Default for OpenOptions {
246 fn default() -> OpenOptions {
247 OpenOptions::new()
248 }
249}
250
251#[derive(Clone, Copy, PartialEq, Eq, Debug)]
252enum PipeEnd {
253 Sender,
254 Receiver,
255}
256
257/// Writing end of a Unix pipe.
258///
259/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
260///
261/// Opening a named pipe for writing involves a few steps.
262/// Call to [`OpenOptions::open_sender`] might fail with an error indicating
263/// different things:
264///
265/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
266/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
267/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
268/// Sleep for a while and try again.
269/// * Other OS errors not specific to opening FIFO files.
270///
271/// Opening a `Sender` from a FIFO file should look like this:
272///
273/// ```no_run
274/// use std::time::Duration;
275///
276/// use compio_fs::pipe;
277/// use compio_runtime::time;
278///
279/// const FIFO_NAME: &str = "path/to/a/fifo";
280///
281/// # async fn dox() -> std::io::Result<()> {
282/// // Wait for a reader to open the file.
283/// let tx = loop {
284/// match pipe::OpenOptions::new().open_sender(FIFO_NAME).await {
285/// Ok(tx) => break tx,
286/// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {}
287/// Err(e) => return Err(e.into()),
288/// }
289///
290/// time::sleep(Duration::from_millis(50)).await;
291/// };
292/// # Ok(())
293/// # }
294/// ```
295///
296/// On Linux, it is possible to create a `Sender` without waiting in a sleeping
297/// loop. This is done by opening a named pipe in read-write access mode with
298/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
299/// both a writing end and a reading end, and the latter allows to open a FIFO
300/// without [`ENXIO`] error since the pipe is open for reading as well.
301///
302/// `Sender` cannot be used to read from a pipe, so in practice the read access
303/// is only used when a FIFO is opened. However, using a `Sender` in read-write
304/// mode **may lead to lost data**, because written data will be dropped by the
305/// system as soon as all pipe ends are closed. To avoid lost data you have to
306/// make sure that a reading end has been opened before dropping a `Sender`.
307///
308/// Note that using read-write access mode with FIFO files is not defined by
309/// the POSIX standard and it is only guaranteed to work on Linux.
310///
311/// ```ignore
312/// use compio_fs::pipe;
313/// use compio_io::AsyncWriteExt;
314///
315/// const FIFO_NAME: &str = "path/to/a/fifo";
316///
317/// # async fn dox() {
318/// let mut tx = pipe::OpenOptions::new()
319/// .read_write(true)
320/// .open_sender(FIFO_NAME)
321/// .unwrap();
322///
323/// // Asynchronously write to the pipe before a reader.
324/// tx.write_all("hello world").await.unwrap();
325/// # }
326/// ```
327///
328/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
329#[derive(Debug, Clone)]
330pub struct Sender {
331 file: File,
332}
333
334impl Sender {
335 pub(crate) fn from_file(file: File) -> io::Result<Sender> {
336 set_nonblocking(&file)?;
337 Ok(Sender { file })
338 }
339
340 /// Close the pipe. If the returned future is dropped before polling, the
341 /// pipe won't be closed.
342 pub fn close(self) -> impl Future<Output = io::Result<()>> {
343 self.file.close()
344 }
345}
346
347impl AsyncWrite for Sender {
348 #[inline]
349 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
350 (&*self).write(buf).await
351 }
352
353 #[inline]
354 async fn write_vectored<T: IoVectoredBuf>(&mut self, buf: T) -> BufResult<usize, T> {
355 (&*self).write_vectored(buf).await
356 }
357
358 #[inline]
359 async fn flush(&mut self) -> io::Result<()> {
360 (&*self).flush().await
361 }
362
363 #[inline]
364 async fn shutdown(&mut self) -> io::Result<()> {
365 (&*self).shutdown().await
366 }
367}
368
369impl AsyncWrite for &Sender {
370 async fn write<T: IoBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
371 let fd = self.to_shared_fd();
372 let op = Send::new(fd, buffer);
373 compio_runtime::submit(op).await.into_inner()
374 }
375
376 async fn write_vectored<T: IoVectoredBuf>(&mut self, buffer: T) -> BufResult<usize, T> {
377 let fd = self.to_shared_fd();
378 let op = SendVectored::new(fd, buffer);
379 compio_runtime::submit(op).await.into_inner()
380 }
381
382 #[inline]
383 async fn flush(&mut self) -> io::Result<()> {
384 Ok(())
385 }
386
387 #[inline]
388 async fn shutdown(&mut self) -> io::Result<()> {
389 Ok(())
390 }
391}
392
393impl_raw_fd!(Sender, std::fs::File, file, file);
394
395/// Reading end of a Unix pipe.
396///
397/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
398///
399/// # Examples
400///
401/// Receiving messages from a named pipe in a loop:
402///
403/// ```no_run
404/// use std::io;
405///
406/// use compio_buf::BufResult;
407/// use compio_fs::pipe;
408/// use compio_io::AsyncReadExt;
409///
410/// const FIFO_NAME: &str = "path/to/a/fifo";
411///
412/// # async fn dox() -> io::Result<()> {
413/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
414/// loop {
415/// let mut msg = Vec::with_capacity(256);
416/// let BufResult(res, msg) = rx.read_exact(msg).await;
417/// match res {
418/// Ok(_) => { /* handle the message */ }
419/// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
420/// // Writing end has been closed, we should reopen the pipe.
421/// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME).await?;
422/// }
423/// Err(e) => return Err(e.into()),
424/// }
425/// }
426/// # }
427/// ```
428///
429/// On Linux, you can use a `Receiver` in read-write access mode to implement
430/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
431/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
432/// when the writing end is closed. This way, a `Receiver` can asynchronously
433/// wait for the next writer to open the pipe.
434///
435/// You should not use functions waiting for EOF such as [`read_to_end`] with
436/// a `Receiver` in read-write access mode, since it **may wait forever**.
437/// `Receiver` in this mode also holds an open writing end, which prevents
438/// receiving EOF.
439///
440/// To set the read-write access mode you can use `OpenOptions::read_write`.
441/// Note that using read-write access mode with FIFO files is not defined by
442/// the POSIX standard and it is only guaranteed to work on Linux.
443///
444/// ```ignore
445/// use compio_fs::pipe;
446/// use compio_io::AsyncReadExt;
447///
448/// const FIFO_NAME: &str = "path/to/a/fifo";
449///
450/// # async fn dox() {
451/// let mut rx = pipe::OpenOptions::new()
452/// .read_write(true)
453/// .open_receiver(FIFO_NAME)
454/// .unwrap();
455/// loop {
456/// let mut msg = Vec::with_capacity(256);
457/// rx.read_exact(msg).await.unwrap();
458/// // handle the message
459/// }
460/// # }
461/// ```
462///
463/// [`read_to_end`]: compio_io::AsyncReadExt::read_to_end
464#[derive(Debug, Clone)]
465pub struct Receiver {
466 file: File,
467}
468
469impl Receiver {
470 pub(crate) fn from_file(file: File) -> io::Result<Receiver> {
471 set_nonblocking(&file)?;
472 Ok(Receiver { file })
473 }
474
475 /// Close the pipe. If the returned future is dropped before polling, the
476 /// pipe won't be closed.
477 pub fn close(self) -> impl Future<Output = io::Result<()>> {
478 self.file.close()
479 }
480}
481
482impl AsyncRead for Receiver {
483 async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
484 (&*self).read(buf).await
485 }
486
487 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
488 (&*self).read_vectored(buf).await
489 }
490}
491
492impl AsyncRead for &Receiver {
493 async fn read<B: IoBufMut>(&mut self, buffer: B) -> BufResult<usize, B> {
494 let fd = self.to_shared_fd();
495 let op = Recv::new(fd, buffer);
496 compio_runtime::submit(op).await.into_inner().map_advanced()
497 }
498
499 async fn read_vectored<V: IoVectoredBufMut>(&mut self, buffer: V) -> BufResult<usize, V> {
500 let fd = self.to_shared_fd();
501 let op = RecvVectored::new(fd, buffer);
502 compio_runtime::submit(op).await.into_inner().map_advanced()
503 }
504}
505
506impl AsyncReadManaged for Receiver {
507 type Buffer<'a> = BorrowedBuffer<'a>;
508 type BufferPool = BufferPool;
509
510 async fn read_managed<'a>(
511 &mut self,
512 buffer_pool: &'a Self::BufferPool,
513 len: usize,
514 ) -> io::Result<Self::Buffer<'a>> {
515 (&*self).read_managed(buffer_pool, len).await
516 }
517}
518
519impl AsyncReadManaged for &Receiver {
520 type Buffer<'a> = BorrowedBuffer<'a>;
521 type BufferPool = BufferPool;
522
523 async fn read_managed<'a>(
524 &mut self,
525 buffer_pool: &'a Self::BufferPool,
526 len: usize,
527 ) -> io::Result<Self::Buffer<'a>> {
528 let fd = self.to_shared_fd();
529 let buffer_pool = buffer_pool.try_inner()?;
530 let op = RecvManaged::new(fd, buffer_pool, len)?;
531 compio_runtime::submit_with_flags(op)
532 .await
533 .take_buffer(buffer_pool)
534 }
535}
536
537impl_raw_fd!(Receiver, std::fs::File, file, file);
538
539/// Checks if file is a FIFO
540async fn is_fifo(file: &File) -> io::Result<bool> {
541 use std::os::unix::prelude::FileTypeExt;
542
543 Ok(file.metadata().await?.file_type().is_fifo())
544}
545
546/// Sets file's flags with O_NONBLOCK by fcntl.
547fn set_nonblocking(file: &impl AsRawFd) -> io::Result<()> {
548 if compio_runtime::Runtime::with_current(|r| r.driver_type()).is_polling() {
549 let fd = file.as_raw_fd();
550 let current_flags = syscall!(libc::fcntl(fd, libc::F_GETFL))?;
551 let flags = current_flags | libc::O_NONBLOCK;
552 if flags != current_flags {
553 syscall!(libc::fcntl(fd, libc::F_SETFL, flags))?;
554 }
555 }
556 Ok(())
557}