async_sink/ext/
err_into.rs

1use super::Sink;
2use core::marker::PhantomData;
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use tokio_stream::Stream;
6
/// Sink for the [`sink_err_into`](super::SinkExt::sink_err_into) method.
///
/// Wraps an inner sink `Si` and exposes the same sink interface, except that
/// the error type reported to callers is `E` instead of the inner sink's own
/// error type.
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct SinkErrInto<Si, Item, E> {
    /// The wrapped sink that every operation is forwarded to.
    sink: Si,
    /// Zero-sized marker tying the `Item` and `E` type parameters to this
    /// type. It is spelled as a function-pointer type so that no `Item` or
    /// `E` value is ever considered to be stored in the combinator.
    _phantom: PhantomData<fn(Item) -> E>,
}
14
15impl<Si, Item, E> SinkErrInto<Si, Item, E> {
16    pub(super) fn new(sink: Si) -> Self {
17        Self {
18            sink,
19            _phantom: PhantomData,
20        }
21    }
22
23    /// Acquires a reference to the underlying sink.
24    pub fn get_ref(&self) -> &Si {
25        &self.sink
26    }
27
28    /// Acquires a mutable reference to the underlying sink.
29    ///
30    /// Note that care must be taken to avoid tampering with the state of the
31    /// sink which may otherwise confuse this combinator.
32    pub fn get_mut(&mut self) -> &mut Si {
33        &mut self.sink
34    }
35
36    /// Acquires a pinned mutable reference to the underlying sink.
37    ///
38    /// Note that care must be taken to avoid tampering with the state of the
39    /// sink which may otherwise confuse this combinator.
40    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Si> {
41        unsafe { self.map_unchecked_mut(|s| &mut s.sink) }
42    }
43
44    /// Consumes this combinator, returning the underlying sink.
45    ///
46    /// Note that this may discard intermediate state of this combinator, so
47    /// care should be taken to avoid losing resources when this is called.
48    pub fn into_inner(self) -> Si {
49        self.sink
50    }
51}
52
53impl<Si, Item, E> Sink<Item> for SinkErrInto<Si, Item, E>
54where
55    Si: Sink<Item>,
56    Si::Error: Into<E>,
57{
58    type Error = E;
59
60    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61        let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
62        match sink.poll_ready(cx) {
63            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
64            Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
65            Poll::Pending => Poll::Pending,
66        }
67    }
68
69    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
70        let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
71        match sink.start_send(item) {
72            Ok(()) => Ok(()),
73            Err(e) => Err(e.into()),
74        }
75    }
76
77    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
78        let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
79        match sink.poll_flush(cx) {
80            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
81            Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
82            Poll::Pending => Poll::Pending,
83        }
84    }
85
86    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87        let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
88        match sink.poll_close(cx) {
89            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
90            Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
91            Poll::Pending => Poll::Pending,
92        }
93    }
94}
95
96// Forwarding impl of Stream from the underlying sink
97impl<Si, Item, E> Stream for SinkErrInto<Si, Item, E>
98where
99    Si: Sink<Item> + Stream,
100{
101    type Item = Si::Item;
102
103    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
104        unsafe { self.map_unchecked_mut(|s| &mut s.sink) }.poll_next(cx)
105    }
106
107    fn size_hint(&self) -> (usize, Option<usize>) {
108        self.sink.size_hint()
109    }
110}