// hdfs_native/client.rs

use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, OnceLock};

use futures::stream::BoxStream;
use futures::{StreamExt, stream};
use tokio::runtime::{Handle, Runtime};
use url::Url;

use crate::acl::{AclEntry, AclStatus};
use crate::common::config::{self, Configuration};
use crate::ec::resolve_ec_policy;
use crate::error::{HdfsError, Result};
use crate::file::{FileReader, FileWriter};
use crate::hdfs::protocol::NamenodeProtocol;
use crate::hdfs::proxy::NameServiceProxy;
use crate::proto::hdfs::hdfs_file_status_proto::FileType;

use crate::glob::{GlobPattern, expand_glob, get_path_components, unescape_component};
use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};

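/// Options controlling how [Client::create] creates and writes a new file.
///
/// # Examples
///
/// Build options with the fluent setters. This is a minimal sketch; the values are
/// illustrative, and it assumes `WriteOptions` is re-exported at the crate root like
/// [ClientBuilder].
/// ```rust
/// # use hdfs_native::WriteOptions;
/// let options = WriteOptions::default()
///     .overwrite(true)
///     .permission(0o600)
///     .replication(2);
/// ```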
#[derive(Clone)]
pub struct WriteOptions {
    /// Block size. Default is retrieved from the server.
    pub block_size: Option<u64>,
    /// Replication factor. Default is retrieved from the server.
    pub replication: Option<u32>,
    /// Unix file permission, defaults to 0o644 ("rw-r--r--"). This is the raw numeric
    /// permission value, typically written as an octal literal.
    pub permission: u32,
    /// Whether to overwrite the file, defaults to false. If true and the
    /// file does not exist, it will result in an error.
    pub overwrite: bool,
    /// Whether to create any missing parent directories, defaults to true. If false
    /// and the parent directory does not exist, an error will be returned.
    pub create_parent: bool,
}

impl Default for WriteOptions {
    fn default() -> Self {
        Self {
            block_size: None,
            replication: None,
            permission: 0o644,
            overwrite: false,
            create_parent: true,
        }
    }
}

impl AsRef<WriteOptions> for WriteOptions {
    fn as_ref(&self) -> &WriteOptions {
        self
    }
}

impl WriteOptions {
    /// Set the block_size for the new file
    pub fn block_size(mut self, block_size: u64) -> Self {
        self.block_size = Some(block_size);
        self
    }

    /// Set the replication for the new file
    pub fn replication(mut self, replication: u32) -> Self {
        self.replication = Some(replication);
        self
    }

    /// Set the raw octal permission value for the new file
    pub fn permission(mut self, permission: u32) -> Self {
        self.permission = permission;
        self
    }

    /// Set whether to overwrite an existing file
    pub fn overwrite(mut self, overwrite: bool) -> Self {
        self.overwrite = overwrite;
        self
    }

    /// Set whether to create all missing parent directories
    pub fn create_parent(mut self, create_parent: bool) -> Self {
        self.create_parent = create_parent;
        self
    }
}

#[derive(Debug, Clone)]
struct MountLink {
    viewfs_path: String,
    hdfs_path: String,
    protocol: Arc<NamenodeProtocol>,
}

impl MountLink {
    fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
        // We should never have an empty path, we always want things mounted at root ("/") by default.
        Self {
            viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
            hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
            protocol,
        }
    }
    /// Convert a viewfs path into a name service path if it matches this link
    fn resolve(&self, path: &str) -> Option<String> {
        // Make sure we don't partially match the last component. It either needs to be an exact
        // match to a viewfs path, or needs to match with a trailing slash
        if path == self.viewfs_path {
            Some(self.hdfs_path.clone())
        } else {
            path.strip_prefix(&format!("{}/", self.viewfs_path))
                .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
        }
    }
}

#[derive(Debug)]
struct MountTable {
    mounts: Vec<MountLink>,
    fallback: MountLink,
}

impl MountTable {
    fn resolve(&self, src: &str) -> (&MountLink, String) {
        for link in self.mounts.iter() {
            if let Some(resolved) = link.resolve(src) {
                return (link, resolved);
            }
        }
        (&self.fallback, self.fallback.resolve(src).unwrap())
    }
}

/// Holds either a [Runtime] or a [Handle] to an existing runtime for IO tasks
#[derive(Debug)]
pub enum IORuntime {
    Runtime(Runtime),
    Handle(Handle),
}

impl From<Runtime> for IORuntime {
    fn from(value: Runtime) -> Self {
        Self::Runtime(value)
    }
}

impl From<Handle> for IORuntime {
    fn from(value: Handle) -> Self {
        Self::Handle(value)
    }
}

impl IORuntime {
    fn handle(&self) -> Handle {
        match self {
            Self::Runtime(runtime) => runtime.handle().clone(),
            Self::Handle(handle) => handle.clone(),
        }
    }
}

/// Builds a new [Client] instance. By default, configs will be loaded from the default config directories with the following precedence:
/// - If the `HADOOP_CONF_DIR` environment variable is defined, configs will be loaded from `${HADOOP_CONF_DIR}/{core,hdfs}-site.xml`
/// - If the `HADOOP_HOME` environment variable is defined, configs will be loaded from `${HADOOP_HOME}/etc/hadoop/{core,hdfs}-site.xml`
/// - Otherwise no default configs are defined
///
/// If no URL is defined, the `fs.defaultFS` config must be defined and is used as the URL.
///
/// # Examples
///
/// Create a new client using the fs.defaultFS config
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_config(vec![("fs.defaultFS", "hdfs://127.0.0.1:9000")])
///     .build()
///     .unwrap();
/// ```
///
/// Create a new client connecting to a specific URL:
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_url("hdfs://127.0.0.1:9000")
///     .build()
///     .unwrap();
/// ```
///
/// Create a new client using a dedicated tokio runtime for spawned tasks and IO operations
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_url("hdfs://127.0.0.1:9000")
///     .with_io_runtime(tokio::runtime::Runtime::new().unwrap())
///     .build()
///     .unwrap();
/// ```
#[derive(Default)]
pub struct ClientBuilder {
    url: Option<String>,
    config: HashMap<String, String>,
    runtime: Option<IORuntime>,
}

impl ClientBuilder {
    /// Create a new [ClientBuilder]
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the URL to connect to. Can be the address of a single NameNode, or a logical NameService
    pub fn with_url(mut self, url: impl Into<String>) -> Self {
        self.url = Some(url.into());
        self
    }

    /// Set configs to use for the client. The provided configs will override any found in the default config files loaded
    pub fn with_config(
        mut self,
        config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        self.config = config
            .into_iter()
            .map(|(k, v)| (k.into(), v.into()))
            .collect();
        self
    }

    /// Use a dedicated tokio runtime for spawned tasks and IO operations. Can either take ownership of a whole [Runtime]
    /// or take a [Handle] to an externally owned runtime.
    pub fn with_io_runtime(mut self, runtime: impl Into<IORuntime>) -> Self {
        self.runtime = Some(runtime.into());
        self
    }

    /// Create the [Client] instance from the provided settings
    pub fn build(self) -> Result<Client> {
        let config = Configuration::new_with_config(self.config)?;
        let url = if let Some(url) = self.url {
            Url::parse(&url)?
        } else {
            Client::default_fs(&config)?
        };

        Client::build(&url, config, self.runtime)
    }
}

#[derive(Clone, Debug)]
enum RuntimeHolder {
    Custom(Arc<IORuntime>),
    Default(Arc<OnceLock<Runtime>>),
}

impl RuntimeHolder {
    fn new(rt: Option<IORuntime>) -> Self {
        if let Some(rt) = rt {
            Self::Custom(Arc::new(rt))
        } else {
            Self::Default(Arc::new(OnceLock::new()))
        }
    }

    fn get_handle(&self) -> Handle {
        match self {
            Self::Custom(rt) => rt.handle().clone(),
            Self::Default(rt) => match Handle::try_current() {
                Ok(handle) => handle,
                Err(_) => rt
                    .get_or_init(|| Runtime::new().expect("Failed to create tokio runtime"))
                    .handle()
                    .clone(),
            },
        }
    }
}

/// A client to a specific NameNode, NameService, or Viewfs mount table
#[derive(Clone, Debug)]
pub struct Client {
    mount_table: Arc<MountTable>,
    config: Arc<Configuration>,
    // Store the runtime used for spawning all internal tasks. If we are not created
    // inside a tokio runtime, we will create our own to use.
    rt_holder: RuntimeHolder,
}

impl Client {
    /// Creates a new HDFS Client. The URL must include the protocol and host, and optionally a port.
    /// If a port is included, the host is treated as a single NameNode. If no port is included, the
    /// host is treated as a name service that will be resolved using the HDFS config.
    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
    pub fn new(url: &str) -> Result<Self> {
        let parsed_url = Url::parse(url)?;
        Self::build(&parsed_url, Configuration::new()?, None)
    }

    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
    pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
        let parsed_url = Url::parse(url)?;
        Self::build(&parsed_url, Configuration::new_with_config(config)?, None)
    }

    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
    pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
        let config = Configuration::new_with_config(config)?;
        Self::build(&Self::default_fs(&config)?, config, None)
    }

    fn default_fs(config: &Configuration) -> Result<Url> {
        let url = config
            .get(config::DEFAULT_FS)
            .ok_or(HdfsError::InvalidArgument(format!(
                "No {} setting found",
                config::DEFAULT_FS
            )))?;
        Ok(Url::parse(url)?)
    }

    fn build(url: &Url, config: Configuration, rt: Option<IORuntime>) -> Result<Self> {
        let resolved_url = if !url.has_host() {
            let default_url = Self::default_fs(&config)?;
            if url.scheme() != default_url.scheme() || !default_url.has_host() {
                return Err(HdfsError::InvalidArgument(
                    "URL must contain a host".to_string(),
                ));
            }
            default_url
        } else {
            url.clone()
        };

        let config = Arc::new(config);

        let rt_holder = RuntimeHolder::new(rt);

        let mount_table = match url.scheme() {
            "hdfs" => {
                let proxy = NameServiceProxy::new(
                    &resolved_url,
                    Arc::clone(&config),
                    rt_holder.get_handle(),
                )?;
                let protocol = Arc::new(NamenodeProtocol::new(proxy, rt_holder.get_handle()));

                MountTable {
                    mounts: Vec::new(),
                    fallback: MountLink::new("/", "/", protocol),
                }
            }
            "viewfs" => Self::build_mount_table(
                // Host is guaranteed to be present.
                resolved_url.host_str().expect("URL must have a host"),
                Arc::clone(&config),
                rt_holder.get_handle(),
            )?,
            _ => {
                return Err(HdfsError::InvalidArgument(
                    "Only `hdfs` and `viewfs` schemes are supported".to_string(),
                ));
            }
        };

        Ok(Self {
            mount_table: Arc::new(mount_table),
            config,
            rt_holder,
        })
    }

    fn build_mount_table(
        host: &str,
        config: Arc<Configuration>,
        handle: Handle,
    ) -> Result<MountTable> {
        let mut mounts: Vec<MountLink> = Vec::new();
        let mut fallback: Option<MountLink> = None;

        for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
            let url = Url::parse(hdfs_url)?;
            if !url.has_host() {
                return Err(HdfsError::InvalidArgument(
                    "URL must contain a host".to_string(),
                ));
            }
            if url.scheme() != "hdfs" {
                return Err(HdfsError::InvalidArgument(
                    "Only hdfs mounts are supported for viewfs".to_string(),
                ));
            }
            let proxy = NameServiceProxy::new(&url, Arc::clone(&config), handle.clone())?;
            let protocol = Arc::new(NamenodeProtocol::new(proxy, handle.clone()));

            if let Some(prefix) = viewfs_path {
                mounts.push(MountLink::new(prefix, url.path(), protocol));
            } else {
                if fallback.is_some() {
                    return Err(HdfsError::InvalidArgument(
                        "Multiple viewfs fallback links found".to_string(),
                    ));
                }
                fallback = Some(MountLink::new("/", url.path(), protocol));
            }
        }

        if let Some(fallback) = fallback {
            // Sort the mount table from longest viewfs path to shortest. This makes sure more specific paths are considered first.
            mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
            mounts.reverse();

            Ok(MountTable { mounts, fallback })
        } else {
            Err(HdfsError::InvalidArgument(
                "No viewfs fallback mount found".to_string(),
            ))
        }
    }

    /// Retrieve the file status for the file at `path`.
    pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        match link.protocol.get_file_info(&resolved_path).await?.fs {
            Some(status) => Ok(FileStatus::from(status, path)),
            None => Err(HdfsError::FileNotFound(path.to_string())),
        }
    }

    /// Retrieves a list of all files and directories located at `path`. Wrapper around `list_status_iter` that
    /// returns an Err if any part of the stream fails, or Ok if all file statuses were retrieved successfully.
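    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a reachable NameNode; the path is illustrative:
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::default();
    /// for status in client.list_status("/logs", false).await.unwrap() {
    ///     println!("{} ({} bytes)", status.path, status.length);
    /// }
    /// # }
    /// ```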
    pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
        let iter = self.list_status_iter(path, recursive);
        let statuses = iter
            .into_stream()
            .collect::<Vec<Result<FileStatus>>>()
            .await;

        let mut resolved_statuses = Vec::<FileStatus>::with_capacity(statuses.len());
        for status in statuses.into_iter() {
            resolved_statuses.push(status?);
        }

        Ok(resolved_statuses)
    }

    /// Retrieves an iterator over all files and directories located at `path`.
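    ///
    /// # Example
    ///
    /// A minimal sketch of consuming the listing as a stream; `futures` is assumed to be
    /// available to the caller for `StreamExt`:
    /// ```rust
    /// # use futures::StreamExt;
    /// # async fn func() {
    /// # let client = hdfs_native::Client::default();
    /// let mut stream = client.list_status_iter("/logs", true).into_stream();
    /// while let Some(status) = stream.next().await {
    ///     println!("{}", status.unwrap().path);
    /// }
    /// # }
    /// ```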
    pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
        ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
    }

    /// Opens a file reader for the file at `path`. Path should not include a scheme.
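    ///
    /// # Example
    ///
    /// A minimal sketch that reads a whole file into memory; it assumes the file exists and
    /// that [FileReader] exposes `file_length` and `read` as used below:
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::default();
    /// let mut reader = client.read("/path/to/file").await.unwrap();
    /// let len = reader.file_length();
    /// // Read the entire contents (assumed FileReader API)
    /// let bytes = reader.read(len).await.unwrap();
    /// # }
    /// ```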
    pub async fn read(&self, path: &str) -> Result<FileReader> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        // Get all block locations. Length is actually a signed value, but the proto uses an unsigned value
        let located_info = link
            .protocol
            .get_block_locations(&resolved_path, 0, i64::MAX as u64)
            .await?;

        if let Some(locations) = located_info.locations {
            let ec_schema = if let Some(ec_policy) = locations.ec_policy.as_ref() {
                Some(resolve_ec_policy(ec_policy)?)
            } else {
                None
            };

            if locations.file_encryption_info.is_some() {
                return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
            }

            Ok(FileReader::new(
                Arc::clone(&link.protocol),
                locations,
                ec_schema,
                Arc::clone(&self.config),
                self.rt_holder.get_handle(),
            ))
        } else {
            Err(HdfsError::FileNotFound(path.to_string()))
        }
    }

    /// Opens a new file for writing. See [WriteOptions] for options and behavior for different
    /// scenarios.
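    ///
    /// # Example
    ///
    /// A minimal sketch; it assumes `WriteOptions` is re-exported at the crate root and that
    /// [FileWriter] exposes `write` (taking `bytes::Bytes`) and `close` as used below:
    /// ```rust
    /// # use hdfs_native::WriteOptions;
    /// # async fn func() {
    /// # let client = hdfs_native::Client::default();
    /// let mut writer = client
    ///     .create("/path/to/new_file", WriteOptions::default().overwrite(true))
    ///     .await
    ///     .unwrap();
    /// // Write some data and close the file (assumed FileWriter API)
    /// writer.write(bytes::Bytes::from("hello world")).await.unwrap();
    /// writer.close().await.unwrap();
    /// # }
    /// ```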
    pub async fn create(
        &self,
        src: &str,
        write_options: impl AsRef<WriteOptions>,
    ) -> Result<FileWriter> {
        let write_options = write_options.as_ref();

        let (link, resolved_path) = self.mount_table.resolve(src);

        let create_response = link
            .protocol
            .create(
                &resolved_path,
                write_options.permission,
                write_options.overwrite,
                write_options.create_parent,
                write_options.replication,
                write_options.block_size,
            )
            .await?;

        match create_response.fs {
            Some(status) => {
                if status.file_encryption_info.is_some() {
                    let _ = self.delete(src, false).await;
                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
                }

                Ok(FileWriter::new(
                    Arc::clone(&link.protocol),
                    resolved_path,
                    status,
                    Arc::clone(&self.config),
                    self.rt_holder.get_handle(),
                    None,
                ))
            }
            None => Err(HdfsError::FileNotFound(src.to_string())),
        }
    }

    fn needs_new_block(class: &str, msg: &str) -> bool {
        class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
    }

    /// Opens an existing file for appending. An Err will be returned if the file does not exist. If the
    /// file is replicated, the current block will be appended to until it is full. If the file is erasure
    /// coded, a new block will be created.
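    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the file already exists; the [FileWriter] calls are the same
    /// assumed API as in [Client::create]:
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::default();
    /// let mut writer = client.append("/path/to/existing_file").await.unwrap();
    /// writer.write(bytes::Bytes::from("more data")).await.unwrap();
    /// writer.close().await.unwrap();
    /// # }
    /// ```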
    pub async fn append(&self, src: &str) -> Result<FileWriter> {
        let (link, resolved_path) = self.mount_table.resolve(src);

        // Assume the file is replicated and try to append to the current block. If the file is
        // erasure coded, then try again by appending to a new block.
        let append_response = match link.protocol.append(&resolved_path, false).await {
            Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
                link.protocol.append(&resolved_path, true).await?
            }
            resp => resp?,
        };

        match append_response.stat {
            Some(status) => {
                if status.file_encryption_info.is_some() {
                    let _ = link
                        .protocol
                        .complete(src, append_response.block.map(|b| b.b), status.file_id)
                        .await;
                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
                }

                Ok(FileWriter::new(
                    Arc::clone(&link.protocol),
                    resolved_path,
                    status,
                    Arc::clone(&self.config),
                    self.rt_holder.get_handle(),
                    append_response.block,
                ))
            }
            None => Err(HdfsError::FileNotFound(src.to_string())),
        }
    }

    /// Create a new directory at `path` with the given `permission`.
    ///
    /// `permission` is the raw octal value representing the Unix style permission. For example, to
    /// set 755 (`rwxr-xr-x`) permissions, use 0o755.
    ///
    /// If `create_parent` is true, any missing parent directories will be created as well,
    /// otherwise an error will be returned if the parent directory doesn't already exist.
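    ///
    /// # Example
    ///
    /// A minimal sketch; the path and permissions are illustrative:
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::default();
    /// client.mkdirs("/data/2024/01", 0o755, true).await.unwrap();
    /// # }
    /// ```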
    pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .mkdirs(&resolved_path, permission, create_parent)
            .await
            .map(|_| ())
    }

    /// Renames `src` to `dst`. Returns Ok(()) on success, and Err otherwise.
    pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
        let (src_link, src_resolved_path) = self.mount_table.resolve(src);
        let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
        if src_link.viewfs_path == dst_link.viewfs_path {
            src_link
                .protocol
                .rename(&src_resolved_path, &dst_resolved_path, overwrite)
                .await
                .map(|_| ())
        } else {
            Err(HdfsError::InvalidArgument(
                "Cannot rename across different name services".to_string(),
            ))
        }
    }

    /// Deletes the file or directory at `path`. If `recursive` is false and `path` is a non-empty
    /// directory, this will fail. Returns `Ok(true)` if it was successfully deleted.
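    ///
    /// # Example
    ///
    /// A minimal sketch; the path is illustrative:
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::default();
    /// let deleted = client.delete("/data/old", true).await.unwrap();
    /// assert!(deleted);
    /// # }
    /// ```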
    pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .delete(&resolved_path, recursive)
            .await
            .map(|r| r.result)
    }

    /// Sets the modified and access times for a file. Times should be in milliseconds from the epoch.
    pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .set_times(&resolved_path, mtime, atime)
            .await?;
        Ok(())
    }

    /// Optionally sets the owner and group for a file.
    pub async fn set_owner(
        &self,
        path: &str,
        owner: Option<&str>,
        group: Option<&str>,
    ) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .set_owner(&resolved_path, owner, group)
            .await?;
        Ok(())
    }

    /// Sets permissions for a file. Permission should be an octal number representing the Unix style
    /// permission.
    ///
    /// For example, to set permissions to rwxr-xr-x:
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::default();
    /// client.set_permission("/path", 0o755).await.unwrap();
    /// # }
    /// ```
    pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .set_permission(&resolved_path, permission)
            .await?;
        Ok(())
    }

    /// Sets the replication for a file.
    pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        let result = link
            .protocol
            .set_replication(&resolved_path, replication)
            .await?
            .result;

        Ok(result)
    }

    /// Gets a content summary for a file or directory rooted at `path`.
    pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        let result = link
            .protocol
            .get_content_summary(&resolved_path)
            .await?
            .summary;

        Ok(result.into())
    }

    /// Update ACL entries for file or directory at `path`. Existing entries will remain.
    pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .modify_acl_entries(&resolved_path, acl_spec)
            .await?;

        Ok(())
    }

    /// Remove specific ACL entries for file or directory at `path`.
    pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .remove_acl_entries(&resolved_path, acl_spec)
            .await?;

        Ok(())
    }

    /// Remove all default ACLs for file or directory at `path`.
    pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol.remove_default_acl(&resolved_path).await?;

        Ok(())
    }

    /// Remove all ACL entries for file or directory at `path`.
    pub async fn remove_acl(&self, path: &str) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol.remove_acl(&resolved_path).await?;

        Ok(())
    }

    /// Override all ACL entries for file or directory at `path`. If only access ACLs are provided,
    /// default ACLs are maintained. Likewise if only default ACLs are provided, access ACLs are
    /// maintained.
    pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol.set_acl(&resolved_path, acl_spec).await?;

        Ok(())
    }

    /// Get the ACL status for the file or directory at `path`.
    pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        Ok(link
            .protocol
            .get_acl_status(&resolved_path)
            .await?
            .result
            .into())
    }

    /// Get all file statuses matching the glob `pattern`. Supports Hadoop-style globbing
    /// which only applies to individual components of a path.
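    ///
    /// # Example
    ///
    /// A minimal sketch; the pattern is illustrative:
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::default();
    /// // Match every .csv file in any direct subdirectory of /data
    /// for status in client.glob_status("/data/*/*.csv").await.unwrap() {
    ///     println!("{}", status.path);
    /// }
    /// # }
    /// ```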
    pub async fn glob_status(&self, pattern: &str) -> Result<Vec<FileStatus>> {
        // Expand any brace groups first
        let flattened = expand_glob(pattern.to_string())?;

        let mut results: Vec<FileStatus> = Vec::new();

        for flat in flattened.into_iter() {
            // Make the pattern absolute-ish. We keep the pattern as-is; components
            // will be split on '/'. An empty pattern yields no results.
            if flat.is_empty() {
                continue;
            }

            let components = get_path_components(&flat);

            // Candidate holds a path (fully built so far) and optionally a resolved FileStatus
            #[derive(Clone, Debug)]
            struct Candidate {
                path: String,
                status: Option<FileStatus>,
            }

            // Start from the root placeholder
            let mut candidates: Vec<Candidate> = vec![Candidate {
                path: "/".to_string(),
                status: None,
            }];

            for (idx, comp) in components.iter().enumerate() {
                if candidates.is_empty() {
                    break;
                }

                let is_last = idx == components.len() - 1;

                let unescaped = unescape_component(comp);
                let glob_pat = GlobPattern::new(comp)?;

                if !is_last && !glob_pat.has_wildcard() {
                    // Optimization: just append the literal component to each candidate
                    for cand in candidates.iter_mut() {
                        if !cand.path.ends_with('/') {
                            cand.path.push('/');
                        }
                        cand.path.push_str(&unescaped);
                        // keep status as None (we'll resolve later if needed)
                    }
                    continue;
                }

                let mut new_candidates: Vec<Candidate> = Vec::new();

                for cand in candidates.into_iter() {
                    if glob_pat.has_wildcard() {
                        // List the directory represented by cand.path
                        let listing = self.list_status(&cand.path, false).await?;
                        if listing.len() == 1 && listing[0].path == cand.path {
                            // listing corresponds to the candidate itself (file), skip
                            continue;
                        }

                        for child in listing.into_iter() {
                            // If this is not the terminal component, only recurse into directories
                            if !is_last && !child.isdir {
                                continue;
                            }

                            // child.path already contains the full path
                            // Extract the name portion to match against the glob pattern
                            let name = child
                                .path
                                .rsplit_once('/')
                                .map(|(_, n)| n)
                                .unwrap_or(child.path.as_str());

                            if glob_pat.matches(name) {
                                new_candidates.push(Candidate {
                                    path: child.path.clone(),
                                    status: Some(child),
                                });
                            }
                        }
                    } else {
                        // Non-glob component: use get_file_info for exact path
                        let mut next_path = cand.path.clone();
                        if !next_path.ends_with('/') {
                            next_path.push('/');
                        }
                        next_path.push_str(&unescaped);

                        if let Ok(status) = self.get_file_info(&next_path).await {
                            new_candidates.push(Candidate {
                                path: status.path.clone(),
                                status: Some(status),
                            });
                        }
                    }
                }

                candidates = new_candidates;
            }

            // Resolve any placeholder candidates (including root) and collect results
            for cand in candidates.into_iter() {
                let status = if let Some(s) = cand.status {
                    s
                } else {
                    // Try to resolve the path to a real FileStatus
                    match self.get_file_info(&cand.path).await {
                        Ok(s) => s,
                        Err(HdfsError::FileNotFound(_)) => continue,
                        Err(e) => return Err(e),
                    }
                };

                results.push(status);
            }
        }

        Ok(results)
    }
}

impl Default for Client {
    /// Creates a new HDFS Client based on the fs.defaultFS setting. Panics if the config files fail to load,
    /// no defaultFS is defined, or the defaultFS is invalid.
    fn default() -> Self {
        ClientBuilder::new()
            .build()
            .expect("Failed to create default client")
    }
}

pub(crate) struct DirListingIterator {
    path: String,
    resolved_path: String,
    link: MountLink,
    files_only: bool,
    partial_listing: VecDeque<HdfsFileStatusProto>,
    remaining: u32,
    last_seen: Vec<u8>,
}

impl DirListingIterator {
    fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
        let (link, resolved_path) = mount_table.resolve(&path);

        DirListingIterator {
            path,
            resolved_path,
            link: link.clone(),
            files_only,
            partial_listing: VecDeque::new(),
            remaining: 1,
            last_seen: Vec::new(),
        }
    }

    async fn get_next_batch(&mut self) -> Result<bool> {
        let listing = self
            .link
            .protocol
            .get_listing(&self.resolved_path, self.last_seen.clone(), false)
            .await?;

        if let Some(dir_list) = listing.dir_list {
            self.last_seen = dir_list
                .partial_listing
                .last()
                .map(|p| p.path.clone())
                .unwrap_or(Vec::new());

            self.remaining = dir_list.remaining_entries;

            self.partial_listing = dir_list
                .partial_listing
                .into_iter()
                .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
                .collect();
            Ok(!self.partial_listing.is_empty())
        } else {
            Err(HdfsError::FileNotFound(self.path.clone()))
        }
    }

    pub async fn next(&mut self) -> Option<Result<FileStatus>> {
        if self.partial_listing.is_empty()
            && self.remaining > 0
            && let Err(error) = self.get_next_batch().await
        {
            self.remaining = 0;
            return Some(Err(error));
        }
        if let Some(next) = self.partial_listing.pop_front() {
            Some(Ok(FileStatus::from(next, &self.path)))
        } else {
            None
        }
    }
}

pub struct ListStatusIterator {
    mount_table: Arc<MountTable>,
    recursive: bool,
    iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
}

impl ListStatusIterator {
    fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
        let initial = DirListingIterator::new(path.clone(), &mount_table, false);

        ListStatusIterator {
            mount_table,
            recursive,
            iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
        }
    }

    pub async fn next(&self) -> Option<Result<FileStatus>> {
        let mut next_file: Option<Result<FileStatus>> = None;
        let mut iters = self.iters.lock().await;
        while next_file.is_none() {
            if let Some(iter) = iters.last_mut() {
                if let Some(file_result) = iter.next().await {
                    if let Ok(file) = file_result {
                        // Return the directory as the next result, but start traversing into that directory
                        // next if we're doing a recursive listing
                        if file.isdir && self.recursive {
                            iters.push(DirListingIterator::new(
                                file.path.clone(),
                                &self.mount_table,
                                false,
                            ))
                        }
                        next_file = Some(Ok(file));
                    } else {
                        // Error, return that as the next element
                        next_file = Some(file_result)
                    }
                } else {
                    // We've exhausted this directory
                    iters.pop();
                }
            } else {
                // There's nothing left, just return None
                break;
            }
        }

        next_file
    }

    pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
        let listing = stream::unfold(self, |state| async move {
            let next = state.next().await;
            next.map(|n| (n, state))
        });
        Box::pin(listing)
    }
}

#[derive(Debug, Clone)]
pub struct FileStatus {
    pub path: String,
    pub length: usize,
    pub isdir: bool,
    pub permission: u16,
    pub owner: String,
    pub group: String,
    pub modification_time: u64,
    pub access_time: u64,
    pub replication: Option<u32>,
    pub blocksize: Option<u64>,
}

impl FileStatus {
    fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
        let mut path = base_path.trim_end_matches("/").to_string();
        let relative_path = std::str::from_utf8(&value.path).unwrap();
        if !relative_path.is_empty() {
            path.push('/');
            path.push_str(relative_path);
        }

        // Root path should be a slash
        if path.is_empty() {
            path.push('/');
        }

        FileStatus {
            isdir: value.file_type() == FileType::IsDir,
            path,
            length: value.length as usize,
            permission: value.permission.perm as u16,
            owner: value.owner,
            group: value.group,
            modification_time: value.modification_time,
            access_time: value.access_time,
            replication: value.block_replication,
            blocksize: value.blocksize,
        }
    }
}

#[derive(Debug)]
pub struct ContentSummary {
    pub length: u64,
    pub file_count: u64,
    pub directory_count: u64,
    pub quota: u64,
    pub space_consumed: u64,
    pub space_quota: u64,
}

impl From<ContentSummaryProto> for ContentSummary {
    fn from(value: ContentSummaryProto) -> Self {
        ContentSummary {
            length: value.length,
            file_count: value.file_count,
            directory_count: value.directory_count,
            quota: value.quota,
            space_consumed: value.space_consumed,
            space_quota: value.space_quota,
        }
    }
}

#[cfg(test)]
mod test {
    use std::sync::{Arc, LazyLock};

    use tokio::runtime::Runtime;
    use url::Url;

    use crate::{
        client::ClientBuilder,
        common::config::Configuration,
        hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
    };

    use super::{MountLink, MountTable};

    static RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

    fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
        let proxy = NameServiceProxy::new(
            &Url::parse(url).unwrap(),
            Arc::new(Configuration::new().unwrap()),
            RT.handle().clone(),
        )
        .unwrap();
        Arc::new(NamenodeProtocol::new(proxy, RT.handle().clone()))
    }

    #[test]
    fn test_default_fs() {
        assert!(
            ClientBuilder::new()
                .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
                .build()
                .is_ok()
        );

        assert!(
            ClientBuilder::new()
                .with_config(vec![("fs.defaultFS", "hdfs://")])
                .build()
                .is_err()
        );

        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
                .build()
                .is_ok()
        );

        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "hdfs://")])
                .build()
                .is_err()
        );

        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "viewfs://test")])
                .build()
                .is_err()
        );
    }

    #[test]
    fn test_mount_link_resolve() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("/view", "/hdfs", protocol);

        assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
        assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
        assert!(link.resolve("/hdfs/path").is_none());
    }

    #[test]
    fn test_fallback_link() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));

        assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
        assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
        assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");

        let link = MountLink::new("", "", protocol);
        assert_eq!(link.resolve("/").unwrap(), "/");
    }

    #[test]
    fn test_mount_table_resolve() {
        let link1 = MountLink::new(
            "/mount1",
            "/path1/nested",
            create_protocol("hdfs://127.0.0.1:9000"),
        );
        let link2 = MountLink::new(
            "/mount2",
            "/path2",
            create_protocol("hdfs://127.0.0.1:9001"),
        );
        let link3 = MountLink::new(
            "/mount3/nested",
            "/path3",
            create_protocol("hdfs://127.0.0.1:9002"),
        );
        let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));

        let mount_table = MountTable {
            mounts: vec![link1, link2, link3],
            fallback,
        };

        // Exact mount path resolves to the exact HDFS path
        let (link, resolved) = mount_table.resolve("/mount1");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested");

        // Trailing slash is treated the same
        let (link, resolved) = mount_table.resolve("/mount1/");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested/");

        // Doesn't do partial matches on a directory name
        let (link, resolved) = mount_table.resolve("/mount12");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount12");

        let (link, resolved) = mount_table.resolve("/mount3/file");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount3/file");

        let (link, resolved) = mount_table.resolve("/mount3/nested/file");
        assert_eq!(link.viewfs_path, "/mount3/nested");
        assert_eq!(resolved, "/path3/file");
    }

    #[test]
    fn test_io_runtime() {
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://127.0.0.1:9000")
                .with_io_runtime(Runtime::new().unwrap())
                .build()
                .is_ok()
        );

        let rt = Runtime::new().unwrap();
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://127.0.0.1:9000")
                .with_io_runtime(rt.handle().clone())
                .build()
                .is_ok()
        );
    }
}