commonware_p2p/authenticated/lookup/
network.rs1use super::{
4 actors::{dialer, listener, router, spawner, tracker},
5 channels::{self, Channels},
6 config::Config,
7 types,
8};
9use crate::Channel;
10use commonware_cryptography::Signer;
11use commonware_macros::select;
12use commonware_runtime::{
13 spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Network as RNetwork, Quota,
14 Resolver, Spawner,
15};
16use commonware_stream::encrypted::Config as StreamConfig;
17use commonware_utils::union;
18use rand_core::CryptoRngCore;
19use tracing::{debug, info};
20
/// Suffix combined with the configured namespace (via `union`) to produce the
/// namespace handed to the encrypted stream configuration in `Network::run`.
const STREAM_SUFFIX: &[u8] = b"_STREAM";
23
24pub struct Network<
26 E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Metrics,
27 C: Signer,
28> {
29 context: ContextCell<E>,
30 cfg: Config<C>,
31
32 channels: Channels<C::PublicKey>,
33 tracker: tracker::Actor<E, C>,
34 tracker_mailbox: tracker::Mailbox<C::PublicKey>,
35 router: router::Actor<E, C::PublicKey>,
36 router_mailbox: router::Mailbox<C::PublicKey>,
37 listener: listener::Updates,
38}
39
40impl<
41 E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
42 C: Signer,
43 > Network<E, C>
44{
45 pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<C::PublicKey>) {
56 let (listener_mailbox, listener) = listener::Mailbox::new();
57 let (tracker, tracker_mailbox, oracle) = tracker::Actor::new(
58 context.child("tracker"),
59 tracker::Config {
60 crypto: cfg.crypto.clone(),
61 mailbox_size: cfg.mailbox_size,
62 tracked_peer_sets: cfg.tracked_peer_sets,
63 peer_connection_cooldown: cfg.peer_connection_cooldown,
64 allow_private_ips: cfg.allow_private_ips,
65 allow_dns: cfg.allow_dns,
66 bypass_ip_check: cfg.bypass_ip_check,
67 listener: listener_mailbox,
68 block_duration: cfg.block_duration,
69 },
70 );
71 let (router, router_mailbox, messenger) = router::Actor::new(
72 context.child("router"),
73 router::Config {
74 mailbox_size: cfg.mailbox_size,
75 },
76 );
77 let channels = Channels::new(messenger, cfg.max_message_size);
78
79 (
80 Self {
81 context: ContextCell::new(context),
82 cfg,
83
84 channels,
85 tracker,
86 tracker_mailbox,
87 router,
88 router_mailbox,
89 listener,
90 },
91 oracle,
92 )
93 }
94
95 #[allow(clippy::type_complexity)]
109 pub fn register(
110 &mut self,
111 channel: Channel,
112 rate: Quota,
113 backlog: usize,
114 ) -> (
115 channels::Sender<C::PublicKey, E>,
116 channels::Receiver<C::PublicKey>,
117 ) {
118 let context = self
119 .context
120 .child("channel")
121 .with_attribute("index", channel);
122 self.channels.register(channel, rate, backlog, context)
123 }
124
125 pub fn start(mut self) -> Handle<()> {
129 spawn_cell!(self.context, self.run())
130 }
131
132 async fn run(self) {
133 let mut tracker_task = self.tracker.start();
135
136 let mut router_task = self.router.start(self.channels);
138
139 let (spawner, spawner_mailbox) = spawner::Actor::new(
141 self.context.child("spawner"),
142 spawner::Config {
143 mailbox_size: self.cfg.mailbox_size,
144 send_batch_size: self.cfg.send_batch_size,
145 ping_frequency: self.cfg.ping_frequency,
146 },
147 );
148 let mut spawner_task =
149 spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
150
151 let stream_cfg = StreamConfig {
153 signing_key: self.cfg.crypto,
154 namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
155 max_message_size: self
156 .cfg
157 .max_message_size
158 .saturating_add(types::MAX_PAYLOAD_DATA_OVERHEAD),
159 synchrony_bound: self.cfg.synchrony_bound,
160 max_handshake_age: self.cfg.max_handshake_age,
161 handshake_timeout: self.cfg.handshake_timeout,
162 };
163 let listener = listener::Actor::new(
164 self.context.child("listener"),
165 listener::Config {
166 address: self.cfg.listen,
167 stream_cfg: stream_cfg.clone(),
168 allow_private_ips: self.cfg.allow_private_ips,
169 bypass_ip_check: self.cfg.bypass_ip_check,
170 max_concurrent_handshakes: self.cfg.max_concurrent_handshakes,
171 allowed_handshake_rate_per_ip: self.cfg.allowed_handshake_rate_per_ip,
172 allowed_handshake_rate_per_subnet: self.cfg.allowed_handshake_rate_per_subnet,
173 },
174 self.listener,
175 );
176 let mut listener_task =
177 listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
178
179 let dialer = dialer::Actor::new(
181 self.context.child("dialer"),
182 dialer::Config {
183 stream_cfg,
184 dial_frequency: self.cfg.dial_frequency,
185 peer_connection_cooldown: self.cfg.peer_connection_cooldown,
186 allow_private_ips: self.cfg.allow_private_ips,
187 },
188 );
189 let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
190
191 let mut shutdown = self.context.stopped();
192
193 info!("network started");
195 select! {
196 _ = &mut shutdown => {
197 debug!("context shutdown, stopping network");
198 },
199 tracker = &mut tracker_task => {
200 debug!(?tracker, "tracker stopped, shutting down network");
201 },
202 router = &mut router_task => {
203 debug!(?router, "router stopped, shutting down network");
204 },
205 spawner = &mut spawner_task => {
206 debug!(?spawner, "spawner stopped, shutting down network");
207 },
208 listener = &mut listener_task => {
209 debug!(?listener, "listener stopped, shutting down network");
210 },
211 dialer = &mut dialer_task => {
212 debug!(?dialer, "dialer stopped, shutting down network");
213 },
214 }
215 }
216}