1use crate::{
22 error,
23 protocol::notifications::handler::NotificationsSink,
24 service::{
25 metrics::NotificationMetrics,
26 traits::{
27 Direction, MessageSink, NotificationEvent, NotificationService, ValidationResult,
28 },
29 },
30 types::ProtocolName,
31};
32
33use futures::{
34 stream::{FuturesUnordered, Stream},
35 StreamExt,
36};
37use libp2p::PeerId;
38use parking_lot::Mutex;
39use tokio::sync::{mpsc, oneshot};
40use tokio_stream::wrappers::ReceiverStream;
41
42use pezsc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
43
44use std::{collections::HashMap, fmt::Debug, sync::Arc};
45
46pub(crate) mod metrics;
47
48#[cfg(test)]
49mod tests;
50
51const LOG_TARGET: &str = "sub-libp2p::notification::service";
53
54const COMMAND_QUEUE_SIZE: usize = 64;
56
57type Subscribers = Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>;
59
60type NotificationSink = Arc<Mutex<(NotificationsSink, ProtocolName)>>;
65
66#[async_trait::async_trait]
67impl MessageSink for NotificationSink {
68 fn send_sync_notification(&self, notification: Vec<u8>) {
70 let sink = self.lock();
71
72 metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification.len());
73 sink.0.send_sync_notification(notification);
74 }
75
76 async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), error::Error> {
81 let notification_len = notification.len();
85 let sink = self.lock().clone();
86 let permit = sink
87 .0
88 .reserve_notification()
89 .await
90 .map_err(|_| error::Error::ConnectionClosed)?;
91
92 permit.send(notification).map_err(|_| error::Error::ChannelClosed).inspect(|_| {
93 metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification_len);
94 })
95 }
96}
97
98#[derive(Debug)]
101enum InnerNotificationEvent {
102 ValidateInboundSubstream {
104 peer: PeerId,
106
107 handshake: Vec<u8>,
109
110 result_tx: oneshot::Sender<ValidationResult>,
112 },
113
114 NotificationStreamOpened {
116 peer: PeerId,
118
119 direction: Direction,
121
122 handshake: Vec<u8>,
124
125 negotiated_fallback: Option<ProtocolName>,
127
128 sink: NotificationsSink,
130 },
131
132 NotificationStreamClosed {
134 peer: PeerId,
136 },
137
138 NotificationReceived {
140 peer: PeerId,
142
143 notification: Vec<u8>,
145 },
146
147 NotificationSinkReplaced {
149 peer: PeerId,
151
152 sink: NotificationsSink,
154 },
155}
156
157#[derive(Debug)]
161pub enum NotificationCommand {
162 #[allow(unused)]
164 OpenSubstream(PeerId),
165
166 #[allow(unused)]
168 CloseSubstream(PeerId),
169
170 SetHandshake(Vec<u8>),
172}
173
174#[derive(Debug, Clone)]
183struct PeerContext {
184 sink: NotificationsSink,
186
187 shared_sink: NotificationSink,
189}
190
191#[derive(Debug)]
193pub struct NotificationHandle {
194 protocol: ProtocolName,
196
197 tx: mpsc::Sender<NotificationCommand>,
199
200 rx: TracingUnboundedReceiver<InnerNotificationEvent>,
202
203 subscribers: Subscribers,
205
206 peers: HashMap<PeerId, PeerContext>,
208}
209
210impl NotificationHandle {
211 fn new(
213 protocol: ProtocolName,
214 tx: mpsc::Sender<NotificationCommand>,
215 rx: TracingUnboundedReceiver<InnerNotificationEvent>,
216 subscribers: Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>,
217 ) -> Self {
218 Self { protocol, tx, rx, subscribers, peers: HashMap::new() }
219 }
220}
221
222#[async_trait::async_trait]
223impl NotificationService for NotificationHandle {
224 async fn open_substream(&mut self, _peer: pezsc_network_types::PeerId) -> Result<(), ()> {
226 todo!("support for opening substreams not implemented yet");
227 }
228
229 async fn close_substream(&mut self, _peer: pezsc_network_types::PeerId) -> Result<(), ()> {
231 todo!("support for closing substreams not implemented yet, call `NetworkService::disconnect_peer()` instead");
232 }
233
234 fn send_sync_notification(
236 &mut self,
237 peer: &pezsc_network_types::PeerId,
238 notification: Vec<u8>,
239 ) {
240 if let Some(info) = self.peers.get(&((*peer).into())) {
241 metrics::register_notification_sent(
242 info.sink.metrics(),
243 &self.protocol,
244 notification.len(),
245 );
246
247 let _ = info.sink.send_sync_notification(notification);
248 }
249 }
250
251 async fn send_async_notification(
253 &mut self,
254 peer: &pezsc_network_types::PeerId,
255 notification: Vec<u8>,
256 ) -> Result<(), error::Error> {
257 let notification_len = notification.len();
258 let sink = &self
259 .peers
260 .get(&peer.into())
261 .ok_or_else(|| error::Error::PeerDoesntExist((*peer).into()))?
262 .sink;
263
264 sink.reserve_notification()
265 .await
266 .map_err(|_| error::Error::ConnectionClosed)?
267 .send(notification)
268 .map_err(|_| error::Error::ChannelClosed)
269 .inspect(|_| {
270 metrics::register_notification_sent(
271 sink.metrics(),
272 &self.protocol,
273 notification_len,
274 );
275 })
276 }
277
278 async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
280 log::trace!(target: LOG_TARGET, "{}: set handshake to {handshake:?}", self.protocol);
281
282 self.tx.send(NotificationCommand::SetHandshake(handshake)).await.map_err(|_| ())
283 }
284
285 fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
291 self.tx.try_send(NotificationCommand::SetHandshake(handshake)).map_err(|_| ())
292 }
293
294 async fn next_event(&mut self) -> Option<NotificationEvent> {
296 loop {
297 match self.rx.next().await? {
298 InnerNotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
299 return Some(NotificationEvent::ValidateInboundSubstream {
300 peer: peer.into(),
301 handshake,
302 result_tx,
303 })
304 },
305 InnerNotificationEvent::NotificationStreamOpened {
306 peer,
307 handshake,
308 negotiated_fallback,
309 direction,
310 sink,
311 } => {
312 self.peers.insert(
313 peer,
314 PeerContext {
315 sink: sink.clone(),
316 shared_sink: Arc::new(Mutex::new((sink, self.protocol.clone()))),
317 },
318 );
319 return Some(NotificationEvent::NotificationStreamOpened {
320 peer: peer.into(),
321 handshake,
322 direction,
323 negotiated_fallback,
324 });
325 },
326 InnerNotificationEvent::NotificationStreamClosed { peer } => {
327 self.peers.remove(&peer);
328 return Some(NotificationEvent::NotificationStreamClosed { peer: peer.into() });
329 },
330 InnerNotificationEvent::NotificationReceived { peer, notification } => {
331 return Some(NotificationEvent::NotificationReceived {
332 peer: peer.into(),
333 notification,
334 })
335 },
336 InnerNotificationEvent::NotificationSinkReplaced { peer, sink } => {
337 match self.peers.get_mut(&peer) {
338 None => log::error!(
339 "{}: notification sink replaced for {peer} but peer does not exist",
340 self.protocol
341 ),
342 Some(context) => {
343 context.sink = sink.clone();
344 *context.shared_sink.lock() = (sink.clone(), self.protocol.clone());
345 },
346 }
347 },
348 }
349 }
350 }
351
352 fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
354 let mut subscribers = self.subscribers.lock();
355
356 let (event_tx, event_rx) = tracing_unbounded(self.rx.name(), 100_000);
357 subscribers.push(event_tx);
358
359 Ok(Box::new(NotificationHandle {
360 protocol: self.protocol.clone(),
361 tx: self.tx.clone(),
362 rx: event_rx,
363 peers: self.peers.clone(),
364 subscribers: self.subscribers.clone(),
365 }))
366 }
367
368 fn protocol(&self) -> &ProtocolName {
370 &self.protocol
371 }
372
373 fn message_sink(&self, peer: &pezsc_network_types::PeerId) -> Option<Box<dyn MessageSink>> {
375 match self.peers.get(&peer.into()) {
376 Some(context) => Some(Box::new(context.shared_sink.clone())),
377 None => None,
378 }
379 }
380}
381
382#[derive(Debug)]
384pub struct ProtocolHandlePair {
385 protocol: ProtocolName,
387
388 subscribers: Subscribers,
390
391 rx: mpsc::Receiver<NotificationCommand>,
393}
394
395impl ProtocolHandlePair {
396 fn new(
398 protocol: ProtocolName,
399 subscribers: Subscribers,
400 rx: mpsc::Receiver<NotificationCommand>,
401 ) -> Self {
402 Self { protocol, subscribers, rx }
403 }
404
405 pub(crate) fn split(
408 self,
409 ) -> (ProtocolHandle, Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>) {
410 (
411 ProtocolHandle::new(self.protocol, self.subscribers),
412 Box::new(ReceiverStream::new(self.rx)),
413 )
414 }
415}
416
417#[derive(Debug, Clone)]
420pub(crate) struct ProtocolHandle {
421 protocol: ProtocolName,
423
424 subscribers: Subscribers,
426
427 num_peers: usize,
429
430 delegate_to_peerset: bool,
432
433 metrics: Option<NotificationMetrics>,
435}
436
437pub(crate) enum ValidationCallResult {
438 WaitForValidation(oneshot::Receiver<ValidationResult>),
439 Delegated,
440}
441
442impl ProtocolHandle {
443 fn new(protocol: ProtocolName, subscribers: Subscribers) -> Self {
445 Self { protocol, subscribers, num_peers: 0usize, metrics: None, delegate_to_peerset: false }
446 }
447
448 pub fn set_metrics(&mut self, metrics: NotificationMetrics) {
450 self.metrics = Some(metrics);
451 }
452
453 pub fn delegate_to_peerset(&mut self, delegate: bool) {
459 self.delegate_to_peerset = delegate;
460 }
461
462 pub fn report_incoming_substream(
468 &self,
469 peer: PeerId,
470 handshake: Vec<u8>,
471 ) -> Result<ValidationCallResult, ()> {
472 let subscribers = self.subscribers.lock();
473
474 log::trace!(
475 target: LOG_TARGET,
476 "{}: report incoming substream for {peer}, handshake {handshake:?}",
477 self.protocol
478 );
479
480 if self.delegate_to_peerset {
481 return Ok(ValidationCallResult::Delegated);
482 }
483
484 if subscribers.len() == 1 {
487 let (result_tx, rx) = oneshot::channel();
488 return subscribers[0]
489 .unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
490 peer,
491 handshake,
492 result_tx,
493 })
494 .map(|_| ValidationCallResult::WaitForValidation(rx))
495 .map_err(|_| ());
496 }
497
498 let mut results: FuturesUnordered<_> = subscribers
501 .iter()
502 .filter_map(|subscriber| {
503 let (result_tx, rx) = oneshot::channel();
504
505 subscriber
506 .unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
507 peer,
508 handshake: handshake.clone(),
509 result_tx,
510 })
511 .is_ok()
512 .then_some(rx)
513 })
514 .collect();
515
516 let (tx, rx) = oneshot::channel();
517 tokio::spawn(async move {
518 while let Some(event) = results.next().await {
519 match event {
520 Err(_) | Ok(ValidationResult::Reject) => {
521 return tx.send(ValidationResult::Reject)
522 },
523 Ok(ValidationResult::Accept) => {},
524 }
525 }
526
527 return tx.send(ValidationResult::Accept);
528 });
529
530 Ok(ValidationCallResult::WaitForValidation(rx))
531 }
532
533 pub fn report_substream_opened(
536 &mut self,
537 peer: PeerId,
538 direction: Direction,
539 handshake: Vec<u8>,
540 negotiated_fallback: Option<ProtocolName>,
541 sink: NotificationsSink,
542 ) -> Result<(), ()> {
543 metrics::register_substream_opened(&self.metrics, &self.protocol);
544
545 let mut subscribers = self.subscribers.lock();
546 log::trace!(target: LOG_TARGET, "{}: substream opened for {peer:?}", self.protocol);
547
548 subscribers.retain(|subscriber| {
549 subscriber
550 .unbounded_send(InnerNotificationEvent::NotificationStreamOpened {
551 peer,
552 direction,
553 handshake: handshake.clone(),
554 negotiated_fallback: negotiated_fallback.clone(),
555 sink: sink.clone(),
556 })
557 .is_ok()
558 });
559 self.num_peers += 1;
560
561 Ok(())
562 }
563
564 pub fn report_substream_closed(&mut self, peer: PeerId) -> Result<(), ()> {
566 metrics::register_substream_closed(&self.metrics, &self.protocol);
567
568 let mut subscribers = self.subscribers.lock();
569 log::trace!(target: LOG_TARGET, "{}: substream closed for {peer:?}", self.protocol);
570
571 subscribers.retain(|subscriber| {
572 subscriber
573 .unbounded_send(InnerNotificationEvent::NotificationStreamClosed { peer })
574 .is_ok()
575 });
576 self.num_peers -= 1;
577
578 Ok(())
579 }
580
581 pub fn report_notification_received(
583 &mut self,
584 peer: PeerId,
585 notification: Vec<u8>,
586 ) -> Result<(), ()> {
587 metrics::register_notification_received(&self.metrics, &self.protocol, notification.len());
588
589 let mut subscribers = self.subscribers.lock();
590 log::trace!(target: LOG_TARGET, "{}: notification received from {peer:?}", self.protocol);
591
592 subscribers.retain(|subscriber| {
593 subscriber
594 .unbounded_send(InnerNotificationEvent::NotificationReceived {
595 peer,
596 notification: notification.clone(),
597 })
598 .is_ok()
599 });
600
601 Ok(())
602 }
603
604 pub fn report_notification_sink_replaced(
606 &mut self,
607 peer: PeerId,
608 sink: NotificationsSink,
609 ) -> Result<(), ()> {
610 let mut subscribers = self.subscribers.lock();
611
612 log::trace!(
613 target: LOG_TARGET,
614 "{}: notification sink replaced for {peer:?}",
615 self.protocol
616 );
617
618 subscribers.retain(|subscriber| {
619 subscriber
620 .unbounded_send(InnerNotificationEvent::NotificationSinkReplaced {
621 peer,
622 sink: sink.clone(),
623 })
624 .is_ok()
625 });
626
627 Ok(())
628 }
629
630 pub fn num_peers(&self) -> usize {
632 self.num_peers
633 }
634}
635
636pub fn notification_service(
640 protocol: ProtocolName,
641) -> (ProtocolHandlePair, Box<dyn NotificationService>) {
642 let (cmd_tx, cmd_rx) = mpsc::channel(COMMAND_QUEUE_SIZE);
643
644 let (event_tx, event_rx) =
645 tracing_unbounded(metric_label_for_protocol(&protocol).leak(), 100_000);
646 let subscribers = Arc::new(Mutex::new(vec![event_tx]));
647
648 (
649 ProtocolHandlePair::new(protocol.clone(), subscribers.clone(), cmd_rx),
650 Box::new(NotificationHandle::new(protocol.clone(), cmd_tx, event_rx, subscribers)),
651 )
652}
653
654fn metric_label_for_protocol(protocol: &ProtocolName) -> String {
657 let protocol_name = protocol.to_string();
658 let keys = protocol_name.split("/").collect::<Vec<_>>();
659 keys.iter()
660 .rev()
661 .take(2) .fold("mpsc-notification-to-protocol".into(), |acc, val| format!("{}-{}", acc, val))
663}