//! Safe Rust wrappers over the libhdfs3 C API (libhdfs3_sys/hdfs3.rs).

1use std::collections::HashMap;
2use std::ffi::CStr;
3use std::fmt::Formatter;
4use std::rc::Rc;
5
6use lazy_static::lazy_static;
7use libc::{c_int, c_short, c_void};
8use log::*;
9use std::sync::RwLock;
10use std::{ffi::CString, marker::PhantomData};
11
12use crate::err::HdfsErr;
13use crate::*;
14
// POSIX-style open flags passed to `hdfsOpenFile`, mirroring <fcntl.h> values.
const O_RDONLY: c_int = 0;
const O_WRONLY: c_int = 1;
const O_APPEND: c_int = 1024;
18
/// Encapsulate Namenode connection properties.
///
/// Used both to configure the native connection and as the key of the global
/// connection cache, hence the `PartialEq + Eq + Hash` derives.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ConnectionProperties {
    /// Namenode host name or address.
    pub namenode_host: String,
    /// Namenode port.
    pub namenode_port: u16,
    /// Optional user name to connect as.
    pub namenode_user: Option<String>,
    /// Optional path to a Kerberos ticket cache used for authentication.
    pub kerberos_ticket_cache_path: Option<String>,
}
27
// SAFETY: the native libhdfs3 `hdfsFS` handle is documented as thread safe,
// so sharing (`Sync`) and moving (`Send`) `HdfsFs` across threads is sound as
// long as that guarantee holds.
// NOTE(review): this relies entirely on the native library's promise — confirm
// it for the exact libhdfs3 version that is linked.
unsafe impl Send for HdfsFs {}
unsafe impl Sync for HdfsFs {}
32
lazy_static! {
    // Process-wide cache: at most one `HdfsFs` per distinct set of
    // `ConnectionProperties` (see `HdfsFs::new_with_hdfs_params`).
    static ref HDFS_CACHE: RwLock<HashMap<ConnectionProperties, HdfsFs>> =
        RwLock::new(HashMap::new());
}
37
/// Hdfs Filesystem
///
/// It is basically thread safe because the native API for hdfsFs is thread-safe.
#[derive(Clone)]
pub struct HdfsFs {
    // Parameters this handle was created with; also the global cache key.
    connection_properties: ConnectionProperties,
    // Raw libhdfs3 filesystem handle obtained from `hdfsBuilderConnect`.
    raw: hdfsFS,
    _marker: PhantomData<()>,
}
47
48impl std::fmt::Debug for HdfsFs {
49    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50        f.debug_struct("HdfsFs")
51            .field("url", &self.connection_properties)
52            .finish()
53    }
54}
55
56impl HdfsFs {
57    /// Create an instance of HdfsFs. A global cache is used to ensure that only one instance
58    /// is created per namenode uri.
59    ///
60    /// * connection_properties - Namenode connection parameters
61    pub fn new(connection_properties: ConnectionProperties) -> Result<HdfsFs, HdfsErr> {
62        HdfsFs::new_with_hdfs_params(connection_properties, HashMap::new())
63    }
64
65    /// Create an instance of HdfsFs. A global cache is used to ensure that only one instance
66    /// is created per namenode uri.
67    ///
68    /// * connection_properties - Namenode connection parameters
69    /// * hdfs_params - optional key value pairs that need to be passed to configure
70    ///   the HDFS client side.
71    ///   Example: key = 'dfs.domain.socket.path', value = '/var/lib/hadoop-fs/dn_socket'
72    pub fn new_with_hdfs_params(
73        connection_properties: ConnectionProperties,
74        hdfs_params: HashMap<String, String>,
75    ) -> Result<HdfsFs, HdfsErr> {
76        // Try to get from cache if an entry exists.
77        {
78            let cache = HDFS_CACHE
79                .read()
80                .expect("Could not aquire read lock on HDFS cache");
81            if let Some(hdfs_fs) = cache.get(&connection_properties) {
82                return Ok(hdfs_fs.clone());
83            }
84        }
85
86        let mut cache = HDFS_CACHE
87            .write()
88            .expect("Could not aquire write lock on HDFS cache");
89        let hdfsFs = cache
90            .entry(connection_properties.clone())
91            .or_insert_with(|| {
92                let hdfs_fs = create_hdfs_fs(connection_properties.clone(), hdfs_params)
93                    .expect("Could not create HDFS connection");
94                HdfsFs {
95                    connection_properties,
96                    raw: hdfs_fs,
97                    _marker: PhantomData,
98                }
99            });
100
101        Ok(hdfsFs.clone())
102    }
103
104    /// Open a file for append
105    pub fn append(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
106        if !self.exist(path) {
107            return Err(HdfsErr::FileNotFound(path.to_owned()));
108        }
109        let file = unsafe {
110            let cstr_path = CString::new(path).unwrap();
111            hdfsOpenFile(self.raw, cstr_path.as_ptr(), O_APPEND, 0, 0, 0)
112        };
113        self.new_hdfs_file(path, file)
114    }
115
116    /// Create the given path as read-only
117    #[inline]
118    pub fn create(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
119        self.create_with_params(path, false, 0, 0, 0)
120    }
121
122    /// Create the given path as writable
123    #[inline]
124    pub fn create_with_overwrite(&self, path: &str, overwrite: bool) -> Result<HdfsFile, HdfsErr> {
125        self.create_with_params(path, overwrite, 0, 0, 0)
126    }
127
128    /// Create the given path
129    pub fn create_with_params(
130        &self,
131        path: &str,
132        overwrite: bool,
133        buf_size: i32,
134        replica_num: i16,
135        block_size: i64,
136    ) -> Result<HdfsFile, HdfsErr> {
137        if !overwrite && self.exist(path) {
138            return Err(HdfsErr::FileAlreadyExists(path.to_owned()));
139        }
140        let file = unsafe {
141            let cstr_path = CString::new(path).unwrap();
142            hdfsOpenFile(
143                self.raw,
144                cstr_path.as_ptr(),
145                O_WRONLY,
146                buf_size as c_int,
147                replica_num as c_short,
148                block_size as tOffset,
149            )
150        };
151        self.new_hdfs_file(path, file)
152    }
153
154    pub fn get_file_status(&self, path: &str) -> Result<FileStatus, HdfsErr> {
155        let ptr = unsafe {
156            let cstr_path = CString::new(path).unwrap();
157            hdfsGetPathInfo(self.raw, cstr_path.as_ptr())
158        };
159        if ptr.is_null() {
160            Err(HdfsErr::Miscellaneous(format!(
161                "Could not get file status for {}",
162                path
163            )))
164        } else {
165            Ok(FileStatus::new(ptr))
166        }
167    }
168
169    /// Delete the content at the given path.
170    ///
171    /// * path - the path on the filesystem
172    /// * recursive - if true, delete the content recursively.
173    pub fn delete(&self, path: &str, recursive: bool) -> Result<bool, HdfsErr> {
174        let res = unsafe {
175            let cstr_path = CString::new(path).unwrap();
176            hdfsDelete(self.raw, cstr_path.as_ptr(), recursive as c_int)
177        };
178        if res == 0 {
179            Ok(true)
180        } else {
181            Err(HdfsErr::Miscellaneous(format!(
182                "Could not delete path: {}",
183                path
184            )))
185        }
186    }
187
188    /// Check if the given path exists on the filesystem
189    pub fn exist(&self, path: &str) -> bool {
190        (unsafe {
191            let cstr_path = CString::new(path).unwrap();
192            hdfsExists(self.raw, cstr_path.as_ptr())
193        } == 0)
194    }
195
196    /// Get the file status of each entry under the specified path
197    /// Note that it is an error to list an empty directory.
198    pub fn list_status(&self, path: &str) -> Result<Vec<FileStatus>, HdfsErr> {
199        let mut entry_num: c_int = 0;
200        let ptr = unsafe {
201            let cstr_path = CString::new(path).unwrap();
202            hdfsListDirectory(self.raw, cstr_path.as_ptr(), &mut entry_num)
203        };
204        if ptr.is_null() {
205            Err(HdfsErr::Miscellaneous(format!(
206                "Could not list content of path: {}",
207                path
208            )))
209        } else {
210            let shared_ptr = Rc::new(HdfsFileInfoPtr::new_array(ptr, entry_num));
211
212            let list = (0..entry_num)
213                .into_iter()
214                .map(|idx| FileStatus::from_array(shared_ptr.clone(), idx as u32))
215                .collect::<Vec<FileStatus>>();
216
217            Ok(list)
218        }
219    }
220
221    pub fn mkdir(&self, path: &str) -> Result<bool, HdfsErr> {
222        let res = unsafe {
223            let cstr_path = CString::new(path).unwrap();
224            hdfsCreateDirectory(self.raw, cstr_path.as_ptr())
225        };
226        if res == 0 {
227            Ok(true)
228        } else {
229            Err(HdfsErr::Miscellaneous(format!(
230                "Could not create directory at path: {}",
231                path
232            )))
233        }
234    }
235
236    #[inline]
237    pub fn open(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
238        self.open_with_buf_size(path, 0)
239    }
240
241    pub fn open_with_buf_size(&self, path: &str, buf_size: i32) -> Result<HdfsFile, HdfsErr> {
242        let file = unsafe {
243            let cstr_path = CString::new(path).unwrap();
244            hdfsOpenFile(
245                self.raw,
246                cstr_path.as_ptr(),
247                O_RDONLY,
248                buf_size as c_int,
249                0,
250                0,
251            )
252        };
253        self.new_hdfs_file(path, file)
254    }
255
256    pub fn open_for_writing(&self, path: &str) -> Result<HdfsFile, HdfsErr> {
257        let file = unsafe {
258            let cstr_path = CString::new(path).unwrap();
259            hdfsOpenFile(self.raw, cstr_path.as_ptr(), O_WRONLY, 0, 0, 0)
260        };
261        self.new_hdfs_file(path, file)
262    }
263
264    fn new_hdfs_file(&self, path: &str, file: hdfsFile) -> Result<HdfsFile, HdfsErr> {
265        if file.is_null() {
266            Err(HdfsErr::Miscellaneous(format!(
267                "Could not open HDFS file at path {}",
268                path
269            )))
270        } else {
271            Ok(HdfsFile {
272                fs: self.clone(),
273                path: path.to_owned(),
274                file,
275                _market: PhantomData,
276            })
277        }
278    }
279
280    /// Rename a file
281    ///
282    /// old_path - the path to rename
283    /// new_path - the new name
284    ///
285    /// Note that the destination directory must exist.
286    pub fn rename(&self, old_path: &str, new_path: &str) -> Result<bool, HdfsErr> {
287        let ret = unsafe {
288            let cstr_old_path = CString::new(old_path).unwrap();
289            let cstr_new_path = CString::new(new_path).unwrap();
290            hdfsRename(self.raw, cstr_old_path.as_ptr(), cstr_new_path.as_ptr())
291        };
292        if ret == 0 {
293            Ok(true)
294        } else {
295            Err(HdfsErr::Miscellaneous(format!(
296                "Could not reanme {} to {}",
297                old_path, new_path
298            )))
299        }
300    }
301}
302
303// -------------------------------------------------------------------------------------------------
304
/// Safely deallocatable hdfsFileInfo pointer
struct HdfsFileInfoPtr {
    // Pointer to the first hdfsFileInfo record (may be the start of an array).
    pub ptr: *const hdfsFileInfo,
    // Number of records behind `ptr`; forwarded to hdfsFreeFileInfo on drop.
    pub len: i32,
}
310
impl Drop for HdfsFileInfoPtr {
    fn drop(&mut self) {
        // SAFETY: `ptr` was returned by the native library (hdfsGetPathInfo /
        // hdfsListDirectory) with `len` entries; this wrapper has sole
        // ownership, so the array is freed exactly once here.
        unsafe { hdfsFreeFileInfo(self.ptr as *mut hdfsFileInfo, self.len) };
    }
}
316
317impl HdfsFileInfoPtr {
318    fn new(ptr: *const hdfsFileInfo) -> HdfsFileInfoPtr {
319        HdfsFileInfoPtr { ptr, len: 1 }
320    }
321
322    pub fn new_array(ptr: *const hdfsFileInfo, len: i32) -> HdfsFileInfoPtr {
323        HdfsFileInfoPtr { ptr, len }
324    }
325}
326
/// Interface that represents the client side information for a file or directory.
pub struct FileStatus {
    // Shared ownership of the native info array; freed when the last
    // FileStatus referencing it is dropped.
    raw: Rc<HdfsFileInfoPtr>,
    // Index of this entry within the shared array (0 for a single record).
    idx: u32,
    _marker: PhantomData<()>,
}
333
334impl FileStatus {
335    /// create FileStatus from *const hdfsFileInfo
336    #[inline]
337    fn new(ptr: *const hdfsFileInfo) -> FileStatus {
338        FileStatus {
339            raw: Rc::new(HdfsFileInfoPtr::new(ptr)),
340            idx: 0,
341            _marker: PhantomData,
342        }
343    }
344
345    /// create FileStatus from *const hdfsFileInfo which points to a dynamically allocated array.
346    #[inline]
347    fn from_array(raw: Rc<HdfsFileInfoPtr>, idx: u32) -> FileStatus {
348        FileStatus {
349            raw,
350            idx,
351            _marker: PhantomData,
352        }
353    }
354
355    /// Get the pointer to hdfsFileInfo
356    #[inline]
357    fn ptr(&self) -> *const hdfsFileInfo {
358        unsafe { self.raw.ptr.offset(self.idx as isize) }
359    }
360
361    /// Get the name of the file
362    #[inline]
363    pub fn name(&self) -> &str {
364        let slice = unsafe { CStr::from_ptr((*self.ptr()).mName) }.to_bytes();
365        std::str::from_utf8(slice).unwrap()
366    }
367
368    /// Is this a file?
369    #[inline]
370    pub fn is_file(&self) -> bool {
371        match unsafe { &*self.ptr() }.mKind {
372            tObjectKind::kObjectKindFile => true,
373            tObjectKind::kObjectKindDirectory => false,
374        }
375    }
376
377    /// Is this a directory?
378    #[inline]
379    pub fn is_directory(&self) -> bool {
380        match unsafe { &*self.ptr() }.mKind {
381            tObjectKind::kObjectKindFile => false,
382            tObjectKind::kObjectKindDirectory => true,
383        }
384    }
385
386    /// Get the owner of the file
387    #[inline]
388    pub fn owner(&self) -> &str {
389        let slice = unsafe { CStr::from_ptr((*self.ptr()).mOwner) }.to_bytes();
390        std::str::from_utf8(slice).unwrap()
391    }
392
393    /// Get the group associated with the file
394    #[inline]
395    pub fn group(&self) -> &str {
396        let slice = unsafe { CStr::from_ptr((*self.ptr()).mGroup) }.to_bytes();
397        std::str::from_utf8(slice).unwrap()
398    }
399
400    /// Get the permissions associated with the file
401    #[inline]
402    pub fn permission(&self) -> i16 {
403        unsafe { &*self.ptr() }.mPermissions as i16
404    }
405
406    /// Get the length of this file, in bytes.
407    #[allow(clippy::len_without_is_empty)]
408    #[inline]
409    pub fn len(&self) -> usize {
410        unsafe { &*self.ptr() }.mSize as usize
411    }
412
413    /// Get the block size of the file.
414    #[inline]
415    pub fn block_size(&self) -> usize {
416        unsafe { &*self.ptr() }.mBlockSize as usize
417    }
418
419    /// Get the replication factor of a file.
420    #[inline]
421    pub fn replica_count(&self) -> i16 {
422        unsafe { &*self.ptr() }.mReplication as i16
423    }
424
425    /// Get the last modification time for the file in seconds
426    #[inline]
427    pub fn last_modified(&self) -> time_t {
428        unsafe { &*self.ptr() }.mLastMod
429    }
430
431    /// Get the last access time for the file in seconds
432    #[inline]
433    pub fn last_access(&self) -> time_t {
434        unsafe { &*self.ptr() }.mLastAccess
435    }
436}
437
438// -------------------------------------------------------------------------------------------------
439
/// An HDFS file
#[derive(Clone)]
pub struct HdfsFile {
    // Owning filesystem handle; keeps the connection usable for this file.
    fs: HdfsFs,
    // Path the file was opened with.
    path: String,
    // Raw native file handle returned by hdfsOpenFile.
    file: hdfsFile,
    // NOTE(review): looks like a typo for `_marker`; renaming requires touching
    // every constructor site, so it is left as-is here.
    _market: PhantomData<()>,
}
448impl std::fmt::Debug for HdfsFile {
449    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
450        f.debug_struct("HdfsFile")
451            .field("connection_properties", &self.fs.connection_properties)
452            .field("path", &self.path)
453            .finish()
454    }
455}
456
457impl HdfsFile {
458    #[inline]
459    pub fn fs(&self) -> &HdfsFs {
460        &self.fs
461    }
462
463    #[inline]
464    pub fn path(&self) -> &str {
465        &self.path
466    }
467
468    ///  Number of bytes that can be read from this file without blocking.
469    pub fn available(&self) -> Result<i32, HdfsErr> {
470        let ret = unsafe { hdfsAvailable(self.fs.raw, self.file) };
471        if ret < 0 {
472            Err(HdfsErr::Miscellaneous(format!(
473                "Could not determine HDFS availability for {}",
474                self.path
475            )))
476        } else {
477            Ok(ret)
478        }
479    }
480
481    /// Close the opened file
482    pub fn close(&self) -> Result<bool, HdfsErr> {
483        if unsafe { hdfsCloseFile(self.fs.raw, self.file) } == 0 {
484            Ok(true)
485        } else {
486            Err(HdfsErr::Miscellaneous(format!(
487                "Could not close {}",
488                self.path
489            )))
490        }
491    }
492
493    /// Get the file status, including file size, last modified time, etc
494    pub fn get_file_status(&self) -> Result<FileStatus, HdfsErr> {
495        self.fs.get_file_status(self.path())
496    }
497
498    /// Read data from an open file
499    pub fn read(&self, buf: &mut [u8]) -> Result<i32, HdfsErr> {
500        let read_len = unsafe {
501            hdfsRead(
502                self.fs.raw,
503                self.file,
504                buf.as_ptr() as *mut c_void,
505                buf.len() as tSize,
506            )
507        };
508        if read_len > 0 {
509            Ok(read_len as i32)
510        } else {
511            Err(HdfsErr::Miscellaneous(format!(
512                "Failed to read from {}",
513                self.path
514            )))
515        }
516    }
517
518    /// Seek to given offset in file.
519    pub fn seek(&self, offset: u64) -> bool {
520        (unsafe { hdfsSeek(self.fs.raw, self.file, offset as tOffset) }) == 0
521    }
522
523    pub fn write(&self, buf: &[u8]) -> Result<i32, HdfsErr> {
524        let written_len = unsafe {
525            hdfsWrite(
526                self.fs.raw,
527                self.file,
528                buf.as_ptr() as *mut c_void,
529                buf.len() as tSize,
530            )
531        };
532        if written_len > 0 {
533            Ok(written_len)
534        } else {
535            Err(HdfsErr::Miscellaneous(format!(
536                "Failed to write to {}",
537                self.path
538            )))
539        }
540    }
541}
542
543// -------------------------------------------------------------------------------------------------
544
545/// Create an instance of hdfsFs.
546///
547/// * connection_properties - Namenode connection parameters
548/// * hdfs_params - optional key value pairs that need to be passed to configure
549///   the HDFS client side
550fn create_hdfs_fs(
551    connection_properties: ConnectionProperties,
552    hdfs_params: HashMap<String, String>,
553) -> Result<hdfsFS, HdfsErr> {
554    let hdfs_fs = unsafe {
555        let hdfs_builder = hdfsNewBuilder();
556
557        let cstr_host = CString::new(connection_properties.namenode_host.as_bytes()).unwrap();
558        for (k, v) in hdfs_params {
559            let cstr_k = CString::new(k).unwrap();
560            let cstr_v = CString::new(v).unwrap();
561            hdfsBuilderConfSetStr(hdfs_builder, cstr_k.as_ptr(), cstr_v.as_ptr());
562        }
563        hdfsBuilderSetNameNode(hdfs_builder, cstr_host.as_ptr());
564        hdfsBuilderSetNameNodePort(hdfs_builder, connection_properties.namenode_port);
565
566        if let Some(user) = connection_properties.namenode_user.clone() {
567            let cstr_user = CString::new(user.as_bytes()).unwrap();
568            hdfsBuilderSetUserName(hdfs_builder, cstr_user.as_ptr());
569        }
570
571        if let Some(kerb_ticket_cache_path) =
572            connection_properties.kerberos_ticket_cache_path.clone()
573        {
574            let cstr_kerb_ticket_cache_path =
575                CString::new(kerb_ticket_cache_path.as_bytes()).unwrap();
576            hdfsBuilderSetKerbTicketCachePath(hdfs_builder, cstr_kerb_ticket_cache_path.as_ptr());
577        }
578
579        info!(
580            "Connecting to Namenode, host: {}, port: {}, user: {:?}, krb_ticket_cache: {:?}",
581            connection_properties.namenode_host,
582            connection_properties.namenode_port,
583            connection_properties.namenode_user,
584            connection_properties.kerberos_ticket_cache_path
585        );
586
587        hdfsBuilderConnect(hdfs_builder)
588    };
589
590    if hdfs_fs.is_null() {
591        Err(HdfsErr::CannotConnectToNameNode(format!(
592            "{}:{}",
593            connection_properties.namenode_host, connection_properties.namenode_port
594        )))
595    } else {
596        Ok(hdfs_fs)
597    }
598}