1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, OnceLock};
3
4use futures::stream::BoxStream;
5use futures::{StreamExt, stream};
6use tokio::runtime::{Handle, Runtime};
7use url::Url;
8
9use crate::acl::{AclEntry, AclStatus};
10use crate::common::config::{self, Configuration};
11use crate::ec::resolve_ec_policy;
12use crate::error::{HdfsError, Result};
13use crate::file::{FileReader, FileWriter};
14use crate::hdfs::protocol::NamenodeProtocol;
15use crate::hdfs::proxy::NameServiceProxy;
16use crate::proto::hdfs::hdfs_file_status_proto::FileType;
17
18use crate::glob::{GlobPattern, expand_glob, get_path_components, unescape_component};
19use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
20
/// Options applied when creating a file with [`Client::create`].
///
/// Start from [`WriteOptions::default`] and adjust individual settings with the chainable
/// setters, e.g. `WriteOptions::default().overwrite(true)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriteOptions {
    /// Block size forwarded to the create call; `None` forwards no explicit value.
    pub block_size: Option<u64>,
    /// Replication factor forwarded to the create call; `None` forwards no explicit value.
    pub replication: Option<u32>,
    /// Permission bits for the new file. Defaults to `0o644`.
    pub permission: u32,
    /// Overwrite flag forwarded to the create call. Defaults to `false`.
    pub overwrite: bool,
    /// Create-parent flag forwarded to the create call. Defaults to `true`.
    pub create_parent: bool,
}

impl Default for WriteOptions {
    fn default() -> Self {
        Self {
            block_size: None,
            replication: None,
            permission: 0o644,
            overwrite: false,
            create_parent: true,
        }
    }
}

/// Lets APIs accept either `WriteOptions` or `&WriteOptions` through `impl AsRef<WriteOptions>`.
impl AsRef<WriteOptions> for WriteOptions {
    fn as_ref(&self) -> &WriteOptions {
        self
    }
}

impl WriteOptions {
    /// Sets an explicit block size.
    pub fn block_size(mut self, block_size: u64) -> Self {
        self.block_size = Some(block_size);
        self
    }

    /// Sets an explicit replication factor.
    pub fn replication(mut self, replication: u32) -> Self {
        self.replication = Some(replication);
        self
    }

    /// Sets the permission bits.
    pub fn permission(mut self, permission: u32) -> Self {
        self.permission = permission;
        self
    }

    /// Sets the overwrite flag.
    pub fn overwrite(mut self, overwrite: bool) -> Self {
        self.overwrite = overwrite;
        self
    }

    /// Sets the create-parent flag.
    pub fn create_parent(mut self, create_parent: bool) -> Self {
        self.create_parent = create_parent;
        self
    }
}
87
88#[derive(Debug, Clone)]
89struct MountLink {
90 viewfs_path: String,
91 hdfs_path: String,
92 protocol: Arc<NamenodeProtocol>,
93}
94
95impl MountLink {
96 fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
97 Self {
99 viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
100 hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
101 protocol,
102 }
103 }
104 fn resolve(&self, path: &str) -> Option<String> {
106 if path == self.viewfs_path {
109 Some(self.hdfs_path.clone())
110 } else {
111 path.strip_prefix(&format!("{}/", self.viewfs_path))
112 .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
113 }
114 }
115}
116
117#[derive(Debug)]
118struct MountTable {
119 mounts: Vec<MountLink>,
120 fallback: MountLink,
121}
122
123impl MountTable {
124 fn resolve(&self, src: &str) -> (&MountLink, String) {
125 for link in self.mounts.iter() {
126 if let Some(resolved) = link.resolve(src) {
127 return (link, resolved);
128 }
129 }
130 (&self.fallback, self.fallback.resolve(src).unwrap())
131 }
132}
133
134#[derive(Debug)]
136pub enum IORuntime {
137 Runtime(Runtime),
138 Handle(Handle),
139}
140
141impl From<Runtime> for IORuntime {
142 fn from(value: Runtime) -> Self {
143 Self::Runtime(value)
144 }
145}
146
147impl From<Handle> for IORuntime {
148 fn from(value: Handle) -> Self {
149 Self::Handle(value)
150 }
151}
152
153impl IORuntime {
154 fn handle(&self) -> Handle {
155 match self {
156 Self::Runtime(runtime) => runtime.handle().clone(),
157 Self::Handle(handle) => handle.clone(),
158 }
159 }
160}
161
162#[derive(Default)]
199pub struct ClientBuilder {
200 url: Option<String>,
201 config: HashMap<String, String>,
202 runtime: Option<IORuntime>,
203}
204
205impl ClientBuilder {
206 pub fn new() -> Self {
208 Self::default()
209 }
210
211 pub fn with_url(mut self, url: impl Into<String>) -> Self {
213 self.url = Some(url.into());
214 self
215 }
216
217 pub fn with_config(
219 mut self,
220 config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
221 ) -> Self {
222 self.config = config
223 .into_iter()
224 .map(|(k, v)| (k.into(), v.into()))
225 .collect();
226 self
227 }
228
229 pub fn with_io_runtime(mut self, runtime: impl Into<IORuntime>) -> Self {
232 self.runtime = Some(runtime.into());
233 self
234 }
235
236 pub fn build(self) -> Result<Client> {
238 let config = Configuration::new_with_config(self.config)?;
239 let url = if let Some(url) = self.url {
240 Url::parse(&url)?
241 } else {
242 Client::default_fs(&config)?
243 };
244
245 Client::build(&url, config, self.runtime)
246 }
247}
248
249#[derive(Clone, Debug)]
250enum RuntimeHolder {
251 Custom(Arc<IORuntime>),
252 Default(Arc<OnceLock<Runtime>>),
253}
254
255impl RuntimeHolder {
256 fn new(rt: Option<IORuntime>) -> Self {
257 if let Some(rt) = rt {
258 Self::Custom(Arc::new(rt))
259 } else {
260 Self::Default(Arc::new(OnceLock::new()))
261 }
262 }
263
264 fn get_handle(&self) -> Handle {
265 match self {
266 Self::Custom(rt) => rt.handle().clone(),
267 Self::Default(rt) => match Handle::try_current() {
268 Ok(handle) => handle,
269 Err(_) => rt
270 .get_or_init(|| Runtime::new().expect("Failed to create tokio runtime"))
271 .handle()
272 .clone(),
273 },
274 }
275 }
276}
277
/// Client for an `hdfs` or `viewfs` file system.
///
/// The mount table and configuration are held behind `Arc`s, so clones share them.
#[derive(Clone, Debug)]
pub struct Client {
    /// Maps client-visible paths to the name service link that serves them.
    mount_table: Arc<MountTable>,
    /// Configuration handed on to the readers and writers this client creates.
    config: Arc<Configuration>,
    /// Provides the runtime handle passed to readers and writers.
    rt_holder: RuntimeHolder,
}
287
288impl Client {
289 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
293 pub fn new(url: &str) -> Result<Self> {
294 let parsed_url = Url::parse(url)?;
295 Self::build(&parsed_url, Configuration::new()?, None)
296 }
297
298 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
299 pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
300 let parsed_url = Url::parse(url)?;
301 Self::build(&parsed_url, Configuration::new_with_config(config)?, None)
302 }
303
304 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
305 pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
306 let config = Configuration::new_with_config(config)?;
307 Self::build(&Self::default_fs(&config)?, config, None)
308 }
309
310 fn default_fs(config: &Configuration) -> Result<Url> {
311 let url = config
312 .get(config::DEFAULT_FS)
313 .ok_or(HdfsError::InvalidArgument(format!(
314 "No {} setting found",
315 config::DEFAULT_FS
316 )))?;
317 Ok(Url::parse(url)?)
318 }
319
320 fn build(url: &Url, config: Configuration, rt: Option<IORuntime>) -> Result<Self> {
321 let resolved_url = if !url.has_host() {
322 let default_url = Self::default_fs(&config)?;
323 if url.scheme() != default_url.scheme() || !default_url.has_host() {
324 return Err(HdfsError::InvalidArgument(
325 "URL must contain a host".to_string(),
326 ));
327 }
328 default_url
329 } else {
330 url.clone()
331 };
332
333 let config = Arc::new(config);
334
335 let rt_holder = RuntimeHolder::new(rt);
336
337 let mount_table = match url.scheme() {
338 "hdfs" => {
339 let proxy = NameServiceProxy::new(
340 &resolved_url,
341 Arc::clone(&config),
342 rt_holder.get_handle(),
343 )?;
344 let protocol = Arc::new(NamenodeProtocol::new(proxy, rt_holder.get_handle()));
345
346 MountTable {
347 mounts: Vec::new(),
348 fallback: MountLink::new("/", "/", protocol),
349 }
350 }
351 "viewfs" => Self::build_mount_table(
352 resolved_url.host_str().expect("URL must have a host"),
354 Arc::clone(&config),
355 rt_holder.get_handle(),
356 )?,
357 _ => {
358 return Err(HdfsError::InvalidArgument(
359 "Only `hdfs` and `viewfs` schemes are supported".to_string(),
360 ));
361 }
362 };
363
364 Ok(Self {
365 mount_table: Arc::new(mount_table),
366 config,
367 rt_holder,
368 })
369 }
370
371 fn build_mount_table(
372 host: &str,
373 config: Arc<Configuration>,
374 handle: Handle,
375 ) -> Result<MountTable> {
376 let mut mounts: Vec<MountLink> = Vec::new();
377 let mut fallback: Option<MountLink> = None;
378
379 for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
380 let url = Url::parse(hdfs_url)?;
381 if !url.has_host() {
382 return Err(HdfsError::InvalidArgument(
383 "URL must contain a host".to_string(),
384 ));
385 }
386 if url.scheme() != "hdfs" {
387 return Err(HdfsError::InvalidArgument(
388 "Only hdfs mounts are supported for viewfs".to_string(),
389 ));
390 }
391 let proxy = NameServiceProxy::new(&url, Arc::clone(&config), handle.clone())?;
392 let protocol = Arc::new(NamenodeProtocol::new(proxy, handle.clone()));
393
394 if let Some(prefix) = viewfs_path {
395 mounts.push(MountLink::new(prefix, url.path(), protocol));
396 } else {
397 if fallback.is_some() {
398 return Err(HdfsError::InvalidArgument(
399 "Multiple viewfs fallback links found".to_string(),
400 ));
401 }
402 fallback = Some(MountLink::new("/", url.path(), protocol));
403 }
404 }
405
406 if let Some(fallback) = fallback {
407 mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
409 mounts.reverse();
410
411 Ok(MountTable { mounts, fallback })
412 } else {
413 Err(HdfsError::InvalidArgument(
414 "No viewfs fallback mount found".to_string(),
415 ))
416 }
417 }
418
419 pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
421 let (link, resolved_path) = self.mount_table.resolve(path);
422 match link.protocol.get_file_info(&resolved_path).await?.fs {
423 Some(status) => Ok(FileStatus::from(status, path)),
424 None => Err(HdfsError::FileNotFound(path.to_string())),
425 }
426 }
427
428 pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
431 let iter = self.list_status_iter(path, recursive);
432 let statuses = iter
433 .into_stream()
434 .collect::<Vec<Result<FileStatus>>>()
435 .await;
436
437 let mut resolved_statues = Vec::<FileStatus>::with_capacity(statuses.len());
438 for status in statuses.into_iter() {
439 resolved_statues.push(status?);
440 }
441
442 Ok(resolved_statues)
443 }
444
445 pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
447 ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
448 }
449
450 pub async fn read(&self, path: &str) -> Result<FileReader> {
452 let (link, resolved_path) = self.mount_table.resolve(path);
453 let located_info = link
455 .protocol
456 .get_block_locations(&resolved_path, 0, i64::MAX as u64)
457 .await?;
458
459 if let Some(locations) = located_info.locations {
460 let ec_schema = if let Some(ec_policy) = locations.ec_policy.as_ref() {
461 Some(resolve_ec_policy(ec_policy)?)
462 } else {
463 None
464 };
465
466 if locations.file_encryption_info.is_some() {
467 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
468 }
469
470 Ok(FileReader::new(
471 Arc::clone(&link.protocol),
472 locations,
473 ec_schema,
474 Arc::clone(&self.config),
475 self.rt_holder.get_handle(),
476 ))
477 } else {
478 Err(HdfsError::FileNotFound(path.to_string()))
479 }
480 }
481
482 pub async fn create(
485 &self,
486 src: &str,
487 write_options: impl AsRef<WriteOptions>,
488 ) -> Result<FileWriter> {
489 let write_options = write_options.as_ref();
490
491 let (link, resolved_path) = self.mount_table.resolve(src);
492
493 let create_response = link
494 .protocol
495 .create(
496 &resolved_path,
497 write_options.permission,
498 write_options.overwrite,
499 write_options.create_parent,
500 write_options.replication,
501 write_options.block_size,
502 )
503 .await?;
504
505 match create_response.fs {
506 Some(status) => {
507 if status.file_encryption_info.is_some() {
508 let _ = self.delete(src, false).await;
509 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
510 }
511
512 Ok(FileWriter::new(
513 Arc::clone(&link.protocol),
514 resolved_path,
515 status,
516 Arc::clone(&self.config),
517 self.rt_holder.get_handle(),
518 None,
519 ))
520 }
521 None => Err(HdfsError::FileNotFound(src.to_string())),
522 }
523 }
524
525 fn needs_new_block(class: &str, msg: &str) -> bool {
526 class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
527 }
528
529 pub async fn append(&self, src: &str) -> Result<FileWriter> {
533 let (link, resolved_path) = self.mount_table.resolve(src);
534
535 let append_response = match link.protocol.append(&resolved_path, false).await {
538 Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
539 link.protocol.append(&resolved_path, true).await?
540 }
541 resp => resp?,
542 };
543
544 match append_response.stat {
545 Some(status) => {
546 if status.file_encryption_info.is_some() {
547 let _ = link
548 .protocol
549 .complete(src, append_response.block.map(|b| b.b), status.file_id)
550 .await;
551 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
552 }
553
554 Ok(FileWriter::new(
555 Arc::clone(&link.protocol),
556 resolved_path,
557 status,
558 Arc::clone(&self.config),
559 self.rt_holder.get_handle(),
560 append_response.block,
561 ))
562 }
563 None => Err(HdfsError::FileNotFound(src.to_string())),
564 }
565 }
566
567 pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
575 let (link, resolved_path) = self.mount_table.resolve(path);
576 link.protocol
577 .mkdirs(&resolved_path, permission, create_parent)
578 .await
579 .map(|_| ())
580 }
581
582 pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
584 let (src_link, src_resolved_path) = self.mount_table.resolve(src);
585 let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
586 if src_link.viewfs_path == dst_link.viewfs_path {
587 src_link
588 .protocol
589 .rename(&src_resolved_path, &dst_resolved_path, overwrite)
590 .await
591 .map(|_| ())
592 } else {
593 Err(HdfsError::InvalidArgument(
594 "Cannot rename across different name services".to_string(),
595 ))
596 }
597 }
598
599 pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
602 let (link, resolved_path) = self.mount_table.resolve(path);
603 link.protocol
604 .delete(&resolved_path, recursive)
605 .await
606 .map(|r| r.result)
607 }
608
609 pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
611 let (link, resolved_path) = self.mount_table.resolve(path);
612 link.protocol
613 .set_times(&resolved_path, mtime, atime)
614 .await?;
615 Ok(())
616 }
617
618 pub async fn set_owner(
620 &self,
621 path: &str,
622 owner: Option<&str>,
623 group: Option<&str>,
624 ) -> Result<()> {
625 let (link, resolved_path) = self.mount_table.resolve(path);
626 link.protocol
627 .set_owner(&resolved_path, owner, group)
628 .await?;
629 Ok(())
630 }
631
632 pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
643 let (link, resolved_path) = self.mount_table.resolve(path);
644 link.protocol
645 .set_permission(&resolved_path, permission)
646 .await?;
647 Ok(())
648 }
649
650 pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
652 let (link, resolved_path) = self.mount_table.resolve(path);
653 let result = link
654 .protocol
655 .set_replication(&resolved_path, replication)
656 .await?
657 .result;
658
659 Ok(result)
660 }
661
662 pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
664 let (link, resolved_path) = self.mount_table.resolve(path);
665 let result = link
666 .protocol
667 .get_content_summary(&resolved_path)
668 .await?
669 .summary;
670
671 Ok(result.into())
672 }
673
674 pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
676 let (link, resolved_path) = self.mount_table.resolve(path);
677 link.protocol
678 .modify_acl_entries(&resolved_path, acl_spec)
679 .await?;
680
681 Ok(())
682 }
683
684 pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
686 let (link, resolved_path) = self.mount_table.resolve(path);
687 link.protocol
688 .remove_acl_entries(&resolved_path, acl_spec)
689 .await?;
690
691 Ok(())
692 }
693
694 pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
696 let (link, resolved_path) = self.mount_table.resolve(path);
697 link.protocol.remove_default_acl(&resolved_path).await?;
698
699 Ok(())
700 }
701
702 pub async fn remove_acl(&self, path: &str) -> Result<()> {
704 let (link, resolved_path) = self.mount_table.resolve(path);
705 link.protocol.remove_acl(&resolved_path).await?;
706
707 Ok(())
708 }
709
710 pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
714 let (link, resolved_path) = self.mount_table.resolve(path);
715 link.protocol.set_acl(&resolved_path, acl_spec).await?;
716
717 Ok(())
718 }
719
720 pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
722 let (link, resolved_path) = self.mount_table.resolve(path);
723 Ok(link
724 .protocol
725 .get_acl_status(&resolved_path)
726 .await?
727 .result
728 .into())
729 }
730
731 pub async fn glob_status(&self, pattern: &str) -> Result<Vec<FileStatus>> {
734 let flattened = expand_glob(pattern.to_string())?;
736
737 let mut results: Vec<FileStatus> = Vec::new();
738
739 for flat in flattened.into_iter() {
740 if flat.is_empty() {
743 continue;
744 }
745
746 let components = get_path_components(&flat);
747
748 #[derive(Clone, Debug)]
750 struct Candidate {
751 path: String,
752 status: Option<FileStatus>,
753 }
754
755 let mut candidates: Vec<Candidate> = vec![Candidate {
757 path: "/".to_string(),
758 status: None,
759 }];
760
761 for (idx, comp) in components.iter().enumerate() {
762 if candidates.is_empty() {
763 break;
764 }
765
766 let is_last = idx == components.len() - 1;
767
768 let unescaped = unescape_component(comp);
769 let glob_pat = GlobPattern::new(comp)?;
770
771 if !is_last && !glob_pat.has_wildcard() {
772 for cand in candidates.iter_mut() {
774 if !cand.path.ends_with('/') {
775 cand.path.push('/');
776 }
777 cand.path.push_str(&unescaped);
778 }
780 continue;
781 }
782
783 let mut new_candidates: Vec<Candidate> = Vec::new();
784
785 for cand in candidates.into_iter() {
786 if glob_pat.has_wildcard() {
787 let listing = self.list_status(&cand.path, false).await?;
789 if listing.len() == 1 && listing[0].path == cand.path {
790 continue;
792 }
793
794 for child in listing.into_iter() {
795 if !is_last && !child.isdir {
797 continue;
798 }
799
800 let name = child
803 .path
804 .rsplit_once('/')
805 .map(|(_, n)| n)
806 .unwrap_or(child.path.as_str());
807
808 if glob_pat.matches(name) {
809 new_candidates.push(Candidate {
810 path: child.path.clone(),
811 status: Some(child),
812 });
813 }
814 }
815 } else {
816 let mut next_path = cand.path.clone();
818 if !next_path.ends_with('/') {
819 next_path.push('/');
820 }
821 next_path.push_str(&unescaped);
822
823 if let Ok(status) = self.get_file_info(&next_path).await {
824 new_candidates.push(Candidate {
825 path: status.path.clone(),
826 status: Some(status),
827 });
828 }
829 }
830 }
831
832 candidates = new_candidates;
833 }
834
835 for cand in candidates.into_iter() {
837 let status = if let Some(s) = cand.status {
838 s
839 } else {
840 match self.get_file_info(&cand.path).await {
842 Ok(s) => s,
843 Err(HdfsError::FileNotFound(_)) => continue,
844 Err(e) => return Err(e),
845 }
846 };
847
848 results.push(status);
849 }
850 }
851
852 Ok(results)
853 }
854}
855
856impl Default for Client {
857 fn default() -> Self {
860 ClientBuilder::new()
861 .build()
862 .expect("Failed to create default client")
863 }
864}
865
866pub(crate) struct DirListingIterator {
867 path: String,
868 resolved_path: String,
869 link: MountLink,
870 files_only: bool,
871 partial_listing: VecDeque<HdfsFileStatusProto>,
872 remaining: u32,
873 last_seen: Vec<u8>,
874}
875
876impl DirListingIterator {
877 fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
878 let (link, resolved_path) = mount_table.resolve(&path);
879
880 DirListingIterator {
881 path,
882 resolved_path,
883 link: link.clone(),
884 files_only,
885 partial_listing: VecDeque::new(),
886 remaining: 1,
887 last_seen: Vec::new(),
888 }
889 }
890
891 async fn get_next_batch(&mut self) -> Result<bool> {
892 let listing = self
893 .link
894 .protocol
895 .get_listing(&self.resolved_path, self.last_seen.clone(), false)
896 .await?;
897
898 if let Some(dir_list) = listing.dir_list {
899 self.last_seen = dir_list
900 .partial_listing
901 .last()
902 .map(|p| p.path.clone())
903 .unwrap_or(Vec::new());
904
905 self.remaining = dir_list.remaining_entries;
906
907 self.partial_listing = dir_list
908 .partial_listing
909 .into_iter()
910 .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
911 .collect();
912 Ok(!self.partial_listing.is_empty())
913 } else {
914 Err(HdfsError::FileNotFound(self.path.clone()))
915 }
916 }
917
918 pub async fn next(&mut self) -> Option<Result<FileStatus>> {
919 if self.partial_listing.is_empty()
920 && self.remaining > 0
921 && let Err(error) = self.get_next_batch().await
922 {
923 self.remaining = 0;
924 return Some(Err(error));
925 }
926 if let Some(next) = self.partial_listing.pop_front() {
927 Some(Ok(FileStatus::from(next, &self.path)))
928 } else {
929 None
930 }
931 }
932}
933
934pub struct ListStatusIterator {
935 mount_table: Arc<MountTable>,
936 recursive: bool,
937 iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
938}
939
940impl ListStatusIterator {
941 fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
942 let initial = DirListingIterator::new(path.clone(), &mount_table, false);
943
944 ListStatusIterator {
945 mount_table,
946 recursive,
947 iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
948 }
949 }
950
951 pub async fn next(&self) -> Option<Result<FileStatus>> {
952 let mut next_file: Option<Result<FileStatus>> = None;
953 let mut iters = self.iters.lock().await;
954 while next_file.is_none() {
955 if let Some(iter) = iters.last_mut() {
956 if let Some(file_result) = iter.next().await {
957 if let Ok(file) = file_result {
958 if file.isdir && self.recursive {
961 iters.push(DirListingIterator::new(
962 file.path.clone(),
963 &self.mount_table,
964 false,
965 ))
966 }
967 next_file = Some(Ok(file));
968 } else {
969 next_file = Some(file_result)
971 }
972 } else {
973 iters.pop();
975 }
976 } else {
977 break;
979 }
980 }
981
982 next_file
983 }
984
985 pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
986 let listing = stream::unfold(self, |state| async move {
987 let next = state.next().await;
988 next.map(|n| (n, state))
989 });
990 Box::pin(listing)
991 }
992}
993
994#[derive(Debug, Clone)]
995pub struct FileStatus {
996 pub path: String,
997 pub length: usize,
998 pub isdir: bool,
999 pub permission: u16,
1000 pub owner: String,
1001 pub group: String,
1002 pub modification_time: u64,
1003 pub access_time: u64,
1004 pub replication: Option<u32>,
1005 pub blocksize: Option<u64>,
1006}
1007
1008impl FileStatus {
1009 fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
1010 let mut path = base_path.trim_end_matches("/").to_string();
1011 let relative_path = std::str::from_utf8(&value.path).unwrap();
1012 if !relative_path.is_empty() {
1013 path.push('/');
1014 path.push_str(relative_path);
1015 }
1016
1017 if path.is_empty() {
1019 path.push('/');
1020 }
1021
1022 FileStatus {
1023 isdir: value.file_type() == FileType::IsDir,
1024 path,
1025 length: value.length as usize,
1026 permission: value.permission.perm as u16,
1027 owner: value.owner,
1028 group: value.group,
1029 modification_time: value.modification_time,
1030 access_time: value.access_time,
1031 replication: value.block_replication,
1032 blocksize: value.blocksize,
1033 }
1034 }
1035}
1036
1037#[derive(Debug)]
1038pub struct ContentSummary {
1039 pub length: u64,
1040 pub file_count: u64,
1041 pub directory_count: u64,
1042 pub quota: u64,
1043 pub space_consumed: u64,
1044 pub space_quota: u64,
1045}
1046
1047impl From<ContentSummaryProto> for ContentSummary {
1048 fn from(value: ContentSummaryProto) -> Self {
1049 ContentSummary {
1050 length: value.length,
1051 file_count: value.file_count,
1052 directory_count: value.directory_count,
1053 quota: value.quota,
1054 space_consumed: value.space_consumed,
1055 space_quota: value.space_quota,
1056 }
1057 }
1058}
1059
/// Unit tests for client construction and mount path resolution. None of them issues a
/// request through the clients or protocols they build.
#[cfg(test)]
mod test {
    use std::sync::{Arc, LazyLock};

    use tokio::runtime::Runtime;
    use url::Url;

    use crate::{
        client::ClientBuilder,
        common::config::Configuration,
        hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
    };

    use super::{MountLink, MountTable};

    // Shared runtime whose handle is given to the protocols built by `create_protocol`.
    static RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

    /// Builds a protocol for `url` with a default configuration and the shared test runtime.
    fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
        let proxy = NameServiceProxy::new(
            &Url::parse(url).unwrap(),
            Arc::new(Configuration::new().unwrap()),
            RT.handle().clone(),
        )
        .unwrap();
        Arc::new(NamenodeProtocol::new(proxy, RT.handle().clone()))
    }

    /// Checks how an explicit URL and the `fs.defaultFS` setting combine during `build`.
    #[test]
    fn test_default_fs() {
        // No URL: the default file system is used.
        assert!(
            ClientBuilder::new()
                .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
                .build()
                .is_ok()
        );

        // No URL and a default file system without a host.
        assert!(
            ClientBuilder::new()
                .with_config(vec![("fs.defaultFS", "hdfs://")])
                .build()
                .is_err()
        );

        // Host-less URL: the host comes from the default file system.
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
                .build()
                .is_ok()
        );

        // Host-less URL and host-less default file system.
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "hdfs://")])
                .build()
                .is_err()
        );

        // Host-less URL whose scheme differs from the default file system's scheme.
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "viewfs://test")])
                .build()
                .is_err()
        );
    }

    /// A link maps its own prefix and paths below it, and nothing else.
    #[test]
    fn test_mount_link_resolve() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("/view", "/hdfs", protocol);

        assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
        assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
        assert!(link.resolve("/hdfs/path").is_none());
    }

    /// A link with an empty viewfs prefix maps every absolute path.
    #[test]
    fn test_fallback_link() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));

        assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
        assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
        assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");

        // With an empty target as well, the root maps to itself.
        let link = MountLink::new("", "", protocol);
        assert_eq!(link.resolve("/").unwrap(), "/");
    }

    /// The table picks the matching mount, or the fallback when no mount matches.
    #[test]
    fn test_mount_table_resolve() {
        let link1 = MountLink::new(
            "/mount1",
            "/path1/nested",
            create_protocol("hdfs://127.0.0.1:9000"),
        );
        let link2 = MountLink::new(
            "/mount2",
            "/path2",
            create_protocol("hdfs://127.0.0.1:9001"),
        );
        let link3 = MountLink::new(
            "/mount3/nested",
            "/path3",
            create_protocol("hdfs://127.0.0.1:9002"),
        );
        let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));

        let mount_table = MountTable {
            mounts: vec![link1, link2, link3],
            fallback,
        };

        // Exact match on a mount point.
        let (link, resolved) = mount_table.resolve("/mount1");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested");

        // A trailing slash on the input is kept in the resolved path.
        let (link, resolved) = mount_table.resolve("/mount1/");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested/");

        // Shares a string prefix with "/mount1" but is a different path component.
        let (link, resolved) = mount_table.resolve("/mount12");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount12");

        // Only "/mount3/nested" is mounted, so "/mount3/file" goes to the fallback.
        let (link, resolved) = mount_table.resolve("/mount3/file");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount3/file");

        let (link, resolved) = mount_table.resolve("/mount3/nested/file");
        assert_eq!(link.viewfs_path, "/mount3/nested");
        assert_eq!(resolved, "/path3/file");
    }

    /// The builder accepts both an owned runtime and a handle to one.
    #[test]
    fn test_io_runtime() {
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://127.0.0.1:9000")
                .with_io_runtime(Runtime::new().unwrap())
                .build()
                .is_ok()
        );

        let rt = Runtime::new().unwrap();
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://127.0.0.1:9000")
                .with_io_runtime(rt.handle().clone())
                .build()
                .is_ok()
        );
    }
}