commonware_p2p/authenticated/lookup/
network.rs1use super::{
4 actors::{dialer, listener, router, spawner, tracker},
5 channels::{self, Channels},
6 config::Config,
7 types,
8};
9use crate::{
10 authenticated::{mailbox::UnboundedMailbox, Mailbox},
11 Channel,
12};
13use commonware_cryptography::Signer;
14use commonware_macros::select;
15use commonware_runtime::{
16 spawn_cell, Clock, ContextCell, Handle, Metrics, Network as RNetwork, Quota, Resolver, Spawner,
17};
18use commonware_stream::Config as StreamConfig;
19use commonware_utils::union;
20use futures::channel::mpsc;
21use rand_core::CryptoRngCore;
22use std::{collections::HashSet, net::IpAddr};
23use tracing::{debug, info};
24
25const STREAM_SUFFIX: &[u8] = b"_STREAM";
27
28pub struct Network<E: Spawner + Clock + CryptoRngCore + RNetwork + Metrics, C: Signer> {
30 context: ContextCell<E>,
31 cfg: Config<C>,
32
33 channels: Channels<C::PublicKey>,
34 tracker: tracker::Actor<E, C>,
35 tracker_mailbox: UnboundedMailbox<tracker::Message<C::PublicKey>>,
36 router: router::Actor<E, C::PublicKey>,
37 router_mailbox: Mailbox<router::Message<C::PublicKey>>,
38 listener: mpsc::Receiver<HashSet<IpAddr>>,
39}
40
41impl<E: Spawner + Clock + CryptoRngCore + RNetwork + Resolver + Metrics, C: Signer> Network<E, C> {
42 pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<C::PublicKey>) {
53 let (listener_mailbox, listener) = Mailbox::<HashSet<IpAddr>>::new(cfg.mailbox_size);
54 let (tracker, tracker_mailbox, oracle) = tracker::Actor::new(
55 context.with_label("tracker"),
56 tracker::Config {
57 crypto: cfg.crypto.clone(),
58 tracked_peer_sets: cfg.tracked_peer_sets,
59 allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
60 allow_private_ips: cfg.allow_private_ips,
61 allow_dns: cfg.allow_dns,
62 bypass_ip_check: cfg.bypass_ip_check,
63 listener: listener_mailbox,
64 block_duration: cfg.block_duration,
65 },
66 );
67 let (router, router_mailbox, messenger) = router::Actor::new(
68 context.with_label("router"),
69 router::Config {
70 mailbox_size: cfg.mailbox_size,
71 },
72 );
73 let channels = Channels::new(messenger, cfg.max_message_size);
74
75 (
76 Self {
77 context: ContextCell::new(context),
78 cfg,
79
80 channels,
81 tracker,
82 tracker_mailbox,
83 router,
84 router_mailbox,
85 listener,
86 },
87 oracle,
88 )
89 }
90
91 #[allow(clippy::type_complexity)]
105 pub fn register(
106 &mut self,
107 channel: Channel,
108 rate: Quota,
109 backlog: usize,
110 ) -> (
111 channels::Sender<C::PublicKey, E>,
112 channels::Receiver<C::PublicKey>,
113 ) {
114 let clock = self
115 .context
116 .with_label(&format!("channel_{channel}"))
117 .take();
118 self.channels.register(channel, rate, backlog, clock)
119 }
120
121 pub fn start(mut self) -> Handle<()> {
125 spawn_cell!(self.context, self.run().await)
126 }
127
128 async fn run(self) {
129 let mut tracker_task = self.tracker.start();
131
132 let mut router_task = self.router.start(self.channels);
134
135 let (spawner, spawner_mailbox) = spawner::Actor::new(
137 self.context.with_label("spawner"),
138 spawner::Config {
139 mailbox_size: self.cfg.mailbox_size,
140 ping_frequency: self.cfg.ping_frequency,
141 },
142 );
143 let mut spawner_task =
144 spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
145
146 let stream_cfg = StreamConfig {
148 signing_key: self.cfg.crypto,
149 namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
150 max_message_size: self
151 .cfg
152 .max_message_size
153 .saturating_add(types::MAX_PAYLOAD_DATA_OVERHEAD),
154 synchrony_bound: self.cfg.synchrony_bound,
155 max_handshake_age: self.cfg.max_handshake_age,
156 handshake_timeout: self.cfg.handshake_timeout,
157 };
158 let listener = listener::Actor::new(
159 self.context.with_label("listener"),
160 listener::Config {
161 address: self.cfg.listen,
162 stream_cfg: stream_cfg.clone(),
163 allow_private_ips: self.cfg.allow_private_ips,
164 bypass_ip_check: self.cfg.bypass_ip_check,
165 max_concurrent_handshakes: self.cfg.max_concurrent_handshakes,
166 allowed_handshake_rate_per_ip: self.cfg.allowed_handshake_rate_per_ip,
167 allowed_handshake_rate_per_subnet: self.cfg.allowed_handshake_rate_per_subnet,
168 },
169 self.listener,
170 );
171 let mut listener_task =
172 listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
173
174 let dialer = dialer::Actor::new(
176 self.context.with_label("dialer"),
177 dialer::Config {
178 stream_cfg,
179 dial_frequency: self.cfg.dial_frequency,
180 query_frequency: self.cfg.query_frequency,
181 allow_private_ips: self.cfg.allow_private_ips,
182 },
183 );
184 let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
185
186 let mut shutdown = self.context.stopped();
187
188 info!("network started");
190 select! {
191 _ = &mut shutdown => {
192 debug!("context shutdown, stopping network");
193 },
194 tracker = &mut tracker_task => {
195 panic!("tracker exited unexpectedly: {tracker:?}");
196 },
197 router = &mut router_task => {
198 panic!("router exited unexpectedly: {router:?}");
199 },
200 spawner = &mut spawner_task => {
201 panic!("spawner exited unexpectedly: {spawner:?}");
202 },
203 listener = &mut listener_task => {
204 panic!("listener exited unexpectedly: {listener:?}");
205 },
206 dialer = &mut dialer_task => {
207 panic!("dialer exited unexpectedly: {dialer:?}");
208 },
209 }
210 }
211}