#[cfg(feature = "integration-test")]
mod common;
#[cfg(feature = "integration-test")]
mod test {
use crate::common::{TEST_FILE_INTS, assert_bufs_equal};
use bytes::{BufMut, BytesMut};
use hdfs_native::{
Client, Result, WriteOptions,
acl::AclEntry,
client::{ClientBuilder, FileStatus},
minidfs::{DfsFeatures, MiniDfs},
sync::ClientBuilder as SyncClientBuilder,
test::PROXY_CALLS,
};
use serial_test::serial;
use std::collections::HashSet;
use std::io::{Read, Seek, SeekFrom, Write};
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use whoami::username;
/// Runs the shared suite against a cluster started with an empty feature set.
#[tokio::test]
#[serial]
async fn test_basic_non_ha() {
    let features = HashSet::new();
    test_with_features(&features).await.unwrap();
}
/// Runs the shared suite with only `DfsFeatures::Security` enabled.
///
/// `test_with_features` treats security without `Token` as the Kerberos case.
#[tokio::test]
#[serial]
async fn test_security_kerberos() {
    let features = HashSet::from([DfsFeatures::Security]);
    test_with_features(&features).await.unwrap();
}
/// Runs the shared suite with `Security` and `Token` enabled.
#[tokio::test]
#[serial]
async fn test_security_token() {
    let features = HashSet::from([DfsFeatures::Security, DfsFeatures::Token]);
    test_with_features(&features).await.unwrap();
}
/// Runs the shared suite with `Security` and `Integrity` enabled (no `Token`).
#[tokio::test]
#[serial]
async fn test_integrity_kerberos() {
    let features = HashSet::from([DfsFeatures::Security, DfsFeatures::Integrity]);
    test_with_features(&features).await.unwrap();
}
/// Runs the shared suite with `Security`, `Token` and `Integrity` enabled.
#[tokio::test]
#[serial]
async fn test_integrity_token() {
    let features = HashSet::from([
        DfsFeatures::Security,
        DfsFeatures::Token,
        DfsFeatures::Integrity,
    ]);
    test_with_features(&features).await.unwrap();
}
/// Runs the shared suite with `Security` and `Privacy` enabled (no `Token`).
#[tokio::test]
#[serial]
async fn test_privacy_kerberos() {
    let features = HashSet::from([DfsFeatures::Security, DfsFeatures::Privacy]);
    test_with_features(&features).await.unwrap();
}
/// Runs the shared suite with `Security`, `Token` and `Privacy` enabled.
#[tokio::test]
#[serial]
async fn test_privacy_token() {
    let features = HashSet::from([
        DfsFeatures::Security,
        DfsFeatures::Token,
        DfsFeatures::Privacy,
    ]);
    test_with_features(&features).await.unwrap();
}
/// Runs the shared suite with `Security`, `Privacy` and `AES` enabled.
#[tokio::test]
#[serial]
async fn test_aes() {
    let features = HashSet::from([
        DfsFeatures::Security,
        DfsFeatures::Privacy,
        DfsFeatures::AES,
    ]);
    test_with_features(&features).await.unwrap();
}
/// Runs the shared suite with `Security` and `DataTransferEncryption` enabled,
/// without requesting `Privacy`.
#[tokio::test]
#[serial]
async fn test_forced_data_transfer_encryption() {
    let features = HashSet::from([
        DfsFeatures::Security,
        DfsFeatures::DataTransferEncryption,
    ]);
    test_with_features(&features).await.unwrap();
}
/// Runs the shared suite with `Security`, `Privacy` and `DataTransferEncryption` enabled.
#[tokio::test]
#[serial]
async fn test_data_transfer_encryption() {
    let features = HashSet::from([
        DfsFeatures::Security,
        DfsFeatures::Privacy,
        DfsFeatures::DataTransferEncryption,
    ]);
    test_with_features(&features).await.unwrap();
}
/// Runs the shared suite with `Security`, `Privacy`, `DataTransferEncryption`
/// and `AES` enabled.
#[tokio::test]
#[serial]
async fn test_data_transfer_encryption_aes() {
    let features = HashSet::from([
        DfsFeatures::Security,
        DfsFeatures::Privacy,
        DfsFeatures::DataTransferEncryption,
        DfsFeatures::AES,
    ]);
    test_with_features(&features).await.unwrap();
}
/// Runs the shared suite with only `DfsFeatures::HA` enabled.
///
/// Requesting `HA` also makes `test_with_features` run the observer-read check.
#[tokio::test]
#[serial]
async fn test_basic_ha() {
    let features = HashSet::from([DfsFeatures::HA]);
    test_with_features(&features).await.unwrap();
}
/// Runs the shared suite with `Security`, `Privacy` and `HA` enabled.
#[tokio::test]
#[serial]
async fn test_security_privacy_ha() {
    let features = HashSet::from([
        DfsFeatures::Security,
        DfsFeatures::Privacy,
        DfsFeatures::HA,
    ]);
    test_with_features(&features).await.unwrap();
}
/// Runs the shared suite with `Security`, `Token` and `HA` enabled.
#[tokio::test]
#[serial]
async fn test_security_token_ha() {
    let features = HashSet::from([
        DfsFeatures::Security,
        DfsFeatures::Token,
        DfsFeatures::HA,
    ]);
    test_with_features(&features).await.unwrap();
}
/// Runs the shared suite with only `DfsFeatures::RBF` enabled.
#[tokio::test]
#[serial]
async fn test_rbf() {
    let features = HashSet::from([DfsFeatures::RBF]);
    test_with_features(&features).await.unwrap();
}
/// Drives the shared suite with `futures::executor::block_on` rather than a
/// `#[tokio::test]` runtime, using an empty feature set.
#[test]
#[serial]
fn test_no_tokio() {
    let features = HashSet::new();
    let suite = test_with_features(&features);
    futures::executor::block_on(suite).unwrap();
}
/// Exercises the blocking client end to end against a `MiniDfs` started with no features.
///
/// Writes `/syncfile`, checks its length and that it is the only non-directory
/// entry under `/`, appends to it, reads it back through `read_exact`, `seek`,
/// `read_range` and `read_range_stream`, then deletes it and confirms that
/// `get_file_info` fails afterwards.
///
/// # Errors
/// Propagates any error returned by the client calls that use `?`.
#[test]
#[serial]
fn test_basic_sync_client() -> Result<()> {
    let _ = env_logger::builder().is_test(true).try_init();

    // Bound to a name so the value is kept until the end of the test.
    let _cluster = MiniDfs::with_features(&HashSet::new());
    let client = SyncClientBuilder::new().build()?;
    let overwrite = WriteOptions::default().overwrite(true);

    // Initial write of six bytes.
    let mut out = client.create("/syncfile", &overwrite)?;
    out.write_all(b"hello ").unwrap();
    out.close()?;
    let info = client.get_file_info("/syncfile")?;
    assert_eq!(info.length, 6);

    // The new file must be the only non-directory entry directly under the root.
    let root_entries = client
        .list_status_iter("/", false)
        .collect::<Result<Vec<_>>>()?;
    let files: Vec<FileStatus> = root_entries
        .into_iter()
        .filter(|entry| !entry.isdir)
        .collect();
    assert_eq!(files.len(), 1);
    assert_eq!(files[0].path, "/syncfile");

    // Append four more bytes.
    let mut out = client.append("/syncfile")?;
    out.write_all(b"sync").unwrap();
    out.close()?;

    // Sequential read of the first six bytes, then rewind.
    let mut reader = client.read("/syncfile")?;
    assert_eq!(reader.file_length(), 10);
    let mut head = [0; 6];
    reader.read_exact(&mut head).unwrap();
    assert_eq!(&head, b"hello ");
    assert_eq!(reader.tell(), 6);
    let rewound = Seek::seek(&mut reader, SeekFrom::Start(0)).unwrap();
    assert_eq!(rewound, 0);

    // Ranged reads: the whole file at once, then the appended tail as a stream.
    let total = reader.file_length();
    let whole = reader.read_range(0, total)?;
    assert_eq!(whole.as_ref(), b"hello sync");
    let chunks = reader
        .read_range_stream(6, 4)
        .collect::<Result<Vec<_>>>()?;
    let tail = chunks.concat();
    assert_eq!(tail, b"sync");

    // Clean up and confirm the file is gone.
    let removed = client.delete("/syncfile", false)?;
    assert!(removed);
    assert!(client.get_file_info("/syncfile").is_err());
    Ok(())
}
/// Starts a `MiniDfs` with `features`, writes `/testfile`, and runs every shared check.
///
/// `/testfile` receives the values `0..TEST_FILE_INTS` as big-endian `i32`s, one
/// write per value. The observer-read check only runs when `DfsFeatures::HA` is
/// part of `features`.
///
/// # Errors
/// Returns the first error propagated by the file setup or by any helper.
///
/// # Panics
/// Panics if the client cannot be built or if a helper assertion fails.
pub async fn test_with_features(features: &HashSet<DfsFeatures>) -> Result<()> {
    let _ = env_logger::builder().is_test(true).try_init();

    // Security without delegation tokens is what the helpers treat as Kerberos.
    let uses_token = features.contains(&DfsFeatures::Token);
    let kerberos_enabled = features.contains(&DfsFeatures::Security) && !uses_token;

    // Bound to a name so the value is kept until this function returns.
    let _cluster = MiniDfs::with_features(features);
    let client = ClientBuilder::new().build().unwrap();

    // Seed the file that the info/listing/rename helpers expect to find.
    let mut seed = client.create("/testfile", WriteOptions::default()).await?;
    for value in 0..TEST_FILE_INTS as i32 {
        let encoded = value.to_be_bytes().to_vec();
        seed.write_bytes(encoded.into()).await?;
    }
    seed.close().await?;

    test_file_info(&client).await?;
    test_listing(&client).await?;
    test_rename(&client).await?;
    test_dirs(&client).await?;
    test_read_write(&client).await?;
    test_relative_paths(&client, kerberos_enabled).await?;
    test_recursive_listing(&client).await?;
    test_glob(&client).await?;
    test_set_times(&client).await?;
    test_set_owner(&client).await?;
    test_set_permission(&client).await?;
    test_set_replication(&client).await?;
    test_get_content_summary(&client).await?;
    test_acls(&client).await?;

    let ha_requested = features.contains(&DfsFeatures::HA);
    if ha_requested {
        test_observer_read(&client).await?;
    }

    Ok(())
}
/// Checks `get_file_info` for `/testfile` (path and length) and for `/` (path and directory flag).
async fn test_file_info(client: &Client) -> Result<()> {
    let file_status = client.get_file_info("/testfile").await?;
    assert_eq!(file_status.path, "/testfile");
    assert_eq!(file_status.length, TEST_FILE_INTS * 4);

    let root_status = client.get_file_info("/").await?;
    assert_eq!(root_status.path, "/");
    assert!(root_status.isdir);

    Ok(())
}
/// Lists `/` non-recursively and expects `/testfile` to be the only non-directory entry.
async fn test_listing(client: &Client) -> Result<()> {
    let root_entries = client.list_status("/", false).await?;
    let files: Vec<FileStatus> = root_entries
        .into_iter()
        .filter(|entry| !entry.isdir)
        .collect();
    assert_eq!(files.len(), 1);

    let only = &files[0];
    assert_eq!(only.path, "/testfile");
    assert_eq!(only.length, TEST_FILE_INTS * 4);
    Ok(())
}
/// Renames `/testfile` to `/testfile2` and back again.
///
/// After each rename, listing the old path must fail and listing the new path
/// must return exactly one entry.
async fn test_rename(client: &Client) -> Result<()> {
    let round_trip = [("/testfile", "/testfile2"), ("/testfile2", "/testfile")];
    for (from, to) in round_trip {
        client.rename(from, to, false).await?;
        assert!(client.list_status(from, false).await.is_err());
        assert_eq!(client.list_status(to, false).await?.len(), 1);
    }
    Ok(())
}
/// Covers directory creation and deletion, both flat and nested.
///
/// A single directory is created, listed as empty, deleted, and confirmed gone.
/// A nested pair is then created; deleting the parent with the final argument
/// `false` must fail, while passing `true` must succeed and return `true`.
async fn test_dirs(client: &Client) -> Result<()> {
    // Flat directory: create, observe empty, remove.
    client.mkdirs("/testdir", 0o755, false).await?;
    let empty_listing = client.list_status("/testdir", false).await;
    assert!(empty_listing.is_ok_and(|entries| entries.is_empty()));

    client.delete("/testdir", false).await?;
    let after_delete = client.list_status("/testdir", false).await;
    assert!(after_delete.is_err());

    // Nested directories: the parent must contain exactly the child.
    client.mkdirs("/testdir1/testdir2", 0o755, true).await?;
    let parent_listing = client.list_status("/testdir1", false).await;
    assert!(parent_listing.is_ok_and(|entries| entries.len() == 1));

    // The parent is not empty, so only the second delete is expected to work.
    let shallow_delete = client.delete("/testdir1", false).await;
    assert!(shallow_delete.is_err());
    let deep_delete = client.delete("/testdir1", true).await;
    assert!(deep_delete.is_ok_and(|removed| removed));

    Ok(())
}
/// Round-trips data through `/newfile` using create, overwrite, append and several read paths.
///
/// The file is first created empty, then overwritten with the `i32` values
/// `0..1024` (4096 bytes), then appended with the same values again. After each
/// write the contents are read back and compared against a locally built buffer.
/// The final state is also checked through `read_to_end` and through a
/// seek-then-`read_exact` of the second integer. Lastly, reading a directory is
/// expected to fail.
async fn test_read_write(client: &Client) -> Result<()> {
    let overwrite = WriteOptions::default().overwrite(true);

    // A writer that is closed immediately leaves a zero-length file.
    let mut empty = client.create("/newfile", &overwrite).await?;
    empty.close().await?;
    let info = client.get_file_info("/newfile").await?;
    assert_eq!(info.length, 0);

    // Overwrite with 1024 integers; `expected` mirrors everything written so far.
    let mut writer = client.create("/newfile", &overwrite).await?;
    let mut expected = BytesMut::new();
    let mut chunk = BytesMut::new();
    (0..1024).for_each(|value| {
        expected.put_i32(value);
        chunk.put_i32(value);
    });
    writer.write_bytes(chunk.freeze()).await?;
    writer.close().await?;
    let info = client.get_file_info("/newfile").await?;
    assert_eq!(info.length, 4096);

    let mut reader = client.read("/newfile").await?;
    let whole_len = reader.file_length();
    let contents = reader.read_bytes(whole_len).await?;
    assert_bufs_equal(&expected, &contents, None);

    // Append the same 1024 integers and re-read the whole file.
    let mut chunk = BytesMut::new();
    (0..1024).for_each(|value| {
        expected.put_i32(value);
        chunk.put_i32(value);
    });
    let appended = chunk.freeze();
    let mut writer = client.append("/newfile").await?;
    writer.write_bytes(appended).await?;
    writer.close().await?;

    let mut reader = client.read("/newfile").await?;
    let whole_len = reader.file_length();
    let contents = reader.read_bytes(whole_len).await?;
    assert_bufs_equal(&expected, &contents, None);

    // Same contents through the async `read_to_end` path.
    let mut reader = client.read("/newfile").await?;
    let mut streamed = Vec::new();
    reader.read_to_end(&mut streamed).await?;
    let streamed_buf = BytesMut::from(&streamed[..]);
    assert_bufs_equal(&expected, &streamed_buf, Some("async read".to_string()));

    // Seeking four bytes in must land on the second integer, which is 1.
    let mut reader = client.read("/newfile").await?;
    let position = reader.seek(SeekFrom::Start(4)).await?;
    assert_eq!(position, 4);
    let mut second_int = [0; 4];
    reader.read_exact(&mut second_int).await?;
    assert_eq!(second_int, 1i32.to_be_bytes());

    let removed = client.delete("/newfile", false).await;
    assert!(removed.is_ok_and(|r| r));

    // Opening a directory for reading must be rejected.
    client.mkdirs("/testdir", 0o755, true).await?;
    let dir_read = client.read("/testdir").await;
    assert!(dir_read.is_err());
    client.delete("/testdir", true).await?;

    Ok(())
}
/// Checks that a relative path is resolved to `/user/<user>/<path>`.
///
/// `<user>` is `"hdfs"` when `kerberos_enabled` is true and the value returned
/// by `whoami::username` otherwise. The whole `/user` tree is removed
/// afterwards.
///
/// # Panics
/// Panics if the user name lookup fails or the created file is missing from
/// the recursive listing of `/`.
async fn test_relative_paths(client: &Client, kerberos_enabled: bool) -> Result<()> {
    let user = if kerberos_enabled {
        String::from("hdfs")
    } else {
        username().unwrap()
    };

    let rel_path = "relative_dir/relative_file";
    let overwrite = WriteOptions::default().overwrite(true);
    let mut writer = client.create(rel_path, &overwrite).await?;
    writer.close().await?;

    // Built once up front instead of once per listing entry.
    let suffix = format!("/{rel_path}");
    let expected_path = format!("/user/{user}/{rel_path}");

    let everything = client.list_status("/", true).await?;
    let found = everything
        .iter()
        .find(|entry| entry.path.ends_with(suffix.as_str()))
        .expect("relative path file should exist");
    assert_eq!(found.path, expected_path);

    let by_absolute = client.get_file_info(&expected_path).await?;
    assert_eq!(by_absolute.path, expected_path);

    client.delete("/user", true).await?;
    Ok(())
}
/// Builds `/dir` with one file and a nested directory holding two files, then
/// expects a recursive listing of `/dir` to return four entries.
async fn test_recursive_listing(client: &Client) -> Result<()> {
    let defaults = WriteOptions::default();
    client.mkdirs("/dir/nested", 0o755, true).await?;

    // Each file is created empty: open the writer and close it straight away.
    for path in ["/dir/file1", "/dir/nested/file2", "/dir/nested/file3"] {
        let mut writer = client.create(path, &defaults).await?;
        writer.close().await?;
    }

    let entries = client.list_status("/dir", true).await?;
    assert_eq!(entries.len(), 4);

    client.delete("/dir", true).await?;
    Ok(())
}
/// Checks how many entries `glob_status` returns for a series of patterns.
///
/// The fixture is `/dir/file1`, `/dir/nested/file2` and `/dir/nested/file3`.
/// Patterns cover a literal root, `*`, `?`, `{a,b}` alternation, and several
/// patterns that must match nothing.
async fn test_glob(client: &Client) -> Result<()> {
    let defaults = WriteOptions::default();
    client.mkdirs("/dir/nested", 0o755, true).await?;
    for path in ["/dir/file1", "/dir/nested/file2", "/dir/nested/file3"] {
        let mut writer = client.create(path, &defaults).await?;
        writer.close().await?;
    }

    // (pattern, expected number of matches), evaluated in this order.
    let expectations: [(&str, usize); 9] = [
        ("/", 1),
        ("/dir/*", 2),
        ("/dir/nested/file*", 2),
        ("/dir/file", 0),
        ("/dir/file?", 1),
        ("/dir/{nested/file*,file*}", 3),
        ("/*/nonexistent/*", 0),
        ("/dir/nested/*/nested", 0),
        ("/dir/*/file2/nested", 0),
    ];
    for (pattern, expected) in expectations {
        let matches = client.glob_status(pattern).await?;
        assert_eq!(matches.len(), expected, "pattern {pattern}");
    }

    client.delete("/dir", true).await?;
    Ok(())
}
/// Sets modification and access times on `/test` and reads them back unchanged.
async fn test_set_times(client: &Client) -> Result<()> {
    let mut writer = client.create("/test", WriteOptions::default()).await?;
    writer.close().await?;

    // Two distinct values so a swapped mtime/atime would be detected.
    let wanted_mtime = 1717641455;
    let wanted_atime = 1717641456;
    client.set_times("/test", wanted_mtime, wanted_atime).await?;

    let info = client.get_file_info("/test").await?;
    assert_eq!(info.modification_time, wanted_mtime);
    assert_eq!(info.access_time, wanted_atime);

    client.delete("/test", false).await?;
    Ok(())
}
/// Checks `set_owner` with both values, with only the owner, and with only the group.
///
/// Passing `None` for either side must leave that side at its previous value.
async fn test_set_owner(client: &Client) -> Result<()> {
    let mut writer = client.create("/test", WriteOptions::default()).await?;
    writer.close().await?;

    // (owner argument, group argument, expected owner, expected group)
    let steps: [(Option<&str>, Option<&str>, &str, &str); 3] = [
        (Some("testuser"), Some("testgroup"), "testuser", "testgroup"),
        (Some("testuser2"), None, "testuser2", "testgroup"),
        (None, Some("testgroup2"), "testuser2", "testgroup2"),
    ];
    for (owner, group, expected_owner, expected_group) in steps {
        client.set_owner("/test", owner, group).await?;
        let info = client.get_file_info("/test").await?;
        assert_eq!(info.owner, expected_owner);
        assert_eq!(info.group, expected_group);
    }

    client.delete("/test", false).await?;
    Ok(())
}
/// Expects a file created with default options to report `0o644`, then
/// changes it to `0o600` and verifies the new value.
async fn test_set_permission(client: &Client) -> Result<()> {
    let mut writer = client.create("/test", WriteOptions::default()).await?;
    writer.close().await?;

    let before = client.get_file_info("/test").await?;
    assert_eq!(before.permission, 0o644);

    client.set_permission("/test", 0o600).await?;
    let after = client.get_file_info("/test").await?;
    assert_eq!(after.permission, 0o600);

    client.delete("/test", false).await?;
    Ok(())
}
/// Sets the replication of `/test` to 1 and then to 2, checking the reported
/// value after each change.
async fn test_set_replication(client: &Client) -> Result<()> {
    let mut writer = client.create("/test", WriteOptions::default()).await?;
    writer.close().await?;

    for factor in [1, 2] {
        client.set_replication("/test", factor).await?;
        let info = client.get_file_info("/test").await?;
        assert_eq!(info.replication, Some(factor));
    }

    client.delete("/test", false).await?;
    Ok(())
}
/// Verifies the counts and total length reported by `get_content_summary("/")`.
///
/// Creates a 4-byte `/test`, a 6-byte `/test2` and an empty `/testdir`, then
/// expects three files, two directories and a total length of those two files
/// plus `TEST_FILE_INTS * 4` bytes. The expected numbers assume that, apart
/// from what this helper creates, the filesystem holds only `/testfile`.
///
/// Everything created here is removed again before returning, including
/// `/testdir`, so helpers that run afterwards do not inherit leftover state.
async fn test_get_content_summary(client: &Client) -> Result<()> {
    let mut file1 = client.create("/test", WriteOptions::default()).await?;
    file1.write_bytes(vec![0, 1, 2, 3].into()).await?;
    file1.close().await?;

    let mut file2 = client.create("/test2", WriteOptions::default()).await?;
    file2.write_bytes(vec![0, 1, 2, 3, 4, 5].into()).await?;
    file2.close().await?;

    client.mkdirs("/testdir", 0o755, true).await?;

    let content_summary = client.get_content_summary("/").await?;
    assert_eq!(content_summary.file_count, 3);
    // The root directory itself plus `/testdir`.
    assert_eq!(content_summary.directory_count, 2);
    assert_eq!(content_summary.length, TEST_FILE_INTS as u64 * 4 + 4 + 6);

    client.delete("/test", false).await?;
    client.delete("/test2", false).await?;
    // Previously left behind; remove it like every other fixture in this file.
    client.delete("/testdir", true).await?;

    Ok(())
}
/// Exercises the ACL calls on a file and default ACLs on a directory.
///
/// On `/test`: starts with no entries and no sticky bit, adds a named user
/// entry, adds a named group entry, removes the group entry, then removes the
/// whole ACL, checking the entry count and membership after each step.
///
/// On `/testdir`: adds one default-scope user entry and expects five entries on
/// the directory and two on a file created inside it, then removes the default
/// ACL and deletes the directory.
async fn test_acls(client: &Client) -> Result<()> {
    let mut writer = client.create("/test", WriteOptions::default()).await?;
    writer.close().await?;

    let initial = client.get_acl_status("/test").await?;
    assert!(initial.entries.is_empty());
    assert!(!initial.sticky);

    let named_user = AclEntry::new("user", "access", "r--", Some("testuser".to_string()));
    let named_group = AclEntry::new("group", "access", "-w-", Some("testgroup".to_string()));

    // Adding the user entry yields two entries in total.
    let user_only = vec![named_user.clone()];
    client.modify_acl_entries("/test", user_only).await?;
    let with_user = client.get_acl_status("/test").await?;
    assert_eq!(with_user.entries.len(), 2, "{:?}", with_user.entries);
    assert!(with_user.entries.contains(&named_user));

    // Adding the group entry on top yields three.
    let group_only = vec![named_group.clone()];
    client.modify_acl_entries("/test", group_only).await?;
    let with_both = client.get_acl_status("/test").await?;
    assert_eq!(with_both.entries.len(), 3, "{:?}", with_both.entries);
    assert!(with_both.entries.contains(&named_user));
    assert!(with_both.entries.contains(&named_group));

    // Removing only the group entry leaves the user entry in place.
    let group_to_remove = vec![named_group.clone()];
    client.remove_acl_entries("/test", group_to_remove).await?;
    let without_group = client.get_acl_status("/test").await?;
    assert_eq!(without_group.entries.len(), 2, "{:?}", without_group.entries);
    assert!(without_group.entries.contains(&named_user));
    assert!(!without_group.entries.contains(&named_group));

    // Removing the ACL altogether leaves no entries.
    client.remove_acl("/test").await?;
    let cleared = client.get_acl_status("/test").await?;
    assert_eq!(cleared.entries.len(), 0);

    client.delete("/test", false).await?;

    // Default-scope entry on a directory, and its effect on a new child file.
    client.mkdirs("/testdir", 0o755, true).await?;
    let default_user = AclEntry {
        r#type: hdfs_native::acl::AclEntryType::User,
        scope: hdfs_native::acl::AclEntryScope::Default,
        permissions: hdfs_native::acl::FsAction::Read,
        name: Some("testuser".to_string()),
    };
    client
        .modify_acl_entries("/testdir", vec![default_user])
        .await?;

    let dir_acl = client.get_acl_status("/testdir").await?;
    assert_eq!(dir_acl.entries.len(), 5, "{:?}", dir_acl.entries);

    let mut child = client
        .create("/testdir/test", WriteOptions::default())
        .await?;
    child.close().await?;

    let child_acl = client.get_acl_status("/testdir/test").await?;
    assert_eq!(child_acl.entries.len(), 2, "{:?}", child_acl.entries);

    client.remove_default_acl("/testdir").await?;
    client.delete("/testdir", true).await?;

    Ok(())
}
/// Records proxy calls around a `mkdirs` + `get_file_info` pair and asserts that
/// at least one recorded call is named `getFileInfo` with its boolean flag set.
///
/// NOTE(review): the flag presumably means the call was served by an observer
/// node; confirm against the definition of `PROXY_CALLS`.
///
/// # Panics
/// Panics if the `PROXY_CALLS` mutex is poisoned, if the recorder was emptied
/// before it is read back, or if no matching call was recorded.
async fn test_observer_read(client: &Client) -> Result<()> {
    // Start recording with an empty list.
    *PROXY_CALLS.lock().unwrap() = Some(Vec::new());

    client.mkdirs("/test", 0o755, true).await?;
    client.get_file_info("/test").await?;

    // Take the recorded calls out, leaving `None` behind.
    let recorded = PROXY_CALLS.lock().unwrap().take().unwrap();
    let saw_observer_read = recorded
        .into_iter()
        .any(|(method, via_observer)| method == "getFileInfo" && via_observer);
    assert!(saw_observer_read);

    client.delete("/test", true).await?;
    Ok(())
}
}