1use std::future::Future;
8use std::io::{self, Read, Seek, SeekFrom, Write};
9use std::sync::{Arc, Mutex};
10
11use bytes::Bytes;
12use futures::StreamExt;
13use futures::stream::BoxStream;
14use tokio::runtime::Runtime;
15
16use crate::acl::{AclEntry, AclStatus};
17use crate::client::{self, ContentSummary, FileStatus, WriteOptions};
18use crate::file::{FileReader as AsyncFileReader, FileWriter as AsyncFileWriter};
19use crate::{Result, client::IORuntime};
20
21fn io_error(error: crate::HdfsError) -> io::Error {
22 io::Error::other(error)
23}
24
25#[derive(Default)]
27pub struct ClientBuilder {
28 inner: client::ClientBuilder,
29}
30
31impl ClientBuilder {
32 pub fn new() -> Self {
34 Self::default()
35 }
36
37 pub fn with_url(mut self, url: impl Into<String>) -> Self {
39 self.inner = self.inner.with_url(url);
40 self
41 }
42
43 pub fn with_config(
45 mut self,
46 config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
47 ) -> Self {
48 self.inner = self.inner.with_config(config);
49 self
50 }
51
52 pub fn with_config_dir(mut self, config_dir: impl Into<String>) -> Self {
54 self.inner = self.inner.with_config_dir(config_dir);
55 self
56 }
57
58 pub fn with_user(mut self, user: impl Into<String>) -> Self {
61 self.inner = self.inner.with_user(user);
62 self
63 }
64
65 pub fn build(self) -> Result<Client> {
67 let rt = Arc::new(Runtime::new()?);
68 let inner = self
69 .inner
70 .with_io_runtime(IORuntime::from(rt.handle().clone()))
71 .build()?;
72 Ok(Client { inner, rt })
73 }
74}
75
76#[derive(Clone, Debug)]
78pub struct Client {
79 inner: client::Client,
80 rt: Arc<Runtime>,
81}
82
83impl Client {
84 fn block_on<F: Future>(&self, future: F) -> F::Output {
85 self.rt.block_on(future)
86 }
87
88 pub fn get_file_info(&self, path: &str) -> Result<FileStatus> {
90 self.block_on(self.inner.get_file_info(path))
91 }
92
93 pub fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
95 self.block_on(self.inner.list_status(path, recursive))
96 }
97
98 pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
100 ListStatusIterator {
101 inner: self.inner.list_status_iter(path, recursive),
102 rt: Arc::clone(&self.rt),
103 }
104 }
105
106 pub fn read(&self, path: &str) -> Result<FileReader> {
108 Ok(FileReader {
109 inner: self.block_on(self.inner.read(path))?,
110 rt: Arc::clone(&self.rt),
111 })
112 }
113
114 pub fn create(&self, src: &str, write_options: impl AsRef<WriteOptions>) -> Result<FileWriter> {
116 Ok(FileWriter {
117 inner: self.block_on(self.inner.create(src, write_options))?,
118 rt: Arc::clone(&self.rt),
119 })
120 }
121
122 pub fn append(&self, src: &str) -> Result<FileWriter> {
124 Ok(FileWriter {
125 inner: self.block_on(self.inner.append(src))?,
126 rt: Arc::clone(&self.rt),
127 })
128 }
129
130 pub fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
132 self.block_on(self.inner.mkdirs(path, permission, create_parent))
133 }
134
135 pub fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
137 self.block_on(self.inner.rename(src, dst, overwrite))
138 }
139
140 pub fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
142 self.block_on(self.inner.delete(path, recursive))
143 }
144
145 pub fn trash(&self, path: &str) -> Result<Option<String>> {
147 self.block_on(self.inner.trash(path))
148 }
149
150 pub fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
152 self.block_on(self.inner.set_times(path, mtime, atime))
153 }
154
155 pub fn set_owner(&self, path: &str, owner: Option<&str>, group: Option<&str>) -> Result<()> {
157 self.block_on(self.inner.set_owner(path, owner, group))
158 }
159
160 pub fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
162 self.block_on(self.inner.set_permission(path, permission))
163 }
164
165 pub fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
167 self.block_on(self.inner.set_replication(path, replication))
168 }
169
170 pub fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
172 self.block_on(self.inner.get_content_summary(path))
173 }
174
175 pub fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
177 self.block_on(self.inner.modify_acl_entries(path, acl_spec))
178 }
179
180 pub fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
182 self.block_on(self.inner.remove_acl_entries(path, acl_spec))
183 }
184
185 pub fn remove_default_acl(&self, path: &str) -> Result<()> {
187 self.block_on(self.inner.remove_default_acl(path))
188 }
189
190 pub fn remove_acl(&self, path: &str) -> Result<()> {
192 self.block_on(self.inner.remove_acl(path))
193 }
194
195 pub fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
197 self.block_on(self.inner.set_acl(path, acl_spec))
198 }
199
200 pub fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
202 self.block_on(self.inner.get_acl_status(path))
203 }
204
205 pub fn glob_status(&self, pattern: &str) -> Result<Vec<FileStatus>> {
207 self.block_on(self.inner.glob_status(pattern))
208 }
209}
210
211impl Default for Client {
212 fn default() -> Self {
213 ClientBuilder::new()
214 .build()
215 .expect("Failed to create default client")
216 }
217}
218
219pub struct ListStatusIterator {
221 inner: client::ListStatusIterator,
222 rt: Arc<Runtime>,
223}
224
225impl Iterator for ListStatusIterator {
226 type Item = Result<FileStatus>;
227
228 fn next(&mut self) -> Option<Self::Item> {
229 self.rt.block_on(self.inner.next())
230 }
231}
232
233pub struct FileReader {
235 inner: AsyncFileReader,
236 rt: Arc<Runtime>,
237}
238
239impl FileReader {
240 pub fn file_length(&self) -> usize {
242 self.inner.file_length()
243 }
244
245 pub fn remaining(&self) -> usize {
247 self.inner.remaining()
248 }
249
250 pub fn set_position(&mut self, pos: usize) {
252 self.inner.set_position(pos);
253 }
254
255 pub fn tell(&self) -> usize {
257 self.inner.tell()
258 }
259
260 pub fn read_bytes(&mut self, len: usize) -> Result<Bytes> {
262 self.rt.block_on(self.inner.read_bytes(len))
263 }
264
265 pub fn read_into(&mut self, buf: &mut [u8]) -> Result<usize> {
267 self.rt.block_on(self.inner.read_into(buf))
268 }
269
270 pub fn read_range(&self, offset: usize, len: usize) -> Result<Bytes> {
272 self.rt.block_on(self.inner.read_range(offset, len))
273 }
274
275 pub fn read_range_buf(&self, buf: &mut [u8], offset: usize) -> Result<()> {
277 self.rt.block_on(self.inner.read_range_buf(buf, offset))
278 }
279
280 pub fn read_range_stream(&self, offset: usize, len: usize) -> FileReadStream {
282 FileReadStream {
283 inner: Mutex::new(self.inner.read_range_stream(offset, len).boxed()),
284 rt: Arc::clone(&self.rt),
285 }
286 }
287}
288
289impl Read for FileReader {
290 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
291 self.read_into(buf).map_err(io_error)
292 }
293}
294
295impl Seek for FileReader {
296 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
297 let file_length = self.file_length() as i128;
298 let current = self.tell() as i128;
299 let new_pos = match pos {
300 SeekFrom::Start(pos) => i128::from(pos),
301 SeekFrom::End(offset) => file_length + i128::from(offset),
302 SeekFrom::Current(offset) => current + i128::from(offset),
303 };
304
305 if new_pos < 0 || new_pos > file_length {
306 return Err(io::Error::new(
307 io::ErrorKind::InvalidInput,
308 "cannot seek outside of file bounds",
309 ));
310 }
311
312 self.inner.set_position(new_pos as usize);
313 Ok(new_pos as u64)
314 }
315}
316
317pub struct FileReadStream {
319 inner: Mutex<BoxStream<'static, Result<Bytes>>>,
320 rt: Arc<Runtime>,
321}
322
323impl Iterator for FileReadStream {
324 type Item = Result<Bytes>;
325
326 fn next(&mut self) -> Option<Self::Item> {
327 self.rt.block_on(self.inner.lock().unwrap().next())
328 }
329}
330
331pub struct FileWriter {
333 inner: AsyncFileWriter,
334 rt: Arc<Runtime>,
335}
336
337impl FileWriter {
338 pub fn write_bytes(&mut self, buf: Bytes) -> Result<usize> {
340 self.rt.block_on(self.inner.write_bytes(buf))
341 }
342
343 pub fn close(&mut self) -> Result<()> {
345 self.rt.block_on(self.inner.close())
346 }
347}
348
349impl Write for FileWriter {
350 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
351 self.write_bytes(Bytes::copy_from_slice(buf))
352 .map_err(io_error)
353 }
354
355 fn flush(&mut self) -> io::Result<()> {
356 Ok(())
357 }
358}