1use std::io::{Error, ErrorKind, Read, Result, Seek, SeekFrom, Write};
2use std::ptr;
3
4use hdfs_sys::*;
5use libc::c_void;
6use log::debug;
7
8use crate::Client;
9
10const FILE_LIMIT: usize = 1073741824;
12
13#[derive(Debug)]
34pub struct File {
35 fs: hdfsFS,
36 f: hdfsFile,
37 path: String,
38}
39
40unsafe impl Send for File {}
42unsafe impl Sync for File {}
43
44impl Drop for File {
45 fn drop(&mut self) {
46 unsafe {
47 debug!("file has been closed");
48 let _ = hdfsCloseFile(self.fs, self.f);
49 self.f = ptr::null_mut();
51 }
52 }
53}
54
55impl File {
56 pub(crate) fn new(fs: hdfsFS, f: hdfsFile, path: &str) -> Self {
57 File {
58 fs,
59 f,
60 path: path.to_string(),
61 }
62 }
63
64 fn inner_seek(&self, offset: i64) -> Result<()> {
66 let n = unsafe { hdfsSeek(self.fs, self.f, offset) };
67
68 if n == -1 {
69 return Err(Error::last_os_error());
70 }
71
72 Ok(())
73 }
74
75 fn tell(&self) -> Result<i64> {
76 let n = unsafe { hdfsTell(self.fs, self.f) };
77
78 if n == -1 {
79 return Err(Error::last_os_error());
80 }
81
82 Ok(n)
83 }
84
85 pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize> {
86 let n = unsafe {
87 hdfsPread(
88 self.fs,
89 self.f,
90 offset as i64,
91 buf.as_ptr() as *mut c_void,
92 buf.len().min(FILE_LIMIT) as i32,
93 )
94 };
95
96 if n == -1 {
97 return Err(Error::last_os_error());
98 }
99
100 Ok(n as usize)
101 }
102}
103
104impl Read for File {
105 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
106 let n = unsafe {
107 hdfsRead(
108 self.fs,
109 self.f,
110 buf.as_ptr() as *mut c_void,
111 buf.len().min(FILE_LIMIT) as i32,
112 )
113 };
114
115 if n == -1 {
116 return Err(Error::last_os_error());
117 }
118
119 Ok(n as usize)
120 }
121}
122
123impl Seek for File {
124 fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
125 match pos {
126 SeekFrom::Start(n) => {
127 self.inner_seek(n as i64)?;
128 Ok(n)
129 }
130 SeekFrom::Current(n) => {
131 let current = self.tell()?;
132 let offset = (current + n) as u64;
133 self.inner_seek(offset as i64)?;
134 Ok(offset)
135 }
136 SeekFrom::End(n) => {
137 let meta = Client::new(self.fs).metadata(&self.path)?;
138 let offset = meta.len() as i64 + n;
139 self.inner_seek(offset)?;
140 Ok(offset as u64)
141 }
142 }
143 }
144}
145
146impl Write for File {
147 fn write(&mut self, buf: &[u8]) -> Result<usize> {
148 let n = unsafe {
149 hdfsWrite(
150 self.fs,
151 self.f,
152 buf.as_ptr() as *const c_void,
153 buf.len().min(FILE_LIMIT) as i32,
154 )
155 };
156
157 if n == -1 {
158 return Err(Error::last_os_error());
159 }
160
161 Ok(n as usize)
162 }
163
164 fn flush(&mut self) -> Result<()> {
165 let n = unsafe { hdfsFlush(self.fs, self.f) };
166
167 if n == -1 {
168 return Err(Error::last_os_error());
169 }
170
171 Ok(())
172 }
173}
174
175impl Read for &File {
176 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
177 let n = unsafe {
178 hdfsRead(
179 self.fs,
180 self.f,
181 buf.as_ptr() as *mut c_void,
182 buf.len().min(FILE_LIMIT) as i32,
183 )
184 };
185
186 if n == -1 {
187 return Err(Error::last_os_error());
188 }
189
190 Ok(n as usize)
191 }
192}
193
194impl Seek for &File {
195 fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
196 match pos {
197 SeekFrom::Start(n) => {
198 self.inner_seek(n as i64)?;
199 Ok(n)
200 }
201 SeekFrom::Current(n) => {
202 let current = self.tell()?;
203 let offset = (current + n) as u64;
204 self.inner_seek(offset as i64)?;
205 Ok(offset)
206 }
207 SeekFrom::End(_) => Err(Error::new(
208 ErrorKind::Unsupported,
209 "hdfs doesn't support seek from end",
210 )),
211 }
212 }
213}
214
215impl Write for &File {
216 fn write(&mut self, buf: &[u8]) -> Result<usize> {
217 let n = unsafe {
218 hdfsWrite(
219 self.fs,
220 self.f,
221 buf.as_ptr() as *const c_void,
222 buf.len().min(FILE_LIMIT) as i32,
223 )
224 };
225
226 if n == -1 {
227 return Err(Error::last_os_error());
228 }
229
230 Ok(n as usize)
231 }
232
233 fn flush(&mut self) -> Result<()> {
234 let n = unsafe { hdfsFlush(self.fs, self.f) };
235
236 if n == -1 {
237 return Err(Error::last_os_error());
238 }
239
240 Ok(())
241 }
242}
243
244#[cfg(test)]
245mod tests {
246 use super::*;
247 use crate::client::ClientBuilder;
248
249 #[test]
250 fn test_file_build() {
251 let _ = env_logger::try_init();
252
253 let fs = ClientBuilder::new("default")
254 .connect()
255 .expect("init success");
256
257 let path = uuid::Uuid::new_v4().to_string();
258
259 let f = fs
260 .open_file()
261 .create(true)
262 .write(true)
263 .open(&format!("/tmp/{path}"))
264 .expect("open file success");
265
266 assert!(!f.f.is_null());
267 assert!(!f.fs.is_null());
268 }
269
270 #[test]
271 fn test_file_write() {
272 let _ = env_logger::try_init();
273
274 let fs = ClientBuilder::new("default")
275 .connect()
276 .expect("init success");
277
278 let path = uuid::Uuid::new_v4().to_string();
279
280 let mut f = fs
281 .open_file()
282 .create(true)
283 .write(true)
284 .open(&format!("/tmp/{path}"))
285 .expect("open file success");
286
287 let n = f
288 .write("Hello, World!".as_bytes())
289 .expect("write must success");
290 assert_eq!(n, 13)
291 }
292
293 #[test]
294 fn test_file_read() {
295 let _ = env_logger::try_init();
296
297 let fs = ClientBuilder::new("default")
298 .connect()
299 .expect("init success");
300
301 let path = uuid::Uuid::new_v4().to_string();
302
303 let mut f = fs
304 .open_file()
305 .create(true)
306 .write(true)
307 .open(&format!("/tmp/{path}"))
308 .expect("open file success");
309
310 let n = f
311 .write("Hello, World!".as_bytes())
312 .expect("write must success");
313 assert_eq!(n, 13)
314 }
315}