xelf/future/
sink.rs

1use futures::{stream::FusedStream, Sink, Stream};
2use pin_project::pin_project;
3use std::{
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8impl<T: ?Sized, Item> SinkXlf<Item> for T where T: Sink<Item> {}
9
10pub trait SinkXlf<Item>: Sink<Item> {
11    /// Transforms the error returned by the sink.
12    fn safe_sink_map_err<E, F>(self, f: F) -> SafeSinkMapErr<Self, F>
13    where
14        F: FnMut(Self::Error) -> E,
15        Self: Sized,
16    {
17        SafeSinkMapErr::new(self, f)
18    }
19}
20
21/// Sink for the [`safe_sink_map_err`](super::SinkXlf::safe_sink_map_err) method.
22#[pin_project]
23#[derive(Debug, Clone)]
24pub struct SafeSinkMapErr<Si, F> {
25    #[pin]
26    sink: Si,
27    f: F,
28}
29
30impl<Si, F> SafeSinkMapErr<Si, F> {
31    pub fn new(sink: Si, f: F) -> Self {
32        Self { sink, f }
33    }
34
35    crate::future_delegate_access_inner!(sink, Si, ());
36}
37
38// Forwarding impl of Sink from the underlying sink
39impl<Si, F, E, Item> Sink<Item> for SafeSinkMapErr<Si, F>
40where
41    Si: Sink<Item>,
42    F: FnMut(Si::Error) -> E,
43{
44    type Error = E;
45
46    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
47        let this = self.as_mut().project();
48        this.sink.poll_ready(cx).map_err(|e| (this.f)(e))
49    }
50
51    fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
52        let this = self.as_mut().project();
53        this.sink.start_send(item).map_err(|e| (this.f)(e))
54    }
55
56    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
57        let this = self.as_mut().project();
58        this.sink.poll_flush(cx).map_err(|e| (this.f)(e))
59    }
60
61    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
62        let this = self.as_mut().project();
63        this.sink.poll_close(cx).map_err(|e| (this.f)(e))
64    }
65}
66
67// Forwarding impl of Stream from the underlying sink
68impl<S: Stream, F> Stream for SafeSinkMapErr<S, F> {
69    type Item = S::Item;
70
71    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
72        self.project().sink.poll_next(cx)
73    }
74
75    fn size_hint(&self) -> (usize, Option<usize>) {
76        self.sink.size_hint()
77    }
78}
79
80// Forwarding impl of FusedStream from the underlying sink
81impl<S: FusedStream, F> FusedStream for SafeSinkMapErr<S, F> {
82    fn is_terminated(&self) -> bool {
83        self.sink.is_terminated()
84    }
85}