use std::collections::HashMap;
use std::convert::Infallible;
use std::future::Future;
use bytes::Bytes;
use futures::{TryStream, stream};
use heck::ToKebabCase;
use http::{HeaderMap, HeaderName, Method, header};
use serde::{Deserialize, Serialize};
use super::{ServerSideEncryption, StorageClass};
use crate::body::StreamBody;
use crate::error::Result;
use crate::response::HeaderResponseProcessor;
use crate::{BoxError, Client, Ops, Prepared, Request, ser};
#[derive(Debug, Clone, Default, Serialize)]
pub struct PutObjectParams {
#[serde(skip_serializing_if = "Option::is_none")]
pub version_id: Option<String>,
}
impl PutObjectParams {
pub fn new() -> Self {
Self::default()
}
pub fn version_id(mut self, version_id: impl Into<String>) -> Self {
self.version_id = Some(version_id.into());
self
}
}
#[derive(Debug, Clone, Default)]
pub struct PutObjectOptions {
pub cache_control: Option<String>,
pub content_disposition: Option<String>,
pub content_encoding: Option<String>,
pub content_type: Option<String>,
pub expires: Option<String>,
pub forbid_overwrite: Option<bool>,
pub storage_class: Option<StorageClass>,
pub server_side_encryption: Option<ServerSideEncryption>,
pub server_side_encryption_key_id: Option<String>,
pub object_acl: Option<String>,
pub user_meta: HashMap<String, String>,
pub tagging: HashMap<String, String>,
pub content_md5: Option<String>,
}
impl PutObjectOptions {
pub fn cache_control(mut self, cache_control: impl Into<String>) -> Self {
self.cache_control = Some(cache_control.into());
self
}
pub fn content_disposition(mut self, content_disposition: impl Into<String>) -> Self {
self.content_disposition = Some(content_disposition.into());
self
}
pub fn content_encoding(mut self, content_encoding: impl Into<String>) -> Self {
self.content_encoding = Some(content_encoding.into());
self
}
pub fn content_type(mut self, content_type: impl Into<String>) -> Self {
self.content_type = Some(content_type.into());
self
}
pub fn expires(mut self, expires: impl Into<String>) -> Self {
self.expires = Some(expires.into());
self
}
pub fn forbid_overwrite(mut self, forbid: bool) -> Self {
self.forbid_overwrite = Some(forbid);
self
}
pub fn storage_class(mut self, storage_class: StorageClass) -> Self {
self.storage_class = Some(storage_class);
self
}
pub fn server_side_encryption(mut self, encryption: ServerSideEncryption) -> Self {
self.server_side_encryption = Some(encryption);
self
}
pub fn server_side_encryption_key_id(mut self, key_id: impl Into<String>) -> Self {
self.server_side_encryption_key_id = Some(key_id.into());
self
}
pub fn object_acl(mut self, acl: impl Into<String>) -> Self {
self.object_acl = Some(acl.into());
self
}
pub fn user_meta(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.user_meta.insert(key.into(), value.into());
self
}
pub fn user_meta_map(mut self, meta: HashMap<String, String>) -> Self {
self.user_meta.extend(meta);
self
}
pub fn tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.tagging.insert(key.into(), value.into());
self
}
pub fn tagging_map(mut self, tags: HashMap<String, String>) -> Self {
self.tagging.extend(tags);
self
}
pub fn content_md5(mut self, md5: impl Into<String>) -> Self {
self.content_md5 = Some(md5.into());
self
}
}
impl PutObjectOptions {
fn into_headers(self) -> Result<HeaderMap> {
let mut headers = HeaderMap::new();
if let Some(cache_control) = self.cache_control {
headers.insert(header::CACHE_CONTROL, cache_control.parse()?);
}
if let Some(content_disposition) = self.content_disposition {
headers.insert(header::CONTENT_DISPOSITION, content_disposition.parse()?);
}
if let Some(content_encoding) = self.content_encoding {
headers.insert(header::CONTENT_ENCODING, content_encoding.parse()?);
}
if let Some(content_type) = self.content_type {
headers.insert(header::CONTENT_TYPE, content_type.parse()?);
}
if let Some(expires) = self.expires {
headers.insert(header::EXPIRES, expires.parse()?);
}
if let Some(content_md5) = self.content_md5 {
headers.insert(HeaderName::from_static("content-md5"), content_md5.parse()?);
}
if let Some(forbid_overwrite) = self.forbid_overwrite {
headers.insert(
HeaderName::from_static("x-oss-forbid-overwrite"),
forbid_overwrite.to_string().parse()?,
);
}
if let Some(storage_class) = self.storage_class {
headers.insert(HeaderName::from_static("x-oss-storage-class"), storage_class.as_ref().parse()?);
}
if let Some(encryption) = self.server_side_encryption {
headers.insert(
HeaderName::from_static("x-oss-server-side-encryption"),
encryption.as_ref().parse()?,
);
}
if let Some(key_id) = self.server_side_encryption_key_id {
headers.insert(HeaderName::from_static("x-oss-server-side-encryption-key-id"), key_id.parse()?);
}
if let Some(acl) = self.object_acl {
headers.insert(HeaderName::from_static("x-oss-object-acl"), acl.parse()?);
}
for (key, value) in self.user_meta {
let key = key.to_kebab_case().to_lowercase();
let header_name = format!("x-oss-meta-{key}");
headers.insert(HeaderName::from_bytes(header_name.as_bytes())?, value.parse()?);
}
if !self.tagging.is_empty() {
let tagging_str = ser::to_string(&self.tagging)?;
headers.insert(HeaderName::from_static("x-oss-tagging"), tagging_str.parse()?);
}
Ok(headers)
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct PutObjectResponse {
#[serde(rename = "etag")]
pub etag: String,
#[serde(rename = "x-oss-version-id")]
pub version_id: Option<String>,
#[serde(rename = "x-oss-hash-crc64ecma")]
pub hash_crc64ecma: Option<String>,
#[serde(rename = "x-oss-server-side-encryption")]
pub server_side_encryption: Option<String>,
#[serde(rename = "x-oss-server-side-encryption-key-id")]
pub server_side_encryption_key_id: Option<String>,
}
pub struct PutObject<S> {
pub object_key: String,
pub params: PutObjectParams,
pub options: PutObjectOptions,
pub stream_body: S,
}
impl<S> Ops for PutObject<S>
where
S: TryStream + Send + 'static,
S::Error: Into<BoxError>,
Bytes: From<S::Ok>,
{
type Response = HeaderResponseProcessor<PutObjectResponse>;
type Body = StreamBody<S>;
type Query = PutObjectParams;
fn prepare(self) -> Result<Prepared<PutObjectParams, S>> {
Ok(Prepared {
method: Method::PUT,
key: Some(self.object_key),
query: Some(self.params),
headers: Some(self.options.into_headers()?),
body: Some(self.stream_body),
..Default::default()
})
}
}
pub trait PutObjectOperations {
fn put_object<T>(
&self,
object_key: impl Into<String>,
body: T,
options: Option<PutObjectOptions>,
) -> impl Future<Output = Result<PutObjectResponse>>
where
T: Send + 'static,
Bytes: From<T>;
fn put_object_stream<S>(
&self,
object_key: impl Into<String>,
body: S,
options: Option<PutObjectOptions>,
) -> impl Future<Output = Result<PutObjectResponse>>
where
S: TryStream + Send + 'static,
S::Error: Into<BoxError>,
Bytes: From<S::Ok>;
}
impl PutObjectOperations for Client {
async fn put_object<T>(
&self,
object_key: impl Into<String>,
body: T,
options: Option<PutObjectOptions>,
) -> Result<PutObjectResponse>
where
T: Send + 'static,
Bytes: From<T>,
{
let ops = PutObject {
object_key: object_key.into(),
params: PutObjectParams::new(),
options: options.unwrap_or_default(),
stream_body: stream::once(async move { Result::<Bytes, Infallible>::Ok(body.into()) }),
};
self.request(ops).await
}
async fn put_object_stream<S>(
&self,
object_key: impl Into<String>,
stream: S,
options: Option<PutObjectOptions>,
) -> Result<PutObjectResponse>
where
S: TryStream + Send + 'static,
S::Error: Into<BoxError>,
Bytes: From<S::Ok>,
{
let ops = PutObject {
object_key: object_key.into(),
params: PutObjectParams::new(),
options: options.unwrap_or_default(),
stream_body: stream,
};
self.request(ops).await
}
}
#[derive(Debug, Clone, Default)]
pub struct PutObjectRequestBuilder {
options: PutObjectOptions,
}
impl PutObjectRequestBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn cache_control(mut self, cache_control: impl Into<String>) -> Self {
self.options.cache_control = Some(cache_control.into());
self
}
pub fn content_disposition(mut self, content_disposition: impl Into<String>) -> Self {
self.options.content_disposition = Some(content_disposition.into());
self
}
pub fn content_encoding(mut self, content_encoding: impl Into<String>) -> Self {
self.options.content_encoding = Some(content_encoding.into());
self
}
pub fn content_type(mut self, content_type: impl Into<String>) -> Self {
self.options.content_type = Some(content_type.into());
self
}
pub fn expires(mut self, expires: impl Into<String>) -> Self {
self.options.expires = Some(expires.into());
self
}
pub fn forbid_overwrite(mut self, forbid: bool) -> Self {
self.options.forbid_overwrite = Some(forbid);
self
}
pub fn storage_class(mut self, storage_class: StorageClass) -> Self {
self.options.storage_class = Some(storage_class);
self
}
pub fn server_side_encryption(mut self, encryption: ServerSideEncryption) -> Self {
self.options.server_side_encryption = Some(encryption);
self
}
pub fn server_side_encryption_key_id(mut self, key_id: impl Into<String>) -> Self {
self.options.server_side_encryption_key_id = Some(key_id.into());
self
}
pub fn object_acl(mut self, acl: impl Into<String>) -> Self {
self.options.object_acl = Some(acl.into());
self
}
pub fn user_meta(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.options.user_meta.insert(key.into(), value.into());
self
}
pub fn tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.options.tagging.insert(key.into(), value.into());
self
}
pub fn content_md5(mut self, md5: impl Into<String>) -> Self {
self.options.content_md5 = Some(md5.into());
self
}
pub fn build(self) -> PutObjectOptions {
self.options
}
}