commonware_p2p/authenticated/discovery/
network.rs1use super::{
4 actors::{dialer, listener, router, spawner, tracker},
5 channels::{self, Channels},
6 config::Config,
7 types,
8};
9use crate::{authenticated::Mailbox, Channel};
10use commonware_cryptography::Signer;
11use commonware_macros::select;
12use commonware_runtime::{Clock, Handle, Metrics, Network as RNetwork, Spawner};
13use commonware_stream::public_key;
14use commonware_utils::union;
15use governor::{clock::ReasonablyRealtime, Quota};
16use rand::{CryptoRng, Rng};
17use tracing::{debug, info, warn};
18
19const TRACKER_SUFFIX: &[u8] = b"_TRACKER";
21
22const STREAM_SUFFIX: &[u8] = b"_STREAM";
24
25pub struct Network<
27 E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
28 C: Signer,
29> {
30 context: E,
31 cfg: Config<C>,
32
33 channels: Channels<C::PublicKey>,
34 tracker: tracker::Actor<E, C>,
35 tracker_mailbox: Mailbox<tracker::Message<E, C::PublicKey>>,
36 router: router::Actor<E, C::PublicKey>,
37 router_mailbox: Mailbox<router::Message<C::PublicKey>>,
38}
39
40impl<E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics, C: Signer>
41 Network<E, C>
42{
43 pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<E, C::PublicKey>) {
54 let (tracker, tracker_mailbox, oracle) = tracker::Actor::new(
55 context.with_label("tracker"),
56 tracker::Config {
57 crypto: cfg.crypto.clone(),
58 namespace: union(&cfg.namespace, TRACKER_SUFFIX),
59 address: cfg.dialable,
60 bootstrappers: cfg.bootstrappers.clone(),
61 allow_private_ips: cfg.allow_private_ips,
62 mailbox_size: cfg.mailbox_size,
63 synchrony_bound: cfg.synchrony_bound,
64 tracked_peer_sets: cfg.tracked_peer_sets,
65 allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
66 peer_gossip_max_count: cfg.peer_gossip_max_count,
67 max_peer_set_size: cfg.max_peer_set_size,
68 dial_fail_limit: cfg.dial_fail_limit,
69 },
70 );
71 let (router, router_mailbox, messenger) = router::Actor::new(
72 context.with_label("router"),
73 router::Config {
74 mailbox_size: cfg.mailbox_size,
75 },
76 );
77 let channels = Channels::new(messenger, cfg.max_message_size);
78
79 (
80 Self {
81 context,
82 cfg,
83
84 channels,
85 tracker,
86 tracker_mailbox,
87 router,
88 router_mailbox,
89 },
90 oracle,
91 )
92 }
93
94 pub fn register(
108 &mut self,
109 channel: Channel,
110 rate: Quota,
111 backlog: usize,
112 ) -> (
113 channels::Sender<C::PublicKey>,
114 channels::Receiver<C::PublicKey>,
115 ) {
116 self.channels.register(channel, rate, backlog)
117 }
118
119 pub fn start(mut self) -> Handle<()> {
123 self.context.spawn_ref()(self.run())
124 }
125
126 async fn run(self) {
127 let mut tracker_task = self.tracker.start();
129
130 let mut router_task = self.router.start(self.channels);
132
133 let (spawner, spawner_mailbox) = spawner::Actor::new(
135 self.context.with_label("spawner"),
136 spawner::Config {
137 mailbox_size: self.cfg.mailbox_size,
138 gossip_bit_vec_frequency: self.cfg.gossip_bit_vec_frequency,
139 allowed_bit_vec_rate: self.cfg.allowed_bit_vec_rate,
140 max_peer_set_size: self.cfg.max_peer_set_size,
141 allowed_peers_rate: self.cfg.allowed_peers_rate,
142 peer_gossip_max_count: self.cfg.peer_gossip_max_count,
143 },
144 );
145 let mut spawner_task =
146 spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
147
148 let stream_cfg = public_key::Config {
150 crypto: self.cfg.crypto,
151 namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
152 max_message_size: self.cfg.max_message_size + types::MAX_PAYLOAD_DATA_OVERHEAD,
153 synchrony_bound: self.cfg.synchrony_bound,
154 max_handshake_age: self.cfg.max_handshake_age,
155 handshake_timeout: self.cfg.handshake_timeout,
156 };
157 let listener = listener::Actor::new(
158 self.context.with_label("listener"),
159 listener::Config {
160 address: self.cfg.listen,
161 stream_cfg: stream_cfg.clone(),
162 allowed_incoming_connection_rate: self.cfg.allowed_incoming_connection_rate,
163 },
164 );
165 let mut listener_task =
166 listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
167
168 let dialer = dialer::Actor::new(
170 self.context.with_label("dialer"),
171 dialer::Config {
172 stream_cfg,
173 dial_frequency: self.cfg.dial_frequency,
174 query_frequency: self.cfg.query_frequency,
175 },
176 );
177 let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
178
179 info!("network started");
181 let err = select! {
182 tracker = &mut tracker_task => {
183 debug!("tracker exited");
184 tracker
185 },
186 router = &mut router_task => {
187 debug!("router exited");
188 router
189 },
190 spawner = &mut spawner_task => {
191 debug!("spawner exited");
192 spawner
193 },
194 listener = &mut listener_task => {
195 debug!("listener exited");
196 listener
197 },
198 dialer = &mut dialer_task => {
199 debug!("dialer exited");
200 dialer
201 },
202 }
203 .unwrap_err();
204
205 tracker_task.abort();
207 router_task.abort();
208 spawner_task.abort();
209 listener_task.abort();
210 dialer_task.abort();
211
212 warn!(error=?err, "network shutdown")
214 }
215}