request_response/handler/
protocol.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! The definition of a request/response protocol via inbound
22//! and outbound substream upgrades. The inbound upgrade
23//! receives a request and sends a response, whereas the
24//! outbound upgrade send a request and receives a response.
25
26use crate::RequestId;
27use crate::codec::RequestResponseCodec;
28
29use futures::{channel::oneshot, future::BoxFuture, prelude::*};
30use tetsy_libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
31use tetsy_libp2p_swarm::NegotiatedSubstream;
32use smallvec::SmallVec;
33use std::io;
34
35/// The level of support for a particular protocol.
36#[derive(Debug, Clone)]
37pub enum ProtocolSupport {
38    /// The protocol is only supported for inbound requests.
39    Inbound,
40    /// The protocol is only supported for outbound requests.
41    Outbound,
42    /// The protocol is supported for inbound and outbound requests.
43    Full
44}
45
46impl ProtocolSupport {
47    /// Whether inbound requests are supported.
48    pub fn inbound(&self) -> bool {
49        match self {
50            ProtocolSupport::Inbound | ProtocolSupport::Full => true,
51            ProtocolSupport::Outbound => false,
52        }
53    }
54
55    /// Whether outbound requests are supported.
56    pub fn outbound(&self) -> bool {
57        match self {
58            ProtocolSupport::Outbound | ProtocolSupport::Full => true,
59            ProtocolSupport::Inbound => false,
60        }
61    }
62}
63
64/// Response substream upgrade protocol.
65///
66/// Receives a request and sends a response.
67#[derive(Debug)]
68pub struct ResponseProtocol<TCodec>
69where
70    TCodec: RequestResponseCodec
71{
72    pub(crate) codec: TCodec,
73    pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
74    pub(crate) request_sender: oneshot::Sender<(RequestId, TCodec::Request)>,
75    pub(crate) response_receiver: oneshot::Receiver<TCodec::Response>,
76    pub(crate) request_id: RequestId
77
78}
79
80impl<TCodec> UpgradeInfo for ResponseProtocol<TCodec>
81where
82    TCodec: RequestResponseCodec
83{
84    type Info = TCodec::Protocol;
85    type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
86
87    fn protocol_info(&self) -> Self::InfoIter {
88        self.protocols.clone().into_iter()
89    }
90}
91
92impl<TCodec> InboundUpgrade<NegotiatedSubstream> for ResponseProtocol<TCodec>
93where
94    TCodec: RequestResponseCodec + Send + 'static,
95{
96    type Output = bool;
97    type Error = io::Error;
98    type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
99
100    fn upgrade_inbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future {
101        async move {
102            let read = self.codec.read_request(&protocol, &mut io);
103            let request = read.await?;
104            if let Ok(()) = self.request_sender.send((self.request_id, request)) {
105                if let Ok(response) = self.response_receiver.await {
106                    let write = self.codec.write_response(&protocol, &mut io, response);
107                    write.await?;
108                } else {
109                    io.close().await?;
110                    return Ok(false)
111                }
112            }
113            io.close().await?;
114            Ok(true)
115        }.boxed()
116    }
117}
118
119/// Request substream upgrade protocol.
120///
121/// Sends a request and receives a response.
122#[derive(Debug)]
123pub struct RequestProtocol<TCodec>
124where
125    TCodec: RequestResponseCodec
126{
127    pub(crate) codec: TCodec,
128    pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
129    pub(crate) request_id: RequestId,
130    pub(crate) request: TCodec::Request,
131}
132
133impl<TCodec> UpgradeInfo for RequestProtocol<TCodec>
134where
135    TCodec: RequestResponseCodec
136{
137    type Info = TCodec::Protocol;
138    type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
139
140    fn protocol_info(&self) -> Self::InfoIter {
141        self.protocols.clone().into_iter()
142    }
143}
144
145impl<TCodec> OutboundUpgrade<NegotiatedSubstream> for RequestProtocol<TCodec>
146where
147    TCodec: RequestResponseCodec + Send + 'static,
148{
149    type Output = TCodec::Response;
150    type Error = io::Error;
151    type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
152
153    fn upgrade_outbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future {
154        async move {
155            let write = self.codec.write_request(&protocol, &mut io, self.request);
156            write.await?;
157            io.close().await?;
158            let read = self.codec.read_response(&protocol, &mut io);
159            let response = read.await?;
160            Ok(response)
161        }.boxed()
162    }
163}