commonware_p2p/authenticated/discovery/
network.rs

1//! Implementation of an `authenticated` network.
2
3use super::{
4    actors::{dialer, listener, router, spawner, tracker},
5    channels::{self, Channels},
6    config::Config,
7    types,
8};
9use crate::{
10    authenticated::{discovery::types::InfoVerifier, mailbox::UnboundedMailbox, Mailbox},
11    Channel,
12};
13use commonware_cryptography::Signer;
14use commonware_macros::select;
15use commonware_runtime::{
16    spawn_cell, Clock, ContextCell, Handle, Metrics, Network as RNetwork, Quota, Resolver, Spawner,
17};
18use commonware_stream::Config as StreamConfig;
19use commonware_utils::union;
20use rand_core::CryptoRngCore;
21use tracing::{debug, info};
22
23/// Unique suffix for all messages signed by the tracker.
24const TRACKER_SUFFIX: &[u8] = b"_TRACKER";
25
26/// Unique suffix for all messages signed in a stream.
27const STREAM_SUFFIX: &[u8] = b"_STREAM";
28
29/// Implementation of an `authenticated` network.
30pub struct Network<E: Spawner + Clock + CryptoRngCore + RNetwork + Resolver + Metrics, C: Signer> {
31    context: ContextCell<E>,
32    cfg: Config<C>,
33
34    channels: Channels<C::PublicKey>,
35    tracker: tracker::Actor<E, C>,
36    tracker_mailbox: UnboundedMailbox<tracker::Message<C::PublicKey>>,
37    router: router::Actor<E, C::PublicKey>,
38    router_mailbox: Mailbox<router::Message<C::PublicKey>>,
39    info_verifier: InfoVerifier<C::PublicKey>,
40}
41
42impl<E: Spawner + Clock + CryptoRngCore + RNetwork + Resolver + Metrics, C: Signer> Network<E, C> {
43    /// Create a new instance of an `authenticated` network.
44    ///
45    /// # Parameters
46    ///
47    /// * `cfg` - Configuration for the network.
48    ///
49    /// # Returns
50    ///
51    /// * A tuple containing the network instance and the oracle that
52    ///   can be used by a developer to configure which peers are authorized.
53    pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<C::PublicKey>) {
54        let (tracker, tracker_mailbox, oracle, info_verifier) = tracker::Actor::new(
55            context.with_label("tracker"),
56            tracker::Config {
57                crypto: cfg.crypto.clone(),
58                namespace: union(&cfg.namespace, TRACKER_SUFFIX),
59                address: cfg.dialable.clone(),
60                bootstrappers: cfg.bootstrappers.clone(),
61                allow_private_ips: cfg.allow_private_ips,
62                allow_dns: cfg.allow_dns,
63                synchrony_bound: cfg.synchrony_bound,
64                tracked_peer_sets: cfg.tracked_peer_sets,
65                allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
66                peer_gossip_max_count: cfg.peer_gossip_max_count,
67                max_peer_set_size: cfg.max_peer_set_size,
68                dial_fail_limit: cfg.dial_fail_limit,
69                block_duration: cfg.block_duration,
70            },
71        );
72        let (router, router_mailbox, messenger) = router::Actor::new(
73            context.with_label("router"),
74            router::Config {
75                mailbox_size: cfg.mailbox_size,
76            },
77        );
78        let channels = Channels::new(messenger, cfg.max_message_size);
79
80        (
81            Self {
82                context: ContextCell::new(context),
83                cfg,
84
85                channels,
86                tracker,
87                tracker_mailbox,
88                router,
89                router_mailbox,
90                info_verifier,
91            },
92            oracle,
93        )
94    }
95
96    /// Register a new channel over the network.
97    ///
98    /// # Parameters
99    ///
100    /// * `channel` - Unique identifier for the channel.
101    /// * `rate` - Rate at which messages can be received over the channel.
102    /// * `backlog` - Maximum number of messages that can be queued on the channel before blocking.
103    ///
104    /// # Returns
105    ///
106    /// * A tuple containing the sender and receiver for the channel (how to communicate
107    ///   with external peers on the network). It is safe to close either the sender or receiver
108    ///   without impacting the ability to process messages on other channels.
109    #[allow(clippy::type_complexity)]
110    pub fn register(
111        &mut self,
112        channel: Channel,
113        rate: Quota,
114        backlog: usize,
115    ) -> (
116        channels::Sender<C::PublicKey, E>,
117        channels::Receiver<C::PublicKey>,
118    ) {
119        let clock = self
120            .context
121            .with_label(&format!("channel_{channel}"))
122            .take();
123        self.channels.register(channel, rate, backlog, clock)
124    }
125
126    /// Starts the network.
127    ///
128    /// After the network is started, it is not possible to add more channels.
129    pub fn start(mut self) -> Handle<()> {
130        spawn_cell!(self.context, self.run().await)
131    }
132
133    async fn run(self) {
134        // Start tracker
135        let mut tracker_task = self.tracker.start();
136
137        // Start router
138        let mut router_task = self.router.start(self.channels);
139
140        // Start spawner
141        let (spawner, spawner_mailbox) = spawner::Actor::new(
142            self.context.with_label("spawner"),
143            spawner::Config {
144                mailbox_size: self.cfg.mailbox_size,
145                gossip_bit_vec_frequency: self.cfg.gossip_bit_vec_frequency,
146                max_peer_set_size: self.cfg.max_peer_set_size,
147                peer_gossip_max_count: self.cfg.peer_gossip_max_count,
148                info_verifier: self.info_verifier,
149            },
150        );
151        let mut spawner_task =
152            spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
153
154        // Start listener
155        let stream_cfg = StreamConfig {
156            signing_key: self.cfg.crypto,
157            namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
158            max_message_size: self
159                .cfg
160                .max_message_size
161                .saturating_add(types::MAX_PAYLOAD_DATA_OVERHEAD),
162            synchrony_bound: self.cfg.synchrony_bound,
163            max_handshake_age: self.cfg.max_handshake_age,
164            handshake_timeout: self.cfg.handshake_timeout,
165        };
166        let listener = listener::Actor::new(
167            self.context.with_label("listener"),
168            listener::Config {
169                address: self.cfg.listen,
170                stream_cfg: stream_cfg.clone(),
171                allow_private_ips: self.cfg.allow_private_ips,
172                max_concurrent_handshakes: self.cfg.max_concurrent_handshakes,
173                allowed_handshake_rate_per_ip: self.cfg.allowed_handshake_rate_per_ip,
174                allowed_handshake_rate_per_subnet: self.cfg.allowed_handshake_rate_per_subnet,
175            },
176        );
177        let mut listener_task =
178            listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
179
180        // Start dialer
181        let dialer = dialer::Actor::new(
182            self.context.with_label("dialer"),
183            dialer::Config {
184                stream_cfg,
185                dial_frequency: self.cfg.dial_frequency,
186                query_frequency: self.cfg.query_frequency,
187                allow_private_ips: self.cfg.allow_private_ips,
188            },
189        );
190        let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
191
192        let mut shutdown = self.context.stopped();
193
194        // Wait for first actor to exit
195        info!("network started");
196        select! {
197            _ = &mut shutdown => {
198                debug!("context shutdown, stopping network");
199            },
200            tracker = &mut tracker_task => {
201                panic!("tracker exited unexpectedly: {tracker:?}");
202            },
203            router = &mut router_task => {
204                panic!("router exited unexpectedly: {router:?}");
205            },
206            spawner = &mut spawner_task => {
207                panic!("spawner exited unexpectedly: {spawner:?}");
208            },
209            listener = &mut listener_task => {
210                panic!("listener exited unexpectedly: {listener:?}");
211            },
212            dialer = &mut dialer_task => {
213                panic!("dialer exited unexpectedly: {dialer:?}");
214            },
215        }
216    }
217}