1#![no_std]
7#![doc(test(
8 no_crate_inject,
9 attr(
10 deny(warnings, rust_2018_idioms, single_use_lifetimes),
11 allow(dead_code, unused_assignments, unused_variables)
12 )
13))]
14#![warn(missing_docs, )] #[cfg(feature = "alloc")]
17extern crate alloc;
18#[cfg(feature = "std")]
19extern crate std;
20
21mod ext;
22pub(crate) mod unfold_state;
23use core::ops::DerefMut;
24use core::pin::Pin;
25use core::task::{Context, Poll};
26pub use ext::*;
27
/// A `Sink` is a value into which other values of type `Item` can be sent
/// asynchronously.
///
/// Sending is driven by polling and is split into separate steps:
///
/// 1. [`poll_ready`](Sink::poll_ready) waits until the sink can accept an item;
/// 2. [`start_send`](Sink::start_send) hands one item to the sink;
/// 3. [`poll_flush`](Sink::poll_flush) drives any buffered items to completion;
/// 4. [`poll_close`](Sink::poll_close) flushes and then shuts the sink down.
///
/// All methods take `self` as `Pin<&mut Self>`, so implementors may be
/// self-referential; `Unpin` sinks can be pinned on the fly with [`Pin::new`].
#[must_use = "sinks do nothing unless polled"]
pub trait Sink<Item> {
    /// The type of value produced by the sink when an operation fails.
    ///
    /// Must implement [`core::error::Error`].
    type Error: core::error::Error;

    /// Attempts to prepare the sink to receive a value.
    ///
    /// This must be called, and must return `Poll::Ready(Ok(()))`, before each
    /// call to [`start_send`](Sink::start_send).
    ///
    /// Returns `Poll::Pending` when the sink cannot accept an item yet; the
    /// implementation is then expected to arrange for the waker in `cx` to be
    /// notified once polling again is worthwhile.
    ///
    /// # Errors
    ///
    /// Returns `Poll::Ready(Err(_))` if the sink has failed. In most cases a
    /// failed sink is permanently unable to receive further items.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    /// Begins the process of sending `item` to the sink.
    ///
    /// Each call must be preceded by a [`poll_ready`](Sink::poll_ready) call
    /// that returned `Poll::Ready(Ok(()))`.
    ///
    /// This only *starts* the send: a buffering sink may not have fully
    /// processed the item when this returns. Use
    /// [`poll_flush`](Sink::poll_flush) or [`poll_close`](Sink::poll_close) to
    /// guarantee completion.
    ///
    /// # Errors
    ///
    /// Returns `Err(_)` if the sink could not accept the item.
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;

    /// Flushes any remaining output from this sink.
    ///
    /// Returns `Poll::Ready(Ok(()))` once no buffered items remain, i.e. every
    /// value previously passed to [`start_send`](Sink::start_send) has been
    /// processed. Returns `Poll::Pending` while there is still work left; the
    /// implementation is then expected to arrange for the waker in `cx` to be
    /// notified when progress can be made.
    ///
    /// # Errors
    ///
    /// Returns `Poll::Ready(Err(_))` if flushing failed.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    /// Flushes any remaining output and closes this sink, if necessary.
    ///
    /// Returns `Poll::Ready(Ok(()))` once no buffered items remain and the sink
    /// has been closed, and `Poll::Pending` while there is still work left.
    /// After a successful close no further items should be sent.
    ///
    /// # Errors
    ///
    /// Returns `Poll::Ready(Err(_))` if closing failed; the sink should then be
    /// considered permanently failed.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}
121
122impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for &mut S {
123 type Error = S::Error;
124
125 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
126 Pin::new(&mut **self).poll_ready(cx)
127 }
128
129 fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
130 Pin::new(&mut **self).start_send(item)
131 }
132
133 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
134 Pin::new(&mut **self).poll_flush(cx)
135 }
136
137 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138 Pin::new(&mut **self).poll_close(cx)
139 }
140}
141
142impl<P, Item> Sink<Item> for Pin<P>
143where
144 P: DerefMut + Unpin,
145 P::Target: Sink<Item>,
146{
147 type Error = <P::Target as Sink<Item>>::Error;
148
149 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
150 self.get_mut().as_mut().poll_ready(cx)
151 }
152
153 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
154 self.get_mut().as_mut().start_send(item)
155 }
156
157 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
158 self.get_mut().as_mut().poll_flush(cx)
159 }
160
161 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
162 self.get_mut().as_mut().poll_close(cx)
163 }
164}
165
/// A `Vec<T>` is a sink that is always ready: each sent item is appended with
/// `push`, flushing and closing complete immediately, and no operation can
/// fail (`Error` is [`core::convert::Infallible`]).
#[cfg(feature = "alloc")]
impl<T> Sink<T> for alloc::vec::Vec<T> {
    type Error = core::convert::Infallible;

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        // SAFETY: the vector itself is never moved out of the pinned reference;
        // it is only mutated in place. `Vec` does not pin its elements (it
        // never exposes them as `Pin<&mut T>`), so a reallocation triggered by
        // `push` cannot violate any pinning guarantee. The unchecked accessor
        // is needed only because this impl places no `Unpin` bound on `T`.
        let buffer = unsafe { self.get_unchecked_mut() };
        buffer.push(item);
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Nothing is buffered beyond the vector itself.
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
188
/// A `VecDeque<T>` is a sink that is always ready: each sent item is appended
/// with `push_back`, flushing and closing complete immediately, and no
/// operation can fail (`Error` is [`core::convert::Infallible`]).
#[cfg(feature = "alloc")]
impl<T> Sink<T> for alloc::collections::VecDeque<T> {
    type Error = core::convert::Infallible;

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        // SAFETY: the deque itself is never moved out of the pinned reference;
        // it is only mutated in place. `VecDeque` does not pin its elements (it
        // never exposes them as `Pin<&mut T>`), so a reallocation triggered by
        // `push_back` cannot violate any pinning guarantee. The unchecked
        // accessor is needed only because this impl places no `Unpin` bound on
        // `T`.
        let queue = unsafe { self.get_unchecked_mut() };
        queue.push_back(item);
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Nothing is buffered beyond the deque itself.
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
211
/// A boxed `Unpin` sink is itself a sink: every operation is forwarded to the
/// boxed value, and errors are passed through unchanged.
#[cfg(feature = "alloc")]
impl<S, Item> Sink<Item> for alloc::boxed::Box<S>
where
    S: ?Sized + Sink<Item> + Unpin,
{
    type Error = S::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // `Box<S>` is always `Unpin`, so the outer pin can be removed safely;
        // `S: Unpin` then lets the boxed sink be re-pinned with `Pin::new`.
        let boxed: &mut S = &mut **self.get_mut();
        Pin::new(boxed).poll_ready(cx)
    }

    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        let boxed: &mut S = &mut **self.get_mut();
        Pin::new(boxed).start_send(item)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let boxed: &mut S = &mut **self.get_mut();
        Pin::new(boxed).poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let boxed: &mut S = &mut **self.get_mut();
        Pin::new(boxed).poll_close(cx)
    }
}
232
233impl<SL: Sized + Sink<Item> + Unpin, SR: Sized + Sink<Item> + Unpin, Item> Sink<Item>
234 for either::Either<SL, SR>
235where
236 either::Either<SL::Error, SR::Error>: core::error::Error,
237{
238 type Error = either::Either<SL::Error, SR::Error>;
239
240 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
241 match self.get_mut() {
242 either::Either::Left(s) => match Pin::new(s).poll_ready(cx) {
243 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
244 Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Left(e))),
245 Poll::Pending => Poll::Pending,
246 },
247 either::Either::Right(s) => match Pin::new(s).poll_ready(cx) {
248 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
249 Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Right(e))),
250 Poll::Pending => Poll::Pending,
251 },
252 }
253 }
254
255 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
256 match self.get_mut() {
257 either::Either::Left(s) => Pin::new(s).start_send(item).map_err(either::Either::Left),
258 either::Either::Right(s) => Pin::new(s).start_send(item).map_err(either::Either::Right),
259 }
260 }
261
262 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
263 match self.get_mut() {
264 either::Either::Left(s) => match Pin::new(s).poll_flush(cx) {
265 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
266 Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Left(e))),
267 Poll::Pending => Poll::Pending,
268 },
269 either::Either::Right(s) => match Pin::new(s).poll_flush(cx) {
270 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
271 Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Right(e))),
272 Poll::Pending => Poll::Pending,
273 },
274 }
275 }
276
277 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
278 match self.get_mut() {
279 either::Either::Left(s) => match Pin::new(s).poll_close(cx) {
280 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
281 Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Left(e))),
282 Poll::Pending => Poll::Pending,
283 },
284 either::Either::Right(s) => match Pin::new(s).poll_close(cx) {
285 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
286 Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Right(e))),
287 Poll::Pending => Poll::Pending,
288 },
289 }
290 }
291}