hdrs/
file.rs

1use std::io::{Error, ErrorKind, Read, Result, Seek, SeekFrom, Write};
2use std::ptr;
3
4use hdfs_sys::*;
5use libc::c_void;
6use log::debug;
7
8use crate::Client;
9
// Largest byte count passed to a single libhdfs read/write call in this file:
// 2^30 bytes (~1GB). Buffer lengths are clamped to this before being cast to `i32`.
const FILE_LIMIT: usize = 1 << 30;
12
13/// File will hold the underlying pointer to `hdfsFile`.
14///
15/// The internal file will be closed while `Drop`, so their is no need to close it manually.
16///
17/// # Examples
18///
19/// ```no_run
20/// use hdrs::{Client, ClientBuilder};
21///
22/// let fs = ClientBuilder::new("default")
23///     .with_user("default")
24///     .with_kerberos_ticket_cache_path("/tmp/krb5_111")
25///     .connect()
26///     .expect("client connect succeed");
27/// let mut f = fs
28///     .open_file()
29///     .read(true)
30///     .open("/tmp/hello.txt")
31///     .expect("must open success");
32/// ```
33#[derive(Debug)]
34pub struct File {
35    fs: hdfsFS,
36    f: hdfsFile,
37    path: String,
38}
39
40/// HDFS's client handle is thread safe.
41unsafe impl Send for File {}
42unsafe impl Sync for File {}
43
44impl Drop for File {
45    fn drop(&mut self) {
46        unsafe {
47            debug!("file has been closed");
48            let _ = hdfsCloseFile(self.fs, self.f);
49            // hdfsCloseFile will free self.f no matter success or failed.
50            self.f = ptr::null_mut();
51        }
52    }
53}
54
55impl File {
56    pub(crate) fn new(fs: hdfsFS, f: hdfsFile, path: &str) -> Self {
57        File {
58            fs,
59            f,
60            path: path.to_string(),
61        }
62    }
63
64    /// Works only for files opened in read-only mode.
65    fn inner_seek(&self, offset: i64) -> Result<()> {
66        let n = unsafe { hdfsSeek(self.fs, self.f, offset) };
67
68        if n == -1 {
69            return Err(Error::last_os_error());
70        }
71
72        Ok(())
73    }
74
75    fn tell(&self) -> Result<i64> {
76        let n = unsafe { hdfsTell(self.fs, self.f) };
77
78        if n == -1 {
79            return Err(Error::last_os_error());
80        }
81
82        Ok(n)
83    }
84
85    pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize> {
86        let n = unsafe {
87            hdfsPread(
88                self.fs,
89                self.f,
90                offset as i64,
91                buf.as_ptr() as *mut c_void,
92                buf.len().min(FILE_LIMIT) as i32,
93            )
94        };
95
96        if n == -1 {
97            return Err(Error::last_os_error());
98        }
99
100        Ok(n as usize)
101    }
102}
103
104impl Read for File {
105    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
106        let n = unsafe {
107            hdfsRead(
108                self.fs,
109                self.f,
110                buf.as_ptr() as *mut c_void,
111                buf.len().min(FILE_LIMIT) as i32,
112            )
113        };
114
115        if n == -1 {
116            return Err(Error::last_os_error());
117        }
118
119        Ok(n as usize)
120    }
121}
122
123impl Seek for File {
124    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
125        match pos {
126            SeekFrom::Start(n) => {
127                self.inner_seek(n as i64)?;
128                Ok(n)
129            }
130            SeekFrom::Current(n) => {
131                let current = self.tell()?;
132                let offset = (current + n) as u64;
133                self.inner_seek(offset as i64)?;
134                Ok(offset)
135            }
136            SeekFrom::End(n) => {
137                let meta = Client::new(self.fs).metadata(&self.path)?;
138                let offset = meta.len() as i64 + n;
139                self.inner_seek(offset)?;
140                Ok(offset as u64)
141            }
142        }
143    }
144}
145
146impl Write for File {
147    fn write(&mut self, buf: &[u8]) -> Result<usize> {
148        let n = unsafe {
149            hdfsWrite(
150                self.fs,
151                self.f,
152                buf.as_ptr() as *const c_void,
153                buf.len().min(FILE_LIMIT) as i32,
154            )
155        };
156
157        if n == -1 {
158            return Err(Error::last_os_error());
159        }
160
161        Ok(n as usize)
162    }
163
164    fn flush(&mut self) -> Result<()> {
165        let n = unsafe { hdfsFlush(self.fs, self.f) };
166
167        if n == -1 {
168            return Err(Error::last_os_error());
169        }
170
171        Ok(())
172    }
173}
174
175impl Read for &File {
176    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
177        let n = unsafe {
178            hdfsRead(
179                self.fs,
180                self.f,
181                buf.as_ptr() as *mut c_void,
182                buf.len().min(FILE_LIMIT) as i32,
183            )
184        };
185
186        if n == -1 {
187            return Err(Error::last_os_error());
188        }
189
190        Ok(n as usize)
191    }
192}
193
194impl Seek for &File {
195    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
196        match pos {
197            SeekFrom::Start(n) => {
198                self.inner_seek(n as i64)?;
199                Ok(n)
200            }
201            SeekFrom::Current(n) => {
202                let current = self.tell()?;
203                let offset = (current + n) as u64;
204                self.inner_seek(offset as i64)?;
205                Ok(offset)
206            }
207            SeekFrom::End(_) => Err(Error::new(
208                ErrorKind::Unsupported,
209                "hdfs doesn't support seek from end",
210            )),
211        }
212    }
213}
214
215impl Write for &File {
216    fn write(&mut self, buf: &[u8]) -> Result<usize> {
217        let n = unsafe {
218            hdfsWrite(
219                self.fs,
220                self.f,
221                buf.as_ptr() as *const c_void,
222                buf.len().min(FILE_LIMIT) as i32,
223            )
224        };
225
226        if n == -1 {
227            return Err(Error::last_os_error());
228        }
229
230        Ok(n as usize)
231    }
232
233    fn flush(&mut self) -> Result<()> {
234        let n = unsafe { hdfsFlush(self.fs, self.f) };
235
236        if n == -1 {
237            return Err(Error::last_os_error());
238        }
239
240        Ok(())
241    }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::ClientBuilder;

    /// Opening a file for writing yields non-null file and filesystem handles.
    #[test]
    fn test_file_build() {
        let _ = env_logger::try_init();

        let fs = ClientBuilder::new("default")
            .connect()
            .expect("init success");

        let path = uuid::Uuid::new_v4().to_string();

        let f = fs
            .open_file()
            .create(true)
            .write(true)
            .open(&format!("/tmp/{path}"))
            .expect("open file success");

        assert!(!f.f.is_null());
        assert!(!f.fs.is_null());
    }

    /// A single `write` call reports the full length of a short payload.
    #[test]
    fn test_file_write() {
        let _ = env_logger::try_init();

        let fs = ClientBuilder::new("default")
            .connect()
            .expect("init success");

        let path = uuid::Uuid::new_v4().to_string();

        let mut f = fs
            .open_file()
            .create(true)
            .write(true)
            .open(&format!("/tmp/{path}"))
            .expect("open file success");

        let n = f
            .write("Hello, World!".as_bytes())
            .expect("write must success");
        assert_eq!(n, 13)
    }

    /// Content written to a file can be read back after reopening it for read.
    #[test]
    fn test_file_read() {
        let _ = env_logger::try_init();

        let fs = ClientBuilder::new("default")
            .connect()
            .expect("init success");

        let path = uuid::Uuid::new_v4().to_string();
        let file_path = format!("/tmp/{path}");

        // Write the fixture in its own scope so the handle is dropped, and thereby
        // closed, before the file is reopened for reading.
        {
            let mut f = fs
                .open_file()
                .create(true)
                .write(true)
                .open(&file_path)
                .expect("open file success");

            f.write_all("Hello, World!".as_bytes())
                .expect("write must success");
        }

        let mut f = fs
            .open_file()
            .read(true)
            .open(&file_path)
            .expect("open file for read success");

        let mut content = String::new();
        let n = f
            .read_to_string(&mut content)
            .expect("read must success");

        assert_eq!(n, 13);
        assert_eq!(content, "Hello, World!");
    }
}
313        assert_eq!(n, 13)
314    }
315}