use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::pin::Pin;
use std::task::{Context, Poll};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures_core::Stream;
use futures_util::StreamExt;
use ve_tos_generic::{AclHeader, CallbackHeader, HttpBasicHeader, MiscHeader, SsecHeader, SseHeader};
use crate::asynchronous::http::HttpResponse;
use crate::asynchronous::internal::{OutputParser, parse_json, parse_json_by_buf, parse_response_string_by_buf, read_response};
use crate::common::{Meta, RequestInfo, RequestInfoTrait};
use crate::constant::{HEADER_CALLBACK, HEADER_CONTENT_LENGTH, HEADER_CONTENT_RANGE, HEADER_DELETE_MARKER, HEADER_ETAG, HEADER_HASH_CRC64ECMA, HEADER_SERVER_SIDE_ENCRYPTION, HEADER_SERVER_SIDE_ENCRYPTION_KMS_KEY_ID, HEADER_SSEC_ALGORITHM, HEADER_SSEC_KEY_MD5, HEADER_VERSION_ID, TRUE};
use crate::enumeration::{ReplicationStatusType, StorageClassType};
use crate::error::{ErrorResponse, TosError};
use crate::http::HttpRequest;
use crate::internal::{get_header_value, get_header_value_from_str, get_map_value_str, InputDescriptor, InputTranslator, parse_date_time_iso8601};
use crate::object::{DeleteObjectInput, DeleteObjectOutput, GetObjectInput, HeadObjectInput, HeadObjectOutput, ListObjectsType2Input, ListObjectsType2Output, PutObjectBasicInput, PutObjectOutput, RestoreInfo};
use crate::reader::{BuildBufferReader, InternalReader};
#[async_trait]
pub trait ObjectAPI {
async fn put_object<B>(&self, input: &mut PutObjectInput<B>) -> Result<PutObjectOutput, TosError> where B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static;
async fn put_object_from_buffer(&self, input: &PutObjectFromBufferInput) -> Result<PutObjectOutput, TosError>;
async fn get_object(&self, input: &GetObjectInput) -> Result<GetObjectOutput, TosError>;
async fn delete_object(&self, input: &DeleteObjectInput) -> Result<DeleteObjectOutput, TosError>;
async fn head_object(&self, input: &HeadObjectInput) -> Result<HeadObjectOutput, TosError>;
async fn list_objects_type2(&self, input: &ListObjectsType2Input) -> Result<ListObjectsType2Output, TosError>;
}
#[async_trait]
impl OutputParser for DeleteObjectOutput {
async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError> where B: Send {
let version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
let delete_marker = get_header_value(response.headers(), HEADER_DELETE_MARKER) == TRUE;
Ok(Self {
request_info,
delete_marker,
version_id,
})
}
}
#[async_trait]
impl OutputParser for HeadObjectOutput {
async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, meta: Meta) -> Result<Self, TosError> where B: Send {
Self::parse_by_header(response.headers(), request_info, meta)
}
}
#[async_trait]
impl OutputParser for ListObjectsType2Output {
async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError> where B: Send {
let mut result = parse_json::<Self>(response).await?;
for content in &mut result.contents {
if let Some(x) = content.last_modified_string.take() {
content.last_modified = parse_date_time_iso8601(&x)?;
}
if let Some(x) = content.user_meta.take() {
let mut meta = HashMap::with_capacity(x.len());
for item in x {
meta.insert(item.key, item.value);
}
content.meta = meta;
}
}
result.request_info = request_info;
Ok(result)
}
}
#[derive(Default)]
pub struct GetObjectOutput {
pub(crate) content_range: String,
pub(crate) content: Option<InternalReader<Box<dyn Stream<Item=reqwest::Result<Bytes>> + Send + Sync + Unpin>>>,
pub(crate) head_object_output: HeadObjectOutput,
}
impl Debug for GetObjectOutput {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "({:?}, {})", self.head_object_output, self.content_range)
}
}
#[async_trait]
impl OutputParser for GetObjectOutput {
async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, meta: Meta) -> Result<Self, TosError> where B: Send {
let head_object_output = HeadObjectOutput::parse_by_header(response.headers(), request_info, meta)?;
let content_range = get_header_value(response.headers(), HEADER_CONTENT_RANGE);
let reader = Box::new(response.bytes_stream()) as Box<dyn Stream<Item=reqwest::Result<Bytes>> + Send + Sync + Unpin>;
let content;
if head_object_output.content_length >= 0 {
content = Some(InternalReader::sized(reader, head_object_output.content_length as usize));
} else {
content = Some(InternalReader::new(reader));
}
Ok(Self {
content_range,
content,
head_object_output,
})
}
}
impl Stream for GetObjectOutput {
type Item = Result<Bytes, crate::error::CommonError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.content.as_mut() {
None => Poll::Ready(None),
Some(reader) => {
reader.poll_next_unpin(cx)
}
}
}
}
impl RequestInfoTrait for GetObjectOutput {
fn request_id(&self) -> &str {
&self.head_object_output.request_info.request_id
}
fn id2(&self) -> &str {
&self.head_object_output.request_info.id2
}
fn status_code(&self) -> isize {
self.head_object_output.request_info.status_code
}
fn header(&self) -> &HashMap<String, String> {
&self.head_object_output.request_info.header
}
}
impl GetObjectOutput {
pub fn request_id(&self) -> &str {
&self.head_object_output.request_info.request_id
}
pub fn id2(&self) -> &str {
&self.head_object_output.request_info.id2
}
pub fn status_code(&self) -> isize {
self.head_object_output.request_info.status_code
}
pub fn header(&self) -> &HashMap<String, String> {
&self.head_object_output.request_info.header
}
pub fn content_range(&self) -> &str {
&self.content_range
}
pub fn content(&mut self) -> Option<&mut (dyn Stream<Item=Result<Bytes, crate::error::CommonError>> + Unpin)> {
match self.content.as_mut() {
None => None,
Some(x) => Some(x)
}
}
pub fn etag(&self) -> &str {
&self.head_object_output.etag
}
pub fn last_modified(&self) -> Option<DateTime<Utc>> {
self.head_object_output.last_modified
}
pub fn delete_marker(&self) -> bool {
self.head_object_output.delete_marker
}
pub fn ssec_algorithm(&self) -> &str {
&self.head_object_output.ssec_algorithm
}
pub fn ssec_key_md5(&self) -> &str {
&self.head_object_output.ssec_key_md5
}
pub fn version_id(&self) -> &str {
&self.head_object_output.version_id
}
pub fn website_redirect_location(&self) -> &str {
&self.head_object_output.website_redirect_location
}
pub fn object_type(&self) -> &str {
&self.head_object_output.object_type
}
pub fn hash_crc64ecma(&self) -> u64 {
self.head_object_output.hash_crc64ecma
}
pub fn storage_class(&self) -> &Option<StorageClassType> {
&self.head_object_output.storage_class
}
pub fn meta(&self) -> &HashMap<String, String> {
&self.head_object_output.meta
}
pub fn content_length(&self) -> i64 {
self.head_object_output.content_length
}
pub fn cache_control(&self) -> &str {
&self.head_object_output.cache_control
}
pub fn content_disposition(&self) -> &str {
&self.head_object_output.content_disposition
}
pub fn content_encoding(&self) -> &str {
&self.head_object_output.content_encoding
}
pub fn content_language(&self) -> &str {
&self.head_object_output.content_language
}
pub fn content_type(&self) -> &str {
&self.head_object_output.content_type
}
pub fn expires(&self) -> Option<DateTime<Utc>> {
self.head_object_output.expires
}
pub fn restore_info(&self) -> &Option<RestoreInfo> {
&self.head_object_output.restore_info
}
pub fn server_side_encryption(&self) -> &str {
&self.head_object_output.server_side_encryption
}
pub fn server_side_encryption_key_id(&self) -> &str {
&self.head_object_output.server_side_encryption_key_id
}
pub fn replication_status(&self) -> &Option<ReplicationStatusType> {
&self.head_object_output.replication_status
}
}
#[derive(Debug, HttpBasicHeader, AclHeader, SseHeader, SsecHeader, MiscHeader, CallbackHeader)]
#[enable_content_length]
#[handle_async]
#[use_inner]
pub struct PutObjectInput<B> where B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin {
pub(crate) inner: PutObjectBasicInput,
pub(crate) content: Option<B>,
}
impl<B> InputDescriptor for PutObjectInput<B> where B: Send + Stream<Item=Result<Bytes, crate::error::CommonError>> + Unpin {
fn operation(&self) -> &str {
"PutObject"
}
fn bucket(&self) -> Result<&str, TosError> {
Ok(&self.inner.bucket)
}
fn key(&self) -> Result<&str, TosError> {
Ok(&self.inner.key)
}
}
impl<B> InputTranslator<B> for PutObjectInput<B> where B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin {
fn trans_mut(&mut self) -> Result<HttpRequest<B>, TosError> {
let mut request = self.inner.trans()?;
request.body = self.content.take();
request.operation = self.operation();
Ok(request)
}
}
impl<B> Default for PutObjectInput<B> where B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin {
fn default() -> Self {
Self {
inner: Default::default(),
content: None,
}
}
}
impl<B> PutObjectInput<B> where B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin {
pub fn new(bucket: impl Into<String>, key: impl Into<String>) -> Self {
let mut input = Self::default();
input.inner.bucket = bucket.into();
input.inner.key = key.into();
input
}
pub fn new_with_content(bucket: impl Into<String>, key: impl Into<String>, content: impl Into<B>) -> Self {
let mut input = Self::default();
input.inner.bucket = bucket.into();
input.inner.key = key.into();
input.set_content(content);
input
}
pub fn bucket(&self) -> &str {
&self.inner.bucket
}
pub fn key(&self) -> &str {
&self.inner.key
}
pub fn content(&self) -> &Option<B> {
&self.content
}
pub fn content_md5(&self) -> &str {
&self.inner.content_md5
}
pub fn content_sha256(&self) -> &str {
&self.inner.content_sha256
}
pub fn meta(&self) -> &HashMap<String, String> {
&self.inner.meta
}
pub fn traffic_limit(&self) -> i64 {
self.inner.traffic_limit
}
pub fn forbid_overwrite(&self) -> bool {
self.inner.forbid_overwrite
}
pub fn if_match(&self) -> &str {
&self.inner.if_match
}
pub fn tagging(&self) -> &str {
&self.inner.tagging
}
pub fn set_bucket(&mut self, bucket: impl Into<String>) {
self.inner.bucket = bucket.into();
}
pub fn set_key(&mut self, key: impl Into<String>) {
self.inner.key = key.into();
}
pub fn set_content(&mut self, content: impl Into<B>) {
self.content = Some(content.into());
}
pub fn set_content_md5(&mut self, content_md5: impl Into<String>) {
self.inner.content_md5 = content_md5.into();
}
pub fn set_content_sha256(&mut self, content_sha256: impl Into<String>) {
self.inner.content_sha256 = content_sha256.into();
}
pub fn set_meta(&mut self, meta: impl Into<HashMap<String, String>>) {
self.inner.meta = meta.into();
}
pub fn set_traffic_limit(&mut self, traffic_limit: i64) {
self.inner.traffic_limit = traffic_limit;
}
pub fn set_forbid_overwrite(&mut self, forbid_overwrite: bool) {
self.inner.forbid_overwrite = forbid_overwrite;
}
pub fn set_if_match(&mut self, if_match: impl Into<String>) {
self.inner.if_match = if_match.into();
}
pub fn set_tagging(&mut self, tagging: impl Into<String>) {
self.inner.tagging = tagging.into();
}
}
#[async_trait]
impl OutputParser for PutObjectOutput {
async fn parse<B>(request: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError> where B: Send {
let mut result = Self::default();
result.etag = get_header_value(response.headers(), HEADER_ETAG);
result.version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
result.ssec_algorithm = get_header_value(response.headers(), HEADER_SSEC_ALGORITHM);
result.ssec_key_md5 = get_header_value(response.headers(), HEADER_SSEC_KEY_MD5);
result.hash_crc64ecma = get_header_value_from_str::<u64>(response.headers(), HEADER_HASH_CRC64ECMA, 0)?;
result.server_side_encryption = get_header_value(response.headers(), HEADER_SERVER_SIDE_ENCRYPTION);
result.server_side_encryption_key_id = get_header_value(response.headers(), HEADER_SERVER_SIDE_ENCRYPTION_KMS_KEY_ID);
if get_map_value_str(&request.header, HEADER_CALLBACK) != "" { let buf = read_response(response).await?;
if request_info.status_code == 203 {
if let Ok(error_response) = parse_json_by_buf::<ErrorResponse>(buf.as_slice()) {
return Err(TosError::server_error_with_code(error_response.code, error_response.ec, error_response.message,
error_response.host_id, error_response.resource, request_info));
}
}
result.callback_result = parse_response_string_by_buf(buf)?;
}
result.request_info = request_info;
Ok(result)
}
}
#[derive(Debug, HttpBasicHeader, AclHeader, SseHeader, SsecHeader, MiscHeader, CallbackHeader)]
#[enable_content_length]
#[use_inner]
pub struct PutObjectFromBufferInput {
pub(crate) inner: PutObjectBasicInput,
pub(crate) content: Option<Vec<u8>>,
}
impl InputDescriptor for PutObjectFromBufferInput {
fn operation(&self) -> &str {
"PutObjectFromBuffer"
}
fn bucket(&self) -> Result<&str, TosError> {
Ok(&self.inner.bucket)
}
fn key(&self) -> Result<&str, TosError> {
Ok(&self.inner.key)
}
}
impl<B> InputTranslator<B> for PutObjectFromBufferInput where B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + BuildBufferReader {
fn trans(&self) -> Result<HttpRequest<B>, TosError> {
let mut request = self.inner.trans()?;
request.operation = self.operation();
if let Some(content) = &self.content {
let (body, len) = B::new(content.to_owned())?;
request.body = Some(body);
if self.inner.content_length < 0 {
request.header.insert(HEADER_CONTENT_LENGTH, len.to_string());
}
}
Ok(request)
}
}
impl PutObjectFromBufferInput {
pub fn new(bucket: impl Into<String>, key: impl Into<String>) -> Self {
let mut input = Self::default();
input.inner.bucket = bucket.into();
input.inner.key = key.into();
input
}
pub fn new_with_content(bucket: impl Into<String>, key: impl Into<String>, content: impl AsRef<[u8]>) -> Self {
let mut input = Self::default();
input.inner.bucket = bucket.into();
input.inner.key = key.into();
input.set_content(content);
input
}
pub fn bucket(&self) -> &str {
&self.inner.bucket
}
pub fn key(&self) -> &str {
&self.inner.key
}
pub fn content(&self) -> &Option<impl AsRef<[u8]>> {
&self.content
}
pub fn content_md5(&self) -> &str {
&self.inner.content_md5
}
pub fn content_sha256(&self) -> &str {
&self.inner.content_sha256
}
pub fn meta(&self) -> &HashMap<String, String> {
&self.inner.meta
}
pub fn traffic_limit(&self) -> i64 {
self.inner.traffic_limit
}
pub fn forbid_overwrite(&self) -> bool {
self.inner.forbid_overwrite
}
pub fn if_match(&self) -> &str {
&self.inner.if_match
}
pub fn tagging(&self) -> &str {
&self.inner.tagging
}
pub fn set_bucket(&mut self, bucket: impl Into<String>) {
self.inner.bucket = bucket.into();
}
pub fn set_key(&mut self, key: impl Into<String>) {
self.inner.key = key.into();
}
pub fn set_content(&mut self, content: impl AsRef<[u8]>) {
self.content = Some(content.as_ref().to_owned());
}
pub fn set_content_md5(&mut self, content_md5: impl Into<String>) {
self.inner.content_md5 = content_md5.into();
}
pub fn set_content_sha256(&mut self, content_sha256: impl Into<String>) {
self.inner.content_sha256 = content_sha256.into();
}
pub fn set_meta(&mut self, meta: impl Into<HashMap<String, String>>) {
self.inner.meta = meta.into();
}
pub fn set_traffic_limit(&mut self, traffic_limit: i64) {
self.inner.traffic_limit = traffic_limit;
}
pub fn set_forbid_overwrite(&mut self, forbid_overwrite: bool) {
self.inner.forbid_overwrite = forbid_overwrite;
}
pub fn set_if_match(&mut self, if_match: impl Into<String>) {
self.inner.if_match = if_match.into();
}
pub fn set_tagging(&mut self, tagging: impl Into<String>) {
self.inner.tagging = tagging.into();
}
}
impl Default for PutObjectFromBufferInput {
fn default() -> Self {
Self {
inner: Default::default(),
content: None,
}
}
}