hdfs_client/
fs.rs

use std::{
    io::{self, BufReader, BufWriter, Read, Write},
    net::{TcpStream, ToSocketAddrs},
    path::Path,
    sync::Arc,
};

use crate::{hrpc::HRpc, HDFSError};
use hdfs_types::hdfs::{DatanodeIdProto, GetListingRequestProto, HdfsFileStatusProto};

const CLIENT_NAME: &str = "hdfs-rust-client";

mod writer;
pub use writer::{FileWriter, WriterOptions};
mod reader;
pub use reader::{FileReader, ReaderOptions};

/// HDFS protocol configuration
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct FSConfig {
    /// name node host name or ip
    pub name_node: String,
    /// name node ipc port, defaults to 9000
    pub port: u16,
    /// username
    pub user: String,
}

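/// client connection options
///
/// a minimal sketch of overriding the defaults (hosts and values are
/// illustrative, and the crate-root re-export is assumed):
///
/// ```no_run
/// use hdfs_client::ClientConfig;
///
/// let config = ClientConfig {
///     name_node: vec!["nn1.example.com:9000".into()],
///     effective_user: Some("hdfs".into()),
///     connection_timeout: 10,
///     ..Default::default()
/// };
/// ```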
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// real user, for proxy-user (impersonation) setups
    pub real_user: Option<String>,
    /// effective user that operations are performed as
    pub effective_user: Option<String>,
    /// name node addresses, tried in order until one connects
    pub name_node: Vec<String>,
    /// connection timeout in seconds
    pub connection_timeout: u64,
    /// resolve data nodes by hostname instead of the ip reported by the name node
    pub use_hostname: bool,
    /// write buffer capacity in bytes
    pub write_buf_size: usize,
    /// read buffer capacity in bytes
    pub read_buf_size: usize,
    /// TCP keepalive time in seconds, disabled when `None`
    pub tcp_keepalived: Option<u64>,
    /// set TCP_NODELAY on connections
    pub no_delay: bool,
}

impl Default for ClientConfig {
    fn default() -> Self {
        Self {
            real_user: Default::default(),
            effective_user: Default::default(),
            name_node: vec!["127.0.0.1:9000".into()],
            use_hostname: true,
            write_buf_size: 64 * 1024,
            read_buf_size: 64 * 1024,
            connection_timeout: 30,
            tcp_keepalived: Some(30),
            no_delay: true,
        }
    }
}

/// buffered wrapper around a duplex stream: reads go through the outer
/// `BufReader`, writes through the inner `BufWriter`, so each direction
/// is buffered independently
pub struct BufStream<S: Read + Write>(pub BufReader<Wrapped<S>>);

impl<S: Read + Write> BufStream<S> {
    pub fn new(stream: S) -> Self {
        Self(BufReader::new(Wrapped(BufWriter::new(stream))))
    }

    /// build a stream with explicit read and write buffer capacities
    pub fn with(stream: S, read_buf: usize, write_buf: usize) -> Self {
        Self(BufReader::with_capacity(
            read_buf,
            Wrapped(BufWriter::with_capacity(write_buf, stream)),
        ))
    }
}

/// write-buffered half of [`BufStream`]; reads bypass the write buffer
/// and hit the underlying stream directly
pub struct Wrapped<S: Read + Write>(pub BufWriter<S>);

impl<S: Read + Write> Read for Wrapped<S> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.get_mut().read(buf)
    }
}

impl<S: Read + Write> Write for Wrapped<S> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}

impl<S: Read + Write> Read for BufStream<S> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }
}

impl<S: Read + Write> Write for BufStream<S> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.get_mut().write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.get_mut().flush()
    }
}

#[derive(Debug, Clone, Copy)]
pub enum IOType {
    Read,
    Write,
    Append,
}

/// HDFS client, generic over the name node transport `S` and the data node
/// transport `D`
pub struct HDFS<S: Read + Write, D: Read + Write> {
    client_name: String,
    ipc: HRpc<S>,
    create_ipc: Box<dyn Fn() -> io::Result<HRpc<S>>>,
    connect_data_node: Arc<dyn Fn(&DatanodeIdProto, IOType) -> io::Result<D> + 'static>,
}

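/// conversion into a list of name node addresses, letting [`HDFS::connect`]
/// accept a single `&str`, a `String`, or a list of candidates for HA
/// deployments
///
/// a sketch with illustrative hosts (crate-root re-export assumed):
///
/// ```no_run
/// use hdfs_client::HDFS;
///
/// let fs = HDFS::connect(
///     vec!["nn1.example.com:9000".to_string(), "nn2.example.com:9000".to_string()],
///     Some("hdfs"),
/// )?;
/// # Ok::<_, std::io::Error>(())
/// ```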
pub trait ToNameNodes {
    fn to_name_nodes(self) -> Vec<String>;
}

impl ToNameNodes for &str {
    fn to_name_nodes(self) -> Vec<String> {
        vec![self.to_string()]
    }
}

impl ToNameNodes for String {
    fn to_name_nodes(self) -> Vec<String> {
        vec![self]
    }
}

impl ToNameNodes for Vec<String> {
    fn to_name_nodes(self) -> Vec<String> {
        self
    }
}

impl<S: ToString> ToNameNodes for &[S] {
    fn to_name_nodes(self) -> Vec<String> {
        self.iter().map(|s| s.to_string()).collect()
    }
}

impl HDFS<BufStream<TcpStream>, BufStream<TcpStream>> {
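    /// connect to the name node(s) with default options and an optional
    /// effective user
    ///
    /// a minimal sketch (address and user are illustrative, crate-root
    /// re-export assumed):
    ///
    /// ```no_run
    /// use hdfs_client::HDFS;
    ///
    /// let fs = HDFS::connect("127.0.0.1:9000", Some("hdfs"))?;
    /// # Ok::<_, std::io::Error>(())
    /// ```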
    pub fn connect<S: ToString>(
        name_node: impl ToNameNodes,
        user: impl Into<Option<S>>,
    ) -> io::Result<Self> {
        let config = ClientConfig {
            name_node: name_node.to_name_nodes(),
            effective_user: user.into().map(|s| s.to_string()),
            ..Default::default()
        };
        Self::connect_with(config)
    }

    pub fn connect_with(config: ClientConfig) -> io::Result<Self> {
        let ClientConfig {
            real_user,
            effective_user,
            name_node,
            connection_timeout,
            use_hostname,
            write_buf_size,
            read_buf_size,
            tcp_keepalived,
            no_delay,
        } = config;
        let timeout = std::time::Duration::from_secs(connection_timeout);
        HDFS::new(
            move || {
                // try each configured name node address until one accepts
                let stream = name_node
                    .iter()
                    .filter_map(|addr| addr.to_socket_addrs().ok())
                    .flatten()
                    .find_map(|addr| {
                        tracing::debug!(message="connect name node", addr=?addr);
                        TcpStream::connect_timeout(&addr, timeout).ok()
                    });
                let stream = stream.ok_or_else(|| {
                    io::Error::new(io::ErrorKind::Other, "no available name node")
                })?;
                let sk_ref = socket2::SockRef::from(&stream);
                if let Some(keep) = tcp_keepalived {
                    let keepalive = socket2::TcpKeepalive::new()
                        .with_time(std::time::Duration::from_secs(keep));
                    sk_ref.set_tcp_keepalive(&keepalive)?;
                }
                sk_ref.set_nodelay(no_delay)?;
                // `BufStream::with` takes the read capacity first, then the write capacity
                let stream = BufStream::with(stream, read_buf_size, write_buf_size);
                let ipc = HRpc::connect(
                    stream,
                    effective_user.clone(),
                    real_user.clone(),
                    None,
                    None,
                )?;
                Ok(ipc)
            },
            move |datanode, _| {
                let mut addrs = if use_hostname {
                    tracing::debug!(
                        message = "connect data node",
                        hostname = datanode.host_name,
                        port = datanode.xfer_port
                    );
                    (datanode.host_name.as_str(), datanode.xfer_port as u16)
                        .to_socket_addrs()
                        .map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid hostname"))?
                } else {
                    tracing::debug!(
                        message = "connect data node",
                        ip = datanode.ip_addr,
                        port = datanode.xfer_port
                    );
                    (datanode.ip_addr.as_str(), datanode.xfer_port as u16)
                        .to_socket_addrs()
                        .map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid ip addr"))?
                };
                let addr = addrs
                    .next()
                    .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "no address resolved for data node"))?;
                let stream = TcpStream::connect_timeout(&addr, timeout)?;
                let sk_ref = socket2::SockRef::from(&stream);
                if let Some(keep) = tcp_keepalived {
                    let keepalive = socket2::TcpKeepalive::new()
                        .with_time(std::time::Duration::from_secs(keep));
                    sk_ref.set_tcp_keepalive(&keepalive)?;
                }
                sk_ref.set_nodelay(no_delay)?;
                let stream = BufStream::with(stream, read_buf_size, write_buf_size);
                Ok(stream)
            },
        )
    }
}

impl<S: Read + Write, D: Read + Write> HDFS<S, D> {
    pub fn new(
        create_ipc: impl Fn() -> io::Result<HRpc<S>> + 'static,
        connect_datanode: impl Fn(&DatanodeIdProto, IOType) -> io::Result<D> + 'static,
    ) -> io::Result<Self> {
        // a unique client name disambiguates this client's leases on the name node
        let client_name = format!("{}_{}", CLIENT_NAME, uuid::Uuid::new_v4());
        let ipc = create_ipc()?;

        Ok(Self {
            client_name,
            ipc,
            create_ipc: Box::new(create_ipc),
            connect_data_node: Arc::new(connect_datanode),
        })
    }

    pub fn client_name(&self) -> &str {
        &self.client_name
    }

    /// access the underlying name node rpc client
    pub fn get_rpc(&mut self) -> &mut HRpc<S> {
        &mut self.ipc
    }

    /// open a fresh rpc connection to the name node
    pub fn new_rpc(&mut self) -> Result<HRpc<S>, io::Error> {
        (self.create_ipc)()
    }

    /// open a file
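    ///
    /// a read sketch (path is illustrative, crate-root re-exports assumed):
    ///
    /// ```no_run
    /// # use std::io::Read;
    /// # let mut fs = hdfs_client::HDFS::connect("127.0.0.1:9000", Some("hdfs"))?;
    /// let mut reader = fs.open("/tmp/demo.txt")?;
    /// let mut content = String::new();
    /// reader.read_to_string(&mut content)?;
    /// # Ok::<_, hdfs_client::HDFSError>(())
    /// ```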
    pub fn open(&mut self, path: impl AsRef<Path>) -> Result<FileReader<D>, HDFSError> {
        ReaderOptions::default().open(path, self)
    }

    /// use reader options to customize how a file is read
    pub fn reader_options(&mut self) -> ReaderOptions {
        ReaderOptions::default()
    }

    /// create a file
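    ///
    /// a write sketch (path is illustrative; assumes `FileWriter` implements
    /// `std::io::Write` and the crate-root re-exports):
    ///
    /// ```no_run
    /// # use std::io::Write;
    /// # let mut fs = hdfs_client::HDFS::connect("127.0.0.1:9000", Some("hdfs"))?;
    /// let mut writer = fs.create("/tmp/demo.txt")?;
    /// writer.write_all(b"hello hdfs")?;
    /// # Ok::<_, hdfs_client::HDFSError>(())
    /// ```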
    pub fn create(&mut self, path: impl AsRef<Path>) -> Result<FileWriter<S, D>, HDFSError> {
        WriterOptions::default().create(path, self)
    }

    /// open an existing file and append content to it
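    ///
    /// an append sketch (path is illustrative, same assumptions as the
    /// `create` example):
    ///
    /// ```no_run
    /// # use std::io::Write;
    /// # let mut fs = hdfs_client::HDFS::connect("127.0.0.1:9000", Some("hdfs"))?;
    /// let mut writer = fs.append("/tmp/demo.log")?;
    /// writer.write_all(b"one more line\n")?;
    /// # Ok::<_, hdfs_client::HDFSError>(())
    /// ```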
    pub fn append(&mut self, path: impl AsRef<Path>) -> Result<FileWriter<S, D>, HDFSError> {
        WriterOptions::default().append(path, self)
    }

    /// use writer options to customize how a file is created or appended
    pub fn writer_options(&mut self) -> WriterOptions {
        WriterOptions::default()
    }

    /// create a directory; fails if the parent directory does not exist
    pub fn create_dir(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
        let req = hdfs_types::hdfs::MkdirsRequestProto {
            src: path.as_ref().to_string_lossy().to_string(),
            masked: hdfs_types::hdfs::FsPermissionProto { perm: 0o666 },
            create_parent: false,
            unmasked: None,
        };
        let (_, resp) = self.ipc.mkdirs(req)?;
        assert!(resp.result);
        Ok(())
    }

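    /// recursively create a directory and all of its missing parents
    ///
    /// a usage sketch (path is illustrative, crate-root re-exports assumed):
    ///
    /// ```no_run
    /// # let mut fs = hdfs_client::HDFS::connect("127.0.0.1:9000", Some("hdfs"))?;
    /// fs.create_dir_all("/tmp/a/b/c")?;
    /// # Ok::<_, hdfs_client::HDFSError>(())
    /// ```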
    pub fn create_dir_all(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
        let req = hdfs_types::hdfs::MkdirsRequestProto {
            src: path.as_ref().to_string_lossy().to_string(),
            masked: hdfs_types::hdfs::FsPermissionProto { perm: 0o666 },
            create_parent: true,
            unmasked: None,
        };
        let (_, resp) = self.ipc.mkdirs(req)?;
        assert!(resp.result);
        Ok(())
    }

    /// read the entire contents of a file into a byte vector
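    ///
    /// a usage sketch (path is illustrative, crate-root re-exports assumed):
    ///
    /// ```no_run
    /// # let mut fs = hdfs_client::HDFS::connect("127.0.0.1:9000", Some("hdfs"))?;
    /// let bytes = fs.read("/tmp/demo.txt")?;
    /// # Ok::<_, hdfs_client::HDFSError>(())
    /// ```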
    pub fn read(&mut self, path: impl AsRef<Path>) -> Result<Vec<u8>, HDFSError> {
        let mut fd = self.open(path)?;
        // pre-allocate from the file length; `read_to_end` appends, so the
        // buffer must start empty rather than zero-filled
        let mut buf = Vec::with_capacity(fd.metadata().length as usize);
        fd.read_to_end(&mut buf)?;
        Ok(buf)
    }

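    /// list a directory, following partial listings until every entry has
    /// been fetched
    ///
    /// a usage sketch (path is illustrative, crate-root re-exports assumed):
    ///
    /// ```no_run
    /// # let mut fs = hdfs_client::HDFS::connect("127.0.0.1:9000", Some("hdfs"))?;
    /// for entry in fs.read_dir("/tmp")? {
    ///     println!("{}", String::from_utf8_lossy(&entry.path));
    /// }
    /// # Ok::<_, hdfs_client::HDFSError>(())
    /// ```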
    pub fn read_dir(
        &mut self,
        path: impl AsRef<Path>,
    ) -> Result<Vec<HdfsFileStatusProto>, HDFSError> {
        let mut start_after = vec![];
        let mut result = vec![];
        loop {
            let req = GetListingRequestProto {
                src: path.as_ref().to_string_lossy().to_string(),
                start_after: start_after.clone(),
                need_location: true,
            };
            let (_, resp) = self.get_rpc().get_listing(req)?;
            if let Some(mut dir) = resp.dir_list {
                result.append(&mut dir.partial_listing);
                if dir.remaining_entries == 0 {
                    break;
                }
                // resume the listing after the last entry seen so far
                if let Some(last) = result.last() {
                    start_after = last.path.clone();
                }
            } else {
                break;
            }
        }
        Ok(result)
    }

    /// remove an empty directory
    pub fn remove_dir(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
        let req = hdfs_types::hdfs::DeleteRequestProto {
            src: path.as_ref().to_string_lossy().to_string(),
            recursive: false,
        };
        let (_, resp) = self.ipc.delete(req)?;
        assert!(resp.result);
        Ok(())
    }

    /// remove a directory and all of its contents
    pub fn remove_dir_all(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
        let req = hdfs_types::hdfs::DeleteRequestProto {
            src: path.as_ref().to_string_lossy().to_string(),
            recursive: true,
        };
        let (_, resp) = self.ipc.delete(req)?;
        assert!(resp.result);
        Ok(())
    }

    /// remove a file
    pub fn remove_file(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
        let req = hdfs_types::hdfs::DeleteRequestProto {
            src: path.as_ref().to_string_lossy().to_string(),
            recursive: false,
        };
        self.ipc.delete(req)?;
        Ok(())
    }

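    /// rename or move a file or directory, overwriting the destination if it
    /// already exists
    ///
    /// a usage sketch (paths are illustrative, crate-root re-exports assumed):
    ///
    /// ```no_run
    /// # let mut fs = hdfs_client::HDFS::connect("127.0.0.1:9000", Some("hdfs"))?;
    /// fs.rename("/tmp/old.txt", "/tmp/new.txt")?;
    /// # Ok::<_, hdfs_client::HDFSError>(())
    /// ```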
    pub fn rename(
        &mut self,
        from: impl AsRef<Path>,
        to: impl AsRef<Path>,
    ) -> Result<(), HDFSError> {
        let req = hdfs_types::hdfs::Rename2RequestProto {
            src: from.as_ref().to_string_lossy().to_string(),
            dst: to.as_ref().to_string_lossy().to_string(),
            overwrite_dest: true,
            move_to_trash: Some(true),
        };
        self.ipc.rename2(req)?;
        Ok(())
    }
}