commonware_p2p/authenticated/lookup/actors/tracker/
ingress.rs

1use super::Reservation;
2use crate::authenticated::{
3    lookup::actors::{peer, tracker::Metadata},
4    Mailbox,
5};
6use commonware_cryptography::PublicKey;
7use commonware_runtime::{Metrics, Spawner};
8use futures::{
9    channel::{mpsc, oneshot},
10    SinkExt,
11};
12use std::net::SocketAddr;
13
14/// Messages that can be sent to the tracker actor.
15pub enum Message<E: Spawner + Metrics, C: PublicKey> {
16    // ---------- Used by oracle ----------
17    /// Register a peer set at a given index.
18    Register {
19        index: u64,
20        peers: Vec<(C, SocketAddr)>,
21    },
22
23    // ---------- Used by blocker ----------
24    /// Block a peer, disconnecting them if currently connected and preventing future connections
25    /// for as long as the peer remains in at least one active peer set.
26    Block { public_key: C },
27
28    // ---------- Used by peer ----------
29    /// Notify the tracker that a peer has been successfully connected.
30    Connect {
31        /// The public key of the peer.
32        public_key: C,
33
34        /// The mailbox of the peer actor.
35        peer: Mailbox<peer::Message>,
36    },
37
38    // ---------- Used by dialer ----------
39    /// Request a list of dialable peers.
40    Dialable {
41        /// One-shot channel to send the list of dialable peers.
42        responder: oneshot::Sender<Vec<C>>,
43    },
44
45    /// Request a reservation for a particular peer to dial.
46    ///
47    /// The tracker will respond with an [Option<Reservation<E, C>>], which will be `None` if the
48    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
49    /// has an active reservation).
50    Dial {
51        /// The public key of the peer to reserve.
52        public_key: C,
53
54        /// sender to respond with the reservation.
55        reservation: oneshot::Sender<Option<Reservation<E, C>>>,
56    },
57
58    // ---------- Used by listener ----------
59    /// Check if we should listen to a peer.
60    Listenable {
61        /// The public key of the peer to check.
62        public_key: C,
63
64        /// The sender to respond with the listenable status.
65        responder: oneshot::Sender<bool>,
66    },
67
68    /// Request a reservation for a particular peer.
69    ///
70    /// The tracker will respond with an [Option<Reservation<E, C>>], which will be `None` if  the
71    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
72    /// has an active reservation).
73    Listen {
74        /// The public key of the peer to reserve.
75        public_key: C,
76
77        /// The sender to respond with the reservation.
78        reservation: oneshot::Sender<Option<Reservation<E, C>>>,
79    },
80
81    // ---------- Used by reservation ----------
82    /// Release a reservation.
83    Release {
84        /// The metadata of the reservation to release.
85        metadata: Metadata<C>,
86    },
87}
88
89impl<E: Spawner + Metrics, C: PublicKey> Mailbox<Message<E, C>> {
90    /// Send a `Connect` message to the tracker.
91    pub async fn connect(&mut self, public_key: C, peer: Mailbox<peer::Message>) {
92        self.send(Message::Connect { public_key, peer })
93            .await
94            .unwrap();
95    }
96
97    /// Send a `Block` message to the tracker.
98    pub async fn dialable(&mut self) -> Vec<C> {
99        let (sender, receiver) = oneshot::channel();
100        self.send(Message::Dialable { responder: sender })
101            .await
102            .unwrap();
103        receiver.await.unwrap()
104    }
105
106    /// Send a `Dial` message to the tracker.
107    pub async fn dial(&mut self, public_key: C) -> Option<Reservation<E, C>> {
108        let (tx, rx) = oneshot::channel();
109        self.send(Message::Dial {
110            public_key,
111            reservation: tx,
112        })
113        .await
114        .unwrap();
115        rx.await.unwrap()
116    }
117
118    /// Send a `Listenable` message to the tracker.
119    pub async fn listenable(&mut self, public_key: C) -> bool {
120        let (tx, rx) = oneshot::channel();
121        self.send(Message::Listenable {
122            public_key,
123            responder: tx,
124        })
125        .await
126        .unwrap();
127        rx.await.unwrap()
128    }
129
130    /// Send a `Listen` message to the tracker.
131    pub async fn listen(&mut self, public_key: C) -> Option<Reservation<E, C>> {
132        let (tx, rx) = oneshot::channel();
133        self.send(Message::Listen {
134            public_key,
135            reservation: tx,
136        })
137        .await
138        .unwrap();
139        rx.await.unwrap()
140    }
141}
142
143/// Allows releasing reservations
144#[derive(Clone)]
145pub struct Releaser<E: Spawner + Metrics, C: PublicKey> {
146    sender: mpsc::Sender<Message<E, C>>,
147}
148
149impl<E: Spawner + Metrics, C: PublicKey> Releaser<E, C> {
150    /// Create a new releaser.
151    pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
152        Self { sender }
153    }
154
155    /// Try to release a reservation.
156    ///
157    /// Returns `true` if the reservation was released, `false` if the mailbox is full.
158    pub fn try_release(&mut self, metadata: Metadata<C>) -> bool {
159        let Err(e) = self.sender.try_send(Message::Release { metadata }) else {
160            return true;
161        };
162        assert!(
163            e.is_full(),
164            "Unexpected error trying to release reservation {e:?}"
165        );
166        false
167    }
168
169    /// Release a reservation.
170    ///
171    /// This method will block if the mailbox is full.
172    pub async fn release(&mut self, metadata: Metadata<C>) {
173        self.sender
174            .send(Message::Release { metadata })
175            .await
176            .unwrap();
177    }
178}
179
180/// Mechanism to register authorized peers.
181///
182/// Peers that are not explicitly authorized
183/// will be blocked by commonware-p2p.
184#[derive(Clone)]
185pub struct Oracle<E: Spawner + Metrics, C: PublicKey> {
186    sender: mpsc::Sender<Message<E, C>>,
187}
188
189impl<E: Spawner + Metrics, C: PublicKey> Oracle<E, C> {
190    pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
191        Self { sender }
192    }
193
194    /// Register a set of authorized peers at a given index.
195    ///
196    /// # Parameters
197    ///
198    /// * `index` - Index of the set of authorized peers (like a blockchain height).
199    ///   Should be monotonically increasing.
200    /// * `peers` - Vector of authorized peers at an `index` (does not need to be sorted).
201    ///   Each element is a tuple containing the public key and the socket address of the peer.
202    pub async fn register(&mut self, index: u64, peers: Vec<(C, SocketAddr)>) {
203        let _ = self.sender.send(Message::Register { index, peers }).await;
204    }
205}
206
207impl<E: Spawner + Metrics, C: PublicKey> crate::Blocker for Oracle<E, C> {
208    type PublicKey = C;
209
210    async fn block(&mut self, public_key: Self::PublicKey) {
211        let _ = self.sender.send(Message::Block { public_key }).await;
212    }
213}