1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, OnceLock};
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use futures::stream::BoxStream;
6use futures::{StreamExt, stream};
7use tokio::runtime::{Handle, Runtime};
8use url::Url;
9
10use crate::acl::{AclEntry, AclStatus};
11use crate::common::config::{self, Configuration};
12use crate::ec::resolve_ec_policy;
13use crate::error::{HdfsError, Result};
14use crate::file::{FileReader, FileWriter};
15use crate::hdfs::protocol::NamenodeProtocol;
16use crate::hdfs::proxy::NameServiceProxy;
17use crate::proto::hdfs::hdfs_file_status_proto::FileType;
18use crate::security::user::User;
19
20use crate::glob::{GlobPattern, expand_glob, get_path_components, unescape_component};
21use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
22
/// Name of the per-user trash directory, created directly under the user's home directory.
const TRASH_ROOT_DIR: &str = ".Trash";
/// Subdirectory of the trash root into which trashed paths are moved.
const TRASH_CURRENT_DIR: &str = "Current";
/// Permission bits used when creating trash directories (owner read/write/execute only).
const TRASH_DIR_PERMISSION: u32 = 0o700;
26
/// Options controlling how a file is created by [`Client::create`].
///
/// Optional fields left as `None` are forwarded as `None` to the namenode `create` call
/// (presumably falling back to server-side defaults — confirm in the protocol layer).
#[derive(Clone, Debug)]
pub struct WriteOptions {
    /// Block size for the new file, or `None` to leave it unset in the request.
    pub block_size: Option<u64>,
    /// Replication factor for the new file, or `None` to leave it unset in the request.
    pub replication: Option<u32>,
    /// Permission bits for the new file. Defaults to `0o644`.
    pub permission: u32,
    /// Whether an existing file may be overwritten. Defaults to `false`.
    pub overwrite: bool,
    /// Whether missing parent directories should be created. Defaults to `true`.
    pub create_parent: bool,
}

impl Default for WriteOptions {
    fn default() -> Self {
        Self {
            block_size: None,
            replication: None,
            permission: 0o644,
            overwrite: false,
            create_parent: true,
        }
    }
}

// Lets `Client::create` accept either an owned `WriteOptions` or a reference to one.
impl AsRef<WriteOptions> for WriteOptions {
    fn as_ref(&self) -> &WriteOptions {
        self
    }
}

impl WriteOptions {
    /// Sets the block size for the new file.
    #[must_use]
    pub fn block_size(mut self, block_size: u64) -> Self {
        self.block_size = Some(block_size);
        self
    }

    /// Sets the replication factor for the new file.
    #[must_use]
    pub fn replication(mut self, replication: u32) -> Self {
        self.replication = Some(replication);
        self
    }

    /// Sets the permission bits for the new file.
    #[must_use]
    pub fn permission(mut self, permission: u32) -> Self {
        self.permission = permission;
        self
    }

    /// Sets whether an existing file may be overwritten.
    #[must_use]
    pub fn overwrite(mut self, overwrite: bool) -> Self {
        self.overwrite = overwrite;
        self
    }

    /// Sets whether missing parent directories should be created.
    #[must_use]
    pub fn create_parent(mut self, create_parent: bool) -> Self {
        self.create_parent = create_parent;
        self
    }
}
93
94#[derive(Debug, Clone)]
95struct MountLink {
96 viewfs_path: String,
97 hdfs_path: String,
98 protocol: Arc<NamenodeProtocol>,
99}
100
101impl MountLink {
102 fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
103 Self {
105 viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
106 hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
107 protocol,
108 }
109 }
110 fn resolve(&self, path: &str) -> Option<String> {
112 if path == self.viewfs_path {
115 Some(self.hdfs_path.clone())
116 } else {
117 path.strip_prefix(&format!("{}/", self.viewfs_path))
118 .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
119 }
120 }
121}
122
123#[derive(Debug)]
124struct MountTable {
125 mounts: Vec<MountLink>,
126 fallback: MountLink,
127 home_dir: String,
128}
129
130impl MountTable {
131 fn resolve(&self, src: &str) -> (&MountLink, String) {
132 let path = if src.starts_with('/') {
133 src.to_string()
134 } else {
135 format!("{}/{}", self.home_dir, src)
136 };
137
138 for link in self.mounts.iter() {
139 if let Some(resolved) = link.resolve(&path) {
140 return (link, resolved);
141 }
142 }
143 (&self.fallback, self.fallback.resolve(&path).unwrap())
144 }
145}
146
147fn build_home_dir(
148 scheme: &str,
149 host: Option<&str>,
150 config: &Configuration,
151 username: &str,
152) -> String {
153 let prefix = match scheme {
154 "hdfs" => config.get("dfs.user.home.dir.prefix"),
155 "viewfs" => {
156 host.and_then(|host| config.get(&format!("fs.viewfs.mounttable.{host}.homedir")))
157 }
158 _ => None,
159 }
160 .unwrap_or("/user");
161
162 let prefix = prefix.trim_end_matches('/');
163 if prefix.is_empty() {
164 format!("/{username}")
165 } else {
166 format!("{prefix}/{username}")
167 }
168}
169
170#[derive(Debug)]
172pub enum IORuntime {
173 Runtime(Runtime),
174 Handle(Handle),
175}
176
177impl From<Runtime> for IORuntime {
178 fn from(value: Runtime) -> Self {
179 Self::Runtime(value)
180 }
181}
182
183impl From<Handle> for IORuntime {
184 fn from(value: Handle) -> Self {
185 Self::Handle(value)
186 }
187}
188
189impl IORuntime {
190 fn handle(&self) -> Handle {
191 match self {
192 Self::Runtime(runtime) => runtime.handle().clone(),
193 Self::Handle(handle) => handle.clone(),
194 }
195 }
196}
197
198#[derive(Default)]
262pub struct ClientBuilder {
263 url: Option<String>,
264 config: Option<HashMap<String, String>>,
265 config_dir: Option<String>,
266 runtime: Option<IORuntime>,
267 user: Option<String>,
268}
269
270impl ClientBuilder {
271 pub fn new() -> Self {
273 Self::default()
274 }
275
276 pub fn with_url(mut self, url: impl Into<String>) -> Self {
278 self.url = Some(url.into());
279 self
280 }
281
282 pub fn with_config(
284 mut self,
285 config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
286 ) -> Self {
287 self.config = Some(
288 config
289 .into_iter()
290 .map(|(k, v)| (k.into(), v.into()))
291 .collect(),
292 );
293 self
294 }
295
296 pub fn with_config_dir(mut self, config_dir: impl Into<String>) -> Self {
298 self.config_dir = Some(config_dir.into());
299 self
300 }
301
302 pub fn with_io_runtime(mut self, runtime: impl Into<IORuntime>) -> Self {
305 self.runtime = Some(runtime.into());
306 self
307 }
308
309 pub fn with_user(mut self, user: impl Into<String>) -> Self {
311 self.user = Some(user.into());
312 self
313 }
314
315 pub fn build(self) -> Result<Client> {
317 let config = Configuration::new(self.config_dir, self.config)?;
318 let url = if let Some(url) = self.url {
319 Url::parse(&url)?
320 } else {
321 Client::default_fs(&config)?
322 };
323
324 Client::build(&url, config, self.runtime, self.user)
325 }
326}
327
328#[derive(Clone, Debug)]
329enum RuntimeHolder {
330 Custom(Arc<IORuntime>),
331 Default(Arc<OnceLock<Runtime>>),
332}
333
334impl RuntimeHolder {
335 fn new(rt: Option<IORuntime>) -> Self {
336 if let Some(rt) = rt {
337 Self::Custom(Arc::new(rt))
338 } else {
339 Self::Default(Arc::new(OnceLock::new()))
340 }
341 }
342
343 fn get_handle(&self) -> Handle {
344 match self {
345 Self::Custom(rt) => rt.handle().clone(),
346 Self::Default(rt) => match Handle::try_current() {
347 Ok(handle) => handle,
348 Err(_) => rt
349 .get_or_init(|| Runtime::new().expect("Failed to create tokio runtime"))
350 .handle()
351 .clone(),
352 },
353 }
354 }
355}
356
357#[derive(Clone, Debug)]
359pub struct Client {
360 mount_table: Arc<MountTable>,
361 config: Arc<Configuration>,
362 rt_holder: RuntimeHolder,
365}
366
367impl Client {
368 fn default_fs(config: &Configuration) -> Result<Url> {
369 let url = config
370 .get(config::DEFAULT_FS)
371 .ok_or(HdfsError::InvalidArgument(format!(
372 "No {} setting found",
373 config::DEFAULT_FS
374 )))?;
375 Ok(Url::parse(url)?)
376 }
377
378 fn build(
379 url: &Url,
380 config: Configuration,
381 rt: Option<IORuntime>,
382 user: Option<String>,
383 ) -> Result<Self> {
384 let resolved_url = if !url.has_host() {
385 let default_url = Self::default_fs(&config)?;
386 if url.scheme() != default_url.scheme() || !default_url.has_host() {
387 return Err(HdfsError::InvalidArgument(
388 "URL must contain a host".to_string(),
389 ));
390 }
391 default_url
392 } else {
393 url.clone()
394 };
395
396 let config = Arc::new(config);
397
398 let rt_holder = RuntimeHolder::new(rt);
399
400 let user_info = User::get_user_info(user.clone(), config.security_enabled());
401 let username = user_info
402 .effective_user
403 .as_deref()
404 .or(user_info.real_user.as_deref())
405 .expect("User info must include a username");
406 let home_dir = build_home_dir(
407 resolved_url.scheme(),
408 resolved_url.host_str(),
409 config.as_ref(),
410 username,
411 );
412
413 let mount_table = match url.scheme() {
414 "hdfs" => {
415 let proxy = NameServiceProxy::new(
416 &resolved_url,
417 Arc::clone(&config),
418 rt_holder.get_handle(),
419 user.clone(),
420 )?;
421 let protocol = Arc::new(NamenodeProtocol::new(proxy, rt_holder.get_handle()));
422
423 MountTable {
424 mounts: Vec::new(),
425 fallback: MountLink::new("/", "/", protocol),
426 home_dir,
427 }
428 }
429 "viewfs" => Self::build_mount_table(
430 resolved_url.host_str().expect("URL must have a host"),
432 Arc::clone(&config),
433 rt_holder.get_handle(),
434 user.clone(),
435 home_dir,
436 )?,
437 _ => {
438 return Err(HdfsError::InvalidArgument(
439 "Only `hdfs` and `viewfs` schemes are supported".to_string(),
440 ));
441 }
442 };
443
444 Ok(Self {
445 mount_table: Arc::new(mount_table),
446 config,
447 rt_holder,
448 })
449 }
450
451 fn build_mount_table(
452 host: &str,
453 config: Arc<Configuration>,
454 handle: Handle,
455 effective_user: Option<String>,
456 home_dir: String,
457 ) -> Result<MountTable> {
458 let mut mounts: Vec<MountLink> = Vec::new();
459 let mut fallback: Option<MountLink> = None;
460
461 for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
462 let url = Url::parse(hdfs_url)?;
463 if !url.has_host() {
464 return Err(HdfsError::InvalidArgument(
465 "URL must contain a host".to_string(),
466 ));
467 }
468 if url.scheme() != "hdfs" {
469 return Err(HdfsError::InvalidArgument(
470 "Only hdfs mounts are supported for viewfs".to_string(),
471 ));
472 }
473 let proxy = NameServiceProxy::new(
474 &url,
475 Arc::clone(&config),
476 handle.clone(),
477 effective_user.clone(),
478 )?;
479 let protocol = Arc::new(NamenodeProtocol::new(proxy, handle.clone()));
480
481 if let Some(prefix) = viewfs_path {
482 mounts.push(MountLink::new(prefix, url.path(), protocol));
483 } else {
484 if fallback.is_some() {
485 return Err(HdfsError::InvalidArgument(
486 "Multiple viewfs fallback links found".to_string(),
487 ));
488 }
489 fallback = Some(MountLink::new("/", url.path(), protocol));
490 }
491 }
492
493 if let Some(fallback) = fallback {
494 mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
496 mounts.reverse();
497
498 Ok(MountTable {
499 mounts,
500 fallback,
501 home_dir,
502 })
503 } else {
504 Err(HdfsError::InvalidArgument(
505 "No viewfs fallback mount found".to_string(),
506 ))
507 }
508 }
509
/// Normalizes `path` into an absolute path without empty components.
///
/// A leading `/` is added if missing, runs of `/` collapse into one, and trailing `/` are
/// dropped. The empty string and all-slash strings become `/`. `.` and `..` are not
/// interpreted.
fn normalize_path(path: &str) -> String {
    let mut normalized = String::with_capacity(path.len() + 1);
    // Rebuilding from non-empty components collapses `//` runs. Left in place, they would
    // defeat the string-prefix comparisons the trash logic performs on normalized paths.
    for component in path.split('/').filter(|c| !c.is_empty()) {
        normalized.push('/');
        normalized.push_str(component);
    }
    if normalized.is_empty() {
        normalized.push('/');
    }
    normalized
}
524
/// Appends `suffix` to `base` with exactly one `/` between them.
///
/// Leading slashes are removed from `suffix` and trailing slashes from `base`. An empty or
/// `/` base yields `/<suffix>`. An empty `suffix` returns `base` unchanged.
fn join_paths(base: &str, suffix: &str) -> String {
    if suffix.is_empty() {
        return base.to_string();
    }
    let tail = suffix.trim_start_matches('/');
    match base {
        "" | "/" => format!("/{tail}"),
        _ => format!("{}/{tail}", base.trim_end_matches('/')),
    }
}
537
/// Returns `true` if `child` equals `parent` or lies beneath it.
///
/// Comparison is by whole components, so `/ab` is not under `/a`. Every path is under `/`.
fn is_prefix_path(parent: &str, child: &str) -> bool {
    parent == "/"
        || child
            .strip_prefix(parent)
            .is_some_and(|rest| rest.is_empty() || rest.starts_with('/'))
}
544
/// Milliseconds since the Unix epoch, or `0` if the system clock reads earlier than the
/// epoch.
fn current_time_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
551
552 fn absolute_path(&self, path: &str) -> String {
553 if path.starts_with('/') {
554 Self::normalize_path(path)
555 } else {
556 let home = self.mount_table.home_dir.trim_end_matches('/');
557 Self::normalize_path(&format!("{home}/{path}"))
558 }
559 }
560
561 fn trash_root_path(&self) -> String {
562 let home = Self::normalize_path(&self.mount_table.home_dir);
563 Self::join_paths(&home, TRASH_ROOT_DIR)
564 }
565
566 async fn trash_enabled(&self, path: &str) -> Result<bool> {
567 let (link, _) = self.mount_table.resolve(path);
568 let server_defaults = link.protocol.get_cached_server_defaults().await?;
569 Ok(server_defaults.trash_interval.unwrap_or_default() > 0)
570 }
571
572 fn split_parent_name(path: &str) -> Result<(String, String)> {
573 let normalized = Self::normalize_path(path);
574 if normalized == "/" {
575 return Err(HdfsError::InvalidArgument(
576 "Cannot move the root directory to trash".to_string(),
577 ));
578 }
579 let (parent, name) = normalized
580 .rsplit_once('/')
581 .expect("Normalized path always contains '/'");
582 let parent = if parent.is_empty() {
583 "/".to_string()
584 } else {
585 parent.to_string()
586 };
587 Ok((parent, name.to_string()))
588 }
589
590 async fn non_dir_ancestor(&self, path: &str) -> Result<Option<String>> {
591 let normalized = Self::normalize_path(path);
592 let mut current = "/".to_string();
593 for component in normalized.trim_start_matches('/').split('/') {
594 if component.is_empty() {
595 continue;
596 }
597 current = Self::join_paths(¤t, component);
598 match self.get_file_info(¤t).await {
599 Ok(status) => {
600 if !status.isdir {
601 return Ok(Some(current));
602 }
603 }
604 Err(HdfsError::FileNotFound(_)) => return Ok(None),
605 Err(err) => return Err(err),
606 }
607 }
608 Ok(None)
609 }
610
611 async fn ensure_unique_trash_path(&self, path: String) -> Result<String> {
612 let base = path.clone();
613 let mut candidate = path;
614 loop {
615 match self.get_file_info(&candidate).await {
616 Ok(_) => {
617 candidate = format!("{}{}", base, Self::current_time_millis());
618 }
619 Err(HdfsError::FileNotFound(_)) => return Ok(candidate),
620 Err(err) => return Err(err),
621 }
622 }
623 }
624
625 pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
627 let (link, resolved_path) = self.mount_table.resolve(path);
628 match link.protocol.get_file_info(&resolved_path).await?.fs {
629 Some(status) => Ok(FileStatus::from(status, path)),
630 None => Err(HdfsError::FileNotFound(path.to_string())),
631 }
632 }
633
634 pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
637 let iter = self.list_status_iter(path, recursive);
638 let statuses = iter
639 .into_stream()
640 .collect::<Vec<Result<FileStatus>>>()
641 .await;
642
643 let mut resolved_statues = Vec::<FileStatus>::with_capacity(statuses.len());
644 for status in statuses.into_iter() {
645 resolved_statues.push(status?);
646 }
647
648 Ok(resolved_statues)
649 }
650
651 pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
653 ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
654 }
655
656 pub async fn read(&self, path: &str) -> Result<FileReader> {
658 let (link, resolved_path) = self.mount_table.resolve(path);
659 let located_info = link
661 .protocol
662 .get_block_locations(&resolved_path, 0, i64::MAX as u64)
663 .await?;
664
665 if let Some(locations) = located_info.locations {
666 let ec_schema = if let Some(ec_policy) = locations.ec_policy.as_ref() {
667 Some(resolve_ec_policy(ec_policy)?)
668 } else {
669 None
670 };
671
672 if locations.file_encryption_info.is_some() {
673 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
674 }
675
676 Ok(FileReader::new(
677 Arc::clone(&link.protocol),
678 locations,
679 ec_schema,
680 Arc::clone(&self.config),
681 self.rt_holder.get_handle(),
682 ))
683 } else {
684 Err(HdfsError::FileNotFound(path.to_string()))
685 }
686 }
687
688 pub async fn create(
691 &self,
692 src: &str,
693 write_options: impl AsRef<WriteOptions>,
694 ) -> Result<FileWriter> {
695 let write_options = write_options.as_ref();
696
697 let (link, resolved_path) = self.mount_table.resolve(src);
698
699 let create_response = link
700 .protocol
701 .create(
702 &resolved_path,
703 write_options.permission,
704 write_options.overwrite,
705 write_options.create_parent,
706 write_options.replication,
707 write_options.block_size,
708 )
709 .await?;
710
711 match create_response.fs {
712 Some(status) => {
713 if status.file_encryption_info.is_some() {
714 let _ = self.delete(src, false).await;
715 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
716 }
717
718 Ok(FileWriter::new(
719 Arc::clone(&link.protocol),
720 resolved_path,
721 status,
722 Arc::clone(&self.config),
723 self.rt_holder.get_handle(),
724 None,
725 ))
726 }
727 None => Err(HdfsError::FileNotFound(src.to_string())),
728 }
729 }
730
/// Detects the append failure that `append` answers by retrying with its flag set to `true`:
/// an `UnsupportedOperationException` whose message mentions `NEW_BLOCK`.
fn needs_new_block(class: &str, msg: &str) -> bool {
    const UNSUPPORTED: &str = "java.lang.UnsupportedOperationException";
    class == UNSUPPORTED && msg.contains("NEW_BLOCK")
}
734
735 pub async fn append(&self, src: &str) -> Result<FileWriter> {
739 let (link, resolved_path) = self.mount_table.resolve(src);
740
741 let append_response = match link.protocol.append(&resolved_path, false).await {
744 Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
745 link.protocol.append(&resolved_path, true).await?
746 }
747 resp => resp?,
748 };
749
750 match append_response.stat {
751 Some(status) => {
752 if status.file_encryption_info.is_some() {
753 let _ = link
754 .protocol
755 .complete(src, append_response.block.map(|b| b.b), status.file_id)
756 .await;
757 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
758 }
759
760 Ok(FileWriter::new(
761 Arc::clone(&link.protocol),
762 resolved_path,
763 status,
764 Arc::clone(&self.config),
765 self.rt_holder.get_handle(),
766 append_response.block,
767 ))
768 }
769 None => Err(HdfsError::FileNotFound(src.to_string())),
770 }
771 }
772
773 pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
781 let (link, resolved_path) = self.mount_table.resolve(path);
782 link.protocol
783 .mkdirs(&resolved_path, permission, create_parent)
784 .await
785 .map(|_| ())
786 }
787
788 async fn rename_internal(
789 &self,
790 src: &str,
791 dst: &str,
792 overwrite: bool,
793 move_to_trash: bool,
794 ) -> Result<()> {
795 let (src_link, src_resolved_path) = self.mount_table.resolve(src);
796 let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
797 if src_link.viewfs_path == dst_link.viewfs_path {
798 src_link
799 .protocol
800 .rename(
801 &src_resolved_path,
802 &dst_resolved_path,
803 overwrite,
804 move_to_trash,
805 )
806 .await
807 .map(|_| ())
808 } else {
809 Err(HdfsError::InvalidArgument(
810 "Cannot rename across different name services".to_string(),
811 ))
812 }
813 }
814
815 pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
817 self.rename_internal(src, dst, overwrite, false).await
818 }
819
820 pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
823 let (link, resolved_path) = self.mount_table.resolve(path);
824 link.protocol
825 .delete(&resolved_path, recursive)
826 .await
827 .map(|r| r.result)
828 }
829
/// Moves `path` into the current user's trash instead of deleting it.
///
/// The entry is moved to `<home>/.Trash/Current/<original parent>/<name>`. If that name is
/// taken, a millisecond timestamp is appended (see `ensure_unique_trash_path`).
///
/// Returns `Ok(Some(trash_path))` with the new location, or `Ok(None)` if `path` already lies
/// inside the trash root.
///
/// # Errors
/// - [`HdfsError::InvalidPath`] for an empty path.
/// - [`HdfsError::TrashNotEnabled`] if the serving namenode's trash interval is not positive.
/// - [`HdfsError::InvalidArgument`] if `path` is the root or contains the trash root.
/// - Any `get_file_info` error for `path` (e.g. [`HdfsError::FileNotFound`]).
/// - The `mkdirs` or rename error if it still fails on the second attempt.
/// - Errors from `non_dir_ancestor` / `ensure_unique_trash_path` are returned immediately,
///   without a retry.
pub async fn trash(&self, path: &str) -> Result<Option<String>> {
    if path.is_empty() {
        return Err(HdfsError::InvalidPath("Empty path".to_string()));
    }

    let src_abs = self.absolute_path(path);
    if !self.trash_enabled(&src_abs).await? {
        return Err(HdfsError::TrashNotEnabled);
    }

    let trash_root = self.trash_root_path();

    // Already inside the trash: nothing to do.
    if Self::is_prefix_path(&trash_root, &src_abs) {
        return Ok(None);
    }
    // Trashing an ancestor of the trash root would move the trash into itself.
    if Self::is_prefix_path(&src_abs, &trash_root) {
        return Err(HdfsError::InvalidArgument(
            "Cannot move to trash because it contains the trash".to_string(),
        ));
    }

    // Fail early, with the lookup's own error, if the source cannot be found.
    let _ = self.get_file_info(&src_abs).await?;

    // Mirror the source's parent directory under `<trash root>/Current`.
    let (src_parent, src_name) = Self::split_parent_name(&src_abs)?;
    let trash_current = Self::join_paths(&trash_root, TRASH_CURRENT_DIR);
    let src_parent_rel = src_parent.trim_start_matches('/');
    let mut base_trash_path = if src_parent_rel.is_empty() {
        trash_current.clone()
    } else {
        Self::join_paths(&trash_current, src_parent_rel)
    };
    let mut trash_path = Self::join_paths(&base_trash_path, &src_name);

    // Two attempts: any mkdirs or rename failure on attempt 0 is retried once.
    for attempt in 0..2 {
        let mut mkdirs_error: Option<HdfsError> = None;
        loop {
            match self
                .mkdirs(&base_trash_path, TRASH_DIR_PERMISSION, true)
                .await
            {
                Ok(()) => break,
                Err(err) => {
                    // If a non-directory sits somewhere on the target directory path, retarget
                    // that component by appending a timestamp, then retry mkdirs.
                    // NOTE(review): `non_dir_ancestor` walks from `/`. A blocking file above
                    // the trash root would therefore be "renamed" in the target too — confirm
                    // that is intended.
                    if let Some(ancestor) = self.non_dir_ancestor(&base_trash_path).await? {
                        let timestamp = Self::current_time_millis();
                        // `ancestor` is a prefix of `base_trash_path`, so the first occurrence
                        // is the leading one.
                        base_trash_path = base_trash_path.replacen(
                            &ancestor,
                            &format!("{ancestor}{timestamp}"),
                            1,
                        );
                        trash_path = Self::join_paths(&base_trash_path, &src_name);
                        continue;
                    }
                    // No blocking file found: keep the error for the retry decision below.
                    mkdirs_error = Some(err);
                    break;
                }
            }
        }

        if let Some(err) = mkdirs_error {
            if attempt == 0 {
                continue;
            }
            return Err(err);
        }

        let unique_trash_path = self.ensure_unique_trash_path(trash_path.clone()).await?;
        match self
            .rename_internal(&src_abs, &unique_trash_path, false, true)
            .await
        {
            Ok(()) => return Ok(Some(unique_trash_path)),
            Err(_) if attempt == 0 => continue,
            Err(err) => return Err(err),
        }
    }

    // Defensive fallback: every path through the second attempt already returns above.
    Err(HdfsError::OperationFailed(
        "Failed to move to trash after retry".to_string(),
    ))
}
913
914 pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
916 let (link, resolved_path) = self.mount_table.resolve(path);
917 link.protocol
918 .set_times(&resolved_path, mtime, atime)
919 .await?;
920 Ok(())
921 }
922
923 pub async fn set_owner(
925 &self,
926 path: &str,
927 owner: Option<&str>,
928 group: Option<&str>,
929 ) -> Result<()> {
930 let (link, resolved_path) = self.mount_table.resolve(path);
931 link.protocol
932 .set_owner(&resolved_path, owner, group)
933 .await?;
934 Ok(())
935 }
936
937 pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
942 let (link, resolved_path) = self.mount_table.resolve(path);
943 link.protocol
944 .set_permission(&resolved_path, permission)
945 .await?;
946 Ok(())
947 }
948
949 pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
951 let (link, resolved_path) = self.mount_table.resolve(path);
952 let result = link
953 .protocol
954 .set_replication(&resolved_path, replication)
955 .await?
956 .result;
957
958 Ok(result)
959 }
960
961 pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
963 let (link, resolved_path) = self.mount_table.resolve(path);
964 let result = link
965 .protocol
966 .get_content_summary(&resolved_path)
967 .await?
968 .summary;
969
970 Ok(result.into())
971 }
972
973 pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
975 let (link, resolved_path) = self.mount_table.resolve(path);
976 link.protocol
977 .modify_acl_entries(&resolved_path, acl_spec)
978 .await?;
979
980 Ok(())
981 }
982
983 pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
985 let (link, resolved_path) = self.mount_table.resolve(path);
986 link.protocol
987 .remove_acl_entries(&resolved_path, acl_spec)
988 .await?;
989
990 Ok(())
991 }
992
993 pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
995 let (link, resolved_path) = self.mount_table.resolve(path);
996 link.protocol.remove_default_acl(&resolved_path).await?;
997
998 Ok(())
999 }
1000
1001 pub async fn remove_acl(&self, path: &str) -> Result<()> {
1003 let (link, resolved_path) = self.mount_table.resolve(path);
1004 link.protocol.remove_acl(&resolved_path).await?;
1005
1006 Ok(())
1007 }
1008
1009 pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
1013 let (link, resolved_path) = self.mount_table.resolve(path);
1014 link.protocol.set_acl(&resolved_path, acl_spec).await?;
1015
1016 Ok(())
1017 }
1018
1019 pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
1021 let (link, resolved_path) = self.mount_table.resolve(path);
1022 Ok(link
1023 .protocol
1024 .get_acl_status(&resolved_path)
1025 .await?
1026 .result
1027 .into())
1028 }
1029
/// Returns the statuses of all paths matching the glob `pattern`.
///
/// `{…}` alternatives are first expanded by `expand_glob` into flat patterns, each matched
/// component by component starting at `/`. Results are appended in expansion order. Matches
/// produced by more than one flat pattern are not de-duplicated.
///
/// # Errors
/// Propagates glob-parsing errors, plus any listing/lookup error other than
/// [`HdfsError::FileNotFound`]. Missing paths are simply skipped.
pub async fn glob_status(&self, pattern: &str) -> Result<Vec<FileStatus>> {
    let flattened = expand_glob(pattern.to_string())?;

    let mut results: Vec<FileStatus> = Vec::new();

    for flat in flattened.into_iter() {
        if flat.is_empty() {
            continue;
        }

        let components = get_path_components(&flat);

        // A path that matches the components processed so far, with its status if an RPC
        // has already fetched it.
        #[derive(Clone, Debug)]
        struct Candidate {
            path: String,
            status: Option<FileStatus>,
        }

        let mut candidates: Vec<Candidate> = vec![Candidate {
            path: "/".to_string(),
            status: None,
        }];

        for (idx, comp) in components.iter().enumerate() {
            if candidates.is_empty() {
                break;
            }

            // Cannot underflow: the loop body only runs when `components` is non-empty.
            let is_last = idx == components.len() - 1;

            let unescaped = unescape_component(comp);
            let glob_pat = GlobPattern::new(comp)?;

            // Literal, non-final component: extend every candidate's path without an RPC.
            // Existence is checked by the next listing or lookup. Any `status` now describes
            // an ancestor, but it is never used: the final component always goes through
            // the RPC branch below, which rebuilds the candidates.
            if !is_last && !glob_pat.has_wildcard() {
                for cand in candidates.iter_mut() {
                    if !cand.path.ends_with('/') {
                        cand.path.push('/');
                    }
                    cand.path.push_str(&unescaped);
                }
                continue;
            }

            let mut new_candidates: Vec<Candidate> = Vec::new();

            for cand in candidates.into_iter() {
                if glob_pat.has_wildcard() {
                    // Wildcard: list the candidate and keep children whose name matches.
                    let listing = match self.list_status(&cand.path, false).await {
                        Ok(listing) => listing,
                        Err(HdfsError::FileNotFound(_)) => continue,
                        Err(e) => return Err(e),
                    };
                    // A listing that is just the candidate itself means the candidate is a
                    // file. Presumably the namenode lists a file as one entry with an empty
                    // name, which `FileStatus::from` maps back to the base path. A file has
                    // no children to match.
                    if listing.len() == 1 && listing[0].path == cand.path {
                        continue;
                    }

                    for child in listing.into_iter() {
                        // Only directories can contain matches for later components.
                        if !is_last && !child.isdir {
                            continue;
                        }

                        let name = child
                            .path
                            .rsplit_once('/')
                            .map(|(_, n)| n)
                            .unwrap_or(child.path.as_str());

                        if glob_pat.matches(name) {
                            new_candidates.push(Candidate {
                                path: child.path.clone(),
                                status: Some(child),
                            });
                        }
                    }
                } else {
                    // Literal final component: one direct lookup instead of a listing.
                    let mut next_path = cand.path.clone();
                    if !next_path.ends_with('/') {
                        next_path.push('/');
                    }
                    next_path.push_str(&unescaped);

                    match self.get_file_info(&next_path).await {
                        Ok(status) => {
                            if is_last || status.isdir {
                                new_candidates.push(Candidate {
                                    path: status.path.clone(),
                                    status: Some(status),
                                });
                            }
                        }
                        Err(HdfsError::FileNotFound(_)) => continue,
                        Err(e) => return Err(e),
                    }
                }
            }

            candidates = new_candidates;
        }

        // Candidates without a status only remain when no RPC step ran (a pattern with no
        // components, e.g. `/`). Look those up directly.
        for cand in candidates.into_iter() {
            let status = if let Some(s) = cand.status {
                s
            } else {
                match self.get_file_info(&cand.path).await {
                    Ok(s) => s,
                    Err(HdfsError::FileNotFound(_)) => continue,
                    Err(e) => return Err(e),
                }
            };

            results.push(status);
        }
    }

    Ok(results)
}
1163}
1164
1165impl Default for Client {
1166 fn default() -> Self {
1169 ClientBuilder::new()
1170 .build()
1171 .expect("Failed to create default client")
1172 }
1173}
1174
1175pub(crate) struct DirListingIterator {
1176 path: String,
1177 resolved_path: String,
1178 link: MountLink,
1179 files_only: bool,
1180 partial_listing: VecDeque<HdfsFileStatusProto>,
1181 remaining: u32,
1182 last_seen: Vec<u8>,
1183}
1184
1185impl DirListingIterator {
1186 fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
1187 let (link, resolved_path) = mount_table.resolve(&path);
1188
1189 DirListingIterator {
1190 path,
1191 resolved_path,
1192 link: link.clone(),
1193 files_only,
1194 partial_listing: VecDeque::new(),
1195 remaining: 1,
1196 last_seen: Vec::new(),
1197 }
1198 }
1199
1200 async fn get_next_batch(&mut self) -> Result<bool> {
1201 let listing = self
1202 .link
1203 .protocol
1204 .get_listing(&self.resolved_path, self.last_seen.clone(), false)
1205 .await?;
1206
1207 if let Some(dir_list) = listing.dir_list {
1208 self.last_seen = dir_list
1209 .partial_listing
1210 .last()
1211 .map(|p| p.path.clone())
1212 .unwrap_or(Vec::new());
1213
1214 self.remaining = dir_list.remaining_entries;
1215
1216 self.partial_listing = dir_list
1217 .partial_listing
1218 .into_iter()
1219 .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
1220 .collect();
1221 Ok(!self.partial_listing.is_empty())
1222 } else {
1223 Err(HdfsError::FileNotFound(self.path.clone()))
1224 }
1225 }
1226
1227 pub async fn next(&mut self) -> Option<Result<FileStatus>> {
1228 if self.partial_listing.is_empty()
1229 && self.remaining > 0
1230 && let Err(error) = self.get_next_batch().await
1231 {
1232 self.remaining = 0;
1233 return Some(Err(error));
1234 }
1235 if let Some(next) = self.partial_listing.pop_front() {
1236 Some(Ok(FileStatus::from(next, &self.path)))
1237 } else {
1238 None
1239 }
1240 }
1241}
1242
1243pub struct ListStatusIterator {
1244 mount_table: Arc<MountTable>,
1245 recursive: bool,
1246 iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
1247}
1248
1249impl ListStatusIterator {
1250 fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
1251 let initial = DirListingIterator::new(path.clone(), &mount_table, false);
1252
1253 ListStatusIterator {
1254 mount_table,
1255 recursive,
1256 iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
1257 }
1258 }
1259
1260 pub async fn next(&self) -> Option<Result<FileStatus>> {
1261 let mut next_file: Option<Result<FileStatus>> = None;
1262 let mut iters = self.iters.lock().await;
1263 while next_file.is_none() {
1264 if let Some(iter) = iters.last_mut() {
1265 if let Some(file_result) = iter.next().await {
1266 if let Ok(file) = file_result {
1267 if file.isdir && self.recursive {
1270 iters.push(DirListingIterator::new(
1271 file.path.clone(),
1272 &self.mount_table,
1273 false,
1274 ))
1275 }
1276 next_file = Some(Ok(file));
1277 } else {
1278 next_file = Some(file_result)
1280 }
1281 } else {
1282 iters.pop();
1284 }
1285 } else {
1286 break;
1288 }
1289 }
1290
1291 next_file
1292 }
1293
1294 pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
1295 let listing = stream::unfold(self, |state| async move {
1296 let next = state.next().await;
1297 next.map(|n| (n, state))
1298 });
1299 Box::pin(listing)
1300 }
1301}
1302
1303#[derive(Debug, Clone)]
1304pub struct FileStatus {
1305 pub path: String,
1306 pub length: usize,
1307 pub isdir: bool,
1308 pub permission: u16,
1309 pub owner: String,
1310 pub group: String,
1311 pub modification_time: u64,
1312 pub access_time: u64,
1313 pub replication: Option<u32>,
1314 pub blocksize: Option<u64>,
1315}
1316
1317impl FileStatus {
1318 fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
1319 let mut path = base_path.trim_end_matches("/").to_string();
1320 let relative_path = std::str::from_utf8(&value.path).unwrap();
1321 if !relative_path.is_empty() {
1322 path.push('/');
1323 path.push_str(relative_path);
1324 }
1325
1326 if path.is_empty() {
1328 path.push('/');
1329 }
1330
1331 FileStatus {
1332 isdir: value.file_type() == FileType::IsDir,
1333 path,
1334 length: value.length as usize,
1335 permission: value.permission.perm as u16,
1336 owner: value.owner,
1337 group: value.group,
1338 modification_time: value.modification_time,
1339 access_time: value.access_time,
1340 replication: value.block_replication,
1341 blocksize: value.blocksize,
1342 }
1343 }
1344}
1345
1346#[derive(Debug)]
1347pub struct ContentSummary {
1348 pub length: u64,
1349 pub file_count: u64,
1350 pub directory_count: u64,
1351 pub quota: u64,
1352 pub space_consumed: u64,
1353 pub space_quota: u64,
1354}
1355
1356impl From<ContentSummaryProto> for ContentSummary {
1357 fn from(value: ContentSummaryProto) -> Self {
1358 ContentSummary {
1359 length: value.length,
1360 file_count: value.file_count,
1361 directory_count: value.directory_count,
1362 quota: value.quota,
1363 space_consumed: value.space_consumed,
1364 space_quota: value.space_quota,
1365 }
1366 }
1367}
1368
#[cfg(test)]
mod test {
    use std::sync::{Arc, LazyLock};

    use tokio::runtime::Runtime;
    use url::Url;

    use crate::{
        client::ClientBuilder,
        common::config::Configuration,
        hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
    };

    use super::{MountLink, MountTable};

    /// Runtime shared by every namenode proxy built in these tests.
    static RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

    /// Builds a `NamenodeProtocol` for `url`, using a default configuration and
    /// the shared test runtime.
    fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
        let namenode_url = Url::parse(url).unwrap();
        let config = Arc::new(Configuration::new(None, None).unwrap());
        let handle = RT.handle().clone();
        let proxy = NameServiceProxy::new(&namenode_url, config, handle.clone(), None).unwrap();
        Arc::new(NamenodeProtocol::new(proxy, handle))
    }

    /// Asserts two things about resolving `path` against `table`: the chosen
    /// mount link has viewfs path `viewfs_path`, and the rewritten path is
    /// `resolved_path`.
    fn assert_resolves(table: &MountTable, path: &str, viewfs_path: &str, resolved_path: &str) {
        let (link, resolved) = table.resolve(path);
        assert_eq!(link.viewfs_path, viewfs_path, "mount link chosen for {path:?}");
        assert_eq!(resolved, resolved_path, "resolution of {path:?}");
    }

    #[test]
    fn test_default_fs() {
        // Each case is (explicit url, fs.defaultFS value, whether build() should succeed).
        let cases: [(Option<&str>, &str, bool); 5] = [
            (None, "hdfs://test:9000", true),
            (None, "hdfs://", false),
            (Some("hdfs://"), "hdfs://test:9000", true),
            (Some("hdfs://"), "hdfs://", false),
            (Some("hdfs://"), "viewfs://test", false),
        ];

        for (explicit_url, default_fs, should_build) in cases {
            let builder = match explicit_url {
                Some(u) => ClientBuilder::new().with_url(u),
                None => ClientBuilder::new(),
            };
            let built_ok = builder
                .with_config(vec![("fs.defaultFS", default_fs)])
                .build()
                .is_ok();
            assert_eq!(
                built_ok, should_build,
                "url={explicit_url:?} fs.defaultFS={default_fs}"
            );
        }
    }

    #[test]
    fn test_mount_link_resolve() {
        let link = MountLink::new("/view", "/hdfs", create_protocol("hdfs://127.0.0.1:9000"));

        assert_eq!(link.resolve("/view/dir/file").as_deref(), Some("/hdfs/dir/file"));
        assert_eq!(link.resolve("/view").as_deref(), Some("/hdfs"));
        assert_eq!(link.resolve("/hdfs/path").as_deref(), None);
    }

    #[test]
    fn test_fallback_link() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");

        // An empty viewfs prefix matches everything and prepends the target.
        let prefixed = MountLink::new("", "/hdfs", Arc::clone(&protocol));
        for (input, expected) in [
            ("/path/to/file", "/hdfs/path/to/file"),
            ("/", "/hdfs/"),
            ("/hdfs/path", "/hdfs/hdfs/path"),
        ] {
            assert_eq!(
                prefixed.resolve(input).as_deref(),
                Some(expected),
                "resolving {input:?}"
            );
        }

        let passthrough = MountLink::new("", "", protocol);
        assert_eq!(passthrough.resolve("/").as_deref(), Some("/"));
    }

    #[test]
    fn test_mount_table_resolve() {
        let mount_table = MountTable {
            mounts: vec![
                MountLink::new(
                    "/mount1",
                    "/path1/nested",
                    create_protocol("hdfs://127.0.0.1:9000"),
                ),
                MountLink::new(
                    "/mount2",
                    "/path2",
                    create_protocol("hdfs://127.0.0.1:9001"),
                ),
                MountLink::new(
                    "/mount3/nested",
                    "/path3",
                    create_protocol("hdfs://127.0.0.1:9002"),
                ),
            ],
            fallback: MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003")),
            home_dir: "/user/test".to_string(),
        };

        // Each case is (input path, expected link viewfs path, expected resolved path).
        for (path, viewfs_path, resolved_path) in [
            ("/mount1", "/mount1", "/path1/nested"),
            ("/mount1/", "/mount1", "/path1/nested/"),
            ("/mount12", "", "/path4/mount12"),
            ("/mount3/file", "", "/path4/mount3/file"),
            ("/mount3/nested/file", "/mount3/nested", "/path3/file"),
            ("file", "", "/path4/user/test/file"),
            ("dir/subdir", "", "/path4/user/test/dir/subdir"),
        ] {
            assert_resolves(&mount_table, path, viewfs_path, resolved_path);
        }

        // With the home directory under a mount point, relative paths resolve
        // through that mount instead of the fallback.
        let mount_table = MountTable {
            mounts: vec![
                MountLink::new(
                    "/mount1",
                    "/path1/nested",
                    create_protocol("hdfs://127.0.0.1:9000"),
                ),
                MountLink::new(
                    "/mount2",
                    "/path2",
                    create_protocol("hdfs://127.0.0.1:9001"),
                ),
            ],
            fallback: MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003")),
            home_dir: "/mount1/user".to_string(),
        };

        assert_resolves(&mount_table, "file", "/mount1", "/path1/nested/user/file");
        assert_resolves(
            &mount_table,
            "dir/subdir",
            "/mount1",
            "/path1/nested/user/dir/subdir",
        );
    }

    #[test]
    fn test_io_runtime() {
        // The builder accepts an owned runtime...
        let built_with_runtime = ClientBuilder::new()
            .with_url("hdfs://127.0.0.1:9000")
            .with_io_runtime(Runtime::new().unwrap())
            .build()
            .is_ok();
        assert!(built_with_runtime);

        // ...or a handle to a runtime owned elsewhere.
        let rt = Runtime::new().unwrap();
        let built_with_handle = ClientBuilder::new()
            .with_url("hdfs://127.0.0.1:9000")
            .with_io_runtime(rt.handle().clone())
            .build()
            .is_ok();
        assert!(built_with_handle);
    }

    #[test]
    fn test_with_user_sets_relative_path_home_dir() {
        let client = ClientBuilder::new()
            .with_url("hdfs://127.0.0.1:9000")
            .with_user("alice")
            .build()
            .unwrap();

        let (_, resolved) = client.mount_table.resolve("file");
        assert_eq!(resolved, "/user/alice/file");
    }

    #[test]
    fn test_set_conf_dir() {
        let built = ClientBuilder::new()
            .with_url("hdfs://127.0.0.1:9000")
            .with_config_dir("target/test")
            .build()
            .is_ok();
        assert!(built);
    }
}