// hdfs_native/client.rs

use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, OnceLock};

use futures::stream::BoxStream;
use futures::{StreamExt, stream};
use tokio::runtime::{Handle, Runtime};
use url::Url;

use crate::acl::{AclEntry, AclStatus};
use crate::common::config::{self, Configuration};
use crate::ec::resolve_ec_policy;
use crate::error::{HdfsError, Result};
use crate::file::{FileReader, FileWriter};
use crate::glob::{GlobPattern, expand_glob, get_path_components, unescape_component};
use crate::hdfs::protocol::NamenodeProtocol;
use crate::hdfs::proxy::NameServiceProxy;
use crate::proto::hdfs::hdfs_file_status_proto::FileType;
use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
use crate::security::user::User;

/// Options controlling how a new file is created by [Client::create].
#[derive(Clone)]
pub struct WriteOptions {
    /// Block size. Default is retrieved from the server.
    pub block_size: Option<u64>,
    /// Replication factor. Default is retrieved from the server.
    pub replication: Option<u32>,
    /// Unix file permission, defaults to 0o644, which is "rw-r--r--" as a Unix permission.
    /// This is the raw octal value represented in base 10.
    pub permission: u32,
    /// Whether to overwrite the file, defaults to false. If true and the
    /// file does not exist, it will result in an error.
    pub overwrite: bool,
    /// Whether to create any missing parent directories, defaults to true. If false
    /// and the parent directory does not exist, an error will be returned.
    pub create_parent: bool,
}

39impl Default for WriteOptions {
40    fn default() -> Self {
41        Self {
42            block_size: None,
43            replication: None,
44            permission: 0o644,
45            overwrite: false,
46            create_parent: true,
47        }
48    }
49}
50
51impl AsRef<WriteOptions> for WriteOptions {
52    fn as_ref(&self) -> &WriteOptions {
53        self
54    }
55}
56
57impl WriteOptions {
58    /// Set the block_size for the new file
59    pub fn block_size(mut self, block_size: u64) -> Self {
60        self.block_size = Some(block_size);
61        self
62    }
63
64    /// Set the replication for the new file
65    pub fn replication(mut self, replication: u32) -> Self {
66        self.replication = Some(replication);
67        self
68    }
69
70    /// Set the raw octal permission value for the new file
71    pub fn permission(mut self, permission: u32) -> Self {
72        self.permission = permission;
73        self
74    }
75
76    /// Set whether to overwrite an existing file
77    pub fn overwrite(mut self, overwrite: bool) -> Self {
78        self.overwrite = overwrite;
79        self
80    }
81
82    /// Set whether to create all missing parent directories
83    pub fn create_parent(mut self, create_parent: bool) -> Self {
84        self.create_parent = create_parent;
85        self
86    }
87}
88
89#[derive(Debug, Clone)]
90struct MountLink {
91    viewfs_path: String,
92    hdfs_path: String,
93    protocol: Arc<NamenodeProtocol>,
94}
95
96impl MountLink {
97    fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
98        // We should never have an empty path, we always want things mounted at root ("/") by default.
99        Self {
100            viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
101            hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
102            protocol,
103        }
104    }
105    /// Convert a viewfs path into a name service path if it matches this link
106    fn resolve(&self, path: &str) -> Option<String> {
107        // Make sure we don't partially match the last component. It either needs to be an exact
108        // match to a viewfs path, or needs to match with a trailing slash
109        if path == self.viewfs_path {
110            Some(self.hdfs_path.clone())
111        } else {
112            path.strip_prefix(&format!("{}/", self.viewfs_path))
113                .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
114        }
115    }
116}
117
118#[derive(Debug)]
119struct MountTable {
120    mounts: Vec<MountLink>,
121    fallback: MountLink,
122    home_dir: String,
123}
124
125impl MountTable {
126    fn resolve(&self, src: &str) -> (&MountLink, String) {
127        let path = if src.starts_with('/') {
128            src.to_string()
129        } else {
130            format!("{}/{}", self.home_dir, src)
131        };
132
133        for link in self.mounts.iter() {
134            if let Some(resolved) = link.resolve(&path) {
135                return (link, resolved);
136            }
137        }
138        (&self.fallback, self.fallback.resolve(&path).unwrap())
139    }
140}
141
142fn build_home_dir(
143    scheme: &str,
144    host: Option<&str>,
145    config: &Configuration,
146    username: &str,
147) -> String {
148    let prefix = match scheme {
149        "hdfs" => config.get("dfs.user.home.dir.prefix"),
150        "viewfs" => {
151            host.and_then(|host| config.get(&format!("fs.viewfs.mounttable.{host}.homedir")))
152        }
153        _ => None,
154    }
155    .unwrap_or("/user");
156
157    let prefix = prefix.trim_end_matches('/');
158    if prefix.is_empty() {
159        format!("/{username}")
160    } else {
161        format!("{prefix}/{username}")
162    }
163}
164
165/// Holds either a [Runtime] or a [Handle] to an existing runtime for IO tasks
166#[derive(Debug)]
167pub enum IORuntime {
168    Runtime(Runtime),
169    Handle(Handle),
170}
171
172impl From<Runtime> for IORuntime {
173    fn from(value: Runtime) -> Self {
174        Self::Runtime(value)
175    }
176}
177
178impl From<Handle> for IORuntime {
179    fn from(value: Handle) -> Self {
180        Self::Handle(value)
181    }
182}
183
184impl IORuntime {
185    fn handle(&self) -> Handle {
186        match self {
187            Self::Runtime(runtime) => runtime.handle().clone(),
188            Self::Handle(handle) => handle.clone(),
189        }
190    }
191}
192
/// Builds a new [Client] instance. Configs will be loaded with the following precedence:
///
/// - If method `ClientBuilder::with_config_dir` is invoked, configs will be loaded from `${config_dir}/{core,hdfs}-site.xml`
/// - If the `HADOOP_CONF_DIR` environment variable is defined, configs will be loaded from `${HADOOP_CONF_DIR}/{core,hdfs}-site.xml`
/// - If the `HADOOP_HOME` environment variable is defined, configs will be loaded from `${HADOOP_HOME}/etc/hadoop/{core,hdfs}-site.xml`
/// - Otherwise no configs are defined
///
/// Finally, configs set by `with_config` will override the configs loaded above.
///
/// If no URL is defined, the `fs.defaultFS` config must be defined and is used as the URL.
///
/// # Examples
///
/// Create a new client with given config directory
///
/// ```rust,no_run
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_config_dir("/opt/hadoop/etc/hadoop")
///     .build()
///     .unwrap();
/// ```
///
/// Create a new client with the environment variable
///
/// ```rust,no_run
/// # use hdfs_native::ClientBuilder;
/// unsafe { std::env::set_var("HADOOP_CONF_DIR", "/opt/hadoop/etc/hadoop") };
/// let client = ClientBuilder::new()
///     .build()
///     .unwrap();
/// ```
///
/// Create a new client using the fs.defaultFS config
///
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_config(vec![("fs.defaultFS", "hdfs://127.0.0.1:9000")])
///     .build()
///     .unwrap();
/// ```
///
/// Create a new client connecting to a specific URL:
///
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_url("hdfs://127.0.0.1:9000")
///     .build()
///     .unwrap();
/// ```
///
/// Create a new client using a dedicated tokio runtime for spawned tasks and IO operations
///
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_url("hdfs://127.0.0.1:9000")
///     .with_io_runtime(tokio::runtime::Runtime::new().unwrap())
///     .build()
///     .unwrap();
/// ```
#[derive(Default)]
pub struct ClientBuilder {
    // Explicit URL to connect to; falls back to fs.defaultFS when absent.
    url: Option<String>,
    // Config overrides applied on top of any loaded config files.
    config: Option<HashMap<String, String>>,
    // Explicit config directory; overrides the environment variables.
    config_dir: Option<String>,
    // Optional dedicated runtime for spawned tasks and IO operations.
    runtime: Option<IORuntime>,
}

264impl ClientBuilder {
265    /// Create a new [ClientBuilder]
266    pub fn new() -> Self {
267        Self::default()
268    }
269
270    /// Set the URL to connect to. Can be the address of a single NameNode, or a logical NameService
271    pub fn with_url(mut self, url: impl Into<String>) -> Self {
272        self.url = Some(url.into());
273        self
274    }
275
276    /// Set configs to use for the client. The provided configs will override any found in the config files loaded
277    pub fn with_config(
278        mut self,
279        config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
280    ) -> Self {
281        self.config = Some(
282            config
283                .into_iter()
284                .map(|(k, v)| (k.into(), v.into()))
285                .collect(),
286        );
287        self
288    }
289
290    /// Set the configration directory path to read from. The provided path will override the one provided by environment variable.
291    pub fn with_config_dir(mut self, config_dir: impl Into<String>) -> Self {
292        self.config_dir = Some(config_dir.into());
293        self
294    }
295
296    /// Use a dedicated tokio runtime for spawned tasks and IO operations. Can either take ownership of a whole [Runtime]
297    /// or take a [Handle] to an externally owned runtime.
298    pub fn with_io_runtime(mut self, runtime: impl Into<IORuntime>) -> Self {
299        self.runtime = Some(runtime.into());
300        self
301    }
302
303    /// Create the [Client] instance from the provided settings
304    pub fn build(self) -> Result<Client> {
305        let config = Configuration::new(self.config_dir, self.config)?;
306        let url = if let Some(url) = self.url {
307            Url::parse(&url)?
308        } else {
309            Client::default_fs(&config)?
310        };
311
312        Client::build(&url, config, self.runtime)
313    }
314}
315
316#[derive(Clone, Debug)]
317enum RuntimeHolder {
318    Custom(Arc<IORuntime>),
319    Default(Arc<OnceLock<Runtime>>),
320}
321
322impl RuntimeHolder {
323    fn new(rt: Option<IORuntime>) -> Self {
324        if let Some(rt) = rt {
325            Self::Custom(Arc::new(rt))
326        } else {
327            Self::Default(Arc::new(OnceLock::new()))
328        }
329    }
330
331    fn get_handle(&self) -> Handle {
332        match self {
333            Self::Custom(rt) => rt.handle().clone(),
334            Self::Default(rt) => match Handle::try_current() {
335                Ok(handle) => handle,
336                Err(_) => rt
337                    .get_or_init(|| Runtime::new().expect("Failed to create tokio runtime"))
338                    .handle()
339                    .clone(),
340            },
341        }
342    }
343}
344
345/// A client to a speicific NameNode, NameService, or Viewfs mount table
346#[derive(Clone, Debug)]
347pub struct Client {
348    mount_table: Arc<MountTable>,
349    config: Arc<Configuration>,
350    // Store the runtime used for spawning all internal tasks. If we are not created
351    // inside a tokio runtime, we will create our own to use.
352    rt_holder: RuntimeHolder,
353}
354
impl Client {
    /// Creates a new HDFS Client. The URL must include the protocol and host, and optionally a port.
    /// If a port is included, the host is treated as a single NameNode. If no port is included, the
    /// host is treated as a name service that will be resolved using the HDFS config.
    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
    pub fn new(url: &str) -> Result<Self> {
        Self::build(&Url::parse(url)?, Configuration::new(None, None)?, None)
    }

    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
    pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
        Self::build(&Url::parse(url)?, Configuration::new(None, Some(config))?, None)
    }

    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
    pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
        let config = Configuration::new(None, Some(config))?;
        Self::build(&Self::default_fs(&config)?, config, None)
    }

377    fn default_fs(config: &Configuration) -> Result<Url> {
378        let url = config
379            .get(config::DEFAULT_FS)
380            .ok_or(HdfsError::InvalidArgument(format!(
381                "No {} setting found",
382                config::DEFAULT_FS
383            )))?;
384        Ok(Url::parse(url)?)
385    }
386
    /// Constructs a [Client] from a parsed URL, loaded configuration, and an
    /// optional dedicated IO runtime. Shared backend for all public constructors.
    fn build(url: &Url, config: Configuration, rt: Option<IORuntime>) -> Result<Self> {
        // A host-less URL falls back to fs.defaultFS, but only if the schemes
        // agree and the default actually names a host.
        let resolved_url = if !url.has_host() {
            let default_url = Self::default_fs(&config)?;
            if url.scheme() != default_url.scheme() || !default_url.has_host() {
                return Err(HdfsError::InvalidArgument(
                    "URL must contain a host".to_string(),
                ));
            }
            default_url
        } else {
            url.clone()
        };

        let config = Arc::new(config);

        let rt_holder = RuntimeHolder::new(rt);

        // Determine the username (effective user preferred over real user); it
        // feeds the home directory used to absolutize relative paths.
        let user_info = User::get_user_info();
        let username = user_info
            .effective_user
            .as_deref()
            .or(user_info.real_user.as_deref())
            .expect("User info must include a username");
        let home_dir = build_home_dir(
            resolved_url.scheme(),
            resolved_url.host_str(),
            config.as_ref(),
            username,
        );

        // NOTE(review): this matches on the original URL's scheme rather than
        // resolved_url's — confirm that is intended when falling back to
        // fs.defaultFS (they agree today because the fallback enforces equal schemes).
        let mount_table = match url.scheme() {
            // Plain HDFS: a single name service mounted at "/" with no extra links.
            "hdfs" => {
                let proxy = NameServiceProxy::new(
                    &resolved_url,
                    Arc::clone(&config),
                    rt_holder.get_handle(),
                )?;
                let protocol = Arc::new(NamenodeProtocol::new(proxy, rt_holder.get_handle()));

                MountTable {
                    mounts: Vec::new(),
                    fallback: MountLink::new("/", "/", protocol),
                    home_dir,
                }
            }
            // ViewFS: build the mount table from the per-host config entries.
            "viewfs" => Self::build_mount_table(
                // Host is guaranteed to be present.
                resolved_url.host_str().expect("URL must have a host"),
                Arc::clone(&config),
                rt_holder.get_handle(),
                home_dir,
            )?,
            _ => {
                return Err(HdfsError::InvalidArgument(
                    "Only `hdfs` and `viewfs` schemes are supported".to_string(),
                ));
            }
        };

        Ok(Self {
            mount_table: Arc::new(mount_table),
            config,
            rt_holder,
        })
    }

    /// Builds the viewfs mount table for `host` from the loaded configuration.
    ///
    /// Every configured link must be an `hdfs` URL with a host. Exactly one
    /// fallback (no viewfs prefix) entry is required; it handles any path that
    /// no other mount matches.
    fn build_mount_table(
        host: &str,
        config: Arc<Configuration>,
        handle: Handle,
        home_dir: String,
    ) -> Result<MountTable> {
        let mut mounts: Vec<MountLink> = Vec::new();
        let mut fallback: Option<MountLink> = None;

        for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
            let url = Url::parse(hdfs_url)?;
            if !url.has_host() {
                return Err(HdfsError::InvalidArgument(
                    "URL must contain a host".to_string(),
                ));
            }
            if url.scheme() != "hdfs" {
                return Err(HdfsError::InvalidArgument(
                    "Only hdfs mounts are supported for viewfs".to_string(),
                ));
            }
            // Each link gets its own proxy/protocol to its backing name service.
            let proxy = NameServiceProxy::new(&url, Arc::clone(&config), handle.clone())?;
            let protocol = Arc::new(NamenodeProtocol::new(proxy, handle.clone()));

            // An entry without a viewfs prefix is the fallback mount; at most one allowed.
            if let Some(prefix) = viewfs_path {
                mounts.push(MountLink::new(prefix, url.path(), protocol));
            } else {
                if fallback.is_some() {
                    return Err(HdfsError::InvalidArgument(
                        "Multiple viewfs fallback links found".to_string(),
                    ));
                }
                fallback = Some(MountLink::new("/", url.path(), protocol));
            }
        }

        if let Some(fallback) = fallback {
            // Sort the mount table from longest viewfs path to shortest. This makes sure more specific paths are considered first.
            mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
            mounts.reverse();

            Ok(MountTable {
                mounts,
                fallback,
                home_dir,
            })
        } else {
            Err(HdfsError::InvalidArgument(
                "No viewfs fallback mount found".to_string(),
            ))
        }
    }

506    /// Retrieve the file status for the file at `path`.
507    pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
508        let (link, resolved_path) = self.mount_table.resolve(path);
509        match link.protocol.get_file_info(&resolved_path).await?.fs {
510            Some(status) => Ok(FileStatus::from(status, path)),
511            None => Err(HdfsError::FileNotFound(path.to_string())),
512        }
513    }
514
515    /// Retrives a list of all files in directories located at `path`. Wrapper around `list_status_iter` that
516    /// returns Err if any part of the stream fails, or Ok if all file statuses were found successfully.
517    pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
518        let iter = self.list_status_iter(path, recursive);
519        let statuses = iter
520            .into_stream()
521            .collect::<Vec<Result<FileStatus>>>()
522            .await;
523
524        let mut resolved_statues = Vec::<FileStatus>::with_capacity(statuses.len());
525        for status in statuses.into_iter() {
526            resolved_statues.push(status?);
527        }
528
529        Ok(resolved_statues)
530    }
531
532    /// Retrives an iterator of all files in directories located at `path`.
533    pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
534        ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
535    }
536
537    /// Opens a file reader for the file at `path`. Path should not include a scheme.
538    pub async fn read(&self, path: &str) -> Result<FileReader> {
539        let (link, resolved_path) = self.mount_table.resolve(path);
540        // Get all block locations. Length is actually a signed value, but the proto uses an unsigned value
541        let located_info = link
542            .protocol
543            .get_block_locations(&resolved_path, 0, i64::MAX as u64)
544            .await?;
545
546        if let Some(locations) = located_info.locations {
547            let ec_schema = if let Some(ec_policy) = locations.ec_policy.as_ref() {
548                Some(resolve_ec_policy(ec_policy)?)
549            } else {
550                None
551            };
552
553            if locations.file_encryption_info.is_some() {
554                return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
555            }
556
557            Ok(FileReader::new(
558                Arc::clone(&link.protocol),
559                locations,
560                ec_schema,
561                Arc::clone(&self.config),
562                self.rt_holder.get_handle(),
563            ))
564        } else {
565            Err(HdfsError::FileNotFound(path.to_string()))
566        }
567    }
568
    /// Opens a new file for writing. See [WriteOptions] for options and behavior for different
    /// scenarios.
    pub async fn create(
        &self,
        src: &str,
        write_options: impl AsRef<WriteOptions>,
    ) -> Result<FileWriter> {
        let write_options = write_options.as_ref();

        // Translate the (possibly viewfs) path to its backing name service path.
        let (link, resolved_path) = self.mount_table.resolve(src);

        let create_response = link
            .protocol
            .create(
                &resolved_path,
                write_options.permission,
                write_options.overwrite,
                write_options.create_parent,
                write_options.replication,
                write_options.block_size,
            )
            .await?;

        match create_response.fs {
            Some(status) => {
                if status.file_encryption_info.is_some() {
                    // Encryption is unsupported: remove the file we just created so an
                    // empty encrypted file isn't left behind. Best effort — the delete
                    // result is deliberately ignored.
                    let _ = self.delete(src, false).await;
                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
                }

                Ok(FileWriter::new(
                    Arc::clone(&link.protocol),
                    resolved_path,
                    status,
                    Arc::clone(&self.config),
                    self.rt_holder.get_handle(),
                    // A newly created file has no existing block to append to.
                    None,
                ))
            }
            None => Err(HdfsError::FileNotFound(src.to_string())),
        }
    }

612    fn needs_new_block(class: &str, msg: &str) -> bool {
613        class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
614    }
615
616    /// Opens an existing file for appending. An Err will be returned if the file does not exist. If the
617    /// file is replicated, the current block will be appended to until it is full. If the file is erasure
618    /// coded, a new block will be created.
619    pub async fn append(&self, src: &str) -> Result<FileWriter> {
620        let (link, resolved_path) = self.mount_table.resolve(src);
621
622        // Assume the file is replicated and try to append to the current block. If the file is
623        // erasure coded, then try again by appending to a new block.
624        let append_response = match link.protocol.append(&resolved_path, false).await {
625            Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
626                link.protocol.append(&resolved_path, true).await?
627            }
628            resp => resp?,
629        };
630
631        match append_response.stat {
632            Some(status) => {
633                if status.file_encryption_info.is_some() {
634                    let _ = link
635                        .protocol
636                        .complete(src, append_response.block.map(|b| b.b), status.file_id)
637                        .await;
638                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
639                }
640
641                Ok(FileWriter::new(
642                    Arc::clone(&link.protocol),
643                    resolved_path,
644                    status,
645                    Arc::clone(&self.config),
646                    self.rt_holder.get_handle(),
647                    append_response.block,
648                ))
649            }
650            None => Err(HdfsError::FileNotFound(src.to_string())),
651        }
652    }
653
654    /// Create a new directory at `path` with the given `permission`.
655    ///
656    /// `permission` is the raw octal value representing the Unix style permission. For example, to
657    /// set 755 (`rwxr-x-rx`) permissions, use 0o755.
658    ///
659    /// If `create_parent` is true, any missing parent directories will be created as well,
660    /// otherwise an error will be returned if the parent directory doesn't already exist.
661    pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
662        let (link, resolved_path) = self.mount_table.resolve(path);
663        link.protocol
664            .mkdirs(&resolved_path, permission, create_parent)
665            .await
666            .map(|_| ())
667    }
668
669    /// Renames `src` to `dst`. Returns Ok(()) on success, and Err otherwise.
670    pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
671        let (src_link, src_resolved_path) = self.mount_table.resolve(src);
672        let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
673        if src_link.viewfs_path == dst_link.viewfs_path {
674            src_link
675                .protocol
676                .rename(&src_resolved_path, &dst_resolved_path, overwrite)
677                .await
678                .map(|_| ())
679        } else {
680            Err(HdfsError::InvalidArgument(
681                "Cannot rename across different name services".to_string(),
682            ))
683        }
684    }
685
686    /// Deletes the file or directory at `path`. If `recursive` is false and `path` is a non-empty
687    /// directory, this will fail. Returns `Ok(true)` if it was successfully deleted.
688    pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
689        let (link, resolved_path) = self.mount_table.resolve(path);
690        link.protocol
691            .delete(&resolved_path, recursive)
692            .await
693            .map(|r| r.result)
694    }
695
696    /// Sets the modified and access times for a file. Times should be in milliseconds from the epoch.
697    pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
698        let (link, resolved_path) = self.mount_table.resolve(path);
699        link.protocol
700            .set_times(&resolved_path, mtime, atime)
701            .await?;
702        Ok(())
703    }
704
705    /// Optionally sets the owner and group for a file.
706    pub async fn set_owner(
707        &self,
708        path: &str,
709        owner: Option<&str>,
710        group: Option<&str>,
711    ) -> Result<()> {
712        let (link, resolved_path) = self.mount_table.resolve(path);
713        link.protocol
714            .set_owner(&resolved_path, owner, group)
715            .await?;
716        Ok(())
717    }
718
719    /// Sets permissions for a file. Permission should be an octal number reprenting the Unix style
720    /// permission.
721    ///
722    /// For example, to set permissions to rwxr-xr-x:
723    /// ```rust
724    /// # async fn func() {
725    /// # let client = hdfs_native::Client::new("localhost:9000").unwrap();
726    /// client.set_permission("/path", 0o755).await.unwrap();
727    /// }
728    /// ```
729    pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
730        let (link, resolved_path) = self.mount_table.resolve(path);
731        link.protocol
732            .set_permission(&resolved_path, permission)
733            .await?;
734        Ok(())
735    }
736
737    /// Sets the replication for a file.
738    pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
739        let (link, resolved_path) = self.mount_table.resolve(path);
740        let result = link
741            .protocol
742            .set_replication(&resolved_path, replication)
743            .await?
744            .result;
745
746        Ok(result)
747    }
748
749    /// Gets a content summary for a file or directory rooted at `path`.
750    pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
751        let (link, resolved_path) = self.mount_table.resolve(path);
752        let result = link
753            .protocol
754            .get_content_summary(&resolved_path)
755            .await?
756            .summary;
757
758        Ok(result.into())
759    }
760
761    /// Update ACL entries for file or directory at `path`. Existing entries will remain.
762    pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
763        let (link, resolved_path) = self.mount_table.resolve(path);
764        link.protocol
765            .modify_acl_entries(&resolved_path, acl_spec)
766            .await?;
767
768        Ok(())
769    }
770
771    /// Remove specific ACL entries for file or directory at `path`.
772    pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
773        let (link, resolved_path) = self.mount_table.resolve(path);
774        link.protocol
775            .remove_acl_entries(&resolved_path, acl_spec)
776            .await?;
777
778        Ok(())
779    }
780
781    /// Remove all default ACLs for file or directory at `path`.
782    pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
783        let (link, resolved_path) = self.mount_table.resolve(path);
784        link.protocol.remove_default_acl(&resolved_path).await?;
785
786        Ok(())
787    }
788
789    /// Remove all ACL entries for file or directory at `path`.
790    pub async fn remove_acl(&self, path: &str) -> Result<()> {
791        let (link, resolved_path) = self.mount_table.resolve(path);
792        link.protocol.remove_acl(&resolved_path).await?;
793
794        Ok(())
795    }
796
797    /// Override all ACL entries for file or directory at `path`. If only access ACLs are provided,
798    /// default ACLs are maintained. Likewise if only default ACLs are provided, access ACLs are
799    /// maintained.
800    pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
801        let (link, resolved_path) = self.mount_table.resolve(path);
802        link.protocol.set_acl(&resolved_path, acl_spec).await?;
803
804        Ok(())
805    }
806
807    /// Get the ACL status for the file or directory at `path`.
808    pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
809        let (link, resolved_path) = self.mount_table.resolve(path);
810        Ok(link
811            .protocol
812            .get_acl_status(&resolved_path)
813            .await?
814            .result
815            .into())
816    }
817
    /// Get all file statuses matching the glob `pattern`. Supports Hadoop-style globbing
    /// which only applies to individual components of a path.
    ///
    /// Brace groups are expanded first, then each resulting pattern is matched
    /// component-by-component against the namespace. Patterns that match nothing
    /// contribute no results (no error is raised for an empty match).
    pub async fn glob_status(&self, pattern: &str) -> Result<Vec<FileStatus>> {
        // Expand any brace groups first
        let flattened = expand_glob(pattern.to_string())?;

        let mut results: Vec<FileStatus> = Vec::new();

        for flat in flattened.into_iter() {
            // Make the pattern absolute-ish. We keep the pattern as-is; components
            // will be split on '/'. An empty pattern yields no results.
            if flat.is_empty() {
                continue;
            }

            let components = get_path_components(&flat);

            // Candidate holds a path (fully built so far) and optionally a resolved FileStatus
            #[derive(Clone, Debug)]
            struct Candidate {
                path: String,
                status: Option<FileStatus>,
            }

            // Start from the root placeholder
            let mut candidates: Vec<Candidate> = vec![Candidate {
                path: "/".to_string(),
                status: None,
            }];

            // Breadth-first expansion: each pattern component maps the current set
            // of candidate paths to the set of entries matching that component.
            for (idx, comp) in components.iter().enumerate() {
                if candidates.is_empty() {
                    break;
                }

                let is_last = idx == components.len() - 1;

                let unescaped = unescape_component(comp);
                let glob_pat = GlobPattern::new(comp)?;

                if !is_last && !glob_pat.has_wildcard() {
                    // Optimization: just append the literal component to each candidate
                    // without issuing any RPCs; existence is checked implicitly when a
                    // later component lists or resolves the extended path
                    for cand in candidates.iter_mut() {
                        if !cand.path.ends_with('/') {
                            cand.path.push('/');
                        }
                        cand.path.push_str(&unescaped);
                        // keep status as None (we'll resolve later if needed)
                    }
                    continue;
                }

                let mut new_candidates: Vec<Candidate> = Vec::new();

                for cand in candidates.into_iter() {
                    if glob_pat.has_wildcard() {
                        // List the directory represented by cand.path
                        let listing = match self.list_status(&cand.path, false).await {
                            Ok(listing) => listing,
                            // A candidate that doesn't exist simply drops out
                            Err(HdfsError::FileNotFound(_)) => continue,
                            Err(e) => return Err(e),
                        };
                        if listing.len() == 1 && listing[0].path == cand.path {
                            // listing corresponds to the candidate itself (file), skip
                            continue;
                        }

                        for child in listing.into_iter() {
                            // If this is not the terminal component, only recurse into directories
                            if !is_last && !child.isdir {
                                continue;
                            }

                            // child.path already contains the full path
                            // Extract the name portion to match against the glob pattern
                            let name = child
                                .path
                                .rsplit_once('/')
                                .map(|(_, n)| n)
                                .unwrap_or(child.path.as_str());

                            if glob_pat.matches(name) {
                                new_candidates.push(Candidate {
                                    path: child.path.clone(),
                                    status: Some(child),
                                });
                            }
                        }
                    } else {
                        // Non-glob component: use get_file_info for exact path
                        let mut next_path = cand.path.clone();
                        if !next_path.ends_with('/') {
                            next_path.push('/');
                        }
                        next_path.push_str(&unescaped);

                        match self.get_file_info(&next_path).await {
                            Ok(status) => {
                                // Only keep files for the final component; intermediate
                                // components must be directories to descend further
                                if is_last || status.isdir {
                                    new_candidates.push(Candidate {
                                        path: status.path.clone(),
                                        status: Some(status),
                                    });
                                }
                            }
                            Err(HdfsError::FileNotFound(_)) => continue,
                            Err(e) => return Err(e),
                        }
                    }
                }

                candidates = new_candidates;
            }

            // Resolve any placeholder candidates (including root) and collect results
            for cand in candidates.into_iter() {
                let status = if let Some(s) = cand.status {
                    s
                } else {
                    // Try to resolve the path to a real FileStatus
                    match self.get_file_info(&cand.path).await {
                        Ok(s) => s,
                        Err(HdfsError::FileNotFound(_)) => continue,
                        Err(e) => return Err(e),
                    }
                };

                results.push(status);
            }
        }

        Ok(results)
    }
951}
952
953impl Default for Client {
954    /// Creates a new HDFS Client based on the fs.defaultFS setting. Panics if the config files fail to load,
955    /// no defaultFS is defined, or the defaultFS is invalid.
956    fn default() -> Self {
957        ClientBuilder::new()
958            .build()
959            .expect("Failed to create default client")
960    }
961}
962
/// Pages through the entries of a single directory, fetching partial listings
/// from the NameNode on demand.
pub(crate) struct DirListingIterator {
    /// The path as requested by the caller; used for building result paths and errors
    path: String,
    /// The path after resolving through the mount table
    resolved_path: String,
    /// Mount link whose protocol serves this directory
    link: MountLink,
    /// If true, directory entries are filtered out of the results
    files_only: bool,
    /// Entries fetched from the server but not yet returned
    partial_listing: VecDeque<HdfsFileStatusProto>,
    /// Entries the server still holds; initialized to 1 as a sentinel so the
    /// first call triggers a fetch
    remaining: u32,
    /// Raw path bytes of the last entry seen, used as the cursor for the next batch
    last_seen: Vec<u8>,
}
972
973impl DirListingIterator {
974    fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
975        let (link, resolved_path) = mount_table.resolve(&path);
976
977        DirListingIterator {
978            path,
979            resolved_path,
980            link: link.clone(),
981            files_only,
982            partial_listing: VecDeque::new(),
983            remaining: 1,
984            last_seen: Vec::new(),
985        }
986    }
987
988    async fn get_next_batch(&mut self) -> Result<bool> {
989        let listing = self
990            .link
991            .protocol
992            .get_listing(&self.resolved_path, self.last_seen.clone(), false)
993            .await?;
994
995        if let Some(dir_list) = listing.dir_list {
996            self.last_seen = dir_list
997                .partial_listing
998                .last()
999                .map(|p| p.path.clone())
1000                .unwrap_or(Vec::new());
1001
1002            self.remaining = dir_list.remaining_entries;
1003
1004            self.partial_listing = dir_list
1005                .partial_listing
1006                .into_iter()
1007                .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
1008                .collect();
1009            Ok(!self.partial_listing.is_empty())
1010        } else {
1011            Err(HdfsError::FileNotFound(self.path.clone()))
1012        }
1013    }
1014
1015    pub async fn next(&mut self) -> Option<Result<FileStatus>> {
1016        if self.partial_listing.is_empty()
1017            && self.remaining > 0
1018            && let Err(error) = self.get_next_batch().await
1019        {
1020            self.remaining = 0;
1021            return Some(Err(error));
1022        }
1023        if let Some(next) = self.partial_listing.pop_front() {
1024            Some(Ok(FileStatus::from(next, &self.path)))
1025        } else {
1026            None
1027        }
1028    }
1029}
1030
/// Iterates over a directory listing, optionally recursing into subdirectories.
pub struct ListStatusIterator {
    /// Mount table used to create listing iterators for nested directories
    mount_table: Arc<MountTable>,
    /// Whether to descend into subdirectories
    recursive: bool,
    /// Stack of per-directory iterators; the last element is the directory
    /// currently being traversed (depth-first)
    iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
}
1036
1037impl ListStatusIterator {
1038    fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
1039        let initial = DirListingIterator::new(path.clone(), &mount_table, false);
1040
1041        ListStatusIterator {
1042            mount_table,
1043            recursive,
1044            iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
1045        }
1046    }
1047
1048    pub async fn next(&self) -> Option<Result<FileStatus>> {
1049        let mut next_file: Option<Result<FileStatus>> = None;
1050        let mut iters = self.iters.lock().await;
1051        while next_file.is_none() {
1052            if let Some(iter) = iters.last_mut() {
1053                if let Some(file_result) = iter.next().await {
1054                    if let Ok(file) = file_result {
1055                        // Return the directory as the next result, but start traversing into that directory
1056                        // next if we're doing a recursive listing
1057                        if file.isdir && self.recursive {
1058                            iters.push(DirListingIterator::new(
1059                                file.path.clone(),
1060                                &self.mount_table,
1061                                false,
1062                            ))
1063                        }
1064                        next_file = Some(Ok(file));
1065                    } else {
1066                        // Error, return that as the next element
1067                        next_file = Some(file_result)
1068                    }
1069                } else {
1070                    // We've exhausted this directory
1071                    iters.pop();
1072                }
1073            } else {
1074                // There's nothing left, just return None
1075                break;
1076            }
1077        }
1078
1079        next_file
1080    }
1081
1082    pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
1083        let listing = stream::unfold(self, |state| async move {
1084            let next = state.next().await;
1085            next.map(|n| (n, state))
1086        });
1087        Box::pin(listing)
1088    }
1089}
1090
/// Status information for a single file or directory.
#[derive(Debug, Clone)]
pub struct FileStatus {
    /// Full path of the file or directory
    pub path: String,
    /// Length of the file in bytes
    pub length: usize,
    /// Whether this entry is a directory
    pub isdir: bool,
    /// Unix permission bits
    pub permission: u16,
    /// Owner of the file or directory
    pub owner: String,
    /// Group of the file or directory
    pub group: String,
    /// Modification time as reported by the server
    /// (presumably milliseconds since the epoch — verify against server docs)
    pub modification_time: u64,
    /// Access time as reported by the server
    pub access_time: u64,
    /// Replication factor, if reported
    pub replication: Option<u32>,
    /// Block size, if reported
    pub blocksize: Option<u64>,
}
1104
1105impl FileStatus {
1106    fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
1107        let mut path = base_path.trim_end_matches("/").to_string();
1108        let relative_path = std::str::from_utf8(&value.path).unwrap();
1109        if !relative_path.is_empty() {
1110            path.push('/');
1111            path.push_str(relative_path);
1112        }
1113
1114        // Root path should be a slash
1115        if path.is_empty() {
1116            path.push('/');
1117        }
1118
1119        FileStatus {
1120            isdir: value.file_type() == FileType::IsDir,
1121            path,
1122            length: value.length as usize,
1123            permission: value.permission.perm as u16,
1124            owner: value.owner,
1125            group: value.group,
1126            modification_time: value.modification_time,
1127            access_time: value.access_time,
1128            replication: value.block_replication,
1129            blocksize: value.blocksize,
1130        }
1131    }
1132}
1133
/// Space and quota usage summary for a directory tree.
#[derive(Debug)]
pub struct ContentSummary {
    /// Total length in bytes of all files
    pub length: u64,
    /// Number of files
    pub file_count: u64,
    /// Number of directories
    pub directory_count: u64,
    /// Namespace quota (count of names)
    pub quota: u64,
    /// Total space consumed, including replication
    pub space_consumed: u64,
    /// Space quota in bytes
    pub space_quota: u64,
}
1143
1144impl From<ContentSummaryProto> for ContentSummary {
1145    fn from(value: ContentSummaryProto) -> Self {
1146        ContentSummary {
1147            length: value.length,
1148            file_count: value.file_count,
1149            directory_count: value.directory_count,
1150            quota: value.quota,
1151            space_consumed: value.space_consumed,
1152            space_quota: value.space_quota,
1153        }
1154    }
1155}
1156
#[cfg(test)]
mod test {
    use std::sync::{Arc, LazyLock};

    use tokio::runtime::Runtime;
    use url::Url;

    use crate::{
        client::ClientBuilder,
        common::config::Configuration,
        hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
    };

    use super::{MountLink, MountTable};

    // Shared runtime for constructing protocol handles; lazily initialized once
    static RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

    // Builds a NamenodeProtocol pointed at `url`. No connection is established
    // here, so unreachable addresses are fine for mount-table tests.
    fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
        let proxy = NameServiceProxy::new(
            &Url::parse(url).unwrap(),
            Arc::new(Configuration::new(None, None).unwrap()),
            RT.handle().clone(),
        )
        .unwrap();
        Arc::new(NamenodeProtocol::new(proxy, RT.handle().clone()))
    }

    // Verifies how fs.defaultFS and an explicit URL interact when building a client
    #[test]
    fn test_default_fs() {
        // A valid defaultFS alone is enough
        assert!(
            ClientBuilder::new()
                .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
                .build()
                .is_ok()
        );

        // A defaultFS without a host is rejected
        assert!(
            ClientBuilder::new()
                .with_config(vec![("fs.defaultFS", "hdfs://")])
                .build()
                .is_err()
        );

        // A host-less URL falls back to a valid defaultFS
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
                .build()
                .is_ok()
        );

        // A host-less URL with a host-less defaultFS is rejected
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "hdfs://")])
                .build()
                .is_err()
        );

        // Scheme mismatch between URL and defaultFS is rejected
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "viewfs://test")])
                .build()
                .is_err()
        );
    }

    // A mount link maps paths under its viewfs prefix onto the HDFS prefix
    #[test]
    fn test_mount_link_resolve() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("/view", "/hdfs", protocol);

        assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
        assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
        assert!(link.resolve("/hdfs/path").is_none());
    }

    // A link with an empty viewfs prefix acts as a catch-all fallback
    #[test]
    fn test_fallback_link() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));

        assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
        assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
        assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");

        let link = MountLink::new("", "", protocol);
        assert_eq!(link.resolve("/").unwrap(), "/");
    }

    // Full mount table resolution: longest-prefix matching, fallback behavior,
    // and relative paths resolved against the home directory
    #[test]
    fn test_mount_table_resolve() {
        let link1 = MountLink::new(
            "/mount1",
            "/path1/nested",
            create_protocol("hdfs://127.0.0.1:9000"),
        );
        let link2 = MountLink::new(
            "/mount2",
            "/path2",
            create_protocol("hdfs://127.0.0.1:9001"),
        );
        let link3 = MountLink::new(
            "/mount3/nested",
            "/path3",
            create_protocol("hdfs://127.0.0.1:9002"),
        );
        let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));

        let mount_table = MountTable {
            mounts: vec![link1, link2, link3],
            fallback,
            home_dir: "/user/test".to_string(),
        };

        // Exact mount path resolves to the exact HDFS path
        let (link, resolved) = mount_table.resolve("/mount1");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested");

        // Trailing slash is treated the same
        let (link, resolved) = mount_table.resolve("/mount1/");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested/");

        // Doesn't do partial matches on a directory name
        let (link, resolved) = mount_table.resolve("/mount12");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount12");

        let (link, resolved) = mount_table.resolve("/mount3/file");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount3/file");

        let (link, resolved) = mount_table.resolve("/mount3/nested/file");
        assert_eq!(link.viewfs_path, "/mount3/nested");
        assert_eq!(resolved, "/path3/file");

        // Relative paths resolve against home_dir via the fallback
        let (link, resolved) = mount_table.resolve("file");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/user/test/file");

        let (link, resolved) = mount_table.resolve("dir/subdir");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/user/test/dir/subdir");

        let mount_table = MountTable {
            mounts: vec![
                MountLink::new(
                    "/mount1",
                    "/path1/nested",
                    create_protocol("hdfs://127.0.0.1:9000"),
                ),
                MountLink::new(
                    "/mount2",
                    "/path2",
                    create_protocol("hdfs://127.0.0.1:9001"),
                ),
            ],
            fallback: MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003")),
            home_dir: "/mount1/user".to_string(),
        };

        // A home_dir under a mount point routes relative paths through that mount
        let (link, resolved) = mount_table.resolve("file");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested/user/file");

        let (link, resolved) = mount_table.resolve("dir/subdir");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested/user/dir/subdir");
    }

    // The builder accepts either an owned Runtime or a Handle for I/O
    #[test]
    fn test_io_runtime() {
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://127.0.0.1:9000")
                .with_io_runtime(Runtime::new().unwrap())
                .build()
                .is_ok()
        );

        let rt = Runtime::new().unwrap();
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://127.0.0.1:9000")
                .with_io_runtime(rt.handle().clone())
                .build()
                .is_ok()
        );
    }

    // A custom config dir is accepted even if it contains no config files
    #[test]
    fn test_set_conf_dir() {
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://127.0.0.1:9000")
                .with_config_dir("target/test")
                .build()
                .is_ok()
        )
    }
}