commonware_p2p/authenticated/
network.rs1use super::{
4 actors::{dialer, listener, router, spawner, tracker},
5 channels::{self, Channels},
6 config::Config,
7 types,
8};
9use crate::Channel;
10use commonware_cryptography::Scheme;
11use commonware_macros::select;
12use commonware_runtime::{Clock, Handle, Metrics, Network as RNetwork, Spawner};
13use commonware_stream::public_key;
14use commonware_utils::union;
15use governor::{clock::ReasonablyRealtime, Quota};
16use rand::{CryptoRng, Rng};
17use tracing::{debug, info, warn};
18
/// Suffix appended to the configured namespace (via `union`) to form the
/// namespace handed to the tracker actor.
const TRACKER_SUFFIX: &[u8] = "_TRACKER".as_bytes();
21
/// Suffix appended to the configured namespace (via `union`) to form the
/// namespace used by the stream configuration shared by the listener and dialer.
const STREAM_SUFFIX: &[u8] = "_STREAM".as_bytes();
24
25pub struct Network<
27 E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
28 C: Scheme,
29> {
30 context: E,
31 cfg: Config<C>,
32
33 channels: Channels<C::PublicKey>,
34 tracker: tracker::Actor<E, C>,
35 tracker_mailbox: tracker::Mailbox<E, C>,
36 router: router::Actor<E, C::PublicKey>,
37 router_mailbox: router::Mailbox<C::PublicKey>,
38}
39
40impl<E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics, C: Scheme>
41 Network<E, C>
42{
43 pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<E, C>) {
54 let (tracker, tracker_mailbox, oracle) = tracker::Actor::new(
55 context.with_label("tracker"),
56 tracker::Config {
57 crypto: cfg.crypto.clone(),
58 namespace: union(&cfg.namespace, TRACKER_SUFFIX),
59 address: cfg.dialable,
60 bootstrappers: cfg.bootstrappers.clone(),
61 allow_private_ips: cfg.allow_private_ips,
62 mailbox_size: cfg.mailbox_size,
63 synchrony_bound: cfg.synchrony_bound,
64 tracked_peer_sets: cfg.tracked_peer_sets,
65 allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
66 peer_gossip_max_count: cfg.peer_gossip_max_count,
67 max_peer_set_size: cfg.max_peer_set_size,
68 dial_fail_limit: cfg.dial_fail_limit,
69 },
70 );
71 let (router, router_mailbox, messenger) = router::Actor::new(
72 context.with_label("router"),
73 router::Config {
74 mailbox_size: cfg.mailbox_size,
75 },
76 );
77 let channels = Channels::new(messenger, cfg.max_message_size);
78
79 (
80 Self {
81 context,
82 cfg,
83
84 channels,
85 tracker,
86 tracker_mailbox,
87 router,
88 router_mailbox,
89 },
90 oracle,
91 )
92 }
93
94 pub fn register(
109 &mut self,
110 channel: Channel,
111 rate: Quota,
112 backlog: usize,
113 compression: Option<i32>,
114 ) -> (
115 channels::Sender<C::PublicKey>,
116 channels::Receiver<C::PublicKey>,
117 ) {
118 self.channels.register(channel, rate, backlog, compression)
119 }
120
121 pub fn start(mut self) -> Handle<()> {
125 self.context.spawn_ref()(self.run())
126 }
127
128 async fn run(self) {
129 let mut tracker_task = self.tracker.start();
131
132 let mut router_task = self.router.start(self.channels);
134
135 let (spawner, spawner_mailbox) = spawner::Actor::new(
137 self.context.with_label("spawner"),
138 spawner::Config {
139 mailbox_size: self.cfg.mailbox_size,
140 gossip_bit_vec_frequency: self.cfg.gossip_bit_vec_frequency,
141 allowed_bit_vec_rate: self.cfg.allowed_bit_vec_rate,
142 max_peer_set_size: self.cfg.max_peer_set_size,
143 allowed_peers_rate: self.cfg.allowed_peers_rate,
144 peer_gossip_max_count: self.cfg.peer_gossip_max_count,
145 },
146 );
147 let mut spawner_task =
148 spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
149
150 let stream_cfg = public_key::Config {
152 crypto: self.cfg.crypto,
153 namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
154 max_message_size: self.cfg.max_message_size + types::MAX_PAYLOAD_DATA_OVERHEAD,
155 synchrony_bound: self.cfg.synchrony_bound,
156 max_handshake_age: self.cfg.max_handshake_age,
157 handshake_timeout: self.cfg.handshake_timeout,
158 };
159 let listener = listener::Actor::new(
160 self.context.with_label("listener"),
161 listener::Config {
162 address: self.cfg.listen,
163 stream_cfg: stream_cfg.clone(),
164 allowed_incoming_connection_rate: self.cfg.allowed_incoming_connection_rate,
165 },
166 );
167 let mut listener_task =
168 listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
169
170 let dialer = dialer::Actor::new(
172 self.context.with_label("dialer"),
173 dialer::Config {
174 stream_cfg,
175 dial_frequency: self.cfg.dial_frequency,
176 query_frequency: self.cfg.query_frequency,
177 },
178 );
179 let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
180
181 info!("network started");
183 let err = select! {
184 tracker = &mut tracker_task => {
185 debug!("tracker exited");
186 tracker
187 },
188 router = &mut router_task => {
189 debug!("router exited");
190 router
191 },
192 spawner = &mut spawner_task => {
193 debug!("spawner exited");
194 spawner
195 },
196 listener = &mut listener_task => {
197 debug!("listener exited");
198 listener
199 },
200 dialer = &mut dialer_task => {
201 debug!("dialer exited");
202 dialer
203 },
204 }
205 .unwrap_err();
206
207 tracker_task.abort();
209 router_task.abort();
210 spawner_task.abort();
211 listener_task.abort();
212 dialer_task.abort();
213
214 warn!(error=?err, "network shutdown")
216 }
217}