1use super::error::Error;
5
6use crate::{
7 alias,
8 peer::{info::PeerInfo, list::PeerListWrapper as PeerList},
9 service::{
10 command::{Command, CommandReceiver},
11 event::{InternalEvent, InternalEventSender},
12 },
13 swarm::behavior::SwarmBehavior,
14};
15
16use futures::{channel::oneshot, StreamExt};
17use libp2p::{swarm::SwarmEvent, Multiaddr, PeerId, Swarm};
18use log::*;
19
/// Everything the network host event loop needs in order to run.
///
/// The value is taken apart field by field in `network_host_processor`.
pub struct NetworkHostConfig {
    /// Sender used to publish internal events such as `InternalEvent::AddressBound`.
    pub internal_event_sender: InternalEventSender,
    /// Receiver from which commands (e.g. `Command::DialAddress`, `Command::DialPeer`) are read.
    pub internal_command_receiver: CommandReceiver,
    /// Peer list consulted before dialing and updated with newly bound local addresses.
    pub peerlist: PeerList,
    /// The libp2p swarm that is polled for events and used for listening and dialing.
    pub swarm: Swarm<SwarmBehavior>,
    /// The address the swarm is asked to listen on when the host starts.
    pub bind_multiaddr: Multiaddr,
}
27
28pub mod integrated {
29 use super::*;
30 use crate::service::host::integrated::ServiceHost;
31
32 use bee_runtime::{node::Node, worker::Worker};
33
34 use async_trait::async_trait;
35
36 use std::{any::TypeId, convert::Infallible};
37
38 #[derive(Default)]
42 pub struct NetworkHost {}
43
44 #[async_trait]
45 impl<N: Node> Worker<N> for NetworkHost {
46 type Config = NetworkHostConfig;
47 type Error = Infallible;
48
49 fn dependencies() -> &'static [TypeId] {
50 vec![TypeId::of::<ServiceHost>()].leak()
51 }
52
53 async fn start(node: &mut N, config: Self::Config) -> Result<Self, Self::Error> {
54 node.spawn::<Self, _, _>(|shutdown| async move {
55 network_host_processor(config, shutdown)
56 .await
57 .expect("network host processor");
58
59 info!("Network Host stopped.");
60 });
61
62 info!("Network Host started.");
63
64 Ok(Self::default())
65 }
66 }
67}
68
69pub mod standalone {
70 use super::*;
71
72 pub struct NetworkHost {
73 pub shutdown: oneshot::Receiver<()>,
74 }
75
76 impl NetworkHost {
77 pub fn new(shutdown: oneshot::Receiver<()>) -> Self {
78 Self { shutdown }
79 }
80
81 pub async fn start(self, config: NetworkHostConfig) {
82 let NetworkHost { shutdown } = self;
83
84 tokio::spawn(async move {
85 network_host_processor(config, shutdown)
86 .await
87 .expect("network host processor");
88
89 info!("Network Host stopped.");
90 });
91
92 info!("Network Host started.");
93 }
94 }
95}
96
97async fn network_host_processor(
98 config: NetworkHostConfig,
99 mut shutdown: oneshot::Receiver<()>,
100) -> Result<(), crate::Error> {
101 let NetworkHostConfig {
102 internal_event_sender,
103 mut internal_command_receiver,
104 peerlist,
105 mut swarm,
106 bind_multiaddr,
107 } = config;
108
109 info!("Binding to: {}", bind_multiaddr);
111 let _listener_id = Swarm::listen_on(&mut swarm, bind_multiaddr).map_err(|_| crate::Error::BindingAddressFailed)?;
112
113 loop {
114 tokio::select! {
115 _ = &mut shutdown => break,
116 event = (&mut swarm).next() => {
117 let event = event.ok_or(crate::Error::HostEventLoopError)?;
118 process_swarm_event(event, &internal_event_sender, &peerlist).await;
119 }
120 command = (&mut internal_command_receiver).recv() => {
121 let command = command.ok_or(crate::Error::HostEventLoopError)?;
122 process_internal_command(command, &mut swarm, &peerlist).await;
123 },
124 }
125 }
126
127 Ok(())
128}
129
130async fn process_swarm_event(
131 event: SwarmEvent<(), impl std::error::Error>,
132 internal_event_sender: &InternalEventSender,
133 peerlist: &PeerList,
134) {
135 match event {
136 SwarmEvent::NewListenAddr { address, .. } => {
137 debug!("Swarm event: new listen address {}.", address);
138
139 internal_event_sender
140 .send(InternalEvent::AddressBound {
141 address: address.clone(),
142 })
143 .expect("send error");
144
145 peerlist
146 .0
147 .write()
148 .await
149 .insert_local_addr(address)
150 .expect("insert_local_addr");
151 }
152 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
153 debug!("Swarm event: connection established with {}.", alias!(peer_id));
154 }
155 SwarmEvent::ConnectionClosed { peer_id, .. } => {
156 debug!("Swarm event: connection closed with {}.", alias!(peer_id));
157 }
158 SwarmEvent::ListenerError { error, .. } => {
159 error!("Swarm event: listener error {}.", error);
160 }
161 SwarmEvent::Dialing(peer_id) => {
162 debug!("Swarm event: dialing {}.", alias!(peer_id));
164 }
165 SwarmEvent::IncomingConnection { send_back_addr, .. } => {
166 debug!("Swarm event: being dialed from {}.", send_back_addr);
167 }
168 _ => {}
169 }
170}
171
172async fn process_internal_command(internal_command: Command, swarm: &mut Swarm<SwarmBehavior>, peerlist: &PeerList) {
173 match internal_command {
174 Command::DialAddress { address } => {
175 if let Err(e) = dial_addr(swarm, address.clone(), peerlist).await {
176 warn!("{:?}", e);
177 }
178 }
179 Command::DialPeer { peer_id } => {
180 if let Err(e) = dial_peer(swarm, peer_id, peerlist).await {
181 warn!("{:?}", e);
182 }
183 }
184 _ => {}
185 }
186}
187
188async fn dial_addr(swarm: &mut Swarm<SwarmBehavior>, addr: Multiaddr, peerlist: &PeerList) -> Result<(), Error> {
189 if let Err(e) = peerlist.0.read().await.allows_dialing_addr(&addr) {
190 warn!("Dialing address {} denied. Cause: {:?}", addr, e);
191 return Err(Error::DialingAddressDenied(addr));
192 }
193
194 info!("Dialing address: {}.", addr);
195
196 Swarm::dial_addr(swarm, addr.clone()).map_err(|e| Error::DialingAddressFailed(addr, e))?;
197
198 Ok(())
199}
200
201async fn dial_peer(swarm: &mut Swarm<SwarmBehavior>, peer_id: PeerId, peerlist: &PeerList) -> Result<(), Error> {
202 if let Err(e) = peerlist.0.read().await.allows_dialing_peer(&peer_id) {
203 warn!("Dialing peer {} denied. Cause: {:?}", alias!(peer_id), e);
204 return Err(Error::DialingPeerDenied(peer_id));
205 }
206
207 let PeerInfo {
210 address: addr, alias, ..
211 } = peerlist.0.read().await.info(&peer_id).unwrap();
212
213 info!("Dialing peer: {} ({}).", alias, alias!(peer_id));
214
215 Swarm::dial_addr(swarm, addr).map_err(|e| Error::DialingPeerFailed(peer_id, e))?;
218
219 Ok(())
220}