Skip to main content

commonware_p2p/authenticated/lookup/
network.rs

1//! Implementation of an `authenticated` network.
2
3use super::{
4    actors::{dialer, listener, router, spawner, tracker},
5    channels::{self, Channels},
6    config::Config,
7    types,
8};
9use crate::Channel;
10use commonware_cryptography::Signer;
11use commonware_macros::select;
12use commonware_runtime::{
13    spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Network as RNetwork, Quota,
14    Resolver, Spawner,
15};
16use commonware_stream::encrypted::Config as StreamConfig;
17use commonware_utils::union;
18use rand_core::CryptoRngCore;
19use tracing::{debug, info};
20
21/// Unique suffix for all messages signed in a stream.
22const STREAM_SUFFIX: &[u8] = b"_STREAM";
23
24/// Implementation of an `authenticated` network.
25pub struct Network<
26    E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Metrics,
27    C: Signer,
28> {
29    context: ContextCell<E>,
30    cfg: Config<C>,
31
32    channels: Channels<C::PublicKey>,
33    tracker: tracker::Actor<E, C>,
34    tracker_mailbox: tracker::Mailbox<C::PublicKey>,
35    router: router::Actor<E, C::PublicKey>,
36    router_mailbox: router::Mailbox<C::PublicKey>,
37    listener: listener::Updates,
38}
39
40impl<
41        E: Spawner + BufferPooler + Clock + CryptoRngCore + RNetwork + Resolver + Metrics,
42        C: Signer,
43    > Network<E, C>
44{
45    /// Create a new instance of an `authenticated` network.
46    ///
47    /// # Parameters
48    ///
49    /// * `cfg` - Configuration for the network.
50    ///
51    /// # Returns
52    ///
53    /// * A tuple containing the network instance and the oracle that
54    ///   can be used by a developer to configure which peers are authorized.
55    pub fn new(context: E, cfg: Config<C>) -> (Self, tracker::Oracle<C::PublicKey>) {
56        let (listener_mailbox, listener) = listener::Mailbox::new();
57        let (tracker, tracker_mailbox, oracle) = tracker::Actor::new(
58            context.child("tracker"),
59            tracker::Config {
60                crypto: cfg.crypto.clone(),
61                mailbox_size: cfg.mailbox_size,
62                tracked_peer_sets: cfg.tracked_peer_sets,
63                peer_connection_cooldown: cfg.peer_connection_cooldown,
64                allow_private_ips: cfg.allow_private_ips,
65                allow_dns: cfg.allow_dns,
66                bypass_ip_check: cfg.bypass_ip_check,
67                listener: listener_mailbox,
68                block_duration: cfg.block_duration,
69            },
70        );
71        let (router, router_mailbox, messenger) = router::Actor::new(
72            context.child("router"),
73            router::Config {
74                mailbox_size: cfg.mailbox_size,
75            },
76        );
77        let channels = Channels::new(messenger, cfg.max_message_size);
78
79        (
80            Self {
81                context: ContextCell::new(context),
82                cfg,
83
84                channels,
85                tracker,
86                tracker_mailbox,
87                router,
88                router_mailbox,
89                listener,
90            },
91            oracle,
92        )
93    }
94
95    /// Register a new channel over the network.
96    ///
97    /// # Parameters
98    ///
99    /// * `channel` - Unique identifier for the channel.
100    /// * `rate` - Rate at which messages can be received over the channel.
101    /// * `backlog` - Maximum number of messages that can be queued on the channel before blocking.
102    ///
103    /// # Returns
104    ///
105    /// * A tuple containing the sender and receiver for the channel (how to communicate
106    ///   with external peers on the network). It is safe to close either the sender or receiver
107    ///   without impacting the ability to process messages on other channels.
108    #[allow(clippy::type_complexity)]
109    pub fn register(
110        &mut self,
111        channel: Channel,
112        rate: Quota,
113        backlog: usize,
114    ) -> (
115        channels::Sender<C::PublicKey, E>,
116        channels::Receiver<C::PublicKey>,
117    ) {
118        let context = self
119            .context
120            .child("channel")
121            .with_attribute("index", channel);
122        self.channels.register(channel, rate, backlog, context)
123    }
124
125    /// Starts the network.
126    ///
127    /// After the network is started, it is not possible to add more channels.
128    pub fn start(mut self) -> Handle<()> {
129        spawn_cell!(self.context, self.run())
130    }
131
132    async fn run(self) {
133        // Start tracker
134        let mut tracker_task = self.tracker.start();
135
136        // Start router
137        let mut router_task = self.router.start(self.channels);
138
139        // Start spawner
140        let (spawner, spawner_mailbox) = spawner::Actor::new(
141            self.context.child("spawner"),
142            spawner::Config {
143                mailbox_size: self.cfg.mailbox_size,
144                send_batch_size: self.cfg.send_batch_size,
145                ping_frequency: self.cfg.ping_frequency,
146            },
147        );
148        let mut spawner_task =
149            spawner.start(self.tracker_mailbox.clone(), self.router_mailbox.clone());
150
151        // Start listener
152        let stream_cfg = StreamConfig {
153            signing_key: self.cfg.crypto,
154            namespace: union(&self.cfg.namespace, STREAM_SUFFIX),
155            max_message_size: self
156                .cfg
157                .max_message_size
158                .saturating_add(types::MAX_PAYLOAD_DATA_OVERHEAD),
159            synchrony_bound: self.cfg.synchrony_bound,
160            max_handshake_age: self.cfg.max_handshake_age,
161            handshake_timeout: self.cfg.handshake_timeout,
162        };
163        let listener = listener::Actor::new(
164            self.context.child("listener"),
165            listener::Config {
166                address: self.cfg.listen,
167                stream_cfg: stream_cfg.clone(),
168                allow_private_ips: self.cfg.allow_private_ips,
169                bypass_ip_check: self.cfg.bypass_ip_check,
170                max_concurrent_handshakes: self.cfg.max_concurrent_handshakes,
171                allowed_handshake_rate_per_ip: self.cfg.allowed_handshake_rate_per_ip,
172                allowed_handshake_rate_per_subnet: self.cfg.allowed_handshake_rate_per_subnet,
173            },
174            self.listener,
175        );
176        let mut listener_task =
177            listener.start(self.tracker_mailbox.clone(), spawner_mailbox.clone());
178
179        // Start dialer
180        let dialer = dialer::Actor::new(
181            self.context.child("dialer"),
182            dialer::Config {
183                stream_cfg,
184                dial_frequency: self.cfg.dial_frequency,
185                peer_connection_cooldown: self.cfg.peer_connection_cooldown,
186                allow_private_ips: self.cfg.allow_private_ips,
187            },
188        );
189        let mut dialer_task = dialer.start(self.tracker_mailbox, spawner_mailbox);
190
191        let mut shutdown = self.context.stopped();
192
193        // If any task completes, the network should stop
194        info!("network started");
195        select! {
196            _ = &mut shutdown => {
197                debug!("context shutdown, stopping network");
198            },
199            tracker = &mut tracker_task => {
200                debug!(?tracker, "tracker stopped, shutting down network");
201            },
202            router = &mut router_task => {
203                debug!(?router, "router stopped, shutting down network");
204            },
205            spawner = &mut spawner_task => {
206                debug!(?spawner, "spawner stopped, shutting down network");
207            },
208            listener = &mut listener_task => {
209                debug!(?listener, "listener stopped, shutting down network");
210            },
211            dialer = &mut dialer_task => {
212                debug!(?dialer, "dialer stopped, shutting down network");
213            },
214        }
215    }
216}