commonware_p2p/authenticated/actors/tracker/
ingress.rs

1use crate::authenticated::{actors::peer, types};
2use commonware_cryptography::Verifier;
3use commonware_runtime::{Metrics, Spawner};
4use futures::{
5    channel::{mpsc, oneshot},
6    SinkExt,
7};
8use std::net::SocketAddr;
9
10pub enum Message<E: Spawner + Metrics, C: Verifier> {
11    // Used by oracle
12    Register {
13        index: u64,
14        peers: Vec<C::PublicKey>,
15    },
16
17    // Used by peer
18    Construct {
19        public_key: C::PublicKey,
20        peer: peer::Mailbox<C>,
21    },
22    BitVec {
23        bit_vec: types::BitVec,
24        peer: peer::Mailbox<C>,
25    },
26    Peers {
27        peers: Vec<types::PeerInfo<C>>,
28        peer: peer::Mailbox<C>,
29    },
30
31    // Used by dialer
32    Dialable {
33        #[allow(clippy::type_complexity)]
34        peers: oneshot::Sender<Vec<(C::PublicKey, SocketAddr, Reservation<E, C>)>>,
35    },
36
37    // Used by listener
38    Reserve {
39        peer: C::PublicKey,
40        reservation: oneshot::Sender<Option<Reservation<E, C>>>,
41    },
42
43    // Used by peer
44    Release {
45        peer: C::PublicKey,
46    },
47}
48
49#[derive(Clone)]
50pub struct Mailbox<E: Spawner + Metrics, C: Verifier> {
51    sender: mpsc::Sender<Message<E, C>>,
52}
53
54impl<E: Spawner + Metrics, C: Verifier> Mailbox<E, C> {
55    pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
56        Self { sender }
57    }
58
59    pub async fn construct(&mut self, public_key: C::PublicKey, peer: peer::Mailbox<C>) {
60        self.sender
61            .send(Message::Construct { public_key, peer })
62            .await
63            .unwrap();
64    }
65
66    pub async fn bit_vec(&mut self, bit_vec: types::BitVec, peer: peer::Mailbox<C>) {
67        self.sender
68            .send(Message::BitVec { bit_vec, peer })
69            .await
70            .unwrap();
71    }
72
73    pub async fn peers(&mut self, peers: Vec<types::PeerInfo<C>>, peer: peer::Mailbox<C>) {
74        self.sender
75            .send(Message::Peers { peers, peer })
76            .await
77            .unwrap();
78    }
79
80    pub async fn dialable(&mut self) -> Vec<(C::PublicKey, SocketAddr, Reservation<E, C>)> {
81        let (response, receiver) = oneshot::channel();
82        self.sender
83            .send(Message::Dialable { peers: response })
84            .await
85            .unwrap();
86        receiver.await.unwrap()
87    }
88
89    pub async fn reserve(&mut self, peer: C::PublicKey) -> Option<Reservation<E, C>> {
90        let (tx, rx) = oneshot::channel();
91        self.sender
92            .send(Message::Reserve {
93                peer,
94                reservation: tx,
95            })
96            .await
97            .unwrap();
98        rx.await.unwrap()
99    }
100
101    pub fn try_release(&mut self, peer: C::PublicKey) -> bool {
102        let Err(e) = self.sender.try_send(Message::Release { peer }) else {
103            return true;
104        };
105        if e.is_full() {
106            return false;
107        }
108
109        // If any other error occurs, we should panic!
110        panic!(
111            "unexpected error while trying to release reservation: {:?}",
112            e
113        );
114    }
115
116    pub async fn release(&mut self, peer: C::PublicKey) {
117        self.sender.send(Message::Release { peer }).await.unwrap();
118    }
119}
120
121/// Mechanism to register authorized peers.
122///
123/// Peers that are not explicitly authorized
124/// will be blocked by commonware-p2p.
125#[derive(Clone)]
126pub struct Oracle<E: Spawner + Metrics, C: Verifier> {
127    sender: mpsc::Sender<Message<E, C>>,
128}
129
130impl<E: Spawner + Metrics, C: Verifier> Oracle<E, C> {
131    pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
132        Self { sender }
133    }
134
135    /// Register a set of authorized peers at a given index.
136    ///
137    /// These peer sets are used to construct a bit vector (sorted by public key)
138    /// to share knowledge about dialable IPs. If a peer does not yet have an index
139    /// associated with a bit vector, the discovery message will be dropped.
140    ///
141    /// # Parameters
142    ///
143    /// * `index` - Index of the set of authorized peers (like a blockchain height).
144    ///   Should be monotonically increasing.
145    /// * `peers` - Vector of authorized peers at an `index` (does not need to be sorted).
146    pub async fn register(&mut self, index: u64, peers: Vec<C::PublicKey>) {
147        let _ = self.sender.send(Message::Register { index, peers }).await;
148    }
149}
150
151pub struct Reservation<E: Spawner + Metrics, C: Verifier> {
152    context: E,
153    closer: Option<(C::PublicKey, Mailbox<E, C>)>,
154}
155
156impl<E: Spawner + Metrics, C: Verifier> Reservation<E, C> {
157    pub fn new(context: E, peer: C::PublicKey, mailbox: Mailbox<E, C>) -> Self {
158        Self {
159            context,
160            closer: Some((peer, mailbox)),
161        }
162    }
163}
164
165impl<E: Spawner + Metrics, C: Verifier> Drop for Reservation<E, C> {
166    fn drop(&mut self) {
167        let (peer, mut mailbox) = self.closer.take().unwrap();
168
169        // If the mailbox is not full, we can release the reservation immediately without spawning a task.
170        if mailbox.try_release(peer.clone()) {
171            return;
172        }
173
174        // If the mailbox is full, we need to spawn a task to handle the release. If we used `block_on` here,
175        // it could cause a deadlock.
176        self.context.spawn_ref()(async move {
177            mailbox.release(peer).await;
178        });
179    }
180}