cobalt_aws/s3/
async_put_object.rs

1// Standard library imports
2use std::mem;
3use std::pin::Pin;
4
5// External crates
6use aws_sdk_s3::{
7    operation::put_object::{PutObjectError, PutObjectOutput},
8    types::ObjectCannedAcl,
9};
10use futures::{
11    io::Error,
12    ready,
13    task::{Context, Poll},
14    AsyncWrite, Future,
15};
16
17// Internal project imports
18use crate::s3::Client;
19use crate::types::SdkError;
20
21// Tracks the state of the AsyncWrite lifecycle for an AsyncPutObject.
22enum PutObjectState<'a> {
23    // Open for writing. Can call .write(), .flush(), or .close().
24    Writing,
25    // In the process of writing the data to s3. We store the future which is performing
26    // the pub_object operation.
27    Closing(Pin<Box<dyn Future<Output = Result<PutObjectOutput, SdkError<PutObjectError>>> + 'a>>),
28    // We have completed writing to s3.
29    Closed,
30}
31
32/// Implements the `AsyncWrite` trait and writes data to S3 using the `put_object` API.
33///
34/// # Example
35///
36/// ```no_run
37/// use aws_config;
38/// use cobalt_aws::s3::{AsyncPutObject, Client};
39/// use cobalt_aws::config::load_from_env;
40/// use futures::AsyncWriteExt;
41///
42/// # tokio_test::block_on(async {
43/// let shared_config = load_from_env().await.unwrap();
44/// let client = Client::new(&shared_config);
45///
46/// let mut writer = AsyncPutObject::new(&client, "my-bucket", "my-key");
47/// writer.write_all(b"File contents").await.unwrap();
48///
49/// // The contents are pushed to S3 when the `.close()` method is called.
50/// writer.close().await.unwrap();
51/// # })
52/// ```
53pub struct AsyncPutObject<'a> {
54    client: &'a Client,
55    key: String,
56    bucket: String,
57    buf: Vec<u8>,
58    state: PutObjectState<'a>,
59}
60
61impl<'a> AsyncPutObject<'a> {
62    /// Create a new `AsyncPutObject` which will write data to the given `bucket` and `key`.
63    pub fn new(client: &'a Client, bucket: &str, key: &str) -> Self {
64        AsyncPutObject {
65            client,
66            key: key.into(),
67            bucket: bucket.into(),
68            buf: vec![],
69            state: PutObjectState::Writing,
70        }
71    }
72}
73
74impl<'a> AsyncWrite for AsyncPutObject<'a> {
75    fn poll_write(
76        mut self: Pin<&mut Self>,
77        _cx: &mut Context<'_>,
78        buf: &[u8],
79    ) -> Poll<Result<usize, Error>> {
80        match self.state {
81            PutObjectState::Writing => {
82                self.buf.extend(buf);
83                Poll::Ready(Ok(buf.len()))
84            }
85            _ => Poll::Ready(Err(Error::other(
86                "Attempted to .write() writer after .close().",
87            ))),
88        }
89    }
90
91    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
92        match self.state {
93            PutObjectState::Writing => Poll::Ready(Ok(())),
94            _ => Poll::Ready(Err(Error::other(
95                "Attempted to .flush() writer after .close().",
96            ))),
97        }
98    }
99
100    fn poll_close<'b>(
101        mut self: Pin<&'b mut AsyncPutObject<'a>>,
102        cx: &'b mut Context<'_>,
103    ) -> Poll<Result<(), Error>> {
104        match self.state {
105            PutObjectState::Writing => {
106                // Create the put_object future and transition to
107                // the Closing state.
108                let fut = self
109                    .client
110                    .put_object()
111                    .bucket(&self.bucket)
112                    .key(&self.key)
113                    .body(mem::take(&mut self.buf).into())
114                    .acl(ObjectCannedAcl::BucketOwnerFullControl)
115                    .send();
116                self.state = PutObjectState::Closing(Box::pin(fut));
117                // Manually trigger a wake so that the executor immediately
118                // polls us again, which will take us into the `Closing` block.
119                cx.waker().wake_by_ref();
120                Poll::Pending
121            }
122            PutObjectState::Closing(ref mut fut) => {
123                let result = ready!(Pin::new(fut).poll(cx))
124                    .map(|_| ())
125                    .map_err(Error::other);
126                self.state = PutObjectState::Closed;
127                Poll::Ready(result)
128            }
129            PutObjectState::Closed => {
130                Poll::Ready(Err(Error::other("Attempted to .close() writer twice.")))
131            }
132        }
133    }
134}
135
136#[cfg(test)]
137mod test_async_put_object {
138    use super::*;
139    use crate::localstack;
140    use aws_sdk_s3::error::ProvideErrorMetadata;
141    use futures::{AsyncReadExt, AsyncWriteExt};
142    use serial_test::serial;
143    use std::error::Error;
144    use tokio;
145
146    async fn localstack_test_client() -> Client {
147        localstack::test_utils::wait_for_localstack().await;
148        let shared_config = crate::config::load_from_env().await.unwrap();
149        let builder = aws_sdk_s3::config::Builder::from(&shared_config)
150            .force_path_style(true)
151            .build();
152        Client::from_conf(builder)
153    }
154
155    #[tokio::test]
156    #[serial]
157    async fn test_non_existent_bucket() {
158        let client = localstack_test_client().await;
159        let mut writer = AsyncPutObject::new(&client, "non-existent-bucket", "my-object");
160        writer.write_all(b"File contents").await.unwrap();
161        let e = writer.close().await.unwrap_err();
162        let e = e
163            .source()
164            .unwrap()
165            .downcast_ref::<PutObjectError>()
166            .unwrap();
167
168        assert_eq!(e.code(), Some("NoSuchBucket"));
169    }
170
171    #[tokio::test]
172    #[serial]
173    async fn test_write() {
174        let client = localstack_test_client().await;
175        let mut writer = AsyncPutObject::new(&client, "test-bucket", "test-output.txt");
176        writer.write_all(b"File contents").await.unwrap();
177        writer.close().await.unwrap();
178
179        // Check contents
180        let mut buffer = String::new();
181        let mut reader = crate::s3::get_object(&client, "test-bucket", "test-output.txt")
182            .await
183            .unwrap();
184        reader.read_to_string(&mut buffer).await.unwrap();
185        assert_eq!(buffer, "File contents");
186
187        // Clean up
188        client
189            .delete_object()
190            .bucket("test-bucket")
191            .key("test-output.txt")
192            .send()
193            .await
194            .unwrap();
195    }
196
197    #[tokio::test]
198    #[serial]
199    async fn test_after_close_errors() {
200        let client = localstack_test_client().await;
201        let mut writer = AsyncPutObject::new(&client, "test-bucket", "test-output.txt");
202        writer.write_all(b"File contents").await.unwrap();
203        writer.close().await.unwrap();
204
205        let e = writer.close().await.unwrap_err();
206        assert_eq!(e.to_string(), "Attempted to .close() writer twice.");
207
208        let e = writer.flush().await.unwrap_err();
209        assert_eq!(
210            e.to_string(),
211            "Attempted to .flush() writer after .close()."
212        );
213
214        let e = writer.write_all(b"More content").await.unwrap_err();
215        assert_eq!(
216            e.to_string(),
217            "Attempted to .write() writer after .close()."
218        );
219
220        // Clean up
221        client
222            .delete_object()
223            .bucket("test-bucket")
224            .key("test-output.txt")
225            .send()
226            .await
227            .unwrap();
228    }
229}