iroh_io/
lib.rs

1//! Traits for async local IO, and implementations for local files, http resources and memory.
2//! This crate provides two core traits: [AsyncSliceReader] and [AsyncSliceWriter].
3//!
4//! Memory implementations for Bytes and BytesMut are provided by default, file and http are
5//! behind features.
6//!
7//! Futures for the two core traits are not required to be [Send](std::marker::Send).
8//!
9//! This allows implementing these traits for types that are not thread safe, such as many
10//! embedded databases.
11//!
12//! It also allows for omitting thread synchronization primitives. E.g. you could use an
13//! `Rc<RefCell<BytesMut>>` instead of a much more costly `Arc<Mutex<BytesMut>>`.
14//!
15//! The downside is that you have to use a local executor, e.g.
16//! [LocalPoolHandle](https://docs.rs/tokio-util/latest/tokio_util/task/struct.LocalPoolHandle.html),
17//! if you want to do async IO in a background task.
18//!
19//! A good blog post providing a rationale for these decisions is
20//! [Local Async Executors and Why They Should be the Default](https://maciej.codes/2022-06-09-local-async.html).
21//!
22//! All futures returned by this trait must be polled to completion, otherwise
23//! all subsequent calls will produce an io error.
24//!
25//! So it is fine to e.g. limit a call to read_at with a timeout, but once a future
26//! is dropped without being polled to completion, the reader is not useable anymore.
27//!
28//! All methods, even those that do not modify the underlying resource, take a mutable
29//! reference to self. This is to enforce linear access to the underlying resource.
30//!
31//! In general it is assumed that readers are cheap, so in case of an error you can
32//! always get a new reader. Also, if you need concurrent access to the same resource,
33//! create multiple readers.
34//!
35//! One thing you might wonder is why there are separate methods for writing [Bytes] and writing slices.
36//! The reason is that if you already have [Bytes] and the underlying writer needs [Bytes], you can avoid
37//! an allocation.
38#![deny(missing_docs, rustdoc::broken_intra_doc_links)]
39
40use std::{
41    future::Future,
42    io::{self, Cursor},
43};
44
45use bytes::{Buf, Bytes, BytesMut};
46
47/// A trait to abstract async reading from different resource.
48///
49/// This trait does not require the notion of a current position, but instead
50/// requires explicitly passing the offset to read_at. In addition to the ability
51/// to read at an arbitrary offset, it also provides the ability to get the
52/// length of the resource.
53///
54/// This is similar to the io interface of sqlite.
55/// See xRead, xFileSize in <https://www.sqlite.org/c3ref/io_methods.html>
56#[allow(clippy::len_without_is_empty)]
57pub trait AsyncSliceReader {
58    /// Read the entire buffer at the given position.
59    ///
60    /// Will return at most `len` bytes, but may return fewer if the resource is smaller.
61    /// If the range is completely covered by the resource, will return exactly `len` bytes.
62    /// If the range is not covered at all by the resource, will return an empty buffer.
63    /// It will never return an io error independent of the range as long as the underlying
64    /// resource is valid.
65    #[must_use = "io futures must be polled to completion"]
66    fn read_at(&mut self, offset: u64, len: usize) -> impl Future<Output = io::Result<Bytes>>;
67
68    /// Variant of read_at that returns an error if less than `len` bytes are read.
69    fn read_exact_at(
70        &mut self,
71        offset: u64,
72        len: usize,
73    ) -> impl Future<Output = io::Result<Bytes>> {
74        async move {
75            let res = self.read_at(offset, len).await?;
76            if res.len() < len {
77                return Err(io::ErrorKind::UnexpectedEof.into());
78            }
79            Ok(res)
80        }
81    }
82
83    /// Get the length of the resource
84    #[must_use = "io futures must be polled to completion"]
85    fn size(&mut self) -> impl Future<Output = io::Result<u64>>;
86}
87
88impl<T: AsyncSliceReader> AsyncSliceReader for &mut T {
89    async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
90        (**self).read_at(offset, len).await
91    }
92
93    async fn size(&mut self) -> io::Result<u64> {
94        (**self).size().await
95    }
96}
97
98impl<T: AsyncSliceReader> AsyncSliceReader for Box<T> {
99    async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
100        (**self).read_at(offset, len).await
101    }
102
103    async fn size(&mut self) -> io::Result<u64> {
104        (**self).size().await
105    }
106}
107
108/// Extension trait for [AsyncSliceReader].
109pub trait AsyncSliceReaderExt: AsyncSliceReader {
110    /// Read the entire resource into a [bytes::Bytes] buffer, if possible.
111    #[allow(async_fn_in_trait)]
112    async fn read_to_end(&mut self) -> io::Result<Bytes> {
113        self.read_at(0, usize::MAX).await
114    }
115}
116
117impl<T: AsyncSliceReader> AsyncSliceReaderExt for T {}
118
119/// A trait to abstract async writing to different resources.
120///
121/// This trait does not require the notion of a current position, but instead
122/// requires explicitly passing the offset to write_at and write_bytes_at.
123/// In addition to the ability to write at an arbitrary offset, it also provides
124/// the ability to set the length of the resource.
125///
126/// This is similar to the io interface of sqlite.
127/// See xWrite in <https://www.sqlite.org/c3ref/io_methods.html>
128pub trait AsyncSliceWriter: Sized {
129    /// Write the entire slice at the given position.
130    ///
131    /// if self.len < offset + data.len(), the underlying resource will be extended.
132    /// if self.len < offset, the gap will be filled with zeros.
133    #[must_use = "io futures must be polled to completion"]
134    fn write_at(&mut self, offset: u64, data: &[u8]) -> impl Future<Output = io::Result<()>>;
135
136    /// Write the entire Bytes at the given position.
137    ///
138    /// Use this if you have a Bytes, to avoid allocations.
139    /// Other than that it is equivalent to [AsyncSliceWriter::write_at].
140    #[must_use = "io futures must be polled to completion"]
141    fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> impl Future<Output = io::Result<()>>;
142
143    /// Set the length of the underlying storage.
144    #[must_use = "io futures must be polled to completion"]
145    fn set_len(&mut self, len: u64) -> impl Future<Output = io::Result<()>>;
146
147    /// Sync any buffers to the underlying storage.
148    #[must_use = "io futures must be polled to completion"]
149    fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
150}
151
152impl<T: AsyncSliceWriter> AsyncSliceWriter for &mut T {
153    async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> {
154        (**self).write_at(offset, data).await
155    }
156
157    async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> {
158        (**self).write_bytes_at(offset, data).await
159    }
160
161    async fn set_len(&mut self, len: u64) -> io::Result<()> {
162        (**self).set_len(len).await
163    }
164
165    async fn sync(&mut self) -> io::Result<()> {
166        (**self).sync().await
167    }
168}
169
170impl<T: AsyncSliceWriter> AsyncSliceWriter for Box<T> {
171    async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> {
172        (**self).write_at(offset, data).await
173    }
174
175    async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> {
176        (**self).write_bytes_at(offset, data).await
177    }
178
179    async fn set_len(&mut self, len: u64) -> io::Result<()> {
180        (**self).set_len(len).await
181    }
182
183    async fn sync(&mut self) -> io::Result<()> {
184        (**self).sync().await
185    }
186}
187
188/// A non seekable reader, e.g. a network socket.
189pub trait AsyncStreamReader {
190    /// Read at most `len` bytes. To read to the end, pass u64::MAX.
191    ///
192    /// returns an empty buffer to indicate EOF.
193    fn read_bytes(&mut self, len: usize) -> impl Future<Output = io::Result<Bytes>>;
194
195    /// Read a fixed size buffer.
196    ///
197    /// If there are less than L bytes available, an io::ErrorKind::UnexpectedEof error is returned.
198    fn read<const L: usize>(&mut self) -> impl Future<Output = io::Result<[u8; L]>>;
199
200    /// Variant of read_bytes that returns an error if less than `len` bytes are read.
201    fn read_bytes_exact(&mut self, len: usize) -> impl Future<Output = io::Result<Bytes>> {
202        async move {
203            let res = self.read_bytes(len).await?;
204            if res.len() < len {
205                return Err(io::ErrorKind::UnexpectedEof.into());
206            }
207            Ok(res)
208        }
209    }
210
211    /// Variant of read that returns an error if less than `L` bytes are read.
212    fn read_exact<const L: usize>(&mut self) -> impl Future<Output = io::Result<[u8; L]>> {
213        async move {
214            let res = self.read::<L>().await?;
215            if res.len() < L {
216                return Err(io::ErrorKind::UnexpectedEof.into());
217            }
218            Ok(res)
219        }
220    }
221}
222
223impl<T: AsyncStreamReader> AsyncStreamReader for &mut T {
224    async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
225        (**self).read_bytes(len).await
226    }
227
228    async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
229        (**self).read().await
230    }
231}
232
233impl AsyncStreamReader for Bytes {
234    async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
235        let res = self.split_to(len.min(Bytes::len(self)));
236        Ok(res)
237    }
238
239    async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
240        if Bytes::len(self) < L {
241            return Err(io::ErrorKind::UnexpectedEof.into());
242        }
243        let mut res = [0u8; L];
244        self.split_to(L).copy_to_slice(&mut res);
245        Ok(res)
246    }
247}
248
249impl AsyncStreamReader for BytesMut {
250    async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
251        let res = self.split_to(len.min(BytesMut::len(self)));
252        Ok(res.freeze())
253    }
254
255    async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
256        if BytesMut::len(self) < L {
257            return Err(io::ErrorKind::UnexpectedEof.into());
258        }
259        let mut res = [0u8; L];
260        self.split_to(L).copy_to_slice(&mut res);
261        Ok(res)
262    }
263}
264
265impl AsyncStreamReader for &[u8] {
266    async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
267        let len = len.min(self.len());
268        let res = Bytes::copy_from_slice(&self[..len]);
269        *self = &self[len..];
270        Ok(res)
271    }
272
273    async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
274        if self.len() < L {
275            return Err(io::ErrorKind::UnexpectedEof.into());
276        }
277        let mut res = [0u8; L];
278        res.copy_from_slice(&self[..L]);
279        *self = &self[L..];
280        Ok(res)
281    }
282}
283
284impl<T: AsyncSliceReader> AsyncStreamReader for Cursor<T> {
285    async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
286        let offset = self.position();
287        let res = self.get_mut().read_at(offset, len).await?;
288        self.set_position(offset + res.len() as u64);
289        Ok(res)
290    }
291
292    async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
293        let offset = self.position();
294        let res = self.get_mut().read_at(offset, L).await?;
295        if res.len() < L {
296            return Err(io::ErrorKind::UnexpectedEof.into());
297        }
298        self.set_position(offset + res.len() as u64);
299        let mut buf = [0u8; L];
300        buf.copy_from_slice(&res);
301        Ok(buf)
302    }
303}
304
305/// A non seekable writer, e.g. a network socket.
306pub trait AsyncStreamWriter {
307    /// Write the entire slice.
308    ///
309    /// In case of an error, some bytes may have been written.
310    fn write(&mut self, data: &[u8]) -> impl Future<Output = io::Result<()>>;
311
312    /// Write the entire bytes.
313    ///
314    /// In case of an error, some bytes may have been written.
315    fn write_bytes(&mut self, data: Bytes) -> impl Future<Output = io::Result<()>>;
316
317    /// Sync any buffers to the underlying storage.
318    fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
319}
320
321impl<T: AsyncStreamWriter> AsyncStreamWriter for &mut T {
322    async fn write(&mut self, data: &[u8]) -> io::Result<()> {
323        (**self).write(data).await
324    }
325
326    async fn sync(&mut self) -> io::Result<()> {
327        (**self).sync().await
328    }
329
330    async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> {
331        (**self).write_bytes(data).await
332    }
333}
334
335impl AsyncStreamWriter for Vec<u8> {
336    async fn write(&mut self, data: &[u8]) -> io::Result<()> {
337        self.extend_from_slice(data);
338        Ok(())
339    }
340
341    async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> {
342        self.extend_from_slice(data.as_ref());
343        Ok(())
344    }
345
346    async fn sync(&mut self) -> io::Result<()> {
347        Ok(())
348    }
349}
350
351impl AsyncStreamWriter for BytesMut {
352    async fn write(&mut self, data: &[u8]) -> io::Result<()> {
353        self.extend_from_slice(data);
354        Ok(())
355    }
356
357    async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> {
358        self.extend_from_slice(data.as_ref());
359        Ok(())
360    }
361
362    async fn sync(&mut self) -> io::Result<()> {
363        Ok(())
364    }
365}
366
367#[cfg(feature = "tokio-io")]
368mod tokio_io;
369#[cfg(feature = "tokio-io")]
370pub use tokio_io::*;
371
372#[cfg(feature = "stats")]
373pub mod stats;
374
375#[cfg(feature = "x-http")]
376mod http;
377#[cfg(feature = "x-http")]
378pub use http::*;
379
380/// implementations for [AsyncSliceReader] and [AsyncSliceWriter] for [bytes::Bytes] and [bytes::BytesMut]
381mod mem;
382
383#[cfg(feature = "tokio-util")]
384impl<L, R> AsyncSliceReader for tokio_util::either::Either<L, R>
385where
386    L: AsyncSliceReader + 'static,
387    R: AsyncSliceReader + 'static,
388{
389    async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
390        match self {
391            Self::Left(l) => l.read_at(offset, len).await,
392            Self::Right(r) => r.read_at(offset, len).await,
393        }
394    }
395
396    async fn size(&mut self) -> io::Result<u64> {
397        match self {
398            Self::Left(l) => l.size().await,
399            Self::Right(r) => r.size().await,
400        }
401    }
402}
403
404#[cfg(feature = "tokio-util")]
405impl<L, R> AsyncSliceWriter for tokio_util::either::Either<L, R>
406where
407    L: AsyncSliceWriter + 'static,
408    R: AsyncSliceWriter + 'static,
409{
410    async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> {
411        match self {
412            Self::Left(l) => l.write_bytes_at(offset, data).await,
413            Self::Right(r) => r.write_bytes_at(offset, data).await,
414        }
415    }
416
417    async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> {
418        match self {
419            Self::Left(l) => l.write_at(offset, data).await,
420            Self::Right(r) => r.write_at(offset, data).await,
421        }
422    }
423
424    async fn sync(&mut self) -> io::Result<()> {
425        match self {
426            Self::Left(l) => l.sync().await,
427            Self::Right(r) => r.sync().await,
428        }
429    }
430
431    async fn set_len(&mut self, len: u64) -> io::Result<()> {
432        match self {
433            Self::Left(l) => l.set_len(len).await,
434            Self::Right(r) => r.set_len(len).await,
435        }
436    }
437}
438
439#[cfg(any(feature = "tokio-io", feature = "x-http"))]
440fn make_io_error<E>(e: E) -> io::Error
441where
442    E: Into<Box<dyn std::error::Error + Send + Sync>>,
443{
444    io::Error::other(e)
445}
446
447#[cfg(test)]
448mod tests {
449
450    use std::fmt::Debug;
451    #[cfg(feature = "tokio-io")]
452    use std::io::Write;
453
454    use proptest::prelude::*;
455
456    use super::*;
457    use crate::mem::limited_range;
458
459    /// A test server that serves data on a random port, supporting head, get, and range requests
460    #[cfg(feature = "x-http")]
461    mod test_server {
462        use std::{net::SocketAddr, ops::Range, sync::Arc};
463
464        use axum::{routing::get, Extension, Router};
465        use hyper::{Body, Request, Response, StatusCode};
466
467        use super::*;
468
469        pub fn serve(data: Vec<u8>) -> (SocketAddr, impl Future<Output = hyper::Result<()>>) {
470            // Create an Axum router
471            let app = Router::new()
472                .route("/", get(handler))
473                .layer(Extension(Arc::new(data)));
474
475            // Create the server
476            let addr: SocketAddr = SocketAddr::from(([0, 0, 0, 0], 0));
477            let fut = axum::Server::bind(&addr).serve(app.into_make_service());
478
479            // Return the server address and a future that completes when the server is shut down
480            (fut.local_addr(), fut)
481        }
482
483        async fn handler(state: Extension<Arc<Vec<u8>>>, req: Request<Body>) -> Response<Body> {
484            let data = state.0.as_ref();
485            // Check if the request contains a "Range" header
486            if let Some(range_header) = req.headers().get("Range") {
487                if let Ok(range) = parse_range_header(range_header.to_str().unwrap()) {
488                    // Extract the requested range from the data
489                    let start = range.start.min(data.len());
490                    let end = range.end.min(data.len());
491                    let sliced_data = &data[start..end];
492                    if start == end {
493                        return Response::builder()
494                            .header("Content-Type", "application/octet-stream")
495                            .status(StatusCode::NO_CONTENT)
496                            .body(Body::from(vec![]))
497                            .unwrap();
498                    }
499
500                    // Create a partial response with the sliced data
501                    return Response::builder()
502                        .status(StatusCode::PARTIAL_CONTENT)
503                        .header("Content-Type", "application/octet-stream")
504                        .header("Content-Length", sliced_data.len())
505                        .header(
506                            "Content-Range",
507                            format!("bytes {}-{}/{}", start, end - 1, data.len()),
508                        )
509                        .body(Body::from(sliced_data.to_vec()))
510                        .unwrap();
511                }
512            }
513
514            // Return the full data if no range header was found
515            Response::new(data.to_owned().into())
516        }
517
518        fn parse_range_header(
519            header_value: &str,
520        ) -> std::result::Result<Range<usize>, &'static str> {
521            let prefix = "bytes=";
522            if header_value.starts_with(prefix) {
523                let range_str = header_value.strip_prefix(prefix).unwrap();
524                if let Some(index) = range_str.find('-') {
525                    let start = range_str[..index]
526                        .parse()
527                        .map_err(|_| "Failed to parse range start")?;
528                    let end: usize = range_str[index + 1..]
529                        .parse()
530                        .map_err(|_| "Failed to parse range end")?;
531                    return Ok(start..end + 1);
532                }
533            }
534            Err("Invalid Range header format")
535        }
536    }
537
538    /// mutable style read smoke test, expects a resource containing 0..100u8
539    async fn read_mut_smoke(mut file: impl AsyncSliceReader) -> io::Result<()> {
540        let expected = (0..100u8).collect::<Vec<_>>();
541
542        // read the whole file
543        let res = file.read_at(0, usize::MAX).await?;
544        assert_eq!(res, expected);
545
546        let res = file.size().await?;
547        assert_eq!(res, 100);
548
549        // read 3 bytes at offset 0
550        let res = file.read_at(0, 3).await?;
551        assert_eq!(res, vec![0, 1, 2]);
552
553        // read 10 bytes at offset 95 (near the end of the file)
554        let res = file.read_at(95, 10).await?;
555        assert_eq!(res, vec![95, 96, 97, 98, 99]);
556
557        // read 10 bytes at offset 110 (after the end of the file)
558        let res = file.read_at(110, 10).await?;
559        assert_eq!(res, vec![]);
560
561        Ok(())
562    }
563
564    /// mutable style write smoke test, expects an empty resource
565    async fn write_mut_smoke<F: AsyncSliceWriter, C: Fn(&F) -> Vec<u8>>(
566        mut file: F,
567        contents: C,
568    ) -> io::Result<()> {
569        // write 3 bytes at offset 0
570        file.write_bytes_at(0, vec![0, 1, 2].into()).await?;
571        assert_eq!(contents(&file), &[0, 1, 2]);
572
573        // write 3 bytes at offset 5
574        file.write_bytes_at(5, vec![0, 1, 2].into()).await?;
575        assert_eq!(contents(&file), &[0, 1, 2, 0, 0, 0, 1, 2]);
576
577        // write a u16 at offset 8
578        file.write_at(8, &1u16.to_le_bytes()).await?;
579        assert_eq!(contents(&file), &[0, 1, 2, 0, 0, 0, 1, 2, 1, 0]);
580
581        // truncate to 0
582        file.set_len(0).await?;
583        assert_eq!(contents(&file).len(), 0);
584
585        Ok(())
586    }
587
588    #[cfg(feature = "tokio-io")]
589    /// Tests the various ways to read from a std::fs::File
590    #[tokio::test]
591    async fn file_reading_smoke() -> io::Result<()> {
592        // create a file with 100 bytes
593        let mut file = tempfile::tempfile().unwrap();
594        file.write_all(&(0..100u8).collect::<Vec<_>>()).unwrap();
595        read_mut_smoke(File::from_std(file)).await?;
596        Ok(())
597    }
598
599    /// Tests the various ways to read from a bytes::Bytes
600    #[tokio::test]
601    async fn bytes_reading_smoke() -> io::Result<()> {
602        let bytes: Bytes = (0..100u8).collect::<Vec<_>>().into();
603        read_mut_smoke(bytes).await?;
604
605        Ok(())
606    }
607
608    /// Tests the various ways to read from a bytes::BytesMut
609    #[tokio::test]
610    async fn bytes_mut_reading_smoke() -> io::Result<()> {
611        let mut bytes: BytesMut = BytesMut::new();
612        bytes.extend(0..100u8);
613
614        read_mut_smoke(bytes).await?;
615
616        Ok(())
617    }
618
619    fn bytes_mut_contents(bytes: &BytesMut) -> Vec<u8> {
620        bytes.to_vec()
621    }
622
623    #[cfg(feature = "tokio-io")]
624    #[tokio::test]
625    async fn async_slice_writer_smoke() -> io::Result<()> {
626        let file = tempfile::tempfile().unwrap();
627        write_mut_smoke(File::from_std(file), |x| x.read_contents()).await?;
628
629        Ok(())
630    }
631
632    #[tokio::test]
633    async fn bytes_mut_writing_smoke() -> io::Result<()> {
634        let bytes: BytesMut = BytesMut::new();
635
636        write_mut_smoke(bytes, |x| x.as_ref().to_vec()).await?;
637
638        Ok(())
639    }
640
641    fn random_slice(offset: u64, size: usize) -> impl Strategy<Value = (u64, Vec<u8>)> {
642        (0..offset, 0..size).prop_map(|(offset, size)| {
643            let data = (0..size).map(|x| x as u8).collect::<Vec<_>>();
644            (offset, data)
645        })
646    }
647
648    fn random_write_op(offset: u64, size: usize) -> impl Strategy<Value = WriteOp> {
649        prop_oneof![
650            20 => random_slice(offset, size).prop_map(|(offset, data)| WriteOp::Write(offset, data)),
651            1 => (0..(offset + size as u64)).prop_map(WriteOp::SetLen),
652            1 => Just(WriteOp::Sync),
653        ]
654    }
655
656    fn random_write_ops(offset: u64, size: usize, n: usize) -> impl Strategy<Value = Vec<WriteOp>> {
657        prop::collection::vec(random_write_op(offset, size), n)
658    }
659
660    fn random_read_ops(offset: u64, size: usize, n: usize) -> impl Strategy<Value = Vec<ReadOp>> {
661        prop::collection::vec(random_read_op(offset, size), n)
662    }
663
664    fn sequential_offset(mag: usize) -> impl Strategy<Value = isize> {
665        prop_oneof![
666            20 => Just(0),
667            1 => (0..mag).prop_map(|x| x as isize),
668            1 => (0..mag).prop_map(|x| -(x as isize)),
669        ]
670    }
671
672    fn random_read_op(offset: u64, size: usize) -> impl Strategy<Value = ReadOp> {
673        prop_oneof![
674            20 => (0..offset, 0..size).prop_map(|(offset, len)| ReadOp::ReadAt(offset, len)),
675            1 => (sequential_offset(1024), 0..size).prop_map(|(offset, len)| ReadOp::ReadSequential(offset, len)),
676            1 => Just(ReadOp::Len),
677        ]
678    }
679
680    #[derive(Debug, Clone)]
681    enum ReadOp {
682        // read the data at the given offset
683        ReadAt(u64, usize),
684        // read the data relative to the previous position
685        ReadSequential(isize, usize),
686        // ask for the length of the file
687        Len,
688    }
689
690    #[derive(Debug, Clone)]
691    enum WriteOp {
692        // write the data at the given offset
693        Write(u64, Vec<u8>),
694        // set the length of the file
695        SetLen(u64),
696        // sync the file
697        Sync,
698    }
699
700    /// reference implementation for a vector of bytes
701    fn apply_op(file: &mut Vec<u8>, op: &WriteOp) {
702        match op {
703            WriteOp::Write(offset, data) => {
704                // an empty write is a no-op and does not resize the file
705                if data.is_empty() {
706                    return;
707                }
708                let end = offset.saturating_add(data.len() as u64);
709                let start = usize::try_from(*offset).unwrap();
710                let end = usize::try_from(end).unwrap();
711                if end > file.len() {
712                    file.resize(end, 0);
713                }
714                file[start..end].copy_from_slice(data);
715            }
716            WriteOp::SetLen(offset) => {
717                let offset = usize::try_from(*offset).unwrap_or(usize::MAX);
718                file.resize(offset, 0);
719            }
720            WriteOp::Sync => {}
721        }
722    }
723
724    fn async_test<F: Future>(f: F) -> F::Output {
725        let rt = tokio::runtime::Builder::new_current_thread()
726            .enable_all()
727            .build()
728            .unwrap();
729        rt.block_on(f)
730    }
731
732    async fn write_op_test<W: AsyncSliceWriter, C: Fn(&W) -> Vec<u8>>(
733        ops: Vec<WriteOp>,
734        mut bytes: W,
735        content: C,
736    ) -> io::Result<()> {
737        let mut reference = Vec::new();
738        for op in ops {
739            apply_op(&mut reference, &op);
740            match op {
741                WriteOp::Write(offset, data) => {
742                    AsyncSliceWriter::write_bytes_at(&mut bytes, offset, data.into()).await?;
743                }
744                WriteOp::SetLen(offset) => {
745                    AsyncSliceWriter::set_len(&mut bytes, offset).await?;
746                }
747                WriteOp::Sync => {
748                    AsyncSliceWriter::sync(&mut bytes).await?;
749                }
750            }
751            assert_eq!(content(&bytes), reference.as_slice());
752        }
753        io::Result::Ok(())
754    }
755
756    async fn read_op_test<R: AsyncSliceReader + Debug>(
757        ops: Vec<ReadOp>,
758        mut file: R,
759        actual: &[u8],
760    ) -> io::Result<()> {
761        let mut current = 0u64;
762        for op in ops {
763            match op {
764                ReadOp::ReadAt(offset, len) => {
765                    println!("{:?} {} {}", file, offset, len);
766                    let data = AsyncSliceReader::read_at(&mut file, offset, len).await?;
767                    assert_eq!(&data, &actual[limited_range(offset, len, actual.len())]);
768                    current = offset.checked_add(len as u64).unwrap();
769                }
770                ReadOp::ReadSequential(offset, len) => {
771                    let offset = if offset >= 0 {
772                        current.saturating_add(offset as u64)
773                    } else {
774                        current.saturating_sub((-offset) as u64)
775                    };
776                    let data = AsyncSliceReader::read_at(&mut file, offset, len).await?;
777                    assert_eq!(&data, &actual[limited_range(offset, len, actual.len())]);
778                    current = offset.checked_add(len as u64).unwrap();
779                }
780                ReadOp::Len => {
781                    let len = AsyncSliceReader::size(&mut file).await?;
782                    assert_eq!(len, actual.len() as u64);
783                }
784            }
785        }
786        io::Result::Ok(())
787    }
788
789    // #[cfg(feature = "x-http")]
790    // #[tokio::test]
791    // async fn test_http_range() -> io::Result<()> {
792    //     let url = reqwest::Url::parse("https://ipfs.io/ipfs/bafybeiaj2dgwpi6bsisyf4wq7yvj4lqvpbmlmztm35hqyqqjihybnden24/image").unwrap();
793    //     let mut resource = HttpAdapter::new(url).await?;
794    //     let buf = resource.read_at(0, 100).await?;
795    //     println!("buf: {:?}", buf);
796    //     let buf = resource.read_at(1000000, 100).await?;
797    //     println!("buf: {:?}", buf);
798    //     Ok(())
799    // }
800
801    #[cfg(feature = "x-http")]
802    #[tokio::test]
803    #[cfg_attr(target_os = "windows", ignore)]
804    async fn http_smoke() {
805        let (addr, server) = test_server::serve(b"hello world".to_vec());
806        let url = format!("http://{}", addr);
807        println!("serving from {}", url);
808        let url = reqwest::Url::parse(&url).unwrap();
809        let server = tokio::spawn(server);
810        let mut reader = HttpAdapter::new(url);
811        let len = reader.size().await.unwrap();
812        assert_eq!(len, 11);
813        println!("len: {:?}", reader);
814        let part = reader.read_at(0, 11).await.unwrap();
815        assert_eq!(part.as_ref(), b"hello world");
816        let part = reader.read_at(6, 5).await.unwrap();
817        assert_eq!(part.as_ref(), b"world");
818        let part = reader.read_at(6, 10).await.unwrap();
819        assert_eq!(part.as_ref(), b"world");
820        let part = reader.read_at(100, 10).await.unwrap();
821        assert_eq!(part.as_ref(), b"");
822        server.abort();
823    }
824
825    proptest! {
826
827        #[test]
828        fn bytes_write(ops in random_write_ops(1024, 1024, 10)) {
829            async_test(write_op_test(ops, BytesMut::new(), bytes_mut_contents)).unwrap();
830        }
831
832        #[cfg(feature = "tokio-io")]
833        #[test]
834        fn file_write(ops in random_write_ops(1024, 1024, 10)) {
835            let file = tempfile::tempfile().unwrap();
836            async_test(write_op_test(ops, File::from_std(file), |x| x.read_contents())).unwrap();
837        }
838
839        #[test]
840        fn bytes_read(data in proptest::collection::vec(any::<u8>(), 0..1024), ops in random_read_ops(1024, 1024, 2)) {
841            async_test(read_op_test(ops.clone(), Bytes::from(data.clone()), &data)).unwrap();
842            async_test(read_op_test(ops.clone(), BytesMut::from(data.as_slice()), &data)).unwrap();
843            async_test(read_op_test(ops, data.as_slice(), &data)).unwrap();
844        }
845
846        #[cfg(feature = "tokio-io")]
847        #[test]
848        fn file_read(data in proptest::collection::vec(any::<u8>(), 0..1024), ops in random_read_ops(1024, 1024, 2)) {
849            let mut file = tempfile::tempfile().unwrap();
850            file.write_all(&data).unwrap();
851            async_test(read_op_test(ops, File::from_std(file), &data)).unwrap();
852        }
853
854        #[cfg(feature = "x-http")]
855        #[cfg_attr(target_os = "windows", ignore)]
856        #[test]
857        fn http_read(data in proptest::collection::vec(any::<u8>(), 0..10), ops in random_read_ops(10, 10, 2)) {
858            async_test(async move {
859                // create a test server. this has to happen in a tokio runtime
860                let (addr, server) = test_server::serve(data.clone());
861                // spawn the server in a background task
862                let server = tokio::spawn(server);
863                // create a resource from the server
864                let url = reqwest::Url::parse(&format!("http://{}", addr)).unwrap();
865                let file = HttpAdapter::new(url);
866                // run the test
867                read_op_test(ops, file, &data).await.unwrap();
868                // stop the server
869                server.abort();
870            });
871        }
872    }
873}