tokio_stream_util/try_stream/ext/
err_into.rs1use super::TryStream;
2use core::marker::PhantomData;
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use tokio_stream::Stream;
6
7#[derive(Clone)]
9#[must_use = "streams do nothing unless polled"]
10pub struct ErrInto<St, E> {
11 stream: St,
12 _phantom: PhantomData<E>,
13}
14
15impl<St, E> ErrInto<St, E> {
16 pub(super) fn new(stream: St) -> Self {
17 Self {
18 stream,
19 _phantom: PhantomData,
20 }
21 }
22
23 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
32 unsafe { self.map_unchecked_mut(|s| &mut s.stream) }
34 }
35}
36
37impl<St, E> Stream for ErrInto<St, E>
38where
39 St: TryStream,
40 St::Error: Into<E>,
41{
42 type Item = Result<St::Ok, E>;
43
44 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
45 match self.get_pin_mut().try_poll_next(cx) {
46 Poll::Ready(Some(Ok(ok))) => Poll::Ready(Some(Ok(ok))),
47 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err.into()))),
48 Poll::Ready(None) => Poll::Ready(None),
49 Poll::Pending => Poll::Pending,
50 }
51 }
52}
53
54#[cfg(feature = "sink")]
55use async_sink::Sink;
56#[cfg(feature = "sink")]
57impl<St, E, Item> async_sink::Sink<Item> for ErrInto<St, E>
58where
59 E: From<St::Error> + core::error::Error,
60 St: Sink<Item>,
61{
62 type Error = E;
63
64 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
65 match self.get_pin_mut().poll_ready(cx) {
66 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
67 Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
68 Poll::Pending => Poll::Pending,
69 }
70 }
71
72 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
73 match self.get_pin_mut().start_send(item) {
74 Ok(()) => Ok(()),
75 Err(e) => Err(e.into()),
76 }
77 }
78
79 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
80 match self.get_pin_mut().poll_flush(cx) {
81 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
82 Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
83 Poll::Pending => Poll::Pending,
84 }
85 }
86
87 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
88 match self.get_pin_mut().poll_close(cx) {
89 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
90 Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
91 Poll::Pending => Poll::Pending,
92 }
93 }
94}