async_sink/ext/
err_into.rs

1use super::Sink;
2use core::marker::PhantomData;
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use tokio_stream::Stream;
6use tokio_stream_util::FusedStream;
7
/// Sink returned by the [`sink_err_into`](super::SinkExt::sink_err_into) method.
///
/// It stores the wrapped sink `Si`; the `Item` and `E` parameters exist only at
/// the type level and occupy no storage.
#[must_use = "sinks do nothing unless polled"]
#[derive(Debug)]
pub struct SinkErrInto<Si, Item, E> {
    /// The wrapped sink.
    sink: Si,
    /// Zero-sized marker that ties the otherwise unused `Item` and `E`
    /// parameters to this type. A function-pointer signature is used, so no
    /// value of either type is ever stored.
    _phantom: PhantomData<fn(Item) -> E>,
}
15
16impl<Si, Item, E> SinkErrInto<Si, Item, E> {
17    pub(super) fn new(sink: Si) -> Self {
18        Self {
19            sink,
20            _phantom: PhantomData,
21        }
22    }
23
24    /// Acquires a reference to the underlying sink.
25    pub fn get_ref(&self) -> &Si {
26        &self.sink
27    }
28
29    /// Acquires a mutable reference to the underlying sink.
30    ///
31    /// Note that care must be taken to avoid tampering with the state of the
32    /// sink which may otherwise confuse this combinator.
33    pub fn get_mut(&mut self) -> &mut Si {
34        &mut self.sink
35    }
36
37    /// Acquires a pinned mutable reference to the underlying sink.
38    ///
39    /// Note that care must be taken to avoid tampering with the state of the
40    /// sink which may otherwise confuse this combinator.
41    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Si> {
42        unsafe { self.map_unchecked_mut(|s| &mut s.sink) }
43    }
44
45    /// Consumes this combinator, returning the underlying sink.
46    ///
47    /// Note that this may discard intermediate state of this combinator, so
48    /// care should be taken to avoid losing resources when this is called.
49    pub fn into_inner(self) -> Si {
50        self.sink
51    }
52}
53
54impl<Si, Item, E> Sink<Item> for SinkErrInto<Si, Item, E>
55where
56    Si: Sink<Item>,
57    Si::Error: Into<E>,
58{
59    type Error = E;
60
61    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
62        let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
63        match sink.poll_ready(cx) {
64            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
65            Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
66            Poll::Pending => Poll::Pending,
67        }
68    }
69
70    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
71        let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
72        match sink.start_send(item) {
73            Ok(()) => Ok(()),
74            Err(e) => Err(e.into()),
75        }
76    }
77
78    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
79        let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
80        match sink.poll_flush(cx) {
81            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
82            Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
83            Poll::Pending => Poll::Pending,
84        }
85    }
86
87    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
88        let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
89        match sink.poll_close(cx) {
90            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
91            Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
92            Poll::Pending => Poll::Pending,
93        }
94    }
95}
96
97// Forwarding impl of Stream from the underlying sink
98impl<Si, Item, E> Stream for SinkErrInto<Si, Item, E>
99where
100    Si: Sink<Item> + Stream,
101{
102    type Item = Si::Item;
103
104    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
105        unsafe { self.map_unchecked_mut(|s| &mut s.sink) }.poll_next(cx)
106    }
107
108    fn size_hint(&self) -> (usize, Option<usize>) {
109        self.sink.size_hint()
110    }
111}
112
113impl<Si, Item, E> FusedStream for SinkErrInto<Si, Item, E>
114where
115    Si: Sink<Item> + FusedStream,
116{
117    fn is_terminated(&self) -> bool {
118        self.sink.is_terminated()
119    }
120}