async_sink/ext/
err_into.rs1use super::Sink;
2use core::marker::PhantomData;
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use tokio_stream::Stream;
6
/// Sink adapter that forwards every operation to an inner sink `Si` and
/// reports failures as `E` instead of `Si::Error`.
///
/// The error conversion goes through [`Into`], so the adapter is a
/// `Sink<Item>` whenever `Si: Sink<Item>` and `Si::Error: Into<E>`.
#[derive(Debug)]
#[must_use = "sinks do nothing unless polled"]
pub struct SinkErrInto<Si, Item, E> {
    /// The wrapped sink that actually receives the items.
    sink: Si,
    /// Zero-sized marker that ties `Item` and `E` to the type without storing
    /// a value of either. A function-pointer type is used so that auto traits
    /// such as `Send`/`Sync` do not depend on `Item` or `E`.
    _phantom: PhantomData<fn(Item) -> E>,
}
14
15impl<Si, Item, E> SinkErrInto<Si, Item, E> {
16 pub(super) fn new(sink: Si) -> Self {
17 Self {
18 sink,
19 _phantom: PhantomData,
20 }
21 }
22
23 pub fn get_ref(&self) -> &Si {
25 &self.sink
26 }
27
28 pub fn get_mut(&mut self) -> &mut Si {
33 &mut self.sink
34 }
35
36 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Si> {
41 unsafe { self.map_unchecked_mut(|s| &mut s.sink) }
42 }
43
44 pub fn into_inner(self) -> Si {
49 self.sink
50 }
51}
52
53impl<Si, Item, E> Sink<Item> for SinkErrInto<Si, Item, E>
54where
55 Si: Sink<Item>,
56 Si::Error: Into<E>,
57{
58 type Error = E;
59
60 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61 let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
62 match sink.poll_ready(cx) {
63 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
64 Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
65 Poll::Pending => Poll::Pending,
66 }
67 }
68
69 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
70 let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
71 match sink.start_send(item) {
72 Ok(()) => Ok(()),
73 Err(e) => Err(e.into()),
74 }
75 }
76
77 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
78 let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
79 match sink.poll_flush(cx) {
80 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
81 Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
82 Poll::Pending => Poll::Pending,
83 }
84 }
85
86 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87 let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
88 match sink.poll_close(cx) {
89 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
90 Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
91 Poll::Pending => Poll::Pending,
92 }
93 }
94}
95
96impl<Si, Item, E> Stream for SinkErrInto<Si, Item, E>
98where
99 Si: Sink<Item> + Stream,
100{
101 type Item = Si::Item;
102
103 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
104 unsafe { self.map_unchecked_mut(|s| &mut s.sink) }.poll_next(cx)
105 }
106
107 fn size_hint(&self) -> (usize, Option<usize>) {
108 self.sink.size_hint()
109 }
110}