tokio_splice2/
pipe.rs

1//! Linux Pipe - [`pipe(7)`].
2//!
3//! The default pipe capacity depends on the system default, but it is
4//! usually `65536` for those whose page size is `4096`. See [`pipe(7)`]
5//! for more details.
6//!
7//! Customize the pipe size is supported, see [`fcntl(2)`] for details and
8//! notices.
9//!
10//! [`pipe(7)`]: https://man7.org/linux/man-pages/man7/pipe.7.html
11//! [`fcntl(2)`]: https://man7.org/linux/man-pages/man2/fcntl.2.html
12
13// FIXME: Pipe pool?
14
15// #[cfg(feature = "feat-pipe-pool")]
16// pub(crate) mod pool;
17
18use std::num::NonZeroUsize;
19use std::{io, mem};
20
21use rustix::fd::OwnedFd;
22use rustix::pipe::{fcntl_getpipe_size, fcntl_setpipe_size, pipe_with, PipeFlags};
23
24#[allow(unsafe_code)]
25/// `MAXIMUM_PIPE_SIZE` is the maximum amount of data we asks
26/// the kernel to move in a single call to `splice(2)`.
27///
28/// We use 1MB as `splice(2)` writes data through a pipe, and 1MB is the default
29/// maximum pipe buffer size, which is determined by
30/// `/proc/sys/fs/pipe-max-size`.
31///
32/// Running applications under unprivileged user may have the pages usage
33/// limited. See [`pipe(7)`] for details.
34///
35/// [`pipe(7)`]: https://man7.org/linux/man-pages/man7/pipe.7.html
36pub const MAXIMUM_PIPE_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1 << 20) };
37
38#[allow(unsafe_code)]
39/// `DEFAULT_PIPE_SIZE` is the default pipe size when pipe size is not known.
40pub const DEFAULT_PIPE_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1 << 16) };
41
42#[derive(Debug)]
43/// Linux Pipe.
44pub struct Pipe {
45    /// File descriptor for reading from the pipe
46    read_side_fd: Fd,
47
48    /// File descriptor for writing to the pipe
49    write_side_fd: Fd,
50
51    /// Pipe size in bytes.
52    size: NonZeroUsize,
53}
54
55#[derive(Debug)]
56enum Fd {
57    /// The file descriptor can be used for reading or writing.
58    Running(OwnedFd),
59
60    #[allow(dead_code)]
61    /// The file descriptor is reserved for future use (to be recycled).
62    Reserved(OwnedFd),
63
64    /// Make compiler happy.
65    Closed,
66}
67
68impl Fd {
69    #[inline]
70    /// Convert the file descriptor to a pending state.
71    ///
72    /// This is used to indicate that the file descriptor is reserved for future
73    /// use.
74    fn set_reserved(&mut self) {
75        if let Fd::Running(owned_fd) = mem::replace(self, Fd::Closed) {
76            *self = Fd::Reserved(owned_fd);
77        }
78    }
79
80    #[inline]
81    const fn as_fd(&self) -> Option<&OwnedFd> {
82        match self {
83            Fd::Running(fd) => Some(fd),
84            _ => None,
85        }
86    }
87
88    // #[inline]
89    // #[allow(unsafe_code)]
90    // /// Safety: the caller must ensure that the operation doesn't care about the
91    // /// file descriptor's state.
92    // unsafe fn force_as_fd(&self) -> &OwnedFd {
93    //     match self {
94    //         Fd::Running(fd) => fd,
95    //         Fd::Reserved(fd) => fd,
96    //         Fd::Closed => {
97    //             panic!("Attempted to access a closed file descriptor");
98    //         }
99    //     }
100    // }
101
102    // #[inline]
103    // #[allow(unsafe_code)]
104    // // Safety: the caller must ensure that the file descriptor is not in use.
105    // unsafe fn force_as_mut_fd(&mut self) -> &mut OwnedFd {
106    //     match self {
107    //         Fd::Running(fd) => fd,
108    //         Fd::Reserved(fd) => fd,
109    //         Fd::Closed => {
110    //             panic!("Attempted to access a closed file descriptor");
111    //         }
112    //     }
113    // }
114}
115
116impl Pipe {
117    /// Create a pipe, with flags `O_NONBLOCK` and `O_CLOEXEC`.
118    ///
119    /// The default pipe size is set to `MAXIMUM_PIPE_SIZE` bytes.
120    ///
121    /// ## Errors
122    ///
123    /// * If the pipe creation of setting pipe size fails, an `io::Error` is
124    ///   returned.
125    pub fn new() -> io::Result<Self> {
126        pipe_with(PipeFlags::NONBLOCK | PipeFlags::CLOEXEC)
127            .map_err(|e| io::Error::from_raw_os_error(e.raw_os_error()))
128            .and_then(|(read_fd, write_fd)| {
129                // Splice will loop writing MAXIMUM_PIPE_SIZE bytes from the source to the pipe,
130                // and then write those bytes from the pipe to the destination.
131                // Set the pipe buffer size to MAXIMUM_PIPE_SIZE to optimize that.
132                // Ignore errors here, as a smaller buffer size will work,
133                // although it will require more system calls.
134                let size = match fcntl_setpipe_size(&read_fd, MAXIMUM_PIPE_SIZE.get()) {
135                    Ok(size) => NonZeroUsize::new(size),
136                    Err(_) => NonZeroUsize::new(fcntl_getpipe_size(&read_fd)?),
137                }
138                .ok_or_else(|| {
139                    io::Error::new(
140                        io::ErrorKind::Other,
141                        "failed to set pipe size, using default size",
142                    )
143                })?;
144
145                Ok(Self {
146                    read_side_fd: Fd::Running(read_fd),
147                    write_side_fd: Fd::Running(write_fd),
148                    size,
149                })
150            })
151    }
152
153    /// Set the pipe size.
154    ///
155    /// ## Errors
156    ///
157    /// See [`fcntl(2)`].
158    ///
159    /// [`fcntl(2)`]: https://man7.org/linux/man-pages/man2/fcntl.2.html.
160    pub fn set_pipe_size(&mut self, pipe_size: usize) -> io::Result<usize> {
161        let Some(write_side_fd) = self.write_side_fd.as_fd() else {
162            return Err(io::Error::new(
163                io::ErrorKind::Other,
164                "write side file descriptor is not available",
165            ));
166        };
167
168        fcntl_setpipe_size(write_side_fd, pipe_size)
169            .map_err(|e| io::Error::from_raw_os_error(e.raw_os_error()))
170    }
171
172    #[inline]
173    pub(crate) const fn write_side_fd(&self) -> Option<&OwnedFd> {
174        self.write_side_fd.as_fd()
175    }
176
177    #[must_use]
178    #[inline]
179    pub(crate) const fn splice_drain_finished(&self) -> bool {
180        matches!(self.write_side_fd, Fd::Reserved(_) | Fd::Closed)
181    }
182
183    #[inline]
184    /// Close the pipe write side file descriptor.
185    pub(crate) fn set_splice_drain_finished(&mut self) {
186        self.write_side_fd.set_reserved();
187    }
188
189    #[inline]
190    pub(crate) const fn read_side_fd(&self) -> Option<&OwnedFd> {
191        self.read_side_fd.as_fd()
192    }
193
194    #[must_use]
195    #[inline]
196    pub(crate) const fn splice_pump_finished(&self) -> bool {
197        matches!(self.read_side_fd, Fd::Reserved(_) | Fd::Closed)
198    }
199
200    #[inline]
201    /// Close the pipe read side file descriptor.
202    pub(crate) fn set_splice_pump_finished(&mut self) {
203        self.read_side_fd.set_reserved();
204    }
205
206    #[must_use]
207    #[inline]
208    /// Returns the size of the pipe, in bytes.
209    pub const fn size(&self) -> NonZeroUsize {
210        self.size
211    }
212}
213
214// #[cfg(feature = "feat-pipe-pool")]
215// impl Drop for Pipe {
216//     fn drop(&mut self) {
217//         #[allow(unsafe_code)]
218//         // Safety: the pipe is not in use, so it is safe to return it to the
219// pool.         unsafe {
220//             pool::PipePool::return_one(self)
221//         };
222//     }
223// }