multipart_write/stream/
write_until.rs1use super::TrySend;
2use crate::{FusedMultipartWrite, MultipartWrite};
3
4use futures_core::ready;
5use futures_core::stream::{FusedStream, Stream};
6use std::fmt::{self, Debug, Formatter};
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10pin_project_lite::pin_project! {
11 #[must_use = "futures do nothing unless polled"]
15 pub struct WriteUntil<St: Stream, Wr, F> {
16 #[pin]
17 inner: TrySend<St, Wr>,
18 f: F,
19 state: State,
20 is_empty: bool,
21 }
22}
23
/// Position of a `WriteUntil` stream in its polling state machine.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
enum State {
    /// The predicate fired: complete the writer, then resume polling.
    PollComplete,
    /// The inner stream ended with pending progress: complete the writer one
    /// last time, then wind down.
    FinalPollComplete,
    /// Pull the next value from the inner stream (the initial state).
    #[default]
    PollNext,
    /// The final completion was yielded; the next poll reports end-of-stream.
    Terminating,
    /// End-of-stream has been reported.
    Terminated,
}

impl State {
    /// Returns `true` when the next action is to complete the writer, whether
    /// as a regular or as the final completion.
    fn should_complete(self) -> bool {
        matches!(self, Self::PollComplete | Self::FinalPollComplete)
    }

    /// Returns `true` only for the final completion that precedes shutdown.
    fn is_last_complete(self) -> bool {
        matches!(self, Self::FinalPollComplete)
    }

    /// Returns `true` once end-of-stream has been reported.
    fn is_terminated(self) -> bool {
        matches!(self, Self::Terminated)
    }
}
47
48impl<St: Stream, Wr, F> WriteUntil<St, Wr, F> {
49 pub(super) fn new(inner: St, writer: Wr, f: F) -> Self {
50 Self {
51 inner: TrySend::new(inner, writer),
52 f,
53 state: State::PollNext,
54 is_empty: true,
55 }
56 }
57}
58
59impl<St, Wr, F> FusedStream for WriteUntil<St, Wr, F>
60where
61 St: Stream,
62 Wr: FusedMultipartWrite<St::Item>,
63 F: FnMut(Wr::Ret) -> bool,
64{
65 fn is_terminated(&self) -> bool {
66 self.inner.is_terminated() || self.state.is_terminated()
67 }
68}
69
70impl<St, Wr, F> Stream for WriteUntil<St, Wr, F>
71where
72 St: Stream,
73 Wr: MultipartWrite<St::Item>,
74 F: FnMut(Wr::Ret) -> bool,
75{
76 type Item = Result<Wr::Output, Wr::Error>;
77
78 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79 let mut this = self.project();
80
81 if *this.state == State::Terminating {
85 *this.state = State::Terminated;
86 return Poll::Ready(None);
87 }
88
89 loop {
90 if this.state.should_complete() {
91 let res = ready!(this.inner.as_mut().poll_complete(cx));
92 if this.state.is_last_complete() {
93 *this.state = State::Terminating;
94 } else {
95 *this.state = State::PollNext;
96 }
97 *this.is_empty = true;
98 return Poll::Ready(Some(res));
99 }
100
101 match ready!(this.inner.as_mut().poll_next(cx)) {
102 Some(Ok(ret)) => {
103 if (this.f)(ret) {
104 *this.state = State::PollComplete;
105 }
106 *this.is_empty = false;
107 }
108 Some(Err(e)) => return Poll::Ready(Some(Err(e))),
109 None => {
110 if *this.is_empty {
114 *this.state = State::Terminated;
115 return Poll::Ready(None);
116 }
117 *this.state = State::FinalPollComplete;
118 }
119 }
120 }
121 }
122}
123
124impl<St: Stream, Wr, F> Debug for WriteUntil<St, Wr, F>
125where
126 St: Debug,
127 St::Item: Debug,
128 Wr: Debug,
129{
130 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
131 f.debug_struct("WriteUntil")
132 .field("inner", &self.inner)
133 .field("f", &"FnMut(Wr::Ret) -> bool")
134 .field("state", &self.state)
135 .field("is_empty", &self.is_empty)
136 .finish()
137 }
138}