Skip to main content

ve_tos_rust_sdk/asynchronous/
object.rs

1/*
2 * Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16use crate::constant::{HEADER_COPY_SOURCE_VERSION_ID, HEADER_LAST_MODIFIED, HEADER_NEXT_APPEND_OFFSET, HEADER_NEXT_MODIFY_OFFSET, HEADER_RANGE, HEADER_SYMLINK_BUCKET, HEADER_SYMLINK_TARGET, HEADER_TRANSFER_ENCODING_LOWER, QUERY_PROCESS};
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::atomic::AtomicU64;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22
23use crate::asynchronous::common::DataTransferListener;
24use crate::asynchronous::http::HttpResponse;
25use crate::asynchronous::internal::{parse_json, read_response, OutputParser};
26use crate::asynchronous::reader::StreamAdapter;
27use crate::common::{DataTransferStatus, Meta, RequestInfo, TempCopyResult, TempFetchResult};
28use crate::constant::{HEADER_CALLBACK, HEADER_CONTENT_RANGE, HEADER_DELETE_MARKER, HEADER_ETAG, HEADER_HASH_CRC64ECMA, HEADER_PREFIX_META, HEADER_SERVER_SIDE_ENCRYPTION, HEADER_SERVER_SIDE_ENCRYPTION_KMS_KEY_ID, HEADER_SSEC_ALGORITHM, HEADER_SSEC_KEY_MD5, HEADER_VERSION_ID, TRUE};
29use crate::error::{ErrorResponse, GenericError, TosError};
30use crate::http::HttpRequest;
31use crate::internal::{get_header_value, get_header_value_from_str, get_header_value_str, get_map_value_str, parse_date_time_iso8601, parse_date_time_rfc1123, parse_json_by_buf, parse_response_string_by_buf};
32use crate::object::{AppendObjectFromBufferInput, AppendObjectInput, AppendObjectOutput, CopyObjectInput, CopyObjectOutput, DeleteMultiObjectsInput, DeleteMultiObjectsOutput, DeleteObjectInput, DeleteObjectOutput, DeleteObjectTaggingInput, DeleteObjectTaggingOutput, DoesObjectExistInput, FetchObjectInput, FetchObjectOutput, GetFetchTaskInput, GetFetchTaskOutput, GetFileStatusInput, GetFileStatusOutput, GetObjectACLInput, GetObjectACLOutput, GetObjectInput, GetObjectOutput, GetObjectTaggingInput, GetObjectTaggingOutput, GetSymlinkInput, GetSymlinkOutput, HeadObjectInput, HeadObjectOutput, ListObjectVersionsInput, ListObjectVersionsOutput, ListObjectsType2Input, ListObjectsType2Output, ModifyObjectFromBufferInput, ModifyObjectInput, ModifyObjectOutput, PutFetchTaskInput, PutFetchTaskOutput, PutObjectACLInput, PutObjectACLOutput, PutObjectFromBufferInput, PutObjectInput, PutObjectOutput, PutObjectTaggingInput, PutObjectTaggingOutput, PutSymlinkInput, PutSymlinkOutput, RenameObjectInput, RenameObjectOutput, RestoreObjectInput, RestoreObjectOutput, SetObjectMetaInput, SetObjectMetaOutput, SetObjectTimeInput, SetObjectTimeOutput};
33use crate::reader::MultifunctionalReader;
34use async_trait::async_trait;
35use bytes::Bytes;
36use futures_core::Stream;
37use futures_util::StreamExt;
38
39#[async_trait]
40pub trait ObjectAPI {
41    async fn copy_object(&self, input: &CopyObjectInput) -> Result<CopyObjectOutput, TosError>;
42    async fn delete_object(&self, input: &DeleteObjectInput) -> Result<DeleteObjectOutput, TosError>;
43    async fn delete_multi_objects(&self, input: &DeleteMultiObjectsInput) -> Result<DeleteMultiObjectsOutput, TosError>;
44    async fn put_object<B>(&self, input: &PutObjectInput<B>) -> Result<PutObjectOutput, TosError>
45    where
46        B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static;
47    async fn put_object_from_buffer(&self, input: &PutObjectFromBufferInput) -> Result<PutObjectOutput, TosError>;
48    #[cfg(feature = "tokio-runtime")]
49    async fn put_object_from_file(&self, input: &crate::object::PutObjectFromFileInput) -> Result<PutObjectOutput, TosError>;
50    async fn append_object<B>(&self, input: &AppendObjectInput<B>) -> Result<AppendObjectOutput, TosError>
51    where
52        B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static;
53    async fn append_object_from_buffer(&self, input: &AppendObjectFromBufferInput) -> Result<AppendObjectOutput, TosError>;
54    #[cfg(feature = "tokio-runtime")]
55    async fn append_object_from_file(&self, input: &crate::object::AppendObjectFromFileInput) -> Result<AppendObjectOutput, TosError>;
56    async fn get_object(&self, input: &GetObjectInput) -> Result<GetObjectOutput, TosError>;
57    #[cfg(feature = "tokio-runtime")]
58    async fn get_object_to_file(&self, input: &crate::object::GetObjectToFileInput) -> Result<crate::object::GetObjectToFileOutput, TosError>;
59    async fn get_object_acl(&self, input: &GetObjectACLInput) -> Result<GetObjectACLOutput, TosError>;
60    async fn head_object(&self, input: &HeadObjectInput) -> Result<HeadObjectOutput, TosError>;
61    async fn list_objects_type2(&self, input: &ListObjectsType2Input) -> Result<ListObjectsType2Output, TosError>;
62    async fn list_object_versions(&self, input: &ListObjectVersionsInput) -> Result<ListObjectVersionsOutput, TosError>;
63    async fn put_object_acl(&self, input: &PutObjectACLInput) -> Result<PutObjectACLOutput, TosError>;
64    async fn set_object_meta(&self, input: &SetObjectMetaInput) -> Result<SetObjectMetaOutput, TosError>;
65    async fn fetch_object(&self, input: &FetchObjectInput) -> Result<FetchObjectOutput, TosError>;
66    async fn put_fetch_task(&self, input: &PutFetchTaskInput) -> Result<PutFetchTaskOutput, TosError>;
67    async fn get_fetch_task(&self, input: &GetFetchTaskInput) -> Result<GetFetchTaskOutput, TosError>;
68    async fn put_object_tagging(&self, input: &PutObjectTaggingInput) -> Result<PutObjectTaggingOutput, TosError>;
69    async fn get_object_tagging(&self, input: &GetObjectTaggingInput) -> Result<GetObjectTaggingOutput, TosError>;
70    async fn delete_object_tagging(&self, input: &DeleteObjectTaggingInput) -> Result<DeleteObjectTaggingOutput, TosError>;
71    async fn rename_object(&self, input: &RenameObjectInput) -> Result<RenameObjectOutput, TosError>;
72    async fn restore_object(&self, input: &RestoreObjectInput) -> Result<RestoreObjectOutput, TosError>;
73    async fn put_symlink(&self, input: &PutSymlinkInput) -> Result<PutSymlinkOutput, TosError>;
74    async fn get_symlink(&self, input: &GetSymlinkInput) -> Result<GetSymlinkOutput, TosError>;
75    async fn get_file_status(&self, input: &GetFileStatusInput) -> Result<GetFileStatusOutput, TosError>;
76    async fn does_object_exist(&self, input: &DoesObjectExistInput) -> Result<bool, TosError>;
77    async fn set_object_time(&self, input: &SetObjectTimeInput) -> Result<SetObjectTimeOutput, TosError>;
78}
79
80#[async_trait]
81pub trait ObjectContent {
82    type Content: ?Sized;
83
84    fn content(&mut self) -> Option<&mut Self::Content>;
85
86    async fn read_all(&mut self) -> Result<Vec<u8>, TosError>;
87}
88
89#[async_trait]
90impl OutputParser for DeleteObjectOutput {
91    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
92    where
93        B: Send,
94    {
95        let version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
96        let delete_marker = get_header_value_str(response.headers(), HEADER_DELETE_MARKER) == TRUE;
97        Ok(Self {
98            request_info,
99            delete_marker,
100            version_id,
101        })
102    }
103}
104
105#[async_trait]
106impl OutputParser for HeadObjectOutput {
107    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, meta: Meta) -> Result<Self, TosError>
108    where
109        B: Send,
110    {
111        Self::parse_by_header(response.headers(), request_info, meta)
112    }
113}
114
115#[async_trait]
116impl OutputParser for ListObjectsType2Output {
117    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
118    where
119        B: Send,
120    {
121        let mut result = parse_json::<Self>(response).await?;
122        for content in &mut result.contents {
123            if let Some(x) = content.last_modified_string.take() {
124                content.last_modified = parse_date_time_iso8601(&x)?;
125            }
126
127            if let Some(x) = content.user_meta.take() {
128                let mut meta = HashMap::with_capacity(x.len());
129                for item in x {
130                    if let Ok(dk) = urlencoding::decode(&item.key[HEADER_PREFIX_META.len()..]) {
131                        if let Ok(dv) = urlencoding::decode(item.value.as_str()) {
132                            meta.insert(dk.to_string(), dv.to_string());
133                        }
134                    }
135                }
136                content.meta = meta;
137            }
138        }
139
140        for common_prefix in &mut result.common_prefixes {
141            if let Some(x) = common_prefix.last_modified_string.take() {
142                common_prefix.last_modified = parse_date_time_iso8601(&x)?;
143            }
144        }
145        result.request_info = request_info;
146        Ok(result)
147    }
148}
149
150
151#[async_trait]
152impl OutputParser for GetObjectOutput {
153    async fn parse<B>(request: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, meta: Meta) -> Result<Self, TosError>
154    where
155        B: Send,
156    {
157        let transfer_encoding = request_info.header.get(HEADER_TRANSFER_ENCODING_LOWER).map(|x| x.to_string());
158        let head_object_output = HeadObjectOutput::parse_by_header(response.headers(), request_info, meta)?;
159        let content_range = get_header_value(response.headers(), HEADER_CONTENT_RANGE);
160        let content = Box::new(StreamAdapter::new(response.bytes_stream())) as Box<dyn Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin>;
161        let mut target_crc64 = None;
162        if request.enable_crc && !request.header.contains_key(HEADER_RANGE) &&
163            (request.query.is_none() || !request.query.as_ref().unwrap().contains_key(QUERY_PROCESS)) {
164            if let Some(te) = transfer_encoding {
165                if te != "chunked" {
166                    target_crc64 = Some(head_object_output.hash_crc64ecma);
167                }
168            } else {
169                target_crc64 = Some(head_object_output.hash_crc64ecma);
170            }
171        }
172        let mut crc64 = None;
173        if target_crc64.is_some() {
174            crc64 = Some(Arc::new(AtomicU64::new(0)));
175        }
176        let mut reader = MultifunctionalReader::with_target_crc64(content, crc64, head_object_output.content_length,
177                                                                  &request, target_crc64);
178
179        if let Some(ref rc) = request.request_context {
180            if let Some(ref rl) = rc.rate_limiter {
181                reader.set_rate_limiter(rl.clone());
182            }
183            if let Some(ref adts) = rc.async_data_transfer_listener {
184                reader.set_async_data_transfer_listener(adts.clone());
185                reader.inner.operation = request.operation.to_string();
186                reader.inner.bucket = request.bucket.to_string();
187                reader.inner.key = request.key.to_string();
188                reader.inner.retry_count = request.retry_count;
189            }
190        }
191
192        Ok(Self {
193            content_range,
194            content: None,
195            async_content: Some(reader),
196            head_object_output,
197        })
198    }
199}
200
201impl Stream for GetObjectOutput {
202    type Item = Result<Bytes, crate::error::CommonError>;
203
204    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
205        match self.async_content.as_mut() {
206            None => Poll::Ready(None),
207            Some(reader) => {
208                reader.poll_next_unpin(cx)
209            }
210        }
211    }
212}
213#[async_trait]
214impl ObjectContent for GetObjectOutput {
215    type Content = dyn Stream<Item=Result<Bytes, crate::error::CommonError>> + Unpin;
216
217    fn content(&mut self) -> Option<&mut Self::Content> {
218        match self.async_content.as_mut() {
219            None => None,
220            Some(x) => Some(x as &mut (dyn Stream<Item=Result<Bytes, crate::error::CommonError>> + Unpin)),
221        }
222    }
223
224    async fn read_all(&mut self) -> Result<Vec<u8>, TosError> {
225        let x = self.content_length();
226        if x == 0 {
227            return Ok(vec![]);
228        }
229
230        let mut buf;
231        if x > 0 {
232            buf = Vec::with_capacity(x as usize);
233        } else {
234            buf = Vec::new();
235        }
236
237        match self.async_content.as_mut() {
238            None => Err(TosError::client_error("empty content")),
239            Some(r) => {
240                loop {
241                    match r.next().await {
242                        None => return Ok(buf),
243                        Some(result) => {
244                            match result {
245                                Err(e) => {
246                                    return Err(TosError::client_error_with_cause("read error", GenericError::IoError(e.to_string())));
247                                }
248                                Ok(x) => {
249                                    buf.extend_from_slice(x.as_ref());
250                                }
251                            }
252                        }
253                    }
254                }
255            }
256        }
257    }
258}
259
260
261#[async_trait]
262impl OutputParser for PutObjectOutput {
263    async fn parse<B>(request: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
264    where
265        B: Send,
266    {
267        let hash_crc64ecma = get_header_value_from_str::<u64>(response.headers(), HEADER_HASH_CRC64ECMA, 0)?;
268        if let Some(ref rc) = request.request_context {
269            if let Some(calc_hash_crc64ecma) = rc.crc64 {
270                if calc_hash_crc64ecma != hash_crc64ecma {
271                    return Err(TosError::client_error(format!("expect crc64 {hash_crc64ecma}, actual crc64 {calc_hash_crc64ecma}")));
272                }
273            }
274        }
275
276        let mut result = Self::default();
277        result.etag = get_header_value(response.headers(), HEADER_ETAG);
278        result.version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
279        result.ssec_algorithm = get_header_value(response.headers(), HEADER_SSEC_ALGORITHM);
280        result.ssec_key_md5 = get_header_value(response.headers(), HEADER_SSEC_KEY_MD5);
281        result.hash_crc64ecma = hash_crc64ecma;
282        result.server_side_encryption = get_header_value(response.headers(), HEADER_SERVER_SIDE_ENCRYPTION);
283        result.server_side_encryption_key_id = get_header_value(response.headers(), HEADER_SERVER_SIDE_ENCRYPTION_KMS_KEY_ID);
284        if get_map_value_str(&request.header, HEADER_CALLBACK) != "" { // callback
285            let buf = read_response(response).await?;
286            if request_info.status_code == 203 {
287                if let Ok(error_response) = parse_json_by_buf::<ErrorResponse>(buf.as_slice()) {
288                    return Err(TosError::server_error_with_code(error_response.code, error_response.ec, error_response.key, error_response.message,
289                                                                error_response.host_id, error_response.resource, request_info));
290                }
291            }
292            result.callback_result = parse_response_string_by_buf(buf)?;
293        }
294        result.request_info = request_info;
295        Ok(result)
296    }
297}
298
299#[async_trait]
300impl OutputParser for CopyObjectOutput {
301    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
302    where
303        B: Send,
304    {
305        let mut result = Self {
306            request_info: RequestInfo::default(),
307            etag: "".to_owned(),
308            last_modified: None,
309            copy_source_version_id: get_header_value(response.headers(), HEADER_COPY_SOURCE_VERSION_ID),
310            version_id: get_header_value(response.headers(), HEADER_VERSION_ID),
311            ssec_algorithm: get_header_value(response.headers(), HEADER_SSEC_ALGORITHM),
312            ssec_key_md5: get_header_value(response.headers(), HEADER_SSEC_KEY_MD5),
313            server_side_encryption: get_header_value(response.headers(), HEADER_SERVER_SIDE_ENCRYPTION),
314            server_side_encryption_key_id: get_header_value(response.headers(), HEADER_SERVER_SIDE_ENCRYPTION_KMS_KEY_ID),
315        };
316
317        let temp_result = parse_json::<TempCopyResult>(response).await?;
318        if temp_result.etag == "" {
319            return Err(TosError::server_error_with_code(temp_result.code, temp_result.ec, temp_result.key, temp_result.message,
320                                                        temp_result.host_id, temp_result.resource, request_info));
321        }
322
323        result.etag = temp_result.etag;
324        result.last_modified = parse_date_time_iso8601(&temp_result.last_modified)?;
325        result.request_info = request_info;
326        Ok(result)
327    }
328}
329
330#[async_trait]
331impl OutputParser for DeleteMultiObjectsOutput {
332    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
333    where
334        B: Send,
335    {
336        let mut result = parse_json::<Self>(response).await?;
337        result.request_info = request_info;
338        Ok(result)
339    }
340}
341#[async_trait]
342impl OutputParser for GetObjectACLOutput {
343    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
344    where
345        B: Send,
346    {
347        let version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
348        let mut result = parse_json::<Self>(response).await?;
349        result.version_id = version_id;
350        result.request_info = request_info;
351        Ok(result)
352    }
353}
354
355#[async_trait]
356impl OutputParser for ListObjectVersionsOutput {
357    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
358    where
359        B: Send,
360    {
361        let mut result = parse_json::<Self>(response).await?;
362        for version in &mut result.versions {
363            if let Some(x) = version.last_modified_string.take() {
364                version.last_modified = parse_date_time_iso8601(&x)?;
365            }
366
367            if let Some(x) = version.user_meta.take() {
368                let mut meta = HashMap::with_capacity(x.len());
369                for item in x {
370                    if let Ok(dk) = urlencoding::decode(&item.key[HEADER_PREFIX_META.len()..]) {
371                        if let Ok(dv) = urlencoding::decode(item.value.as_str()) {
372                            meta.insert(dk.to_string(), dv.to_string());
373                        }
374                    }
375                }
376                version.meta = meta;
377            }
378        }
379
380        for delete_marker in &mut result.delete_markers {
381            if let Some(x) = delete_marker.last_modified_string.take() {
382                delete_marker.last_modified = parse_date_time_iso8601(&x)?;
383            }
384        }
385
386        result.request_info = request_info;
387        Ok(result)
388    }
389}
390
391#[async_trait]
392impl OutputParser for PutObjectACLOutput {
393    async fn parse<B>(_: HttpRequest<'_, B>, _: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
394    where
395        B: Send,
396    {
397        Ok(Self { request_info })
398    }
399}
400
401#[async_trait]
402impl OutputParser for SetObjectMetaOutput {
403    async fn parse<B>(_: HttpRequest<'_, B>, _: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
404    where
405        B: Send,
406    {
407        Ok(Self { request_info })
408    }
409}
410
411#[async_trait]
412impl OutputParser for AppendObjectOutput {
413    async fn parse<B>(request: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
414    where
415        B: Send,
416    {
417        let hash_crc64ecma = get_header_value_from_str::<u64>(response.headers(), HEADER_HASH_CRC64ECMA, 0)?;
418        if let Some(ref rc) = request.request_context {
419            if let Some(calc_hash_crc64ecma) = rc.crc64 {
420                if calc_hash_crc64ecma != hash_crc64ecma {
421                    return Err(TosError::client_error(format!("expect crc64 {hash_crc64ecma}, actual crc64 {calc_hash_crc64ecma}")));
422                }
423            }
424        }
425        let mut result = Self::default();
426        result.next_append_offset = get_header_value_from_str::<i64>(response.headers(), HEADER_NEXT_APPEND_OFFSET, 0)?;
427        result.hash_crc64ecma = hash_crc64ecma;
428        result.request_info = request_info;
429        Ok(result)
430    }
431}
432
433#[async_trait]
434impl OutputParser for ModifyObjectOutput {
435    async fn parse<B>(request: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
436    where
437        B: Send,
438    {
439        let hash_crc64ecma = get_header_value_from_str::<u64>(response.headers(), HEADER_HASH_CRC64ECMA, 0)?;
440        if let Some(ref rc) = request.request_context {
441            if let Some(calc_hash_crc64ecma) = rc.crc64 {
442                if calc_hash_crc64ecma != hash_crc64ecma {
443                    return Err(TosError::client_error(format!("expect crc64 {hash_crc64ecma}, actual crc64 {calc_hash_crc64ecma}")));
444                }
445            }
446        }
447        let mut result = Self::default();
448        result.next_modify_offset = get_header_value_from_str::<i64>(response.headers(), HEADER_NEXT_MODIFY_OFFSET, 0)?;
449        result.hash_crc64ecma = hash_crc64ecma;
450        result.request_info = request_info;
451        Ok(result)
452    }
453}
454
455#[async_trait]
456impl OutputParser for PutObjectTaggingOutput {
457    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
458    where
459        B: Send,
460    {
461        let mut result = Self::default();
462        result.request_info = request_info;
463        result.version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
464        Ok(result)
465    }
466}
467
468#[async_trait]
469impl OutputParser for GetObjectTaggingOutput {
470    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
471    where
472        B: Send,
473    {
474        let version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
475        let mut result = parse_json::<Self>(response).await?;
476        result.request_info = request_info;
477        result.version_id = version_id;
478        Ok(result)
479    }
480}
481
482#[async_trait]
483impl OutputParser for DeleteObjectTaggingOutput {
484    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
485    where
486        B: Send,
487    {
488        let mut result = Self::default();
489        result.request_info = request_info;
490        result.version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
491        Ok(result)
492    }
493}
494
495#[async_trait]
496impl OutputParser for FetchObjectOutput {
497    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
498    where
499        B: Send,
500    {
501        let mut result = Self {
502            request_info: RequestInfo::default(),
503            etag: "".to_owned(),
504            version_id: get_header_value(response.headers(), HEADER_VERSION_ID),
505            ssec_algorithm: get_header_value(response.headers(), HEADER_SSEC_ALGORITHM),
506            ssec_key_md5: get_header_value(response.headers(), HEADER_SSEC_KEY_MD5),
507            server_side_encryption: get_header_value(response.headers(), HEADER_SERVER_SIDE_ENCRYPTION),
508            server_side_encryption_key_id: get_header_value(response.headers(), HEADER_SERVER_SIDE_ENCRYPTION_KMS_KEY_ID),
509            source_content_type: "".to_string(),
510            source_content_length: 0,
511            md5: "".to_string(),
512        };
513
514        let temp_result = parse_json::<TempFetchResult>(response).await?;
515        if temp_result.etag == "" {
516            return Err(TosError::server_error_with_code(temp_result.code, temp_result.ec, temp_result.key, temp_result.message,
517                                                        temp_result.host_id, temp_result.resource, request_info));
518        }
519        result.etag = temp_result.etag;
520        result.source_content_type = temp_result.source_content_type;
521        result.source_content_length = temp_result.source_content_length;
522        result.md5 = temp_result.md5;
523        result.request_info = request_info;
524        Ok(result)
525    }
526}
527
528
529#[async_trait]
530impl OutputParser for RenameObjectOutput {
531    async fn parse<B>(_: HttpRequest<'_, B>, _: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
532    where
533        B: Send,
534    {
535        Ok(Self { request_info })
536    }
537}
538
539#[async_trait]
540impl OutputParser for RestoreObjectOutput {
541    async fn parse<B>(_: HttpRequest<'_, B>, _: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
542    where
543        B: Send,
544    {
545        Ok(Self { request_info })
546    }
547}
548
549#[async_trait]
550impl OutputParser for PutFetchTaskOutput {
551    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
552    where
553        B: Send,
554    {
555        let mut result = parse_json::<Self>(response).await?;
556        result.request_info = request_info;
557        Ok(result)
558    }
559}
560
561#[async_trait]
562impl OutputParser for GetFetchTaskOutput {
563    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
564    where
565        B: Send,
566    {
567        let mut result = parse_json::<Self>(response).await?;
568        if let Some(fetch_task) = &mut result.task {
569            if fetch_task.user_meta.len() > 0 {
570                fetch_task.meta = HashMap::with_capacity(fetch_task.user_meta.len());
571                for item in fetch_task.user_meta.iter() {
572                    fetch_task.meta.insert(item.key.clone(), item.value.clone());
573                }
574            }
575        }
576        result.request_info = request_info;
577        Ok(result)
578    }
579}
580
581#[async_trait]
582impl OutputParser for PutSymlinkOutput {
583    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
584    where
585        B: Send,
586    {
587        let version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
588        Ok(Self {
589            request_info,
590            version_id,
591        })
592    }
593}
594
595#[async_trait]
596impl OutputParser for GetSymlinkOutput {
597    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
598    where
599        B: Send,
600    {
601        let symlink_target_key = get_header_value(response.headers(), HEADER_SYMLINK_TARGET);
602        let symlink_target_bucket = get_header_value(response.headers(), HEADER_SYMLINK_BUCKET);
603        let version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
604        let etag = get_header_value(response.headers(), HEADER_ETAG);
605        let last_modified = parse_date_time_rfc1123(&get_header_value(response.headers(), HEADER_LAST_MODIFIED))?;
606        Ok(Self {
607            request_info,
608            version_id,
609            symlink_target_key,
610            symlink_target_bucket,
611            etag,
612            last_modified,
613        })
614    }
615}
616
617#[async_trait]
618impl OutputParser for GetFileStatusOutput {
619    async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
620    where
621        B: Send,
622    {
623        let mut result = parse_json::<Self>(response).await?;
624        if let Some(x) = result.last_modified_string.take() {
625            result.last_modified = parse_date_time_iso8601(&x)?;
626        }
627        result.request_info = request_info;
628        Ok(result)
629    }
630}
631
632#[async_trait]
633impl OutputParser for SetObjectTimeOutput {
634    async fn parse<B>(_: HttpRequest<'_, B>, _: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
635    where
636        B: Send,
637    {
638        Ok(Self {
639            request_info,
640        })
641    }
642}
643
644impl<B> DataTransferListener for PutObjectInput<B> {
645    fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
646        &self.inner.async_data_transfer_listener
647    }
648
649    fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
650        self.inner.async_data_transfer_listener = Some(listener.into());
651    }
652}
653
654impl DataTransferListener for PutObjectFromBufferInput {
655    fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
656        &self.inner.async_data_transfer_listener
657    }
658
659    fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
660        self.inner.async_data_transfer_listener = Some(listener.into());
661    }
662}
663impl<B> DataTransferListener for AppendObjectInput<B> {
664    fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
665        &self.inner.async_data_transfer_listener
666    }
667
668    fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
669        self.inner.async_data_transfer_listener = Some(listener.into());
670    }
671}
672
673impl DataTransferListener for AppendObjectFromBufferInput {
674    fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
675        &self.inner.async_data_transfer_listener
676    }
677
678    fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
679        self.inner.async_data_transfer_listener = Some(listener.into());
680    }
681}
682
683impl DataTransferListener for GetObjectInput {
684    fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
685        &self.async_data_transfer_listener
686    }
687
688    fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
689        self.async_data_transfer_listener = Some(listener.into());
690    }
691}
692
693impl<B> DataTransferListener for ModifyObjectInput<B> {
694    fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
695        &self.async_data_transfer_listener
696    }
697
698    fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
699        self.async_data_transfer_listener = Some(listener.into());
700    }
701}
702
703
704impl DataTransferListener for ModifyObjectFromBufferInput {
705    fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
706        &self.async_data_transfer_listener
707    }
708
709    fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
710        self.async_data_transfer_listener = Some(listener.into());
711    }
712}