1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, OnceLock};
3
4use futures::stream::BoxStream;
5use futures::{StreamExt, stream};
6use tokio::runtime::{Handle, Runtime};
7use url::Url;
8
9use crate::acl::{AclEntry, AclStatus};
10use crate::common::config::{self, Configuration};
11use crate::ec::resolve_ec_policy;
12use crate::error::{HdfsError, Result};
13use crate::file::{FileReader, FileWriter};
14use crate::hdfs::protocol::NamenodeProtocol;
15use crate::hdfs::proxy::NameServiceProxy;
16use crate::proto::hdfs::hdfs_file_status_proto::FileType;
17
18use crate::glob::{GlobPattern, expand_glob, get_path_components, unescape_component};
19use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
20
/// Options applied when a file is created through [`Client::create`].
///
/// Every field can also be set fluently through the builder-style method of the same name.
#[derive(Debug, Clone)]
pub struct WriteOptions {
    /// Block size forwarded to the NameNode `create` call. `None` forwards no explicit value.
    pub block_size: Option<u64>,
    /// Replication factor forwarded to the NameNode `create` call. `None` forwards no explicit
    /// value.
    pub replication: Option<u32>,
    /// Permission bits forwarded to the NameNode `create` call.
    pub permission: u32,
    /// Overwrite flag forwarded to the NameNode `create` call.
    pub overwrite: bool,
    /// Create-parent flag forwarded to the NameNode `create` call.
    pub create_parent: bool,
}
37
/// Default write options: no explicit block size or replication, permission `0o644`,
/// no overwrite, and parent creation enabled.
impl Default for WriteOptions {
    fn default() -> Self {
        Self {
            // `None` means no explicit value is passed along with the create request.
            block_size: None,
            replication: None,
            permission: 0o644,
            overwrite: false,
            create_parent: true,
        }
    }
}
49
/// Lets APIs that take `impl AsRef<WriteOptions>` accept both an owned `WriteOptions`
/// and a `&WriteOptions`.
impl AsRef<WriteOptions> for WriteOptions {
    fn as_ref(&self) -> &WriteOptions {
        self
    }
}
55
56impl WriteOptions {
57 pub fn block_size(mut self, block_size: u64) -> Self {
59 self.block_size = Some(block_size);
60 self
61 }
62
63 pub fn replication(mut self, replication: u32) -> Self {
65 self.replication = Some(replication);
66 self
67 }
68
69 pub fn permission(mut self, permission: u32) -> Self {
71 self.permission = permission;
72 self
73 }
74
75 pub fn overwrite(mut self, overwrite: bool) -> Self {
77 self.overwrite = overwrite;
78 self
79 }
80
81 pub fn create_parent(mut self, create_parent: bool) -> Self {
83 self.create_parent = create_parent;
84 self
85 }
86}
87
/// A single mapping from a client-visible (viewfs) path prefix to a path on one name service.
#[derive(Debug, Clone)]
struct MountLink {
    /// Client-visible prefix, stored without trailing slashes (the root is the empty string).
    viewfs_path: String,
    /// Target prefix on the name service, stored without trailing slashes.
    hdfs_path: String,
    /// Protocol handle used for all calls that resolve through this link.
    protocol: Arc<NamenodeProtocol>,
}
94
95impl MountLink {
96 fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
97 Self {
99 viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
100 hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
101 protocol,
102 }
103 }
104 fn resolve(&self, path: &str) -> Option<String> {
106 if path == self.viewfs_path {
109 Some(self.hdfs_path.clone())
110 } else {
111 path.strip_prefix(&format!("{}/", self.viewfs_path))
112 .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
113 }
114 }
115}
116
/// The set of mount links a client resolves paths against.
#[derive(Debug)]
struct MountTable {
    /// Explicit mount links, tried in order before the fallback.
    mounts: Vec<MountLink>,
    /// Link used when no entry in `mounts` covers a path.
    fallback: MountLink,
}
122
123impl MountTable {
124 fn resolve(&self, src: &str) -> (&MountLink, String) {
125 for link in self.mounts.iter() {
126 if let Some(resolved) = link.resolve(src) {
127 return (link, resolved);
128 }
129 }
130 (&self.fallback, self.fallback.resolve(src).unwrap())
131 }
132}
133
/// A caller-supplied Tokio runtime, given either as an owned runtime or as a handle to one.
#[derive(Debug)]
pub enum IORuntime {
    /// An owned runtime, kept alive for as long as this value exists.
    Runtime(Runtime),
    /// A handle to a runtime owned elsewhere.
    Handle(Handle),
}
140
/// Wraps an owned runtime as [`IORuntime::Runtime`].
impl From<Runtime> for IORuntime {
    fn from(value: Runtime) -> Self {
        Self::Runtime(value)
    }
}
146
/// Wraps a runtime handle as [`IORuntime::Handle`].
impl From<Handle> for IORuntime {
    fn from(value: Handle) -> Self {
        Self::Handle(value)
    }
}
152
153impl IORuntime {
154 fn handle(&self) -> Handle {
155 match self {
156 Self::Runtime(runtime) => runtime.handle().clone(),
157 Self::Handle(handle) => handle.clone(),
158 }
159 }
160}
161
/// Builder for [`Client`]. All settings are optional; see [`ClientBuilder::build`] for how
/// missing values are filled in.
#[derive(Default)]
pub struct ClientBuilder {
    /// URL of the file system to connect to; when absent the configured default is used.
    url: Option<String>,
    /// Configuration entries handed to `Configuration::new_with_config`.
    config: HashMap<String, String>,
    /// Optional caller-supplied runtime for the client.
    runtime: Option<IORuntime>,
}
204
205impl ClientBuilder {
206 pub fn new() -> Self {
208 Self::default()
209 }
210
211 pub fn with_url(mut self, url: impl Into<String>) -> Self {
213 self.url = Some(url.into());
214 self
215 }
216
217 pub fn with_config(
219 mut self,
220 config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
221 ) -> Self {
222 self.config = config
223 .into_iter()
224 .map(|(k, v)| (k.into(), v.into()))
225 .collect();
226 self
227 }
228
229 pub fn with_io_runtime(mut self, runtime: impl Into<IORuntime>) -> Self {
232 self.runtime = Some(runtime.into());
233 self
234 }
235
236 pub fn build(self) -> Result<Client> {
238 let config = Configuration::new_with_config(self.config)?;
239 let url = if let Some(url) = self.url {
240 Url::parse(&url)?
241 } else {
242 Client::default_fs(&config)?
243 };
244
245 Client::build(&url, config, self.runtime)
246 }
247}
248
/// Where a client gets its Tokio handle from.
#[derive(Clone, Debug)]
enum RuntimeHolder {
    /// A runtime or handle supplied by the caller.
    Custom(Arc<IORuntime>),
    /// No runtime was supplied; the slot holds a runtime created on first need.
    Default(Arc<OnceLock<Runtime>>),
}
254
255impl RuntimeHolder {
256 fn new(rt: Option<IORuntime>) -> Self {
257 if let Some(rt) = rt {
258 Self::Custom(Arc::new(rt))
259 } else {
260 Self::Default(Arc::new(OnceLock::new()))
261 }
262 }
263
264 fn get_handle(&self) -> Handle {
265 match self {
266 Self::Custom(rt) => rt.handle().clone(),
267 Self::Default(rt) => match Handle::try_current() {
268 Ok(handle) => handle,
269 Err(_) => rt
270 .get_or_init(|| Runtime::new().expect("Failed to create tokio runtime"))
271 .handle()
272 .clone(),
273 },
274 }
275 }
276}
277
/// Client for an `hdfs` or `viewfs` file system. Cloning shares the mount table,
/// configuration and runtime holder with the original.
#[derive(Clone, Debug)]
pub struct Client {
    /// Mount links used to route every path to a name service.
    mount_table: Arc<MountTable>,
    /// Configuration handed on to the readers and writers this client creates.
    config: Arc<Configuration>,
    /// Source of the Tokio handle handed on to readers and writers.
    rt_holder: RuntimeHolder,
}
287
288impl Client {
289 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
293 pub fn new(url: &str) -> Result<Self> {
294 let parsed_url = Url::parse(url)?;
295 Self::build(&parsed_url, Configuration::new()?, None)
296 }
297
298 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
299 pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
300 let parsed_url = Url::parse(url)?;
301 Self::build(&parsed_url, Configuration::new_with_config(config)?, None)
302 }
303
304 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
305 pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
306 let config = Configuration::new_with_config(config)?;
307 Self::build(&Self::default_fs(&config)?, config, None)
308 }
309
310 fn default_fs(config: &Configuration) -> Result<Url> {
311 let url = config
312 .get(config::DEFAULT_FS)
313 .ok_or(HdfsError::InvalidArgument(format!(
314 "No {} setting found",
315 config::DEFAULT_FS
316 )))?;
317 Ok(Url::parse(url)?)
318 }
319
320 fn build(url: &Url, config: Configuration, rt: Option<IORuntime>) -> Result<Self> {
321 let resolved_url = if !url.has_host() {
322 let default_url = Self::default_fs(&config)?;
323 if url.scheme() != default_url.scheme() || !default_url.has_host() {
324 return Err(HdfsError::InvalidArgument(
325 "URL must contain a host".to_string(),
326 ));
327 }
328 default_url
329 } else {
330 url.clone()
331 };
332
333 let config = Arc::new(config);
334
335 let rt_holder = RuntimeHolder::new(rt);
336
337 let mount_table = match url.scheme() {
338 "hdfs" => {
339 let proxy = NameServiceProxy::new(
340 &resolved_url,
341 Arc::clone(&config),
342 rt_holder.get_handle(),
343 )?;
344 let protocol = Arc::new(NamenodeProtocol::new(proxy, rt_holder.get_handle()));
345
346 MountTable {
347 mounts: Vec::new(),
348 fallback: MountLink::new("/", "/", protocol),
349 }
350 }
351 "viewfs" => Self::build_mount_table(
352 resolved_url.host_str().expect("URL must have a host"),
354 Arc::clone(&config),
355 rt_holder.get_handle(),
356 )?,
357 _ => {
358 return Err(HdfsError::InvalidArgument(
359 "Only `hdfs` and `viewfs` schemes are supported".to_string(),
360 ));
361 }
362 };
363
364 Ok(Self {
365 mount_table: Arc::new(mount_table),
366 config,
367 rt_holder,
368 })
369 }
370
371 fn build_mount_table(
372 host: &str,
373 config: Arc<Configuration>,
374 handle: Handle,
375 ) -> Result<MountTable> {
376 let mut mounts: Vec<MountLink> = Vec::new();
377 let mut fallback: Option<MountLink> = None;
378
379 for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
380 let url = Url::parse(hdfs_url)?;
381 if !url.has_host() {
382 return Err(HdfsError::InvalidArgument(
383 "URL must contain a host".to_string(),
384 ));
385 }
386 if url.scheme() != "hdfs" {
387 return Err(HdfsError::InvalidArgument(
388 "Only hdfs mounts are supported for viewfs".to_string(),
389 ));
390 }
391 let proxy = NameServiceProxy::new(&url, Arc::clone(&config), handle.clone())?;
392 let protocol = Arc::new(NamenodeProtocol::new(proxy, handle.clone()));
393
394 if let Some(prefix) = viewfs_path {
395 mounts.push(MountLink::new(prefix, url.path(), protocol));
396 } else {
397 if fallback.is_some() {
398 return Err(HdfsError::InvalidArgument(
399 "Multiple viewfs fallback links found".to_string(),
400 ));
401 }
402 fallback = Some(MountLink::new("/", url.path(), protocol));
403 }
404 }
405
406 if let Some(fallback) = fallback {
407 mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
409 mounts.reverse();
410
411 Ok(MountTable { mounts, fallback })
412 } else {
413 Err(HdfsError::InvalidArgument(
414 "No viewfs fallback mount found".to_string(),
415 ))
416 }
417 }
418
419 pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
421 let (link, resolved_path) = self.mount_table.resolve(path);
422 match link.protocol.get_file_info(&resolved_path).await?.fs {
423 Some(status) => Ok(FileStatus::from(status, path)),
424 None => Err(HdfsError::FileNotFound(path.to_string())),
425 }
426 }
427
428 pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
431 let iter = self.list_status_iter(path, recursive);
432 let statuses = iter
433 .into_stream()
434 .collect::<Vec<Result<FileStatus>>>()
435 .await;
436
437 let mut resolved_statues = Vec::<FileStatus>::with_capacity(statuses.len());
438 for status in statuses.into_iter() {
439 resolved_statues.push(status?);
440 }
441
442 Ok(resolved_statues)
443 }
444
445 pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
447 ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
448 }
449
450 pub async fn read(&self, path: &str) -> Result<FileReader> {
452 let (link, resolved_path) = self.mount_table.resolve(path);
453 let located_info = link
455 .protocol
456 .get_block_locations(&resolved_path, 0, i64::MAX as u64)
457 .await?;
458
459 if let Some(locations) = located_info.locations {
460 let ec_schema = if let Some(ec_policy) = locations.ec_policy.as_ref() {
461 Some(resolve_ec_policy(ec_policy)?)
462 } else {
463 None
464 };
465
466 if locations.file_encryption_info.is_some() {
467 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
468 }
469
470 Ok(FileReader::new(
471 Arc::clone(&link.protocol),
472 locations,
473 ec_schema,
474 Arc::clone(&self.config),
475 self.rt_holder.get_handle(),
476 ))
477 } else {
478 Err(HdfsError::FileNotFound(path.to_string()))
479 }
480 }
481
482 pub async fn create(
485 &self,
486 src: &str,
487 write_options: impl AsRef<WriteOptions>,
488 ) -> Result<FileWriter> {
489 let write_options = write_options.as_ref();
490
491 let (link, resolved_path) = self.mount_table.resolve(src);
492
493 let create_response = link
494 .protocol
495 .create(
496 &resolved_path,
497 write_options.permission,
498 write_options.overwrite,
499 write_options.create_parent,
500 write_options.replication,
501 write_options.block_size,
502 )
503 .await?;
504
505 match create_response.fs {
506 Some(status) => {
507 if status.file_encryption_info.is_some() {
508 let _ = self.delete(src, false).await;
509 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
510 }
511
512 Ok(FileWriter::new(
513 Arc::clone(&link.protocol),
514 resolved_path,
515 status,
516 Arc::clone(&self.config),
517 self.rt_holder.get_handle(),
518 None,
519 ))
520 }
521 None => Err(HdfsError::FileNotFound(src.to_string())),
522 }
523 }
524
525 fn needs_new_block(class: &str, msg: &str) -> bool {
526 class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
527 }
528
529 pub async fn append(&self, src: &str) -> Result<FileWriter> {
533 let (link, resolved_path) = self.mount_table.resolve(src);
534
535 let append_response = match link.protocol.append(&resolved_path, false).await {
538 Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
539 link.protocol.append(&resolved_path, true).await?
540 }
541 resp => resp?,
542 };
543
544 match append_response.stat {
545 Some(status) => {
546 if status.file_encryption_info.is_some() {
547 let _ = link
548 .protocol
549 .complete(src, append_response.block.map(|b| b.b), status.file_id)
550 .await;
551 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
552 }
553
554 Ok(FileWriter::new(
555 Arc::clone(&link.protocol),
556 resolved_path,
557 status,
558 Arc::clone(&self.config),
559 self.rt_holder.get_handle(),
560 append_response.block,
561 ))
562 }
563 None => Err(HdfsError::FileNotFound(src.to_string())),
564 }
565 }
566
567 pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
575 let (link, resolved_path) = self.mount_table.resolve(path);
576 link.protocol
577 .mkdirs(&resolved_path, permission, create_parent)
578 .await
579 .map(|_| ())
580 }
581
582 pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
584 let (src_link, src_resolved_path) = self.mount_table.resolve(src);
585 let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
586 if src_link.viewfs_path == dst_link.viewfs_path {
587 src_link
588 .protocol
589 .rename(&src_resolved_path, &dst_resolved_path, overwrite)
590 .await
591 .map(|_| ())
592 } else {
593 Err(HdfsError::InvalidArgument(
594 "Cannot rename across different name services".to_string(),
595 ))
596 }
597 }
598
599 pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
602 let (link, resolved_path) = self.mount_table.resolve(path);
603 link.protocol
604 .delete(&resolved_path, recursive)
605 .await
606 .map(|r| r.result)
607 }
608
609 pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
611 let (link, resolved_path) = self.mount_table.resolve(path);
612 link.protocol
613 .set_times(&resolved_path, mtime, atime)
614 .await?;
615 Ok(())
616 }
617
618 pub async fn set_owner(
620 &self,
621 path: &str,
622 owner: Option<&str>,
623 group: Option<&str>,
624 ) -> Result<()> {
625 let (link, resolved_path) = self.mount_table.resolve(path);
626 link.protocol
627 .set_owner(&resolved_path, owner, group)
628 .await?;
629 Ok(())
630 }
631
632 pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
643 let (link, resolved_path) = self.mount_table.resolve(path);
644 link.protocol
645 .set_permission(&resolved_path, permission)
646 .await?;
647 Ok(())
648 }
649
650 pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
652 let (link, resolved_path) = self.mount_table.resolve(path);
653 let result = link
654 .protocol
655 .set_replication(&resolved_path, replication)
656 .await?
657 .result;
658
659 Ok(result)
660 }
661
662 pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
664 let (link, resolved_path) = self.mount_table.resolve(path);
665 let result = link
666 .protocol
667 .get_content_summary(&resolved_path)
668 .await?
669 .summary;
670
671 Ok(result.into())
672 }
673
674 pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
676 let (link, resolved_path) = self.mount_table.resolve(path);
677 link.protocol
678 .modify_acl_entries(&resolved_path, acl_spec)
679 .await?;
680
681 Ok(())
682 }
683
684 pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
686 let (link, resolved_path) = self.mount_table.resolve(path);
687 link.protocol
688 .remove_acl_entries(&resolved_path, acl_spec)
689 .await?;
690
691 Ok(())
692 }
693
694 pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
696 let (link, resolved_path) = self.mount_table.resolve(path);
697 link.protocol.remove_default_acl(&resolved_path).await?;
698
699 Ok(())
700 }
701
702 pub async fn remove_acl(&self, path: &str) -> Result<()> {
704 let (link, resolved_path) = self.mount_table.resolve(path);
705 link.protocol.remove_acl(&resolved_path).await?;
706
707 Ok(())
708 }
709
710 pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
714 let (link, resolved_path) = self.mount_table.resolve(path);
715 link.protocol.set_acl(&resolved_path, acl_spec).await?;
716
717 Ok(())
718 }
719
720 pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
722 let (link, resolved_path) = self.mount_table.resolve(path);
723 Ok(link
724 .protocol
725 .get_acl_status(&resolved_path)
726 .await?
727 .result
728 .into())
729 }
730
    /// Returns the statuses of all paths matching the glob `pattern`.
    ///
    /// The pattern is first passed through `expand_glob`, which yields one or more flattened
    /// patterns (presumably one per alternative; confirm against `crate::glob`). Each
    /// flattened pattern is split into path components and matched component by component,
    /// starting at `/`. Results of all flattened patterns are concatenated in order; no
    /// de-duplication is performed here.
    ///
    /// # Errors
    ///
    /// Passes on errors from glob expansion and pattern parsing. `HdfsError::FileNotFound`
    /// from a listing or status lookup only drops that candidate; every other error aborts
    /// the whole call.
    pub async fn glob_status(&self, pattern: &str) -> Result<Vec<FileStatus>> {
        let flattened = expand_glob(pattern.to_string())?;

        let mut results: Vec<FileStatus> = Vec::new();

        for flat in flattened.into_iter() {
            // An empty flattened pattern cannot match anything.
            if flat.is_empty() {
                continue;
            }

            let components = get_path_components(&flat);

            // A path that still matches the components processed so far. `status` is filled
            // in once the path has been confirmed by a listing or a status lookup.
            #[derive(Clone, Debug)]
            struct Candidate {
                path: String,
                status: Option<FileStatus>,
            }

            // Matching always starts from the root.
            let mut candidates: Vec<Candidate> = vec![Candidate {
                path: "/".to_string(),
                status: None,
            }];

            for (idx, comp) in components.iter().enumerate() {
                // Nothing left that could match the remaining components.
                if candidates.is_empty() {
                    break;
                }

                let is_last = idx == components.len() - 1;

                let unescaped = unescape_component(comp);
                let glob_pat = GlobPattern::new(comp)?;

                // Literal intermediate component: extend every candidate path without any
                // request. Existence is checked later, when the path is listed or looked up.
                if !is_last && !glob_pat.has_wildcard() {
                    for cand in candidates.iter_mut() {
                        if !cand.path.ends_with('/') {
                            cand.path.push('/');
                        }
                        cand.path.push_str(&unescaped);
                    }
                    continue;
                }

                let mut new_candidates: Vec<Candidate> = Vec::new();

                for cand in candidates.into_iter() {
                    if glob_pat.has_wildcard() {
                        // Wildcard component: list the candidate and match its children.
                        let listing = match self.list_status(&cand.path, false).await {
                            Ok(listing) => listing,
                            Err(HdfsError::FileNotFound(_)) => continue,
                            Err(e) => return Err(e),
                        };
                        // A single entry whose path is the candidate itself has no children
                        // to match (presumably the candidate is a file; confirm against the
                        // listing semantics).
                        if listing.len() == 1 && listing[0].path == cand.path {
                            continue;
                        }

                        for child in listing.into_iter() {
                            // Only directories can be descended into for later components.
                            if !is_last && !child.isdir {
                                continue;
                            }

                            // The child's name is the part after the last `/`.
                            let name = child
                                .path
                                .rsplit_once('/')
                                .map(|(_, n)| n)
                                .unwrap_or(child.path.as_str());

                            if glob_pat.matches(name) {
                                new_candidates.push(Candidate {
                                    path: child.path.clone(),
                                    status: Some(child),
                                });
                            }
                        }
                    } else {
                        // Literal component. Only the last component reaches this branch,
                        // because literal intermediate components were handled above.
                        let mut next_path = cand.path.clone();
                        if !next_path.ends_with('/') {
                            next_path.push('/');
                        }
                        next_path.push_str(&unescaped);

                        match self.get_file_info(&next_path).await {
                            Ok(status) => {
                                if is_last || status.isdir {
                                    new_candidates.push(Candidate {
                                        path: status.path.clone(),
                                        status: Some(status),
                                    });
                                }
                            }
                            Err(HdfsError::FileNotFound(_)) => continue,
                            Err(e) => return Err(e),
                        }
                    }
                }

                candidates = new_candidates;
            }

            // Candidates that were never confirmed (for example when there were no
            // components at all) still need a status lookup.
            for cand in candidates.into_iter() {
                let status = if let Some(s) = cand.status {
                    s
                } else {
                    match self.get_file_info(&cand.path).await {
                        Ok(s) => s,
                        Err(HdfsError::FileNotFound(_)) => continue,
                        Err(e) => return Err(e),
                    }
                };

                results.push(status);
            }
        }

        Ok(results)
    }
864}
865
/// Builds a client from a `ClientBuilder` with no URL, configuration entries or runtime set.
///
/// # Panics
///
/// Panics if building the client fails.
impl Default for Client {
    fn default() -> Self {
        ClientBuilder::new()
            .build()
            .expect("Failed to create default client")
    }
}
875
/// Iterates over the entries of one directory, fetching them from the NameNode in batches.
pub(crate) struct DirListingIterator {
    /// Path as given by the caller; used to build result paths and error messages.
    path: String,
    /// `path` after mount table resolution; this is what is sent to the NameNode.
    resolved_path: String,
    /// Mount link (and therefore protocol) the path resolved to.
    link: MountLink,
    /// When set, directory entries are dropped from each batch.
    files_only: bool,
    /// Entries of the current batch that have not been handed out yet.
    partial_listing: VecDeque<HdfsFileStatusProto>,
    /// Remaining-entries count from the latest batch; starts at 1 so a first batch is fetched.
    remaining: u32,
    /// Name of the last entry of the latest batch, sent as the start point of the next one.
    last_seen: Vec<u8>,
}
885
886impl DirListingIterator {
887 fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
888 let (link, resolved_path) = mount_table.resolve(&path);
889
890 DirListingIterator {
891 path,
892 resolved_path,
893 link: link.clone(),
894 files_only,
895 partial_listing: VecDeque::new(),
896 remaining: 1,
897 last_seen: Vec::new(),
898 }
899 }
900
901 async fn get_next_batch(&mut self) -> Result<bool> {
902 let listing = self
903 .link
904 .protocol
905 .get_listing(&self.resolved_path, self.last_seen.clone(), false)
906 .await?;
907
908 if let Some(dir_list) = listing.dir_list {
909 self.last_seen = dir_list
910 .partial_listing
911 .last()
912 .map(|p| p.path.clone())
913 .unwrap_or(Vec::new());
914
915 self.remaining = dir_list.remaining_entries;
916
917 self.partial_listing = dir_list
918 .partial_listing
919 .into_iter()
920 .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
921 .collect();
922 Ok(!self.partial_listing.is_empty())
923 } else {
924 Err(HdfsError::FileNotFound(self.path.clone()))
925 }
926 }
927
928 pub async fn next(&mut self) -> Option<Result<FileStatus>> {
929 if self.partial_listing.is_empty()
930 && self.remaining > 0
931 && let Err(error) = self.get_next_batch().await
932 {
933 self.remaining = 0;
934 return Some(Err(error));
935 }
936 if let Some(next) = self.partial_listing.pop_front() {
937 Some(Ok(FileStatus::from(next, &self.path)))
938 } else {
939 None
940 }
941 }
942}
943
/// Iterator over the entries under a path, optionally descending into subdirectories.
pub struct ListStatusIterator {
    /// Mount table used to resolve every directory that gets listed.
    mount_table: Arc<MountTable>,
    /// Whether directories found during listing are listed in turn.
    recursive: bool,
    /// Stack of per-directory iterators; the last element is the one currently being drained.
    iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
}
949
950impl ListStatusIterator {
951 fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
952 let initial = DirListingIterator::new(path.clone(), &mount_table, false);
953
954 ListStatusIterator {
955 mount_table,
956 recursive,
957 iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
958 }
959 }
960
961 pub async fn next(&self) -> Option<Result<FileStatus>> {
962 let mut next_file: Option<Result<FileStatus>> = None;
963 let mut iters = self.iters.lock().await;
964 while next_file.is_none() {
965 if let Some(iter) = iters.last_mut() {
966 if let Some(file_result) = iter.next().await {
967 if let Ok(file) = file_result {
968 if file.isdir && self.recursive {
971 iters.push(DirListingIterator::new(
972 file.path.clone(),
973 &self.mount_table,
974 false,
975 ))
976 }
977 next_file = Some(Ok(file));
978 } else {
979 next_file = Some(file_result)
981 }
982 } else {
983 iters.pop();
985 }
986 } else {
987 break;
989 }
990 }
991
992 next_file
993 }
994
995 pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
996 let listing = stream::unfold(self, |state| async move {
997 let next = state.next().await;
998 next.map(|n| (n, state))
999 });
1000 Box::pin(listing)
1001 }
1002}
1003
/// Status of a file or directory, built from the NameNode's status message.
#[derive(Debug, Clone)]
pub struct FileStatus {
    /// Full path: the path that was queried joined with the entry's name, or `/` for the root.
    pub path: String,
    /// Length reported by the NameNode.
    pub length: usize,
    /// Whether the entry is a directory.
    pub isdir: bool,
    /// Permission bits reported by the NameNode.
    pub permission: u16,
    /// Owner reported by the NameNode.
    pub owner: String,
    /// Group reported by the NameNode.
    pub group: String,
    /// Modification time as reported by the NameNode (unit not visible here; TODO confirm).
    pub modification_time: u64,
    /// Access time as reported by the NameNode (unit not visible here; TODO confirm).
    pub access_time: u64,
    /// Block replication, if the NameNode reported one.
    pub replication: Option<u32>,
    /// Block size, if the NameNode reported one.
    pub blocksize: Option<u64>,
}
1017
1018impl FileStatus {
1019 fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
1020 let mut path = base_path.trim_end_matches("/").to_string();
1021 let relative_path = std::str::from_utf8(&value.path).unwrap();
1022 if !relative_path.is_empty() {
1023 path.push('/');
1024 path.push_str(relative_path);
1025 }
1026
1027 if path.is_empty() {
1029 path.push('/');
1030 }
1031
1032 FileStatus {
1033 isdir: value.file_type() == FileType::IsDir,
1034 path,
1035 length: value.length as usize,
1036 permission: value.permission.perm as u16,
1037 owner: value.owner,
1038 group: value.group,
1039 modification_time: value.modification_time,
1040 access_time: value.access_time,
1041 replication: value.block_replication,
1042 blocksize: value.blocksize,
1043 }
1044 }
1045}
1046
/// Content summary of a path, copied field by field from the NameNode's summary message.
#[derive(Debug)]
pub struct ContentSummary {
    /// `length` field of the summary message.
    pub length: u64,
    /// `file_count` field of the summary message.
    pub file_count: u64,
    /// `directory_count` field of the summary message.
    pub directory_count: u64,
    /// `quota` field of the summary message.
    pub quota: u64,
    /// `space_consumed` field of the summary message.
    pub space_consumed: u64,
    /// `space_quota` field of the summary message.
    pub space_quota: u64,
}
1056
1057impl From<ContentSummaryProto> for ContentSummary {
1058 fn from(value: ContentSummaryProto) -> Self {
1059 ContentSummary {
1060 length: value.length,
1061 file_count: value.file_count,
1062 directory_count: value.directory_count,
1063 quota: value.quota,
1064 space_consumed: value.space_consumed,
1065 space_quota: value.space_quota,
1066 }
1067 }
1068}
1069
// Unit tests for client construction and mount table resolution. The clients are only
// constructed; no request is issued by any test in this module.
#[cfg(test)]
mod test {
    use std::sync::{Arc, LazyLock};

    use tokio::runtime::Runtime;
    use url::Url;

    use crate::{
        client::ClientBuilder,
        common::config::Configuration,
        hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
    };

    use super::{MountLink, MountTable};

    // Runtime shared by all tests that need a handle for building a protocol.
    static RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());

    // Builds a protocol for `url` with a default configuration, for use in mount links.
    fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
        let proxy = NameServiceProxy::new(
            &Url::parse(url).unwrap(),
            Arc::new(Configuration::new().unwrap()),
            RT.handle().clone(),
        )
        .unwrap();
        Arc::new(NamenodeProtocol::new(proxy, RT.handle().clone()))
    }

    // Covers how the default file system setting interacts with an explicit, host-less URL.
    #[test]
    fn test_default_fs() {
        // A default with a host is enough on its own.
        assert!(
            ClientBuilder::new()
                .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
                .build()
                .is_ok()
        );

        // A default without a host is rejected.
        assert!(
            ClientBuilder::new()
                .with_config(vec![("fs.defaultFS", "hdfs://")])
                .build()
                .is_err()
        );

        // A host-less URL takes its host from a default with the same scheme.
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
                .build()
                .is_ok()
        );

        // Neither the URL nor the default provides a host.
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "hdfs://")])
                .build()
                .is_err()
        );

        // The default cannot supply the host when its scheme differs from the URL's.
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://")
                .with_config(vec![("fs.defaultFS", "viewfs://test")])
                .build()
                .is_err()
        );
    }

    // A link resolves its own prefix and paths below it, and nothing else.
    #[test]
    fn test_mount_link_resolve() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("/view", "/hdfs", protocol);

        assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
        assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
        assert!(link.resolve("/hdfs/path").is_none());
    }

    // A link with an empty viewfs prefix covers every absolute path.
    #[test]
    fn test_fallback_link() {
        let protocol = create_protocol("hdfs://127.0.0.1:9000");
        let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));

        assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
        assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
        assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");

        // With an empty target prefix as well, the root maps to the root.
        let link = MountLink::new("", "", protocol);
        assert_eq!(link.resolve("/").unwrap(), "/");
    }

    // Paths are routed to the matching mount, or to the fallback when no mount covers them.
    #[test]
    fn test_mount_table_resolve() {
        let link1 = MountLink::new(
            "/mount1",
            "/path1/nested",
            create_protocol("hdfs://127.0.0.1:9000"),
        );
        let link2 = MountLink::new(
            "/mount2",
            "/path2",
            create_protocol("hdfs://127.0.0.1:9001"),
        );
        let link3 = MountLink::new(
            "/mount3/nested",
            "/path3",
            create_protocol("hdfs://127.0.0.1:9002"),
        );
        let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));

        let mount_table = MountTable {
            mounts: vec![link1, link2, link3],
            fallback,
        };

        // Exact match on a mount prefix.
        let (link, resolved) = mount_table.resolve("/mount1");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested");

        // A trailing slash on the input is carried over to the resolved path.
        let (link, resolved) = mount_table.resolve("/mount1/");
        assert_eq!(link.viewfs_path, "/mount1");
        assert_eq!(resolved, "/path1/nested/");

        // Sharing leading characters with a mount prefix is not a match.
        let (link, resolved) = mount_table.resolve("/mount12");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount12");

        // The parent of a nested mount prefix is not itself mounted.
        let (link, resolved) = mount_table.resolve("/mount3/file");
        assert_eq!(link.viewfs_path, "");
        assert_eq!(resolved, "/path4/mount3/file");

        let (link, resolved) = mount_table.resolve("/mount3/nested/file");
        assert_eq!(link.viewfs_path, "/mount3/nested");
        assert_eq!(resolved, "/path3/file");
    }

    // A client can be built with either an owned runtime or a runtime handle.
    #[test]
    fn test_io_runtime() {
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://127.0.0.1:9000")
                .with_io_runtime(Runtime::new().unwrap())
                .build()
                .is_ok()
        );

        let rt = Runtime::new().unwrap();
        assert!(
            ClientBuilder::new()
                .with_url("hdfs://127.0.0.1:9000")
                .with_io_runtime(rt.handle().clone())
                .build()
                .is_ok()
        );
    }
}