Skip to main content

hdfs_native/
sync.rs

1//! Synchronous wrappers around the asynchronous HDFS client.
2//!
3//! The sync client owns a Tokio runtime and delegates operations to the async
4//! [`crate::Client`]. This is intended for applications that want blocking APIs
5//! without managing an async runtime directly.
6
7use std::future::Future;
8use std::io::{self, Read, Seek, SeekFrom, Write};
9use std::sync::{Arc, Mutex};
10
11use bytes::Bytes;
12use futures::StreamExt;
13use futures::stream::BoxStream;
14use tokio::runtime::Runtime;
15
16use crate::acl::{AclEntry, AclStatus};
17use crate::client::{self, ContentSummary, FileStatus, WriteOptions};
18use crate::file::{FileReader as AsyncFileReader, FileWriter as AsyncFileWriter};
19use crate::{Result, client::IORuntime};
20
21fn io_error(error: crate::HdfsError) -> io::Error {
22    io::Error::other(error)
23}
24
25/// Builds a new synchronous [`Client`] instance.
26#[derive(Default)]
27pub struct ClientBuilder {
28    inner: client::ClientBuilder,
29}
30
31impl ClientBuilder {
32    /// Create a new [`ClientBuilder`].
33    pub fn new() -> Self {
34        Self::default()
35    }
36
37    /// Set the URL to connect to.
38    pub fn with_url(mut self, url: impl Into<String>) -> Self {
39        self.inner = self.inner.with_url(url);
40        self
41    }
42
43    /// Set configs to use for the client.
44    pub fn with_config(
45        mut self,
46        config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
47    ) -> Self {
48        self.inner = self.inner.with_config(config);
49        self
50    }
51
52    /// Set the configuration directory path to read from.
53    pub fn with_config_dir(mut self, config_dir: impl Into<String>) -> Self {
54        self.inner = self.inner.with_config_dir(config_dir);
55        self
56    }
57
58    /// Set the effective user for the client. If not set, the client will detect user from
59    /// environment variables `HADOOP_USER_NAME` or `HADOOP_PROXY_USER`.
60    pub fn with_user(mut self, user: impl Into<String>) -> Self {
61        self.inner = self.inner.with_user(user);
62        self
63    }
64
65    /// Create the synchronous [`Client`] from the provided settings.
66    pub fn build(self) -> Result<Client> {
67        let rt = Arc::new(Runtime::new()?);
68        let inner = self
69            .inner
70            .with_io_runtime(IORuntime::from(rt.handle().clone()))
71            .build()?;
72        Ok(Client { inner, rt })
73    }
74}
75
76/// A blocking HDFS client.
77#[derive(Clone, Debug)]
78pub struct Client {
79    inner: client::Client,
80    rt: Arc<Runtime>,
81}
82
83impl Client {
84    fn block_on<F: Future>(&self, future: F) -> F::Output {
85        self.rt.block_on(future)
86    }
87
88    /// Retrieve the file status for the file at `path`.
89    pub fn get_file_info(&self, path: &str) -> Result<FileStatus> {
90        self.block_on(self.inner.get_file_info(path))
91    }
92
93    /// Retrieve all file statuses under `path`.
94    pub fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
95        self.block_on(self.inner.list_status(path, recursive))
96    }
97
98    /// Retrieve a blocking iterator of all files in directories located at `path`.
99    pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
100        ListStatusIterator {
101            inner: self.inner.list_status_iter(path, recursive),
102            rt: Arc::clone(&self.rt),
103        }
104    }
105
106    /// Opens a file reader for the file at `path`.
107    pub fn read(&self, path: &str) -> Result<FileReader> {
108        Ok(FileReader {
109            inner: self.block_on(self.inner.read(path))?,
110            rt: Arc::clone(&self.rt),
111        })
112    }
113
114    /// Opens a new file for writing.
115    pub fn create(&self, src: &str, write_options: impl AsRef<WriteOptions>) -> Result<FileWriter> {
116        Ok(FileWriter {
117            inner: self.block_on(self.inner.create(src, write_options))?,
118            rt: Arc::clone(&self.rt),
119        })
120    }
121
122    /// Opens an existing file for appending.
123    pub fn append(&self, src: &str) -> Result<FileWriter> {
124        Ok(FileWriter {
125            inner: self.block_on(self.inner.append(src))?,
126            rt: Arc::clone(&self.rt),
127        })
128    }
129
130    /// Create a new directory at `path` with the given permission.
131    pub fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
132        self.block_on(self.inner.mkdirs(path, permission, create_parent))
133    }
134
135    /// Rename `src` to `dst`.
136    pub fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
137        self.block_on(self.inner.rename(src, dst, overwrite))
138    }
139
140    /// Delete the file or directory at `path`.
141    pub fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
142        self.block_on(self.inner.delete(path, recursive))
143    }
144
145    /// Move a file or directory at `path` into the user's trash.
146    pub fn trash(&self, path: &str) -> Result<Option<String>> {
147        self.block_on(self.inner.trash(path))
148    }
149
150    /// Set modified and access times for a file.
151    pub fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
152        self.block_on(self.inner.set_times(path, mtime, atime))
153    }
154
155    /// Optionally set the owner and group for a file.
156    pub fn set_owner(&self, path: &str, owner: Option<&str>, group: Option<&str>) -> Result<()> {
157        self.block_on(self.inner.set_owner(path, owner, group))
158    }
159
160    /// Set permissions for a file.
161    pub fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
162        self.block_on(self.inner.set_permission(path, permission))
163    }
164
165    /// Set replication for a file.
166    pub fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
167        self.block_on(self.inner.set_replication(path, replication))
168    }
169
170    /// Get a content summary for a file or directory rooted at `path`.
171    pub fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
172        self.block_on(self.inner.get_content_summary(path))
173    }
174
175    /// Update ACL entries for file or directory at `path`.
176    pub fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
177        self.block_on(self.inner.modify_acl_entries(path, acl_spec))
178    }
179
180    /// Remove specific ACL entries for file or directory at `path`.
181    pub fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
182        self.block_on(self.inner.remove_acl_entries(path, acl_spec))
183    }
184
185    /// Remove all default ACL entries for file or directory at `path`.
186    pub fn remove_default_acl(&self, path: &str) -> Result<()> {
187        self.block_on(self.inner.remove_default_acl(path))
188    }
189
190    /// Remove all ACL entries for file or directory at `path`.
191    pub fn remove_acl(&self, path: &str) -> Result<()> {
192        self.block_on(self.inner.remove_acl(path))
193    }
194
195    /// Override ACL entries for file or directory at `path`.
196    pub fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
197        self.block_on(self.inner.set_acl(path, acl_spec))
198    }
199
200    /// Get ACL status for the file or directory at `path`.
201    pub fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
202        self.block_on(self.inner.get_acl_status(path))
203    }
204
205    /// Get all file statuses matching the glob `pattern`.
206    pub fn glob_status(&self, pattern: &str) -> Result<Vec<FileStatus>> {
207        self.block_on(self.inner.glob_status(pattern))
208    }
209}
210
211impl Default for Client {
212    fn default() -> Self {
213        ClientBuilder::new()
214            .build()
215            .expect("Failed to create default client")
216    }
217}
218
219/// A blocking file status iterator.
220pub struct ListStatusIterator {
221    inner: client::ListStatusIterator,
222    rt: Arc<Runtime>,
223}
224
225impl Iterator for ListStatusIterator {
226    type Item = Result<FileStatus>;
227
228    fn next(&mut self) -> Option<Self::Item> {
229        self.rt.block_on(self.inner.next())
230    }
231}
232
233/// A blocking file reader.
234pub struct FileReader {
235    inner: AsyncFileReader,
236    rt: Arc<Runtime>,
237}
238
239impl FileReader {
240    /// Returns the total size of the file.
241    pub fn file_length(&self) -> usize {
242        self.inner.file_length()
243    }
244
245    /// Returns the remaining bytes left based on the current cursor position.
246    pub fn remaining(&self) -> usize {
247        self.inner.remaining()
248    }
249
250    /// Sets the cursor position.
251    pub fn set_position(&mut self, pos: usize) {
252        self.inner.set_position(pos);
253    }
254
255    /// Returns the current cursor position in the file.
256    pub fn tell(&self) -> usize {
257        self.inner.tell()
258    }
259
260    /// Read up to `len` bytes, advancing the internal position.
261    pub fn read_bytes(&mut self, len: usize) -> Result<Bytes> {
262        self.rt.block_on(self.inner.read_bytes(len))
263    }
264
265    /// Read up to `buf.len()` bytes into the provided slice.
266    pub fn read_into(&mut self, buf: &mut [u8]) -> Result<usize> {
267        self.rt.block_on(self.inner.read_into(buf))
268    }
269
270    /// Read up to `len` bytes starting at `offset`.
271    pub fn read_range(&self, offset: usize, len: usize) -> Result<Bytes> {
272        self.rt.block_on(self.inner.read_range(offset, len))
273    }
274
275    /// Read file data into an existing buffer.
276    pub fn read_range_buf(&self, buf: &mut [u8], offset: usize) -> Result<()> {
277        self.rt.block_on(self.inner.read_range_buf(buf, offset))
278    }
279
280    /// Return a blocking stream of `Bytes` objects containing the file content.
281    pub fn read_range_stream(&self, offset: usize, len: usize) -> FileReadStream {
282        FileReadStream {
283            inner: Mutex::new(self.inner.read_range_stream(offset, len).boxed()),
284            rt: Arc::clone(&self.rt),
285        }
286    }
287}
288
289impl Read for FileReader {
290    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
291        self.read_into(buf).map_err(io_error)
292    }
293}
294
295impl Seek for FileReader {
296    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
297        let file_length = self.file_length() as i128;
298        let current = self.tell() as i128;
299        let new_pos = match pos {
300            SeekFrom::Start(pos) => i128::from(pos),
301            SeekFrom::End(offset) => file_length + i128::from(offset),
302            SeekFrom::Current(offset) => current + i128::from(offset),
303        };
304
305        if new_pos < 0 || new_pos > file_length {
306            return Err(io::Error::new(
307                io::ErrorKind::InvalidInput,
308                "cannot seek outside of file bounds",
309            ));
310        }
311
312        self.inner.set_position(new_pos as usize);
313        Ok(new_pos as u64)
314    }
315}
316
317/// A blocking stream of file bytes.
318pub struct FileReadStream {
319    inner: Mutex<BoxStream<'static, Result<Bytes>>>,
320    rt: Arc<Runtime>,
321}
322
323impl Iterator for FileReadStream {
324    type Item = Result<Bytes>;
325
326    fn next(&mut self) -> Option<Self::Item> {
327        self.rt.block_on(self.inner.lock().unwrap().next())
328    }
329}
330
331/// A blocking file writer.
332pub struct FileWriter {
333    inner: AsyncFileWriter,
334    rt: Arc<Runtime>,
335}
336
337impl FileWriter {
338    /// Write bytes to the file.
339    pub fn write_bytes(&mut self, buf: Bytes) -> Result<usize> {
340        self.rt.block_on(self.inner.write_bytes(buf))
341    }
342
343    /// Close the file writer.
344    pub fn close(&mut self) -> Result<()> {
345        self.rt.block_on(self.inner.close())
346    }
347}
348
349impl Write for FileWriter {
350    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
351        self.write_bytes(Bytes::copy_from_slice(buf))
352            .map_err(io_error)
353    }
354
355    fn flush(&mut self) -> io::Result<()> {
356        Ok(())
357    }
358}