1use std::{
2 io::Read,
3 io::{self, SeekFrom, Write},
4 path::Path,
5 sync::Arc,
6};
7
8use hdfs_types::hdfs::{
9 hdfs_file_status_proto::FileType, DatanodeIdProto, GetBlockLocationsRequestProto,
10 GetFileInfoRequestProto, HdfsFileStatusProto, LocatedBlocksProto,
11};
12
13use crate::{data_transfer::BlockReadStream, HDFSError, IOType, HDFS};
14
/// Options controlling how a file is opened for reading.
#[derive(Default)]
pub struct ReaderOptions {
    /// Checksum preference forwarded to each block read stream
    /// (see `BlockReadStream::new`); `None` presumably defers to the
    /// transfer layer's default — confirm in the data-transfer code.
    pub checksum: Option<bool>,
}
19
20impl ReaderOptions {
21 pub fn open<S: Read + Write, D: Read + Write>(
22 self,
23 path: impl AsRef<Path>,
24 fs: &mut HDFS<S, D>,
25 ) -> Result<FileReader<D>, HDFSError> {
26 let src = path.as_ref().to_string_lossy().to_string();
27 let (_, info) = fs
28 .ipc
29 .get_file_info(GetFileInfoRequestProto { src: src.clone() })?;
30 let info_fs = match info.fs {
31 Some(fs) => fs,
32 None => {
33 return Err(io::Error::new(
34 io::ErrorKind::NotFound,
35 path.as_ref().display().to_string(),
36 )
37 .into());
38 }
39 };
40 if !matches!(info_fs.file_type(), FileType::IsFile) {
41 return Err(io::Error::new(io::ErrorKind::Other, "expect file type").into());
42 }
43 let (header, resp) = fs.ipc.get_block_locations(GetBlockLocationsRequestProto {
44 src: src.clone(),
45 offset: 0,
46 length: info_fs.length,
47 })?;
48 let locations = match resp.locations {
49 Some(loc) => loc,
50 None => {
51 return Err(io::Error::new(
52 io::ErrorKind::InvalidData,
53 format!("response header {header:?}"),
54 )
55 .into());
56 }
57 };
58
59 let client_name = fs.client_name.clone();
60 let conn_fn = fs.connect_data_node.clone();
61 let blk_stream = match locations.blocks.first().cloned() {
62 Some(block) => Some(create_blk_stream(
63 block,
64 &conn_fn,
65 client_name.clone(),
66 self.checksum,
67 0,
68 )?),
69 None => None,
70 };
71 Ok(FileReader {
72 connect_data_node: conn_fn,
73 locations,
74 read: 0,
75 block_idx: 0,
76 blk_stream,
77 client_name,
78 checksum: self.checksum,
79 metadata: info_fs,
80 })
81 }
82}
83
84fn create_blk_stream<D: Read + Write>(
85 block: hdfs_types::hdfs::LocatedBlockProto,
86 conn_fn: &Arc<dyn Fn(&DatanodeIdProto, IOType) -> Result<D, io::Error>>,
87 client_name: String,
88 checksum: Option<bool>,
89 offset: u64,
90) -> Result<BlockReadStream<D>, HDFSError> {
91 let stream =
92 block
93 .locs
94 .iter()
95 .enumerate()
96 .find_map(|(idx, loc)| match conn_fn(&loc.id, IOType::Read) {
97 Ok(stream) => Some(stream),
98 Err(e) => {
99 tracing::info!(
100 "try {} location of block {} failed {}",
101 idx + 1,
102 block.b.block_id,
103 e
104 );
105 None
106 }
107 });
108 let stream = stream.ok_or_else(|| HDFSError::NoAvailableLocation)?;
109 let blk_stream = BlockReadStream::new(client_name, stream, offset, checksum, block)?;
110 Ok(blk_stream)
111}
112
/// Sequential, seekable reader over the blocks of one HDFS file.
pub struct FileReader<D: Read + Write> {
    // Factory that opens a data-transfer connection to a given datanode.
    connect_data_node: Arc<dyn Fn(&DatanodeIdProto, IOType) -> io::Result<D>>,
    // Block layout of the file as reported by the namenode at open time.
    locations: LocatedBlocksProto,
    // Current byte position within the file (advanced by `read`, set by seek).
    read: usize,
    // Index into `locations.blocks` of the block currently being read.
    block_idx: usize,
    // Client name sent with each block read request.
    client_name: String,
    // Stream over the current block; `None` when the file has no blocks.
    blk_stream: Option<BlockReadStream<D>>,
    // Checksum preference forwarded to every new block stream.
    checksum: Option<bool>,
    // File status captured when the reader was opened.
    metadata: HdfsFileStatusProto,
}
123
124impl<D: Read + Write> Read for FileReader<D> {
125 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
126 if let Some(stream) = &mut self.blk_stream {
127 let num = stream.read(buf)?;
128 self.read += num;
129 if stream.remaining() == 0 {
130 self.block_idx += 1;
131 if let Some(block) = self.locations.blocks.get(self.block_idx).cloned() {
132 let blk_stream = create_blk_stream(
133 block,
134 &self.connect_data_node,
135 self.client_name.clone(),
136 self.checksum,
137 0,
138 )?;
139 self.blk_stream = Some(blk_stream);
140 }
141 }
142 Ok(num)
143 } else {
144 Ok(0)
145 }
146 }
147}
148
149impl<D: Read + Write> io::Seek for FileReader<D> {
150 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
151 match pos {
152 SeekFrom::Start(pos) => self.seek_from_start(pos),
153 SeekFrom::End(pos) => {
154 let pos = self.locations.file_length as i64 + pos;
155 if pos < 0 {
156 return Err(io::Error::new(
157 io::ErrorKind::InvalidInput,
158 "seek before byte 0",
159 ));
160 }
161 self.seek_from_start(pos as u64)
162 }
163 SeekFrom::Current(pos) => {
164 let pos = self.read as i64 + pos;
165 if pos < 0 {
166 return Err(io::Error::new(
167 io::ErrorKind::InvalidInput,
168 "seek before byte 0",
169 ));
170 }
171 self.seek_from_start(pos as u64)
172 }
173 }
174 }
175}
176
177impl<D: Read + Write> FileReader<D> {
178 pub fn metadata(&self) -> HdfsFileStatusProto {
179 self.metadata.clone()
180 }
181
182 fn seek_from_start(&mut self, pos: u64) -> Result<u64, io::Error> {
183 let locations = self.locations.clone();
184 if pos >= locations.file_length {
185 self.read = locations.file_length as usize;
186 if let Some(block) = locations.blocks.last().cloned() {
187 let offset = block.b.num_bytes();
188 self.blk_stream = Some(create_blk_stream(
189 block,
190 &self.connect_data_node,
191 self.client_name.clone(),
192 self.checksum,
193 offset,
194 )?);
195 }
196 if !self.locations.blocks.is_empty() {
197 self.block_idx = self.locations.blocks.len() - 1;
198 }
199 Ok(locations.file_length)
200 } else {
201 let (blk_idx, block) = locations
202 .blocks
203 .clone()
204 .into_iter()
205 .enumerate()
206 .find(|(_, blk)| blk.offset + blk.b.num_bytes() > pos)
207 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no matched block found"))?;
208 let offset = pos - block.offset;
209 self.blk_stream = Some(create_blk_stream(
210 block,
211 &self.connect_data_node,
212 self.client_name.clone(),
213 self.checksum,
214 offset,
215 )?);
216 self.block_idx = blk_idx;
217 self.read = pos as usize;
218 Ok(pos)
219 }
220 }
221}