vortex_io/
read.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::future::{self, Future};
use std::io;
use std::io::Cursor;
use std::sync::Arc;

use bytes::BytesMut;
use vortex_buffer::Buffer;
use vortex_error::vortex_err;

/// An asynchronous streaming reader.
///
/// Implementations expose data via the asynchronous [`read_into`][VortexRead::read_into], which
/// will fill the exact number of bytes and advance the stream.
///
/// If the exact number of bytes is not available from the stream, an
/// [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error is returned.
pub trait VortexRead {
    fn read_into(&mut self, buffer: BytesMut) -> impl Future<Output = io::Result<BytesMut>>;
}

/// A trait for types that support asynchronous reads.
///
/// References to the type must be safe to [share across threads][Send], but spawned
/// futures may be `!Send` to support thread-per-core implementations.
///
/// Readers must be cheaply cloneable to allow for easy sharing across tasks or threads.
pub trait VortexReadAt: Send + Sync + Clone + 'static {
    /// Request an asynchronous positional read to be done, with results written into the provided `buffer`.
    ///
    /// This method will take ownership of the provided `buffer`, and upon successful completion will return
    /// the buffer completely full with data.
    ///
    /// If the reader does not have enough data available to fill the buffer, the returned Future will complete
    /// with an [`io::Error`].
    ///
    /// ## Thread Safety
    ///
    /// The resultant Future need not be [`Send`], allowing implementations that use thread-per-core
    /// executors.
    fn read_at_into(
        &self,
        pos: u64,
        buffer: BytesMut,
    ) -> impl Future<Output = io::Result<BytesMut>> + 'static;

    // TODO(ngates): the read implementation should be able to hint at its latency/throughput
    //  allowing the caller to make better decisions about how to coalesce reads.
    fn performance_hint(&self) -> usize {
        0
    }

    /// Asynchronously get the number of bytes of data readable.
    ///
    /// For a file it will be the size in bytes, for an object in an
    /// `ObjectStore` it will be the `ObjectMeta::size`.
    fn size(&self) -> impl Future<Output = u64> + 'static;
}

impl<T: VortexReadAt> VortexReadAt for Arc<T> {
    fn read_at_into(
        &self,
        pos: u64,
        buffer: BytesMut,
    ) -> impl Future<Output = io::Result<BytesMut>> + 'static {
        T::read_at_into(self, pos, buffer)
    }

    fn performance_hint(&self) -> usize {
        T::performance_hint(self)
    }

    fn size(&self) -> impl Future<Output = u64> + 'static {
        T::size(self)
    }
}

impl VortexRead for BytesMut {
    async fn read_into(&mut self, buffer: BytesMut) -> io::Result<BytesMut> {
        if buffer.len() > self.len() {
            Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                vortex_err!("unexpected eof"),
            ))
        } else {
            Ok(self.split_to(buffer.len()))
        }
    }
}

// Implement reading for a cursor operation.
impl<R: VortexReadAt> VortexRead for Cursor<R> {
    async fn read_into(&mut self, buffer: BytesMut) -> io::Result<BytesMut> {
        let res = R::read_at_into(self.get_ref(), self.position(), buffer).await?;
        self.set_position(self.position() + res.len() as u64);
        Ok(res)
    }
}

impl VortexReadAt for Buffer {
    fn read_at_into(
        &self,
        pos: u64,
        mut buffer: BytesMut,
    ) -> impl Future<Output = io::Result<BytesMut>> + 'static {
        if buffer.len() + pos as usize > self.len() {
            future::ready(Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                vortex_err!("unexpected eof"),
            )))
        } else {
            let buffer_len = buffer.len();
            buffer.copy_from_slice(
                self.slice(pos as usize..pos as usize + buffer_len)
                    .as_slice(),
            );
            future::ready(Ok(buffer))
        }
    }

    fn size(&self) -> impl Future<Output = u64> + 'static {
        future::ready(self.len() as u64)
    }
}