1use std::collections::HashMap;
2use std::ffi::CStr;
3use std::fmt::Formatter;
4use std::rc::Rc;
5
6use lazy_static::lazy_static;
7use libc::{c_int, c_short, c_void};
8use log::*;
9use std::sync::RwLock;
10use std::{ffi::CString, marker::PhantomData};
11
12use crate::err::HdfsErr;
13use crate::*;
14
// POSIX-style open flags forwarded to `hdfsOpenFile`. The numeric values
// match the Linux `<fcntl.h>` constants (O_APPEND is 1024 on Linux).
const O_RDONLY: c_int = 0;
const O_WRONLY: c_int = 1;
const O_APPEND: c_int = 1024;
18
/// Parameters identifying a single NameNode connection.
///
/// Also serves as the key of the global connection cache (`HDFS_CACHE`),
/// hence the `PartialEq`/`Eq`/`Hash` derives.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ConnectionProperties {
    /// NameNode host passed to `hdfsBuilderSetNameNode`.
    pub namenode_host: String,
    /// NameNode port passed to `hdfsBuilderSetNameNodePort`.
    pub namenode_port: u16,
    /// Optional user name for the connection (`hdfsBuilderSetUserName`).
    pub namenode_user: Option<String>,
    /// Optional Kerberos ticket-cache path (`hdfsBuilderSetKerbTicketCachePath`).
    pub kerberos_ticket_cache_path: Option<String>,
}
27
// SAFETY: NOTE(review): `HdfsFs` wraps a raw `hdfsFS` handle; these impls
// assert that the underlying libhdfs client handle may be moved to and
// shared between threads. That is a property of libhdfs itself, not visible
// here — confirm against the libhdfs documentation for the version in use.
unsafe impl Send for HdfsFs {}
unsafe impl Sync for HdfsFs {}
32
lazy_static! {
    /// Process-wide cache of established HDFS connections, keyed by the
    /// `ConnectionProperties` they were created with so identical
    /// connections are reused instead of reconnecting.
    static ref HDFS_CACHE: RwLock<HashMap<ConnectionProperties, HdfsFs>> =
        RwLock::new(HashMap::new());
}
37
/// Handle to an HDFS filesystem connection (wraps a libhdfs `hdfsFS`).
///
/// Cloning is cheap: clones share the same raw connection handle.
#[derive(Clone)]
pub struct HdfsFs {
    // Properties this connection was created with; also the cache key.
    connection_properties: ConnectionProperties,
    // Raw libhdfs filesystem handle.
    raw: hdfsFS,
    _marker: PhantomData<()>,
}
47
48impl std::fmt::Debug for HdfsFs {
49 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("HdfsFs")
51 .field("url", &self.connection_properties)
52 .finish()
53 }
54}
55
56impl HdfsFs {
57 pub fn new(connection_properties: ConnectionProperties) -> Result<HdfsFs, HdfsErr> {
62 HdfsFs::new_with_hdfs_params(connection_properties, HashMap::new())
63 }
64
65 pub fn new_with_hdfs_params(
73 connection_properties: ConnectionProperties,
74 hdfs_params: HashMap<String, String>,
75 ) -> Result<HdfsFs, HdfsErr> {
76 {
78 let cache = HDFS_CACHE
79 .read()
80 .expect("Could not aquire read lock on HDFS cache");
81 if let Some(hdfs_fs) = cache.get(&connection_properties) {
82 return Ok(hdfs_fs.clone());
83 }
84 }
85
86 let mut cache = HDFS_CACHE
87 .write()
88 .expect("Could not aquire write lock on HDFS cache");
89 let hdfsFs = cache
90 .entry(connection_properties.clone())
91 .or_insert_with(|| {
92 let hdfs_fs = create_hdfs_fs(connection_properties.clone(), hdfs_params)
93 .expect("Could not create HDFS connection");
94 HdfsFs {
95 connection_properties,
96 raw: hdfs_fs,
97 _marker: PhantomData,
98 }
99 });
100
101 Ok(hdfsFs.clone())
102 }
103
104 pub fn append(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
106 if !self.exist(path) {
107 return Err(HdfsErr::FileNotFound(path.to_owned()));
108 }
109 let file = unsafe {
110 let cstr_path = CString::new(path).unwrap();
111 hdfsOpenFile(self.raw, cstr_path.as_ptr(), O_APPEND, 0, 0, 0)
112 };
113 self.new_hdfs_file(path, file)
114 }
115
116 #[inline]
118 pub fn create(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
119 self.create_with_params(path, false, 0, 0, 0)
120 }
121
122 #[inline]
124 pub fn create_with_overwrite(&self, path: &str, overwrite: bool) -> Result<HdfsFile, HdfsErr> {
125 self.create_with_params(path, overwrite, 0, 0, 0)
126 }
127
128 pub fn create_with_params(
130 &self,
131 path: &str,
132 overwrite: bool,
133 buf_size: i32,
134 replica_num: i16,
135 block_size: i64,
136 ) -> Result<HdfsFile, HdfsErr> {
137 if !overwrite && self.exist(path) {
138 return Err(HdfsErr::FileAlreadyExists(path.to_owned()));
139 }
140 let file = unsafe {
141 let cstr_path = CString::new(path).unwrap();
142 hdfsOpenFile(
143 self.raw,
144 cstr_path.as_ptr(),
145 O_WRONLY,
146 buf_size as c_int,
147 replica_num as c_short,
148 block_size as tOffset,
149 )
150 };
151 self.new_hdfs_file(path, file)
152 }
153
154 pub fn get_file_status(&self, path: &str) -> Result<FileStatus, HdfsErr> {
155 let ptr = unsafe {
156 let cstr_path = CString::new(path).unwrap();
157 hdfsGetPathInfo(self.raw, cstr_path.as_ptr())
158 };
159 if ptr.is_null() {
160 Err(HdfsErr::Miscellaneous(format!(
161 "Could not get file status for {}",
162 path
163 )))
164 } else {
165 Ok(FileStatus::new(ptr))
166 }
167 }
168
169 pub fn delete(&self, path: &str, recursive: bool) -> Result<bool, HdfsErr> {
174 let res = unsafe {
175 let cstr_path = CString::new(path).unwrap();
176 hdfsDelete(self.raw, cstr_path.as_ptr(), recursive as c_int)
177 };
178 if res == 0 {
179 Ok(true)
180 } else {
181 Err(HdfsErr::Miscellaneous(format!(
182 "Could not delete path: {}",
183 path
184 )))
185 }
186 }
187
188 pub fn exist(&self, path: &str) -> bool {
190 (unsafe {
191 let cstr_path = CString::new(path).unwrap();
192 hdfsExists(self.raw, cstr_path.as_ptr())
193 } == 0)
194 }
195
196 pub fn list_status(&self, path: &str) -> Result<Vec<FileStatus>, HdfsErr> {
199 let mut entry_num: c_int = 0;
200 let ptr = unsafe {
201 let cstr_path = CString::new(path).unwrap();
202 hdfsListDirectory(self.raw, cstr_path.as_ptr(), &mut entry_num)
203 };
204 if ptr.is_null() {
205 Err(HdfsErr::Miscellaneous(format!(
206 "Could not list content of path: {}",
207 path
208 )))
209 } else {
210 let shared_ptr = Rc::new(HdfsFileInfoPtr::new_array(ptr, entry_num));
211
212 let list = (0..entry_num)
213 .into_iter()
214 .map(|idx| FileStatus::from_array(shared_ptr.clone(), idx as u32))
215 .collect::<Vec<FileStatus>>();
216
217 Ok(list)
218 }
219 }
220
221 pub fn mkdir(&self, path: &str) -> Result<bool, HdfsErr> {
222 let res = unsafe {
223 let cstr_path = CString::new(path).unwrap();
224 hdfsCreateDirectory(self.raw, cstr_path.as_ptr())
225 };
226 if res == 0 {
227 Ok(true)
228 } else {
229 Err(HdfsErr::Miscellaneous(format!(
230 "Could not create directory at path: {}",
231 path
232 )))
233 }
234 }
235
236 #[inline]
237 pub fn open(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
238 self.open_with_buf_size(path, 0)
239 }
240
241 pub fn open_with_buf_size(&self, path: &str, buf_size: i32) -> Result<HdfsFile, HdfsErr> {
242 let file = unsafe {
243 let cstr_path = CString::new(path).unwrap();
244 hdfsOpenFile(
245 self.raw,
246 cstr_path.as_ptr(),
247 O_RDONLY,
248 buf_size as c_int,
249 0,
250 0,
251 )
252 };
253 self.new_hdfs_file(path, file)
254 }
255
256 pub fn open_for_writing(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
257 let file = unsafe {
258 let cstr_path = CString::new(path).unwrap();
259 hdfsOpenFile(self.raw, cstr_path.as_ptr(), O_WRONLY, 0, 0, 0)
260 };
261 self.new_hdfs_file(path, file)
262 }
263
264 fn new_hdfs_file(&self, path: &str, file: hdfsFile) -> Result<HdfsFile, HdfsErr> {
265 if file.is_null() {
266 Err(HdfsErr::Miscellaneous(format!(
267 "Could not open HDFS file at path {}",
268 path
269 )))
270 } else {
271 Ok(HdfsFile {
272 fs: self.clone(),
273 path: path.to_owned(),
274 file,
275 _market: PhantomData,
276 })
277 }
278 }
279
280 pub fn rename(&self, old_path: &str, new_path: &str) -> Result<bool, HdfsErr> {
287 let ret = unsafe {
288 let cstr_old_path = CString::new(old_path).unwrap();
289 let cstr_new_path = CString::new(new_path).unwrap();
290 hdfsRename(self.raw, cstr_old_path.as_ptr(), cstr_new_path.as_ptr())
291 };
292 if ret == 0 {
293 Ok(true)
294 } else {
295 Err(HdfsErr::Miscellaneous(format!(
296 "Could not reanme {} to {}",
297 old_path, new_path
298 )))
299 }
300 }
301}
302
/// Owning wrapper for a `hdfsFileInfo` allocation returned by libhdfs;
/// frees it with `hdfsFreeFileInfo` when dropped.
struct HdfsFileInfoPtr {
    // First of `len` contiguous `hdfsFileInfo` records.
    pub ptr: *const hdfsFileInfo,
    // Number of records behind `ptr`, as reported by libhdfs.
    pub len: i32,
}
310
impl Drop for HdfsFileInfoPtr {
    fn drop(&mut self) {
        // SAFETY: `ptr`/`len` come straight from libhdfs
        // (hdfsGetPathInfo / hdfsListDirectory) and are freed exactly once
        // here; sharing happens only through `Rc<HdfsFileInfoPtr>`.
        unsafe { hdfsFreeFileInfo(self.ptr as *mut hdfsFileInfo, self.len) };
    }
}
316
317impl HdfsFileInfoPtr {
318 fn new(ptr: *const hdfsFileInfo) -> HdfsFileInfoPtr {
319 HdfsFileInfoPtr { ptr, len: 1 }
320 }
321
322 pub fn new_array(ptr: *const hdfsFileInfo, len: i32) -> HdfsFileInfoPtr {
323 HdfsFileInfoPtr { ptr, len }
324 }
325}
326
/// Metadata for one file or directory, backed by a (possibly shared)
/// libhdfs `hdfsFileInfo` allocation.
pub struct FileStatus {
    // Shared ownership of the backing `hdfsFileInfo` array; freed when the
    // last FileStatus referencing it is dropped.
    raw: Rc<HdfsFileInfoPtr>,
    // Index of this entry within the backing array.
    idx: u32,
    _marker: PhantomData<()>,
}
333
impl FileStatus {
    /// Wraps a single `hdfsFileInfo` record, taking ownership of it.
    #[inline]
    fn new(ptr: *const hdfsFileInfo) -> FileStatus {
        FileStatus {
            raw: Rc::new(HdfsFileInfoPtr::new(ptr)),
            idx: 0,
            _marker: PhantomData,
        }
    }

    /// Creates a view of entry `idx` of a shared `hdfsFileInfo` array
    /// (used by `HdfsFs::list_status`).
    #[inline]
    fn from_array(raw: Rc<HdfsFileInfoPtr>, idx: u32) -> FileStatus {
        FileStatus {
            raw,
            idx,
            _marker: PhantomData,
        }
    }

    /// Pointer to this entry's `hdfsFileInfo` record.
    #[inline]
    fn ptr(&self) -> *const hdfsFileInfo {
        // SAFETY: `idx` is within the backing array's length (0 for `new`,
        // an index below `entry_num` for entries built by `list_status`).
        unsafe { self.raw.ptr.offset(self.idx as isize) }
    }

    /// Entry name as reported by libhdfs (`mName`).
    /// Panics if the C string is not valid UTF-8.
    #[inline]
    pub fn name(&self) -> &str {
        let slice = unsafe { CStr::from_ptr((*self.ptr()).mName) }.to_bytes();
        std::str::from_utf8(slice).unwrap()
    }

    /// True when this entry is a regular file.
    #[inline]
    pub fn is_file(&self) -> bool {
        match unsafe { &*self.ptr() }.mKind {
            tObjectKind::kObjectKindFile => true,
            tObjectKind::kObjectKindDirectory => false,
        }
    }

    /// True when this entry is a directory.
    #[inline]
    pub fn is_directory(&self) -> bool {
        match unsafe { &*self.ptr() }.mKind {
            tObjectKind::kObjectKindFile => false,
            tObjectKind::kObjectKindDirectory => true,
        }
    }

    /// Owner user name (`mOwner`). Panics if not valid UTF-8.
    #[inline]
    pub fn owner(&self) -> &str {
        let slice = unsafe { CStr::from_ptr((*self.ptr()).mOwner) }.to_bytes();
        std::str::from_utf8(slice).unwrap()
    }

    /// Owning group name (`mGroup`). Panics if not valid UTF-8.
    #[inline]
    pub fn group(&self) -> &str {
        let slice = unsafe { CStr::from_ptr((*self.ptr()).mGroup) }.to_bytes();
        std::str::from_utf8(slice).unwrap()
    }

    /// Permission bits (`mPermissions`), truncated to i16.
    #[inline]
    pub fn permission(&self) -> i16 {
        unsafe { &*self.ptr() }.mPermissions as i16
    }

    /// File size in bytes (`mSize`).
    #[allow(clippy::len_without_is_empty)]
    #[inline]
    pub fn len(&self) -> usize {
        unsafe { &*self.ptr() }.mSize as usize
    }

    /// HDFS block size in bytes (`mBlockSize`).
    #[inline]
    pub fn block_size(&self) -> usize {
        unsafe { &*self.ptr() }.mBlockSize as usize
    }

    /// Replication factor (`mReplication`).
    #[inline]
    pub fn replica_count(&self) -> i16 {
        unsafe { &*self.ptr() }.mReplication as i16
    }

    /// Last modification time (`mLastMod`, a raw `time_t`).
    #[inline]
    pub fn last_modified(&self) -> time_t {
        unsafe { &*self.ptr() }.mLastMod
    }

    /// Last access time (`mLastAccess`, a raw `time_t`).
    #[inline]
    pub fn last_access(&self) -> time_t {
        unsafe { &*self.ptr() }.mLastAccess
    }
}
437
/// An open HDFS file handle, bound to the `HdfsFs` it was opened from.
///
/// NOTE(review): there is no `Drop` impl — callers must call `close()`
/// explicitly or the underlying handle leaks.
#[derive(Clone)]
pub struct HdfsFile {
    // Filesystem connection this file was opened on.
    fs: HdfsFs,
    // Path this file was opened with.
    path: String,
    // Raw libhdfs file handle.
    file: hdfsFile,
    // NOTE(review): field name looks like a typo for `_marker`.
    _market: PhantomData<()>,
}
448impl std::fmt::Debug for HdfsFile {
449 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
450 f.debug_struct("HdfsFile")
451 .field("connection_properties", &self.fs.connection_properties)
452 .field("path", &self.path)
453 .finish()
454 }
455}
456
457impl HdfsFile {
458 #[inline]
459 pub fn fs(&self) -> &HdfsFs {
460 &self.fs
461 }
462
463 #[inline]
464 pub fn path(&self) -> &str {
465 &self.path
466 }
467
468 pub fn available(&self) -> Result<i32, HdfsErr> {
470 let ret = unsafe { hdfsAvailable(self.fs.raw, self.file) };
471 if ret < 0 {
472 Err(HdfsErr::Miscellaneous(format!(
473 "Could not determine HDFS availability for {}",
474 self.path
475 )))
476 } else {
477 Ok(ret)
478 }
479 }
480
481 pub fn close(&self) -> Result<bool, HdfsErr> {
483 if unsafe { hdfsCloseFile(self.fs.raw, self.file) } == 0 {
484 Ok(true)
485 } else {
486 Err(HdfsErr::Miscellaneous(format!(
487 "Could not close {}",
488 self.path
489 )))
490 }
491 }
492
493 pub fn get_file_status(&self) -> Result<FileStatus, HdfsErr> {
495 self.fs.get_file_status(self.path())
496 }
497
498 pub fn read(&self, buf: &mut [u8]) -> Result<i32, HdfsErr> {
500 let read_len = unsafe {
501 hdfsRead(
502 self.fs.raw,
503 self.file,
504 buf.as_ptr() as *mut c_void,
505 buf.len() as tSize,
506 )
507 };
508 if read_len > 0 {
509 Ok(read_len as i32)
510 } else {
511 Err(HdfsErr::Miscellaneous(format!(
512 "Failed to read from {}",
513 self.path
514 )))
515 }
516 }
517
518 pub fn seek(&self, offset: u64) -> bool {
520 (unsafe { hdfsSeek(self.fs.raw, self.file, offset as tOffset) }) == 0
521 }
522
523 pub fn write(&self, buf: &[u8]) -> Result<i32, HdfsErr> {
524 let written_len = unsafe {
525 hdfsWrite(
526 self.fs.raw,
527 self.file,
528 buf.as_ptr() as *mut c_void,
529 buf.len() as tSize,
530 )
531 };
532 if written_len > 0 {
533 Ok(written_len)
534 } else {
535 Err(HdfsErr::Miscellaneous(format!(
536 "Failed to write to {}",
537 self.path
538 )))
539 }
540 }
541}
542
543fn create_hdfs_fs(
551 connection_properties: ConnectionProperties,
552 hdfs_params: HashMap<String, String>,
553) -> Result<hdfsFS, HdfsErr> {
554 let hdfs_fs = unsafe {
555 let hdfs_builder = hdfsNewBuilder();
556
557 let cstr_host = CString::new(connection_properties.namenode_host.as_bytes()).unwrap();
558 for (k, v) in hdfs_params {
559 let cstr_k = CString::new(k).unwrap();
560 let cstr_v = CString::new(v).unwrap();
561 hdfsBuilderConfSetStr(hdfs_builder, cstr_k.as_ptr(), cstr_v.as_ptr());
562 }
563 hdfsBuilderSetNameNode(hdfs_builder, cstr_host.as_ptr());
564 hdfsBuilderSetNameNodePort(hdfs_builder, connection_properties.namenode_port);
565
566 if let Some(user) = connection_properties.namenode_user.clone() {
567 let cstr_user = CString::new(user.as_bytes()).unwrap();
568 hdfsBuilderSetUserName(hdfs_builder, cstr_user.as_ptr());
569 }
570
571 if let Some(kerb_ticket_cache_path) =
572 connection_properties.kerberos_ticket_cache_path.clone()
573 {
574 let cstr_kerb_ticket_cache_path =
575 CString::new(kerb_ticket_cache_path.as_bytes()).unwrap();
576 hdfsBuilderSetKerbTicketCachePath(hdfs_builder, cstr_kerb_ticket_cache_path.as_ptr());
577 }
578
579 info!(
580 "Connecting to Namenode, host: {}, port: {}, user: {:?}, krb_ticket_cache: {:?}",
581 connection_properties.namenode_host,
582 connection_properties.namenode_port,
583 connection_properties.namenode_user,
584 connection_properties.kerberos_ticket_cache_path
585 );
586
587 hdfsBuilderConnect(hdfs_builder)
588 };
589
590 if hdfs_fs.is_null() {
591 Err(HdfsErr::CannotConnectToNameNode(format!(
592 "{}:{}",
593 connection_properties.namenode_host, connection_properties.namenode_port
594 )))
595 } else {
596 Ok(hdfs_fs)
597 }
598}