#![cfg(all(feature = "integration-test", not(target_os = "windows")))]
use buoyant_kernel as delta_kernel;
use delta_kernel::Snapshot;
use hdfs_native::{Client, WriteOptions};
use hdfs_native_object_store::minidfs::MiniDfs;
use std::collections::HashSet;
use std::fs;
use std::path::Path;
extern crate walkdir;
use walkdir::WalkDir;
async fn write_local_path_to_hdfs(
local_path: &Path,
remote_path: &Path,
client: &Client,
) -> Result<(), Box<dyn std::error::Error>> {
for entry in WalkDir::new(local_path) {
let entry = entry?;
let path = entry.path();
let relative_path = path.strip_prefix(local_path)?;
let destination = remote_path.join(relative_path);
if path.is_file() {
let bytes = fs::read(path)?;
let mut writer = client
.create(
destination.as_path().to_str().unwrap(),
WriteOptions::default(),
)
.await?;
writer.write(bytes.into()).await?;
writer.close().await?;
} else {
client
.mkdirs(destination.as_path().to_str().unwrap(), 0o755, true)
.await?;
}
}
Ok(())
}
#[tokio::test]
#[ignore = "Skipping HDFS integration test"]
async fn read_table_version_hdfs() -> Result<(), Box<dyn std::error::Error>> {
let minidfs = MiniDfs::with_features(&HashSet::new());
let hdfs_client = Client::default();
write_local_path_to_hdfs(
"./tests/data/app-txn-checkpoint".as_ref(),
"/my-delta-table".as_ref(),
&hdfs_client,
)
.await?;
let url_str = format!("{}/my-delta-table", minidfs.url);
let url = url::Url::parse(&url_str).unwrap();
let engine = test_utils::create_default_engine(&url)?;
let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?;
assert_eq!(snapshot.version(), 1);
Ok(())
}