commonware_p2p/authenticated/lookup/
network.rs

1//! Implementation of an `authenticated` network.
2
3use super::{
4    actors::{dialer, listener, router, spawner, tracker},
5    channels::{self, Channels},
6    config::Config,
7    types,
8};
9use crate::{authenticated::Mailbox, Channel};
10use commonware_cryptography::Signer;
11use commonware_macros::select;
12use commonware_runtime::{Clock, Handle, Metrics, Network as RNetwork, Spawner};
13use commonware_stream::public_key;
14use commonware_utils::union;
15use governor::{clock::ReasonablyRealtime, Quota};
16use rand::{CryptoRng, Rng};
17use tracing::{debug, info, warn};
18
/// Suffix joined with the configured namespace (via `union`) to form the
/// namespace handed to the stream configuration, so that messages signed
/// in a stream carry their own unique namespace.
const STREAM_SUFFIX: &[u8] = b"_STREAM";
21
22/// Implementation of an `authenticated` network.
23pub struct Network<
24    E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics,
25    C: Signer,
26> {
27    context: E,
28    cfg: Config<C>,
29
30    channels: Channels<C::PublicKey>,
31    tracker: tracker::Actor<E, C>,
32    tracker_mailbox: Mailbox<tracker::Message<E, C::PublicKey>>,
33    router: router::Actor<E, C::PublicKey>,
34    router_mailbox: Mailbox<router::Message<C::PublicKey>>,
35}
36
37impl<E: Spawner + Clock + ReasonablyRealtime + Rng + CryptoRng + RNetwork + Metrics, C: Signer>
38    Network<E, C>
39{
40    /// Create a new instance of an `authenticated` network.
41    ///
42    /// # Parameters
43    ///
44    /// * `cfg` - Configuration for the network.
45    ///
46    /// # Returns
47    ///
48    /// * A tuple containing the network instance and the oracle that
49    ///   can be used by a developer to configure which peers are authorized.
50    pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<E, C::PublicKey>) {
51        let (tracker, tracker_mailbox, oracle) = tracker::Actor::new(
52            context.with_label("tracker"),
53            tracker::Config {
54                crypto: cfg.crypto.clone(),
55                address: cfg.dialable,
56                mailbox_size: cfg.mailbox_size,
57                tracked_peer_sets: cfg.tracked_peer_sets,
58                allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
59                allow_private_ips: cfg.allow_private_ips,
60            },
61        );
62        let (router, router_mailbox, messenger) = router::Actor::new(
63            context.with_label("router"),
64            router::Config {
65                mailbox_size: cfg.mailbox_size,
66            },
67        );
68        let channels = Channels::new(messenger, cfg.max_message_size);
69
70        (
71            Self {
72                context,
73                cfg,
74
75                channels,
76                tracker,
77                tracker_mailbox,
78                router,
79                router_mailbox,
80            },
81            oracle,
82        )
83    }
84
85    /// Register a new channel over the network.
86    ///
87    /// # Parameters
88    ///
89    /// * `channel` - Unique identifier for the channel.
90    /// * `rate` - Rate at which messages can be received over the channel.
91    /// * `backlog` - Maximum number of messages that can be queued on the channel before blocking.
92    ///
93    /// # Returns
94    ///
95    /// * A tuple containing the sender and receiver for the channel (how to communicate
96    ///   with external peers on the network). It is safe to close either the sender or receiver
97    ///   without impacting the ability to process messages on other channels.
98    pub fn register(
99        &mut self,
100        channel: Channel,
101        rate: Quota,
102        backlog: usize,
103    ) -> (
104        channels::Sender<C::PublicKey>,
105        channels::Receiver<C::PublicKey>,
106    ) {
107        self.channels.register(channel, rate, backlog)
108    }
109
110    /// Starts the network.
111    ///
112    /// After the network is started, it is not possible to add more channels.
113    pub fn start(mut self) -> Handle<()> {
114        self.context.spawn_ref()(self.run())
115    }
116
117    async fn run(self) {
118        // Start tracker
119        let mut tracker_task = self.tracker.start();
120
121        // Start router
122        let mut router_task = self.router.start(self.channels);
123
124        // Start spawner
125        let (spawner, spawner_mailbox) = spawner::Actor::new(
126            self.context.with_label("spawner"),
127            spawner::Config {
128                mailbox_size: self.cfg.mailbox_size,
129                ping_frequency: self.cfg.ping_frequency,
130                allowed_ping_rate: self.cfg.allowed_ping_rate,
131            },
132        );
133        let mut spawner_task =
134            spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
135
136        // Start listener
137        let stream_cfg = public_key::Config {
138            crypto: self.cfg.crypto,
139            namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
140            max_message_size: self.cfg.max_message_size + types::MAX_PAYLOAD_DATA_OVERHEAD,
141            synchrony_bound: self.cfg.synchrony_bound,
142            max_handshake_age: self.cfg.max_handshake_age,
143            handshake_timeout: self.cfg.handshake_timeout,
144        };
145        let listener = listener::Actor::new(
146            self.context.with_label("listener"),
147            listener::Config {
148                address: self.cfg.listen,
149                stream_cfg: stream_cfg.clone(),
150                allowed_incoming_connection_rate: self.cfg.allowed_incoming_connection_rate,
151            },
152        );
153        let mut listener_task =
154            listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
155
156        // Start dialer
157        let dialer = dialer::Actor::new(
158            self.context.with_label("dialer"),
159            dialer::Config {
160                stream_cfg,
161                dial_frequency: self.cfg.dial_frequency,
162                query_frequency: self.cfg.query_frequency,
163            },
164        );
165        let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
166
167        // Wait for first actor to exit
168        info!("network started");
169        let err = select! {
170            tracker = &mut tracker_task => {
171                debug!("tracker exited");
172                tracker
173            },
174            router = &mut router_task => {
175                debug!("router exited");
176                router
177            },
178            spawner = &mut spawner_task => {
179                debug!("spawner exited");
180                spawner
181            },
182            listener = &mut listener_task => {
183                debug!("listener exited");
184                listener
185            },
186            dialer = &mut dialer_task => {
187                debug!("dialer exited");
188                dialer
189            },
190        }
191        .unwrap_err();
192
193        // Log error
194        warn!(error=?err, "network shutdown");
195    }
196}