Skip to main content

commonware_p2p/authenticated/lookup/actors/tracker/
ingress.rs

1use super::Reservation;
2use crate::{
3    authenticated::{
4        lookup::actors::{peer, tracker::Metadata},
5        mailbox::UnboundedMailbox,
6        Mailbox,
7    },
8    types::Address,
9    Ingress, PeerSetSubscription,
10};
11use commonware_cryptography::PublicKey;
12use commonware_utils::{
13    channel::{fallible::FallibleExt, mpsc, oneshot},
14    ordered::{Map, Set},
15};
16use std::net::IpAddr;
17
18/// Messages that can be sent to the tracker actor.
19#[derive(Debug)]
20pub enum Message<C: PublicKey> {
21    // ---------- Used by oracle ----------
22    /// Register a peer set at a given index.
23    Register { index: u64, peers: Map<C, Address> },
24
25    /// Update addresses for multiple peers without creating a new peer set.
26    Overwrite { peers: Map<C, Address> },
27
28    // ---------- Used by peer set provider ----------
29    /// Fetch the peer set at a given index.
30    PeerSet {
31        /// The index of the peer set to fetch.
32        index: u64,
33        /// One-shot channel to send the peer set.
34        responder: oneshot::Sender<Option<Set<C>>>,
35    },
36    /// Subscribe to notifications when new peer sets are added.
37    Subscribe {
38        /// One-shot channel to send the subscription receiver.
39        responder: oneshot::Sender<PeerSetSubscription<C>>,
40    },
41
42    // ---------- Used by blocker ----------
43    /// Block a peer, disconnecting them if currently connected and preventing future connections
44    /// for as long as the peer remains in at least one active peer set.
45    Block { public_key: C },
46
47    // ---------- Used by peer ----------
48    /// Notify the tracker that a peer has been successfully connected.
49    Connect {
50        /// The public key of the peer.
51        public_key: C,
52
53        /// The mailbox of the peer actor.
54        peer: Mailbox<peer::Message>,
55    },
56
57    // ---------- Used by dialer ----------
58    /// Request a list of dialable peers.
59    Dialable {
60        /// One-shot channel to send the list of dialable peers.
61        responder: oneshot::Sender<Vec<C>>,
62    },
63
64    /// Request a reservation for a particular peer to dial.
65    ///
66    /// The tracker will respond with an [`Option<(Reservation<C>, Ingress)>`], which will be
67    /// `None` if the reservation cannot be granted (e.g., if the peer is already connected,
68    /// blocked or already has an active reservation).
69    Dial {
70        /// The public key of the peer to reserve.
71        public_key: C,
72
73        /// Sender to respond with the reservation and ingress address.
74        reservation: oneshot::Sender<Option<(Reservation<C>, Ingress)>>,
75    },
76
77    // ---------- Used by listener ----------
78    /// Check if a peer is acceptable (can accept an incoming connection from them).
79    Acceptable {
80        /// The public key of the peer to check.
81        public_key: C,
82
83        /// The IP address the peer connected from.
84        source_ip: IpAddr,
85
86        /// The sender to respond with whether the peer is acceptable.
87        responder: oneshot::Sender<bool>,
88    },
89
90    /// Request a reservation for a particular peer.
91    ///
92    /// The tracker will respond with an [`Option<Reservation<C>>`], which will be `None` if  the
93    /// reservation cannot be granted (e.g., if the peer is already connected, blocked or already
94    /// has an active reservation).
95    Listen {
96        /// The public key of the peer to reserve.
97        public_key: C,
98
99        /// The sender to respond with the reservation.
100        reservation: oneshot::Sender<Option<Reservation<C>>>,
101    },
102
103    // ---------- Used by reservation ----------
104    /// Release a reservation.
105    Release {
106        /// The metadata of the reservation to release.
107        metadata: Metadata<C>,
108    },
109}
110
111impl<C: PublicKey> UnboundedMailbox<Message<C>> {
112    /// Send a `Connect` message to the tracker.
113    pub fn connect(&mut self, public_key: C, peer: Mailbox<peer::Message>) {
114        self.0.send_lossy(Message::Connect { public_key, peer });
115    }
116
117    /// Request a list of dialable peers from the tracker.
118    ///
119    /// Returns an empty list if the tracker is shut down.
120    pub async fn dialable(&mut self) -> Vec<C> {
121        self.0
122            .request_or_default(|responder| Message::Dialable { responder })
123            .await
124    }
125
126    /// Send a `Dial` message to the tracker.
127    ///
128    /// Returns `None` if the tracker is shut down.
129    pub async fn dial(&mut self, public_key: C) -> Option<(Reservation<C>, Ingress)> {
130        self.0
131            .request(|reservation| Message::Dial {
132                public_key,
133                reservation,
134            })
135            .await
136            .flatten()
137    }
138
139    /// Send an `Acceptable` message to the tracker.
140    ///
141    /// Returns `false` if the tracker is shut down.
142    pub async fn acceptable(&mut self, public_key: C, source_ip: IpAddr) -> bool {
143        self.0
144            .request_or(
145                |responder| Message::Acceptable {
146                    public_key,
147                    source_ip,
148                    responder,
149                },
150                false,
151            )
152            .await
153    }
154
155    /// Send a `Listen` message to the tracker.
156    ///
157    /// Returns `None` if the tracker is shut down.
158    pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
159        self.0
160            .request(|reservation| Message::Listen {
161                public_key,
162                reservation,
163            })
164            .await
165            .flatten()
166    }
167}
168
169/// Allows releasing reservations
170#[derive(Clone, Debug)]
171pub struct Releaser<C: PublicKey> {
172    sender: UnboundedMailbox<Message<C>>,
173}
174
175impl<C: PublicKey> Releaser<C> {
176    /// Create a new releaser.
177    pub(crate) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
178        Self { sender }
179    }
180
181    /// Release a reservation.
182    pub fn release(&mut self, metadata: Metadata<C>) {
183        self.sender.0.send_lossy(Message::Release { metadata });
184    }
185}
186
187/// Mechanism to register authorized peers.
188///
189/// Peers that are not explicitly authorized
190/// will be blocked by commonware-p2p.
191#[derive(Debug, Clone)]
192pub struct Oracle<C: PublicKey> {
193    sender: UnboundedMailbox<Message<C>>,
194}
195
196impl<C: PublicKey> Oracle<C> {
197    pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
198        Self { sender }
199    }
200}
201
202impl<C: PublicKey> crate::Provider for Oracle<C> {
203    type PublicKey = C;
204
205    async fn peer_set(&mut self, id: u64) -> Option<Set<Self::PublicKey>> {
206        self.sender
207            .0
208            .request(|responder| Message::PeerSet {
209                index: id,
210                responder,
211            })
212            .await
213            .flatten()
214    }
215
216    async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
217        self.sender
218            .0
219            .request(|responder| Message::Subscribe { responder })
220            .await
221            .unwrap_or_else(|| {
222                let (_, rx) = mpsc::unbounded_channel();
223                rx
224            })
225    }
226}
227
228impl<C: PublicKey> crate::AddressableManager for Oracle<C> {
229    async fn track(&mut self, index: u64, peers: Map<Self::PublicKey, Address>) {
230        self.sender.0.send_lossy(Message::Register { index, peers });
231    }
232
233    async fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) {
234        self.sender.0.send_lossy(Message::Overwrite { peers });
235    }
236}
237
238impl<C: PublicKey> crate::Blocker for Oracle<C> {
239    type PublicKey = C;
240
241    async fn block(&mut self, public_key: Self::PublicKey) {
242        self.sender.0.send_lossy(Message::Block { public_key });
243    }
244}