vortex_io/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future::{Future, ready};
5use std::io::{self, Cursor, Write};
6
7use futures::{AsyncWrite, AsyncWriteExt};
8use vortex_buffer::ByteBufferMut;
9
10use crate::IoBuf;
11
/// An asynchronous byte sink whose write method takes ownership of the buffer it writes.
///
/// Every method returns a future that resolves to an [`io::Result`].
pub trait VortexWrite {
    /// Writes all bytes of `buffer` to this writer.
    ///
    /// The buffer is passed by value and is returned inside the `Ok` variant once the write
    /// has completed; the implementations in this file hand back the very same buffer
    /// (presumably so that callers can reuse it).
    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>>;
    /// Flushes this writer.
    ///
    /// The in-memory implementations in this file treat this as a no-op, while the ones backed
    /// by a real writer forward to that writer's own flush.
    fn flush(&mut self) -> impl Future<Output = io::Result<()>>;
    /// Shuts this writer down.
    ///
    /// The in-memory implementations in this file treat this as a no-op, while the
    /// `AsyncWrite`-backed ones close the underlying writer.
    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>>;
}
17
18impl VortexWrite for Vec<u8> {
19    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
20        self.extend_from_slice(buffer.as_slice());
21        ready(Ok(buffer))
22    }
23
24    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
25        ready(Ok(()))
26    }
27
28    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
29        ready(Ok(()))
30    }
31}
32
33impl VortexWrite for ByteBufferMut {
34    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
35        self.extend_from_slice(buffer.as_slice());
36        ready(Ok(buffer))
37    }
38
39    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
40        ready(Ok(()))
41    }
42
43    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
44        ready(Ok(()))
45    }
46}
47
48impl<T> VortexWrite for Cursor<T>
49where
50    Cursor<T>: Write,
51{
52    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
53        ready(Write::write_all(self, buffer.as_slice()).map(|_| buffer))
54    }
55
56    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
57        ready(Write::flush(self))
58    }
59
60    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
61        ready(Ok(()))
62    }
63}
64
65impl<W: VortexWrite> VortexWrite for futures::io::Cursor<W> {
66    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
67        self.set_position(self.position() + buffer.as_slice().len() as u64);
68        VortexWrite::write_all(self.get_mut(), buffer)
69    }
70
71    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
72        VortexWrite::flush(self.get_mut())
73    }
74
75    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
76        VortexWrite::shutdown(self.get_mut())
77    }
78}
79
80impl<W: VortexWrite> VortexWrite for &mut W {
81    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
82        (*self).write_all(buffer)
83    }
84
85    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
86        (*self).flush()
87    }
88
89    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
90        (*self).shutdown()
91    }
92}
93
94impl VortexWrite for async_fs::File {
95    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
96        AsyncWriteExt::write_all(self, buffer.as_slice()).await?;
97        Ok(buffer)
98    }
99
100    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
101        AsyncWriteExt::flush(self)
102    }
103
104    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
105        AsyncWriteExt::close(self)
106    }
107}
108
/// An adapter to use an `AsyncWrite` as a `VortexWrite`.
///
/// The wrapped writer is the public tuple field `0`. The `VortexWrite` implementation in this
/// file additionally requires the wrapped writer to be `Unpin`.
pub struct AsyncWriteAdapter<W: AsyncWrite>(pub W);
111impl<W: AsyncWrite + Unpin> VortexWrite for AsyncWriteAdapter<W> {
112    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
113        self.0.write_all(buffer.as_slice()).await?;
114        Ok(buffer)
115    }
116
117    async fn flush(&mut self) -> io::Result<()> {
118        self.0.flush().await
119    }
120
121    async fn shutdown(&mut self) -> io::Result<()> {
122        self.0.close().await
123    }
124}
125
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::ByteBufferMut;

    use super::*;

    // Chunks written to a `Vec<u8>` must end up concatenated in write order.
    #[rstest]
    #[case::single_write(vec![vec![1, 2, 3]], vec![1, 2, 3])]
    #[case::two_writes(vec![vec![1, 2], vec![3, 4]], vec![1, 2, 3, 4])]
    #[case::three_writes(vec![vec![1], vec![2], vec![3]], vec![1, 2, 3])]
    #[case::with_empty(vec![vec![1, 2], vec![], vec![3, 4]], vec![1, 2, 3, 4])]
    #[tokio::test]
    async fn test_vec_multiple_writes(#[case] writes: Vec<Vec<u8>>, #[case] expected: Vec<u8>) {
        let mut sink = Vec::new();
        for chunk in writes {
            VortexWrite::write_all(&mut sink, chunk).await.unwrap();
        }

        // Flush and shutdown must succeed and leave the contents alone.
        VortexWrite::flush(&mut sink).await.unwrap();
        VortexWrite::shutdown(&mut sink).await.unwrap();

        assert_eq!(sink, expected);
    }

    // Same contract as above, with a `ByteBufferMut` as the sink.
    #[rstest]
    #[case::single_write(vec![vec![5, 6, 7]], vec![5, 6, 7])]
    #[case::two_writes(vec![vec![10], vec![20]], vec![10, 20])]
    #[case::multiple_small(vec![vec![1], vec![2], vec![3], vec![4]], vec![1, 2, 3, 4])]
    #[tokio::test]
    async fn test_byte_buffer_mut_operations(
        #[case] writes: Vec<Vec<u8>>,
        #[case] expected: Vec<u8>,
    ) {
        let mut sink = ByteBufferMut::with_capacity(0);
        for chunk in writes {
            VortexWrite::write_all(&mut sink, chunk).await.unwrap();
        }

        VortexWrite::flush(&mut sink).await.unwrap();
        VortexWrite::shutdown(&mut sink).await.unwrap();

        assert_eq!(sink.as_ref(), &expected[..]);
    }

    // A single write of any size must be stored completely and unchanged.
    #[rstest]
    #[case::empty(vec![], 0)]
    #[case::single_byte(vec![42], 1)]
    #[case::multiple_bytes(vec![1, 2, 3, 4, 5], 5)]
    #[case::large(vec![0; 1024], 1024)]
    #[tokio::test]
    async fn test_various_write_sizes(#[case] data: Vec<u8>, #[case] expected_len: usize) {
        let mut sink = Vec::new();
        let payload = data.clone();

        VortexWrite::write_all(&mut sink, payload).await.unwrap();

        assert_eq!(sink.len(), expected_len);
        assert_eq!(sink, data);
    }

    // A std cursor over a fixed slice advances its position with every write.
    #[tokio::test]
    async fn test_cursor_operations() {
        let mut backing = [0u8; 20];
        {
            let mut cursor = Cursor::new(&mut backing[..]);

            // Two consecutive writes, each followed by a position check.
            let steps = [
                (vec![1u8, 2, 3, 4, 5], 5u64),
                (vec![6u8, 7, 8, 9, 10], 10u64),
            ];
            for (chunk, position_after) in steps {
                VortexWrite::write_all(&mut cursor, chunk).await.unwrap();
                assert_eq!(cursor.position(), position_after);
            }

            // Flushing must succeed as well.
            VortexWrite::flush(&mut cursor).await.unwrap();
        }

        // The cursor is gone; the bytes must have landed in the backing array.
        assert_eq!(&backing[..10], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    }

    // A futures cursor forwards writes to the wrapped writer and counts the bytes.
    #[tokio::test]
    async fn test_futures_cursor() {
        let mut target = Vec::new();
        {
            let mut cursor = futures::io::Cursor::new(&mut target);

            let steps = [(vec![10u8, 20, 30], 3u64), (vec![40u8, 50], 5u64)];
            for (chunk, position_after) in steps {
                VortexWrite::write_all(&mut cursor, chunk).await.unwrap();
                assert_eq!(cursor.position(), position_after);
            }

            // Flush and shutdown are forwarded to the wrapped writer.
            VortexWrite::flush(&mut cursor).await.unwrap();
            VortexWrite::shutdown(&mut cursor).await.unwrap();
        }

        assert_eq!(target, vec![10, 20, 30, 40, 50]);
    }

    // Writing through `&mut W` must reach the referenced writer.
    #[tokio::test]
    async fn test_mutable_reference() {
        let mut target = Vec::new();
        {
            let mut borrowed = &mut target;

            VortexWrite::write_all(&mut borrowed, vec![100, 101, 102])
                .await
                .unwrap();
            VortexWrite::flush(&mut borrowed).await.unwrap();
            VortexWrite::shutdown(&mut borrowed).await.unwrap();
        }

        assert_eq!(target, vec![100, 101, 102]);
    }

    // A large payload is stored completely and unchanged.
    #[tokio::test]
    async fn test_large_writes() {
        let payload = vec![42u8; 100_000];
        let mut sink = Vec::new();

        VortexWrite::write_all(&mut sink, payload.clone())
            .await
            .unwrap();

        assert_eq!(sink.len(), 100_000);
        assert!(sink.iter().all(|byte| *byte == 42));
    }

    // Empty writes succeed and never change the contents.
    #[tokio::test]
    async fn test_empty_writes() {
        let mut sink = Vec::new();
        let nothing = vec![];

        // An empty write into an empty sink leaves it empty.
        VortexWrite::write_all(&mut sink, nothing.clone())
            .await
            .unwrap();
        assert!(sink.is_empty());

        // An empty write after real data leaves that data as it was.
        VortexWrite::write_all(&mut sink, vec![1, 2, 3])
            .await
            .unwrap();
        VortexWrite::write_all(&mut sink, nothing).await.unwrap();
        assert_eq!(sink, vec![1, 2, 3]);
    }

    // Single-byte writes accumulate one after another.
    #[tokio::test]
    async fn test_sequential_accumulation() {
        let mut sink = ByteBufferMut::with_capacity(0);

        for value in 0u8..5 {
            VortexWrite::write_all(&mut sink, vec![value]).await.unwrap();
        }

        assert_eq!(sink.len(), 5);
        assert_eq!(sink.as_ref(), &[0, 1, 2, 3, 4]);
    }

    // The same payload written to both in-memory writer types yields the same bytes.
    #[rstest]
    #[case::vec_writer(0)]
    #[case::byte_buffer(1)]
    #[tokio::test]
    async fn test_writer_types(#[case] writer_type: usize) {
        let data = vec![10, 20, 30];

        if writer_type == 0 {
            let mut sink = Vec::new();
            VortexWrite::write_all(&mut sink, data.clone())
                .await
                .unwrap();
            assert_eq!(sink, data);
        } else if writer_type == 1 {
            let mut sink = ByteBufferMut::with_capacity(0);
            VortexWrite::write_all(&mut sink, data.clone())
                .await
                .unwrap();
            assert_eq!(sink.as_ref(), &data[..]);
        } else {
            unreachable!()
        }
    }
}