1use crate::{
10 error,
11 protocol::notifications::handler::NotificationsSink,
12 service::{
13 metrics::NotificationMetrics,
14 traits::{
15 Direction, MessageSink, NotificationEvent, NotificationService, ValidationResult,
16 },
17 },
18 types::ProtocolName,
19};
20
21use futures::{
22 stream::{FuturesUnordered, Stream},
23 StreamExt,
24};
25use libp2p::PeerId;
26use parking_lot::Mutex;
27use tokio::sync::{mpsc, oneshot};
28use tokio_stream::wrappers::ReceiverStream;
29
30use soil_client::utils::mpsc::{
31 tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
32};
33
34use std::{collections::HashMap, fmt::Debug, sync::Arc};
35
36pub(crate) mod metrics;
37
38#[cfg(test)]
39mod tests;
40
/// Logging target used by the log statements in this file.
const LOG_TARGET: &str = "sub-libp2p::notification::service";

/// Capacity of the bounded command channel created by `notification_service()`.
const COMMAND_QUEUE_SIZE: usize = 64;

/// Shared, lock-protected list of event senders.
///
/// `notification_service()` seeds it with one sender and every
/// `NotificationHandle::clone()` pushes another one.
type Subscribers = Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>;

/// Shared, lock-protected pair of a peer's notification sink and a protocol name.
///
/// The pair lives behind a lock so that `NotificationHandle::next_event()` can overwrite
/// it in place when a `NotificationSinkReplaced` event arrives.
type NotificationSink = Arc<Mutex<(NotificationsSink, ProtocolName)>>;
55
56#[async_trait::async_trait]
57impl MessageSink for NotificationSink {
58 fn send_sync_notification(&self, notification: Vec<u8>) {
60 let sink = self.lock();
61
62 metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification.len());
63 sink.0.send_sync_notification(notification);
64 }
65
66 async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), error::Error> {
71 let notification_len = notification.len();
75 let sink = self.lock().clone();
76 let permit = sink
77 .0
78 .reserve_notification()
79 .await
80 .map_err(|_| error::Error::ConnectionClosed)?;
81
82 permit.send(notification).map_err(|_| error::Error::ChannelClosed).inspect(|_| {
83 metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification_len);
84 })
85 }
86}
87
/// Event delivered to every subscriber's event channel.
///
/// `NotificationHandle::next_event()` translates these into `NotificationEvent`s,
/// except for `NotificationSinkReplaced`, which it handles internally.
#[derive(Debug)]
enum InnerNotificationEvent {
    /// An inbound substream needs to be validated by the subscriber.
    ValidateInboundSubstream {
        /// Peer the event concerns.
        peer: PeerId,

        /// Handshake bytes reported together with the inbound substream.
        handshake: Vec<u8>,

        /// Channel on which the subscriber sends its `ValidationResult`.
        result_tx: oneshot::Sender<ValidationResult>,
    },

    /// A notification substream was opened.
    NotificationStreamOpened {
        /// Peer the event concerns.
        peer: PeerId,

        /// Direction of the substream.
        direction: Direction,

        /// Handshake bytes reported together with the opened substream.
        handshake: Vec<u8>,

        /// Fallback protocol name, if one was reported (presumably the name that was
        /// actually negotiated; confirm against the caller).
        negotiated_fallback: Option<ProtocolName>,

        /// Sink used to send notifications to `peer`.
        sink: NotificationsSink,
    },

    /// A notification substream was closed.
    NotificationStreamClosed {
        /// Peer the event concerns.
        peer: PeerId,
    },

    /// A notification was received.
    NotificationReceived {
        /// Peer the notification came from.
        peer: PeerId,

        /// Raw notification bytes.
        notification: Vec<u8>,
    },

    /// The notification sink of a peer was replaced.
    NotificationSinkReplaced {
        /// Peer the event concerns.
        peer: PeerId,

        /// The new sink for `peer`.
        sink: NotificationsSink,
    },
}
146
/// Command sent from a `NotificationHandle` over its bounded command channel.
///
/// The receiving end is exposed as a stream by `ProtocolHandlePair::split()`.
#[derive(Debug)]
pub enum NotificationCommand {
    /// Request to open a substream to the peer.
    // Not constructed in this file: `open_substream()` is still a `todo!()`.
    #[allow(unused)]
    OpenSubstream(PeerId),

    /// Request to close the substream to the peer.
    // Not constructed in this file: `close_substream()` is still a `todo!()`.
    #[allow(unused)]
    CloseSubstream(PeerId),

    /// Set a new handshake; sent by `set_handshake()` and `try_set_handshake()`.
    SetHandshake(Vec<u8>),
}
163
/// Per-peer state kept by a `NotificationHandle`.
///
/// Both fields are set from the same sink when the stream is opened and are updated
/// together when the sink is replaced.
#[derive(Debug, Clone)]
struct PeerContext {
    /// Sink used directly by the handle's own `send_*_notification()` methods.
    sink: NotificationsSink,

    /// Lock-protected sink handed out by `message_sink()`; overwritten in place when the
    /// peer's sink is replaced.
    shared_sink: NotificationSink,
}
180
/// Handle implementing `NotificationService` for a single protocol.
#[derive(Debug)]
pub struct NotificationHandle {
    /// Name of the protocol this handle serves.
    protocol: ProtocolName,

    /// Sending side of the bounded command channel.
    tx: mpsc::Sender<NotificationCommand>,

    /// Receiving side of this handle's event channel.
    rx: TracingUnboundedReceiver<InnerNotificationEvent>,

    /// Shared list of event senders; `clone()` appends a sender for each new handle.
    subscribers: Subscribers,

    /// Peers with an open stream, maintained by `next_event()`: inserted on
    /// `NotificationStreamOpened`, removed on `NotificationStreamClosed`.
    peers: HashMap<PeerId, PeerContext>,
}
199
200impl NotificationHandle {
201 fn new(
203 protocol: ProtocolName,
204 tx: mpsc::Sender<NotificationCommand>,
205 rx: TracingUnboundedReceiver<InnerNotificationEvent>,
206 subscribers: Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>,
207 ) -> Self {
208 Self { protocol, tx, rx, subscribers, peers: HashMap::new() }
209 }
210}
211
212#[async_trait::async_trait]
213impl NotificationService for NotificationHandle {
214 async fn open_substream(&mut self, _peer: crate::types::PeerId) -> Result<(), ()> {
216 todo!("support for opening substreams not implemented yet");
217 }
218
219 async fn close_substream(&mut self, _peer: crate::types::PeerId) -> Result<(), ()> {
221 todo!("support for closing substreams not implemented yet, call `NetworkService::disconnect_peer()` instead");
222 }
223
224 fn send_sync_notification(&mut self, peer: &crate::types::PeerId, notification: Vec<u8>) {
226 if let Some(info) = self.peers.get(&((*peer).into())) {
227 metrics::register_notification_sent(
228 info.sink.metrics(),
229 &self.protocol,
230 notification.len(),
231 );
232
233 let _ = info.sink.send_sync_notification(notification);
234 }
235 }
236
237 async fn send_async_notification(
239 &mut self,
240 peer: &crate::types::PeerId,
241 notification: Vec<u8>,
242 ) -> Result<(), error::Error> {
243 let notification_len = notification.len();
244 let sink = &self
245 .peers
246 .get(&peer.into())
247 .ok_or_else(|| error::Error::PeerDoesntExist((*peer).into()))?
248 .sink;
249
250 sink.reserve_notification()
251 .await
252 .map_err(|_| error::Error::ConnectionClosed)?
253 .send(notification)
254 .map_err(|_| error::Error::ChannelClosed)
255 .inspect(|_| {
256 metrics::register_notification_sent(
257 sink.metrics(),
258 &self.protocol,
259 notification_len,
260 );
261 })
262 }
263
264 async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
266 log::trace!(target: LOG_TARGET, "{}: set handshake to {handshake:?}", self.protocol);
267
268 self.tx.send(NotificationCommand::SetHandshake(handshake)).await.map_err(|_| ())
269 }
270
271 fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
277 self.tx.try_send(NotificationCommand::SetHandshake(handshake)).map_err(|_| ())
278 }
279
280 async fn next_event(&mut self) -> Option<NotificationEvent> {
282 loop {
283 match self.rx.next().await? {
284 InnerNotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
285 return Some(NotificationEvent::ValidateInboundSubstream {
286 peer: peer.into(),
287 handshake,
288 result_tx,
289 })
290 },
291 InnerNotificationEvent::NotificationStreamOpened {
292 peer,
293 handshake,
294 negotiated_fallback,
295 direction,
296 sink,
297 } => {
298 self.peers.insert(
299 peer,
300 PeerContext {
301 sink: sink.clone(),
302 shared_sink: Arc::new(Mutex::new((sink, self.protocol.clone()))),
303 },
304 );
305 return Some(NotificationEvent::NotificationStreamOpened {
306 peer: peer.into(),
307 handshake,
308 direction,
309 negotiated_fallback,
310 });
311 },
312 InnerNotificationEvent::NotificationStreamClosed { peer } => {
313 self.peers.remove(&peer);
314 return Some(NotificationEvent::NotificationStreamClosed { peer: peer.into() });
315 },
316 InnerNotificationEvent::NotificationReceived { peer, notification } => {
317 return Some(NotificationEvent::NotificationReceived {
318 peer: peer.into(),
319 notification,
320 })
321 },
322 InnerNotificationEvent::NotificationSinkReplaced { peer, sink } => {
323 match self.peers.get_mut(&peer) {
324 None => log::error!(
325 "{}: notification sink replaced for {peer} but peer does not exist",
326 self.protocol
327 ),
328 Some(context) => {
329 context.sink = sink.clone();
330 *context.shared_sink.lock() = (sink.clone(), self.protocol.clone());
331 },
332 }
333 },
334 }
335 }
336 }
337
338 fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
340 let mut subscribers = self.subscribers.lock();
341
342 let (event_tx, event_rx) = tracing_unbounded(self.rx.name(), 100_000);
343 subscribers.push(event_tx);
344
345 Ok(Box::new(NotificationHandle {
346 protocol: self.protocol.clone(),
347 tx: self.tx.clone(),
348 rx: event_rx,
349 peers: self.peers.clone(),
350 subscribers: self.subscribers.clone(),
351 }))
352 }
353
354 fn protocol(&self) -> &ProtocolName {
356 &self.protocol
357 }
358
359 fn message_sink(&self, peer: &crate::types::PeerId) -> Option<Box<dyn MessageSink>> {
361 match self.peers.get(&peer.into()) {
362 Some(context) => Some(Box::new(context.shared_sink.clone())),
363 None => None,
364 }
365 }
366}
367
/// Protocol-side counterpart of a `NotificationHandle`, produced by
/// `notification_service()`.
///
/// Call `split()` to turn it into a `ProtocolHandle` and a stream of commands.
#[derive(Debug)]
pub struct ProtocolHandlePair {
    /// Name of the protocol.
    protocol: ProtocolName,

    /// Shared list of event senders, one per subscriber.
    subscribers: Subscribers,

    /// Receiving side of the command channel.
    rx: mpsc::Receiver<NotificationCommand>,
}
380
381impl ProtocolHandlePair {
382 fn new(
384 protocol: ProtocolName,
385 subscribers: Subscribers,
386 rx: mpsc::Receiver<NotificationCommand>,
387 ) -> Self {
388 Self { protocol, subscribers, rx }
389 }
390
391 pub(crate) fn split(
394 self,
395 ) -> (ProtocolHandle, Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>) {
396 (
397 ProtocolHandle::new(self.protocol, self.subscribers),
398 Box::new(ReceiverStream::new(self.rx)),
399 )
400 }
401}
402
/// Handle used to report substream and notification events of one protocol to its
/// subscribers.
#[derive(Debug, Clone)]
pub(crate) struct ProtocolHandle {
    /// Name of the protocol.
    protocol: ProtocolName,

    /// Shared list of event senders, one per subscriber.
    subscribers: Subscribers,

    /// Counter incremented by `report_substream_opened()` and decremented by
    /// `report_substream_closed()`.
    num_peers: usize,

    /// When `true`, `report_incoming_substream()` returns
    /// `ValidationCallResult::Delegated` without consulting the subscribers.
    delegate_to_peerset: bool,

    /// Optional metrics passed to the `metrics::register_*` helpers.
    metrics: Option<NotificationMetrics>,
}
422
/// Outcome of `ProtocolHandle::report_incoming_substream()`.
pub(crate) enum ValidationCallResult {
    /// The subscribers were asked; the final `ValidationResult` arrives on this receiver.
    WaitForValidation(oneshot::Receiver<ValidationResult>),
    /// The subscribers were not asked because validation is delegated
    /// (see `ProtocolHandle::delegate_to_peerset()`).
    Delegated,
}
427
428impl ProtocolHandle {
429 fn new(protocol: ProtocolName, subscribers: Subscribers) -> Self {
431 Self { protocol, subscribers, num_peers: 0usize, metrics: None, delegate_to_peerset: false }
432 }
433
434 pub fn set_metrics(&mut self, metrics: NotificationMetrics) {
436 self.metrics = Some(metrics);
437 }
438
439 pub fn delegate_to_peerset(&mut self, delegate: bool) {
445 self.delegate_to_peerset = delegate;
446 }
447
448 pub fn report_incoming_substream(
454 &self,
455 peer: PeerId,
456 handshake: Vec<u8>,
457 ) -> Result<ValidationCallResult, ()> {
458 let subscribers = self.subscribers.lock();
459
460 log::trace!(
461 target: LOG_TARGET,
462 "{}: report incoming substream for {peer}, handshake {handshake:?}",
463 self.protocol
464 );
465
466 if self.delegate_to_peerset {
467 return Ok(ValidationCallResult::Delegated);
468 }
469
470 if subscribers.len() == 1 {
473 let (result_tx, rx) = oneshot::channel();
474 return subscribers[0]
475 .unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
476 peer,
477 handshake,
478 result_tx,
479 })
480 .map(|_| ValidationCallResult::WaitForValidation(rx))
481 .map_err(|_| ());
482 }
483
484 let mut results: FuturesUnordered<_> = subscribers
487 .iter()
488 .filter_map(|subscriber| {
489 let (result_tx, rx) = oneshot::channel();
490
491 subscriber
492 .unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
493 peer,
494 handshake: handshake.clone(),
495 result_tx,
496 })
497 .is_ok()
498 .then_some(rx)
499 })
500 .collect();
501
502 let (tx, rx) = oneshot::channel();
503 tokio::spawn(async move {
504 while let Some(event) = results.next().await {
505 match event {
506 Err(_) | Ok(ValidationResult::Reject) => {
507 return tx.send(ValidationResult::Reject)
508 },
509 Ok(ValidationResult::Accept) => {},
510 }
511 }
512
513 return tx.send(ValidationResult::Accept);
514 });
515
516 Ok(ValidationCallResult::WaitForValidation(rx))
517 }
518
519 pub fn report_substream_opened(
522 &mut self,
523 peer: PeerId,
524 direction: Direction,
525 handshake: Vec<u8>,
526 negotiated_fallback: Option<ProtocolName>,
527 sink: NotificationsSink,
528 ) -> Result<(), ()> {
529 metrics::register_substream_opened(&self.metrics, &self.protocol);
530
531 let mut subscribers = self.subscribers.lock();
532 log::trace!(target: LOG_TARGET, "{}: substream opened for {peer:?}", self.protocol);
533
534 subscribers.retain(|subscriber| {
535 subscriber
536 .unbounded_send(InnerNotificationEvent::NotificationStreamOpened {
537 peer,
538 direction,
539 handshake: handshake.clone(),
540 negotiated_fallback: negotiated_fallback.clone(),
541 sink: sink.clone(),
542 })
543 .is_ok()
544 });
545 self.num_peers += 1;
546
547 Ok(())
548 }
549
550 pub fn report_substream_closed(&mut self, peer: PeerId) -> Result<(), ()> {
552 metrics::register_substream_closed(&self.metrics, &self.protocol);
553
554 let mut subscribers = self.subscribers.lock();
555 log::trace!(target: LOG_TARGET, "{}: substream closed for {peer:?}", self.protocol);
556
557 subscribers.retain(|subscriber| {
558 subscriber
559 .unbounded_send(InnerNotificationEvent::NotificationStreamClosed { peer })
560 .is_ok()
561 });
562 self.num_peers -= 1;
563
564 Ok(())
565 }
566
567 pub fn report_notification_received(
569 &mut self,
570 peer: PeerId,
571 notification: Vec<u8>,
572 ) -> Result<(), ()> {
573 metrics::register_notification_received(&self.metrics, &self.protocol, notification.len());
574
575 let mut subscribers = self.subscribers.lock();
576 log::trace!(target: LOG_TARGET, "{}: notification received from {peer:?}", self.protocol);
577
578 subscribers.retain(|subscriber| {
579 subscriber
580 .unbounded_send(InnerNotificationEvent::NotificationReceived {
581 peer,
582 notification: notification.clone(),
583 })
584 .is_ok()
585 });
586
587 Ok(())
588 }
589
590 pub fn report_notification_sink_replaced(
592 &mut self,
593 peer: PeerId,
594 sink: NotificationsSink,
595 ) -> Result<(), ()> {
596 let mut subscribers = self.subscribers.lock();
597
598 log::trace!(
599 target: LOG_TARGET,
600 "{}: notification sink replaced for {peer:?}",
601 self.protocol
602 );
603
604 subscribers.retain(|subscriber| {
605 subscriber
606 .unbounded_send(InnerNotificationEvent::NotificationSinkReplaced {
607 peer,
608 sink: sink.clone(),
609 })
610 .is_ok()
611 });
612
613 Ok(())
614 }
615
616 pub fn num_peers(&self) -> usize {
618 self.num_peers
619 }
620}
621
622pub fn notification_service(
626 protocol: ProtocolName,
627) -> (ProtocolHandlePair, Box<dyn NotificationService>) {
628 let (cmd_tx, cmd_rx) = mpsc::channel(COMMAND_QUEUE_SIZE);
629
630 let (event_tx, event_rx) =
631 tracing_unbounded(metric_label_for_protocol(&protocol).leak(), 100_000);
632 let subscribers = Arc::new(Mutex::new(vec![event_tx]));
633
634 (
635 ProtocolHandlePair::new(protocol.clone(), subscribers.clone(), cmd_rx),
636 Box::new(NotificationHandle::new(protocol.clone(), cmd_tx, event_rx, subscribers)),
637 )
638}
639
640fn metric_label_for_protocol(protocol: &ProtocolName) -> String {
643 let protocol_name = protocol.to_string();
644 let keys = protocol_name.split("/").collect::<Vec<_>>();
645 keys.iter()
646 .rev()
647 .take(2) .fold("mpsc-notification-to-protocol".into(), |acc, val| format!("{}-{}", acc, val))
649}