request_response/
handler.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21mod protocol;
22
23use crate::{EMPTY_QUEUE_SHRINK_THRESHOLD, RequestId};
24use crate::codec::RequestResponseCodec;
25
26pub use protocol::{RequestProtocol, ResponseProtocol, ProtocolSupport};
27
28use futures::{
29    channel::oneshot,
30    future::BoxFuture,
31    prelude::*,
32    stream::FuturesUnordered
33};
34use tetsy_libp2p_core::{
35    upgrade::{UpgradeError, NegotiationError},
36};
37use tetsy_libp2p_swarm::{
38    SubstreamProtocol,
39    protocols_handler::{
40        KeepAlive,
41        ProtocolsHandler,
42        ProtocolsHandlerEvent,
43        ProtocolsHandlerUpgrErr,
44    }
45};
46use smallvec::SmallVec;
47use std::{
48    collections::VecDeque,
49    io,
50    sync::{atomic::{AtomicU64, Ordering}, Arc},
51    time::Duration,
52    task::{Context, Poll}
53};
54use wasm_timer::Instant;
55
56/// A connection handler of a `RequestResponse` protocol.
57#[doc(hidden)]
58pub struct RequestResponseHandler<TCodec>
59where
60    TCodec: RequestResponseCodec,
61{
62    /// The supported inbound protocols.
63    inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
64    /// The request/response message codec.
65    codec: TCodec,
66    /// The keep-alive timeout of idle connections. A connection is considered
67    /// idle if there are no outbound substreams.
68    keep_alive_timeout: Duration,
69    /// The timeout for inbound and outbound substreams (i.e. request
70    /// and response processing).
71    substream_timeout: Duration,
72    /// The current connection keep-alive.
73    keep_alive: KeepAlive,
74    /// A pending fatal error that results in the connection being closed.
75    pending_error: Option<ProtocolsHandlerUpgrErr<io::Error>>,
76    /// Queue of events to emit in `poll()`.
77    pending_events: VecDeque<RequestResponseHandlerEvent<TCodec>>,
78    /// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`.
79    outbound: VecDeque<RequestProtocol<TCodec>>,
80    /// Inbound upgrades waiting for the incoming request.
81    inbound: FuturesUnordered<BoxFuture<'static,
82        Result<
83            ((RequestId, TCodec::Request), oneshot::Sender<TCodec::Response>),
84            oneshot::Canceled
85        >>>,
86    inbound_request_id: Arc<AtomicU64>
87}
88
89impl<TCodec> RequestResponseHandler<TCodec>
90where
91    TCodec: RequestResponseCodec,
92{
93    pub(super) fn new(
94        inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
95        codec: TCodec,
96        keep_alive_timeout: Duration,
97        substream_timeout: Duration,
98        inbound_request_id: Arc<AtomicU64>
99    ) -> Self {
100        Self {
101            inbound_protocols,
102            codec,
103            keep_alive: KeepAlive::Yes,
104            keep_alive_timeout,
105            substream_timeout,
106            outbound: VecDeque::new(),
107            inbound: FuturesUnordered::new(),
108            pending_events: VecDeque::new(),
109            pending_error: None,
110            inbound_request_id
111        }
112    }
113}
114
115/// The events emitted by the [`RequestResponseHandler`].
116#[doc(hidden)]
117#[derive(Debug)]
118pub enum RequestResponseHandlerEvent<TCodec>
119where
120    TCodec: RequestResponseCodec
121{
122    /// A request has been received.
123    Request {
124        request_id: RequestId,
125        request: TCodec::Request,
126        sender: oneshot::Sender<TCodec::Response>
127    },
128    /// A response has been received.
129    Response {
130        request_id: RequestId,
131        response: TCodec::Response
132    },
133    /// A response to an inbound request has been sent.
134    ResponseSent(RequestId),
135    /// A response to an inbound request was omitted as a result
136    /// of dropping the response `sender` of an inbound `Request`.
137    ResponseOmission(RequestId),
138    /// An outbound request timed out while sending the request
139    /// or waiting for the response.
140    OutboundTimeout(RequestId),
141    /// An outbound request failed to negotiate a mutually supported protocol.
142    OutboundUnsupportedProtocols(RequestId),
143    /// An inbound request timed out while waiting for the request
144    /// or sending the response.
145    InboundTimeout(RequestId),
146    /// An inbound request failed to negotiate a mutually supported protocol.
147    InboundUnsupportedProtocols(RequestId),
148}
149
150impl<TCodec> ProtocolsHandler for RequestResponseHandler<TCodec>
151where
152    TCodec: RequestResponseCodec + Send + Clone + 'static,
153{
154    type InEvent = RequestProtocol<TCodec>;
155    type OutEvent = RequestResponseHandlerEvent<TCodec>;
156    type Error = ProtocolsHandlerUpgrErr<io::Error>;
157    type InboundProtocol = ResponseProtocol<TCodec>;
158    type OutboundProtocol = RequestProtocol<TCodec>;
159    type OutboundOpenInfo = RequestId;
160    type InboundOpenInfo = RequestId;
161
162    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
163        // A channel for notifying the handler when the inbound
164        // upgrade received the request.
165        let (rq_send, rq_recv) = oneshot::channel();
166
167        // A channel for notifying the inbound upgrade when the
168        // response is sent.
169        let (rs_send, rs_recv) = oneshot::channel();
170
171        let request_id = RequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed));
172
173        // By keeping all I/O inside the `ResponseProtocol` and thus the
174        // inbound substream upgrade via above channels, we ensure that it
175        // is all subject to the configured timeout without extra bookkeeping
176        // for inbound substreams as well as their timeouts and also make the
177        // implementation of inbound and outbound upgrades symmetric in
178        // this sense.
179        let proto = ResponseProtocol {
180            protocols: self.inbound_protocols.clone(),
181            codec: self.codec.clone(),
182            request_sender: rq_send,
183            response_receiver: rs_recv,
184            request_id
185        };
186
187        // The handler waits for the request to come in. It then emits
188        // `RequestResponseHandlerEvent::Request` together with a
189        // `ResponseChannel`.
190        self.inbound.push(rq_recv.map_ok(move |rq| (rq, rs_send)).boxed());
191
192        SubstreamProtocol::new(proto, request_id).with_timeout(self.substream_timeout)
193    }
194
195    fn inject_fully_negotiated_inbound(
196        &mut self,
197        sent: bool,
198        request_id: RequestId
199    ) {
200        if sent {
201            self.pending_events.push_back(
202                RequestResponseHandlerEvent::ResponseSent(request_id))
203        } else {
204            self.pending_events.push_back(
205                RequestResponseHandlerEvent::ResponseOmission(request_id))
206        }
207    }
208
209    fn inject_fully_negotiated_outbound(
210        &mut self,
211        response: TCodec::Response,
212        request_id: RequestId,
213    ) {
214        self.pending_events.push_back(
215            RequestResponseHandlerEvent::Response {
216                request_id, response
217            });
218    }
219
220    fn inject_event(&mut self, request: Self::InEvent) {
221        self.keep_alive = KeepAlive::Yes;
222        self.outbound.push_back(request);
223    }
224
225    fn inject_dial_upgrade_error(
226        &mut self,
227        info: RequestId,
228        error: ProtocolsHandlerUpgrErr<io::Error>,
229    ) {
230        match error {
231            ProtocolsHandlerUpgrErr::Timeout => {
232                self.pending_events.push_back(
233                    RequestResponseHandlerEvent::OutboundTimeout(info));
234            }
235            ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
236                // The remote merely doesn't support the protocol(s) we requested.
237                // This is no reason to close the connection, which may
238                // successfully communicate with other protocols already.
239                // An event is reported to permit user code to react to the fact that
240                // the remote peer does not support the requested protocol(s).
241                self.pending_events.push_back(
242                    RequestResponseHandlerEvent::OutboundUnsupportedProtocols(info));
243            }
244            _ => {
245                // Anything else is considered a fatal error or misbehaviour of
246                // the remote peer and results in closing the connection.
247                self.pending_error = Some(error);
248            }
249        }
250    }
251
252    fn inject_listen_upgrade_error(
253        &mut self,
254        info: RequestId,
255        error: ProtocolsHandlerUpgrErr<io::Error>
256    ) {
257        match error {
258            ProtocolsHandlerUpgrErr::Timeout => {
259                self.pending_events.push_back(RequestResponseHandlerEvent::InboundTimeout(info))
260            }
261            ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
262                // The local peer merely doesn't support the protocol(s) requested.
263                // This is no reason to close the connection, which may
264                // successfully communicate with other protocols already.
265                // An event is reported to permit user code to react to the fact that
266                // the local peer does not support the requested protocol(s).
267                self.pending_events.push_back(
268                    RequestResponseHandlerEvent::InboundUnsupportedProtocols(info));
269            }
270            _ => {
271                // Anything else is considered a fatal error or misbehaviour of
272                // the remote peer and results in closing the connection.
273                self.pending_error = Some(error);
274            }
275        }
276    }
277
278    fn connection_keep_alive(&self) -> KeepAlive {
279        self.keep_alive
280    }
281
282    fn poll(
283        &mut self,
284        cx: &mut Context<'_>,
285    ) -> Poll<
286        ProtocolsHandlerEvent<RequestProtocol<TCodec>, RequestId, Self::OutEvent, Self::Error>,
287    > {
288        // Check for a pending (fatal) error.
289        if let Some(err) = self.pending_error.take() {
290            // The handler will not be polled again by the `Swarm`.
291            return Poll::Ready(ProtocolsHandlerEvent::Close(err))
292        }
293
294        // Drain pending events.
295        if let Some(event) = self.pending_events.pop_front() {
296            return Poll::Ready(ProtocolsHandlerEvent::Custom(event))
297        } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
298            self.pending_events.shrink_to_fit();
299        }
300
301        // Check for inbound requests.
302        while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) {
303            match result {
304                Ok(((id, rq), rs_sender)) => {
305                    // We received an inbound request.
306                    self.keep_alive = KeepAlive::Yes;
307                    return Poll::Ready(ProtocolsHandlerEvent::Custom(
308                        RequestResponseHandlerEvent::Request {
309                            request_id: id, request: rq, sender: rs_sender
310                        }))
311                }
312                Err(oneshot::Canceled) => {
313                    // The inbound upgrade has errored or timed out reading
314                    // or waiting for the request. The handler is informed
315                    // via `inject_listen_upgrade_error`.
316                }
317            }
318        }
319
320        // Emit outbound requests.
321        if let Some(request) = self.outbound.pop_front() {
322            let info = request.request_id;
323            return Poll::Ready(
324                ProtocolsHandlerEvent::OutboundSubstreamRequest {
325                    protocol: SubstreamProtocol::new(request, info)
326                        .with_timeout(self.substream_timeout)
327                },
328            )
329        }
330
331        debug_assert!(self.outbound.is_empty());
332
333        if self.outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
334            self.outbound.shrink_to_fit();
335        }
336
337        if self.inbound.is_empty() && self.keep_alive.is_yes() {
338            // No new inbound or outbound requests. However, we may just have
339            // started the latest inbound or outbound upgrade(s), so make sure
340            // the keep-alive timeout is preceded by the substream timeout.
341            let until = Instant::now() + self.substream_timeout + self.keep_alive_timeout;
342            self.keep_alive = KeepAlive::Until(until);
343        }
344
345        Poll::Pending
346    }
347}
348