hdfs_client/fs/
reader.rs

1use std::{
2    io::Read,
3    io::{self, SeekFrom, Write},
4    path::Path,
5    sync::Arc,
6};
7
8use hdfs_types::hdfs::{
9    hdfs_file_status_proto::FileType, DatanodeIdProto, GetBlockLocationsRequestProto,
10    GetFileInfoRequestProto, HdfsFileStatusProto, LocatedBlocksProto,
11};
12
13use crate::{data_transfer::BlockReadStream, HDFSError, IOType, HDFS};
14
/// Options controlling how a file is opened for reading.
///
/// Build it with `ReaderOptions::default()` or a struct literal, then call
/// `ReaderOptions::open`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ReaderOptions {
    /// Checksum setting forwarded unchanged to every `BlockReadStream` the reader
    /// creates; the meaning of `None` / `Some(false)` / `Some(true)` is decided there.
    pub checksum: Option<bool>,
}
19
20impl ReaderOptions {
21    pub fn open<S: Read + Write, D: Read + Write>(
22        self,
23        path: impl AsRef<Path>,
24        fs: &mut HDFS<S, D>,
25    ) -> Result<FileReader<D>, HDFSError> {
26        let src = path.as_ref().to_string_lossy().to_string();
27        let (_, info) = fs
28            .ipc
29            .get_file_info(GetFileInfoRequestProto { src: src.clone() })?;
30        let info_fs = match info.fs {
31            Some(fs) => fs,
32            None => {
33                return Err(io::Error::new(
34                    io::ErrorKind::NotFound,
35                    path.as_ref().display().to_string(),
36                )
37                .into());
38            }
39        };
40        if !matches!(info_fs.file_type(), FileType::IsFile) {
41            return Err(io::Error::new(io::ErrorKind::Other, "expect file type").into());
42        }
43        let (header, resp) = fs.ipc.get_block_locations(GetBlockLocationsRequestProto {
44            src: src.clone(),
45            offset: 0,
46            length: info_fs.length,
47        })?;
48        let locations = match resp.locations {
49            Some(loc) => loc,
50            None => {
51                return Err(io::Error::new(
52                    io::ErrorKind::InvalidData,
53                    format!("response header {header:?}"),
54                )
55                .into());
56            }
57        };
58
59        let client_name = fs.client_name.clone();
60        let conn_fn = fs.connect_data_node.clone();
61        let blk_stream = match locations.blocks.first().cloned() {
62            Some(block) => Some(create_blk_stream(
63                block,
64                &conn_fn,
65                client_name.clone(),
66                self.checksum,
67                0,
68            )?),
69            None => None,
70        };
71        Ok(FileReader {
72            connect_data_node: conn_fn,
73            locations,
74            read: 0,
75            block_idx: 0,
76            blk_stream,
77            client_name,
78            checksum: self.checksum,
79            metadata: info_fs,
80        })
81    }
82}
83
84fn create_blk_stream<D: Read + Write>(
85    block: hdfs_types::hdfs::LocatedBlockProto,
86    conn_fn: &Arc<dyn Fn(&DatanodeIdProto, IOType) -> Result<D, io::Error>>,
87    client_name: String,
88    checksum: Option<bool>,
89    offset: u64,
90) -> Result<BlockReadStream<D>, HDFSError> {
91    let stream =
92        block
93            .locs
94            .iter()
95            .enumerate()
96            .find_map(|(idx, loc)| match conn_fn(&loc.id, IOType::Read) {
97                Ok(stream) => Some(stream),
98                Err(e) => {
99                    tracing::info!(
100                        "try {} location of block {} failed {}",
101                        idx + 1,
102                        block.b.block_id,
103                        e
104                    );
105                    None
106                }
107            });
108    let stream = stream.ok_or_else(|| HDFSError::NoAvailableLocation)?;
109    let blk_stream = BlockReadStream::new(client_name, stream, offset, checksum, block)?;
110    Ok(blk_stream)
111}
112
113pub struct FileReader<D: Read + Write> {
114    connect_data_node: Arc<dyn Fn(&DatanodeIdProto, IOType) -> io::Result<D>>,
115    locations: LocatedBlocksProto,
116    read: usize,
117    block_idx: usize,
118    client_name: String,
119    blk_stream: Option<BlockReadStream<D>>,
120    checksum: Option<bool>,
121    metadata: HdfsFileStatusProto,
122}
123
124impl<D: Read + Write> Read for FileReader<D> {
125    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
126        if let Some(stream) = &mut self.blk_stream {
127            let num = stream.read(buf)?;
128            self.read += num;
129            if stream.remaining() == 0 {
130                self.block_idx += 1;
131                if let Some(block) = self.locations.blocks.get(self.block_idx).cloned() {
132                    let blk_stream = create_blk_stream(
133                        block,
134                        &self.connect_data_node,
135                        self.client_name.clone(),
136                        self.checksum,
137                        0,
138                    )?;
139                    self.blk_stream = Some(blk_stream);
140                }
141            }
142            Ok(num)
143        } else {
144            Ok(0)
145        }
146    }
147}
148
149impl<D: Read + Write> io::Seek for FileReader<D> {
150    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
151        match pos {
152            SeekFrom::Start(pos) => self.seek_from_start(pos),
153            SeekFrom::End(pos) => {
154                let pos = self.locations.file_length as i64 + pos;
155                if pos < 0 {
156                    return Err(io::Error::new(
157                        io::ErrorKind::InvalidInput,
158                        "seek before byte 0",
159                    ));
160                }
161                self.seek_from_start(pos as u64)
162            }
163            SeekFrom::Current(pos) => {
164                let pos = self.read as i64 + pos;
165                if pos < 0 {
166                    return Err(io::Error::new(
167                        io::ErrorKind::InvalidInput,
168                        "seek before byte 0",
169                    ));
170                }
171                self.seek_from_start(pos as u64)
172            }
173        }
174    }
175}
176
177impl<D: Read + Write> FileReader<D> {
178    pub fn metadata(&self) -> HdfsFileStatusProto {
179        self.metadata.clone()
180    }
181
182    fn seek_from_start(&mut self, pos: u64) -> Result<u64, io::Error> {
183        let locations = self.locations.clone();
184        if pos >= locations.file_length {
185            self.read = locations.file_length as usize;
186            if let Some(block) = locations.blocks.last().cloned() {
187                let offset = block.b.num_bytes();
188                self.blk_stream = Some(create_blk_stream(
189                    block,
190                    &self.connect_data_node,
191                    self.client_name.clone(),
192                    self.checksum,
193                    offset,
194                )?);
195            }
196            if !self.locations.blocks.is_empty() {
197                self.block_idx = self.locations.blocks.len() - 1;
198            }
199            Ok(locations.file_length)
200        } else {
201            let (blk_idx, block) = locations
202                .blocks
203                .clone()
204                .into_iter()
205                .enumerate()
206                .find(|(_, blk)| blk.offset + blk.b.num_bytes() > pos)
207                .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no matched block found"))?;
208            let offset = pos - block.offset;
209            self.blk_stream = Some(create_blk_stream(
210                block,
211                &self.connect_data_node,
212                self.client_name.clone(),
213                self.checksum,
214                offset,
215            )?);
216            self.block_idx = blk_idx;
217            self.read = pos as usize;
218            Ok(pos)
219        }
220    }
221}