1use crate::{
22 error,
23 protocol::notifications::handler::NotificationsSink,
24 service::{
25 metrics::NotificationMetrics,
26 traits::{
27 Direction, MessageSink, NotificationEvent, NotificationService, ValidationResult,
28 },
29 },
30 types::ProtocolName,
31};
32
33use futures::{
34 stream::{FuturesUnordered, Stream},
35 StreamExt,
36};
37use libp2p::PeerId;
38use parking_lot::Mutex;
39use tokio::sync::{mpsc, oneshot};
40use tokio_stream::wrappers::ReceiverStream;
41
42use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
43
44use std::{collections::HashMap, fmt::Debug, sync::Arc};
45
46pub(crate) mod metrics;
47
48#[cfg(test)]
49mod tests;
50
/// Logging target used by the log statements in this file.
const LOG_TARGET: &str = "sub-libp2p::notification::service";

/// Capacity of the bounded command channel created by `notification_service()`
/// and used to carry [`NotificationCommand`]s.
const COMMAND_QUEUE_SIZE: usize = 64;

/// Shared, lock-protected list of event senders.
///
/// Holds one sender per `NotificationHandle` (the initial handle plus every
/// clone made through `NotificationService::clone()`); `ProtocolHandle` sends
/// each reported event to every sender in the list.
type Subscribers = Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>;

/// Shareable notification sink paired with the name of the protocol it belongs to.
///
/// The pair sits behind a mutex so that it can be overwritten in place when a
/// `NotificationSinkReplaced` event is processed; holders of a clone of the
/// `Arc` then observe the new sink.
type NotificationSink = Arc<Mutex<(NotificationsSink, ProtocolName)>>;
65
66#[async_trait::async_trait]
67impl MessageSink for NotificationSink {
68 fn send_sync_notification(&self, notification: Vec<u8>) {
70 let sink = self.lock();
71
72 metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification.len());
73 sink.0.send_sync_notification(notification);
74 }
75
76 async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), error::Error> {
81 let notification_len = notification.len();
85 let sink = self.lock().clone();
86 let permit = sink
87 .0
88 .reserve_notification()
89 .await
90 .map_err(|_| error::Error::ConnectionClosed)?;
91
92 permit.send(notification).map_err(|_| error::Error::ChannelClosed).inspect(|_| {
93 metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification_len);
94 })
95 }
96}
97
/// Event sent by `ProtocolHandle` to every subscribed `NotificationHandle`.
///
/// `NotificationHandle::next_event()` translates these into the public
/// `NotificationEvent` type; `NotificationSinkReplaced` is handled inside the
/// handle and is not forwarded to the caller.
#[derive(Debug)]
enum InnerNotificationEvent {
    /// An inbound substream must be validated.
    ValidateInboundSubstream {
        /// Peer that opened the substream.
        peer: PeerId,

        /// Handshake reported together with the substream.
        handshake: Vec<u8>,

        /// Channel on which the receiver of this event reports its
        /// [`ValidationResult`].
        result_tx: oneshot::Sender<ValidationResult>,
    },

    /// A notification substream was opened.
    NotificationStreamOpened {
        /// Peer the substream was opened with.
        peer: PeerId,

        /// Direction of the substream.
        direction: Direction,

        /// Handshake reported together with the substream.
        handshake: Vec<u8>,

        /// Negotiated fallback protocol, if any.
        negotiated_fallback: Option<ProtocolName>,

        /// Sink used to send notifications to `peer`.
        sink: NotificationsSink,
    },

    /// A notification substream was closed.
    NotificationStreamClosed {
        /// Peer whose substream was closed.
        peer: PeerId,
    },

    /// A notification was received.
    NotificationReceived {
        /// Peer that sent the notification.
        peer: PeerId,

        /// Payload of the notification.
        notification: Vec<u8>,
    },

    /// The notification sink of a peer was replaced.
    NotificationSinkReplaced {
        /// Peer whose sink was replaced.
        peer: PeerId,

        /// The new sink.
        sink: NotificationsSink,
    },
}
156
/// Command sent by `NotificationHandle` over the bounded command channel; the
/// receiving end is exposed as a stream by `ProtocolHandlePair::split()`.
#[derive(Debug)]
pub enum NotificationCommand {
    /// Open a substream to the given peer.
    ///
    /// Not constructed anywhere in this file (hence `allow(unused)`).
    #[allow(unused)]
    OpenSubstream(PeerId),

    /// Close the substream to the given peer.
    ///
    /// Not constructed anywhere in this file (hence `allow(unused)`).
    #[allow(unused)]
    CloseSubstream(PeerId),

    /// Set the handshake of the notification protocol to the given bytes.
    SetHandshake(Vec<u8>),
}
173
/// Per-peer state stored by `NotificationHandle`.
///
/// The same sink is stored twice: `sink` is used when sending through the
/// handle itself, while `shared_sink` is what `message_sink()` hands out. Both
/// are updated when a `NotificationSinkReplaced` event is processed.
#[derive(Debug, Clone)]
struct PeerContext {
    /// Sink used directly by the handle's send methods.
    sink: NotificationsSink,

    /// Shareable, lock-protected sink returned (as a clone of the `Arc`) by
    /// `message_sink()`.
    shared_sink: NotificationSink,
}
190
/// Handle implementing `NotificationService` for a single notification protocol.
#[derive(Debug)]
pub struct NotificationHandle {
    /// Name of the protocol this handle belongs to.
    protocol: ProtocolName,

    /// Sending half of the bounded command channel.
    tx: mpsc::Sender<NotificationCommand>,

    /// Receiving half of this handle's event channel.
    rx: TracingUnboundedReceiver<InnerNotificationEvent>,

    /// Shared list of event senders; a new sender is pushed when the handle is cloned.
    subscribers: Subscribers,

    /// Peers with an open substream, as learned from the events processed by
    /// `next_event()`.
    peers: HashMap<PeerId, PeerContext>,
}
209
210impl NotificationHandle {
211 fn new(
213 protocol: ProtocolName,
214 tx: mpsc::Sender<NotificationCommand>,
215 rx: TracingUnboundedReceiver<InnerNotificationEvent>,
216 subscribers: Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>,
217 ) -> Self {
218 Self { protocol, tx, rx, subscribers, peers: HashMap::new() }
219 }
220}
221
222#[async_trait::async_trait]
223impl NotificationService for NotificationHandle {
224 async fn open_substream(&mut self, _peer: sc_network_types::PeerId) -> Result<(), ()> {
226 todo!("support for opening substreams not implemented yet");
227 }
228
229 async fn close_substream(&mut self, _peer: sc_network_types::PeerId) -> Result<(), ()> {
231 todo!("support for closing substreams not implemented yet, call `NetworkService::disconnect_peer()` instead");
232 }
233
234 fn send_sync_notification(&mut self, peer: &sc_network_types::PeerId, notification: Vec<u8>) {
236 if let Some(info) = self.peers.get(&((*peer).into())) {
237 metrics::register_notification_sent(
238 info.sink.metrics(),
239 &self.protocol,
240 notification.len(),
241 );
242
243 let _ = info.sink.send_sync_notification(notification);
244 }
245 }
246
247 async fn send_async_notification(
249 &mut self,
250 peer: &sc_network_types::PeerId,
251 notification: Vec<u8>,
252 ) -> Result<(), error::Error> {
253 let notification_len = notification.len();
254 let sink = &self
255 .peers
256 .get(&peer.into())
257 .ok_or_else(|| error::Error::PeerDoesntExist((*peer).into()))?
258 .sink;
259
260 sink.reserve_notification()
261 .await
262 .map_err(|_| error::Error::ConnectionClosed)?
263 .send(notification)
264 .map_err(|_| error::Error::ChannelClosed)
265 .inspect(|_| {
266 metrics::register_notification_sent(
267 sink.metrics(),
268 &self.protocol,
269 notification_len,
270 );
271 })
272 }
273
274 async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
276 log::trace!(target: LOG_TARGET, "{}: set handshake to {handshake:?}", self.protocol);
277
278 self.tx.send(NotificationCommand::SetHandshake(handshake)).await.map_err(|_| ())
279 }
280
281 fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
287 self.tx.try_send(NotificationCommand::SetHandshake(handshake)).map_err(|_| ())
288 }
289
290 async fn next_event(&mut self) -> Option<NotificationEvent> {
292 loop {
293 match self.rx.next().await? {
294 InnerNotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
295 return Some(NotificationEvent::ValidateInboundSubstream {
296 peer: peer.into(),
297 handshake,
298 result_tx,
299 })
300 },
301 InnerNotificationEvent::NotificationStreamOpened {
302 peer,
303 handshake,
304 negotiated_fallback,
305 direction,
306 sink,
307 } => {
308 self.peers.insert(
309 peer,
310 PeerContext {
311 sink: sink.clone(),
312 shared_sink: Arc::new(Mutex::new((sink, self.protocol.clone()))),
313 },
314 );
315 return Some(NotificationEvent::NotificationStreamOpened {
316 peer: peer.into(),
317 handshake,
318 direction,
319 negotiated_fallback,
320 });
321 },
322 InnerNotificationEvent::NotificationStreamClosed { peer } => {
323 self.peers.remove(&peer);
324 return Some(NotificationEvent::NotificationStreamClosed { peer: peer.into() });
325 },
326 InnerNotificationEvent::NotificationReceived { peer, notification } => {
327 return Some(NotificationEvent::NotificationReceived {
328 peer: peer.into(),
329 notification,
330 })
331 },
332 InnerNotificationEvent::NotificationSinkReplaced { peer, sink } => {
333 match self.peers.get_mut(&peer) {
334 None => log::error!(
335 "{}: notification sink replaced for {peer} but peer does not exist",
336 self.protocol
337 ),
338 Some(context) => {
339 context.sink = sink.clone();
340 *context.shared_sink.lock() = (sink.clone(), self.protocol.clone());
341 },
342 }
343 },
344 }
345 }
346 }
347
348 fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
350 let mut subscribers = self.subscribers.lock();
351
352 let (event_tx, event_rx) = tracing_unbounded(self.rx.name(), 100_000);
353 subscribers.push(event_tx);
354
355 Ok(Box::new(NotificationHandle {
356 protocol: self.protocol.clone(),
357 tx: self.tx.clone(),
358 rx: event_rx,
359 peers: self.peers.clone(),
360 subscribers: self.subscribers.clone(),
361 }))
362 }
363
364 fn protocol(&self) -> &ProtocolName {
366 &self.protocol
367 }
368
369 fn message_sink(&self, peer: &sc_network_types::PeerId) -> Option<Box<dyn MessageSink>> {
371 match self.peers.get(&peer.into()) {
372 Some(context) => Some(Box::new(context.shared_sink.clone())),
373 None => None,
374 }
375 }
376}
377
/// Protocol-side counterpart of `NotificationHandle`, returned by
/// `notification_service()`.
///
/// Bundles everything needed to build a `ProtocolHandle` and the command
/// stream; see `split()`.
#[derive(Debug)]
pub struct ProtocolHandlePair {
    /// Name of the protocol.
    protocol: ProtocolName,

    /// Shared list of event senders, one per `NotificationHandle`.
    subscribers: Subscribers,

    /// Receiving half of the bounded command channel.
    rx: mpsc::Receiver<NotificationCommand>,
}
390
391impl ProtocolHandlePair {
392 fn new(
394 protocol: ProtocolName,
395 subscribers: Subscribers,
396 rx: mpsc::Receiver<NotificationCommand>,
397 ) -> Self {
398 Self { protocol, subscribers, rx }
399 }
400
401 pub(crate) fn split(
404 self,
405 ) -> (ProtocolHandle, Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>) {
406 (
407 ProtocolHandle::new(self.protocol, self.subscribers),
408 Box::new(ReceiverStream::new(self.rx)),
409 )
410 }
411}
412
/// Handle through which substream and notification events are reported to all
/// subscribed `NotificationHandle`s.
#[derive(Debug, Clone)]
pub(crate) struct ProtocolHandle {
    /// Name of the protocol.
    protocol: ProtocolName,

    /// Shared list of event senders, one per `NotificationHandle`.
    subscribers: Subscribers,

    /// Number of open substreams as counted by this handle: incremented by
    /// `report_substream_opened()`, decremented by `report_substream_closed()`.
    num_peers: usize,

    /// When `true`, `report_incoming_substream()` returns
    /// `ValidationCallResult::Delegated` without consulting the subscribers.
    delegate_to_peerset: bool,

    /// Metrics passed to the `metrics::register_*` helpers; `None` until
    /// `set_metrics()` is called.
    metrics: Option<NotificationMetrics>,
}
432
/// Outcome of `ProtocolHandle::report_incoming_substream()`.
pub(crate) enum ValidationCallResult {
    /// The substream was forwarded to the subscriber(s); the final
    /// [`ValidationResult`] arrives on the contained receiver.
    WaitForValidation(oneshot::Receiver<ValidationResult>),
    /// Subscribers were not consulted because `delegate_to_peerset` is enabled.
    Delegated,
}
437
438impl ProtocolHandle {
439 fn new(protocol: ProtocolName, subscribers: Subscribers) -> Self {
441 Self { protocol, subscribers, num_peers: 0usize, metrics: None, delegate_to_peerset: false }
442 }
443
444 pub fn set_metrics(&mut self, metrics: NotificationMetrics) {
446 self.metrics = Some(metrics);
447 }
448
449 pub fn delegate_to_peerset(&mut self, delegate: bool) {
455 self.delegate_to_peerset = delegate;
456 }
457
458 pub fn report_incoming_substream(
464 &self,
465 peer: PeerId,
466 handshake: Vec<u8>,
467 ) -> Result<ValidationCallResult, ()> {
468 let subscribers = self.subscribers.lock();
469
470 log::trace!(
471 target: LOG_TARGET,
472 "{}: report incoming substream for {peer}, handshake {handshake:?}",
473 self.protocol
474 );
475
476 if self.delegate_to_peerset {
477 return Ok(ValidationCallResult::Delegated);
478 }
479
480 if subscribers.len() == 1 {
483 let (result_tx, rx) = oneshot::channel();
484 return subscribers[0]
485 .unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
486 peer,
487 handshake,
488 result_tx,
489 })
490 .map(|_| ValidationCallResult::WaitForValidation(rx))
491 .map_err(|_| ());
492 }
493
494 let mut results: FuturesUnordered<_> = subscribers
497 .iter()
498 .filter_map(|subscriber| {
499 let (result_tx, rx) = oneshot::channel();
500
501 subscriber
502 .unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
503 peer,
504 handshake: handshake.clone(),
505 result_tx,
506 })
507 .is_ok()
508 .then_some(rx)
509 })
510 .collect();
511
512 let (tx, rx) = oneshot::channel();
513 tokio::spawn(async move {
514 while let Some(event) = results.next().await {
515 match event {
516 Err(_) | Ok(ValidationResult::Reject) => {
517 return tx.send(ValidationResult::Reject)
518 },
519 Ok(ValidationResult::Accept) => {},
520 }
521 }
522
523 return tx.send(ValidationResult::Accept);
524 });
525
526 Ok(ValidationCallResult::WaitForValidation(rx))
527 }
528
529 pub fn report_substream_opened(
532 &mut self,
533 peer: PeerId,
534 direction: Direction,
535 handshake: Vec<u8>,
536 negotiated_fallback: Option<ProtocolName>,
537 sink: NotificationsSink,
538 ) -> Result<(), ()> {
539 metrics::register_substream_opened(&self.metrics, &self.protocol);
540
541 let mut subscribers = self.subscribers.lock();
542 log::trace!(target: LOG_TARGET, "{}: substream opened for {peer:?}", self.protocol);
543
544 subscribers.retain(|subscriber| {
545 subscriber
546 .unbounded_send(InnerNotificationEvent::NotificationStreamOpened {
547 peer,
548 direction,
549 handshake: handshake.clone(),
550 negotiated_fallback: negotiated_fallback.clone(),
551 sink: sink.clone(),
552 })
553 .is_ok()
554 });
555 self.num_peers += 1;
556
557 Ok(())
558 }
559
560 pub fn report_substream_closed(&mut self, peer: PeerId) -> Result<(), ()> {
562 metrics::register_substream_closed(&self.metrics, &self.protocol);
563
564 let mut subscribers = self.subscribers.lock();
565 log::trace!(target: LOG_TARGET, "{}: substream closed for {peer:?}", self.protocol);
566
567 subscribers.retain(|subscriber| {
568 subscriber
569 .unbounded_send(InnerNotificationEvent::NotificationStreamClosed { peer })
570 .is_ok()
571 });
572 self.num_peers -= 1;
573
574 Ok(())
575 }
576
577 pub fn report_notification_received(
579 &mut self,
580 peer: PeerId,
581 notification: Vec<u8>,
582 ) -> Result<(), ()> {
583 metrics::register_notification_received(&self.metrics, &self.protocol, notification.len());
584
585 let mut subscribers = self.subscribers.lock();
586 log::trace!(target: LOG_TARGET, "{}: notification received from {peer:?}", self.protocol);
587
588 subscribers.retain(|subscriber| {
589 subscriber
590 .unbounded_send(InnerNotificationEvent::NotificationReceived {
591 peer,
592 notification: notification.clone(),
593 })
594 .is_ok()
595 });
596
597 Ok(())
598 }
599
600 pub fn report_notification_sink_replaced(
602 &mut self,
603 peer: PeerId,
604 sink: NotificationsSink,
605 ) -> Result<(), ()> {
606 let mut subscribers = self.subscribers.lock();
607
608 log::trace!(
609 target: LOG_TARGET,
610 "{}: notification sink replaced for {peer:?}",
611 self.protocol
612 );
613
614 subscribers.retain(|subscriber| {
615 subscriber
616 .unbounded_send(InnerNotificationEvent::NotificationSinkReplaced {
617 peer,
618 sink: sink.clone(),
619 })
620 .is_ok()
621 });
622
623 Ok(())
624 }
625
626 pub fn num_peers(&self) -> usize {
628 self.num_peers
629 }
630}
631
632pub fn notification_service(
636 protocol: ProtocolName,
637) -> (ProtocolHandlePair, Box<dyn NotificationService>) {
638 let (cmd_tx, cmd_rx) = mpsc::channel(COMMAND_QUEUE_SIZE);
639
640 let (event_tx, event_rx) =
641 tracing_unbounded(metric_label_for_protocol(&protocol).leak(), 100_000);
642 let subscribers = Arc::new(Mutex::new(vec![event_tx]));
643
644 (
645 ProtocolHandlePair::new(protocol.clone(), subscribers.clone(), cmd_rx),
646 Box::new(NotificationHandle::new(protocol.clone(), cmd_tx, event_rx, subscribers)),
647 )
648}
649
650fn metric_label_for_protocol(protocol: &ProtocolName) -> String {
653 let protocol_name = protocol.to_string();
654 let keys = protocol_name.split("/").collect::<Vec<_>>();
655 keys.iter()
656 .rev()
657 .take(2) .fold("mpsc-notification-to-protocol".into(), |acc, val| format!("{}-{}", acc, val))
659}