Skip to main content

hdfs_native/
client.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, OnceLock};
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use futures::stream::BoxStream;
6use futures::{StreamExt, stream};
7use tokio::runtime::{Handle, Runtime};
8use url::Url;
9
10use crate::acl::{AclEntry, AclStatus};
11use crate::common::config::{self, Configuration};
12use crate::ec::resolve_ec_policy;
13use crate::error::{HdfsError, Result};
14use crate::file::{FileReader, FileWriter};
15use crate::hdfs::protocol::NamenodeProtocol;
16use crate::hdfs::proxy::NameServiceProxy;
17use crate::proto::hdfs::hdfs_file_status_proto::FileType;
18use crate::security::user::User;
19
20use crate::glob::{GlobPattern, expand_glob, get_path_components, unescape_component};
21use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
22
/// Name of the per-user trash directory, joined onto the user's home directory.
const TRASH_ROOT_DIR: &str = ".Trash";
/// Subdirectory of the trash root into which paths are moved.
const TRASH_CURRENT_DIR: &str = "Current";
/// Permission bits (`rwx------`) used when creating trash directories.
const TRASH_DIR_PERMISSION: u32 = 0o700;
26
/// Options used by [`Client::create`] when opening a new file for writing.
///
/// Start from [`WriteOptions::default`] and adjust individual options with the chainable
/// setters below.
#[derive(Clone, Debug)]
pub struct WriteOptions {
    /// Block size. Default is retrieved from the server.
    pub block_size: Option<u64>,
    /// Replication factor. Default is retrieved from the server.
    pub replication: Option<u32>,
    /// Unix permission bits for the new file, defaults to `0o644` (`rw-r--r--`). Write the value
    /// as an octal literal, e.g. `0o755`.
    pub permission: u32,
    /// Whether to overwrite the file, defaults to false. If true and the
    /// file does not exist, it will result in an error.
    pub overwrite: bool,
    /// Whether to create any missing parent directories, defaults to true. If false
    /// and the parent directory does not exist, an error will be returned.
    pub create_parent: bool,
}

impl Default for WriteOptions {
    fn default() -> Self {
        Self {
            block_size: None,
            replication: None,
            permission: 0o644,
            overwrite: false,
            create_parent: true,
        }
    }
}

// Together with std's blanket `AsRef` impl for references, this lets `Client::create` accept
// either an owned `WriteOptions` or a `&WriteOptions`.
impl AsRef<WriteOptions> for WriteOptions {
    fn as_ref(&self) -> &WriteOptions {
        self
    }
}

impl WriteOptions {
    /// Set the block_size for the new file
    pub fn block_size(mut self, block_size: u64) -> Self {
        self.block_size = Some(block_size);
        self
    }

    /// Set the replication for the new file
    pub fn replication(mut self, replication: u32) -> Self {
        self.replication = Some(replication);
        self
    }

    /// Set the raw permission bits (e.g. `0o644`) for the new file
    pub fn permission(mut self, permission: u32) -> Self {
        self.permission = permission;
        self
    }

    /// Set whether to overwrite an existing file
    pub fn overwrite(mut self, overwrite: bool) -> Self {
        self.overwrite = overwrite;
        self
    }

    /// Set whether to create all missing parent directories
    pub fn create_parent(mut self, create_parent: bool) -> Self {
        self.create_parent = create_parent;
        self
    }
}
93
94#[derive(Debug, Clone)]
95struct MountLink {
96    viewfs_path: String,
97    hdfs_path: String,
98    protocol: Arc<NamenodeProtocol>,
99}
100
101impl MountLink {
102    fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
103        // We should never have an empty path, we always want things mounted at root ("/") by default.
104        Self {
105            viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
106            hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
107            protocol,
108        }
109    }
110    /// Convert a viewfs path into a name service path if it matches this link
111    fn resolve(&self, path: &str) -> Option<String> {
112        // Make sure we don't partially match the last component. It either needs to be an exact
113        // match to a viewfs path, or needs to match with a trailing slash
114        if path == self.viewfs_path {
115            Some(self.hdfs_path.clone())
116        } else {
117            path.strip_prefix(&format!("{}/", self.viewfs_path))
118                .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
119        }
120    }
121}
122
123#[derive(Debug)]
124struct MountTable {
125    mounts: Vec<MountLink>,
126    fallback: MountLink,
127    home_dir: String,
128}
129
130impl MountTable {
131    fn resolve(&self, src: &str) -> (&MountLink, String) {
132        let path = if src.starts_with('/') {
133            src.to_string()
134        } else {
135            format!("{}/{}", self.home_dir, src)
136        };
137
138        for link in self.mounts.iter() {
139            if let Some(resolved) = link.resolve(&path) {
140                return (link, resolved);
141            }
142        }
143        (&self.fallback, self.fallback.resolve(&path).unwrap())
144    }
145}
146
147fn build_home_dir(
148    scheme: &str,
149    host: Option<&str>,
150    config: &Configuration,
151    username: &str,
152) -> String {
153    let prefix = match scheme {
154        "hdfs" => config.get("dfs.user.home.dir.prefix"),
155        "viewfs" => {
156            host.and_then(|host| config.get(&format!("fs.viewfs.mounttable.{host}.homedir")))
157        }
158        _ => None,
159    }
160    .unwrap_or("/user");
161
162    let prefix = prefix.trim_end_matches('/');
163    if prefix.is_empty() {
164        format!("/{username}")
165    } else {
166        format!("{prefix}/{username}")
167    }
168}
169
170/// Holds either a [Runtime] or a [Handle] to an existing runtime for IO tasks
171#[derive(Debug)]
172pub enum IORuntime {
173    Runtime(Runtime),
174    Handle(Handle),
175}
176
177impl From<Runtime> for IORuntime {
178    fn from(value: Runtime) -> Self {
179        Self::Runtime(value)
180    }
181}
182
183impl From<Handle> for IORuntime {
184    fn from(value: Handle) -> Self {
185        Self::Handle(value)
186    }
187}
188
189impl IORuntime {
190    fn handle(&self) -> Handle {
191        match self {
192            Self::Runtime(runtime) => runtime.handle().clone(),
193            Self::Handle(handle) => handle.clone(),
194        }
195    }
196}
197
198/// Builds a new [Client] instance. Configs will be loaded with the following precedence:
199///
200/// - If method `ClientBuilder::with_config_dir` is invoked, configs will be loaded from `${config_dir}/{core,hdfs}-site.xml`
201/// - If the `HADOOP_CONF_DIR` environment variable is defined, configs will be loaded from `${HADOOP_CONF_DIR}/{core,hdfs}-site.xml`
202/// - If the `HADOOP_HOME` environment variable is defined, configs will be loaded from `${HADOOP_HOME}/etc/hadoop/{core,hdfs}-site.xml`
203/// - Otherwise no configs are defined
204///
205/// Finally, configs set by `with_config` will override the configs loaded above.
206///
207/// If no URL is defined, the `fs.defaultFS` config must be defined and is used as the URL.
208///
209/// # Examples
210///
211/// Create a new client with given config directory
212///
213/// ```rust,no_run
214/// # use hdfs_native::ClientBuilder;
215/// let client = ClientBuilder::new()
216///     .with_config_dir("/opt/hadoop/etc/hadoop")
217///     .build()
218///     .unwrap();
219/// ```
220///
221/// Create a new client with the environment variable
222///
223/// ```rust,no_run
224/// # use hdfs_native::ClientBuilder;
225/// unsafe { std::env::set_var("HADOOP_CONF_DIR", "/opt/hadoop/etc/hadoop") };
226/// let client = ClientBuilder::new()
227///     .build()
228///     .unwrap();
229/// ```
230///
231/// Create a new client using the fs.defaultFS config
232///
233/// ```rust
234/// # use hdfs_native::ClientBuilder;
235/// let client = ClientBuilder::new()
236///     .with_config(vec![("fs.defaultFS", "hdfs://127.0.0.1:9000")])
237///     .build()
238///     .unwrap();
239/// ```
240///
241/// Create a new client connecting to a specific URL:
242///
243/// ```rust
244/// # use hdfs_native::ClientBuilder;
245/// let client = ClientBuilder::new()
246///     .with_url("hdfs://127.0.0.1:9000")
247///     .build()
248///     .unwrap();
249/// ```
250///
251/// Create a new client using a dedicated tokio runtime for spawned tasks and IO operations
252///
253/// ```rust
254/// # use hdfs_native::ClientBuilder;
255/// let client = ClientBuilder::new()
256///     .with_url("hdfs://127.0.0.1:9000")
257///     .with_io_runtime(tokio::runtime::Runtime::new().unwrap())
258///     .build()
259///     .unwrap();
260/// ```
261#[derive(Default)]
262pub struct ClientBuilder {
263    url: Option<String>,
264    config: Option<HashMap<String, String>>,
265    config_dir: Option<String>,
266    runtime: Option<IORuntime>,
267    user: Option<String>,
268}
269
270impl ClientBuilder {
271    /// Create a new [ClientBuilder]
272    pub fn new() -> Self {
273        Self::default()
274    }
275
276    /// Set the URL to connect to. Can be the address of a single NameNode, or a logical NameService
277    pub fn with_url(mut self, url: impl Into<String>) -> Self {
278        self.url = Some(url.into());
279        self
280    }
281
282    /// Set configs to use for the client. The provided configs will override any found in the config files loaded
283    pub fn with_config(
284        mut self,
285        config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
286    ) -> Self {
287        self.config = Some(
288            config
289                .into_iter()
290                .map(|(k, v)| (k.into(), v.into()))
291                .collect(),
292        );
293        self
294    }
295
296    /// Set the configration directory path to read from. The provided path will override the one provided by environment variable.
297    pub fn with_config_dir(mut self, config_dir: impl Into<String>) -> Self {
298        self.config_dir = Some(config_dir.into());
299        self
300    }
301
302    /// Use a dedicated tokio runtime for spawned tasks and IO operations. Can either take ownership of a whole [Runtime]
303    /// or take a [Handle] to an externally owned runtime.
304    pub fn with_io_runtime(mut self, runtime: impl Into<IORuntime>) -> Self {
305        self.runtime = Some(runtime.into());
306        self
307    }
308
309    /// Set the effective user for the client. If not set, the client will detect user from environment variables `HADOOP_USER_NAME` or `HADOOP_PROXY_USER`.
310    pub fn with_user(mut self, user: impl Into<String>) -> Self {
311        self.user = Some(user.into());
312        self
313    }
314
315    /// Create the [Client] instance from the provided settings
316    pub fn build(self) -> Result<Client> {
317        let config = Configuration::new(self.config_dir, self.config)?;
318        let url = if let Some(url) = self.url {
319            Url::parse(&url)?
320        } else {
321            Client::default_fs(&config)?
322        };
323
324        Client::build(&url, config, self.runtime, self.user)
325    }
326}
327
328#[derive(Clone, Debug)]
329enum RuntimeHolder {
330    Custom(Arc<IORuntime>),
331    Default(Arc<OnceLock<Runtime>>),
332}
333
334impl RuntimeHolder {
335    fn new(rt: Option<IORuntime>) -> Self {
336        if let Some(rt) = rt {
337            Self::Custom(Arc::new(rt))
338        } else {
339            Self::Default(Arc::new(OnceLock::new()))
340        }
341    }
342
343    fn get_handle(&self) -> Handle {
344        match self {
345            Self::Custom(rt) => rt.handle().clone(),
346            Self::Default(rt) => match Handle::try_current() {
347                Ok(handle) => handle,
348                Err(_) => rt
349                    .get_or_init(|| Runtime::new().expect("Failed to create tokio runtime"))
350                    .handle()
351                    .clone(),
352            },
353        }
354    }
355}
356
357/// A client to a speicific NameNode, NameService, or Viewfs mount table
358#[derive(Clone, Debug)]
359pub struct Client {
360    mount_table: Arc<MountTable>,
361    config: Arc<Configuration>,
362    // Store the runtime used for spawning all internal tasks. If we are not created
363    // inside a tokio runtime, we will create our own to use.
364    rt_holder: RuntimeHolder,
365}
366
367impl Client {
368    fn default_fs(config: &Configuration) -> Result<Url> {
369        let url = config
370            .get(config::DEFAULT_FS)
371            .ok_or(HdfsError::InvalidArgument(format!(
372                "No {} setting found",
373                config::DEFAULT_FS
374            )))?;
375        Ok(Url::parse(url)?)
376    }
377
378    fn build(
379        url: &Url,
380        config: Configuration,
381        rt: Option<IORuntime>,
382        user: Option<String>,
383    ) -> Result<Self> {
384        let resolved_url = if !url.has_host() {
385            let default_url = Self::default_fs(&config)?;
386            if url.scheme() != default_url.scheme() || !default_url.has_host() {
387                return Err(HdfsError::InvalidArgument(
388                    "URL must contain a host".to_string(),
389                ));
390            }
391            default_url
392        } else {
393            url.clone()
394        };
395
396        let config = Arc::new(config);
397
398        let rt_holder = RuntimeHolder::new(rt);
399
400        let user_info = User::get_user_info(user.clone(), config.security_enabled());
401        let username = user_info
402            .effective_user
403            .as_deref()
404            .or(user_info.real_user.as_deref())
405            .expect("User info must include a username");
406        let home_dir = build_home_dir(
407            resolved_url.scheme(),
408            resolved_url.host_str(),
409            config.as_ref(),
410            username,
411        );
412
413        let mount_table = match url.scheme() {
414            "hdfs" => {
415                let proxy = NameServiceProxy::new(
416                    &resolved_url,
417                    Arc::clone(&config),
418                    rt_holder.get_handle(),
419                    user.clone(),
420                )?;
421                let protocol = Arc::new(NamenodeProtocol::new(proxy, rt_holder.get_handle()));
422
423                MountTable {
424                    mounts: Vec::new(),
425                    fallback: MountLink::new("/", "/", protocol),
426                    home_dir,
427                }
428            }
429            "viewfs" => Self::build_mount_table(
430                // Host is guaranteed to be present.
431                resolved_url.host_str().expect("URL must have a host"),
432                Arc::clone(&config),
433                rt_holder.get_handle(),
434                user.clone(),
435                home_dir,
436            )?,
437            _ => {
438                return Err(HdfsError::InvalidArgument(
439                    "Only `hdfs` and `viewfs` schemes are supported".to_string(),
440                ));
441            }
442        };
443
444        Ok(Self {
445            mount_table: Arc::new(mount_table),
446            config,
447            rt_holder,
448        })
449    }
450
451    fn build_mount_table(
452        host: &str,
453        config: Arc<Configuration>,
454        handle: Handle,
455        effective_user: Option<String>,
456        home_dir: String,
457    ) -> Result<MountTable> {
458        let mut mounts: Vec<MountLink> = Vec::new();
459        let mut fallback: Option<MountLink> = None;
460
461        for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
462            let url = Url::parse(hdfs_url)?;
463            if !url.has_host() {
464                return Err(HdfsError::InvalidArgument(
465                    "URL must contain a host".to_string(),
466                ));
467            }
468            if url.scheme() != "hdfs" {
469                return Err(HdfsError::InvalidArgument(
470                    "Only hdfs mounts are supported for viewfs".to_string(),
471                ));
472            }
473            let proxy = NameServiceProxy::new(
474                &url,
475                Arc::clone(&config),
476                handle.clone(),
477                effective_user.clone(),
478            )?;
479            let protocol = Arc::new(NamenodeProtocol::new(proxy, handle.clone()));
480
481            if let Some(prefix) = viewfs_path {
482                mounts.push(MountLink::new(prefix, url.path(), protocol));
483            } else {
484                if fallback.is_some() {
485                    return Err(HdfsError::InvalidArgument(
486                        "Multiple viewfs fallback links found".to_string(),
487                    ));
488                }
489                fallback = Some(MountLink::new("/", url.path(), protocol));
490            }
491        }
492
493        if let Some(fallback) = fallback {
494            // Sort the mount table from longest viewfs path to shortest. This makes sure more specific paths are considered first.
495            mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
496            mounts.reverse();
497
498            Ok(MountTable {
499                mounts,
500                fallback,
501                home_dir,
502            })
503        } else {
504            Err(HdfsError::InvalidArgument(
505                "No viewfs fallback mount found".to_string(),
506            ))
507        }
508    }
509
510    fn normalize_path(path: &str) -> String {
511        let mut normalized = if path.is_empty() {
512            "/".to_string()
513        } else {
514            path.to_string()
515        };
516        if !normalized.starts_with('/') {
517            normalized.insert(0, '/');
518        }
519        while normalized.len() > 1 && normalized.ends_with('/') {
520            normalized.pop();
521        }
522        normalized
523    }
524
525    fn join_paths(base: &str, suffix: &str) -> String {
526        if suffix.is_empty() {
527            return base.to_string();
528        }
529        let trimmed_base = if base.is_empty() { "/" } else { base };
530        let suffix = suffix.trim_start_matches('/');
531        if trimmed_base == "/" {
532            format!("/{suffix}")
533        } else {
534            format!("{}/{}", trimmed_base.trim_end_matches('/'), suffix)
535        }
536    }
537
538    fn is_prefix_path(parent: &str, child: &str) -> bool {
539        if parent == "/" {
540            return true;
541        }
542        child == parent || child.starts_with(&format!("{parent}/"))
543    }
544
545    fn current_time_millis() -> u64 {
546        SystemTime::now()
547            .duration_since(UNIX_EPOCH)
548            .unwrap_or_default()
549            .as_millis() as u64
550    }
551
552    fn absolute_path(&self, path: &str) -> String {
553        if path.starts_with('/') {
554            Self::normalize_path(path)
555        } else {
556            let home = self.mount_table.home_dir.trim_end_matches('/');
557            Self::normalize_path(&format!("{home}/{path}"))
558        }
559    }
560
561    fn trash_root_path(&self) -> String {
562        let home = Self::normalize_path(&self.mount_table.home_dir);
563        Self::join_paths(&home, TRASH_ROOT_DIR)
564    }
565
566    async fn trash_enabled(&self, path: &str) -> Result<bool> {
567        let (link, _) = self.mount_table.resolve(path);
568        let server_defaults = link.protocol.get_cached_server_defaults().await?;
569        Ok(server_defaults.trash_interval.unwrap_or_default() > 0)
570    }
571
572    fn split_parent_name(path: &str) -> Result<(String, String)> {
573        let normalized = Self::normalize_path(path);
574        if normalized == "/" {
575            return Err(HdfsError::InvalidArgument(
576                "Cannot move the root directory to trash".to_string(),
577            ));
578        }
579        let (parent, name) = normalized
580            .rsplit_once('/')
581            .expect("Normalized path always contains '/'");
582        let parent = if parent.is_empty() {
583            "/".to_string()
584        } else {
585            parent.to_string()
586        };
587        Ok((parent, name.to_string()))
588    }
589
590    async fn non_dir_ancestor(&self, path: &str) -> Result<Option<String>> {
591        let normalized = Self::normalize_path(path);
592        let mut current = "/".to_string();
593        for component in normalized.trim_start_matches('/').split('/') {
594            if component.is_empty() {
595                continue;
596            }
597            current = Self::join_paths(&current, component);
598            match self.get_file_info(&current).await {
599                Ok(status) => {
600                    if !status.isdir {
601                        return Ok(Some(current));
602                    }
603                }
604                Err(HdfsError::FileNotFound(_)) => return Ok(None),
605                Err(err) => return Err(err),
606            }
607        }
608        Ok(None)
609    }
610
611    async fn ensure_unique_trash_path(&self, path: String) -> Result<String> {
612        let base = path.clone();
613        let mut candidate = path;
614        loop {
615            match self.get_file_info(&candidate).await {
616                Ok(_) => {
617                    candidate = format!("{}{}", base, Self::current_time_millis());
618                }
619                Err(HdfsError::FileNotFound(_)) => return Ok(candidate),
620                Err(err) => return Err(err),
621            }
622        }
623    }
624
625    /// Retrieve the file status for the file at `path`.
626    pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
627        let (link, resolved_path) = self.mount_table.resolve(path);
628        match link.protocol.get_file_info(&resolved_path).await?.fs {
629            Some(status) => Ok(FileStatus::from(status, path)),
630            None => Err(HdfsError::FileNotFound(path.to_string())),
631        }
632    }
633
634    /// Retrives a list of all files in directories located at `path`. Wrapper around `list_status_iter` that
635    /// returns Err if any part of the stream fails, or Ok if all file statuses were found successfully.
636    pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
637        let iter = self.list_status_iter(path, recursive);
638        let statuses = iter
639            .into_stream()
640            .collect::<Vec<Result<FileStatus>>>()
641            .await;
642
643        let mut resolved_statues = Vec::<FileStatus>::with_capacity(statuses.len());
644        for status in statuses.into_iter() {
645            resolved_statues.push(status?);
646        }
647
648        Ok(resolved_statues)
649    }
650
651    /// Retrives an iterator of all files in directories located at `path`.
652    pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
653        ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
654    }
655
656    /// Opens a file reader for the file at `path`. Path should not include a scheme.
657    pub async fn read(&self, path: &str) -> Result<FileReader> {
658        let (link, resolved_path) = self.mount_table.resolve(path);
659        // Get all block locations. Length is actually a signed value, but the proto uses an unsigned value
660        let located_info = link
661            .protocol
662            .get_block_locations(&resolved_path, 0, i64::MAX as u64)
663            .await?;
664
665        if let Some(locations) = located_info.locations {
666            let ec_schema = if let Some(ec_policy) = locations.ec_policy.as_ref() {
667                Some(resolve_ec_policy(ec_policy)?)
668            } else {
669                None
670            };
671
672            if locations.file_encryption_info.is_some() {
673                return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
674            }
675
676            Ok(FileReader::new(
677                Arc::clone(&link.protocol),
678                locations,
679                ec_schema,
680                Arc::clone(&self.config),
681                self.rt_holder.get_handle(),
682            ))
683        } else {
684            Err(HdfsError::FileNotFound(path.to_string()))
685        }
686    }
687
688    /// Opens a new file for writing. See [WriteOptions] for options and behavior for different
689    /// scenarios.
690    pub async fn create(
691        &self,
692        src: &str,
693        write_options: impl AsRef<WriteOptions>,
694    ) -> Result<FileWriter> {
695        let write_options = write_options.as_ref();
696
697        let (link, resolved_path) = self.mount_table.resolve(src);
698
699        let create_response = link
700            .protocol
701            .create(
702                &resolved_path,
703                write_options.permission,
704                write_options.overwrite,
705                write_options.create_parent,
706                write_options.replication,
707                write_options.block_size,
708            )
709            .await?;
710
711        match create_response.fs {
712            Some(status) => {
713                if status.file_encryption_info.is_some() {
714                    let _ = self.delete(src, false).await;
715                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
716                }
717
718                Ok(FileWriter::new(
719                    Arc::clone(&link.protocol),
720                    resolved_path,
721                    status,
722                    Arc::clone(&self.config),
723                    self.rt_holder.get_handle(),
724                    None,
725                ))
726            }
727            None => Err(HdfsError::FileNotFound(src.to_string())),
728        }
729    }
730
731    fn needs_new_block(class: &str, msg: &str) -> bool {
732        class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
733    }
734
735    /// Opens an existing file for appending. An Err will be returned if the file does not exist. If the
736    /// file is replicated, the current block will be appended to until it is full. If the file is erasure
737    /// coded, a new block will be created.
738    pub async fn append(&self, src: &str) -> Result<FileWriter> {
739        let (link, resolved_path) = self.mount_table.resolve(src);
740
741        // Assume the file is replicated and try to append to the current block. If the file is
742        // erasure coded, then try again by appending to a new block.
743        let append_response = match link.protocol.append(&resolved_path, false).await {
744            Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
745                link.protocol.append(&resolved_path, true).await?
746            }
747            resp => resp?,
748        };
749
750        match append_response.stat {
751            Some(status) => {
752                if status.file_encryption_info.is_some() {
753                    let _ = link
754                        .protocol
755                        .complete(src, append_response.block.map(|b| b.b), status.file_id)
756                        .await;
757                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
758                }
759
760                Ok(FileWriter::new(
761                    Arc::clone(&link.protocol),
762                    resolved_path,
763                    status,
764                    Arc::clone(&self.config),
765                    self.rt_holder.get_handle(),
766                    append_response.block,
767                ))
768            }
769            None => Err(HdfsError::FileNotFound(src.to_string())),
770        }
771    }
772
773    /// Create a new directory at `path` with the given `permission`.
774    ///
775    /// `permission` is the raw octal value representing the Unix style permission. For example, to
776    /// set 755 (`rwxr-x-rx`) permissions, use 0o755.
777    ///
778    /// If `create_parent` is true, any missing parent directories will be created as well,
779    /// otherwise an error will be returned if the parent directory doesn't already exist.
780    pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
781        let (link, resolved_path) = self.mount_table.resolve(path);
782        link.protocol
783            .mkdirs(&resolved_path, permission, create_parent)
784            .await
785            .map(|_| ())
786    }
787
788    async fn rename_internal(
789        &self,
790        src: &str,
791        dst: &str,
792        overwrite: bool,
793        move_to_trash: bool,
794    ) -> Result<()> {
795        let (src_link, src_resolved_path) = self.mount_table.resolve(src);
796        let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
797        if src_link.viewfs_path == dst_link.viewfs_path {
798            src_link
799                .protocol
800                .rename(
801                    &src_resolved_path,
802                    &dst_resolved_path,
803                    overwrite,
804                    move_to_trash,
805                )
806                .await
807                .map(|_| ())
808        } else {
809            Err(HdfsError::InvalidArgument(
810                "Cannot rename across different name services".to_string(),
811            ))
812        }
813    }
814
815    /// Renames `src` to `dst`. Returns Ok(()) on success, and Err otherwise.
816    pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
817        self.rename_internal(src, dst, overwrite, false).await
818    }
819
820    /// Deletes the file or directory at `path`. If `recursive` is false and `path` is a non-empty
821    /// directory, this will fail. Returns `Ok(true)` if it was successfully deleted.
822    pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
823        let (link, resolved_path) = self.mount_table.resolve(path);
824        link.protocol
825            .delete(&resolved_path, recursive)
826            .await
827            .map(|r| r.result)
828    }
829
    /// Moves a file or directory at `path` into the user's trash. Returns `Ok(Some(path))` if
    /// moved, where `path` is the new location in the trash, or `Ok(None)` if the path is already
    /// under trash.
    ///
    /// The destination mirrors the absolute source path under `<home>/.Trash/Current`. For
    /// example, `/data/file` moves to `<home>/.Trash/Current/data/file`. Two kinds of name
    /// collision are handled by appending the current time in milliseconds:
    ///
    /// - If creating the destination directory fails because an existing non-directory occupies
    ///   one of its components, that component gets the timestamp suffix.
    /// - If the final destination already exists, its name gets the timestamp suffix.
    ///
    /// # Errors
    ///
    /// - [HdfsError::InvalidPath] if `path` is empty.
    /// - [HdfsError::TrashNotEnabled] if the owning name service reports no positive trash
    ///   interval.
    /// - [HdfsError::InvalidArgument] if `path` is, or contains, the trash root. This includes
    ///   `/`.
    /// - The error from `get_file_info` if `path` cannot be stat'ed, e.g. it does not exist.
    /// - The underlying error if creating the trash directory, or the rename, still fails on the
    ///   second attempt.
    pub async fn trash(&self, path: &str) -> Result<Option<String>> {
        if path.is_empty() {
            return Err(HdfsError::InvalidPath("Empty path".to_string()));
        }

        let src_abs = self.absolute_path(path);
        if !self.trash_enabled(&src_abs).await? {
            return Err(HdfsError::TrashNotEnabled);
        }

        let trash_root = self.trash_root_path();

        // Already inside the trash: nothing to do.
        if Self::is_prefix_path(&trash_root, &src_abs) {
            return Ok(None);
        }
        // Moving an ancestor of the trash root (including `/`) would move the trash into itself.
        if Self::is_prefix_path(&src_abs, &trash_root) {
            return Err(HdfsError::InvalidArgument(
                "Cannot move to trash because it contains the trash".to_string(),
            ));
        }

        // Fail early if the source cannot be stat'ed (e.g. it does not exist).
        let _ = self.get_file_info(&src_abs).await?;

        // Mirror the source's parent directory under `<trash root>/Current`.
        let (src_parent, src_name) = Self::split_parent_name(&src_abs)?;
        let trash_current = Self::join_paths(&trash_root, TRASH_CURRENT_DIR);
        let src_parent_rel = src_parent.trim_start_matches('/');
        let mut base_trash_path = if src_parent_rel.is_empty() {
            trash_current.clone()
        } else {
            Self::join_paths(&trash_current, src_parent_rel)
        };
        let mut trash_path = Self::join_paths(&base_trash_path, &src_name);

        // Two attempts. A failure of mkdirs or rename in the first attempt is retried once; a
        // failure in the second attempt is returned to the caller.
        for attempt in 0..2 {
            let mut mkdirs_error: Option<HdfsError> = None;
            loop {
                match self
                    .mkdirs(&base_trash_path, TRASH_DIR_PERMISSION, true)
                    .await
                {
                    Ok(()) => break,
                    Err(err) => {
                        // If a non-directory blocks the destination directory, rename that
                        // component in the destination path (append a timestamp) and retry
                        // mkdirs. `non_dir_ancestor` walks down from `/`, so `ancestor` is
                        // expected to be a leading prefix of `base_trash_path`.
                        // NOTE(review): if `base_trash_path` contained an empty component (`//`),
                        // `ancestor` (built without empty components) might not occur in it. In
                        // that case `replacen` would be a no-op and this loop could spin. Confirm
                        // the path is always free of `//` here.
                        if let Some(ancestor) = self.non_dir_ancestor(&base_trash_path).await? {
                            let timestamp = Self::current_time_millis();
                            base_trash_path = base_trash_path.replacen(
                                &ancestor,
                                &format!("{ancestor}{timestamp}"),
                                1,
                            );
                            trash_path = Self::join_paths(&base_trash_path, &src_name);
                            continue;
                        }
                        mkdirs_error = Some(err);
                        break;
                    }
                }
            }

            if let Some(err) = mkdirs_error {
                if attempt == 0 {
                    continue;
                }
                return Err(err);
            }

            // Avoid clobbering an earlier trashed item with the same name.
            let unique_trash_path = self.ensure_unique_trash_path(trash_path.clone()).await?;
            match self
                .rename_internal(&src_abs, &unique_trash_path, false, true)
                .await
            {
                Ok(()) => return Ok(Some(unique_trash_path)),
                Err(_) if attempt == 0 => continue,
                Err(err) => return Err(err),
            }
        }

        // Not reached in practice: every path through the second attempt returns. This is kept
        // because the compiler cannot prove the loop never finishes normally.
        Err(HdfsError::OperationFailed(
            "Failed to move to trash after retry".to_string(),
        ))
    }
913
914    /// Sets the modified and access times for a file. Times should be in milliseconds from the epoch.
915    pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
916        let (link, resolved_path) = self.mount_table.resolve(path);
917        link.protocol
918            .set_times(&resolved_path, mtime, atime)
919            .await?;
920        Ok(())
921    }
922
923    /// Optionally sets the owner and group for a file.
924    pub async fn set_owner(
925        &self,
926        path: &str,
927        owner: Option<&str>,
928        group: Option<&str>,
929    ) -> Result<()> {
930        let (link, resolved_path) = self.mount_table.resolve(path);
931        link.protocol
932            .set_owner(&resolved_path, owner, group)
933            .await?;
934        Ok(())
935    }
936
937    /// Sets permissions for a file. Permission should be an octal number reprenting the Unix style
938    /// permission.
939    ///
940    /// For example, to set permissions to rwxr-xr-x, use 0o755.
941    pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
942        let (link, resolved_path) = self.mount_table.resolve(path);
943        link.protocol
944            .set_permission(&resolved_path, permission)
945            .await?;
946        Ok(())
947    }
948
949    /// Sets the replication for a file.
950    pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
951        let (link, resolved_path) = self.mount_table.resolve(path);
952        let result = link
953            .protocol
954            .set_replication(&resolved_path, replication)
955            .await?
956            .result;
957
958        Ok(result)
959    }
960
961    /// Gets a content summary for a file or directory rooted at `path`.
962    pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
963        let (link, resolved_path) = self.mount_table.resolve(path);
964        let result = link
965            .protocol
966            .get_content_summary(&resolved_path)
967            .await?
968            .summary;
969
970        Ok(result.into())
971    }
972
973    /// Update ACL entries for file or directory at `path`. Existing entries will remain.
974    pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
975        let (link, resolved_path) = self.mount_table.resolve(path);
976        link.protocol
977            .modify_acl_entries(&resolved_path, acl_spec)
978            .await?;
979
980        Ok(())
981    }
982
983    /// Remove specific ACL entries for file or directory at `path`.
984    pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
985        let (link, resolved_path) = self.mount_table.resolve(path);
986        link.protocol
987            .remove_acl_entries(&resolved_path, acl_spec)
988            .await?;
989
990        Ok(())
991    }
992
993    /// Remove all default ACLs for file or directory at `path`.
994    pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
995        let (link, resolved_path) = self.mount_table.resolve(path);
996        link.protocol.remove_default_acl(&resolved_path).await?;
997
998        Ok(())
999    }
1000
1001    /// Remove all ACL entries for file or directory at `path`.
1002    pub async fn remove_acl(&self, path: &str) -> Result<()> {
1003        let (link, resolved_path) = self.mount_table.resolve(path);
1004        link.protocol.remove_acl(&resolved_path).await?;
1005
1006        Ok(())
1007    }
1008
1009    /// Override all ACL entries for file or directory at `path`. If only access ACLs are provided,
1010    /// default ACLs are maintained. Likewise if only default ACLs are provided, access ACLs are
1011    /// maintained.
1012    pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
1013        let (link, resolved_path) = self.mount_table.resolve(path);
1014        link.protocol.set_acl(&resolved_path, acl_spec).await?;
1015
1016        Ok(())
1017    }
1018
1019    /// Get the ACL status for the file or directory at `path`.
1020    pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
1021        let (link, resolved_path) = self.mount_table.resolve(path);
1022        Ok(link
1023            .protocol
1024            .get_acl_status(&resolved_path)
1025            .await?
1026            .result
1027            .into())
1028    }
1029
1030    /// Get all file statuses matching the glob `pattern`. Supports Hadoop-style globbing
1031    /// which only applies to individual components of a path.
1032    pub async fn glob_status(&self, pattern: &str) -> Result<Vec<FileStatus>> {
1033        // Expand any brace groups first
1034        let flattened = expand_glob(pattern.to_string())?;
1035
1036        let mut results: Vec<FileStatus> = Vec::new();
1037
1038        for flat in flattened.into_iter() {
1039            // Make the pattern absolute-ish. We keep the pattern as-is; components
1040            // will be split on '/'. An empty pattern yields no results.
1041            if flat.is_empty() {
1042                continue;
1043            }
1044
1045            let components = get_path_components(&flat);
1046
1047            // Candidate holds a path (fully built so far) and optionally a resolved FileStatus
1048            #[derive(Clone, Debug)]
1049            struct Candidate {
1050                path: String,
1051                status: Option<FileStatus>,
1052            }
1053
1054            // Start from the root placeholder
1055            let mut candidates: Vec<Candidate> = vec![Candidate {
1056                path: "/".to_string(),
1057                status: None,
1058            }];
1059
1060            for (idx, comp) in components.iter().enumerate() {
1061                if candidates.is_empty() {
1062                    break;
1063                }
1064
1065                let is_last = idx == components.len() - 1;
1066
1067                let unescaped = unescape_component(comp);
1068                let glob_pat = GlobPattern::new(comp)?;
1069
1070                if !is_last && !glob_pat.has_wildcard() {
1071                    // Optimization: just append the literal component to each candidate
1072                    for cand in candidates.iter_mut() {
1073                        if !cand.path.ends_with('/') {
1074                            cand.path.push('/');
1075                        }
1076                        cand.path.push_str(&unescaped);
1077                        // keep status as None (we'll resolve later if needed)
1078                    }
1079                    continue;
1080                }
1081
1082                let mut new_candidates: Vec<Candidate> = Vec::new();
1083
1084                for cand in candidates.into_iter() {
1085                    if glob_pat.has_wildcard() {
1086                        // List the directory represented by cand.path
1087                        let listing = match self.list_status(&cand.path, false).await {
1088                            Ok(listing) => listing,
1089                            Err(HdfsError::FileNotFound(_)) => continue,
1090                            Err(e) => return Err(e),
1091                        };
1092                        if listing.len() == 1 && listing[0].path == cand.path {
1093                            // listing corresponds to the candidate itself (file), skip
1094                            continue;
1095                        }
1096
1097                        for child in listing.into_iter() {
1098                            // If this is not the terminal component, only recurse into directories
1099                            if !is_last && !child.isdir {
1100                                continue;
1101                            }
1102
1103                            // child.path already contains the full path
1104                            // Extract the name portion to match against the glob pattern
1105                            let name = child
1106                                .path
1107                                .rsplit_once('/')
1108                                .map(|(_, n)| n)
1109                                .unwrap_or(child.path.as_str());
1110
1111                            if glob_pat.matches(name) {
1112                                new_candidates.push(Candidate {
1113                                    path: child.path.clone(),
1114                                    status: Some(child),
1115                                });
1116                            }
1117                        }
1118                    } else {
1119                        // Non-glob component: use get_file_info for exact path
1120                        let mut next_path = cand.path.clone();
1121                        if !next_path.ends_with('/') {
1122                            next_path.push('/');
1123                        }
1124                        next_path.push_str(&unescaped);
1125
1126                        match self.get_file_info(&next_path).await {
1127                            Ok(status) => {
1128                                if is_last || status.isdir {
1129                                    new_candidates.push(Candidate {
1130                                        path: status.path.clone(),
1131                                        status: Some(status),
1132                                    });
1133                                }
1134                            }
1135                            Err(HdfsError::FileNotFound(_)) => continue,
1136                            Err(e) => return Err(e),
1137                        }
1138                    }
1139                }
1140
1141                candidates = new_candidates;
1142            }
1143
1144            // Resolve any placeholder candidates (including root) and collect results
1145            for cand in candidates.into_iter() {
1146                let status = if let Some(s) = cand.status {
1147                    s
1148                } else {
1149                    // Try to resolve the path to a real FileStatus
1150                    match self.get_file_info(&cand.path).await {
1151                        Ok(s) => s,
1152                        Err(HdfsError::FileNotFound(_)) => continue,
1153                        Err(e) => return Err(e),
1154                    }
1155                };
1156
1157                results.push(status);
1158            }
1159        }
1160
1161        Ok(results)
1162    }
1163}
1164
1165impl Default for Client {
1166    /// Creates a new HDFS Client based on the fs.defaultFS setting. Panics if the config files fail to load,
1167    /// no defaultFS is defined, or the defaultFS is invalid.
1168    fn default() -> Self {
1169        ClientBuilder::new()
1170            .build()
1171            .expect("Failed to create default client")
1172    }
1173}
1174
1175pub(crate) struct DirListingIterator {
1176    path: String,
1177    resolved_path: String,
1178    link: MountLink,
1179    files_only: bool,
1180    partial_listing: VecDeque<HdfsFileStatusProto>,
1181    remaining: u32,
1182    last_seen: Vec<u8>,
1183}
1184
1185impl DirListingIterator {
1186    fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
1187        let (link, resolved_path) = mount_table.resolve(&path);
1188
1189        DirListingIterator {
1190            path,
1191            resolved_path,
1192            link: link.clone(),
1193            files_only,
1194            partial_listing: VecDeque::new(),
1195            remaining: 1,
1196            last_seen: Vec::new(),
1197        }
1198    }
1199
1200    async fn get_next_batch(&mut self) -> Result<bool> {
1201        let listing = self
1202            .link
1203            .protocol
1204            .get_listing(&self.resolved_path, self.last_seen.clone(), false)
1205            .await?;
1206
1207        if let Some(dir_list) = listing.dir_list {
1208            self.last_seen = dir_list
1209                .partial_listing
1210                .last()
1211                .map(|p| p.path.clone())
1212                .unwrap_or(Vec::new());
1213
1214            self.remaining = dir_list.remaining_entries;
1215
1216            self.partial_listing = dir_list
1217                .partial_listing
1218                .into_iter()
1219                .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
1220                .collect();
1221            Ok(!self.partial_listing.is_empty())
1222        } else {
1223            Err(HdfsError::FileNotFound(self.path.clone()))
1224        }
1225    }
1226
1227    pub async fn next(&mut self) -> Option<Result<FileStatus>> {
1228        if self.partial_listing.is_empty()
1229            && self.remaining > 0
1230            && let Err(error) = self.get_next_batch().await
1231        {
1232            self.remaining = 0;
1233            return Some(Err(error));
1234        }
1235        if let Some(next) = self.partial_listing.pop_front() {
1236            Some(Ok(FileStatus::from(next, &self.path)))
1237        } else {
1238            None
1239        }
1240    }
1241}
1242
1243pub struct ListStatusIterator {
1244    mount_table: Arc<MountTable>,
1245    recursive: bool,
1246    iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
1247}
1248
1249impl ListStatusIterator {
1250    fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
1251        let initial = DirListingIterator::new(path.clone(), &mount_table, false);
1252
1253        ListStatusIterator {
1254            mount_table,
1255            recursive,
1256            iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
1257        }
1258    }
1259
1260    pub async fn next(&self) -> Option<Result<FileStatus>> {
1261        let mut next_file: Option<Result<FileStatus>> = None;
1262        let mut iters = self.iters.lock().await;
1263        while next_file.is_none() {
1264            if let Some(iter) = iters.last_mut() {
1265                if let Some(file_result) = iter.next().await {
1266                    if let Ok(file) = file_result {
1267                        // Return the directory as the next result, but start traversing into that directory
1268                        // next if we're doing a recursive listing
1269                        if file.isdir && self.recursive {
1270                            iters.push(DirListingIterator::new(
1271                                file.path.clone(),
1272                                &self.mount_table,
1273                                false,
1274                            ))
1275                        }
1276                        next_file = Some(Ok(file));
1277                    } else {
1278                        // Error, return that as the next element
1279                        next_file = Some(file_result)
1280                    }
1281                } else {
1282                    // We've exhausted this directory
1283                    iters.pop();
1284                }
1285            } else {
1286                // There's nothing left, just return None
1287                break;
1288            }
1289        }
1290
1291        next_file
1292    }
1293
1294    pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
1295        let listing = stream::unfold(self, |state| async move {
1296            let next = state.next().await;
1297            next.map(|n| (n, state))
1298        });
1299        Box::pin(listing)
1300    }
1301}
1302
/// Metadata for a single file or directory, as returned by listing and lookup calls.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileStatus {
    /// Full path of the entry.
    pub path: String,
    /// File length reported by the namenode.
    pub length: usize,
    /// Whether the entry is a directory.
    pub isdir: bool,
    /// Unix-style permission bits, e.g. `0o644`.
    pub permission: u16,
    /// Owning user.
    pub owner: String,
    /// Owning group.
    pub group: String,
    /// Modification time in milliseconds since the epoch.
    pub modification_time: u64,
    /// Access time in milliseconds since the epoch.
    pub access_time: u64,
    /// Replication factor, if the namenode reported one.
    pub replication: Option<u32>,
    /// Block size, if the namenode reported one.
    pub blocksize: Option<u64>,
}
1316
1317impl FileStatus {
1318    fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
1319        let mut path = base_path.trim_end_matches("/").to_string();
1320        let relative_path = std::str::from_utf8(&value.path).unwrap();
1321        if !relative_path.is_empty() {
1322            path.push('/');
1323            path.push_str(relative_path);
1324        }
1325
1326        // Root path should be a slash
1327        if path.is_empty() {
1328            path.push('/');
1329        }
1330
1331        FileStatus {
1332            isdir: value.file_type() == FileType::IsDir,
1333            path,
1334            length: value.length as usize,
1335            permission: value.permission.perm as u16,
1336            owner: value.owner,
1337            group: value.group,
1338            modification_time: value.modification_time,
1339            access_time: value.access_time,
1340            replication: value.block_replication,
1341            blocksize: value.blocksize,
1342        }
1343    }
1344}
1345
/// Aggregate usage and quota information for a directory tree, as returned by
/// [`Client::get_content_summary`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContentSummary {
    /// Total length of the files in the tree.
    pub length: u64,
    /// Number of files in the tree.
    pub file_count: u64,
    /// Number of directories in the tree.
    pub directory_count: u64,
    /// Namespace quota. NOTE(review): HDFS uses -1 for "no quota"; confirm how that value
    /// arrives in this unsigned field.
    pub quota: u64,
    /// Storage space consumed by the tree.
    pub space_consumed: u64,
    /// Storage space quota (same unset-value caveat as `quota`).
    pub space_quota: u64,
}
1355
1356impl From<ContentSummaryProto> for ContentSummary {
1357    fn from(value: ContentSummaryProto) -> Self {
1358        ContentSummary {
1359            length: value.length,
1360            file_count: value.file_count,
1361            directory_count: value.directory_count,
1362            quota: value.quota,
1363            space_consumed: value.space_consumed,
1364            space_quota: value.space_quota,
1365        }
1366    }
1367}
1368
// Unit tests for client construction and viewfs mount-table path resolution. None of these
// tests send requests to a namenode.
#[cfg(test)]
mod test {
    use std::sync::{Arc, LazyLock};

    use tokio::runtime::Runtime;
    use url::Url;

    use crate::{
        client::ClientBuilder,
        common::config::Configuration,
        hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
    };

    use super::{MountLink, MountTable};

    /// Shared runtime whose handle backs every protocol built by `create_protocol`.
    static RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

    /// Builds a namenode protocol for `url` using a default `Configuration`.
    fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
        let proxy = NameServiceProxy::new(
            &Url::parse(url).unwrap(),
            Arc::new(Configuration::new(None, None).unwrap()),
            RT.handle().clone(),
            None,
        )
        .unwrap();
        Arc::new(NamenodeProtocol::new(proxy, RT.handle().clone()))
    }

    /// `fs.defaultFS` is used when no URL (or a host-less `hdfs://` URL) is given; a
    /// host-less or viewfs default is rejected in the cases below.
    #[test]
    fn test_default_fs() {
        assert!(
            ClientBuilder::new()
                .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
                .build()
                .is_ok()
        );

        assert!(
            ClientBuilder::new()
                .with_config(vec![("fs.defaultFS", "hdfs://")])
                .build()
                .is_err()
        );

        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
                .build()
                .is_ok()
        );

        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "hdfs://")])
                .build()
                .is_err()
        );

        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "viewfs://test")])
                .build()
                .is_err()
        );
    }

    /// A link maps paths under its viewfs prefix onto its target and rejects other paths.
    #[test]
    fn test_mount_link_resolve() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("/view", "/hdfs", protocol);

        assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
        assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
        assert!(link.resolve("/hdfs/path").is_none());
    }

    /// A link with an empty viewfs prefix accepts every path and prefixes it with its target.
    #[test]
    fn test_fallback_link() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));

        assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
        assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
        assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");

        let link = MountLink::new("", "", protocol);
        assert_eq!(link.resolve("/").unwrap(), "/");
    }

    /// The mount table picks the mount whose prefix matches on whole path components,
    /// falls back otherwise, and resolves relative paths against `home_dir`.
    #[test]
    fn test_mount_table_resolve() {
        let link1 = MountLink::new(
            "/mount1",
            "/path1/nested",
            create_protocol("hdfs://127.0.0.1:9000"),
        );
        let link2 = MountLink::new(
            "/mount2",
            "/path2",
            create_protocol("hdfs://127.0.0.1:9001"),
        );
        let link3 = MountLink::new(
            "/mount3/nested",
            "/path3",
            create_protocol("hdfs://127.0.0.1:9002"),
        );
        let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));

        let mount_table = MountTable {
            mounts: vec![link1, link2, link3],
            fallback,
            home_dir: "/user/test".to_string(),
        };

        // Exact mount path resolves to the exact HDFS path
        let (link, resolved) = mount_table.resolve("/mount1");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested");

        // Trailing slash is treated the same
        let (link, resolved) = mount_table.resolve("/mount1/");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested/");

        // Doesn't do partial matches on a directory name
        let (link, resolved) = mount_table.resolve("/mount12");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount12");

        // A parent of a nested mount point is not itself mounted: it goes to the fallback
        let (link, resolved) = mount_table.resolve("/mount3/file");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount3/file");

        let (link, resolved) = mount_table.resolve("/mount3/nested/file");
        assert_eq!(link.viewfs_path, "/mount3/nested");
        assert_eq!(resolved, "/path3/file");

        // Relative paths are resolved against home_dir
        let (link, resolved) = mount_table.resolve("file");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/user/test/file");

        let (link, resolved) = mount_table.resolve("dir/subdir");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/user/test/dir/subdir");

        // A home_dir inside a mount sends relative paths through that mount
        let mount_table = MountTable {
            mounts: vec![
                MountLink::new(
                    "/mount1",
                    "/path1/nested",
                    create_protocol("hdfs://127.0.0.1:9000"),
                ),
                MountLink::new(
                    "/mount2",
                    "/path2",
                    create_protocol("hdfs://127.0.0.1:9001"),
                ),
            ],
            fallback: MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003")),
            home_dir: "/mount1/user".to_string(),
        };

        let (link, resolved) = mount_table.resolve("file");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested/user/file");

        let (link, resolved) = mount_table.resolve("dir/subdir");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested/user/dir/subdir");
    }

    /// The IO runtime can be supplied either as an owned `Runtime` or as a `Handle`.
    #[test]
    fn test_io_runtime() {
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://127.0.0.1:9000")
                .with_io_runtime(Runtime::new().unwrap())
                .build()
                .is_ok()
        );

        let rt = Runtime::new().unwrap();
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://127.0.0.1:9000")
                .with_io_runtime(rt.handle().clone())
                .build()
                .is_ok()
        );
    }

    /// `with_user` makes relative paths resolve under `/user/<name>`.
    #[test]
    fn test_with_user_sets_relative_path_home_dir() {
        let client = ClientBuilder::new()
            .with_url("hdfs://127.0.0.1:9000")
            .with_user("alice")
            .build()
            .unwrap();

        let (_, resolved) = client.mount_table.resolve("file");
        assert_eq!(resolved, "/user/alice/file");
    }

    /// Building with an explicit config directory succeeds.
    #[test]
    fn test_set_conf_dir() {
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://127.0.0.1:9000")
                .with_config_dir("target/test")
                .build()
                .is_ok()
        )
    }
}