ruchei_extend/
lib.rs

1//! [`Extend`] for [`Pin`]ned values.
2//!
//! Many of the [`ruchei`] combinator traits return an [`Extending`] wrapper around something
//! that implements [`ExtendPinned`], which is then extended with items from `self`.
5//!
6//! [`ruchei`]: https://docs.rs/ruchei
7
8#![no_std]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10#![cfg_attr(docsrs, doc(cfg_hide(doc)))]
11
12use core::{
13    pin::Pin,
14    task::{Context, Poll},
15};
16
17pub use extend_pinned::ExtendPinned;
18#[cfg(feature = "sink")]
19use futures_sink::Sink;
20use futures_util::{
21    Stream, StreamExt,
22    stream::{Fuse, FusedStream},
23};
24use pin_project::pin_project;
25#[cfg(feature = "route-sink")]
26use route_sink::{FlushRoute, ReadyRoute, ReadySome};
27
28#[cfg(any(feature = "std", feature = "unstable"))]
29pub mod keyed;
30
31/// Type extending an [`ExtendPinned`] value from a fused stream.
32#[derive(Debug)]
33#[pin_project]
34pub struct Extending<S, R> {
35    #[pin]
36    incoming: Fuse<R>,
37    #[pin]
38    inner: S,
39}
40
41impl<S: Default, R: Default + Stream> Default for Extending<S, R> {
42    fn default() -> Self {
43        R::default().into()
44    }
45}
46
47impl<S, R: Stream> Extending<S, R> {
48    #[must_use]
49    pub fn new(incoming: R, inner: S) -> Self {
50        Self {
51            incoming: incoming.fuse(),
52            inner,
53        }
54    }
55
56    /// Pinned mutable reference to the inner stream/sink.
57    #[must_use]
58    pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> {
59        self.project().inner
60    }
61
62    /// Convert into the inner stream/sink.
63    #[must_use]
64    pub fn into_inner(self) -> S {
65        self.inner
66    }
67
68    #[must_use]
69    pub fn incoming_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
70        self.project().incoming.get_pin_mut()
71    }
72
73    #[must_use]
74    pub fn incoming(&self) -> &R {
75        self.incoming.get_ref()
76    }
77
78    #[must_use]
79    pub fn incoming_mut(&mut self) -> &mut R {
80        self.incoming.get_mut()
81    }
82
83    #[must_use]
84    pub fn into_incoming(self) -> R {
85        self.incoming.into_inner()
86    }
87}
88
89impl<S, R> AsRef<S> for Extending<S, R> {
90    fn as_ref(&self) -> &S {
91        &self.inner
92    }
93}
94
95impl<S, R> AsMut<S> for Extending<S, R> {
96    fn as_mut(&mut self) -> &mut S {
97        &mut self.inner
98    }
99}
100
/// Borrowed pairing of a task context with a pinned source, used to view that source as a
/// plain [`Iterator`] for the duration of a single poll.
struct PollIter<'borrow, 'waker, R> {
    /// Task context passed along on every poll of `incoming`.
    cx: &'borrow mut Context<'waker>,
    /// The pinned source the items are pulled from.
    incoming: Pin<&'borrow mut R>,
}
105
106impl<R: Stream> Iterator for PollIter<'_, '_, R> {
107    type Item = R::Item;
108
109    fn next(&mut self) -> Option<Self::Item> {
110        match self.incoming.as_mut().poll_next(self.cx) {
111            Poll::Ready(o) => o,
112            Poll::Pending => None,
113        }
114    }
115}
116
117impl<A, S: Stream + ExtendPinned<A>, R: Stream<Item = A>> Stream for Extending<S, R> {
118    type Item = S::Item;
119
120    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121        let mut this = self.project();
122        if !this.incoming.is_terminated() {
123            this.inner.as_mut().extend_pinned(PollIter {
124                cx,
125                incoming: this.incoming.as_mut(),
126            })
127        }
128        match this.inner.poll_next(cx) {
129            Poll::Ready(None) if !this.incoming.is_terminated() => Poll::Pending,
130            poll => poll,
131        }
132    }
133}
134
135impl<A, S: FusedStream + ExtendPinned<A>, R: Stream<Item = A>> FusedStream for Extending<S, R> {
136    fn is_terminated(&self) -> bool {
137        self.inner.is_terminated() && self.incoming.is_terminated()
138    }
139}
140
/// Sink calls are forwarded to the inner value as-is; the incoming stream is not touched here.
#[cfg(feature = "sink")]
impl<Item, S, R> Sink<Item> for Extending<S, R>
where
    S: Sink<Item>,
{
    type Error = S::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.inner.poll_ready(cx)
    }

    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        let this = self.project();
        this.inner.start_send(item)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.inner.poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.inner.poll_close(cx)
    }
}
161
/// Per-route flush/close calls are forwarded to the inner value as-is.
#[cfg(feature = "route-sink")]
impl<Route, Msg, S, R> FlushRoute<Route, Msg> for Extending<S, R>
where
    S: FlushRoute<Route, Msg>,
{
    fn poll_flush_route(
        self: Pin<&mut Self>,
        route: &Route,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.inner.poll_flush_route(route, cx)
    }

    fn poll_close_route(
        self: Pin<&mut Self>,
        route: &Route,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.inner.poll_close_route(route, cx)
    }
}
180
/// Per-route readiness checks are forwarded to the inner value as-is.
#[cfg(feature = "route-sink")]
impl<Route, Msg, S, R> ReadyRoute<Route, Msg> for Extending<S, R>
where
    S: ReadyRoute<Route, Msg>,
{
    fn poll_ready_route(
        self: Pin<&mut Self>,
        route: &Route,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.inner.poll_ready_route(route, cx)
    }
}
191
/// "Any route ready" checks are forwarded to the inner value as-is.
#[cfg(feature = "route-sink")]
impl<Route, Msg, S, R> ReadySome<Route, Msg> for Extending<S, R>
where
    S: ReadySome<Route, Msg>,
{
    fn poll_ready_some(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Route, Self::Error>> {
        let this = self.project();
        this.inner.poll_ready_some(cx)
    }
}
201
202pub trait ExtendingExt: Sized + Stream {
203    #[must_use]
204    fn extending<S: ExtendPinned<Self::Item>>(self, inner: S) -> Extending<S, Self> {
205        Extending::new(self, inner)
206    }
207
208    #[must_use]
209    fn extending_default<S: Default + ExtendPinned<Self::Item>>(self) -> Extending<S, Self> {
210        self.extending(Default::default())
211    }
212}
213
214impl<R: Stream> ExtendingExt for R {}
215
216impl<S: Default, R: Stream> From<R> for Extending<S, R> {
217    fn from(incoming: R) -> Self {
218        Self::new(incoming, Default::default())
219    }
220}