use std::{
collections::HashMap,
fmt::{Display, Formatter},
future,
path::PathBuf,
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::{
FutureExt,
stream::{BoxStream, StreamExt},
};
use hdfs_native::{
Client, ClientBuilder, HdfsError, WriteOptions, client::FileStatus, file::FileWriter,
};
use object_store::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode};
#[allow(deprecated)]
use object_store::{
GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, path::Path,
};
use tokio::{
runtime::Handle,
sync::{mpsc, oneshot},
task::{self, JoinHandle},
};
#[cfg(feature = "integration-test")]
pub use hdfs_native::minidfs;
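/// Wraps an arbitrary error in an [`object_store::Error::Generic`] attributed
/// to this store.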
fn generic_error(
source: Box<dyn std::error::Error + Send + Sync + 'static>,
) -> object_store::Error {
object_store::Error::Generic {
store: "HFDS",
source,
}
}
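/// Builder for constructing an [`HdfsObjectStore`], wrapping the underlying
/// [`ClientBuilder`].
///
/// A minimal usage sketch; the namenode URL is illustrative and the crate
/// path is assumed to be `hdfs_native_object_store`:
///
/// ```no_run
/// # fn main() -> object_store::Result<()> {
/// use hdfs_native_object_store::HdfsObjectStoreBuilder;
///
/// let store = HdfsObjectStoreBuilder::new()
///     .with_url("hdfs://localhost:9000")
///     .build()?;
/// # Ok(())
/// # }
/// ```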
#[derive(Default)]
pub struct HdfsObjectStoreBuilder {
inner: ClientBuilder,
}
impl HdfsObjectStoreBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn with_url(mut self, url: impl Into<String>) -> Self {
self.inner = self.inner.with_url(url);
self
}
pub fn with_config(
mut self,
config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Self {
self.inner = self.inner.with_config(config);
self
}
pub fn with_io_runtime(mut self, runtime: Handle) -> Self {
self.inner = self.inner.with_io_runtime(runtime);
self
}
pub fn build(self) -> Result<HdfsObjectStore> {
let client = self.inner.build().to_object_store_err()?;
Ok(HdfsObjectStore { client })
}
}
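/// An [`ObjectStore`] implementation backed by a native HDFS [`Client`].
///
/// A sketch of a put/get round trip through the generic [`ObjectStore`] API;
/// the location is illustrative and the crate path is assumed:
///
/// ```no_run
/// # async fn example(store: hdfs_native_object_store::HdfsObjectStore) -> object_store::Result<()> {
/// use object_store::{ObjectStore, PutPayload, path::Path};
///
/// let location = Path::from("data/example.bin");
/// store.put(&location, PutPayload::from(vec![1u8, 2, 3])).await?;
/// let bytes = store.get(&location).await?.bytes().await?;
/// assert_eq!(bytes.as_ref(), &[1u8, 2, 3]);
/// # Ok(())
/// # }
/// ```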
#[derive(Debug, Clone)]
pub struct HdfsObjectStore {
client: Client,
}
impl HdfsObjectStore {
pub fn new(client: Client) -> Self {
Self { client }
}
#[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")]
pub fn with_url(url: &str) -> Result<Self> {
let client = ClientBuilder::new()
.with_url(url)
.build()
.to_object_store_err()?;
Ok(Self { client })
}
#[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")]
pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
let client = ClientBuilder::new()
.with_url(url)
.with_config(config)
.build()
.to_object_store_err()?;
Ok(Self { client })
}
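    /// Opens a hidden temporary file next to `file_path`, named
    /// `.{file_name}.tmp.{n}` with `n` incremented until an unused name is
    /// found. Callers write to this file and then rename it over the final
    /// path.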
async fn open_tmp_file(&self, file_path: &str) -> Result<(FileWriter, String)> {
let path_buf = PathBuf::from(file_path);
        let file_name = path_buf
            .file_name()
            .ok_or_else(|| HdfsError::InvalidPath("path missing filename".to_string()))
            .to_object_store_err()?
            .to_str()
            .ok_or_else(|| HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();
        let tmp_file_path = path_buf
            .with_file_name(format!(".{file_name}.tmp"))
            .to_str()
            .ok_or_else(|| HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();
let mut index = 1;
loop {
let path = format!("{tmp_file_path}.{index}");
match self.client.create(&path, WriteOptions::default()).await {
Ok(writer) => break Ok((writer, path)),
Err(HdfsError::AlreadyExists(_)) => index += 1,
Err(e) => break Err(e).to_object_store_err(),
}
}
}
}
impl Display for HdfsObjectStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "HdfsObjectStore")
}
}
impl From<Client> for HdfsObjectStore {
fn from(value: Client) -> Self {
Self { client: value }
}
}
#[async_trait]
impl ObjectStore for HdfsObjectStore {
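    /// Writes the payload to a hidden temporary file and renames it over the
    /// final path once fully written, so readers never observe a partially
    /// written object. `PutMode::Update` preconditions are not supported.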
async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
let overwrite = match opts.mode {
PutMode::Create => false,
PutMode::Overwrite => true,
PutMode::Update(_) => {
return Err(object_store::Error::NotImplemented {
operation: "PutOptions with Update precondition".to_string(),
implementer: "HdfsObjectStore".to_string(),
});
}
};
let final_file_path = make_absolute_file(location);
if !overwrite && self.client.get_file_info(&final_file_path).await.is_ok() {
return Err(HdfsError::AlreadyExists(final_file_path)).to_object_store_err();
}
let (mut tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;
for bytes in payload {
tmp_file.write(bytes).await.to_object_store_err()?;
}
tmp_file.close().await.to_object_store_err()?;
self.client
.rename(&tmp_file_path, &final_file_path, overwrite)
.await
.to_object_store_err()?;
        let e_tag = self.head(location).await?.e_tag;
Ok(PutResult {
e_tag,
version: None,
})
}
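    /// Starts a multipart upload that streams parts, in order, to a hidden
    /// temporary file, which is renamed over the final path on
    /// [`MultipartUpload::complete`].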
#[allow(deprecated)]
async fn put_multipart_opts(
&self,
location: &Path,
_opts: PutMultipartOpts,
) -> Result<Box<dyn MultipartUpload>> {
let final_file_path = make_absolute_file(location);
let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;
Ok(Box::new(HdfsMultipartWriter::new(
self.client.clone(),
tmp_file,
&tmp_file_path,
&final_file_path,
)))
}
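    /// Fetches object metadata and, unless `options.head` is set, a byte
    /// stream over the requested range (defaulting to the whole file).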
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let status = self
.client
.get_file_info(&make_absolute_file(location))
.await
.to_object_store_err()?;
if status.isdir {
return Err(object_store::Error::NotFound {
path: location.to_string(),
source: "Head cannot be called on a directory".into(),
});
}
let meta = get_object_meta(&status)?;
options.check_preconditions(&meta)?;
let (range, payload) = if options.head {
(
(0..0),
GetResultPayload::Stream(futures::stream::empty().boxed()),
)
} else {
let range = options
.range
.map(|r| r.as_range(meta.size))
.transpose()
.map_err(|source| generic_error(source.into()))?
.unwrap_or(0..meta.size);
let reader = self
.client
.read(&make_absolute_file(location))
.await
.to_object_store_err()?;
            let start = usize::try_from(range.start).map_err(|e| generic_error(Box::new(e)))?;
            let end = usize::try_from(range.end).map_err(|e| generic_error(Box::new(e)))?;
let stream = reader
.read_range_stream(start, end - start)
.map(|b| b.to_object_store_err())
.boxed();
(range, GetResultPayload::Stream(stream))
};
Ok(GetResult {
payload,
meta,
range,
attributes: Default::default(),
})
}
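    /// Deletes the given locations, issuing up to 10 deletes concurrently.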
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
) -> BoxStream<'static, Result<Path>> {
let client = self.client.clone();
locations
.map(move |location| {
let client = client.clone();
async move {
let location = location?;
client
.delete(&make_absolute_file(&location), false)
.await
.to_object_store_err()?;
Ok(location)
}
})
.buffered(10)
.boxed()
}
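    /// Recursively lists all files under `prefix`. Directories are filtered
    /// out, and a missing prefix yields an empty stream rather than an error.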
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
        let absolute_dir = prefix
            .map(make_absolute_file)
            .unwrap_or_else(|| "/".to_string());
let status_stream = self
.client
.list_status_iter(&absolute_dir, true)
.into_stream()
.filter(move |res| {
let result = match res {
Ok(status) => !status.isdir && status.path != absolute_dir,
Err(HdfsError::FileNotFound(_)) => false,
_ => true,
};
future::ready(result)
})
.map(|res| res.map_or_else(|e| Err(e).to_object_store_err(), |s| get_object_meta(&s)));
Box::pin(status_stream)
}
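    /// Lists the immediate children of `prefix`, returning subdirectories as
    /// common prefixes and files as objects.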
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        let absolute_dir = prefix
            .map(make_absolute_file)
            .unwrap_or_else(|| "/".to_string());
let mut status_stream = self
.client
.list_status_iter(&absolute_dir, false)
.into_stream()
.filter(move |res| {
let result = match res {
Ok(status) => status.path != absolute_dir,
Err(HdfsError::FileNotFound(_)) => false,
_ => true,
};
future::ready(result)
});
let mut statuses = Vec::<FileStatus>::new();
while let Some(status) = status_stream.next().await {
statuses.push(status.to_object_store_err()?);
}
let mut dirs: Vec<Path> = Vec::new();
for status in statuses.iter().filter(|s| s.isdir) {
dirs.push(Path::parse(&status.path)?)
}
let mut files: Vec<ObjectMeta> = Vec::new();
for status in statuses.iter().filter(|s| !s.isdir) {
files.push(get_object_meta(status)?)
}
Ok(ListResult {
common_prefixes: dirs,
objects: files,
})
}
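    /// Renames a file using HDFS's native rename, first creating any missing
    /// parent directories of `to`.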
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
let mut parent: Vec<_> = to.parts().collect();
parent.pop();
if !parent.is_empty() {
let parent_path: Path = parent.into_iter().collect();
self.client
.mkdirs(&make_absolute_dir(&parent_path), 0o755, true)
.await
.to_object_store_err()?;
}
let overwrite = match options.target_mode {
RenameTargetMode::Overwrite => true,
RenameTargetMode::Create => false,
};
Ok(self
.client
.rename(
&make_absolute_file(from),
&make_absolute_file(to),
overwrite,
)
.await
.to_object_store_err()?)
}
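    /// HDFS has no server-side copy, so this streams the source file's bytes
    /// into a newly created destination file.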
async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
let overwrite = match options.mode {
CopyMode::Create => {
match self.client.get_file_info(&make_absolute_file(to)).await {
Ok(_) => {
return Err(HdfsError::AlreadyExists(make_absolute_file(to)))
.to_object_store_err();
}
Err(HdfsError::FileNotFound(_)) => false,
Err(e) => return Err(e).to_object_store_err(),
}
}
CopyMode::Overwrite => true,
};
let write_options = WriteOptions {
overwrite,
..Default::default()
};
let file = self
.client
.read(&make_absolute_file(from))
.await
.to_object_store_err()?;
let mut stream = file.read_range_stream(0, file.file_length()).boxed();
let mut new_file = self
.client
.create(&make_absolute_file(to), write_options)
.await
.to_object_store_err()?;
while let Some(bytes) = stream.next().await.transpose().to_object_store_err()? {
new_file.write(bytes).await.to_object_store_err()?;
}
new_file.close().await.to_object_store_err()?;
Ok(())
}
}
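/// Converts [`hdfs_native::Result`] errors into the closest matching
/// [`object_store::Error`] variants.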
trait HdfsErrorConvert<T> {
fn to_object_store_err(self) -> Result<T>;
}
impl<T> HdfsErrorConvert<T> for hdfs_native::Result<T> {
fn to_object_store_err(self) -> Result<T> {
self.map_err(|err| match err {
HdfsError::FileNotFound(path) => object_store::Error::NotFound {
path: path.clone(),
source: Box::new(HdfsError::FileNotFound(path)),
},
HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists {
path: path.clone(),
source: Box::new(HdfsError::AlreadyExists(path)),
},
_ => object_store::Error::Generic {
store: "HdfsObjectStore",
source: Box::new(err),
},
})
}
}
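/// Channel used to hand each part to the background writer task, paired with
/// a oneshot sender for reporting that part's result.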
type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload)>;
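/// A [`MultipartUpload`] implementation that forwards parts over an unbounded
/// channel to a single background task, which writes them sequentially to a
/// temporary HDFS file.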
struct HdfsMultipartWriter {
client: Client,
sender: Option<(JoinHandle<Result<()>>, PartSender)>,
tmp_filename: String,
final_filename: String,
}
impl HdfsMultipartWriter {
fn new(client: Client, writer: FileWriter, tmp_filename: &str, final_filename: &str) -> Self {
let (sender, receiver) = mpsc::unbounded_channel();
Self {
client,
sender: Some((Self::start_writer_task(writer, receiver), sender)),
tmp_filename: tmp_filename.to_string(),
final_filename: final_filename.to_string(),
}
}
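    /// Spawns the background task that drains the part channel, writing each
    /// part to the file in arrival order and acknowledging it through its
    /// oneshot sender. When the channel closes, the file itself is closed.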
fn start_writer_task(
mut writer: FileWriter,
mut part_receiver: mpsc::UnboundedReceiver<(oneshot::Sender<Result<()>>, PutPayload)>,
) -> JoinHandle<Result<()>> {
        task::spawn(async move {
            while let Some((sender, part)) = part_receiver.recv().await {
                for bytes in part {
                    if let Err(e) = writer.write(bytes).await {
                        let _ = sender.send(Err(e).to_object_store_err());
                        return Err(generic_error("Failed to write all parts".into()));
                    }
                }
                let _ = sender.send(Ok(()));
            }
            writer.close().await.to_object_store_err()
        })
}
}
impl std::fmt::Debug for HdfsMultipartWriter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HdfsMultipartWriter")
.field("tmp_filename", &self.tmp_filename)
.field("final_filename", &self.final_filename)
.finish()
}
}
#[async_trait]
impl MultipartUpload for HdfsMultipartWriter {
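    /// Queues a part for the background writer task; the returned future
    /// resolves once that part has actually been written.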
fn put_part(&mut self, payload: PutPayload) -> UploadPart {
let (result_sender, result_receiver) = oneshot::channel();
if let Some((_, payload_sender)) = self.sender.as_ref() {
if let Err(mpsc::error::SendError((result_sender, _))) =
payload_sender.send((result_sender, payload))
{
let _ = result_sender.send(Err(generic_error("Write task failed".into())));
}
} else {
let _ = result_sender.send(Err(generic_error(
"Cannot put part after completing or aborting".into(),
)));
}
async {
result_receiver
.await
.unwrap_or_else(|_| Err(generic_error("Write task failed".into())))
}
.boxed()
}
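    /// Drops the part sender so the writer task finishes and closes the file,
    /// then renames the temporary file over the final path.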
async fn complete(&mut self) -> Result<PutResult> {
if let Some((handle, sender)) = self.sender.take() {
drop(sender);
            handle
                .await
                .map_err(|err| generic_error(Box::new(err)))??;
self.client
.rename(&self.tmp_filename, &self.final_filename, true)
.await
.to_object_store_err()?;
Ok(PutResult {
e_tag: None,
version: None,
})
} else {
Err(generic_error(
"Cannot call abort or complete multiple times".into(),
))
}
}
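    /// Aborts the writer task and deletes the temporary file.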
async fn abort(&mut self) -> Result<()> {
if let Some((handle, sender)) = self.sender.take() {
drop(sender);
handle.abort();
self.client
.delete(&self.tmp_filename, false)
.await
.to_object_store_err()?;
Ok(())
} else {
Err(generic_error(
"Cannot call abort or complete multiple times".into(),
))
}
}
}
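/// [`Path`] strips any leading slash, while HDFS expects absolute paths, so
/// add the leading slash back.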
fn make_absolute_file(path: &Path) -> String {
format!("/{}", path.as_ref())
}
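/// Like [`make_absolute_file`], but with a trailing slash to denote a
/// directory.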
fn make_absolute_dir(path: &Path) -> String {
if path.parts().count() > 0 {
format!("/{}/", path.as_ref())
} else {
"/".to_string()
}
}
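/// HDFS has no native etags, so synthesize one from the file's modification
/// time and length, both rendered in hex.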
fn get_etag(status: &FileStatus) -> String {
let size = status.length;
let mtime = status.modification_time;
format!("{mtime:x}-{size:x}")
}
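/// Builds an [`ObjectMeta`] from an HDFS [`FileStatus`].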
fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
Ok(ObjectMeta {
location: Path::parse(&status.path)?,
        last_modified: DateTime::<Utc>::from_timestamp_millis(status.modification_time as i64)
            .ok_or_else(|| generic_error("Last modified timestamp out of bounds".into()))?,
size: status.length as u64,
e_tag: Some(get_etag(status)),
version: None,
})
}
#[cfg(test)]
#[cfg(feature = "integration-test")]
mod test {
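    // These tests spin up a Java MiniDFS cluster through hdfs-native's
    // `integration-test` feature, so a local Java/Hadoop test environment is
    // required.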
use std::collections::HashSet;
use object_store::integration::*;
use serial_test::serial;
use tokio::runtime::Runtime;
use crate::HdfsObjectStoreBuilder;
#[tokio::test]
#[serial]
async fn hdfs_test() {
let dfs = hdfs_native::minidfs::MiniDfs::with_features(&HashSet::from([
hdfs_native::minidfs::DfsFeatures::HA,
]));
let integration = HdfsObjectStoreBuilder::new()
.with_url(&dfs.url)
.build()
.unwrap();
put_get_delete_list(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
copy_if_not_exists(&integration).await;
multipart_race_condition(&integration, true).await;
multipart_out_of_order(&integration).await;
get_opts(&integration).await;
put_opts(&integration, false).await;
}
#[test]
#[serial]
fn test_no_tokio() {
let dfs = hdfs_native::minidfs::MiniDfs::with_features(&HashSet::from([
hdfs_native::minidfs::DfsFeatures::HA,
]));
let integration = HdfsObjectStoreBuilder::new()
.with_url(&dfs.url)
.build()
.unwrap();
futures::executor::block_on(get_opts(&integration));
let rt = Runtime::new().unwrap();
let integration = HdfsObjectStoreBuilder::new()
.with_url(&dfs.url)
.with_io_runtime(rt.handle().clone())
.build()
.unwrap();
futures::executor::block_on(get_opts(&integration));
}
}