tokio_splice2/context.rs
1//! `splice(2)` IO context.
2
3use std::marker::PhantomData;
4use std::num::NonZeroUsize;
5use std::pin::Pin;
6use std::task::{ready, Context, Poll};
7use std::{fmt, io};
8
9use rustix::pipe::{splice, SpliceFlags};
10
11use crate::io::{AsyncReadFd, AsyncWriteFd, IsFile, IsNotFile, SpliceIo};
12use crate::pipe::Pipe;
13use crate::traffic::TrafficResult;
14use crate::utils::{Drained, Offset};
15
16/// Splice IO context.
17pub struct SpliceIoCtx<R, W> {
18 /// The `off_in` when splicing from `R` to the pipe, or the `off_out` when
19 /// splicing from the pipe to `W`.
20 offset: Offset,
21 /// Target length to read from `R` then write to `W`.
22 ///
23 /// Default is `isize::MAX`, which means read as much as possible.
24 size_to_splice: usize,
25
26 /// Pipe used to splice data.
27 pipe: Pipe,
28
29 /// Bytes that have been read from `R` into pipe write side.
30 has_read: usize,
31 /// Bytes that have been written to `W` from pipe read side.
32 has_written: usize,
33 /// Whether need to flush `W` after splicing.
34 need_flush: bool,
35
36 r: PhantomData<R>,
37 w: PhantomData<W>,
38}
39
40impl<R, W> fmt::Debug for SpliceIoCtx<R, W> {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 f.debug_struct("SpliceIoCtx")
43 .field("offset", &self.offset)
44 .field("size_to_splice", &self.size_to_splice)
45 .field("pipe", &self.pipe)
46 .field("has_read", &self.has_read)
47 .field("has_written", &self.has_written)
48 .field("need_flush", &self.need_flush)
49 .finish()
50 }
51}
52
53impl<R, W> SpliceIoCtx<R, W> {
54 #[inline]
55 fn _prepare() -> io::Result<Self> {
56 Ok(Self {
57 offset: Offset::None,
58 size_to_splice: isize::MAX as usize,
59 pipe: Pipe::new()?,
60 has_read: 0,
61 has_written: 0,
62 need_flush: false,
63 r: PhantomData,
64 w: PhantomData,
65 })
66 }
67
68 #[inline]
69 /// Prepare a new `SpliceIoCtx` instance.
70 ///
71 /// Can be used only when `R` and `W` are not files.
72 ///
73 /// ## Errors
74 ///
75 /// * Create pipe failed.
76 pub fn prepare() -> io::Result<Self>
77 where
78 R: IsNotFile,
79 W: IsNotFile,
80 {
81 Self::_prepare()
82 }
83
84 #[must_use]
85 #[inline]
86 /// Set the target length of bytes to be copy from `R` to `W`.
87 ///
88 /// ## Notice
89 ///
90 /// You MAY want need
91 /// [`prepare_reading_file`](Self::prepare_reading_file) or
92 /// [`prepare_writing_file`](Self::prepare_writing_file) if `R` or `W` is a
93 /// file.
94 pub fn with_target_len(self, size_to_splice: usize) -> Self
95 where
96 R: IsNotFile,
97 W: IsNotFile,
98 {
99 Self {
100 size_to_splice,
101 ..self
102 }
103 }
104
105 #[inline]
106 /// Prepare a new `SpliceIoCtx` instance.
107 ///
108 /// Can be used only when `R` is a file.
109 ///
110 /// ## Arguments
111 ///
112 /// * `f_len` - File length.
113 /// * `f_offset_start` - File offset start. Set to `None` to read from the
114 /// beginning.
115 /// * `f_offset_end` - File offset end. Set to `None` to read to the end.
116 ///
117 /// ## Errors
118 ///
119 /// * Invalid offset.
120 /// * Create pipe failed.
121 pub fn prepare_reading_file(
122 f_len: u64,
123 f_offset_start: Option<u64>,
124 f_offset_end: Option<u64>,
125 ) -> io::Result<Self>
126 where
127 R: IsFile,
128 W: IsNotFile,
129 {
130 Ok(SpliceIoCtx {
131 offset: Offset::In(Some(f_offset_start.unwrap_or(0))),
132 size_to_splice: Offset::calc_size_to_splice(f_len, f_offset_start, f_offset_end)?
133 .try_into()
134 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "file size too large"))?,
135 ..Self::_prepare()?
136 })
137 }
138
139 #[inline]
140 /// Prepare a new `SpliceIoCtx` instance.
141 ///
142 /// Can be used only when `W` is a file.
143 ///
144 /// ## Arguments
145 ///
146 /// * `f_len` - File length.
147 /// * `f_offset_start` - File offset start. Set to `None` to write from the
148 /// beginning.
149 /// * `f_offset_end` - File offset end. Set to `None` to write to the end.
150 ///
151 /// ## Errors
152 ///
153 /// * Invalid offset.
154 /// * Create pipe failed.
155 pub fn prepare_writing_file(
156 f_len: u64,
157 f_offset_start: Option<u64>,
158 f_offset_end: Option<u64>,
159 ) -> io::Result<Self>
160 where
161 R: IsNotFile,
162 W: IsFile,
163 {
164 Ok(SpliceIoCtx {
165 offset: Offset::Out(Some(f_offset_start.unwrap_or(0))),
166 size_to_splice: Offset::calc_size_to_splice(f_len, f_offset_start, f_offset_end)?
167 .try_into()
168 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "file size too large"))?,
169 ..Self::_prepare()?
170 })
171 }
172
173 #[inline]
174 /// Set the pipe size.
175 ///
176 /// See [`Pipe`]'s top level docs for more details.
177 ///
178 /// ## Errors
179 ///
180 /// * Set pipe size failed.
181 ///
182 /// For more details, see [`fcntl(2)`].
183 ///
184 /// [`fcntl(2)`]: https://man7.org/linux/man-pages/man2/fcntl.2.html.
185 pub fn set_pipe_size(mut self, pipe_size: usize) -> io::Result<Self> {
186 self.pipe.set_pipe_size(pipe_size)?;
187 Ok(self)
188 }
189}
190
191impl<R, W> SpliceIoCtx<R, W> {
192 #[must_use]
193 #[inline]
194 /// Returns bytes that have been read from `R`.
195 pub const fn has_read(&self) -> usize {
196 self.has_read
197 }
198
199 #[must_use]
200 #[inline]
201 /// Returns bytes that have been written to `W`.
202 pub const fn has_written(&self) -> usize {
203 self.has_written
204 }
205
206 #[must_use]
207 #[inline]
208 /// Returns the pipe size.
209 pub const fn pipe_size(&self) -> NonZeroUsize {
210 self.pipe.size()
211 }
212
213 // #[inline]
214 // /// Returns if splice drain is finished.
215 // pub(crate) const fn splice_drain_finished(&self) -> bool {
216 // self.pipe.splice_drain_finished()
217 // }
218
219 // #[inline]
220 // /// Returns if splice pump is finished.
221 // pub(crate) const fn splice_pump_finished(&self) -> bool {
222 // self.pipe.splice_pump_finished()
223 // }
224
225 #[inline]
226 /// Returns if draining and pumping are both finished.
227 pub(crate) const fn finished(&self) -> bool {
228 self.pipe.splice_drain_finished() && self.pipe.splice_pump_finished()
229 }
230
231 #[must_use]
232 #[inline]
233 /// Returns the traffic result (client TX one).
234 pub const fn traffic_client_tx(&self, error: Option<io::Error>) -> TrafficResult {
235 TrafficResult {
236 tx: self.has_written,
237 rx: 0,
238 error,
239 }
240 }
241
242 #[must_use]
243 #[inline]
244 /// Returns the traffic result (client RX one).
245 pub const fn traffic_client_rx(&self, error: Option<io::Error>) -> TrafficResult {
246 TrafficResult {
247 tx: 0,
248 rx: self.has_read,
249 error,
250 }
251 }
252
253 #[must_use]
254 #[inline]
255 /// Builder pattern version of [`SpliceIo::new`].
256 pub fn into_io(self) -> SpliceIo<R, W> {
257 SpliceIo::new(self)
258 }
259}
260
261impl<R, W> SpliceIoCtx<R, W>
262where
263 R: AsyncReadFd,
264 W: AsyncWriteFd,
265{
266 #[cfg_attr(
267 any(
268 feature = "feat-tracing-trace",
269 all(debug_assertions, feature = "feat-tracing")
270 ),
271 tracing::instrument(level = "TRACE", skip(cx, r), ret)
272 )]
273 /// `poll_splice_drain` moves data from a socket (or file) to a pipe.
274 ///
275 /// Invariant: when entering `poll_splice_drain`, the pipe is empty. It is
276 /// either in its initial state, or `poll_splice_pump` has emptied it
277 /// previously.
278 ///
279 /// Given this, `poll_splice_drain` can reasonably assume that the pipe is
280 /// ready for writing, so if splice returns EAGAIN, it must be because
281 /// the socket is not ready for reading.
282 ///
283 /// Will close pipe write side when no more data to read or when I/O error
284 /// occurs except EINTR / EAGAIN.
285 pub(crate) fn poll_splice_drain(
286 &mut self,
287 cx: &mut Context<'_>,
288 r: Pin<&mut R>,
289 ideal_len: Option<NonZeroUsize>,
290 ) -> Poll<io::Result<Drained>> {
291 crate::trace!("`poll_splice_drain`");
292
293 let Some(pipe_write_side_fd) = self.pipe.write_side_fd() else {
294 // Has `set_splice_drain_finished`, no need to close pipe write side again.
295 // self.pipe.set_splice_drain_finished();
296
297 return Poll::Ready(Ok(Drained::Done));
298 };
299
300 let Some(size_rest_to_splice) = self
301 .size_to_splice
302 .checked_sub(self.has_read)
303 .map(|l| match ideal_len {
304 Some(len) => len.get().min(l),
305 None => l,
306 })
307 .and_then(NonZeroUsize::new)
308 else {
309 self.pipe.set_splice_drain_finished();
310
311 return Poll::Ready(Ok(Drained::Done));
312 };
313
314 loop {
315 // In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite
316 // loop here, because it could return EAGAIN ceaselessly when the write
317 // end of the pipe is full, but this shouldn't be a concern here, since
318 // the pipe buffer must be sufficient (all buffered bytes will be written to
319 // writer after this).
320 ready!(r.poll_read_ready(cx))?;
321
322 match r.try_io_read(|| {
323 splice(
324 r.as_fd(),
325 self.offset.off_in(),
326 pipe_write_side_fd,
327 None,
328 size_rest_to_splice.get(),
329 SpliceFlags::NONBLOCK,
330 )
331 .map(NonZeroUsize::new)
332 .map_err(|e| io::Error::from_raw_os_error(e.raw_os_error()))
333 }) {
334 Ok(Some(drained)) => {
335 self.has_read += drained.get();
336 self.size_to_splice -= drained.get();
337
338 break Poll::Ready(Ok(Drained::Some(drained)));
339 }
340 Ok(None) => {
341 self.pipe.set_splice_drain_finished();
342
343 break Poll::Ready(Ok(Drained::Done));
344 }
345 Err(e) => {
346 match e.kind() {
347 io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => {
348 // The `r` is not ready for reading from, busy loop,
349 // though will return pending in next loop
350 // continue;
351 }
352 _ => {
353 self.pipe.set_splice_drain_finished();
354
355 break Poll::Ready(Err(e));
356 }
357 }
358 }
359 }
360 }
361 }
362
363 #[cfg_attr(
364 any(
365 feature = "feat-tracing-trace",
366 all(debug_assertions, feature = "feat-tracing")
367 ),
368 tracing::instrument(level = "TRACE", skip(cx, w), ret)
369 )]
370 /// `poll_splice_pump` moves data from a pipe to a socket (or file).
371 ///
372 /// Will close pipe read side when no more data to write or when I/O error
373 /// occurs except EINTR / EAGAIN.
374 ///
375 /// Will keep pumping data until the pipe is empty (returns
376 /// `Poll::Ready(Ok(())`), the socket is not ready (returns
377 /// `Poll::Pending`) or error occurs.
378 pub(crate) fn poll_splice_pump(
379 &mut self,
380 cx: &mut Context<'_>,
381 w: Pin<&mut W>,
382 ) -> Poll<io::Result<()>> {
383 crate::trace!("`poll_splice_pump`");
384
385 let Some(pipe_read_side_fd) = self.pipe.read_side_fd() else {
386 return Poll::Ready(Ok(()));
387 };
388
389 loop {
390 let Some(size_need_to_be_written) = self.has_read.checked_sub(self.has_written) else {
391 // If `has_written` is larger than `has_read`, may never stop.
392 // In particular, user's wrong implementation returning
393 // incorrect written length may lead to thread blocking.
394
395 self.pipe.set_splice_pump_finished();
396
397 break Poll::Ready(Err(io::Error::new(
398 io::ErrorKind::Other,
399 "`has_written` larger than `has_read`",
400 )));
401 };
402
403 let Some(size_need_to_be_written) = NonZeroUsize::new(size_need_to_be_written) else {
404 if self.pipe.splice_drain_finished() {
405 self.pipe.set_splice_pump_finished();
406 }
407
408 break Poll::Ready(Ok(()));
409 };
410
411 ready!(w.poll_write_ready(cx))?;
412
413 match w.try_io_write(|| {
414 splice(
415 pipe_read_side_fd,
416 None,
417 w.as_fd(),
418 self.offset.off_out(),
419 size_need_to_be_written.get(),
420 SpliceFlags::NONBLOCK,
421 )
422 .map(NonZeroUsize::new)
423 .map_err(|e| io::Error::from_raw_os_error(e.raw_os_error()))
424 }) {
425 Ok(Some(has_written)) => {
426 // Go to next loop to check if there is more data
427 self.has_written += has_written.get();
428 self.need_flush = true;
429 }
430 Ok(None) => {
431 break Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
432 }
433 Err(e) => {
434 match e.kind() {
435 io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => {
436 // The `r` is not ready for reading from, busy loop,
437 // though will return pending in next loop
438 // continue;
439 }
440 _ => {
441 self.pipe.set_splice_drain_finished();
442
443 break Poll::Ready(Err(e));
444 }
445 }
446 }
447 }
448 }
449 }
450}