libp2p_core/
either.rs

1// Copyright 2017 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    muxing::{StreamMuxer, StreamMuxerEvent},
23    ProtocolName,
24    transport::{Transport, ListenerEvent, TransportError},
25    Multiaddr
26};
27use futures::{prelude::*, io::{IoSlice, IoSliceMut}};
28use pin_project::pin_project;
29use std::{fmt, io::{Error as IoError}, pin::Pin, task::Context, task::Poll};
30
31#[derive(Debug, Copy, Clone)]
32pub enum EitherError<A, B> {
33    A(A),
34    B(B)
35}
36
37impl<A, B> fmt::Display for EitherError<A, B>
38where
39    A: fmt::Display,
40    B: fmt::Display
41{
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        match self {
44            EitherError::A(a) => a.fmt(f),
45            EitherError::B(b) => b.fmt(f)
46        }
47    }
48}
49
50impl<A, B> std::error::Error for EitherError<A, B>
51where
52    A: std::error::Error,
53    B: std::error::Error
54{
55    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
56        match self {
57            EitherError::A(a) => a.source(),
58            EitherError::B(b) => b.source()
59        }
60    }
61}
62
63/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to
64/// either `First` or `Second`.
65#[pin_project(project = EitherOutputProj)]
66#[derive(Debug, Copy, Clone)]
67pub enum EitherOutput<A, B> {
68    First(#[pin] A),
69    Second(#[pin] B),
70}
71
72impl<A, B> AsyncRead for EitherOutput<A, B>
73where
74    A: AsyncRead,
75    B: AsyncRead,
76{
77    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, IoError>> {
78        match self.project() {
79            EitherOutputProj::First(a) => AsyncRead::poll_read(a, cx, buf),
80            EitherOutputProj::Second(b) => AsyncRead::poll_read(b, cx, buf),
81        }
82    }
83
84    fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>])
85        -> Poll<Result<usize, IoError>>
86    {
87        match self.project() {
88            EitherOutputProj::First(a) => AsyncRead::poll_read_vectored(a, cx, bufs),
89            EitherOutputProj::Second(b) => AsyncRead::poll_read_vectored(b, cx, bufs),
90        }
91    }
92}
93
94impl<A, B> AsyncWrite for EitherOutput<A, B>
95where
96    A: AsyncWrite,
97    B: AsyncWrite,
98{
99    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, IoError>> {
100        match self.project() {
101            EitherOutputProj::First(a) => AsyncWrite::poll_write(a, cx, buf),
102            EitherOutputProj::Second(b) => AsyncWrite::poll_write(b, cx, buf),
103        }
104    }
105
106    fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>])
107        -> Poll<Result<usize, IoError>>
108    {
109        match self.project() {
110            EitherOutputProj::First(a) => AsyncWrite::poll_write_vectored(a, cx, bufs),
111            EitherOutputProj::Second(b) => AsyncWrite::poll_write_vectored(b, cx, bufs),
112        }
113    }
114
115    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), IoError>> {
116        match self.project() {
117            EitherOutputProj::First(a) => AsyncWrite::poll_flush(a, cx),
118            EitherOutputProj::Second(b) => AsyncWrite::poll_flush(b, cx),
119        }
120    }
121
122    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), IoError>> {
123        match self.project() {
124            EitherOutputProj::First(a) => AsyncWrite::poll_close(a, cx),
125            EitherOutputProj::Second(b) => AsyncWrite::poll_close(b, cx),
126        }
127    }
128}
129
130impl<A, B, I> Stream for EitherOutput<A, B>
131where
132    A: TryStream<Ok = I>,
133    B: TryStream<Ok = I>,
134{
135    type Item = Result<I, EitherError<A::Error, B::Error>>;
136
137    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
138        match self.project() {
139            EitherOutputProj::First(a) => TryStream::try_poll_next(a, cx)
140                .map(|v| v.map(|r| r.map_err(EitherError::A))),
141            EitherOutputProj::Second(b) => TryStream::try_poll_next(b, cx)
142                .map(|v| v.map(|r| r.map_err(EitherError::B))),
143        }
144    }
145}
146
147impl<A, B, I> Sink<I> for EitherOutput<A, B>
148where
149    A: Sink<I>,
150    B: Sink<I>,
151{
152    type Error = EitherError<A::Error, B::Error>;
153
154    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
155        match self.project() {
156            EitherOutputProj::First(a) => Sink::poll_ready(a, cx).map_err(EitherError::A),
157            EitherOutputProj::Second(b) => Sink::poll_ready(b, cx).map_err(EitherError::B),
158        }
159    }
160
161    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
162        match self.project() {
163            EitherOutputProj::First(a) => Sink::start_send(a, item).map_err(EitherError::A),
164            EitherOutputProj::Second(b) => Sink::start_send(b, item).map_err(EitherError::B),
165        }
166    }
167
168    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
169        match self.project() {
170            EitherOutputProj::First(a) => Sink::poll_flush(a, cx).map_err(EitherError::A),
171            EitherOutputProj::Second(b) => Sink::poll_flush(b, cx).map_err(EitherError::B),
172        }
173    }
174
175    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
176        match self.project() {
177            EitherOutputProj::First(a) => Sink::poll_close(a, cx).map_err(EitherError::A),
178            EitherOutputProj::Second(b) => Sink::poll_close(b, cx).map_err(EitherError::B),
179        }
180    }
181}
182
183impl<A, B> StreamMuxer for EitherOutput<A, B>
184where
185    A: StreamMuxer,
186    B: StreamMuxer,
187{
188    type Substream = EitherOutput<A::Substream, B::Substream>;
189    type OutboundSubstream = EitherOutbound<A, B>;
190    type Error = IoError;
191
192    fn poll_event(&self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
193        match self {
194            EitherOutput::First(inner) => inner.poll_event(cx).map(|result| {
195                result.map_err(|e| e.into()).map(|event| {
196                    match event {
197                        StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
198                        StreamMuxerEvent::InboundSubstream(substream) =>
199                            StreamMuxerEvent::InboundSubstream(EitherOutput::First(substream))
200                    }
201                })
202            }),
203            EitherOutput::Second(inner) => inner.poll_event(cx).map(|result| {
204                result.map_err(|e| e.into()).map(|event| {
205                    match event {
206                        StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
207                        StreamMuxerEvent::InboundSubstream(substream) =>
208                            StreamMuxerEvent::InboundSubstream(EitherOutput::Second(substream))
209                    }
210                })
211            }),
212        }
213    }
214
215    fn open_outbound(&self) -> Self::OutboundSubstream {
216        match self {
217            EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()),
218            EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()),
219        }
220    }
221
222    fn poll_outbound(&self, cx: &mut Context<'_>, substream: &mut Self::OutboundSubstream) -> Poll<Result<Self::Substream, Self::Error>> {
223        match (self, substream) {
224            (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => {
225                inner.poll_outbound(cx, substream).map(|p| p.map(EitherOutput::First)).map_err(|e| e.into())
226            },
227            (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => {
228                inner.poll_outbound(cx, substream).map(|p| p.map(EitherOutput::Second)).map_err(|e| e.into())
229            },
230            _ => panic!("Wrong API usage")
231        }
232    }
233
234    fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
235        match self {
236            EitherOutput::First(inner) => {
237                match substream {
238                    EitherOutbound::A(substream) => inner.destroy_outbound(substream),
239                    _ => panic!("Wrong API usage")
240                }
241            },
242            EitherOutput::Second(inner) => {
243                match substream {
244                    EitherOutbound::B(substream) => inner.destroy_outbound(substream),
245                    _ => panic!("Wrong API usage")
246                }
247            },
248        }
249    }
250
251    fn read_substream(&self, cx: &mut Context<'_>, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, Self::Error>> {
252        match (self, sub) {
253            (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
254                inner.read_substream(cx, sub, buf).map_err(|e| e.into())
255            },
256            (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
257                inner.read_substream(cx, sub, buf).map_err(|e| e.into())
258            },
259            _ => panic!("Wrong API usage")
260        }
261    }
262
263    fn write_substream(&self, cx: &mut Context<'_>, sub: &mut Self::Substream, buf: &[u8]) -> Poll<Result<usize, Self::Error>> {
264        match (self, sub) {
265            (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
266                inner.write_substream(cx, sub, buf).map_err(|e| e.into())
267            },
268            (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
269                inner.write_substream(cx, sub, buf).map_err(|e| e.into())
270            },
271            _ => panic!("Wrong API usage")
272        }
273    }
274
275    fn flush_substream(&self, cx: &mut Context<'_>, sub: &mut Self::Substream) -> Poll<Result<(), Self::Error>> {
276        match (self, sub) {
277            (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
278                inner.flush_substream(cx, sub).map_err(|e| e.into())
279            },
280            (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
281                inner.flush_substream(cx, sub).map_err(|e| e.into())
282            },
283            _ => panic!("Wrong API usage")
284        }
285    }
286
287    fn shutdown_substream(&self, cx: &mut Context<'_>, sub: &mut Self::Substream) -> Poll<Result<(), Self::Error>> {
288        match (self, sub) {
289            (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
290                inner.shutdown_substream(cx, sub).map_err(|e| e.into())
291            },
292            (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
293                inner.shutdown_substream(cx, sub).map_err(|e| e.into())
294            },
295            _ => panic!("Wrong API usage")
296        }
297    }
298
299    fn destroy_substream(&self, substream: Self::Substream) {
300        match self {
301            EitherOutput::First(inner) => {
302                match substream {
303                    EitherOutput::First(substream) => inner.destroy_substream(substream),
304                    _ => panic!("Wrong API usage")
305                }
306            },
307            EitherOutput::Second(inner) => {
308                match substream {
309                    EitherOutput::Second(substream) => inner.destroy_substream(substream),
310                    _ => panic!("Wrong API usage")
311                }
312            },
313        }
314    }
315
316    fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
317        match self {
318            EitherOutput::First(inner) => inner.close(cx).map_err(|e| e.into()),
319            EitherOutput::Second(inner) => inner.close(cx).map_err(|e| e.into()),
320        }
321    }
322
323    fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
324        match self {
325            EitherOutput::First(inner) => inner.flush_all(cx).map_err(|e| e.into()),
326            EitherOutput::Second(inner) => inner.flush_all(cx).map_err(|e| e.into()),
327        }
328    }
329}
330
331#[derive(Debug, Copy, Clone)]
332#[must_use = "futures do nothing unless polled"]
333pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
334    A(A::OutboundSubstream),
335    B(B::OutboundSubstream),
336}
337
338/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
339#[pin_project(project = EitherListenStreamProj)]
340#[derive(Debug, Copy, Clone)]
341#[must_use = "futures do nothing unless polled"]
342pub enum EitherListenStream<A, B> {
343    First(#[pin] A),
344    Second(#[pin] B),
345}
346
347impl<AStream, BStream, AInner, BInner, AError, BError> Stream for EitherListenStream<AStream, BStream>
348where
349    AStream: TryStream<Ok = ListenerEvent<AInner, AError>, Error = AError>,
350    BStream: TryStream<Ok = ListenerEvent<BInner, BError>, Error = BError>,
351{
352    type Item = Result<ListenerEvent<EitherFuture<AInner, BInner>, EitherError<AError, BError>>, EitherError<AError, BError>>;
353
354    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
355        match self.project() {
356            EitherListenStreamProj::First(a) => match TryStream::try_poll_next(a, cx) {
357                Poll::Pending => Poll::Pending,
358                Poll::Ready(None) => Poll::Ready(None),
359                Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::First).map_err(EitherError::A)))),
360                Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))),
361            },
362            EitherListenStreamProj::Second(a) => match TryStream::try_poll_next(a, cx) {
363                Poll::Pending => Poll::Pending,
364                Poll::Ready(None) => Poll::Ready(None),
365                Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::Second).map_err(EitherError::B)))),
366                Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::B(err)))),
367            },
368        }
369    }
370}
371
372/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
373#[pin_project(project = EitherFutureProj)]
374#[derive(Debug, Copy, Clone)]
375#[must_use = "futures do nothing unless polled"]
376pub enum EitherFuture<A, B> {
377    First(#[pin] A),
378    Second(#[pin] B),
379}
380
381impl<AFuture, BFuture, AInner, BInner> Future for EitherFuture<AFuture, BFuture>
382where
383    AFuture: TryFuture<Ok = AInner>,
384    BFuture: TryFuture<Ok = BInner>,
385{
386    type Output = Result<EitherOutput<AInner, BInner>, EitherError<AFuture::Error, BFuture::Error>>;
387
388    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
389        match self.project() {
390            EitherFutureProj::First(a) => TryFuture::try_poll(a, cx)
391                .map_ok(EitherOutput::First).map_err(EitherError::A),
392            EitherFutureProj::Second(a) => TryFuture::try_poll(a, cx)
393                .map_ok(EitherOutput::Second).map_err(EitherError::B),
394        }
395    }
396}
397
398#[pin_project(project = EitherFuture2Proj)]
399#[derive(Debug, Copy, Clone)]
400#[must_use = "futures do nothing unless polled"]
401pub enum EitherFuture2<A, B> { A(#[pin] A), B(#[pin] B) }
402
403impl<AFut, BFut, AItem, BItem, AError, BError> Future for EitherFuture2<AFut, BFut>
404where
405    AFut: TryFuture<Ok = AItem, Error = AError>,
406    BFut: TryFuture<Ok = BItem, Error = BError>,
407{
408    type Output = Result<EitherOutput<AItem, BItem>, EitherError<AError, BError>>;
409
410    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
411        match self.project() {
412            EitherFuture2Proj::A(a) => TryFuture::try_poll(a, cx)
413                .map_ok(EitherOutput::First).map_err(EitherError::A),
414            EitherFuture2Proj::B(a) => TryFuture::try_poll(a, cx)
415                .map_ok(EitherOutput::Second).map_err(EitherError::B),
416        }
417    }
418}
419
420#[derive(Debug, Clone)]
421pub enum EitherName<A, B> { A(A), B(B) }
422
423impl<A: ProtocolName, B: ProtocolName> ProtocolName for EitherName<A, B> {
424    fn protocol_name(&self) -> &[u8] {
425        match self {
426            EitherName::A(a) => a.protocol_name(),
427            EitherName::B(b) => b.protocol_name()
428        }
429    }
430}
431
432#[derive(Debug, Copy, Clone)]
433pub enum EitherTransport<A, B> {
434    Left(A),
435    Right(B),
436}
437
438impl<A, B> Transport for EitherTransport<A, B>
439where
440    B: Transport,
441    A: Transport,
442{
443    type Output = EitherOutput<A::Output, B::Output>;
444    type Error = EitherError<A::Error, B::Error>;
445    type Listener = EitherListenStream<A::Listener, B::Listener>;
446    type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
447    type Dial = EitherFuture<A::Dial, B::Dial>;
448
449    fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
450        use TransportError::*;
451        match self {
452            EitherTransport::Left(a) => match a.listen_on(addr) {
453                Ok(listener) => Ok(EitherListenStream::First(listener)),
454                Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
455                Err(Other(err)) => Err(Other(EitherError::A(err))),
456            },
457            EitherTransport::Right(b) => match b.listen_on(addr) {
458                Ok(listener) => Ok(EitherListenStream::Second(listener)),
459                Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
460                Err(Other(err)) => Err(Other(EitherError::B(err))),
461            },
462        }
463    }
464
465    fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
466        use TransportError::*;
467        match self {
468            EitherTransport::Left(a) => match a.dial(addr) {
469                Ok(connec) => Ok(EitherFuture::First(connec)),
470                Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
471                Err(Other(err)) => Err(Other(EitherError::A(err))),
472            },
473            EitherTransport::Right(b) => match b.dial(addr) {
474                Ok(connec) => Ok(EitherFuture::Second(connec)),
475                Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
476                Err(Other(err)) => Err(Other(EitherError::B(err))),
477            },
478        }
479    }
480
481    fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
482        match self {
483            EitherTransport::Left(a) => a.address_translation(server, observed),
484            EitherTransport::Right(b) => b.address_translation(server, observed),
485        }
486    }
487}