1use std::future::{Future, ready};
5use std::io::{self, Cursor, Write};
6
7use futures::{AsyncWrite, AsyncWriteExt};
8use vortex_buffer::ByteBufferMut;
9
10use crate::IoBuf;
11
12pub trait VortexWrite {
13 fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>>;
14 fn flush(&mut self) -> impl Future<Output = io::Result<()>>;
15 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>>;
16}
17
18impl VortexWrite for Vec<u8> {
19 fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
20 self.extend_from_slice(buffer.as_slice());
21 ready(Ok(buffer))
22 }
23
24 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
25 ready(Ok(()))
26 }
27
28 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
29 ready(Ok(()))
30 }
31}
32
33impl VortexWrite for ByteBufferMut {
34 fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
35 self.extend_from_slice(buffer.as_slice());
36 ready(Ok(buffer))
37 }
38
39 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
40 ready(Ok(()))
41 }
42
43 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
44 ready(Ok(()))
45 }
46}
47
48impl<T> VortexWrite for Cursor<T>
49where
50 Cursor<T>: Write,
51{
52 fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
53 ready(Write::write_all(self, buffer.as_slice()).map(|_| buffer))
54 }
55
56 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
57 ready(Write::flush(self))
58 }
59
60 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
61 ready(Ok(()))
62 }
63}
64
65impl<W: VortexWrite> VortexWrite for futures::io::Cursor<W> {
66 fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
67 self.set_position(self.position() + buffer.as_slice().len() as u64);
68 VortexWrite::write_all(self.get_mut(), buffer)
69 }
70
71 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
72 VortexWrite::flush(self.get_mut())
73 }
74
75 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
76 VortexWrite::shutdown(self.get_mut())
77 }
78}
79
80impl<W: VortexWrite> VortexWrite for &mut W {
81 fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
82 (*self).write_all(buffer)
83 }
84
85 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
86 (*self).flush()
87 }
88
89 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
90 (*self).shutdown()
91 }
92}
93
94impl VortexWrite for async_fs::File {
95 async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
96 AsyncWriteExt::write_all(self, buffer.as_slice()).await?;
97 Ok(buffer)
98 }
99
100 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
101 AsyncWriteExt::flush(self)
102 }
103
104 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
105 AsyncWriteExt::close(self)
106 }
107}
108
109pub struct AsyncWriteAdapter<W: AsyncWrite>(pub W);
111impl<W: AsyncWrite + Unpin> VortexWrite for AsyncWriteAdapter<W> {
112 async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
113 self.0.write_all(buffer.as_slice()).await?;
114 Ok(buffer)
115 }
116
117 async fn flush(&mut self) -> io::Result<()> {
118 self.0.flush().await
119 }
120
121 async fn shutdown(&mut self) -> io::Result<()> {
122 self.0.close().await
123 }
124}
125
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::ByteBufferMut;

    use super::*;

    /// Consecutive writes into a `Vec<u8>` are concatenated in call order.
    #[rstest]
    #[case::single_write(vec![vec![1, 2, 3]], vec![1, 2, 3])]
    #[case::two_writes(vec![vec![1, 2], vec![3, 4]], vec![1, 2, 3, 4])]
    #[case::three_writes(vec![vec![1], vec![2], vec![3]], vec![1, 2, 3])]
    #[case::with_empty(vec![vec![1, 2], vec![], vec![3, 4]], vec![1, 2, 3, 4])]
    #[tokio::test]
    async fn test_vec_multiple_writes(#[case] writes: Vec<Vec<u8>>, #[case] expected: Vec<u8>) {
        let mut sink: Vec<u8> = Vec::new();

        for chunk in writes {
            VortexWrite::write_all(&mut sink, chunk).await.unwrap();
        }

        VortexWrite::flush(&mut sink).await.unwrap();
        VortexWrite::shutdown(&mut sink).await.unwrap();
        assert_eq!(sink, expected);
    }

    /// Consecutive writes into a `ByteBufferMut` are concatenated in call order.
    #[rstest]
    #[case::single_write(vec![vec![5, 6, 7]], vec![5, 6, 7])]
    #[case::two_writes(vec![vec![10], vec![20]], vec![10, 20])]
    #[case::multiple_small(vec![vec![1], vec![2], vec![3], vec![4]], vec![1, 2, 3, 4])]
    #[tokio::test]
    async fn test_byte_buffer_mut_operations(
        #[case] writes: Vec<Vec<u8>>,
        #[case] expected: Vec<u8>,
    ) {
        let mut sink = ByteBufferMut::with_capacity(0);

        for chunk in writes {
            VortexWrite::write_all(&mut sink, chunk).await.unwrap();
        }

        VortexWrite::flush(&mut sink).await.unwrap();
        VortexWrite::shutdown(&mut sink).await.unwrap();
        assert_eq!(sink.as_ref(), &expected[..]);
    }

    /// A single write of any size lands in the vector unchanged.
    #[rstest]
    #[case::empty(vec![], 0)]
    #[case::single_byte(vec![42], 1)]
    #[case::multiple_bytes(vec![1, 2, 3, 4, 5], 5)]
    #[case::large(vec![0; 1024], 1024)]
    #[tokio::test]
    async fn test_various_write_sizes(#[case] data: Vec<u8>, #[case] expected_len: usize) {
        let mut sink: Vec<u8> = Vec::new();
        let payload = data.clone();
        VortexWrite::write_all(&mut sink, payload).await.unwrap();
        assert_eq!(sink.len(), expected_len);
        assert_eq!(sink, data);
    }

    /// Writes through a `std::io::Cursor` over a fixed slice advance the position and fill
    /// the slice.
    #[tokio::test]
    async fn test_cursor_operations() {
        let mut backing = [0u8; 20];
        {
            let mut cur = Cursor::new(&mut backing[..]);

            VortexWrite::write_all(&mut cur, vec![1, 2, 3, 4, 5])
                .await
                .unwrap();
            assert_eq!(cur.position(), 5);

            VortexWrite::write_all(&mut cur, vec![6, 7, 8, 9, 10])
                .await
                .unwrap();
            assert_eq!(cur.position(), 10);

            VortexWrite::flush(&mut cur).await.unwrap();
        }

        assert_eq!(&backing[..10], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    }

    /// Writes through a `futures::io::Cursor` reach the wrapped writer and move the position.
    #[tokio::test]
    async fn test_futures_cursor() {
        let mut backing = Vec::new();
        {
            let mut cur = futures::io::Cursor::new(&mut backing);

            VortexWrite::write_all(&mut cur, vec![10, 20, 30])
                .await
                .unwrap();
            assert_eq!(cur.position(), 3);

            VortexWrite::write_all(&mut cur, vec![40, 50])
                .await
                .unwrap();
            assert_eq!(cur.position(), 5);

            VortexWrite::flush(&mut cur).await.unwrap();
            VortexWrite::shutdown(&mut cur).await.unwrap();
        }

        assert_eq!(backing, vec![10, 20, 30, 40, 50]);
    }

    /// A `&mut` reference to a writer is itself usable as a writer.
    #[tokio::test]
    async fn test_mutable_reference() {
        let mut backing = Vec::new();
        {
            let mut by_ref = &mut backing;

            VortexWrite::write_all(&mut by_ref, vec![100, 101, 102])
                .await
                .unwrap();

            VortexWrite::flush(&mut by_ref).await.unwrap();
            VortexWrite::shutdown(&mut by_ref).await.unwrap();
        }

        assert_eq!(backing, vec![100, 101, 102]);
    }

    /// A 100 000-byte write is stored completely.
    #[tokio::test]
    async fn test_large_writes() {
        let mut sink: Vec<u8> = Vec::new();
        let payload = vec![42u8; 100_000];

        VortexWrite::write_all(&mut sink, payload.clone())
            .await
            .unwrap();
        assert_eq!(sink.len(), 100_000);
        assert!(sink.iter().all(|&byte| byte == 42));
    }

    /// Empty writes leave the destination untouched, before and after real data.
    #[tokio::test]
    async fn test_empty_writes() {
        let mut sink: Vec<u8> = Vec::new();
        let nothing: Vec<u8> = vec![];

        VortexWrite::write_all(&mut sink, nothing.clone())
            .await
            .unwrap();
        assert!(sink.is_empty());

        VortexWrite::write_all(&mut sink, vec![1, 2, 3])
            .await
            .unwrap();
        VortexWrite::write_all(&mut sink, nothing).await.unwrap();
        assert_eq!(sink, vec![1, 2, 3]);
    }

    /// One-byte writes accumulate in order in a `ByteBufferMut`.
    #[tokio::test]
    async fn test_sequential_accumulation() {
        let mut sink = ByteBufferMut::with_capacity(0);

        for value in 0u8..5 {
            VortexWrite::write_all(&mut sink, vec![value]).await.unwrap();
        }

        assert_eq!(sink.len(), 5);
        assert_eq!(sink.as_ref(), &[0, 1, 2, 3, 4]);
    }

    /// The same payload round-trips through each in-memory writer type.
    #[rstest]
    #[case::vec_writer(0)]
    #[case::byte_buffer(1)]
    #[tokio::test]
    async fn test_writer_types(#[case] writer_type: usize) {
        let data = vec![10, 20, 30];

        if writer_type == 0 {
            let mut sink = Vec::new();
            VortexWrite::write_all(&mut sink, data.clone())
                .await
                .unwrap();
            assert_eq!(sink, data);
        } else if writer_type == 1 {
            let mut sink = ByteBufferMut::with_capacity(0);
            VortexWrite::write_all(&mut sink, data.clone())
                .await
                .unwrap();
            assert_eq!(sink.as_ref(), &data[..]);
        } else {
            unreachable!()
        }
    }
}