commonware_p2p/authenticated/lookup/
network.rs1use super::{
4 actors::{dialer, listener, router, spawner, tracker},
5 channels::{self, Channels},
6 config::Config,
7 types,
8};
9use crate::{authenticated::Mailbox, Channel};
10use commonware_cryptography::Signer;
11use commonware_macros::select;
12use commonware_runtime::{Clock, Handle, Metrics, Network as RNetwork, Spawner};
13use commonware_stream::public_key;
14use commonware_utils::union;
15use governor::{clock::ReasonablyRealtime, Quota};
16use rand::{CryptoRng, Rng};
17use tracing::{debug, info, warn};
18
19const STREAM_SUFFIX: &[u8] = b"_STREAM";
21
22pub struct Network<
24 E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
25 C: Signer,
26> {
27 context: E,
28 cfg: Config<C>,
29
30 channels: Channels<C::PublicKey>,
31 tracker: tracker::Actor<E, C>,
32 tracker_mailbox: Mailbox<tracker::Message<E, C::PublicKey>>,
33 router: router::Actor<E, C::PublicKey>,
34 router_mailbox: Mailbox<router::Message<C::PublicKey>>,
35}
36
37impl<E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics, C: Signer>
38 Network<E, C>
39{
40 pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<E, C::PublicKey>) {
51 let (tracker, tracker_mailbox, oracle) = tracker::Actor::new(
52 context.with_label("tracker"),
53 tracker::Config {
54 crypto: cfg.crypto.clone(),
55 address: cfg.dialable,
56 mailbox_size: cfg.mailbox_size,
57 tracked_peer_sets: cfg.tracked_peer_sets,
58 allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
59 allow_private_ips: cfg.allow_private_ips,
60 },
61 );
62 let (router, router_mailbox, messenger) = router::Actor::new(
63 context.with_label("router"),
64 router::Config {
65 mailbox_size: cfg.mailbox_size,
66 },
67 );
68 let channels = Channels::new(messenger, cfg.max_message_size);
69
70 (
71 Self {
72 context,
73 cfg,
74
75 channels,
76 tracker,
77 tracker_mailbox,
78 router,
79 router_mailbox,
80 },
81 oracle,
82 )
83 }
84
85 pub fn register(
99 &mut self,
100 channel: Channel,
101 rate: Quota,
102 backlog: usize,
103 ) -> (
104 channels::Sender<C::PublicKey>,
105 channels::Receiver<C::PublicKey>,
106 ) {
107 self.channels.register(channel, rate, backlog)
108 }
109
110 pub fn start(mut self) -> Handle<()> {
114 self.context.spawn_ref()(self.run())
115 }
116
117 async fn run(self) {
118 let mut tracker_task = self.tracker.start();
120
121 let mut router_task = self.router.start(self.channels);
123
124 let (spawner, spawner_mailbox) = spawner::Actor::new(
126 self.context.with_label("spawner"),
127 spawner::Config {
128 mailbox_size: self.cfg.mailbox_size,
129 ping_frequency: self.cfg.ping_frequency,
130 allowed_ping_rate: self.cfg.allowed_ping_rate,
131 },
132 );
133 let mut spawner_task =
134 spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
135
136 let stream_cfg = public_key::Config {
138 crypto: self.cfg.crypto,
139 namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
140 max_message_size: self.cfg.max_message_size + types::MAX_PAYLOAD_DATA_OVERHEAD,
141 synchrony_bound: self.cfg.synchrony_bound,
142 max_handshake_age: self.cfg.max_handshake_age,
143 handshake_timeout: self.cfg.handshake_timeout,
144 };
145 let listener = listener::Actor::new(
146 self.context.with_label("listener"),
147 listener::Config {
148 address: self.cfg.listen,
149 stream_cfg: stream_cfg.clone(),
150 allowed_incoming_connection_rate: self.cfg.allowed_incoming_connection_rate,
151 },
152 );
153 let mut listener_task =
154 listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
155
156 let dialer = dialer::Actor::new(
158 self.context.with_label("dialer"),
159 dialer::Config {
160 stream_cfg,
161 dial_frequency: self.cfg.dial_frequency,
162 query_frequency: self.cfg.query_frequency,
163 },
164 );
165 let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
166
167 info!("network started");
169 let err = select! {
170 tracker = &mut tracker_task => {
171 debug!("tracker exited");
172 tracker
173 },
174 router = &mut router_task => {
175 debug!("router exited");
176 router
177 },
178 spawner = &mut spawner_task => {
179 debug!("spawner exited");
180 spawner
181 },
182 listener = &mut listener_task => {
183 debug!("listener exited");
184 listener
185 },
186 dialer = &mut dialer_task => {
187 debug!("dialer exited");
188 dialer
189 },
190 }
191 .unwrap_err();
192
193 warn!(error=?err, "network shutdown");
195 }
196}