aws_multipart_upload/client/
fs.rs1use async_tempfile::{Error as TmpFileError, TempFile};
2use aws_sdk_s3::primitives::ByteStream;
3use futures::{future, future::BoxFuture};
4use tokio::io::AsyncWriteExt as _;
5use tokio::sync::Mutex;
6
7use crate::{
8 types::{api::*, UploadClient},
9 AwsError,
10};
11
12#[derive(Debug)]
14pub struct AsyncTempFileClient {
15 file: Mutex<Option<TempFile>>,
16}
17
18impl Default for AsyncTempFileClient {
19 fn default() -> Self {
20 Self {
21 file: Mutex::new(None),
22 }
23 }
24}
25
26impl AsyncTempFileClient {
27 pub fn new() -> Self {
28 Self::default()
29 }
30
31 async fn set(&self) -> Result<String, AwsError> {
32 let inner = TempFile::new().await?;
33 let path = inner
34 .file_path()
35 .to_str()
36 .map(str::to_string)
37 .ok_or_else(|| AwsError::Missing("upload_id"))?;
38 let mut f = self.file.lock().await;
39 *f = Some(inner);
40 Ok(path)
41 }
42
43 async fn write(&self, buf: &[u8]) -> Result<(), AwsError> {
44 let mut inner = self.file.lock().await;
45 let f = inner
46 .as_deref_mut()
47 .ok_or_else(|| AwsError::Missing("no file set for `AsyncTempFileClient::write`"))?;
48 f.write_all(buf).await?;
49 f.flush().await?;
50 Ok(())
51 }
52}
53
54impl UploadClient for AsyncTempFileClient {
55 fn new_upload<'a, 'client: 'a>(
58 &'client self,
59 addr: &'a UploadAddress,
60 ) -> BoxFuture<'a, Result<UploadRequestParams, AwsError>> {
61 Box::pin(async move {
62 let upload_id: UploadId = self.set().await?.into();
63 Ok(UploadRequestParams::new(upload_id, addr.clone()))
64 })
65 }
66
67 fn upload_part<'a, 'client: 'a>(
68 &'client self,
69 params: &'a UploadRequestParams,
70 part_number: i32,
71 part: ByteStream,
72 ) -> BoxFuture<'a, Result<EntityTag, AwsError>> {
73 Box::pin(async move {
74 let filepath = params.upload_id();
75 let etag = EntityTag::new(format!("{filepath}_{part_number}"));
76 let bytevec = part.collect().await.map(|data| data.to_vec())?;
77 self.write(&bytevec).await?;
78 Ok(etag)
79 })
80 }
81
82 fn complete_upload<'a, 'client: 'a>(
83 &'client self,
84 params: &'a UploadRequestParams,
85 parts: &'a UploadedParts,
86 ) -> BoxFuture<'a, Result<EntityTag, AwsError>> {
87 let etag = EntityTag::from(format!("{}_{}", params.upload_id(), parts.last_completed()));
88 Box::pin(future::ready(Ok(etag)))
89 }
90}
91
92impl From<TmpFileError> for AwsError {
93 fn from(value: TmpFileError) -> Self {
94 match value {
95 TmpFileError::Io(e) => AwsError::from(e),
96 e => AwsError::DynStd(e.to_string().into()),
97 }
98 }
99}