fluke_maybe_uring/
io.rs

1use crate::{
2    buf::{IoBuf, IoBufMut},
3    BufResult,
4};
5
6mod chan;
7pub use chan::*;
8
9mod buf_or_slice;
10use buf_or_slice::*;
11
12mod non_uring;
13pub use non_uring::*;
14
15pub trait ReadOwned {
16    async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B>;
17}
18
19pub trait WriteOwned {
20    /// Write a single buffer, taking ownership for the duration of the write.
21    /// Might perform a partial write, see [WriteOwned::write_all]
22    async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B>;
23
24    /// Write a single buffer, re-trying the write if the kernel does a partial write.
25    async fn write_all<B: IoBuf>(&mut self, mut buf: B) -> std::io::Result<()> {
26        let mut written = 0;
27        let len = buf.bytes_init();
28        while written < len {
29            let (res, slice) = self.write(buf.slice(written..len)).await;
30            buf = slice.into_inner();
31            let n = res?;
32            if n == 0 {
33                return Err(std::io::Error::new(
34                    std::io::ErrorKind::WriteZero,
35                    "write zero",
36                ));
37            }
38            written += n;
39        }
40        Ok(())
41    }
42
43    /// Write a list of buffers, taking ownership for the duration of the write.
44    /// Might perform a partial write, see [WriteOwned::writev_all]
45    async fn writev<B: IoBuf>(&mut self, list: Vec<B>) -> BufResult<usize, Vec<B>> {
46        let mut out_list = Vec::with_capacity(list.len());
47        let mut list = list.into_iter();
48        let mut total = 0;
49
50        while let Some(buf) = list.next() {
51            let buf_len = buf.bytes_init();
52            let (res, buf) = self.write(buf).await;
53            out_list.push(buf);
54
55            match res {
56                Ok(0) => {
57                    out_list.extend(list);
58                    return (
59                        Err(std::io::Error::new(
60                            std::io::ErrorKind::WriteZero,
61                            "write zero",
62                        )),
63                        out_list,
64                    );
65                }
66                Ok(n) => {
67                    total += n;
68                    if n < buf_len {
69                        // partial write, return the buffer list so the caller
70                        // might choose to try the write again
71                        out_list.extend(list);
72                        return (Ok(total), out_list);
73                    }
74                }
75                Err(e) => {
76                    out_list.extend(list);
77                    return (Err(e), out_list);
78                }
79            }
80        }
81
82        (Ok(total), out_list)
83    }
84
85    /// Write a list of buffers, re-trying the write if the kernel does a partial write.
86    async fn writev_all<B: IoBuf>(&mut self, list: impl Into<Vec<B>>) -> std::io::Result<()> {
87        // FIXME: converting into a `Vec` and _then_ into an iterator is silly,
88        // we can probably find a better function signature here.
89        let mut list: Vec<_> = list.into().into_iter().map(BufOrSlice::Buf).collect();
90
91        while !list.is_empty() {
92            let res;
93            (res, list) = self.writev(list).await;
94            let n = res?;
95
96            if n == 0 {
97                return Err(std::io::Error::new(
98                    std::io::ErrorKind::WriteZero,
99                    "write zero",
100                ));
101            }
102
103            let mut n = n;
104            list = list
105                .into_iter()
106                .filter_map(|item| {
107                    if n == 0 {
108                        Some(item)
109                    } else {
110                        let item_len = item.len();
111
112                        if n >= item_len {
113                            n -= item_len;
114                            None
115                        } else {
116                            let item = item.consume(n);
117                            n = 0;
118                            Some(item)
119                        }
120                    }
121                })
122                .collect();
123            assert_eq!(n, 0);
124        }
125
126        Ok(())
127    }
128}
129
#[cfg(all(test, not(feature = "miri")))]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use crate::{buf::IoBuf, io::WriteOwned, BufResult};

    /// Drives `write_all` and `writev_all` against a fake writer that either
    /// never accepts any bytes or only ever accepts part of each buffer.
    #[test]
    fn test_write_all() {
        /// How the fake writer answers every `write` call.
        enum Mode {
            WriteZero,
            WritePartial,
        }

        /// Fake writer that records every byte it claims to have written.
        struct Writer {
            mode: Mode,
            bytes: Rc<RefCell<Vec<u8>>>,
        }

        impl Writer {
            /// A writer in the given mode that has not recorded anything yet.
            fn with_mode(mode: Mode) -> Self {
                Self {
                    mode,
                    bytes: Default::default(),
                }
            }
        }

        impl WriteOwned for Writer {
            async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
                let available = buf.bytes_init();
                assert!(available > 0, "zero-length writes are forbidden");

                let accepted = match self.mode {
                    Mode::WriteZero => return (Ok(0), buf),
                    // take half of the buffer (rounded down), but never less
                    // than one byte so that progress is always made
                    Mode::WritePartial => (available / 2).max(1),
                };

                // SAFETY: `accepted <= available`; this assumes `stable_ptr`
                // points at `bytes_init()` readable bytes for as long as `buf`
                // is alive (TODO confirm against the `IoBuf` contract).
                let chunk = unsafe { std::slice::from_raw_parts(buf.stable_ptr(), accepted) };
                self.bytes.borrow_mut().extend_from_slice(chunk);
                (Ok(accepted), buf)
            }
        }

        crate::start(async move {
            // a writer that never makes progress must fail `write_all`...
            let mut stuck = Writer::with_mode(Mode::WriteZero);
            let outcome = stuck.write_all(vec![1, 2, 3, 4, 5]).await;
            assert!(outcome.is_err());

            // ...and `writev_all` as well
            let mut stuck = Writer::with_mode(Mode::WriteZero);
            let outcome = stuck
                .writev_all(vec![vec![1, 2, 3, 4, 5], vec![6, 7, 8, 9, 10]])
                .await;
            assert!(outcome.is_err());

            // partial writes are retried until the single buffer is flushed
            let mut partial = Writer::with_mode(Mode::WritePartial);
            partial.write_all(vec![1, 2, 3, 4, 5]).await.unwrap();
            assert_eq!(&partial.bytes.borrow()[..], &[1, 2, 3, 4, 5]);

            // the same holds across a list of buffers, in order
            let mut partial = Writer::with_mode(Mode::WritePartial);
            partial
                .writev_all(vec![vec![1, 2, 3, 4, 5], vec![6, 7, 8, 9, 10]])
                .await
                .unwrap();
            assert_eq!(
                &partial.bytes.borrow()[..],
                &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
            );
        });
    }
}
204
205pub trait IntoHalves {
206    type Read: ReadOwned;
207    type Write: WriteOwned;
208
209    /// Split this into an owned read half and an owned write half.
210    fn into_halves(self) -> (Self::Read, Self::Write);
211}