1include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
2
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::ffi::{CStr, CString};
7use libc::c_void;
8
9
/// A handle to a single file on HDFS, backed by the libhdfs C bindings.
///
/// A value may exist without a live connection: `from_split` and `read_dir`
/// build it with both handles set to `None`, and `Read::read` connects and
/// opens the file on first use.
pub struct HdfsFile {
    /// Name node handed to `hdfsConnect`; every constructor in this file uses `"default"`.
    pub name_node: String,
    /// Path of the file on HDFS.
    pub path: PathBuf,
    /// Offset passed to `hdfsPread` for the next read; advanced by `Read::read`.
    pub read_pos: i64,
    /// Offset at which reading stops: `mSize` when an existing file is opened,
    /// or the `end` argument given to `from_split`.
    pub size: i64,
    /// `mBlockSize` reported by HDFS for the file; `0` until it has been queried.
    pub block_size: i64,
    /// Connection handle returned by `hdfsConnect`; `None` while disconnected.
    fs: Option<hdfsFS>,
    /// File handle returned by `hdfsOpenFile`; `None` while the file is not open.
    opened_file: Option<hdfsFile>,
}
19
20
21impl HdfsFile {
22
23 pub fn open<P: Into<PathBuf>>(path: P) -> std::io::Result<HdfsFile> {
45 let mut reader = HdfsFile {
46 name_node: String::from("default"),
47 path: path.into(),
48 read_pos: 0,
49 size: 0,
50 block_size: 0,
51 fs: None,
52 opened_file: None,
53 };
54
55 reader.connect().unwrap();
56 reader.open_with_flag(O_RDONLY).unwrap();
57
58 Ok(reader)
59 }
60
61 pub fn create<P: Into<PathBuf>>(path: P) -> std::io::Result<HdfsFile> {
63 let mut reader = HdfsFile {
64 name_node: String::from("default"),
65 path: path.into(),
66 read_pos: 0,
67 size: 0,
68 block_size: 0,
69 fs: None,
70 opened_file: None,
71 };
72
73 reader.connect().unwrap();
74 reader.open_with_flag(O_WRONLY | O_CREAT).unwrap();
75
76 Ok(reader)
77 }
78
79
80 pub fn from_split<P: Into<PathBuf>>(path: P, start: i64, end: i64) -> HdfsFile {
85
86 let reader = HdfsFile {
87 name_node: String::from("default"),
88 path: path.into(),
89 read_pos: start,
90 size: end,
91 block_size: 0,
92 fs: None,
93 opened_file: None,
94 };
95
96 reader
100 }
101
102 pub fn get_hosts(&mut self, start: u64, end: u64) -> std::io::Result<Vec<String>> {
103 let fs = self.fs.unwrap();
104 let file_path = self.path.to_string_lossy();
105 let file_path = CString::new(file_path.as_bytes()).unwrap();
106
107 unsafe {
108 let block_hosts = hdfsGetHosts(fs, file_path.as_ptr(), start as i64, end as i64);
109 let block_count = (0..).take_while(
110 |i| { let arg = block_hosts.offset(*i); !(*arg).is_null() })
111 .count();
112
113 let mut hosts_strings: Vec<String> = Vec::new();
114 for i in 0..block_count {
116 let hosts = *(block_hosts.offset(i as isize));
117 let hosts_len = (0..).take_while(
118 |i| { let arg = hosts.offset(*i); !(*arg).is_null() }
119 ).count();
120
121 let hosts_iter = std::slice::from_raw_parts(hosts, hosts_len).iter();
122 for one_host in hosts_iter {
123 hosts_strings.push(CStr::from_ptr(*one_host).to_string_lossy().into_owned());
124 }
125 }
126 hdfsFreeHosts(block_hosts);
127 Ok(hosts_strings)
128 }
129 }
130
131
132
133 fn connect(&mut self) -> std::io::Result<()> {
134 let name_node_ptr = CString::new(self.name_node.as_bytes()).unwrap();
135 let fs = unsafe {
136 hdfsConnect(name_node_ptr.as_ptr(), 0)
137 };
138
139 match fs.is_null(){
140 false => {
141 self.fs = Some(fs);
142 Ok(())
143 }
144 _ => {
145 Err(std::io::Error::new(std::io::ErrorKind::NotConnected,
146 format!("Failed to connect {}", self.name_node)))
147
148 }
149 }
150 }
151
152 fn open_with_flag(&mut self, flag: u32) -> std::io::Result<()> {
153 let file_path = self.path.to_string_lossy();
154 let file_path = CString::new(file_path.as_bytes()).unwrap();
155 let fs = self.fs.unwrap();
156
157 let file_exists = unsafe {
158 hdfsExists(fs, file_path.as_ptr()) == 0
159 };
160
161 let create_flag = (flag & O_CREAT) != 0;
162 match (file_exists, create_flag) {
163
164 (false, false) => {
165 Err(std::io::Error::new(std::io::ErrorKind::NotFound,
166 format!("No such file: {:?}", self.path)))
167 }
168
169 (true, _) => {
170 let file_info_ptr = unsafe {
171 hdfsGetPathInfo(fs, file_path.as_ptr())
172 };
173
174 match file_info_ptr.is_null() {
175 true => {
176 Err(std::io::Error::new(std::io::ErrorKind::Other,
177 format!("Failed to obtained info for {:?}", self.path)))
178 }
179 false => {
180
181 let file_info = unsafe {
182 *file_info_ptr
183 };
184
185 let file_size = file_info.mSize;
186 let block_size = file_info.mBlockSize;
187 let opened_file = unsafe {
188 hdfsOpenFile(fs, file_path.as_ptr(), flag as i32, 0, 0, 0)
189 };
190
191 match opened_file.is_null(){
192 true => {
193 Err(std::io::Error::new(std::io::ErrorKind::Other,
194 format!("Failed to open: {:?}", self.path)))
195 }
196 false => {
197 self.size = file_size;
198 self.block_size = block_size;
199 self.opened_file = Some(opened_file);
200 Ok(())
201 }
202 }
203 }
204 }
205 }
206
207 _ => {
208
209 let opened_file = unsafe {
210 hdfsOpenFile(fs, file_path.as_ptr(), flag as i32, 0, 0, 0)
211 };
212
213 match opened_file.is_null(){
214 true => {
215 Err(std::io::Error::new(std::io::ErrorKind::Other,
216 format!("Failed to create: {:?}", self.path)))
217 }
218 false => {
219 self.opened_file = Some(opened_file);
220 Ok(())
221 }
222 }
223 }
224 }
225 }
226
227 pub fn close(&mut self) {
228 match self.opened_file {
229 Some(file) => {
230 unsafe { hdfsCloseFile(self.fs.unwrap(), file); }
231 self.opened_file = None;
232 },
233 _ => {},
234 }
235
236 match self.fs {
237 Some(fs) => {
238 unsafe { hdfsDisconnect(fs); }
239 self.fs = None;
240 },
241 _ => {},
242 }
243 }
244
245 pub fn delete(&mut self) -> std::io::Result<()>{
246
247 match self.opened_file {
248 Some(file) => {
249 unsafe { hdfsCloseFile(self.fs.unwrap(), file); }
250 self.opened_file = None;
251 },
252 _ => {},
253 };
254
255 let file_path = self.path.to_string_lossy();
256 let file_path = CString::new(file_path.as_bytes()).unwrap();
257
258 let result = unsafe {
259 hdfsDelete(self.fs.unwrap(), file_path.as_ptr(), 0)
260 };
261
262 match result {
263 0 => Ok(()),
264 _ => Err(std::io::Error::new(std::io::ErrorKind::Other,
265 format!("Failed to delete {:?}", self.path))),
266 }
267 }
268}
269
270impl Drop for HdfsFile {
271 fn drop(&mut self) {
272 match self.opened_file {
273 Some(file) => {
274 unsafe { hdfsCloseFile(self.fs.unwrap(), file); }
275 self.opened_file = None;
276 },
277 _ => {},
278 }
279
280 match self.fs {
281 Some(fs) => {
282 unsafe { hdfsDisconnect(fs); }
283 self.fs = None;
284 },
285 _ => {},
286 }
287 }
288}
289
290impl Read for HdfsFile {
291 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
292 let remaining_size = self.size - self.read_pos;
293 let read_size = std::cmp::min((buf.len()) as i32, remaining_size as i32);
294
295 match self.opened_file {
296 Some(_) => {},
297 _ => {
298 self.connect().unwrap();
299 self.open_with_flag(O_RDONLY).unwrap();
300 }
301 }
302
303 unsafe {
304 hdfsPread(
305 self.fs.unwrap(),
306 self.opened_file.unwrap(),
307 self.read_pos,
308 buf.as_mut_ptr() as *mut c_void, read_size);
309 };
310
311 self.read_pos += read_size as i64;
312
313 Ok(read_size as usize)
314 }
315}
316
317impl Write for HdfsFile {
318 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
319 let buf_ptr = buf.as_ptr() as *const c_void;
320 let written_bytes = unsafe {
321 hdfsWrite(self.fs.unwrap(), self.opened_file.unwrap(), buf_ptr, buf.len() as i32)
322 };
323
324 Ok(written_bytes as usize)
325
326 }
327
328 fn flush(&mut self) -> std::io::Result<()> {
329
330 let result = unsafe {
331 hdfsFlush(self.fs.unwrap(), self.opened_file.unwrap())
332 };
333
334 match result {
335 0 => Ok(()),
336 _ => Err(std::io::Error::new(std::io::ErrorKind::WriteZero,
337 "failed to flush to hdfs")),
338 }
339 }
340}
341
342pub fn read_dir<P: AsRef<Path>>(path: P) -> Vec<HdfsFile>{
343
344 let mut file_list = Vec::new();
345
346 let name_node = CString::new("default").unwrap();
347 let fs = unsafe {
348 hdfsConnect(name_node.as_ptr(), 0)
349 };
350
351 let file_path = path.as_ref().to_string_lossy();
352 let file_path = CString::new(file_path.as_bytes()).unwrap();
353
354 let mut num_entries: i32 = 0;
355 let list_result = unsafe {
356 hdfsListDirectory(fs, file_path.as_ptr(), &mut num_entries)
357 };
358
359 let list_result = unsafe {
360 std::slice::from_raw_parts(list_result, num_entries as usize)
361 };
362
363 for result in list_result {
364 let file_name = (*result).mName;
365 let file_name = unsafe {
366 CStr::from_ptr(file_name).to_str().unwrap()
367 };
368
369 let file_size = (*result).mSize;
370 let file_block_size = (*result).mBlockSize;
371
372 let hdfs_file = HdfsFile {
373 name_node: String::from("default"),
374 path: PathBuf::from(file_name),
375 read_pos:0,
376 size: file_size,
377 block_size: file_block_size,
378 fs: None,
379 opened_file: None
380 };
381
382 file_list.push(hdfs_file);
383 }
384
385 unsafe { hdfsDisconnect(fs) };
386 file_list
387}