1use std::{
2 io::{self, BufReader, BufWriter, Read, Write},
3 net::{TcpStream, ToSocketAddrs},
4 path::Path,
5 sync::Arc,
6};
7
8use crate::{hrpc::HRpc, HDFSError};
9use hdfs_types::hdfs::{DatanodeIdProto, GetListingRequestProto, HdfsFileStatusProto};
10
/// Prefix for the per-client identifier; a UUID is appended in `HDFS::new`
/// so each client instance is unique from the name node's point of view.
const CLIENT_NAME: &str = "hdfs-rust-client";

// File-write path (create/append) implementation.
mod writer;
pub use writer::{FileWriter, WriterOptions};
// File-read path implementation.
mod reader;
pub use reader::{FileReader, ReaderOptions};
17
/// Minimal description of a single HDFS file system endpoint.
// NOTE(review): this type is not referenced anywhere else in this file;
// confirm it is still part of the public API before extending it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct FSConfig {
    // Name node host (presumably hostname or IP — TODO confirm against callers).
    pub name_node: String,
    // Name node port.
    pub port: u16,
    // User name associated with this file system.
    pub user: String,
}
28
/// Options controlling how the client connects to name nodes and data nodes.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    // Real user handed to the RPC handshake (`HRpc::connect`); `None` omits it.
    pub real_user: Option<String>,
    // Effective user handed to the RPC handshake; `None` omits it.
    pub effective_user: Option<String>,
    // Candidate name-node addresses ("host:port"); tried in order until one
    // accepts a TCP connection.
    pub name_node: Vec<String>,
    // TCP connect timeout, in seconds (fed to `Duration::from_secs`).
    pub connection_timeout: u64,
    // If true, data nodes are dialed by `host_name`; otherwise by `ip_addr`.
    pub use_hostname: bool,
    // Capacity of the buffered writer, in bytes.
    pub write_buf_size: usize,
    // Capacity of the buffered reader, in bytes.
    pub read_buf_size: usize,
    // TCP keepalive time in seconds; `None` skips keepalive configuration.
    pub tcp_keepalived: Option<u64>,
    // Whether to set TCP_NODELAY on sockets (disables Nagle's algorithm).
    pub no_delay: bool,
}
41
42impl Default for ClientConfig {
43 fn default() -> Self {
44 Self {
45 real_user: Default::default(),
46 effective_user: Default::default(),
47 name_node: vec!["127.0.0.1:9000".into()],
48 use_hostname: true,
49 write_buf_size: 64 * 1024,
50 read_buf_size: 64 * 1024,
51 connection_timeout: 30,
52 tcp_keepalived: Some(30),
53 no_delay: true,
54 }
55 }
56}
57
/// A stream wrapper that buffers reads and writes independently.
///
/// Layout is `BufReader<Wrapped<S>>` where [`Wrapped`] holds a `BufWriter<S>`:
/// reads go through the `BufReader`, writes go straight to the `BufWriter`.
pub struct BufStream<S: Read + Write>(pub BufReader<Wrapped<S>>);

impl<S: Read + Write> BufStream<S> {
    /// Wrap `stream` using the standard library's default buffer capacities.
    pub fn new(stream: S) -> Self {
        let writer = BufWriter::new(stream);
        BufStream(BufReader::new(Wrapped(writer)))
    }

    /// Wrap `stream` with explicit read/write buffer capacities (in bytes).
    pub fn with(stream: S, read_buf: usize, write_buf: usize) -> Self {
        let writer = BufWriter::with_capacity(write_buf, stream);
        let reader = BufReader::with_capacity(read_buf, Wrapped(writer));
        BufStream(reader)
    }
}

/// Adapter giving a `BufWriter` a `Read` implementation by reading from the
/// underlying stream directly.
pub struct Wrapped<S: Read + Write>(pub BufWriter<S>);

impl<S: Read + Write> Read for Wrapped<S> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Reads bypass the write buffer and hit the inner stream directly.
        // NOTE(review): pending buffered writes are NOT flushed before a read;
        // callers must flush explicitly before expecting a response.
        self.0.get_mut().read(buf)
    }
}

impl<S: Read + Write> Write for Wrapped<S> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}

impl<S: Read + Write> Read for BufStream<S> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Buffered read path.
        self.0.read(buf)
    }
}

impl<S: Read + Write> Write for BufStream<S> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Route writes past the read buffer into the buffered writer.
        let writer = self.0.get_mut();
        writer.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        let writer = self.0.get_mut();
        writer.flush()
    }
}
106
/// Kind of data-node transfer a connection will be used for.
///
/// Passed to the data-node connector callback; the built-in TCP connector in
/// this file currently ignores it (binds it as `_`).
#[derive(Debug, Clone, Copy)]
pub enum IOType {
    Read,
    Write,
    Append,
}
113
/// HDFS client, generic over the name-node stream `S` and data-node stream `D`.
pub struct HDFS<S: Read + Write, D: Read + Write> {
    // Unique client identifier ("hdfs-rust-client_<uuid>") generated in `new`.
    client_name: String,
    // Active RPC channel to the name node.
    ipc: HRpc<S>,
    // Factory used to (re)establish a name-node RPC channel (`new_rpc`).
    create_ipc: Box<dyn Fn() -> io::Result<HRpc<S>>>,
    // Connector invoked to open a stream to a given data node for an `IOType`.
    connect_data_node: Arc<dyn Fn(&DatanodeIdProto, IOType) -> io::Result<D> + 'static>,
}
120
/// Conversion of assorted address inputs into a list of name-node addresses.
pub trait ToNameNodes {
    fn to_name_nodes(self) -> Vec<String>;
}

/// A single address as a string slice.
impl ToNameNodes for &str {
    fn to_name_nodes(self) -> Vec<String> {
        vec![self.to_owned()]
    }
}

/// A single owned address.
impl ToNameNodes for String {
    fn to_name_nodes(self) -> Vec<String> {
        vec![self]
    }
}

/// An already-built address list (identity conversion).
impl ToNameNodes for Vec<String> {
    fn to_name_nodes(self) -> Vec<String> {
        self
    }
}

/// Any slice of stringifiable addresses.
impl<S: ToString> ToNameNodes for &[S] {
    fn to_name_nodes(self) -> Vec<String> {
        self.iter().map(ToString::to_string).collect()
    }
}
147
148impl HDFS<BufStream<TcpStream>, BufStream<TcpStream>> {
149 pub fn connect<S: ToString>(
150 name_node: impl ToNameNodes,
151 user: impl Into<Option<S>>,
152 ) -> io::Result<Self> {
153 let config = ClientConfig {
154 name_node: name_node.to_name_nodes(),
155 effective_user: user.into().map(|s| s.to_string()),
156 ..Default::default()
157 };
158 Self::connect_with(config)
159 }
160
161 pub fn connect_with(config: ClientConfig) -> io::Result<Self> {
162 let ClientConfig {
163 real_user,
164 effective_user,
165 name_node,
166 connection_timeout,
167 use_hostname,
168 write_buf_size,
169 read_buf_size,
170 tcp_keepalived,
171 no_delay,
172 } = config;
173 let timeout = std::time::Duration::from_secs(connection_timeout);
174 HDFS::new(
175 move || {
176 let stream = name_node
177 .iter()
178 .filter_map(|addr| addr.to_socket_addrs().ok())
179 .flatten()
180 .find_map(|addr| {
181 tracing::debug!(message="connect name node", addr=?addr);
182 TcpStream::connect_timeout(&addr, timeout).ok()
183 });
184 let stream = stream.ok_or(io::Error::new(
185 io::ErrorKind::Other,
186 "no available name node",
187 ))?;
188 let sk_ref = socket2::SockRef::from(&stream);
189 if let Some(keep) = tcp_keepalived {
190 let keepalive = socket2::TcpKeepalive::new()
191 .with_time(std::time::Duration::from_secs(keep));
192 sk_ref.set_tcp_keepalive(&keepalive)?;
193 }
194 sk_ref.set_nodelay(no_delay)?;
195 let stream = BufStream::with(stream, write_buf_size, read_buf_size);
196 let ipc = HRpc::connect(
197 stream,
198 effective_user.clone(),
199 real_user.clone(),
200 None,
201 None,
202 )?;
203 Ok(ipc)
204 },
205 move |datanode, _| {
206 let mut addrs = if use_hostname {
207 tracing::debug!(
208 message = "connect data node",
209 hostname = datanode.host_name,
210 port = datanode.xfer_port
211 );
212 (datanode.host_name.as_str(), datanode.xfer_port as u16)
213 .to_socket_addrs()
214 .map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid address"))?
215 } else {
216 tracing::debug!(
217 message = "connect data node",
218 ip = datanode.ip_addr,
219 port = datanode.xfer_port
220 );
221 (datanode.ip_addr.as_str(), datanode.xfer_port as u16)
222 .to_socket_addrs()
223 .map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid ip addr"))?
224 };
225 let addr = addrs
226 .next()
227 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "invalid ip address"))?;
228 let stream = TcpStream::connect_timeout(&addr, timeout)?;
229 let sk_ref = socket2::SockRef::from(&stream);
230 if let Some(keep) = tcp_keepalived {
231 let keepalive = socket2::TcpKeepalive::new()
232 .with_time(std::time::Duration::from_secs(keep));
233 sk_ref.set_tcp_keepalive(&keepalive)?;
234 }
235 sk_ref.set_nodelay(no_delay)?;
236 let stream = BufStream::with(stream, write_buf_size, read_buf_size);
237 Ok(stream)
238 },
239 )
240 }
241}
242
243impl<S: Read + Write, D: Read + Write> HDFS<S, D> {
244 pub fn new(
245 create_ipc: impl Fn() -> io::Result<HRpc<S>> + 'static,
246 connect_datanode: impl Fn(&DatanodeIdProto, IOType) -> io::Result<D> + 'static,
247 ) -> io::Result<Self> {
248 let client_name = format!("{}_{}", CLIENT_NAME, uuid::Uuid::new_v4());
249 let ipc = create_ipc()?;
250
251 Ok(Self {
252 client_name,
253 ipc,
254 create_ipc: Box::new(create_ipc),
255 connect_data_node: Arc::new(connect_datanode),
256 })
257 }
258
259 pub fn client_name(&self) -> &str {
260 &self.client_name
261 }
262
263 pub fn get_rpc(&mut self) -> &mut HRpc<S> {
264 &mut self.ipc
265 }
266
267 pub fn new_rpc(&mut self) -> Result<HRpc<S>, io::Error> {
268 (self.create_ipc)()
269 }
270
271 pub fn open(&mut self, path: impl AsRef<Path>) -> Result<FileReader<D>, HDFSError> {
273 ReaderOptions::default().open(path, self)
274 }
275
276 pub fn reader_options(&mut self) -> ReaderOptions {
278 ReaderOptions::default()
279 }
280
281 pub fn create(&mut self, path: impl AsRef<Path>) -> Result<FileWriter<S, D>, HDFSError> {
283 WriterOptions::default().create(path, self)
284 }
285
286 pub fn append(&mut self, path: impl AsRef<Path>) -> Result<FileWriter<S, D>, HDFSError> {
288 WriterOptions::default().append(path, self)
289 }
290
291 pub fn writer_options(&mut self) -> WriterOptions {
292 WriterOptions::default()
293 }
294
295 pub fn create_dir(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
296 let req = hdfs_types::hdfs::MkdirsRequestProto {
297 src: path.as_ref().to_string_lossy().to_string(),
298 masked: hdfs_types::hdfs::FsPermissionProto { perm: 0o666 },
299 create_parent: false,
300 unmasked: None,
301 };
302 let (_, resp) = self.ipc.mkdirs(req)?;
303 assert!(resp.result);
304 Ok(())
305 }
306
307 pub fn create_dir_all(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
308 let req = hdfs_types::hdfs::MkdirsRequestProto {
309 src: path.as_ref().to_string_lossy().to_string(),
310 masked: hdfs_types::hdfs::FsPermissionProto { perm: 0o666 },
311 create_parent: true,
312 unmasked: None,
313 };
314 let (_, resp) = self.ipc.mkdirs(req)?;
315 assert!(resp.result);
316 Ok(())
317 }
318
319 pub fn read(&mut self, path: impl AsRef<Path>) -> Result<Vec<u8>, HDFSError> {
321 let mut fd = self.open(path)?;
322 let mut buf = vec![0; fd.metadata().length as usize];
323 fd.read_to_end(&mut buf)?;
324 Ok(buf)
325 }
326
327 pub fn read_dir(
328 &mut self,
329 path: impl AsRef<Path>,
330 ) -> Result<Vec<HdfsFileStatusProto>, HDFSError> {
331 let mut start_after = vec![];
332 let mut result = vec![];
333 loop {
334 let req = GetListingRequestProto {
335 src: path.as_ref().to_string_lossy().to_string(),
336 start_after: start_after.clone(),
337 need_location: true,
338 };
339 let (_, resp) = self.get_rpc().get_listing(req)?;
340 if let Some(mut dir) = resp.dir_list {
341 result.append(&mut dir.partial_listing);
342 if dir.remaining_entries == 0 {
343 break;
344 }
345 if let Some(last) = result.last() {
346 start_after = last.path.clone();
347 }
348 } else {
349 break;
350 }
351 }
352 Ok(result)
353 }
354
355 pub fn remove_dir(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
356 let req = hdfs_types::hdfs::DeleteRequestProto {
357 src: path.as_ref().to_string_lossy().to_string(),
358 recursive: false,
359 };
360 let (_, resp) = self.ipc.delete(req)?;
361 assert!(resp.result);
362 Ok(())
363 }
364
365 pub fn remove_dir_all(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
366 let req = hdfs_types::hdfs::DeleteRequestProto {
367 src: path.as_ref().to_string_lossy().to_string(),
368 recursive: true,
369 };
370 let (_, resp) = self.ipc.delete(req)?;
371 assert!(resp.result);
372 Ok(())
373 }
374
375 pub fn remove_file(&mut self, path: impl AsRef<Path>) -> Result<(), HDFSError> {
376 let req = hdfs_types::hdfs::DeleteRequestProto {
377 src: path.as_ref().to_string_lossy().to_string(),
378 recursive: false,
379 };
380 self.ipc.delete(req)?;
381 Ok(())
382 }
383
384 pub fn rename(
385 &mut self,
386 from: impl AsRef<Path>,
387 to: impl AsRef<Path>,
388 ) -> Result<(), HDFSError> {
389 let req = hdfs_types::hdfs::Rename2RequestProto {
390 src: from.as_ref().to_string_lossy().to_string(),
391 dst: to.as_ref().to_string_lossy().to_string(),
392 overwrite_dest: true,
393 move_to_trash: Some(true),
394 };
395 self.ipc.rename2(req)?;
396 Ok(())
397 }
398}