commonware_p2p/authenticated/lookup/actors/tracker/
ingress.rs1use super::Reservation;
2use crate::{
3 authenticated::{
4 dialing::Dialable,
5 lookup::actors::{peer, tracker::Metadata},
6 },
7 types::Address,
8 AddressableTrackedPeers, Ingress, PeerSetSubscription, TrackedPeers,
9};
10use commonware_actor::{
11 mailbox::{self, Policy},
12 Feedback,
13};
14use commonware_cryptography::PublicKey;
15use commonware_utils::{
16 channel::{mpsc, oneshot},
17 ordered::Map,
18};
19use std::{collections::VecDeque, net::IpAddr};
20
21#[derive(Debug)]
23pub enum Message<C: PublicKey> {
24 Register {
27 index: u64,
28 peers: AddressableTrackedPeers<C>,
29 },
30
31 Overwrite { peers: Map<C, Address> },
33
34 PeerSet {
37 index: u64,
39 responder: oneshot::Sender<Option<TrackedPeers<C>>>,
41 },
42 Subscribe {
44 responder: oneshot::Sender<PeerSetSubscription<C>>,
46 },
47
48 Block { public_key: C },
52
53 Connect {
56 public_key: C,
58
59 peer: peer::Mailbox,
61 },
62
63 Dialable {
66 responder: oneshot::Sender<Dialable<C>>,
68 },
69
70 Dial {
76 public_key: C,
78
79 reservation: oneshot::Sender<Option<(Reservation<C>, Ingress)>>,
81 },
82
83 Acceptable {
86 public_key: C,
88
89 source_ip: IpAddr,
91
92 responder: oneshot::Sender<bool>,
94 },
95
96 Listen {
102 public_key: C,
104
105 source_ip: IpAddr,
107
108 reservation: oneshot::Sender<Option<Reservation<C>>>,
110 },
111
112 Release {
115 metadata: Metadata<C>,
117 },
118}
119
120impl<C: PublicKey> Policy for Message<C> {
121 type Overflow = VecDeque<Self>;
122
123 fn handle(overflow: &mut Self::Overflow, message: Self) {
124 overflow.push_back(message);
125 }
126}
127
128#[derive(Clone, Debug)]
130pub struct Mailbox<C: PublicKey>(mailbox::Sender<Message<C>>);
131
132impl<C: PublicKey> Mailbox<C> {
133 pub(crate) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
134 Self(sender)
135 }
136
137 pub(crate) fn connect(&self, public_key: C, peer: peer::Mailbox) -> Feedback {
139 self.0.enqueue(Message::Connect { public_key, peer })
140 }
141
142 pub(crate) async fn dialable(&self) -> Dialable<C> {
146 let (responder, receiver) = oneshot::channel();
147 let _ = self.0.enqueue(Message::Dialable { responder });
148 receiver.await.unwrap_or_default()
149 }
150
151 pub(crate) async fn dial(&self, public_key: C) -> Option<(Reservation<C>, Ingress)> {
155 let (reservation, receiver) = oneshot::channel();
156 let _ = self.0.enqueue(Message::Dial {
157 public_key,
158 reservation,
159 });
160 receiver.await.ok().flatten()
161 }
162
163 pub(crate) async fn acceptable(&self, public_key: C, source_ip: IpAddr) -> bool {
167 let (responder, receiver) = oneshot::channel();
168 let _ = self.0.enqueue(Message::Acceptable {
169 public_key,
170 source_ip,
171 responder,
172 });
173 receiver.await.unwrap_or(false)
174 }
175
176 pub(crate) async fn listen(&self, public_key: C, source_ip: IpAddr) -> Option<Reservation<C>> {
180 let (reservation, receiver) = oneshot::channel();
181 let _ = self.0.enqueue(Message::Listen {
182 public_key,
183 source_ip,
184 reservation,
185 });
186 receiver.await.ok().flatten()
187 }
188}
189
190#[derive(Clone, Debug)]
192pub struct Releaser<C: PublicKey> {
193 sender: mailbox::Sender<Message<C>>,
194}
195
196impl<C: PublicKey> Releaser<C> {
197 pub(crate) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
199 Self { sender }
200 }
201
202 pub fn release(&mut self, metadata: Metadata<C>) -> Feedback {
204 self.sender.enqueue(Message::Release { metadata })
205 }
206}
207
208#[derive(Debug, Clone)]
213pub struct Oracle<C: PublicKey> {
214 sender: mailbox::Sender<Message<C>>,
215}
216
217impl<C: PublicKey> Oracle<C> {
218 pub(super) const fn new(sender: mailbox::Sender<Message<C>>) -> Self {
219 Self { sender }
220 }
221}
222
223impl<C: PublicKey> crate::Provider for Oracle<C> {
224 type PublicKey = C;
225
226 async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
227 let (responder, receiver) = oneshot::channel();
228 let _ = self.sender.enqueue(Message::PeerSet {
229 index: id,
230 responder,
231 });
232 receiver.await.ok().flatten()
233 }
234
235 async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
236 let (responder, receiver) = oneshot::channel();
237 let _ = self.sender.enqueue(Message::Subscribe { responder });
238 receiver.await.unwrap_or_else(|_| {
239 let (_, rx) = mpsc::unbounded_channel();
240 rx
241 })
242 }
243}
244
245impl<C: PublicKey> crate::AddressableManager for Oracle<C> {
246 fn track<R>(&mut self, index: u64, peers: R) -> Feedback
247 where
248 R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send,
249 {
250 self.sender.enqueue(Message::Register {
251 index,
252 peers: peers.into(),
253 })
254 }
255
256 fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) -> Feedback {
257 self.sender.enqueue(Message::Overwrite { peers })
258 }
259}
260
261impl<C: PublicKey> crate::Blocker for Oracle<C> {
262 type PublicKey = C;
263
264 fn block(&mut self, public_key: Self::PublicKey) -> Feedback {
265 self.sender.enqueue(Message::Block { public_key })
266 }
267}