Skip to main content

commonware_p2p/authenticated/discovery/
network.rs

1//! Implementation of an `authenticated` network.
2
3use super::{
4    actors::{dialer, listener, router, spawner, tracker},
5    channels::{self, Channels},
6    config::Config,
7    types,
8};
9use crate::{authenticated::discovery::types::InfoVerifier, Channel};
10use commonware_cryptography::Signer;
11use commonware_macros::select;
12use commonware_runtime::{
13    spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Network as RNetwork, Quota,
14    Resolver, Spawner,
15};
16use commonware_stream::encrypted::Config as StreamConfig;
17use commonware_utils::union;
18use rand_core::CryptoRngCore;
19use tracing::{debug, info};
20
/// Namespace suffix for tracker-signed messages.
///
/// Appended to the configured namespace (via `union`) when building the tracker's
/// configuration, so tracker signatures use a namespace distinct from the stream's.
const TRACKER_SUFFIX: &[u8] = b"_TRACKER";

/// Namespace suffix for messages signed within a stream.
///
/// Appended to the configured namespace (via `union`) when building the stream
/// configuration shared by the listener and the dialer.
const STREAM_SUFFIX: &[u8] = b"_STREAM";
26
27/// Implementation of an `authenticated` network.
28pub struct Network<
29    E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
30    C: Signer,
31> {
32    context: ContextCell<E>,
33    cfg: Config<C>,
34
35    channels: Channels<C::PublicKey>,
36    tracker: tracker::Actor<E, C>,
37    tracker_mailbox: tracker::Mailbox<C::PublicKey>,
38    router: router::Actor<E, C::PublicKey>,
39    router_mailbox: router::Mailbox<C::PublicKey>,
40    info_verifier: InfoVerifier<C::PublicKey>,
41}
42
43impl<
44        E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
45        C: Signer,
46    > Network<E, C>
47{
48    /// Create a new instance of an `authenticated` network.
49    ///
50    /// # Parameters
51    ///
52    /// * `cfg` - Configuration for the network.
53    ///
54    /// # Returns
55    ///
56    /// * A tuple containing the network instance and the oracle that
57    ///   can be used by a developer to configure which peers are authorized.
58    pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<C::PublicKey>) {
59        let (tracker, tracker_mailbox, oracle, info_verifier) = tracker::Actor::new(
60            context.child("tracker"),
61            tracker::Config {
62                crypto: cfg.crypto.clone(),
63                namespace: union(&cfg.namespace, TRACKER_SUFFIX),
64                address: cfg.dialable.clone(),
65                bootstrappers: cfg.bootstrappers.clone(),
66                allow_private_ips: cfg.allow_private_ips,
67                allow_dns: cfg.allow_dns,
68                synchrony_bound: cfg.synchrony_bound,
69                mailbox_size: cfg.mailbox_size,
70                tracked_peer_sets: cfg.tracked_peer_sets,
71                peer_connection_cooldown: cfg.peer_connection_cooldown,
72                peer_gossip_max_count: cfg.peer_gossip_max_count,
73                max_peer_set_size: cfg.max_peer_set_size,
74                dial_fail_limit: cfg.dial_fail_limit,
75                block_duration: cfg.block_duration,
76            },
77        );
78        let (router, router_mailbox, messenger) = router::Actor::new(
79            context.child("router"),
80            router::Config {
81                mailbox_size: cfg.mailbox_size,
82            },
83        );
84        let channels = Channels::new(messenger, cfg.max_message_size);
85
86        (
87            Self {
88                context: ContextCell::new(context),
89                cfg,
90
91                channels,
92                tracker,
93                tracker_mailbox,
94                router,
95                router_mailbox,
96                info_verifier,
97            },
98            oracle,
99        )
100    }
101
102    /// Register a new channel over the network.
103    ///
104    /// # Parameters
105    ///
106    /// * `channel` - Unique identifier for the channel.
107    /// * `rate` - Rate at which messages can be received over the channel.
108    /// * `backlog` - Maximum number of messages that can be queued on the channel before blocking.
109    ///
110    /// # Returns
111    ///
112    /// * A tuple containing the sender and receiver for the channel (how to communicate
113    ///   with external peers on the network). It is safe to close either the sender or receiver
114    ///   without impacting the ability to process messages on other channels.
115    #[allow(clippy::type_complexity)]
116    pub fn register(
117        &mut self,
118        channel: Channel,
119        rate: Quota,
120        backlog: usize,
121    ) -> (
122        channels::Sender<C::PublicKey, E>,
123        channels::Receiver<C::PublicKey>,
124    ) {
125        let context = self
126            .context
127            .child("channel")
128            .with_attribute("index", channel);
129        self.channels.register(channel, rate, backlog, context)
130    }
131
132    /// Starts the network.
133    ///
134    /// After the network is started, it is not possible to add more channels.
135    pub fn start(mut self) -> Handle<()> {
136        spawn_cell!(self.context, self.run())
137    }
138
139    async fn run(self) {
140        // Start tracker
141        let mut tracker_task = self.tracker.start();
142
143        // Start router
144        let mut router_task = self.router.start(self.channels);
145
146        // Start spawner
147        let (spawner, spawner_mailbox) = spawner::Actor::new(
148            self.context.child("spawner"),
149            spawner::Config {
150                mailbox_size: self.cfg.mailbox_size,
151                send_batch_size: self.cfg.send_batch_size,
152                gossip_bit_vec_frequency: self.cfg.gossip_bit_vec_frequency,
153                max_peer_set_size: self.cfg.max_peer_set_size,
154                peer_gossip_max_count: self.cfg.peer_gossip_max_count,
155                info_verifier: self.info_verifier,
156            },
157        );
158        let mut spawner_task =
159            spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
160
161        // Start listener
162        let stream_cfg = StreamConfig {
163            signing_key: self.cfg.crypto,
164            namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
165            max_message_size: self
166                .cfg
167                .max_message_size
168                .saturating_add(types::MAX_PAYLOAD_DATA_OVERHEAD),
169            synchrony_bound: self.cfg.synchrony_bound,
170            max_handshake_age: self.cfg.max_handshake_age,
171            handshake_timeout: self.cfg.handshake_timeout,
172        };
173        let listener = listener::Actor::new(
174            self.context.child("listener"),
175            listener::Config {
176                address: self.cfg.listen,
177                stream_cfg: stream_cfg.clone(),
178                allow_private_ips: self.cfg.allow_private_ips,
179                max_concurrent_handshakes: self.cfg.max_concurrent_handshakes,
180                allowed_handshake_rate_per_ip: self.cfg.allowed_handshake_rate_per_ip,
181                allowed_handshake_rate_per_subnet: self.cfg.allowed_handshake_rate_per_subnet,
182            },
183        );
184        let mut listener_task =
185            listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
186
187        // Start dialer
188        let dialer = dialer::Actor::new(
189            self.context.child("dialer"),
190            dialer::Config {
191                stream_cfg,
192                dial_frequency: self.cfg.dial_frequency,
193                peer_connection_cooldown: self.cfg.peer_connection_cooldown,
194                allow_private_ips: self.cfg.allow_private_ips,
195            },
196        );
197        let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
198
199        let mut shutdown = self.context.stopped();
200
201        // If any task completes, the network should stop
202        info!("network started");
203        select! {
204            _ = &mut shutdown => {
205                debug!("context shutdown, stopping network");
206            },
207            tracker = &mut tracker_task => {
208                debug!(?tracker, "tracker stopped, shutting down network");
209            },
210            router = &mut router_task => {
211                debug!(?router, "router stopped, shutting down network");
212            },
213            spawner = &mut spawner_task => {
214                debug!(?spawner, "spawner stopped, shutting down network");
215            },
216            listener = &mut listener_task => {
217                debug!(?listener, "listener stopped, shutting down network");
218            },
219            dialer = &mut dialer_task => {
220                debug!(?dialer, "dialer stopped, shutting down network");
221            },
222        }
223    }
224}