bee_gossip/swarm/protocols/iota_gossip/handler.rs
1// Copyright 2020-2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5 collections::VecDeque,
6 io,
7 task::{Context, Poll},
8};
9
10use libp2p::{
11 core::upgrade::OutboundUpgrade,
12 swarm::{
13 handler::{
14 ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, InboundUpgradeSend, KeepAlive,
15 SubstreamProtocol,
16 },
17 NegotiatedSubstream,
18 },
19 Multiaddr,
20};
21use log::*;
22
23use super::{event::IotaGossipHandlerEvent, id::IotaGossipIdentifier, upgrade::IotaGossipProtocolUpgrade};
24use crate::network::origin::Origin;
25
26pub struct GossipProtocolHandler {
27 /// Exchanged protocol information necessary during negotiation.
28 info: IotaGossipIdentifier,
29
30 /// Keep alive setting.
31 keep_alive: KeepAlive,
32
33 /// All events produced by this handler.
34 events: VecDeque<ConnectionHandlerEvent<IotaGossipProtocolUpgrade, (), IotaGossipHandlerEvent, io::Error>>,
35}
36
37#[derive(Debug)]
38pub struct IotaGossipHandlerInEvent {
39 pub origin: Origin,
40}
41
42impl GossipProtocolHandler {
43 pub fn new(info: IotaGossipIdentifier) -> Self {
44 Self {
45 info,
46 keep_alive: KeepAlive::Yes,
47 events: VecDeque::with_capacity(16),
48 }
49 }
50}
51
52impl ConnectionHandler for GossipProtocolHandler {
53 type InEvent = IotaGossipHandlerInEvent;
54 type OutEvent = IotaGossipHandlerEvent;
55 type Error = io::Error;
56 type InboundProtocol = IotaGossipProtocolUpgrade;
57 type OutboundProtocol = IotaGossipProtocolUpgrade;
58 type InboundOpenInfo = ();
59 type OutboundOpenInfo = ();
60
61 /// **libp2p docs**:
62 ///
63 /// The [`InboundUpgrade`](libp2p_core::upgrade::InboundUpgrade) to apply on inbound
64 /// substreams to negotiate the desired protocols.
65 ///
66 /// > **Note**: The returned `InboundUpgrade` should always accept all the generally
67 /// > supported protocols, even if in a specific context a particular one is
68 /// > not supported, (eg. when only allowing one substream at a time for a protocol).
69 /// > This allows a remote to put the list of supported protocols in a cache.
70 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
71 debug!("gossip handler: responding to listen protocol request.");
72
73 SubstreamProtocol::new(IotaGossipProtocolUpgrade::new(self.info.clone()), ())
74 }
75
76 /// **libp2p docs**:
77 ///
78 /// Injects an event coming from the outside in the handler.
79 fn inject_event(&mut self, incoming_event: IotaGossipHandlerInEvent) {
80 debug!("gossip handler: received in-event: {:?}", incoming_event);
81
82 let IotaGossipHandlerInEvent { origin } = incoming_event;
83
84 // We only send the upgrade request if this handler belongs to an outbound connection.
85 if origin == Origin::Outbound {
86 let send_request = ConnectionHandlerEvent::OutboundSubstreamRequest {
87 protocol: SubstreamProtocol::new(IotaGossipProtocolUpgrade::new(self.info.clone()), ()),
88 };
89
90 debug!("gossip handler: sending protocol upgrade request.");
91
92 self.events.push_back(send_request);
93 }
94 }
95
96 /// **libp2p docs**:
97 ///
98 /// Injects the output of a successful upgrade on a new inbound substream.
99 fn inject_fully_negotiated_inbound(&mut self, new_inbound: NegotiatedSubstream, _: Self::InboundOpenInfo) {
100 let negotiated_inbound = ConnectionHandlerEvent::Custom(IotaGossipHandlerEvent::UpgradeCompleted {
101 substream: Box::new(new_inbound),
102 });
103
104 debug!("gossip handler: fully negotiated inbound.");
105
106 self.events.push_back(negotiated_inbound);
107 }
108
109 /// **libp2p docs**:
110 ///
111 /// Injects the output of a successful upgrade on a new outbound substream.
112 ///
113 /// The second argument is the information that was previously passed to
114 /// [`ConnectionHandlerEvent::OutboundSubstreamRequest`].
115 fn inject_fully_negotiated_outbound(&mut self, new_outbound: NegotiatedSubstream, _: Self::OutboundOpenInfo) {
116 let negotiated_outbound = ConnectionHandlerEvent::Custom(IotaGossipHandlerEvent::UpgradeCompleted {
117 substream: Box::new(new_outbound),
118 });
119
120 debug!("gossip handler: fully negotiated outbound.");
121
122 self.events.push_back(negotiated_outbound);
123 }
124
125 /// **libp2p docs**:
126 ///
127 /// Notifies the handler of a change in the address of the remote.
128 fn inject_address_change(&mut self, new_address: &Multiaddr) {
129 debug!("gossip handler: new address: {}", new_address);
130 }
131
132 /// **libp2p docs**:
133 ///
134 /// Indicates to the handler that upgrading an outbound substream to the given protocol has failed.
135 fn inject_dial_upgrade_error(
136 &mut self,
137 _: Self::OutboundOpenInfo,
138 e: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error>,
139 ) {
140 debug!("gossip handler: outbound upgrade error: {:?}", e);
141
142 // TODO: finish event management in case of an error.
143 // self.events.push_back(ProtocolsHandlerEvent::Close(e));
144 }
145
146 /// **libp2p docs**:
147 ///
148 /// Indicates to the handler that upgrading an inbound substream to the given protocol has failed.
149 fn inject_listen_upgrade_error(
150 &mut self,
151 _: Self::InboundOpenInfo,
152 e: ConnectionHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
153 ) {
154 debug!("gossip handler: inbound upgrade error: {:?}", e);
155
156 // TODO: finish event management in case of an error.
157 // let err = match e {
158 // ProtocolsHandlerUpgrErr::Timeout => io::Error::new(io::ErrorKind::TimedOut, "timeout"),
159 // ProtocolsHandlerUpgrErr::Timer => io::Error::new(io::ErrorKind::TimedOut, "timer"),
160 // ProtocolsHandlerUpgrErr::Upgrade(err) => err,
161 // };
162
163 // self.events.push_back(ProtocolsHandlerEvent::Close(err));
164 }
165
166 /// **libp2p docs**:
167 ///
168 /// Returns until when the connection should be kept alive.
169 ///
170 /// This method is called by the `Swarm` after each invocation of
171 /// [`ConnectionHandler::poll`] to determine if the connection and the associated
172 /// `ProtocolsHandler`s should be kept alive as far as this handler is concerned
173 /// and if so, for how long.
174 ///
175 /// Returning [`KeepAlive::No`] indicates that the connection should be
176 /// closed and this handler destroyed immediately.
177 ///
178 /// Returning [`KeepAlive::Until`] indicates that the connection may be closed
179 /// and this handler destroyed after the specified `Instant`.
180 ///
181 /// Returning [`KeepAlive::Yes`] indicates that the connection should
182 /// be kept alive until the next call to this method.
183 ///
184 /// > **Note**: The connection is always closed and the handler destroyed
185 /// > when [`ConnectionHandler::poll`] returns an error. Furthermore, the
186 /// > connection may be closed for reasons outside of the control
187 /// > of the handler.
188 fn connection_keep_alive(&self) -> KeepAlive {
189 self.keep_alive
190 }
191
192 /// **libp2p docs**:
193 ///
194 /// Should behave like `Stream::poll()`.
195 #[allow(clippy::type_complexity)]
196 fn poll(
197 &mut self,
198 _: &mut Context<'_>,
199 ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>> {
200 if let Some(event) = self.events.pop_front() {
201 Poll::Ready(event)
202 } else {
203 Poll::Pending
204 }
205 }
206}