1use std::future::Future;
5use std::future::ready;
6use std::io::Cursor;
7use std::io::Write;
8use std::io::{self};
9
10use futures::AsyncWrite;
11use futures::AsyncWriteExt;
12use vortex_buffer::ByteBufferMut;
13
14use crate::IoBuf;
15
/// Asynchronous byte sink whose writes take ownership of the source buffer.
///
/// Every method returns a future that resolves to an [`io::Result`]. The implementations in
/// this file cover in-memory containers, cursors, mutable references to other writers, and
/// asynchronous files.
pub trait VortexWrite {
    /// Writes the bytes exposed by `buffer.as_slice()` to this writer.
    ///
    /// The buffer is passed by value and is handed back to the caller through the `Ok`
    /// variant once the write has completed, so the caller can reuse it.
    ///
    /// # Errors
    ///
    /// Resolves to an [`io::Error`] if the underlying write fails; the buffer is not
    /// returned in that case.
    fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>>;
    /// Flushes the writer.
    ///
    /// For the purely in-memory implementations in this file this resolves immediately to
    /// `Ok(())`.
    fn flush(&mut self) -> impl Future<Output = io::Result<()>>;
    /// Shuts the writer down once no further writes will be issued.
    ///
    /// The file-backed and adapter implementations in this file close the underlying
    /// `AsyncWrite`; the in-memory implementations resolve immediately to `Ok(())`.
    fn shutdown(&mut self) -> impl Future<Output = io::Result<()>>;
}
21
22impl VortexWrite for Vec<u8> {
23 fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
24 self.extend_from_slice(buffer.as_slice());
25 ready(Ok(buffer))
26 }
27
28 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
29 ready(Ok(()))
30 }
31
32 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
33 ready(Ok(()))
34 }
35}
36
37impl VortexWrite for ByteBufferMut {
38 fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
39 self.extend_from_slice(buffer.as_slice());
40 ready(Ok(buffer))
41 }
42
43 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
44 ready(Ok(()))
45 }
46
47 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
48 ready(Ok(()))
49 }
50}
51
52impl<T> VortexWrite for Cursor<T>
53where
54 Cursor<T>: Write,
55{
56 fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
57 ready(Write::write_all(self, buffer.as_slice()).map(|_| buffer))
58 }
59
60 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
61 ready(Write::flush(self))
62 }
63
64 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
65 ready(Ok(()))
66 }
67}
68
69impl<W: VortexWrite> VortexWrite for futures::io::Cursor<W> {
70 fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
71 self.set_position(self.position() + buffer.as_slice().len() as u64);
72 VortexWrite::write_all(self.get_mut(), buffer)
73 }
74
75 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
76 VortexWrite::flush(self.get_mut())
77 }
78
79 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
80 VortexWrite::shutdown(self.get_mut())
81 }
82}
83
84impl<W: VortexWrite> VortexWrite for &mut W {
85 fn write_all<B: IoBuf>(&mut self, buffer: B) -> impl Future<Output = io::Result<B>> {
86 (*self).write_all(buffer)
87 }
88
89 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
90 (*self).flush()
91 }
92
93 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
94 (*self).shutdown()
95 }
96}
97
98impl VortexWrite for async_fs::File {
99 async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
100 AsyncWriteExt::write_all(self, buffer.as_slice()).await?;
101 Ok(buffer)
102 }
103
104 fn flush(&mut self) -> impl Future<Output = io::Result<()>> {
105 AsyncWriteExt::flush(self)
106 }
107
108 fn shutdown(&mut self) -> impl Future<Output = io::Result<()>> {
109 AsyncWriteExt::close(self)
110 }
111}
112
113pub struct AsyncWriteAdapter<W: AsyncWrite>(pub W);
115impl<W: AsyncWrite + Unpin> VortexWrite for AsyncWriteAdapter<W> {
116 async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
117 self.0.write_all(buffer.as_slice()).await?;
118 Ok(buffer)
119 }
120
121 async fn flush(&mut self) -> io::Result<()> {
122 self.0.flush().await
123 }
124
125 async fn shutdown(&mut self) -> io::Result<()> {
126 self.0.close().await
127 }
128}
129
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::ByteBufferMut;

    use super::*;

    // NOTE: the calls below deliberately use the fully qualified `VortexWrite::...` form.
    // `super::*` also brings `std::io::Write` and `AsyncWriteExt` into scope, and all of
    // them define `write_all` / `flush`.

    // Chunks written one after another to a `Vec<u8>` come out concatenated in order.
    #[rstest]
    #[case::single_write(vec![vec![1, 2, 3]], vec![1, 2, 3])]
    #[case::two_writes(vec![vec![1, 2], vec![3, 4]], vec![1, 2, 3, 4])]
    #[case::three_writes(vec![vec![1], vec![2], vec![3]], vec![1, 2, 3])]
    #[case::with_empty(vec![vec![1, 2], vec![], vec![3, 4]], vec![1, 2, 3, 4])]
    #[tokio::test]
    async fn test_vec_multiple_writes(#[case] writes: Vec<Vec<u8>>, #[case] expected: Vec<u8>) {
        let mut sink: Vec<u8> = Vec::new();

        for chunk in writes {
            VortexWrite::write_all(&mut sink, chunk).await.unwrap();
        }
        VortexWrite::flush(&mut sink).await.unwrap();
        VortexWrite::shutdown(&mut sink).await.unwrap();

        assert_eq!(sink, expected);
    }

    // Same accumulation contract, this time with a `ByteBufferMut` as the sink.
    #[rstest]
    #[case::single_write(vec![vec![5, 6, 7]], vec![5, 6, 7])]
    #[case::two_writes(vec![vec![10], vec![20]], vec![10, 20])]
    #[case::multiple_small(vec![vec![1], vec![2], vec![3], vec![4]], vec![1, 2, 3, 4])]
    #[tokio::test]
    async fn test_byte_buffer_mut_operations(
        #[case] writes: Vec<Vec<u8>>,
        #[case] expected: Vec<u8>,
    ) {
        let mut sink = ByteBufferMut::with_capacity(0);

        for chunk in writes {
            VortexWrite::write_all(&mut sink, chunk).await.unwrap();
        }
        VortexWrite::flush(&mut sink).await.unwrap();
        VortexWrite::shutdown(&mut sink).await.unwrap();

        assert_eq!(sink.as_ref(), &expected[..]);
    }

    // A single write of any size lands in the sink unchanged.
    #[rstest]
    #[case::empty(vec![], 0)]
    #[case::single_byte(vec![42], 1)]
    #[case::multiple_bytes(vec![1, 2, 3, 4, 5], 5)]
    #[case::large(vec![0; 1024], 1024)]
    #[tokio::test]
    async fn test_various_write_sizes(#[case] data: Vec<u8>, #[case] expected_len: usize) {
        let mut sink: Vec<u8> = Vec::new();
        let payload = data.clone();

        VortexWrite::write_all(&mut sink, payload).await.unwrap();

        assert_eq!(sink.len(), expected_len);
        assert_eq!(sink, data);
    }

    // Writes through a `std::io::Cursor` over a fixed slice advance its position and land
    // in the backing array.
    #[tokio::test]
    async fn test_cursor_operations() {
        let mut backing = [0u8; 20];
        {
            let mut cursor = Cursor::new(&mut backing[..]);

            // Each step: the chunk to write and the position expected afterwards.
            let steps = [
                (vec![1u8, 2, 3, 4, 5], 5u64),
                (vec![6u8, 7, 8, 9, 10], 10u64),
            ];
            for (chunk, position_after) in steps {
                VortexWrite::write_all(&mut cursor, chunk).await.unwrap();
                assert_eq!(cursor.position(), position_after);
            }

            VortexWrite::flush(&mut cursor).await.unwrap();
        }

        assert_eq!(&backing[..10], &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    }

    // A `futures::io::Cursor` over `&mut Vec<u8>` tracks the bytes written and forwards
    // them to the vector.
    #[tokio::test]
    async fn test_futures_cursor() {
        let mut sink: Vec<u8> = Vec::new();
        {
            let mut cursor = futures::io::Cursor::new(&mut sink);

            // Each step: the chunk to write and the position expected afterwards.
            let steps = [(vec![10u8, 20, 30], 3u64), (vec![40u8, 50], 5u64)];
            for (chunk, position_after) in steps {
                VortexWrite::write_all(&mut cursor, chunk).await.unwrap();
                assert_eq!(cursor.position(), position_after);
            }

            VortexWrite::flush(&mut cursor).await.unwrap();
            VortexWrite::shutdown(&mut cursor).await.unwrap();
        }

        assert_eq!(sink, vec![10, 20, 30, 40, 50]);
    }

    // Exercises the `&mut W` implementation: the writer passed in is itself a reference.
    #[tokio::test]
    async fn test_mutable_reference() {
        let mut sink: Vec<u8> = Vec::new();
        {
            let mut writer_ref: &mut Vec<u8> = &mut sink;

            VortexWrite::write_all(&mut writer_ref, vec![100, 101, 102])
                .await
                .unwrap();
            VortexWrite::flush(&mut writer_ref).await.unwrap();
            VortexWrite::shutdown(&mut writer_ref).await.unwrap();
        }

        assert_eq!(sink, vec![100, 101, 102]);
    }

    // A 100 000 byte payload is written in full.
    #[tokio::test]
    async fn test_large_writes() {
        let mut sink: Vec<u8> = Vec::new();
        let payload = vec![42u8; 100_000];

        VortexWrite::write_all(&mut sink, payload.clone())
            .await
            .unwrap();

        assert_eq!(sink.len(), 100_000);
        assert!(!sink.iter().any(|&byte| byte != 42));
    }

    // Empty writes neither add bytes nor disturb what is already in the sink.
    #[tokio::test]
    async fn test_empty_writes() {
        let mut sink: Vec<u8> = Vec::new();
        let nothing: Vec<u8> = Vec::new();

        VortexWrite::write_all(&mut sink, nothing.clone())
            .await
            .unwrap();
        assert!(sink.is_empty());

        VortexWrite::write_all(&mut sink, vec![1, 2, 3])
            .await
            .unwrap();
        VortexWrite::write_all(&mut sink, nothing).await.unwrap();
        assert_eq!(sink, vec![1, 2, 3]);
    }

    // One-byte writes accumulate in order in a `ByteBufferMut`.
    #[tokio::test]
    async fn test_sequential_accumulation() {
        let mut sink = ByteBufferMut::with_capacity(0);

        for byte in 0u8..5 {
            VortexWrite::write_all(&mut sink, vec![byte]).await.unwrap();
        }

        assert_eq!(sink.len(), 5);
        assert_eq!(sink.as_ref(), &[0, 1, 2, 3, 4]);
    }

    // The same payload round-trips through each in-memory writer type.
    // `writer_type` selects the sink: 0 is `Vec<u8>`, 1 is `ByteBufferMut`.
    #[rstest]
    #[case::vec_writer(0)]
    #[case::byte_buffer(1)]
    #[tokio::test]
    async fn test_writer_types(#[case] writer_type: usize) {
        let payload: Vec<u8> = vec![10, 20, 30];

        if writer_type == 0 {
            let mut sink: Vec<u8> = Vec::new();
            VortexWrite::write_all(&mut sink, payload.clone())
                .await
                .unwrap();
            assert_eq!(sink, payload);
        } else if writer_type == 1 {
            let mut sink = ByteBufferMut::with_capacity(0);
            VortexWrite::write_all(&mut sink, payload.clone())
                .await
                .unwrap();
            assert_eq!(sink.as_ref(), &payload[..]);
        } else {
            unreachable!();
        }
    }
}