commonware_p2p/authenticated/lookup/
network.rs1use super::{
4 actors::{dialer, listener, router, spawner, tracker},
5 channels::{self, Channels},
6 config::Config,
7 types,
8};
9use crate::{
10 authenticated::{mailbox::UnboundedMailbox, Mailbox},
11 Channel,
12};
13use commonware_cryptography::Signer;
14use commonware_macros::select;
15use commonware_runtime::{
16 spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Network as RNetwork, Quota,
17 Resolver, Spawner,
18};
19use commonware_stream::encrypted::Config as StreamConfig;
20use commonware_utils::{channel::mpsc, union};
21use rand_core::CryptoRngCore;
22use std::{collections::HashSet, net::IpAddr};
23use tracing::{debug, info};
24
25const STREAM_SUFFIX: &[u8] = b"_STREAM";
27
28pub struct Network<
30 E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Metrics,
31 C: Signer,
32> {
33 context: ContextCell<E>,
34 cfg: Config<C>,
35
36 channels: Channels<C::PublicKey>,
37 tracker: tracker::Actor<E, C>,
38 tracker_mailbox: UnboundedMailbox<tracker::Message<C::PublicKey>>,
39 router: router::Actor<E, C::PublicKey>,
40 router_mailbox: Mailbox<router::Message<C::PublicKey>>,
41 listener: mpsc::Receiver<HashSet<IpAddr>>,
42}
43
44impl<
45 E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
46 C: Signer,
47 > Network<E, C>
48{
49 pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<C::PublicKey>) {
60 let (listener_mailbox, listener) = Mailbox::<HashSet<IpAddr>>::new(cfg.mailbox_size);
61 let (tracker, tracker_mailbox, oracle) = tracker::Actor::new(
62 context.with_label("tracker"),
63 tracker::Config {
64 crypto: cfg.crypto.clone(),
65 tracked_peer_sets: cfg.tracked_peer_sets,
66 allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
67 allow_private_ips: cfg.allow_private_ips,
68 allow_dns: cfg.allow_dns,
69 bypass_ip_check: cfg.bypass_ip_check,
70 listener: listener_mailbox,
71 block_duration: cfg.block_duration,
72 },
73 );
74 let (router, router_mailbox, messenger) = router::Actor::new(
75 context.with_label("router"),
76 router::Config {
77 mailbox_size: cfg.mailbox_size,
78 },
79 );
80 let channels = Channels::new(messenger, cfg.max_message_size);
81
82 (
83 Self {
84 context: ContextCell::new(context),
85 cfg,
86
87 channels,
88 tracker,
89 tracker_mailbox,
90 router,
91 router_mailbox,
92 listener,
93 },
94 oracle,
95 )
96 }
97
98 #[allow(clippy::type_complexity)]
112 pub fn register(
113 &mut self,
114 channel: Channel,
115 rate: Quota,
116 backlog: usize,
117 ) -> (
118 channels::Sender<C::PublicKey, E>,
119 channels::Receiver<C::PublicKey>,
120 ) {
121 let clock = self
122 .context
123 .with_label("channel")
124 .with_attribute("idx", channel)
125 .take();
126 self.channels.register(channel, rate, backlog, clock)
127 }
128
129 pub fn start(mut self) -> Handle<()> {
133 spawn_cell!(self.context, self.run().await)
134 }
135
136 async fn run(self) {
137 let mut tracker_task = self.tracker.start();
139
140 let mut router_task = self.router.start(self.channels);
142
143 let (spawner, spawner_mailbox) = spawner::Actor::new(
145 self.context.with_label("spawner"),
146 spawner::Config {
147 mailbox_size: self.cfg.mailbox_size,
148 ping_frequency: self.cfg.ping_frequency,
149 },
150 );
151 let mut spawner_task =
152 spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
153
154 let stream_cfg = StreamConfig {
156 signing_key: self.cfg.crypto,
157 namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
158 max_message_size: self
159 .cfg
160 .max_message_size
161 .saturating_add(types::MAX_PAYLOAD_DATA_OVERHEAD),
162 synchrony_bound: self.cfg.synchrony_bound,
163 max_handshake_age: self.cfg.max_handshake_age,
164 handshake_timeout: self.cfg.handshake_timeout,
165 };
166 let listener = listener::Actor::new(
167 self.context.with_label("listener"),
168 listener::Config {
169 address: self.cfg.listen,
170 stream_cfg: stream_cfg.clone(),
171 allow_private_ips: self.cfg.allow_private_ips,
172 bypass_ip_check: self.cfg.bypass_ip_check,
173 max_concurrent_handshakes: self.cfg.max_concurrent_handshakes,
174 allowed_handshake_rate_per_ip: self.cfg.allowed_handshake_rate_per_ip,
175 allowed_handshake_rate_per_subnet: self.cfg.allowed_handshake_rate_per_subnet,
176 },
177 self.listener,
178 );
179 let mut listener_task =
180 listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
181
182 let dialer = dialer::Actor::new(
184 self.context.with_label("dialer"),
185 dialer::Config {
186 stream_cfg,
187 dial_frequency: self.cfg.dial_frequency,
188 query_frequency: self.cfg.query_frequency,
189 allow_private_ips: self.cfg.allow_private_ips,
190 },
191 );
192 let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
193
194 let mut shutdown = self.context.stopped();
195
196 info!("network started");
198 select! {
199 _ = &mut shutdown => {
200 debug!("context shutdown, stopping network");
201 },
202 tracker = &mut tracker_task => {
203 panic!("tracker exited unexpectedly: {tracker:?}");
204 },
205 router = &mut router_task => {
206 panic!("router exited unexpectedly: {router:?}");
207 },
208 spawner = &mut spawner_task => {
209 panic!("spawner exited unexpectedly: {spawner:?}");
210 },
211 listener = &mut listener_task => {
212 panic!("listener exited unexpectedly: {listener:?}");
213 },
214 dialer = &mut dialer_task => {
215 panic!("dialer exited unexpectedly: {dialer:?}");
216 },
217 }
218 }
219}