aws_multipart_upload/client/
fs.rs

1use async_tempfile::{Error as TmpFileError, TempFile};
2use aws_sdk_s3::primitives::ByteStream;
3use futures::{future, future::BoxFuture};
4use tokio::io::AsyncWriteExt as _;
5use tokio::sync::Mutex;
6
7use crate::{
8    types::{api::*, UploadClient},
9    AwsError,
10};
11
12/// Another upload client for testing that writes to a temp file.
13#[derive(Debug)]
14pub struct AsyncTempFileClient {
15    file: Mutex<Option<TempFile>>,
16}
17
18impl Default for AsyncTempFileClient {
19    fn default() -> Self {
20        Self {
21            file: Mutex::new(None),
22        }
23    }
24}
25
26impl AsyncTempFileClient {
27    pub fn new() -> Self {
28        Self::default()
29    }
30
31    async fn set(&self) -> Result<String, AwsError> {
32        let inner = TempFile::new().await?;
33        let path = inner
34            .file_path()
35            .to_str()
36            .map(str::to_string)
37            .ok_or_else(|| AwsError::Missing("upload_id"))?;
38        let mut f = self.file.lock().await;
39        *f = Some(inner);
40        Ok(path)
41    }
42
43    async fn write(&self, buf: &[u8]) -> Result<(), AwsError> {
44        let mut inner = self.file.lock().await;
45        let f = inner
46            .as_deref_mut()
47            .ok_or_else(|| AwsError::Missing("no file set for `AsyncTempFileClient::write`"))?;
48        f.write_all(buf).await?;
49        f.flush().await?;
50        Ok(())
51    }
52}
53
54impl UploadClient for AsyncTempFileClient {
55    // Make the `upload_id` the temp file's path, so we're really initializing
56    // `AsyncTempFileClient` here.
57    fn new_upload<'a, 'client: 'a>(
58        &'client self,
59        addr: &'a UploadAddress,
60    ) -> BoxFuture<'a, Result<UploadRequestParams, AwsError>> {
61        Box::pin(async move {
62            let upload_id: UploadId = self.set().await?.into();
63            Ok(UploadRequestParams::new(upload_id, addr.clone()))
64        })
65    }
66
67    fn upload_part<'a, 'client: 'a>(
68        &'client self,
69        params: &'a UploadRequestParams,
70        part_number: i32,
71        part: ByteStream,
72    ) -> BoxFuture<'a, Result<EntityTag, AwsError>> {
73        Box::pin(async move {
74            let filepath = params.upload_id();
75            let etag = EntityTag::new(format!("{filepath}_{part_number}"));
76            let bytevec = part.collect().await.map(|data| data.to_vec())?;
77            self.write(&bytevec).await?;
78            Ok(etag)
79        })
80    }
81
82    fn complete_upload<'a, 'client: 'a>(
83        &'client self,
84        params: &'a UploadRequestParams,
85        parts: &'a UploadedParts,
86    ) -> BoxFuture<'a, Result<EntityTag, AwsError>> {
87        let etag = EntityTag::from(format!("{}_{}", params.upload_id(), parts.last_completed()));
88        Box::pin(future::ready(Ok(etag)))
89    }
90}
91
92impl From<TmpFileError> for AwsError {
93    fn from(value: TmpFileError) -> Self {
94        match value {
95            TmpFileError::Io(e) => AwsError::from(e),
96            e => AwsError::DynStd(e.to_string().into()),
97        }
98    }
99}