pezsc_network/protocol/notifications/service/
mod.rs

1// This file is part of Bizinikiwi.
2
3// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Notification service implementation.
20
21use crate::{
22	error,
23	protocol::notifications::handler::NotificationsSink,
24	service::{
25		metrics::NotificationMetrics,
26		traits::{
27			Direction, MessageSink, NotificationEvent, NotificationService, ValidationResult,
28		},
29	},
30	types::ProtocolName,
31};
32
33use futures::{
34	stream::{FuturesUnordered, Stream},
35	StreamExt,
36};
37use libp2p::PeerId;
38use parking_lot::Mutex;
39use tokio::sync::{mpsc, oneshot};
40use tokio_stream::wrappers::ReceiverStream;
41
42use pezsc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
43
44use std::{collections::HashMap, fmt::Debug, sync::Arc};
45
46pub(crate) mod metrics;
47
48#[cfg(test)]
49mod tests;
50
/// Target under which the log lines emitted from this file are recorded.
const LOG_TARGET: &str = "sub-libp2p::notification::service";

/// Capacity of the bounded command channel created by [`notification_service()`].
const COMMAND_QUEUE_SIZE: usize = 64;
56
57/// Type representing subscribers of a notification protocol.
58type Subscribers = Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>;
59
60/// Type representing a distributable message sink.
61/// Detached message sink must carry the protocol name for registering metrics.
62///
63/// See documentation for [`PeerContext`] for more details.
64type NotificationSink = Arc<Mutex<(NotificationsSink, ProtocolName)>>;
65
66#[async_trait::async_trait]
67impl MessageSink for NotificationSink {
68	/// Send synchronous `notification` to the peer associated with this [`MessageSink`].
69	fn send_sync_notification(&self, notification: Vec<u8>) {
70		let sink = self.lock();
71
72		metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification.len());
73		sink.0.send_sync_notification(notification);
74	}
75
76	/// Send an asynchronous `notification` to the peer associated with this [`MessageSink`],
77	/// allowing sender to exercise backpressure.
78	///
79	/// Returns an error if the peer does not exist.
80	async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), error::Error> {
81		// notification sink must be cloned because the lock cannot be held across `.await`
82		// this makes the implementation less efficient but not prohibitively so as the same
83		// method is also used by `NetworkService` when sending notifications.
84		let notification_len = notification.len();
85		let sink = self.lock().clone();
86		let permit = sink
87			.0
88			.reserve_notification()
89			.await
90			.map_err(|_| error::Error::ConnectionClosed)?;
91
92		permit.send(notification).map_err(|_| error::Error::ChannelClosed).inspect(|_| {
93			metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification_len);
94		})
95	}
96}
97
98/// Inner notification event to deal with `NotificationsSinks` without exposing that
99/// implementation detail to [`NotificationService`] consumers.
100#[derive(Debug)]
101enum InnerNotificationEvent {
102	/// Validate inbound substream.
103	ValidateInboundSubstream {
104		/// Peer ID.
105		peer: PeerId,
106
107		/// Received handshake.
108		handshake: Vec<u8>,
109
110		/// `oneshot::Sender` for sending validation result back to `Notifications`
111		result_tx: oneshot::Sender<ValidationResult>,
112	},
113
114	/// Notification substream open to `peer`.
115	NotificationStreamOpened {
116		/// Peer ID.
117		peer: PeerId,
118
119		/// Direction of the substream.
120		direction: Direction,
121
122		/// Received handshake.
123		handshake: Vec<u8>,
124
125		/// Negotiated fallback.
126		negotiated_fallback: Option<ProtocolName>,
127
128		/// Notification sink.
129		sink: NotificationsSink,
130	},
131
132	/// Substream was closed.
133	NotificationStreamClosed {
134		/// Peer ID.
135		peer: PeerId,
136	},
137
138	/// Notification was received from the substream.
139	NotificationReceived {
140		/// Peer ID.
141		peer: PeerId,
142
143		/// Received notification.
144		notification: Vec<u8>,
145	},
146
147	/// Notification sink has been replaced.
148	NotificationSinkReplaced {
149		/// Peer ID.
150		peer: PeerId,
151
152		/// Notification sink.
153		sink: NotificationsSink,
154	},
155}
156
157/// Notification commands.
158///
159/// Sent by the installed protocols to `Notifications` to open/close/modify substreams.
160#[derive(Debug)]
161pub enum NotificationCommand {
162	/// Instruct `Notifications` to open a substream to peer.
163	#[allow(unused)]
164	OpenSubstream(PeerId),
165
166	/// Instruct `Notifications` to close the substream to peer.
167	#[allow(unused)]
168	CloseSubstream(PeerId),
169
170	/// Set handshake for the notifications protocol.
171	SetHandshake(Vec<u8>),
172}
173
174/// Context assigned to each peer.
175///
176/// Contains `NotificationsSink` used by [`NotificationService`] to send notifications
177/// and an additional, distributable `NotificationsSink` which the protocol may acquire
178/// if it wishes to send notifications through `NotificationsSink` directly.
179///
180/// The distributable `NotificationsSink` is wrapped in an `Arc<Mutex<>>` to allow
181/// `NotificationsService` to swap the underlying sink in case it's replaced.
182#[derive(Debug, Clone)]
183struct PeerContext {
184	/// Sink for sending notifications.
185	sink: NotificationsSink,
186
187	/// Distributable notification sink.
188	shared_sink: NotificationSink,
189}
190
191/// Handle that is passed on to the notifications protocol.
192#[derive(Debug)]
193pub struct NotificationHandle {
194	/// Protocol name.
195	protocol: ProtocolName,
196
197	/// TX channel for sending commands to `Notifications`.
198	tx: mpsc::Sender<NotificationCommand>,
199
200	/// RX channel for receiving events from `Notifications`.
201	rx: TracingUnboundedReceiver<InnerNotificationEvent>,
202
203	/// All subscribers of `NotificationEvent`s.
204	subscribers: Subscribers,
205
206	/// Connected peers.
207	peers: HashMap<PeerId, PeerContext>,
208}
209
210impl NotificationHandle {
211	/// Create new [`NotificationHandle`].
212	fn new(
213		protocol: ProtocolName,
214		tx: mpsc::Sender<NotificationCommand>,
215		rx: TracingUnboundedReceiver<InnerNotificationEvent>,
216		subscribers: Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>,
217	) -> Self {
218		Self { protocol, tx, rx, subscribers, peers: HashMap::new() }
219	}
220}
221
222#[async_trait::async_trait]
223impl NotificationService for NotificationHandle {
224	/// Instruct `Notifications` to open a new substream for `peer`.
225	async fn open_substream(&mut self, _peer: pezsc_network_types::PeerId) -> Result<(), ()> {
226		todo!("support for opening substreams not implemented yet");
227	}
228
229	/// Instruct `Notifications` to close substream for `peer`.
230	async fn close_substream(&mut self, _peer: pezsc_network_types::PeerId) -> Result<(), ()> {
231		todo!("support for closing substreams not implemented yet, call `NetworkService::disconnect_peer()` instead");
232	}
233
234	/// Send synchronous `notification` to `peer`.
235	fn send_sync_notification(
236		&mut self,
237		peer: &pezsc_network_types::PeerId,
238		notification: Vec<u8>,
239	) {
240		if let Some(info) = self.peers.get(&((*peer).into())) {
241			metrics::register_notification_sent(
242				info.sink.metrics(),
243				&self.protocol,
244				notification.len(),
245			);
246
247			let _ = info.sink.send_sync_notification(notification);
248		}
249	}
250
251	/// Send asynchronous `notification` to `peer`, allowing sender to exercise backpressure.
252	async fn send_async_notification(
253		&mut self,
254		peer: &pezsc_network_types::PeerId,
255		notification: Vec<u8>,
256	) -> Result<(), error::Error> {
257		let notification_len = notification.len();
258		let sink = &self
259			.peers
260			.get(&peer.into())
261			.ok_or_else(|| error::Error::PeerDoesntExist((*peer).into()))?
262			.sink;
263
264		sink.reserve_notification()
265			.await
266			.map_err(|_| error::Error::ConnectionClosed)?
267			.send(notification)
268			.map_err(|_| error::Error::ChannelClosed)
269			.inspect(|_| {
270				metrics::register_notification_sent(
271					sink.metrics(),
272					&self.protocol,
273					notification_len,
274				);
275			})
276	}
277
278	/// Set handshake for the notification protocol replacing the old handshake.
279	async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
280		log::trace!(target: LOG_TARGET, "{}: set handshake to {handshake:?}", self.protocol);
281
282		self.tx.send(NotificationCommand::SetHandshake(handshake)).await.map_err(|_| ())
283	}
284
285	/// Non-blocking variant of `set_handshake()` that attempts to update the handshake
286	/// and returns an error if the channel is blocked.
287	///
288	/// Technically the function can return an error if the channel to `Notifications` is closed
289	/// but that doesn't happen under normal operation.
290	fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
291		self.tx.try_send(NotificationCommand::SetHandshake(handshake)).map_err(|_| ())
292	}
293
294	/// Get next event from the `Notifications` event stream.
295	async fn next_event(&mut self) -> Option<NotificationEvent> {
296		loop {
297			match self.rx.next().await? {
298				InnerNotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
299					return Some(NotificationEvent::ValidateInboundSubstream {
300						peer: peer.into(),
301						handshake,
302						result_tx,
303					})
304				},
305				InnerNotificationEvent::NotificationStreamOpened {
306					peer,
307					handshake,
308					negotiated_fallback,
309					direction,
310					sink,
311				} => {
312					self.peers.insert(
313						peer,
314						PeerContext {
315							sink: sink.clone(),
316							shared_sink: Arc::new(Mutex::new((sink, self.protocol.clone()))),
317						},
318					);
319					return Some(NotificationEvent::NotificationStreamOpened {
320						peer: peer.into(),
321						handshake,
322						direction,
323						negotiated_fallback,
324					});
325				},
326				InnerNotificationEvent::NotificationStreamClosed { peer } => {
327					self.peers.remove(&peer);
328					return Some(NotificationEvent::NotificationStreamClosed { peer: peer.into() });
329				},
330				InnerNotificationEvent::NotificationReceived { peer, notification } => {
331					return Some(NotificationEvent::NotificationReceived {
332						peer: peer.into(),
333						notification,
334					})
335				},
336				InnerNotificationEvent::NotificationSinkReplaced { peer, sink } => {
337					match self.peers.get_mut(&peer) {
338						None => log::error!(
339							"{}: notification sink replaced for {peer} but peer does not exist",
340							self.protocol
341						),
342						Some(context) => {
343							context.sink = sink.clone();
344							*context.shared_sink.lock() = (sink.clone(), self.protocol.clone());
345						},
346					}
347				},
348			}
349		}
350	}
351
352	// Clone [`NotificationService`]
353	fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
354		let mut subscribers = self.subscribers.lock();
355
356		let (event_tx, event_rx) = tracing_unbounded(self.rx.name(), 100_000);
357		subscribers.push(event_tx);
358
359		Ok(Box::new(NotificationHandle {
360			protocol: self.protocol.clone(),
361			tx: self.tx.clone(),
362			rx: event_rx,
363			peers: self.peers.clone(),
364			subscribers: self.subscribers.clone(),
365		}))
366	}
367
368	/// Get protocol name.
369	fn protocol(&self) -> &ProtocolName {
370		&self.protocol
371	}
372
373	/// Get message sink of the peer.
374	fn message_sink(&self, peer: &pezsc_network_types::PeerId) -> Option<Box<dyn MessageSink>> {
375		match self.peers.get(&peer.into()) {
376			Some(context) => Some(Box::new(context.shared_sink.clone())),
377			None => None,
378		}
379	}
380}
381
382/// Channel pair which allows `Notifications` to interact with a protocol.
383#[derive(Debug)]
384pub struct ProtocolHandlePair {
385	/// Protocol name.
386	protocol: ProtocolName,
387
388	/// Subscribers of the notification protocol events.
389	subscribers: Subscribers,
390
391	// Receiver for notification commands received from the protocol implementation.
392	rx: mpsc::Receiver<NotificationCommand>,
393}
394
395impl ProtocolHandlePair {
396	/// Create new [`ProtocolHandlePair`].
397	fn new(
398		protocol: ProtocolName,
399		subscribers: Subscribers,
400		rx: mpsc::Receiver<NotificationCommand>,
401	) -> Self {
402		Self { protocol, subscribers, rx }
403	}
404
405	/// Consume `self` and split [`ProtocolHandlePair`] into a handle which allows it to send events
406	/// to the protocol and a stream of commands received from the protocol.
407	pub(crate) fn split(
408		self,
409	) -> (ProtocolHandle, Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>) {
410		(
411			ProtocolHandle::new(self.protocol, self.subscribers),
412			Box::new(ReceiverStream::new(self.rx)),
413		)
414	}
415}
416
417/// Handle that is passed on to `Notifications` and allows it to directly communicate
418/// with the protocol.
419#[derive(Debug, Clone)]
420pub(crate) struct ProtocolHandle {
421	/// Protocol name.
422	protocol: ProtocolName,
423
424	/// Subscribers of the notification protocol.
425	subscribers: Subscribers,
426
427	/// Number of connected peers.
428	num_peers: usize,
429
430	/// Delegate validation to `Peerset`.
431	delegate_to_peerset: bool,
432
433	/// Prometheus metrics.
434	metrics: Option<NotificationMetrics>,
435}
436
437pub(crate) enum ValidationCallResult {
438	WaitForValidation(oneshot::Receiver<ValidationResult>),
439	Delegated,
440}
441
442impl ProtocolHandle {
443	/// Create new [`ProtocolHandle`].
444	fn new(protocol: ProtocolName, subscribers: Subscribers) -> Self {
445		Self { protocol, subscribers, num_peers: 0usize, metrics: None, delegate_to_peerset: false }
446	}
447
448	/// Set metrics.
449	pub fn set_metrics(&mut self, metrics: NotificationMetrics) {
450		self.metrics = Some(metrics);
451	}
452
453	/// Delegate validation to `Peerset`.
454	///
455	/// Protocols that do not do any validation themselves and only rely on `Peerset` handling
456	/// validation can disable protocol-side validation entirely by delegating all validation to
457	/// `Peerset`.
458	pub fn delegate_to_peerset(&mut self, delegate: bool) {
459		self.delegate_to_peerset = delegate;
460	}
461
462	/// Report to the protocol that a substream has been opened and it must be validated by the
463	/// protocol.
464	///
465	/// Return `oneshot::Receiver` which allows `Notifications` to poll for the validation result
466	/// from protocol.
467	pub fn report_incoming_substream(
468		&self,
469		peer: PeerId,
470		handshake: Vec<u8>,
471	) -> Result<ValidationCallResult, ()> {
472		let subscribers = self.subscribers.lock();
473
474		log::trace!(
475			target: LOG_TARGET,
476			"{}: report incoming substream for {peer}, handshake {handshake:?}",
477			self.protocol
478		);
479
480		if self.delegate_to_peerset {
481			return Ok(ValidationCallResult::Delegated);
482		}
483
484		// if there is only one subscriber, `Notifications` can wait directly on the
485		// `oneshot::channel()`'s RX half without indirection
486		if subscribers.len() == 1 {
487			let (result_tx, rx) = oneshot::channel();
488			return subscribers[0]
489				.unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
490					peer,
491					handshake,
492					result_tx,
493				})
494				.map(|_| ValidationCallResult::WaitForValidation(rx))
495				.map_err(|_| ());
496		}
497
498		// if there are multiple subscribers, create a task which waits for all of the
499		// validations to finish and returns the combined result to `Notifications`
500		let mut results: FuturesUnordered<_> = subscribers
501			.iter()
502			.filter_map(|subscriber| {
503				let (result_tx, rx) = oneshot::channel();
504
505				subscriber
506					.unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
507						peer,
508						handshake: handshake.clone(),
509						result_tx,
510					})
511					.is_ok()
512					.then_some(rx)
513			})
514			.collect();
515
516		let (tx, rx) = oneshot::channel();
517		tokio::spawn(async move {
518			while let Some(event) = results.next().await {
519				match event {
520					Err(_) | Ok(ValidationResult::Reject) => {
521						return tx.send(ValidationResult::Reject)
522					},
523					Ok(ValidationResult::Accept) => {},
524				}
525			}
526
527			return tx.send(ValidationResult::Accept);
528		});
529
530		Ok(ValidationCallResult::WaitForValidation(rx))
531	}
532
533	/// Report to the protocol that a substream has been opened and that it can now use the handle
534	/// to send notifications to the remote peer.
535	pub fn report_substream_opened(
536		&mut self,
537		peer: PeerId,
538		direction: Direction,
539		handshake: Vec<u8>,
540		negotiated_fallback: Option<ProtocolName>,
541		sink: NotificationsSink,
542	) -> Result<(), ()> {
543		metrics::register_substream_opened(&self.metrics, &self.protocol);
544
545		let mut subscribers = self.subscribers.lock();
546		log::trace!(target: LOG_TARGET, "{}: substream opened for {peer:?}", self.protocol);
547
548		subscribers.retain(|subscriber| {
549			subscriber
550				.unbounded_send(InnerNotificationEvent::NotificationStreamOpened {
551					peer,
552					direction,
553					handshake: handshake.clone(),
554					negotiated_fallback: negotiated_fallback.clone(),
555					sink: sink.clone(),
556				})
557				.is_ok()
558		});
559		self.num_peers += 1;
560
561		Ok(())
562	}
563
564	/// Substream was closed.
565	pub fn report_substream_closed(&mut self, peer: PeerId) -> Result<(), ()> {
566		metrics::register_substream_closed(&self.metrics, &self.protocol);
567
568		let mut subscribers = self.subscribers.lock();
569		log::trace!(target: LOG_TARGET, "{}: substream closed for {peer:?}", self.protocol);
570
571		subscribers.retain(|subscriber| {
572			subscriber
573				.unbounded_send(InnerNotificationEvent::NotificationStreamClosed { peer })
574				.is_ok()
575		});
576		self.num_peers -= 1;
577
578		Ok(())
579	}
580
581	/// Notification was received from the substream.
582	pub fn report_notification_received(
583		&mut self,
584		peer: PeerId,
585		notification: Vec<u8>,
586	) -> Result<(), ()> {
587		metrics::register_notification_received(&self.metrics, &self.protocol, notification.len());
588
589		let mut subscribers = self.subscribers.lock();
590		log::trace!(target: LOG_TARGET, "{}: notification received from {peer:?}", self.protocol);
591
592		subscribers.retain(|subscriber| {
593			subscriber
594				.unbounded_send(InnerNotificationEvent::NotificationReceived {
595					peer,
596					notification: notification.clone(),
597				})
598				.is_ok()
599		});
600
601		Ok(())
602	}
603
604	/// Notification sink was replaced.
605	pub fn report_notification_sink_replaced(
606		&mut self,
607		peer: PeerId,
608		sink: NotificationsSink,
609	) -> Result<(), ()> {
610		let mut subscribers = self.subscribers.lock();
611
612		log::trace!(
613			target: LOG_TARGET,
614			"{}: notification sink replaced for {peer:?}",
615			self.protocol
616		);
617
618		subscribers.retain(|subscriber| {
619			subscriber
620				.unbounded_send(InnerNotificationEvent::NotificationSinkReplaced {
621					peer,
622					sink: sink.clone(),
623				})
624				.is_ok()
625		});
626
627		Ok(())
628	}
629
630	/// Get the number of connected peers.
631	pub fn num_peers(&self) -> usize {
632		self.num_peers
633	}
634}
635
636/// Create new (protocol, notification) handle pair.
637///
638/// Handle pair allows `Notifications` and the protocol to communicate with each other directly.
639pub fn notification_service(
640	protocol: ProtocolName,
641) -> (ProtocolHandlePair, Box<dyn NotificationService>) {
642	let (cmd_tx, cmd_rx) = mpsc::channel(COMMAND_QUEUE_SIZE);
643
644	let (event_tx, event_rx) =
645		tracing_unbounded(metric_label_for_protocol(&protocol).leak(), 100_000);
646	let subscribers = Arc::new(Mutex::new(vec![event_tx]));
647
648	(
649		ProtocolHandlePair::new(protocol.clone(), subscribers.clone(), cmd_rx),
650		Box::new(NotificationHandle::new(protocol.clone(), cmd_tx, event_rx, subscribers)),
651	)
652}
653
654// Decorates the mpsc-notification-to-protocol metric with the name of the protocol,
655// to be able to distiguish between different protocols in dashboards.
656fn metric_label_for_protocol(protocol: &ProtocolName) -> String {
657	let protocol_name = protocol.to_string();
658	let keys = protocol_name.split("/").collect::<Vec<_>>();
659	keys.iter()
660		.rev()
661		.take(2) // Last two tokens give the protocol name and version
662		.fold("mpsc-notification-to-protocol".into(), |acc, val| format!("{}-{}", acc, val))
663}