libp2p_request_response/handler/
protocol.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
//! The definition of a request/response protocol via inbound
//! and outbound substream upgrades. The inbound upgrade
//! receives a request and sends a response, whereas the
//! outbound upgrade sends a request and receives a response.
25
26use crate::RequestId;
27use crate::codec::RequestResponseCodec;
28
29use futures::{channel::oneshot, future::BoxFuture, prelude::*};
30use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
31use libp2p_swarm::NegotiatedSubstream;
32use smallvec::SmallVec;
33use std::io;
34
/// Describes for which directions of requests a protocol is supported.
#[derive(Debug, Clone)]
pub enum ProtocolSupport {
    /// Only inbound requests are supported for the protocol.
    Inbound,
    /// Only outbound requests are supported for the protocol.
    Outbound,
    /// Both inbound and outbound requests are supported for the protocol.
    Full,
}

impl ProtocolSupport {
    /// Returns `true` for [`ProtocolSupport::Inbound`] and
    /// [`ProtocolSupport::Full`], `false` for [`ProtocolSupport::Outbound`].
    pub fn inbound(&self) -> bool {
        // Listing the single excluded variant first keeps the match
        // exhaustive, so a newly added variant forces a decision here.
        match *self {
            Self::Outbound => false,
            Self::Inbound | Self::Full => true,
        }
    }

    /// Returns `true` for [`ProtocolSupport::Outbound`] and
    /// [`ProtocolSupport::Full`], `false` for [`ProtocolSupport::Inbound`].
    pub fn outbound(&self) -> bool {
        match *self {
            Self::Inbound => false,
            Self::Outbound | Self::Full => true,
        }
    }
}
63
64/// Response substream upgrade protocol.
65///
66/// Receives a request and sends a response.
67#[derive(Debug)]
68pub struct ResponseProtocol<TCodec>
69where
70    TCodec: RequestResponseCodec
71{
72    pub(crate) codec: TCodec,
73    pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
74    pub(crate) request_sender: oneshot::Sender<(RequestId, TCodec::Request)>,
75    pub(crate) response_receiver: oneshot::Receiver<TCodec::Response>,
76    pub(crate) request_id: RequestId
77
78}
79
80impl<TCodec> UpgradeInfo for ResponseProtocol<TCodec>
81where
82    TCodec: RequestResponseCodec
83{
84    type Info = TCodec::Protocol;
85    type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
86
87    fn protocol_info(&self) -> Self::InfoIter {
88        self.protocols.clone().into_iter()
89    }
90}
91
92impl<TCodec> InboundUpgrade<NegotiatedSubstream> for ResponseProtocol<TCodec>
93where
94    TCodec: RequestResponseCodec + Send + 'static,
95{
96    type Output = bool;
97    type Error = io::Error;
98    type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
99
100    fn upgrade_inbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future {
101        async move {
102            let read = self.codec.read_request(&protocol, &mut io);
103            let request = read.await?;
104            if let Ok(()) = self.request_sender.send((self.request_id, request)) {
105                if let Ok(response) = self.response_receiver.await {
106                    let write = self.codec.write_response(&protocol, &mut io, response);
107                    write.await?;
108                } else {
109                    return Ok(false)
110                }
111            }
112            io.close().await?;
113            Ok(true)
114        }.boxed()
115    }
116}
117
118/// Request substream upgrade protocol.
119///
120/// Sends a request and receives a response.
121#[derive(Debug)]
122pub struct RequestProtocol<TCodec>
123where
124    TCodec: RequestResponseCodec
125{
126    pub(crate) codec: TCodec,
127    pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
128    pub(crate) request_id: RequestId,
129    pub(crate) request: TCodec::Request,
130}
131
132impl<TCodec> UpgradeInfo for RequestProtocol<TCodec>
133where
134    TCodec: RequestResponseCodec
135{
136    type Info = TCodec::Protocol;
137    type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
138
139    fn protocol_info(&self) -> Self::InfoIter {
140        self.protocols.clone().into_iter()
141    }
142}
143
144impl<TCodec> OutboundUpgrade<NegotiatedSubstream> for RequestProtocol<TCodec>
145where
146    TCodec: RequestResponseCodec + Send + 'static,
147{
148    type Output = TCodec::Response;
149    type Error = io::Error;
150    type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
151
152    fn upgrade_outbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future {
153        async move {
154            let write = self.codec.write_request(&protocol, &mut io, self.request);
155            write.await?;
156            io.close().await?;
157            let read = self.codec.read_response(&protocol, &mut io);
158            let response = read.await?;
159            Ok(response)
160        }.boxed()
161    }
162}