Summary

Native support for the limited reader.

Motivation

In the object-native-api proposal we introduced Reader, which sends requests like:

let op = OpRead {
    path: self.path.to_string(),
    offset: Some(self.current_offset()),
    size: None,
};

This implementation relies on the HTTP client dropping the request once we stop reading. In practice, data beyond what the caller needs is still transferred, which hurts read performance.

Here is a benchmark comparing reading the whole file with reading only half of it:

s3/read/1c741003-40ef-43a9-b23f-b6a32ed7c4c6
                        time:   [7.2697 ms 7.3521 ms 7.4378 ms]
                        thrpt:  [2.1008 GiB/s 2.1252 GiB/s 2.1493 GiB/s]
s3/read_half/1c741003-40ef-43a9-b23f-b6a32ed7c4c6
                        time:   [7.0645 ms 7.1524 ms 7.2473 ms]
                        thrpt:  [1.0780 GiB/s 1.0923 GiB/s 1.1059 GiB/s]

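Reading half of the file takes almost as long as reading all of it (about 7.15 ms versus 7.35 ms), so the effective throughput is halved. Our current behavior is therefore buggy, and we need a clearer API to address it.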

Guide-level explanation

We will remove Reader::total_size() from the public API and instead add the following APIs to Object:

pub fn reader(&self) -> Reader {}
pub fn range_reader(&self, offset: u64, size: u64) -> Reader {}
pub fn offset_reader(&self, offset: u64) -> Reader {}
pub fn limited_reader(&self, size: u64) -> Reader {}

  • reader: returns a new reader that can read the whole file.
  • range_reader: returns a ranged reader that reads [offset, offset+size).
  • offset_reader: returns a reader that reads from offset to the end of the file, [offset:].
  • limited_reader: returns a limited reader that reads the first size bytes, [:size].
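To make the four ranges concrete, the following sketch maps each constructor to the half-open byte range it would cover. ReadRange, its constructors, and resolve are hypothetical names used only for illustration and are not part of the proposed API. Clamping the range to the end of the object is likewise an assumption of this sketch, not a behavior the proposal specifies.

```rust
/// Hypothetical helper: the (offset, size) pair a reader is created with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReadRange {
    pub offset: Option<u64>,
    pub size: Option<u64>,
}

impl ReadRange {
    /// Mirrors `reader()`: the whole file.
    pub fn full() -> Self {
        Self { offset: None, size: None }
    }

    /// Mirrors `range_reader(offset, size)`: [offset, offset + size).
    pub fn range(offset: u64, size: u64) -> Self {
        Self { offset: Some(offset), size: Some(size) }
    }

    /// Mirrors `offset_reader(offset)`: [offset, end of file).
    pub fn from_offset(offset: u64) -> Self {
        Self { offset: Some(offset), size: None }
    }

    /// Mirrors `limited_reader(size)`: [0, size).
    pub fn limited(size: u64) -> Self {
        Self { offset: None, size: Some(size) }
    }

    /// Resolve to a concrete half-open range [start, end) for an object of
    /// `content_length` bytes. Clamping to the object end is an assumption.
    pub fn resolve(&self, content_length: u64) -> (u64, u64) {
        let start = self.offset.unwrap_or_default().min(content_length);
        let end = match self.size {
            Some(size) => start.saturating_add(size).min(content_length),
            None => content_length,
        };
        (start, end)
    }
}
```

For a 100-byte object, ReadRange::range(10, 20) resolves to bytes 10 through 29, while ReadRange::from_offset(40) resolves to everything from byte 40 onward.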

Take parquet’s actual logic as an example. We can rewrite:

async fn _read_single_column_async<'b, R, F>(
    factory: F,
    meta: &ColumnChunkMetaData,
) -> Result<(&ColumnChunkMetaData, Vec<u8>)>
where
    R: AsyncRead + AsyncSeek + Send + Unpin,
    F: Fn() -> BoxFuture<'b, std::io::Result<R>>,
{
    let mut reader = factory().await?;
    let (start, len) = meta.byte_range();
    reader.seek(std::io::SeekFrom::Start(start)).await?;
    let mut chunk = vec![0; len as usize];
    reader.read_exact(&mut chunk).await?;
    Result::Ok((meta, chunk))
}

into

async fn _read_single_column_async<'b, R, F>(
    factory: F,
    meta: &ColumnChunkMetaData,
) -> Result<(&ColumnChunkMetaData, Vec<u8>)>
where
    R: AsyncRead + AsyncSeek + Send + Unpin,
    F: Fn(u64, u64) -> BoxFuture<'b, std::io::Result<R>>,
{
    let (start, len) = meta.byte_range();
    let mut reader = factory(start, len).await?;
    let mut chunk = vec![0; len as usize];
    reader.read_exact(&mut chunk).await?;
    Result::Ok((meta, chunk))
}

So that:

  • No extra data will be read.
  • No extra seek/stat operation is needed.

Reference-level explanation

Inside Reader, we will correctly maintain offset, size, and pos.

  • If offset is None, we will use 0 instead.
  • If size is None, we will use meta.content_length() - self.offset.unwrap_or_default() instead.

With these fields, the Reader's current offset and remaining size are easy to calculate:

fn current_offset(&self) -> u64 {
    self.offset.unwrap_or_default() + self.pos
}

fn current_size(&self) -> Option<u64> {
    self.size.map(|v| v - self.pos)
}
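As a rough illustration of how these two values evolve while reading, here is a minimal, self-contained sketch. ReaderState, new, and advance are hypothetical names; the real Reader holds more state than this. The sketch also uses saturating_sub rather than plain subtraction so that it cannot underflow if pos ever exceeds size, which is a defensive choice of the sketch and not something the proposal requires.

```rust
/// Hypothetical stand-in for the bookkeeping kept inside `Reader`.
pub struct ReaderState {
    /// Start of the requested range; `None` means 0.
    offset: Option<u64>,
    /// Length of the requested range; `None` means "until the end".
    size: Option<u64>,
    /// Number of bytes already consumed from the range.
    pos: u64,
}

impl ReaderState {
    pub fn new(offset: Option<u64>, size: Option<u64>) -> Self {
        Self { offset, size, pos: 0 }
    }

    /// Absolute position in the object where the next request should start.
    pub fn current_offset(&self) -> u64 {
        self.offset.unwrap_or_default() + self.pos
    }

    /// Bytes still to be read, or `None` if the range is open-ended.
    pub fn current_size(&self) -> Option<u64> {
        self.size.map(|v| v.saturating_sub(self.pos))
    }

    /// Record that `n` more bytes have been consumed.
    pub fn advance(&mut self, n: u64) {
        self.pos += n;
    }
}
```

After consuming 20 bytes of a range created with offset 100 and size 50, the next request would start at offset 120 with 30 bytes remaining.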

Instead of always requesting everything up to the end of the object, we will set the size on each request:

let op = OpRead {
    path: self.path.to_string(),
    offset: Some(self.current_offset()),
    size: self.current_size(),
};
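For an HTTP-based backend such as S3, a bounded request like this would presumably end up as a Range header. The sketch below shows one possible translation; range_header is a hypothetical helper and not a function from the codebase. It relies only on the standard HTTP byte-range syntax, in which both ends of the range are inclusive.

```rust
/// Hypothetical helper: build the value of an HTTP `Range` header from an
/// offset and an optional size.
///
/// Returns `None` for a zero-sized read, which cannot be expressed as a byte
/// range; this sketch assumes the caller skips the request in that case.
pub fn range_header(offset: u64, size: Option<u64>) -> Option<String> {
    match size {
        // Open-ended: from `offset` to the end of the object.
        None => Some(format!("bytes={}-", offset)),
        // Nothing to read.
        Some(0) => None,
        // Bounded: the last byte is inclusive, hence the `- 1`.
        Some(size) => Some(format!("bytes={}-{}", offset, offset + size - 1)),
    }
}
```

With offset 10 and size 20 this yields bytes=10-29, so the server sends exactly 20 bytes and nothing has to be discarded on the client side.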

After this change, read_half no longer pays for the whole object: it finishes in about 3.2 ms compared with about 5.1 ms for read, and the two benchmarks reach similar throughput:

s3/read/6dd40f8d-7455-451e-b510-3b7ac23e0468
                        time:   [4.9554 ms 5.0888 ms 5.2282 ms]
                        thrpt:  [2.9886 GiB/s 3.0704 GiB/s 3.1532 GiB/s]
s3/read_half/6dd40f8d-7455-451e-b510-3b7ac23e0468
                        time:   [3.1868 ms 3.2494 ms 3.3052 ms]
                        thrpt:  [2.3637 GiB/s 2.4043 GiB/s 2.4515 GiB/s]

Drawbacks

None

Rationale and alternatives

None

Prior art

None

Unresolved questions

None

Future possibilities

  • Refactor the parquet reading logic to make the most use of range_reader.