1#![deny(missing_docs, rustdoc::broken_intra_doc_links)]
39
40use std::{
41 future::Future,
42 io::{self, Cursor},
43};
44
45use bytes::{Buf, Bytes, BytesMut};
46
47#[allow(clippy::len_without_is_empty)]
57pub trait AsyncSliceReader {
58 #[must_use = "io futures must be polled to completion"]
66 fn read_at(&mut self, offset: u64, len: usize) -> impl Future<Output = io::Result<Bytes>>;
67
68 fn read_exact_at(
70 &mut self,
71 offset: u64,
72 len: usize,
73 ) -> impl Future<Output = io::Result<Bytes>> {
74 async move {
75 let res = self.read_at(offset, len).await?;
76 if res.len() < len {
77 return Err(io::ErrorKind::UnexpectedEof.into());
78 }
79 Ok(res)
80 }
81 }
82
83 #[must_use = "io futures must be polled to completion"]
85 fn size(&mut self) -> impl Future<Output = io::Result<u64>>;
86}
87
88impl<T: AsyncSliceReader> AsyncSliceReader for &mut T {
89 async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
90 (**self).read_at(offset, len).await
91 }
92
93 async fn size(&mut self) -> io::Result<u64> {
94 (**self).size().await
95 }
96}
97
98impl<T: AsyncSliceReader> AsyncSliceReader for Box<T> {
99 async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
100 (**self).read_at(offset, len).await
101 }
102
103 async fn size(&mut self) -> io::Result<u64> {
104 (**self).size().await
105 }
106}
107
108pub trait AsyncSliceReaderExt: AsyncSliceReader {
110 #[allow(async_fn_in_trait)]
112 async fn read_to_end(&mut self) -> io::Result<Bytes> {
113 self.read_at(0, usize::MAX).await
114 }
115}
116
117impl<T: AsyncSliceReader> AsyncSliceReaderExt for T {}
118
119pub trait AsyncSliceWriter: Sized {
129 #[must_use = "io futures must be polled to completion"]
134 fn write_at(&mut self, offset: u64, data: &[u8]) -> impl Future<Output = io::Result<()>>;
135
136 #[must_use = "io futures must be polled to completion"]
141 fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> impl Future<Output = io::Result<()>>;
142
143 #[must_use = "io futures must be polled to completion"]
145 fn set_len(&mut self, len: u64) -> impl Future<Output = io::Result<()>>;
146
147 #[must_use = "io futures must be polled to completion"]
149 fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
150}
151
152impl<T: AsyncSliceWriter> AsyncSliceWriter for &mut T {
153 async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> {
154 (**self).write_at(offset, data).await
155 }
156
157 async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> {
158 (**self).write_bytes_at(offset, data).await
159 }
160
161 async fn set_len(&mut self, len: u64) -> io::Result<()> {
162 (**self).set_len(len).await
163 }
164
165 async fn sync(&mut self) -> io::Result<()> {
166 (**self).sync().await
167 }
168}
169
170impl<T: AsyncSliceWriter> AsyncSliceWriter for Box<T> {
171 async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> {
172 (**self).write_at(offset, data).await
173 }
174
175 async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> {
176 (**self).write_bytes_at(offset, data).await
177 }
178
179 async fn set_len(&mut self, len: u64) -> io::Result<()> {
180 (**self).set_len(len).await
181 }
182
183 async fn sync(&mut self) -> io::Result<()> {
184 (**self).sync().await
185 }
186}
187
188pub trait AsyncStreamReader {
190 fn read_bytes(&mut self, len: usize) -> impl Future<Output = io::Result<Bytes>>;
194
195 fn read<const L: usize>(&mut self) -> impl Future<Output = io::Result<[u8; L]>>;
199
200 fn read_bytes_exact(&mut self, len: usize) -> impl Future<Output = io::Result<Bytes>> {
202 async move {
203 let res = self.read_bytes(len).await?;
204 if res.len() < len {
205 return Err(io::ErrorKind::UnexpectedEof.into());
206 }
207 Ok(res)
208 }
209 }
210
211 fn read_exact<const L: usize>(&mut self) -> impl Future<Output = io::Result<[u8; L]>> {
213 async move {
214 let res = self.read::<L>().await?;
215 if res.len() < L {
216 return Err(io::ErrorKind::UnexpectedEof.into());
217 }
218 Ok(res)
219 }
220 }
221}
222
223impl<T: AsyncStreamReader> AsyncStreamReader for &mut T {
224 async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
225 (**self).read_bytes(len).await
226 }
227
228 async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
229 (**self).read().await
230 }
231}
232
233impl AsyncStreamReader for Bytes {
234 async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
235 let res = self.split_to(len.min(Bytes::len(self)));
236 Ok(res)
237 }
238
239 async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
240 if Bytes::len(self) < L {
241 return Err(io::ErrorKind::UnexpectedEof.into());
242 }
243 let mut res = [0u8; L];
244 self.split_to(L).copy_to_slice(&mut res);
245 Ok(res)
246 }
247}
248
249impl AsyncStreamReader for BytesMut {
250 async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
251 let res = self.split_to(len.min(BytesMut::len(self)));
252 Ok(res.freeze())
253 }
254
255 async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
256 if BytesMut::len(self) < L {
257 return Err(io::ErrorKind::UnexpectedEof.into());
258 }
259 let mut res = [0u8; L];
260 self.split_to(L).copy_to_slice(&mut res);
261 Ok(res)
262 }
263}
264
265impl AsyncStreamReader for &[u8] {
266 async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
267 let len = len.min(self.len());
268 let res = Bytes::copy_from_slice(&self[..len]);
269 *self = &self[len..];
270 Ok(res)
271 }
272
273 async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
274 if self.len() < L {
275 return Err(io::ErrorKind::UnexpectedEof.into());
276 }
277 let mut res = [0u8; L];
278 res.copy_from_slice(&self[..L]);
279 *self = &self[L..];
280 Ok(res)
281 }
282}
283
284impl<T: AsyncSliceReader> AsyncStreamReader for Cursor<T> {
285 async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
286 let offset = self.position();
287 let res = self.get_mut().read_at(offset, len).await?;
288 self.set_position(offset + res.len() as u64);
289 Ok(res)
290 }
291
292 async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
293 let offset = self.position();
294 let res = self.get_mut().read_at(offset, L).await?;
295 if res.len() < L {
296 return Err(io::ErrorKind::UnexpectedEof.into());
297 }
298 self.set_position(offset + res.len() as u64);
299 let mut buf = [0u8; L];
300 buf.copy_from_slice(&res);
301 Ok(buf)
302 }
303}
304
305pub trait AsyncStreamWriter {
307 fn write(&mut self, data: &[u8]) -> impl Future<Output = io::Result<()>>;
311
312 fn write_bytes(&mut self, data: Bytes) -> impl Future<Output = io::Result<()>>;
316
317 fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
319}
320
321impl<T: AsyncStreamWriter> AsyncStreamWriter for &mut T {
322 async fn write(&mut self, data: &[u8]) -> io::Result<()> {
323 (**self).write(data).await
324 }
325
326 async fn sync(&mut self) -> io::Result<()> {
327 (**self).sync().await
328 }
329
330 async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> {
331 (**self).write_bytes(data).await
332 }
333}
334
335impl AsyncStreamWriter for Vec<u8> {
336 async fn write(&mut self, data: &[u8]) -> io::Result<()> {
337 self.extend_from_slice(data);
338 Ok(())
339 }
340
341 async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> {
342 self.extend_from_slice(data.as_ref());
343 Ok(())
344 }
345
346 async fn sync(&mut self) -> io::Result<()> {
347 Ok(())
348 }
349}
350
351impl AsyncStreamWriter for BytesMut {
352 async fn write(&mut self, data: &[u8]) -> io::Result<()> {
353 self.extend_from_slice(data);
354 Ok(())
355 }
356
357 async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> {
358 self.extend_from_slice(data.as_ref());
359 Ok(())
360 }
361
362 async fn sync(&mut self) -> io::Result<()> {
363 Ok(())
364 }
365}
366
367#[cfg(feature = "tokio-io")]
368mod tokio_io;
369#[cfg(feature = "tokio-io")]
370pub use tokio_io::*;
371
372#[cfg(feature = "stats")]
373pub mod stats;
374
375#[cfg(feature = "x-http")]
376mod http;
377#[cfg(feature = "x-http")]
378pub use http::*;
379
380mod mem;
382
383#[cfg(feature = "tokio-util")]
384impl<L, R> AsyncSliceReader for tokio_util::either::Either<L, R>
385where
386 L: AsyncSliceReader + 'static,
387 R: AsyncSliceReader + 'static,
388{
389 async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
390 match self {
391 Self::Left(l) => l.read_at(offset, len).await,
392 Self::Right(r) => r.read_at(offset, len).await,
393 }
394 }
395
396 async fn size(&mut self) -> io::Result<u64> {
397 match self {
398 Self::Left(l) => l.size().await,
399 Self::Right(r) => r.size().await,
400 }
401 }
402}
403
404#[cfg(feature = "tokio-util")]
405impl<L, R> AsyncSliceWriter for tokio_util::either::Either<L, R>
406where
407 L: AsyncSliceWriter + 'static,
408 R: AsyncSliceWriter + 'static,
409{
410 async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> {
411 match self {
412 Self::Left(l) => l.write_bytes_at(offset, data).await,
413 Self::Right(r) => r.write_bytes_at(offset, data).await,
414 }
415 }
416
417 async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> {
418 match self {
419 Self::Left(l) => l.write_at(offset, data).await,
420 Self::Right(r) => r.write_at(offset, data).await,
421 }
422 }
423
424 async fn sync(&mut self) -> io::Result<()> {
425 match self {
426 Self::Left(l) => l.sync().await,
427 Self::Right(r) => r.sync().await,
428 }
429 }
430
431 async fn set_len(&mut self, len: u64) -> io::Result<()> {
432 match self {
433 Self::Left(l) => l.set_len(len).await,
434 Self::Right(r) => r.set_len(len).await,
435 }
436 }
437}
438
439#[cfg(any(feature = "tokio-io", feature = "x-http"))]
440fn make_io_error<E>(e: E) -> io::Error
441where
442 E: Into<Box<dyn std::error::Error + Send + Sync>>,
443{
444 io::Error::other(e)
445}
446
447#[cfg(test)]
448mod tests {
449
450 use std::fmt::Debug;
451 #[cfg(feature = "tokio-io")]
452 use std::io::Write;
453
454 use proptest::prelude::*;
455
456 use super::*;
457 use crate::mem::limited_range;
458
459 #[cfg(feature = "x-http")]
461 mod test_server {
462 use std::{net::SocketAddr, ops::Range, sync::Arc};
463
464 use axum::{routing::get, Extension, Router};
465 use hyper::{Body, Request, Response, StatusCode};
466
467 use super::*;
468
469 pub fn serve(data: Vec<u8>) -> (SocketAddr, impl Future<Output = hyper::Result<()>>) {
470 let app = Router::new()
472 .route("/", get(handler))
473 .layer(Extension(Arc::new(data)));
474
475 let addr: SocketAddr = SocketAddr::from(([0, 0, 0, 0], 0));
477 let fut = axum::Server::bind(&addr).serve(app.into_make_service());
478
479 (fut.local_addr(), fut)
481 }
482
483 async fn handler(state: Extension<Arc<Vec<u8>>>, req: Request<Body>) -> Response<Body> {
484 let data = state.0.as_ref();
485 if let Some(range_header) = req.headers().get("Range") {
487 if let Ok(range) = parse_range_header(range_header.to_str().unwrap()) {
488 let start = range.start.min(data.len());
490 let end = range.end.min(data.len());
491 let sliced_data = &data[start..end];
492 if start == end {
493 return Response::builder()
494 .header("Content-Type", "application/octet-stream")
495 .status(StatusCode::NO_CONTENT)
496 .body(Body::from(vec![]))
497 .unwrap();
498 }
499
500 return Response::builder()
502 .status(StatusCode::PARTIAL_CONTENT)
503 .header("Content-Type", "application/octet-stream")
504 .header("Content-Length", sliced_data.len())
505 .header(
506 "Content-Range",
507 format!("bytes {}-{}/{}", start, end - 1, data.len()),
508 )
509 .body(Body::from(sliced_data.to_vec()))
510 .unwrap();
511 }
512 }
513
514 Response::new(data.to_owned().into())
516 }
517
518 fn parse_range_header(
519 header_value: &str,
520 ) -> std::result::Result<Range<usize>, &'static str> {
521 let prefix = "bytes=";
522 if header_value.starts_with(prefix) {
523 let range_str = header_value.strip_prefix(prefix).unwrap();
524 if let Some(index) = range_str.find('-') {
525 let start = range_str[..index]
526 .parse()
527 .map_err(|_| "Failed to parse range start")?;
528 let end: usize = range_str[index + 1..]
529 .parse()
530 .map_err(|_| "Failed to parse range end")?;
531 return Ok(start..end + 1);
532 }
533 }
534 Err("Invalid Range header format")
535 }
536 }
537
538 async fn read_mut_smoke(mut file: impl AsyncSliceReader) -> io::Result<()> {
540 let expected = (0..100u8).collect::<Vec<_>>();
541
542 let res = file.read_at(0, usize::MAX).await?;
544 assert_eq!(res, expected);
545
546 let res = file.size().await?;
547 assert_eq!(res, 100);
548
549 let res = file.read_at(0, 3).await?;
551 assert_eq!(res, vec![0, 1, 2]);
552
553 let res = file.read_at(95, 10).await?;
555 assert_eq!(res, vec![95, 96, 97, 98, 99]);
556
557 let res = file.read_at(110, 10).await?;
559 assert_eq!(res, vec![]);
560
561 Ok(())
562 }
563
564 async fn write_mut_smoke<F: AsyncSliceWriter, C: Fn(&F) -> Vec<u8>>(
566 mut file: F,
567 contents: C,
568 ) -> io::Result<()> {
569 file.write_bytes_at(0, vec![0, 1, 2].into()).await?;
571 assert_eq!(contents(&file), &[0, 1, 2]);
572
573 file.write_bytes_at(5, vec![0, 1, 2].into()).await?;
575 assert_eq!(contents(&file), &[0, 1, 2, 0, 0, 0, 1, 2]);
576
577 file.write_at(8, &1u16.to_le_bytes()).await?;
579 assert_eq!(contents(&file), &[0, 1, 2, 0, 0, 0, 1, 2, 1, 0]);
580
581 file.set_len(0).await?;
583 assert_eq!(contents(&file).len(), 0);
584
585 Ok(())
586 }
587
588 #[cfg(feature = "tokio-io")]
589 #[tokio::test]
591 async fn file_reading_smoke() -> io::Result<()> {
592 let mut file = tempfile::tempfile().unwrap();
594 file.write_all(&(0..100u8).collect::<Vec<_>>()).unwrap();
595 read_mut_smoke(File::from_std(file)).await?;
596 Ok(())
597 }
598
599 #[tokio::test]
601 async fn bytes_reading_smoke() -> io::Result<()> {
602 let bytes: Bytes = (0..100u8).collect::<Vec<_>>().into();
603 read_mut_smoke(bytes).await?;
604
605 Ok(())
606 }
607
608 #[tokio::test]
610 async fn bytes_mut_reading_smoke() -> io::Result<()> {
611 let mut bytes: BytesMut = BytesMut::new();
612 bytes.extend(0..100u8);
613
614 read_mut_smoke(bytes).await?;
615
616 Ok(())
617 }
618
619 fn bytes_mut_contents(bytes: &BytesMut) -> Vec<u8> {
620 bytes.to_vec()
621 }
622
623 #[cfg(feature = "tokio-io")]
624 #[tokio::test]
625 async fn async_slice_writer_smoke() -> io::Result<()> {
626 let file = tempfile::tempfile().unwrap();
627 write_mut_smoke(File::from_std(file), |x| x.read_contents()).await?;
628
629 Ok(())
630 }
631
632 #[tokio::test]
633 async fn bytes_mut_writing_smoke() -> io::Result<()> {
634 let bytes: BytesMut = BytesMut::new();
635
636 write_mut_smoke(bytes, |x| x.as_ref().to_vec()).await?;
637
638 Ok(())
639 }
640
641 fn random_slice(offset: u64, size: usize) -> impl Strategy<Value = (u64, Vec<u8>)> {
642 (0..offset, 0..size).prop_map(|(offset, size)| {
643 let data = (0..size).map(|x| x as u8).collect::<Vec<_>>();
644 (offset, data)
645 })
646 }
647
648 fn random_write_op(offset: u64, size: usize) -> impl Strategy<Value = WriteOp> {
649 prop_oneof![
650 20 => random_slice(offset, size).prop_map(|(offset, data)| WriteOp::Write(offset, data)),
651 1 => (0..(offset + size as u64)).prop_map(WriteOp::SetLen),
652 1 => Just(WriteOp::Sync),
653 ]
654 }
655
656 fn random_write_ops(offset: u64, size: usize, n: usize) -> impl Strategy<Value = Vec<WriteOp>> {
657 prop::collection::vec(random_write_op(offset, size), n)
658 }
659
660 fn random_read_ops(offset: u64, size: usize, n: usize) -> impl Strategy<Value = Vec<ReadOp>> {
661 prop::collection::vec(random_read_op(offset, size), n)
662 }
663
664 fn sequential_offset(mag: usize) -> impl Strategy<Value = isize> {
665 prop_oneof![
666 20 => Just(0),
667 1 => (0..mag).prop_map(|x| x as isize),
668 1 => (0..mag).prop_map(|x| -(x as isize)),
669 ]
670 }
671
672 fn random_read_op(offset: u64, size: usize) -> impl Strategy<Value = ReadOp> {
673 prop_oneof![
674 20 => (0..offset, 0..size).prop_map(|(offset, len)| ReadOp::ReadAt(offset, len)),
675 1 => (sequential_offset(1024), 0..size).prop_map(|(offset, len)| ReadOp::ReadSequential(offset, len)),
676 1 => Just(ReadOp::Len),
677 ]
678 }
679
680 #[derive(Debug, Clone)]
681 enum ReadOp {
682 ReadAt(u64, usize),
684 ReadSequential(isize, usize),
686 Len,
688 }
689
690 #[derive(Debug, Clone)]
691 enum WriteOp {
692 Write(u64, Vec<u8>),
694 SetLen(u64),
696 Sync,
698 }
699
700 fn apply_op(file: &mut Vec<u8>, op: &WriteOp) {
702 match op {
703 WriteOp::Write(offset, data) => {
704 if data.is_empty() {
706 return;
707 }
708 let end = offset.saturating_add(data.len() as u64);
709 let start = usize::try_from(*offset).unwrap();
710 let end = usize::try_from(end).unwrap();
711 if end > file.len() {
712 file.resize(end, 0);
713 }
714 file[start..end].copy_from_slice(data);
715 }
716 WriteOp::SetLen(offset) => {
717 let offset = usize::try_from(*offset).unwrap_or(usize::MAX);
718 file.resize(offset, 0);
719 }
720 WriteOp::Sync => {}
721 }
722 }
723
724 fn async_test<F: Future>(f: F) -> F::Output {
725 let rt = tokio::runtime::Builder::new_current_thread()
726 .enable_all()
727 .build()
728 .unwrap();
729 rt.block_on(f)
730 }
731
732 async fn write_op_test<W: AsyncSliceWriter, C: Fn(&W) -> Vec<u8>>(
733 ops: Vec<WriteOp>,
734 mut bytes: W,
735 content: C,
736 ) -> io::Result<()> {
737 let mut reference = Vec::new();
738 for op in ops {
739 apply_op(&mut reference, &op);
740 match op {
741 WriteOp::Write(offset, data) => {
742 AsyncSliceWriter::write_bytes_at(&mut bytes, offset, data.into()).await?;
743 }
744 WriteOp::SetLen(offset) => {
745 AsyncSliceWriter::set_len(&mut bytes, offset).await?;
746 }
747 WriteOp::Sync => {
748 AsyncSliceWriter::sync(&mut bytes).await?;
749 }
750 }
751 assert_eq!(content(&bytes), reference.as_slice());
752 }
753 io::Result::Ok(())
754 }
755
756 async fn read_op_test<R: AsyncSliceReader + Debug>(
757 ops: Vec<ReadOp>,
758 mut file: R,
759 actual: &[u8],
760 ) -> io::Result<()> {
761 let mut current = 0u64;
762 for op in ops {
763 match op {
764 ReadOp::ReadAt(offset, len) => {
765 println!("{:?} {} {}", file, offset, len);
766 let data = AsyncSliceReader::read_at(&mut file, offset, len).await?;
767 assert_eq!(&data, &actual[limited_range(offset, len, actual.len())]);
768 current = offset.checked_add(len as u64).unwrap();
769 }
770 ReadOp::ReadSequential(offset, len) => {
771 let offset = if offset >= 0 {
772 current.saturating_add(offset as u64)
773 } else {
774 current.saturating_sub((-offset) as u64)
775 };
776 let data = AsyncSliceReader::read_at(&mut file, offset, len).await?;
777 assert_eq!(&data, &actual[limited_range(offset, len, actual.len())]);
778 current = offset.checked_add(len as u64).unwrap();
779 }
780 ReadOp::Len => {
781 let len = AsyncSliceReader::size(&mut file).await?;
782 assert_eq!(len, actual.len() as u64);
783 }
784 }
785 }
786 io::Result::Ok(())
787 }
788
789 #[cfg(feature = "x-http")]
802 #[tokio::test]
803 #[cfg_attr(target_os = "windows", ignore)]
804 async fn http_smoke() {
805 let (addr, server) = test_server::serve(b"hello world".to_vec());
806 let url = format!("http://{}", addr);
807 println!("serving from {}", url);
808 let url = reqwest::Url::parse(&url).unwrap();
809 let server = tokio::spawn(server);
810 let mut reader = HttpAdapter::new(url);
811 let len = reader.size().await.unwrap();
812 assert_eq!(len, 11);
813 println!("len: {:?}", reader);
814 let part = reader.read_at(0, 11).await.unwrap();
815 assert_eq!(part.as_ref(), b"hello world");
816 let part = reader.read_at(6, 5).await.unwrap();
817 assert_eq!(part.as_ref(), b"world");
818 let part = reader.read_at(6, 10).await.unwrap();
819 assert_eq!(part.as_ref(), b"world");
820 let part = reader.read_at(100, 10).await.unwrap();
821 assert_eq!(part.as_ref(), b"");
822 server.abort();
823 }
824
825 proptest! {
826
827 #[test]
828 fn bytes_write(ops in random_write_ops(1024, 1024, 10)) {
829 async_test(write_op_test(ops, BytesMut::new(), bytes_mut_contents)).unwrap();
830 }
831
832 #[cfg(feature = "tokio-io")]
833 #[test]
834 fn file_write(ops in random_write_ops(1024, 1024, 10)) {
835 let file = tempfile::tempfile().unwrap();
836 async_test(write_op_test(ops, File::from_std(file), |x| x.read_contents())).unwrap();
837 }
838
839 #[test]
840 fn bytes_read(data in proptest::collection::vec(any::<u8>(), 0..1024), ops in random_read_ops(1024, 1024, 2)) {
841 async_test(read_op_test(ops.clone(), Bytes::from(data.clone()), &data)).unwrap();
842 async_test(read_op_test(ops.clone(), BytesMut::from(data.as_slice()), &data)).unwrap();
843 async_test(read_op_test(ops, data.as_slice(), &data)).unwrap();
844 }
845
846 #[cfg(feature = "tokio-io")]
847 #[test]
848 fn file_read(data in proptest::collection::vec(any::<u8>(), 0..1024), ops in random_read_ops(1024, 1024, 2)) {
849 let mut file = tempfile::tempfile().unwrap();
850 file.write_all(&data).unwrap();
851 async_test(read_op_test(ops, File::from_std(file), &data)).unwrap();
852 }
853
854 #[cfg(feature = "x-http")]
855 #[cfg_attr(target_os = "windows", ignore)]
856 #[test]
857 fn http_read(data in proptest::collection::vec(any::<u8>(), 0..10), ops in random_read_ops(10, 10, 2)) {
858 async_test(async move {
859 let (addr, server) = test_server::serve(data.clone());
861 let server = tokio::spawn(server);
863 let url = reqwest::Url::parse(&format!("http://{}", addr)).unwrap();
865 let file = HttpAdapter::new(url);
866 read_op_test(ops, file, &data).await.unwrap();
868 server.abort();
870 });
871 }
872 }
873}