// hdfs_native/client.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, OnceLock};
3
4use futures::stream::BoxStream;
5use futures::{stream, StreamExt};
6use tokio::runtime::{Handle, Runtime};
7use url::Url;
8
9use crate::acl::{AclEntry, AclStatus};
10use crate::common::config::{self, Configuration};
11use crate::ec::resolve_ec_policy;
12use crate::error::{HdfsError, Result};
13use crate::file::{FileReader, FileWriter};
14use crate::hdfs::protocol::NamenodeProtocol;
15use crate::hdfs::proxy::NameServiceProxy;
16use crate::proto::hdfs::hdfs_file_status_proto::FileType;
17
18use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
19
/// Options controlling how a new file is created by [Client::create].
#[derive(Clone)]
pub struct WriteOptions {
    /// Block size. Default is retrieved from the server.
    pub block_size: Option<u64>,
    /// Replication factor. Default is retrieved from the server.
    pub replication: Option<u32>,
    /// Unix file permission, defaults to 0o644, which is "rw-r--r--" as a Unix permission.
    /// This is the raw octal value represented in base 10.
    pub permission: u32,
    /// Whether to overwrite the file, defaults to false. If true and the
    /// file does not exist, it will result in an error.
    pub overwrite: bool,
    /// Whether to create any missing parent directories, defaults to true. If false
    /// and the parent directory does not exist, an error will be returned.
    pub create_parent: bool,
}
36
37impl Default for WriteOptions {
38    fn default() -> Self {
39        Self {
40            block_size: None,
41            replication: None,
42            permission: 0o644,
43            overwrite: false,
44            create_parent: true,
45        }
46    }
47}
48
// Lets APIs such as [Client::create] accept either an owned `WriteOptions` or a
// reference through `impl AsRef<WriteOptions>`.
impl AsRef<WriteOptions> for WriteOptions {
    fn as_ref(&self) -> &WriteOptions {
        self
    }
}
54
55impl WriteOptions {
56    /// Set the block_size for the new file
57    pub fn block_size(mut self, block_size: u64) -> Self {
58        self.block_size = Some(block_size);
59        self
60    }
61
62    /// Set the replication for the new file
63    pub fn replication(mut self, replication: u32) -> Self {
64        self.replication = Some(replication);
65        self
66    }
67
68    /// Set the raw octal permission value for the new file
69    pub fn permission(mut self, permission: u32) -> Self {
70        self.permission = permission;
71        self
72    }
73
74    /// Set whether to overwrite an existing file
75    pub fn overwrite(mut self, overwrite: bool) -> Self {
76        self.overwrite = overwrite;
77        self
78    }
79
80    /// Set whether to create all missing parent directories
81    pub fn create_parent(mut self, create_parent: bool) -> Self {
82        self.create_parent = create_parent;
83        self
84    }
85}
86
/// A single viewfs mount entry mapping a viewfs prefix onto a path in a name service.
#[derive(Debug, Clone)]
struct MountLink {
    // Mount point on the viewfs side, stored without a trailing slash.
    viewfs_path: String,
    // Corresponding path on the backing name service, without a trailing slash.
    hdfs_path: String,
    // Connection to the name service backing this mount.
    protocol: Arc<NamenodeProtocol>,
}
93
94impl MountLink {
95    fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
96        // We should never have an empty path, we always want things mounted at root ("/") by default.
97        Self {
98            viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
99            hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
100            protocol,
101        }
102    }
103    /// Convert a viewfs path into a name service path if it matches this link
104    fn resolve(&self, path: &str) -> Option<String> {
105        // Make sure we don't partially match the last component. It either needs to be an exact
106        // match to a viewfs path, or needs to match with a trailing slash
107        if path == self.viewfs_path {
108            Some(self.hdfs_path.clone())
109        } else {
110            path.strip_prefix(&format!("{}/", self.viewfs_path))
111                .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
112        }
113    }
114}
115
/// Ordered set of viewfs mount links plus a required fallback for unmatched paths.
#[derive(Debug)]
struct MountTable {
    // Sorted from most specific (deepest) viewfs path to least specific.
    mounts: Vec<MountLink>,
    // Used when no mount matches; always rooted at "/".
    fallback: MountLink,
}
121
122impl MountTable {
123    fn resolve(&self, src: &str) -> (&MountLink, String) {
124        for link in self.mounts.iter() {
125            if let Some(resolved) = link.resolve(src) {
126                return (link, resolved);
127            }
128        }
129        (&self.fallback, self.fallback.resolve(src).unwrap())
130    }
131}
132
/// Builds a new [Client] instance. By default, configs will be loaded from the default config directories with the following precedence:
/// - If the `HADOOP_CONF_DIR` environment variable is defined, configs will be loaded from `${HADOOP_CONF_DIR}/{core,hdfs}-site.xml`
/// - If the `HADOOP_HOME` environment variable is defined, configs will be loaded from `${HADOOP_HOME}/etc/hadoop/{core,hdfs}-site.xml`
/// - Otherwise no default configs are defined
///
/// If no URL is defined, the `fs.defaultFS` config must be defined and is used as the URL.
///
/// # Examples
///
/// Create a new client using the fs.defaultFS config
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_config(vec![("fs.defaultFS", "hdfs://127.0.0.1:9000")])
///     .build()
///     .unwrap();
/// ```
///
/// Create a new client connecting to a specific URL:
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_url("hdfs://127.0.0.1:9000")
///     .build()
///     .unwrap();
/// ```
///
/// Create a new client using a dedicated tokio runtime for spawned tasks and IO operations
/// ```rust
/// # use hdfs_native::ClientBuilder;
/// let client = ClientBuilder::new()
///     .with_url("hdfs://127.0.0.1:9000")
///     .with_io_runtime(tokio::runtime::Runtime::new().unwrap())
///     .build()
///     .unwrap();
/// ```
#[derive(Default)]
pub struct ClientBuilder {
    // URL to connect to; when None, fs.defaultFS is used instead.
    url: Option<String>,
    // Config overrides applied on top of configs loaded from disk.
    config: HashMap<String, String>,
    // Optional dedicated runtime for IO; otherwise the ambient runtime is used.
    runtime: Option<Runtime>,
}
175
176impl ClientBuilder {
177    /// Create a new [ClientBuilder]
178    pub fn new() -> Self {
179        Self::default()
180    }
181
182    /// Set the URL to connect to. Can be the address of a single NameNode, or a logical NameService
183    pub fn with_url(mut self, url: impl Into<String>) -> Self {
184        self.url = Some(url.into());
185        self
186    }
187
188    /// Set configs to use for the client. The provided configs will override any found in the default config files loaded
189    pub fn with_config(
190        mut self,
191        config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
192    ) -> Self {
193        self.config = config
194            .into_iter()
195            .map(|(k, v)| (k.into(), v.into()))
196            .collect();
197        self
198    }
199
200    /// Use a dedicated tokio runtime for spawned tasks and IO operations
201    pub fn with_io_runtime(mut self, runtime: Runtime) -> Self {
202        self.runtime = Some(runtime);
203        self
204    }
205
206    /// Create the [Client] instance from the provided settings
207    pub fn build(self) -> Result<Client> {
208        let config = Configuration::new_with_config(self.config)?;
209        let url = if let Some(url) = self.url {
210            Url::parse(&url)?
211        } else {
212            Client::default_fs(&config)?
213        };
214
215        Client::build(&url, config, self.runtime)
216    }
217}
218
/// Holds the tokio runtime used for internal tasks.
#[derive(Clone, Debug)]
enum RuntimeHolder {
    /// A runtime explicitly supplied by the user via [ClientBuilder::with_io_runtime].
    Custom(Arc<Runtime>),
    /// Lazily-created fallback runtime, only initialized if no ambient tokio
    /// runtime is available when a handle is first requested.
    Default(Arc<OnceLock<Runtime>>),
}
224
225impl RuntimeHolder {
226    fn new(rt: Option<Runtime>) -> Self {
227        if let Some(rt) = rt {
228            Self::Custom(Arc::new(rt))
229        } else {
230            Self::Default(Arc::new(OnceLock::new()))
231        }
232    }
233
234    fn get_handle(&self) -> Handle {
235        match self {
236            Self::Custom(rt) => rt.handle().clone(),
237            Self::Default(rt) => match Handle::try_current() {
238                Ok(handle) => handle,
239                Err(_) => rt
240                    .get_or_init(|| Runtime::new().expect("Failed to create tokio runtime"))
241                    .handle()
242                    .clone(),
243            },
244        }
245    }
246}
247
/// A client to a specific NameNode, NameService, or Viewfs mount table
#[derive(Clone, Debug)]
pub struct Client {
    // Resolves (possibly viewfs) paths to the backing name service. For plain hdfs
    // URLs this holds a single fallback link mounted at "/".
    mount_table: Arc<MountTable>,
    // Loaded Hadoop configuration, shared with readers and writers.
    config: Arc<Configuration>,
    // Store the runtime used for spawning all internal tasks. If we are not created
    // inside a tokio runtime, we will create our own to use.
    rt_holder: RuntimeHolder,
}
257
258impl Client {
    /// Creates a new HDFS Client. The URL must include the protocol and host, and optionally a port.
    /// If a port is included, the host is treated as a single NameNode. If no port is included, the
    /// host is treated as a name service that will be resolved using the HDFS config.
    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
    pub fn new(url: &str) -> Result<Self> {
        let parsed_url = Url::parse(url)?;
        // Configs come from the default config directories; no custom runtime.
        Self::build(&parsed_url, Configuration::new()?, None)
    }
267
    /// Creates a new HDFS Client connected to `url`, using the provided `config` entries.
    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
    pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
        let parsed_url = Url::parse(url)?;
        Self::build(&parsed_url, Configuration::new_with_config(config)?, None)
    }
273
    /// Creates a new HDFS Client using the `fs.defaultFS` setting from the provided
    /// `config` entries to determine the cluster to connect to.
    #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
    pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
        let config = Configuration::new_with_config(config)?;
        Self::build(&Self::default_fs(&config)?, config, None)
    }
279
280    fn default_fs(config: &Configuration) -> Result<Url> {
281        let url = config
282            .get(config::DEFAULT_FS)
283            .ok_or(HdfsError::InvalidArgument(format!(
284                "No {} setting found",
285                config::DEFAULT_FS
286            )))?;
287        Ok(Url::parse(url)?)
288    }
289
    /// Core constructor: resolves the URL, sets up the runtime holder, and builds
    /// the mount table for either a plain hdfs or a viewfs scheme.
    fn build(url: &Url, config: Configuration, rt: Option<Runtime>) -> Result<Self> {
        // A URL without a host falls back to fs.defaultFS, which must share the
        // same scheme and itself contain a host.
        let resolved_url = if !url.has_host() {
            let default_url = Self::default_fs(&config)?;
            if url.scheme() != default_url.scheme() || !default_url.has_host() {
                return Err(HdfsError::InvalidArgument(
                    "URL must contain a host".to_string(),
                ));
            }
            default_url
        } else {
            url.clone()
        };

        let config = Arc::new(config);

        let rt_holder = RuntimeHolder::new(rt);

        // Matching on the original URL's scheme is fine: when we fell back to
        // fs.defaultFS above, the schemes were checked to be identical.
        let mount_table = match url.scheme() {
            "hdfs" => {
                let proxy = NameServiceProxy::new(
                    &resolved_url,
                    Arc::clone(&config),
                    rt_holder.get_handle(),
                )?;
                let protocol = Arc::new(NamenodeProtocol::new(proxy, rt_holder.get_handle()));

                // Plain hdfs: a single name service mounted at the root.
                MountTable {
                    mounts: Vec::new(),
                    fallback: MountLink::new("/", "/", protocol),
                }
            }
            "viewfs" => Self::build_mount_table(
                // Host is guaranteed to be present.
                resolved_url.host_str().expect("URL must have a host"),
                Arc::clone(&config),
                rt_holder.get_handle(),
            )?,
            _ => {
                return Err(HdfsError::InvalidArgument(
                    "Only `hdfs` and `viewfs` schemes are supported".to_string(),
                ))
            }
        };

        Ok(Self {
            mount_table: Arc::new(mount_table),
            config,
            rt_holder,
        })
    }
340
    /// Build a viewfs mount table for `host` from the config. Exactly one fallback
    /// link (an entry with no viewfs prefix) is required so every path resolves.
    fn build_mount_table(
        host: &str,
        config: Arc<Configuration>,
        handle: Handle,
    ) -> Result<MountTable> {
        let mut mounts: Vec<MountLink> = Vec::new();
        let mut fallback: Option<MountLink> = None;

        // Each entry maps an optional viewfs prefix (None = fallback link) to an
        // hdfs URL whose path component becomes the mount target.
        for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
            let url = Url::parse(hdfs_url)?;
            if !url.has_host() {
                return Err(HdfsError::InvalidArgument(
                    "URL must contain a host".to_string(),
                ));
            }
            if url.scheme() != "hdfs" {
                return Err(HdfsError::InvalidArgument(
                    "Only hdfs mounts are supported for viewfs".to_string(),
                ));
            }
            // Each mount gets its own connection to its backing name service.
            let proxy = NameServiceProxy::new(&url, Arc::clone(&config), handle.clone())?;
            let protocol = Arc::new(NamenodeProtocol::new(proxy, handle.clone()));

            if let Some(prefix) = viewfs_path {
                mounts.push(MountLink::new(prefix, url.path(), protocol));
            } else {
                if fallback.is_some() {
                    return Err(HdfsError::InvalidArgument(
                        "Multiple viewfs fallback links found".to_string(),
                    ));
                }
                fallback = Some(MountLink::new("/", url.path(), protocol));
            }
        }

        if let Some(fallback) = fallback {
            // Sort the mount table from longest viewfs path to shortest. This makes sure more specific paths are considered first.
            mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
            mounts.reverse();

            Ok(MountTable { mounts, fallback })
        } else {
            Err(HdfsError::InvalidArgument(
                "No viewfs fallback mount found".to_string(),
            ))
        }
    }
388
389    /// Retrieve the file status for the file at `path`.
390    pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
391        let (link, resolved_path) = self.mount_table.resolve(path);
392        match link.protocol.get_file_info(&resolved_path).await?.fs {
393            Some(status) => Ok(FileStatus::from(status, path)),
394            None => Err(HdfsError::FileNotFound(path.to_string())),
395        }
396    }
397
398    /// Retrives a list of all files in directories located at `path`. Wrapper around `list_status_iter` that
399    /// returns Err if any part of the stream fails, or Ok if all file statuses were found successfully.
400    pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
401        let iter = self.list_status_iter(path, recursive);
402        let statuses = iter
403            .into_stream()
404            .collect::<Vec<Result<FileStatus>>>()
405            .await;
406
407        let mut resolved_statues = Vec::<FileStatus>::with_capacity(statuses.len());
408        for status in statuses.into_iter() {
409            resolved_statues.push(status?);
410        }
411
412        Ok(resolved_statues)
413    }
414
    /// Retrieves an iterator of all files in directories located at `path`.
    pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
        ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
    }
419
420    /// Opens a file reader for the file at `path`. Path should not include a scheme.
421    pub async fn read(&self, path: &str) -> Result<FileReader> {
422        let (link, resolved_path) = self.mount_table.resolve(path);
423        // Get all block locations. Length is actually a signed value, but the proto uses an unsigned value
424        let located_info = link
425            .protocol
426            .get_block_locations(&resolved_path, 0, i64::MAX as u64)
427            .await?;
428
429        if let Some(locations) = located_info.locations {
430            let ec_schema = if let Some(ec_policy) = locations.ec_policy.as_ref() {
431                Some(resolve_ec_policy(ec_policy)?)
432            } else {
433                None
434            };
435
436            if locations.file_encryption_info.is_some() {
437                return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
438            }
439
440            Ok(FileReader::new(
441                Arc::clone(&link.protocol),
442                locations,
443                ec_schema,
444                Arc::clone(&self.config),
445                self.rt_holder.get_handle(),
446            ))
447        } else {
448            Err(HdfsError::FileNotFound(path.to_string()))
449        }
450    }
451
    /// Opens a new file for writing. See [WriteOptions] for options and behavior for different
    /// scenarios.
    pub async fn create(
        &self,
        src: &str,
        write_options: impl AsRef<WriteOptions>,
    ) -> Result<FileWriter> {
        let write_options = write_options.as_ref();

        // Route the path through the mount table so viewfs paths hit the right name service.
        let (link, resolved_path) = self.mount_table.resolve(src);

        let create_response = link
            .protocol
            .create(
                &resolved_path,
                write_options.permission,
                write_options.overwrite,
                write_options.create_parent,
                write_options.replication,
                write_options.block_size,
            )
            .await?;

        match create_response.fs {
            Some(status) => {
                // Encrypted files aren't supported; best-effort clean up the file
                // we just created before surfacing the error.
                if status.file_encryption_info.is_some() {
                    let _ = self.delete(src, false).await;
                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
                }

                Ok(FileWriter::new(
                    Arc::clone(&link.protocol),
                    resolved_path,
                    status,
                    Arc::clone(&self.config),
                    self.rt_holder.get_handle(),
                    // No existing block to resume; this is a fresh file.
                    None,
                ))
            }
            None => Err(HdfsError::FileNotFound(src.to_string())),
        }
    }
494
495    fn needs_new_block(class: &str, msg: &str) -> bool {
496        class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
497    }
498
499    /// Opens an existing file for appending. An Err will be returned if the file does not exist. If the
500    /// file is replicated, the current block will be appended to until it is full. If the file is erasure
501    /// coded, a new block will be created.
502    pub async fn append(&self, src: &str) -> Result<FileWriter> {
503        let (link, resolved_path) = self.mount_table.resolve(src);
504
505        // Assume the file is replicated and try to append to the current block. If the file is
506        // erasure coded, then try again by appending to a new block.
507        let append_response = match link.protocol.append(&resolved_path, false).await {
508            Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
509                link.protocol.append(&resolved_path, true).await?
510            }
511            resp => resp?,
512        };
513
514        match append_response.stat {
515            Some(status) => {
516                if status.file_encryption_info.is_some() {
517                    let _ = link
518                        .protocol
519                        .complete(src, append_response.block.map(|b| b.b), status.file_id)
520                        .await;
521                    return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
522                }
523
524                Ok(FileWriter::new(
525                    Arc::clone(&link.protocol),
526                    resolved_path,
527                    status,
528                    Arc::clone(&self.config),
529                    self.rt_holder.get_handle(),
530                    append_response.block,
531                ))
532            }
533            None => Err(HdfsError::FileNotFound(src.to_string())),
534        }
535    }
536
537    /// Create a new directory at `path` with the given `permission`.
538    ///
539    /// `permission` is the raw octal value representing the Unix style permission. For example, to
540    /// set 755 (`rwxr-x-rx`) permissions, use 0o755.
541    ///
542    /// If `create_parent` is true, any missing parent directories will be created as well,
543    /// otherwise an error will be returned if the parent directory doesn't already exist.
544    pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
545        let (link, resolved_path) = self.mount_table.resolve(path);
546        link.protocol
547            .mkdirs(&resolved_path, permission, create_parent)
548            .await
549            .map(|_| ())
550    }
551
552    /// Renames `src` to `dst`. Returns Ok(()) on success, and Err otherwise.
553    pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
554        let (src_link, src_resolved_path) = self.mount_table.resolve(src);
555        let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
556        if src_link.viewfs_path == dst_link.viewfs_path {
557            src_link
558                .protocol
559                .rename(&src_resolved_path, &dst_resolved_path, overwrite)
560                .await
561                .map(|_| ())
562        } else {
563            Err(HdfsError::InvalidArgument(
564                "Cannot rename across different name services".to_string(),
565            ))
566        }
567    }
568
569    /// Deletes the file or directory at `path`. If `recursive` is false and `path` is a non-empty
570    /// directory, this will fail. Returns `Ok(true)` if it was successfully deleted.
571    pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
572        let (link, resolved_path) = self.mount_table.resolve(path);
573        link.protocol
574            .delete(&resolved_path, recursive)
575            .await
576            .map(|r| r.result)
577    }
578
579    /// Sets the modified and access times for a file. Times should be in milliseconds from the epoch.
580    pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
581        let (link, resolved_path) = self.mount_table.resolve(path);
582        link.protocol
583            .set_times(&resolved_path, mtime, atime)
584            .await?;
585        Ok(())
586    }
587
588    /// Optionally sets the owner and group for a file.
589    pub async fn set_owner(
590        &self,
591        path: &str,
592        owner: Option<&str>,
593        group: Option<&str>,
594    ) -> Result<()> {
595        let (link, resolved_path) = self.mount_table.resolve(path);
596        link.protocol
597            .set_owner(&resolved_path, owner, group)
598            .await?;
599        Ok(())
600    }
601
    /// Sets permissions for a file. Permission should be an octal number representing the Unix style
    /// permission.
    ///
    /// For example, to set permissions to rwxr-xr-x:
    /// ```rust
    /// # async fn func() {
    /// # let client = hdfs_native::Client::default();
    /// client.set_permission("/path", 0o755).await.unwrap();
    /// }
    /// ```
    pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
        let (link, resolved_path) = self.mount_table.resolve(path);
        link.protocol
            .set_permission(&resolved_path, permission)
            .await?;
        Ok(())
    }
619
620    /// Sets the replication for a file.
621    pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
622        let (link, resolved_path) = self.mount_table.resolve(path);
623        let result = link
624            .protocol
625            .set_replication(&resolved_path, replication)
626            .await?
627            .result;
628
629        Ok(result)
630    }
631
632    /// Gets a content summary for a file or directory rooted at `path`.
633    pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
634        let (link, resolved_path) = self.mount_table.resolve(path);
635        let result = link
636            .protocol
637            .get_content_summary(&resolved_path)
638            .await?
639            .summary;
640
641        Ok(result.into())
642    }
643
644    /// Update ACL entries for file or directory at `path`. Existing entries will remain.
645    pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
646        let (link, resolved_path) = self.mount_table.resolve(path);
647        link.protocol
648            .modify_acl_entries(&resolved_path, acl_spec)
649            .await?;
650
651        Ok(())
652    }
653
654    /// Remove specific ACL entries for file or directory at `path`.
655    pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
656        let (link, resolved_path) = self.mount_table.resolve(path);
657        link.protocol
658            .remove_acl_entries(&resolved_path, acl_spec)
659            .await?;
660
661        Ok(())
662    }
663
664    /// Remove all default ACLs for file or directory at `path`.
665    pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
666        let (link, resolved_path) = self.mount_table.resolve(path);
667        link.protocol.remove_default_acl(&resolved_path).await?;
668
669        Ok(())
670    }
671
672    /// Remove all ACL entries for file or directory at `path`.
673    pub async fn remove_acl(&self, path: &str) -> Result<()> {
674        let (link, resolved_path) = self.mount_table.resolve(path);
675        link.protocol.remove_acl(&resolved_path).await?;
676
677        Ok(())
678    }
679
680    /// Override all ACL entries for file or directory at `path`. If only access ACLs are provided,
681    /// default ACLs are maintained. Likewise if only default ACLs are provided, access ACLs are
682    /// maintained.
683    pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
684        let (link, resolved_path) = self.mount_table.resolve(path);
685        link.protocol.set_acl(&resolved_path, acl_spec).await?;
686
687        Ok(())
688    }
689
690    /// Get the ACL status for the file or directory at `path`.
691    pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
692        let (link, resolved_path) = self.mount_table.resolve(path);
693        Ok(link
694            .protocol
695            .get_acl_status(&resolved_path)
696            .await?
697            .result
698            .into())
699    }
700}
701
impl Default for Client {
    /// Creates a new HDFS Client based on the fs.defaultFS setting. Panics if the config files fail to load,
    /// no defaultFS is defined, or the defaultFS is invalid.
    fn default() -> Self {
        // Equivalent to ClientBuilder with no URL, config overrides, or runtime.
        ClientBuilder::new()
            .build()
            .expect("Failed to create default client")
    }
}
711
/// Iterator over a single directory's listing, fetched from the NameNode in pages.
pub(crate) struct DirListingIterator {
    // Original (possibly viewfs) path; used for error messages and result paths.
    path: String,
    // The path as known to the backing name service.
    resolved_path: String,
    link: MountLink,
    // When true, directory entries are filtered out of the results.
    files_only: bool,
    // Entries fetched from the NameNode but not yet returned.
    partial_listing: VecDeque<HdfsFileStatusProto>,
    // Entries the server still has; initialized to 1 to force the first fetch.
    remaining: u32,
    // Last path returned by the server; cursor for the next page request.
    last_seen: Vec<u8>,
}
721
722impl DirListingIterator {
723    fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
724        let (link, resolved_path) = mount_table.resolve(&path);
725
726        DirListingIterator {
727            path,
728            resolved_path,
729            link: link.clone(),
730            files_only,
731            partial_listing: VecDeque::new(),
732            remaining: 1,
733            last_seen: Vec::new(),
734        }
735    }
736
737    async fn get_next_batch(&mut self) -> Result<bool> {
738        let listing = self
739            .link
740            .protocol
741            .get_listing(&self.resolved_path, self.last_seen.clone(), false)
742            .await?;
743
744        if let Some(dir_list) = listing.dir_list {
745            self.last_seen = dir_list
746                .partial_listing
747                .last()
748                .map(|p| p.path.clone())
749                .unwrap_or(Vec::new());
750
751            self.remaining = dir_list.remaining_entries;
752
753            self.partial_listing = dir_list
754                .partial_listing
755                .into_iter()
756                .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
757                .collect();
758            Ok(!self.partial_listing.is_empty())
759        } else {
760            Err(HdfsError::FileNotFound(self.path.clone()))
761        }
762    }
763
764    pub async fn next(&mut self) -> Option<Result<FileStatus>> {
765        if self.partial_listing.is_empty() && self.remaining > 0 {
766            if let Err(error) = self.get_next_batch().await {
767                self.remaining = 0;
768                return Some(Err(error));
769            }
770        }
771        if let Some(next) = self.partial_listing.pop_front() {
772            Some(Ok(FileStatus::from(next, &self.path)))
773        } else {
774            None
775        }
776    }
777}
778
/// Async iterator over file statuses, optionally descending into subdirectories.
pub struct ListStatusIterator {
    mount_table: Arc<MountTable>,
    recursive: bool,
    // Stack of per-directory iterators; the top of the stack is the directory
    // currently being listed. Guarded by a mutex so `next` can take `&self`.
    iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
}
784
impl ListStatusIterator {
    fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
        // Seed the stack with an iterator over the root of the listing.
        let initial = DirListingIterator::new(path.clone(), &mount_table, false);

        ListStatusIterator {
            mount_table,
            recursive,
            iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
        }
    }

    /// Returns the next file status, or None when the listing is exhausted.
    /// When recursing, a directory is yielded before its contents.
    pub async fn next(&self) -> Option<Result<FileStatus>> {
        let mut next_file: Option<Result<FileStatus>> = None;
        // Hold the lock for the whole step; concurrent callers serialize here.
        let mut iters = self.iters.lock().await;
        while next_file.is_none() {
            if let Some(iter) = iters.last_mut() {
                if let Some(file_result) = iter.next().await {
                    if let Ok(file) = file_result {
                        // Return the directory as the next result, but start traversing into that directory
                        // next if we're doing a recursive listing
                        if file.isdir && self.recursive {
                            iters.push(DirListingIterator::new(
                                file.path.clone(),
                                &self.mount_table,
                                false,
                            ))
                        }
                        next_file = Some(Ok(file));
                    } else {
                        // Error, return that as the next element
                        next_file = Some(file_result)
                    }
                } else {
                    // We've exhausted this directory
                    iters.pop();
                }
            } else {
                // There's nothing left, just return None
                break;
            }
        }

        next_file
    }

    /// Convert into an async Stream of results, ending when `next` returns None.
    pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
        let listing = stream::unfold(self, |state| async move {
            let next = state.next().await;
            next.map(|n| (n, state))
        });
        Box::pin(listing)
    }
}
838
/// Status information for a single file or directory, converted from the
/// namenode's protobuf representation into plain owned types.
#[derive(Debug, Clone, PartialEq)]
pub struct FileStatus {
    /// Absolute path of the entry.
    pub path: String,
    /// Length of the file in bytes.
    pub length: usize,
    /// Whether this entry is a directory.
    pub isdir: bool,
    /// Unix permission bits (raw octal value).
    pub permission: u16,
    /// Owning user name.
    pub owner: String,
    /// Owning group name.
    pub group: String,
    /// Modification time as reported by the namenode.
    pub modification_time: u64,
    /// Access time as reported by the namenode.
    pub access_time: u64,
    /// Replication factor; `None` when not reported.
    pub replication: Option<u32>,
    /// Block size in bytes; `None` when not reported.
    pub blocksize: Option<u64>,
}
852
853impl FileStatus {
854    fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
855        let mut path = base_path.trim_end_matches("/").to_string();
856        let relative_path = std::str::from_utf8(&value.path).unwrap();
857        if !relative_path.is_empty() {
858            path.push('/');
859            path.push_str(relative_path);
860        }
861
862        FileStatus {
863            isdir: value.file_type() == FileType::IsDir,
864            path,
865            length: value.length as usize,
866            permission: value.permission.perm as u16,
867            owner: value.owner,
868            group: value.group,
869            modification_time: value.modification_time,
870            access_time: value.access_time,
871            replication: value.block_replication,
872            blocksize: value.blocksize,
873        }
874    }
875}
876
/// Aggregate usage summary for a directory tree, converted from the
/// namenode's protobuf representation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContentSummary {
    /// Total length of all files, in bytes.
    pub length: u64,
    /// Number of files in the tree.
    pub file_count: u64,
    /// Number of directories in the tree.
    pub directory_count: u64,
    /// Namespace quota.
    pub quota: u64,
    /// Actual space consumed, including replication.
    pub space_consumed: u64,
    /// Space quota.
    pub space_quota: u64,
}
886
887impl From<ContentSummaryProto> for ContentSummary {
888    fn from(value: ContentSummaryProto) -> Self {
889        ContentSummary {
890            length: value.length,
891            file_count: value.file_count,
892            directory_count: value.directory_count,
893            quota: value.quota,
894            space_consumed: value.space_consumed,
895            space_quota: value.space_quota,
896        }
897    }
898}
899
#[cfg(test)]
mod test {
    use std::sync::{Arc, LazyLock};

    use tokio::runtime::Runtime;
    use url::Url;

    use crate::{
        client::ClientBuilder,
        common::config::Configuration,
        hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
    };

    use super::{MountLink, MountTable};

    // One shared runtime so each test doesn't have to spin up its own.
    static RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

    /// Builds a namenode protocol handle pointed at `url` for mount tests.
    fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
        let parsed = Url::parse(url).unwrap();
        let config = Arc::new(Configuration::new().unwrap());
        let proxy = NameServiceProxy::new(&parsed, config, RT.handle().clone()).unwrap();
        Arc::new(NamenodeProtocol::new(proxy, RT.handle().clone()))
    }

    #[test]
    fn test_default_fs() {
        // A valid fs.defaultFS alone is enough to build a client.
        let config_only = ClientBuilder::new()
            .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
            .build();
        assert!(config_only.is_ok());

        // A defaultFS without an authority is rejected.
        let bad_config = ClientBuilder::new()
            .with_config(vec![("fs.defaultFS", "hdfs://")])
            .build();
        assert!(bad_config.is_err());

        // An authority-less URL falls back to a valid defaultFS.
        let url_fallback = ClientBuilder::new()
            .with_url("hdfs://")
            .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
            .build();
        assert!(url_fallback.is_ok());

        // ...but not when the defaultFS also lacks an authority.
        let both_bad = ClientBuilder::new()
            .with_url("hdfs://")
            .with_config(vec![("fs.defaultFS", "hdfs://")])
            .build();
        assert!(both_bad.is_err());

        // ...and not when the defaultFS scheme doesn't match the URL's.
        let scheme_mismatch = ClientBuilder::new()
            .with_url("hdfs://")
            .with_config(vec![("fs.defaultFS", "viewfs://test")])
            .build();
        assert!(scheme_mismatch.is_err());
    }

    #[test]
    fn test_mount_link_resolve() {
        let link = MountLink::new("/view", "/hdfs", create_protocol("hdfs://127.0.0.1:9000"));

        // Paths under the mount point map into the target namespace.
        assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
        assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
        // Paths outside the mount point don't resolve at all.
        assert!(link.resolve("/hdfs/path").is_none());
    }

    #[test]
    fn test_fallback_link() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");

        // An empty viewfs path acts as a catch-all prefix mapping.
        let fallback = MountLink::new("", "/hdfs", Arc::clone(&protocol));
        assert_eq!(
            fallback.resolve("/path/to/file").unwrap(),
            "/hdfs/path/to/file"
        );
        assert_eq!(fallback.resolve("/").unwrap(), "/hdfs/");
        assert_eq!(fallback.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");

        // Empty source and target leave paths untouched.
        let identity = MountLink::new("", "", protocol);
        assert_eq!(identity.resolve("/").unwrap(), "/");
    }

    #[test]
    fn test_mount_table_resolve() {
        let mounts = vec![
            MountLink::new(
                "/mount1",
                "/path1/nested",
                create_protocol("hdfs://127.0.0.1:9000"),
            ),
            MountLink::new("/mount2", "/path2", create_protocol("hdfs://127.0.0.1:9001")),
            MountLink::new(
                "/mount3/nested",
                "/path3",
                create_protocol("hdfs://127.0.0.1:9002"),
            ),
        ];
        let mount_table = MountTable {
            mounts,
            fallback: MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003")),
        };

        // Resolves `input` and checks both which link matched and the result.
        let check = |input: &str, expected_link: &str, expected_path: &str| {
            let (link, resolved) = mount_table.resolve(input);
            assert_eq!(link.viewfs_path, expected_link);
            assert_eq!(resolved, expected_path);
        };

        // Exact mount path resolves to the exact HDFS path
        check("/mount1", "/mount1", "/path1/nested");
        // Trailing slash is treated the same
        check("/mount1/", "/mount1", "/path1/nested/");
        // Doesn't do partial matches on a directory name
        check("/mount12", "", "/path4/mount12");
        // "/mount3" alone isn't a mount point, so it hits the fallback
        check("/mount3/file", "", "/path4/mount3/file");
        check("/mount3/nested/file", "/mount3/nested", "/path3/file");
    }
}