sc_network/protocol/notifications/
handler.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Implementations of the `IntoConnectionHandler` and `ConnectionHandler` traits for both incoming
20//! and outgoing substreams for all gossiping protocols.
21//!
22//! This is the main implementation of `ConnectionHandler` in this crate, that handles all the
23//! gossiping protocols that are Substrate-related and outside of the scope of libp2p.
24//!
25//! # Usage
26//!
27//! From an API perspective, for each of its protocols, the [`NotifsHandler`] is always in one of
28//! the following states (see [`State`]):
29//!
30//! - Closed substream. This is the initial state.
31//! - Closed substream, but remote desires them to be open.
32//! - Open substream.
33//! - Open substream, but remote desires them to be closed.
34//!
35//! Each protocol in the [`NotifsHandler`] can spontaneously switch between these states:
36//!
37//! - "Closed substream" to "Closed substream but open desired". When that happens, a
38//! [`NotifsHandlerOut::OpenDesiredByRemote`] is emitted.
39//! - "Closed substream but open desired" to "Closed substream" (i.e. the remote has cancelled
40//! their request). When that happens, a [`NotifsHandlerOut::CloseDesired`] is emitted.
41//! - "Open substream" to "Open substream but close desired". When that happens, a
42//! [`NotifsHandlerOut::CloseDesired`] is emitted.
43//!
44//! The user can instruct a protocol in the `NotifsHandler` to switch from "closed" to "open" or
45//! vice-versa by sending either a [`NotifsHandlerIn::Open`] or a [`NotifsHandlerIn::Close`]. The
46//! `NotifsHandler` must answer with [`NotifsHandlerOut::OpenResultOk`] or
47//! [`NotifsHandlerOut::OpenResultErr`], or with [`NotifsHandlerOut::CloseResult`].
48//!
49//! When a [`NotifsHandlerOut::OpenResultOk`] is emitted, the substream is now in the open state.
50//! When a [`NotifsHandlerOut::OpenResultErr`] or [`NotifsHandlerOut::CloseResult`] is emitted,
51//! the `NotifsHandler` is now (or remains) in the closed state.
52//!
53//! When a [`NotifsHandlerOut::OpenDesiredByRemote`] is emitted, the user should always send back
54//! either a [`NotifsHandlerIn::Open`] or a [`NotifsHandlerIn::Close`]. If this isn't done, the
55//! remote will be left in a pending state.
56//!
57//! It is illegal to send a [`NotifsHandlerIn::Open`] before a previously-emitted
58//! [`NotifsHandlerIn::Open`] has gotten an answer.
59
60use crate::{
61	protocol::notifications::upgrade::{
62		NotificationsIn, NotificationsInSubstream, NotificationsOut, NotificationsOutError,
63		NotificationsOutSubstream, UpgradeCollec,
64	},
65	service::metrics::NotificationMetrics,
66	types::ProtocolName,
67};
68
69use bytes::BytesMut;
70use futures::{
71	channel::mpsc,
72	lock::{Mutex as FuturesMutex, MutexGuard as FuturesMutexGuard},
73	prelude::*,
74};
75use libp2p::{
76	swarm::{
77		handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, Stream,
78		SubstreamProtocol,
79	},
80	PeerId,
81};
82
83use parking_lot::{Mutex, RwLock};
84use std::{
85	collections::VecDeque,
86	mem,
87	pin::Pin,
88	sync::Arc,
89	task::{Context, Poll},
90	time::Duration,
91};
92
/// Target under which every log line of this file is emitted.
const LOG_TARGET: &str = "sub-libp2p::notification::handler";

/// Capacity of the channel that carries notifications queued from asynchronous contexts.
/// See [`NotificationsSink::reserve_notification`] for context.
pub(crate) const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8;

/// Capacity of the channel that carries notifications queued from synchronous contexts.
const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048;

/// Time budget for opening a substream and receiving the handshake message. Once it has
/// elapsed, the attempt to open the substream is considered failed.
const OPEN_TIMEOUT: Duration = Duration::from_secs(10);

/// Minimum time a freshly established connection is kept open, so that the rest of the code
/// gets the chance to ask for substreams to be opened.
const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
111
112/// The actual handler once the connection has been established.
113///
114/// See the documentation at the module level for more information.
115pub struct NotifsHandler {
116	/// List of notification protocols, specified by the user at initialization.
117	protocols: Vec<Protocol>,
118
119	/// Whether to keep connection alive
120	keep_alive: bool,
121
122	/// Optional future that keeps connection alive for a certain amount of time.
123	// TODO: this should be safe to remove, see https://github.com/paritytech/polkadot-sdk/issues/6350
124	keep_alive_timeout_future: Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
125
126	/// Remote we are connected to.
127	peer_id: PeerId,
128
129	/// Events to return in priority from `poll`.
130	events_queue: VecDeque<ConnectionHandlerEvent<NotificationsOut, usize, NotifsHandlerOut>>,
131
132	/// Metrics.
133	metrics: Option<Arc<NotificationMetrics>>,
134}
135
136impl NotifsHandler {
137	/// Creates new [`NotifsHandler`].
138	pub fn new(
139		peer_id: PeerId,
140		protocols: Vec<ProtocolConfig>,
141		metrics: Option<NotificationMetrics>,
142	) -> Self {
143		Self {
144			protocols: protocols
145				.into_iter()
146				.map(|config| {
147					let in_upgrade = NotificationsIn::new(
148						config.name.clone(),
149						config.fallback_names.clone(),
150						config.max_notification_size,
151					);
152
153					Protocol { config, in_upgrade, state: State::Closed { pending_opening: false } }
154				})
155				.collect(),
156			peer_id,
157			// Keep connection alive initially until below timeout expires
158			keep_alive: true,
159			// A grace period of `INITIAL_KEEPALIVE_TIME` must be given to leave time for the remote
160			// to express desire to open substreams.
161			// TODO: This is a hack and ideally should not be necessary
162			keep_alive_timeout_future: Some(Box::pin(tokio::time::sleep(INITIAL_KEEPALIVE_TIME))),
163			events_queue: VecDeque::with_capacity(16),
164			metrics: metrics.map_or(None, |metrics| Some(Arc::new(metrics))),
165		}
166	}
167}
168
169/// Configuration for a notifications protocol.
170#[derive(Debug, Clone)]
171pub struct ProtocolConfig {
172	/// Name of the protocol.
173	pub name: ProtocolName,
174	/// Names of the protocol to use if the main one isn't available.
175	pub fallback_names: Vec<ProtocolName>,
176	/// Handshake of the protocol. The `RwLock` is locked every time a new substream is opened.
177	pub handshake: Arc<RwLock<Vec<u8>>>,
178	/// Maximum allowed size for a notification.
179	pub max_notification_size: u64,
180}
181
182/// Fields specific for each individual protocol.
183struct Protocol {
184	/// Other fields.
185	config: ProtocolConfig,
186
187	/// Prototype for the inbound upgrade.
188	in_upgrade: NotificationsIn,
189
190	/// Current state of the substreams for this protocol.
191	state: State,
192}
193
194/// See the module-level documentation to learn about the meaning of these variants.
195enum State {
196	/// Protocol is in the "Closed" state.
197	Closed {
198		/// True if an outgoing substream is still in the process of being opened.
199		pending_opening: bool,
200	},
201
202	/// Protocol is in the "Closed" state. A [`NotifsHandlerOut::OpenDesiredByRemote`] has been
203	/// emitted.
204	OpenDesiredByRemote {
205		/// Substream opened by the remote and that hasn't been accepted/rejected yet.
206		in_substream: NotificationsInSubstream<Stream>,
207
208		/// See [`State::Closed::pending_opening`].
209		pending_opening: bool,
210	},
211
212	/// Protocol is in the "Closed" state, but has received a [`NotifsHandlerIn::Open`] and is
213	/// consequently trying to open the various notifications substreams.
214	///
215	/// A [`NotifsHandlerOut::OpenResultOk`] or a [`NotifsHandlerOut::OpenResultErr`] event must
216	/// be emitted when transitioning to respectively [`State::Open`] or [`State::Closed`].
217	Opening {
218		/// Substream opened by the remote. If `Some`, has been accepted.
219		in_substream: Option<NotificationsInSubstream<Stream>>,
220		/// Is the connection inbound.
221		inbound: bool,
222	},
223
224	/// Protocol is in the "Open" state.
225	Open {
226		/// Contains the two `Receiver`s connected to the [`NotificationsSink`] that has been
227		/// sent out. The notifications to send out can be pulled from this receivers.
228		/// We use two different channels in order to have two different channel sizes, but from
229		/// the receiving point of view, the two channels are the same.
230		/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
231		notifications_sink_rx: stream::Peekable<
232			stream::Select<
233				stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
234				stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
235			>,
236		>,
237
238		/// Outbound substream that has been accepted by the remote.
239		///
240		/// Always `Some` on transition to [`State::Open`]. Switched to `None` only if the remote
241		/// closed the substream. If `None`, a [`NotifsHandlerOut::CloseDesired`] event has been
242		/// emitted.
243		out_substream: Option<NotificationsOutSubstream<Stream>>,
244
245		/// Substream opened by the remote.
246		///
247		/// Contrary to the `out_substream` field, operations continue as normal even if the
248		/// substream has been closed by the remote. A `None` is treated the same way as if there
249		/// was an idle substream.
250		in_substream: Option<NotificationsInSubstream<Stream>>,
251	},
252}
253
/// Why a [`NotifsHandlerOut::CloseDesired`] event was emitted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloseReason {
	/// The remote asked for the substreams to be closed.
	///
	/// Reported when the remote drops the substream or when an IO error is encountered.
	RemoteRequest,

	/// The remote misbehaved by not complying with the notification spec.
	///
	/// For now this means that the remote has sent data on an outbound substream.
	ProtocolMisbehavior,
}
267
268/// Event that can be received by a `NotifsHandler`.
269#[derive(Debug, Clone)]
270pub enum NotifsHandlerIn {
271	/// Instruct the handler to open the notification substreams.
272	///
273	/// Must always be answered by a [`NotifsHandlerOut::OpenResultOk`] or a
274	/// [`NotifsHandlerOut::OpenResultErr`] event.
275	///
276	/// Importantly, it is forbidden to send a [`NotifsHandlerIn::Open`] while a previous one is
277	/// already in the fly. It is however possible if a `Close` is still in the fly.
278	Open {
279		/// Index of the protocol in the list of protocols passed at initialization.
280		protocol_index: usize,
281
282		/// The peer id of the remote.
283		peer_id: PeerId,
284	},
285
286	/// Instruct the handler to close the notification substreams, or reject any pending incoming
287	/// substream request.
288	///
289	/// Must always be answered by a [`NotifsHandlerOut::CloseResult`] event.
290	Close {
291		/// Index of the protocol in the list of protocols passed at initialization.
292		protocol_index: usize,
293	},
294}
295
296/// Event that can be emitted by a `NotifsHandler`.
297#[derive(Debug)]
298pub enum NotifsHandlerOut {
299	/// Acknowledges a [`NotifsHandlerIn::Open`].
300	OpenResultOk {
301		/// Index of the protocol in the list of protocols passed at initialization.
302		protocol_index: usize,
303		/// Name of the protocol that was actually negotiated, if the default one wasn't available.
304		negotiated_fallback: Option<ProtocolName>,
305		/// Handshake that was sent to us.
306		/// This is normally a "Status" message, but this out of the concern of this code.
307		received_handshake: Vec<u8>,
308		/// How notifications can be sent to this node.
309		notifications_sink: NotificationsSink,
310		/// Is the connection inbound.
311		inbound: bool,
312	},
313
314	/// Acknowledges a [`NotifsHandlerIn::Open`]. The remote has refused the attempt to open
315	/// notification substreams.
316	OpenResultErr {
317		/// Index of the protocol in the list of protocols passed at initialization.
318		protocol_index: usize,
319	},
320
321	/// Acknowledges a [`NotifsHandlerIn::Close`].
322	CloseResult {
323		/// Index of the protocol in the list of protocols passed at initialization.
324		protocol_index: usize,
325	},
326
327	/// The remote would like the substreams to be open. Send a [`NotifsHandlerIn::Open`] or a
328	/// [`NotifsHandlerIn::Close`] in order to either accept or deny this request. If a
329	/// [`NotifsHandlerIn::Open`] or [`NotifsHandlerIn::Close`] has been sent before and has not
330	/// yet been acknowledged by a matching [`NotifsHandlerOut`], then you don't need to a send
331	/// another [`NotifsHandlerIn`].
332	OpenDesiredByRemote {
333		/// Index of the protocol in the list of protocols passed at initialization.
334		protocol_index: usize,
335		/// Received handshake.
336		handshake: Vec<u8>,
337	},
338
339	/// The remote would like the substreams to be closed, or the remote peer has misbehaved.
340	///
341	/// Send a [`NotifsHandlerIn::Close`] in order to close them. If a [`NotifsHandlerIn::Close`]
342	/// has been sent before and has not yet been acknowledged by a
343	/// [`NotifsHandlerOut::CloseResult`], then you don't need to a send another one.
344	CloseDesired {
345		/// Index of the protocol in the list of protocols passed at initialization.
346		protocol_index: usize,
347
348		/// The close reason.
349		reason: CloseReason,
350	},
351
352	/// Received a message on a custom protocol substream.
353	///
354	/// Can only happen when the handler is in the open state.
355	Notification {
356		/// Index of the protocol in the list of protocols passed at initialization.
357		protocol_index: usize,
358		/// Message that has been received.
359		message: BytesMut,
360	},
361
362	/// Close connection
363	Close {
364		/// Index of the protocol in the list of protocols passed at initialization.
365		protocol_index: usize,
366	},
367}
368
369/// Sink connected directly to the node background task. Allows sending notifications to the peer.
370///
371/// Can be cloned in order to obtain multiple references to the substream of the same peer.
372#[derive(Debug, Clone)]
373pub struct NotificationsSink {
374	inner: Arc<NotificationsSinkInner>,
375	metrics: Option<Arc<NotificationMetrics>>,
376}
377
378impl NotificationsSink {
379	/// Create new [`NotificationsSink`].
380	/// NOTE: only used for testing but must be `pub` as other crates in `client/network` use this.
381	pub fn new(
382		peer_id: PeerId,
383	) -> (Self, mpsc::Receiver<NotificationsSinkMessage>, mpsc::Receiver<NotificationsSinkMessage>)
384	{
385		let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
386		let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
387		(
388			NotificationsSink {
389				inner: Arc::new(NotificationsSinkInner {
390					peer_id,
391					async_channel: FuturesMutex::new(async_tx),
392					sync_channel: Mutex::new(Some(sync_tx)),
393				}),
394				metrics: None,
395			},
396			async_rx,
397			sync_rx,
398		)
399	}
400
401	/// Get reference to metrics.
402	pub fn metrics(&self) -> &Option<Arc<NotificationMetrics>> {
403		&self.metrics
404	}
405}
406
407#[derive(Debug)]
408struct NotificationsSinkInner {
409	/// Target of the sink.
410	peer_id: PeerId,
411	/// Sender to use in asynchronous contexts. Uses an asynchronous mutex.
412	async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
413	/// Sender to use in synchronous contexts. Uses a synchronous mutex.
414	/// Contains `None` if the channel was full at some point, in which case the channel will
415	/// be closed in the near future anyway.
416	/// This channel has a large capacity and is meant to be used in contexts where
417	/// back-pressure cannot be properly exerted.
418	/// It will be removed in a future version.
419	sync_channel: Mutex<Option<mpsc::Sender<NotificationsSinkMessage>>>,
420}
421
/// Message travelling through a [`NotificationsSink`] towards the background task dedicated
/// to the peer, which processes it.
#[derive(Debug, PartialEq, Eq)]
pub enum NotificationsSinkMessage {
	/// Notification queued by [`NotificationsSink::reserve_notification`] or
	/// [`NotificationsSink::write_notification_now`].
	Notification { message: Vec<u8> },

	/// The connection must be closed.
	ForceClose,
}
433
434impl NotificationsSink {
435	/// Returns the [`PeerId`] the sink is connected to.
436	pub fn peer_id(&self) -> &PeerId {
437		&self.inner.peer_id
438	}
439
440	/// Sends a notification to the peer.
441	///
442	/// If too many messages are already buffered, the notification is silently discarded and the
443	/// connection to the peer will be closed shortly after.
444	///
445	/// The protocol name is expected to be checked ahead of calling this method. It is a logic
446	/// error to send a notification using an unknown protocol.
447	///
448	/// This method will be removed in a future version.
449	pub fn send_sync_notification(&self, message: impl Into<Vec<u8>>) {
450		let mut lock = self.inner.sync_channel.lock();
451
452		if let Some(tx) = lock.as_mut() {
453			let message = message.into();
454			let result = tx.try_send(NotificationsSinkMessage::Notification { message });
455
456			if result.is_err() {
457				// Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the
458				// buffer, and therefore `try_send` will succeed.
459				let _result2 = tx.clone().try_send(NotificationsSinkMessage::ForceClose);
460				debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
461
462				// Destroy the sender in order to not send more `ForceClose` messages.
463				*lock = None;
464			}
465		}
466	}
467
468	/// Wait until the remote is ready to accept a notification.
469	///
470	/// Returns an error in the case where the connection is closed.
471	///
472	/// The protocol name is expected to be checked ahead of calling this method. It is a logic
473	/// error to send a notification using an unknown protocol.
474	pub async fn reserve_notification(&self) -> Result<Ready<'_>, ()> {
475		let mut lock = self.inner.async_channel.lock().await;
476
477		let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await;
478		if poll_ready.is_ok() {
479			Ok(Ready { lock })
480		} else {
481			Err(())
482		}
483	}
484}
485
486/// Notification slot is reserved and the notification can actually be sent.
487#[must_use]
488#[derive(Debug)]
489pub struct Ready<'a> {
490	/// Guarded channel. The channel inside is guaranteed to not be full.
491	lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>,
492}
493
494impl<'a> Ready<'a> {
495	/// Consumes this slots reservation and actually queues the notification.
496	///
497	/// Returns an error if the substream has been closed.
498	pub fn send(mut self, notification: impl Into<Vec<u8>>) -> Result<(), ()> {
499		self.lock
500			.start_send(NotificationsSinkMessage::Notification { message: notification.into() })
501			.map_err(|_| ())
502	}
503}
504
505impl ConnectionHandler for NotifsHandler {
506	type FromBehaviour = NotifsHandlerIn;
507	type ToBehaviour = NotifsHandlerOut;
508	type InboundProtocol = UpgradeCollec<NotificationsIn>;
509	type OutboundProtocol = NotificationsOut;
510	// Index within the `out_protocols`.
511	type OutboundOpenInfo = usize;
512	type InboundOpenInfo = ();
513
514	fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
515		let protocols = self
516			.protocols
517			.iter()
518			.map(|p| p.in_upgrade.clone())
519			.collect::<UpgradeCollec<_>>();
520
521		SubstreamProtocol::new(protocols, ())
522	}
523
524	fn on_connection_event(
525		&mut self,
526		event: ConnectionEvent<
527			'_,
528			Self::InboundProtocol,
529			Self::OutboundProtocol,
530			Self::InboundOpenInfo,
531			Self::OutboundOpenInfo,
532		>,
533	) {
534		match event {
535			ConnectionEvent::FullyNegotiatedInbound(inbound) => {
536				let (mut in_substream_open, protocol_index) = inbound.protocol;
537				let protocol_info = &mut self.protocols[protocol_index];
538
539				match protocol_info.state {
540					State::Closed { pending_opening } => {
541						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
542							NotifsHandlerOut::OpenDesiredByRemote {
543								protocol_index,
544								handshake: in_substream_open.handshake,
545							},
546						));
547
548						protocol_info.state = State::OpenDesiredByRemote {
549							in_substream: in_substream_open.substream,
550							pending_opening,
551						};
552					},
553					State::OpenDesiredByRemote { .. } => {
554						// If a substream already exists, silently drop the new one.
555						// Note that we drop the substream, which will send an equivalent to a
556						// TCP "RST" to the remote and force-close the substream. It might
557						// seem like an unclean way to get rid of a substream. However, keep
558						// in mind that it is invalid for the remote to open multiple such
559						// substreams, and therefore sending a "RST" is the most correct thing
560						// to do.
561						return
562					},
563					State::Opening { ref mut in_substream, .. } |
564					State::Open { ref mut in_substream, .. } => {
565						if in_substream.is_some() {
566							// Same remark as above.
567							return
568						}
569
570						// Create `handshake_message` on a separate line to be sure that the
571						// lock is released as soon as possible.
572						let handshake_message = protocol_info.config.handshake.read().clone();
573						in_substream_open.substream.send_handshake(handshake_message);
574						*in_substream = Some(in_substream_open.substream);
575					},
576				}
577			},
578			ConnectionEvent::FullyNegotiatedOutbound(outbound) => {
579				let (new_open, protocol_index) = (outbound.protocol, outbound.info);
580
581				match self.protocols[protocol_index].state {
582					State::Closed { ref mut pending_opening } |
583					State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
584						debug_assert!(*pending_opening);
585						*pending_opening = false;
586					},
587					State::Open { .. } => {
588						log::error!(target: LOG_TARGET, "☎️ State mismatch in notifications handler");
589						debug_assert!(false);
590					},
591					State::Opening { ref mut in_substream, inbound } => {
592						let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
593						let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
594						let notifications_sink = NotificationsSink {
595							inner: Arc::new(NotificationsSinkInner {
596								peer_id: self.peer_id,
597								async_channel: FuturesMutex::new(async_tx),
598								sync_channel: Mutex::new(Some(sync_tx)),
599							}),
600							metrics: self.metrics.clone(),
601						};
602
603						self.protocols[protocol_index].state = State::Open {
604							notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse())
605								.peekable(),
606							out_substream: Some(new_open.substream),
607							in_substream: in_substream.take(),
608						};
609
610						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
611							NotifsHandlerOut::OpenResultOk {
612								protocol_index,
613								negotiated_fallback: new_open.negotiated_fallback,
614								received_handshake: new_open.handshake,
615								notifications_sink,
616								inbound,
617							},
618						));
619					},
620				}
621			},
622			ConnectionEvent::AddressChange(_address_change) => {},
623			ConnectionEvent::LocalProtocolsChange(_) => {},
624			ConnectionEvent::RemoteProtocolsChange(_) => {},
625			ConnectionEvent::DialUpgradeError(dial_upgrade_error) => match self.protocols
626				[dial_upgrade_error.info]
627				.state
628			{
629				State::Closed { ref mut pending_opening } |
630				State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
631					debug_assert!(*pending_opening);
632					*pending_opening = false;
633				},
634
635				State::Opening { .. } => {
636					self.protocols[dial_upgrade_error.info].state =
637						State::Closed { pending_opening: false };
638
639					self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
640						NotifsHandlerOut::OpenResultErr { protocol_index: dial_upgrade_error.info },
641					));
642				},
643
644				// No substream is being open when already `Open`.
645				State::Open { .. } => debug_assert!(false),
646			},
647			ConnectionEvent::ListenUpgradeError(_listen_upgrade_error) => {},
648			event => {
649				log::warn!(target: LOG_TARGET, "New unknown `ConnectionEvent` libp2p event: {event:?}");
650			},
651		}
652	}
653
654	fn on_behaviour_event(&mut self, message: NotifsHandlerIn) {
655		match message {
656			NotifsHandlerIn::Open { protocol_index, peer_id } => {
657				let protocol_info = &mut self.protocols[protocol_index];
658				match &mut protocol_info.state {
659					State::Closed { pending_opening } => {
660						if !*pending_opening {
661							let proto = NotificationsOut::new(
662								protocol_info.config.name.clone(),
663								protocol_info.config.fallback_names.clone(),
664								protocol_info.config.handshake.read().clone(),
665								protocol_info.config.max_notification_size,
666								peer_id,
667							);
668
669							self.events_queue.push_back(
670								ConnectionHandlerEvent::OutboundSubstreamRequest {
671									protocol: SubstreamProtocol::new(proto, protocol_index)
672										.with_timeout(OPEN_TIMEOUT),
673								},
674							);
675						}
676
677						protocol_info.state = State::Opening { in_substream: None, inbound: false };
678					},
679					State::OpenDesiredByRemote { pending_opening, in_substream } => {
680						let handshake_message = protocol_info.config.handshake.read().clone();
681
682						if !*pending_opening {
683							let proto = NotificationsOut::new(
684								protocol_info.config.name.clone(),
685								protocol_info.config.fallback_names.clone(),
686								handshake_message.clone(),
687								protocol_info.config.max_notification_size,
688								peer_id,
689							);
690
691							self.events_queue.push_back(
692								ConnectionHandlerEvent::OutboundSubstreamRequest {
693									protocol: SubstreamProtocol::new(proto, protocol_index)
694										.with_timeout(OPEN_TIMEOUT),
695								},
696							);
697						}
698
699						in_substream.send_handshake(handshake_message);
700
701						// The state change is done in two steps because of borrowing issues.
702						let in_substream = match mem::replace(
703							&mut protocol_info.state,
704							State::Opening { in_substream: None, inbound: false },
705						) {
706							State::OpenDesiredByRemote { in_substream, .. } => in_substream,
707							_ => unreachable!(),
708						};
709						protocol_info.state =
710							State::Opening { in_substream: Some(in_substream), inbound: true };
711					},
712					State::Opening { .. } | State::Open { .. } => {
713						// As documented, it is forbidden to send an `Open` while there is already
714						// one in the fly.
715						log::error!(target: LOG_TARGET, "opening already-opened handler");
716						debug_assert!(false);
717					},
718				}
719			},
720
721			NotifsHandlerIn::Close { protocol_index } => {
722				match self.protocols[protocol_index].state {
723					State::Open { .. } => {
724						self.protocols[protocol_index].state =
725							State::Closed { pending_opening: false };
726					},
727					State::Opening { .. } => {
728						self.protocols[protocol_index].state =
729							State::Closed { pending_opening: true };
730
731						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
732							NotifsHandlerOut::OpenResultErr { protocol_index },
733						));
734					},
735					State::OpenDesiredByRemote { pending_opening, .. } => {
736						self.protocols[protocol_index].state = State::Closed { pending_opening };
737					},
738					State::Closed { .. } => {},
739				}
740
741				self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
742					NotifsHandlerOut::CloseResult { protocol_index },
743				));
744			},
745		}
746	}
747
748	fn connection_keep_alive(&self) -> bool {
749		// `Yes` if any protocol has some activity.
750		if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) {
751			return true;
752		}
753
754		self.keep_alive
755	}
756
757	fn poll(
758		&mut self,
759		cx: &mut Context,
760	) -> Poll<
761		ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
762	> {
763		{
764			let maybe_keep_alive_timeout_future = &mut self.keep_alive_timeout_future;
765			if let Some(keep_alive_timeout_future) = maybe_keep_alive_timeout_future {
766				if keep_alive_timeout_future.poll_unpin(cx).is_ready() {
767					maybe_keep_alive_timeout_future.take();
768					self.keep_alive = false;
769				}
770			}
771		}
772
773		if let Some(ev) = self.events_queue.pop_front() {
774			return Poll::Ready(ev)
775		}
776
777		// For each open substream, try to send messages from `notifications_sink_rx` to the
778		// substream.
779		for protocol_index in 0..self.protocols.len() {
780			if let State::Open {
781				notifications_sink_rx, out_substream: Some(out_substream), ..
782			} = &mut self.protocols[protocol_index].state
783			{
784				loop {
785					// Only proceed with `out_substream.poll_ready_unpin` if there is an element
786					// available in `notifications_sink_rx`. This avoids waking up the task when
787					// a substream is ready to send if there isn't actually something to send.
788					match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) {
789						Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) =>
790							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
791								NotifsHandlerOut::Close { protocol_index },
792							)),
793						Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {},
794						Poll::Ready(None) | Poll::Pending => break,
795					}
796
797					// Before we extract the element from `notifications_sink_rx`, check that the
798					// substream is ready to accept a message.
799					match out_substream.poll_ready_unpin(cx) {
800						Poll::Ready(_) => {},
801						Poll::Pending => break,
802					}
803
804					// Now that the substream is ready for a message, grab what to send.
805					let message = match notifications_sink_rx.poll_next_unpin(cx) {
806						Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) =>
807							message,
808						Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) |
809						Poll::Ready(None) |
810						Poll::Pending => {
811							// Should never be reached, as per `poll_peek` above.
812							debug_assert!(false);
813							break
814						},
815					};
816
817					let _ = out_substream.start_send_unpin(message);
818					// Note that flushing is performed later down this function.
819				}
820			}
821		}
822
823		// Flush all outbound substreams.
824		// When `poll` returns `Poll::Ready`, the libp2p `Swarm` may decide to no longer call
825		// `poll` again before it is ready to accept more events.
826		// In order to make sure that substreams are flushed as soon as possible, the flush is
827		// performed before the code paths that can produce `Ready` (with some rare exceptions).
828		// Importantly, however, the flush is performed *after* notifications are queued with
829		// `Sink::start_send`.
830		// Note that we must call `poll_flush` on all substreams and not only on those we
831		// have called `Sink::start_send` on, because `NotificationsOutSubstream::poll_flush`
832		// also reports the substream termination (even if no data was written into it).
833		for protocol_index in 0..self.protocols.len() {
834			match &mut self.protocols[protocol_index].state {
835				State::Open { out_substream: out_substream @ Some(_), .. } => {
836					match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
837						Poll::Pending | Poll::Ready(Ok(())) => {},
838						Poll::Ready(Err(error)) => {
839							*out_substream = None;
840
841							let reason = match error {
842								NotificationsOutError::Io(_) | NotificationsOutError::Closed =>
843									CloseReason::RemoteRequest,
844								NotificationsOutError::UnexpectedData =>
845									CloseReason::ProtocolMisbehavior,
846							};
847
848							let event = NotifsHandlerOut::CloseDesired { protocol_index, reason };
849							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event))
850						},
851					};
852				},
853
854				State::Closed { .. } |
855				State::Opening { .. } |
856				State::Open { out_substream: None, .. } |
857				State::OpenDesiredByRemote { .. } => {},
858			}
859		}
860
861		// Poll inbound substreams.
862		for protocol_index in 0..self.protocols.len() {
863			// Inbound substreams being closed is always tolerated, except for the
864			// `OpenDesiredByRemote` state which might need to be switched back to `Closed`.
865			match &mut self.protocols[protocol_index].state {
866				State::Closed { .. } |
867				State::Open { in_substream: None, .. } |
868				State::Opening { in_substream: None, .. } => {},
869
870				State::Open { in_substream: in_substream @ Some(_), .. } =>
871					match futures::prelude::stream::Stream::poll_next(
872						Pin::new(in_substream.as_mut().unwrap()),
873						cx,
874					) {
875						Poll::Pending => {},
876						Poll::Ready(Some(Ok(message))) => {
877							let event = NotifsHandlerOut::Notification { protocol_index, message };
878							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event))
879						},
880						Poll::Ready(None) | Poll::Ready(Some(Err(_))) => *in_substream = None,
881					},
882
883				State::OpenDesiredByRemote { in_substream, pending_opening } =>
884					match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
885						Poll::Pending => {},
886						Poll::Ready(Ok(())) => {},
887						Poll::Ready(Err(_)) => {
888							self.protocols[protocol_index].state =
889								State::Closed { pending_opening: *pending_opening };
890							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
891								NotifsHandlerOut::CloseDesired {
892									protocol_index,
893									reason: CloseReason::RemoteRequest,
894								},
895							))
896						},
897					},
898
899				State::Opening { in_substream: in_substream @ Some(_), .. } =>
900					match NotificationsInSubstream::poll_process(
901						Pin::new(in_substream.as_mut().unwrap()),
902						cx,
903					) {
904						Poll::Pending => {},
905						Poll::Ready(Ok(())) => {},
906						Poll::Ready(Err(_)) => *in_substream = None,
907					},
908			}
909		}
910
911		// This is the only place in this method that can return `Pending`.
912		// By putting it at the very bottom, we are guaranteed that everything has been properly
913		// polled.
914		Poll::Pending
915	}
916}
917
918#[cfg(test)]
919pub mod tests {
920	use super::*;
921	use crate::protocol::notifications::upgrade::{
922		NotificationsInOpen, NotificationsInSubstreamHandshake, NotificationsOutOpen,
923	};
924	use asynchronous_codec::Framed;
925	use libp2p::{
926		core::muxing::SubstreamBox,
927		swarm::handler::{self, StreamUpgradeError},
928	};
929	use multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version};
930	use std::{
931		collections::HashMap,
932		io::{Error, IoSlice, IoSliceMut},
933	};
934	use tokio::sync::mpsc;
935	use unsigned_varint::codec::UviBytes;
936
937	struct OpenSubstream {
938		notifications: stream::Peekable<
939			stream::Select<
940				stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
941				stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
942			>,
943		>,
944		_in_substream: MockSubstream,
945		_out_substream: MockSubstream,
946	}
947
948	pub struct ConnectionYielder {
949		connections: HashMap<(PeerId, usize), OpenSubstream>,
950	}
951
952	impl ConnectionYielder {
953		/// Create new [`ConnectionYielder`].
954		pub fn new() -> Self {
955			Self { connections: HashMap::new() }
956		}
957
958		/// Open a new substream for peer.
959		pub fn open_substream(
960			&mut self,
961			peer: PeerId,
962			protocol_index: usize,
963			received_handshake: Vec<u8>,
964		) -> NotifsHandlerOut {
965			let (async_tx, async_rx) =
966				futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
967			let (sync_tx, sync_rx) =
968				futures::channel::mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
969			let notifications_sink = NotificationsSink {
970				inner: Arc::new(NotificationsSinkInner {
971					peer_id: peer,
972					async_channel: FuturesMutex::new(async_tx),
973					sync_channel: Mutex::new(Some(sync_tx)),
974				}),
975				metrics: None,
976			};
977			let (in_substream, out_substream) = MockSubstream::new();
978
979			self.connections.insert(
980				(peer, protocol_index),
981				OpenSubstream {
982					notifications: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
983					_in_substream: in_substream,
984					_out_substream: out_substream,
985				},
986			);
987
988			NotifsHandlerOut::OpenResultOk {
989				protocol_index,
990				negotiated_fallback: None,
991				received_handshake,
992				notifications_sink,
993				inbound: false,
994			}
995		}
996
997		/// Attempt to get next pending event from one of the notification sinks.
998		pub async fn get_next_event(&mut self, peer: PeerId, set: usize) -> Option<Vec<u8>> {
999			let substream = if let Some(info) = self.connections.get_mut(&(peer, set)) {
1000				info
1001			} else {
1002				return None
1003			};
1004
1005			futures::future::poll_fn(|cx| match substream.notifications.poll_next_unpin(cx) {
1006				Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) =>
1007					Poll::Ready(Some(message)),
1008				Poll::Pending => Poll::Ready(None),
1009				Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) | Poll::Ready(None) => {
1010					panic!("sink closed")
1011				},
1012			})
1013			.await
1014		}
1015	}
1016
1017	struct MockSubstream {
1018		pub rx: mpsc::Receiver<Vec<u8>>,
1019		pub tx: mpsc::Sender<Vec<u8>>,
1020		rx_buffer: BytesMut,
1021	}
1022
1023	/// Mirror of `ActiveStreamCounter` in `libp2p`
1024	#[allow(dead_code)]
1025	struct MockActiveStreamCounter(Arc<()>);
1026
1027	// Mirror of `Stream` in `libp2p`
1028	#[allow(dead_code)]
1029	struct MockStream {
1030		stream: Negotiated<SubstreamBox>,
1031		counter: Option<MockActiveStreamCounter>,
1032	}
1033
1034	impl MockSubstream {
1035		/// Create new substream pair.
1036		pub fn new() -> (Self, Self) {
1037			let (tx1, rx1) = mpsc::channel(32);
1038			let (tx2, rx2) = mpsc::channel(32);
1039
1040			(
1041				Self { rx: rx1, tx: tx2, rx_buffer: BytesMut::with_capacity(512) },
1042				Self { rx: rx2, tx: tx1, rx_buffer: BytesMut::with_capacity(512) },
1043			)
1044		}
1045
1046		/// Create new negotiated substream pair.
1047		pub async fn negotiated() -> (Stream, Stream) {
1048			let (socket1, socket2) = Self::new();
1049			let socket1 = SubstreamBox::new(socket1);
1050			let socket2 = SubstreamBox::new(socket2);
1051
1052			let protos = vec!["/echo/1.0.0", "/echo/2.5.0"];
1053			let (res1, res2) = tokio::join!(
1054				dialer_select_proto(socket1, protos.clone(), Version::V1),
1055				listener_select_proto(socket2, protos),
1056			);
1057
1058			(Self::stream_new(res1.unwrap().1), Self::stream_new(res2.unwrap().1))
1059		}
1060
1061		/// Unsafe substitute for `Stream::new` private constructor.
1062		fn stream_new(stream: Negotiated<SubstreamBox>) -> Stream {
1063			let stream = MockStream { stream, counter: None };
1064			// Static asserts to make sure this doesn't break.
1065			const _: () = {
1066				assert!(core::mem::size_of::<Stream>() == core::mem::size_of::<MockStream>());
1067				assert!(core::mem::align_of::<Stream>() == core::mem::align_of::<MockStream>());
1068			};
1069
1070			unsafe { core::mem::transmute(stream) }
1071		}
1072	}
1073
1074	impl AsyncWrite for MockSubstream {
1075		fn poll_write<'a>(
1076			self: Pin<&mut Self>,
1077			_cx: &mut Context<'a>,
1078			buf: &[u8],
1079		) -> Poll<Result<usize, Error>> {
1080			match self.tx.try_send(buf.to_vec()) {
1081				Ok(_) => Poll::Ready(Ok(buf.len())),
1082				Err(_) => Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
1083			}
1084		}
1085
1086		fn poll_flush<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1087			Poll::Ready(Ok(()))
1088		}
1089
1090		fn poll_close<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1091			Poll::Ready(Ok(()))
1092		}
1093
1094		fn poll_write_vectored<'a, 'b>(
1095			self: Pin<&mut Self>,
1096			_cx: &mut Context<'a>,
1097			_bufs: &[IoSlice<'b>],
1098		) -> Poll<Result<usize, Error>> {
1099			unimplemented!();
1100		}
1101	}
1102
1103	impl AsyncRead for MockSubstream {
1104		fn poll_read<'a>(
1105			mut self: Pin<&mut Self>,
1106			cx: &mut Context<'a>,
1107			buf: &mut [u8],
1108		) -> Poll<Result<usize, Error>> {
1109			match self.rx.poll_recv(cx) {
1110				Poll::Ready(Some(data)) => self.rx_buffer.extend_from_slice(&data),
1111				Poll::Ready(None) =>
1112					return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
1113				_ => {},
1114			}
1115
1116			let nsize = std::cmp::min(self.rx_buffer.len(), buf.len());
1117			let data = self.rx_buffer.split_to(nsize);
1118			buf[..nsize].copy_from_slice(&data[..]);
1119
1120			if nsize > 0 {
1121				return Poll::Ready(Ok(nsize))
1122			}
1123
1124			Poll::Pending
1125		}
1126
1127		fn poll_read_vectored<'a, 'b>(
1128			self: Pin<&mut Self>,
1129			_cx: &mut Context<'a>,
1130			_bufs: &mut [IoSliceMut<'b>],
1131		) -> Poll<Result<usize, Error>> {
1132			unimplemented!();
1133		}
1134	}
1135
1136	/// Create new [`NotifsHandler`].
1137	fn notifs_handler() -> NotifsHandler {
1138		NotifsHandler::new(
1139			PeerId::random(),
1140			vec![ProtocolConfig {
1141				name: "/foo".into(),
1142				fallback_names: vec![],
1143				handshake: Arc::new(RwLock::new(b"hello, world".to_vec())),
1144				max_notification_size: u64::MAX,
1145			}],
1146			None,
1147		)
1148	}
1149
1150	// verify that if another substream is attempted to be opened by remote while an inbound
1151	// substream already exists, the new inbound stream is rejected and closed by the local node.
1152	#[tokio::test]
1153	async fn second_open_desired_by_remote_rejected() {
1154		let mut handler = notifs_handler();
1155		let (io, mut io2) = MockSubstream::negotiated().await;
1156		let mut codec = UviBytes::default();
1157		codec.set_max_len(usize::MAX);
1158
1159		let notif_in = NotificationsInOpen {
1160			handshake: b"hello, world".to_vec(),
1161			substream: NotificationsInSubstream::new(
1162				Framed::new(io, codec),
1163				NotificationsInSubstreamHandshake::NotSent,
1164			),
1165		};
1166
1167		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1168			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1169		));
1170
1171		// verify that the substream is in (partly) opened state
1172		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1173		futures::future::poll_fn(|cx| {
1174			let mut buf = Vec::with_capacity(512);
1175			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1176			Poll::Ready(())
1177		})
1178		.await;
1179
1180		// attempt to open another inbound substream and verify that it is rejected
1181		let (io, mut io2) = MockSubstream::negotiated().await;
1182		let mut codec = UviBytes::default();
1183		codec.set_max_len(usize::MAX);
1184
1185		let notif_in = NotificationsInOpen {
1186			handshake: b"hello, world".to_vec(),
1187			substream: NotificationsInSubstream::new(
1188				Framed::new(io, codec),
1189				NotificationsInSubstreamHandshake::NotSent,
1190			),
1191		};
1192
1193		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1194			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1195		));
1196
1197		// verify that the new substream is rejected and closed
1198		futures::future::poll_fn(|cx| {
1199			let mut buf = Vec::with_capacity(512);
1200
1201			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
1202				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
1203			}
1204
1205			Poll::Ready(())
1206		})
1207		.await;
1208	}
1209
1210	#[tokio::test]
1211	async fn open_rejected_if_substream_is_opening() {
1212		let mut handler = notifs_handler();
1213		let (io, mut io2) = MockSubstream::negotiated().await;
1214		let mut codec = UviBytes::default();
1215		codec.set_max_len(usize::MAX);
1216
1217		let notif_in = NotificationsInOpen {
1218			handshake: b"hello, world".to_vec(),
1219			substream: NotificationsInSubstream::new(
1220				Framed::new(io, codec),
1221				NotificationsInSubstreamHandshake::NotSent,
1222			),
1223		};
1224
1225		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1226			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1227		));
1228
1229		// verify that the substream is in (partly) opened state
1230		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1231		futures::future::poll_fn(|cx| {
1232			let mut buf = Vec::with_capacity(512);
1233			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1234			Poll::Ready(())
1235		})
1236		.await;
1237
1238		// move the handler state to 'Opening'
1239		handler.on_behaviour_event(NotifsHandlerIn::Open {
1240			protocol_index: 0,
1241			peer_id: PeerId::random(),
1242		});
1243		assert!(std::matches!(
1244			handler.protocols[0].state,
1245			State::Opening { in_substream: Some(_), .. }
1246		));
1247
1248		// remote now tries to open another substream, verify that it is rejected and closed
1249		let (io, mut io2) = MockSubstream::negotiated().await;
1250		let mut codec = UviBytes::default();
1251		codec.set_max_len(usize::MAX);
1252
1253		let notif_in = NotificationsInOpen {
1254			handshake: b"hello, world".to_vec(),
1255			substream: NotificationsInSubstream::new(
1256				Framed::new(io, codec),
1257				NotificationsInSubstreamHandshake::NotSent,
1258			),
1259		};
1260
1261		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1262			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1263		));
1264
1265		// verify that the new substream is rejected and closed but that the first substream is
1266		// still in correct state
1267		futures::future::poll_fn(|cx| {
1268			let mut buf = Vec::with_capacity(512);
1269
1270			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
1271				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
1272			} else {
1273				panic!("unexpected result");
1274			}
1275
1276			Poll::Ready(())
1277		})
1278		.await;
1279		assert!(std::matches!(
1280			handler.protocols[0].state,
1281			State::Opening { in_substream: Some(_), .. }
1282		));
1283	}
1284
1285	#[tokio::test]
1286	async fn open_rejected_if_substream_already_open() {
1287		let mut handler = notifs_handler();
1288		let (io, mut io2) = MockSubstream::negotiated().await;
1289		let mut codec = UviBytes::default();
1290		codec.set_max_len(usize::MAX);
1291
1292		let notif_in = NotificationsInOpen {
1293			handshake: b"hello, world".to_vec(),
1294			substream: NotificationsInSubstream::new(
1295				Framed::new(io, codec),
1296				NotificationsInSubstreamHandshake::NotSent,
1297			),
1298		};
1299		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1300			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1301		));
1302
1303		// verify that the substream is in (partly) opened state
1304		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1305		futures::future::poll_fn(|cx| {
1306			let mut buf = Vec::with_capacity(512);
1307			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1308			Poll::Ready(())
1309		})
1310		.await;
1311
1312		// move the handler state to 'Opening'
1313		handler.on_behaviour_event(NotifsHandlerIn::Open {
1314			protocol_index: 0,
1315			peer_id: PeerId::random(),
1316		});
1317		assert!(std::matches!(
1318			handler.protocols[0].state,
1319			State::Opening { in_substream: Some(_), .. }
1320		));
1321
1322		// accept the substream and move its state to `Open`
1323		let (io, _io2) = MockSubstream::negotiated().await;
1324		let mut codec = UviBytes::default();
1325		codec.set_max_len(usize::MAX);
1326
1327		let notif_out = NotificationsOutOpen {
1328			handshake: b"hello, world".to_vec(),
1329			negotiated_fallback: None,
1330			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1331		};
1332		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1333			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1334		));
1335
1336		assert!(std::matches!(
1337			handler.protocols[0].state,
1338			State::Open { in_substream: Some(_), .. }
1339		));
1340
1341		// remote now tries to open another substream, verify that it is rejected and closed
1342		let (io, mut io2) = MockSubstream::negotiated().await;
1343		let mut codec = UviBytes::default();
1344		codec.set_max_len(usize::MAX);
1345		let notif_in = NotificationsInOpen {
1346			handshake: b"hello, world".to_vec(),
1347			substream: NotificationsInSubstream::new(
1348				Framed::new(io, codec),
1349				NotificationsInSubstreamHandshake::NotSent,
1350			),
1351		};
1352		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1353			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1354		));
1355
1356		// verify that the new substream is rejected and closed but that the first substream is
1357		// still in correct state
1358		futures::future::poll_fn(|cx| {
1359			let mut buf = Vec::with_capacity(512);
1360
1361			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
1362				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
1363			} else {
1364				panic!("unexpected result");
1365			}
1366
1367			Poll::Ready(())
1368		})
1369		.await;
1370		assert!(std::matches!(
1371			handler.protocols[0].state,
1372			State::Open { in_substream: Some(_), .. }
1373		));
1374	}
1375
1376	#[tokio::test]
1377	async fn fully_negotiated_resets_state_for_closed_substream() {
1378		let mut handler = notifs_handler();
1379		let (io, mut io2) = MockSubstream::negotiated().await;
1380		let mut codec = UviBytes::default();
1381		codec.set_max_len(usize::MAX);
1382
1383		let notif_in = NotificationsInOpen {
1384			handshake: b"hello, world".to_vec(),
1385			substream: NotificationsInSubstream::new(
1386				Framed::new(io, codec),
1387				NotificationsInSubstreamHandshake::NotSent,
1388			),
1389		};
1390		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1391			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1392		));
1393
1394		// verify that the substream is in (partly) opened state
1395		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1396		futures::future::poll_fn(|cx| {
1397			let mut buf = Vec::with_capacity(512);
1398			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1399			Poll::Ready(())
1400		})
1401		.await;
1402
1403		// first instruct the handler to open a connection and then close it right after
1404		// so the handler is in state `Closed { pending_opening: true }`
1405		handler.on_behaviour_event(NotifsHandlerIn::Open {
1406			protocol_index: 0,
1407			peer_id: PeerId::random(),
1408		});
1409		assert!(std::matches!(
1410			handler.protocols[0].state,
1411			State::Opening { in_substream: Some(_), .. }
1412		));
1413
1414		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1415		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1416
1417		// verify that if the the outbound substream is successfully negotiated, the state is not
1418		// changed as the substream was commanded to be closed by the handler.
1419		let (io, _io2) = MockSubstream::negotiated().await;
1420		let mut codec = UviBytes::default();
1421		codec.set_max_len(usize::MAX);
1422
1423		let notif_out = NotificationsOutOpen {
1424			handshake: b"hello, world".to_vec(),
1425			negotiated_fallback: None,
1426			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1427		};
1428		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1429			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1430		));
1431
1432		assert!(std::matches!(
1433			handler.protocols[0].state,
1434			State::Closed { pending_opening: false }
1435		));
1436	}
1437
1438	#[tokio::test]
1439	async fn fully_negotiated_resets_state_for_open_desired_substream() {
1440		let mut handler = notifs_handler();
1441		let (io, mut io2) = MockSubstream::negotiated().await;
1442		let mut codec = UviBytes::default();
1443		codec.set_max_len(usize::MAX);
1444
1445		let notif_in = NotificationsInOpen {
1446			handshake: b"hello, world".to_vec(),
1447			substream: NotificationsInSubstream::new(
1448				Framed::new(io, codec),
1449				NotificationsInSubstreamHandshake::NotSent,
1450			),
1451		};
1452		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1453			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1454		));
1455
1456		// verify that the substream is in (partly) opened state
1457		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1458		futures::future::poll_fn(|cx| {
1459			let mut buf = Vec::with_capacity(512);
1460			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1461			Poll::Ready(())
1462		})
1463		.await;
1464
1465		// first instruct the handler to open a connection and then close it right after
1466		// so the handler is in state `Closed { pending_opening: true }`
1467		handler.on_behaviour_event(NotifsHandlerIn::Open {
1468			protocol_index: 0,
1469			peer_id: PeerId::random(),
1470		});
1471		assert!(std::matches!(
1472			handler.protocols[0].state,
1473			State::Opening { in_substream: Some(_), .. }
1474		));
1475
1476		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1477		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1478
1479		// attempt to open another inbound substream and verify that it is rejected
1480		let (io, _io2) = MockSubstream::negotiated().await;
1481		let mut codec = UviBytes::default();
1482		codec.set_max_len(usize::MAX);
1483
1484		let notif_in = NotificationsInOpen {
1485			handshake: b"hello, world".to_vec(),
1486			substream: NotificationsInSubstream::new(
1487				Framed::new(io, codec),
1488				NotificationsInSubstreamHandshake::NotSent,
1489			),
1490		};
1491		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1492			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1493		));
1494
1495		assert!(std::matches!(
1496			handler.protocols[0].state,
1497			State::OpenDesiredByRemote { pending_opening: true, .. }
1498		));
1499
1500		// verify that if the the outbound substream is successfully negotiated, the state is not
1501		// changed as the substream was commanded to be closed by the handler.
1502		let (io, _io2) = MockSubstream::negotiated().await;
1503		let mut codec = UviBytes::default();
1504		codec.set_max_len(usize::MAX);
1505
1506		let notif_out = NotificationsOutOpen {
1507			handshake: b"hello, world".to_vec(),
1508			negotiated_fallback: None,
1509			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1510		};
1511
1512		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1513			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1514		));
1515
1516		assert!(std::matches!(
1517			handler.protocols[0].state,
1518			State::OpenDesiredByRemote { pending_opening: false, .. }
1519		));
1520	}
1521
1522	#[tokio::test]
1523	async fn dial_upgrade_error_resets_closed_outbound_state() {
1524		let mut handler = notifs_handler();
1525		let (io, mut io2) = MockSubstream::negotiated().await;
1526		let mut codec = UviBytes::default();
1527		codec.set_max_len(usize::MAX);
1528
1529		let notif_in = NotificationsInOpen {
1530			handshake: b"hello, world".to_vec(),
1531			substream: NotificationsInSubstream::new(
1532				Framed::new(io, codec),
1533				NotificationsInSubstreamHandshake::NotSent,
1534			),
1535		};
1536		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1537			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1538		));
1539
1540		// verify that the substream is in (partly) opened state
1541		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1542		futures::future::poll_fn(|cx| {
1543			let mut buf = Vec::with_capacity(512);
1544			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1545			Poll::Ready(())
1546		})
1547		.await;
1548
1549		// first instruct the handler to open a connection and then close it right after
1550		// so the handler is in state `Closed { pending_opening: true }`
1551		handler.on_behaviour_event(NotifsHandlerIn::Open {
1552			protocol_index: 0,
1553			peer_id: PeerId::random(),
1554		});
1555		assert!(std::matches!(
1556			handler.protocols[0].state,
1557			State::Opening { in_substream: Some(_), .. }
1558		));
1559
1560		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1561		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1562
1563		// inject dial failure to an already closed substream and verify outbound state is reset
1564		handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
1565			handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
1566		));
1567		assert!(std::matches!(
1568			handler.protocols[0].state,
1569			State::Closed { pending_opening: false }
1570		));
1571	}
1572
1573	#[tokio::test]
1574	async fn dial_upgrade_error_resets_open_desired_state() {
1575		let mut handler = notifs_handler();
1576		let (io, mut io2) = MockSubstream::negotiated().await;
1577		let mut codec = UviBytes::default();
1578		codec.set_max_len(usize::MAX);
1579
1580		let notif_in = NotificationsInOpen {
1581			handshake: b"hello, world".to_vec(),
1582			substream: NotificationsInSubstream::new(
1583				Framed::new(io, codec),
1584				NotificationsInSubstreamHandshake::NotSent,
1585			),
1586		};
1587		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1588			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1589		));
1590
1591		// verify that the substream is in (partly) opened state
1592		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1593		futures::future::poll_fn(|cx| {
1594			let mut buf = Vec::with_capacity(512);
1595			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1596			Poll::Ready(())
1597		})
1598		.await;
1599
1600		// first instruct the handler to open a connection and then close it right after
1601		// so the handler is in state `Closed { pending_opening: true }`
1602		handler.on_behaviour_event(NotifsHandlerIn::Open {
1603			protocol_index: 0,
1604			peer_id: PeerId::random(),
1605		});
1606		assert!(std::matches!(
1607			handler.protocols[0].state,
1608			State::Opening { in_substream: Some(_), .. }
1609		));
1610
1611		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1612		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1613
1614		let (io, _io2) = MockSubstream::negotiated().await;
1615		let mut codec = UviBytes::default();
1616		codec.set_max_len(usize::MAX);
1617
1618		let notif_in = NotificationsInOpen {
1619			handshake: b"hello, world".to_vec(),
1620			substream: NotificationsInSubstream::new(
1621				Framed::new(io, codec),
1622				NotificationsInSubstreamHandshake::NotSent,
1623			),
1624		};
1625		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1626			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1627		));
1628
1629		assert!(std::matches!(
1630			handler.protocols[0].state,
1631			State::OpenDesiredByRemote { pending_opening: true, .. }
1632		));
1633
1634		// inject dial failure to an already closed substream and verify outbound state is reset
1635		handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
1636			handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
1637		));
1638		assert!(std::matches!(
1639			handler.protocols[0].state,
1640			State::OpenDesiredByRemote { pending_opening: false, .. }
1641		));
1642	}
1643
1644	#[tokio::test]
1645	async fn sync_notifications_clogged() {
1646		let mut handler = notifs_handler();
1647		let (io, _) = MockSubstream::negotiated().await;
1648		let codec = UviBytes::default();
1649
1650		let (async_tx, async_rx) = futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
1651		let (sync_tx, sync_rx) = futures::channel::mpsc::channel(1);
1652		let notifications_sink = NotificationsSink {
1653			inner: Arc::new(NotificationsSinkInner {
1654				peer_id: PeerId::random(),
1655				async_channel: FuturesMutex::new(async_tx),
1656				sync_channel: Mutex::new(Some(sync_tx)),
1657			}),
1658			metrics: None,
1659		};
1660
1661		handler.protocols[0].state = State::Open {
1662			notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
1663			out_substream: Some(NotificationsOutSubstream::new(Framed::new(io, codec))),
1664			in_substream: None,
1665		};
1666
1667		notifications_sink.send_sync_notification(vec![1, 3, 3, 7]);
1668		notifications_sink.send_sync_notification(vec![1, 3, 3, 8]);
1669		notifications_sink.send_sync_notification(vec![1, 3, 3, 9]);
1670		notifications_sink.send_sync_notification(vec![1, 3, 4, 0]);
1671
1672		futures::future::poll_fn(|cx| {
1673			assert!(std::matches!(
1674				handler.poll(cx),
1675				Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1676					NotifsHandlerOut::Close { .. }
1677				))
1678			));
1679			Poll::Ready(())
1680		})
1681		.await;
1682	}
1683
1684	#[tokio::test]
1685	async fn close_desired_by_remote() {
1686		let mut handler = notifs_handler();
1687		let (io, io2) = MockSubstream::negotiated().await;
1688		let mut codec = UviBytes::default();
1689		codec.set_max_len(usize::MAX);
1690
1691		let notif_in = NotificationsInOpen {
1692			handshake: b"hello, world".to_vec(),
1693			substream: NotificationsInSubstream::new(
1694				Framed::new(io, codec),
1695				NotificationsInSubstreamHandshake::PendingSend(vec![1, 2, 3, 4]),
1696			),
1697		};
1698
1699		// add new inbound substream but close it immediately and verify that correct events are
1700		// emitted
1701		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1702			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1703		));
1704		drop(io2);
1705
1706		futures::future::poll_fn(|cx| {
1707			assert!(std::matches!(
1708				handler.poll(cx),
1709				Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1710					NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0, .. },
1711				))
1712			));
1713			assert!(std::matches!(
1714				handler.poll(cx),
1715				Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1716					NotifsHandlerOut::CloseDesired {
1717						protocol_index: 0,
1718						reason: CloseReason::RemoteRequest,
1719					},
1720				))
1721			));
1722			Poll::Ready(())
1723		})
1724		.await;
1725	}
1726}