mod client;
mod native;
use async_stream::try_stream;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
pub use client::HopsClient;
use futures::{
stream::{BoxStream, StreamExt},
FutureExt,
};
use object_store::path::Error::InvalidPath;
#[allow(deprecated)]
use object_store::{
path::Path, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
UploadPart,
};
use std::collections::VecDeque;
use std::{
collections::HashMap,
fmt::{Display, Formatter},
path::PathBuf,
sync::Arc,
};
use tokio::{
runtime::Handle,
sync::{mpsc, oneshot},
task::JoinHandle,
};
pub type Client = HopsClient;
use crate::client::ReadRangeStream;
pub use crate::client::{FileStatus, FileWriter, HdfsError, WriteOptions};
/// Wraps an arbitrary boxed error in [`object_store::Error::Generic`],
/// labelled with this store's name.
fn generic_error(
    source: Box<dyn std::error::Error + Send + Sync + 'static>,
) -> object_store::Error {
    const STORE_NAME: &str = "HdfsObjectStore";
    object_store::Error::Generic {
        store: STORE_NAME,
        source,
    }
}
/// Builder that collects the settings needed to construct an [`HdfsObjectStore`].
///
/// All fields start out empty (`Default`); only the URL is mandatory when
/// `build` is called.
#[derive(Default)]
pub struct HdfsObjectStoreBuilder {
/// URL passed to the client constructor; `build` returns an error when unset.
url: Option<String>,
/// Key/value configuration forwarded to the client; when empty, `build`
/// passes `None` instead of an empty map.
config: HashMap<String, String>,
/// Optional Tokio runtime handle passed to the client and kept on the
/// resulting store.
io_runtime: Option<Handle>,
}
impl HdfsObjectStoreBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn with_url(mut self, url: impl Into<String>) -> Self {
self.url = Some(url.into());
self
}
pub fn with_config(
mut self,
config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Self {
self.config = config
.into_iter()
.map(|(k, v)| (k.into(), v.into()))
.collect();
self
}
pub fn with_io_runtime(mut self, runtime: Handle) -> Self {
self.io_runtime = Some(runtime);
self
}
pub fn build(self) -> Result<HdfsObjectStore> {
let url = self.url.ok_or_else(|| generic_error("URL is required".into()))?;
let client = if self.config.is_empty() {
Client::new(&url, None, self.io_runtime.clone()).to_object_store_err()?
} else {
Client::new(&url, Some(self.config), self.io_runtime.clone()).to_object_store_err()?
};
Ok(HdfsObjectStore {
client: Arc::new(client),
io_runtime: self.io_runtime,
})
}
}
/// `ObjectStore` implementation that forwards every operation to a shared
/// HDFS client.
#[derive(Debug)]
pub struct HdfsObjectStore {
/// Shared client used for all file-system calls.
client: Arc<Client>,
/// Optional Tokio runtime handle; when set, multipart writer tasks are
/// spawned on it instead of the ambient runtime.
io_runtime: Option<Handle>,
}
impl Clone for HdfsObjectStore {
    /// Produces a store that shares the same client (reference-count bump)
    /// and the same optional runtime handle.
    fn clone(&self) -> Self {
        let Self { client, io_runtime } = self;
        Self {
            client: Arc::clone(client),
            io_runtime: io_runtime.clone(),
        }
    }
}
impl HdfsObjectStore {
    /// Wraps an existing client; no dedicated I/O runtime is configured.
    pub fn new(client: Arc<Client>) -> Self {
        let io_runtime = None;
        Self { client, io_runtime }
    }

    /// Creates a store for `url` with no extra configuration.
    #[deprecated(since = "1.1.0", note = "Use HdfsObjectStoreBuilder instead")]
    pub fn with_url(url: &str) -> Result<Self> {
        let builder = HdfsObjectStoreBuilder::new().with_url(url);
        builder.build()
    }

    /// Creates a store for `url` using the given client configuration.
    #[deprecated(since = "1.1.0", note = "Use HdfsObjectStoreBuilder instead")]
    pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
        let builder = HdfsObjectStoreBuilder::new()
            .with_url(url)
            .with_config(config);
        builder.build()
    }

    /// Copies `from` to `to` through the client, optionally overwriting the
    /// destination.
    async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
        let source = make_absolute_file(from);
        let destination = make_absolute_file(to);
        let copied = self.client.hdfs_copy(&source, &destination, overwrite).await;
        copied.to_object_store_err()
    }

    /// Creates a fresh temporary file next to `file_path` and returns its
    /// writer together with its path.
    ///
    /// The temporary name has the form `.<file name>.tmp.<n>`, where `n`
    /// starts at 1 and is incremented until creation no longer reports that
    /// the file already exists.
    async fn open_tmp_file(&self, file_path: &str) -> Result<(FileWriter, String)> {
        let target = PathBuf::from(file_path);
        let base_name = target
            .file_name()
            .ok_or_else(|| HdfsError::InvalidPath(file_path.to_string()))
            .to_object_store_err()?
            .to_str()
            .ok_or_else(|| HdfsError::NoneUnicodeInPath(file_path.to_string()))
            .to_object_store_err()?
            .to_string();
        let tmp_prefix = target
            .with_file_name(format!(".{base_name}.tmp"))
            .to_str()
            .ok_or_else(|| HdfsError::NoneUnicodeInPath(file_path.to_string()))
            .to_object_store_err()?
            .to_string();

        let mut attempt = 1;
        loop {
            let candidate = format!("{tmp_prefix}.{attempt}");
            match self.client.create(&candidate, WriteOptions::default()).await {
                Ok(writer) => return Ok((writer, candidate)),
                // Name already taken: try the next numeric suffix.
                Err(HdfsError::AlreadyExists(_)) => attempt += 1,
                Err(other) => return Err(other).to_object_store_err(),
            }
        }
    }
}
impl Display for HdfsObjectStore {
    /// Writes the fixed store name; no instance state is included.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("HdfsObjectStore")
    }
}
impl From<Client> for HdfsObjectStore {
    /// Takes ownership of `value`, places it behind an `Arc` and builds a
    /// store without a dedicated I/O runtime.
    fn from(value: Client) -> Self {
        let shared = Arc::new(value);
        Self::new(shared)
    }
}
#[async_trait]
impl ObjectStore for HdfsObjectStore {
    /// Writes `payload` to `location` by first writing a temporary sibling
    /// file and then renaming it onto the final path.
    ///
    /// # Errors
    ///
    /// * `NotSupported` for `PutMode::Update`.
    /// * `AlreadyExists` when the mode is `Create` and the file is present.
    /// * Any converted client error from writing, closing or renaming. In that
    ///   case the temporary file is removed on a best-effort basis so failed
    ///   uploads do not leave orphaned files behind.
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult> {
        let overwrite = match opts.mode {
            PutMode::Create => false,
            PutMode::Overwrite => true,
            PutMode::Update(_) => {
                return Err(object_store::Error::NotSupported {
                    source: "Update mode not supported".to_string().into(),
                })
            }
        };
        let final_file_path = make_absolute_file(location);
        let file_exists = self
            .client
            .check_file_exists(&final_file_path)
            .await
            .to_object_store_err()?;
        if !overwrite && file_exists {
            return Err(HdfsError::AlreadyExists(final_file_path)).to_object_store_err();
        }
        let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        // Run write + close + rename as one fallible unit so that a failure at
        // any step can be followed by cleanup of the temporary file.
        let written = async {
            for bytes in payload {
                tmp_file.hdfs_write(bytes).await.to_object_store_err()?;
            }
            tmp_file.close_file().await.to_object_store_err()?;
            self.client
                .rename(&tmp_file_path, &final_file_path, overwrite)
                .await
                .to_object_store_err()?;
            Ok::<(), object_store::Error>(())
        }
        .await;
        if let Err(err) = written {
            // Best effort: the original failure is what the caller needs to see.
            let _ = self.client.delete(&tmp_file_path, false).await;
            return Err(err);
        }

        let e_tag = self.head(location).await?.e_tag;
        Ok(PutResult {
            e_tag,
            version: None,
        })
    }

    /// Starts a multipart upload that streams parts into a temporary file;
    /// the file is renamed onto `location` when the upload is completed.
    #[allow(deprecated)]
    async fn put_multipart_opts(
        &self,
        location: &Path,
        _opts: PutMultipartOpts,
    ) -> Result<Box<dyn MultipartUpload>> {
        let final_file_path = make_absolute_file(location);
        let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;
        Ok(Box::new(HdfsMultipartWriter::new(
            Arc::clone(&self.client),
            Arc::new(tmp_file),
            &tmp_file_path,
            &final_file_path,
            self.io_runtime.clone(),
        )))
    }

    /// Opens `location` for reading and returns a stream over the requested
    /// byte range (the whole object when no range is given).
    ///
    /// A bounded range that ends past the object is clamped to the object
    /// size, so the reported `range` matches the bytes actually available.
    ///
    /// # Errors
    ///
    /// Returns an error when preconditions fail, when the resolved range
    /// starts after its end (for example an offset beyond the object size),
    /// when an offset does not fit in `usize`, or when the client fails.
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        let meta = self.head(location).await?;
        options.check_preconditions(&meta)?;
        let range = match options.range {
            Some(GetRange::Bounded(range)) => range.start..range.end.min(meta.size),
            Some(GetRange::Offset(offset)) => offset..meta.size,
            Some(GetRange::Suffix(suffix)) => meta.size.saturating_sub(suffix)..meta.size,
            None => 0..meta.size,
        };
        if range.start > range.end {
            return Err(generic_error(
                format!(
                    "requested range {}..{} is not valid for {} ({} bytes)",
                    range.start, range.end, location, meta.size
                )
                .into(),
            ));
        }
        let reader = self
            .client
            .open_for_read(&make_absolute_file(location))
            .await
            .to_object_store_err()?;
        // Surface conversion failures (possible on 32-bit targets) as errors
        // instead of panicking.
        let start = usize::try_from(range.start).map_err(|e| generic_error(Box::new(e)))?;
        let end = usize::try_from(range.end).map_err(|e| generic_error(Box::new(e)))?;
        let connection = self.client.get_connection();
        let stream = ReadRangeStream::new(connection, reader.file, start, end);
        let box_stream = stream.map(|b| b.to_object_store_err()).boxed();
        let payload = GetResultPayload::Stream(box_stream);
        Ok(GetResult {
            payload,
            meta,
            range,
            attributes: Default::default(),
        })
    }

    /// Returns the metadata of the file at `location`.
    ///
    /// # Errors
    ///
    /// Fails when the client lookup fails, when `location` is a directory, or
    /// when the reported modification time or length cannot be represented.
    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
        let status = self
            .client
            .get_file_info(&make_absolute_file(location))
            .await
            .to_object_store_err()?;
        if status.isdir {
            return Err(HdfsError::IsADirectoryError(
                "Head must be called on a file".to_string(),
            ))
            .to_object_store_err();
        }
        let last_modified = DateTime::<Utc>::from_timestamp(status.modification_time as i64, 0)
            .ok_or_else(|| {
                generic_error(
                    format!("modification time of {location} is out of range").into(),
                )
            })?;
        let size = status.length.try_into().map_err(|_| {
            generic_error(format!("length of {location} cannot be represented").into())
        })?;
        Ok(ObjectMeta {
            location: location.clone(),
            last_modified,
            size,
            e_tag: Some(get_etag(&status)),
            version: None,
        })
    }

    /// Deletes the file at `location` (non-recursively).
    ///
    /// # Errors
    ///
    /// Fails when the client reports an error or returns `false`.
    async fn delete(&self, location: &Path) -> Result<()> {
        let deleted = self
            .client
            .delete(&make_absolute_file(location), false)
            .await
            .to_object_store_err()?;
        if !deleted {
            return Err(HdfsError::OperationFailed(
                "failed to delete object".to_string(),
            ))
            .to_object_store_err();
        }
        Ok(())
    }

    /// Streams the metadata of every file found below `prefix`.
    ///
    /// Directories are expanded through a work queue; only non-directory
    /// entries are yielded.
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
        let start_prefix = prefix.map(make_absolute_dir).unwrap_or_default();
        let client = Arc::clone(&self.client);
        try_stream! {
            let mut pending = VecDeque::new();
            let initial_objects: Vec<FileStatus> = client.list_directory(&start_prefix).await.to_object_store_err()?;
            pending.extend(initial_objects);
            while let Some(object) = pending.pop_front() {
                if object.isdir {
                    // Queue the directory's children for later processing.
                    let children = client.list_directory(&object.path).await.to_object_store_err()?;
                    pending.extend(children);
                } else {
                    yield get_object_meta(&object)?;
                }
            }
        }.boxed()
    }

    /// Lists the direct children of `prefix`: directories become common
    /// prefixes, files become objects.
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        let statuses = self
            .client
            .list_directory(&prefix.map(make_absolute_dir).unwrap_or_default())
            .await
            .to_object_store_err()?;
        let mut dirs: Vec<Path> = Vec::new();
        for status in statuses.iter().filter(|s| s.isdir) {
            dirs.push(Path::parse(&status.path)?)
        }
        let mut files: Vec<ObjectMeta> = Vec::new();
        for status in statuses.iter().filter(|s| !s.isdir) {
            files.push(get_object_meta(status)?)
        }
        Ok(ListResult {
            common_prefixes: dirs,
            objects: files,
        })
    }

    /// Renames `from` to `to`, overwriting the destination. The parent
    /// directory of `to` is created first when `to` has one.
    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
        let mut parent: Vec<_> = to.parts().collect();
        parent.pop();
        if !parent.is_empty() {
            let parent_path: Path = parent.into_iter().collect();
            self.client
                .mkdir(&make_absolute_dir(&parent_path))
                .await
                .to_object_store_err()?;
        }
        self.client
            .rename(&make_absolute_file(from), &make_absolute_file(to), true)
            .await
            .to_object_store_err()
    }

    /// Renames `from` to `to` with overwriting disabled.
    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        self.client
            .rename(&make_absolute_file(from), &make_absolute_file(to), false)
            .await
            .to_object_store_err()
    }

    /// Copies `from` to `to`, overwriting the destination.
    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        self.internal_copy(from, to, true).await
    }

    /// Copies `from` to `to` with overwriting disabled.
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        self.internal_copy(from, to, false).await
    }
}
/// Conversion of a client-level result into an `object_store` [`Result`].
trait HdfsErrorConvert<T> {
/// Consumes `self` and returns the equivalent `object_store` result.
fn to_object_store_err(self) -> Result<T>;
}
impl<T> HdfsErrorConvert<T> for client::Result<T> {
fn to_object_store_err(self) -> Result<T> {
self.map_err(|err| match err {
HdfsError::FileNotFound(path) => object_store::Error::NotFound {
path: path.clone(),
source: Box::new(HdfsError::FileNotFound(path)),
},
HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists {
path: path.clone(),
source: Box::new(HdfsError::AlreadyExists(path)),
},
HdfsError::NoneUnicodeInPath(path) => object_store::Error::InvalidPath {
source: InvalidPath {
path: PathBuf::from(path),
},
},
HdfsError::InvalidPath(path) => object_store::Error::InvalidPath {
source: InvalidPath {
path: PathBuf::from(path),
},
},
HdfsError::IsADirectoryError(path) => object_store::Error::Precondition {
path: path.clone(),
source: Box::new(HdfsError::IsADirectoryError(path)),
},
_ => object_store::Error::Generic {
store: "HdfsObjectStore",
source: Box::new(err),
},
})
}
}
/// Sending half of the channel feeding the background writer task: each
/// message pairs a part's payload with a one-shot sender on which the outcome
/// of writing that part is reported back.
type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload)>;
/// Multipart upload that forwards parts to a background writer task and
/// renames the temporary file onto the final path on completion.
struct HdfsMultipartWriter {
/// Client used for the final rename and for deleting the temporary file.
client: Arc<Client>,
/// Writer task handle plus the channel feeding it; `None` once the upload
/// has been completed or aborted.
sender: Option<(JoinHandle<Result<()>>, PartSender)>,
/// Path of the temporary file the parts are written to.
tmp_filename: String,
/// Path the temporary file is renamed to when the upload completes.
final_filename: String,
}
impl HdfsMultipartWriter {
fn new(
client: Arc<Client>,
writer: Arc<FileWriter>,
tmp_filename: &str,
final_filename: &str,
io_runtime: Option<Handle>,
) -> Self {
let (sender, receiver) = mpsc::unbounded_channel();
let writer_handle = Self::start_writer_task(Arc::clone(&writer), receiver, io_runtime);
Self {
client,
sender: Some((writer_handle, sender)),
tmp_filename: tmp_filename.to_string(),
final_filename: final_filename.to_string(),
}
}
fn start_writer_task(
writer: Arc<FileWriter>,
mut part_receiver: mpsc::UnboundedReceiver<(oneshot::Sender<Result<()>>, PutPayload)>,
io_runtime: Option<Handle>,
) -> JoinHandle<Result<()>> {
let future = async move {
'outer: loop {
match part_receiver.recv().await {
Some((sender, part)) => {
for bytes in part {
if let Err(e) = writer.hdfs_write(bytes).await.to_object_store_err() {
let _ = sender.send(Err(e));
break 'outer;
}
}
let _ = sender.send(Ok(()));
}
None => {
return writer.close_file().await.to_object_store_err();
}
}
}
while let Some((sender, _)) = part_receiver.recv().await {
let _ = sender.send(
Err(HdfsError::OperationFailed(
"Write failed during one of the parts".to_string(),
))
.to_object_store_err(),
);
}
Err(HdfsError::OperationFailed(
"Write failed during one of the parts".to_string(),
))
.to_object_store_err()
};
match io_runtime {
Some(handle) => handle.spawn(future),
None => tokio::task::spawn(future),
}
}
}
impl std::fmt::Debug for HdfsMultipartWriter {
    /// Shows only the temporary and final file names; the client and the
    /// task/channel state are left out.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("HdfsMultipartWriter");
        out.field("tmp_filename", &self.tmp_filename);
        out.field("final_filename", &self.final_filename);
        out.finish()
    }
}
#[async_trait]
impl MultipartUpload for HdfsMultipartWriter {
fn put_part(&mut self, payload: PutPayload) -> UploadPart {
let (result_sender, result_receiver) = oneshot::channel();
if let Some((_, payload_sender)) = self.sender.as_ref() {
payload_sender.send((result_sender, payload)).unwrap();
} else {
result_sender
.send(
Err(HdfsError::OperationFailed(
"Cannot put part after completing or aborting".to_string(),
))
.to_object_store_err(),
)
.unwrap();
}
async { result_receiver.await.unwrap() }.boxed()
}
async fn complete(&mut self) -> Result<PutResult> {
if let Some((handle, sender)) = self.sender.take() {
drop(sender);
handle.await??;
self.client
.rename(&self.tmp_filename, &self.final_filename, true)
.await
.to_object_store_err()?;
Ok(PutResult {
e_tag: None,
version: None,
})
} else {
Err(object_store::Error::NotSupported {
source: "Cannot call abort or complete multiple times".into(),
})
}
}
async fn abort(&mut self) -> Result<()> {
if let Some((handle, sender)) = self.sender.take() {
drop(sender);
handle.abort();
self.client
.delete(&self.tmp_filename, false)
.await
.to_object_store_err()?;
Ok(())
} else {
Err(object_store::Error::NotSupported {
source: "Cannot call abort or complete multiple times".into(),
})
}
}
}
/// Turns an object-store path into an absolute file path by prefixing `/`.
fn make_absolute_file(path: &Path) -> String {
    let relative: &str = path.as_ref();
    let mut absolute = String::with_capacity(relative.len() + 1);
    absolute.push('/');
    absolute.push_str(relative);
    absolute
}
/// Turns an object-store path into an absolute directory path with leading
/// and trailing `/`; a path without any parts maps to the root `/`.
fn make_absolute_dir(path: &Path) -> String {
    let has_parts = path.parts().next().is_some();
    if !has_parts {
        return "/".to_string();
    }
    format!("/{}/", path.as_ref())
}
/// Builds an entity tag of the form `<mtime>-<length>`, both rendered in
/// lowercase hexadecimal.
fn get_etag(status: &FileStatus) -> String {
    format!("{:x}-{:x}", status.modification_time, status.length)
}
fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
Ok(ObjectMeta {
location: Path::parse(&status.path)?,
last_modified: DateTime::<Utc>::from_timestamp(status.modification_time as i64, 0).unwrap(),
size: status
.length
.try_into()
.expect("unable to convert status.length to usize"),
e_tag: Some(get_etag(status)),
version: None,
})
}