vortex_io/
write.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future::Future;
5use std::future::ready;
6use std::io::Cursor;
7use std::io::Write;
8use std::io::{self};
9
10use futures::AsyncWrite;
11use futures::AsyncWriteExt;
12use vortex_buffer::ByteBufferMut;
13
14use crate::IoBuf;
15
16pub trait VortexWrite {
17    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>>;
18    fn flush(&mut self) -> impl Future<Output = io::Result<()>>;
19    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>>;
20}
21
22impl VortexWrite for Vec<u8> {
23    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
24        self.extend_from_slice(buffer.as_slice());
25        ready(Ok(buffer))
26    }
27
28    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
29        ready(Ok(()))
30    }
31
32    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
33        ready(Ok(()))
34    }
35}
36
37impl VortexWrite for ByteBufferMut {
38    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
39        self.extend_from_slice(buffer.as_slice());
40        ready(Ok(buffer))
41    }
42
43    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
44        ready(Ok(()))
45    }
46
47    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
48        ready(Ok(()))
49    }
50}
51
52impl<T> VortexWrite for Cursor<T>
53where
54    Cursor<T>: Write,
55{
56    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
57        ready(Write::write_all(self, buffer.as_slice()).map(|_| buffer))
58    }
59
60    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
61        ready(Write::flush(self))
62    }
63
64    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
65        ready(Ok(()))
66    }
67}
68
69impl<W: VortexWrite> VortexWrite for futures::io::Cursor<W> {
70    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
71        self.set_position(self.position() + buffer.as_slice().len() as u64);
72        VortexWrite::write_all(self.get_mut(), buffer)
73    }
74
75    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
76        VortexWrite::flush(self.get_mut())
77    }
78
79    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
80        VortexWrite::shutdown(self.get_mut())
81    }
82}
83
84impl<W: VortexWrite> VortexWrite for &mut W {
85    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
86        (*self).write_all(buffer)
87    }
88
89    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
90        (*self).flush()
91    }
92
93    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
94        (*self).shutdown()
95    }
96}
97
98impl VortexWrite for async_fs::File {
99    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
100        AsyncWriteExt::write_all(self, buffer.as_slice()).await?;
101        Ok(buffer)
102    }
103
104    fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
105        AsyncWriteExt::flush(self)
106    }
107
108    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
109        AsyncWriteExt::close(self)
110    }
111}
112
113/// An adapter to use an `AsyncWrite` as a `VortexWrite`.
114pub struct AsyncWriteAdapter<W: AsyncWrite>(pub W);
115impl<W: AsyncWrite + Unpin> VortexWrite for AsyncWriteAdapter<W> {
116    async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
117        self.0.write_all(buffer.as_slice()).await?;
118        Ok(buffer)
119    }
120
121    async fn flush(&mut self) -> io::Result<()> {
122        self.0.flush().await
123    }
124
125    async fn shutdown(&mut self) -> io::Result<()> {
126        self.0.close().await
127    }
128}
129
130#[cfg(test)]
131mod tests {
132    use rstest::rstest;
133    use vortex_buffer::ByteBufferMut;
134
135    use super::*;
136
137    #[rstest]
138    #[case::single_write(vec![vec![1, 2, 3]], vec![1, 2, 3])]
139    #[case::two_writes(vec![vec![1, 2], vec![3, 4]], vec![1, 2, 3, 4])]
140    #[case::three_writes(vec![vec![1], vec![2], vec![3]], vec![1, 2, 3])]
141    #[case::with_empty(vec![vec![1, 2], vec![], vec![3, 4]], vec![1, 2, 3, 4])]
142    #[tokio::test]
143    async fn test_vec_multiple_writes(#[case] writes: Vec<Vec<u8>>, #[case] expected: Vec<u8>) {
144        let mut writer = Vec::new();
145
146        for data in writes {
147            VortexWrite::write_all(&mut writer, data).await.unwrap();
148        }
149
150        VortexWrite::flush(&mut writer).await.unwrap();
151        VortexWrite::shutdown(&mut writer).await.unwrap();
152        assert_eq!(writer, expected);
153    }
154
155    #[rstest]
156    #[case::single_write(vec![vec![5, 6, 7]], vec![5, 6, 7])]
157    #[case::two_writes(vec![vec![10], vec![20]], vec![10, 20])]
158    #[case::multiple_small(vec![vec![1], vec![2], vec![3], vec![4]], vec![1, 2, 3, 4])]
159    #[tokio::test]
160    async fn test_byte_buffer_mut_operations(
161        #[case] writes: Vec<Vec<u8>>,
162        #[case] expected: Vec<u8>,
163    ) {
164        let mut buffer = ByteBufferMut::with_capacity(0);
165
166        for data in writes {
167            VortexWrite::write_all(&mut buffer, data).await.unwrap();
168        }
169
170        VortexWrite::flush(&mut buffer).await.unwrap();
171        VortexWrite::shutdown(&mut buffer).await.unwrap();
172        assert_eq!(buffer.as_ref(), &expected[..]);
173    }
174
175    #[rstest]
176    #[case::empty(vec![], 0)]
177    #[case::single_byte(vec![42], 1)]
178    #[case::multiple_bytes(vec![1, 2, 3, 4, 5], 5)]
179    #[case::large(vec![0; 1024], 1024)]
180    #[tokio::test]
181    async fn test_various_write_sizes(#[case] data: Vec<u8>, #[case] expected_len: usize) {
182        let mut writer = Vec::new();
183        VortexWrite::write_all(&mut writer, data.clone())
184            .await
185            .unwrap();
186        assert_eq!(writer.len(), expected_len);
187        assert_eq!(writer, data);
188    }
189
190    #[tokio::test]
191    async fn test_cursor_operations() {
192        let mut data = [0u8; 20];
193        {
194            let mut cursor = Cursor::new(&mut data[..]);
195
196            // Write to cursor
197            VortexWrite::write_all(&mut cursor, vec![1, 2, 3, 4, 5])
198                .await
199                .unwrap();
200            assert_eq!(cursor.position(), 5);
201
202            // Write more data
203            VortexWrite::write_all(&mut cursor, vec![6, 7, 8, 9, 10])
204                .await
205                .unwrap();
206            assert_eq!(cursor.position(), 10);
207
208            // Test flush
209            VortexWrite::flush(&mut cursor).await.unwrap();
210        }
211
212        // Check data after cursor is dropped
213        assert_eq!(&data[..10], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
214    }
215
216    #[tokio::test]
217    async fn test_futures_cursor() {
218        let mut vec = Vec::new();
219        {
220            let mut cursor = futures::io::Cursor::new(&mut vec);
221
222            // Test write operations
223            VortexWrite::write_all(&mut cursor, vec![10, 20, 30])
224                .await
225                .unwrap();
226            assert_eq!(cursor.position(), 3);
227
228            VortexWrite::write_all(&mut cursor, vec![40, 50])
229                .await
230                .unwrap();
231            assert_eq!(cursor.position(), 5);
232
233            // Test flush and shutdown
234            VortexWrite::flush(&mut cursor).await.unwrap();
235            VortexWrite::shutdown(&mut cursor).await.unwrap();
236        }
237
238        assert_eq!(vec, vec![10, 20, 30, 40, 50]);
239    }
240
241    #[tokio::test]
242    async fn test_mutable_reference() {
243        let mut vec = Vec::new();
244        {
245            let mut writer_ref = &mut vec;
246
247            // Test operations through mutable reference
248            VortexWrite::write_all(&mut writer_ref, vec![100, 101, 102])
249                .await
250                .unwrap();
251
252            VortexWrite::flush(&mut writer_ref).await.unwrap();
253            VortexWrite::shutdown(&mut writer_ref).await.unwrap();
254        }
255
256        assert_eq!(vec, vec![100, 101, 102]);
257    }
258
259    #[tokio::test]
260    async fn test_large_writes() {
261        let mut writer = Vec::new();
262        let large_data = vec![42u8; 100_000];
263
264        VortexWrite::write_all(&mut writer, large_data.clone())
265            .await
266            .unwrap();
267        assert_eq!(writer.len(), 100_000);
268        assert!(writer.iter().all(|&b| b == 42));
269    }
270
271    #[tokio::test]
272    async fn test_empty_writes() {
273        let mut writer = Vec::new();
274        let empty = vec![];
275
276        VortexWrite::write_all(&mut writer, empty.clone())
277            .await
278            .unwrap();
279        assert!(writer.is_empty());
280
281        VortexWrite::write_all(&mut writer, vec![1, 2, 3])
282            .await
283            .unwrap();
284        VortexWrite::write_all(&mut writer, empty).await.unwrap();
285        assert_eq!(writer, vec![1, 2, 3]);
286    }
287
288    #[tokio::test]
289    async fn test_sequential_accumulation() {
290        let mut buffer = ByteBufferMut::with_capacity(0);
291
292        for i in 0u8..5 {
293            VortexWrite::write_all(&mut buffer, vec![i]).await.unwrap();
294        }
295
296        assert_eq!(buffer.len(), 5);
297        assert_eq!(buffer.as_ref(), &[0, 1, 2, 3, 4]);
298    }
299
300    #[rstest]
301    #[case::vec_writer(0)]
302    #[case::byte_buffer(1)]
303    #[tokio::test]
304    async fn test_writer_types(#[case] writer_type: usize) {
305        let data = vec![10, 20, 30];
306
307        match writer_type {
308            0 => {
309                let mut writer = Vec::new();
310                VortexWrite::write_all(&mut writer, data.clone())
311                    .await
312                    .unwrap();
313                assert_eq!(writer, data);
314            }
315            1 => {
316                let mut writer = ByteBufferMut::with_capacity(0);
317                VortexWrite::write_all(&mut writer, data.clone())
318                    .await
319                    .unwrap();
320                assert_eq!(writer.as_ref(), &data[..]);
321            }
322            _ => unreachable!(),
323        }
324    }
325}