cobalt_aws/s3/
async_put_object.rs1use std::mem;
3use std::pin::Pin;
4
5use aws_sdk_s3::{
7 operation::put_object::{PutObjectError, PutObjectOutput},
8 types::ObjectCannedAcl,
9};
10use futures::{
11 io::Error,
12 ready,
13 task::{Context, Poll},
14 AsyncWrite, Future,
15};
16
17use crate::s3::Client;
19use crate::types::SdkError;
20
21enum PutObjectState<'a> {
23 Writing,
25 Closing(Pin<Box<dyn Future<Output = Result<PutObjectOutput, SdkError<PutObjectError>>> + 'a>>),
28 Closed,
30}
31
32pub struct AsyncPutObject<'a> {
54 client: &'a Client,
55 key: String,
56 bucket: String,
57 buf: Vec<u8>,
58 state: PutObjectState<'a>,
59}
60
61impl<'a> AsyncPutObject<'a> {
62 pub fn new(client: &'a Client, bucket: &str, key: &str) -> Self {
64 AsyncPutObject {
65 client,
66 key: key.into(),
67 bucket: bucket.into(),
68 buf: vec![],
69 state: PutObjectState::Writing,
70 }
71 }
72}
73
74impl<'a> AsyncWrite for AsyncPutObject<'a> {
75 fn poll_write(
76 mut self: Pin<&mut Self>,
77 _cx: &mut Context<'_>,
78 buf: &[u8],
79 ) -> Poll<Result<usize, Error>> {
80 match self.state {
81 PutObjectState::Writing => {
82 self.buf.extend(buf);
83 Poll::Ready(Ok(buf.len()))
84 }
85 _ => Poll::Ready(Err(Error::other(
86 "Attempted to .write() writer after .close().",
87 ))),
88 }
89 }
90
91 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
92 match self.state {
93 PutObjectState::Writing => Poll::Ready(Ok(())),
94 _ => Poll::Ready(Err(Error::other(
95 "Attempted to .flush() writer after .close().",
96 ))),
97 }
98 }
99
100 fn poll_close<'b>(
101 mut self: Pin<&'b mut AsyncPutObject<'a>>,
102 cx: &'b mut Context<'_>,
103 ) -> Poll<Result<(), Error>> {
104 match self.state {
105 PutObjectState::Writing => {
106 let fut = self
109 .client
110 .put_object()
111 .bucket(&self.bucket)
112 .key(&self.key)
113 .body(mem::take(&mut self.buf).into())
114 .acl(ObjectCannedAcl::BucketOwnerFullControl)
115 .send();
116 self.state = PutObjectState::Closing(Box::pin(fut));
117 cx.waker().wake_by_ref();
120 Poll::Pending
121 }
122 PutObjectState::Closing(ref mut fut) => {
123 let result = ready!(Pin::new(fut).poll(cx))
124 .map(|_| ())
125 .map_err(Error::other);
126 self.state = PutObjectState::Closed;
127 Poll::Ready(result)
128 }
129 PutObjectState::Closed => {
130 Poll::Ready(Err(Error::other("Attempted to .close() writer twice.")))
131 }
132 }
133 }
134}
135
136#[cfg(test)]
137mod test_async_put_object {
138 use super::*;
139 use crate::localstack;
140 use aws_sdk_s3::error::ProvideErrorMetadata;
141 use futures::{AsyncReadExt, AsyncWriteExt};
142 use serial_test::serial;
143 use std::error::Error;
144 use tokio;
145
146 async fn localstack_test_client() -> Client {
147 localstack::test_utils::wait_for_localstack().await;
148 let shared_config = crate::config::load_from_env().await.unwrap();
149 let builder = aws_sdk_s3::config::Builder::from(&shared_config)
150 .force_path_style(true)
151 .build();
152 Client::from_conf(builder)
153 }
154
155 #[tokio::test]
156 #[serial]
157 async fn test_non_existent_bucket() {
158 let client = localstack_test_client().await;
159 let mut writer = AsyncPutObject::new(&client, "non-existent-bucket", "my-object");
160 writer.write_all(b"File contents").await.unwrap();
161 let e = writer.close().await.unwrap_err();
162 let e = e
163 .source()
164 .unwrap()
165 .downcast_ref::<PutObjectError>()
166 .unwrap();
167
168 assert_eq!(e.code(), Some("NoSuchBucket"));
169 }
170
171 #[tokio::test]
172 #[serial]
173 async fn test_write() {
174 let client = localstack_test_client().await;
175 let mut writer = AsyncPutObject::new(&client, "test-bucket", "test-output.txt");
176 writer.write_all(b"File contents").await.unwrap();
177 writer.close().await.unwrap();
178
179 let mut buffer = String::new();
181 let mut reader = crate::s3::get_object(&client, "test-bucket", "test-output.txt")
182 .await
183 .unwrap();
184 reader.read_to_string(&mut buffer).await.unwrap();
185 assert_eq!(buffer, "File contents");
186
187 client
189 .delete_object()
190 .bucket("test-bucket")
191 .key("test-output.txt")
192 .send()
193 .await
194 .unwrap();
195 }
196
197 #[tokio::test]
198 #[serial]
199 async fn test_after_close_errors() {
200 let client = localstack_test_client().await;
201 let mut writer = AsyncPutObject::new(&client, "test-bucket", "test-output.txt");
202 writer.write_all(b"File contents").await.unwrap();
203 writer.close().await.unwrap();
204
205 let e = writer.close().await.unwrap_err();
206 assert_eq!(e.to_string(), "Attempted to .close() writer twice.");
207
208 let e = writer.flush().await.unwrap_err();
209 assert_eq!(
210 e.to_string(),
211 "Attempted to .flush() writer after .close()."
212 );
213
214 let e = writer.write_all(b"More content").await.unwrap_err();
215 assert_eq!(
216 e.to_string(),
217 "Attempted to .write() writer after .close()."
218 );
219
220 client
222 .delete_object()
223 .bucket("test-bucket")
224 .key("test-output.txt")
225 .send()
226 .await
227 .unwrap();
228 }
229}