1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::io::{Result, SeekFrom};
use std::pin::Pin;
use futures::{
future::BoxFuture,
io::{AsyncRead, AsyncSeek},
Future,
};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RangeOutput {
pub start: u64,
pub data: Vec<u8>,
}
pub type RangedFuture =
Box<dyn Fn(u64, usize) -> BoxFuture<'static, std::io::Result<RangeOutput>> + Send + Sync>;
pub struct RangedAsyncReader {
pos: u64,
length: u64,
state: State,
ranged_future: RangedFuture,
min_request_size: usize,
}
enum State {
HasChunk(RangeOutput),
Seeking(BoxFuture<'static, std::io::Result<RangeOutput>>),
}
impl RangedAsyncReader {
pub fn new(length: usize, min_request_size: usize, ranged_future: RangedFuture) -> Self {
let length = length as u64;
Self {
pos: 0,
length,
state: State::HasChunk(RangeOutput {
start: 0,
data: vec![],
}),
ranged_future,
min_request_size,
}
}
}
fn range_includes(a: (usize, usize), test_interval: (usize, usize)) -> bool {
if test_interval.0 < a.0 {
return false;
}
let test_end = test_interval.0 + test_interval.1;
let a_end = a.0 + a.1;
if test_end > a_end {
return false;
}
true
}
impl AsyncRead for RangedAsyncReader {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<Result<usize>> {
let requested_range = (self.pos as usize, buf.len());
let min_request_size = self.min_request_size;
match &mut self.state {
State::HasChunk(output) => {
let existing_range = (output.start as usize, output.data.len());
if range_includes(existing_range, requested_range) {
let offset = requested_range.0 - existing_range.0;
buf.copy_from_slice(&output.data[offset..offset + buf.len()]);
self.pos += buf.len() as u64;
std::task::Poll::Ready(Ok(buf.len()))
} else {
let start = requested_range.0 as u64;
let length = std::cmp::max(min_request_size, requested_range.1);
let future = (self.ranged_future)(start, length);
self.state = State::Seeking(Box::pin(future));
self.poll_read(cx, buf)
}
}
State::Seeking(ref mut future) => match Pin::new(future).poll(cx) {
std::task::Poll::Ready(v) => {
match v {
Ok(output) => self.state = State::HasChunk(output),
Err(e) => return std::task::Poll::Ready(Err(e)),
};
self.poll_read(cx, buf)
}
std::task::Poll::Pending => std::task::Poll::Pending,
},
}
}
}
impl AsyncSeek for RangedAsyncReader {
fn poll_seek(
mut self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
pos: SeekFrom,
) -> std::task::Poll<Result<u64>> {
match pos {
SeekFrom::Start(pos) => self.pos = pos,
SeekFrom::End(pos) => self.pos = (self.length as i64 + pos) as u64,
SeekFrom::Current(pos) => self.pos = (self.pos as i64 + pos) as u64,
};
std::task::Poll::Ready(Ok(self.pos))
}
}