1use crate::constant::{HEADER_COPY_SOURCE_VERSION_ID, HEADER_LAST_MODIFIED, HEADER_NEXT_APPEND_OFFSET, HEADER_NEXT_MODIFY_OFFSET, HEADER_RANGE, HEADER_SYMLINK_BUCKET, HEADER_SYMLINK_TARGET, HEADER_TRANSFER_ENCODING_LOWER, QUERY_PROCESS};
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::atomic::AtomicU64;
20use std::sync::Arc;
21use std::task::{Context, Poll};
22
23use crate::asynchronous::common::DataTransferListener;
24use crate::asynchronous::http::HttpResponse;
25use crate::asynchronous::internal::{parse_json, read_response, OutputParser};
26use crate::asynchronous::reader::StreamAdapter;
27use crate::common::{DataTransferStatus, Meta, RequestInfo, TempCopyResult, TempFetchResult};
28use crate::constant::{HEADER_CALLBACK, HEADER_CONTENT_RANGE, HEADER_DELETE_MARKER, HEADER_ETAG, HEADER_HASH_CRC64ECMA, HEADER_PREFIX_META, HEADER_SERVER_SIDE_ENCRYPTION, HEADER_SERVER_SIDE_ENCRYPTION_KMS_KEY_ID, HEADER_SSEC_ALGORITHM, HEADER_SSEC_KEY_MD5, HEADER_VERSION_ID, TRUE};
29use crate::error::{ErrorResponse, GenericError, TosError};
30use crate::http::HttpRequest;
31use crate::internal::{get_header_value, get_header_value_from_str, get_header_value_str, get_map_value_str, parse_date_time_iso8601, parse_date_time_rfc1123, parse_json_by_buf, parse_response_string_by_buf};
32use crate::object::{AppendObjectFromBufferInput, AppendObjectInput, AppendObjectOutput, CopyObjectInput, CopyObjectOutput, DeleteMultiObjectsInput, DeleteMultiObjectsOutput, DeleteObjectInput, DeleteObjectOutput, DeleteObjectTaggingInput, DeleteObjectTaggingOutput, DoesObjectExistInput, FetchObjectInput, FetchObjectOutput, GetFetchTaskInput, GetFetchTaskOutput, GetFileStatusInput, GetFileStatusOutput, GetObjectACLInput, GetObjectACLOutput, GetObjectInput, GetObjectOutput, GetObjectTaggingInput, GetObjectTaggingOutput, GetSymlinkInput, GetSymlinkOutput, HeadObjectInput, HeadObjectOutput, ListObjectVersionsInput, ListObjectVersionsOutput, ListObjectsType2Input, ListObjectsType2Output, ModifyObjectFromBufferInput, ModifyObjectInput, ModifyObjectOutput, PutFetchTaskInput, PutFetchTaskOutput, PutObjectACLInput, PutObjectACLOutput, PutObjectFromBufferInput, PutObjectInput, PutObjectOutput, PutObjectTaggingInput, PutObjectTaggingOutput, PutSymlinkInput, PutSymlinkOutput, RenameObjectInput, RenameObjectOutput, RestoreObjectInput, RestoreObjectOutput, SetObjectMetaInput, SetObjectMetaOutput, SetObjectTimeInput, SetObjectTimeOutput};
33use crate::reader::MultifunctionalReader;
34use async_trait::async_trait;
35use bytes::Bytes;
36use futures_core::Stream;
37use futures_util::StreamExt;
38
39#[async_trait]
40pub trait ObjectAPI {
41 async fn copy_object(&self, input: &CopyObjectInput) -> Result<CopyObjectOutput, TosError>;
42 async fn delete_object(&self, input: &DeleteObjectInput) -> Result<DeleteObjectOutput, TosError>;
43 async fn delete_multi_objects(&self, input: &DeleteMultiObjectsInput) -> Result<DeleteMultiObjectsOutput, TosError>;
44 async fn put_object<B>(&self, input: &PutObjectInput<B>) -> Result<PutObjectOutput, TosError>
45 where
46 B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static;
47 async fn put_object_from_buffer(&self, input: &PutObjectFromBufferInput) -> Result<PutObjectOutput, TosError>;
48 #[cfg(feature = "tokio-runtime")]
49 async fn put_object_from_file(&self, input: &crate::object::PutObjectFromFileInput) -> Result<PutObjectOutput, TosError>;
50 async fn append_object<B>(&self, input: &AppendObjectInput<B>) -> Result<AppendObjectOutput, TosError>
51 where
52 B: Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Sync + Unpin + 'static;
53 async fn append_object_from_buffer(&self, input: &AppendObjectFromBufferInput) -> Result<AppendObjectOutput, TosError>;
54 #[cfg(feature = "tokio-runtime")]
55 async fn append_object_from_file(&self, input: &crate::object::AppendObjectFromFileInput) -> Result<AppendObjectOutput, TosError>;
56 async fn get_object(&self, input: &GetObjectInput) -> Result<GetObjectOutput, TosError>;
57 #[cfg(feature = "tokio-runtime")]
58 async fn get_object_to_file(&self, input: &crate::object::GetObjectToFileInput) -> Result<crate::object::GetObjectToFileOutput, TosError>;
59 async fn get_object_acl(&self, input: &GetObjectACLInput) -> Result<GetObjectACLOutput, TosError>;
60 async fn head_object(&self, input: &HeadObjectInput) -> Result<HeadObjectOutput, TosError>;
61 async fn list_objects_type2(&self, input: &ListObjectsType2Input) -> Result<ListObjectsType2Output, TosError>;
62 async fn list_object_versions(&self, input: &ListObjectVersionsInput) -> Result<ListObjectVersionsOutput, TosError>;
63 async fn put_object_acl(&self, input: &PutObjectACLInput) -> Result<PutObjectACLOutput, TosError>;
64 async fn set_object_meta(&self, input: &SetObjectMetaInput) -> Result<SetObjectMetaOutput, TosError>;
65 async fn fetch_object(&self, input: &FetchObjectInput) -> Result<FetchObjectOutput, TosError>;
66 async fn put_fetch_task(&self, input: &PutFetchTaskInput) -> Result<PutFetchTaskOutput, TosError>;
67 async fn get_fetch_task(&self, input: &GetFetchTaskInput) -> Result<GetFetchTaskOutput, TosError>;
68 async fn put_object_tagging(&self, input: &PutObjectTaggingInput) -> Result<PutObjectTaggingOutput, TosError>;
69 async fn get_object_tagging(&self, input: &GetObjectTaggingInput) -> Result<GetObjectTaggingOutput, TosError>;
70 async fn delete_object_tagging(&self, input: &DeleteObjectTaggingInput) -> Result<DeleteObjectTaggingOutput, TosError>;
71 async fn rename_object(&self, input: &RenameObjectInput) -> Result<RenameObjectOutput, TosError>;
72 async fn restore_object(&self, input: &RestoreObjectInput) -> Result<RestoreObjectOutput, TosError>;
73 async fn put_symlink(&self, input: &PutSymlinkInput) -> Result<PutSymlinkOutput, TosError>;
74 async fn get_symlink(&self, input: &GetSymlinkInput) -> Result<GetSymlinkOutput, TosError>;
75 async fn get_file_status(&self, input: &GetFileStatusInput) -> Result<GetFileStatusOutput, TosError>;
76 async fn does_object_exist(&self, input: &DoesObjectExistInput) -> Result<bool, TosError>;
77 async fn set_object_time(&self, input: &SetObjectTimeInput) -> Result<SetObjectTimeOutput, TosError>;
78}
79
80#[async_trait]
81pub trait ObjectContent {
82 type Content: ?Sized;
83
84 fn content(&mut self) -> Option<&mut Self::Content>;
85
86 async fn read_all(&mut self) -> Result<Vec<u8>, TosError>;
87}
88
89#[async_trait]
90impl OutputParser for DeleteObjectOutput {
91 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
92 where
93 B: Send,
94 {
95 let version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
96 let delete_marker = get_header_value_str(response.headers(), HEADER_DELETE_MARKER) == TRUE;
97 Ok(Self {
98 request_info,
99 delete_marker,
100 version_id,
101 })
102 }
103}
104
105#[async_trait]
106impl OutputParser for HeadObjectOutput {
107 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, meta: Meta) -> Result<Self, TosError>
108 where
109 B: Send,
110 {
111 Self::parse_by_header(response.headers(), request_info, meta)
112 }
113}
114
115#[async_trait]
116impl OutputParser for ListObjectsType2Output {
117 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
118 where
119 B: Send,
120 {
121 let mut result = parse_json::<Self>(response).await?;
122 for content in &mut result.contents {
123 if let Some(x) = content.last_modified_string.take() {
124 content.last_modified = parse_date_time_iso8601(&x)?;
125 }
126
127 if let Some(x) = content.user_meta.take() {
128 let mut meta = HashMap::with_capacity(x.len());
129 for item in x {
130 if let Ok(dk) = urlencoding::decode(&item.key[HEADER_PREFIX_META.len()..]) {
131 if let Ok(dv) = urlencoding::decode(item.value.as_str()) {
132 meta.insert(dk.to_string(), dv.to_string());
133 }
134 }
135 }
136 content.meta = meta;
137 }
138 }
139
140 for common_prefix in &mut result.common_prefixes {
141 if let Some(x) = common_prefix.last_modified_string.take() {
142 common_prefix.last_modified = parse_date_time_iso8601(&x)?;
143 }
144 }
145 result.request_info = request_info;
146 Ok(result)
147 }
148}
149
150
151#[async_trait]
152impl OutputParser for GetObjectOutput {
153 async fn parse<B>(request: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, meta: Meta) -> Result<Self, TosError>
154 where
155 B: Send,
156 {
157 let transfer_encoding = request_info.header.get(HEADER_TRANSFER_ENCODING_LOWER).map(|x| x.to_string());
158 let head_object_output = HeadObjectOutput::parse_by_header(response.headers(), request_info, meta)?;
159 let content_range = get_header_value(response.headers(), HEADER_CONTENT_RANGE);
160 let content = Box::new(StreamAdapter::new(response.bytes_stream())) as Box<dyn Stream<Item=Result<Bytes, crate::error::CommonError>> + Send + Unpin>;
161 let mut target_crc64 = None;
162 if request.enable_crc && !request.header.contains_key(HEADER_RANGE) &&
163 (request.query.is_none() || !request.query.as_ref().unwrap().contains_key(QUERY_PROCESS)) {
164 if let Some(te) = transfer_encoding {
165 if te != "chunked" {
166 target_crc64 = Some(head_object_output.hash_crc64ecma);
167 }
168 } else {
169 target_crc64 = Some(head_object_output.hash_crc64ecma);
170 }
171 }
172 let mut crc64 = None;
173 if target_crc64.is_some() {
174 crc64 = Some(Arc::new(AtomicU64::new(0)));
175 }
176 let mut reader = MultifunctionalReader::with_target_crc64(content, crc64, head_object_output.content_length,
177 &request, target_crc64);
178
179 if let Some(ref rc) = request.request_context {
180 if let Some(ref rl) = rc.rate_limiter {
181 reader.set_rate_limiter(rl.clone());
182 }
183 if let Some(ref adts) = rc.async_data_transfer_listener {
184 reader.set_async_data_transfer_listener(adts.clone());
185 reader.inner.operation = request.operation.to_string();
186 reader.inner.bucket = request.bucket.to_string();
187 reader.inner.key = request.key.to_string();
188 reader.inner.retry_count = request.retry_count;
189 }
190 }
191
192 Ok(Self {
193 content_range,
194 content: None,
195 async_content: Some(reader),
196 head_object_output,
197 })
198 }
199}
200
201impl Stream for GetObjectOutput {
202 type Item = Result<Bytes, crate::error::CommonError>;
203
204 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
205 match self.async_content.as_mut() {
206 None => Poll::Ready(None),
207 Some(reader) => {
208 reader.poll_next_unpin(cx)
209 }
210 }
211 }
212}
213#[async_trait]
214impl ObjectContent for GetObjectOutput {
215 type Content = dyn Stream<Item=Result<Bytes, crate::error::CommonError>> + Unpin;
216
217 fn content(&mut self) -> Option<&mut Self::Content> {
218 match self.async_content.as_mut() {
219 None => None,
220 Some(x) => Some(x as &mut (dyn Stream<Item=Result<Bytes, crate::error::CommonError>> + Unpin)),
221 }
222 }
223
224 async fn read_all(&mut self) -> Result<Vec<u8>, TosError> {
225 let x = self.content_length();
226 if x == 0 {
227 return Ok(vec![]);
228 }
229
230 let mut buf;
231 if x > 0 {
232 buf = Vec::with_capacity(x as usize);
233 } else {
234 buf = Vec::new();
235 }
236
237 match self.async_content.as_mut() {
238 None => Err(TosError::client_error("empty content")),
239 Some(r) => {
240 loop {
241 match r.next().await {
242 None => return Ok(buf),
243 Some(result) => {
244 match result {
245 Err(e) => {
246 return Err(TosError::client_error_with_cause("read error", GenericError::IoError(e.to_string())));
247 }
248 Ok(x) => {
249 buf.extend_from_slice(x.as_ref());
250 }
251 }
252 }
253 }
254 }
255 }
256 }
257 }
258}
259
260
261#[async_trait]
262impl OutputParser for PutObjectOutput {
263 async fn parse<B>(request: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
264 where
265 B: Send,
266 {
267 let hash_crc64ecma = get_header_value_from_str::<u64>(response.headers(), HEADER_HASH_CRC64ECMA, 0)?;
268 if let Some(ref rc) = request.request_context {
269 if let Some(calc_hash_crc64ecma) = rc.crc64 {
270 if calc_hash_crc64ecma != hash_crc64ecma {
271 return Err(TosError::client_error(format!("expect crc64 {hash_crc64ecma}, actual crc64 {calc_hash_crc64ecma}")));
272 }
273 }
274 }
275
276 let mut result = Self::default();
277 result.etag = get_header_value(response.headers(), HEADER_ETAG);
278 result.version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
279 result.ssec_algorithm = get_header_value(response.headers(), HEADER_SSEC_ALGORITHM);
280 result.ssec_key_md5 = get_header_value(response.headers(), HEADER_SSEC_KEY_MD5);
281 result.hash_crc64ecma = hash_crc64ecma;
282 result.server_side_encryption = get_header_value(response.headers(), HEADER_SERVER_SIDE_ENCRYPTION);
283 result.server_side_encryption_key_id = get_header_value(response.headers(), HEADER_SERVER_SIDE_ENCRYPTION_KMS_KEY_ID);
284 if get_map_value_str(&request.header, HEADER_CALLBACK) != "" { let buf = read_response(response).await?;
286 if request_info.status_code == 203 {
287 if let Ok(error_response) = parse_json_by_buf::<ErrorResponse>(buf.as_slice()) {
288 return Err(TosError::server_error_with_code(error_response.code, error_response.ec, error_response.key, error_response.message,
289 error_response.host_id, error_response.resource, request_info));
290 }
291 }
292 result.callback_result = parse_response_string_by_buf(buf)?;
293 }
294 result.request_info = request_info;
295 Ok(result)
296 }
297}
298
299#[async_trait]
300impl OutputParser for CopyObjectOutput {
301 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
302 where
303 B: Send,
304 {
305 let mut result = Self {
306 request_info: RequestInfo::default(),
307 etag: "".to_owned(),
308 last_modified: None,
309 copy_source_version_id: get_header_value(response.headers(), HEADER_COPY_SOURCE_VERSION_ID),
310 version_id: get_header_value(response.headers(), HEADER_VERSION_ID),
311 ssec_algorithm: get_header_value(response.headers(), HEADER_SSEC_ALGORITHM),
312 ssec_key_md5: get_header_value(response.headers(), HEADER_SSEC_KEY_MD5),
313 server_side_encryption: get_header_value(response.headers(), HEADER_SERVER_SIDE_ENCRYPTION),
314 server_side_encryption_key_id: get_header_value(response.headers(), HEADER_SERVER_SIDE_ENCRYPTION_KMS_KEY_ID),
315 };
316
317 let temp_result = parse_json::<TempCopyResult>(response).await?;
318 if temp_result.etag == "" {
319 return Err(TosError::server_error_with_code(temp_result.code, temp_result.ec, temp_result.key, temp_result.message,
320 temp_result.host_id, temp_result.resource, request_info));
321 }
322
323 result.etag = temp_result.etag;
324 result.last_modified = parse_date_time_iso8601(&temp_result.last_modified)?;
325 result.request_info = request_info;
326 Ok(result)
327 }
328}
329
330#[async_trait]
331impl OutputParser for DeleteMultiObjectsOutput {
332 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
333 where
334 B: Send,
335 {
336 let mut result = parse_json::<Self>(response).await?;
337 result.request_info = request_info;
338 Ok(result)
339 }
340}
341#[async_trait]
342impl OutputParser for GetObjectACLOutput {
343 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
344 where
345 B: Send,
346 {
347 let version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
348 let mut result = parse_json::<Self>(response).await?;
349 result.version_id = version_id;
350 result.request_info = request_info;
351 Ok(result)
352 }
353}
354
355#[async_trait]
356impl OutputParser for ListObjectVersionsOutput {
357 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
358 where
359 B: Send,
360 {
361 let mut result = parse_json::<Self>(response).await?;
362 for version in &mut result.versions {
363 if let Some(x) = version.last_modified_string.take() {
364 version.last_modified = parse_date_time_iso8601(&x)?;
365 }
366
367 if let Some(x) = version.user_meta.take() {
368 let mut meta = HashMap::with_capacity(x.len());
369 for item in x {
370 if let Ok(dk) = urlencoding::decode(&item.key[HEADER_PREFIX_META.len()..]) {
371 if let Ok(dv) = urlencoding::decode(item.value.as_str()) {
372 meta.insert(dk.to_string(), dv.to_string());
373 }
374 }
375 }
376 version.meta = meta;
377 }
378 }
379
380 for delete_marker in &mut result.delete_markers {
381 if let Some(x) = delete_marker.last_modified_string.take() {
382 delete_marker.last_modified = parse_date_time_iso8601(&x)?;
383 }
384 }
385
386 result.request_info = request_info;
387 Ok(result)
388 }
389}
390
391#[async_trait]
392impl OutputParser for PutObjectACLOutput {
393 async fn parse<B>(_: HttpRequest<'_, B>, _: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
394 where
395 B: Send,
396 {
397 Ok(Self { request_info })
398 }
399}
400
401#[async_trait]
402impl OutputParser for SetObjectMetaOutput {
403 async fn parse<B>(_: HttpRequest<'_, B>, _: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
404 where
405 B: Send,
406 {
407 Ok(Self { request_info })
408 }
409}
410
411#[async_trait]
412impl OutputParser for AppendObjectOutput {
413 async fn parse<B>(request: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
414 where
415 B: Send,
416 {
417 let hash_crc64ecma = get_header_value_from_str::<u64>(response.headers(), HEADER_HASH_CRC64ECMA, 0)?;
418 if let Some(ref rc) = request.request_context {
419 if let Some(calc_hash_crc64ecma) = rc.crc64 {
420 if calc_hash_crc64ecma != hash_crc64ecma {
421 return Err(TosError::client_error(format!("expect crc64 {hash_crc64ecma}, actual crc64 {calc_hash_crc64ecma}")));
422 }
423 }
424 }
425 let mut result = Self::default();
426 result.next_append_offset = get_header_value_from_str::<i64>(response.headers(), HEADER_NEXT_APPEND_OFFSET, 0)?;
427 result.hash_crc64ecma = hash_crc64ecma;
428 result.request_info = request_info;
429 Ok(result)
430 }
431}
432
433#[async_trait]
434impl OutputParser for ModifyObjectOutput {
435 async fn parse<B>(request: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
436 where
437 B: Send,
438 {
439 let hash_crc64ecma = get_header_value_from_str::<u64>(response.headers(), HEADER_HASH_CRC64ECMA, 0)?;
440 if let Some(ref rc) = request.request_context {
441 if let Some(calc_hash_crc64ecma) = rc.crc64 {
442 if calc_hash_crc64ecma != hash_crc64ecma {
443 return Err(TosError::client_error(format!("expect crc64 {hash_crc64ecma}, actual crc64 {calc_hash_crc64ecma}")));
444 }
445 }
446 }
447 let mut result = Self::default();
448 result.next_modify_offset = get_header_value_from_str::<i64>(response.headers(), HEADER_NEXT_MODIFY_OFFSET, 0)?;
449 result.hash_crc64ecma = hash_crc64ecma;
450 result.request_info = request_info;
451 Ok(result)
452 }
453}
454
455#[async_trait]
456impl OutputParser for PutObjectTaggingOutput {
457 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
458 where
459 B: Send,
460 {
461 let mut result = Self::default();
462 result.request_info = request_info;
463 result.version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
464 Ok(result)
465 }
466}
467
468#[async_trait]
469impl OutputParser for GetObjectTaggingOutput {
470 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
471 where
472 B: Send,
473 {
474 let version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
475 let mut result = parse_json::<Self>(response).await?;
476 result.request_info = request_info;
477 result.version_id = version_id;
478 Ok(result)
479 }
480}
481
482#[async_trait]
483impl OutputParser for DeleteObjectTaggingOutput {
484 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
485 where
486 B: Send,
487 {
488 let mut result = Self::default();
489 result.request_info = request_info;
490 result.version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
491 Ok(result)
492 }
493}
494
495#[async_trait]
496impl OutputParser for FetchObjectOutput {
497 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
498 where
499 B: Send,
500 {
501 let mut result = Self {
502 request_info: RequestInfo::default(),
503 etag: "".to_owned(),
504 version_id: get_header_value(response.headers(), HEADER_VERSION_ID),
505 ssec_algorithm: get_header_value(response.headers(), HEADER_SSEC_ALGORITHM),
506 ssec_key_md5: get_header_value(response.headers(), HEADER_SSEC_KEY_MD5),
507 server_side_encryption: get_header_value(response.headers(), HEADER_SERVER_SIDE_ENCRYPTION),
508 server_side_encryption_key_id: get_header_value(response.headers(), HEADER_SERVER_SIDE_ENCRYPTION_KMS_KEY_ID),
509 source_content_type: "".to_string(),
510 source_content_length: 0,
511 md5: "".to_string(),
512 };
513
514 let temp_result = parse_json::<TempFetchResult>(response).await?;
515 if temp_result.etag == "" {
516 return Err(TosError::server_error_with_code(temp_result.code, temp_result.ec, temp_result.key, temp_result.message,
517 temp_result.host_id, temp_result.resource, request_info));
518 }
519 result.etag = temp_result.etag;
520 result.source_content_type = temp_result.source_content_type;
521 result.source_content_length = temp_result.source_content_length;
522 result.md5 = temp_result.md5;
523 result.request_info = request_info;
524 Ok(result)
525 }
526}
527
528
529#[async_trait]
530impl OutputParser for RenameObjectOutput {
531 async fn parse<B>(_: HttpRequest<'_, B>, _: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
532 where
533 B: Send,
534 {
535 Ok(Self { request_info })
536 }
537}
538
539#[async_trait]
540impl OutputParser for RestoreObjectOutput {
541 async fn parse<B>(_: HttpRequest<'_, B>, _: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
542 where
543 B: Send,
544 {
545 Ok(Self { request_info })
546 }
547}
548
549#[async_trait]
550impl OutputParser for PutFetchTaskOutput {
551 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
552 where
553 B: Send,
554 {
555 let mut result = parse_json::<Self>(response).await?;
556 result.request_info = request_info;
557 Ok(result)
558 }
559}
560
561#[async_trait]
562impl OutputParser for GetFetchTaskOutput {
563 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
564 where
565 B: Send,
566 {
567 let mut result = parse_json::<Self>(response).await?;
568 if let Some(fetch_task) = &mut result.task {
569 if fetch_task.user_meta.len() > 0 {
570 fetch_task.meta = HashMap::with_capacity(fetch_task.user_meta.len());
571 for item in fetch_task.user_meta.iter() {
572 fetch_task.meta.insert(item.key.clone(), item.value.clone());
573 }
574 }
575 }
576 result.request_info = request_info;
577 Ok(result)
578 }
579}
580
581#[async_trait]
582impl OutputParser for PutSymlinkOutput {
583 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
584 where
585 B: Send,
586 {
587 let version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
588 Ok(Self {
589 request_info,
590 version_id,
591 })
592 }
593}
594
595#[async_trait]
596impl OutputParser for GetSymlinkOutput {
597 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
598 where
599 B: Send,
600 {
601 let symlink_target_key = get_header_value(response.headers(), HEADER_SYMLINK_TARGET);
602 let symlink_target_bucket = get_header_value(response.headers(), HEADER_SYMLINK_BUCKET);
603 let version_id = get_header_value(response.headers(), HEADER_VERSION_ID);
604 let etag = get_header_value(response.headers(), HEADER_ETAG);
605 let last_modified = parse_date_time_rfc1123(&get_header_value(response.headers(), HEADER_LAST_MODIFIED))?;
606 Ok(Self {
607 request_info,
608 version_id,
609 symlink_target_key,
610 symlink_target_bucket,
611 etag,
612 last_modified,
613 })
614 }
615}
616
617#[async_trait]
618impl OutputParser for GetFileStatusOutput {
619 async fn parse<B>(_: HttpRequest<'_, B>, response: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
620 where
621 B: Send,
622 {
623 let mut result = parse_json::<Self>(response).await?;
624 if let Some(x) = result.last_modified_string.take() {
625 result.last_modified = parse_date_time_iso8601(&x)?;
626 }
627 result.request_info = request_info;
628 Ok(result)
629 }
630}
631
632#[async_trait]
633impl OutputParser for SetObjectTimeOutput {
634 async fn parse<B>(_: HttpRequest<'_, B>, _: HttpResponse, request_info: RequestInfo, _: Meta) -> Result<Self, TosError>
635 where
636 B: Send,
637 {
638 Ok(Self {
639 request_info,
640 })
641 }
642}
643
644impl<B> DataTransferListener for PutObjectInput<B> {
645 fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
646 &self.inner.async_data_transfer_listener
647 }
648
649 fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
650 self.inner.async_data_transfer_listener = Some(listener.into());
651 }
652}
653
654impl DataTransferListener for PutObjectFromBufferInput {
655 fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
656 &self.inner.async_data_transfer_listener
657 }
658
659 fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
660 self.inner.async_data_transfer_listener = Some(listener.into());
661 }
662}
663impl<B> DataTransferListener for AppendObjectInput<B> {
664 fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
665 &self.inner.async_data_transfer_listener
666 }
667
668 fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
669 self.inner.async_data_transfer_listener = Some(listener.into());
670 }
671}
672
673impl DataTransferListener for AppendObjectFromBufferInput {
674 fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
675 &self.inner.async_data_transfer_listener
676 }
677
678 fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
679 self.inner.async_data_transfer_listener = Some(listener.into());
680 }
681}
682
683impl DataTransferListener for GetObjectInput {
684 fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
685 &self.async_data_transfer_listener
686 }
687
688 fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
689 self.async_data_transfer_listener = Some(listener.into());
690 }
691}
692
693impl<B> DataTransferListener for ModifyObjectInput<B> {
694 fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
695 &self.async_data_transfer_listener
696 }
697
698 fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
699 self.async_data_transfer_listener = Some(listener.into());
700 }
701}
702
703
704impl DataTransferListener for ModifyObjectFromBufferInput {
705 fn async_data_transfer_listener(&self) -> &Option<async_channel::Sender<DataTransferStatus>> {
706 &self.async_data_transfer_listener
707 }
708
709 fn set_async_data_transfer_listener(&mut self, listener: impl Into<async_channel::Sender<DataTransferStatus>>) {
710 self.async_data_transfer_listener = Some(listener.into());
711 }
712}