commonware_p2p/authenticated/lookup/actors/tracker/
ingress.rs1use super::Reservation;
2use crate::{
3 authenticated::{
4 dialing::Dialable,
5 lookup::actors::{peer, tracker::Metadata},
6 mailbox::UnboundedMailbox,
7 Mailbox,
8 },
9 types::Address,
10 AddressableTrackedPeers, Ingress, PeerSetSubscription, TrackedPeers,
11};
12use commonware_cryptography::PublicKey;
13use commonware_utils::{
14 channel::{fallible::FallibleExt, mpsc, oneshot},
15 ordered::Map,
16};
17use std::net::IpAddr;
18
19#[derive(Debug)]
21pub enum Message<C: PublicKey> {
22 Register {
25 index: u64,
26 peers: AddressableTrackedPeers<C>,
27 },
28
29 Overwrite { peers: Map<C, Address> },
31
32 PeerSet {
35 index: u64,
37 responder: oneshot::Sender<Option<TrackedPeers<C>>>,
39 },
40 Subscribe {
42 responder: oneshot::Sender<PeerSetSubscription<C>>,
44 },
45
46 Block { public_key: C },
50
51 Connect {
54 public_key: C,
56
57 peer: Mailbox<peer::Message>,
59 },
60
61 Dialable {
64 responder: oneshot::Sender<Dialable<C>>,
66 },
67
68 Dial {
74 public_key: C,
76
77 reservation: oneshot::Sender<Option<(Reservation<C>, Ingress)>>,
79 },
80
81 Acceptable {
84 public_key: C,
86
87 source_ip: IpAddr,
89
90 responder: oneshot::Sender<bool>,
92 },
93
94 Listen {
100 public_key: C,
102
103 reservation: oneshot::Sender<Option<Reservation<C>>>,
105 },
106
107 Release {
110 metadata: Metadata<C>,
112 },
113}
114
115impl<C: PublicKey> UnboundedMailbox<Message<C>> {
116 pub fn connect(&mut self, public_key: C, peer: Mailbox<peer::Message>) {
118 self.0.send_lossy(Message::Connect { public_key, peer });
119 }
120
121 pub async fn dialable(&mut self) -> Dialable<C> {
125 self.0
126 .request_or_default(|responder| Message::Dialable { responder })
127 .await
128 }
129
130 pub async fn dial(&mut self, public_key: C) -> Option<(Reservation<C>, Ingress)> {
134 self.0
135 .request(|reservation| Message::Dial {
136 public_key,
137 reservation,
138 })
139 .await
140 .flatten()
141 }
142
143 pub async fn acceptable(&mut self, public_key: C, source_ip: IpAddr) -> bool {
147 self.0
148 .request_or(
149 |responder| Message::Acceptable {
150 public_key,
151 source_ip,
152 responder,
153 },
154 false,
155 )
156 .await
157 }
158
159 pub async fn listen(&mut self, public_key: C) -> Option<Reservation<C>> {
163 self.0
164 .request(|reservation| Message::Listen {
165 public_key,
166 reservation,
167 })
168 .await
169 .flatten()
170 }
171}
172
173#[derive(Clone, Debug)]
175pub struct Releaser<C: PublicKey> {
176 sender: UnboundedMailbox<Message<C>>,
177}
178
179impl<C: PublicKey> Releaser<C> {
180 pub(crate) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
182 Self { sender }
183 }
184
185 pub fn release(&mut self, metadata: Metadata<C>) {
187 self.sender.0.send_lossy(Message::Release { metadata });
188 }
189}
190
191#[derive(Debug, Clone)]
196pub struct Oracle<C: PublicKey> {
197 sender: UnboundedMailbox<Message<C>>,
198}
199
200impl<C: PublicKey> Oracle<C> {
201 pub(super) const fn new(sender: UnboundedMailbox<Message<C>>) -> Self {
202 Self { sender }
203 }
204}
205
206impl<C: PublicKey> crate::Provider for Oracle<C> {
207 type PublicKey = C;
208
209 async fn peer_set(&mut self, id: u64) -> Option<TrackedPeers<Self::PublicKey>> {
210 self.sender
211 .0
212 .request(|responder| Message::PeerSet {
213 index: id,
214 responder,
215 })
216 .await
217 .flatten()
218 }
219
220 async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
221 self.sender
222 .0
223 .request(|responder| Message::Subscribe { responder })
224 .await
225 .unwrap_or_else(|| {
226 let (_, rx) = mpsc::unbounded_channel();
227 rx
228 })
229 }
230}
231
232impl<C: PublicKey> crate::AddressableManager for Oracle<C> {
233 async fn track<R>(&mut self, index: u64, peers: R)
234 where
235 R: Into<AddressableTrackedPeers<Self::PublicKey>> + Send,
236 {
237 self.sender.0.send_lossy(Message::Register {
238 index,
239 peers: peers.into(),
240 });
241 }
242
243 async fn overwrite(&mut self, peers: Map<Self::PublicKey, Address>) {
244 self.sender.0.send_lossy(Message::Overwrite { peers });
245 }
246}
247
248impl<C: PublicKey> crate::Blocker for Oracle<C> {
249 type PublicKey = C;
250
251 async fn block(&mut self, public_key: Self::PublicKey) {
252 self.sender.0.send_lossy(Message::Block { public_key });
253 }
254}