commonware_p2p/authenticated/discovery/
network.rs1use super::{
4 actors::{dialer, listener, router, spawner, tracker},
5 channels::{self, Channels},
6 config::Config,
7 types,
8};
9use crate::{authenticated::discovery::types::InfoVerifier, Channel};
10use commonware_cryptography::Signer;
11use commonware_macros::select;
12use commonware_runtime::{
13 spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Network as RNetwork, Quota,
14 Resolver, Spawner,
15};
16use commonware_stream::encrypted::Config as StreamConfig;
17use commonware_utils::union;
18use rand_core::CryptoRngCore;
19use tracing::{debug, info};
20
21const TRACKER_SUFFIX: &[u8] = b"_TRACKER";
23
24const STREAM_SUFFIX: &[u8] = b"_STREAM";
26
27pub struct Network<
29 E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
30 C: Signer,
31> {
32 context: ContextCell<E>,
33 cfg: Config<C>,
34
35 channels: Channels<C::PublicKey>,
36 tracker: tracker::Actor<E, C>,
37 tracker_mailbox: tracker::Mailbox<C::PublicKey>,
38 router: router::Actor<E, C::PublicKey>,
39 router_mailbox: router::Mailbox<C::PublicKey>,
40 info_verifier: InfoVerifier<C::PublicKey>,
41}
42
43impl<
44 E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
45 C: Signer,
46 > Network<E, C>
47{
48 pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<C::PublicKey>) {
59 let (tracker, tracker_mailbox, oracle, info_verifier) = tracker::Actor::new(
60 context.child("tracker"),
61 tracker::Config {
62 crypto: cfg.crypto.clone(),
63 namespace: union(&cfg.namespace, TRACKER_SUFFIX),
64 address: cfg.dialable.clone(),
65 bootstrappers: cfg.bootstrappers.clone(),
66 allow_private_ips: cfg.allow_private_ips,
67 allow_dns: cfg.allow_dns,
68 synchrony_bound: cfg.synchrony_bound,
69 mailbox_size: cfg.mailbox_size,
70 tracked_peer_sets: cfg.tracked_peer_sets,
71 peer_connection_cooldown: cfg.peer_connection_cooldown,
72 peer_gossip_max_count: cfg.peer_gossip_max_count,
73 max_peer_set_size: cfg.max_peer_set_size,
74 dial_fail_limit: cfg.dial_fail_limit,
75 block_duration: cfg.block_duration,
76 },
77 );
78 let (router, router_mailbox, messenger) = router::Actor::new(
79 context.child("router"),
80 router::Config {
81 mailbox_size: cfg.mailbox_size,
82 },
83 );
84 let channels = Channels::new(messenger, cfg.max_message_size);
85
86 (
87 Self {
88 context: ContextCell::new(context),
89 cfg,
90
91 channels,
92 tracker,
93 tracker_mailbox,
94 router,
95 router_mailbox,
96 info_verifier,
97 },
98 oracle,
99 )
100 }
101
102 #[allow(clippy::type_complexity)]
116 pub fn register(
117 &mut self,
118 channel: Channel,
119 rate: Quota,
120 backlog: usize,
121 ) -> (
122 channels::Sender<C::PublicKey, E>,
123 channels::Receiver<C::PublicKey>,
124 ) {
125 let context = self
126 .context
127 .child("channel")
128 .with_attribute("index", channel);
129 self.channels.register(channel, rate, backlog, context)
130 }
131
132 pub fn start(mut self) -> Handle<()> {
136 spawn_cell!(self.context, self.run())
137 }
138
139 async fn run(self) {
140 let mut tracker_task = self.tracker.start();
142
143 let mut router_task = self.router.start(self.channels);
145
146 let (spawner, spawner_mailbox) = spawner::Actor::new(
148 self.context.child("spawner"),
149 spawner::Config {
150 mailbox_size: self.cfg.mailbox_size,
151 send_batch_size: self.cfg.send_batch_size,
152 gossip_bit_vec_frequency: self.cfg.gossip_bit_vec_frequency,
153 max_peer_set_size: self.cfg.max_peer_set_size,
154 peer_gossip_max_count: self.cfg.peer_gossip_max_count,
155 info_verifier: self.info_verifier,
156 },
157 );
158 let mut spawner_task =
159 spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
160
161 let stream_cfg = StreamConfig {
163 signing_key: self.cfg.crypto,
164 namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
165 max_message_size: self
166 .cfg
167 .max_message_size
168 .saturating_add(types::MAX_PAYLOAD_DATA_OVERHEAD),
169 synchrony_bound: self.cfg.synchrony_bound,
170 max_handshake_age: self.cfg.max_handshake_age,
171 handshake_timeout: self.cfg.handshake_timeout,
172 };
173 let listener = listener::Actor::new(
174 self.context.child("listener"),
175 listener::Config {
176 address: self.cfg.listen,
177 stream_cfg: stream_cfg.clone(),
178 allow_private_ips: self.cfg.allow_private_ips,
179 max_concurrent_handshakes: self.cfg.max_concurrent_handshakes,
180 allowed_handshake_rate_per_ip: self.cfg.allowed_handshake_rate_per_ip,
181 allowed_handshake_rate_per_subnet: self.cfg.allowed_handshake_rate_per_subnet,
182 },
183 );
184 let mut listener_task =
185 listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
186
187 let dialer = dialer::Actor::new(
189 self.context.child("dialer"),
190 dialer::Config {
191 stream_cfg,
192 dial_frequency: self.cfg.dial_frequency,
193 peer_connection_cooldown: self.cfg.peer_connection_cooldown,
194 allow_private_ips: self.cfg.allow_private_ips,
195 },
196 );
197 let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
198
199 let mut shutdown = self.context.stopped();
200
201 info!("network started");
203 select! {
204 _ = &mut shutdown => {
205 debug!("context shutdown, stopping network");
206 },
207 tracker = &mut tracker_task => {
208 debug!(?tracker, "tracker stopped, shutting down network");
209 },
210 router = &mut router_task => {
211 debug!(?router, "router stopped, shutting down network");
212 },
213 spawner = &mut spawner_task => {
214 debug!(?spawner, "spawner stopped, shutting down network");
215 },
216 listener = &mut listener_task => {
217 debug!(?listener, "listener stopped, shutting down network");
218 },
219 dialer = &mut dialer_task => {
220 debug!(?dialer, "dialer stopped, shutting down network");
221 },
222 }
223 }
224}