1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, OnceLock};
3
4use futures::stream::BoxStream;
5use futures::{stream, StreamExt};
6use tokio::runtime::{Handle, Runtime};
7use url::Url;
8
9use crate::acl::{AclEntry, AclStatus};
10use crate::common::config::{self, Configuration};
11use crate::ec::resolve_ec_policy;
12use crate::error::{HdfsError, Result};
13use crate::file::{FileReader, FileWriter};
14use crate::hdfs::protocol::NamenodeProtocol;
15use crate::hdfs::proxy::NameServiceProxy;
16use crate::proto::hdfs::hdfs_file_status_proto::FileType;
17
18use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
19
20#[derive(Clone)]
21pub struct WriteOptions {
22 pub block_size: Option<u64>,
24 pub replication: Option<u32>,
26 pub permission: u32,
29 pub overwrite: bool,
32 pub create_parent: bool,
35}
36
37impl Default for WriteOptions {
38 fn default() -> Self {
39 Self {
40 block_size: None,
41 replication: None,
42 permission: 0o644,
43 overwrite: false,
44 create_parent: true,
45 }
46 }
47}
48
49impl AsRef<WriteOptions> for WriteOptions {
50 fn as_ref(&self) -> &WriteOptions {
51 self
52 }
53}
54
55impl WriteOptions {
56 pub fn block_size(mut self, block_size: u64) -> Self {
58 self.block_size = Some(block_size);
59 self
60 }
61
62 pub fn replication(mut self, replication: u32) -> Self {
64 self.replication = Some(replication);
65 self
66 }
67
68 pub fn permission(mut self, permission: u32) -> Self {
70 self.permission = permission;
71 self
72 }
73
74 pub fn overwrite(mut self, overwrite: bool) -> Self {
76 self.overwrite = overwrite;
77 self
78 }
79
80 pub fn create_parent(mut self, create_parent: bool) -> Self {
82 self.create_parent = create_parent;
83 self
84 }
85}
86
87#[derive(Debug, Clone)]
88struct MountLink {
89 viewfs_path: String,
90 hdfs_path: String,
91 protocol: Arc<NamenodeProtocol>,
92}
93
94impl MountLink {
95 fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
96 Self {
98 viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
99 hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
100 protocol,
101 }
102 }
103 fn resolve(&self, path: &str) -> Option<String> {
105 if path == self.viewfs_path {
108 Some(self.hdfs_path.clone())
109 } else {
110 path.strip_prefix(&format!("{}/", self.viewfs_path))
111 .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
112 }
113 }
114}
115
116#[derive(Debug)]
117struct MountTable {
118 mounts: Vec<MountLink>,
119 fallback: MountLink,
120}
121
122impl MountTable {
123 fn resolve(&self, src: &str) -> (&MountLink, String) {
124 for link in self.mounts.iter() {
125 if let Some(resolved) = link.resolve(src) {
126 return (link, resolved);
127 }
128 }
129 (&self.fallback, self.fallback.resolve(src).unwrap())
130 }
131}
132
133#[derive(Debug)]
135pub enum IORuntime {
136 Runtime(Runtime),
137 Handle(Handle),
138}
139
140impl From<Runtime> for IORuntime {
141 fn from(value: Runtime) -> Self {
142 Self::Runtime(value)
143 }
144}
145
146impl From<Handle> for IORuntime {
147 fn from(value: Handle) -> Self {
148 Self::Handle(value)
149 }
150}
151
152impl IORuntime {
153 fn handle(&self) -> Handle {
154 match self {
155 Self::Runtime(runtime) => runtime.handle().clone(),
156 Self::Handle(handle) => handle.clone(),
157 }
158 }
159}
160
161#[derive(Default)]
198pub struct ClientBuilder {
199 url: Option<String>,
200 config: HashMap<String, String>,
201 runtime: Option<IORuntime>,
202}
203
204impl ClientBuilder {
205 pub fn new() -> Self {
207 Self::default()
208 }
209
210 pub fn with_url(mut self, url: impl Into<String>) -> Self {
212 self.url = Some(url.into());
213 self
214 }
215
216 pub fn with_config(
218 mut self,
219 config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
220 ) -> Self {
221 self.config = config
222 .into_iter()
223 .map(|(k, v)| (k.into(), v.into()))
224 .collect();
225 self
226 }
227
228 pub fn with_io_runtime(mut self, runtime: impl Into<IORuntime>) -> Self {
231 self.runtime = Some(runtime.into());
232 self
233 }
234
235 pub fn build(self) -> Result<Client> {
237 let config = Configuration::new_with_config(self.config)?;
238 let url = if let Some(url) = self.url {
239 Url::parse(&url)?
240 } else {
241 Client::default_fs(&config)?
242 };
243
244 Client::build(&url, config, self.runtime)
245 }
246}
247
248#[derive(Clone, Debug)]
249enum RuntimeHolder {
250 Custom(Arc<IORuntime>),
251 Default(Arc<OnceLock<Runtime>>),
252}
253
254impl RuntimeHolder {
255 fn new(rt: Option<IORuntime>) -> Self {
256 if let Some(rt) = rt {
257 Self::Custom(Arc::new(rt))
258 } else {
259 Self::Default(Arc::new(OnceLock::new()))
260 }
261 }
262
263 fn get_handle(&self) -> Handle {
264 match self {
265 Self::Custom(rt) => rt.handle().clone(),
266 Self::Default(rt) => match Handle::try_current() {
267 Ok(handle) => handle,
268 Err(_) => rt
269 .get_or_init(|| Runtime::new().expect("Failed to create tokio runtime"))
270 .handle()
271 .clone(),
272 },
273 }
274 }
275}
276
277#[derive(Clone, Debug)]
279pub struct Client {
280 mount_table: Arc<MountTable>,
281 config: Arc<Configuration>,
282 rt_holder: RuntimeHolder,
285}
286
287impl Client {
288 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
292 pub fn new(url: &str) -> Result<Self> {
293 let parsed_url = Url::parse(url)?;
294 Self::build(&parsed_url, Configuration::new()?, None)
295 }
296
297 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
298 pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
299 let parsed_url = Url::parse(url)?;
300 Self::build(&parsed_url, Configuration::new_with_config(config)?, None)
301 }
302
303 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
304 pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
305 let config = Configuration::new_with_config(config)?;
306 Self::build(&Self::default_fs(&config)?, config, None)
307 }
308
309 fn default_fs(config: &Configuration) -> Result<Url> {
310 let url = config
311 .get(config::DEFAULT_FS)
312 .ok_or(HdfsError::InvalidArgument(format!(
313 "No {} setting found",
314 config::DEFAULT_FS
315 )))?;
316 Ok(Url::parse(url)?)
317 }
318
319 fn build(url: &Url, config: Configuration, rt: Option<IORuntime>) -> Result<Self> {
320 let resolved_url = if !url.has_host() {
321 let default_url = Self::default_fs(&config)?;
322 if url.scheme() != default_url.scheme() || !default_url.has_host() {
323 return Err(HdfsError::InvalidArgument(
324 "URL must contain a host".to_string(),
325 ));
326 }
327 default_url
328 } else {
329 url.clone()
330 };
331
332 let config = Arc::new(config);
333
334 let rt_holder = RuntimeHolder::new(rt);
335
336 let mount_table = match url.scheme() {
337 "hdfs" => {
338 let proxy = NameServiceProxy::new(
339 &resolved_url,
340 Arc::clone(&config),
341 rt_holder.get_handle(),
342 )?;
343 let protocol = Arc::new(NamenodeProtocol::new(proxy, rt_holder.get_handle()));
344
345 MountTable {
346 mounts: Vec::new(),
347 fallback: MountLink::new("/", "/", protocol),
348 }
349 }
350 "viewfs" => Self::build_mount_table(
351 resolved_url.host_str().expect("URL must have a host"),
353 Arc::clone(&config),
354 rt_holder.get_handle(),
355 )?,
356 _ => {
357 return Err(HdfsError::InvalidArgument(
358 "Only `hdfs` and `viewfs` schemes are supported".to_string(),
359 ))
360 }
361 };
362
363 Ok(Self {
364 mount_table: Arc::new(mount_table),
365 config,
366 rt_holder,
367 })
368 }
369
370 fn build_mount_table(
371 host: &str,
372 config: Arc<Configuration>,
373 handle: Handle,
374 ) -> Result<MountTable> {
375 let mut mounts: Vec<MountLink> = Vec::new();
376 let mut fallback: Option<MountLink> = None;
377
378 for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
379 let url = Url::parse(hdfs_url)?;
380 if !url.has_host() {
381 return Err(HdfsError::InvalidArgument(
382 "URL must contain a host".to_string(),
383 ));
384 }
385 if url.scheme() != "hdfs" {
386 return Err(HdfsError::InvalidArgument(
387 "Only hdfs mounts are supported for viewfs".to_string(),
388 ));
389 }
390 let proxy = NameServiceProxy::new(&url, Arc::clone(&config), handle.clone())?;
391 let protocol = Arc::new(NamenodeProtocol::new(proxy, handle.clone()));
392
393 if let Some(prefix) = viewfs_path {
394 mounts.push(MountLink::new(prefix, url.path(), protocol));
395 } else {
396 if fallback.is_some() {
397 return Err(HdfsError::InvalidArgument(
398 "Multiple viewfs fallback links found".to_string(),
399 ));
400 }
401 fallback = Some(MountLink::new("/", url.path(), protocol));
402 }
403 }
404
405 if let Some(fallback) = fallback {
406 mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
408 mounts.reverse();
409
410 Ok(MountTable { mounts, fallback })
411 } else {
412 Err(HdfsError::InvalidArgument(
413 "No viewfs fallback mount found".to_string(),
414 ))
415 }
416 }
417
418 pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
420 let (link, resolved_path) = self.mount_table.resolve(path);
421 match link.protocol.get_file_info(&resolved_path).await?.fs {
422 Some(status) => Ok(FileStatus::from(status, path)),
423 None => Err(HdfsError::FileNotFound(path.to_string())),
424 }
425 }
426
427 pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
430 let iter = self.list_status_iter(path, recursive);
431 let statuses = iter
432 .into_stream()
433 .collect::<Vec<Result<FileStatus>>>()
434 .await;
435
436 let mut resolved_statues = Vec::<FileStatus>::with_capacity(statuses.len());
437 for status in statuses.into_iter() {
438 resolved_statues.push(status?);
439 }
440
441 Ok(resolved_statues)
442 }
443
444 pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
446 ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
447 }
448
449 pub async fn read(&self, path: &str) -> Result<FileReader> {
451 let (link, resolved_path) = self.mount_table.resolve(path);
452 let located_info = link
454 .protocol
455 .get_block_locations(&resolved_path, 0, i64::MAX as u64)
456 .await?;
457
458 if let Some(locations) = located_info.locations {
459 let ec_schema = if let Some(ec_policy) = locations.ec_policy.as_ref() {
460 Some(resolve_ec_policy(ec_policy)?)
461 } else {
462 None
463 };
464
465 if locations.file_encryption_info.is_some() {
466 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
467 }
468
469 Ok(FileReader::new(
470 Arc::clone(&link.protocol),
471 locations,
472 ec_schema,
473 Arc::clone(&self.config),
474 self.rt_holder.get_handle(),
475 ))
476 } else {
477 Err(HdfsError::FileNotFound(path.to_string()))
478 }
479 }
480
481 pub async fn create(
484 &self,
485 src: &str,
486 write_options: impl AsRef<WriteOptions>,
487 ) -> Result<FileWriter> {
488 let write_options = write_options.as_ref();
489
490 let (link, resolved_path) = self.mount_table.resolve(src);
491
492 let create_response = link
493 .protocol
494 .create(
495 &resolved_path,
496 write_options.permission,
497 write_options.overwrite,
498 write_options.create_parent,
499 write_options.replication,
500 write_options.block_size,
501 )
502 .await?;
503
504 match create_response.fs {
505 Some(status) => {
506 if status.file_encryption_info.is_some() {
507 let _ = self.delete(src, false).await;
508 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
509 }
510
511 Ok(FileWriter::new(
512 Arc::clone(&link.protocol),
513 resolved_path,
514 status,
515 Arc::clone(&self.config),
516 self.rt_holder.get_handle(),
517 None,
518 ))
519 }
520 None => Err(HdfsError::FileNotFound(src.to_string())),
521 }
522 }
523
524 fn needs_new_block(class: &str, msg: &str) -> bool {
525 class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
526 }
527
528 pub async fn append(&self, src: &str) -> Result<FileWriter> {
532 let (link, resolved_path) = self.mount_table.resolve(src);
533
534 let append_response = match link.protocol.append(&resolved_path, false).await {
537 Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
538 link.protocol.append(&resolved_path, true).await?
539 }
540 resp => resp?,
541 };
542
543 match append_response.stat {
544 Some(status) => {
545 if status.file_encryption_info.is_some() {
546 let _ = link
547 .protocol
548 .complete(src, append_response.block.map(|b| b.b), status.file_id)
549 .await;
550 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
551 }
552
553 Ok(FileWriter::new(
554 Arc::clone(&link.protocol),
555 resolved_path,
556 status,
557 Arc::clone(&self.config),
558 self.rt_holder.get_handle(),
559 append_response.block,
560 ))
561 }
562 None => Err(HdfsError::FileNotFound(src.to_string())),
563 }
564 }
565
566 pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
574 let (link, resolved_path) = self.mount_table.resolve(path);
575 link.protocol
576 .mkdirs(&resolved_path, permission, create_parent)
577 .await
578 .map(|_| ())
579 }
580
581 pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
583 let (src_link, src_resolved_path) = self.mount_table.resolve(src);
584 let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
585 if src_link.viewfs_path == dst_link.viewfs_path {
586 src_link
587 .protocol
588 .rename(&src_resolved_path, &dst_resolved_path, overwrite)
589 .await
590 .map(|_| ())
591 } else {
592 Err(HdfsError::InvalidArgument(
593 "Cannot rename across different name services".to_string(),
594 ))
595 }
596 }
597
598 pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
601 let (link, resolved_path) = self.mount_table.resolve(path);
602 link.protocol
603 .delete(&resolved_path, recursive)
604 .await
605 .map(|r| r.result)
606 }
607
608 pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
610 let (link, resolved_path) = self.mount_table.resolve(path);
611 link.protocol
612 .set_times(&resolved_path, mtime, atime)
613 .await?;
614 Ok(())
615 }
616
617 pub async fn set_owner(
619 &self,
620 path: &str,
621 owner: Option<&str>,
622 group: Option<&str>,
623 ) -> Result<()> {
624 let (link, resolved_path) = self.mount_table.resolve(path);
625 link.protocol
626 .set_owner(&resolved_path, owner, group)
627 .await?;
628 Ok(())
629 }
630
631 pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
642 let (link, resolved_path) = self.mount_table.resolve(path);
643 link.protocol
644 .set_permission(&resolved_path, permission)
645 .await?;
646 Ok(())
647 }
648
649 pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
651 let (link, resolved_path) = self.mount_table.resolve(path);
652 let result = link
653 .protocol
654 .set_replication(&resolved_path, replication)
655 .await?
656 .result;
657
658 Ok(result)
659 }
660
661 pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
663 let (link, resolved_path) = self.mount_table.resolve(path);
664 let result = link
665 .protocol
666 .get_content_summary(&resolved_path)
667 .await?
668 .summary;
669
670 Ok(result.into())
671 }
672
673 pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
675 let (link, resolved_path) = self.mount_table.resolve(path);
676 link.protocol
677 .modify_acl_entries(&resolved_path, acl_spec)
678 .await?;
679
680 Ok(())
681 }
682
683 pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
685 let (link, resolved_path) = self.mount_table.resolve(path);
686 link.protocol
687 .remove_acl_entries(&resolved_path, acl_spec)
688 .await?;
689
690 Ok(())
691 }
692
693 pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
695 let (link, resolved_path) = self.mount_table.resolve(path);
696 link.protocol.remove_default_acl(&resolved_path).await?;
697
698 Ok(())
699 }
700
701 pub async fn remove_acl(&self, path: &str) -> Result<()> {
703 let (link, resolved_path) = self.mount_table.resolve(path);
704 link.protocol.remove_acl(&resolved_path).await?;
705
706 Ok(())
707 }
708
709 pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
713 let (link, resolved_path) = self.mount_table.resolve(path);
714 link.protocol.set_acl(&resolved_path, acl_spec).await?;
715
716 Ok(())
717 }
718
719 pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
721 let (link, resolved_path) = self.mount_table.resolve(path);
722 Ok(link
723 .protocol
724 .get_acl_status(&resolved_path)
725 .await?
726 .result
727 .into())
728 }
729}
730
731impl Default for Client {
732 fn default() -> Self {
735 ClientBuilder::new()
736 .build()
737 .expect("Failed to create default client")
738 }
739}
740
741pub(crate) struct DirListingIterator {
742 path: String,
743 resolved_path: String,
744 link: MountLink,
745 files_only: bool,
746 partial_listing: VecDeque<HdfsFileStatusProto>,
747 remaining: u32,
748 last_seen: Vec<u8>,
749}
750
751impl DirListingIterator {
752 fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
753 let (link, resolved_path) = mount_table.resolve(&path);
754
755 DirListingIterator {
756 path,
757 resolved_path,
758 link: link.clone(),
759 files_only,
760 partial_listing: VecDeque::new(),
761 remaining: 1,
762 last_seen: Vec::new(),
763 }
764 }
765
766 async fn get_next_batch(&mut self) -> Result<bool> {
767 let listing = self
768 .link
769 .protocol
770 .get_listing(&self.resolved_path, self.last_seen.clone(), false)
771 .await?;
772
773 if let Some(dir_list) = listing.dir_list {
774 self.last_seen = dir_list
775 .partial_listing
776 .last()
777 .map(|p| p.path.clone())
778 .unwrap_or(Vec::new());
779
780 self.remaining = dir_list.remaining_entries;
781
782 self.partial_listing = dir_list
783 .partial_listing
784 .into_iter()
785 .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
786 .collect();
787 Ok(!self.partial_listing.is_empty())
788 } else {
789 Err(HdfsError::FileNotFound(self.path.clone()))
790 }
791 }
792
793 pub async fn next(&mut self) -> Option<Result<FileStatus>> {
794 if self.partial_listing.is_empty() && self.remaining > 0 {
795 if let Err(error) = self.get_next_batch().await {
796 self.remaining = 0;
797 return Some(Err(error));
798 }
799 }
800 if let Some(next) = self.partial_listing.pop_front() {
801 Some(Ok(FileStatus::from(next, &self.path)))
802 } else {
803 None
804 }
805 }
806}
807
808pub struct ListStatusIterator {
809 mount_table: Arc<MountTable>,
810 recursive: bool,
811 iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
812}
813
814impl ListStatusIterator {
815 fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
816 let initial = DirListingIterator::new(path.clone(), &mount_table, false);
817
818 ListStatusIterator {
819 mount_table,
820 recursive,
821 iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
822 }
823 }
824
825 pub async fn next(&self) -> Option<Result<FileStatus>> {
826 let mut next_file: Option<Result<FileStatus>> = None;
827 let mut iters = self.iters.lock().await;
828 while next_file.is_none() {
829 if let Some(iter) = iters.last_mut() {
830 if let Some(file_result) = iter.next().await {
831 if let Ok(file) = file_result {
832 if file.isdir && self.recursive {
835 iters.push(DirListingIterator::new(
836 file.path.clone(),
837 &self.mount_table,
838 false,
839 ))
840 }
841 next_file = Some(Ok(file));
842 } else {
843 next_file = Some(file_result)
845 }
846 } else {
847 iters.pop();
849 }
850 } else {
851 break;
853 }
854 }
855
856 next_file
857 }
858
859 pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
860 let listing = stream::unfold(self, |state| async move {
861 let next = state.next().await;
862 next.map(|n| (n, state))
863 });
864 Box::pin(listing)
865 }
866}
867
868#[derive(Debug)]
869pub struct FileStatus {
870 pub path: String,
871 pub length: usize,
872 pub isdir: bool,
873 pub permission: u16,
874 pub owner: String,
875 pub group: String,
876 pub modification_time: u64,
877 pub access_time: u64,
878 pub replication: Option<u32>,
879 pub blocksize: Option<u64>,
880}
881
882impl FileStatus {
883 fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
884 let mut path = base_path.trim_end_matches("/").to_string();
885 let relative_path = std::str::from_utf8(&value.path).unwrap();
886 if !relative_path.is_empty() {
887 path.push('/');
888 path.push_str(relative_path);
889 }
890
891 FileStatus {
892 isdir: value.file_type() == FileType::IsDir,
893 path,
894 length: value.length as usize,
895 permission: value.permission.perm as u16,
896 owner: value.owner,
897 group: value.group,
898 modification_time: value.modification_time,
899 access_time: value.access_time,
900 replication: value.block_replication,
901 blocksize: value.blocksize,
902 }
903 }
904}
905
906#[derive(Debug)]
907pub struct ContentSummary {
908 pub length: u64,
909 pub file_count: u64,
910 pub directory_count: u64,
911 pub quota: u64,
912 pub space_consumed: u64,
913 pub space_quota: u64,
914}
915
916impl From<ContentSummaryProto> for ContentSummary {
917 fn from(value: ContentSummaryProto) -> Self {
918 ContentSummary {
919 length: value.length,
920 file_count: value.file_count,
921 directory_count: value.directory_count,
922 quota: value.quota,
923 space_consumed: value.space_consumed,
924 space_quota: value.space_quota,
925 }
926 }
927}
928
929#[cfg(test)]
930mod test {
931 use std::sync::{Arc, LazyLock};
932
933 use tokio::runtime::Runtime;
934 use url::Url;
935
936 use crate::{
937 client::ClientBuilder,
938 common::config::Configuration,
939 hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
940 };
941
942 use super::{MountLink, MountTable};
943
944 static RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());
945
946 fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
947 let proxy = NameServiceProxy::new(
948 &Url::parse(url).unwrap(),
949 Arc::new(Configuration::new().unwrap()),
950 RT.handle().clone(),
951 )
952 .unwrap();
953 Arc::new(NamenodeProtocol::new(proxy, RT.handle().clone()))
954 }
955
956 #[test]
957 fn test_default_fs() {
958 assert!(ClientBuilder::new()
959 .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
960 .build()
961 .is_ok());
962
963 assert!(ClientBuilder::new()
964 .with_config(vec![("fs.defaultFS", "hdfs://")])
965 .build()
966 .is_err());
967
968 assert!(ClientBuilder::new()
969 .with_url("hdfs://")
970 .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
971 .build()
972 .is_ok());
973
974 assert!(ClientBuilder::new()
975 .with_url("hdfs://")
976 .with_config(vec![("fs.defaultFS", "hdfs://")])
977 .build()
978 .is_err());
979
980 assert!(ClientBuilder::new()
981 .with_url("hdfs://")
982 .with_config(vec![("fs.defaultFS", "viewfs://test")])
983 .build()
984 .is_err());
985 }
986
987 #[test]
988 fn test_mount_link_resolve() {
989 let protocol = create_protocol("hdfs://127.0.0.1:9000");
990 let link = MountLink::new("/view", "/hdfs", protocol);
991
992 assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
993 assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
994 assert!(link.resolve("/hdfs/path").is_none());
995 }
996
997 #[test]
998 fn test_fallback_link() {
999 let protocol = create_protocol("hdfs://127.0.0.1:9000");
1000 let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));
1001
1002 assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
1003 assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
1004 assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");
1005
1006 let link = MountLink::new("", "", protocol);
1007 assert_eq!(link.resolve("/").unwrap(), "/");
1008 }
1009
1010 #[test]
1011 fn test_mount_table_resolve() {
1012 let link1 = MountLink::new(
1013 "/mount1",
1014 "/path1/nested",
1015 create_protocol("hdfs://127.0.0.1:9000"),
1016 );
1017 let link2 = MountLink::new(
1018 "/mount2",
1019 "/path2",
1020 create_protocol("hdfs://127.0.0.1:9001"),
1021 );
1022 let link3 = MountLink::new(
1023 "/mount3/nested",
1024 "/path3",
1025 create_protocol("hdfs://127.0.0.1:9002"),
1026 );
1027 let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));
1028
1029 let mount_table = MountTable {
1030 mounts: vec![link1, link2, link3],
1031 fallback,
1032 };
1033
1034 let (link, resolved) = mount_table.resolve("/mount1");
1036 assert_eq!(link.viewfs_path, "/mount1");
1037 assert_eq!(resolved, "/path1/nested");
1038
1039 let (link, resolved) = mount_table.resolve("/mount1/");
1041 assert_eq!(link.viewfs_path, "/mount1");
1042 assert_eq!(resolved, "/path1/nested/");
1043
1044 let (link, resolved) = mount_table.resolve("/mount12");
1046 assert_eq!(link.viewfs_path, "");
1047 assert_eq!(resolved, "/path4/mount12");
1048
1049 let (link, resolved) = mount_table.resolve("/mount3/file");
1050 assert_eq!(link.viewfs_path, "");
1051 assert_eq!(resolved, "/path4/mount3/file");
1052
1053 let (link, resolved) = mount_table.resolve("/mount3/nested/file");
1054 assert_eq!(link.viewfs_path, "/mount3/nested");
1055 assert_eq!(resolved, "/path3/file");
1056 }
1057
1058 #[test]
1059 fn test_io_runtime() {
1060 assert!(ClientBuilder::new()
1061 .with_url("hdfs://127.0.0.1:9000")
1062 .with_io_runtime(Runtime::new().unwrap())
1063 .build()
1064 .is_ok());
1065
1066 let rt = Runtime::new().unwrap();
1067 assert!(ClientBuilder::new()
1068 .with_url("hdfs://127.0.0.1:9000")
1069 .with_io_runtime(rt.handle().clone())
1070 .build()
1071 .is_ok());
1072 }
1073}