commonware_p2p/authenticated/discovery/
network.rs1use super::{
4 actors::{dialer, listener, router, spawner, tracker},
5 channels::{self, Channels},
6 config::Config,
7 types,
8};
9use crate::{
10 authenticated::{discovery::types::InfoVerifier, mailbox::UnboundedMailbox, Mailbox},
11 Channel,
12};
13use commonware_cryptography::Signer;
14use commonware_macros::select;
15use commonware_runtime::{
16 spawn_cell, Clock, ContextCell, Handle, Metrics, Network as RNetwork, Quota, Resolver, Spawner,
17};
18use commonware_stream::Config as StreamConfig;
19use commonware_utils::union;
20use rand_core::CryptoRngCore;
21use tracing::{debug, info};
22
23const TRACKER_SUFFIX: &[u8] = b"_TRACKER";
25
26const STREAM_SUFFIX: &[u8] = b"_STREAM";
28
29pub struct Network<E: Spawner + Clock + CryptoRngCore + RNetwork + Resolver + Metrics, C: Signer> {
31 context: ContextCell<E>,
32 cfg: Config<C>,
33
34 channels: Channels<C::PublicKey>,
35 tracker: tracker::Actor<E, C>,
36 tracker_mailbox: UnboundedMailbox<tracker::Message<C::PublicKey>>,
37 router: router::Actor<E, C::PublicKey>,
38 router_mailbox: Mailbox<router::Message<C::PublicKey>>,
39 info_verifier: InfoVerifier<C::PublicKey>,
40}
41
42impl<E: Spawner + Clock + CryptoRngCore + RNetwork + Resolver + Metrics, C: Signer> Network<E, C> {
43 pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<C::PublicKey>) {
54 let (tracker, tracker_mailbox, oracle, info_verifier) = tracker::Actor::new(
55 context.with_label("tracker"),
56 tracker::Config {
57 crypto: cfg.crypto.clone(),
58 namespace: union(&cfg.namespace, TRACKER_SUFFIX),
59 address: cfg.dialable.clone(),
60 bootstrappers: cfg.bootstrappers.clone(),
61 allow_private_ips: cfg.allow_private_ips,
62 allow_dns: cfg.allow_dns,
63 synchrony_bound: cfg.synchrony_bound,
64 tracked_peer_sets: cfg.tracked_peer_sets,
65 allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
66 peer_gossip_max_count: cfg.peer_gossip_max_count,
67 max_peer_set_size: cfg.max_peer_set_size,
68 dial_fail_limit: cfg.dial_fail_limit,
69 block_duration: cfg.block_duration,
70 },
71 );
72 let (router, router_mailbox, messenger) = router::Actor::new(
73 context.with_label("router"),
74 router::Config {
75 mailbox_size: cfg.mailbox_size,
76 },
77 );
78 let channels = Channels::new(messenger, cfg.max_message_size);
79
80 (
81 Self {
82 context: ContextCell::new(context),
83 cfg,
84
85 channels,
86 tracker,
87 tracker_mailbox,
88 router,
89 router_mailbox,
90 info_verifier,
91 },
92 oracle,
93 )
94 }
95
96 #[allow(clippy::type_complexity)]
110 pub fn register(
111 &mut self,
112 channel: Channel,
113 rate: Quota,
114 backlog: usize,
115 ) -> (
116 channels::Sender<C::PublicKey, E>,
117 channels::Receiver<C::PublicKey>,
118 ) {
119 let clock = self
120 .context
121 .with_label(&format!("channel_{channel}"))
122 .take();
123 self.channels.register(channel, rate, backlog, clock)
124 }
125
126 pub fn start(mut self) -> Handle<()> {
130 spawn_cell!(self.context, self.run().await)
131 }
132
133 async fn run(self) {
134 let mut tracker_task = self.tracker.start();
136
137 let mut router_task = self.router.start(self.channels);
139
140 let (spawner, spawner_mailbox) = spawner::Actor::new(
142 self.context.with_label("spawner"),
143 spawner::Config {
144 mailbox_size: self.cfg.mailbox_size,
145 gossip_bit_vec_frequency: self.cfg.gossip_bit_vec_frequency,
146 max_peer_set_size: self.cfg.max_peer_set_size,
147 peer_gossip_max_count: self.cfg.peer_gossip_max_count,
148 info_verifier: self.info_verifier,
149 },
150 );
151 let mut spawner_task =
152 spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
153
154 let stream_cfg = StreamConfig {
156 signing_key: self.cfg.crypto,
157 namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
158 max_message_size: self
159 .cfg
160 .max_message_size
161 .saturating_add(types::MAX_PAYLOAD_DATA_OVERHEAD),
162 synchrony_bound: self.cfg.synchrony_bound,
163 max_handshake_age: self.cfg.max_handshake_age,
164 handshake_timeout: self.cfg.handshake_timeout,
165 };
166 let listener = listener::Actor::new(
167 self.context.with_label("listener"),
168 listener::Config {
169 address: self.cfg.listen,
170 stream_cfg: stream_cfg.clone(),
171 allow_private_ips: self.cfg.allow_private_ips,
172 max_concurrent_handshakes: self.cfg.max_concurrent_handshakes,
173 allowed_handshake_rate_per_ip: self.cfg.allowed_handshake_rate_per_ip,
174 allowed_handshake_rate_per_subnet: self.cfg.allowed_handshake_rate_per_subnet,
175 },
176 );
177 let mut listener_task =
178 listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
179
180 let dialer = dialer::Actor::new(
182 self.context.with_label("dialer"),
183 dialer::Config {
184 stream_cfg,
185 dial_frequency: self.cfg.dial_frequency,
186 query_frequency: self.cfg.query_frequency,
187 allow_private_ips: self.cfg.allow_private_ips,
188 },
189 );
190 let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
191
192 let mut shutdown = self.context.stopped();
193
194 info!("network started");
196 select! {
197 _ = &mut shutdown => {
198 debug!("context shutdown, stopping network");
199 },
200 tracker = &mut tracker_task => {
201 panic!("tracker exited unexpectedly: {tracker:?}");
202 },
203 router = &mut router_task => {
204 panic!("router exited unexpectedly: {router:?}");
205 },
206 spawner = &mut spawner_task => {
207 panic!("spawner exited unexpectedly: {spawner:?}");
208 },
209 listener = &mut listener_task => {
210 panic!("listener exited unexpectedly: {listener:?}");
211 },
212 dialer = &mut dialer_task => {
213 panic!("dialer exited unexpectedly: {dialer:?}");
214 },
215 }
216 }
217}