1#![no_std]
9#![cfg_attr(docsrs, feature(doc_auto_cfg))]
10#![cfg_attr(docsrs, feature(doc_cfg_hide))]
11#![cfg_attr(docsrs, doc(cfg_hide(doc)))]
12
13use core::{
14 pin::Pin,
15 task::{Context, Poll},
16};
17
18pub use extend_pinned::ExtendPinned;
19use futures_core::{FusedStream, Stream};
20#[cfg(feature = "sink")]
21use futures_sink::Sink;
22use pin_project::pin_project;
23#[cfg(feature = "route-sink")]
24use route_sink::{FlushRoute, ReadyRoute, ReadySome};
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
28#[pin_project]
29pub struct Extending<S, R> {
30 #[pin]
31 incoming: R,
32 #[pin]
33 inner: S,
34}
35
36impl<S, R> Extending<S, R> {
37 #[must_use]
38 pub fn new(incoming: R, inner: S) -> Self {
39 Self { incoming, inner }
40 }
41
42 #[must_use]
44 pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> {
45 self.project().inner
46 }
47
48 #[must_use]
50 pub fn into_inner(self) -> S {
51 self.inner
52 }
53
54 #[must_use]
55 pub fn incoming_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
56 self.project().incoming
57 }
58
59 #[must_use]
60 pub fn incoming(&self) -> &R {
61 &self.incoming
62 }
63
64 #[must_use]
65 pub fn incoming_mut(&mut self) -> &mut R {
66 &mut self.incoming
67 }
68
69 #[must_use]
70 pub fn into_incoming(self) -> R {
71 self.incoming
72 }
73}
74
75impl<S, R> AsRef<S> for Extending<S, R> {
76 fn as_ref(&self) -> &S {
77 &self.inner
78 }
79}
80
81impl<S, R> AsMut<S> for Extending<S, R> {
82 fn as_mut(&mut self) -> &mut S {
83 &mut self.inner
84 }
85}
86
/// Borrowed state that lets a pinned `R` be driven through the [`Iterator`] interface.
struct PollIter<'a, 'cx, R> {
    /// Pinned value that gets polled.
    incoming: Pin<&'a mut R>,
    /// Task context handed to each poll.
    cx: &'a mut Context<'cx>,
}
91
92impl<R: Stream> Iterator for PollIter<'_, '_, R> {
93 type Item = R::Item;
94
95 fn next(&mut self) -> Option<Self::Item> {
96 match self.incoming.as_mut().poll_next(self.cx) {
97 Poll::Ready(o) => o,
98 Poll::Pending => None,
99 }
100 }
101}
102
103impl<A, S: Stream + ExtendPinned<A>, R: FusedStream<Item = A>> Stream for Extending<S, R> {
104 type Item = S::Item;
105
106 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
107 let mut this = self.project();
108 if !this.incoming.is_terminated() {
109 this.inner.as_mut().extend_pinned(PollIter {
110 cx,
111 incoming: this.incoming.as_mut(),
112 })
113 }
114 match this.inner.poll_next(cx) {
115 Poll::Ready(None) if !this.incoming.is_terminated() => Poll::Pending,
116 poll => poll,
117 }
118 }
119}
120
121impl<A, S: FusedStream + ExtendPinned<A>, R: FusedStream<Item = A>> FusedStream
122 for Extending<S, R>
123{
124 fn is_terminated(&self) -> bool {
125 self.inner.is_terminated() && self.incoming.is_terminated()
126 }
127}
128
/// Sink operations are forwarded unchanged to `inner`; `incoming` is not touched.
#[cfg(feature = "sink")]
impl<Item, S, R> Sink<Item> for Extending<S, R>
where
    S: Sink<Item>,
{
    type Error = S::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        <S as Sink<Item>>::poll_ready(self.project().inner, cx)
    }

    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        <S as Sink<Item>>::start_send(self.project().inner, item)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        <S as Sink<Item>>::poll_flush(self.project().inner, cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        <S as Sink<Item>>::poll_close(self.project().inner, cx)
    }
}
149
/// Per-route flush and close are forwarded unchanged to `inner`.
#[cfg(feature = "route-sink")]
impl<Route, Msg, S, R> FlushRoute<Route, Msg> for Extending<S, R>
where
    S: FlushRoute<Route, Msg>,
{
    fn poll_flush_route(
        self: Pin<&mut Self>,
        route: &Route,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let inner = self.project().inner;
        inner.poll_flush_route(route, cx)
    }

    fn poll_close_route(
        self: Pin<&mut Self>,
        route: &Route,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let inner = self.project().inner;
        inner.poll_close_route(route, cx)
    }
}
168
/// Per-route readiness is forwarded unchanged to `inner`.
#[cfg(feature = "route-sink")]
impl<Route, Msg, S, R> ReadyRoute<Route, Msg> for Extending<S, R>
where
    S: ReadyRoute<Route, Msg>,
{
    fn poll_ready_route(
        self: Pin<&mut Self>,
        route: &Route,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        let inner = self.project().inner;
        inner.poll_ready_route(route, cx)
    }
}
179
/// `poll_ready_some` is forwarded unchanged to `inner`.
#[cfg(feature = "route-sink")]
impl<Route, Msg, S, R> ReadySome<Route, Msg> for Extending<S, R>
where
    S: ReadySome<Route, Msg>,
{
    fn poll_ready_some(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Route, Self::Error>> {
        let inner = self.project().inner;
        inner.poll_ready_some(cx)
    }
}
189
190pub trait ExtendingExt: Sized + FusedStream {
191 #[must_use]
192 fn extending<S: ExtendPinned<Self::Item>>(self, inner: S) -> Extending<S, Self> {
193 Extending::new(self, inner)
194 }
195
196 #[must_use]
197 fn extending_default<S: Default + ExtendPinned<Self::Item>>(self) -> Extending<S, Self> {
198 self.extending(Default::default())
199 }
200}
201
202impl<R: FusedStream> ExtendingExt for R {}
203
204impl<S: Default, R> From<R> for Extending<S, R> {
205 fn from(incoming: R) -> Self {
206 Self {
207 incoming,
208 inner: Default::default(),
209 }
210 }
211}