Skip to main content

sc_network/protocol/notifications/
handler.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Implementations of the `IntoConnectionHandler` and `ConnectionHandler` traits for both incoming
20//! and outgoing substreams for all gossiping protocols.
21//!
22//! This is the main implementation of `ConnectionHandler` in this crate, that handles all the
23//! gossiping protocols that are Substrate-related and outside of the scope of libp2p.
24//!
25//! # Usage
26//!
27//! From an API perspective, for each of its protocols, the [`NotifsHandler`] is always in one of
28//! the following state (see [`State`]):
29//!
30//! - Closed substream. This is the initial state.
31//! - Closed substream, but remote desires them to be open.
32//! - Open substream.
33//! - Open substream, but remote desires them to be closed.
34//!
35//! Each protocol in the [`NotifsHandler`] can spontaneously switch between these states:
36//!
37//! - "Closed substream" to "Closed substream but open desired". When that happens, a
38//! [`NotifsHandlerOut::OpenDesiredByRemote`] is emitted.
39//! - "Closed substream but open desired" to "Closed substream" (i.e. the remote has cancelled
40//! their request). When that happens, a [`NotifsHandlerOut::CloseDesired`] is emitted.
41//! - "Open substream" to "Open substream but close desired". When that happens, a
42//! [`NotifsHandlerOut::CloseDesired`] is emitted.
43//!
44//! The user can instruct a protocol in the `NotifsHandler` to switch from "closed" to "open" or
45//! vice-versa by sending either a [`NotifsHandlerIn::Open`] or a [`NotifsHandlerIn::Close`]. The
46//! `NotifsHandler` must answer with [`NotifsHandlerOut::OpenResultOk`] or
47//! [`NotifsHandlerOut::OpenResultErr`], or with [`NotifsHandlerOut::CloseResult`].
48//!
49//! When a [`NotifsHandlerOut::OpenResultOk`] is emitted, the substream is now in the open state.
50//! When a [`NotifsHandlerOut::OpenResultErr`] or [`NotifsHandlerOut::CloseResult`] is emitted,
51//! the `NotifsHandler` is now (or remains) in the closed state.
52//!
53//! When a [`NotifsHandlerOut::OpenDesiredByRemote`] is emitted, the user should always send back
//! either a [`NotifsHandlerIn::Open`] or a [`NotifsHandlerIn::Close`]. If this isn't done, the
55//! remote will be left in a pending state.
56//!
57//! It is illegal to send a [`NotifsHandlerIn::Open`] before a previously-emitted
58//! [`NotifsHandlerIn::Open`] has gotten an answer.
59
60use crate::{
61	protocol::notifications::upgrade::{
62		NotificationsIn, NotificationsInSubstream, NotificationsOut, NotificationsOutError,
63		NotificationsOutSubstream, UpgradeCollec,
64	},
65	service::metrics::NotificationMetrics,
66	types::ProtocolName,
67};
68
69use bytes::BytesMut;
70use futures::{
71	channel::mpsc,
72	lock::{Mutex as FuturesMutex, MutexGuard as FuturesMutexGuard},
73	prelude::*,
74};
75use libp2p::{
76	swarm::{
77		handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, Stream,
78		SubstreamProtocol,
79	},
80	PeerId,
81};
82
83use parking_lot::{Mutex, RwLock};
84use std::{
85	collections::VecDeque,
86	mem,
87	pin::Pin,
88	sync::Arc,
89	task::{Context, Poll},
90	time::Duration,
91};
92
/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p::notification::handler";

/// Number of pending notifications in asynchronous contexts.
/// See [`NotificationsSink::reserve_notification`] for context.
pub(crate) const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8;

/// Number of pending notifications in synchronous contexts.
/// Deliberately large: the synchronous path cannot exert back-pressure, and overflowing
/// this buffer leads to the connection being force-closed (see `send_sync_notification`).
const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048;

/// Maximum duration to open a substream and receive the handshake message. After that, we
/// consider that we failed to open the substream.
const OPEN_TIMEOUT: Duration = Duration::from_secs(10);

/// After successfully establishing a connection with the remote, we keep the connection open for
/// at least this amount of time in order to give the rest of the code the chance to notify us to
/// open substreams.
const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
111
/// The actual handler once the connection has been established.
///
/// See the documentation at the module level for more information.
pub struct NotifsHandler {
	/// List of notification protocols, specified by the user at initialization.
	protocols: Vec<Protocol>,

	/// Whether to keep the connection alive even when no protocol has an active substream.
	keep_alive: bool,

	/// Optional future that keeps connection alive for a certain amount of time.
	// TODO: this should be safe to remove, see https://github.com/paritytech/polkadot-sdk/issues/6350
	keep_alive_timeout_future: Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,

	/// Remote we are connected to.
	peer_id: PeerId,

	/// Events to return in priority from `poll`.
	events_queue: VecDeque<ConnectionHandlerEvent<NotificationsOut, usize, NotifsHandlerOut>>,

	/// Metrics, shared (via `Arc`) with every [`NotificationsSink`] this handler hands out.
	metrics: Option<Arc<NotificationMetrics>>,
}
135
136impl NotifsHandler {
137	/// Creates new [`NotifsHandler`].
138	pub fn new(
139		peer_id: PeerId,
140		protocols: Vec<ProtocolConfig>,
141		metrics: Option<NotificationMetrics>,
142	) -> Self {
143		Self {
144			protocols: protocols
145				.into_iter()
146				.map(|config| {
147					let in_upgrade = NotificationsIn::new(
148						config.name.clone(),
149						config.fallback_names.clone(),
150						config.max_notification_size,
151					);
152
153					Protocol { config, in_upgrade, state: State::Closed { pending_opening: false } }
154				})
155				.collect(),
156			peer_id,
157			// Keep connection alive initially until below timeout expires
158			keep_alive: true,
159			// A grace period of `INITIAL_KEEPALIVE_TIME` must be given to leave time for the remote
160			// to express desire to open substreams.
161			// TODO: This is a hack and ideally should not be necessary
162			keep_alive_timeout_future: Some(Box::pin(tokio::time::sleep(INITIAL_KEEPALIVE_TIME))),
163			events_queue: VecDeque::with_capacity(16),
164			metrics: metrics.map_or(None, |metrics| Some(Arc::new(metrics))),
165		}
166	}
167}
168
/// Configuration for a notifications protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
	/// Name of the protocol.
	pub name: ProtocolName,
	/// Names of the protocol to use if the main one isn't available.
	pub fallback_names: Vec<ProtocolName>,
	/// Handshake of the protocol. The `RwLock` is locked every time a new substream is opened.
	pub handshake: Arc<RwLock<Vec<u8>>>,
	/// Maximum allowed size for a notification.
	pub max_notification_size: u64,
}
181
/// Fields specific for each individual protocol.
struct Protocol {
	/// User-provided configuration for this protocol (name, fallbacks, handshake, limits).
	config: ProtocolConfig,

	/// Prototype for the inbound upgrade.
	in_upgrade: NotificationsIn,

	/// Current state of the substreams for this protocol.
	state: State,
}
193
/// See the module-level documentation to learn about the meaning of these variants.
enum State {
	/// Protocol is in the "Closed" state.
	Closed {
		/// True if an outgoing substream is still in the process of being opened.
		pending_opening: bool,
	},

	/// Protocol is in the "Closed" state. A [`NotifsHandlerOut::OpenDesiredByRemote`] has been
	/// emitted.
	OpenDesiredByRemote {
		/// Substream opened by the remote and that hasn't been accepted/rejected yet.
		in_substream: NotificationsInSubstream<Stream>,

		/// See [`State::Closed::pending_opening`].
		pending_opening: bool,
	},

	/// Protocol is in the "Closed" state, but has received a [`NotifsHandlerIn::Open`] and is
	/// consequently trying to open the various notifications substreams.
	///
	/// A [`NotifsHandlerOut::OpenResultOk`] or a [`NotifsHandlerOut::OpenResultErr`] event must
	/// be emitted when transitioning to respectively [`State::Open`] or [`State::Closed`].
	Opening {
		/// Substream opened by the remote. If `Some`, has been accepted.
		in_substream: Option<NotificationsInSubstream<Stream>>,
		/// Is the connection inbound.
		inbound: bool,
	},

	/// Protocol is in the "Open" state.
	Open {
		/// Contains the two `Receiver`s connected to the [`NotificationsSink`] that has been
		/// sent out. The notifications to send out can be pulled from this receivers.
		/// We use two different channels in order to have two different channel sizes, but from
		/// the receiving point of view, the two channels are the same.
		/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
		notifications_sink_rx: stream::Peekable<
			stream::Select<
				stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
				stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
			>,
		>,

		/// Outbound substream that has been accepted by the remote.
		///
		/// Always `Some` on transition to [`State::Open`]. Switched to `None` only if the remote
		/// closed the substream. If `None`, a [`NotifsHandlerOut::CloseDesired`] event has been
		/// emitted.
		out_substream: Option<NotificationsOutSubstream<Stream>>,

		/// Substream opened by the remote.
		///
		/// Contrary to the `out_substream` field, operations continue as normal even if the
		/// substream has been closed by the remote. A `None` is treated the same way as if there
		/// was an idle substream.
		in_substream: Option<NotificationsInSubstream<Stream>>,
	},
}
253
/// The close reason of an [`NotifsHandlerOut::CloseDesired`] event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloseReason {
	/// The remote has requested the substreams to be closed.
	///
	/// This can happen when the remote drops the substream or an IO error is encountered.
	RemoteRequest,

	/// The remote has misbehaved and did not comply with the notification spec.
	///
	/// This means for now that the remote has sent data on an outbound substream.
	ProtocolMisbehavior,
}
267
/// Event that can be received by a `NotifsHandler`.
#[derive(Debug, Clone)]
pub enum NotifsHandlerIn {
	/// Instruct the handler to open the notification substreams.
	///
	/// Must always be answered by a [`NotifsHandlerOut::OpenResultOk`] or a
	/// [`NotifsHandlerOut::OpenResultErr`] event.
	///
	/// Importantly, it is forbidden to send a [`NotifsHandlerIn::Open`] while a previous one is
	/// already in flight. It is however possible if a `Close` is still in flight.
	Open {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,

		/// The peer id of the remote.
		peer_id: PeerId,
	},

	/// Instruct the handler to close the notification substreams, or reject any pending incoming
	/// substream request.
	///
	/// Must always be answered by a [`NotifsHandlerOut::CloseResult`] event.
	Close {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,
	},
}
295
/// Event that can be emitted by a `NotifsHandler`.
#[derive(Debug)]
pub enum NotifsHandlerOut {
	/// Acknowledges a [`NotifsHandlerIn::Open`].
	OpenResultOk {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,
		/// Name of the protocol that was actually negotiated, if the default one wasn't available.
		negotiated_fallback: Option<ProtocolName>,
		/// Handshake that was sent to us.
		/// This is normally a "Status" message, but this is out of the concern of this code.
		received_handshake: Vec<u8>,
		/// How notifications can be sent to this node.
		notifications_sink: NotificationsSink,
		/// Is the connection inbound.
		inbound: bool,
	},

	/// Acknowledges a [`NotifsHandlerIn::Open`]. The remote has refused the attempt to open
	/// notification substreams.
	OpenResultErr {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,
	},

	/// Acknowledges a [`NotifsHandlerIn::Close`].
	CloseResult {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,
	},

	/// The remote would like the substreams to be open. Send a [`NotifsHandlerIn::Open`] or a
	/// [`NotifsHandlerIn::Close`] in order to either accept or deny this request. If a
	/// [`NotifsHandlerIn::Open`] or [`NotifsHandlerIn::Close`] has been sent before and has not
	/// yet been acknowledged by a matching [`NotifsHandlerOut`], then you don't need to send
	/// another [`NotifsHandlerIn`].
	OpenDesiredByRemote {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,
		/// Received handshake.
		handshake: Vec<u8>,
	},

	/// The remote would like the substreams to be closed, or the remote peer has misbehaved.
	///
	/// Send a [`NotifsHandlerIn::Close`] in order to close them. If a [`NotifsHandlerIn::Close`]
	/// has been sent before and has not yet been acknowledged by a
	/// [`NotifsHandlerOut::CloseResult`], then you don't need to send another one.
	CloseDesired {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,

		/// The close reason.
		reason: CloseReason,
	},

	/// Received a message on a custom protocol substream.
	///
	/// Can only happen when the handler is in the open state.
	Notification {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,
		/// Message that has been received.
		message: BytesMut,
	},

	/// Close connection
	Close {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,
	},
}
368
/// Sink connected directly to the node background task. Allows sending notifications to the peer.
///
/// Can be cloned in order to obtain multiple references to the substream of the same peer.
#[derive(Debug, Clone)]
pub struct NotificationsSink {
	/// Shared state between all clones of this sink (peer id and the two send channels).
	inner: Arc<NotificationsSinkInner>,
	/// Notification metrics, if enabled.
	metrics: Option<Arc<NotificationMetrics>>,
}
377
378impl NotificationsSink {
379	/// Create new [`NotificationsSink`].
380	/// NOTE: only used for testing but must be `pub` as other crates in `client/network` use this.
381	pub fn new(
382		peer_id: PeerId,
383	) -> (Self, mpsc::Receiver<NotificationsSinkMessage>, mpsc::Receiver<NotificationsSinkMessage>)
384	{
385		let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
386		let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
387		(
388			NotificationsSink {
389				inner: Arc::new(NotificationsSinkInner {
390					peer_id,
391					async_channel: FuturesMutex::new(async_tx),
392					sync_channel: Mutex::new(Some(sync_tx)),
393				}),
394				metrics: None,
395			},
396			async_rx,
397			sync_rx,
398		)
399	}
400
401	/// Get reference to metrics.
402	pub fn metrics(&self) -> &Option<Arc<NotificationMetrics>> {
403		&self.metrics
404	}
405}
406
#[derive(Debug)]
struct NotificationsSinkInner {
	/// Target of the sink.
	peer_id: PeerId,
	/// Sender to use in asynchronous contexts. Uses an asynchronous mutex.
	async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
	/// Sender to use in synchronous contexts. Uses a synchronous mutex.
	/// Contains `None` if the channel was full at some point, in which case the channel will
	/// be closed in the near future anyway.
	/// This channel has a large capacity and is meant to be used in contexts where
	/// back-pressure cannot be properly exerted.
	/// It will be removed in a future version.
	sync_channel: Mutex<Option<mpsc::Sender<NotificationsSinkMessage>>>,
}
421
/// Message emitted through the [`NotificationsSink`] and processed by the background task
/// dedicated to the peer.
#[derive(Debug, PartialEq, Eq)]
pub enum NotificationsSinkMessage {
	/// Message emitted by [`NotificationsSink::reserve_notification`] and
	/// [`NotificationsSink::write_notification_now`].
	Notification { message: Vec<u8> },

	/// Must close the connection.
	ForceClose,
}
433
434impl NotificationsSink {
435	/// Returns the [`PeerId`] the sink is connected to.
436	pub fn peer_id(&self) -> &PeerId {
437		&self.inner.peer_id
438	}
439
440	/// Sends a notification to the peer.
441	///
442	/// If too many messages are already buffered, the notification is silently discarded and the
443	/// connection to the peer will be closed shortly after.
444	///
445	/// The protocol name is expected to be checked ahead of calling this method. It is a logic
446	/// error to send a notification using an unknown protocol.
447	///
448	/// This method will be removed in a future version.
449	pub fn send_sync_notification(&self, message: impl Into<Vec<u8>>) {
450		let mut lock = self.inner.sync_channel.lock();
451
452		if let Some(tx) = lock.as_mut() {
453			let message = message.into();
454			let result = tx.try_send(NotificationsSinkMessage::Notification { message });
455
456			if result.is_err() {
457				// Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the
458				// buffer, and therefore `try_send` will succeed.
459				let _result2 = tx.clone().try_send(NotificationsSinkMessage::ForceClose);
460				debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
461
462				// Destroy the sender in order to not send more `ForceClose` messages.
463				*lock = None;
464			}
465		}
466	}
467
468	/// Wait until the remote is ready to accept a notification.
469	///
470	/// Returns an error in the case where the connection is closed.
471	///
472	/// The protocol name is expected to be checked ahead of calling this method. It is a logic
473	/// error to send a notification using an unknown protocol.
474	pub async fn reserve_notification(&self) -> Result<Ready<'_>, ()> {
475		let mut lock = self.inner.async_channel.lock().await;
476
477		let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await;
478		if poll_ready.is_ok() {
479			Ok(Ready { lock })
480		} else {
481			Err(())
482		}
483	}
484}
485
/// Notification slot is reserved and the notification can actually be sent.
#[must_use]
#[derive(Debug)]
pub struct Ready<'a> {
	/// Guarded channel. The channel inside is guaranteed to not be full.
	lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>,
}
493
494impl<'a> Ready<'a> {
495	/// Consumes this slots reservation and actually queues the notification.
496	///
497	/// Returns an error if the substream has been closed.
498	pub fn send(mut self, notification: impl Into<Vec<u8>>) -> Result<(), ()> {
499		self.lock
500			.start_send(NotificationsSinkMessage::Notification { message: notification.into() })
501			.map_err(|_| ())
502	}
503}
504
505impl ConnectionHandler for NotifsHandler {
506	type FromBehaviour = NotifsHandlerIn;
507	type ToBehaviour = NotifsHandlerOut;
508	type InboundProtocol = UpgradeCollec<NotificationsIn>;
509	type OutboundProtocol = NotificationsOut;
510	// Index within the `out_protocols`.
511	type OutboundOpenInfo = usize;
512	type InboundOpenInfo = ();
513
514	fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
515		let protocols = self
516			.protocols
517			.iter()
518			.map(|p| p.in_upgrade.clone())
519			.collect::<UpgradeCollec<_>>();
520
521		SubstreamProtocol::new(protocols, ())
522	}
523
	/// Reacts to substream-level events from the connection, driving the per-protocol
	/// [`State`] machine and queueing [`NotifsHandlerOut`] events for the behaviour.
	fn on_connection_event(
		&mut self,
		event: ConnectionEvent<
			'_,
			Self::InboundProtocol,
			Self::OutboundProtocol,
			Self::InboundOpenInfo,
			Self::OutboundOpenInfo,
		>,
	) {
		match event {
			// The remote opened an inbound notifications substream for one of our protocols.
			ConnectionEvent::FullyNegotiatedInbound(inbound) => {
				let (mut in_substream_open, protocol_index) = inbound.protocol;
				let protocol_info = &mut self.protocols[protocol_index];

				match protocol_info.state {
					State::Closed { pending_opening } => {
						// First inbound substream for a closed protocol: notify the behaviour
						// and wait for its `Open`/`Close` decision.
						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
							NotifsHandlerOut::OpenDesiredByRemote {
								protocol_index,
								handshake: in_substream_open.handshake,
							},
						));

						protocol_info.state = State::OpenDesiredByRemote {
							in_substream: in_substream_open.substream,
							pending_opening,
						};
					},
					State::OpenDesiredByRemote { .. } => {
						// If a substream already exists, silently drop the new one.
						// Note that we drop the substream, which will send an equivalent to a
						// TCP "RST" to the remote and force-close the substream. It might
						// seem like an unclean way to get rid of a substream. However, keep
						// in mind that it is invalid for the remote to open multiple such
						// substreams, and therefore sending a "RST" is the most correct thing
						// to do.
						return;
					},
					State::Opening { ref mut in_substream, .. } |
					State::Open { ref mut in_substream, .. } => {
						if in_substream.is_some() {
							// Same remark as above.
							return;
						}

						// Create `handshake_message` on a separate line to be sure that the
						// lock is released as soon as possible.
						let handshake_message = protocol_info.config.handshake.read().clone();
						in_substream_open.substream.send_handshake(handshake_message);
						*in_substream = Some(in_substream_open.substream);
					},
				}
			},
			// Our outbound substream request for `protocol_index` was accepted by the remote.
			ConnectionEvent::FullyNegotiatedOutbound(outbound) => {
				let (new_open, protocol_index) = (outbound.protocol, outbound.info);

				match self.protocols[protocol_index].state {
					State::Closed { ref mut pending_opening } |
					State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
						// The protocol was switched back to "closed" while the request was in
						// flight; clear the flag and let the new substream be dropped.
						debug_assert!(*pending_opening);
						*pending_opening = false;
					},
					State::Open { .. } => {
						log::error!(target: LOG_TARGET, "☎️ State mismatch in notifications handler");
						debug_assert!(false);
					},
					State::Opening { ref mut in_substream, inbound } => {
						// Build the sink pair the rest of the code uses to queue notifications
						// onto this substream (see `NotificationsSinkInner` for channel roles).
						let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
						let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
						let notifications_sink = NotificationsSink {
							inner: Arc::new(NotificationsSinkInner {
								peer_id: self.peer_id,
								async_channel: FuturesMutex::new(async_tx),
								sync_channel: Mutex::new(Some(sync_tx)),
							}),
							metrics: self.metrics.clone(),
						};

						self.protocols[protocol_index].state = State::Open {
							notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse())
								.peekable(),
							out_substream: Some(new_open.substream),
							in_substream: in_substream.take(),
						};

						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
							NotifsHandlerOut::OpenResultOk {
								protocol_index,
								negotiated_fallback: new_open.negotiated_fallback,
								received_handshake: new_open.handshake,
								notifications_sink,
								inbound,
							},
						));
					},
				}
			},
			ConnectionEvent::AddressChange(_address_change) => {},
			ConnectionEvent::LocalProtocolsChange(_) => {},
			ConnectionEvent::RemoteProtocolsChange(_) => {},
			// Our outbound substream request failed (refused by the remote, or timed out).
			ConnectionEvent::DialUpgradeError(dial_upgrade_error) => match self.protocols
				[dial_upgrade_error.info]
				.state
			{
				State::Closed { ref mut pending_opening } |
				State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
					debug_assert!(*pending_opening);
					*pending_opening = false;
				},

				State::Opening { .. } => {
					self.protocols[dial_upgrade_error.info].state =
						State::Closed { pending_opening: false };

					self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
						NotifsHandlerOut::OpenResultErr { protocol_index: dial_upgrade_error.info },
					));
				},

				// No substream is being open when already `Open`.
				State::Open { .. } => debug_assert!(false),
			},
			ConnectionEvent::ListenUpgradeError(_listen_upgrade_error) => {},
			event => {
				log::warn!(target: LOG_TARGET, "New unknown `ConnectionEvent` libp2p event: {event:?}");
			},
		}
	}
653
	/// Handles `Open`/`Close` instructions from the behaviour, per the contract documented on
	/// [`NotifsHandlerIn`]: every `Open` is eventually answered with `OpenResultOk`/`OpenResultErr`,
	/// and every `Close` with `CloseResult`.
	fn on_behaviour_event(&mut self, message: NotifsHandlerIn) {
		match message {
			NotifsHandlerIn::Open { protocol_index, peer_id } => {
				let protocol_info = &mut self.protocols[protocol_index];
				match &mut protocol_info.state {
					State::Closed { pending_opening } => {
						// Only request a new outbound substream if none is already in flight.
						if !*pending_opening {
							let proto = NotificationsOut::new(
								protocol_info.config.name.clone(),
								protocol_info.config.fallback_names.clone(),
								protocol_info.config.handshake.read().clone(),
								protocol_info.config.max_notification_size,
								peer_id,
							);

							self.events_queue.push_back(
								ConnectionHandlerEvent::OutboundSubstreamRequest {
									protocol: SubstreamProtocol::new(proto, protocol_index)
										.with_timeout(OPEN_TIMEOUT),
								},
							);
						}

						protocol_info.state = State::Opening { in_substream: None, inbound: false };
					},
					State::OpenDesiredByRemote { pending_opening, in_substream } => {
						let handshake_message = protocol_info.config.handshake.read().clone();

						if !*pending_opening {
							let proto = NotificationsOut::new(
								protocol_info.config.name.clone(),
								protocol_info.config.fallback_names.clone(),
								handshake_message.clone(),
								protocol_info.config.max_notification_size,
								peer_id,
							);

							self.events_queue.push_back(
								ConnectionHandlerEvent::OutboundSubstreamRequest {
									protocol: SubstreamProtocol::new(proto, protocol_index)
										.with_timeout(OPEN_TIMEOUT),
								},
							);
						}

						// Accept the remote's pending inbound substream by sending our handshake.
						in_substream.send_handshake(handshake_message);

						// The state change is done in two steps because of borrowing issues.
						let in_substream = match mem::replace(
							&mut protocol_info.state,
							State::Opening { in_substream: None, inbound: false },
						) {
							State::OpenDesiredByRemote { in_substream, .. } => in_substream,
							_ => unreachable!(),
						};
						protocol_info.state =
							State::Opening { in_substream: Some(in_substream), inbound: true };
					},
					State::Opening { .. } | State::Open { .. } => {
						// As documented, it is forbidden to send an `Open` while there is already
						// one in flight.
						log::error!(target: LOG_TARGET, "opening already-opened handler");
						debug_assert!(false);
					},
				}
			},

			NotifsHandlerIn::Close { protocol_index } => {
				match self.protocols[protocol_index].state {
					State::Open { .. } => {
						self.protocols[protocol_index].state =
							State::Closed { pending_opening: false };
					},
					State::Opening { .. } => {
						// The outbound substream request is still in flight; remember it via
						// `pending_opening: true` and report the open attempt as failed.
						self.protocols[protocol_index].state =
							State::Closed { pending_opening: true };

						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
							NotifsHandlerOut::OpenResultErr { protocol_index },
						));
					},
					State::OpenDesiredByRemote { pending_opening, .. } => {
						// Rejects the remote's request: dropping the inbound substream closes it.
						self.protocols[protocol_index].state = State::Closed { pending_opening };
					},
					State::Closed { .. } => {},
				}

				// A `Close` must always be acknowledged with a `CloseResult`.
				self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
					NotifsHandlerOut::CloseResult { protocol_index },
				));
			},
		}
	}
747
748	fn connection_keep_alive(&self) -> bool {
749		// `Yes` if any protocol has some activity.
750		if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) {
751			return true;
752		}
753
754		self.keep_alive
755	}
756
757	fn poll(
758		&mut self,
759		cx: &mut Context,
760	) -> Poll<
761		ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
762	> {
763		{
764			let maybe_keep_alive_timeout_future = &mut self.keep_alive_timeout_future;
765			if let Some(keep_alive_timeout_future) = maybe_keep_alive_timeout_future {
766				if keep_alive_timeout_future.poll_unpin(cx).is_ready() {
767					maybe_keep_alive_timeout_future.take();
768					self.keep_alive = false;
769				}
770			}
771		}
772
773		if let Some(ev) = self.events_queue.pop_front() {
774			return Poll::Ready(ev);
775		}
776
777		// For each open substream, try to send messages from `notifications_sink_rx` to the
778		// substream.
779		for protocol_index in 0..self.protocols.len() {
780			if let State::Open {
781				notifications_sink_rx, out_substream: Some(out_substream), ..
782			} = &mut self.protocols[protocol_index].state
783			{
784				loop {
785					// Only proceed with `out_substream.poll_ready_unpin` if there is an element
786					// available in `notifications_sink_rx`. This avoids waking up the task when
787					// a substream is ready to send if there isn't actually something to send.
788					match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) {
789						Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) => {
790							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
791								NotifsHandlerOut::Close { protocol_index },
792							))
793						},
794						Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {},
795						Poll::Ready(None) | Poll::Pending => break,
796					}
797
798					// Before we extract the element from `notifications_sink_rx`, check that the
799					// substream is ready to accept a message.
800					match out_substream.poll_ready_unpin(cx) {
801						Poll::Ready(_) => {},
802						Poll::Pending => break,
803					}
804
805					// Now that the substream is ready for a message, grab what to send.
806					let message = match notifications_sink_rx.poll_next_unpin(cx) {
807						Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => {
808							message
809						},
810						Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) |
811						Poll::Ready(None) |
812						Poll::Pending => {
813							// Should never be reached, as per `poll_peek` above.
814							debug_assert!(false);
815							break;
816						},
817					};
818
819					let _ = out_substream.start_send_unpin(message);
820					// Note that flushing is performed later down this function.
821				}
822			}
823		}
824
825		// Flush all outbound substreams.
826		// When `poll` returns `Poll::Ready`, the libp2p `Swarm` may decide to no longer call
827		// `poll` again before it is ready to accept more events.
828		// In order to make sure that substreams are flushed as soon as possible, the flush is
829		// performed before the code paths that can produce `Ready` (with some rare exceptions).
830		// Importantly, however, the flush is performed *after* notifications are queued with
831		// `Sink::start_send`.
832		// Note that we must call `poll_flush` on all substreams and not only on those we
833		// have called `Sink::start_send` on, because `NotificationsOutSubstream::poll_flush`
834		// also reports the substream termination (even if no data was written into it).
835		for protocol_index in 0..self.protocols.len() {
836			match &mut self.protocols[protocol_index].state {
837				State::Open { out_substream: out_substream @ Some(_), .. } => {
838					match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
839						Poll::Pending | Poll::Ready(Ok(())) => {},
840						Poll::Ready(Err(error)) => {
841							*out_substream = None;
842
843							let reason = match error {
844								NotificationsOutError::Io(_) | NotificationsOutError::Closed => {
845									CloseReason::RemoteRequest
846								},
847								NotificationsOutError::UnexpectedData => {
848									CloseReason::ProtocolMisbehavior
849								},
850							};
851
852							let event = NotifsHandlerOut::CloseDesired { protocol_index, reason };
853							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
854						},
855					};
856				},
857
858				State::Closed { .. } |
859				State::Opening { .. } |
860				State::Open { out_substream: None, .. } |
861				State::OpenDesiredByRemote { .. } => {},
862			}
863		}
864
865		// Poll inbound substreams.
866		for protocol_index in 0..self.protocols.len() {
867			// Inbound substreams being closed is always tolerated, except for the
868			// `OpenDesiredByRemote` state which might need to be switched back to `Closed`.
869			match &mut self.protocols[protocol_index].state {
870				State::Closed { .. } |
871				State::Open { in_substream: None, .. } |
872				State::Opening { in_substream: None, .. } => {},
873
874				State::Open { in_substream: in_substream @ Some(_), .. } => {
875					match futures::prelude::stream::Stream::poll_next(
876						Pin::new(in_substream.as_mut().unwrap()),
877						cx,
878					) {
879						Poll::Pending => {},
880						Poll::Ready(Some(Ok(message))) => {
881							let event = NotifsHandlerOut::Notification { protocol_index, message };
882							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
883						},
884						Poll::Ready(None) | Poll::Ready(Some(Err(_))) => *in_substream = None,
885					}
886				},
887
888				State::OpenDesiredByRemote { in_substream, pending_opening } => {
889					match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
890						Poll::Pending => {},
891						Poll::Ready(Ok(())) => {},
892						Poll::Ready(Err(_)) => {
893							self.protocols[protocol_index].state =
894								State::Closed { pending_opening: *pending_opening };
895							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
896								NotifsHandlerOut::CloseDesired {
897									protocol_index,
898									reason: CloseReason::RemoteRequest,
899								},
900							));
901						},
902					}
903				},
904
905				State::Opening { in_substream: in_substream @ Some(_), .. } => {
906					match NotificationsInSubstream::poll_process(
907						Pin::new(in_substream.as_mut().unwrap()),
908						cx,
909					) {
910						Poll::Pending => {},
911						Poll::Ready(Ok(())) => {},
912						Poll::Ready(Err(_)) => *in_substream = None,
913					}
914				},
915			}
916		}
917
918		// This is the only place in this method that can return `Pending`.
919		// By putting it at the very bottom, we are guaranteed that everything has been properly
920		// polled.
921		Poll::Pending
922	}
923}
924
925#[cfg(test)]
926pub mod tests {
927	use super::*;
928	use crate::protocol::notifications::upgrade::{
929		NotificationsInOpen, NotificationsInSubstreamHandshake, NotificationsOutOpen,
930	};
931	use asynchronous_codec::Framed;
932	use libp2p::{
933		core::muxing::SubstreamBox,
934		swarm::handler::{self, StreamUpgradeError},
935	};
936	use multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version};
937	use std::{
938		collections::HashMap,
939		io::{Error, IoSlice, IoSliceMut},
940	};
941	use tokio::sync::mpsc;
942	use unsigned_varint::codec::UviBytes;
943
	/// Bookkeeping for one substream opened through [`ConnectionYielder::open_substream`].
	struct OpenSubstream {
		// Receiving halves of the sink's async and sync notification channels, merged
		// into a single peekable stream so tests can poll for pending notifications.
		notifications: stream::Peekable<
			stream::Select<
				stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
				stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
			>,
		>,
		// Mock substreams kept alive for the duration of the test; never polled here.
		_in_substream: MockSubstream,
		_out_substream: MockSubstream,
	}
954
	/// Test helper that fabricates open substreams and exposes the notifications
	/// sent through the corresponding [`NotificationsSink`]s.
	pub struct ConnectionYielder {
		// One entry per `(peer, protocol index)` pair created via `open_substream`.
		connections: HashMap<(PeerId, usize), OpenSubstream>,
	}
958
959	impl ConnectionYielder {
960		/// Create new [`ConnectionYielder`].
961		pub fn new() -> Self {
962			Self { connections: HashMap::new() }
963		}
964
965		/// Open a new substream for peer.
966		pub fn open_substream(
967			&mut self,
968			peer: PeerId,
969			protocol_index: usize,
970			received_handshake: Vec<u8>,
971		) -> NotifsHandlerOut {
972			let (async_tx, async_rx) =
973				futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
974			let (sync_tx, sync_rx) =
975				futures::channel::mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
976			let notifications_sink = NotificationsSink {
977				inner: Arc::new(NotificationsSinkInner {
978					peer_id: peer,
979					async_channel: FuturesMutex::new(async_tx),
980					sync_channel: Mutex::new(Some(sync_tx)),
981				}),
982				metrics: None,
983			};
984			let (in_substream, out_substream) = MockSubstream::new();
985
986			self.connections.insert(
987				(peer, protocol_index),
988				OpenSubstream {
989					notifications: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
990					_in_substream: in_substream,
991					_out_substream: out_substream,
992				},
993			);
994
995			NotifsHandlerOut::OpenResultOk {
996				protocol_index,
997				negotiated_fallback: None,
998				received_handshake,
999				notifications_sink,
1000				inbound: false,
1001			}
1002		}
1003
1004		/// Attempt to get next pending event from one of the notification sinks.
1005		pub async fn get_next_event(&mut self, peer: PeerId, set: usize) -> Option<Vec<u8>> {
1006			let substream = if let Some(info) = self.connections.get_mut(&(peer, set)) {
1007				info
1008			} else {
1009				return None;
1010			};
1011
1012			futures::future::poll_fn(|cx| match substream.notifications.poll_next_unpin(cx) {
1013				Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => {
1014					Poll::Ready(Some(message))
1015				},
1016				Poll::Pending => Poll::Ready(None),
1017				Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) | Poll::Ready(None) => {
1018					panic!("sink closed")
1019				},
1020			})
1021			.await
1022		}
1023	}
1024
	/// In-memory substream backed by a pair of `tokio` mpsc channels; stands in
	/// for a real networked substream in tests.
	struct MockSubstream {
		pub rx: mpsc::Receiver<Vec<u8>>,
		pub tx: mpsc::Sender<Vec<u8>>,
		// Bytes received from `rx` but not yet handed out by `poll_read`.
		rx_buffer: BytesMut,
	}
1030
	/// Mirror of `ActiveStreamCounter` in `libp2p`.
	// Only exists so `MockStream` matches the layout of libp2p's `Stream`;
	// never read, hence `dead_code`.
	#[allow(dead_code)]
	struct MockActiveStreamCounter(Arc<()>);
1034
	// Mirror of `Stream` in `libp2p`.
	// Used as the transmute source in `MockSubstream::stream_new`; its fields must
	// stay in sync with libp2p's private `Stream` type.
	#[allow(dead_code)]
	struct MockStream {
		stream: Negotiated<SubstreamBox>,
		counter: Option<MockActiveStreamCounter>,
	}
1041
	impl MockSubstream {
		/// Create new substream pair.
		///
		/// The channel halves are crossed so that what one side writes, the other reads.
		pub fn new() -> (Self, Self) {
			let (tx1, rx1) = mpsc::channel(32);
			let (tx2, rx2) = mpsc::channel(32);

			(
				Self { rx: rx1, tx: tx2, rx_buffer: BytesMut::with_capacity(512) },
				Self { rx: rx2, tx: tx1, rx_buffer: BytesMut::with_capacity(512) },
			)
		}

		/// Create new negotiated substream pair.
		///
		/// Runs the multistream-select handshake on both ends concurrently and
		/// panics if negotiation fails.
		pub async fn negotiated() -> (Stream, Stream) {
			let (socket1, socket2) = Self::new();
			let socket1 = SubstreamBox::new(socket1);
			let socket2 = SubstreamBox::new(socket2);

			let protos = vec!["/echo/1.0.0", "/echo/2.5.0"];
			let (res1, res2) = tokio::join!(
				dialer_select_proto(socket1, protos.clone(), Version::V1),
				listener_select_proto(socket2, protos),
			);

			(Self::stream_new(res1.unwrap().1), Self::stream_new(res2.unwrap().1))
		}

		/// Unsafe substitute for `Stream::new` private constructor.
		fn stream_new(stream: Negotiated<SubstreamBox>) -> Stream {
			let stream = MockStream { stream, counter: None };
			// Static asserts to make sure this doesn't break.
			const _: () = {
				assert!(core::mem::size_of::<Stream>() == core::mem::size_of::<MockStream>());
				assert!(core::mem::align_of::<Stream>() == core::mem::align_of::<MockStream>());
			};

			// SAFETY: relies on `MockStream` having the same layout as libp2p's
			// `Stream`. The asserts above only check size and alignment; field order
			// of `repr(Rust)` types is NOT guaranteed, so this is best-effort and
			// must be re-verified whenever the libp2p dependency is updated.
			unsafe { core::mem::transmute(stream) }
		}
	}
1081
1082	impl AsyncWrite for MockSubstream {
1083		fn poll_write<'a>(
1084			self: Pin<&mut Self>,
1085			_cx: &mut Context<'a>,
1086			buf: &[u8],
1087		) -> Poll<Result<usize, Error>> {
1088			match self.tx.try_send(buf.to_vec()) {
1089				Ok(_) => Poll::Ready(Ok(buf.len())),
1090				Err(_) => Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
1091			}
1092		}
1093
1094		fn poll_flush<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1095			Poll::Ready(Ok(()))
1096		}
1097
1098		fn poll_close<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1099			Poll::Ready(Ok(()))
1100		}
1101
1102		fn poll_write_vectored<'a, 'b>(
1103			self: Pin<&mut Self>,
1104			_cx: &mut Context<'a>,
1105			_bufs: &[IoSlice<'b>],
1106		) -> Poll<Result<usize, Error>> {
1107			unimplemented!();
1108		}
1109	}
1110
1111	impl AsyncRead for MockSubstream {
1112		fn poll_read<'a>(
1113			mut self: Pin<&mut Self>,
1114			cx: &mut Context<'a>,
1115			buf: &mut [u8],
1116		) -> Poll<Result<usize, Error>> {
1117			match self.rx.poll_recv(cx) {
1118				Poll::Ready(Some(data)) => self.rx_buffer.extend_from_slice(&data),
1119				Poll::Ready(None) => {
1120					return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into()))
1121				},
1122				_ => {},
1123			}
1124
1125			let nsize = std::cmp::min(self.rx_buffer.len(), buf.len());
1126			let data = self.rx_buffer.split_to(nsize);
1127			buf[..nsize].copy_from_slice(&data[..]);
1128
1129			if nsize > 0 {
1130				return Poll::Ready(Ok(nsize));
1131			}
1132
1133			Poll::Pending
1134		}
1135
1136		fn poll_read_vectored<'a, 'b>(
1137			self: Pin<&mut Self>,
1138			_cx: &mut Context<'a>,
1139			_bufs: &mut [IoSliceMut<'b>],
1140		) -> Poll<Result<usize, Error>> {
1141			unimplemented!();
1142		}
1143	}
1144
1145	/// Create new [`NotifsHandler`].
1146	fn notifs_handler() -> NotifsHandler {
1147		NotifsHandler::new(
1148			PeerId::random(),
1149			vec![ProtocolConfig {
1150				name: "/foo".into(),
1151				fallback_names: vec![],
1152				handshake: Arc::new(RwLock::new(b"hello, world".to_vec())),
1153				max_notification_size: u64::MAX,
1154			}],
1155			None,
1156		)
1157	}
1158
1159	// verify that if another substream is attempted to be opened by remote while an inbound
1160	// substream already exists, the new inbound stream is rejected and closed by the local node.
1161	#[tokio::test]
1162	async fn second_open_desired_by_remote_rejected() {
1163		let mut handler = notifs_handler();
1164		let (io, mut io2) = MockSubstream::negotiated().await;
1165		let mut codec = UviBytes::default();
1166		codec.set_max_len(usize::MAX);
1167
1168		let notif_in = NotificationsInOpen {
1169			handshake: b"hello, world".to_vec(),
1170			substream: NotificationsInSubstream::new(
1171				Framed::new(io, codec),
1172				NotificationsInSubstreamHandshake::NotSent,
1173			),
1174		};
1175
1176		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1177			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1178		));
1179
1180		// verify that the substream is in (partly) opened state
1181		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1182		futures::future::poll_fn(|cx| {
1183			let mut buf = Vec::with_capacity(512);
1184			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1185			Poll::Ready(())
1186		})
1187		.await;
1188
1189		// attempt to open another inbound substream and verify that it is rejected
1190		let (io, mut io2) = MockSubstream::negotiated().await;
1191		let mut codec = UviBytes::default();
1192		codec.set_max_len(usize::MAX);
1193
1194		let notif_in = NotificationsInOpen {
1195			handshake: b"hello, world".to_vec(),
1196			substream: NotificationsInSubstream::new(
1197				Framed::new(io, codec),
1198				NotificationsInSubstreamHandshake::NotSent,
1199			),
1200		};
1201
1202		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1203			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1204		));
1205
1206		// verify that the new substream is rejected and closed
1207		futures::future::poll_fn(|cx| {
1208			let mut buf = Vec::with_capacity(512);
1209
1210			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
1211				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
1212			}
1213
1214			Poll::Ready(())
1215		})
1216		.await;
1217	}
1218
	// While the handler is in the `Opening` state (local node accepted the inbound
	// substream and is negotiating the outbound one), a second inbound substream from
	// the remote must be rejected without disturbing the ongoing negotiation.
	#[tokio::test]
	async fn open_rejected_if_substream_is_opening() {
		let mut handler = notifs_handler();
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};

		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the substream is in (partly) opened state
		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);
			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
			Poll::Ready(())
		})
		.await;

		// move the handler state to 'Opening'
		handler.on_behaviour_event(NotifsHandlerIn::Open {
			protocol_index: 0,
			peer_id: PeerId::random(),
		});
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));

		// remote now tries to open another substream, verify that it is rejected and closed
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};

		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the new substream is rejected and closed but that the first substream is
		// still in correct state
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);

			// `UnexpectedEof` here is the remote-side observation of the rejected
			// substream having been closed by the local node.
			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
			} else {
				panic!("unexpected result");
			}

			Poll::Ready(())
		})
		.await;
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));
	}
1293
	// Once a substream is fully `Open`, a second inbound substream from the remote
	// must be rejected while the open substream keeps working untouched.
	#[tokio::test]
	async fn open_rejected_if_substream_already_open() {
		let mut handler = notifs_handler();
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the substream is in (partly) opened state
		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);
			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
			Poll::Ready(())
		})
		.await;

		// move the handler state to 'Opening'
		handler.on_behaviour_event(NotifsHandlerIn::Open {
			protocol_index: 0,
			peer_id: PeerId::random(),
		});
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));

		// accept the substream and move its state to `Open`
		let (io, _io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_out = NotificationsOutOpen {
			handshake: b"hello, world".to_vec(),
			negotiated_fallback: None,
			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
		));

		assert!(std::matches!(
			handler.protocols[0].state,
			State::Open { in_substream: Some(_), .. }
		));

		// remote now tries to open another substream, verify that it is rejected and closed
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);
		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the new substream is rejected and closed but that the first substream is
		// still in correct state
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);

			// `UnexpectedEof` here is the remote-side observation of the rejected
			// substream having been closed by the local node.
			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
			} else {
				panic!("unexpected result");
			}

			Poll::Ready(())
		})
		.await;
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Open { in_substream: Some(_), .. }
		));
	}
1384
	// If the local node opens and then immediately closes a substream, a later
	// successful outbound negotiation must only clear `pending_opening` and must not
	// re-open the substream.
	#[tokio::test]
	async fn fully_negotiated_resets_state_for_closed_substream() {
		let mut handler = notifs_handler();
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the substream is in (partly) opened state
		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);
			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
			Poll::Ready(())
		})
		.await;

		// first instruct the handler to open a connection and then close it right after
		// so the handler is in state `Closed { pending_opening: true }`
		handler.on_behaviour_event(NotifsHandlerIn::Open {
			protocol_index: 0,
			peer_id: PeerId::random(),
		});
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));

		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));

		// verify that if the outbound substream is successfully negotiated, the state is not
		// changed as the substream was commanded to be closed by the handler.
		let (io, _io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_out = NotificationsOutOpen {
			handshake: b"hello, world".to_vec(),
			negotiated_fallback: None,
			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
		));

		// only `pending_opening` is reset; the substream stays closed
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Closed { pending_opening: false }
		));
	}
1446
	// Same as the test above, but starting from `OpenDesiredByRemote`: a successful
	// outbound negotiation after a local close must only clear `pending_opening`
	// while preserving the remote's open request.
	#[tokio::test]
	async fn fully_negotiated_resets_state_for_open_desired_substream() {
		let mut handler = notifs_handler();
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the substream is in (partly) opened state
		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);
			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
			Poll::Ready(())
		})
		.await;

		// first instruct the handler to open a connection and then close it right after
		// so the handler is in state `Closed { pending_opening: true }`
		handler.on_behaviour_event(NotifsHandlerIn::Open {
			protocol_index: 0,
			peer_id: PeerId::random(),
		});
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));

		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));

		// attempt to open another inbound substream and verify that it is rejected
		let (io, _io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// the remote's open request is remembered, with the outbound still pending
		assert!(std::matches!(
			handler.protocols[0].state,
			State::OpenDesiredByRemote { pending_opening: true, .. }
		));

		// verify that if the outbound substream is successfully negotiated, the state is not
		// changed as the substream was commanded to be closed by the handler.
		let (io, _io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_out = NotificationsOutOpen {
			handshake: b"hello, world".to_vec(),
			negotiated_fallback: None,
			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
		};

		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
		));

		// only `pending_opening` is reset; the remote's open request remains
		assert!(std::matches!(
			handler.protocols[0].state,
			State::OpenDesiredByRemote { pending_opening: false, .. }
		));
	}
1530
1531	#[tokio::test]
1532	async fn dial_upgrade_error_resets_closed_outbound_state() {
1533		let mut handler = notifs_handler();
1534		let (io, mut io2) = MockSubstream::negotiated().await;
1535		let mut codec = UviBytes::default();
1536		codec.set_max_len(usize::MAX);
1537
1538		let notif_in = NotificationsInOpen {
1539			handshake: b"hello, world".to_vec(),
1540			substream: NotificationsInSubstream::new(
1541				Framed::new(io, codec),
1542				NotificationsInSubstreamHandshake::NotSent,
1543			),
1544		};
1545		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1546			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1547		));
1548
1549		// verify that the substream is in (partly) opened state
1550		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1551		futures::future::poll_fn(|cx| {
1552			let mut buf = Vec::with_capacity(512);
1553			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1554			Poll::Ready(())
1555		})
1556		.await;
1557
1558		// first instruct the handler to open a connection and then close it right after
1559		// so the handler is in state `Closed { pending_opening: true }`
1560		handler.on_behaviour_event(NotifsHandlerIn::Open {
1561			protocol_index: 0,
1562			peer_id: PeerId::random(),
1563		});
1564		assert!(std::matches!(
1565			handler.protocols[0].state,
1566			State::Opening { in_substream: Some(_), .. }
1567		));
1568
1569		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1570		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1571
1572		// inject dial failure to an already closed substream and verify outbound state is reset
1573		handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
1574			handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
1575		));
1576		assert!(std::matches!(
1577			handler.protocols[0].state,
1578			State::Closed { pending_opening: false }
1579		));
1580	}
1581
	// A dial failure arriving while the remote still desires the substream open must
	// only reset `pending_opening` and keep the `OpenDesiredByRemote` state.
	#[tokio::test]
	async fn dial_upgrade_error_resets_open_desired_state() {
		let mut handler = notifs_handler();
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the substream is in (partly) opened state
		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);
			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
			Poll::Ready(())
		})
		.await;

		// first instruct the handler to open a connection and then close it right after
		// so the handler is in state `Closed { pending_opening: true }`
		handler.on_behaviour_event(NotifsHandlerIn::Open {
			protocol_index: 0,
			peer_id: PeerId::random(),
		});
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));

		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));

		// remote attempts a new inbound substream, moving the handler back to
		// `OpenDesiredByRemote` while the outbound attempt is still pending
		let (io, _io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		assert!(std::matches!(
			handler.protocols[0].state,
			State::OpenDesiredByRemote { pending_opening: true, .. }
		));

		// inject dial failure to an already closed substream and verify outbound state is reset
		handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
			handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
		));
		assert!(std::matches!(
			handler.protocols[0].state,
			State::OpenDesiredByRemote { pending_opening: false, .. }
		));
	}
1652
1653	#[tokio::test]
1654	async fn sync_notifications_clogged() {
1655		let mut handler = notifs_handler();
1656		let (io, _) = MockSubstream::negotiated().await;
1657		let codec = UviBytes::default();
1658
1659		let (async_tx, async_rx) = futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
1660		let (sync_tx, sync_rx) = futures::channel::mpsc::channel(1);
1661		let notifications_sink = NotificationsSink {
1662			inner: Arc::new(NotificationsSinkInner {
1663				peer_id: PeerId::random(),
1664				async_channel: FuturesMutex::new(async_tx),
1665				sync_channel: Mutex::new(Some(sync_tx)),
1666			}),
1667			metrics: None,
1668		};
1669
1670		handler.protocols[0].state = State::Open {
1671			notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
1672			out_substream: Some(NotificationsOutSubstream::new(Framed::new(io, codec))),
1673			in_substream: None,
1674		};
1675
1676		notifications_sink.send_sync_notification(vec![1, 3, 3, 7]);
1677		notifications_sink.send_sync_notification(vec![1, 3, 3, 8]);
1678		notifications_sink.send_sync_notification(vec![1, 3, 3, 9]);
1679		notifications_sink.send_sync_notification(vec![1, 3, 4, 0]);
1680
1681		futures::future::poll_fn(|cx| {
1682			assert!(std::matches!(
1683				handler.poll(cx),
1684				Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1685					NotifsHandlerOut::Close { .. }
1686				))
1687			));
1688			Poll::Ready(())
1689		})
1690		.await;
1691	}
1692
1693	#[tokio::test]
1694	async fn close_desired_by_remote() {
1695		let mut handler = notifs_handler();
1696		let (io, io2) = MockSubstream::negotiated().await;
1697		let mut codec = UviBytes::default();
1698		codec.set_max_len(usize::MAX);
1699
1700		let notif_in = NotificationsInOpen {
1701			handshake: b"hello, world".to_vec(),
1702			substream: NotificationsInSubstream::new(
1703				Framed::new(io, codec),
1704				NotificationsInSubstreamHandshake::PendingSend(vec![1, 2, 3, 4]),
1705			),
1706		};
1707
1708		// add new inbound substream but close it immediately and verify that correct events are
1709		// emitted
1710		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1711			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1712		));
1713		drop(io2);
1714
1715		futures::future::poll_fn(|cx| {
1716			assert!(std::matches!(
1717				handler.poll(cx),
1718				Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1719					NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0, .. },
1720				))
1721			));
1722			assert!(std::matches!(
1723				handler.poll(cx),
1724				Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1725					NotifsHandlerOut::CloseDesired {
1726						protocol_index: 0,
1727						reason: CloseReason::RemoteRequest,
1728					},
1729				))
1730			));
1731			Poll::Ready(())
1732		})
1733		.await;
1734	}
1735}