Skip to main content

commonware_p2p/utils/
mod.rs

1//! Utility functions for exchanging messages with many peers.
2
3use crate::{PeerSetUpdate, Provider, TrackedPeers};
4use commonware_cryptography::PublicKey;
5use commonware_utils::{
6    channel::{
7        fallible::FallibleExt,
8        mpsc::{self, UnboundedReceiver, UnboundedSender},
9    },
10    ordered::Set,
11};
12
13pub mod codec;
14pub mod limited;
15#[cfg(any(test, feature = "mocks"))]
16pub mod mocks;
17pub mod mux;
18
19/// Primary and secondary peer memberships at one peer set index.
20///
21/// Import as `PeerSetsAtIndexBase` (or similar) and define a local
22/// `type PeerSetsAtIndex<P> = PeerSetsAtIndexBase<...>` with the primary/secondary types you use.
23pub(crate) struct PeerSetsAtIndex<Primary, Secondary> {
24    pub(crate) primary: Primary,
25    pub(crate) secondary: Secondary,
26}
27
28/// A [Provider] over a static set of peers.
29#[derive(Debug, Clone)]
30pub struct StaticProvider<P: PublicKey> {
31    id: u64,
32    peers: Set<P>,
33    senders: Vec<UnboundedSender<PeerSetUpdate<P>>>,
34}
35
36impl<P: PublicKey> StaticProvider<P> {
37    /// Create a new [StaticProvider] with the given ID and peers.
38    pub const fn new(id: u64, peers: Set<P>) -> Self {
39        Self {
40            id,
41            peers,
42            senders: vec![],
43        }
44    }
45}
46
47impl<P: PublicKey> Provider for StaticProvider<P> {
48    type PublicKey = P;
49
50    async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<P>> {
51        assert_eq!(id, self.id);
52        Some(TrackedPeers::primary(self.peers.clone()))
53    }
54
55    async fn subscribe(&mut self) -> UnboundedReceiver<PeerSetUpdate<P>> {
56        let (sender, receiver) = mpsc::unbounded_channel();
57        sender.send_lossy(PeerSetUpdate {
58            index: self.id,
59            latest: TrackedPeers::new(self.peers.clone(), Set::default()),
60            all: TrackedPeers::new(self.peers.clone(), Set::default()),
61        });
62        self.senders.push(sender); // prevent the receiver from closing
63        receiver
64    }
65}