bee_network/network/
host.rs

1// Copyright 2020-2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use super::error::Error;
5
6use crate::{
7    alias,
8    peer::{info::PeerInfo, list::PeerListWrapper as PeerList},
9    service::{
10        command::{Command, CommandReceiver},
11        event::{InternalEvent, InternalEventSender},
12    },
13    swarm::behavior::SwarmBehavior,
14};
15
16use futures::{channel::oneshot, StreamExt};
17use libp2p::{swarm::SwarmEvent, Multiaddr, PeerId, Swarm};
18use log::*;
19
/// Everything the network host event loop needs in order to run.
///
/// The value is consumed (destructured) by `network_host_processor`.
pub struct NetworkHostConfig {
    /// Sender used to publish internal events, e.g. `InternalEvent::AddressBound` once a listen address is known.
    pub internal_event_sender: InternalEventSender,
    /// Receiver from which commands (such as `Command::DialAddress` and `Command::DialPeer`) are taken.
    pub internal_command_receiver: CommandReceiver,
    /// Peer list that is consulted before dialing and that records local listen addresses.
    pub peerlist: PeerList,
    /// The libp2p swarm that is polled for events and used for listening and dialing.
    pub swarm: Swarm<SwarmBehavior>,
    /// Address the swarm is asked to listen on when the event loop starts.
    pub bind_multiaddr: Multiaddr,
}
27
28pub mod integrated {
29    use super::*;
30    use crate::service::host::integrated::ServiceHost;
31
32    use bee_runtime::{node::Node, worker::Worker};
33
34    use async_trait::async_trait;
35
36    use std::{any::TypeId, convert::Infallible};
37
38    /// A node worker, that deals with accepting and initiating connections with remote peers.
39    ///
40    /// NOTE: This type is only exported to be used as a worker dependency.
41    #[derive(Default)]
42    pub struct NetworkHost {}
43
44    #[async_trait]
45    impl<N: Node> Worker<N> for NetworkHost {
46        type Config = NetworkHostConfig;
47        type Error = Infallible;
48
49        fn dependencies() -> &'static [TypeId] {
50            vec![TypeId::of::<ServiceHost>()].leak()
51        }
52
53        async fn start(node: &mut N, config: Self::Config) -> Result<Self, Self::Error> {
54            node.spawn::<Self, _, _>(|shutdown| async move {
55                network_host_processor(config, shutdown)
56                    .await
57                    .expect("network host processor");
58
59                info!("Network Host stopped.");
60            });
61
62            info!("Network Host started.");
63
64            Ok(Self::default())
65        }
66    }
67}
68
69pub mod standalone {
70    use super::*;
71
72    pub struct NetworkHost {
73        pub shutdown: oneshot::Receiver<()>,
74    }
75
76    impl NetworkHost {
77        pub fn new(shutdown: oneshot::Receiver<()>) -> Self {
78            Self { shutdown }
79        }
80
81        pub async fn start(self, config: NetworkHostConfig) {
82            let NetworkHost { shutdown } = self;
83
84            tokio::spawn(async move {
85                network_host_processor(config, shutdown)
86                    .await
87                    .expect("network host processor");
88
89                info!("Network Host stopped.");
90            });
91
92            info!("Network Host started.");
93        }
94    }
95}
96
97async fn network_host_processor(
98    config: NetworkHostConfig,
99    mut shutdown: oneshot::Receiver<()>,
100) -> Result<(), crate::Error> {
101    let NetworkHostConfig {
102        internal_event_sender,
103        mut internal_command_receiver,
104        peerlist,
105        mut swarm,
106        bind_multiaddr,
107    } = config;
108
109    // Try binding to the configured bind address.
110    info!("Binding to: {}", bind_multiaddr);
111    let _listener_id = Swarm::listen_on(&mut swarm, bind_multiaddr).map_err(|_| crate::Error::BindingAddressFailed)?;
112
113    loop {
114        tokio::select! {
115            _ = &mut shutdown => break,
116            event = (&mut swarm).next() => {
117                let event = event.ok_or(crate::Error::HostEventLoopError)?;
118                process_swarm_event(event, &internal_event_sender, &peerlist).await;
119            }
120            command = (&mut internal_command_receiver).recv() => {
121                let command = command.ok_or(crate::Error::HostEventLoopError)?;
122                process_internal_command(command, &mut swarm, &peerlist).await;
123            },
124        }
125    }
126
127    Ok(())
128}
129
130async fn process_swarm_event(
131    event: SwarmEvent<(), impl std::error::Error>,
132    internal_event_sender: &InternalEventSender,
133    peerlist: &PeerList,
134) {
135    match event {
136        SwarmEvent::NewListenAddr { address, .. } => {
137            debug!("Swarm event: new listen address {}.", address);
138
139            internal_event_sender
140                .send(InternalEvent::AddressBound {
141                    address: address.clone(),
142                })
143                .expect("send error");
144
145            peerlist
146                .0
147                .write()
148                .await
149                .insert_local_addr(address)
150                .expect("insert_local_addr");
151        }
152        SwarmEvent::ConnectionEstablished { peer_id, .. } => {
153            debug!("Swarm event: connection established with {}.", alias!(peer_id));
154        }
155        SwarmEvent::ConnectionClosed { peer_id, .. } => {
156            debug!("Swarm event: connection closed with {}.", alias!(peer_id));
157        }
158        SwarmEvent::ListenerError { error, .. } => {
159            error!("Swarm event: listener error {}.", error);
160        }
161        SwarmEvent::Dialing(peer_id) => {
162            // TODO: strange, but this event is not actually fired when dialing. (open issue?)
163            debug!("Swarm event: dialing {}.", alias!(peer_id));
164        }
165        SwarmEvent::IncomingConnection { send_back_addr, .. } => {
166            debug!("Swarm event: being dialed from {}.", send_back_addr);
167        }
168        _ => {}
169    }
170}
171
172async fn process_internal_command(internal_command: Command, swarm: &mut Swarm<SwarmBehavior>, peerlist: &PeerList) {
173    match internal_command {
174        Command::DialAddress { address } => {
175            if let Err(e) = dial_addr(swarm, address.clone(), peerlist).await {
176                warn!("{:?}", e);
177            }
178        }
179        Command::DialPeer { peer_id } => {
180            if let Err(e) = dial_peer(swarm, peer_id, peerlist).await {
181                warn!("{:?}", e);
182            }
183        }
184        _ => {}
185    }
186}
187
188async fn dial_addr(swarm: &mut Swarm<SwarmBehavior>, addr: Multiaddr, peerlist: &PeerList) -> Result<(), Error> {
189    if let Err(e) = peerlist.0.read().await.allows_dialing_addr(&addr) {
190        warn!("Dialing address {} denied. Cause: {:?}", addr, e);
191        return Err(Error::DialingAddressDenied(addr));
192    }
193
194    info!("Dialing address: {}.", addr);
195
196    Swarm::dial_addr(swarm, addr.clone()).map_err(|e| Error::DialingAddressFailed(addr, e))?;
197
198    Ok(())
199}
200
201async fn dial_peer(swarm: &mut Swarm<SwarmBehavior>, peer_id: PeerId, peerlist: &PeerList) -> Result<(), Error> {
202    if let Err(e) = peerlist.0.read().await.allows_dialing_peer(&peer_id) {
203        warn!("Dialing peer {} denied. Cause: {:?}", alias!(peer_id), e);
204        return Err(Error::DialingPeerDenied(peer_id));
205    }
206
207    // Panic:
208    // We just checked, that the peer is fine to be dialed.
209    let PeerInfo {
210        address: addr, alias, ..
211    } = peerlist.0.read().await.info(&peer_id).unwrap();
212
213    info!("Dialing peer: {} ({}).", alias, alias!(peer_id));
214
215    // TODO: We also use `Swarm::dial_addr` here (instead of `Swarm::dial`) for now. See if it's better to change
216    // that.
217    Swarm::dial_addr(swarm, addr).map_err(|e| Error::DialingPeerFailed(peer_id, e))?;
218
219    Ok(())
220}