bee_gossip/swarm/protocols/iota_gossip/
handler.rs

1// Copyright 2020-2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::VecDeque,
6    io,
7    task::{Context, Poll},
8};
9
10use libp2p::{
11    core::upgrade::OutboundUpgrade,
12    swarm::{
13        handler::{
14            ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, InboundUpgradeSend, KeepAlive,
15            SubstreamProtocol,
16        },
17        NegotiatedSubstream,
18    },
19    Multiaddr,
20};
21use log::*;
22
23use super::{event::IotaGossipHandlerEvent, id::IotaGossipIdentifier, upgrade::IotaGossipProtocolUpgrade};
24use crate::network::origin::Origin;
25
26pub struct GossipProtocolHandler {
27    /// Exchanged protocol information necessary during negotiation.
28    info: IotaGossipIdentifier,
29
30    /// Keep alive setting.
31    keep_alive: KeepAlive,
32
33    /// All events produced by this handler.
34    events: VecDeque<ConnectionHandlerEvent<IotaGossipProtocolUpgrade, (), IotaGossipHandlerEvent, io::Error>>,
35}
36
37#[derive(Debug)]
38pub struct IotaGossipHandlerInEvent {
39    pub origin: Origin,
40}
41
42impl GossipProtocolHandler {
43    pub fn new(info: IotaGossipIdentifier) -> Self {
44        Self {
45            info,
46            keep_alive: KeepAlive::Yes,
47            events: VecDeque::with_capacity(16),
48        }
49    }
50}
51
52impl ConnectionHandler for GossipProtocolHandler {
53    type InEvent = IotaGossipHandlerInEvent;
54    type OutEvent = IotaGossipHandlerEvent;
55    type Error = io::Error;
56    type InboundProtocol = IotaGossipProtocolUpgrade;
57    type OutboundProtocol = IotaGossipProtocolUpgrade;
58    type InboundOpenInfo = ();
59    type OutboundOpenInfo = ();
60
61    /// **libp2p docs**:
62    ///
63    /// The [`InboundUpgrade`](libp2p_core::upgrade::InboundUpgrade) to apply on inbound
64    /// substreams to negotiate the desired protocols.
65    ///
66    /// > **Note**: The returned `InboundUpgrade` should always accept all the generally
67    /// > supported protocols, even if in a specific context a particular one is
68    /// > not supported, (eg. when only allowing one substream at a time for a protocol).
69    /// > This allows a remote to put the list of supported protocols in a cache.
70    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
71        debug!("gossip handler: responding to listen protocol request.");
72
73        SubstreamProtocol::new(IotaGossipProtocolUpgrade::new(self.info.clone()), ())
74    }
75
76    /// **libp2p docs**:
77    ///
78    /// Injects an event coming from the outside in the handler.
79    fn inject_event(&mut self, incoming_event: IotaGossipHandlerInEvent) {
80        debug!("gossip handler: received in-event: {:?}", incoming_event);
81
82        let IotaGossipHandlerInEvent { origin } = incoming_event;
83
84        // We only send the upgrade request if this handler belongs to an outbound connection.
85        if origin == Origin::Outbound {
86            let send_request = ConnectionHandlerEvent::OutboundSubstreamRequest {
87                protocol: SubstreamProtocol::new(IotaGossipProtocolUpgrade::new(self.info.clone()), ()),
88            };
89
90            debug!("gossip handler: sending protocol upgrade request.");
91
92            self.events.push_back(send_request);
93        }
94    }
95
96    /// **libp2p docs**:
97    ///
98    /// Injects the output of a successful upgrade on a new inbound substream.
99    fn inject_fully_negotiated_inbound(&mut self, new_inbound: NegotiatedSubstream, _: Self::InboundOpenInfo) {
100        let negotiated_inbound = ConnectionHandlerEvent::Custom(IotaGossipHandlerEvent::UpgradeCompleted {
101            substream: Box::new(new_inbound),
102        });
103
104        debug!("gossip handler: fully negotiated inbound.");
105
106        self.events.push_back(negotiated_inbound);
107    }
108
109    /// **libp2p docs**:
110    ///
111    /// Injects the output of a successful upgrade on a new outbound substream.
112    ///
113    /// The second argument is the information that was previously passed to
114    /// [`ConnectionHandlerEvent::OutboundSubstreamRequest`].
115    fn inject_fully_negotiated_outbound(&mut self, new_outbound: NegotiatedSubstream, _: Self::OutboundOpenInfo) {
116        let negotiated_outbound = ConnectionHandlerEvent::Custom(IotaGossipHandlerEvent::UpgradeCompleted {
117            substream: Box::new(new_outbound),
118        });
119
120        debug!("gossip handler: fully negotiated outbound.");
121
122        self.events.push_back(negotiated_outbound);
123    }
124
125    /// **libp2p docs**:
126    ///
127    /// Notifies the handler of a change in the address of the remote.
128    fn inject_address_change(&mut self, new_address: &Multiaddr) {
129        debug!("gossip handler: new address: {}", new_address);
130    }
131
132    /// **libp2p docs**:
133    ///
134    /// Indicates to the handler that upgrading an outbound substream to the given protocol has failed.
135    fn inject_dial_upgrade_error(
136        &mut self,
137        _: Self::OutboundOpenInfo,
138        e: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error>,
139    ) {
140        debug!("gossip handler: outbound upgrade error: {:?}", e);
141
142        // TODO: finish event management in case of an error.
143        // self.events.push_back(ProtocolsHandlerEvent::Close(e));
144    }
145
146    /// **libp2p docs**:
147    ///
148    /// Indicates to the handler that upgrading an inbound substream to the given protocol has failed.
149    fn inject_listen_upgrade_error(
150        &mut self,
151        _: Self::InboundOpenInfo,
152        e: ConnectionHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
153    ) {
154        debug!("gossip handler: inbound upgrade error: {:?}", e);
155
156        // TODO: finish event management in case of an error.
157        // let err = match e {
158        //     ProtocolsHandlerUpgrErr::Timeout => io::Error::new(io::ErrorKind::TimedOut, "timeout"),
159        //     ProtocolsHandlerUpgrErr::Timer => io::Error::new(io::ErrorKind::TimedOut, "timer"),
160        //     ProtocolsHandlerUpgrErr::Upgrade(err) => err,
161        // };
162
163        // self.events.push_back(ProtocolsHandlerEvent::Close(err));
164    }
165
166    /// **libp2p docs**:
167    ///
168    /// Returns until when the connection should be kept alive.
169    ///
170    /// This method is called by the `Swarm` after each invocation of
171    /// [`ConnectionHandler::poll`] to determine if the connection and the associated
172    /// `ProtocolsHandler`s should be kept alive as far as this handler is concerned
173    /// and if so, for how long.
174    ///
175    /// Returning [`KeepAlive::No`] indicates that the connection should be
176    /// closed and this handler destroyed immediately.
177    ///
178    /// Returning [`KeepAlive::Until`] indicates that the connection may be closed
179    /// and this handler destroyed after the specified `Instant`.
180    ///
181    /// Returning [`KeepAlive::Yes`] indicates that the connection should
182    /// be kept alive until the next call to this method.
183    ///
184    /// > **Note**: The connection is always closed and the handler destroyed
185    /// > when [`ConnectionHandler::poll`] returns an error. Furthermore, the
186    /// > connection may be closed for reasons outside of the control
187    /// > of the handler.
188    fn connection_keep_alive(&self) -> KeepAlive {
189        self.keep_alive
190    }
191
192    /// **libp2p docs**:
193    ///
194    /// Should behave like `Stream::poll()`.
195    #[allow(clippy::type_complexity)]
196    fn poll(
197        &mut self,
198        _: &mut Context<'_>,
199    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>> {
200        if let Some(event) = self.events.pop_front() {
201            Poll::Ready(event)
202        } else {
203            Poll::Pending
204        }
205    }
206}