commonware_p2p/authenticated/
network.rs1use super::{
4 actors::{dialer, listener, router, spawner, tracker},
5 channels::{self, Channels},
6 config::Config,
7 types,
8};
9use crate::Channel;
10use commonware_cryptography::Scheme;
11use commonware_macros::select;
12use commonware_runtime::{Clock, Handle, Metrics, Network as RNetwork, Spawner};
13use commonware_stream::public_key;
14use commonware_utils::union;
15use governor::{clock::ReasonablyRealtime, Quota};
16use rand::{CryptoRng, Rng};
17use tracing::{debug, info, warn};
18
/// Byte suffix combined (via `union`) with the configured namespace to form
/// the namespace handed to the tracker actor.
const TRACKER_SUFFIX: &[u8] = b"_TRACKER";

/// Byte suffix combined (via `union`) with the configured namespace to form
/// the namespace used by the stream configuration.
const STREAM_SUFFIX: &[u8] = b"_STREAM";
24
25pub struct Network<
27 E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
28 C: Scheme,
29> {
30 context: E,
31 cfg: Config<C>,
32
33 channels: Channels<C::PublicKey>,
34 tracker: tracker::Actor<E, C>,
35 tracker_mailbox: tracker::Mailbox<E, C>,
36 router: router::Actor<E, C::PublicKey>,
37 router_mailbox: router::Mailbox<C::PublicKey>,
38}
39
40impl<E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics, C: Scheme>
41 Network<E, C>
42{
43 pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<E, C>) {
54 let (tracker, tracker_mailbox, oracle) = tracker::Actor::new(
55 context.with_label("tracker"),
56 tracker::Config {
57 crypto: cfg.crypto.clone(),
58 namespace: union(&cfg.namespace, TRACKER_SUFFIX),
59 address: cfg.dialable,
60 bootstrappers: cfg.bootstrappers.clone(),
61 allow_private_ips: cfg.allow_private_ips,
62 mailbox_size: cfg.mailbox_size,
63 synchrony_bound: cfg.synchrony_bound,
64 tracked_peer_sets: cfg.tracked_peer_sets,
65 allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
66 peer_gossip_max_count: cfg.peer_gossip_max_count,
67 max_peer_set_size: cfg.max_peer_set_size,
68 },
69 );
70 let (router, router_mailbox, messenger) = router::Actor::new(
71 context.with_label("router"),
72 router::Config {
73 mailbox_size: cfg.mailbox_size,
74 },
75 );
76 let channels = Channels::new(messenger, cfg.max_message_size);
77
78 (
79 Self {
80 context,
81 cfg,
82
83 channels,
84 tracker,
85 tracker_mailbox,
86 router,
87 router_mailbox,
88 },
89 oracle,
90 )
91 }
92
93 pub fn register(
108 &mut self,
109 channel: Channel,
110 rate: Quota,
111 backlog: usize,
112 compression: Option<i32>,
113 ) -> (
114 channels::Sender<C::PublicKey>,
115 channels::Receiver<C::PublicKey>,
116 ) {
117 self.channels.register(channel, rate, backlog, compression)
118 }
119
120 pub fn start(mut self) -> Handle<()> {
124 self.context.spawn_ref()(self.run())
125 }
126
127 async fn run(self) {
128 let mut tracker_task = self.tracker.start();
130
131 let mut router_task = self.router.start(self.channels);
133
134 let (spawner, spawner_mailbox) = spawner::Actor::new(
136 self.context.with_label("spawner"),
137 spawner::Config {
138 mailbox_size: self.cfg.mailbox_size,
139 gossip_bit_vec_frequency: self.cfg.gossip_bit_vec_frequency,
140 allowed_bit_vec_rate: self.cfg.allowed_bit_vec_rate,
141 max_peer_set_size: self.cfg.max_peer_set_size,
142 allowed_peers_rate: self.cfg.allowed_peers_rate,
143 peer_gossip_max_count: self.cfg.peer_gossip_max_count,
144 },
145 );
146 let mut spawner_task =
147 spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
148
149 let stream_cfg = public_key::Config {
151 crypto: self.cfg.crypto,
152 namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
153 max_message_size: self.cfg.max_message_size + types::MAX_PAYLOAD_DATA_OVERHEAD,
154 synchrony_bound: self.cfg.synchrony_bound,
155 max_handshake_age: self.cfg.max_handshake_age,
156 handshake_timeout: self.cfg.handshake_timeout,
157 };
158 let listener = listener::Actor::new(
159 self.context.with_label("listener"),
160 listener::Config {
161 address: self.cfg.listen,
162 stream_cfg: stream_cfg.clone(),
163 allowed_incoming_connection_rate: self.cfg.allowed_incoming_connection_rate,
164 },
165 );
166 let mut listener_task =
167 listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
168
169 let dialer = dialer::Actor::new(
171 self.context.with_label("dialer"),
172 dialer::Config {
173 stream_cfg,
174 dial_frequency: self.cfg.dial_frequency,
175 dial_rate: self.cfg.dial_rate,
176 },
177 );
178 let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
179
180 info!("network started");
182 let err = select! {
183 tracker = &mut tracker_task => {
184 debug!("tracker exited");
185 tracker
186 },
187 router = &mut router_task => {
188 debug!("router exited");
189 router
190 },
191 spawner = &mut spawner_task => {
192 debug!("spawner exited");
193 spawner
194 },
195 listener = &mut listener_task => {
196 debug!("listener exited");
197 listener
198 },
199 dialer = &mut dialer_task => {
200 debug!("dialer exited");
201 dialer
202 },
203 }
204 .unwrap_err();
205
206 tracker_task.abort();
208 router_task.abort();
209 spawner_task.abort();
210 listener_task.abort();
211 dialer_task.abort();
212
213 warn!(error=?err, "network shutdown")
215 }
216}