commonware_p2p/utils/
mod.rs

1//! Utility functions for exchanging messages with many peers.
2
3use crate::Manager;
4use commonware_cryptography::PublicKey;
5use commonware_utils::ordered::Set;
6use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
7
8pub mod codec;
9pub mod limited;
10pub mod mux;
11
12/// A [Manager] over a static set of peers.
13#[derive(Debug, Clone)]
14pub struct StaticManager<P: PublicKey> {
15    id: u64,
16    peers: Set<P>,
17    #[allow(clippy::type_complexity)]
18    senders: Vec<UnboundedSender<(u64, Set<P>, Set<P>)>>,
19}
20
21impl<P: PublicKey> StaticManager<P> {
22    /// Create a new [StaticManager] with the given ID and peers.
23    pub const fn new(id: u64, peers: Set<P>) -> Self {
24        Self {
25            id,
26            peers,
27            senders: vec![],
28        }
29    }
30}
31
32impl<P: PublicKey> Manager for StaticManager<P> {
33    type PublicKey = P;
34    type Peers = Set<P>;
35
36    async fn update(&mut self, _: u64, _: Set<P>) {
37        panic!("updates are not supported");
38    }
39
40    async fn peer_set(&mut self, id: u64) -> Option<Set<P>> {
41        assert_eq!(id, self.id);
42        Some(self.peers.clone())
43    }
44
45    async fn subscribe(&mut self) -> UnboundedReceiver<(u64, Set<P>, Set<P>)> {
46        let (sender, receiver) = unbounded();
47        let _ = sender.unbounded_send((self.id, self.peers.clone(), self.peers.clone()));
48        self.senders.push(sender); // prevent the receiver from closing
49        receiver
50    }
51}