1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, OnceLock};
3
4use futures::stream::BoxStream;
5use futures::{stream, StreamExt};
6use tokio::runtime::{Handle, Runtime};
7use url::Url;
8
9use crate::acl::{AclEntry, AclStatus};
10use crate::common::config::{self, Configuration};
11use crate::ec::resolve_ec_policy;
12use crate::error::{HdfsError, Result};
13use crate::file::{FileReader, FileWriter};
14use crate::hdfs::protocol::NamenodeProtocol;
15use crate::hdfs::proxy::NameServiceProxy;
16use crate::proto::hdfs::hdfs_file_status_proto::FileType;
17
18use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};
19
20#[derive(Clone)]
21pub struct WriteOptions {
22 pub block_size: Option<u64>,
24 pub replication: Option<u32>,
26 pub permission: u32,
29 pub overwrite: bool,
32 pub create_parent: bool,
35}
36
37impl Default for WriteOptions {
38 fn default() -> Self {
39 Self {
40 block_size: None,
41 replication: None,
42 permission: 0o644,
43 overwrite: false,
44 create_parent: true,
45 }
46 }
47}
48
49impl AsRef<WriteOptions> for WriteOptions {
50 fn as_ref(&self) -> &WriteOptions {
51 self
52 }
53}
54
55impl WriteOptions {
56 pub fn block_size(mut self, block_size: u64) -> Self {
58 self.block_size = Some(block_size);
59 self
60 }
61
62 pub fn replication(mut self, replication: u32) -> Self {
64 self.replication = Some(replication);
65 self
66 }
67
68 pub fn permission(mut self, permission: u32) -> Self {
70 self.permission = permission;
71 self
72 }
73
74 pub fn overwrite(mut self, overwrite: bool) -> Self {
76 self.overwrite = overwrite;
77 self
78 }
79
80 pub fn create_parent(mut self, create_parent: bool) -> Self {
82 self.create_parent = create_parent;
83 self
84 }
85}
86
87#[derive(Debug, Clone)]
88struct MountLink {
89 viewfs_path: String,
90 hdfs_path: String,
91 protocol: Arc<NamenodeProtocol>,
92}
93
94impl MountLink {
95 fn new(viewfs_path: &str, hdfs_path: &str, protocol: Arc<NamenodeProtocol>) -> Self {
96 Self {
98 viewfs_path: viewfs_path.trim_end_matches("/").to_string(),
99 hdfs_path: hdfs_path.trim_end_matches("/").to_string(),
100 protocol,
101 }
102 }
103 fn resolve(&self, path: &str) -> Option<String> {
105 if path == self.viewfs_path {
108 Some(self.hdfs_path.clone())
109 } else {
110 path.strip_prefix(&format!("{}/", self.viewfs_path))
111 .map(|relative_path| format!("{}/{}", &self.hdfs_path, relative_path))
112 }
113 }
114}
115
116#[derive(Debug)]
117struct MountTable {
118 mounts: Vec<MountLink>,
119 fallback: MountLink,
120}
121
122impl MountTable {
123 fn resolve(&self, src: &str) -> (&MountLink, String) {
124 for link in self.mounts.iter() {
125 if let Some(resolved) = link.resolve(src) {
126 return (link, resolved);
127 }
128 }
129 (&self.fallback, self.fallback.resolve(src).unwrap())
130 }
131}
132
133#[derive(Default)]
170pub struct ClientBuilder {
171 url: Option<String>,
172 config: HashMap<String, String>,
173 runtime: Option<Runtime>,
174}
175
176impl ClientBuilder {
177 pub fn new() -> Self {
179 Self::default()
180 }
181
182 pub fn with_url(mut self, url: impl Into<String>) -> Self {
184 self.url = Some(url.into());
185 self
186 }
187
188 pub fn with_config(
190 mut self,
191 config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
192 ) -> Self {
193 self.config = config
194 .into_iter()
195 .map(|(k, v)| (k.into(), v.into()))
196 .collect();
197 self
198 }
199
200 pub fn with_io_runtime(mut self, runtime: Runtime) -> Self {
202 self.runtime = Some(runtime);
203 self
204 }
205
206 pub fn build(self) -> Result<Client> {
208 let config = Configuration::new_with_config(self.config)?;
209 let url = if let Some(url) = self.url {
210 Url::parse(&url)?
211 } else {
212 Client::default_fs(&config)?
213 };
214
215 Client::build(&url, config, self.runtime)
216 }
217}
218
219#[derive(Clone, Debug)]
220enum RuntimeHolder {
221 Custom(Arc<Runtime>),
222 Default(Arc<OnceLock<Runtime>>),
223}
224
225impl RuntimeHolder {
226 fn new(rt: Option<Runtime>) -> Self {
227 if let Some(rt) = rt {
228 Self::Custom(Arc::new(rt))
229 } else {
230 Self::Default(Arc::new(OnceLock::new()))
231 }
232 }
233
234 fn get_handle(&self) -> Handle {
235 match self {
236 Self::Custom(rt) => rt.handle().clone(),
237 Self::Default(rt) => match Handle::try_current() {
238 Ok(handle) => handle,
239 Err(_) => rt
240 .get_or_init(|| Runtime::new().expect("Failed to create tokio runtime"))
241 .handle()
242 .clone(),
243 },
244 }
245 }
246}
247
248#[derive(Clone, Debug)]
250pub struct Client {
251 mount_table: Arc<MountTable>,
252 config: Arc<Configuration>,
253 rt_holder: RuntimeHolder,
256}
257
258impl Client {
259 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
263 pub fn new(url: &str) -> Result<Self> {
264 let parsed_url = Url::parse(url)?;
265 Self::build(&parsed_url, Configuration::new()?, None)
266 }
267
268 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
269 pub fn new_with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
270 let parsed_url = Url::parse(url)?;
271 Self::build(&parsed_url, Configuration::new_with_config(config)?, None)
272 }
273
274 #[deprecated(since = "0.12.0", note = "Use ClientBuilder instead")]
275 pub fn default_with_config(config: HashMap<String, String>) -> Result<Self> {
276 let config = Configuration::new_with_config(config)?;
277 Self::build(&Self::default_fs(&config)?, config, None)
278 }
279
280 fn default_fs(config: &Configuration) -> Result<Url> {
281 let url = config
282 .get(config::DEFAULT_FS)
283 .ok_or(HdfsError::InvalidArgument(format!(
284 "No {} setting found",
285 config::DEFAULT_FS
286 )))?;
287 Ok(Url::parse(url)?)
288 }
289
290 fn build(url: &Url, config: Configuration, rt: Option<Runtime>) -> Result<Self> {
291 let resolved_url = if !url.has_host() {
292 let default_url = Self::default_fs(&config)?;
293 if url.scheme() != default_url.scheme() || !default_url.has_host() {
294 return Err(HdfsError::InvalidArgument(
295 "URL must contain a host".to_string(),
296 ));
297 }
298 default_url
299 } else {
300 url.clone()
301 };
302
303 let config = Arc::new(config);
304
305 let rt_holder = RuntimeHolder::new(rt);
306
307 let mount_table = match url.scheme() {
308 "hdfs" => {
309 let proxy = NameServiceProxy::new(
310 &resolved_url,
311 Arc::clone(&config),
312 rt_holder.get_handle(),
313 )?;
314 let protocol = Arc::new(NamenodeProtocol::new(proxy, rt_holder.get_handle()));
315
316 MountTable {
317 mounts: Vec::new(),
318 fallback: MountLink::new("/", "/", protocol),
319 }
320 }
321 "viewfs" => Self::build_mount_table(
322 resolved_url.host_str().expect("URL must have a host"),
324 Arc::clone(&config),
325 rt_holder.get_handle(),
326 )?,
327 _ => {
328 return Err(HdfsError::InvalidArgument(
329 "Only `hdfs` and `viewfs` schemes are supported".to_string(),
330 ))
331 }
332 };
333
334 Ok(Self {
335 mount_table: Arc::new(mount_table),
336 config,
337 rt_holder,
338 })
339 }
340
341 fn build_mount_table(
342 host: &str,
343 config: Arc<Configuration>,
344 handle: Handle,
345 ) -> Result<MountTable> {
346 let mut mounts: Vec<MountLink> = Vec::new();
347 let mut fallback: Option<MountLink> = None;
348
349 for (viewfs_path, hdfs_url) in config.get_mount_table(host).iter() {
350 let url = Url::parse(hdfs_url)?;
351 if !url.has_host() {
352 return Err(HdfsError::InvalidArgument(
353 "URL must contain a host".to_string(),
354 ));
355 }
356 if url.scheme() != "hdfs" {
357 return Err(HdfsError::InvalidArgument(
358 "Only hdfs mounts are supported for viewfs".to_string(),
359 ));
360 }
361 let proxy = NameServiceProxy::new(&url, Arc::clone(&config), handle.clone())?;
362 let protocol = Arc::new(NamenodeProtocol::new(proxy, handle.clone()));
363
364 if let Some(prefix) = viewfs_path {
365 mounts.push(MountLink::new(prefix, url.path(), protocol));
366 } else {
367 if fallback.is_some() {
368 return Err(HdfsError::InvalidArgument(
369 "Multiple viewfs fallback links found".to_string(),
370 ));
371 }
372 fallback = Some(MountLink::new("/", url.path(), protocol));
373 }
374 }
375
376 if let Some(fallback) = fallback {
377 mounts.sort_by_key(|m| m.viewfs_path.chars().filter(|c| *c == '/').count());
379 mounts.reverse();
380
381 Ok(MountTable { mounts, fallback })
382 } else {
383 Err(HdfsError::InvalidArgument(
384 "No viewfs fallback mount found".to_string(),
385 ))
386 }
387 }
388
389 pub async fn get_file_info(&self, path: &str) -> Result<FileStatus> {
391 let (link, resolved_path) = self.mount_table.resolve(path);
392 match link.protocol.get_file_info(&resolved_path).await?.fs {
393 Some(status) => Ok(FileStatus::from(status, path)),
394 None => Err(HdfsError::FileNotFound(path.to_string())),
395 }
396 }
397
398 pub async fn list_status(&self, path: &str, recursive: bool) -> Result<Vec<FileStatus>> {
401 let iter = self.list_status_iter(path, recursive);
402 let statuses = iter
403 .into_stream()
404 .collect::<Vec<Result<FileStatus>>>()
405 .await;
406
407 let mut resolved_statues = Vec::<FileStatus>::with_capacity(statuses.len());
408 for status in statuses.into_iter() {
409 resolved_statues.push(status?);
410 }
411
412 Ok(resolved_statues)
413 }
414
415 pub fn list_status_iter(&self, path: &str, recursive: bool) -> ListStatusIterator {
417 ListStatusIterator::new(path.to_string(), Arc::clone(&self.mount_table), recursive)
418 }
419
420 pub async fn read(&self, path: &str) -> Result<FileReader> {
422 let (link, resolved_path) = self.mount_table.resolve(path);
423 let located_info = link
425 .protocol
426 .get_block_locations(&resolved_path, 0, i64::MAX as u64)
427 .await?;
428
429 if let Some(locations) = located_info.locations {
430 let ec_schema = if let Some(ec_policy) = locations.ec_policy.as_ref() {
431 Some(resolve_ec_policy(ec_policy)?)
432 } else {
433 None
434 };
435
436 if locations.file_encryption_info.is_some() {
437 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
438 }
439
440 Ok(FileReader::new(
441 Arc::clone(&link.protocol),
442 locations,
443 ec_schema,
444 Arc::clone(&self.config),
445 self.rt_holder.get_handle(),
446 ))
447 } else {
448 Err(HdfsError::FileNotFound(path.to_string()))
449 }
450 }
451
452 pub async fn create(
455 &self,
456 src: &str,
457 write_options: impl AsRef<WriteOptions>,
458 ) -> Result<FileWriter> {
459 let write_options = write_options.as_ref();
460
461 let (link, resolved_path) = self.mount_table.resolve(src);
462
463 let create_response = link
464 .protocol
465 .create(
466 &resolved_path,
467 write_options.permission,
468 write_options.overwrite,
469 write_options.create_parent,
470 write_options.replication,
471 write_options.block_size,
472 )
473 .await?;
474
475 match create_response.fs {
476 Some(status) => {
477 if status.file_encryption_info.is_some() {
478 let _ = self.delete(src, false).await;
479 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
480 }
481
482 Ok(FileWriter::new(
483 Arc::clone(&link.protocol),
484 resolved_path,
485 status,
486 Arc::clone(&self.config),
487 self.rt_holder.get_handle(),
488 None,
489 ))
490 }
491 None => Err(HdfsError::FileNotFound(src.to_string())),
492 }
493 }
494
495 fn needs_new_block(class: &str, msg: &str) -> bool {
496 class == "java.lang.UnsupportedOperationException" && msg.contains("NEW_BLOCK")
497 }
498
499 pub async fn append(&self, src: &str) -> Result<FileWriter> {
503 let (link, resolved_path) = self.mount_table.resolve(src);
504
505 let append_response = match link.protocol.append(&resolved_path, false).await {
508 Err(HdfsError::RPCError(class, msg)) if Self::needs_new_block(&class, &msg) => {
509 link.protocol.append(&resolved_path, true).await?
510 }
511 resp => resp?,
512 };
513
514 match append_response.stat {
515 Some(status) => {
516 if status.file_encryption_info.is_some() {
517 let _ = link
518 .protocol
519 .complete(src, append_response.block.map(|b| b.b), status.file_id)
520 .await;
521 return Err(HdfsError::UnsupportedFeature("File encryption".to_string()));
522 }
523
524 Ok(FileWriter::new(
525 Arc::clone(&link.protocol),
526 resolved_path,
527 status,
528 Arc::clone(&self.config),
529 self.rt_holder.get_handle(),
530 append_response.block,
531 ))
532 }
533 None => Err(HdfsError::FileNotFound(src.to_string())),
534 }
535 }
536
537 pub async fn mkdirs(&self, path: &str, permission: u32, create_parent: bool) -> Result<()> {
545 let (link, resolved_path) = self.mount_table.resolve(path);
546 link.protocol
547 .mkdirs(&resolved_path, permission, create_parent)
548 .await
549 .map(|_| ())
550 }
551
552 pub async fn rename(&self, src: &str, dst: &str, overwrite: bool) -> Result<()> {
554 let (src_link, src_resolved_path) = self.mount_table.resolve(src);
555 let (dst_link, dst_resolved_path) = self.mount_table.resolve(dst);
556 if src_link.viewfs_path == dst_link.viewfs_path {
557 src_link
558 .protocol
559 .rename(&src_resolved_path, &dst_resolved_path, overwrite)
560 .await
561 .map(|_| ())
562 } else {
563 Err(HdfsError::InvalidArgument(
564 "Cannot rename across different name services".to_string(),
565 ))
566 }
567 }
568
569 pub async fn delete(&self, path: &str, recursive: bool) -> Result<bool> {
572 let (link, resolved_path) = self.mount_table.resolve(path);
573 link.protocol
574 .delete(&resolved_path, recursive)
575 .await
576 .map(|r| r.result)
577 }
578
579 pub async fn set_times(&self, path: &str, mtime: u64, atime: u64) -> Result<()> {
581 let (link, resolved_path) = self.mount_table.resolve(path);
582 link.protocol
583 .set_times(&resolved_path, mtime, atime)
584 .await?;
585 Ok(())
586 }
587
588 pub async fn set_owner(
590 &self,
591 path: &str,
592 owner: Option<&str>,
593 group: Option<&str>,
594 ) -> Result<()> {
595 let (link, resolved_path) = self.mount_table.resolve(path);
596 link.protocol
597 .set_owner(&resolved_path, owner, group)
598 .await?;
599 Ok(())
600 }
601
602 pub async fn set_permission(&self, path: &str, permission: u32) -> Result<()> {
613 let (link, resolved_path) = self.mount_table.resolve(path);
614 link.protocol
615 .set_permission(&resolved_path, permission)
616 .await?;
617 Ok(())
618 }
619
620 pub async fn set_replication(&self, path: &str, replication: u32) -> Result<bool> {
622 let (link, resolved_path) = self.mount_table.resolve(path);
623 let result = link
624 .protocol
625 .set_replication(&resolved_path, replication)
626 .await?
627 .result;
628
629 Ok(result)
630 }
631
632 pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
634 let (link, resolved_path) = self.mount_table.resolve(path);
635 let result = link
636 .protocol
637 .get_content_summary(&resolved_path)
638 .await?
639 .summary;
640
641 Ok(result.into())
642 }
643
644 pub async fn modify_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
646 let (link, resolved_path) = self.mount_table.resolve(path);
647 link.protocol
648 .modify_acl_entries(&resolved_path, acl_spec)
649 .await?;
650
651 Ok(())
652 }
653
654 pub async fn remove_acl_entries(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
656 let (link, resolved_path) = self.mount_table.resolve(path);
657 link.protocol
658 .remove_acl_entries(&resolved_path, acl_spec)
659 .await?;
660
661 Ok(())
662 }
663
664 pub async fn remove_default_acl(&self, path: &str) -> Result<()> {
666 let (link, resolved_path) = self.mount_table.resolve(path);
667 link.protocol.remove_default_acl(&resolved_path).await?;
668
669 Ok(())
670 }
671
672 pub async fn remove_acl(&self, path: &str) -> Result<()> {
674 let (link, resolved_path) = self.mount_table.resolve(path);
675 link.protocol.remove_acl(&resolved_path).await?;
676
677 Ok(())
678 }
679
680 pub async fn set_acl(&self, path: &str, acl_spec: Vec<AclEntry>) -> Result<()> {
684 let (link, resolved_path) = self.mount_table.resolve(path);
685 link.protocol.set_acl(&resolved_path, acl_spec).await?;
686
687 Ok(())
688 }
689
690 pub async fn get_acl_status(&self, path: &str) -> Result<AclStatus> {
692 let (link, resolved_path) = self.mount_table.resolve(path);
693 Ok(link
694 .protocol
695 .get_acl_status(&resolved_path)
696 .await?
697 .result
698 .into())
699 }
700}
701
702impl Default for Client {
703 fn default() -> Self {
706 ClientBuilder::new()
707 .build()
708 .expect("Failed to create default client")
709 }
710}
711
712pub(crate) struct DirListingIterator {
713 path: String,
714 resolved_path: String,
715 link: MountLink,
716 files_only: bool,
717 partial_listing: VecDeque<HdfsFileStatusProto>,
718 remaining: u32,
719 last_seen: Vec<u8>,
720}
721
722impl DirListingIterator {
723 fn new(path: String, mount_table: &Arc<MountTable>, files_only: bool) -> Self {
724 let (link, resolved_path) = mount_table.resolve(&path);
725
726 DirListingIterator {
727 path,
728 resolved_path,
729 link: link.clone(),
730 files_only,
731 partial_listing: VecDeque::new(),
732 remaining: 1,
733 last_seen: Vec::new(),
734 }
735 }
736
737 async fn get_next_batch(&mut self) -> Result<bool> {
738 let listing = self
739 .link
740 .protocol
741 .get_listing(&self.resolved_path, self.last_seen.clone(), false)
742 .await?;
743
744 if let Some(dir_list) = listing.dir_list {
745 self.last_seen = dir_list
746 .partial_listing
747 .last()
748 .map(|p| p.path.clone())
749 .unwrap_or(Vec::new());
750
751 self.remaining = dir_list.remaining_entries;
752
753 self.partial_listing = dir_list
754 .partial_listing
755 .into_iter()
756 .filter(|s| !self.files_only || s.file_type() != FileType::IsDir)
757 .collect();
758 Ok(!self.partial_listing.is_empty())
759 } else {
760 Err(HdfsError::FileNotFound(self.path.clone()))
761 }
762 }
763
764 pub async fn next(&mut self) -> Option<Result<FileStatus>> {
765 if self.partial_listing.is_empty() && self.remaining > 0 {
766 if let Err(error) = self.get_next_batch().await {
767 self.remaining = 0;
768 return Some(Err(error));
769 }
770 }
771 if let Some(next) = self.partial_listing.pop_front() {
772 Some(Ok(FileStatus::from(next, &self.path)))
773 } else {
774 None
775 }
776 }
777}
778
779pub struct ListStatusIterator {
780 mount_table: Arc<MountTable>,
781 recursive: bool,
782 iters: Arc<tokio::sync::Mutex<Vec<DirListingIterator>>>,
783}
784
785impl ListStatusIterator {
786 fn new(path: String, mount_table: Arc<MountTable>, recursive: bool) -> Self {
787 let initial = DirListingIterator::new(path.clone(), &mount_table, false);
788
789 ListStatusIterator {
790 mount_table,
791 recursive,
792 iters: Arc::new(tokio::sync::Mutex::new(vec![initial])),
793 }
794 }
795
796 pub async fn next(&self) -> Option<Result<FileStatus>> {
797 let mut next_file: Option<Result<FileStatus>> = None;
798 let mut iters = self.iters.lock().await;
799 while next_file.is_none() {
800 if let Some(iter) = iters.last_mut() {
801 if let Some(file_result) = iter.next().await {
802 if let Ok(file) = file_result {
803 if file.isdir && self.recursive {
806 iters.push(DirListingIterator::new(
807 file.path.clone(),
808 &self.mount_table,
809 false,
810 ))
811 }
812 next_file = Some(Ok(file));
813 } else {
814 next_file = Some(file_result)
816 }
817 } else {
818 iters.pop();
820 }
821 } else {
822 break;
824 }
825 }
826
827 next_file
828 }
829
830 pub fn into_stream(self) -> BoxStream<'static, Result<FileStatus>> {
831 let listing = stream::unfold(self, |state| async move {
832 let next = state.next().await;
833 next.map(|n| (n, state))
834 });
835 Box::pin(listing)
836 }
837}
838
839#[derive(Debug)]
840pub struct FileStatus {
841 pub path: String,
842 pub length: usize,
843 pub isdir: bool,
844 pub permission: u16,
845 pub owner: String,
846 pub group: String,
847 pub modification_time: u64,
848 pub access_time: u64,
849 pub replication: Option<u32>,
850 pub blocksize: Option<u64>,
851}
852
853impl FileStatus {
854 fn from(value: HdfsFileStatusProto, base_path: &str) -> Self {
855 let mut path = base_path.trim_end_matches("/").to_string();
856 let relative_path = std::str::from_utf8(&value.path).unwrap();
857 if !relative_path.is_empty() {
858 path.push('/');
859 path.push_str(relative_path);
860 }
861
862 FileStatus {
863 isdir: value.file_type() == FileType::IsDir,
864 path,
865 length: value.length as usize,
866 permission: value.permission.perm as u16,
867 owner: value.owner,
868 group: value.group,
869 modification_time: value.modification_time,
870 access_time: value.access_time,
871 replication: value.block_replication,
872 blocksize: value.blocksize,
873 }
874 }
875}
876
877#[derive(Debug)]
878pub struct ContentSummary {
879 pub length: u64,
880 pub file_count: u64,
881 pub directory_count: u64,
882 pub quota: u64,
883 pub space_consumed: u64,
884 pub space_quota: u64,
885}
886
887impl From<ContentSummaryProto> for ContentSummary {
888 fn from(value: ContentSummaryProto) -> Self {
889 ContentSummary {
890 length: value.length,
891 file_count: value.file_count,
892 directory_count: value.directory_count,
893 quota: value.quota,
894 space_consumed: value.space_consumed,
895 space_quota: value.space_quota,
896 }
897 }
898}
899
900#[cfg(test)]
901mod test {
902 use std::sync::{Arc, LazyLock};
903
904 use tokio::runtime::Runtime;
905 use url::Url;
906
907 use crate::{
908 client::ClientBuilder,
909 common::config::Configuration,
910 hdfs::{protocol::NamenodeProtocol, proxy::NameServiceProxy},
911 };
912
913 use super::{MountLink, MountTable};
914
915 static RT: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap());
916
917 fn create_protocol(url: &str) -> Arc<NamenodeProtocol> {
918 let proxy = NameServiceProxy::new(
919 &Url::parse(url).unwrap(),
920 Arc::new(Configuration::new().unwrap()),
921 RT.handle().clone(),
922 )
923 .unwrap();
924 Arc::new(NamenodeProtocol::new(proxy, RT.handle().clone()))
925 }
926
927 #[test]
928 fn test_default_fs() {
929 assert!(ClientBuilder::new()
930 .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
931 .build()
932 .is_ok());
933
934 assert!(ClientBuilder::new()
935 .with_config(vec![("fs.defaultFS", "hdfs://")])
936 .build()
937 .is_err());
938
939 assert!(ClientBuilder::new()
940 .with_url("hdfs://")
941 .with_config(vec![("fs.defaultFS", "hdfs://test:9000")])
942 .build()
943 .is_ok());
944
945 assert!(ClientBuilder::new()
946 .with_url("hdfs://")
947 .with_config(vec![("fs.defaultFS", "hdfs://")])
948 .build()
949 .is_err());
950
951 assert!(ClientBuilder::new()
952 .with_url("hdfs://")
953 .with_config(vec![("fs.defaultFS", "viewfs://test")])
954 .build()
955 .is_err());
956 }
957
958 #[test]
959 fn test_mount_link_resolve() {
960 let protocol = create_protocol("hdfs://127.0.0.1:9000");
961 let link = MountLink::new("/view", "/hdfs", protocol);
962
963 assert_eq!(link.resolve("/view/dir/file").unwrap(), "/hdfs/dir/file");
964 assert_eq!(link.resolve("/view").unwrap(), "/hdfs");
965 assert!(link.resolve("/hdfs/path").is_none());
966 }
967
968 #[test]
969 fn test_fallback_link() {
970 let protocol = create_protocol("hdfs://127.0.0.1:9000");
971 let link = MountLink::new("", "/hdfs", Arc::clone(&protocol));
972
973 assert_eq!(link.resolve("/path/to/file").unwrap(), "/hdfs/path/to/file");
974 assert_eq!(link.resolve("/").unwrap(), "/hdfs/");
975 assert_eq!(link.resolve("/hdfs/path").unwrap(), "/hdfs/hdfs/path");
976
977 let link = MountLink::new("", "", protocol);
978 assert_eq!(link.resolve("/").unwrap(), "/");
979 }
980
981 #[test]
982 fn test_mount_table_resolve() {
983 let link1 = MountLink::new(
984 "/mount1",
985 "/path1/nested",
986 create_protocol("hdfs://127.0.0.1:9000"),
987 );
988 let link2 = MountLink::new(
989 "/mount2",
990 "/path2",
991 create_protocol("hdfs://127.0.0.1:9001"),
992 );
993 let link3 = MountLink::new(
994 "/mount3/nested",
995 "/path3",
996 create_protocol("hdfs://127.0.0.1:9002"),
997 );
998 let fallback = MountLink::new("/", "/path4", create_protocol("hdfs://127.0.0.1:9003"));
999
1000 let mount_table = MountTable {
1001 mounts: vec![link1, link2, link3],
1002 fallback,
1003 };
1004
1005 let (link, resolved) = mount_table.resolve("/mount1");
1007 assert_eq!(link.viewfs_path, "/mount1");
1008 assert_eq!(resolved, "/path1/nested");
1009
1010 let (link, resolved) = mount_table.resolve("/mount1/");
1012 assert_eq!(link.viewfs_path, "/mount1");
1013 assert_eq!(resolved, "/path1/nested/");
1014
1015 let (link, resolved) = mount_table.resolve("/mount12");
1017 assert_eq!(link.viewfs_path, "");
1018 assert_eq!(resolved, "/path4/mount12");
1019
1020 let (link, resolved) = mount_table.resolve("/mount3/file");
1021 assert_eq!(link.viewfs_path, "");
1022 assert_eq!(resolved, "/path4/mount3/file");
1023
1024 let (link, resolved) = mount_table.resolve("/mount3/nested/file");
1025 assert_eq!(link.viewfs_path, "/mount3/nested");
1026 assert_eq!(resolved, "/path3/file");
1027 }
1028}