commonware_p2p/authenticated/lookup/
network.rs

1//! Implementation of an `authenticated` network.
2
3use super::{
4    actors::{dialer, listener, router, spawner, tracker},
5    channels::{self, Channels},
6    config::Config,
7    types,
8};
9use crate::{
10    authenticated::{mailbox::UnboundedMailbox, Mailbox},
11    Channel,
12};
13use commonware_cryptography::Signer;
14use commonware_macros::select;
15use commonware_runtime::{
16    spawn_cell, Clock, ContextCell, Handle, Metrics, Network as RNetwork, Quota, Resolver, Spawner,
17};
18use commonware_stream::Config as StreamConfig;
19use commonware_utils::union;
20use futures::channel::mpsc;
21use rand_core::CryptoRngCore;
22use std::{collections::HashSet, net::IpAddr};
23use tracing::{debug, info};
24
/// Suffix appended to the configured namespace to form the namespace used by
/// the stream layer, keeping messages signed in a stream distinct.
const STREAM_SUFFIX: &[u8] = "_STREAM".as_bytes();
27
28/// Implementation of an `authenticated` network.
29pub struct Network<E: Spawner + Clock + CryptoRngCore + RNetwork + Metrics, C: Signer> {
30    context: ContextCell<E>,
31    cfg: Config<C>,
32
33    channels: Channels<C::PublicKey>,
34    tracker: tracker::Actor<E, C>,
35    tracker_mailbox: UnboundedMailbox<tracker::Message<C::PublicKey>>,
36    router: router::Actor<E, C::PublicKey>,
37    router_mailbox: Mailbox<router::Message<C::PublicKey>>,
38    listener: mpsc::Receiver<HashSet<IpAddr>>,
39}
40
41impl<E: Spawner + Clock + CryptoRngCore + RNetwork + Resolver + Metrics, C: Signer> Network<E, C> {
42    /// Create a new instance of an `authenticated` network.
43    ///
44    /// # Parameters
45    ///
46    /// * `cfg` - Configuration for the network.
47    ///
48    /// # Returns
49    ///
50    /// * A tuple containing the network instance and the oracle that
51    ///   can be used by a developer to configure which peers are authorized.
52    pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<C::PublicKey>) {
53        let (listener_mailbox, listener) = Mailbox::<HashSet<IpAddr>>::new(cfg.mailbox_size);
54        let (tracker, tracker_mailbox, oracle) = tracker::Actor::new(
55            context.with_label("tracker"),
56            tracker::Config {
57                crypto: cfg.crypto.clone(),
58                tracked_peer_sets: cfg.tracked_peer_sets,
59                allowed_connection_rate_per_peer: cfg.allowed_connection_rate_per_peer,
60                allow_private_ips: cfg.allow_private_ips,
61                allow_dns: cfg.allow_dns,
62                bypass_ip_check: cfg.bypass_ip_check,
63                listener: listener_mailbox,
64                block_duration: cfg.block_duration,
65            },
66        );
67        let (router, router_mailbox, messenger) = router::Actor::new(
68            context.with_label("router"),
69            router::Config {
70                mailbox_size: cfg.mailbox_size,
71            },
72        );
73        let channels = Channels::new(messenger, cfg.max_message_size);
74
75        (
76            Self {
77                context: ContextCell::new(context),
78                cfg,
79
80                channels,
81                tracker,
82                tracker_mailbox,
83                router,
84                router_mailbox,
85                listener,
86            },
87            oracle,
88        )
89    }
90
91    /// Register a new channel over the network.
92    ///
93    /// # Parameters
94    ///
95    /// * `channel` - Unique identifier for the channel.
96    /// * `rate` - Rate at which messages can be received over the channel.
97    /// * `backlog` - Maximum number of messages that can be queued on the channel before blocking.
98    ///
99    /// # Returns
100    ///
101    /// * A tuple containing the sender and receiver for the channel (how to communicate
102    ///   with external peers on the network). It is safe to close either the sender or receiver
103    ///   without impacting the ability to process messages on other channels.
104    #[allow(clippy::type_complexity)]
105    pub fn register(
106        &mut self,
107        channel: Channel,
108        rate: Quota,
109        backlog: usize,
110    ) -> (
111        channels::Sender<C::PublicKey, E>,
112        channels::Receiver<C::PublicKey>,
113    ) {
114        let clock = self
115            .context
116            .with_label(&format!("channel_{channel}"))
117            .take();
118        self.channels.register(channel, rate, backlog, clock)
119    }
120
121    /// Starts the network.
122    ///
123    /// After the network is started, it is not possible to add more channels.
124    pub fn start(mut self) -> Handle<()> {
125        spawn_cell!(self.context, self.run().await)
126    }
127
128    async fn run(self) {
129        // Start tracker
130        let mut tracker_task = self.tracker.start();
131
132        // Start router
133        let mut router_task = self.router.start(self.channels);
134
135        // Start spawner
136        let (spawner, spawner_mailbox) = spawner::Actor::new(
137            self.context.with_label("spawner"),
138            spawner::Config {
139                mailbox_size: self.cfg.mailbox_size,
140                ping_frequency: self.cfg.ping_frequency,
141            },
142        );
143        let mut spawner_task =
144            spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
145
146        // Start listener
147        let stream_cfg = StreamConfig {
148            signing_key: self.cfg.crypto,
149            namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
150            max_message_size: self
151                .cfg
152                .max_message_size
153                .saturating_add(types::MAX_PAYLOAD_DATA_OVERHEAD),
154            synchrony_bound: self.cfg.synchrony_bound,
155            max_handshake_age: self.cfg.max_handshake_age,
156            handshake_timeout: self.cfg.handshake_timeout,
157        };
158        let listener = listener::Actor::new(
159            self.context.with_label("listener"),
160            listener::Config {
161                address: self.cfg.listen,
162                stream_cfg: stream_cfg.clone(),
163                allow_private_ips: self.cfg.allow_private_ips,
164                bypass_ip_check: self.cfg.bypass_ip_check,
165                max_concurrent_handshakes: self.cfg.max_concurrent_handshakes,
166                allowed_handshake_rate_per_ip: self.cfg.allowed_handshake_rate_per_ip,
167                allowed_handshake_rate_per_subnet: self.cfg.allowed_handshake_rate_per_subnet,
168            },
169            self.listener,
170        );
171        let mut listener_task =
172            listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
173
174        // Start dialer
175        let dialer = dialer::Actor::new(
176            self.context.with_label("dialer"),
177            dialer::Config {
178                stream_cfg,
179                dial_frequency: self.cfg.dial_frequency,
180                query_frequency: self.cfg.query_frequency,
181                allow_private_ips: self.cfg.allow_private_ips,
182            },
183        );
184        let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
185
186        let mut shutdown = self.context.stopped();
187
188        // Wait for first actor to exit
189        info!("network started");
190        select! {
191            _ = &mut shutdown => {
192                debug!("context shutdown, stopping network");
193            },
194            tracker = &mut tracker_task => {
195                panic!("tracker exited unexpectedly: {tracker:?}");
196            },
197            router = &mut router_task => {
198                panic!("router exited unexpectedly: {router:?}");
199            },
200            spawner = &mut spawner_task => {
201                panic!("spawner exited unexpectedly: {spawner:?}");
202            },
203            listener = &mut listener_task => {
204                panic!("listener exited unexpectedly: {listener:?}");
205            },
206            dialer = &mut dialer_task => {
207                panic!("dialer exited unexpectedly: {dialer:?}");
208            },
209        }
210    }
211}