vortex_io/
read.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use std::future::{self, Future};
use std::io;
use std::io::Cursor;
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use vortex_buffer::Buffer;
use vortex_error::vortex_err;

/// An asynchronous streaming reader.
///
/// Implementations expose data via the asynchronous [`read_bytes`][VortexRead::read_bytes], which
/// will fill the exact number of bytes and advance the stream.
///
/// If the exact number of bytes is not available from the stream, an
/// [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error is returned.
pub trait VortexRead {
    fn read_bytes(&mut self, len: u64) -> impl Future<Output = io::Result<Bytes>>;
}

/// A trait for types that support asynchronous reads.
///
/// References to the type must be safe to [share across threads][Send], but spawned
/// futures may be `!Send` to support thread-per-core implementations.
///
/// Readers must be cheaply cloneable to allow for easy sharing across tasks or threads.
pub trait VortexReadAt: Send + Sync + Clone + 'static {
    /// Request an asynchronous positional read. Results will be returned as an owned [`Bytes`].
    ///
    /// If the reader does not have the requested number of bytes, the returned Future will complete
    /// with an [`io::Error`].
    ///
    /// ## Thread Safety
    ///
    /// The resultant Future need not be [`Send`], allowing implementations that use thread-per-core
    /// executors.
    fn read_byte_range(
        &self,
        pos: u64,
        len: u64,
    ) -> impl Future<Output = io::Result<Bytes>> + 'static;

    // TODO(ngates): the read implementation should be able to hint at its latency/throughput
    //  allowing the caller to make better decisions about how to coalesce reads.
    fn performance_hint(&self) -> usize {
        0
    }

    /// Asynchronously get the number of bytes of data readable.
    ///
    /// For a file it will be the size in bytes, for an object in an
    /// `ObjectStore` it will be the `ObjectMeta::size`.
    fn size(&self) -> impl Future<Output = u64> + 'static;
}

impl<T: VortexReadAt> VortexReadAt for Arc<T> {
    fn read_byte_range(
        &self,
        pos: u64,
        len: u64,
    ) -> impl Future<Output = io::Result<Bytes>> + 'static {
        T::read_byte_range(self, pos, len)
    }

    fn performance_hint(&self) -> usize {
        T::performance_hint(self)
    }

    fn size(&self) -> impl Future<Output = u64> + 'static {
        T::size(self)
    }
}

impl VortexRead for BytesMut {
    async fn read_bytes(&mut self, len: u64) -> io::Result<Bytes> {
        if (len as usize) > self.len() {
            Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                vortex_err!("unexpected eof"),
            ))
        } else {
            Ok(self.split_to(len as usize).freeze())
        }
    }
}

// Implement reading for a cursor operation.
impl<R: VortexReadAt> VortexRead for Cursor<R> {
    async fn read_bytes(&mut self, len: u64) -> io::Result<Bytes> {
        let res = R::read_byte_range(self.get_ref(), self.position(), len).await?;
        self.set_position(self.position() + len);
        Ok(res)
    }
}

impl VortexReadAt for Buffer {
    fn read_byte_range(
        &self,
        pos: u64,
        len: u64,
    ) -> impl Future<Output = io::Result<Bytes>> + 'static {
        if (len + pos) as usize > self.len() {
            future::ready(Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                vortex_err!("unexpected eof"),
            )))
        } else {
            let mut buffer = BytesMut::with_capacity(len as usize);
            unsafe {
                buffer.set_len(len as usize);
            }
            buffer.copy_from_slice(self.slice(pos as usize..(pos + len) as usize).as_slice());
            future::ready(Ok(buffer.freeze()))
        }
    }

    fn size(&self) -> impl Future<Output = u64> + 'static {
        future::ready(self.len() as u64)
    }
}

impl VortexReadAt for Bytes {
    fn read_byte_range(
        &self,
        pos: u64,
        len: u64,
    ) -> impl Future<Output = io::Result<Bytes>> + 'static {
        if (pos + len) as usize > self.len() {
            future::ready(Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                vortex_err!("unexpected eof"),
            )))
        } else {
            let sliced = self.slice(pos as usize..(pos + len) as usize);
            future::ready(Ok(sliced))
        }
    }

    fn size(&self) -> impl Future<Output = u64> + 'static {
        let len = self.len() as u64;
        future::ready(len)
    }
}