rust_hdfs/
hdfs_fs.rs

include!(concat!(env!("OUT_DIR"), "/bindings.rs"));

use std::io::{Read, Write};
use std::path::{Path, PathBuf};
// use std::fs::{OpenOptions};
use std::ffi::{CStr, CString};
use libc::c_void;

pub struct HdfsFile {
    pub name_node: String,
    pub path: PathBuf, // not sure whether using Path/PathBuf makes sense for HDFS paths
    pub read_pos: i64,
    pub size: i64,
    pub block_size: i64,
    fs: Option<hdfsFS>,
    opened_file: Option<hdfsFile>,
}


impl HdfsFile {

    // Attempts to open a file in read-only mode.
    // pub fn init_with_name_node<P: Into<String>, Q: Into<PathBuf>>(name_node: P,
    //     path: Q) -> std::io::Result<HdfsFile> {
    //     let mut reader = HdfsFile {
    //         name_node: name_node.into(),
    //         path: path.into(),
    //         read_pos: 0,
    //         size: 0,
    //         block_size: 0,
    //         fs: None,
    //         opened_file: None,
    //     };

    //     reader.connect().unwrap();
    //     reader.open_with_flag(O_RDONLY).unwrap();

    //     Ok(reader)
    // }

    // Attempts to open a file in read-only mode.
    pub fn open<P: Into<PathBuf>>(path: P) -> std::io::Result<HdfsFile> {
        let mut reader = HdfsFile {
            name_node: String::from("default"),
            path: path.into(),
            read_pos: 0,
            size: 0,
            block_size: 0,
            fs: None,
            opened_file: None,
        };

        reader.connect()?;
        reader.open_with_flag(O_RDONLY)?;

        Ok(reader)
    }
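
    // A minimal usage sketch (hypothetical path; assumes a reachable HDFS cluster
    // with the file already present):
    //
    //     let mut file = HdfsFile::open("/user/test/data.txt")?;
    //     let mut contents = String::new();
    //     file.read_to_string(&mut contents)?;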

    // Opens (or creates) a file in write mode.
    pub fn create<P: Into<PathBuf>>(path: P) -> std::io::Result<HdfsFile> {
        let mut writer = HdfsFile {
            name_node: String::from("default"),
            path: path.into(),
            read_pos: 0,
            size: 0,
            block_size: 0,
            fs: None,
            opened_file: None,
        };

        writer.connect()?;
        writer.open_with_flag(O_WRONLY | O_CREAT)?;

        Ok(writer)
    }


    // pub fn with_option() -> OpenOptions {
    //     unimplemented!();
    // }

    // Builds a reader for a byte-range split of a file; `start` and `end` are byte
    // offsets. The connection is established lazily on the first read.
    pub fn from_split<P: Into<PathBuf>>(path: P, start: i64, end: i64) -> HdfsFile {
        HdfsFile {
            name_node: String::from("default"),
            path: path.into(),
            read_pos: start,
            size: end,
            block_size: 0,
            fs: None,
            opened_file: None,
        }
    }

    pub fn get_hosts(&mut self, start: u64, end: u64) -> std::io::Result<Vec<String>> {
        if self.fs.is_none() {
            self.connect()?;
        }
        let fs = self.fs.unwrap();
        let file_path = self.path.to_string_lossy();
        let file_path = CString::new(file_path.as_bytes()).unwrap();

        unsafe {
            // hdfsGetHosts takes a start offset and a length, not an end offset,
            // so convert the [start, end) range to a length here.
            let block_hosts = hdfsGetHosts(fs, file_path.as_ptr(), start as i64, (end - start) as i64);
            if block_hosts.is_null() {
                return Err(std::io::Error::new(std::io::ErrorKind::Other,
                    format!("Failed to get block hosts for {:?}", self.path)));
            }

            // block_hosts is a NULL-terminated array of NULL-terminated host arrays,
            // one inner array per block in the requested range.
            let block_count = (0..).take_while(
                |i| { let arg = block_hosts.offset(*i); !(*arg).is_null() })
                .count();

            let mut hosts_strings: Vec<String> = Vec::new();
            for i in 0..block_count {
                let hosts = *(block_hosts.offset(i as isize));
                let hosts_len = (0..).take_while(
                    |i| { let arg = hosts.offset(*i); !(*arg).is_null() }
                ).count();

                let hosts_iter = std::slice::from_raw_parts(hosts, hosts_len).iter();
                for one_host in hosts_iter {
                    hosts_strings.push(CStr::from_ptr(*one_host).to_string_lossy().into_owned());
                }
            }
            hdfsFreeHosts(block_hosts);
            Ok(hosts_strings)
        }
    }
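
    // For example (hypothetical path and offsets; requires a reachable cluster):
    //
    //     let mut file = HdfsFile::open("/user/test/data.txt")?;
    //     let hosts = file.get_hosts(0, file.block_size as u64)?;
    //     println!("hosts for the first block: {:?}", hosts);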

    // Connects to the configured name node ("default" uses the local HDFS configuration).
    fn connect(&mut self) -> std::io::Result<()> {
        let name_node_ptr = CString::new(self.name_node.as_bytes()).unwrap();
        let fs = unsafe {
            hdfsConnect(name_node_ptr.as_ptr(), 0)
        };

        match fs.is_null() {
            false => {
                self.fs = Some(fs);
                Ok(())
            }
            _ => {
                Err(std::io::Error::new(std::io::ErrorKind::NotConnected,
                    format!("Failed to connect to {}", self.name_node)))
            }
        }
    }

    fn open_with_flag(&mut self, flag: u32) -> std::io::Result<()> {
        let file_path = self.path.to_string_lossy();
        let file_path = CString::new(file_path.as_bytes()).unwrap();
        let fs = self.fs.unwrap();

        let file_exists = unsafe {
            hdfsExists(fs, file_path.as_ptr()) == 0
        };

        let create_flag = (flag & O_CREAT) != 0;
        match (file_exists, create_flag) {

            (false, false) => {
                Err(std::io::Error::new(std::io::ErrorKind::NotFound,
                    format!("No such file: {:?}", self.path)))
            }

            (true, _) => {
                let file_info_ptr = unsafe {
                    hdfsGetPathInfo(fs, file_path.as_ptr())
                };

                match file_info_ptr.is_null() {
                    true => {
                        Err(std::io::Error::new(std::io::ErrorKind::Other,
                            format!("Failed to obtain info for {:?}", self.path)))
                    }
                    false => {

                        let file_info = unsafe {
                            *file_info_ptr
                        };

                        let file_size = file_info.mSize;
                        let block_size = file_info.mBlockSize;
                        // Release the info struct allocated by hdfsGetPathInfo.
                        unsafe { hdfsFreeFileInfo(file_info_ptr, 1); }

                        let opened_file = unsafe {
                            hdfsOpenFile(fs, file_path.as_ptr(), flag as i32, 0, 0, 0)
                        };

                        match opened_file.is_null() {
                            true => {
                                Err(std::io::Error::new(std::io::ErrorKind::Other,
                                    format!("Failed to open: {:?}", self.path)))
                            }
                            false => {
                                self.size = file_size;
                                self.block_size = block_size;
                                self.opened_file = Some(opened_file);
                                Ok(())
                            }
                        }
                    }
                }
            }

            _ => {

                let opened_file = unsafe {
                    hdfsOpenFile(fs, file_path.as_ptr(), flag as i32, 0, 0, 0)
                };

                match opened_file.is_null() {
                    true => {
                        Err(std::io::Error::new(std::io::ErrorKind::Other,
                            format!("Failed to create: {:?}", self.path)))
                    }
                    false => {
                        self.opened_file = Some(opened_file);
                        Ok(())
                    }
                }
            }
        }
    }

    pub fn close(&mut self) {
        if let Some(file) = self.opened_file {
            unsafe { hdfsCloseFile(self.fs.unwrap(), file); }
            self.opened_file = None;
        }

        if let Some(fs) = self.fs {
            unsafe { hdfsDisconnect(fs); }
            self.fs = None;
        }
    }

    pub fn delete(&mut self) -> std::io::Result<()> {
        if let Some(file) = self.opened_file {
            unsafe { hdfsCloseFile(self.fs.unwrap(), file); }
            self.opened_file = None;
        }

        let file_path = self.path.to_string_lossy();
        let file_path = CString::new(file_path.as_bytes()).unwrap();

        // The third argument of hdfsDelete is the "recursive" flag; 0 means non-recursive.
        let result = unsafe {
            hdfsDelete(self.fs.unwrap(), file_path.as_ptr(), 0)
        };

        match result {
            0 => Ok(()),
            _ => Err(std::io::Error::new(std::io::ErrorKind::Other,
                format!("Failed to delete {:?}", self.path))),
        }
    }
}

impl Drop for HdfsFile {
    fn drop(&mut self) {
        // close() releases the open file handle and the filesystem connection.
        self.close();
    }
}

impl Read for HdfsFile {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let remaining_size = self.size - self.read_pos;
        let read_size = std::cmp::min(buf.len() as i64, remaining_size) as i32;

        // Lazily connect and open (e.g. for readers built with `from_split`).
        if self.opened_file.is_none() {
            self.connect()?;
            self.open_with_flag(O_RDONLY)?;
        }

        if read_size <= 0 {
            return Ok(0);
        }

        let read_bytes = unsafe {
            hdfsPread(
                self.fs.unwrap(),
                self.opened_file.unwrap(),
                self.read_pos,
                buf.as_mut_ptr() as *mut c_void, read_size)
        };

        if read_bytes < 0 {
            return Err(std::io::Error::new(std::io::ErrorKind::Other,
                format!("Failed to read from {:?}", self.path)));
        }

        self.read_pos += read_bytes as i64;
        Ok(read_bytes as usize)
    }
}

impl Write for HdfsFile {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let buf_ptr = buf.as_ptr() as *const c_void;
        let written_bytes = unsafe {
            hdfsWrite(self.fs.unwrap(), self.opened_file.unwrap(), buf_ptr, buf.len() as i32)
        };

        if written_bytes < 0 {
            return Err(std::io::Error::new(std::io::ErrorKind::Other,
                format!("Failed to write to {:?}", self.path)));
        }

        Ok(written_bytes as usize)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        let result = unsafe {
            hdfsFlush(self.fs.unwrap(), self.opened_file.unwrap())
        };

        match result {
            0 => Ok(()),
            _ => Err(std::io::Error::new(std::io::ErrorKind::Other,
                "failed to flush to hdfs")),
        }
    }
}
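
// A minimal write sketch (hypothetical path; requires a reachable, writable HDFS):
//
//     let mut file = HdfsFile::create("/user/test/output.txt")?;
//     file.write_all(b"hello hdfs")?;
//     file.flush()?;
//     file.close();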

// Lists a directory on the default HDFS and returns one (unopened) HdfsFile per entry.
pub fn read_dir<P: AsRef<Path>>(path: P) -> Vec<HdfsFile> {

    let mut file_list = Vec::new();

    let name_node = CString::new("default").unwrap();
    let fs = unsafe {
        hdfsConnect(name_node.as_ptr(), 0)
    };

    let file_path = path.as_ref().to_string_lossy();
    let file_path = CString::new(file_path.as_bytes()).unwrap();

    let mut num_entries: i32 = 0;
    let entries_ptr = unsafe {
        hdfsListDirectory(fs, file_path.as_ptr(), &mut num_entries)
    };

    // hdfsListDirectory returns NULL on error; in that case return an empty list.
    if entries_ptr.is_null() {
        unsafe { hdfsDisconnect(fs) };
        return file_list;
    }

    let entries = unsafe {
        std::slice::from_raw_parts(entries_ptr, num_entries as usize)
    };

    for entry in entries {
        let file_name = unsafe {
            CStr::from_ptr(entry.mName).to_str().unwrap()
        };

        let hdfs_file = HdfsFile {
            name_node: String::from("default"),
            path: PathBuf::from(file_name),
            read_pos: 0,
            size: entry.mSize,
            block_size: entry.mBlockSize,
            fs: None,
            opened_file: None,
        };

        file_list.push(hdfs_file);
    }

    unsafe {
        // Release the info array allocated by hdfsListDirectory.
        hdfsFreeFileInfo(entries_ptr, num_entries);
        hdfsDisconnect(fs);
    }
    file_list
}
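
// A rough end-to-end sketch, kept as an ignored test because it needs a live,
// configured HDFS cluster; the directory below is hypothetical.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;

    #[test]
    #[ignore]
    fn list_and_read_directory() {
        // List the (hypothetical) directory and fully read the first entry.
        let files = read_dir("/user/test");
        if let Some(mut file) = files.into_iter().next() {
            let mut contents = Vec::new();
            file.read_to_end(&mut contents).unwrap();
            assert_eq!(contents.len() as i64, file.size);
        }
    }
}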