1use super::{
5 command::{Command, CommandReceiver, CommandSender},
6 error::Error,
7 event::{Event, EventSender, InternalEvent, InternalEventReceiver, InternalEventSender},
8};
9
10use crate::{
11 alias,
12 init::global::reconnect_interval_secs,
13 peer::{
14 error::Error as PeerError,
15 info::{PeerInfo, PeerRelation},
16 list::PeerListWrapper as PeerList,
17 },
18 swarm::protocols::iota_gossip,
19};
20
21use bee_runtime::shutdown_stream::ShutdownStream;
22
23use futures::{
24 channel::oneshot,
25 io::{BufReader, BufWriter},
26 AsyncReadExt, StreamExt,
27};
28use libp2p::{identity, Multiaddr, PeerId};
29use log::*;
30use rand::Rng;
31use tokio::time::{self, Duration, Instant};
32use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream};
33
/// Exclusive upper bound, in milliseconds, of the random delay that `peerstate_checker` waits before its first
/// check.
const MAX_PEER_STATE_CHECKER_DELAY_MILLIS: u64 = 2000;
35
/// Everything a service host needs in order to start its background tasks.
pub struct ServiceHostConfig {
    /// Local identity keypair. The service hosts in this file destructure it as `_` and do not use it.
    pub local_keys: identity::Keypair,
    /// Sending halves of the channels used by the service tasks.
    pub senders: Senders,
    /// Receiving halves of the channels consumed by the command and event processors.
    pub receivers: Receivers,
    /// Shared peer list that the service tasks read and update.
    pub peerlist: PeerList,
}
42
/// Sending halves of the channels used by the service tasks; cloned once per task.
#[derive(Clone)]
pub struct Senders {
    /// Carries `Event`s published by the command and event processors.
    pub events: EventSender,
    /// Handed to the gossip reader/writer processors that are started when a protocol is established.
    pub internal_events: InternalEventSender,
    /// Carries `Command`s forwarded by the service tasks (e.g. `DialPeer`, `DialAddress`).
    pub internal_commands: CommandSender,
}
49
/// Receiving halves of the channels consumed by the service tasks.
pub struct Receivers {
    /// Commands consumed by `command_processor`.
    pub commands: CommandReceiver,
    /// Internal events consumed by `event_processor`.
    pub internal_events: InternalEventReceiver,
}
54
/// One-shot receiver through which a service task is told to shut down.
type Shutdown = oneshot::Receiver<()>;
56
/// Capacity, in bytes (32 KiB), of the buffered reader and writer wrapped around each gossip substream half.
const IO_BUFFER_LEN: usize = 32 * 1024;
58
59pub mod integrated {
60 use super::*;
61
62 use bee_runtime::{node::Node, worker::Worker};
63
64 use async_trait::async_trait;
65
66 use std::{any::TypeId, convert::Infallible};
67
68 #[derive(Default)]
72 pub struct ServiceHost {}
73
74 #[async_trait]
75 impl<N: Node> Worker<N> for ServiceHost {
76 type Config = ServiceHostConfig;
77 type Error = Infallible;
78
79 fn dependencies() -> &'static [TypeId] {
80 &[]
81 }
82
83 async fn start(node: &mut N, config: Self::Config) -> Result<Self, Self::Error> {
84 let ServiceHostConfig {
85 local_keys: _,
86 senders,
87 receivers,
88 peerlist,
89 } = config;
90
91 let Receivers {
92 commands,
93 internal_events,
94 } = receivers;
95
96 node.spawn::<Self, _, _>(|shutdown| {
97 command_processor(shutdown, commands, senders.clone(), peerlist.clone())
98 });
99 node.spawn::<Self, _, _>(|shutdown| {
100 event_processor(shutdown, internal_events, senders.clone(), peerlist.clone())
101 });
102 node.spawn::<Self, _, _>(|shutdown| peerstate_checker(shutdown, senders, peerlist));
103
104 info!("Network service started.");
105
106 Ok(Self::default())
107 }
108 }
109}
110
111pub mod standalone {
112 use super::*;
113
114 pub struct ServiceHost {
115 pub shutdown: oneshot::Receiver<()>,
116 }
117
118 impl ServiceHost {
119 pub fn new(shutdown: oneshot::Receiver<()>) -> Self {
120 Self { shutdown }
121 }
122
123 pub async fn start(self, config: ServiceHostConfig) {
124 let ServiceHost { shutdown } = self;
125 let ServiceHostConfig {
126 local_keys: _,
127 senders,
128 receivers,
129 peerlist,
130 } = config;
131
132 let Receivers {
133 commands,
134 internal_events,
135 } = receivers;
136
137 let (shutdown_tx1, shutdown_rx1) = oneshot::channel::<()>();
138 let (shutdown_tx2, shutdown_rx2) = oneshot::channel::<()>();
139 let (shutdown_tx3, shutdown_rx3) = oneshot::channel::<()>();
140
141 tokio::spawn(async move {
142 shutdown.await.expect("receiving shutdown signal");
143
144 shutdown_tx1.send(()).expect("receiving shutdown signal");
145 shutdown_tx2.send(()).expect("receiving shutdown signal");
146 shutdown_tx3.send(()).expect("receiving shutdown signal");
147 });
148 tokio::spawn(command_processor(
149 shutdown_rx1,
150 commands,
151 senders.clone(),
152 peerlist.clone(),
153 ));
154 tokio::spawn(event_processor(
155 shutdown_rx2,
156 internal_events,
157 senders.clone(),
158 peerlist.clone(),
159 ));
160 tokio::spawn(peerstate_checker(shutdown_rx3, senders, peerlist));
161
162 info!("Network service started.");
163 }
164 }
165}
166
167async fn command_processor(shutdown: Shutdown, commands: CommandReceiver, senders: Senders, peerlist: PeerList) {
168 debug!("Command processor running.");
169
170 let mut commands = ShutdownStream::new(shutdown, UnboundedReceiverStream::new(commands));
171
172 while let Some(command) = commands.next().await {
173 if let Err(e) = process_command(command, &senders, &peerlist).await {
174 error!("Error processing command. Cause: {}", e);
175 continue;
176 }
177 }
178
179 debug!("Command processor stopped.");
180}
181
182async fn event_processor(shutdown: Shutdown, events: InternalEventReceiver, senders: Senders, peerlist: PeerList) {
183 debug!("Event processor running.");
184
185 let mut internal_events = ShutdownStream::new(shutdown, UnboundedReceiverStream::new(events));
186
187 while let Some(internal_event) = internal_events.next().await {
188 if let Err(e) = process_internal_event(internal_event, &senders, &peerlist).await {
189 error!("Error processing internal event. Cause: {}", e);
190 continue;
191 }
192 }
193
194 debug!("Event processor stopped.");
195}
196
197async fn peerstate_checker(shutdown: Shutdown, senders: Senders, peerlist: PeerList) {
199 debug!("Peer checker running.");
200
201 let Senders { internal_commands, .. } = senders;
202
203 let delay = Duration::from_millis(rand::thread_rng().gen_range(0u64..MAX_PEER_STATE_CHECKER_DELAY_MILLIS));
208 let start = Instant::now() + delay;
209
210 let period = Duration::from_secs(reconnect_interval_secs());
212
213 let mut interval = ShutdownStream::new(shutdown, IntervalStream::new(time::interval_at(start, period)));
214
215 while interval.next().await.is_some() {
218 let peerlist = peerlist.0.read().await;
219
220 let num_known = peerlist.filter_count(|info, _| info.relation.is_known());
221 let num_connected_known = peerlist.filter_count(|info, state| info.relation.is_known() && state.is_connected());
222 let num_connected_unknown =
223 peerlist.filter_count(|info, state| info.relation.is_unknown() && state.is_connected());
224
225 info!(
226 "Connected peers: known {}/{} unknown {}.",
227 num_connected_known, num_known, num_connected_unknown,
228 );
229
230 for (peer_id, info) in peerlist.filter_info(|info, state| info.relation.is_known() && state.is_disconnected()) {
231 info!("Trying to connect to: {} ({}).", info.alias, alias!(peer_id));
232
233 let _ = internal_commands.send(Command::DialPeer { peer_id });
235 }
236 }
237
238 debug!("Peer checker stopped.");
239}
240
241async fn process_command(command: Command, senders: &Senders, peerlist: &PeerList) -> Result<(), Error> {
242 trace!("Received {:?}.", command);
243
244 match command {
245 Command::AddPeer {
246 peer_id,
247 multiaddr,
248 alias,
249 relation,
250 } => {
251 let alias = alias.unwrap_or_else(|| alias!(peer_id).to_string());
252
253 add_peer(peer_id, multiaddr, alias, relation, senders, peerlist).await?;
254 }
255
256 Command::BanAddress { address } => {
257 peerlist.0.write().await.ban_address(address.clone())?;
258
259 senders
260 .events
261 .send(Event::AddressBanned { address })
262 .map_err(|_| Error::SendingEventFailed)?;
263 }
264
265 Command::BanPeer { peer_id } => {
266 peerlist.0.write().await.ban_peer(peer_id)?;
267
268 senders
269 .events
270 .send(Event::PeerBanned { peer_id })
271 .map_err(|_| Error::SendingEventFailed)?;
272 }
273
274 Command::ChangeRelation { peer_id, to } => {
275 peerlist
276 .0
277 .write()
278 .await
279 .update_info(&peer_id, |info| info.relation = to)?;
280 }
281
282 Command::DialAddress { address } => {
283 senders
284 .internal_commands
285 .send(Command::DialAddress { address })
286 .map_err(|_| Error::SendingCommandFailed)?;
287 }
288
289 Command::DialPeer { peer_id } => {
290 senders
291 .internal_commands
292 .send(Command::DialPeer { peer_id })
293 .map_err(|_| Error::SendingCommandFailed)?;
294 }
295
296 Command::DisconnectPeer { peer_id } => {
297 disconnect_peer(peer_id, senders, peerlist).await?;
298 }
299
300 Command::RemovePeer { peer_id } => {
301 remove_peer(peer_id, senders, peerlist).await?;
302 }
303
304 Command::UnbanAddress { address } => {
305 peerlist.0.write().await.unban_address(&address)?;
306
307 senders
308 .events
309 .send(Event::AddressUnbanned { address })
310 .map_err(|_| Error::SendingEventFailed)?;
311 }
312
313 Command::UnbanPeer { peer_id } => {
314 peerlist.0.write().await.unban_peer(&peer_id)?;
315
316 senders
317 .events
318 .send(Event::PeerUnbanned { peer_id })
319 .map_err(|_| Error::SendingEventFailed)?;
320 }
321 }
322
323 Ok(())
324}
325
326async fn process_internal_event(
327 internal_event: InternalEvent,
328 senders: &Senders,
329 peerlist: &PeerList,
330) -> Result<(), Error> {
331 match internal_event {
332 InternalEvent::AddressBound { address } => {
333 senders
334 .events
335 .send(Event::AddressBound { address })
336 .map_err(|_| Error::SendingEventFailed)?;
337 }
338
339 InternalEvent::ProtocolDropped { peer_id } => {
340 let mut peerlist = peerlist.0.write().await;
341
342 let _ = peerlist.update_state(&peer_id, |state| state.set_disconnected());
344
345 let _ = peerlist.filter_remove(&peer_id, |peer_info, _| peer_info.relation.is_unknown());
347
348 drop(peerlist);
350
351 senders
352 .events
353 .send(Event::PeerDisconnected { peer_id })
354 .map_err(|_| Error::SendingEventFailed)?;
355 }
356
357 InternalEvent::ProtocolEstablished {
358 peer_id,
359 peer_addr,
360 origin,
361 substream,
362 } => {
363 let mut peerlist = peerlist.0.write().await;
364 let mut peer_added = false;
365
366 let accepted = peerlist.accepts_incoming_peer(&peer_id, &peer_addr);
371
372 if accepted.is_ok() {
373 if !peerlist.contains(&peer_id) {
375 let peer_info = PeerInfo {
376 address: peer_addr,
377 alias: alias!(peer_id).to_string(),
378 relation: PeerRelation::Unknown,
379 };
380 peerlist.insert_peer(peer_id, peer_info).map_err(|(_, _, e)| e)?;
381 peer_added = true;
382 }
383
384 let peer_info = peerlist.info(&peer_id).unwrap();
387
388 let (r, w) = substream.split();
390
391 let reader = BufReader::with_capacity(IO_BUFFER_LEN, r);
392 let writer = BufWriter::with_capacity(IO_BUFFER_LEN, w);
393
394 let (incoming_tx, incoming_rx) = iota_gossip::channel();
395 let (outgoing_tx, outgoing_rx) = iota_gossip::channel();
396
397 iota_gossip::start_incoming_processor(peer_id, reader, incoming_tx, senders.internal_events.clone());
398 iota_gossip::start_outgoing_processor(peer_id, writer, outgoing_rx, senders.internal_events.clone());
399
400 let _ = peerlist.update_state(&peer_id, |state| state.set_connected(outgoing_tx.clone()));
402
403 drop(peerlist);
405
406 if peer_added {
410 senders
411 .events
412 .send(Event::PeerAdded {
413 peer_id,
414 info: peer_info.clone(),
415 })
416 .map_err(|_| Error::SendingEventFailed)?;
417 }
418
419 info!(
420 "Established ({}) protocol with {} ({}).",
421 origin,
422 peer_info.alias,
423 alias!(peer_id)
424 );
425
426 senders
427 .events
428 .send(Event::PeerConnected {
429 peer_id,
430 info: peer_info,
431 gossip_in: incoming_rx,
432 gossip_out: outgoing_tx,
433 })
434 .map_err(|_| Error::SendingEventFailed)?;
435 } else {
436 debug!("{}", accepted.unwrap_err());
439 }
440 }
441 }
442
443 Ok(())
444}
445
446async fn add_peer(
447 peer_id: PeerId,
448 address: Multiaddr,
449 alias: String,
450 relation: PeerRelation,
451 senders: &Senders,
452 peerlist: &PeerList,
453) -> Result<(), Error> {
454 let peer_info = PeerInfo {
455 address,
456 alias,
457 relation,
458 };
459
460 let mut peerlist = peerlist.0.write().await;
461
462 match peerlist.insert_peer(peer_id, peer_info) {
464 Ok(()) => {
465 let info = peerlist.info(&peer_id).unwrap();
468
469 drop(peerlist);
471
472 senders
473 .events
474 .send(Event::PeerAdded { peer_id, info })
475 .map_err(|_| Error::SendingEventFailed)?;
476
477 Ok(())
478 }
479 Err((peer_id, peer_info, mut e)) => {
480 if matches!(e, PeerError::PeerIsDuplicate(_)) {
489 match peerlist.update_info(&peer_id, |info| *info = peer_info.clone()) {
490 Ok(()) => {
491 drop(peerlist);
493
494 senders
495 .events
496 .send(Event::PeerAdded {
497 peer_id,
498 info: peer_info,
499 })
500 .map_err(|_| Error::SendingEventFailed)?;
501
502 return Ok(());
503 }
504 Err(error) => e = error,
505 }
506 }
507
508 drop(peerlist);
510
511 senders
512 .events
513 .send(Event::CommandFailed {
514 command: Command::AddPeer {
515 peer_id,
516 multiaddr: peer_info.address,
517 alias: Some(peer_info.alias),
520 relation: peer_info.relation,
521 },
522 reason: e.clone(),
523 })
524 .map_err(|_| Error::SendingEventFailed)?;
525
526 Err(e.into())
527 }
528 }
529}
530
531async fn remove_peer(peer_id: PeerId, senders: &Senders, peerlist: &PeerList) -> Result<(), Error> {
532 disconnect_peer(peer_id, senders, peerlist).await?;
533
534 let peer_removal = peerlist.0.write().await.remove(&peer_id);
535
536 match peer_removal {
537 Ok(_peer_info) => {
538 senders
539 .events
540 .send(Event::PeerRemoved { peer_id })
541 .map_err(|_| Error::SendingEventFailed)?;
542
543 Ok(())
544 }
545 Err(e) => {
546 senders
547 .events
548 .send(Event::CommandFailed {
549 command: Command::RemovePeer { peer_id },
550 reason: e.clone(),
551 })
552 .map_err(|_| Error::SendingEventFailed)?;
553
554 Err(e.into())
555 }
556 }
557}
558
559async fn disconnect_peer(peer_id: PeerId, senders: &Senders, peerlist: &PeerList) -> Result<(), Error> {
560 let state_update = peerlist
561 .0
562 .write()
563 .await
564 .update_state(&peer_id, |state| state.set_disconnected());
565
566 match state_update {
567 Ok(Some(gossip_sender)) => {
568 senders
572 .events
573 .send(Event::PeerDisconnected { peer_id })
574 .map_err(|_| Error::SendingEventFailed)?;
575
576 let _ = gossip_sender.send(Vec::new());
579
580 Ok(())
581 }
582 Ok(None) => {
583 Ok(())
585 }
586 Err(e) => {
587 senders
588 .events
589 .send(Event::CommandFailed {
590 command: Command::DisconnectPeer { peer_id },
591 reason: e.clone(),
592 })
593 .map_err(|_| Error::SendingEventFailed)?;
594
595 Err(e.into())
596 }
597 }
598}