1use crate::{
22 muxing::{StreamMuxer, StreamMuxerEvent},
23 ProtocolName,
24 transport::{Transport, ListenerEvent, TransportError},
25 Multiaddr
26};
27use futures::{prelude::*, io::{IoSlice, IoSliceMut}};
28use pin_project::pin_project;
29use std::{fmt, io::{Error as IoError}, pin::Pin, task::Context, task::Poll};
30
31#[derive(Debug, Copy, Clone)]
32pub enum EitherError<A, B> {
33 A(A),
34 B(B)
35}
36
37impl<A, B> fmt::Display for EitherError<A, B>
38where
39 A: fmt::Display,
40 B: fmt::Display
41{
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 match self {
44 EitherError::A(a) => a.fmt(f),
45 EitherError::B(b) => b.fmt(f)
46 }
47 }
48}
49
50impl<A, B> std::error::Error for EitherError<A, B>
51where
52 A: std::error::Error,
53 B: std::error::Error
54{
55 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
56 match self {
57 EitherError::A(a) => a.source(),
58 EitherError::B(b) => b.source()
59 }
60 }
61}
62
63#[pin_project(project = EitherOutputProj)]
66#[derive(Debug, Copy, Clone)]
67pub enum EitherOutput<A, B> {
68 First(#[pin] A),
69 Second(#[pin] B),
70}
71
72impl<A, B> AsyncRead for EitherOutput<A, B>
73where
74 A: AsyncRead,
75 B: AsyncRead,
76{
77 fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, IoError>> {
78 match self.project() {
79 EitherOutputProj::First(a) => AsyncRead::poll_read(a, cx, buf),
80 EitherOutputProj::Second(b) => AsyncRead::poll_read(b, cx, buf),
81 }
82 }
83
84 fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>])
85 -> Poll<Result<usize, IoError>>
86 {
87 match self.project() {
88 EitherOutputProj::First(a) => AsyncRead::poll_read_vectored(a, cx, bufs),
89 EitherOutputProj::Second(b) => AsyncRead::poll_read_vectored(b, cx, bufs),
90 }
91 }
92}
93
94impl<A, B> AsyncWrite for EitherOutput<A, B>
95where
96 A: AsyncWrite,
97 B: AsyncWrite,
98{
99 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, IoError>> {
100 match self.project() {
101 EitherOutputProj::First(a) => AsyncWrite::poll_write(a, cx, buf),
102 EitherOutputProj::Second(b) => AsyncWrite::poll_write(b, cx, buf),
103 }
104 }
105
106 fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>])
107 -> Poll<Result<usize, IoError>>
108 {
109 match self.project() {
110 EitherOutputProj::First(a) => AsyncWrite::poll_write_vectored(a, cx, bufs),
111 EitherOutputProj::Second(b) => AsyncWrite::poll_write_vectored(b, cx, bufs),
112 }
113 }
114
115 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), IoError>> {
116 match self.project() {
117 EitherOutputProj::First(a) => AsyncWrite::poll_flush(a, cx),
118 EitherOutputProj::Second(b) => AsyncWrite::poll_flush(b, cx),
119 }
120 }
121
122 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), IoError>> {
123 match self.project() {
124 EitherOutputProj::First(a) => AsyncWrite::poll_close(a, cx),
125 EitherOutputProj::Second(b) => AsyncWrite::poll_close(b, cx),
126 }
127 }
128}
129
130impl<A, B, I> Stream for EitherOutput<A, B>
131where
132 A: TryStream<Ok = I>,
133 B: TryStream<Ok = I>,
134{
135 type Item = Result<I, EitherError<A::Error, B::Error>>;
136
137 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
138 match self.project() {
139 EitherOutputProj::First(a) => TryStream::try_poll_next(a, cx)
140 .map(|v| v.map(|r| r.map_err(EitherError::A))),
141 EitherOutputProj::Second(b) => TryStream::try_poll_next(b, cx)
142 .map(|v| v.map(|r| r.map_err(EitherError::B))),
143 }
144 }
145}
146
147impl<A, B, I> Sink<I> for EitherOutput<A, B>
148where
149 A: Sink<I>,
150 B: Sink<I>,
151{
152 type Error = EitherError<A::Error, B::Error>;
153
154 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
155 match self.project() {
156 EitherOutputProj::First(a) => Sink::poll_ready(a, cx).map_err(EitherError::A),
157 EitherOutputProj::Second(b) => Sink::poll_ready(b, cx).map_err(EitherError::B),
158 }
159 }
160
161 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
162 match self.project() {
163 EitherOutputProj::First(a) => Sink::start_send(a, item).map_err(EitherError::A),
164 EitherOutputProj::Second(b) => Sink::start_send(b, item).map_err(EitherError::B),
165 }
166 }
167
168 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
169 match self.project() {
170 EitherOutputProj::First(a) => Sink::poll_flush(a, cx).map_err(EitherError::A),
171 EitherOutputProj::Second(b) => Sink::poll_flush(b, cx).map_err(EitherError::B),
172 }
173 }
174
175 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
176 match self.project() {
177 EitherOutputProj::First(a) => Sink::poll_close(a, cx).map_err(EitherError::A),
178 EitherOutputProj::Second(b) => Sink::poll_close(b, cx).map_err(EitherError::B),
179 }
180 }
181}
182
183impl<A, B> StreamMuxer for EitherOutput<A, B>
184where
185 A: StreamMuxer,
186 B: StreamMuxer,
187{
188 type Substream = EitherOutput<A::Substream, B::Substream>;
189 type OutboundSubstream = EitherOutbound<A, B>;
190 type Error = IoError;
191
192 fn poll_event(&self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
193 match self {
194 EitherOutput::First(inner) => inner.poll_event(cx).map(|result| {
195 result.map_err(|e| e.into()).map(|event| {
196 match event {
197 StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
198 StreamMuxerEvent::InboundSubstream(substream) =>
199 StreamMuxerEvent::InboundSubstream(EitherOutput::First(substream))
200 }
201 })
202 }),
203 EitherOutput::Second(inner) => inner.poll_event(cx).map(|result| {
204 result.map_err(|e| e.into()).map(|event| {
205 match event {
206 StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
207 StreamMuxerEvent::InboundSubstream(substream) =>
208 StreamMuxerEvent::InboundSubstream(EitherOutput::Second(substream))
209 }
210 })
211 }),
212 }
213 }
214
215 fn open_outbound(&self) -> Self::OutboundSubstream {
216 match self {
217 EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()),
218 EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()),
219 }
220 }
221
222 fn poll_outbound(&self, cx: &mut Context<'_>, substream: &mut Self::OutboundSubstream) -> Poll<Result<Self::Substream, Self::Error>> {
223 match (self, substream) {
224 (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => {
225 inner.poll_outbound(cx, substream).map(|p| p.map(EitherOutput::First)).map_err(|e| e.into())
226 },
227 (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => {
228 inner.poll_outbound(cx, substream).map(|p| p.map(EitherOutput::Second)).map_err(|e| e.into())
229 },
230 _ => panic!("Wrong API usage")
231 }
232 }
233
234 fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
235 match self {
236 EitherOutput::First(inner) => {
237 match substream {
238 EitherOutbound::A(substream) => inner.destroy_outbound(substream),
239 _ => panic!("Wrong API usage")
240 }
241 },
242 EitherOutput::Second(inner) => {
243 match substream {
244 EitherOutbound::B(substream) => inner.destroy_outbound(substream),
245 _ => panic!("Wrong API usage")
246 }
247 },
248 }
249 }
250
251 fn read_substream(&self, cx: &mut Context<'_>, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, Self::Error>> {
252 match (self, sub) {
253 (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
254 inner.read_substream(cx, sub, buf).map_err(|e| e.into())
255 },
256 (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
257 inner.read_substream(cx, sub, buf).map_err(|e| e.into())
258 },
259 _ => panic!("Wrong API usage")
260 }
261 }
262
263 fn write_substream(&self, cx: &mut Context<'_>, sub: &mut Self::Substream, buf: &[u8]) -> Poll<Result<usize, Self::Error>> {
264 match (self, sub) {
265 (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
266 inner.write_substream(cx, sub, buf).map_err(|e| e.into())
267 },
268 (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
269 inner.write_substream(cx, sub, buf).map_err(|e| e.into())
270 },
271 _ => panic!("Wrong API usage")
272 }
273 }
274
275 fn flush_substream(&self, cx: &mut Context<'_>, sub: &mut Self::Substream) -> Poll<Result<(), Self::Error>> {
276 match (self, sub) {
277 (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
278 inner.flush_substream(cx, sub).map_err(|e| e.into())
279 },
280 (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
281 inner.flush_substream(cx, sub).map_err(|e| e.into())
282 },
283 _ => panic!("Wrong API usage")
284 }
285 }
286
287 fn shutdown_substream(&self, cx: &mut Context<'_>, sub: &mut Self::Substream) -> Poll<Result<(), Self::Error>> {
288 match (self, sub) {
289 (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
290 inner.shutdown_substream(cx, sub).map_err(|e| e.into())
291 },
292 (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
293 inner.shutdown_substream(cx, sub).map_err(|e| e.into())
294 },
295 _ => panic!("Wrong API usage")
296 }
297 }
298
299 fn destroy_substream(&self, substream: Self::Substream) {
300 match self {
301 EitherOutput::First(inner) => {
302 match substream {
303 EitherOutput::First(substream) => inner.destroy_substream(substream),
304 _ => panic!("Wrong API usage")
305 }
306 },
307 EitherOutput::Second(inner) => {
308 match substream {
309 EitherOutput::Second(substream) => inner.destroy_substream(substream),
310 _ => panic!("Wrong API usage")
311 }
312 },
313 }
314 }
315
316 fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
317 match self {
318 EitherOutput::First(inner) => inner.close(cx).map_err(|e| e.into()),
319 EitherOutput::Second(inner) => inner.close(cx).map_err(|e| e.into()),
320 }
321 }
322
323 fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
324 match self {
325 EitherOutput::First(inner) => inner.flush_all(cx).map_err(|e| e.into()),
326 EitherOutput::Second(inner) => inner.flush_all(cx).map_err(|e| e.into()),
327 }
328 }
329}
330
331#[derive(Debug, Copy, Clone)]
332#[must_use = "futures do nothing unless polled"]
333pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
334 A(A::OutboundSubstream),
335 B(B::OutboundSubstream),
336}
337
338#[pin_project(project = EitherListenStreamProj)]
340#[derive(Debug, Copy, Clone)]
341#[must_use = "futures do nothing unless polled"]
342pub enum EitherListenStream<A, B> {
343 First(#[pin] A),
344 Second(#[pin] B),
345}
346
347impl<AStream, BStream, AInner, BInner, AError, BError> Stream for EitherListenStream<AStream, BStream>
348where
349 AStream: TryStream<Ok = ListenerEvent<AInner, AError>, Error = AError>,
350 BStream: TryStream<Ok = ListenerEvent<BInner, BError>, Error = BError>,
351{
352 type Item = Result<ListenerEvent<EitherFuture<AInner, BInner>, EitherError<AError, BError>>, EitherError<AError, BError>>;
353
354 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
355 match self.project() {
356 EitherListenStreamProj::First(a) => match TryStream::try_poll_next(a, cx) {
357 Poll::Pending => Poll::Pending,
358 Poll::Ready(None) => Poll::Ready(None),
359 Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::First).map_err(EitherError::A)))),
360 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))),
361 },
362 EitherListenStreamProj::Second(a) => match TryStream::try_poll_next(a, cx) {
363 Poll::Pending => Poll::Pending,
364 Poll::Ready(None) => Poll::Ready(None),
365 Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::Second).map_err(EitherError::B)))),
366 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::B(err)))),
367 },
368 }
369 }
370}
371
372#[pin_project(project = EitherFutureProj)]
374#[derive(Debug, Copy, Clone)]
375#[must_use = "futures do nothing unless polled"]
376pub enum EitherFuture<A, B> {
377 First(#[pin] A),
378 Second(#[pin] B),
379}
380
381impl<AFuture, BFuture, AInner, BInner> Future for EitherFuture<AFuture, BFuture>
382where
383 AFuture: TryFuture<Ok = AInner>,
384 BFuture: TryFuture<Ok = BInner>,
385{
386 type Output = Result<EitherOutput<AInner, BInner>, EitherError<AFuture::Error, BFuture::Error>>;
387
388 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
389 match self.project() {
390 EitherFutureProj::First(a) => TryFuture::try_poll(a, cx)
391 .map_ok(EitherOutput::First).map_err(EitherError::A),
392 EitherFutureProj::Second(a) => TryFuture::try_poll(a, cx)
393 .map_ok(EitherOutput::Second).map_err(EitherError::B),
394 }
395 }
396}
397
398#[pin_project(project = EitherFuture2Proj)]
399#[derive(Debug, Copy, Clone)]
400#[must_use = "futures do nothing unless polled"]
401pub enum EitherFuture2<A, B> { A(#[pin] A), B(#[pin] B) }
402
403impl<AFut, BFut, AItem, BItem, AError, BError> Future for EitherFuture2<AFut, BFut>
404where
405 AFut: TryFuture<Ok = AItem, Error = AError>,
406 BFut: TryFuture<Ok = BItem, Error = BError>,
407{
408 type Output = Result<EitherOutput<AItem, BItem>, EitherError<AError, BError>>;
409
410 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
411 match self.project() {
412 EitherFuture2Proj::A(a) => TryFuture::try_poll(a, cx)
413 .map_ok(EitherOutput::First).map_err(EitherError::A),
414 EitherFuture2Proj::B(a) => TryFuture::try_poll(a, cx)
415 .map_ok(EitherOutput::Second).map_err(EitherError::B),
416 }
417 }
418}
419
420#[derive(Debug, Clone)]
421pub enum EitherName<A, B> { A(A), B(B) }
422
423impl<A: ProtocolName, B: ProtocolName> ProtocolName for EitherName<A, B> {
424 fn protocol_name(&self) -> &[u8] {
425 match self {
426 EitherName::A(a) => a.protocol_name(),
427 EitherName::B(b) => b.protocol_name()
428 }
429 }
430}
431
432#[derive(Debug, Copy, Clone)]
433pub enum EitherTransport<A, B> {
434 Left(A),
435 Right(B),
436}
437
438impl<A, B> Transport for EitherTransport<A, B>
439where
440 B: Transport,
441 A: Transport,
442{
443 type Output = EitherOutput<A::Output, B::Output>;
444 type Error = EitherError<A::Error, B::Error>;
445 type Listener = EitherListenStream<A::Listener, B::Listener>;
446 type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
447 type Dial = EitherFuture<A::Dial, B::Dial>;
448
449 fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
450 use TransportError::*;
451 match self {
452 EitherTransport::Left(a) => match a.listen_on(addr) {
453 Ok(listener) => Ok(EitherListenStream::First(listener)),
454 Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
455 Err(Other(err)) => Err(Other(EitherError::A(err))),
456 },
457 EitherTransport::Right(b) => match b.listen_on(addr) {
458 Ok(listener) => Ok(EitherListenStream::Second(listener)),
459 Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
460 Err(Other(err)) => Err(Other(EitherError::B(err))),
461 },
462 }
463 }
464
465 fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
466 use TransportError::*;
467 match self {
468 EitherTransport::Left(a) => match a.dial(addr) {
469 Ok(connec) => Ok(EitherFuture::First(connec)),
470 Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
471 Err(Other(err)) => Err(Other(EitherError::A(err))),
472 },
473 EitherTransport::Right(b) => match b.dial(addr) {
474 Ok(connec) => Ok(EitherFuture::Second(connec)),
475 Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
476 Err(Other(err)) => Err(Other(EitherError::B(err))),
477 },
478 }
479 }
480
481 fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
482 match self {
483 EitherTransport::Left(a) => a.address_translation(server, observed),
484 EitherTransport::Right(b) => b.address_translation(server, observed),
485 }
486 }
487}