Skip to main content

commonware_p2p/utils/
mod.rs

1//! Utility functions for exchanging messages with many peers.
2
3use crate::Provider;
4use commonware_cryptography::PublicKey;
5use commonware_utils::{
6    channel::{
7        fallible::FallibleExt,
8        mpsc::{self, UnboundedReceiver, UnboundedSender},
9    },
10    ordered::Set,
11};
12
13pub mod codec;
14pub mod limited;
15pub mod mux;
16
17/// A [Provider] over a static set of peers.
18#[derive(Debug, Clone)]
19pub struct StaticProvider<P: PublicKey> {
20    id: u64,
21    peers: Set<P>,
22    #[allow(clippy::type_complexity)]
23    senders: Vec<UnboundedSender<(u64, Set<P>, Set<P>)>>,
24}
25
26impl<P: PublicKey> StaticProvider<P> {
27    /// Create a new [StaticProvider] with the given ID and peers.
28    pub const fn new(id: u64, peers: Set<P>) -> Self {
29        Self {
30            id,
31            peers,
32            senders: vec![],
33        }
34    }
35}
36
37impl<P: PublicKey> Provider for StaticProvider<P> {
38    type PublicKey = P;
39
40    async fn peer_set(&mut self, id: u64) -> Option<Set<P>> {
41        assert_eq!(id, self.id);
42        Some(self.peers.clone())
43    }
44
45    async fn subscribe(&mut self) -> UnboundedReceiver<(u64, Set<P>, Set<P>)> {
46        let (sender, receiver) = mpsc::unbounded_channel();
47        sender.send_lossy((self.id, self.peers.clone(), self.peers.clone()));
48        self.senders.push(sender); // prevent the receiver from closing
49        receiver
50    }
51}