commonware_p2p/authenticated/actors/tracker/
ingress.rs

1use super::Reservation;
2use crate::authenticated::{actors::peer, types};
3use commonware_cryptography::PublicKey;
4use commonware_runtime::{Metrics, Spawner};
5use futures::{
6    channel::{mpsc, oneshot},
7    SinkExt,
8};
9
10/// Messages that can be sent to the tracker actor.
11pub enum Message<E: Spawner + Metrics, C: PublicKey> {
12    // ---------- Used by oracle ----------
13    /// Register a peer set at a given index.
14    ///
15    /// The vector of peers must be sorted in ascending order by public key.
16    Register { index: u64, peers: Vec<C> },
17
18    // ---------- Used by blocker ----------
19    /// Block a peer, disconnecting them if currently connected and preventing future connections
20    /// for as long as the peer remains in at least one active peer set.
21    Block { public_key: C },
22
23    // ---------- Used by peer ----------
24    /// Notify the tracker that a peer has been successfully connected, and that a
25    /// [`types::Payload::Peers`] message (containing solely the local node's information) should be
26    /// sent to the peer.
27    Connect {
28        /// The public key of the peer.
29        public_key: C,
30
31        /// `true` if we are the dialer, `false` if we are the listener.
32        dialer: bool,
33
34        /// The mailbox of the peer actor.
35        peer: peer::Mailbox<C>,
36    },
37
38    /// Ready to send a [`types::Payload::BitVec`] message to a peer. This message doubles as a
39    /// keep-alive signal to the peer.
40    ///
41    /// This request is formed on a recurring interval.
42    Construct {
43        /// The public key of the peer.
44        public_key: C,
45
46        /// The mailbox of the peer actor.
47        peer: peer::Mailbox<C>,
48    },
49
50    /// Notify the tracker that a [`types::Payload::BitVec`] message has been received from a peer.
51    ///
52    /// The tracker will construct a [`types::Payload::Peers`] message in response.
53    BitVec {
54        /// The bit vector received.
55        bit_vec: types::BitVec,
56
57        /// The mailbox of the peer actor.
58        peer: peer::Mailbox<C>,
59    },
60
61    /// Notify the tracker that a [`types::Payload::Peers`] message has been received from a peer.
62    Peers {
63        /// The list of peers received.
64        peers: Vec<types::PeerInfo<C>>,
65
66        /// The mailbox of the peer actor.
67        peer: peer::Mailbox<C>,
68    },
69
70    // ---------- Used by dialer ----------
71    /// Request a list of dialable peers.
72    Dialable {
73        /// One-shot channel to send the list of dialable peers.
74        responder: oneshot::Sender<Vec<C>>,
75    },
76
77    /// Request a reservation for a particular peer to dial.
78    ///
79    /// The tracker will respond with an [`Option<Reservation<E, C>>`], which will be `None` if the
80    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
81    /// has an active reservation).
82    Dial {
83        /// The public key of the peer to reserve.
84        public_key: C,
85
86        /// sender to respond with the reservation.
87        reservation: oneshot::Sender<Option<Reservation<E, C>>>,
88    },
89
90    // ---------- Used by listener ----------
91    /// Request a reservation for a particular peer.
92    ///
93    /// The tracker will respond with an [`Option<Reservation<E, C>>`], which will be `None` if  the
94    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
95    /// has an active reservation).
96    Listen {
97        /// The public key of the peer to reserve.
98        public_key: C,
99
100        /// The sender to respond with the reservation.
101        reservation: oneshot::Sender<Option<Reservation<E, C>>>,
102    },
103}
104
105/// Mailbox for sending messages to the tracker actor.
106#[derive(Clone)]
107pub struct Mailbox<E: Spawner + Metrics, C: PublicKey> {
108    sender: mpsc::Sender<Message<E, C>>,
109}
110
111impl<E: Spawner + Metrics, C: PublicKey> Mailbox<E, C> {
112    /// Create a new mailbox for the tracker.
113    pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
114        Self { sender }
115    }
116
117    /// Send a `Connect` message to the tracker.
118    pub async fn connect(&mut self, public_key: C, dialer: bool, peer: peer::Mailbox<C>) {
119        self.sender
120            .send(Message::Connect {
121                public_key,
122                dialer,
123                peer,
124            })
125            .await
126            .unwrap();
127    }
128
129    /// Send a `Construct` message to the tracker.
130    pub async fn construct(&mut self, public_key: C, peer: peer::Mailbox<C>) {
131        self.sender
132            .send(Message::Construct { public_key, peer })
133            .await
134            .unwrap();
135    }
136
137    /// Send a `BitVec` message to the tracker.
138    pub async fn bit_vec(&mut self, bit_vec: types::BitVec, peer: peer::Mailbox<C>) {
139        self.sender
140            .send(Message::BitVec { bit_vec, peer })
141            .await
142            .unwrap();
143    }
144
145    /// Send a `Peers` message to the tracker.
146    pub async fn peers(&mut self, peers: Vec<types::PeerInfo<C>>, peer: peer::Mailbox<C>) {
147        self.sender
148            .send(Message::Peers { peers, peer })
149            .await
150            .unwrap();
151    }
152
153    /// Send a `Block` message to the tracker.
154    pub async fn dialable(&mut self) -> Vec<C> {
155        let (sender, receiver) = oneshot::channel();
156        self.sender
157            .send(Message::Dialable { responder: sender })
158            .await
159            .unwrap();
160        receiver.await.unwrap()
161    }
162
163    /// Send a `Dial` message to the tracker.
164    pub async fn dial(&mut self, public_key: C) -> Option<Reservation<E, C>> {
165        let (tx, rx) = oneshot::channel();
166        self.sender
167            .send(Message::Dial {
168                public_key,
169                reservation: tx,
170            })
171            .await
172            .unwrap();
173        rx.await.unwrap()
174    }
175
176    /// Send a `Listen` message to the tracker.
177    pub async fn listen(&mut self, public_key: C) -> Option<Reservation<E, C>> {
178        let (tx, rx) = oneshot::channel();
179        self.sender
180            .send(Message::Listen {
181                public_key,
182                reservation: tx,
183            })
184            .await
185            .unwrap();
186        rx.await.unwrap()
187    }
188}
189
190/// Mechanism to register authorized peers.
191///
192/// Peers that are not explicitly authorized
193/// will be blocked by commonware-p2p.
194#[derive(Clone)]
195pub struct Oracle<E: Spawner + Metrics, C: PublicKey> {
196    sender: mpsc::Sender<Message<E, C>>,
197}
198
199impl<E: Spawner + Metrics, C: PublicKey> Oracle<E, C> {
200    pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
201        Self { sender }
202    }
203
204    /// Register a set of authorized peers at a given index.
205    ///
206    /// These peer sets are used to construct a bit vector (sorted by public key)
207    /// to share knowledge about dialable IPs. If a peer does not yet have an index
208    /// associated with a bit vector, the discovery message will be dropped.
209    ///
210    /// # Parameters
211    ///
212    /// * `index` - Index of the set of authorized peers (like a blockchain height).
213    ///   Should be monotonically increasing.
214    /// * `peers` - Vector of authorized peers at an `index` (does not need to be sorted).
215    pub async fn register(&mut self, index: u64, peers: Vec<C>) {
216        let _ = self.sender.send(Message::Register { index, peers }).await;
217    }
218}
219
220impl<E: Spawner + Metrics, C: PublicKey> crate::Blocker for Oracle<E, C> {
221    type PublicKey = C;
222
223    async fn block(&mut self, public_key: Self::PublicKey) {
224        let _ = self.sender.send(Message::Block { public_key }).await;
225    }
226}