1#![no_std]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10#![cfg_attr(docsrs, doc(cfg_hide(doc)))]
11
12use core::{
13 pin::Pin,
14 task::{Context, Poll},
15};
16
17pub use extend_pinned::ExtendPinned;
18#[cfg(feature = "sink")]
19use futures_sink::Sink;
20use futures_util::{
21 Stream, StreamExt,
22 stream::{Fuse, FusedStream},
23};
24use pin_project::pin_project;
25#[cfg(feature = "route-sink")]
26use route_sink::{FlushRoute, ReadyRoute, ReadySome};
27
28#[cfg(any(feature = "std", feature = "unstable"))]
29pub mod keyed;
30
31#[derive(Debug)]
33#[pin_project]
34pub struct Extending<S, R> {
35 #[pin]
36 incoming: Fuse<R>,
37 #[pin]
38 inner: S,
39}
40
41impl<S: Default, R: Default + Stream> Default for Extending<S, R> {
42 fn default() -> Self {
43 R::default().into()
44 }
45}
46
47impl<S, R: Stream> Extending<S, R> {
48 #[must_use]
49 pub fn new(incoming: R, inner: S) -> Self {
50 Self {
51 incoming: incoming.fuse(),
52 inner,
53 }
54 }
55
56 #[must_use]
58 pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> {
59 self.project().inner
60 }
61
62 #[must_use]
64 pub fn into_inner(self) -> S {
65 self.inner
66 }
67
68 #[must_use]
69 pub fn incoming_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
70 self.project().incoming.get_pin_mut()
71 }
72
73 #[must_use]
74 pub fn incoming(&self) -> &R {
75 self.incoming.get_ref()
76 }
77
78 #[must_use]
79 pub fn incoming_mut(&mut self) -> &mut R {
80 self.incoming.get_mut()
81 }
82
83 #[must_use]
84 pub fn into_incoming(self) -> R {
85 self.incoming.into_inner()
86 }
87}
88
89impl<S, R> AsRef<S> for Extending<S, R> {
90 fn as_ref(&self) -> &S {
91 &self.inner
92 }
93}
94
95impl<S, R> AsMut<S> for Extending<S, R> {
96 fn as_mut(&mut self) -> &mut S {
97 &mut self.inner
98 }
99}
100
101struct PollIter<'a, 'cx, R> {
102 cx: &'a mut Context<'cx>,
103 incoming: Pin<&'a mut R>,
104}
105
106impl<R: Stream> Iterator for PollIter<'_, '_, R> {
107 type Item = R::Item;
108
109 fn next(&mut self) -> Option<Self::Item> {
110 match self.incoming.as_mut().poll_next(self.cx) {
111 Poll::Ready(o) => o,
112 Poll::Pending => None,
113 }
114 }
115}
116
117impl<A, S: Stream + ExtendPinned<A>, R: Stream<Item = A>> Stream for Extending<S, R> {
118 type Item = S::Item;
119
120 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121 let mut this = self.project();
122 if !this.incoming.is_terminated() {
123 this.inner.as_mut().extend_pinned(PollIter {
124 cx,
125 incoming: this.incoming.as_mut(),
126 })
127 }
128 match this.inner.poll_next(cx) {
129 Poll::Ready(None) if !this.incoming.is_terminated() => Poll::Pending,
130 poll => poll,
131 }
132 }
133}
134
135impl<A, S: FusedStream + ExtendPinned<A>, R: Stream<Item = A>> FusedStream for Extending<S, R> {
136 fn is_terminated(&self) -> bool {
137 self.inner.is_terminated() && self.incoming.is_terminated()
138 }
139}
140
141#[cfg(feature = "sink")]
142impl<Item, S: Sink<Item>, R> Sink<Item> for Extending<S, R> {
143 type Error = S::Error;
144
145 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
146 self.project().inner.poll_ready(cx)
147 }
148
149 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
150 self.project().inner.start_send(item)
151 }
152
153 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
154 self.project().inner.poll_flush(cx)
155 }
156
157 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
158 self.project().inner.poll_close(cx)
159 }
160}
161
162#[cfg(feature = "route-sink")]
163impl<Route, Msg, S: FlushRoute<Route, Msg>, R> FlushRoute<Route, Msg> for Extending<S, R> {
164 fn poll_flush_route(
165 self: Pin<&mut Self>,
166 route: &Route,
167 cx: &mut Context<'_>,
168 ) -> Poll<Result<(), Self::Error>> {
169 self.project().inner.poll_flush_route(route, cx)
170 }
171
172 fn poll_close_route(
173 self: Pin<&mut Self>,
174 route: &Route,
175 cx: &mut Context<'_>,
176 ) -> Poll<Result<(), Self::Error>> {
177 self.project().inner.poll_close_route(route, cx)
178 }
179}
180
181#[cfg(feature = "route-sink")]
182impl<Route, Msg, S: ReadyRoute<Route, Msg>, R> ReadyRoute<Route, Msg> for Extending<S, R> {
183 fn poll_ready_route(
184 self: Pin<&mut Self>,
185 route: &Route,
186 cx: &mut Context<'_>,
187 ) -> Poll<Result<(), Self::Error>> {
188 self.project().inner.poll_ready_route(route, cx)
189 }
190}
191
192#[cfg(feature = "route-sink")]
193impl<Route, Msg, S: ReadySome<Route, Msg>, R> ReadySome<Route, Msg> for Extending<S, R> {
194 fn poll_ready_some(
195 self: Pin<&mut Self>,
196 cx: &mut Context<'_>,
197 ) -> Poll<Result<Route, Self::Error>> {
198 self.project().inner.poll_ready_some(cx)
199 }
200}
201
202pub trait ExtendingExt: Sized + Stream {
203 #[must_use]
204 fn extending<S: ExtendPinned<Self::Item>>(self, inner: S) -> Extending<S, Self> {
205 Extending::new(self, inner)
206 }
207
208 #[must_use]
209 fn extending_default<S: Default + ExtendPinned<Self::Item>>(self) -> Extending<S, Self> {
210 self.extending(Default::default())
211 }
212}
213
214impl<R: Stream> ExtendingExt for R {}
215
216impl<S: Default, R: Stream> From<R> for Extending<S, R> {
217 fn from(incoming: R) -> Self {
218 Self::new(incoming, Default::default())
219 }
220}