#[cfg(feature = "integration-test")]
mod common;
#[cfg(feature = "integration-test")]
mod test {
use crate::common::assert_bufs_equal;
use bytes::{BufMut, BytesMut};
use hdfs_native::{
Client, Result, WriteOptions,
minidfs::{DfsFeatures, MiniDfs},
};
use serial_test::serial;
use std::collections::HashSet;
/// Runs the shared write scenarios against a `MiniDfs` configured with only the
/// `HA` feature, panicking if any scenario returns an error.
#[tokio::test]
#[serial]
async fn test_write_simple() {
    let features = HashSet::from([DfsFeatures::HA]);
    test_write(&features).await.unwrap();
}
/// Runs the shared write scenarios with the `HA`, `Security` and `Privacy`
/// features enabled, panicking if any scenario returns an error.
///
/// Ignored by default because it takes too long to run.
#[ignore = "Takes too long to run"]
#[tokio::test]
#[serial]
async fn test_write_sasl_encryption() {
    let features = HashSet::from([
        DfsFeatures::HA,
        DfsFeatures::Security,
        DfsFeatures::Privacy,
    ]);
    let outcome = test_write(&features).await;
    outcome.unwrap();
}
/// Runs the shared write scenarios with the `HA`, `Security`, `Privacy` and
/// `AES` features enabled, panicking if any scenario returns an error.
///
/// Ignored by default because it takes too long to run.
#[ignore = "Takes too long to run"]
#[tokio::test]
#[serial]
async fn test_write_cipher_encryption() {
    let features = HashSet::from([
        DfsFeatures::HA,
        DfsFeatures::Security,
        DfsFeatures::Privacy,
        DfsFeatures::AES,
    ]);
    let outcome = test_write(&features).await;
    outcome.unwrap();
}
/// Shared driver behind every write test in this module.
///
/// Sets up test logging, creates a `MiniDfs` with the requested `features`,
/// then runs the create scenario followed by the append scenario against a
/// default `Client`. The first error reported by either scenario is returned.
async fn test_write(features: &HashSet<DfsFeatures>) -> Result<()> {
    // The init result is deliberately discarded so a failed logger setup
    // (e.g. one was already installed by an earlier test) does not abort the run.
    let _ = env_logger::builder().is_test(true).try_init();

    // Bound to a named variable so the value lives until this function returns.
    let _dfs = MiniDfs::with_features(features);
    let client = Client::default();

    test_create(&client).await?;
    test_append(&client).await
}
/// Exercises file creation, with overwrite enabled, on `/newfile`.
///
/// First creates and immediately closes a file and checks that its reported
/// length is 0. Then, for each size in the list, overwrites the file with that
/// many bytes of sequential `i32` values, checks the reported length, reads the
/// whole file back and compares it with what was written. Finally deletes the
/// file and asserts that the delete returned `Ok(true)`.
async fn test_create(client: &Client) -> Result<()> {
    const PATH: &str = "/newfile";
    let opts = WriteOptions::default().overwrite(true);

    // A writer that is closed without writing should leave an empty file behind.
    let mut writer = client.create(PATH, &opts).await?;
    writer.close().await?;
    let empty_info = client.get_file_info(PATH).await?;
    assert_eq!(empty_info.length, 0);

    // NOTE(review): 128 MiB presumably matches the cluster's block size, making
    // these "small", "exactly one block" and "just over one block" — confirm.
    let sizes = [16i32, 128 * 1024 * 1024, 130 * 1024 * 1024];
    for size_to_check in sizes {
        let expected_len = size_to_check as usize;
        let int_count = size_to_check / 4;

        let mut writer = client.create(PATH, &opts).await?;

        // Each i32 occupies four bytes, so `int_count` values fill the buffer exactly.
        let mut payload = BytesMut::with_capacity(expected_len);
        (0..int_count).for_each(|value| payload.put_i32(value));
        let written = payload.freeze();

        writer.write(written.clone()).await?;
        writer.close().await?;

        assert_eq!(client.get_file_info(PATH).await?.length, expected_len);

        let mut reader = client.read(PATH).await?;
        let total = reader.file_length();
        let read_back = reader.read(total).await?;
        assert_bufs_equal(
            &written,
            &read_back,
            Some(format!("for size {size_to_check}")),
        );
    }

    let deleted = client.delete(PATH, false).await;
    assert!(deleted.is_ok_and(|removed| removed));

    Ok(())
}
async fn test_append(client: &Client) -> Result<()> {
client
.create("/newfile", WriteOptions::default())
.await?
.close()
.await?;
assert_eq!(client.get_file_info("/newfile").await?.length, 0);
let mut file_contents = BytesMut::new();
for range in [
0u32..4,
4..8,
8..2048,
2048..(128 * 1024 * 1024),
0..511,
512..1024,
] {
let mut data = BytesMut::new();
for i in range {
file_contents.put_u8((i % 256) as u8);
data.put_u8((i % 256) as u8);
}
let buf = data.freeze();
let mut writer = client.append("/newfile").await?;
writer.write(buf).await?;
writer.close().await?;
let mut reader = client.read("/newfile").await?;
let read_data = reader.read(reader.file_length()).await?;
assert_bufs_equal(&file_contents, &read_data, None);
}
Ok(())
}
}