buffet/
io.rs

1use crate::{BufResult, IoBufMut, Piece, PieceList};
2
3mod pipe;
4pub use pipe::*;
5
6mod non_uring;
7
8#[allow(async_fn_in_trait)] // we never require Send
9pub trait ReadOwned {
10    async fn read_owned<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B>;
11}
12
13#[allow(async_fn_in_trait)] // we never require Send
14pub trait WriteOwned {
15    /// Write a single buffer, taking ownership for the duration of the write.
16    /// Might perform a partial write, see [WriteOwned::write_all_owned]
17    async fn write_owned(&mut self, buf: impl Into<Piece>) -> BufResult<usize, Piece>;
18
19    /// Write a single buffer, re-trying the write if the kernel does a partial
20    /// write.
21    async fn write_all_owned(&mut self, buf: impl Into<Piece>) -> std::io::Result<()> {
22        let mut buf = buf.into();
23        let mut written = 0;
24        let len = buf.len();
25        while written < len {
26            let (res, slice) = self.write_owned(buf).await;
27            let n = res?;
28            if n == 0 {
29                return Err(std::io::Error::new(
30                    std::io::ErrorKind::WriteZero,
31                    "write zero",
32                ));
33            }
34            (_, buf) = slice.split_at(n);
35            written += n;
36        }
37        Ok(())
38    }
39
40    /// Write a list of buffers, taking ownership for the duration of the write.
41    /// Might perform a partial write, see [WriteOwned::writev_all_owned]
42    async fn writev_owned(&mut self, list: &PieceList) -> std::io::Result<usize> {
43        let mut total = 0;
44
45        for buf in list.pieces.iter().cloned() {
46            let buf_len = buf.len();
47
48            let (res, _) = self.write_owned(buf).await;
49            match res {
50                Ok(0) => {
51                    return Err(std::io::Error::new(
52                        std::io::ErrorKind::WriteZero,
53                        "write zero",
54                    ));
55                }
56                Ok(n) => {
57                    total += n;
58                    if n < buf_len {
59                        // partial write, return the buffer list so the caller
60                        // might choose to try the write again
61                        return Ok(total);
62                    }
63                }
64                Err(e) => {
65                    return Err(e);
66                }
67            }
68        }
69        Ok(total)
70    }
71
72    /// Write a list of buffers, re-trying the write if the kernel does a
73    /// partial write.
74    async fn writev_all_owned(&mut self, mut list: PieceList) -> std::io::Result<()> {
75        while !list.is_empty() {
76            let n = self.writev_owned(&list).await?;
77
78            if n == 0 {
79                return Err(std::io::Error::new(
80                    std::io::ErrorKind::WriteZero,
81                    "write zero",
82                ));
83            }
84
85            let mut n = n;
86            while n > 0 {
87                // pop and/or split items from the list
88                let next_item = list.pieces.front_mut().unwrap();
89                let next_item_len = next_item.len();
90
91                if n < next_item_len {
92                    // the number of bytes written falls in the middle of the buffer.
93                    // split the buffer and push the remainder back to the list
94                    let next_item = list.pieces.pop_front().unwrap();
95                    let (l, r) = next_item.split_at(n);
96                    n -= l.len();
97                    list.pieces.push_front(r);
98                } else {
99                    // the whole buffer was written, pop it from the list
100                    list.pieces.pop_front();
101                    n -= next_item_len;
102                }
103            }
104        }
105        Ok(())
106    }
107
108    /// Shuts down the write end of this socket. This flushes
109    /// any data that may not have been send.
110    async fn shutdown(&mut self) -> std::io::Result<()>;
111}
112
#[cfg(all(test, not(feature = "miri")))]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use crate::{io::WriteOwned, BufResult, Piece, PieceList};

    /// Drives `write_all_owned` and `writev_all_owned` against a fake writer
    /// that either always reports zero bytes written or only ever accepts
    /// part of each buffer.
    #[test]
    fn test_write_all() {
        // How the fake writer answers every write.
        enum Mode {
            WriteZero,
            WritePartial,
        }

        struct Writer {
            mode: Mode,
            bytes: Rc<RefCell<Vec<u8>>>,
        }

        impl Writer {
            // Fresh writer with an empty capture buffer.
            fn new(mode: Mode) -> Self {
                Self {
                    mode,
                    bytes: Default::default(),
                }
            }
        }

        impl WriteOwned for Writer {
            async fn write_owned(&mut self, buf: impl Into<Piece>) -> BufResult<usize, Piece> {
                let piece: Piece = buf.into();
                assert!(!piece.is_empty(), "zero-length writes are forbidden");

                let accepted = match self.mode {
                    Mode::WriteZero => 0,
                    Mode::WritePartial => {
                        // accept half the buffer, but always at least one
                        // byte so that progress is made
                        let len = piece.len();
                        let take = if len == 1 { 1 } else { len / 2 };
                        self.bytes.borrow_mut().extend_from_slice(&piece[..take]);
                        take
                    }
                };
                (Ok(accepted), piece)
            }

            async fn shutdown(&mut self) -> std::io::Result<()> {
                Ok(())
            }
        }

        crate::start(async move {
            // a writer that never makes progress fails a single-buffer write
            {
                let mut writer = Writer::new(Mode::WriteZero);
                let buf_a = vec![1, 2, 3, 4, 5];
                let outcome = writer.write_all_owned(buf_a).await;
                assert!(outcome.is_err());
            }

            // ...and a vectored write
            {
                let mut writer = Writer::new(Mode::WriteZero);
                let buf_a = vec![1, 2, 3, 4, 5];
                let buf_b = vec![6, 7, 8, 9, 10];
                let list = PieceList::single(buf_a).followed_by(buf_b);
                let outcome = writer.writev_all_owned(list).await;
                assert!(outcome.is_err());
            }

            // partial writes are retried until the whole buffer went through
            {
                let mut writer = Writer::new(Mode::WritePartial);
                let buf_a = vec![1, 2, 3, 4, 5];
                writer.write_all_owned(buf_a).await.unwrap();
                assert_eq!(&writer.bytes.borrow()[..], &[1, 2, 3, 4, 5]);
            }

            // same for a list of buffers, with the order preserved
            {
                let mut writer = Writer::new(Mode::WritePartial);
                let buf_a = vec![1, 2, 3, 4, 5];
                let buf_b = vec![6, 7, 8, 9, 10];
                let list = PieceList::single(buf_a).followed_by(buf_b);
                writer.writev_all_owned(list).await.unwrap();
                assert_eq!(&writer.bytes.borrow()[..], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
            }
        });
    }
}
196
197pub trait IntoHalves: 'static {
198    type Read: ReadOwned;
199    type Write: WriteOwned;
200
201    /// Split this into an owned read half and an owned write half.
202    fn into_halves(self) -> (Self::Read, Self::Write);
203}