async_sink/ext/
err_into.rs1use super::Sink;
2use core::marker::PhantomData;
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use tokio_stream::Stream;
6use tokio_stream_util::FusedStream;
7
/// Sink adapter that wraps an inner sink `Si` and reports its errors as `E`.
///
/// The `Sink` implementation for this type forwards every operation to the
/// wrapped sink and converts `Si::Error` into `E` through [`Into`].
///
/// `Item` and `E` only appear inside `PhantomData<fn(Item) -> E>`; no value
/// of either type is stored in the adapter.
#[must_use = "sinks do nothing unless polled"]
pub struct SinkErrInto<Si, Item, E> {
    /// The wrapped sink that every operation is delegated to.
    sink: Si,
    /// Anchors the otherwise unused `Item` and `E` type parameters.
    _phantom: PhantomData<fn(Item) -> E>,
}

// Implemented by hand instead of `#[derive(Debug)]`: the derive would add
// `Item: Debug` and `E: Debug` bounds even though neither type is ever stored.
// The printed fields and their order are the same as the derive would produce.
impl<Si, Item, E> core::fmt::Debug for SinkErrInto<Si, Item, E>
where
    Si: core::fmt::Debug,
{
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("SinkErrInto")
            .field("sink", &self.sink)
            .field("_phantom", &self._phantom)
            .finish()
    }
}
15
16impl<Si, Item, E> SinkErrInto<Si, Item, E> {
17 pub(super) fn new(sink: Si) -> Self {
18 Self {
19 sink,
20 _phantom: PhantomData,
21 }
22 }
23
24 pub fn get_ref(&self) -> &Si {
26 &self.sink
27 }
28
29 pub fn get_mut(&mut self) -> &mut Si {
34 &mut self.sink
35 }
36
37 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Si> {
42 unsafe { self.map_unchecked_mut(|s| &mut s.sink) }
43 }
44
45 pub fn into_inner(self) -> Si {
50 self.sink
51 }
52}
53
54impl<Si, Item, E> Sink<Item> for SinkErrInto<Si, Item, E>
55where
56 Si: Sink<Item>,
57 Si::Error: Into<E>,
58{
59 type Error = E;
60
61 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
62 let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
63 match sink.poll_ready(cx) {
64 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
65 Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
66 Poll::Pending => Poll::Pending,
67 }
68 }
69
70 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
71 let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
72 match sink.start_send(item) {
73 Ok(()) => Ok(()),
74 Err(e) => Err(e.into()),
75 }
76 }
77
78 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
79 let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
80 match sink.poll_flush(cx) {
81 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
82 Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
83 Poll::Pending => Poll::Pending,
84 }
85 }
86
87 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
88 let sink = unsafe { self.map_unchecked_mut(|s| &mut s.sink) };
89 match sink.poll_close(cx) {
90 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
91 Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
92 Poll::Pending => Poll::Pending,
93 }
94 }
95}
96
97impl<Si, Item, E> Stream for SinkErrInto<Si, Item, E>
99where
100 Si: Sink<Item> + Stream,
101{
102 type Item = Si::Item;
103
104 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
105 unsafe { self.map_unchecked_mut(|s| &mut s.sink) }.poll_next(cx)
106 }
107
108 fn size_hint(&self) -> (usize, Option<usize>) {
109 self.sink.size_hint()
110 }
111}
112
113impl<Si, Item, E> FusedStream for SinkErrInto<Si, Item, E>
114where
115 Si: Sink<Item> + FusedStream,
116{
117 fn is_terminated(&self) -> bool {
118 self.sink.is_terminated()
119 }
120}