1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::io::{Result, SeekFrom};
use std::pin::Pin;

use futures::{
    future::BoxFuture,
    io::{AsyncRead, AsyncSeek},
    Future,
};

/// A contiguous chunk of bytes together with the offset at which it begins.
///
/// This is what a [`RangedFuture`] resolves to. [`RangedAsyncReader`] compares
/// `start` against its own read position, so `start` is expressed in the same
/// coordinate system as the positions passed to the ranged function.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RangeOutput {
    /// Offset of the first byte of `data`.
    pub start: u64,
    /// The bytes of the chunk; `data[0]` is the byte located at offset `start`.
    pub data: Vec<u8>,
}

/// A function that returns a [`BoxFuture`] of [`RangeOutput`].
/// For example, an async request to return a range of bytes from a blob from the internet.
///
/// [`RangedAsyncReader`] calls it as `f(start, length)`, where `start` is the
/// offset of the first byte it needs and `length` is the number of bytes it
/// asks for. The returned chunk is expected to cover the requested offset,
/// since the reader serves subsequent reads out of it.
pub type RangedFuture =
    Box<dyn Fn(u64, usize) -> BoxFuture<'static, std::io::Result<RangeOutput>> + Send + Sync>;

/// A struct that converts [`RangedFuture`] to a `AsyncRead + AsyncSeek` with an internal buffer.
///
/// The internal buffer is the most recently fetched [`RangeOutput`]; reads that
/// it can satisfy are served from memory, other reads go through `ranged_future`.
pub struct RangedAsyncReader {
    // Current read position; advanced by reads and set by seeks.
    pos: u64,
    // Total size of the blob, as passed to `new`; the base for `SeekFrom::End`.
    length: u64,
    // Either the cached chunk or the request currently in flight.
    state: State,
    // User-supplied function that fetches a range of bytes.
    ranged_future: RangedFuture,
    // Lower bound on the number of bytes asked of `ranged_future` per request.
    min_request_size: usize,
}

/// Internal state of a [`RangedAsyncReader`].
enum State {
    /// A chunk is cached; reads are served from it when it contains the requested bytes.
    HasChunk(RangeOutput),
    /// A request made through the reader's `ranged_future` is in flight;
    /// `poll_read` polls it and caches its output once it completes.
    Seeking(BoxFuture<'static, std::io::Result<RangeOutput>>),
}

impl RangedAsyncReader {
    /// Builds a [`RangedAsyncReader`] positioned at offset 0.
    ///
    /// * `length` - total size, in bytes, of the blob being read.
    /// * `min_request_size` - smallest number of bytes that will ever be asked
    ///   of `ranged_future` in a single call.
    /// * `ranged_future` - function used to fetch a range of bytes of the blob.
    ///
    /// No request is made here: the reader starts out caching an empty chunk,
    /// so the first non-empty read is what triggers the first fetch.
    pub fn new(length: usize, min_request_size: usize, ranged_future: RangedFuture) -> Self {
        let empty_chunk = RangeOutput {
            start: 0,
            data: vec![],
        };

        Self {
            pos: 0,
            length: length as u64,
            state: State::HasChunk(empty_chunk),
            ranged_future,
            min_request_size,
        }
    }
}

/// Returns whether `test_interval` lies entirely inside `a`.
///
/// Both arguments are `(start, length)` pairs describing the half-open
/// interval `[start, start + length)`.
///
/// ```text
/// a    = [          ]
/// test =    [     ]
/// returns true
/// ```
///
/// The interval ends are computed without overflowing: a `test_interval`
/// whose end does not fit in a `usize` is never considered included, and an
/// `a` whose end does not fit is treated as extending to `usize::MAX`.
fn range_includes(a: (usize, usize), test_interval: (usize, usize)) -> bool {
    let (a_start, a_len) = a;
    let (test_start, test_len) = test_interval;

    if test_start < a_start {
        return false;
    }

    // Saturating is exact enough here: every representable `test_end` is
    // `<= usize::MAX`, which is where an overflowing `a` would be clamped.
    let a_end = a_start.saturating_add(a_len);

    match test_start.checked_add(test_len) {
        Some(test_end) => test_end <= a_end,
        // The tested interval runs past the addressable range.
        None => false,
    }
}

impl AsyncRead for RangedAsyncReader {
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut [u8],
    ) -> std::task::Poll<Result<usize>> {
        let requested_range = (self.pos as usize, buf.len());
        let min_request_size = self.min_request_size;
        match &mut self.state {
            State::HasChunk(output) => {
                let existing_range = (output.start as usize, output.data.len());
                if range_includes(existing_range, requested_range) {
                    let offset = requested_range.0 - existing_range.0;
                    buf.copy_from_slice(&output.data[offset..offset + buf.len()]);
                    self.pos += buf.len() as u64;
                    std::task::Poll::Ready(Ok(buf.len()))
                } else {
                    let start = requested_range.0 as u64;
                    let length = std::cmp::max(min_request_size, requested_range.1);
                    let future = (self.ranged_future)(start, length);
                    self.state = State::Seeking(Box::pin(future));
                    self.poll_read(cx, buf)
                }
            }
            State::Seeking(ref mut future) => match Pin::new(future).poll(cx) {
                std::task::Poll::Ready(v) => {
                    match v {
                        Ok(output) => self.state = State::HasChunk(output),
                        Err(e) => return std::task::Poll::Ready(Err(e)),
                    };
                    self.poll_read(cx, buf)
                }
                std::task::Poll::Pending => std::task::Poll::Pending,
            },
        }
    }
}

impl AsyncSeek for RangedAsyncReader {
    /// Moves the read position and returns the new position.
    ///
    /// `SeekFrom::End` is relative to the total length given at construction
    /// and `SeekFrom::Current` is relative to the current position. Seeking
    /// only updates the position: it completes immediately and performs no
    /// request. Seeking past the end is allowed.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput`, leaving the position unchanged, when the
    /// target would be negative or would not fit in a `u64`.
    fn poll_seek(
        mut self: std::pin::Pin<&mut Self>,
        _: &mut std::task::Context<'_>,
        pos: SeekFrom,
    ) -> std::task::Poll<Result<u64>> {
        let (base, delta) = match pos {
            SeekFrom::Start(target) => (target, 0),
            SeekFrom::End(delta) => (self.length, delta),
            SeekFrom::Current(delta) => (self.pos, delta),
        };

        // Checked arithmetic: a cast-based `base as i64 + delta` would wrap a
        // seek before byte 0 into a huge position.
        let target = if delta >= 0 {
            base.checked_add(delta as u64)
        } else {
            base.checked_sub(delta.unsigned_abs())
        };

        std::task::Poll::Ready(match target {
            Some(target) => {
                self.pos = target;
                Ok(target)
            }
            None => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "invalid seek to a negative or overflowing position",
            )),
        })
    }
}