commonware_p2p/authenticated/
network.rs

1//! Implementation of an `authenticated` network.
2
3use super::{
4    actors::{dialer, listener, router, spawner, tracker},
5    channels::{self, Channels},
6    config::Config,
7    types,
8};
9use crate::Channel;
10use commonware_cryptography::Scheme;
11use commonware_macros::select;
12use commonware_runtime::{Clock, Handle, Metrics, Network as RNetwork, Spawner};
13use commonware_stream::public_key;
14use commonware_utils::union;
15use governor::{clock::ReasonablyRealtime, Quota};
16use rand::{CryptoRng, Rng};
17use tracing::{debug, info, warn};
18
/// Suffix combined (via `union`) with the configured namespace to form the
/// namespace handed to the tracker, keeping tracker-signed messages distinct
/// from other signed messages.
const TRACKER_SUFFIX: &[u8] = b"_TRACKER";
21
/// Suffix combined (via `union`) with the configured namespace to form the
/// namespace used by the stream configuration, keeping stream-signed messages
/// distinct from other signed messages.
const STREAM_SUFFIX: &[u8] = b"_STREAM";
24
25/// Implementation of an `authenticated` network.
26pub struct Network<
27    E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
28    C: Scheme,
29> {
30    context: E,
31    cfg: Config<C>,
32
33    channels: Channels<C::PublicKey>,
34    tracker: tracker::Actor<E, C>,
35    tracker_mailbox: tracker::Mailbox<E, C>,
36    router: router::Actor<E, C::PublicKey>,
37    router_mailbox: router::Mailbox<C::PublicKey>,
38}
39
40impl<E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics, C: Scheme>
41    Network<E, C>
42{
43    /// Create a new instance of an `authenticated` network.
44    ///
45    /// # Parameters
46    ///
47    /// * `cfg` - Configuration for the network.
48    ///
49    /// # Returns
50    ///
51    /// * A tuple containing the network instance and the oracle that
52    ///   can be used by a developer to configure which peers are authorized.
53    pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<E, C>) {
54        let (tracker, tracker_mailbox, oracle) = tracker::Actor::new(
55            context.with_label("tracker"),
56            tracker::Config {
57                crypto: cfg.crypto.clone(),
58                namespace: union(&cfg.namespace, TRACKER_SUFFIX),
59                address: cfg.dialable,
60                bootstrappers: cfg.bootstrappers.clone(),
61                allow_private_ips: cfg.allow_private_ips,
62                mailbox_size: cfg.mailbox_size,
63                synchrony_bound: cfg.synchrony_bound,
64                tracked_peer_sets: cfg.tracked_peer_sets,
65                allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
66                peer_gossip_max_count: cfg.peer_gossip_max_count,
67                max_peer_set_size: cfg.max_peer_set_size,
68            },
69        );
70        let (router, router_mailbox, messenger) = router::Actor::new(
71            context.with_label("router"),
72            router::Config {
73                mailbox_size: cfg.mailbox_size,
74            },
75        );
76        let channels = Channels::new(messenger, cfg.max_message_size);
77
78        (
79            Self {
80                context,
81                cfg,
82
83                channels,
84                tracker,
85                tracker_mailbox,
86                router,
87                router_mailbox,
88            },
89            oracle,
90        )
91    }
92
93    /// Register a new channel over the network.
94    ///
95    /// # Parameters
96    ///
97    /// * `channel` - Unique identifier for the channel.
98    /// * `rate` - Rate at which messages can be received over the channel.
99    /// * `backlog` - Maximum number of messages that can be queued on the channel before blocking.
100    /// * `compression` - Optional compression level (using `zstd`) to use for messages on the channel.
101    ///
102    /// # Returns
103    ///
104    /// * A tuple containing the sender and receiver for the channel (how to communicate
105    ///   with external peers on the network). It is safe to close either the sender or receiver
106    ///   without impacting the ability to process messages on other channels.
107    pub fn register(
108        &mut self,
109        channel: Channel,
110        rate: Quota,
111        backlog: usize,
112        compression: Option<i32>,
113    ) -> (
114        channels::Sender<C::PublicKey>,
115        channels::Receiver<C::PublicKey>,
116    ) {
117        self.channels.register(channel, rate, backlog, compression)
118    }
119
120    /// Starts the network.
121    ///
122    /// After the network is started, it is not possible to add more channels.
123    pub fn start(mut self) -> Handle<()> {
124        self.context.spawn_ref()(self.run())
125    }
126
127    async fn run(self) {
128        // Start tracker
129        let mut tracker_task = self.tracker.start();
130
131        // Start router
132        let mut router_task = self.router.start(self.channels);
133
134        // Start spawner
135        let (spawner, spawner_mailbox) = spawner::Actor::new(
136            self.context.with_label("spawner"),
137            spawner::Config {
138                mailbox_size: self.cfg.mailbox_size,
139                gossip_bit_vec_frequency: self.cfg.gossip_bit_vec_frequency,
140                allowed_bit_vec_rate: self.cfg.allowed_bit_vec_rate,
141                max_peer_set_size: self.cfg.max_peer_set_size,
142                allowed_peers_rate: self.cfg.allowed_peers_rate,
143                peer_gossip_max_count: self.cfg.peer_gossip_max_count,
144            },
145        );
146        let mut spawner_task =
147            spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
148
149        // Start listener
150        let stream_cfg = public_key::Config {
151            crypto: self.cfg.crypto,
152            namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
153            max_message_size: self.cfg.max_message_size + types::MAX_PAYLOAD_DATA_OVERHEAD,
154            synchrony_bound: self.cfg.synchrony_bound,
155            max_handshake_age: self.cfg.max_handshake_age,
156            handshake_timeout: self.cfg.handshake_timeout,
157        };
158        let listener = listener::Actor::new(
159            self.context.with_label("listener"),
160            listener::Config {
161                address: self.cfg.listen,
162                stream_cfg: stream_cfg.clone(),
163                allowed_incoming_connection_rate: self.cfg.allowed_incoming_connection_rate,
164            },
165        );
166        let mut listener_task =
167            listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
168
169        // Start dialer
170        let dialer = dialer::Actor::new(
171            self.context.with_label("dialer"),
172            dialer::Config {
173                stream_cfg,
174                dial_frequency: self.cfg.dial_frequency,
175                dial_rate: self.cfg.dial_rate,
176            },
177        );
178        let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
179
180        // Wait for first actor to exit
181        info!("network started");
182        let err = select! {
183            tracker = &mut tracker_task => {
184                debug!("tracker exited");
185                tracker
186            },
187            router = &mut router_task => {
188                debug!("router exited");
189                router
190            },
191            spawner = &mut spawner_task => {
192                debug!("spawner exited");
193                spawner
194            },
195            listener = &mut listener_task => {
196                debug!("listener exited");
197                listener
198            },
199            dialer = &mut dialer_task => {
200                debug!("dialer exited");
201                dialer
202            },
203        }
204        .unwrap_err();
205
206        // Ensure all tasks close
207        tracker_task.abort();
208        router_task.abort();
209        spawner_task.abort();
210        listener_task.abort();
211        dialer_task.abort();
212
213        // Log error
214        warn!(error=?err, "network shutdown")
215    }
216}