bee_network/
init.rs

1// Copyright 2020-2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4#![cfg(feature = "full")]
5
6use super::{
7    config::NetworkConfig,
8    error::Error,
9    peer::{
10        info::{PeerInfo, PeerRelation},
11        list::{PeerList, PeerListWrapper},
12    },
13    service::{
14        command::{command_channel, NetworkCommandSender},
15        event::{event_channel, Event, InternalEvent, NetworkEventReceiver},
16    },
17    Keypair, PeerId,
18};
19
20use crate::{
21    alias,
22    network::host::NetworkHostConfig,
23    service::host::{self, ServiceHostConfig},
24    swarm::builder::build_swarm,
25};
26
27use libp2p::identity;
28use log::info;
29use once_cell::sync::OnceCell;
30
31pub mod global {
32    use super::*;
33
34    static RECONNECT_INTERVAL_SECS: OnceCell<u64> = OnceCell::new();
35    static NETWORK_ID: OnceCell<u64> = OnceCell::new();
36    static MAX_UNKNOWN_PEERS: OnceCell<usize> = OnceCell::new();
37
38    pub fn set_reconnect_interval_secs(reconnect_interval_secs: u64) {
39        if cfg!(test) {
40            let _ = RECONNECT_INTERVAL_SECS.set(reconnect_interval_secs);
41        } else {
42            RECONNECT_INTERVAL_SECS
43                .set(reconnect_interval_secs)
44                .expect("oncecell set");
45        }
46    }
47    pub fn reconnect_interval_secs() -> u64 {
48        *RECONNECT_INTERVAL_SECS.get().expect("oncecell get")
49    }
50
51    pub fn set_network_id(network_id: u64) {
52        if cfg!(test) {
53            let _ = NETWORK_ID.set(network_id);
54        } else {
55            NETWORK_ID.set(network_id).expect("oncecell set");
56        }
57    }
58    pub fn network_id() -> u64 {
59        *NETWORK_ID.get().expect("oncecell get")
60    }
61
62    pub fn set_max_unknown_peers(max_unknown_peers: usize) {
63        if cfg!(test) {
64            let _ = MAX_UNKNOWN_PEERS.set(max_unknown_peers);
65        } else {
66            MAX_UNKNOWN_PEERS.set(max_unknown_peers).expect("oncecell set");
67        }
68    }
69    pub fn max_unknown_peers() -> usize {
70        *MAX_UNKNOWN_PEERS.get().expect("oncecell get")
71    }
72}
73
74/// Initializes a "standalone" version of the network layer.
75pub mod standalone {
76    use super::*;
77    use crate::{network::host::standalone::NetworkHost, service::host::standalone::ServiceHost};
78
79    use futures::channel::oneshot;
80
81    use std::future::Future;
82
83    /// Initializes the network.
84    pub async fn init(
85        config: NetworkConfig,
86        keys: Keypair,
87        network_id: u64,
88        shutdown: impl Future + Send + Unpin + 'static,
89    ) -> Result<(NetworkCommandSender, NetworkEventReceiver), Error> {
90        let (network_config, service_config, network_command_sender, network_event_receiver) =
91            super::init(config, keys, network_id)?;
92
93        let (shutdown_signal_tx1, shutdown_signal_rx1) = oneshot::channel::<()>();
94        let (shutdown_signal_tx2, shutdown_signal_rx2) = oneshot::channel::<()>();
95
96        tokio::spawn(async move {
97            shutdown.await;
98
99            shutdown_signal_tx1.send(()).expect("sending shutdown signal");
100            shutdown_signal_tx2.send(()).expect("sending shutdown signal");
101        });
102
103        ServiceHost::new(shutdown_signal_rx1).start(service_config).await;
104        NetworkHost::new(shutdown_signal_rx2).start(network_config).await;
105
106        Ok((network_command_sender, network_event_receiver))
107    }
108}
109
110/// Initializes an "integrated" version of the network layer, which is used in the Bee node.
111pub mod integrated {
112    use super::*;
113    use crate::{network::host::integrated::NetworkHost, service::host::integrated::ServiceHost};
114
115    use bee_runtime::node::{Node, NodeBuilder};
116
117    /// Initializes the network.
118    pub async fn init<N: Node>(
119        config: NetworkConfig,
120        keys: Keypair,
121        network_id: u64,
122        mut node_builder: N::Builder,
123    ) -> Result<(N::Builder, NetworkEventReceiver), Error> {
124        let (host_config, service_config, network_command_sender, network_event_receiver) =
125            super::init(config, keys, network_id)?;
126
127        node_builder = node_builder
128            .with_worker_cfg::<NetworkHost>(host_config)
129            .with_worker_cfg::<ServiceHost>(service_config)
130            .with_resource(network_command_sender);
131
132        Ok((node_builder, network_event_receiver))
133    }
134}
135
136fn init(
137    config: NetworkConfig,
138    keys: Keypair,
139    network_id: u64,
140) -> Result<
141    (
142        NetworkHostConfig,
143        ServiceHostConfig,
144        NetworkCommandSender,
145        NetworkEventReceiver,
146    ),
147    Error,
148> {
149    let NetworkConfig {
150        bind_multiaddr,
151        reconnect_interval_secs,
152        max_unknown_peers,
153        static_peers: peers,
154    } = config;
155
156    global::set_reconnect_interval_secs(reconnect_interval_secs);
157    global::set_network_id(network_id);
158    global::set_max_unknown_peers(max_unknown_peers);
159
160    let (command_sender, command_receiver) = command_channel();
161    let (internal_command_sender, internal_command_receiver) = command_channel();
162
163    let (event_sender, event_receiver) = event_channel::<Event>();
164    let (internal_event_sender, internal_event_receiver) = event_channel::<InternalEvent>();
165
166    let local_keys = identity::Keypair::Ed25519(keys);
167    let local_id = PeerId::from_public_key(local_keys.public());
168
169    info!("Local Id: {}", local_id);
170
171    event_sender
172        .send(Event::LocalIdCreated { local_id })
173        .map_err(|_| Error::LocalIdAnnouncementFailed)?;
174
175    let peerlist = PeerListWrapper::new(PeerList::from_peers(local_id, peers.iter().cloned().collect()));
176
177    for peer in peers.into_iter() {
178        let peer_id = peer.peer_id;
179        event_sender
180            .send(Event::PeerAdded {
181                peer_id,
182                info: PeerInfo {
183                    address: peer.multiaddr,
184                    alias: peer.alias.unwrap_or_else(|| alias!(peer_id).into()),
185                    relation: PeerRelation::Known,
186                },
187            })
188            .map_err(|_| Error::StaticPeersAnnouncementFailed)?;
189    }
190
191    // Create the transport layer
192    let swarm = build_swarm(&local_keys, internal_event_sender.clone()).map_err(|_| Error::CreatingTransportFailed)?;
193
194    let network_host_config = NetworkHostConfig {
195        internal_event_sender: internal_event_sender.clone(),
196        internal_command_receiver,
197        peerlist: peerlist.clone(),
198        swarm,
199        bind_multiaddr,
200    };
201
202    let service_host_config = ServiceHostConfig {
203        local_keys,
204        senders: host::Senders {
205            events: event_sender,
206            internal_events: internal_event_sender,
207            internal_commands: internal_command_sender,
208        },
209        receivers: host::Receivers {
210            commands: command_receiver,
211            internal_events: internal_event_receiver,
212        },
213        peerlist,
214    };
215
216    let network_command_sender = NetworkCommandSender::new(command_sender);
217    let network_event_receiver = NetworkEventReceiver::new(event_receiver);
218
219    Ok((
220        network_host_config,
221        service_host_config,
222        network_command_sender,
223        network_event_receiver,
224    ))
225}