use std::sync::Arc;
use object_store::MultipartUpload;
use object_store::ObjectStore;
use object_store::PutPayload;
use object_store::path::Path as ObjectStorePath;
use object_store::{Attribute, AttributeValue};
use mea::mutex::Mutex;
use opendal::raw::oio::MultipartPart;
use opendal::raw::*;
use opendal::*;
use super::core::{format_put_multipart_options, format_put_result, parse_op_write};
use super::error::parse_error;
/// Writer that uploads one object to an `object_store` backend, either with a single
/// put request or as a multipart upload.
pub struct ObjectStoreWriter {
/// Backend that receives the put and multipart requests.
store: Arc<dyn ObjectStore + 'static>,
/// Location of the target object inside `store`.
path: ObjectStorePath,
/// Write arguments supplied at construction; converted with `parse_op_write` for each upload.
args: OpWrite,
/// Handle of the multipart upload currently in progress; `None` while no upload is active.
upload: Mutex<Option<Box<dyn MultipartUpload>>>,
}
impl ObjectStoreWriter {
    /// Creates a writer targeting `path` on `store`.
    ///
    /// `path` is converted into an `object_store` path, `args` is kept for later
    /// uploads, and the writer starts without an active multipart upload.
    pub fn new(store: Arc<dyn ObjectStore + 'static>, path: &str, args: OpWrite) -> Self {
        let path = ObjectStorePath::from(path);
        let upload = Mutex::new(None);
        Self {
            store,
            path,
            args,
            upload,
        }
    }
}
impl oio::MultipartWrite for ObjectStoreWriter {
/// Uploads the whole object with a single `put_opts` request.
///
/// `size` is the length the caller expects `body` to have; a mismatch is rejected
/// before anything is sent to the store.
///
/// # Errors
///
/// Returns `ErrorKind::Unexpected` when `body.len()` differs from `size`, propagates
/// any error from `parse_op_write`, and maps store failures through `parse_error`.
async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
    let actual_size = body.len() as u64;
    if actual_size != size {
        return Err(Error::new(
            ErrorKind::Unexpected,
            format!("Expected size {size} but got {actual_size}"),
        ));
    }

    let payload = PutPayload::from(body.to_bytes());

    // Attach the object size under the `content-size` metadata attribute, on top of
    // whatever options were derived from the write arguments.
    let mut put_opts = parse_op_write(&self.args)?;
    put_opts.attributes.insert(
        Attribute::Metadata("content-size".into()),
        AttributeValue::from(size.to_string()),
    );

    let put_result = self
        .store
        .put_opts(&self.path, payload, put_opts)
        .await
        .map_err(parse_error)?;

    // Only the identifiers returned by the store are copied into the metadata.
    let mut meta = Metadata::new(EntryMode::FILE);
    if let Some(etag) = put_result.e_tag.as_deref() {
        meta.set_etag(etag);
    }
    if let Some(version) = put_result.version.as_deref() {
        meta.set_version(version);
    }
    Ok(meta)
}
/// Starts a multipart upload on the store and keeps its handle in `self.upload`.
///
/// The returned upload id is always an empty string; later calls find the upload
/// through the stored handle rather than through the id.
///
/// # Errors
///
/// Returns `ErrorKind::Unexpected` if an upload is already in progress, propagates
/// any error from `parse_op_write`, and maps store failures through `parse_error`.
async fn initiate_part(&self) -> Result<String> {
    // Check for an existing upload *before* contacting the store. Doing it afterwards
    // would start a second multipart upload on the backend and then drop its handle
    // without aborting it whenever the check fails. The lock is held until the new
    // handle is stored so two initiations cannot race past the check.
    let mut guard = self.upload.lock().await;
    if guard.is_some() {
        return Err(Error::new(
            ErrorKind::Unexpected,
            "Upload already initiated, abort the previous upload first",
        ));
    }

    let opts = parse_op_write(&self.args)?;
    let multipart_opts = format_put_multipart_options(opts);
    let upload = self
        .store
        .put_multipart_opts(&self.path, multipart_opts)
        .await
        .map_err(parse_error)?;
    *guard = Some(upload);

    Ok(String::new())
}
async fn write_part(
&self,
_upload_id: &str,
part_number: usize,
size: u64,
body: Buffer,
) -> Result<MultipartPart> {
let actual_size = body.len() as u64;
if actual_size != size {
return Err(Error::new(
ErrorKind::Unexpected,
format!("Expected size {size} but got {actual_size}"),
));
}
let bytes = body.to_bytes();
let etag = String::new();
let payload = PutPayload::from(bytes);
let mut guard = self.upload.lock().await;
let upload = guard
.as_mut()
.ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
upload.put_part(payload).await.map_err(parse_error)?;
let multipart_part = MultipartPart {
part_number,
etag,
checksum: None, size: None,
};
Ok(multipart_part)
}
async fn complete_part(&self, _upload_id: &str, parts: &[MultipartPart]) -> Result<Metadata> {
if parts.is_empty() {
return Err(Error::new(
ErrorKind::Unexpected,
"Cannot complete multipart upload with no parts",
));
}
let mut guard = self.upload.lock().await;
let upload = guard
.as_mut()
.ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
let result = upload.complete().await.map_err(parse_error)?;
*guard = None;
let metadata = format_put_result(result);
Ok(metadata)
}
async fn abort_part(&self, _upload_id: &str) -> Result<()> {
let mut guard = self.upload.lock().await;
let upload = guard
.as_mut()
.ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
upload.abort().await.map_err(parse_error)?;
*guard = None;
Ok(())
}
}