hdfs_native/
client.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, OnceLock};
3
4use futures::stream::BoxStream;
5use futures::{StreamExt, stream};
6use tokio::runtime::{Handle, Runtime};
7use url::Url;
8
9use crate::acl::{AclEntry, AclStatus};
10use crate::common::config::{self, Configuration};
11use crate::ec::resolve_ec_policy;
12use crate::error::{HdfsError, Result};
13use crate::file::{FileReader, FileWriter};
14use crate::hdfs::protocol::NamenodeProtocol;
15use crate::hdfs::proxy::NameServiceProxy;
16use crate::proto::hdfs::hdfs_file_status_proto::FileType;
17
18use crate::glob::{GlobPattern, expand_glob, get_path_components, unescape_component};
19use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
20
/// Options controlling how a new file is created by the client's `create` call.
#[derive(Clone)]
pub struct WriteOptions {
    /// Block size. Default is retrieved from the server.
    pub block_size: Option<u64>,
    /// Replication factor. Default is retrieved from the server.
    pub replication: Option<u32>,
    /// Unix file permission, defaults to 0o644, which is "rw-r--r--" as a Unix permission.
    /// This is the raw octal value represented in base 10.
    pub permission: u32,
    /// Whether to overwrite the file, defaults to false. If true and the
    /// file does not exist, it will result in an error.
    pub overwrite: bool,
    /// Whether to create any missing parent directories, defaults to true. If false
    /// and the parent directory does not exist, an error will be returned.
    pub create_parent: bool,
}
37
38impl Default for WriteOptions {
39    fn default() -> Self {
40        Self {
41            block_size: None,
42            replication: None,
43            permission: 0o644,
44            overwrite: false,
45            create_parent: true,
46        }
47    }
48}
49
impl AsRef<WriteOptions> for WriteOptions {
    /// Lets APIs taking `impl AsRef<WriteOptions>` accept either an owned
    /// value or a reference.
    fn as_ref(&self) -> &WriteOptions {
        self
    }
}
55
56impl WriteOptions {
57    /// Set the block_size for the new file
58    pub fn block_size(mut self, block_size: u64) -> Self {
59        self.block_size = Some(block_size);
60        self
61    }
62
63    /// Set the replication for the new file
64    pub fn replication(mut self, replication: u32) -> Self {
65        self.replication = Some(replication);
66        self
67    }
68
69    /// Set the raw octal permission value for the new file
70    pub fn permission(mut self, permission: u32) -> Self {
71        self.permission = permission;
72        self
73    }
74
75    /// Set whether to overwrite an existing file
76    pub fn overwrite(mut self, overwrite: bool) -> Self {
77        self.overwrite = overwrite;
78        self
79    }
80
81    /// Set whether to create all missing parent directories
82    pub fn create_parent(mut self, create_parent: bool) -> Self {
83        self.create_parent = create_parent;
84        self
85    }
86}
87
/// A single viewfs mount entry mapping a viewfs path prefix to a path on a
/// specific name service.
#[derive(Debug, Clone)]
struct MountLink {
    // Mount point in the viewfs namespace; stored without a trailing slash.
    viewfs_path: String,
    // Target path on the underlying name service; stored without a trailing slash.
    hdfs_path: String,
    // Protocol handle to the name service this link points at.
    protocol: Arc<NamenodeProtocol>,
}
94
95impl MountLink {
96    fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
97        // We should never have an empty path, we always want things mounted at root ("/") by default.
98        Self {
99            viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
100            hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
101            protocol,
102        }
103    }
104    /// Convert a viewfs path into a name service path if it matches this link
105    fn resolve(&self, path: &str) -> Option<String> {
106        // Make sure we don't partially match the last component. It either needs to be an exact
107        // match to a viewfs path, or needs to match with a trailing slash
108        if path == self.viewfs_path {
109            Some(self.hdfs_path.clone())
110        } else {
111            path.strip_prefix(&format!("{}/", self.viewfs_path))
112                .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
113        }
114    }
115}
116
/// The set of viewfs mounts for a client plus the fallback link used when no
/// specific mount matches.
#[derive(Debug)]
struct MountTable {
    // Mount links, ordered from most specific (deepest viewfs path) to least.
    mounts: Vec<MountLink>,
    // Link rooted at "/", so it resolves any path that no mount matched.
    fallback: MountLink,
}
122
123impl MountTable {
124    fn resolve(&self, src: &str) -> (&MountLink, String) {
125        for link in self.mounts.iter() {
126            if let Some(resolved) = link.resolve(src) {
127                return (link, resolved);
128            }
129        }
130        (&self.fallback, self.fallback.resolve(src).unwrap())
131    }
132}
133
/// Holds either a [Runtime] or a [Handle] to an existing runtime for IO tasks
#[derive(Debug)]
pub enum IORuntime {
    /// A runtime owned by this value.
    Runtime(Runtime),
    /// A handle to a runtime owned elsewhere.
    Handle(Handle),
}
140
141impl From<Runtime> for IORuntime {
142    fn from(value: Runtime) -> Self {
143        Self::Runtime(value)
144    }
145}
146
147impl From<Handle> for IORuntime {
148    fn from(value: Handle) -> Self {
149        Self::Handle(value)
150    }
151}
152
153impl IORuntime {
154    fn handle(&self) -> Handle {
155        match self {
156            Self::Runtime(runtime) => runtime.handle().clone(),
157            Self::Handle(handle) => handle.clone(),
158        }
159    }
160}
161
/// Builds a new [Client] instance. By default, configs will be loaded from the default config directories with the following precedence:
/// - If the `HADOOP_CONF_DIR` environment variable is defined, configs will be loaded from `${HADOOP_CONF_DIR}/{core,hdfs}-site.xml`
/// - If the `HADOOP_HOME` environment variable is defined, configs will be loaded from `${HADOOP_HOME}/etc/hadoop/{core,hdfs}-site.xml`
/// - Otherwise no default configs are defined
///
/// If no URL is defined, the `fs.defaultFS` config must be defined and is used as the URL.
///
/// # Examples
///
/// Create a new client using the fs.defaultFS config
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_config(vec![("fs.defaultFS", "hdfs://127.0.0.1:9000")])
///     .build()
///     .unwrap();
/// ```
///
/// Create a new client connecting to a specific URL:
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_url("hdfs://127.0.0.1:9000")
///     .build()
///     .unwrap();
/// ```
///
/// Create a new client using a dedicated tokio runtime for spawned tasks and IO operations
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_url("hdfs://127.0.0.1:9000")
///     .with_io_runtime(tokio::runtime::Runtime::new().unwrap())
///     .build()
///     .unwrap();
/// ```
#[derive(Default)]
pub struct ClientBuilder {
    // Explicit connection URL; when None, `fs.defaultFS` from the config is used.
    url: Option<String>,
    // Config overrides applied on top of any configs found in the default files.
    config: HashMap<String, String>,
    // Optional dedicated runtime for IO tasks; otherwise one is picked up or
    // created lazily at build time.
    runtime: Option<IORuntime>,
}
204
205impl ClientBuilder {
206    /// Create a new [ClientBuilder]
207    pub fn new() -> Self {
208        Self::default()
209    }
210
211    /// Set the URL to connect to. Can be the address of a single NameNode, or a logical NameService
212    pub fn with_url(mut self, url: impl Into<String>) -> Self {
213        self.url = Some(url.into());
214        self
215    }
216
217    /// Set configs to use for the client. The provided configs will override any found in the default config files loaded
218    pub fn with_config(
219        mut self,
220        config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
221    ) -> Self {
222        self.config = config
223            .into_iter()
224            .map(|(k, v)| (k.into(), v.into()))
225            .collect();
226        self
227    }
228
229    /// Use a dedicated tokio runtime for spawned tasks and IO operations. Can either take ownership of a whole [Runtime]
230    /// or take a [Handle] to an externally owned runtime.
231    pub fn with_io_runtime(mut self, runtime: impl Into<IORuntime>) -> Self {
232        self.runtime = Some(runtime.into());
233        self
234    }
235
236    /// Create the [Client] instance from the provided settings
237    pub fn build(self) -> Result<Client> {
238        let config = Configuration::new_with_config(self.config)?;
239        let url = if let Some(url) = self.url {
240            Url::parse(&url)?
241        } else {
242            Client::default_fs(&config)?
243        };
244
245        Client::build(&url, config, self.runtime)
246    }
247}
248
/// Tracks which tokio runtime the client spawns its tasks on.
#[derive(Clone, Debug)]
enum RuntimeHolder {
    /// A runtime or handle explicitly supplied by the user.
    Custom(Arc<IORuntime>),
    /// A lazily created owned runtime, only initialized if a handle is ever
    /// requested outside of an existing tokio runtime.
    Default(Arc<OnceLock<Runtime>>),
}
254
255impl RuntimeHolder {
256    fn new(rt: Option<IORuntime>) -> Self {
257        if let Some(rt) = rt {
258            Self::Custom(Arc::new(rt))
259        } else {
260            Self::Default(Arc::new(OnceLock::new()))
261        }
262    }
263
264    fn get_handle(&self) -> Handle {
265        match self {
266            Self::Custom(rt) => rt.handle().clone(),
267            Self::Default(rt) => match Handle::try_current() {
268                Ok(handle) => handle,
269                Err(_) => rt
270                    .get_or_init(|| Runtime::new().expect("Failed to create tokio runtime"))
271                    .handle()
272                    .clone(),
273            },
274        }
275    }
276}
277
/// A client to a specific NameNode, NameService, or Viewfs mount table
#[derive(Clone, Debug)]
pub struct Client {
    // Resolves paths to the name service that owns them. For plain hdfs URLs
    // this holds just a single fallback mount covering the whole namespace.
    mount_table: Arc<MountTable>,
    // Shared configuration passed along to readers and writers.
    config: Arc<Configuration>,
    // Store the runtime used for spawning all internal tasks. If we are not created
    // inside a tokio runtime, we will create our own to use.
    rt_holder: RuntimeHolder,
}
287
288impl Client {
289    /// Creates a new HDFS Client. The URL must include the protocol and host, and optionally a port.
290    /// If a port is included, the host is treated as a single NameNode. If no port is included, the
291    /// host is treated as a name service that will be resolved using the HDFS config.
292    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
293    pub fn new(url: &str) -> Result<Self> {
294        let parsed_url = Url::parse(url)?;
295        Self::build(&parsed_url, Configuration::new()?, None)
296    }
297
298    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
299    pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
300        let parsed_url = Url::parse(url)?;
301        Self::build(&parsed_url, Configuration::new_with_config(config)?, None)
302    }
303
304    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
305    pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
306        let config = Configuration::new_with_config(config)?;
307        Self::build(&Self::default_fs(&config)?, config, None)
308    }
309
310    fn default_fs(config: &Configuration) -> Result<Url> {
311        let url = config
312            .get(config::DEFAULT_FS)
313            .ok_or(HdfsError::InvalidArgument(format!(
314                "No {} setting found",
315                config::DEFAULT_FS
316            )))?;
317        Ok(Url::parse(url)?)
318    }
319
320    fn build(url: &Url, config: Configuration, rt: Option<IORuntime>) -> Result<Self> {
321        let resolved_url = if !url.has_host() {
322            let default_url = Self::default_fs(&config)?;
323            if url.scheme() != default_url.scheme() || !default_url.has_host() {
324                return Err(HdfsError::InvalidArgument(
325                    "URL must contain a host".to_string(),
326                ));
327            }
328            default_url
329        } else {
330            url.clone()
331        };
332
333        let config = Arc::new(config);
334
335        let rt_holder = RuntimeHolder::new(rt);
336
337        let mount_table = match url.scheme() {
338            "hdfs" => {
339                let proxy = NameServiceProxy::new(
340                    &resolved_url,
341                    Arc::clone(&config),
342                    rt_holder.get_handle(),
343                )?;
344                let protocol = Arc::new(NamenodeProtocol::new(proxy, rt_holder.get_handle()));
345
346                MountTable {
347                    mounts: Vec::new(),
348                    fallback: MountLink::new("/", "/", protocol),
349                }
350            }
351            "viewfs" => Self::build_mount_table(
352                // Host is guaranteed to be present.
353                resolved_url.host_str().expect("URL must have a host"),
354                Arc::clone(&config),
355                rt_holder.get_handle(),
356            )?,
357            _ => {
358                return Err(HdfsError::InvalidArgument(
359                    "Only `hdfs` and `viewfs` schemes are supported".to_string(),
360                ));
361            }
362        };
363
364        Ok(Self {
365            mount_table: Arc::new(mount_table),
366            config,
367            rt_holder,
368        })
369    }
370
    /// Build a viewfs mount table for `host` from the config.
    ///
    /// Each configured mount must be an `hdfs` URL with a host. A link with no
    /// viewfs prefix is the fallback; exactly one fallback is required, and
    /// duplicates or a missing fallback are errors.
    fn build_mount_table(
        host: &str,
        config: Arc<Configuration>,
        handle: Handle,
    ) -> Result<MountTable> {
        let mut mounts: Vec<MountLink> = Vec::new();
        let mut fallback: Option<MountLink> = None;

        for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
            let url = Url::parse(hdfs_url)?;
            if !url.has_host() {
                return Err(HdfsError::InvalidArgument(
                    "URL must contain a host".to_string(),
                ));
            }
            if url.scheme() != "hdfs" {
                return Err(HdfsError::InvalidArgument(
                    "Only hdfs mounts are supported for viewfs".to_string(),
                ));
            }
            // Each mount gets its own proxy/protocol to its name service
            let proxy = NameServiceProxy::new(&url, Arc::clone(&config), handle.clone())?;
            let protocol = Arc::new(NamenodeProtocol::new(proxy, handle.clone()));

            if let Some(prefix) = viewfs_path {
                mounts.push(MountLink::new(prefix, url.path(), protocol));
            } else {
                if fallback.is_some() {
                    return Err(HdfsError::InvalidArgument(
                        "Multiple viewfs fallback links found".to_string(),
                    ));
                }
                fallback = Some(MountLink::new("/", url.path(), protocol));
            }
        }

        if let Some(fallback) = fallback {
            // Sort the mount table from longest viewfs path to shortest. This makes sure more specific paths are considered first.
            mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
            mounts.reverse();

            Ok(MountTable { mounts, fallback })
        } else {
            Err(HdfsError::InvalidArgument(
                "No viewfs fallback mount found".to_string(),
            ))
        }
    }
418
419    /// Retrieve the file status for the file at `path`.
420    pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
421        let (link, resolved_path) = self.mount_table.resolve(path);
422        match link.protocol.get_file_info(&resolved_path).await?.fs {
423            Some(status) => Ok(FileStatus::from(status, path)),
424            None => Err(HdfsError::FileNotFound(path.to_string())),
425        }
426    }
427
428    /// Retrives a list of all files in directories located at `path`. Wrapper around `list_status_iter` that
429    /// returns Err if any part of the stream fails, or Ok if all file statuses were found successfully.
430    pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
431        let iter = self.list_status_iter(path, recursive);
432        let statuses = iter
433            .into_stream()
434            .collect::<Vec<Result<FileStatus>>>()
435            .await;
436
437        let mut resolved_statues = Vec::<FileStatus>::with_capacity(statuses.len());
438        for status in statuses.into_iter() {
439            resolved_statues.push(status?);
440        }
441
442        Ok(resolved_statues)
443    }
444
    /// Retrieves an iterator of all files in directories located at `path`.
    pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
        ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
    }
449
450    /// Opens a file reader for the file at `path`. Path should not include a scheme.
451    pub async fn read(&self, path: &str) -> Result<FileReader> {
452        let (link, resolved_path) = self.mount_table.resolve(path);
453        // Get all block locations. Length is actually a signed value, but the proto uses an unsigned value
454        let located_info = link
455            .protocol
456            .get_block_locations(&resolved_path, 0, i64::MAX as u64)
457            .await?;
458
459        if let Some(locations) = located_info.locations {
460            let ec_schema = if let Some(ec_policy) = locations.ec_policy.as_ref() {
461                Some(resolve_ec_policy(ec_policy)?)
462            } else {
463                None
464            };
465
466            if locations.file_encryption_info.is_some() {
467                return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
468            }
469
470            Ok(FileReader::new(
471                Arc::clone(&link.protocol),
472                locations,
473                ec_schema,
474                Arc::clone(&self.config),
475                self.rt_holder.get_handle(),
476            ))
477        } else {
478            Err(HdfsError::FileNotFound(path.to_string()))
479        }
480    }
481
482    /// Opens a new file for writing. See [WriteOptions] for options and behavior for different
483    /// scenarios.
484    pub async fn create(
485        &self,
486        src: &str,
487        write_options: impl AsRef<WriteOptions>,
488    ) -> Result<FileWriter> {
489        let write_options = write_options.as_ref();
490
491        let (link, resolved_path) = self.mount_table.resolve(src);
492
493        let create_response = link
494            .protocol
495            .create(
496                &resolved_path,
497                write_options.permission,
498                write_options.overwrite,
499                write_options.create_parent,
500                write_options.replication,
501                write_options.block_size,
502            )
503            .await?;
504
505        match create_response.fs {
506            Some(status) => {
507                if status.file_encryption_info.is_some() {
508                    let _ = self.delete(src, false).await;
509                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
510                }
511
512                Ok(FileWriter::new(
513                    Arc::clone(&link.protocol),
514                    resolved_path,
515                    status,
516                    Arc::clone(&self.config),
517                    self.rt_holder.get_handle(),
518                    None,
519                ))
520            }
521            None => Err(HdfsError::FileNotFound(src.to_string())),
522        }
523    }
524
525    fn needs_new_block(class: &str, msg: &str) -> bool {
526        class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
527    }
528
529    /// Opens an existing file for appending. An Err will be returned if the file does not exist. If the
530    /// file is replicated, the current block will be appended to until it is full. If the file is erasure
531    /// coded, a new block will be created.
532    pub async fn append(&self, src: &str) -> Result<FileWriter> {
533        let (link, resolved_path) = self.mount_table.resolve(src);
534
535        // Assume the file is replicated and try to append to the current block. If the file is
536        // erasure coded, then try again by appending to a new block.
537        let append_response = match link.protocol.append(&resolved_path, false).await {
538            Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
539                link.protocol.append(&resolved_path, true).await?
540            }
541            resp => resp?,
542        };
543
544        match append_response.stat {
545            Some(status) => {
546                if status.file_encryption_info.is_some() {
547                    let _ = link
548                        .protocol
549                        .complete(src, append_response.block.map(|b| b.b), status.file_id)
550                        .await;
551                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
552                }
553
554                Ok(FileWriter::new(
555                    Arc::clone(&link.protocol),
556                    resolved_path,
557                    status,
558                    Arc::clone(&self.config),
559                    self.rt_holder.get_handle(),
560                    append_response.block,
561                ))
562            }
563            None => Err(HdfsError::FileNotFound(src.to_string())),
564        }
565    }
566
567    /// Create a new directory at `path` with the given `permission`.
568    ///
569    /// `permission` is the raw octal value representing the Unix style permission. For example, to
570    /// set 755 (`rwxr-x-rx`) permissions, use 0o755.
571    ///
572    /// If `create_parent` is true, any missing parent directories will be created as well,
573    /// otherwise an error will be returned if the parent directory doesn't already exist.
574    pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
575        let (link, resolved_path) = self.mount_table.resolve(path);
576        link.protocol
577            .mkdirs(&resolved_path, permission, create_parent)
578            .await
579            .map(|_| ())
580    }
581
582    /// Renames `src` to `dst`. Returns Ok(()) on success, and Err otherwise.
583    pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
584        let (src_link, src_resolved_path) = self.mount_table.resolve(src);
585        let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
586        if src_link.viewfs_path == dst_link.viewfs_path {
587            src_link
588                .protocol
589                .rename(&src_resolved_path, &dst_resolved_path, overwrite)
590                .await
591                .map(|_| ())
592        } else {
593            Err(HdfsError::InvalidArgument(
594                "Cannot rename across different name services".to_string(),
595            ))
596        }
597    }
598
599    /// Deletes the file or directory at `path`. If `recursive` is false and `path` is a non-empty
600    /// directory, this will fail. Returns `Ok(true)` if it was successfully deleted.
601    pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
602        let (link, resolved_path) = self.mount_table.resolve(path);
603        link.protocol
604            .delete(&resolved_path, recursive)
605            .await
606            .map(|r| r.result)
607    }
608
609    /// Sets the modified and access times for a file. Times should be in milliseconds from the epoch.
610    pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
611        let (link, resolved_path) = self.mount_table.resolve(path);
612        link.protocol
613            .set_times(&resolved_path, mtime, atime)
614            .await?;
615        Ok(())
616    }
617
618    /// Optionally sets the owner and group for a file.
619    pub async fn set_owner(
620        &self,
621        path: &str,
622        owner: Option<&str>,
623        group: Option<&str>,
624    ) -> Result<()> {
625        let (link, resolved_path) = self.mount_table.resolve(path);
626        link.protocol
627            .set_owner(&resolved_path, owner, group)
628            .await?;
629        Ok(())
630    }
631
    /// Sets permissions for a file. Permission should be an octal number representing the Unix style
    /// permission.
    ///
    /// For example, to set permissions to rwxr-xr-x:
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::ClientBuilder::new()
    /// #     .with_url("hdfs://127.0.0.1:9000")
    /// #     .build()
    /// #     .unwrap();
    /// client.set_permission("/path", 0o755).await.unwrap();
    /// # }
    /// ```
    pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .set_permission(&resolved_path, permission)
            .await?;
        Ok(())
    }
649
650    /// Sets the replication for a file.
651    pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
652        let (link, resolved_path) = self.mount_table.resolve(path);
653        let result = link
654            .protocol
655            .set_replication(&resolved_path, replication)
656            .await?
657            .result;
658
659        Ok(result)
660    }
661
662    /// Gets a content summary for a file or directory rooted at `path`.
663    pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
664        let (link, resolved_path) = self.mount_table.resolve(path);
665        let result = link
666            .protocol
667            .get_content_summary(&resolved_path)
668            .await?
669            .summary;
670
671        Ok(result.into())
672    }
673
674    /// Update ACL entries for file or directory at `path`. Existing entries will remain.
675    pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
676        let (link, resolved_path) = self.mount_table.resolve(path);
677        link.protocol
678            .modify_acl_entries(&resolved_path, acl_spec)
679            .await?;
680
681        Ok(())
682    }
683
684    /// Remove specific ACL entries for file or directory at `path`.
685    pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
686        let (link, resolved_path) = self.mount_table.resolve(path);
687        link.protocol
688            .remove_acl_entries(&resolved_path, acl_spec)
689            .await?;
690
691        Ok(())
692    }
693
694    /// Remove all default ACLs for file or directory at `path`.
695    pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
696        let (link, resolved_path) = self.mount_table.resolve(path);
697        link.protocol.remove_default_acl(&resolved_path).await?;
698
699        Ok(())
700    }
701
702    /// Remove all ACL entries for file or directory at `path`.
703    pub async fn remove_acl(&self, path: &str) -> Result<()> {
704        let (link, resolved_path) = self.mount_table.resolve(path);
705        link.protocol.remove_acl(&resolved_path).await?;
706
707        Ok(())
708    }
709
710    /// Override all ACL entries for file or directory at `path`. If only access ACLs are provided,
711    /// default ACLs are maintained. Likewise if only default ACLs are provided, access ACLs are
712    /// maintained.
713    pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
714        let (link, resolved_path) = self.mount_table.resolve(path);
715        link.protocol.set_acl(&resolved_path, acl_spec).await?;
716
717        Ok(())
718    }
719
720    /// Get the ACL status for the file or directory at `path`.
721    pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
722        let (link, resolved_path) = self.mount_table.resolve(path);
723        Ok(link
724            .protocol
725            .get_acl_status(&resolved_path)
726            .await?
727            .result
728            .into())
729    }
730
731    /// Get all file statuses matching the glob `pattern`. Supports Hadoop-style globbing
732    /// which only applies to individual components of a path.
733    pub async fn glob_status(&self, pattern: &str) -> Result<Vec<FileStatus>> {
734        // Expand any brace groups first
735        let flattened = expand_glob(pattern.to_string())?;
736
737        let mut results: Vec<FileStatus> = Vec::new();
738
739        for flat in flattened.into_iter() {
740            // Make the pattern absolute-ish. We keep the pattern as-is; components
741            // will be split on '/'. An empty pattern yields no results.
742            if flat.is_empty() {
743                continue;
744            }
745
746            let components = get_path_components(&flat);
747
748            // Candidate holds a path (fully built so far) and optionally a resolved FileStatus
749            #[derive(Clone, Debug)]
750            struct Candidate {
751                path: String,
752                status: Option<FileStatus>,
753            }
754
755            // Start from the root placeholder
756            let mut candidates: Vec<Candidate> = vec![Candidate {
757                path: "/".to_string(),
758                status: None,
759            }];
760
761            for (idx, comp) in components.iter().enumerate() {
762                if candidates.is_empty() {
763                    break;
764                }
765
766                let is_last = idx == components.len() - 1;
767
768                let unescaped = unescape_component(comp);
769                let glob_pat = GlobPattern::new(comp)?;
770
771                if !is_last && !glob_pat.has_wildcard() {
772                    // Optimization: just append the literal component to each candidate
773                    for cand in candidates.iter_mut() {
774                        if !cand.path.ends_with('/') {
775                            cand.path.push('/');
776                        }
777                        cand.path.push_str(&unescaped);
778                        // keep status as None (we'll resolve later if needed)
779                    }
780                    continue;
781                }
782
783                let mut new_candidates: Vec<Candidate> = Vec::new();
784
785                for cand in candidates.into_iter() {
786                    if glob_pat.has_wildcard() {
787                        // List the directory represented by cand.path
788                        let listing = match self.list_status(&cand.path, false).await {
789                            Ok(listing) => listing,
790                            Err(HdfsError::FileNotFound(_)) => continue,
791                            Err(e) => return Err(e),
792                        };
793                        if listing.len() == 1 && listing[0].path == cand.path {
794                            // listing corresponds to the candidate itself (file), skip
795                            continue;
796                        }
797
798                        for child in listing.into_iter() {
799                            // If this is not the terminal component, only recurse into directories
800                            if !is_last && !child.isdir {
801                                continue;
802                            }
803
804                            // child.path already contains the full path
805                            // Extract the name portion to match against the glob pattern
806                            let name = child
807                                .path
808                                .rsplit_once('/')
809                                .map(|(_, n)| n)
810                                .unwrap_or(child.path.as_str());
811
812                            if glob_pat.matches(name) {
813                                new_candidates.push(Candidate {
814                                    path: child.path.clone(),
815                                    status: Some(child),
816                                });
817                            }
818                        }
819                    } else {
820                        // Non-glob component: use get_file_info for exact path
821                        let mut next_path = cand.path.clone();
822                        if !next_path.ends_with('/') {
823                            next_path.push('/');
824                        }
825                        next_path.push_str(&unescaped);
826
827                        match self.get_file_info(&next_path).await {
828                            Ok(status) => {
829                                if is_last || status.isdir {
830                                    new_candidates.push(Candidate {
831                                        path: status.path.clone(),
832                                        status: Some(status),
833                                    });
834                                }
835                            }
836                            Err(HdfsError::FileNotFound(_)) => continue,
837                            Err(e) => return Err(e),
838                        }
839                    }
840                }
841
842                candidates = new_candidates;
843            }
844
845            // Resolve any placeholder candidates (including root) and collect results
846            for cand in candidates.into_iter() {
847                let status = if let Some(s) = cand.status {
848                    s
849                } else {
850                    // Try to resolve the path to a real FileStatus
851                    match self.get_file_info(&cand.path).await {
852                        Ok(s) => s,
853                        Err(HdfsError::FileNotFound(_)) => continue,
854                        Err(e) => return Err(e),
855                    }
856                };
857
858                results.push(status);
859            }
860        }
861
862        Ok(results)
863    }
864}
865
866impl Default for Client {
867    /// Creates a new HDFS Client based on the fs.defaultFS setting. Panics if the config files fail to load,
868    /// no defaultFS is defined, or the defaultFS is invalid.
869    fn default() -> Self {
870        ClientBuilder::new()
871            .build()
872            .expect("Failed to create default client")
873    }
874}
875
/// Iterator over the entries of a single directory, fetching listings from the
/// namenode in batches as entries are consumed.
pub(crate) struct DirListingIterator {
    // Original (pre-resolution) path; used as the base path for returned
    // FileStatus values and in FileNotFound errors
    path: String,
    // Path after resolving through the mount table; this is what is sent to
    // the namenode in listing requests
    resolved_path: String,
    // Mount link whose protocol handle issues the get_listing RPCs
    link: MountLink,
    // When true, directory entries are filtered out of the results
    files_only: bool,
    // Entries fetched from the namenode that have not yet been returned
    partial_listing: VecDeque<HdfsFileStatusProto>,
    // Number of entries the namenode reports are still remaining server-side
    remaining: u32,
    // Raw path bytes of the last entry seen; used as the cursor for the next batch
    last_seen: Vec<u8>,
}
885
886impl DirListingIterator {
887    fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
888        let (link, resolved_path) = mount_table.resolve(&path);
889
890        DirListingIterator {
891            path,
892            resolved_path,
893            link: link.clone(),
894            files_only,
895            partial_listing: VecDeque::new(),
896            remaining: 1,
897            last_seen: Vec::new(),
898        }
899    }
900
901    async fn get_next_batch(&mut self) -> Result<bool> {
902        let listing = self
903            .link
904            .protocol
905            .get_listing(&self.resolved_path, self.last_seen.clone(), false)
906            .await?;
907
908        if let Some(dir_list) = listing.dir_list {
909            self.last_seen = dir_list
910                .partial_listing
911                .last()
912                .map(|p| p.path.clone())
913                .unwrap_or(Vec::new());
914
915            self.remaining = dir_list.remaining_entries;
916
917            self.partial_listing = dir_list
918                .partial_listing
919                .into_iter()
920                .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
921                .collect();
922            Ok(!self.partial_listing.is_empty())
923        } else {
924            Err(HdfsError::FileNotFound(self.path.clone()))
925        }
926    }
927
928    pub async fn next(&mut self) -> Option<Result<FileStatus>> {
929        if self.partial_listing.is_empty()
930            && self.remaining > 0
931            && let Err(error) = self.get_next_batch().await
932        {
933            self.remaining = 0;
934            return Some(Err(error));
935        }
936        if let Some(next) = self.partial_listing.pop_front() {
937            Some(Ok(FileStatus::from(next, &self.path)))
938        } else {
939            None
940        }
941    }
942}
943
/// Iterator over a directory listing, optionally recursing into subdirectories.
pub struct ListStatusIterator {
    // Mount table used to create per-directory iterators as recursion descends
    mount_table: Arc<MountTable>,
    // When true, directories encountered are also traversed
    recursive: bool,
    // Stack of in-progress directory iterators; the innermost directory is last.
    // Wrapped in an async Mutex so next() can take &self (e.g. from a stream).
    iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
}
949
950impl ListStatusIterator {
951    fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
952        let initial = DirListingIterator::new(path.clone(), &mount_table, false);
953
954        ListStatusIterator {
955            mount_table,
956            recursive,
957            iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
958        }
959    }
960
961    pub async fn next(&self) -> Option<Result<FileStatus>> {
962        let mut next_file: Option<Result<FileStatus>> = None;
963        let mut iters = self.iters.lock().await;
964        while next_file.is_none() {
965            if let Some(iter) = iters.last_mut() {
966                if let Some(file_result) = iter.next().await {
967                    if let Ok(file) = file_result {
968                        // Return the directory as the next result, but start traversing into that directory
969                        // next if we're doing a recursive listing
970                        if file.isdir && self.recursive {
971                            iters.push(DirListingIterator::new(
972                                file.path.clone(),
973                                &self.mount_table,
974                                false,
975                            ))
976                        }
977                        next_file = Some(Ok(file));
978                    } else {
979                        // Error, return that as the next element
980                        next_file = Some(file_result)
981                    }
982                } else {
983                    // We've exhausted this directory
984                    iters.pop();
985                }
986            } else {
987                // There's nothing left, just return None
988                break;
989            }
990        }
991
992        next_file
993    }
994
995    pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
996        let listing = stream::unfold(self, |state| async move {
997            let next = state.next().await;
998            next.map(|n| (n, state))
999        });
1000        Box::pin(listing)
1001    }
1002}
1003
/// Status information for a single file or directory.
#[derive(Debug, Clone)]
pub struct FileStatus {
    /// Full path of the entry.
    pub path: String,
    /// File length in bytes.
    pub length: usize,
    /// True if the entry is a directory.
    pub isdir: bool,
    /// Unix permission bits for the entry.
    pub permission: u16,
    /// Owner of the entry.
    pub owner: String,
    /// Group of the entry.
    pub group: String,
    /// Modification time as reported by the namenode — presumably milliseconds
    /// since the epoch; confirm against the HDFS protocol.
    pub modification_time: u64,
    /// Access time as reported by the namenode (same units as modification_time).
    pub access_time: u64,
    /// Replication factor, if reported.
    pub replication: Option<u32>,
    /// Block size, if reported.
    pub blocksize: Option<u64>,
}
1017
1018impl FileStatus {
1019    fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
1020        let mut path = base_path.trim_end_matches("/").to_string();
1021        let relative_path = std::str::from_utf8(&value.path).unwrap();
1022        if !relative_path.is_empty() {
1023            path.push('/');
1024            path.push_str(relative_path);
1025        }
1026
1027        // Root path should be a slash
1028        if path.is_empty() {
1029            path.push('/');
1030        }
1031
1032        FileStatus {
1033            isdir: value.file_type() == FileType::IsDir,
1034            path,
1035            length: value.length as usize,
1036            permission: value.permission.perm as u16,
1037            owner: value.owner,
1038            group: value.group,
1039            modification_time: value.modification_time,
1040            access_time: value.access_time,
1041            replication: value.block_replication,
1042            blocksize: value.blocksize,
1043        }
1044    }
1045}
1046
/// Aggregate usage summary for a directory tree, mirroring the fields of the
/// protobuf `ContentSummaryProto`.
#[derive(Debug)]
pub struct ContentSummary {
    /// Total length in bytes of all files.
    pub length: u64,
    /// Number of files.
    pub file_count: u64,
    /// Number of directories.
    pub directory_count: u64,
    /// Namespace (name) quota.
    pub quota: u64,
    /// Space consumed, including replication.
    pub space_consumed: u64,
    /// Space quota.
    pub space_quota: u64,
}
1056
1057impl From<ContentSummaryProto> for ContentSummary {
1058    fn from(value: ContentSummaryProto) -> Self {
1059        ContentSummary {
1060            length: value.length,
1061            file_count: value.file_count,
1062            directory_count: value.directory_count,
1063            quota: value.quota,
1064            space_consumed: value.space_consumed,
1065            space_quota: value.space_quota,
1066        }
1067    }
1068}
1069
#[cfg(test)]
mod test {
    use std::sync::{Arc, LazyLock};

    use tokio::runtime::Runtime;
    use url::Url;

    use crate::{
        client::ClientBuilder,
        common::config::Configuration,
        hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
    };

    use super::{MountLink, MountTable};

    // Shared runtime handle for constructing protocol objects in tests
    static RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

    // Builds a NamenodeProtocol for the given namenode URL using a default config
    fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
        let proxy = NameServiceProxy::new(
            &Url::parse(url).unwrap(),
            Arc::new(Configuration::new().unwrap()),
            RT.handle().clone(),
        )
        .unwrap();
        Arc::new(NamenodeProtocol::new(proxy, RT.handle().clone()))
    }

    // Client construction succeeds only with a usable defaultFS or explicit URL;
    // URLs lacking a host (or viewfs without mount config) must fail.
    #[test]
    fn test_default_fs() {
        assert!(
            ClientBuilder::new()
                .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
                .build()
                .is_ok()
        );

        assert!(
            ClientBuilder::new()
                .with_config(vec![("fs.defaultFS", "hdfs://")])
                .build()
                .is_err()
        );

        // A host-less explicit URL falls back to the configured defaultFS
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
                .build()
                .is_ok()
        );

        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "hdfs://")])
                .build()
                .is_err()
        );

        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "viewfs://test")])
                .build()
                .is_err()
        );
    }

    // A mount link maps paths under its viewfs prefix onto the HDFS prefix,
    // and returns None for paths outside the mount.
    #[test]
    fn test_mount_link_resolve() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("/view", "/hdfs", protocol);

        assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
        assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
        assert!(link.resolve("/hdfs/path").is_none());
    }

    // A link with an empty viewfs path acts as a fallback that matches everything
    #[test]
    fn test_fallback_link() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));

        assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
        assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
        assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");

        // Empty viewfs and HDFS paths resolve to the input unchanged
        let link = MountLink::new("", "", protocol);
        assert_eq!(link.resolve("/").unwrap(), "/");
    }

    // The mount table picks the most specific matching mount, falling back to
    // the fallback link when no mount prefix matches as a whole path component.
    #[test]
    fn test_mount_table_resolve() {
        let link1 = MountLink::new(
            "/mount1",
            "/path1/nested",
            create_protocol("hdfs://127.0.0.1:9000"),
        );
        let link2 = MountLink::new(
            "/mount2",
            "/path2",
            create_protocol("hdfs://127.0.0.1:9001"),
        );
        let link3 = MountLink::new(
            "/mount3/nested",
            "/path3",
            create_protocol("hdfs://127.0.0.1:9002"),
        );
        let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));

        let mount_table = MountTable {
            mounts: vec![link1, link2, link3],
            fallback,
        };

        // Exact mount path resolves to the exact HDFS path
        let (link, resolved) = mount_table.resolve("/mount1");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested");

        // Trailing slash is treated the same
        let (link, resolved) = mount_table.resolve("/mount1/");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested/");

        // Doesn't do partial matches on a directory name
        let (link, resolved) = mount_table.resolve("/mount12");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount12");

        let (link, resolved) = mount_table.resolve("/mount3/file");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount3/file");

        let (link, resolved) = mount_table.resolve("/mount3/nested/file");
        assert_eq!(link.viewfs_path, "/mount3/nested");
        assert_eq!(resolved, "/path3/file");
    }

    // The builder accepts an IO runtime either as an owned Runtime or as a Handle
    #[test]
    fn test_io_runtime() {
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://127.0.0.1:9000")
                .with_io_runtime(Runtime::new().unwrap())
                .build()
                .is_ok()
        );

        let rt = Runtime::new().unwrap();
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://127.0.0.1:9000")
                .with_io_runtime(rt.handle().clone())
                .build()
                .is_ok()
        );
    }
}