aws_multipart_upload/write/
part_buffer.rs1use crate::client::part::CompletedParts;
2use crate::client::request::SendUploadPart;
3use crate::error::{Error as UploadError, Result};
4
5use futures::stream::FuturesUnordered;
6use futures::{Stream, ready};
7use multipart_write::MultipartWrite;
8use std::fmt::{self, Debug, Formatter};
9use std::num::NonZeroUsize;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13#[must_use = "futures do nothing unless polled"]
15#[pin_project::pin_project]
16pub struct PartBuffer {
17 #[pin]
18 pending: FuturesUnordered<SendUploadPart>,
19 completed: CompletedParts,
20 capacity: Option<NonZeroUsize>,
21 flushing: bool,
22}
23
24impl PartBuffer {
25 pub(crate) fn new(capacity: Option<usize>) -> Self {
26 Self {
27 pending: FuturesUnordered::new(),
28 completed: CompletedParts::default(),
29 capacity: capacity.and_then(NonZeroUsize::new),
30 flushing: false,
31 }
32 }
33}
34
35impl MultipartWrite<SendUploadPart> for PartBuffer {
36 type Ret = ();
37 type Output = CompletedParts;
38 type Error = UploadError;
39
40 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
41 let mut this = self.project();
42 if *this.flushing {
45 return Poll::Pending;
46 }
47 while let Poll::Ready(Some(res)) = this.pending.as_mut().poll_next(cx) {
49 match res {
50 Ok(v) => {
51 trace!(
52 id = %v.id,
53 etag = %v.etag,
54 part = ?v.part_number,
55 size = v.part_size,
56 "completed part",
57 );
58 this.completed.push(v);
59 }
60 Err(e) => return Poll::Ready(Err(e)),
61 }
62 }
63 if this.capacity.is_none_or(|n| this.pending.len() < n.get()) {
64 Poll::Ready(Ok(()))
65 } else {
66 Poll::Pending
67 }
68 }
69
70 fn start_send(mut self: Pin<&mut Self>, part: SendUploadPart) -> Result<Self::Ret> {
71 self.as_mut().pending.push(part);
72 Ok(())
73 }
74
75 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
76 let mut this = self.project();
77 *this.flushing = true;
78
79 while !this.pending.is_empty() {
80 match ready!(this.pending.as_mut().poll_next(cx)) {
81 Some(Ok(v)) => {
82 trace!(
83 id = %v.id,
84 etag = %v.etag,
85 part = ?v.part_number,
86 size = v.part_size,
87 "flushed completed part",
88 );
89 this.completed.push(v);
90 }
91 Some(Err(e)) => return Poll::Ready(Err(e)),
92 _ => break,
94 }
95 }
96
97 *this.flushing = false;
98 Poll::Ready(Ok(()))
99 }
100
101 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<Self::Output>> {
102 ready!(self.as_mut().poll_flush(cx))?;
103 Poll::Ready(Ok(std::mem::take(&mut self.completed)))
104 }
105}
106
107impl Debug for PartBuffer {
108 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
109 f.debug_struct("PartBuffer")
110 .field("pending", &self.pending)
111 .field("completed", &self.completed)
112 .field("capacity", &self.capacity)
113 .field("flushing", &self.flushing)
114 .finish()
115 }
116}