layered_io/
write_layered.rs

1use crate::{Activity, Bufferable, Status};
2use std::io::{self, IoSlice, Write};
3
4/// An extension of [`std::io::Write`], but adds a `close` function to allow
5/// the stream to be closed and any outstanding errors to be reported, without
6/// requiring a `sync_all`.
7pub trait WriteLayered: Write + Bufferable {
8    /// Flush any buffers and declare the end of the stream. Subsequent writes
9    /// will fail.
10    fn close(&mut self) -> io::Result<()>;
11
12    /// Like [`Write::flush`], but has a status parameter describing
13    /// the future of the stream:
14    ///  - `Status::Ok(Activity::Active)`: do nothing
15    ///  - `Status::Ok(Activity::Push)`: flush any buffers and transmit all
16    ///    data
17    ///  - `Status::End`: flush any buffers and declare the end of the stream
18    ///
19    /// Passing `Status::Ok(Activity::Push)` makes this behave the same as
20    /// `flush()`.
21    fn flush_with_status(&mut self, status: Status) -> io::Result<()> {
22        match status {
23            Status::Open(Activity::Active) => Ok(()),
24            Status::Open(Activity::Push) => self.flush(),
25            Status::End => self.close(),
26        }
27    }
28}
29
30/// Default implementation of [`Write::write_vectored`], in terms of
31/// [`Write::write`].
32pub fn default_write_vectored<Inner: Write + ?Sized>(
33    inner: &mut Inner,
34    bufs: &[IoSlice<'_>],
35) -> io::Result<usize> {
36    let buf = bufs
37        .iter()
38        .find(|b| !b.is_empty())
39        .map_or(&[][..], |b| &**b);
40    inner.write(buf)
41}
42
43/// Default implementation of [`Write::is_write_vectored`] accompanying
44/// [`default_write_vectored`].
45#[cfg(can_vector)]
46#[inline]
47pub fn default_is_write_vectored<Inner: Write + ?Sized>(_inner: &Inner) -> bool {
48    false
49}
50
51/// Default implementation of [`Write::write_all`], in terms of
52/// [`Write::write`].
53#[allow(clippy::indexing_slicing)]
54pub fn default_write_all<Inner: Write + ?Sized>(
55    inner: &mut Inner,
56    mut buf: &[u8],
57) -> io::Result<()> {
58    while !buf.is_empty() {
59        match inner.write(buf) {
60            Ok(0) => {
61                return Err(io::Error::new(
62                    io::ErrorKind::WriteZero,
63                    "failed to write whole buffer",
64                ));
65            }
66            Ok(n) => buf = &buf[n..],
67            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
68            Err(e) => return Err(e),
69        }
70    }
71    Ok(())
72}
73
74/// Default implementation of [`Write::write_all_vectored`], in terms of
75/// [`Write::write_vectored`].
76#[cfg(write_all_vectored)]
77pub fn default_write_all_vectored<Inner: Write + ?Sized>(
78    inner: &mut Inner,
79    mut bufs: &mut [IoSlice],
80) -> io::Result<()> {
81    // TODO: Use [rust-lang/rust#70436]once it stabilizes.
82    // [rust-lang/rust#70436]: https://github.com/rust-lang/rust/issues/70436
83    while !bufs.is_empty() {
84        match inner.write_vectored(bufs) {
85            Ok(nwritten) => bufs = advance(bufs, nwritten),
86            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => (),
87            Err(e) => return Err(e),
88        }
89    }
90    Ok(())
91}
92
93/// This will be obviated by [rust-lang/rust#62726].
94///
95/// [rust-lang/rust#62726]: https://github.com/rust-lang/rust/issues/62726.
96///
97/// Once this is removed, layered-io can become a `#![forbid(unsafe_code)]`
98/// crate.
99#[cfg(write_all_vectored)]
100fn advance<'a, 'b>(bufs: &'b mut [IoSlice<'a>], n: usize) -> &'b mut [IoSlice<'a>] {
101    use std::slice;
102
103    // Number of buffers to remove.
104    let mut remove = 0;
105    // Total length of all the to be removed buffers.
106    let mut accumulated_len = 0;
107    for buf in bufs.iter() {
108        if accumulated_len + buf.len() > n {
109            break;
110        }
111        accumulated_len += buf.len();
112        remove += 1;
113    }
114
115    #[allow(clippy::indexing_slicing)]
116    let bufs = &mut bufs[remove..];
117    if let Some(first) = bufs.first_mut() {
118        let advance_by = n - accumulated_len;
119        let mut ptr = first.as_ptr();
120        let mut len = first.len();
121        unsafe {
122            ptr = ptr.add(advance_by);
123            len -= advance_by;
124            *first = IoSlice::<'a>::new(slice::from_raw_parts::<'a>(ptr, len));
125        }
126    }
127    bufs
128}
129
130impl WriteLayered for std::io::Cursor<Vec<u8>> {
131    #[inline]
132    fn close(&mut self) -> io::Result<()> {
133        self.set_position(self.get_ref().len().try_into().unwrap());
134        Ok(())
135    }
136}
137
138impl WriteLayered for std::io::Cursor<Box<[u8]>> {
139    #[inline]
140    fn close(&mut self) -> io::Result<()> {
141        self.set_position(self.get_ref().len().try_into().unwrap());
142        Ok(())
143    }
144}
145
146impl WriteLayered for std::io::Cursor<&mut Vec<u8>> {
147    #[inline]
148    fn close(&mut self) -> io::Result<()> {
149        self.set_position(self.get_ref().len().try_into().unwrap());
150        Ok(())
151    }
152}
153
154impl WriteLayered for std::io::Cursor<&mut [u8]> {
155    #[inline]
156    fn close(&mut self) -> io::Result<()> {
157        self.set_position(self.get_ref().len().try_into().unwrap());
158        Ok(())
159    }
160}
161
162impl<W: WriteLayered> WriteLayered for Box<W> {
163    #[inline]
164    fn close(&mut self) -> io::Result<()> {
165        self.as_mut().close()
166    }
167}
168
169impl<W: WriteLayered> WriteLayered for &mut W {
170    #[inline]
171    fn close(&mut self) -> io::Result<()> {
172        (**self).close()
173    }
174}