1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, OnceLock};
3
4use futures::stream::BoxStream;
5use futures::{StreamExt, stream};
6use tokio::runtime::{Handle, Runtime};
7use url::Url;
8
9use crate::acl::{AclEntry, AclStatus};
10use crate::common::config::{self, Configuration};
11use crate::ec::resolve_ec_policy;
12use crate::error::{HdfsError, Result};
13use crate::file::{FileReader, FileWriter};
14use crate::hdfs::protocol::NamenodeProtocol;
15use crate::hdfs::proxy::NameServiceProxy;
16use crate::proto::hdfs::hdfs_file_status_proto::FileType;
17use crate::security::user::User;
18
19use crate::glob::{GlobPattern, expand_glob, get_path_components, unescape_component};
20use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
21
/// Options controlling how [`Client::create`] creates a new file.
///
/// Start from [`WriteOptions::default`] and adjust fields with the chained setters.
#[derive(Clone)]
pub struct WriteOptions {
    /// Block size for the new file. `None` is forwarded to the namenode request unset
    /// (presumably meaning "use the server default" — confirm against the protocol layer).
    pub block_size: Option<u64>,
    /// Replication factor for the new file. `None` is forwarded unset.
    pub replication: Option<u32>,
    /// Permission bits for the new file (default `0o644`).
    pub permission: u32,
    /// Whether an existing file at the target path may be overwritten (default `false`).
    pub overwrite: bool,
    /// Whether missing parent directories are created (default `true`).
    pub create_parent: bool,
}

impl Default for WriteOptions {
    fn default() -> Self {
        WriteOptions {
            block_size: None,
            replication: None,
            permission: 0o644,
            overwrite: false,
            create_parent: true,
        }
    }
}

impl AsRef<WriteOptions> for WriteOptions {
    /// Lets APIs taking `impl AsRef<WriteOptions>` accept an owned value as well as a reference.
    fn as_ref(&self) -> &WriteOptions {
        self
    }
}

impl WriteOptions {
    /// Sets an explicit block size.
    pub fn block_size(self, block_size: u64) -> Self {
        Self {
            block_size: Some(block_size),
            ..self
        }
    }

    /// Sets an explicit replication factor.
    pub fn replication(self, replication: u32) -> Self {
        Self {
            replication: Some(replication),
            ..self
        }
    }

    /// Sets the permission bits.
    pub fn permission(self, permission: u32) -> Self {
        Self { permission, ..self }
    }

    /// Sets whether an existing file may be overwritten.
    pub fn overwrite(self, overwrite: bool) -> Self {
        Self { overwrite, ..self }
    }

    /// Sets whether missing parent directories are created.
    pub fn create_parent(self, create_parent: bool) -> Self {
        Self {
            create_parent,
            ..self
        }
    }
}
88
89#[derive(Debug, Clone)]
90struct MountLink {
91 viewfs_path: String,
92 hdfs_path: String,
93 protocol: Arc<NamenodeProtocol>,
94}
95
96impl MountLink {
97 fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
98 Self {
100 viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
101 hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
102 protocol,
103 }
104 }
105 fn resolve(&self, path: &str) -> Option<String> {
107 if path == self.viewfs_path {
110 Some(self.hdfs_path.clone())
111 } else {
112 path.strip_prefix(&format!("{}/", self.viewfs_path))
113 .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
114 }
115 }
116}
117
118#[derive(Debug)]
119struct MountTable {
120 mounts: Vec<MountLink>,
121 fallback: MountLink,
122 home_dir: String,
123}
124
125impl MountTable {
126 fn resolve(&self, src: &str) -> (&MountLink, String) {
127 let path = if src.starts_with('/') {
128 src.to_string()
129 } else {
130 format!("{}/{}", self.home_dir, src)
131 };
132
133 for link in self.mounts.iter() {
134 if let Some(resolved) = link.resolve(&path) {
135 return (link, resolved);
136 }
137 }
138 (&self.fallback, self.fallback.resolve(&path).unwrap())
139 }
140}
141
142fn build_home_dir(
143 scheme: &str,
144 host: Option<&str>,
145 config: &Configuration,
146 username: &str,
147) -> String {
148 let prefix = match scheme {
149 "hdfs" => config.get("dfs.user.home.dir.prefix"),
150 "viewfs" => {
151 host.and_then(|host| config.get(&format!("fs.viewfs.mounttable.{host}.homedir")))
152 }
153 _ => None,
154 }
155 .unwrap_or("/user");
156
157 let prefix = prefix.trim_end_matches('/');
158 if prefix.is_empty() {
159 format!("/{username}")
160 } else {
161 format!("{prefix}/{username}")
162 }
163}
164
165#[derive(Debug)]
167pub enum IORuntime {
168 Runtime(Runtime),
169 Handle(Handle),
170}
171
172impl From<Runtime> for IORuntime {
173 fn from(value: Runtime) -> Self {
174 Self::Runtime(value)
175 }
176}
177
178impl From<Handle> for IORuntime {
179 fn from(value: Handle) -> Self {
180 Self::Handle(value)
181 }
182}
183
184impl IORuntime {
185 fn handle(&self) -> Handle {
186 match self {
187 Self::Runtime(runtime) => runtime.handle().clone(),
188 Self::Handle(handle) => handle.clone(),
189 }
190 }
191}
192
193#[derive(Default)]
257pub struct ClientBuilder {
258 url: Option<String>,
259 config: Option<HashMap<String, String>>,
260 config_dir: Option<String>,
261 runtime: Option<IORuntime>,
262}
263
264impl ClientBuilder {
265 pub fn new() -> Self {
267 Self::default()
268 }
269
270 pub fn with_url(mut self, url: impl Into<String>) -> Self {
272 self.url = Some(url.into());
273 self
274 }
275
276 pub fn with_config(
278 mut self,
279 config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
280 ) -> Self {
281 self.config = Some(
282 config
283 .into_iter()
284 .map(|(k, v)| (k.into(), v.into()))
285 .collect(),
286 );
287 self
288 }
289
290 pub fn with_config_dir(mut self, config_dir: impl Into<String>) -> Self {
292 self.config_dir = Some(config_dir.into());
293 self
294 }
295
296 pub fn with_io_runtime(mut self, runtime: impl Into<IORuntime>) -> Self {
299 self.runtime = Some(runtime.into());
300 self
301 }
302
303 pub fn build(self) -> Result<Client> {
305 let config = Configuration::new(self.config_dir, self.config)?;
306 let url = if let Some(url) = self.url {
307 Url::parse(&url)?
308 } else {
309 Client::default_fs(&config)?
310 };
311
312 Client::build(&url, config, self.runtime)
313 }
314}
315
316#[derive(Clone, Debug)]
317enum RuntimeHolder {
318 Custom(Arc<IORuntime>),
319 Default(Arc<OnceLock<Runtime>>),
320}
321
322impl RuntimeHolder {
323 fn new(rt: Option<IORuntime>) -> Self {
324 if let Some(rt) = rt {
325 Self::Custom(Arc::new(rt))
326 } else {
327 Self::Default(Arc::new(OnceLock::new()))
328 }
329 }
330
331 fn get_handle(&self) -> Handle {
332 match self {
333 Self::Custom(rt) => rt.handle().clone(),
334 Self::Default(rt) => match Handle::try_current() {
335 Ok(handle) => handle,
336 Err(_) => rt
337 .get_or_init(|| Runtime::new().expect("Failed to create tokio runtime"))
338 .handle()
339 .clone(),
340 },
341 }
342 }
343}
344
345#[derive(Clone, Debug)]
347pub struct Client {
348 mount_table: Arc<MountTable>,
349 config: Arc<Configuration>,
350 rt_holder: RuntimeHolder,
353}
354
355impl Client {
356 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
360 pub fn new(url: &str) -> Result<Self> {
361 let parsed_url = Url::parse(url)?;
362 Self::build(&parsed_url, Configuration::new(None, None)?, None)
363 }
364
365 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
366 pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
367 let parsed_url = Url::parse(url)?;
368 Self::build(&parsed_url, Configuration::new(None, Some(config))?, None)
369 }
370
371 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
372 pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
373 let config = Configuration::new(None, Some(config))?;
374 Self::build(&Self::default_fs(&config)?, config, None)
375 }
376
377 fn default_fs(config: &Configuration) -> Result<Url> {
378 let url = config
379 .get(config::DEFAULT_FS)
380 .ok_or(HdfsError::InvalidArgument(format!(
381 "No {} setting found",
382 config::DEFAULT_FS
383 )))?;
384 Ok(Url::parse(url)?)
385 }
386
387 fn build(url: &Url, config: Configuration, rt: Option<IORuntime>) -> Result<Self> {
388 let resolved_url = if !url.has_host() {
389 let default_url = Self::default_fs(&config)?;
390 if url.scheme() != default_url.scheme() || !default_url.has_host() {
391 return Err(HdfsError::InvalidArgument(
392 "URL must contain a host".to_string(),
393 ));
394 }
395 default_url
396 } else {
397 url.clone()
398 };
399
400 let config = Arc::new(config);
401
402 let rt_holder = RuntimeHolder::new(rt);
403
404 let user_info = User::get_user_info();
405 let username = user_info
406 .effective_user
407 .as_deref()
408 .or(user_info.real_user.as_deref())
409 .expect("User info must include a username");
410 let home_dir = build_home_dir(
411 resolved_url.scheme(),
412 resolved_url.host_str(),
413 config.as_ref(),
414 username,
415 );
416
417 let mount_table = match url.scheme() {
418 "hdfs" => {
419 let proxy = NameServiceProxy::new(
420 &resolved_url,
421 Arc::clone(&config),
422 rt_holder.get_handle(),
423 )?;
424 let protocol = Arc::new(NamenodeProtocol::new(proxy, rt_holder.get_handle()));
425
426 MountTable {
427 mounts: Vec::new(),
428 fallback: MountLink::new("/", "/", protocol),
429 home_dir,
430 }
431 }
432 "viewfs" => Self::build_mount_table(
433 resolved_url.host_str().expect("URL must have a host"),
435 Arc::clone(&config),
436 rt_holder.get_handle(),
437 home_dir,
438 )?,
439 _ => {
440 return Err(HdfsError::InvalidArgument(
441 "Only `hdfs` and `viewfs` schemes are supported".to_string(),
442 ));
443 }
444 };
445
446 Ok(Self {
447 mount_table: Arc::new(mount_table),
448 config,
449 rt_holder,
450 })
451 }
452
453 fn build_mount_table(
454 host: &str,
455 config: Arc<Configuration>,
456 handle: Handle,
457 home_dir: String,
458 ) -> Result<MountTable> {
459 let mut mounts: Vec<MountLink> = Vec::new();
460 let mut fallback: Option<MountLink> = None;
461
462 for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
463 let url = Url::parse(hdfs_url)?;
464 if !url.has_host() {
465 return Err(HdfsError::InvalidArgument(
466 "URL must contain a host".to_string(),
467 ));
468 }
469 if url.scheme() != "hdfs" {
470 return Err(HdfsError::InvalidArgument(
471 "Only hdfs mounts are supported for viewfs".to_string(),
472 ));
473 }
474 let proxy = NameServiceProxy::new(&url, Arc::clone(&config), handle.clone())?;
475 let protocol = Arc::new(NamenodeProtocol::new(proxy, handle.clone()));
476
477 if let Some(prefix) = viewfs_path {
478 mounts.push(MountLink::new(prefix, url.path(), protocol));
479 } else {
480 if fallback.is_some() {
481 return Err(HdfsError::InvalidArgument(
482 "Multiple viewfs fallback links found".to_string(),
483 ));
484 }
485 fallback = Some(MountLink::new("/", url.path(), protocol));
486 }
487 }
488
489 if let Some(fallback) = fallback {
490 mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
492 mounts.reverse();
493
494 Ok(MountTable {
495 mounts,
496 fallback,
497 home_dir,
498 })
499 } else {
500 Err(HdfsError::InvalidArgument(
501 "No viewfs fallback mount found".to_string(),
502 ))
503 }
504 }
505
506 pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
508 let (link, resolved_path) = self.mount_table.resolve(path);
509 match link.protocol.get_file_info(&resolved_path).await?.fs {
510 Some(status) => Ok(FileStatus::from(status, path)),
511 None => Err(HdfsError::FileNotFound(path.to_string())),
512 }
513 }
514
515 pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
518 let iter = self.list_status_iter(path, recursive);
519 let statuses = iter
520 .into_stream()
521 .collect::<Vec<Result<FileStatus>>>()
522 .await;
523
524 let mut resolved_statues = Vec::<FileStatus>::with_capacity(statuses.len());
525 for status in statuses.into_iter() {
526 resolved_statues.push(status?);
527 }
528
529 Ok(resolved_statues)
530 }
531
532 pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
534 ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
535 }
536
537 pub async fn read(&self, path: &str) -> Result<FileReader> {
539 let (link, resolved_path) = self.mount_table.resolve(path);
540 let located_info = link
542 .protocol
543 .get_block_locations(&resolved_path, 0, i64::MAX as u64)
544 .await?;
545
546 if let Some(locations) = located_info.locations {
547 let ec_schema = if let Some(ec_policy) = locations.ec_policy.as_ref() {
548 Some(resolve_ec_policy(ec_policy)?)
549 } else {
550 None
551 };
552
553 if locations.file_encryption_info.is_some() {
554 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
555 }
556
557 Ok(FileReader::new(
558 Arc::clone(&link.protocol),
559 locations,
560 ec_schema,
561 Arc::clone(&self.config),
562 self.rt_holder.get_handle(),
563 ))
564 } else {
565 Err(HdfsError::FileNotFound(path.to_string()))
566 }
567 }
568
569 pub async fn create(
572 &self,
573 src: &str,
574 write_options: impl AsRef<WriteOptions>,
575 ) -> Result<FileWriter> {
576 let write_options = write_options.as_ref();
577
578 let (link, resolved_path) = self.mount_table.resolve(src);
579
580 let create_response = link
581 .protocol
582 .create(
583 &resolved_path,
584 write_options.permission,
585 write_options.overwrite,
586 write_options.create_parent,
587 write_options.replication,
588 write_options.block_size,
589 )
590 .await?;
591
592 match create_response.fs {
593 Some(status) => {
594 if status.file_encryption_info.is_some() {
595 let _ = self.delete(src, false).await;
596 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
597 }
598
599 Ok(FileWriter::new(
600 Arc::clone(&link.protocol),
601 resolved_path,
602 status,
603 Arc::clone(&self.config),
604 self.rt_holder.get_handle(),
605 None,
606 ))
607 }
608 None => Err(HdfsError::FileNotFound(src.to_string())),
609 }
610 }
611
612 fn needs_new_block(class: &str, msg: &str) -> bool {
613 class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
614 }
615
616 pub async fn append(&self, src: &str) -> Result<FileWriter> {
620 let (link, resolved_path) = self.mount_table.resolve(src);
621
622 let append_response = match link.protocol.append(&resolved_path, false).await {
625 Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
626 link.protocol.append(&resolved_path, true).await?
627 }
628 resp => resp?,
629 };
630
631 match append_response.stat {
632 Some(status) => {
633 if status.file_encryption_info.is_some() {
634 let _ = link
635 .protocol
636 .complete(src, append_response.block.map(|b| b.b), status.file_id)
637 .await;
638 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
639 }
640
641 Ok(FileWriter::new(
642 Arc::clone(&link.protocol),
643 resolved_path,
644 status,
645 Arc::clone(&self.config),
646 self.rt_holder.get_handle(),
647 append_response.block,
648 ))
649 }
650 None => Err(HdfsError::FileNotFound(src.to_string())),
651 }
652 }
653
654 pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
662 let (link, resolved_path) = self.mount_table.resolve(path);
663 link.protocol
664 .mkdirs(&resolved_path, permission, create_parent)
665 .await
666 .map(|_| ())
667 }
668
669 pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
671 let (src_link, src_resolved_path) = self.mount_table.resolve(src);
672 let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
673 if src_link.viewfs_path == dst_link.viewfs_path {
674 src_link
675 .protocol
676 .rename(&src_resolved_path, &dst_resolved_path, overwrite)
677 .await
678 .map(|_| ())
679 } else {
680 Err(HdfsError::InvalidArgument(
681 "Cannot rename across different name services".to_string(),
682 ))
683 }
684 }
685
686 pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
689 let (link, resolved_path) = self.mount_table.resolve(path);
690 link.protocol
691 .delete(&resolved_path, recursive)
692 .await
693 .map(|r| r.result)
694 }
695
696 pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
698 let (link, resolved_path) = self.mount_table.resolve(path);
699 link.protocol
700 .set_times(&resolved_path, mtime, atime)
701 .await?;
702 Ok(())
703 }
704
705 pub async fn set_owner(
707 &self,
708 path: &str,
709 owner: Option<&str>,
710 group: Option<&str>,
711 ) -> Result<()> {
712 let (link, resolved_path) = self.mount_table.resolve(path);
713 link.protocol
714 .set_owner(&resolved_path, owner, group)
715 .await?;
716 Ok(())
717 }
718
719 pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
730 let (link, resolved_path) = self.mount_table.resolve(path);
731 link.protocol
732 .set_permission(&resolved_path, permission)
733 .await?;
734 Ok(())
735 }
736
737 pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
739 let (link, resolved_path) = self.mount_table.resolve(path);
740 let result = link
741 .protocol
742 .set_replication(&resolved_path, replication)
743 .await?
744 .result;
745
746 Ok(result)
747 }
748
749 pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
751 let (link, resolved_path) = self.mount_table.resolve(path);
752 let result = link
753 .protocol
754 .get_content_summary(&resolved_path)
755 .await?
756 .summary;
757
758 Ok(result.into())
759 }
760
761 pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
763 let (link, resolved_path) = self.mount_table.resolve(path);
764 link.protocol
765 .modify_acl_entries(&resolved_path, acl_spec)
766 .await?;
767
768 Ok(())
769 }
770
771 pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
773 let (link, resolved_path) = self.mount_table.resolve(path);
774 link.protocol
775 .remove_acl_entries(&resolved_path, acl_spec)
776 .await?;
777
778 Ok(())
779 }
780
781 pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
783 let (link, resolved_path) = self.mount_table.resolve(path);
784 link.protocol.remove_default_acl(&resolved_path).await?;
785
786 Ok(())
787 }
788
789 pub async fn remove_acl(&self, path: &str) -> Result<()> {
791 let (link, resolved_path) = self.mount_table.resolve(path);
792 link.protocol.remove_acl(&resolved_path).await?;
793
794 Ok(())
795 }
796
797 pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
801 let (link, resolved_path) = self.mount_table.resolve(path);
802 link.protocol.set_acl(&resolved_path, acl_spec).await?;
803
804 Ok(())
805 }
806
807 pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
809 let (link, resolved_path) = self.mount_table.resolve(path);
810 Ok(link
811 .protocol
812 .get_acl_status(&resolved_path)
813 .await?
814 .result
815 .into())
816 }
817
818 pub async fn glob_status(&self, pattern: &str) -> Result<Vec<FileStatus>> {
821 let flattened = expand_glob(pattern.to_string())?;
823
824 let mut results: Vec<FileStatus> = Vec::new();
825
826 for flat in flattened.into_iter() {
827 if flat.is_empty() {
830 continue;
831 }
832
833 let components = get_path_components(&flat);
834
835 #[derive(Clone, Debug)]
837 struct Candidate {
838 path: String,
839 status: Option<FileStatus>,
840 }
841
842 let mut candidates: Vec<Candidate> = vec![Candidate {
844 path: "/".to_string(),
845 status: None,
846 }];
847
848 for (idx, comp) in components.iter().enumerate() {
849 if candidates.is_empty() {
850 break;
851 }
852
853 let is_last = idx == components.len() - 1;
854
855 let unescaped = unescape_component(comp);
856 let glob_pat = GlobPattern::new(comp)?;
857
858 if !is_last && !glob_pat.has_wildcard() {
859 for cand in candidates.iter_mut() {
861 if !cand.path.ends_with('/') {
862 cand.path.push('/');
863 }
864 cand.path.push_str(&unescaped);
865 }
867 continue;
868 }
869
870 let mut new_candidates: Vec<Candidate> = Vec::new();
871
872 for cand in candidates.into_iter() {
873 if glob_pat.has_wildcard() {
874 let listing = match self.list_status(&cand.path, false).await {
876 Ok(listing) => listing,
877 Err(HdfsError::FileNotFound(_)) => continue,
878 Err(e) => return Err(e),
879 };
880 if listing.len() == 1 && listing[0].path == cand.path {
881 continue;
883 }
884
885 for child in listing.into_iter() {
886 if !is_last && !child.isdir {
888 continue;
889 }
890
891 let name = child
894 .path
895 .rsplit_once('/')
896 .map(|(_, n)| n)
897 .unwrap_or(child.path.as_str());
898
899 if glob_pat.matches(name) {
900 new_candidates.push(Candidate {
901 path: child.path.clone(),
902 status: Some(child),
903 });
904 }
905 }
906 } else {
907 let mut next_path = cand.path.clone();
909 if !next_path.ends_with('/') {
910 next_path.push('/');
911 }
912 next_path.push_str(&unescaped);
913
914 match self.get_file_info(&next_path).await {
915 Ok(status) => {
916 if is_last || status.isdir {
917 new_candidates.push(Candidate {
918 path: status.path.clone(),
919 status: Some(status),
920 });
921 }
922 }
923 Err(HdfsError::FileNotFound(_)) => continue,
924 Err(e) => return Err(e),
925 }
926 }
927 }
928
929 candidates = new_candidates;
930 }
931
932 for cand in candidates.into_iter() {
934 let status = if let Some(s) = cand.status {
935 s
936 } else {
937 match self.get_file_info(&cand.path).await {
939 Ok(s) => s,
940 Err(HdfsError::FileNotFound(_)) => continue,
941 Err(e) => return Err(e),
942 }
943 };
944
945 results.push(status);
946 }
947 }
948
949 Ok(results)
950 }
951}
952
953impl Default for Client {
954 fn default() -> Self {
957 ClientBuilder::new()
958 .build()
959 .expect("Failed to create default client")
960 }
961}
962
963pub(crate) struct DirListingIterator {
964 path: String,
965 resolved_path: String,
966 link: MountLink,
967 files_only: bool,
968 partial_listing: VecDeque<HdfsFileStatusProto>,
969 remaining: u32,
970 last_seen: Vec<u8>,
971}
972
973impl DirListingIterator {
974 fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
975 let (link, resolved_path) = mount_table.resolve(&path);
976
977 DirListingIterator {
978 path,
979 resolved_path,
980 link: link.clone(),
981 files_only,
982 partial_listing: VecDeque::new(),
983 remaining: 1,
984 last_seen: Vec::new(),
985 }
986 }
987
988 async fn get_next_batch(&mut self) -> Result<bool> {
989 let listing = self
990 .link
991 .protocol
992 .get_listing(&self.resolved_path, self.last_seen.clone(), false)
993 .await?;
994
995 if let Some(dir_list) = listing.dir_list {
996 self.last_seen = dir_list
997 .partial_listing
998 .last()
999 .map(|p| p.path.clone())
1000 .unwrap_or(Vec::new());
1001
1002 self.remaining = dir_list.remaining_entries;
1003
1004 self.partial_listing = dir_list
1005 .partial_listing
1006 .into_iter()
1007 .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
1008 .collect();
1009 Ok(!self.partial_listing.is_empty())
1010 } else {
1011 Err(HdfsError::FileNotFound(self.path.clone()))
1012 }
1013 }
1014
1015 pub async fn next(&mut self) -> Option<Result<FileStatus>> {
1016 if self.partial_listing.is_empty()
1017 && self.remaining > 0
1018 && let Err(error) = self.get_next_batch().await
1019 {
1020 self.remaining = 0;
1021 return Some(Err(error));
1022 }
1023 if let Some(next) = self.partial_listing.pop_front() {
1024 Some(Ok(FileStatus::from(next, &self.path)))
1025 } else {
1026 None
1027 }
1028 }
1029}
1030
1031pub struct ListStatusIterator {
1032 mount_table: Arc<MountTable>,
1033 recursive: bool,
1034 iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
1035}
1036
1037impl ListStatusIterator {
1038 fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
1039 let initial = DirListingIterator::new(path.clone(), &mount_table, false);
1040
1041 ListStatusIterator {
1042 mount_table,
1043 recursive,
1044 iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
1045 }
1046 }
1047
1048 pub async fn next(&self) -> Option<Result<FileStatus>> {
1049 let mut next_file: Option<Result<FileStatus>> = None;
1050 let mut iters = self.iters.lock().await;
1051 while next_file.is_none() {
1052 if let Some(iter) = iters.last_mut() {
1053 if let Some(file_result) = iter.next().await {
1054 if let Ok(file) = file_result {
1055 if file.isdir && self.recursive {
1058 iters.push(DirListingIterator::new(
1059 file.path.clone(),
1060 &self.mount_table,
1061 false,
1062 ))
1063 }
1064 next_file = Some(Ok(file));
1065 } else {
1066 next_file = Some(file_result)
1068 }
1069 } else {
1070 iters.pop();
1072 }
1073 } else {
1074 break;
1076 }
1077 }
1078
1079 next_file
1080 }
1081
1082 pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
1083 let listing = stream::unfold(self, |state| async move {
1084 let next = state.next().await;
1085 next.map(|n| (n, state))
1086 });
1087 Box::pin(listing)
1088 }
1089}
1090
/// Status of a single file or directory.
#[derive(Debug, Clone)]
pub struct FileStatus {
    /// Caller-facing path of the entry.
    pub path: String,
    /// Length in bytes.
    pub length: usize,
    /// Whether the entry is a directory.
    pub isdir: bool,
    /// Permission bits.
    pub permission: u16,
    /// Owning user.
    pub owner: String,
    /// Owning group.
    pub group: String,
    /// Modification time as reported by the namenode.
    pub modification_time: u64,
    /// Access time as reported by the namenode.
    pub access_time: u64,
    /// Replication factor, when reported.
    pub replication: Option<u32>,
    /// Block size, when reported.
    pub blocksize: Option<u64>,
}
1104
1105impl FileStatus {
1106 fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
1107 let mut path = base_path.trim_end_matches("/").to_string();
1108 let relative_path = std::str::from_utf8(&value.path).unwrap();
1109 if !relative_path.is_empty() {
1110 path.push('/');
1111 path.push_str(relative_path);
1112 }
1113
1114 if path.is_empty() {
1116 path.push('/');
1117 }
1118
1119 FileStatus {
1120 isdir: value.file_type() == FileType::IsDir,
1121 path,
1122 length: value.length as usize,
1123 permission: value.permission.perm as u16,
1124 owner: value.owner,
1125 group: value.group,
1126 modification_time: value.modification_time,
1127 access_time: value.access_time,
1128 replication: value.block_replication,
1129 blocksize: value.blocksize,
1130 }
1131 }
1132}
1133
/// Aggregate size and count information for a directory tree.
#[derive(Debug)]
pub struct ContentSummary {
    /// Total length of all files.
    pub length: u64,
    /// Number of files.
    pub file_count: u64,
    /// Number of directories.
    pub directory_count: u64,
    /// Namespace quota as reported by the namenode.
    pub quota: u64,
    /// Space consumed as reported by the namenode.
    pub space_consumed: u64,
    /// Space quota as reported by the namenode.
    pub space_quota: u64,
}
1143
1144impl From<ContentSummaryProto> for ContentSummary {
1145 fn from(value: ContentSummaryProto) -> Self {
1146 ContentSummary {
1147 length: value.length,
1148 file_count: value.file_count,
1149 directory_count: value.directory_count,
1150 quota: value.quota,
1151 space_consumed: value.space_consumed,
1152 space_quota: value.space_quota,
1153 }
1154 }
1155}
1156
#[cfg(test)]
mod test {
    use std::sync::{Arc, LazyLock};

    use tokio::runtime::Runtime;
    use url::Url;

    use crate::{
        client::ClientBuilder,
        common::config::Configuration,
        hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
    };

    use super::{MountLink, MountTable};

    // Runtime whose handle backs the protocols built for the resolution tests.
    static RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

    // Builds a namenode protocol for `url` with an empty configuration.
    fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
        let parsed = Url::parse(url).unwrap();
        let config = Arc::new(Configuration::new(None, None).unwrap());
        let handle = RT.handle().clone();
        let proxy = NameServiceProxy::new(&parsed, config, handle.clone()).unwrap();
        Arc::new(NamenodeProtocol::new(proxy, handle))
    }

    // Whether a client builds for an optional explicit URL and a given `fs.defaultFS` value.
    fn builds(url: Option<&str>, default_fs: &str) -> bool {
        let mut builder = ClientBuilder::new().with_config(vec![("fs.defaultFS", default_fs)]);
        if let Some(url) = url {
            builder = builder.with_url(url);
        }
        builder.build().is_ok()
    }

    #[test]
    fn test_default_fs() {
        assert!(builds(None, "hdfs://test:9000"));
        assert!(!builds(None, "hdfs://"));
        assert!(builds(Some("hdfs://"), "hdfs://test:9000"));
        assert!(!builds(Some("hdfs://"), "hdfs://"));
        assert!(!builds(Some("hdfs://"), "viewfs://test"));
    }

    #[test]
    fn test_mount_link_resolve() {
        let link = MountLink::new("/view", "/hdfs", create_protocol("hdfs://127.0.0.1:9000"));

        assert_eq!(
            link.resolve("/view/dir/file").as_deref(),
            Some("/hdfs/dir/file")
        );
        assert_eq!(link.resolve("/view").as_deref(), Some("/hdfs"));
        assert_eq!(link.resolve("/hdfs/path"), None);
    }

    #[test]
    fn test_fallback_link() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");

        let rooted = MountLink::new("", "/hdfs", Arc::clone(&protocol));
        for (input, expected) in [
            ("/path/to/file", "/hdfs/path/to/file"),
            ("/", "/hdfs/"),
            ("/hdfs/path", "/hdfs/hdfs/path"),
        ] {
            assert_eq!(rooted.resolve(input).as_deref(), Some(expected));
        }

        let identity = MountLink::new("", "", protocol);
        assert_eq!(identity.resolve("/").as_deref(), Some("/"));
    }

    #[test]
    fn test_mount_table_resolve() {
        let table = MountTable {
            mounts: vec![
                MountLink::new(
                    "/mount1",
                    "/path1/nested",
                    create_protocol("hdfs://127.0.0.1:9000"),
                ),
                MountLink::new("/mount2", "/path2", create_protocol("hdfs://127.0.0.1:9001")),
                MountLink::new(
                    "/mount3/nested",
                    "/path3",
                    create_protocol("hdfs://127.0.0.1:9002"),
                ),
            ],
            fallback: MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003")),
            home_dir: "/user/test".to_string(),
        };

        // (input path, expected link prefix, expected resolved path)
        let cases = [
            ("/mount1", "/mount1", "/path1/nested"),
            ("/mount1/", "/mount1", "/path1/nested/"),
            ("/mount12", "", "/path4/mount12"),
            ("/mount3/file", "", "/path4/mount3/file"),
            ("/mount3/nested/file", "/mount3/nested", "/path3/file"),
            ("file", "", "/path4/user/test/file"),
            ("dir/subdir", "", "/path4/user/test/dir/subdir"),
        ];
        for (input, expected_link, expected_path) in cases {
            let (link, resolved) = table.resolve(input);
            assert_eq!(link.viewfs_path, expected_link, "link for {input}");
            assert_eq!(resolved, expected_path, "resolution of {input}");
        }

        // A home directory inside a mount routes relative paths through that mount.
        let table = MountTable {
            mounts: vec![
                MountLink::new(
                    "/mount1",
                    "/path1/nested",
                    create_protocol("hdfs://127.0.0.1:9000"),
                ),
                MountLink::new("/mount2", "/path2", create_protocol("hdfs://127.0.0.1:9001")),
            ],
            fallback: MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003")),
            home_dir: "/mount1/user".to_string(),
        };

        for (input, expected_link, expected_path) in [
            ("file", "/mount1", "/path1/nested/user/file"),
            ("dir/subdir", "/mount1", "/path1/nested/user/dir/subdir"),
        ] {
            let (link, resolved) = table.resolve(input);
            assert_eq!(link.viewfs_path, expected_link, "link for {input}");
            assert_eq!(resolved, expected_path, "resolution of {input}");
        }
    }

    #[test]
    fn test_io_runtime() {
        let base = || ClientBuilder::new().with_url("hdfs://127.0.0.1:9000");

        assert!(base().with_io_runtime(Runtime::new().unwrap()).build().is_ok());

        let rt = Runtime::new().unwrap();
        assert!(base().with_io_runtime(rt.handle().clone()).build().is_ok());
    }

    #[test]
    fn test_set_conf_dir() {
        let result = ClientBuilder::new()
            .with_url("hdfs://127.0.0.1:9000")
            .with_config_dir("target/test")
            .build();
        assert!(result.is_ok())
    }
}