1#![no_std]
7#![doc(test(
8 no_crate_inject,
9 attr(
10 deny(warnings, rust_2018_idioms, single_use_lifetimes),
11 allow(dead_code, unused_assignments, unused_variables)
12 )
13))]
14#![warn(missing_docs, )] #[cfg(feature = "alloc")]
17extern crate alloc;
18#[cfg(feature = "std")]
19extern crate std;
20
21mod ext;
22pub(crate) mod unfold_state;
23use core::ops::DerefMut;
24use core::pin::Pin;
25use core::task::{Context, Poll};
26pub use ext::*;
27
28#[cfg(feature = "sync")]
29pub mod sync;
30
/// An asynchronous destination that accepts values of type `Item`.
///
/// Every method takes the sink as `Pin<&mut Self>`. The three `poll_*`
/// methods also receive the current task [`Context`] and report their
/// progress as a [`Poll`], while [`start_send`](Sink::start_send) completes
/// synchronously and returns a plain `Result`.
///
/// NOTE(review): the method set matches the `Sink` trait of the `futures`
/// ecosystem. The usual protocol there is "`poll_ready` until ready, then
/// `start_send`, then `poll_flush`/`poll_close`"; nothing in this file
/// enforces that ordering, so confirm it is the intended contract here.
#[must_use = "sinks do nothing unless polled"]
pub trait Sink<Item> {
    /// The error type reported by every fallible operation of this sink.
    type Error;

    /// Polls the sink for readiness.
    ///
    /// Returns `Poll::Ready(Ok(()))`, `Poll::Ready(Err(_))`, or
    /// `Poll::Pending`.
    ///
    /// NOTE(review): presumably `Ready(Ok(()))` means a following
    /// `start_send` may be issued — confirm.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    /// Hands `item` over to the sink.
    ///
    /// This call does not take a [`Context`] and cannot return `Pending`; it
    /// either accepts the item (`Ok(())`) or fails with `Self::Error`.
    ///
    /// NOTE(review): presumably the item may still be buffered after this
    /// returns and needs `poll_flush`/`poll_close` to be fully processed —
    /// confirm.
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;

    /// Polls the sink to flush its output.
    ///
    /// Returns `Poll::Ready(Ok(()))`, `Poll::Ready(Err(_))`, or
    /// `Poll::Pending`.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    /// Polls the sink to close it.
    ///
    /// Returns `Poll::Ready(Ok(()))`, `Poll::Ready(Err(_))`, or
    /// `Poll::Pending`.
    ///
    /// NOTE(review): presumably this also flushes outstanding items before
    /// closing — confirm.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}
124
125impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for &mut S {
126 type Error = S::Error;
127
128 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
129 Pin::new(&mut **self).poll_ready(cx)
130 }
131
132 fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
133 Pin::new(&mut **self).start_send(item)
134 }
135
136 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
137 Pin::new(&mut **self).poll_flush(cx)
138 }
139
140 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
141 Pin::new(&mut **self).poll_close(cx)
142 }
143}
144
145impl<P, Item> Sink<Item> for Pin<P>
146where
147 P: DerefMut + Unpin,
148 P::Target: Sink<Item>,
149{
150 type Error = <P::Target as Sink<Item>>::Error;
151
152 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
153 self.get_mut().as_mut().poll_ready(cx)
154 }
155
156 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
157 self.get_mut().as_mut().start_send(item)
158 }
159
160 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
161 self.get_mut().as_mut().poll_flush(cx)
162 }
163
164 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
165 self.get_mut().as_mut().poll_close(cx)
166 }
167}
168
/// A `Vec<T>` acts as a sink that appends each sent item to its end.
///
/// It reports itself ready immediately, cannot fail (`Error` is
/// `Infallible`), and flushing or closing completes immediately without
/// doing any work.
#[cfg(feature = "alloc")]
impl<T> Sink<T> for alloc::vec::Vec<T> {
    type Error = core::convert::Infallible;

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let ready: Result<(), Self::Error> = Ok(());
        Poll::Ready(ready)
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        // SAFETY: the vector is only mutated in place through `push`; it is
        // never moved out of the pinned reference.
        // NOTE(review): this relies on `Vec` not treating its elements as
        // structurally pinned (a `push` may reallocate) — confirm.
        let buffer: &mut Self = unsafe { self.get_unchecked_mut() };
        buffer.push(item);
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let flushed: Result<(), Self::Error> = Ok(());
        Poll::Ready(flushed)
    }

    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let closed: Result<(), Self::Error> = Ok(());
        Poll::Ready(closed)
    }
}
191
/// A `VecDeque<T>` acts as a sink that pushes each sent item onto its back.
///
/// It reports itself ready immediately, cannot fail (`Error` is
/// `Infallible`), and flushing or closing completes immediately without
/// doing any work.
#[cfg(feature = "alloc")]
impl<T> Sink<T> for alloc::collections::VecDeque<T> {
    type Error = core::convert::Infallible;

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let ready: Result<(), Self::Error> = Ok(());
        Poll::Ready(ready)
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        // SAFETY: the deque is only mutated in place through `push_back`; it
        // is never moved out of the pinned reference.
        // NOTE(review): this relies on `VecDeque` not treating its elements
        // as structurally pinned (a push may reallocate) — confirm.
        let queue: &mut Self = unsafe { self.get_unchecked_mut() };
        queue.push_back(item);
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let flushed: Result<(), Self::Error> = Ok(());
        Poll::Ready(flushed)
    }

    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let closed: Result<(), Self::Error> = Ok(());
        Poll::Ready(closed)
    }
}
214
/// Forwards every `Sink` operation through a `Box` to the boxed sink.
///
/// `S: Unpin` is required so that the `&mut S` obtained from the box can be
/// re-pinned with `Pin::new` before delegating.
#[cfg(feature = "alloc")]
impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for alloc::boxed::Box<S> {
    type Error = S::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // `Box<S>` is always `Unpin`, so the outer pin can be peeled off safely.
        let boxed: &mut S = &mut **self.get_mut();
        <S as Sink<Item>>::poll_ready(Pin::new(boxed), cx)
    }

    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        let boxed: &mut S = &mut **self.get_mut();
        <S as Sink<Item>>::start_send(Pin::new(boxed), item)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let boxed: &mut S = &mut **self.get_mut();
        <S as Sink<Item>>::poll_flush(Pin::new(boxed), cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let boxed: &mut S = &mut **self.get_mut();
        <S as Sink<Item>>::poll_close(Pin::new(boxed), cx)
    }
}
235
236impl<SL: Sized + Sink<Item> + Unpin, SR: Sized + Sink<Item> + Unpin, Item> Sink<Item>
237 for either::Either<SL, SR>
238{
239 type Error = either::Either<SL::Error, SR::Error>;
240
241 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
242 match self.get_mut() {
243 either::Either::Left(s) => match Pin::new(s).poll_ready(cx) {
244 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
245 Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Left(e))),
246 Poll::Pending => Poll::Pending,
247 },
248 either::Either::Right(s) => match Pin::new(s).poll_ready(cx) {
249 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
250 Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Right(e))),
251 Poll::Pending => Poll::Pending,
252 },
253 }
254 }
255
256 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
257 match self.get_mut() {
258 either::Either::Left(s) => Pin::new(s).start_send(item).map_err(either::Either::Left),
259 either::Either::Right(s) => Pin::new(s).start_send(item).map_err(either::Either::Right),
260 }
261 }
262
263 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
264 match self.get_mut() {
265 either::Either::Left(s) => match Pin::new(s).poll_flush(cx) {
266 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
267 Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Left(e))),
268 Poll::Pending => Poll::Pending,
269 },
270 either::Either::Right(s) => match Pin::new(s).poll_flush(cx) {
271 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
272 Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Right(e))),
273 Poll::Pending => Poll::Pending,
274 },
275 }
276 }
277
278 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
279 match self.get_mut() {
280 either::Either::Left(s) => match Pin::new(s).poll_close(cx) {
281 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
282 Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Left(e))),
283 Poll::Pending => Poll::Pending,
284 },
285 either::Either::Right(s) => match Pin::new(s).poll_close(cx) {
286 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
287 Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Right(e))),
288 Poll::Pending => Poll::Pending,
289 },
290 }
291 }
292}