Skip to main content

commonware_p2p/authenticated/discovery/
network.rs

1//! Implementation of an `authenticated` network.
2
3use super::{
4    actors::{dialer, listener, router, spawner, tracker},
5    channels::{self, Channels},
6    config::Config,
7    types,
8};
9use crate::{
10    authenticated::{discovery::types::InfoVerifier, mailbox::UnboundedMailbox, Mailbox},
11    Channel,
12};
13use commonware_cryptography::Signer;
14use commonware_macros::select;
15use commonware_runtime::{
16    spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Network as RNetwork, Quota,
17    Resolver, Spawner,
18};
19use commonware_stream::encrypted::Config as StreamConfig;
20use commonware_utils::union;
21use rand_core::CryptoRngCore;
22use tracing::{debug, info};
23
/// Unique suffix for all messages signed by the tracker.
///
/// Joined onto the configured namespace with `union` to form the namespace
/// passed to the tracker actor's configuration.
const TRACKER_SUFFIX: &[u8] = b"_TRACKER";

/// Unique suffix for all messages signed in a stream.
///
/// Joined onto the configured namespace with `union` to form the namespace of
/// the stream configuration that is given to both the listener and the dialer.
const STREAM_SUFFIX: &[u8] = b"_STREAM";
29
30/// Implementation of an `authenticated` network.
31pub struct Network<
32    E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
33    C: Signer,
34> {
35    context: ContextCell<E>,
36    cfg: Config<C>,
37
38    channels: Channels<C::PublicKey>,
39    tracker: tracker::Actor<E, C>,
40    tracker_mailbox: UnboundedMailbox<tracker::Message<C::PublicKey>>,
41    router: router::Actor<E, C::PublicKey>,
42    router_mailbox: Mailbox<router::Message<C::PublicKey>>,
43    info_verifier: InfoVerifier<C::PublicKey>,
44}
45
46impl<
47        E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
48        C: Signer,
49    > Network<E, C>
50{
51    /// Create a new instance of an `authenticated` network.
52    ///
53    /// # Parameters
54    ///
55    /// * `cfg` - Configuration for the network.
56    ///
57    /// # Returns
58    ///
59    /// * A tuple containing the network instance and the oracle that
60    ///   can be used by a developer to configure which peers are authorized.
61    pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<C::PublicKey>) {
62        let (tracker, tracker_mailbox, oracle, info_verifier) = tracker::Actor::new(
63            context.with_label("tracker"),
64            tracker::Config {
65                crypto: cfg.crypto.clone(),
66                namespace: union(&cfg.namespace, TRACKER_SUFFIX),
67                address: cfg.dialable.clone(),
68                bootstrappers: cfg.bootstrappers.clone(),
69                allow_private_ips: cfg.allow_private_ips,
70                allow_dns: cfg.allow_dns,
71                synchrony_bound: cfg.synchrony_bound,
72                tracked_peer_sets: cfg.tracked_peer_sets,
73                allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
74                peer_gossip_max_count: cfg.peer_gossip_max_count,
75                max_peer_set_size: cfg.max_peer_set_size,
76                dial_fail_limit: cfg.dial_fail_limit,
77                block_duration: cfg.block_duration,
78            },
79        );
80        let (router, router_mailbox, messenger) = router::Actor::new(
81            context.with_label("router"),
82            router::Config {
83                mailbox_size: cfg.mailbox_size,
84            },
85        );
86        let channels = Channels::new(messenger, cfg.max_message_size);
87
88        (
89            Self {
90                context: ContextCell::new(context),
91                cfg,
92
93                channels,
94                tracker,
95                tracker_mailbox,
96                router,
97                router_mailbox,
98                info_verifier,
99            },
100            oracle,
101        )
102    }
103
104    /// Register a new channel over the network.
105    ///
106    /// # Parameters
107    ///
108    /// * `channel` - Unique identifier for the channel.
109    /// * `rate` - Rate at which messages can be received over the channel.
110    /// * `backlog` - Maximum number of messages that can be queued on the channel before blocking.
111    ///
112    /// # Returns
113    ///
114    /// * A tuple containing the sender and receiver for the channel (how to communicate
115    ///   with external peers on the network). It is safe to close either the sender or receiver
116    ///   without impacting the ability to process messages on other channels.
117    #[allow(clippy::type_complexity)]
118    pub fn register(
119        &mut self,
120        channel: Channel,
121        rate: Quota,
122        backlog: usize,
123    ) -> (
124        channels::Sender<C::PublicKey, E>,
125        channels::Receiver<C::PublicKey>,
126    ) {
127        let clock = self
128            .context
129            .with_label("channel")
130            .with_attribute("idx", channel)
131            .take();
132        self.channels.register(channel, rate, backlog, clock)
133    }
134
    /// Starts the network.
    ///
    /// Consumes the network and passes its stored context and run loop to
    /// `spawn_cell!`, returning the resulting [`Handle`].
    ///
    /// After the network is started, it is not possible to add more channels
    /// (registration needs `&mut self`, and `self` is moved here).
    pub fn start(mut self) -> Handle<()> {
        spawn_cell!(self.context, self.run().await)
    }
141
142    async fn run(self) {
143        // Start tracker
144        let mut tracker_task = self.tracker.start();
145
146        // Start router
147        let mut router_task = self.router.start(self.channels);
148
149        // Start spawner
150        let (spawner, spawner_mailbox) = spawner::Actor::new(
151            self.context.with_label("spawner"),
152            spawner::Config {
153                mailbox_size: self.cfg.mailbox_size,
154                gossip_bit_vec_frequency: self.cfg.gossip_bit_vec_frequency,
155                max_peer_set_size: self.cfg.max_peer_set_size,
156                peer_gossip_max_count: self.cfg.peer_gossip_max_count,
157                info_verifier: self.info_verifier,
158            },
159        );
160        let mut spawner_task =
161            spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
162
163        // Start listener
164        let stream_cfg = StreamConfig {
165            signing_key: self.cfg.crypto,
166            namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
167            max_message_size: self
168                .cfg
169                .max_message_size
170                .saturating_add(types::MAX_PAYLOAD_DATA_OVERHEAD),
171            synchrony_bound: self.cfg.synchrony_bound,
172            max_handshake_age: self.cfg.max_handshake_age,
173            handshake_timeout: self.cfg.handshake_timeout,
174        };
175        let listener = listener::Actor::new(
176            self.context.with_label("listener"),
177            listener::Config {
178                address: self.cfg.listen,
179                stream_cfg: stream_cfg.clone(),
180                allow_private_ips: self.cfg.allow_private_ips,
181                max_concurrent_handshakes: self.cfg.max_concurrent_handshakes,
182                allowed_handshake_rate_per_ip: self.cfg.allowed_handshake_rate_per_ip,
183                allowed_handshake_rate_per_subnet: self.cfg.allowed_handshake_rate_per_subnet,
184            },
185        );
186        let mut listener_task =
187            listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
188
189        // Start dialer
190        let dialer = dialer::Actor::new(
191            self.context.with_label("dialer"),
192            dialer::Config {
193                stream_cfg,
194                dial_frequency: self.cfg.dial_frequency,
195                query_frequency: self.cfg.query_frequency,
196                allow_private_ips: self.cfg.allow_private_ips,
197            },
198        );
199        let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
200
201        let mut shutdown = self.context.stopped();
202
203        // Wait for first actor to exit
204        info!("network started");
205        select! {
206            _ = &mut shutdown => {
207                debug!("context shutdown, stopping network");
208            },
209            tracker = &mut tracker_task => {
210                panic!("tracker exited unexpectedly: {tracker:?}");
211            },
212            router = &mut router_task => {
213                panic!("router exited unexpectedly: {router:?}");
214            },
215            spawner = &mut spawner_task => {
216                panic!("spawner exited unexpectedly: {spawner:?}");
217            },
218            listener = &mut listener_task => {
219                panic!("listener exited unexpectedly: {listener:?}");
220            },
221            dialer = &mut dialer_task => {
222                panic!("dialer exited unexpectedly: {dialer:?}");
223            },
224        }
225    }
226}