commonware_p2p/authenticated/discovery/
network.rs1use super::{
4 actors::{dialer, listener, router, spawner, tracker},
5 channels::{self, Channels},
6 config::Config,
7 types,
8};
9use crate::{
10 authenticated::{discovery::types::InfoVerifier, mailbox::UnboundedMailbox, Mailbox},
11 Channel,
12};
13use commonware_cryptography::Signer;
14use commonware_macros::select;
15use commonware_runtime::{
16 spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Network as RNetwork, Quota,
17 Resolver, Spawner,
18};
19use commonware_stream::encrypted::Config as StreamConfig;
20use commonware_utils::union;
21use rand_core::CryptoRngCore;
22use tracing::{debug, info};
23
24const TRACKER_SUFFIX: &[u8] = b"_TRACKER";
26
27const STREAM_SUFFIX: &[u8] = b"_STREAM";
29
30pub struct Network<
32 E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
33 C: Signer,
34> {
35 context: ContextCell<E>,
36 cfg: Config<C>,
37
38 channels: Channels<C::PublicKey>,
39 tracker: tracker::Actor<E, C>,
40 tracker_mailbox: UnboundedMailbox<tracker::Message<C::PublicKey>>,
41 router: router::Actor<E, C::PublicKey>,
42 router_mailbox: Mailbox<router::Message<C::PublicKey>>,
43 info_verifier: InfoVerifier<C::PublicKey>,
44}
45
46impl<
47 E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
48 C: Signer,
49 > Network<E, C>
50{
51 pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<C::PublicKey>) {
62 let (tracker, tracker_mailbox, oracle, info_verifier) = tracker::Actor::new(
63 context.with_label("tracker"),
64 tracker::Config {
65 crypto: cfg.crypto.clone(),
66 namespace: union(&cfg.namespace, TRACKER_SUFFIX),
67 address: cfg.dialable.clone(),
68 bootstrappers: cfg.bootstrappers.clone(),
69 allow_private_ips: cfg.allow_private_ips,
70 allow_dns: cfg.allow_dns,
71 synchrony_bound: cfg.synchrony_bound,
72 tracked_peer_sets: cfg.tracked_peer_sets,
73 allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
74 peer_gossip_max_count: cfg.peer_gossip_max_count,
75 max_peer_set_size: cfg.max_peer_set_size,
76 dial_fail_limit: cfg.dial_fail_limit,
77 block_duration: cfg.block_duration,
78 },
79 );
80 let (router, router_mailbox, messenger) = router::Actor::new(
81 context.with_label("router"),
82 router::Config {
83 mailbox_size: cfg.mailbox_size,
84 },
85 );
86 let channels = Channels::new(messenger, cfg.max_message_size);
87
88 (
89 Self {
90 context: ContextCell::new(context),
91 cfg,
92
93 channels,
94 tracker,
95 tracker_mailbox,
96 router,
97 router_mailbox,
98 info_verifier,
99 },
100 oracle,
101 )
102 }
103
104 #[allow(clippy::type_complexity)]
118 pub fn register(
119 &mut self,
120 channel: Channel,
121 rate: Quota,
122 backlog: usize,
123 ) -> (
124 channels::Sender<C::PublicKey, E>,
125 channels::Receiver<C::PublicKey>,
126 ) {
127 let clock = self
128 .context
129 .with_label("channel")
130 .with_attribute("idx", channel)
131 .take();
132 self.channels.register(channel, rate, backlog, clock)
133 }
134
135 pub fn start(mut self) -> Handle<()> {
139 spawn_cell!(self.context, self.run().await)
140 }
141
142 async fn run(self) {
143 let mut tracker_task = self.tracker.start();
145
146 let mut router_task = self.router.start(self.channels);
148
149 let (spawner, spawner_mailbox) = spawner::Actor::new(
151 self.context.with_label("spawner"),
152 spawner::Config {
153 mailbox_size: self.cfg.mailbox_size,
154 gossip_bit_vec_frequency: self.cfg.gossip_bit_vec_frequency,
155 max_peer_set_size: self.cfg.max_peer_set_size,
156 peer_gossip_max_count: self.cfg.peer_gossip_max_count,
157 info_verifier: self.info_verifier,
158 },
159 );
160 let mut spawner_task =
161 spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
162
163 let stream_cfg = StreamConfig {
165 signing_key: self.cfg.crypto,
166 namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
167 max_message_size: self
168 .cfg
169 .max_message_size
170 .saturating_add(types::MAX_PAYLOAD_DATA_OVERHEAD),
171 synchrony_bound: self.cfg.synchrony_bound,
172 max_handshake_age: self.cfg.max_handshake_age,
173 handshake_timeout: self.cfg.handshake_timeout,
174 };
175 let listener = listener::Actor::new(
176 self.context.with_label("listener"),
177 listener::Config {
178 address: self.cfg.listen,
179 stream_cfg: stream_cfg.clone(),
180 allow_private_ips: self.cfg.allow_private_ips,
181 max_concurrent_handshakes: self.cfg.max_concurrent_handshakes,
182 allowed_handshake_rate_per_ip: self.cfg.allowed_handshake_rate_per_ip,
183 allowed_handshake_rate_per_subnet: self.cfg.allowed_handshake_rate_per_subnet,
184 },
185 );
186 let mut listener_task =
187 listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
188
189 let dialer = dialer::Actor::new(
191 self.context.with_label("dialer"),
192 dialer::Config {
193 stream_cfg,
194 dial_frequency: self.cfg.dial_frequency,
195 query_frequency: self.cfg.query_frequency,
196 allow_private_ips: self.cfg.allow_private_ips,
197 },
198 );
199 let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
200
201 let mut shutdown = self.context.stopped();
202
203 info!("network started");
205 select! {
206 _ = &mut shutdown => {
207 debug!("context shutdown, stopping network");
208 },
209 tracker = &mut tracker_task => {
210 panic!("tracker exited unexpectedly: {tracker:?}");
211 },
212 router = &mut router_task => {
213 panic!("router exited unexpectedly: {router:?}");
214 },
215 spawner = &mut spawner_task => {
216 panic!("spawner exited unexpectedly: {spawner:?}");
217 },
218 listener = &mut listener_task => {
219 panic!("listener exited unexpectedly: {listener:?}");
220 },
221 dialer = &mut dialer_task => {
222 panic!("dialer exited unexpectedly: {dialer:?}");
223 },
224 }
225 }
226}