ant_libp2p_swarm/behaviour/
toggle.rs1use std::task::{Context, Poll};
22
23use either::Either;
24use futures::future;
25use ant_libp2p_core::{transport::PortUse, upgrade::DeniedUpgrade, Endpoint, Multiaddr};
26use libp2p_identity::PeerId;
27
28use crate::{
29 behaviour::FromSwarm,
30 connection::ConnectionId,
31 handler::{
32 AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent,
33 DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError,
34 SubstreamProtocol,
35 },
36 upgrade::SendWrapper,
37 ConnectionDenied, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
38};
39
40pub struct Toggle<TBehaviour> {
44 inner: Option<TBehaviour>,
45}
46
47impl<TBehaviour> Toggle<TBehaviour> {
48 pub fn is_enabled(&self) -> bool {
50 self.inner.is_some()
51 }
52
53 pub fn as_ref(&self) -> Option<&TBehaviour> {
55 self.inner.as_ref()
56 }
57
58 pub fn as_mut(&mut self) -> Option<&mut TBehaviour> {
60 self.inner.as_mut()
61 }
62}
63
64impl<TBehaviour> From<Option<TBehaviour>> for Toggle<TBehaviour> {
65 fn from(inner: Option<TBehaviour>) -> Self {
66 Toggle { inner }
67 }
68}
69
70impl<TBehaviour> NetworkBehaviour for Toggle<TBehaviour>
71where
72 TBehaviour: NetworkBehaviour,
73{
74 type ConnectionHandler = ToggleConnectionHandler<THandler<TBehaviour>>;
75 type ToSwarm = TBehaviour::ToSwarm;
76
77 fn handle_pending_inbound_connection(
78 &mut self,
79 connection_id: ConnectionId,
80 local_addr: &Multiaddr,
81 remote_addr: &Multiaddr,
82 ) -> Result<(), ConnectionDenied> {
83 let inner = match self.inner.as_mut() {
84 None => return Ok(()),
85 Some(inner) => inner,
86 };
87
88 inner.handle_pending_inbound_connection(connection_id, local_addr, remote_addr)?;
89
90 Ok(())
91 }
92
93 fn handle_established_inbound_connection(
94 &mut self,
95 connection_id: ConnectionId,
96 peer: PeerId,
97 local_addr: &Multiaddr,
98 remote_addr: &Multiaddr,
99 ) -> Result<THandler<Self>, ConnectionDenied> {
100 let inner = match self.inner.as_mut() {
101 None => return Ok(ToggleConnectionHandler { inner: None }),
102 Some(inner) => inner,
103 };
104
105 let handler = inner.handle_established_inbound_connection(
106 connection_id,
107 peer,
108 local_addr,
109 remote_addr,
110 )?;
111
112 Ok(ToggleConnectionHandler {
113 inner: Some(handler),
114 })
115 }
116
117 fn handle_pending_outbound_connection(
118 &mut self,
119 connection_id: ConnectionId,
120 maybe_peer: Option<PeerId>,
121 addresses: &[Multiaddr],
122 effective_role: Endpoint,
123 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
124 let inner = match self.inner.as_mut() {
125 None => return Ok(vec![]),
126 Some(inner) => inner,
127 };
128
129 let addresses = inner.handle_pending_outbound_connection(
130 connection_id,
131 maybe_peer,
132 addresses,
133 effective_role,
134 )?;
135
136 Ok(addresses)
137 }
138
139 fn handle_established_outbound_connection(
140 &mut self,
141 connection_id: ConnectionId,
142 peer: PeerId,
143 addr: &Multiaddr,
144 role_override: Endpoint,
145 port_use: PortUse,
146 ) -> Result<THandler<Self>, ConnectionDenied> {
147 let inner = match self.inner.as_mut() {
148 None => return Ok(ToggleConnectionHandler { inner: None }),
149 Some(inner) => inner,
150 };
151
152 let handler = inner.handle_established_outbound_connection(
153 connection_id,
154 peer,
155 addr,
156 role_override,
157 port_use,
158 )?;
159
160 Ok(ToggleConnectionHandler {
161 inner: Some(handler),
162 })
163 }
164
165 fn on_swarm_event(&mut self, event: FromSwarm) {
166 if let Some(behaviour) = &mut self.inner {
167 behaviour.on_swarm_event(event);
168 }
169 }
170
171 fn on_connection_handler_event(
172 &mut self,
173 peer_id: PeerId,
174 connection_id: ConnectionId,
175 event: THandlerOutEvent<Self>,
176 ) {
177 if let Some(behaviour) = &mut self.inner {
178 behaviour.on_connection_handler_event(peer_id, connection_id, event)
179 }
180 }
181
182 fn poll(
183 &mut self,
184 cx: &mut Context<'_>,
185 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
186 if let Some(inner) = self.inner.as_mut() {
187 inner.poll(cx)
188 } else {
189 Poll::Pending
190 }
191 }
192}
193
194pub struct ToggleConnectionHandler<TInner> {
196 inner: Option<TInner>,
197}
198
199impl<TInner> ToggleConnectionHandler<TInner>
200where
201 TInner: ConnectionHandler,
202{
203 fn on_fully_negotiated_inbound(
204 &mut self,
205 FullyNegotiatedInbound {
206 protocol: out,
207 info,
208 }: FullyNegotiatedInbound<
209 <Self as ConnectionHandler>::InboundProtocol,
210 <Self as ConnectionHandler>::InboundOpenInfo,
211 >,
212 ) {
213 let out = match out {
214 future::Either::Left(out) => out,
215 #[allow(unreachable_patterns)]
217 future::Either::Right(v) => ant_libp2p_core::util::unreachable(v),
218 };
219
220 if let Either::Left(info) = info {
221 self.inner
222 .as_mut()
223 .expect("Can't receive an inbound substream if disabled; QED")
224 .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
225 FullyNegotiatedInbound {
226 protocol: out,
227 info,
228 },
229 ));
230 } else {
231 panic!("Unexpected Either::Right in enabled `on_fully_negotiated_inbound`.")
232 }
233 }
234
235 fn on_listen_upgrade_error(
236 &mut self,
237 ListenUpgradeError { info, error: err }: ListenUpgradeError<
238 <Self as ConnectionHandler>::InboundOpenInfo,
239 <Self as ConnectionHandler>::InboundProtocol,
240 >,
241 ) {
242 let (inner, info) = match (self.inner.as_mut(), info) {
243 (Some(inner), Either::Left(info)) => (inner, info),
244 (None, Either::Right(())) => return,
246 (Some(_), Either::Right(())) => panic!(
247 "Unexpected `Either::Right` inbound info through \
248 `on_listen_upgrade_error` in enabled state.",
249 ),
250 (None, Either::Left(_)) => panic!(
251 "Unexpected `Either::Left` inbound info through \
252 `on_listen_upgrade_error` in disabled state.",
253 ),
254 };
255
256 let err = match err {
257 Either::Left(e) => e,
258 #[allow(unreachable_patterns)]
260 Either::Right(v) => ant_libp2p_core::util::unreachable(v),
261 };
262
263 inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
264 info,
265 error: err,
266 }));
267 }
268}
269
270impl<TInner> ConnectionHandler for ToggleConnectionHandler<TInner>
271where
272 TInner: ConnectionHandler,
273{
274 type FromBehaviour = TInner::FromBehaviour;
275 type ToBehaviour = TInner::ToBehaviour;
276 type InboundProtocol = Either<SendWrapper<TInner::InboundProtocol>, SendWrapper<DeniedUpgrade>>;
277 type OutboundProtocol = TInner::OutboundProtocol;
278 type OutboundOpenInfo = TInner::OutboundOpenInfo;
279 type InboundOpenInfo = Either<TInner::InboundOpenInfo, ()>;
280
281 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
282 if let Some(inner) = self.inner.as_ref() {
283 inner
284 .listen_protocol()
285 .map_upgrade(|u| Either::Left(SendWrapper(u)))
286 .map_info(Either::Left)
287 } else {
288 SubstreamProtocol::new(Either::Right(SendWrapper(DeniedUpgrade)), Either::Right(()))
289 }
290 }
291
292 fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
293 self.inner
294 .as_mut()
295 .expect("Can't receive events if disabled; QED")
296 .on_behaviour_event(event)
297 }
298
299 fn connection_keep_alive(&self) -> bool {
300 self.inner
301 .as_ref()
302 .map(|h| h.connection_keep_alive())
303 .unwrap_or(false)
304 }
305
306 fn poll(
307 &mut self,
308 cx: &mut Context<'_>,
309 ) -> Poll<
310 ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
311 > {
312 if let Some(inner) = self.inner.as_mut() {
313 inner.poll(cx)
314 } else {
315 Poll::Pending
316 }
317 }
318
319 fn on_connection_event(
320 &mut self,
321 event: ConnectionEvent<
322 Self::InboundProtocol,
323 Self::OutboundProtocol,
324 Self::InboundOpenInfo,
325 Self::OutboundOpenInfo,
326 >,
327 ) {
328 match event {
329 ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
330 self.on_fully_negotiated_inbound(fully_negotiated_inbound)
331 }
332 ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
333 protocol: out,
334 info,
335 }) => self
336 .inner
337 .as_mut()
338 .expect("Can't receive an outbound substream if disabled; QED")
339 .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
340 FullyNegotiatedOutbound {
341 protocol: out,
342 info,
343 },
344 )),
345 ConnectionEvent::AddressChange(address_change) => {
346 if let Some(inner) = self.inner.as_mut() {
347 inner.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
348 new_address: address_change.new_address,
349 }));
350 }
351 }
352 ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error: err }) => self
353 .inner
354 .as_mut()
355 .expect("Can't receive an outbound substream if disabled; QED")
356 .on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError {
357 info,
358 error: err,
359 })),
360 ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
361 self.on_listen_upgrade_error(listen_upgrade_error)
362 }
363 ConnectionEvent::LocalProtocolsChange(change) => {
364 if let Some(inner) = self.inner.as_mut() {
365 inner.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
366 }
367 }
368 ConnectionEvent::RemoteProtocolsChange(change) => {
369 if let Some(inner) = self.inner.as_mut() {
370 inner.on_connection_event(ConnectionEvent::RemoteProtocolsChange(change));
371 }
372 }
373 }
374 }
375
376 fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
377 let Some(inner) = self.inner.as_mut() else {
378 return Poll::Ready(None);
379 };
380
381 inner.poll_close(cx)
382 }
383}