commonware_p2p/authenticated/actors/tracker/
ingress.rs1use crate::authenticated::{actors::peer, types};
2use commonware_cryptography::Verifier;
3use commonware_runtime::{Metrics, Spawner};
4use futures::{
5 channel::{mpsc, oneshot},
6 SinkExt,
7};
8use std::net::SocketAddr;
9
10pub enum Message<E: Spawner + Metrics, C: Verifier> {
11 Register {
13 index: u64,
14 peers: Vec<C::PublicKey>,
15 },
16
17 Construct {
19 public_key: C::PublicKey,
20 peer: peer::Mailbox<C>,
21 },
22 BitVec {
23 bit_vec: types::BitVec,
24 peer: peer::Mailbox<C>,
25 },
26 Peers {
27 peers: Vec<types::PeerInfo<C>>,
28 peer: peer::Mailbox<C>,
29 },
30
31 Dialable {
33 #[allow(clippy::type_complexity)]
34 peers: oneshot::Sender<Vec<(C::PublicKey, SocketAddr, Reservation<E, C>)>>,
35 },
36
37 Reserve {
39 peer: C::PublicKey,
40 reservation: oneshot::Sender<Option<Reservation<E, C>>>,
41 },
42
43 Release {
45 peer: C::PublicKey,
46 },
47}
48
49#[derive(Clone)]
50pub struct Mailbox<E: Spawner + Metrics, C: Verifier> {
51 sender: mpsc::Sender<Message<E, C>>,
52}
53
54impl<E: Spawner + Metrics, C: Verifier> Mailbox<E, C> {
55 pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
56 Self { sender }
57 }
58
59 pub async fn construct(&mut self, public_key: C::PublicKey, peer: peer::Mailbox<C>) {
60 self.sender
61 .send(Message::Construct { public_key, peer })
62 .await
63 .unwrap();
64 }
65
66 pub async fn bit_vec(&mut self, bit_vec: types::BitVec, peer: peer::Mailbox<C>) {
67 self.sender
68 .send(Message::BitVec { bit_vec, peer })
69 .await
70 .unwrap();
71 }
72
73 pub async fn peers(&mut self, peers: Vec<types::PeerInfo<C>>, peer: peer::Mailbox<C>) {
74 self.sender
75 .send(Message::Peers { peers, peer })
76 .await
77 .unwrap();
78 }
79
80 pub async fn dialable(&mut self) -> Vec<(C::PublicKey, SocketAddr, Reservation<E, C>)> {
81 let (response, receiver) = oneshot::channel();
82 self.sender
83 .send(Message::Dialable { peers: response })
84 .await
85 .unwrap();
86 receiver.await.unwrap()
87 }
88
89 pub async fn reserve(&mut self, peer: C::PublicKey) -> Option<Reservation<E, C>> {
90 let (tx, rx) = oneshot::channel();
91 self.sender
92 .send(Message::Reserve {
93 peer,
94 reservation: tx,
95 })
96 .await
97 .unwrap();
98 rx.await.unwrap()
99 }
100
101 pub fn try_release(&mut self, peer: C::PublicKey) -> bool {
102 let Err(e) = self.sender.try_send(Message::Release { peer }) else {
103 return true;
104 };
105 if e.is_full() {
106 return false;
107 }
108
109 panic!(
111 "unexpected error while trying to release reservation: {:?}",
112 e
113 );
114 }
115
116 pub async fn release(&mut self, peer: C::PublicKey) {
117 self.sender.send(Message::Release { peer }).await.unwrap();
118 }
119}
120
121#[derive(Clone)]
126pub struct Oracle<E: Spawner + Metrics, C: Verifier> {
127 sender: mpsc::Sender<Message<E, C>>,
128}
129
130impl<E: Spawner + Metrics, C: Verifier> Oracle<E, C> {
131 pub(super) fn new(sender: mpsc::Sender<Message<E, C>>) -> Self {
132 Self { sender }
133 }
134
135 pub async fn register(&mut self, index: u64, peers: Vec<C::PublicKey>) {
147 let _ = self.sender.send(Message::Register { index, peers }).await;
148 }
149}
150
151pub struct Reservation<E: Spawner + Metrics, C: Verifier> {
152 context: E,
153 closer: Option<(C::PublicKey, Mailbox<E, C>)>,
154}
155
156impl<E: Spawner + Metrics, C: Verifier> Reservation<E, C> {
157 pub fn new(context: E, peer: C::PublicKey, mailbox: Mailbox<E, C>) -> Self {
158 Self {
159 context,
160 closer: Some((peer, mailbox)),
161 }
162 }
163}
164
165impl<E: Spawner + Metrics, C: Verifier> Drop for Reservation<E, C> {
166 fn drop(&mut self) {
167 let (peer, mut mailbox) = self.closer.take().unwrap();
168
169 if mailbox.try_release(peer.clone()) {
171 return;
172 }
173
174 self.context.spawn_ref()(async move {
177 mailbox.release(peer).await;
178 });
179 }
180}