tokio_splice2/
context.rs

1//! `splice(2)` IO context.
2
3use std::marker::PhantomData;
4use std::num::NonZeroUsize;
5use std::pin::Pin;
6use std::task::{ready, Context, Poll};
7use std::{fmt, io};
8
9use rustix::pipe::{splice, SpliceFlags};
10
11use crate::io::{AsyncReadFd, AsyncWriteFd, IsFile, IsNotFile, SpliceIo};
12use crate::pipe::Pipe;
13use crate::traffic::TrafficResult;
14use crate::utils::{Drained, Offset};
15
16/// Splice IO context.
17pub struct SpliceIoCtx<R, W> {
18    /// The `off_in` when splicing from `R` to the pipe, or the `off_out` when
19    /// splicing from the pipe to `W`.
20    offset: Offset,
21    /// Target length to read from `R` then write to `W`.
22    ///
23    /// Default is `isize::MAX`, which means read as much as possible.
24    size_to_splice: usize,
25
26    /// Pipe used to splice data.
27    pipe: Pipe,
28
29    /// Bytes that have been read from `R` into pipe write side.
30    has_read: usize,
31    /// Bytes that have been written to `W` from pipe read side.
32    has_written: usize,
33    /// Whether need to flush `W` after splicing.
34    need_flush: bool,
35
36    r: PhantomData<R>,
37    w: PhantomData<W>,
38}
39
40impl<R, W> fmt::Debug for SpliceIoCtx<R, W> {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        f.debug_struct("SpliceIoCtx")
43            .field("offset", &self.offset)
44            .field("size_to_splice", &self.size_to_splice)
45            .field("pipe", &self.pipe)
46            .field("has_read", &self.has_read)
47            .field("has_written", &self.has_written)
48            .field("need_flush", &self.need_flush)
49            .finish()
50    }
51}
52
53impl<R, W> SpliceIoCtx<R, W> {
54    #[inline]
55    fn _prepare() -> io::Result<Self> {
56        Ok(Self {
57            offset: Offset::None,
58            size_to_splice: isize::MAX as usize,
59            pipe: Pipe::new()?,
60            has_read: 0,
61            has_written: 0,
62            need_flush: false,
63            r: PhantomData,
64            w: PhantomData,
65        })
66    }
67
68    #[inline]
69    /// Prepare a new `SpliceIoCtx` instance.
70    ///
71    /// Can be used only when `R` and `W` are not files.
72    ///
73    /// ## Errors
74    ///
75    /// * Create pipe failed.
76    pub fn prepare() -> io::Result<Self>
77    where
78        R: IsNotFile,
79        W: IsNotFile,
80    {
81        Self::_prepare()
82    }
83
84    #[must_use]
85    #[inline]
86    /// Set the target length of bytes to be copy from `R` to `W`.
87    ///
88    /// ## Notice
89    ///
90    /// You MAY want need
91    /// [`prepare_reading_file`](Self::prepare_reading_file) or
92    /// [`prepare_writing_file`](Self::prepare_writing_file) if `R` or `W` is a
93    /// file.
94    pub fn with_target_len(self, size_to_splice: usize) -> Self
95    where
96        R: IsNotFile,
97        W: IsNotFile,
98    {
99        Self {
100            size_to_splice,
101            ..self
102        }
103    }
104
105    #[inline]
106    /// Prepare a new `SpliceIoCtx` instance.
107    ///
108    /// Can be used only when `R` is a file.
109    ///
110    /// ## Arguments
111    ///
112    /// * `f_len` - File length.
113    /// * `f_offset_start` - File offset start. Set to `None` to read from the
114    ///   beginning.
115    /// * `f_offset_end` - File offset end. Set to `None` to read to the end.
116    ///
117    /// ## Errors
118    ///
119    /// * Invalid offset.
120    /// * Create pipe failed.
121    pub fn prepare_reading_file(
122        f_len: u64,
123        f_offset_start: Option<u64>,
124        f_offset_end: Option<u64>,
125    ) -> io::Result<Self>
126    where
127        R: IsFile,
128        W: IsNotFile,
129    {
130        Ok(SpliceIoCtx {
131            offset: Offset::In(Some(f_offset_start.unwrap_or(0))),
132            size_to_splice: Offset::calc_size_to_splice(f_len, f_offset_start, f_offset_end)?
133                .try_into()
134                .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "file size too large"))?,
135            ..Self::_prepare()?
136        })
137    }
138
139    #[inline]
140    /// Prepare a new `SpliceIoCtx` instance.
141    ///
142    /// Can be used only when `W` is a file.
143    ///
144    /// ## Arguments
145    ///
146    /// * `f_len` - File length.
147    /// * `f_offset_start` - File offset start. Set to `None` to write from the
148    ///   beginning.
149    /// * `f_offset_end` - File offset end. Set to `None` to write to the end.
150    ///
151    /// ## Errors
152    ///
153    /// * Invalid offset.
154    /// * Create pipe failed.
155    pub fn prepare_writing_file(
156        f_len: u64,
157        f_offset_start: Option<u64>,
158        f_offset_end: Option<u64>,
159    ) -> io::Result<Self>
160    where
161        R: IsNotFile,
162        W: IsFile,
163    {
164        Ok(SpliceIoCtx {
165            offset: Offset::Out(Some(f_offset_start.unwrap_or(0))),
166            size_to_splice: Offset::calc_size_to_splice(f_len, f_offset_start, f_offset_end)?
167                .try_into()
168                .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "file size too large"))?,
169            ..Self::_prepare()?
170        })
171    }
172
173    #[inline]
174    /// Set the pipe size.
175    ///
176    /// See [`Pipe`]'s top level docs for more details.
177    ///
178    /// ## Errors
179    ///
180    /// * Set pipe size failed.
181    ///
182    /// For more details, see [`fcntl(2)`].
183    ///
184    /// [`fcntl(2)`]: https://man7.org/linux/man-pages/man2/fcntl.2.html.
185    pub fn set_pipe_size(mut self, pipe_size: usize) -> io::Result<Self> {
186        self.pipe.set_pipe_size(pipe_size)?;
187        Ok(self)
188    }
189}
190
191impl<R, W> SpliceIoCtx<R, W> {
192    #[must_use]
193    #[inline]
194    /// Returns bytes that have been read from `R`.
195    pub const fn has_read(&self) -> usize {
196        self.has_read
197    }
198
199    #[must_use]
200    #[inline]
201    /// Returns bytes that have been written to `W`.
202    pub const fn has_written(&self) -> usize {
203        self.has_written
204    }
205
206    #[must_use]
207    #[inline]
208    /// Returns the pipe size.
209    pub const fn pipe_size(&self) -> NonZeroUsize {
210        self.pipe.size()
211    }
212
213    // #[inline]
214    // /// Returns if splice drain is finished.
215    // pub(crate) const fn splice_drain_finished(&self) -> bool {
216    //     self.pipe.splice_drain_finished()
217    // }
218
219    // #[inline]
220    // /// Returns if splice pump is finished.
221    // pub(crate) const fn splice_pump_finished(&self) -> bool {
222    //     self.pipe.splice_pump_finished()
223    // }
224
225    #[inline]
226    /// Returns if draining and pumping are both finished.
227    pub(crate) const fn finished(&self) -> bool {
228        self.pipe.splice_drain_finished() && self.pipe.splice_pump_finished()
229    }
230
231    #[must_use]
232    #[inline]
233    /// Returns the traffic result (client TX one).
234    pub const fn traffic_client_tx(&self, error: Option<io::Error>) -> TrafficResult {
235        TrafficResult {
236            tx: self.has_written,
237            rx: 0,
238            error,
239        }
240    }
241
242    #[must_use]
243    #[inline]
244    /// Returns the traffic result (client RX one).
245    pub const fn traffic_client_rx(&self, error: Option<io::Error>) -> TrafficResult {
246        TrafficResult {
247            tx: 0,
248            rx: self.has_read,
249            error,
250        }
251    }
252
253    #[must_use]
254    #[inline]
255    /// Builder pattern version of [`SpliceIo::new`].
256    pub fn into_io(self) -> SpliceIo<R, W> {
257        SpliceIo::new(self)
258    }
259}
260
261impl<R, W> SpliceIoCtx<R, W>
262where
263    R: AsyncReadFd,
264    W: AsyncWriteFd,
265{
266    #[cfg_attr(
267        any(
268            feature = "feat-tracing-trace",
269            all(debug_assertions, feature = "feat-tracing")
270        ),
271        tracing::instrument(level = "TRACE", skip(cx, r), ret)
272    )]
273    /// `poll_splice_drain` moves data from a socket (or file) to a pipe.
274    ///
275    /// Invariant: when entering `poll_splice_drain`, the pipe is empty. It is
276    /// either in its initial state, or `poll_splice_pump` has emptied it
277    /// previously.
278    ///
279    /// Given this, `poll_splice_drain` can reasonably assume that the pipe is
280    /// ready for writing, so if splice returns EAGAIN, it must be because
281    /// the socket is not ready for reading.
282    ///
283    /// Will close pipe write side when no more data to read or when I/O error
284    /// occurs except EINTR / EAGAIN.
285    pub(crate) fn poll_splice_drain(
286        &mut self,
287        cx: &mut Context<'_>,
288        r: Pin<&mut R>,
289        ideal_len: Option<NonZeroUsize>,
290    ) -> Poll<io::Result<Drained>> {
291        crate::trace!("`poll_splice_drain`");
292
293        let Some(pipe_write_side_fd) = self.pipe.write_side_fd() else {
294            // Has `set_splice_drain_finished`, no need to close pipe write side again.
295            // self.pipe.set_splice_drain_finished();
296
297            return Poll::Ready(Ok(Drained::Done));
298        };
299
300        let Some(size_rest_to_splice) = self
301            .size_to_splice
302            .checked_sub(self.has_read)
303            .map(|l| match ideal_len {
304                Some(len) => len.get().min(l),
305                None => l,
306            })
307            .and_then(NonZeroUsize::new)
308        else {
309            self.pipe.set_splice_drain_finished();
310
311            return Poll::Ready(Ok(Drained::Done));
312        };
313
314        loop {
315            // In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite
316            // loop here, because it could return EAGAIN ceaselessly when the write
317            // end of the pipe is full, but this shouldn't be a concern here, since
318            // the pipe buffer must be sufficient (all buffered bytes will be written to
319            // writer after this).
320            ready!(r.poll_read_ready(cx))?;
321
322            match r.try_io_read(|| {
323                splice(
324                    r.as_fd(),
325                    self.offset.off_in(),
326                    pipe_write_side_fd,
327                    None,
328                    size_rest_to_splice.get(),
329                    SpliceFlags::NONBLOCK,
330                )
331                .map(NonZeroUsize::new)
332                .map_err(|e| io::Error::from_raw_os_error(e.raw_os_error()))
333            }) {
334                Ok(Some(drained)) => {
335                    self.has_read += drained.get();
336                    self.size_to_splice -= drained.get();
337
338                    break Poll::Ready(Ok(Drained::Some(drained)));
339                }
340                Ok(None) => {
341                    self.pipe.set_splice_drain_finished();
342
343                    break Poll::Ready(Ok(Drained::Done));
344                }
345                Err(e) => {
346                    match e.kind() {
347                        io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => {
348                            // The `r` is not ready for reading from, busy loop,
349                            // though will return pending in next loop
350                            // continue;
351                        }
352                        _ => {
353                            self.pipe.set_splice_drain_finished();
354
355                            break Poll::Ready(Err(e));
356                        }
357                    }
358                }
359            }
360        }
361    }
362
363    #[cfg_attr(
364        any(
365            feature = "feat-tracing-trace",
366            all(debug_assertions, feature = "feat-tracing")
367        ),
368        tracing::instrument(level = "TRACE", skip(cx, w), ret)
369    )]
370    /// `poll_splice_pump` moves data from a pipe to a socket (or file).
371    ///
372    /// Will close pipe read side when no more data to write or when I/O error
373    /// occurs except EINTR / EAGAIN.
374    ///
375    /// Will keep pumping data until the pipe is empty (returns
376    /// `Poll::Ready(Ok(())`), the socket is not ready (returns
377    /// `Poll::Pending`) or error occurs.
378    pub(crate) fn poll_splice_pump(
379        &mut self,
380        cx: &mut Context<'_>,
381        w: Pin<&mut W>,
382    ) -> Poll<io::Result<()>> {
383        crate::trace!("`poll_splice_pump`");
384
385        let Some(pipe_read_side_fd) = self.pipe.read_side_fd() else {
386            return Poll::Ready(Ok(()));
387        };
388
389        loop {
390            let Some(size_need_to_be_written) = self.has_read.checked_sub(self.has_written) else {
391                // If `has_written` is larger than `has_read`, may never stop.
392                // In particular, user's wrong implementation returning
393                // incorrect written length may lead to thread blocking.
394
395                self.pipe.set_splice_pump_finished();
396
397                break Poll::Ready(Err(io::Error::new(
398                    io::ErrorKind::Other,
399                    "`has_written` larger than `has_read`",
400                )));
401            };
402
403            let Some(size_need_to_be_written) = NonZeroUsize::new(size_need_to_be_written) else {
404                if self.pipe.splice_drain_finished() {
405                    self.pipe.set_splice_pump_finished();
406                }
407
408                break Poll::Ready(Ok(()));
409            };
410
411            ready!(w.poll_write_ready(cx))?;
412
413            match w.try_io_write(|| {
414                splice(
415                    pipe_read_side_fd,
416                    None,
417                    w.as_fd(),
418                    self.offset.off_out(),
419                    size_need_to_be_written.get(),
420                    SpliceFlags::NONBLOCK,
421                )
422                .map(NonZeroUsize::new)
423                .map_err(|e| io::Error::from_raw_os_error(e.raw_os_error()))
424            }) {
425                Ok(Some(has_written)) => {
426                    // Go to next loop to check if there is more data
427                    self.has_written += has_written.get();
428                    self.need_flush = true;
429                }
430                Ok(None) => {
431                    break Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
432                }
433                Err(e) => {
434                    match e.kind() {
435                        io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => {
436                            // The `r` is not ready for reading from, busy loop,
437                            // though will return pending in next loop
438                            // continue;
439                        }
440                        _ => {
441                            self.pipe.set_splice_drain_finished();
442
443                            break Poll::Ready(Err(e));
444                        }
445                    }
446                }
447            }
448        }
449    }
450}