commonware_p2p/authenticated/
network.rs

1//! Implementation of an `authenticated` network.
2
3use super::{
4    actors::{dialer, listener, router, spawner, tracker},
5    channels::{self, Channels},
6    config::Config,
7    types,
8};
9use crate::Channel;
10use commonware_cryptography::Scheme;
11use commonware_macros::select;
12use commonware_runtime::{Clock, Handle, Metrics, Network as RNetwork, Spawner};
13use commonware_stream::public_key;
14use commonware_utils::union;
15use governor::{clock::ReasonablyRealtime, Quota};
16use rand::{CryptoRng, Rng};
17use tracing::{debug, info, warn};
18
/// Suffix appended to the configured namespace to derive the namespace
/// under which the tracker signs its messages.
const TRACKER_SUFFIX: &[u8] = b"_TRACKER";

/// Suffix appended to the configured namespace to derive the namespace
/// under which messages are signed within a stream.
const STREAM_SUFFIX: &[u8] = b"_STREAM";
24
25/// Implementation of an `authenticated` network.
26pub struct Network<
27    E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
28    C: Scheme,
29> {
30    context: E,
31    cfg: Config<C>,
32
33    channels: Channels<C::PublicKey>,
34    tracker: tracker::Actor<E, C>,
35    tracker_mailbox: tracker::Mailbox<E, C>,
36    router: router::Actor<E, C::PublicKey>,
37    router_mailbox: router::Mailbox<C::PublicKey>,
38}
39
40impl<E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics, C: Scheme>
41    Network<E, C>
42{
43    /// Create a new instance of an `authenticated` network.
44    ///
45    /// # Parameters
46    ///
47    /// * `cfg` - Configuration for the network.
48    ///
49    /// # Returns
50    ///
51    /// * A tuple containing the network instance and the oracle that
52    ///   can be used by a developer to configure which peers are authorized.
53    pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<E, C>) {
54        let (tracker, tracker_mailbox, oracle) = tracker::Actor::new(
55            context.with_label("tracker"),
56            tracker::Config {
57                crypto: cfg.crypto.clone(),
58                namespace: union(&cfg.namespace, TRACKER_SUFFIX),
59                address: cfg.dialable,
60                bootstrappers: cfg.bootstrappers.clone(),
61                allow_private_ips: cfg.allow_private_ips,
62                mailbox_size: cfg.mailbox_size,
63                synchrony_bound: cfg.synchrony_bound,
64                tracked_peer_sets: cfg.tracked_peer_sets,
65                allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
66                peer_gossip_max_count: cfg.peer_gossip_max_count,
67                max_peer_set_size: cfg.max_peer_set_size,
68                dial_fail_limit: cfg.dial_fail_limit,
69            },
70        );
71        let (router, router_mailbox, messenger) = router::Actor::new(
72            context.with_label("router"),
73            router::Config {
74                mailbox_size: cfg.mailbox_size,
75            },
76        );
77        let channels = Channels::new(messenger, cfg.max_message_size);
78
79        (
80            Self {
81                context,
82                cfg,
83
84                channels,
85                tracker,
86                tracker_mailbox,
87                router,
88                router_mailbox,
89            },
90            oracle,
91        )
92    }
93
94    /// Register a new channel over the network.
95    ///
96    /// # Parameters
97    ///
98    /// * `channel` - Unique identifier for the channel.
99    /// * `rate` - Rate at which messages can be received over the channel.
100    /// * `backlog` - Maximum number of messages that can be queued on the channel before blocking.
101    /// * `compression` - Optional compression level (using `zstd`) to use for messages on the channel.
102    ///
103    /// # Returns
104    ///
105    /// * A tuple containing the sender and receiver for the channel (how to communicate
106    ///   with external peers on the network). It is safe to close either the sender or receiver
107    ///   without impacting the ability to process messages on other channels.
108    pub fn register(
109        &mut self,
110        channel: Channel,
111        rate: Quota,
112        backlog: usize,
113        compression: Option<i32>,
114    ) -> (
115        channels::Sender<C::PublicKey>,
116        channels::Receiver<C::PublicKey>,
117    ) {
118        self.channels.register(channel, rate, backlog, compression)
119    }
120
121    /// Starts the network.
122    ///
123    /// After the network is started, it is not possible to add more channels.
124    pub fn start(mut self) -> Handle<()> {
125        self.context.spawn_ref()(self.run())
126    }
127
128    async fn run(self) {
129        // Start tracker
130        let mut tracker_task = self.tracker.start();
131
132        // Start router
133        let mut router_task = self.router.start(self.channels);
134
135        // Start spawner
136        let (spawner, spawner_mailbox) = spawner::Actor::new(
137            self.context.with_label("spawner"),
138            spawner::Config {
139                mailbox_size: self.cfg.mailbox_size,
140                gossip_bit_vec_frequency: self.cfg.gossip_bit_vec_frequency,
141                allowed_bit_vec_rate: self.cfg.allowed_bit_vec_rate,
142                max_peer_set_size: self.cfg.max_peer_set_size,
143                allowed_peers_rate: self.cfg.allowed_peers_rate,
144                peer_gossip_max_count: self.cfg.peer_gossip_max_count,
145            },
146        );
147        let mut spawner_task =
148            spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
149
150        // Start listener
151        let stream_cfg = public_key::Config {
152            crypto: self.cfg.crypto,
153            namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
154            max_message_size: self.cfg.max_message_size + types::MAX_PAYLOAD_DATA_OVERHEAD,
155            synchrony_bound: self.cfg.synchrony_bound,
156            max_handshake_age: self.cfg.max_handshake_age,
157            handshake_timeout: self.cfg.handshake_timeout,
158        };
159        let listener = listener::Actor::new(
160            self.context.with_label("listener"),
161            listener::Config {
162                address: self.cfg.listen,
163                stream_cfg: stream_cfg.clone(),
164                allowed_incoming_connection_rate: self.cfg.allowed_incoming_connection_rate,
165            },
166        );
167        let mut listener_task =
168            listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
169
170        // Start dialer
171        let dialer = dialer::Actor::new(
172            self.context.with_label("dialer"),
173            dialer::Config {
174                stream_cfg,
175                dial_frequency: self.cfg.dial_frequency,
176                query_frequency: self.cfg.query_frequency,
177            },
178        );
179        let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
180
181        // Wait for first actor to exit
182        info!("network started");
183        let err = select! {
184            tracker = &mut tracker_task => {
185                debug!("tracker exited");
186                tracker
187            },
188            router = &mut router_task => {
189                debug!("router exited");
190                router
191            },
192            spawner = &mut spawner_task => {
193                debug!("spawner exited");
194                spawner
195            },
196            listener = &mut listener_task => {
197                debug!("listener exited");
198                listener
199            },
200            dialer = &mut dialer_task => {
201                debug!("dialer exited");
202                dialer
203            },
204        }
205        .unwrap_err();
206
207        // Ensure all tasks close
208        tracker_task.abort();
209        router_task.abort();
210        spawner_task.abort();
211        listener_task.abort();
212        dialer_task.abort();
213
214        // Log error
215        warn!(error=?err, "network shutdown")
216    }
217}