1use futures::{stream::FusedStream, Sink, Stream};
2use pin_project::pin_project;
3use std::{
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8impl<T: ?Sized, Item> SinkXlf<Item> for T where T: Sink<Item> {}
9
10pub trait SinkXlf<Item>: Sink<Item> {
11 fn safe_sink_map_err<E, F>(self, f: F) -> SafeSinkMapErr<Self, F>
13 where
14 F: FnMut(Self::Error) -> E,
15 Self: Sized,
16 {
17 SafeSinkMapErr::new(self, f)
18 }
19}
20
21#[pin_project]
23#[derive(Debug, Clone)]
24pub struct SafeSinkMapErr<Si, F> {
25 #[pin]
26 sink: Si,
27 f: F,
28}
29
30impl<Si, F> SafeSinkMapErr<Si, F> {
31 pub fn new(sink: Si, f: F) -> Self {
32 Self { sink, f }
33 }
34
35 crate::future_delegate_access_inner!(sink, Si, ());
36}
37
38impl<Si, F, E, Item> Sink<Item> for SafeSinkMapErr<Si, F>
40where
41 Si: Sink<Item>,
42 F: FnMut(Si::Error) -> E,
43{
44 type Error = E;
45
46 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
47 let this = self.as_mut().project();
48 this.sink.poll_ready(cx).map_err(|e| (this.f)(e))
49 }
50
51 fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
52 let this = self.as_mut().project();
53 this.sink.start_send(item).map_err(|e| (this.f)(e))
54 }
55
56 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
57 let this = self.as_mut().project();
58 this.sink.poll_flush(cx).map_err(|e| (this.f)(e))
59 }
60
61 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
62 let this = self.as_mut().project();
63 this.sink.poll_close(cx).map_err(|e| (this.f)(e))
64 }
65}
66
67impl<S: Stream, F> Stream for SafeSinkMapErr<S, F> {
69 type Item = S::Item;
70
71 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
72 self.project().sink.poll_next(cx)
73 }
74
75 fn size_hint(&self) -> (usize, Option<usize>) {
76 self.sink.size_hint()
77 }
78}
79
80impl<S: FusedStream, F> FusedStream for SafeSinkMapErr<S, F> {
82 fn is_terminated(&self) -> bool {
83 self.sink.is_terminated()
84 }
85}