fluke_buffet/
io.rs

1use crate::{BufResult, IoBufMut, Piece, PieceList};
2
3mod pipe;
4pub use pipe::*;
5
6mod non_uring;
7
8#[allow(async_fn_in_trait)] // we never require Send
9pub trait ReadOwned {
10    async fn read_owned<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B>;
11}
12
13#[allow(async_fn_in_trait)] // we never require Send
14pub trait WriteOwned {
15    /// Write a single buffer, taking ownership for the duration of the write.
16    /// Might perform a partial write, see [WriteOwned::write_all]
17    async fn write_owned(&mut self, buf: impl Into<Piece>) -> BufResult<usize, Piece>;
18
19    /// Write a single buffer, re-trying the write if the kernel does a partial write.
20    async fn write_all_owned(&mut self, buf: impl Into<Piece>) -> std::io::Result<()> {
21        let mut buf = buf.into();
22        let mut written = 0;
23        let len = buf.len();
24        while written < len {
25            let (res, slice) = self.write_owned(buf).await;
26            let n = res?;
27            if n == 0 {
28                return Err(std::io::Error::new(
29                    std::io::ErrorKind::WriteZero,
30                    "write zero",
31                ));
32            }
33            (_, buf) = slice.split_at(n);
34            written += n;
35        }
36        Ok(())
37    }
38
39    /// Write a list of buffers, taking ownership for the duration of the write.
40    /// Might perform a partial write, see [WriteOwned::writev_all]
41    async fn writev_owned(&mut self, list: &PieceList) -> std::io::Result<usize> {
42        let mut total = 0;
43
44        for buf in list.pieces.iter().cloned() {
45            let buf_len = buf.len();
46            let (res, _) = self.write_owned(buf).await;
47
48            match res {
49                Ok(0) => {
50                    return Err(std::io::Error::new(
51                        std::io::ErrorKind::WriteZero,
52                        "write zero",
53                    ));
54                }
55                Ok(n) => {
56                    total += n;
57                    if n < buf_len {
58                        // partial write, return the buffer list so the caller
59                        // might choose to try the write again
60                        return Ok(total);
61                    }
62                }
63                Err(e) => {
64                    return Err(e);
65                }
66            }
67        }
68        Ok(total)
69    }
70
71    /// Write a list of buffers, re-trying the write if the kernel does a partial write.
72    async fn writev_all_owned(&mut self, mut list: PieceList) -> std::io::Result<()> {
73        while !list.is_empty() {
74            let n = self.writev_owned(&list).await?;
75            if n == 0 {
76                return Err(std::io::Error::new(
77                    std::io::ErrorKind::WriteZero,
78                    "write zero",
79                ));
80            }
81
82            let mut n = n;
83            while n > 0 {
84                // pop and/or split items from the list
85                let next_item = list.pieces.front_mut().unwrap();
86                let next_item_len = next_item.len();
87
88                if n < next_item_len {
89                    // the number of bytes written falls in the middle of the buffer.
90                    // split the buffer and push the remainder back to the list
91                    let next_item = list.pieces.pop_front().unwrap();
92                    let (l, r) = next_item.split_at(n);
93                    n -= l.len();
94                    list.pieces.push_front(r);
95                } else {
96                    // the whole buffer was written, pop it from the list
97                    list.pieces.pop_front();
98                    n -= next_item_len;
99                }
100            }
101        }
102
103        Ok(())
104    }
105
106    /// Shuts down the write end of this socket. This flushes
107    /// any data that may not have been send.
108    async fn shutdown(&mut self) -> std::io::Result<()>;
109}
110
111#[cfg(all(test, not(feature = "miri")))]
112mod tests {
113    use std::{cell::RefCell, rc::Rc};
114
115    use crate::{io::WriteOwned, BufResult, Piece, PieceList};
116
117    #[test]
118    fn test_write_all() {
119        enum Mode {
120            WriteZero,
121            WritePartial,
122        }
123
124        struct Writer {
125            mode: Mode,
126            bytes: Rc<RefCell<Vec<u8>>>,
127        }
128
129        impl WriteOwned for Writer {
130            async fn write_owned(&mut self, buf: impl Into<Piece>) -> BufResult<usize, Piece> {
131                let buf = buf.into();
132                assert!(!buf.is_empty(), "zero-length writes are forbidden");
133
134                match self.mode {
135                    Mode::WriteZero => (Ok(0), buf),
136                    Mode::WritePartial => {
137                        let n = match buf.len() {
138                            1 => 1,
139                            _ => buf.len() / 2,
140                        };
141                        self.bytes.borrow_mut().extend_from_slice(&buf[..n]);
142                        (Ok(n), buf)
143                    }
144                }
145            }
146
147            async fn shutdown(&mut self) -> std::io::Result<()> {
148                Ok(())
149            }
150        }
151
152        crate::start(async move {
153            let mut writer = Writer {
154                mode: Mode::WriteZero,
155                bytes: Default::default(),
156            };
157            let buf_a = vec![1, 2, 3, 4, 5];
158            let res = writer.write_all_owned(buf_a).await;
159            assert!(res.is_err());
160
161            let mut writer = Writer {
162                mode: Mode::WriteZero,
163                bytes: Default::default(),
164            };
165            let buf_a = vec![1, 2, 3, 4, 5];
166            let buf_b = vec![6, 7, 8, 9, 10];
167            let res = writer
168                .writev_all_owned(PieceList::single(buf_a).followed_by(buf_b))
169                .await;
170            assert!(res.is_err());
171
172            let mut writer = Writer {
173                mode: Mode::WritePartial,
174                bytes: Default::default(),
175            };
176            let buf_a = vec![1, 2, 3, 4, 5];
177            writer.write_all_owned(buf_a).await.unwrap();
178            assert_eq!(&writer.bytes.borrow()[..], &[1, 2, 3, 4, 5]);
179
180            let mut writer = Writer {
181                mode: Mode::WritePartial,
182                bytes: Default::default(),
183            };
184            let buf_a = vec![1, 2, 3, 4, 5];
185            let buf_b = vec![6, 7, 8, 9, 10];
186            writer
187                .writev_all_owned(PieceList::single(buf_a).followed_by(buf_b))
188                .await
189                .unwrap();
190            assert_eq!(&writer.bytes.borrow()[..], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
191        });
192    }
193}
194
195pub trait IntoHalves {
196    type Read: ReadOwned;
197    type Write: WriteOwned;
198
199    /// Split this into an owned read half and an owned write half.
200    fn into_halves(self) -> (Self::Read, Self::Write);
201}