#[cfg(feature = "integration-test")]
mod test {
use bytes::{Buf, BufMut, BytesMut};
use hopsfs_native_object_store::{HdfsObjectStore, HopsClient, WriteOptions};
use object_store::{PutMode, PutOptions, PutPayload};
use serial_test::serial;
use std::sync::Arc;
pub const TEST_FILE_INTS: u64 = 64 * 1024 * 1024;
#[tokio::test]
#[serial]
async fn test_object_store() -> object_store::Result<()> {
let client = HopsClient::with_url("hdfs://127.0.0.1:8020").unwrap();
let write_opts = WriteOptions {
block_size: None,
replication: None,
overwrite: true,
create_parent: true,
buffer_size: 0,
};
let file = client.create("/testfile", write_opts).await.unwrap();
let mut buf = BytesMut::new();
for i in 0..TEST_FILE_INTS as i32 {
buf.put_i32(i);
}
file.hdfs_write(buf.freeze()).await.unwrap();
file.close_file().await.unwrap();
client.mkdir("/testdir").await.unwrap();
let store = HdfsObjectStore::new(Arc::new(client));
println!("testing functions");
test_object_store_head(&store).await?;
println!("test_object_store_head passed");
test_object_store_list(&store).await?;
println!("test_object_store_list passed");
test_object_store_rename(&store).await?;
println!("test_object_store_rename passed");
test_object_store_read(&store).await?;
println!("test_object_store_read passed");
test_object_store_write(&store).await?;
println!("test_object_store_write passed");
test_object_store_write_multipart(&store).await?;
println!("test_object_store_write_multipart passed");
test_object_store_copy(&store).await?;
println!("test_object_store_copy passed");
Ok(())
}
async fn test_object_store_head(store: &HdfsObjectStore) -> object_store::Result<()> {
use object_store::{path::Path, ObjectStore};
let head = store.head(&Path::from("/testfile")).await?;
assert_eq!(head.location, Path::from("/testfile"));
assert_eq!(head.size, TEST_FILE_INTS * 4);
assert!(store.head(&Path::from("/testfile2")).await.is_err());
assert!(store.head(&Path::from("/testdir")).await.is_err());
Ok(())
}
async fn test_object_store_list(store: &HdfsObjectStore) -> object_store::Result<()> {
use futures::StreamExt;
use object_store::{path::Path, ObjectMeta, ObjectStore};
let list: Vec<object_store::Result<ObjectMeta>> =
store.list(Some(&Path::from("/"))).collect().await;
assert_eq!(list.len(), 1);
assert_eq!(list[0].as_ref().unwrap().location, Path::from("/testfile"));
assert_eq!(
store.list(Some(&Path::from("/doesnt/exist"))).count().await,
0
);
let list = store
.list_with_delimiter(Some(&Path::from("/doesnt/exist")))
.await?;
assert_eq!(list.common_prefixes.len(), 0);
assert_eq!(list.objects.len(), 0);
Ok(())
}
async fn test_object_store_rename(store: &HdfsObjectStore) -> object_store::Result<()> {
use object_store::{path::Path, ObjectStore};
store
.rename(&Path::from("/testfile"), &Path::from("/testfile2"))
.await?;
assert!(store.head(&Path::from("/testfile2")).await.is_ok());
assert!(store.head(&Path::from("/testfile")).await.is_err());
store
.rename(&Path::from("/testfile2"), &Path::from("/testfile"))
.await?;
assert!(store.head(&Path::from("/testfile")).await.is_ok());
assert!(store.head(&Path::from("/testfile2")).await.is_err());
Ok(())
}
async fn test_object_store_read(store: &HdfsObjectStore) -> object_store::Result<()> {
use object_store::{path::Path, ObjectStore};
let location = Path::from("/testfile");
let mut buf = store.get(&location).await?.bytes().await?;
for i in 0..TEST_FILE_INTS as i32 {
assert_eq!(buf.get_i32(), i);
}
let offset = TEST_FILE_INTS / 2 * 4;
let mut buf = store.get_range(&location, offset..(offset + 4)).await?;
assert_eq!(buf.get_i32(), TEST_FILE_INTS as i32 / 2);
Ok(())
}
async fn test_object_store_write(store: &HdfsObjectStore) -> object_store::Result<()> {
use object_store::{path::Path, ObjectStore};
store
.put(&Path::from("/newfile"), PutPayload::new())
.await?;
store.head(&Path::from("/newfile")).await?;
match store
.put_opts(
&Path::from("/newfile"),
PutPayload::new(),
PutOptions {
mode: PutMode::Create,
..Default::default()
},
)
.await
{
Err(object_store::Error::AlreadyExists { .. }) => (),
Err(e) => panic!("Wrong error was thrown for put without overewrite: {:?}", e),
Ok(_) => panic!("No error was thrown for put without overwrite for existing file"),
}
for size_to_check in [16i32, 128 * 1024 * 1024, 130 * 1024 * 1024] {
let ints_to_write = size_to_check / 4;
let mut data = BytesMut::with_capacity(size_to_check as usize);
for i in 0..ints_to_write {
data.put_i32(i);
}
let buf = data.freeze();
store
.put(&Path::from("/newfile"), PutPayload::from_bytes(buf.clone()))
.await?;
assert_eq!(
store.head(&Path::from("/newfile")).await?.size,
size_to_check.try_into().unwrap()
);
let read_data = store.get(&Path::from("/newfile")).await?.bytes().await?;
assert_eq!(buf.len(), read_data.len());
for pos in 0..buf.len() {
assert_eq!(
buf[pos], read_data[pos],
"data is different as position {} for size {}",
pos, size_to_check
);
}
}
store.delete(&Path::from("/newfile")).await?;
Ok(())
}
async fn test_object_store_write_multipart(
store: &HdfsObjectStore,
) -> object_store::Result<()> {
use object_store::{path::Path, ObjectStore};
let mut uploader = store.put_multipart(&"/newfile".into()).await?;
uploader.complete().await?;
store.head(&Path::from("/newfile")).await?;
for size_to_check in [16i32, 128 * 1024 * 1024, 130 * 1024 * 1024] {
let ints_to_write = size_to_check / 4;
let mut uploader = store.put_multipart(&"/newfile".into()).await?;
let mut data = BytesMut::with_capacity(size_to_check as usize);
for i in 0..ints_to_write {
data.put_i32(i);
}
let mut buf = data.freeze();
while !buf.is_empty() {
let to_write = usize::min(buf.len(), 10 * 1024 * 1024);
uploader.put_part(buf.split_to(to_write).into()).await?;
}
uploader.complete().await?;
assert_eq!(
store.head(&Path::from("/newfile")).await?.size,
size_to_check.try_into().unwrap()
);
let mut read_data = store.get(&Path::from("/newfile")).await?.bytes().await?;
assert_eq!(size_to_check as usize, read_data.len());
for pos in 0..ints_to_write {
assert_eq!(
pos,
read_data.get_i32(),
"data is different at integer {} for size {}",
pos,
size_to_check
);
}
}
store.delete(&Path::from("/newfile")).await?;
let mut uploader = store.put_multipart(&"/newfile".into()).await?;
assert!(store.head(&"/.newfile.tmp.1".into()).await.is_ok());
uploader.abort().await?;
assert!(store.head(&"/.newfile.tmp.1".into()).await.is_err());
assert!(store.head(&"/newfile".into()).await.is_err());
let mut uploader1 = store.put_multipart(&"/newfile".into()).await?;
let mut uploader2 = store.put_multipart(&"/newfile".into()).await?;
uploader1.put_part(vec![1].into()).await?;
uploader2.put_part(vec![2].into()).await?;
uploader1.complete().await?;
uploader2.complete().await?;
let result = store.get(&"/newfile".into()).await?;
assert!(result.bytes().await?.to_vec() == vec![2]);
Ok(())
}
async fn test_object_store_copy(store: &HdfsObjectStore) -> object_store::Result<()> {
use object_store::{path::Path, ObjectStore};
store
.put(&Path::from("/newfile"), PutPayload::new())
.await?;
let size_to_check = 128 * 1024 * 1024;
let ints_to_write = size_to_check / 4;
let mut data = BytesMut::with_capacity(size_to_check as usize);
for i in 0..ints_to_write {
data.put_i32(i);
}
let buf = data.freeze();
store
.put(&Path::from("/newfile"), buf.clone().into())
.await?;
store
.copy(&Path::from("/newfile"), &Path::from("/newfile2"))
.await?;
let read_data = store.get(&Path::from("/newfile2")).await?.bytes().await?;
assert_eq!(buf.len(), read_data.len());
for pos in 0..buf.len() {
assert_eq!(
buf[pos], read_data[pos],
"data is different as position {} for size {}",
pos, size_to_check
);
}
assert!(store
.copy_if_not_exists(&Path::from("/newfile"), &Path::from("/newfile2"))
.await
.is_err());
store.delete(&Path::from("/newfile")).await?;
store.delete(&Path::from("/newfile2")).await?;
Ok(())
}
}