// Source: commonware_p2p/authenticated/lookup/network.rs

//! Implementation of an `authenticated` network.
2
3use super::{
4    actors::{dialer, listener, router, spawner, tracker},
5    channels::{self, Channels},
6    config::Config,
7    types,
8};
9use crate::{
10    authenticated::{mailbox::UnboundedMailbox, Mailbox},
11    Channel,
12};
13use commonware_cryptography::Signer;
14use commonware_macros::select;
15use commonware_runtime::{
16    spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Network as RNetwork, Quota,
17    Resolver, Spawner,
18};
19use commonware_stream::encrypted::Config as StreamConfig;
20use commonware_utils::{channel::mpsc, union};
21use rand_core::CryptoRngCore;
22use std::{collections::HashSet, net::IpAddr};
23use tracing::{debug, info};
24
/// Unique suffix for all messages signed in a stream.
///
/// When the network runs, this suffix is joined (via `union`) with the
/// configured network namespace to form the namespace handed to the stream
/// configuration.
const STREAM_SUFFIX: &[u8] = b"_STREAM";
27
28/// Implementation of an `authenticated` network.
29pub struct Network<
30    E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Metrics,
31    C: Signer,
32> {
33    context: ContextCell<E>,
34    cfg: Config<C>,
35
36    channels: Channels<C::PublicKey>,
37    tracker: tracker::Actor<E, C>,
38    tracker_mailbox: UnboundedMailbox<tracker::Message<C::PublicKey>>,
39    router: router::Actor<E, C::PublicKey>,
40    router_mailbox: Mailbox<router::Message<C::PublicKey>>,
41    listener: mpsc::Receiver<HashSet<IpAddr>>,
42}
43
44impl<
45        E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
46        C: Signer,
47    > Network<E, C>
48{
49    /// Create a new instance of an `authenticated` network.
50    ///
51    /// # Parameters
52    ///
53    /// * `cfg` - Configuration for the network.
54    ///
55    /// # Returns
56    ///
57    /// * A tuple containing the network instance and the oracle that
58    ///   can be used by a developer to configure which peers are authorized.
59    pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<C::PublicKey>) {
60        let (listener_mailbox, listener) = Mailbox::<HashSet<IpAddr>>::new(cfg.mailbox_size);
61        let (tracker, tracker_mailbox, oracle) = tracker::Actor::new(
62            context.with_label("tracker"),
63            tracker::Config {
64                crypto: cfg.crypto.clone(),
65                tracked_peer_sets: cfg.tracked_peer_sets,
66                allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
67                allow_private_ips: cfg.allow_private_ips,
68                allow_dns: cfg.allow_dns,
69                bypass_ip_check: cfg.bypass_ip_check,
70                listener: listener_mailbox,
71                block_duration: cfg.block_duration,
72            },
73        );
74        let (router, router_mailbox, messenger) = router::Actor::new(
75            context.with_label("router"),
76            router::Config {
77                mailbox_size: cfg.mailbox_size,
78            },
79        );
80        let channels = Channels::new(messenger, cfg.max_message_size);
81
82        (
83            Self {
84                context: ContextCell::new(context),
85                cfg,
86
87                channels,
88                tracker,
89                tracker_mailbox,
90                router,
91                router_mailbox,
92                listener,
93            },
94            oracle,
95        )
96    }
97
98    /// Register a new channel over the network.
99    ///
100    /// # Parameters
101    ///
102    /// * `channel` - Unique identifier for the channel.
103    /// * `rate` - Rate at which messages can be received over the channel.
104    /// * `backlog` - Maximum number of messages that can be queued on the channel before blocking.
105    ///
106    /// # Returns
107    ///
108    /// * A tuple containing the sender and receiver for the channel (how to communicate
109    ///   with external peers on the network). It is safe to close either the sender or receiver
110    ///   without impacting the ability to process messages on other channels.
111    #[allow(clippy::type_complexity)]
112    pub fn register(
113        &mut self,
114        channel: Channel,
115        rate: Quota,
116        backlog: usize,
117    ) -> (
118        channels::Sender<C::PublicKey, E>,
119        channels::Receiver<C::PublicKey>,
120    ) {
121        let clock = self
122            .context
123            .with_label("channel")
124            .with_attribute("idx", channel)
125            .take();
126        self.channels.register(channel, rate, backlog, clock)
127    }
128
129    /// Starts the network.
130    ///
131    /// After the network is started, it is not possible to add more channels.
132    pub fn start(mut self) -> Handle<()> {
133        spawn_cell!(self.context, self.run().await)
134    }
135
136    async fn run(self) {
137        // Start tracker
138        let mut tracker_task = self.tracker.start();
139
140        // Start router
141        let mut router_task = self.router.start(self.channels);
142
143        // Start spawner
144        let (spawner, spawner_mailbox) = spawner::Actor::new(
145            self.context.with_label("spawner"),
146            spawner::Config {
147                mailbox_size: self.cfg.mailbox_size,
148                ping_frequency: self.cfg.ping_frequency,
149            },
150        );
151        let mut spawner_task =
152            spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
153
154        // Start listener
155        let stream_cfg = StreamConfig {
156            signing_key: self.cfg.crypto,
157            namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
158            max_message_size: self
159                .cfg
160                .max_message_size
161                .saturating_add(types::MAX_PAYLOAD_DATA_OVERHEAD),
162            synchrony_bound: self.cfg.synchrony_bound,
163            max_handshake_age: self.cfg.max_handshake_age,
164            handshake_timeout: self.cfg.handshake_timeout,
165        };
166        let listener = listener::Actor::new(
167            self.context.with_label("listener"),
168            listener::Config {
169                address: self.cfg.listen,
170                stream_cfg: stream_cfg.clone(),
171                allow_private_ips: self.cfg.allow_private_ips,
172                bypass_ip_check: self.cfg.bypass_ip_check,
173                max_concurrent_handshakes: self.cfg.max_concurrent_handshakes,
174                allowed_handshake_rate_per_ip: self.cfg.allowed_handshake_rate_per_ip,
175                allowed_handshake_rate_per_subnet: self.cfg.allowed_handshake_rate_per_subnet,
176            },
177            self.listener,
178        );
179        let mut listener_task =
180            listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
181
182        // Start dialer
183        let dialer = dialer::Actor::new(
184            self.context.with_label("dialer"),
185            dialer::Config {
186                stream_cfg,
187                dial_frequency: self.cfg.dial_frequency,
188                query_frequency: self.cfg.query_frequency,
189                allow_private_ips: self.cfg.allow_private_ips,
190            },
191        );
192        let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
193
194        let mut shutdown = self.context.stopped();
195
196        // Wait for first actor to exit
197        info!("network started");
198        select! {
199            _ = &mut shutdown => {
200                debug!("context shutdown, stopping network");
201            },
202            tracker = &mut tracker_task => {
203                panic!("tracker exited unexpectedly: {tracker:?}");
204            },
205            router = &mut router_task => {
206                panic!("router exited unexpectedly: {router:?}");
207            },
208            spawner = &mut spawner_task => {
209                panic!("spawner exited unexpectedly: {spawner:?}");
210            },
211            listener = &mut listener_task => {
212                panic!("listener exited unexpectedly: {listener:?}");
213            },
214            dialer = &mut dialer_task => {
215                panic!("dialer exited unexpectedly: {dialer:?}");
216            },
217        }
218    }
219}