tokio_splice2/pipe.rs
1//! Linux Pipe - [`pipe(7)`].
2//!
3//! The default pipe capacity depends on the system default, but it is
4//! usually `65536` for those whose page size is `4096`. See [`pipe(7)`]
5//! for more details.
6//!
7//! Customize the pipe size is supported, see [`fcntl(2)`] for details and
8//! notices.
9//!
10//! [`pipe(7)`]: https://man7.org/linux/man-pages/man7/pipe.7.html
11//! [`fcntl(2)`]: https://man7.org/linux/man-pages/man2/fcntl.2.html
12
13// FIXME: Pipe pool?
14
15// #[cfg(feature = "feat-pipe-pool")]
16// pub(crate) mod pool;
17
18use std::num::NonZeroUsize;
19use std::{io, mem};
20
21use rustix::fd::OwnedFd;
22use rustix::pipe::{fcntl_getpipe_size, fcntl_setpipe_size, pipe_with, PipeFlags};
23
24#[allow(unsafe_code)]
25/// `MAXIMUM_PIPE_SIZE` is the maximum amount of data we asks
26/// the kernel to move in a single call to `splice(2)`.
27///
28/// We use 1MB as `splice(2)` writes data through a pipe, and 1MB is the default
29/// maximum pipe buffer size, which is determined by
30/// `/proc/sys/fs/pipe-max-size`.
31///
32/// Running applications under unprivileged user may have the pages usage
33/// limited. See [`pipe(7)`] for details.
34///
35/// [`pipe(7)`]: https://man7.org/linux/man-pages/man7/pipe.7.html
36pub const MAXIMUM_PIPE_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1 << 20) };
37
38#[allow(unsafe_code)]
39/// `DEFAULT_PIPE_SIZE` is the default pipe size when pipe size is not known.
40pub const DEFAULT_PIPE_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1 << 16) };
41
42#[derive(Debug)]
43/// Linux Pipe.
44pub struct Pipe {
45 /// File descriptor for reading from the pipe
46 read_side_fd: Fd,
47
48 /// File descriptor for writing to the pipe
49 write_side_fd: Fd,
50
51 /// Pipe size in bytes.
52 size: NonZeroUsize,
53}
54
55#[derive(Debug)]
56enum Fd {
57 /// The file descriptor can be used for reading or writing.
58 Running(OwnedFd),
59
60 #[allow(dead_code)]
61 /// The file descriptor is reserved for future use (to be recycled).
62 Reserved(OwnedFd),
63
64 /// Make compiler happy.
65 Closed,
66}
67
68impl Fd {
69 #[inline]
70 /// Convert the file descriptor to a pending state.
71 ///
72 /// This is used to indicate that the file descriptor is reserved for future
73 /// use.
74 fn set_reserved(&mut self) {
75 if let Fd::Running(owned_fd) = mem::replace(self, Fd::Closed) {
76 *self = Fd::Reserved(owned_fd);
77 }
78 }
79
80 #[inline]
81 const fn as_fd(&self) -> Option<&OwnedFd> {
82 match self {
83 Fd::Running(fd) => Some(fd),
84 _ => None,
85 }
86 }
87
88 // #[inline]
89 // #[allow(unsafe_code)]
90 // /// Safety: the caller must ensure that the operation doesn't care about the
91 // /// file descriptor's state.
92 // unsafe fn force_as_fd(&self) -> &OwnedFd {
93 // match self {
94 // Fd::Running(fd) => fd,
95 // Fd::Reserved(fd) => fd,
96 // Fd::Closed => {
97 // panic!("Attempted to access a closed file descriptor");
98 // }
99 // }
100 // }
101
102 // #[inline]
103 // #[allow(unsafe_code)]
104 // // Safety: the caller must ensure that the file descriptor is not in use.
105 // unsafe fn force_as_mut_fd(&mut self) -> &mut OwnedFd {
106 // match self {
107 // Fd::Running(fd) => fd,
108 // Fd::Reserved(fd) => fd,
109 // Fd::Closed => {
110 // panic!("Attempted to access a closed file descriptor");
111 // }
112 // }
113 // }
114}
115
116impl Pipe {
117 /// Create a pipe, with flags `O_NONBLOCK` and `O_CLOEXEC`.
118 ///
119 /// The default pipe size is set to `MAXIMUM_PIPE_SIZE` bytes.
120 ///
121 /// ## Errors
122 ///
123 /// * If the pipe creation of setting pipe size fails, an `io::Error` is
124 /// returned.
125 pub fn new() -> io::Result<Self> {
126 pipe_with(PipeFlags::NONBLOCK | PipeFlags::CLOEXEC)
127 .map_err(|e| io::Error::from_raw_os_error(e.raw_os_error()))
128 .and_then(|(read_fd, write_fd)| {
129 // Splice will loop writing MAXIMUM_PIPE_SIZE bytes from the source to the pipe,
130 // and then write those bytes from the pipe to the destination.
131 // Set the pipe buffer size to MAXIMUM_PIPE_SIZE to optimize that.
132 // Ignore errors here, as a smaller buffer size will work,
133 // although it will require more system calls.
134 let size = match fcntl_setpipe_size(&read_fd, MAXIMUM_PIPE_SIZE.get()) {
135 Ok(size) => NonZeroUsize::new(size),
136 Err(_) => NonZeroUsize::new(fcntl_getpipe_size(&read_fd)?),
137 }
138 .ok_or_else(|| {
139 io::Error::new(
140 io::ErrorKind::Other,
141 "failed to set pipe size, using default size",
142 )
143 })?;
144
145 Ok(Self {
146 read_side_fd: Fd::Running(read_fd),
147 write_side_fd: Fd::Running(write_fd),
148 size,
149 })
150 })
151 }
152
153 /// Set the pipe size.
154 ///
155 /// ## Errors
156 ///
157 /// See [`fcntl(2)`].
158 ///
159 /// [`fcntl(2)`]: https://man7.org/linux/man-pages/man2/fcntl.2.html.
160 pub fn set_pipe_size(&mut self, pipe_size: usize) -> io::Result<usize> {
161 let Some(write_side_fd) = self.write_side_fd.as_fd() else {
162 return Err(io::Error::new(
163 io::ErrorKind::Other,
164 "write side file descriptor is not available",
165 ));
166 };
167
168 fcntl_setpipe_size(write_side_fd, pipe_size)
169 .map_err(|e| io::Error::from_raw_os_error(e.raw_os_error()))
170 }
171
172 #[inline]
173 pub(crate) const fn write_side_fd(&self) -> Option<&OwnedFd> {
174 self.write_side_fd.as_fd()
175 }
176
177 #[must_use]
178 #[inline]
179 pub(crate) const fn splice_drain_finished(&self) -> bool {
180 matches!(self.write_side_fd, Fd::Reserved(_) | Fd::Closed)
181 }
182
183 #[inline]
184 /// Close the pipe write side file descriptor.
185 pub(crate) fn set_splice_drain_finished(&mut self) {
186 self.write_side_fd.set_reserved();
187 }
188
189 #[inline]
190 pub(crate) const fn read_side_fd(&self) -> Option<&OwnedFd> {
191 self.read_side_fd.as_fd()
192 }
193
194 #[must_use]
195 #[inline]
196 pub(crate) const fn splice_pump_finished(&self) -> bool {
197 matches!(self.read_side_fd, Fd::Reserved(_) | Fd::Closed)
198 }
199
200 #[inline]
201 /// Close the pipe read side file descriptor.
202 pub(crate) fn set_splice_pump_finished(&mut self) {
203 self.read_side_fd.set_reserved();
204 }
205
206 #[must_use]
207 #[inline]
208 /// Returns the size of the pipe, in bytes.
209 pub const fn size(&self) -> NonZeroUsize {
210 self.size
211 }
212}
213
214// #[cfg(feature = "feat-pipe-pool")]
215// impl Drop for Pipe {
216// fn drop(&mut self) {
217// #[allow(unsafe_code)]
218// // Safety: the pipe is not in use, so it is safe to return it to the
219// pool. unsafe {
220// pool::PipePool::return_one(self)
221// };
222// }
223// }