aws_multipart_upload/write/part_buffer.rs

use crate::client::part::CompletedParts;
use crate::client::request::SendUploadPart;
use crate::error::{Error as UploadError, Result};

use futures::stream::FuturesUnordered;
use futures::{Stream, ready};
use multipart_write::MultipartWrite;
use std::fmt::{self, Debug, Formatter};
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::task::{Context, Poll};
use tracing::trace;

/// Utility `MultipartWrite` for buffering upload request futures.
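///
/// Uploads pushed via `start_send` are held in a [`FuturesUnordered`] set and
/// driven whenever the buffer is polled. An optional capacity bounds how many
/// uploads may be in flight at once: `poll_ready` returns `Pending` while the
/// buffer is full or a flush is in progress. Completed parts accumulate until
/// `poll_complete` drains the pending set and returns them.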
#[must_use = "futures do nothing unless polled"]
#[pin_project::pin_project]
pub struct PartBuffer {
    #[pin]
    pending: FuturesUnordered<SendUploadPart>,
    completed: CompletedParts,
    capacity: Option<NonZeroUsize>,
    flushing: bool,
}

impl PartBuffer {
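    /// Create a buffer that allows at most `capacity` uploads in flight at
    /// once; `None` (or zero) leaves the number of pending uploads unbounded.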
    pub(crate) fn new(capacity: Option<usize>) -> Self {
        Self {
            pending: FuturesUnordered::new(),
            completed: CompletedParts::default(),
            capacity: capacity.and_then(NonZeroUsize::new),
            flushing: false,
        }
    }
}

impl MultipartWrite<SendUploadPart> for PartBuffer {
    type Ret = ();
    type Output = CompletedParts;
    type Error = UploadError;

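    // Readiness means there is spare room for another in-flight upload. Completed
    // uploads are also collected here so the pending set drains as parts finish.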
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let mut this = self.project();
        // Flushing means completely emptying the pending buffer, so we don't
        // want to be adding to it when that's in progress.
        if *this.flushing {
            return Poll::Pending;
        }
        // Poke the pending uploads to see if any are ready.
        while let Poll::Ready(Some(res)) = this.pending.as_mut().poll_next(cx) {
            match res {
                Ok(v) => {
                    trace!(
                        id = %v.id,
                        etag = %v.etag,
                        part = ?v.part_number,
                        size = v.part_size,
                        "completed part",
                    );
                    this.completed.push(v);
                }
                Err(e) => return Poll::Ready(Err(e)),
            }
        }
        if this.capacity.is_none_or(|n| this.pending.len() < n.get()) {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }

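    // Enqueue the upload future; it is not polled until the buffer itself is
    // polled again via `poll_ready`, `poll_flush`, or `poll_complete`.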
    fn start_send(mut self: Pin<&mut Self>, part: SendUploadPart) -> Result<Self::Ret> {
        self.as_mut().pending.push(part);
        Ok(())
    }

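    // Drive every pending upload to completion, collecting each finished part.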
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let mut this = self.project();
        *this.flushing = true;

        while !this.pending.is_empty() {
            match ready!(this.pending.as_mut().poll_next(cx)) {
                Some(Ok(v)) => {
                    trace!(
                        id = %v.id,
                        etag = %v.etag,
                        part = ?v.part_number,
                        size = v.part_size,
                        "flushed completed part",
                    );
                    this.completed.push(v);
                }
                Some(Err(e)) => return Poll::Ready(Err(e)),
                // The stream stopped producing, i.e., the collection is empty.
                None => break,
            }
        }

        *this.flushing = false;
        Poll::Ready(Ok(()))
    }

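    // Flush any outstanding uploads, then hand back the accumulated parts,
    // leaving the buffer empty for reuse.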
    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Output>> {
        ready!(self.as_mut().poll_flush(cx))?;
        Poll::Ready(Ok(std::mem::take(&mut self.completed)))
    }
}

impl Debug for PartBuffer {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("PartBuffer")
            .field("pending", &self.pending)
            .field("completed", &self.completed)
            .field("capacity", &self.capacity)
            .field("flushing", &self.flushing)
            .finish()
    }
}