commonware_p2p/authenticated/discovery/
network.rs

//! Implementation of an `authenticated` network.
2
3use super::{
4    actors::{dialer, listener, router, spawner, tracker},
5    channels::{self, Channels},
6    config::Config,
7    types,
8};
9use crate::{authenticated::Mailbox, Channel};
10use commonware_cryptography::Signer;
11use commonware_macros::select;
12use commonware_runtime::{Clock, Handle, Metrics, Network as RNetwork, Spawner};
13use commonware_stream::public_key;
14use commonware_utils::union;
15use governor::{clock::ReasonablyRealtime, Quota};
16use rand::{CryptoRng, Rng};
17use tracing::{debug, info, warn};
18
/// Unique suffix for all messages signed by the tracker.
///
/// Joined onto the configured namespace (via `union`) to form the tracker's signing
/// namespace, keeping it distinct from the stream namespace built from [`STREAM_SUFFIX`].
const TRACKER_SUFFIX: &[u8] = b"_TRACKER";

/// Unique suffix for all messages signed in a stream.
///
/// Joined onto the configured namespace (via `union`) to form the namespace of the stream
/// configuration shared by the listener and the dialer.
const STREAM_SUFFIX: &[u8] = b"_STREAM";
24
25/// Implementation of an `authenticated` network.
26pub struct Network<
27    E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
28    C: Signer,
29> {
30    context: E,
31    cfg: Config<C>,
32
33    channels: Channels<C::PublicKey>,
34    tracker: tracker::Actor<E, C>,
35    tracker_mailbox: Mailbox<tracker::Message<E, C::PublicKey>>,
36    router: router::Actor<E, C::PublicKey>,
37    router_mailbox: Mailbox<router::Message<C::PublicKey>>,
38}
39
40impl<E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics, C: Signer>
41    Network<E, C>
42{
43    /// Create a new instance of an `authenticated` network.
44    ///
45    /// # Parameters
46    ///
47    /// * `cfg` - Configuration for the network.
48    ///
49    /// # Returns
50    ///
51    /// * A tuple containing the network instance and the oracle that
52    ///   can be used by a developer to configure which peers are authorized.
53    pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<E, C::PublicKey>) {
54        let (tracker, tracker_mailbox, oracle) = tracker::Actor::new(
55            context.with_label("tracker"),
56            tracker::Config {
57                crypto: cfg.crypto.clone(),
58                namespace: union(&cfg.namespace, TRACKER_SUFFIX),
59                address: cfg.dialable,
60                bootstrappers: cfg.bootstrappers.clone(),
61                allow_private_ips: cfg.allow_private_ips,
62                mailbox_size: cfg.mailbox_size,
63                synchrony_bound: cfg.synchrony_bound,
64                tracked_peer_sets: cfg.tracked_peer_sets,
65                allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
66                peer_gossip_max_count: cfg.peer_gossip_max_count,
67                max_peer_set_size: cfg.max_peer_set_size,
68                dial_fail_limit: cfg.dial_fail_limit,
69            },
70        );
71        let (router, router_mailbox, messenger) = router::Actor::new(
72            context.with_label("router"),
73            router::Config {
74                mailbox_size: cfg.mailbox_size,
75            },
76        );
77        let channels = Channels::new(messenger, cfg.max_message_size);
78
79        (
80            Self {
81                context,
82                cfg,
83
84                channels,
85                tracker,
86                tracker_mailbox,
87                router,
88                router_mailbox,
89            },
90            oracle,
91        )
92    }
93
94    /// Register a new channel over the network.
95    ///
96    /// # Parameters
97    ///
98    /// * `channel` - Unique identifier for the channel.
99    /// * `rate` - Rate at which messages can be received over the channel.
100    /// * `backlog` - Maximum number of messages that can be queued on the channel before blocking.
101    ///
102    /// # Returns
103    ///
104    /// * A tuple containing the sender and receiver for the channel (how to communicate
105    ///   with external peers on the network). It is safe to close either the sender or receiver
106    ///   without impacting the ability to process messages on other channels.
107    pub fn register(
108        &mut self,
109        channel: Channel,
110        rate: Quota,
111        backlog: usize,
112    ) -> (
113        channels::Sender<C::PublicKey>,
114        channels::Receiver<C::PublicKey>,
115    ) {
116        self.channels.register(channel, rate, backlog)
117    }
118
119    /// Starts the network.
120    ///
121    /// After the network is started, it is not possible to add more channels.
122    pub fn start(mut self) -> Handle<()> {
123        self.context.spawn_ref()(self.run())
124    }
125
126    async fn run(self) {
127        // Start tracker
128        let mut tracker_task = self.tracker.start();
129
130        // Start router
131        let mut router_task = self.router.start(self.channels);
132
133        // Start spawner
134        let (spawner, spawner_mailbox) = spawner::Actor::new(
135            self.context.with_label("spawner"),
136            spawner::Config {
137                mailbox_size: self.cfg.mailbox_size,
138                gossip_bit_vec_frequency: self.cfg.gossip_bit_vec_frequency,
139                allowed_bit_vec_rate: self.cfg.allowed_bit_vec_rate,
140                max_peer_set_size: self.cfg.max_peer_set_size,
141                allowed_peers_rate: self.cfg.allowed_peers_rate,
142                peer_gossip_max_count: self.cfg.peer_gossip_max_count,
143            },
144        );
145        let mut spawner_task =
146            spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
147
148        // Start listener
149        let stream_cfg = public_key::Config {
150            crypto: self.cfg.crypto,
151            namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
152            max_message_size: self.cfg.max_message_size + types::MAX_PAYLOAD_DATA_OVERHEAD,
153            synchrony_bound: self.cfg.synchrony_bound,
154            max_handshake_age: self.cfg.max_handshake_age,
155            handshake_timeout: self.cfg.handshake_timeout,
156        };
157        let listener = listener::Actor::new(
158            self.context.with_label("listener"),
159            listener::Config {
160                address: self.cfg.listen,
161                stream_cfg: stream_cfg.clone(),
162                allowed_incoming_connection_rate: self.cfg.allowed_incoming_connection_rate,
163            },
164        );
165        let mut listener_task =
166            listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
167
168        // Start dialer
169        let dialer = dialer::Actor::new(
170            self.context.with_label("dialer"),
171            dialer::Config {
172                stream_cfg,
173                dial_frequency: self.cfg.dial_frequency,
174                query_frequency: self.cfg.query_frequency,
175            },
176        );
177        let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
178
179        // Wait for first actor to exit
180        info!("network started");
181        let err = select! {
182            tracker = &mut tracker_task => {
183                debug!("tracker exited");
184                tracker
185            },
186            router = &mut router_task => {
187                debug!("router exited");
188                router
189            },
190            spawner = &mut spawner_task => {
191                debug!("spawner exited");
192                spawner
193            },
194            listener = &mut listener_task => {
195                debug!("listener exited");
196                listener
197            },
198            dialer = &mut dialer_task => {
199                debug!("dialer exited");
200                dialer
201            },
202        }
203        .unwrap_err();
204
205        // Ensure all tasks close
206        tracker_task.abort();
207        router_task.abort();
208        spawner_task.abort();
209        listener_task.abort();
210        dialer_task.abort();
211
212        // Log error
213        warn!(error=?err, "network shutdown")
214    }
215}