aws_multipart_upload/types/
upload.rs1use futures::Sink;
2use pin_project_lite::pin_project;
3use std::task::{ready, Context, Poll};
4use std::{io::Error as IoError, pin::Pin, sync::Arc};
5use tokio_util::codec::{Encoder, FramedWrite};
6
7use crate::{
8 types::{api::*, write_parts::WriteParts, UploadClient, UploadControl},
9 AwsError,
10};
11
12pin_project! {
13 pub struct Upload<E> {
19 #[pin]
20 inner: FramedWrite<WriteParts, E>,
21 client: Arc<dyn UploadClient + Send + Sync>,
22 ctrl: Arc<dyn UploadControl + Send + Sync>,
23 }
24}
25
26impl<E> Upload<E> {
27 pub fn new<C, U>(client: U, ctrl: C, encoder: E, params: UploadRequestParams) -> Self
28 where
29 C: UploadControl + Send + Sync + 'static,
30 U: UploadClient + Send + Sync + 'static,
31 {
32 let client = Arc::new(client);
33 let ctrl = Arc::new(ctrl);
34 let write = WriteParts::new(Arc::clone(&client), Arc::clone(&ctrl), params);
35 let inner = FramedWrite::new(write, encoder);
36 Self {
37 inner,
38 client,
39 ctrl,
40 }
41 }
42
43 fn should_upload_part(&self) -> bool {
44 let part_size = self.inner.write_buffer().len();
45 self.ctrl.is_part_ready(part_size)
46 }
47
48 fn poll_complete_upload<I>(
49 self: Pin<&mut Self>,
50 cx: &mut Context<'_>,
51 ) -> Poll<Result<(), AwsError>>
52 where
53 E: Encoder<I>,
54 E::Error: From<IoError>,
55 AwsError: From<E::Error>,
56 {
57 let parts = self.inner.get_ref().uploaded_parts();
58 let params = self.inner.get_ref().params();
59 tracing::trace!(?parts, ?params, "completing upload");
60
61 let etag = ready!(self
62 .client
63 .complete_upload(¶ms, &parts)
64 .as_mut()
65 .poll(cx))?;
66 ready!(self.client.on_upload_complete(etag).as_mut().poll(cx))?;
68
69 Poll::Ready(Ok(()))
70 }
71}
72
73impl<E, I> Sink<I> for Upload<E>
74where
75 E: Encoder<I> + Clone,
76 E::Error: From<IoError>,
77 AwsError: From<E::Error>,
78{
79 type Error = AwsError;
80
81 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
82 if self.should_upload_part() {
83 ready!(self.as_mut().project().inner.poll_flush(cx))?;
84 }
85
86 Poll::Ready(Ok(()))
87 }
88
89 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
90 self.project().inner.start_send(item)?;
91 Ok(())
92 }
93
94 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
95 ready!(self.as_mut().project().inner.poll_flush(cx))?;
99 self.poll_complete_upload(cx)
100 }
101
102 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
103 ready!(self.as_mut().project().inner.poll_flush(cx))?;
104 self.poll_complete_upload(cx)
105 }
106}