Skip to main content

sc_network/protocol/notifications/service/
mod.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Notification service implementation.
20
21use crate::{
22	error,
23	protocol::notifications::handler::NotificationsSink,
24	service::{
25		metrics::NotificationMetrics,
26		traits::{
27			Direction, MessageSink, NotificationEvent, NotificationService, ValidationResult,
28		},
29	},
30	types::ProtocolName,
31};
32
33use futures::{
34	stream::{FuturesUnordered, Stream},
35	StreamExt,
36};
37use libp2p::PeerId;
38use parking_lot::Mutex;
39use tokio::sync::{mpsc, oneshot};
40use tokio_stream::wrappers::ReceiverStream;
41
42use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
43
44use std::{collections::HashMap, fmt::Debug, sync::Arc};
45
46pub(crate) mod metrics;
47
48#[cfg(test)]
49mod tests;
50
/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p::notification::service";

/// Default command queue size.
///
/// Used as the capacity of the bounded `mpsc` channel that carries
/// [`NotificationCommand`]s from the protocol to `Notifications`.
const COMMAND_QUEUE_SIZE: usize = 64;

/// Type representing subscribers of a notification protocol.
///
/// Every entry is the sending half of the event channel of one [`NotificationHandle`].
/// The list is shared between the protocol side and the `Notifications` side; entries
/// whose receiving half is gone are pruned when an event fails to be delivered to them.
type Subscribers = Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>;

/// Type representing a distributable message sink.
/// Detached message sink must carry the protocol name for registering metrics.
///
/// The tuple holds the underlying sink (`.0`) and the protocol name (`.1`).
///
/// See documentation for [`PeerContext`] for more details.
type NotificationSink = Arc<Mutex<(NotificationsSink, ProtocolName)>>;
65
66#[async_trait::async_trait]
67impl MessageSink for NotificationSink {
68	/// Send synchronous `notification` to the peer associated with this [`MessageSink`].
69	fn send_sync_notification(&self, notification: Vec<u8>) {
70		let sink = self.lock();
71
72		metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification.len());
73		sink.0.send_sync_notification(notification);
74	}
75
76	/// Send an asynchronous `notification` to the peer associated with this [`MessageSink`],
77	/// allowing sender to exercise backpressure.
78	///
79	/// Returns an error if the peer does not exist.
80	async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), error::Error> {
81		// notification sink must be cloned because the lock cannot be held across `.await`
82		// this makes the implementation less efficient but not prohibitively so as the same
83		// method is also used by `NetworkService` when sending notifications.
84		let notification_len = notification.len();
85		let sink = self.lock().clone();
86		let permit = sink
87			.0
88			.reserve_notification()
89			.await
90			.map_err(|_| error::Error::ConnectionClosed)?;
91
92		permit.send(notification).map_err(|_| error::Error::ChannelClosed).inspect(|_| {
93			metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification_len);
94		})
95	}
96}
97
/// Inner notification event to deal with `NotificationsSinks` without exposing that
/// implementation detail to [`NotificationService`] consumers.
///
/// Events of this type travel from `ProtocolHandle` to every subscribed
/// [`NotificationHandle`], which translates them into public [`NotificationEvent`]s.
#[derive(Debug)]
enum InnerNotificationEvent {
	/// Validate inbound substream.
	ValidateInboundSubstream {
		/// Peer ID.
		peer: PeerId,

		/// Received handshake.
		handshake: Vec<u8>,

		/// `oneshot::Sender` for sending validation result back to `Notifications`
		result_tx: oneshot::Sender<ValidationResult>,
	},

	/// Notification substream open to `peer`.
	NotificationStreamOpened {
		/// Peer ID.
		peer: PeerId,

		/// Direction of the substream.
		direction: Direction,

		/// Received handshake.
		handshake: Vec<u8>,

		/// Negotiated fallback.
		negotiated_fallback: Option<ProtocolName>,

		/// Notification sink.
		///
		/// Kept by the receiving [`NotificationHandle`] in its peer table; it is not part of
		/// the public event handed to the protocol.
		sink: NotificationsSink,
	},

	/// Substream was closed.
	NotificationStreamClosed {
		/// Peer ID.
		peer: PeerId,
	},

	/// Notification was received from the substream.
	NotificationReceived {
		/// Peer ID.
		peer: PeerId,

		/// Received notification.
		notification: Vec<u8>,
	},

	/// Notification sink has been replaced.
	///
	/// Consumed internally by [`NotificationHandle`]; no public event is emitted for it.
	NotificationSinkReplaced {
		/// Peer ID.
		peer: PeerId,

		/// Notification sink.
		sink: NotificationsSink,
	},
}
156
/// Notification commands.
///
/// Sent by the installed protocols to `Notifications` to open/close/modify substreams.
#[derive(Debug)]
pub enum NotificationCommand {
	/// Instruct `Notifications` to open a substream to peer.
	// Never constructed in this file: `NotificationHandle::open_substream()` is still a
	// `todo!()`, hence the lint suppression.
	#[allow(unused)]
	OpenSubstream(PeerId),

	/// Instruct `Notifications` to close the substream to peer.
	// Never constructed in this file: `NotificationHandle::close_substream()` is still a
	// `todo!()`, hence the lint suppression.
	#[allow(unused)]
	CloseSubstream(PeerId),

	/// Set handshake for the notifications protocol.
	///
	/// Carries the new handshake bytes.
	SetHandshake(Vec<u8>),
}
173
/// Context assigned to each peer.
///
/// Contains `NotificationsSink` used by [`NotificationService`] to send notifications
/// and an additional, distributable `NotificationsSink` which the protocol may acquire
/// if it wishes to send notifications through `NotificationsSink` directly.
///
/// The distributable `NotificationsSink` is wrapped in an `Arc<Mutex<>>` to allow
/// `NotificationsService` to swap the underlying sink in case it's replaced.
///
/// Cloning a `PeerContext` clones the `Arc`, so the clone refers to the same
/// distributable sink as the original.
#[derive(Debug, Clone)]
struct PeerContext {
	/// Sink for sending notifications.
	sink: NotificationsSink,

	/// Distributable notification sink.
	///
	/// Handed out through `NotificationService::message_sink()`.
	shared_sink: NotificationSink,
}
190
/// Handle that is passed on to the notifications protocol.
///
/// Implements [`NotificationService`]: it sends commands to `Notifications` over `tx`
/// and turns the inner events received on `rx` into public [`NotificationEvent`]s.
#[derive(Debug)]
pub struct NotificationHandle {
	/// Protocol name.
	protocol: ProtocolName,

	/// TX channel for sending commands to `Notifications`.
	tx: mpsc::Sender<NotificationCommand>,

	/// RX channel for receiving events from `Notifications`.
	rx: TracingUnboundedReceiver<InnerNotificationEvent>,

	/// All subscribers of `NotificationEvent`s.
	///
	/// Shared with the `Notifications` side; cloning the handle registers one more
	/// subscriber here.
	subscribers: Subscribers,

	/// Connected peers.
	///
	/// Updated while events are processed in `next_event()`: entries are inserted when a
	/// stream opens, removed when it closes and patched when a sink is replaced.
	peers: HashMap<PeerId, PeerContext>,
}
209
210impl NotificationHandle {
211	/// Create new [`NotificationHandle`].
212	fn new(
213		protocol: ProtocolName,
214		tx: mpsc::Sender<NotificationCommand>,
215		rx: TracingUnboundedReceiver<InnerNotificationEvent>,
216		subscribers: Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>,
217	) -> Self {
218		Self { protocol, tx, rx, subscribers, peers: HashMap::new() }
219	}
220}
221
222#[async_trait::async_trait]
223impl NotificationService for NotificationHandle {
224	/// Instruct `Notifications` to open a new substream for `peer`.
225	async fn open_substream(&mut self, _peer: sc_network_types::PeerId) -> Result<(), ()> {
226		todo!("support for opening substreams not implemented yet");
227	}
228
229	/// Instruct `Notifications` to close substream for `peer`.
230	async fn close_substream(&mut self, _peer: sc_network_types::PeerId) -> Result<(), ()> {
231		todo!("support for closing substreams not implemented yet, call `NetworkService::disconnect_peer()` instead");
232	}
233
234	/// Send synchronous `notification` to `peer`.
235	fn send_sync_notification(&mut self, peer: &sc_network_types::PeerId, notification: Vec<u8>) {
236		if let Some(info) = self.peers.get(&((*peer).into())) {
237			metrics::register_notification_sent(
238				info.sink.metrics(),
239				&self.protocol,
240				notification.len(),
241			);
242
243			let _ = info.sink.send_sync_notification(notification);
244		}
245	}
246
247	/// Send asynchronous `notification` to `peer`, allowing sender to exercise backpressure.
248	async fn send_async_notification(
249		&mut self,
250		peer: &sc_network_types::PeerId,
251		notification: Vec<u8>,
252	) -> Result<(), error::Error> {
253		let notification_len = notification.len();
254		let sink = &self
255			.peers
256			.get(&peer.into())
257			.ok_or_else(|| error::Error::PeerDoesntExist((*peer).into()))?
258			.sink;
259
260		sink.reserve_notification()
261			.await
262			.map_err(|_| error::Error::ConnectionClosed)?
263			.send(notification)
264			.map_err(|_| error::Error::ChannelClosed)
265			.inspect(|_| {
266				metrics::register_notification_sent(
267					sink.metrics(),
268					&self.protocol,
269					notification_len,
270				);
271			})
272	}
273
274	/// Set handshake for the notification protocol replacing the old handshake.
275	async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
276		log::trace!(target: LOG_TARGET, "{}: set handshake to {handshake:?}", self.protocol);
277
278		self.tx.send(NotificationCommand::SetHandshake(handshake)).await.map_err(|_| ())
279	}
280
281	/// Non-blocking variant of `set_handshake()` that attempts to update the handshake
282	/// and returns an error if the channel is blocked.
283	///
284	/// Technically the function can return an error if the channel to `Notifications` is closed
285	/// but that doesn't happen under normal operation.
286	fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
287		self.tx.try_send(NotificationCommand::SetHandshake(handshake)).map_err(|_| ())
288	}
289
290	/// Get next event from the `Notifications` event stream.
291	async fn next_event(&mut self) -> Option<NotificationEvent> {
292		loop {
293			match self.rx.next().await? {
294				InnerNotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
295					return Some(NotificationEvent::ValidateInboundSubstream {
296						peer: peer.into(),
297						handshake,
298						result_tx,
299					})
300				},
301				InnerNotificationEvent::NotificationStreamOpened {
302					peer,
303					handshake,
304					negotiated_fallback,
305					direction,
306					sink,
307				} => {
308					self.peers.insert(
309						peer,
310						PeerContext {
311							sink: sink.clone(),
312							shared_sink: Arc::new(Mutex::new((sink, self.protocol.clone()))),
313						},
314					);
315					return Some(NotificationEvent::NotificationStreamOpened {
316						peer: peer.into(),
317						handshake,
318						direction,
319						negotiated_fallback,
320					});
321				},
322				InnerNotificationEvent::NotificationStreamClosed { peer } => {
323					self.peers.remove(&peer);
324					return Some(NotificationEvent::NotificationStreamClosed { peer: peer.into() });
325				},
326				InnerNotificationEvent::NotificationReceived { peer, notification } => {
327					return Some(NotificationEvent::NotificationReceived {
328						peer: peer.into(),
329						notification,
330					})
331				},
332				InnerNotificationEvent::NotificationSinkReplaced { peer, sink } => {
333					match self.peers.get_mut(&peer) {
334						None => log::error!(
335							"{}: notification sink replaced for {peer} but peer does not exist",
336							self.protocol
337						),
338						Some(context) => {
339							context.sink = sink.clone();
340							*context.shared_sink.lock() = (sink.clone(), self.protocol.clone());
341						},
342					}
343				},
344			}
345		}
346	}
347
348	// Clone [`NotificationService`]
349	fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
350		let mut subscribers = self.subscribers.lock();
351
352		let (event_tx, event_rx) = tracing_unbounded(self.rx.name(), 100_000);
353		subscribers.push(event_tx);
354
355		Ok(Box::new(NotificationHandle {
356			protocol: self.protocol.clone(),
357			tx: self.tx.clone(),
358			rx: event_rx,
359			peers: self.peers.clone(),
360			subscribers: self.subscribers.clone(),
361		}))
362	}
363
364	/// Get protocol name.
365	fn protocol(&self) -> &ProtocolName {
366		&self.protocol
367	}
368
369	/// Get message sink of the peer.
370	fn message_sink(&self, peer: &sc_network_types::PeerId) -> Option<Box<dyn MessageSink>> {
371		match self.peers.get(&peer.into()) {
372			Some(context) => Some(Box::new(context.shared_sink.clone())),
373			None => None,
374		}
375	}
376}
377
/// Channel pair which allows `Notifications` to interact with a protocol.
///
/// Consumed by `split()`, which turns it into a `ProtocolHandle` and a stream of
/// [`NotificationCommand`]s.
#[derive(Debug)]
pub struct ProtocolHandlePair {
	/// Protocol name.
	protocol: ProtocolName,

	/// Subscribers of the notification protocol events.
	subscribers: Subscribers,

	/// Receiver for notification commands received from the protocol implementation.
	rx: mpsc::Receiver<NotificationCommand>,
}
390
391impl ProtocolHandlePair {
392	/// Create new [`ProtocolHandlePair`].
393	fn new(
394		protocol: ProtocolName,
395		subscribers: Subscribers,
396		rx: mpsc::Receiver<NotificationCommand>,
397	) -> Self {
398		Self { protocol, subscribers, rx }
399	}
400
401	/// Consume `self` and split [`ProtocolHandlePair`] into a handle which allows it to send events
402	/// to the protocol and a stream of commands received from the protocol.
403	pub(crate) fn split(
404		self,
405	) -> (ProtocolHandle, Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>) {
406		(
407			ProtocolHandle::new(self.protocol, self.subscribers),
408			Box::new(ReceiverStream::new(self.rx)),
409		)
410	}
411}
412
/// Handle that is passed on to `Notifications` and allows it to directly communicate
/// with the protocol.
#[derive(Debug, Clone)]
pub(crate) struct ProtocolHandle {
	/// Protocol name.
	protocol: ProtocolName,

	/// Subscribers of the notification protocol.
	subscribers: Subscribers,

	/// Number of connected peers.
	///
	/// Incremented when a substream is reported opened and decremented when one is
	/// reported closed. The counter is a plain field, so each clone of the handle keeps
	/// its own count.
	num_peers: usize,

	/// Delegate validation to `Peerset`.
	///
	/// `false` until changed with `delegate_to_peerset()`.
	delegate_to_peerset: bool,

	/// Prometheus metrics.
	///
	/// `None` until `set_metrics()` is called.
	metrics: Option<NotificationMetrics>,
}
432
/// Outcome of reporting an inbound substream to the protocol for validation.
pub(crate) enum ValidationCallResult {
	/// The validation request was sent to the protocol and the result will be delivered
	/// through the contained `oneshot::Receiver`.
	WaitForValidation(oneshot::Receiver<ValidationResult>),
	/// Validation is delegated to `Peerset`; the protocol was not asked.
	Delegated,
}
437
438impl ProtocolHandle {
439	/// Create new [`ProtocolHandle`].
440	fn new(protocol: ProtocolName, subscribers: Subscribers) -> Self {
441		Self { protocol, subscribers, num_peers: 0usize, metrics: None, delegate_to_peerset: false }
442	}
443
444	/// Set metrics.
445	pub fn set_metrics(&mut self, metrics: NotificationMetrics) {
446		self.metrics = Some(metrics);
447	}
448
449	/// Delegate validation to `Peerset`.
450	///
451	/// Protocols that do not do any validation themselves and only rely on `Peerset` handling
452	/// validation can disable protocol-side validation entirely by delegating all validation to
453	/// `Peerset`.
454	pub fn delegate_to_peerset(&mut self, delegate: bool) {
455		self.delegate_to_peerset = delegate;
456	}
457
458	/// Report to the protocol that a substream has been opened and it must be validated by the
459	/// protocol.
460	///
461	/// Return `oneshot::Receiver` which allows `Notifications` to poll for the validation result
462	/// from protocol.
463	pub fn report_incoming_substream(
464		&self,
465		peer: PeerId,
466		handshake: Vec<u8>,
467	) -> Result<ValidationCallResult, ()> {
468		let subscribers = self.subscribers.lock();
469
470		log::trace!(
471			target: LOG_TARGET,
472			"{}: report incoming substream for {peer}, handshake {handshake:?}",
473			self.protocol
474		);
475
476		if self.delegate_to_peerset {
477			return Ok(ValidationCallResult::Delegated);
478		}
479
480		// if there is only one subscriber, `Notifications` can wait directly on the
481		// `oneshot::channel()`'s RX half without indirection
482		if subscribers.len() == 1 {
483			let (result_tx, rx) = oneshot::channel();
484			return subscribers[0]
485				.unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
486					peer,
487					handshake,
488					result_tx,
489				})
490				.map(|_| ValidationCallResult::WaitForValidation(rx))
491				.map_err(|_| ());
492		}
493
494		// if there are multiple subscribers, create a task which waits for all of the
495		// validations to finish and returns the combined result to `Notifications`
496		let mut results: FuturesUnordered<_> = subscribers
497			.iter()
498			.filter_map(|subscriber| {
499				let (result_tx, rx) = oneshot::channel();
500
501				subscriber
502					.unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
503						peer,
504						handshake: handshake.clone(),
505						result_tx,
506					})
507					.is_ok()
508					.then_some(rx)
509			})
510			.collect();
511
512		let (tx, rx) = oneshot::channel();
513		tokio::spawn(async move {
514			while let Some(event) = results.next().await {
515				match event {
516					Err(_) | Ok(ValidationResult::Reject) => {
517						return tx.send(ValidationResult::Reject)
518					},
519					Ok(ValidationResult::Accept) => {},
520				}
521			}
522
523			return tx.send(ValidationResult::Accept);
524		});
525
526		Ok(ValidationCallResult::WaitForValidation(rx))
527	}
528
529	/// Report to the protocol that a substream has been opened and that it can now use the handle
530	/// to send notifications to the remote peer.
531	pub fn report_substream_opened(
532		&mut self,
533		peer: PeerId,
534		direction: Direction,
535		handshake: Vec<u8>,
536		negotiated_fallback: Option<ProtocolName>,
537		sink: NotificationsSink,
538	) -> Result<(), ()> {
539		metrics::register_substream_opened(&self.metrics, &self.protocol);
540
541		let mut subscribers = self.subscribers.lock();
542		log::trace!(target: LOG_TARGET, "{}: substream opened for {peer:?}", self.protocol);
543
544		subscribers.retain(|subscriber| {
545			subscriber
546				.unbounded_send(InnerNotificationEvent::NotificationStreamOpened {
547					peer,
548					direction,
549					handshake: handshake.clone(),
550					negotiated_fallback: negotiated_fallback.clone(),
551					sink: sink.clone(),
552				})
553				.is_ok()
554		});
555		self.num_peers += 1;
556
557		Ok(())
558	}
559
560	/// Substream was closed.
561	pub fn report_substream_closed(&mut self, peer: PeerId) -> Result<(), ()> {
562		metrics::register_substream_closed(&self.metrics, &self.protocol);
563
564		let mut subscribers = self.subscribers.lock();
565		log::trace!(target: LOG_TARGET, "{}: substream closed for {peer:?}", self.protocol);
566
567		subscribers.retain(|subscriber| {
568			subscriber
569				.unbounded_send(InnerNotificationEvent::NotificationStreamClosed { peer })
570				.is_ok()
571		});
572		self.num_peers -= 1;
573
574		Ok(())
575	}
576
577	/// Notification was received from the substream.
578	pub fn report_notification_received(
579		&mut self,
580		peer: PeerId,
581		notification: Vec<u8>,
582	) -> Result<(), ()> {
583		metrics::register_notification_received(&self.metrics, &self.protocol, notification.len());
584
585		let mut subscribers = self.subscribers.lock();
586		log::trace!(target: LOG_TARGET, "{}: notification received from {peer:?}", self.protocol);
587
588		subscribers.retain(|subscriber| {
589			subscriber
590				.unbounded_send(InnerNotificationEvent::NotificationReceived {
591					peer,
592					notification: notification.clone(),
593				})
594				.is_ok()
595		});
596
597		Ok(())
598	}
599
600	/// Notification sink was replaced.
601	pub fn report_notification_sink_replaced(
602		&mut self,
603		peer: PeerId,
604		sink: NotificationsSink,
605	) -> Result<(), ()> {
606		let mut subscribers = self.subscribers.lock();
607
608		log::trace!(
609			target: LOG_TARGET,
610			"{}: notification sink replaced for {peer:?}",
611			self.protocol
612		);
613
614		subscribers.retain(|subscriber| {
615			subscriber
616				.unbounded_send(InnerNotificationEvent::NotificationSinkReplaced {
617					peer,
618					sink: sink.clone(),
619				})
620				.is_ok()
621		});
622
623		Ok(())
624	}
625
626	/// Get the number of connected peers.
627	pub fn num_peers(&self) -> usize {
628		self.num_peers
629	}
630}
631
632/// Create new (protocol, notification) handle pair.
633///
634/// Handle pair allows `Notifications` and the protocol to communicate with each other directly.
635pub fn notification_service(
636	protocol: ProtocolName,
637) -> (ProtocolHandlePair, Box<dyn NotificationService>) {
638	let (cmd_tx, cmd_rx) = mpsc::channel(COMMAND_QUEUE_SIZE);
639
640	let (event_tx, event_rx) =
641		tracing_unbounded(metric_label_for_protocol(&protocol).leak(), 100_000);
642	let subscribers = Arc::new(Mutex::new(vec![event_tx]));
643
644	(
645		ProtocolHandlePair::new(protocol.clone(), subscribers.clone(), cmd_rx),
646		Box::new(NotificationHandle::new(protocol.clone(), cmd_tx, event_rx, subscribers)),
647	)
648}
649
650// Decorates the mpsc-notification-to-protocol metric with the name of the protocol,
651// to be able to distiguish between different protocols in dashboards.
652fn metric_label_for_protocol(protocol: &ProtocolName) -> String {
653	let protocol_name = protocol.to_string();
654	let keys = protocol_name.split("/").collect::<Vec<_>>();
655	keys.iter()
656		.rev()
657		.take(2) // Last two tokens give the protocol name and version
658		.fold("mpsc-notification-to-protocol".into(), |acc, val| format!("{}-{}", acc, val))
659}