//! hdfs-client 0.2.0
//!
//! Native Rust client for HDFS.
use std::{
    io::Read,
    io::{self, SeekFrom, Write},
    path::Path,
    sync::Arc,
};

use hdfs_types::hdfs::{
    hdfs_file_status_proto::FileType, DatanodeIdProto, GetBlockLocationsRequestProto,
    GetFileInfoRequestProto, HdfsFileStatusProto, LocatedBlocksProto,
};

use crate::{data_transfer::BlockReadStream, HDFSError, IOType, HDFS};

/// Options controlling how a file is opened for reading.
///
/// Start from [`ReaderOptions::default`] (or a struct literal) and call
/// [`ReaderOptions::open`] to obtain a [`FileReader`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ReaderOptions {
    /// Forwarded unchanged to every block read stream the reader opens; `None`
    /// leaves the choice to the block reader. Presumably toggles datanode
    /// checksum verification — TODO confirm against `BlockReadStream::new`.
    pub checksum: Option<bool>,
}

impl ReaderOptions {
    /// Opens the file at `path` on `fs` for reading.
    ///
    /// Steps, in order:
    /// 1. asks the namenode for the file status of `path`;
    /// 2. rejects a missing status or anything that is not a regular file;
    /// 3. fetches the located blocks covering `[0, length)` of the file;
    /// 4. if the file has at least one block, connects to a datanode holding the
    ///    first block so the returned reader is ready to read from offset 0.
    ///
    /// # Errors
    ///
    /// * `io::ErrorKind::NotFound` when the namenode returns no status for `path`.
    /// * `io::ErrorKind::Other` ("expect file type") when `path` is not a regular file.
    /// * `io::ErrorKind::InvalidData` when the block-location response has no locations.
    /// * Any error from the namenode calls or from opening the first block.
    pub fn open<S: Read + Write, D: Read + Write>(
        self,
        path: impl AsRef<Path>,
        fs: &mut HDFS<S, D>,
    ) -> Result<FileReader<D>, HDFSError> {
        let src = path.as_ref().to_string_lossy().to_string();

        // Resolve the file status and make sure it names a plain file.
        let (_, file_info) = fs
            .ipc
            .get_file_info(GetFileInfoRequestProto { src: src.clone() })?;
        let metadata = file_info.fs.ok_or_else(|| -> HDFSError {
            io::Error::new(io::ErrorKind::NotFound, path.as_ref().display().to_string()).into()
        })?;
        let is_regular_file = matches!(metadata.file_type(), FileType::IsFile);
        if !is_regular_file {
            return Err(io::Error::new(io::ErrorKind::Other, "expect file type").into());
        }

        // Ask for the locations of every block in a single request.
        let request = GetBlockLocationsRequestProto {
            src,
            offset: 0,
            length: metadata.length,
        };
        let (header, response) = fs.ipc.get_block_locations(request)?;
        let locations = response.locations.ok_or_else(|| -> HDFSError {
            io::Error::new(
                io::ErrorKind::InvalidData,
                format!("response header {header:?}"),
            )
            .into()
        })?;

        // Eagerly connect to the first block, if the file has one.
        let client_name = fs.client_name.clone();
        let connect_data_node = fs.connect_data_node.clone();
        let checksum = self.checksum;
        let blk_stream = locations
            .blocks
            .first()
            .cloned()
            .map(|first_block| {
                create_blk_stream(
                    first_block,
                    &connect_data_node,
                    client_name.clone(),
                    checksum,
                    0,
                )
            })
            .transpose()?;

        Ok(FileReader {
            connect_data_node,
            locations,
            read: 0,
            block_idx: 0,
            blk_stream,
            client_name,
            checksum,
            metadata,
        })
    }
}

/// Connects to the first reachable replica of `block` and starts reading it
/// `offset` bytes into the block.
///
/// Replicas are tried in the order they appear in `block.locs`. Each failed
/// connection attempt is logged at info level, and the next replica is tried.
///
/// # Errors
///
/// `HDFSError::NoAvailableLocation` if no replica could be connected (including
/// when `block.locs` is empty), or any error from `BlockReadStream::new`.
fn create_blk_stream<D: Read + Write>(
    block: hdfs_types::hdfs::LocatedBlockProto,
    conn_fn: &Arc<dyn Fn(&DatanodeIdProto, IOType) -> Result<D, io::Error>>,
    client_name: String,
    checksum: Option<bool>,
    offset: u64,
) -> Result<BlockReadStream<D>, HDFSError> {
    let mut connection = None;
    for (attempt, replica) in block.locs.iter().enumerate() {
        match conn_fn(&replica.id, IOType::Read) {
            Ok(conn) => {
                connection = Some(conn);
                break;
            }
            Err(err) => {
                tracing::info!(
                    "try {} location of block {} failed {}",
                    attempt + 1,
                    block.b.block_id,
                    err
                );
            }
        }
    }
    let connection = connection.ok_or(HDFSError::NoAvailableLocation)?;
    Ok(BlockReadStream::new(
        client_name,
        connection,
        offset,
        checksum,
        block,
    )?)
}

/// A sequential, seekable reader over one HDFS file.
///
/// Created by [`ReaderOptions::open`]. Data is pulled one block at a time:
/// `blk_stream` reads the block at `locations.blocks[block_idx]`. A new datanode
/// connection is opened through `connect_data_node` when moving on to another
/// block or when seeking to a position inside the file.
pub struct FileReader<D: Read + Write> {
    // Opens a connection to a datanode; invoked with `IOType::Read` for each block.
    connect_data_node: Arc<dyn Fn(&DatanodeIdProto, IOType) -> io::Result<D>>,
    // Block locations for the whole file, fetched once when the file is opened.
    locations: LocatedBlocksProto,
    // Current absolute byte position in the file: bytes consumed so far, or the
    // target of the last seek.
    read: usize,
    // Index into `locations.blocks` of the block `blk_stream` is reading.
    block_idx: usize,
    // Client name forwarded to every block read stream.
    client_name: String,
    // Stream for the current block; `None` when there is no block to read from
    // (e.g. the file has no blocks), in which case reads return 0.
    blk_stream: Option<BlockReadStream<D>>,
    // Forwarded to every block read stream (see `ReaderOptions::checksum`).
    checksum: Option<bool>,
    // File status returned by the namenode at open time; exposed via `metadata()`.
    metadata: HdfsFileStatusProto,
}

impl<D: Read + Write> Read for FileReader<D> {
    /// Reads bytes from the current block, moving on to the next block once the
    /// current one is exhausted.
    ///
    /// The switch to the next block happens lazily: only when a read on the
    /// current stream yields 0 bytes and reports nothing remaining. As a result:
    /// * bytes already copied into `buf` are always reported as `Ok(n)`, never
    ///   lost behind an error from opening the next block;
    /// * `block_idx` and `blk_stream` are committed only after the next block's
    ///   connection succeeds, so a failed attempt can be retried by calling
    ///   `read` again without skipping a block.
    ///
    /// Returns `Ok(0)` at end of file, or when there is no block stream.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            let stream = match self.blk_stream.as_mut() {
                Some(stream) => stream,
                None => return Ok(0),
            };
            let num = stream.read(buf)?;
            self.read += num;
            if num != 0 || stream.remaining() != 0 {
                return Ok(num);
            }

            // The current block is fully consumed; continue with the next one, if any.
            let next_idx = self.block_idx + 1;
            let block = match self.locations.blocks.get(next_idx) {
                Some(block) => block.clone(),
                None => return Ok(0),
            };
            let blk_stream = create_blk_stream(
                block,
                &self.connect_data_node,
                self.client_name.clone(),
                self.checksum,
                0,
            )?;
            self.blk_stream = Some(blk_stream);
            self.block_idx = next_idx;
        }
    }
}

impl<D: Read + Write> io::Seek for FileReader<D> {
    /// Computes the absolute target offset and delegates to `seek_from_start`.
    ///
    /// Targets at or past end of file are clamped to the file length by
    /// `seek_from_start`. A relative seek that would land before byte 0, or whose
    /// target does not fit in a `u64`, fails with `io::ErrorKind::InvalidInput`
    /// without touching the reader's state.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        let target = match pos {
            SeekFrom::Start(pos) => pos,
            SeekFrom::End(delta) => apply_seek_delta(self.locations.file_length, delta)?,
            SeekFrom::Current(delta) => apply_seek_delta(self.read as u64, delta)?,
        };
        self.seek_from_start(target)
    }
}

/// Applies a signed seek `delta` to the absolute position `base`.
///
/// Uses checked arithmetic, so extreme deltas (e.g. `i64::MAX`) produce an error
/// instead of overflowing.
fn apply_seek_delta(base: u64, delta: i64) -> io::Result<u64> {
    let target = if delta >= 0 {
        base.checked_add(delta as u64)
    } else {
        base.checked_sub(delta.unsigned_abs())
    };
    target.ok_or_else(|| {
        let msg = if delta < 0 {
            "seek before byte 0"
        } else {
            "seek position overflows u64"
        };
        io::Error::new(io::ErrorKind::InvalidInput, msg)
    })
}

impl<D: Read + Write> FileReader<D> {
    /// Returns a copy of the file status fetched from the namenode when the file
    /// was opened.
    pub fn metadata(&self) -> HdfsFileStatusProto {
        self.metadata.clone()
    }

    /// Repositions the reader to absolute byte offset `pos` and returns the
    /// resulting position.
    ///
    /// * `pos` at or past end of file: the position becomes the file length. The
    ///   current block stream is dropped without opening a new one, because there
    ///   is nothing left to read; subsequent reads return `Ok(0)`. This lets
    ///   callers find the length with `SeekFrom::End(0)` without contacting a
    ///   datanode.
    /// * Otherwise: the block containing `pos` is located, and a new stream is
    ///   opened at the matching in-block offset.
    ///
    /// Reader state (`read`, `block_idx`, `blk_stream`) is updated only after the
    /// new stream has been opened successfully.
    ///
    /// # Errors
    ///
    /// `io::ErrorKind::Other` ("no matched block found") if no known block covers
    /// `pos`, or any error from connecting to the block's datanodes.
    fn seek_from_start(&mut self, pos: u64) -> Result<u64, io::Error> {
        let file_length = self.locations.file_length;
        if pos >= file_length {
            self.blk_stream = None;
            self.block_idx = self.locations.blocks.len().saturating_sub(1);
            self.read = file_length as usize;
            return Ok(file_length);
        }

        // Clone only the matched block, not the whole location list.
        let (blk_idx, block) = self
            .locations
            .blocks
            .iter()
            .enumerate()
            .find(|(_, blk)| blk.offset + blk.b.num_bytes() > pos)
            .map(|(idx, blk)| (idx, blk.clone()))
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no matched block found"))?;
        let offset = pos - block.offset;
        let blk_stream = create_blk_stream(
            block,
            &self.connect_data_node,
            self.client_name.clone(),
            self.checksum,
            offset,
        )?;
        self.blk_stream = Some(blk_stream);
        self.block_idx = blk_idx;
        self.read = pos as usize;
        Ok(pos)
    }
}