// hdfs_native/client.rs
1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, OnceLock};
3
4use futures::stream::BoxStream;
5use futures::{stream, StreamExt};
6use tokio::runtime::{Handle, Runtime};
7use url::Url;
8
9use crate::acl::{AclEntry, AclStatus};
10use crate::common::config::{self, Configuration};
11use crate::ec::resolve_ec_policy;
12use crate::error::{HdfsError, Result};
13use crate::file::{FileReader, FileWriter};
14use crate::hdfs::protocol::NamenodeProtocol;
15use crate::hdfs::proxy::NameServiceProxy;
16use crate::proto::hdfs::hdfs_file_status_proto::FileType;
17
18use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
19
/// Options controlling how new files are created by the client.
#[derive(Clone)]
pub struct WriteOptions {
    /// Block size. Default is retrieved from the server.
    pub block_size: Option<u64>,
    /// Replication factor. Default is retrieved from the server.
    pub replication: Option<u32>,
    /// Unix file permission, defaults to 0o644, which is "rw-r--r--" as a Unix permission.
    /// This is the raw octal value represented in base 10.
    pub permission: u32,
    /// Whether to overwrite the file, defaults to false. If true and the
    /// file does not exist, it will result in an error.
    pub overwrite: bool,
    /// Whether to create any missing parent directories, defaults to true. If false
    /// and the parent directory does not exist, an error will be returned.
    pub create_parent: bool,
}
36
37impl Default for WriteOptions {
38    fn default() -> Self {
39        Self {
40            block_size: None,
41            replication: None,
42            permission: 0o644,
43            overwrite: false,
44            create_parent: true,
45        }
46    }
47}
48
/// Lets APIs taking `impl AsRef<WriteOptions>` accept owned or borrowed options.
impl AsRef<WriteOptions> for WriteOptions {
    fn as_ref(&self) -> &WriteOptions {
        self
    }
}
54
55impl WriteOptions {
56    /// Set the block_size for the new file
57    pub fn block_size(mut self, block_size: u64) -> Self {
58        self.block_size = Some(block_size);
59        self
60    }
61
62    /// Set the replication for the new file
63    pub fn replication(mut self, replication: u32) -> Self {
64        self.replication = Some(replication);
65        self
66    }
67
68    /// Set the raw octal permission value for the new file
69    pub fn permission(mut self, permission: u32) -> Self {
70        self.permission = permission;
71        self
72    }
73
74    /// Set whether to overwrite an existing file
75    pub fn overwrite(mut self, overwrite: bool) -> Self {
76        self.overwrite = overwrite;
77        self
78    }
79
80    /// Set whether to create all missing parent directories
81    pub fn create_parent(mut self, create_parent: bool) -> Self {
82        self.create_parent = create_parent;
83        self
84    }
85}
86
/// A single viewfs mount entry: maps a viewfs path prefix to a path on a
/// specific HDFS name service.
#[derive(Debug, Clone)]
struct MountLink {
    // Mount point in the viewfs namespace; stored without a trailing slash
    viewfs_path: String,
    // Target path on the backing name service; stored without a trailing slash
    hdfs_path: String,
    // Protocol connection to the name service this mount points at
    protocol: Arc<NamenodeProtocol>,
}
93
94impl MountLink {
95    fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
96        // We should never have an empty path, we always want things mounted at root ("/") by default.
97        Self {
98            viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
99            hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
100            protocol,
101        }
102    }
103    /// Convert a viewfs path into a name service path if it matches this link
104    fn resolve(&self, path: &str) -> Option<String> {
105        // Make sure we don't partially match the last component. It either needs to be an exact
106        // match to a viewfs path, or needs to match with a trailing slash
107        if path == self.viewfs_path {
108            Some(self.hdfs_path.clone())
109        } else {
110            path.strip_prefix(&format!("{}/", self.viewfs_path))
111                .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
112        }
113    }
114}
115
/// Ordered set of viewfs mounts plus a fallback for paths no mount matches.
#[derive(Debug)]
struct MountTable {
    // Checked in order, so deeper (more specific) mounts must come first
    mounts: Vec<MountLink>,
    // Used when no mount matches; rooted at "/" so it resolves every path
    fallback: MountLink,
}
121
122impl MountTable {
123    fn resolve(&self, src: &str) -> (&MountLink, String) {
124        for link in self.mounts.iter() {
125            if let Some(resolved) = link.resolve(src) {
126                return (link, resolved);
127            }
128        }
129        (&self.fallback, self.fallback.resolve(src).unwrap())
130    }
131}
132
/// Holds either a [Runtime] or a [Handle] to an existing runtime for IO tasks
#[derive(Debug)]
pub enum IORuntime {
    // A runtime owned by this client
    Runtime(Runtime),
    // A handle to a runtime owned elsewhere
    Handle(Handle),
}
139
/// Lets an owned [Runtime] be passed wherever `impl Into<IORuntime>` is accepted.
impl From<Runtime> for IORuntime {
    fn from(value: Runtime) -> Self {
        Self::Runtime(value)
    }
}
145
/// Lets a runtime [Handle] be passed wherever `impl Into<IORuntime>` is accepted.
impl From<Handle> for IORuntime {
    fn from(value: Handle) -> Self {
        Self::Handle(value)
    }
}
151
152impl IORuntime {
153    fn handle(&self) -> Handle {
154        match self {
155            Self::Runtime(runtime) => runtime.handle().clone(),
156            Self::Handle(handle) => handle.clone(),
157        }
158    }
159}
160
/// Builds a new [Client] instance. By default, configs will be loaded from the default config directories with the following precedence:
/// - If the `HADOOP_CONF_DIR` environment variable is defined, configs will be loaded from `${HADOOP_CONF_DIR}/{core,hdfs}-site.xml`
/// - If the `HADOOP_HOME` environment variable is defined, configs will be loaded from `${HADOOP_HOME}/etc/hadoop/{core,hdfs}-site.xml`
/// - Otherwise no default configs are defined
///
/// If no URL is defined, the `fs.defaultFS` config must be defined and is used as the URL.
///
/// # Examples
///
/// Create a new client using the fs.defaultFS config
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_config(vec![("fs.defaultFS", "hdfs://127.0.0.1:9000")])
///     .build()
///     .unwrap();
/// ```
///
/// Create a new client connecting to a specific URL:
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_url("hdfs://127.0.0.1:9000")
///     .build()
///     .unwrap();
/// ```
///
/// Create a new client using a dedicated tokio runtime for spawned tasks and IO operations
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_url("hdfs://127.0.0.1:9000")
///     .with_io_runtime(tokio::runtime::Runtime::new().unwrap())
///     .build()
///     .unwrap();
/// ```
#[derive(Default)]
pub struct ClientBuilder {
    // URL to connect to; falls back to fs.defaultFS when None
    url: Option<String>,
    // Config overrides applied on top of any loaded config files
    config: HashMap<String, String>,
    // Optional dedicated runtime for IO tasks
    runtime: Option<IORuntime>,
}
203
204impl ClientBuilder {
205    /// Create a new [ClientBuilder]
206    pub fn new() -> Self {
207        Self::default()
208    }
209
210    /// Set the URL to connect to. Can be the address of a single NameNode, or a logical NameService
211    pub fn with_url(mut self, url: impl Into<String>) -> Self {
212        self.url = Some(url.into());
213        self
214    }
215
216    /// Set configs to use for the client. The provided configs will override any found in the default config files loaded
217    pub fn with_config(
218        mut self,
219        config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
220    ) -> Self {
221        self.config = config
222            .into_iter()
223            .map(|(k, v)| (k.into(), v.into()))
224            .collect();
225        self
226    }
227
228    /// Use a dedicated tokio runtime for spawned tasks and IO operations. Can either take ownership of a whole [Runtime]
229    /// or take a [Handle] to an externally owned runtime.
230    pub fn with_io_runtime(mut self, runtime: impl Into<IORuntime>) -> Self {
231        self.runtime = Some(runtime.into());
232        self
233    }
234
235    /// Create the [Client] instance from the provided settings
236    pub fn build(self) -> Result<Client> {
237        let config = Configuration::new_with_config(self.config)?;
238        let url = if let Some(url) = self.url {
239            Url::parse(&url)?
240        } else {
241            Client::default_fs(&config)?
242        };
243
244        Client::build(&url, config, self.runtime)
245    }
246}
247
/// Internal holder for the runtime used for IO tasks. Cloning shares the
/// underlying runtime via the [Arc].
#[derive(Clone, Debug)]
enum RuntimeHolder {
    // A runtime or handle explicitly provided by the user
    Custom(Arc<IORuntime>),
    // A lazily created owned runtime; only initialized if there is no ambient runtime
    Default(Arc<OnceLock<Runtime>>),
}
253
254impl RuntimeHolder {
255    fn new(rt: Option<IORuntime>) -> Self {
256        if let Some(rt) = rt {
257            Self::Custom(Arc::new(rt))
258        } else {
259            Self::Default(Arc::new(OnceLock::new()))
260        }
261    }
262
263    fn get_handle(&self) -> Handle {
264        match self {
265            Self::Custom(rt) => rt.handle().clone(),
266            Self::Default(rt) => match Handle::try_current() {
267                Ok(handle) => handle,
268                Err(_) => rt
269                    .get_or_init(|| Runtime::new().expect("Failed to create tokio runtime"))
270                    .handle()
271                    .clone(),
272            },
273        }
274    }
275}
276
/// A client to a specific NameNode, NameService, or Viewfs mount table
#[derive(Clone, Debug)]
pub struct Client {
    // Resolves paths to the right name service; a plain hdfs URL gets a single
    // fallback mount covering the whole namespace
    mount_table: Arc<MountTable>,
    config: Arc<Configuration>,
    // Store the runtime used for spawning all internal tasks. If we are not created
    // inside a tokio runtime, we will create our own to use.
    rt_holder: RuntimeHolder,
}
286
287impl Client {
    /// Creates a new HDFS Client. The URL must include the protocol and host, and optionally a port.
    /// If a port is included, the host is treated as a single NameNode. If no port is included, the
    /// host is treated as a name service that will be resolved using the HDFS config.
    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
    pub fn new(url: &str) -> Result<Self> {
        let parsed_url = Url::parse(url)?;
        Self::build(&parsed_url, Configuration::new()?, None)
    }
296
    /// Creates a new HDFS Client for the given URL, applying `config` on top of
    /// any configs loaded from the default config files.
    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
    pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
        let parsed_url = Url::parse(url)?;
        Self::build(&parsed_url, Configuration::new_with_config(config)?, None)
    }
302
    /// Creates a new HDFS Client using the `fs.defaultFS` setting from the
    /// provided config overrides (or the default config files).
    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
    pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
        let config = Configuration::new_with_config(config)?;
        Self::build(&Self::default_fs(&config)?, config, None)
    }
308
309    fn default_fs(config: &Configuration) -> Result<Url> {
310        let url = config
311            .get(config::DEFAULT_FS)
312            .ok_or(HdfsError::InvalidArgument(format!(
313                "No {} setting found",
314                config::DEFAULT_FS
315            )))?;
316        Ok(Url::parse(url)?)
317    }
318
319    fn build(url: &Url, config: Configuration, rt: Option<IORuntime>) -> Result<Self> {
320        let resolved_url = if !url.has_host() {
321            let default_url = Self::default_fs(&config)?;
322            if url.scheme() != default_url.scheme() || !default_url.has_host() {
323                return Err(HdfsError::InvalidArgument(
324                    "URL must contain a host".to_string(),
325                ));
326            }
327            default_url
328        } else {
329            url.clone()
330        };
331
332        let config = Arc::new(config);
333
334        let rt_holder = RuntimeHolder::new(rt);
335
336        let mount_table = match url.scheme() {
337            "hdfs" => {
338                let proxy = NameServiceProxy::new(
339                    &resolved_url,
340                    Arc::clone(&config),
341                    rt_holder.get_handle(),
342                )?;
343                let protocol = Arc::new(NamenodeProtocol::new(proxy, rt_holder.get_handle()));
344
345                MountTable {
346                    mounts: Vec::new(),
347                    fallback: MountLink::new("/", "/", protocol),
348                }
349            }
350            "viewfs" => Self::build_mount_table(
351                // Host is guaranteed to be present.
352                resolved_url.host_str().expect("URL must have a host"),
353                Arc::clone(&config),
354                rt_holder.get_handle(),
355            )?,
356            _ => {
357                return Err(HdfsError::InvalidArgument(
358                    "Only `hdfs` and `viewfs` schemes are supported".to_string(),
359                ))
360            }
361        };
362
363        Ok(Self {
364            mount_table: Arc::new(mount_table),
365            config,
366            rt_holder,
367        })
368    }
369
370    fn build_mount_table(
371        host: &str,
372        config: Arc<Configuration>,
373        handle: Handle,
374    ) -> Result<MountTable> {
375        let mut mounts: Vec<MountLink> = Vec::new();
376        let mut fallback: Option<MountLink> = None;
377
378        for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
379            let url = Url::parse(hdfs_url)?;
380            if !url.has_host() {
381                return Err(HdfsError::InvalidArgument(
382                    "URL must contain a host".to_string(),
383                ));
384            }
385            if url.scheme() != "hdfs" {
386                return Err(HdfsError::InvalidArgument(
387                    "Only hdfs mounts are supported for viewfs".to_string(),
388                ));
389            }
390            let proxy = NameServiceProxy::new(&url, Arc::clone(&config), handle.clone())?;
391            let protocol = Arc::new(NamenodeProtocol::new(proxy, handle.clone()));
392
393            if let Some(prefix) = viewfs_path {
394                mounts.push(MountLink::new(prefix, url.path(), protocol));
395            } else {
396                if fallback.is_some() {
397                    return Err(HdfsError::InvalidArgument(
398                        "Multiple viewfs fallback links found".to_string(),
399                    ));
400                }
401                fallback = Some(MountLink::new("/", url.path(), protocol));
402            }
403        }
404
405        if let Some(fallback) = fallback {
406            // Sort the mount table from longest viewfs path to shortest. This makes sure more specific paths are considered first.
407            mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
408            mounts.reverse();
409
410            Ok(MountTable { mounts, fallback })
411        } else {
412            Err(HdfsError::InvalidArgument(
413                "No viewfs fallback mount found".to_string(),
414            ))
415        }
416    }
417
418    /// Retrieve the file status for the file at `path`.
419    pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
420        let (link, resolved_path) = self.mount_table.resolve(path);
421        match link.protocol.get_file_info(&resolved_path).await?.fs {
422            Some(status) => Ok(FileStatus::from(status, path)),
423            None => Err(HdfsError::FileNotFound(path.to_string())),
424        }
425    }
426
427    /// Retrives a list of all files in directories located at `path`. Wrapper around `list_status_iter` that
428    /// returns Err if any part of the stream fails, or Ok if all file statuses were found successfully.
429    pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
430        let iter = self.list_status_iter(path, recursive);
431        let statuses = iter
432            .into_stream()
433            .collect::<Vec<Result<FileStatus>>>()
434            .await;
435
436        let mut resolved_statues = Vec::<FileStatus>::with_capacity(statuses.len());
437        for status in statuses.into_iter() {
438            resolved_statues.push(status?);
439        }
440
441        Ok(resolved_statues)
442    }
443
    /// Retrieves an iterator of all files in directories located at `path`.
    pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
        ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
    }
448
449    /// Opens a file reader for the file at `path`. Path should not include a scheme.
450    pub async fn read(&self, path: &str) -> Result<FileReader> {
451        let (link, resolved_path) = self.mount_table.resolve(path);
452        // Get all block locations. Length is actually a signed value, but the proto uses an unsigned value
453        let located_info = link
454            .protocol
455            .get_block_locations(&resolved_path, 0, i64::MAX as u64)
456            .await?;
457
458        if let Some(locations) = located_info.locations {
459            let ec_schema = if let Some(ec_policy) = locations.ec_policy.as_ref() {
460                Some(resolve_ec_policy(ec_policy)?)
461            } else {
462                None
463            };
464
465            if locations.file_encryption_info.is_some() {
466                return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
467            }
468
469            Ok(FileReader::new(
470                Arc::clone(&link.protocol),
471                locations,
472                ec_schema,
473                Arc::clone(&self.config),
474                self.rt_holder.get_handle(),
475            ))
476        } else {
477            Err(HdfsError::FileNotFound(path.to_string()))
478        }
479    }
480
481    /// Opens a new file for writing. See [WriteOptions] for options and behavior for different
482    /// scenarios.
483    pub async fn create(
484        &self,
485        src: &str,
486        write_options: impl AsRef<WriteOptions>,
487    ) -> Result<FileWriter> {
488        let write_options = write_options.as_ref();
489
490        let (link, resolved_path) = self.mount_table.resolve(src);
491
492        let create_response = link
493            .protocol
494            .create(
495                &resolved_path,
496                write_options.permission,
497                write_options.overwrite,
498                write_options.create_parent,
499                write_options.replication,
500                write_options.block_size,
501            )
502            .await?;
503
504        match create_response.fs {
505            Some(status) => {
506                if status.file_encryption_info.is_some() {
507                    let _ = self.delete(src, false).await;
508                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
509                }
510
511                Ok(FileWriter::new(
512                    Arc::clone(&link.protocol),
513                    resolved_path,
514                    status,
515                    Arc::clone(&self.config),
516                    self.rt_holder.get_handle(),
517                    None,
518                ))
519            }
520            None => Err(HdfsError::FileNotFound(src.to_string())),
521        }
522    }
523
524    fn needs_new_block(class: &str, msg: &str) -> bool {
525        class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
526    }
527
528    /// Opens an existing file for appending. An Err will be returned if the file does not exist. If the
529    /// file is replicated, the current block will be appended to until it is full. If the file is erasure
530    /// coded, a new block will be created.
531    pub async fn append(&self, src: &str) -> Result<FileWriter> {
532        let (link, resolved_path) = self.mount_table.resolve(src);
533
534        // Assume the file is replicated and try to append to the current block. If the file is
535        // erasure coded, then try again by appending to a new block.
536        let append_response = match link.protocol.append(&resolved_path, false).await {
537            Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
538                link.protocol.append(&resolved_path, true).await?
539            }
540            resp => resp?,
541        };
542
543        match append_response.stat {
544            Some(status) => {
545                if status.file_encryption_info.is_some() {
546                    let _ = link
547                        .protocol
548                        .complete(src, append_response.block.map(|b| b.b), status.file_id)
549                        .await;
550                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
551                }
552
553                Ok(FileWriter::new(
554                    Arc::clone(&link.protocol),
555                    resolved_path,
556                    status,
557                    Arc::clone(&self.config),
558                    self.rt_holder.get_handle(),
559                    append_response.block,
560                ))
561            }
562            None => Err(HdfsError::FileNotFound(src.to_string())),
563        }
564    }
565
566    /// Create a new directory at `path` with the given `permission`.
567    ///
568    /// `permission` is the raw octal value representing the Unix style permission. For example, to
569    /// set 755 (`rwxr-x-rx`) permissions, use 0o755.
570    ///
571    /// If `create_parent` is true, any missing parent directories will be created as well,
572    /// otherwise an error will be returned if the parent directory doesn't already exist.
573    pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
574        let (link, resolved_path) = self.mount_table.resolve(path);
575        link.protocol
576            .mkdirs(&resolved_path, permission, create_parent)
577            .await
578            .map(|_| ())
579    }
580
581    /// Renames `src` to `dst`. Returns Ok(()) on success, and Err otherwise.
582    pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
583        let (src_link, src_resolved_path) = self.mount_table.resolve(src);
584        let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
585        if src_link.viewfs_path == dst_link.viewfs_path {
586            src_link
587                .protocol
588                .rename(&src_resolved_path, &dst_resolved_path, overwrite)
589                .await
590                .map(|_| ())
591        } else {
592            Err(HdfsError::InvalidArgument(
593                "Cannot rename across different name services".to_string(),
594            ))
595        }
596    }
597
598    /// Deletes the file or directory at `path`. If `recursive` is false and `path` is a non-empty
599    /// directory, this will fail. Returns `Ok(true)` if it was successfully deleted.
600    pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
601        let (link, resolved_path) = self.mount_table.resolve(path);
602        link.protocol
603            .delete(&resolved_path, recursive)
604            .await
605            .map(|r| r.result)
606    }
607
608    /// Sets the modified and access times for a file. Times should be in milliseconds from the epoch.
609    pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
610        let (link, resolved_path) = self.mount_table.resolve(path);
611        link.protocol
612            .set_times(&resolved_path, mtime, atime)
613            .await?;
614        Ok(())
615    }
616
617    /// Optionally sets the owner and group for a file.
618    pub async fn set_owner(
619        &self,
620        path: &str,
621        owner: Option<&str>,
622        group: Option<&str>,
623    ) -> Result<()> {
624        let (link, resolved_path) = self.mount_table.resolve(path);
625        link.protocol
626            .set_owner(&resolved_path, owner, group)
627            .await?;
628        Ok(())
629    }
630
631    /// Sets permissions for a file. Permission should be an octal number reprenting the Unix style
632    /// permission.
633    ///
634    /// For example, to set permissions to rwxr-xr-x:
635    /// ```rust
636    /// # async fn func() {
637    /// # let client = hdfs_native::Client::new("localhost:9000").unwrap();
638    /// client.set_permission("/path", 0o755).await.unwrap();
639    /// }
640    /// ```
641    pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
642        let (link, resolved_path) = self.mount_table.resolve(path);
643        link.protocol
644            .set_permission(&resolved_path, permission)
645            .await?;
646        Ok(())
647    }
648
649    /// Sets the replication for a file.
650    pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
651        let (link, resolved_path) = self.mount_table.resolve(path);
652        let result = link
653            .protocol
654            .set_replication(&resolved_path, replication)
655            .await?
656            .result;
657
658        Ok(result)
659    }
660
661    /// Gets a content summary for a file or directory rooted at `path`.
662    pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
663        let (link, resolved_path) = self.mount_table.resolve(path);
664        let result = link
665            .protocol
666            .get_content_summary(&resolved_path)
667            .await?
668            .summary;
669
670        Ok(result.into())
671    }
672
673    /// Update ACL entries for file or directory at `path`. Existing entries will remain.
674    pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
675        let (link, resolved_path) = self.mount_table.resolve(path);
676        link.protocol
677            .modify_acl_entries(&resolved_path, acl_spec)
678            .await?;
679
680        Ok(())
681    }
682
683    /// Remove specific ACL entries for file or directory at `path`.
684    pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
685        let (link, resolved_path) = self.mount_table.resolve(path);
686        link.protocol
687            .remove_acl_entries(&resolved_path, acl_spec)
688            .await?;
689
690        Ok(())
691    }
692
693    /// Remove all default ACLs for file or directory at `path`.
694    pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
695        let (link, resolved_path) = self.mount_table.resolve(path);
696        link.protocol.remove_default_acl(&resolved_path).await?;
697
698        Ok(())
699    }
700
701    /// Remove all ACL entries for file or directory at `path`.
702    pub async fn remove_acl(&self, path: &str) -> Result<()> {
703        let (link, resolved_path) = self.mount_table.resolve(path);
704        link.protocol.remove_acl(&resolved_path).await?;
705
706        Ok(())
707    }
708
709    /// Override all ACL entries for file or directory at `path`. If only access ACLs are provided,
710    /// default ACLs are maintained. Likewise if only default ACLs are provided, access ACLs are
711    /// maintained.
712    pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
713        let (link, resolved_path) = self.mount_table.resolve(path);
714        link.protocol.set_acl(&resolved_path, acl_spec).await?;
715
716        Ok(())
717    }
718
719    /// Get the ACL status for the file or directory at `path`.
720    pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
721        let (link, resolved_path) = self.mount_table.resolve(path);
722        Ok(link
723            .protocol
724            .get_acl_status(&resolved_path)
725            .await?
726            .result
727            .into())
728    }
729}
730
impl Default for Client {
    /// Creates a new HDFS Client based on the fs.defaultFS setting. Panics if the config files fail to load,
    /// no defaultFS is defined, or the defaultFS is invalid.
    fn default() -> Self {
        ClientBuilder::new()
            .build()
            .expect("Failed to create default client")
    }
}
740
/// Pages through the listing of a single directory, one RPC batch at a time.
pub(crate) struct DirListingIterator {
    // Original (client-facing) path, used for result paths and error messages
    path: String,
    // Path resolved onto the backing name service
    resolved_path: String,
    link: MountLink,
    // If true, directory entries are filtered out of the results
    files_only: bool,
    // Entries fetched from the server but not yet handed out
    partial_listing: VecDeque<HdfsFileStatusProto>,
    // Count of entries the server still has beyond the last fetched batch
    remaining: u32,
    // Path of the last entry seen, used as the cursor for the next batch
    last_seen: Vec<u8>,
}
750
751impl DirListingIterator {
752    fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
753        let (link, resolved_path) = mount_table.resolve(&path);
754
755        DirListingIterator {
756            path,
757            resolved_path,
758            link: link.clone(),
759            files_only,
760            partial_listing: VecDeque::new(),
761            remaining: 1,
762            last_seen: Vec::new(),
763        }
764    }
765
766    async fn get_next_batch(&mut self) -> Result<bool> {
767        let listing = self
768            .link
769            .protocol
770            .get_listing(&self.resolved_path, self.last_seen.clone(), false)
771            .await?;
772
773        if let Some(dir_list) = listing.dir_list {
774            self.last_seen = dir_list
775                .partial_listing
776                .last()
777                .map(|p| p.path.clone())
778                .unwrap_or(Vec::new());
779
780            self.remaining = dir_list.remaining_entries;
781
782            self.partial_listing = dir_list
783                .partial_listing
784                .into_iter()
785                .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
786                .collect();
787            Ok(!self.partial_listing.is_empty())
788        } else {
789            Err(HdfsError::FileNotFound(self.path.clone()))
790        }
791    }
792
793    pub async fn next(&mut self) -> Option<Result<FileStatus>> {
794        if self.partial_listing.is_empty() && self.remaining > 0 {
795            if let Err(error) = self.get_next_batch().await {
796                self.remaining = 0;
797                return Some(Err(error));
798            }
799        }
800        if let Some(next) = self.partial_listing.pop_front() {
801            Some(Ok(FileStatus::from(next, &self.path)))
802        } else {
803            None
804        }
805    }
806}
807
/// Iterates over file statuses under a path, optionally recursing into
/// subdirectories.
pub struct ListStatusIterator {
    mount_table: Arc<MountTable>,
    recursive: bool,
    // Stack of per-directory iterators used for depth-first traversal;
    // guarded by an async mutex so next() can be called on a shared reference
    iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
}
813
impl ListStatusIterator {
    /// Create an iterator rooted at `path`. The root directory iterator is set
    /// up eagerly, but no RPCs happen until `next` is called.
    fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
        let initial = DirListingIterator::new(path.clone(), &mount_table, false);

        ListStatusIterator {
            mount_table,
            recursive,
            iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
        }
    }

    /// Get the next file status, or None once the whole listing is exhausted.
    /// Directories are returned before their contents when recursing.
    pub async fn next(&self) -> Option<Result<FileStatus>> {
        let mut next_file: Option<Result<FileStatus>> = None;
        // Hold the lock for the whole call so the traversal stack stays consistent
        let mut iters = self.iters.lock().await;
        while next_file.is_none() {
            if let Some(iter) = iters.last_mut() {
                if let Some(file_result) = iter.next().await {
                    if let Ok(file) = file_result {
                        // Return the directory as the next result, but start traversing into that directory
                        // next if we're doing a recursive listing
                        if file.isdir && self.recursive {
                            iters.push(DirListingIterator::new(
                                file.path.clone(),
                                &self.mount_table,
                                false,
                            ))
                        }
                        next_file = Some(Ok(file));
                    } else {
                        // Error, return that as the next element
                        next_file = Some(file_result)
                    }
                } else {
                    // We've exhausted this directory
                    iters.pop();
                }
            } else {
                // There's nothing left, just return None
                break;
            }
        }

        next_file
    }

    /// Convert this iterator into an async stream of file statuses.
    pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
        let listing = stream::unfold(self, |state| async move {
            let next = state.next().await;
            next.map(|n| (n, state))
        });
        Box::pin(listing)
    }
}
867
/// Status of a single file or directory, built from the namenode's
/// `HdfsFileStatusProto` response.
#[derive(Debug)]
pub struct FileStatus {
    /// Full path of the entry (base path joined with the entry's name).
    pub path: String,
    /// Length of the file in bytes.
    pub length: usize,
    /// True if this entry is a directory.
    pub isdir: bool,
    /// Unix permission bits (e.g. 0o644).
    pub permission: u16,
    /// Owning user name.
    pub owner: String,
    /// Owning group name.
    pub group: String,
    /// Last modification time as reported by the namenode; units are not
    /// converted here — presumably milliseconds since the epoch, TODO confirm.
    pub modification_time: u64,
    /// Last access time, same units as `modification_time`.
    pub access_time: u64,
    /// Replication factor if reported; `None` presumably for directories —
    /// confirm against the namenode response.
    pub replication: Option<u32>,
    /// Block size if reported; `None` presumably for directories — confirm
    /// against the namenode response.
    pub blocksize: Option<u64>,
}
881
882impl FileStatus {
883    fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
884        let mut path = base_path.trim_end_matches("/").to_string();
885        let relative_path = std::str::from_utf8(&value.path).unwrap();
886        if !relative_path.is_empty() {
887            path.push('/');
888            path.push_str(relative_path);
889        }
890
891        FileStatus {
892            isdir: value.file_type() == FileType::IsDir,
893            path,
894            length: value.length as usize,
895            permission: value.permission.perm as u16,
896            owner: value.owner,
897            group: value.group,
898            modification_time: value.modification_time,
899            access_time: value.access_time,
900            replication: value.block_replication,
901            blocksize: value.blocksize,
902        }
903    }
904}
905
/// Summary of the content rooted at a directory, mirroring the fields of
/// `ContentSummaryProto`.
#[derive(Debug)]
pub struct ContentSummary {
    /// Total length in bytes of all files under the path.
    pub length: u64,
    /// Number of files under the path.
    pub file_count: u64,
    /// Number of directories under the path.
    pub directory_count: u64,
    /// Namespace quota for the path.
    pub quota: u64,
    /// Disk space consumed — presumably includes replication; confirm against
    /// the HDFS content-summary semantics.
    pub space_consumed: u64,
    /// Disk space quota for the path.
    pub space_quota: u64,
}
915
916impl From<ContentSummaryProto> for ContentSummary {
917    fn from(value: ContentSummaryProto) -> Self {
918        ContentSummary {
919            length: value.length,
920            file_count: value.file_count,
921            directory_count: value.directory_count,
922            quota: value.quota,
923            space_consumed: value.space_consumed,
924            space_quota: value.space_quota,
925        }
926    }
927}
928
#[cfg(test)]
mod test {
    use std::sync::{Arc, LazyLock};

    use tokio::runtime::Runtime;
    use url::Url;

    use crate::{
        client::ClientBuilder,
        common::config::Configuration,
        hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
    };

    use super::{MountLink, MountTable};

    // Shared runtime handle so protocol objects can be constructed without
    // spinning up a new runtime per test.
    static RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

    // Helper: builds a NamenodeProtocol for the given namenode URL. No network
    // connection is attempted here — construction only parses the URL/config.
    fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
        let proxy = NameServiceProxy::new(
            &Url::parse(url).unwrap(),
            Arc::new(Configuration::new().unwrap()),
            RT.handle().clone(),
        )
        .unwrap();
        Arc::new(NamenodeProtocol::new(proxy, RT.handle().clone()))
    }

    #[test]
    fn test_default_fs() {
        // A usable fs.defaultFS alone is enough to build a client.
        assert!(ClientBuilder::new()
            .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
            .build()
            .is_ok());

        // "hdfs://" with no authority is rejected as a default.
        assert!(ClientBuilder::new()
            .with_config(vec![("fs.defaultFS", "hdfs://")])
            .build()
            .is_err());

        // An authority-less URL falls back to fs.defaultFS.
        assert!(ClientBuilder::new()
            .with_url("hdfs://")
            .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
            .build()
            .is_ok());

        // ...unless the fallback is itself authority-less.
        assert!(ClientBuilder::new()
            .with_url("hdfs://")
            .with_config(vec![("fs.defaultFS", "hdfs://")])
            .build()
            .is_err());

        // A viewfs default cannot back an authority-less hdfs:// URL.
        assert!(ClientBuilder::new()
            .with_url("hdfs://")
            .with_config(vec![("fs.defaultFS", "viewfs://test")])
            .build()
            .is_err());
    }

    #[test]
    fn test_mount_link_resolve() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("/view", "/hdfs", protocol);

        // Paths under the viewfs prefix map onto the target prefix.
        assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
        assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
        // Paths outside the viewfs prefix don't resolve through this link.
        assert!(link.resolve("/hdfs/path").is_none());
    }

    #[test]
    fn test_fallback_link() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        // An empty viewfs prefix acts as a catch-all fallback link.
        let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));

        assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
        assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
        // Even a path that already starts with the target gets prefixed again.
        assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");

        // Empty prefix and empty target: paths resolve to themselves.
        let link = MountLink::new("", "", protocol);
        assert_eq!(link.resolve("/").unwrap(), "/");
    }

    #[test]
    fn test_mount_table_resolve() {
        let link1 = MountLink::new(
            "/mount1",
            "/path1/nested",
            create_protocol("hdfs://127.0.0.1:9000"),
        );
        let link2 = MountLink::new(
            "/mount2",
            "/path2",
            create_protocol("hdfs://127.0.0.1:9001"),
        );
        let link3 = MountLink::new(
            "/mount3/nested",
            "/path3",
            create_protocol("hdfs://127.0.0.1:9002"),
        );
        let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));

        let mount_table = MountTable {
            mounts: vec![link1, link2, link3],
            fallback,
        };

        // Exact mount path resolves to the exact HDFS path
        let (link, resolved) = mount_table.resolve("/mount1");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested");

        // Trailing slash is treated the same
        let (link, resolved) = mount_table.resolve("/mount1/");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested/");

        // Doesn't do partial matches on a directory name
        let (link, resolved) = mount_table.resolve("/mount12");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount12");

        // "/mount3" alone is not a mount point (only "/mount3/nested" is), so
        // this falls through to the fallback link.
        let (link, resolved) = mount_table.resolve("/mount3/file");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount3/file");

        let (link, resolved) = mount_table.resolve("/mount3/nested/file");
        assert_eq!(link.viewfs_path, "/mount3/nested");
        assert_eq!(resolved, "/path3/file");
    }

    #[test]
    fn test_io_runtime() {
        // The builder accepts an owned runtime for I/O...
        assert!(ClientBuilder::new()
            .with_url("hdfs://127.0.0.1:9000")
            .with_io_runtime(Runtime::new().unwrap())
            .build()
            .is_ok());

        // ...and also a handle to a runtime owned by the caller.
        let rt = Runtime::new().unwrap();
        assert!(ClientBuilder::new()
            .with_url("hdfs://127.0.0.1:9000")
            .with_io_runtime(rt.handle().clone())
            .build()
            .is_ok());
    }
}