ruchei_extend/
lib.rs

1//! [`Extend`] for [`Pin`]ned values.
2//!
//! Many of the [`ruchei`] combinator traits return an [`Extending`] wrapper around something
//! that implements [`ExtendPinned`] and is extended from `self`.
5//!
6//! [`ruchei`]: https://docs.rs/ruchei
7
8#![no_std]
9#![cfg_attr(docsrs, feature(doc_auto_cfg))]
10#![cfg_attr(docsrs, feature(doc_cfg_hide))]
11#![cfg_attr(docsrs, doc(cfg_hide(doc)))]
12
13use core::{
14    pin::Pin,
15    task::{Context, Poll},
16};
17
18pub use extend_pinned::ExtendPinned;
19use futures_core::{FusedStream, Stream};
20#[cfg(feature = "sink")]
21use futures_sink::Sink;
22use pin_project::pin_project;
23#[cfg(feature = "route-sink")]
24use route_sink::{FlushRoute, ReadyRoute, ReadySome};
25
26/// Type extending an [`ExtendPinned`] value from a fused stream.
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
28#[pin_project]
29pub struct Extending<S, R> {
30    #[pin]
31    incoming: R,
32    #[pin]
33    inner: S,
34}
35
36impl<S, R> Extending<S, R> {
37    #[must_use]
38    pub fn new(incoming: R, inner: S) -> Self {
39        Self { incoming, inner }
40    }
41
42    /// Pinned mutable reference to the inner stream/sink.
43    #[must_use]
44    pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> {
45        self.project().inner
46    }
47
48    /// Convert into the inner stream/sink.
49    #[must_use]
50    pub fn into_inner(self) -> S {
51        self.inner
52    }
53
54    #[must_use]
55    pub fn incoming_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
56        self.project().incoming
57    }
58
59    #[must_use]
60    pub fn incoming(&self) -> &R {
61        &self.incoming
62    }
63
64    #[must_use]
65    pub fn incoming_mut(&mut self) -> &mut R {
66        &mut self.incoming
67    }
68
69    #[must_use]
70    pub fn into_incoming(self) -> R {
71        self.incoming
72    }
73}
74
75impl<S, R> AsRef<S> for Extending<S, R> {
76    fn as_ref(&self) -> &S {
77        &self.inner
78    }
79}
80
81impl<S, R> AsMut<S> for Extending<S, R> {
82    fn as_mut(&mut self) -> &mut S {
83        &mut self.inner
84    }
85}
86
87struct PollIter<'a, 'cx, R> {
88    cx: &'a mut Context<'cx>,
89    incoming: Pin<&'a mut R>,
90}
91
92impl<R: Stream> Iterator for PollIter<'_, '_, R> {
93    type Item = R::Item;
94
95    fn next(&mut self) -> Option<Self::Item> {
96        match self.incoming.as_mut().poll_next(self.cx) {
97            Poll::Ready(o) => o,
98            Poll::Pending => None,
99        }
100    }
101}
102
103impl<A, S: Stream + ExtendPinned<A>, R: FusedStream<Item = A>> Stream for Extending<S, R> {
104    type Item = S::Item;
105
106    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
107        let mut this = self.project();
108        if !this.incoming.is_terminated() {
109            this.inner.as_mut().extend_pinned(PollIter {
110                cx,
111                incoming: this.incoming.as_mut(),
112            })
113        }
114        match this.inner.poll_next(cx) {
115            Poll::Ready(None) if !this.incoming.is_terminated() => Poll::Pending,
116            poll => poll,
117        }
118    }
119}
120
121impl<A, S: FusedStream + ExtendPinned<A>, R: FusedStream<Item = A>> FusedStream
122    for Extending<S, R>
123{
124    fn is_terminated(&self) -> bool {
125        self.inner.is_terminated() && self.incoming.is_terminated()
126    }
127}
128
/// Every [`Sink`] operation is forwarded unchanged to the inner value; the incoming stream is
/// not polled by any of these methods.
#[cfg(feature = "sink")]
impl<Item, S: Sink<Item>, R> Sink<Item> for Extending<S, R> {
    type Error = S::Error;

    /// Forwards to `poll_ready` of the inner sink.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.as_pin_mut().poll_ready(cx)
    }

    /// Forwards to `start_send` of the inner sink.
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        self.as_pin_mut().start_send(item)
    }

    /// Forwards to `poll_flush` of the inner sink.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.as_pin_mut().poll_flush(cx)
    }

    /// Forwards to `poll_close` of the inner sink.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.as_pin_mut().poll_close(cx)
    }
}
149
/// Both [`FlushRoute`] operations are forwarded unchanged to the inner value.
#[cfg(feature = "route-sink")]
impl<Route, Msg, S: FlushRoute<Route, Msg>, R> FlushRoute<Route, Msg> for Extending<S, R> {
    /// Forwards to `poll_flush_route` of the inner value with the same `route`.
    fn poll_flush_route(
        self: Pin<&mut Self>,
        route: &Route,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let inner = self.project().inner;
        inner.poll_flush_route(route, cx)
    }

    /// Forwards to `poll_close_route` of the inner value with the same `route`.
    fn poll_close_route(
        self: Pin<&mut Self>,
        route: &Route,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let inner = self.project().inner;
        inner.poll_close_route(route, cx)
    }
}
168
/// [`ReadyRoute`] is forwarded unchanged to the inner value.
#[cfg(feature = "route-sink")]
impl<Route, Msg, S: ReadyRoute<Route, Msg>, R> ReadyRoute<Route, Msg> for Extending<S, R> {
    /// Forwards to `poll_ready_route` of the inner value with the same `route`.
    fn poll_ready_route(
        self: Pin<&mut Self>,
        route: &Route,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let inner = self.project().inner;
        inner.poll_ready_route(route, cx)
    }
}
179
/// [`ReadySome`] is forwarded unchanged to the inner value.
#[cfg(feature = "route-sink")]
impl<Route, Msg, S: ReadySome<Route, Msg>, R> ReadySome<Route, Msg> for Extending<S, R> {
    /// Forwards to `poll_ready_some` of the inner value.
    fn poll_ready_some(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Route, Self::Error>> {
        let inner = self.project().inner;
        inner.poll_ready_some(cx)
    }
}
189
190pub trait ExtendingExt: Sized + FusedStream {
191    #[must_use]
192    fn extending<S: ExtendPinned<Self::Item>>(self, inner: S) -> Extending<S, Self> {
193        Extending::new(self, inner)
194    }
195
196    #[must_use]
197    fn extending_default<S: Default + ExtendPinned<Self::Item>>(self) -> Extending<S, Self> {
198        self.extending(Default::default())
199    }
200}
201
202impl<R: FusedStream> ExtendingExt for R {}
203
204impl<S: Default, R> From<R> for Extending<S, R> {
205    fn from(incoming: R) -> Self {
206        Self {
207            incoming,
208            inner: Default::default(),
209        }
210    }
211}