1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use std::cmp;
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use bytes::Bytes;
use crate::raw::*;
use crate::*;
pub fn from_fd<R>(fd: R, start: u64, end: u64) -> FdReader<R>
where
R: Read + Seek + Send + Sync,
{
FdReader {
inner: fd,
start,
end,
offset: 0,
}
}
pub struct FdReader<R: Read + Seek + Send + Sync> {
inner: R,
start: u64,
end: u64,
offset: u64,
}
impl<R> FdReader<R>
where
R: Read + Seek + Send + Sync,
{
#[inline]
pub(crate) fn current_size(&self) -> i64 {
debug_assert!(self.offset >= self.start, "offset must in range");
self.end as i64 - self.offset as i64
}
}
impl<R> oio::BlockingRead for FdReader<R>
where
R: Read + Seek + Send + Sync + 'static,
{
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
if self.current_size() <= 0 {
return Ok(0);
}
let max = cmp::min(buf.len() as u64, self.current_size() as u64) as usize;
let n = self.inner.read(&mut buf[..max]).map_err(|err| {
Error::new(ErrorKind::Unexpected, "read data from FdReader")
.with_context("source", "FdReader")
.set_source(err)
})?;
self.offset += n as u64;
Ok(n)
}
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
let (base, offset) = match pos {
SeekFrom::Start(n) => (self.start as i64, n as i64),
SeekFrom::End(n) => (self.end as i64, n),
SeekFrom::Current(n) => (self.offset as i64, n),
};
match base.checked_add(offset) {
Some(n) if n < 0 => Err(Error::new(
ErrorKind::Unexpected,
"invalid seek to a negative or overflowing position",
)),
Some(n) => {
let cur = self.inner.seek(SeekFrom::Start(n as u64)).map_err(|err| {
Error::new(ErrorKind::Unexpected, "seek data from FdReader")
.with_context("source", "FdReader")
.set_source(err)
})?;
self.offset = cur;
Ok(self.offset - self.start)
}
None => Err(Error::new(
ErrorKind::Unexpected,
"invalid seek to a negative or overflowing position",
)),
}
}
fn next(&mut self) -> Option<Result<Bytes>> {
Some(Err(Error::new(
ErrorKind::Unsupported,
"output reader doesn't support iterating",
)))
}
}