bee_network/service/host.rs

// Copyright 2020-2021 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use super::{
    command::{Command, CommandReceiver, CommandSender},
    error::Error,
    event::{Event, EventSender, InternalEvent, InternalEventReceiver, InternalEventSender},
};

use crate::{
    alias,
    init::global::reconnect_interval_secs,
    peer::{
        error::Error as PeerError,
        info::{PeerInfo, PeerRelation},
        list::PeerListWrapper as PeerList,
    },
    swarm::protocols::iota_gossip,
};

use bee_runtime::shutdown_stream::ShutdownStream;

use futures::{
    channel::oneshot,
    io::{BufReader, BufWriter},
    AsyncReadExt, StreamExt,
};
use libp2p::{identity, Multiaddr, PeerId};
use log::*;
use rand::Rng;
use tokio::time::{self, Duration, Instant};
use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream};

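// Upper bound (in milliseconds) for the random start-up delay of the peer state checker task
// (see `peerstate_checker` below).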
const MAX_PEER_STATE_CHECKER_DELAY_MILLIS: u64 = 2000;

pub struct ServiceHostConfig {
    pub local_keys: identity::Keypair,
    pub senders: Senders,
    pub receivers: Receivers,
    pub peerlist: PeerList,
}

#[derive(Clone)]
pub struct Senders {
    pub events: EventSender,
    pub internal_events: InternalEventSender,
    pub internal_commands: CommandSender,
}

pub struct Receivers {
    pub commands: CommandReceiver,
    pub internal_events: InternalEventReceiver,
}

type Shutdown = oneshot::Receiver<()>;

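// Capacity (in bytes) of the buffered reader and writer wrapped around each gossip substream.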
const IO_BUFFER_LEN: usize = 32 * 1024;

pub mod integrated {
    use super::*;

    use bee_runtime::{node::Node, worker::Worker};

    use async_trait::async_trait;

    use std::{any::TypeId, convert::Infallible};

    /// A node worker that processes user commands and publishes events.
    ///
    /// NOTE: This type is only exported to be used as a worker dependency.
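    ///
    /// A sketch of how another worker might declare this type as a dependency (the enclosing
    /// worker is hypothetical and shown only for illustration):
    ///
    /// ```ignore
    /// fn dependencies() -> &'static [TypeId] {
    ///     vec![TypeId::of::<ServiceHost>()].leak()
    /// }
    /// ```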
    #[derive(Default)]
    pub struct ServiceHost {}

    #[async_trait]
    impl<N: Node> Worker<N> for ServiceHost {
        type Config = ServiceHostConfig;
        type Error = Infallible;

        fn dependencies() -> &'static [TypeId] {
            &[]
        }

        async fn start(node: &mut N, config: Self::Config) -> Result<Self, Self::Error> {
            let ServiceHostConfig {
                local_keys: _,
                senders,
                receivers,
                peerlist,
            } = config;

            let Receivers {
                commands,
                internal_events,
            } = receivers;

            node.spawn::<Self, _, _>(|shutdown| {
                command_processor(shutdown, commands, senders.clone(), peerlist.clone())
            });
            node.spawn::<Self, _, _>(|shutdown| {
                event_processor(shutdown, internal_events, senders.clone(), peerlist.clone())
            });
            node.spawn::<Self, _, _>(|shutdown| peerstate_checker(shutdown, senders, peerlist));

            info!("Network service started.");

            Ok(Self::default())
        }
    }
}

pub mod standalone {
    use super::*;

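    /// The network service host, running as a set of standalone tasks (i.e. outside of a `bee_runtime` node).
    ///
    /// A minimal usage sketch, assuming a fully assembled `config: ServiceHostConfig` (the variable
    /// names are hypothetical and shown only for illustration):
    ///
    /// ```ignore
    /// let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel::<()>();
    ///
    /// ServiceHost::new(shutdown_rx).start(config).await;
    ///
    /// // Later: signal the spawned service tasks to shut down.
    /// shutdown_tx.send(()).expect("sending shutdown signal");
    /// ```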
    pub struct ServiceHost {
        pub shutdown: oneshot::Receiver<()>,
    }

    impl ServiceHost {
        pub fn new(shutdown: oneshot::Receiver<()>) -> Self {
            Self { shutdown }
        }

        pub async fn start(self, config: ServiceHostConfig) {
            let ServiceHost { shutdown } = self;
            let ServiceHostConfig {
                local_keys: _,
                senders,
                receivers,
                peerlist,
            } = config;

            let Receivers {
                commands,
                internal_events,
            } = receivers;

            let (shutdown_tx1, shutdown_rx1) = oneshot::channel::<()>();
            let (shutdown_tx2, shutdown_rx2) = oneshot::channel::<()>();
            let (shutdown_tx3, shutdown_rx3) = oneshot::channel::<()>();

            tokio::spawn(async move {
                shutdown.await.expect("receiving shutdown signal");

                shutdown_tx1.send(()).expect("sending shutdown signal");
                shutdown_tx2.send(()).expect("sending shutdown signal");
                shutdown_tx3.send(()).expect("sending shutdown signal");
            });
            tokio::spawn(command_processor(
                shutdown_rx1,
                commands,
                senders.clone(),
                peerlist.clone(),
            ));
            tokio::spawn(event_processor(
                shutdown_rx2,
                internal_events,
                senders.clone(),
                peerlist.clone(),
            ));
            tokio::spawn(peerstate_checker(shutdown_rx3, senders, peerlist));

            info!("Network service started.");
        }
    }
}

async fn command_processor(shutdown: Shutdown, commands: CommandReceiver, senders: Senders, peerlist: PeerList) {
    debug!("Command processor running.");

    let mut commands = ShutdownStream::new(shutdown, UnboundedReceiverStream::new(commands));

    while let Some(command) = commands.next().await {
        if let Err(e) = process_command(command, &senders, &peerlist).await {
            error!("Error processing command. Cause: {}", e);
            continue;
        }
    }

    debug!("Command processor stopped.");
}

async fn event_processor(shutdown: Shutdown, events: InternalEventReceiver, senders: Senders, peerlist: PeerList) {
    debug!("Event processor running.");

    let mut internal_events = ShutdownStream::new(shutdown, UnboundedReceiverStream::new(events));

    while let Some(internal_event) = internal_events.next().await {
        if let Err(e) = process_internal_event(internal_event, &senders, &peerlist).await {
            error!("Error processing internal event. Cause: {}", e);
            continue;
        }
    }

    debug!("Event processor stopped.");
}

// TODO: Implement exponential back-off so we don't spam peers with reconnect attempts.
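// A hedged sketch of what such a back-off could look like (hypothetical helper, not used anywhere yet):
//
// fn reconnect_delay(base: Duration, failed_attempts: u32, max: Duration) -> Duration {
//     // Double the delay with every failed attempt, capped at `max`.
//     (base * 2u32.saturating_pow(failed_attempts)).min(max)
// }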
async fn peerstate_checker(shutdown: Shutdown, senders: Senders, peerlist: PeerList) {
    debug!("Peer checker running.");

    let Senders { internal_commands, .. } = senders;

    // NOTE:
    // We want to reduce the overhead of simultaneous mutual dialing when several nodes are started at the same
    // time (by a script, for example). We do this by adding a small random delay before this task starts executing
    // its regular peer state checks.
    let delay = Duration::from_millis(rand::thread_rng().gen_range(0u64..MAX_PEER_STATE_CHECKER_DELAY_MILLIS));
    let start = Instant::now() + delay;

    // The (currently) constant interval at which peer state checks happen.
    let period = Duration::from_secs(reconnect_interval_secs());

    let mut interval = ShutdownStream::new(shutdown, IntervalStream::new(time::interval_at(start, period)));

    // Check whether there are any disconnected known peers, and schedule a reconnect attempt for each
    // of them.
    while interval.next().await.is_some() {
        let peerlist = peerlist.0.read().await;

        let num_known = peerlist.filter_count(|info, _| info.relation.is_known());
        let num_connected_known = peerlist.filter_count(|info, state| info.relation.is_known() && state.is_connected());
        let num_connected_unknown =
            peerlist.filter_count(|info, state| info.relation.is_unknown() && state.is_connected());

        info!(
            "Connected peers: known {}/{} unknown {}.",
            num_connected_known, num_known, num_connected_unknown,
        );

        for (peer_id, info) in peerlist.filter_info(|info, state| info.relation.is_known() && state.is_disconnected()) {
            info!("Trying to connect to: {} ({}).", info.alias, alias!(peer_id));

            // Ignore if the command fails. We can always retry the next time.
            let _ = internal_commands.send(Command::DialPeer { peer_id });
        }
    }

    debug!("Peer checker stopped.");
}

async fn process_command(command: Command, senders: &Senders, peerlist: &PeerList) -> Result<(), Error> {
    trace!("Received {:?}.", command);

    match command {
        Command::AddPeer {
            peer_id,
            multiaddr,
            alias,
            relation,
        } => {
            let alias = alias.unwrap_or_else(|| alias!(peer_id).to_string());

            add_peer(peer_id, multiaddr, alias, relation, senders, peerlist).await?;
        }

        Command::BanAddress { address } => {
            peerlist.0.write().await.ban_address(address.clone())?;

            senders
                .events
                .send(Event::AddressBanned { address })
                .map_err(|_| Error::SendingEventFailed)?;
        }

        Command::BanPeer { peer_id } => {
            peerlist.0.write().await.ban_peer(peer_id)?;

            senders
                .events
                .send(Event::PeerBanned { peer_id })
                .map_err(|_| Error::SendingEventFailed)?;
        }

        Command::ChangeRelation { peer_id, to } => {
            peerlist
                .0
                .write()
                .await
                .update_info(&peer_id, |info| info.relation = to)?;
        }

        Command::DialAddress { address } => {
            senders
                .internal_commands
                .send(Command::DialAddress { address })
                .map_err(|_| Error::SendingCommandFailed)?;
        }

        Command::DialPeer { peer_id } => {
            senders
                .internal_commands
                .send(Command::DialPeer { peer_id })
                .map_err(|_| Error::SendingCommandFailed)?;
        }

        Command::DisconnectPeer { peer_id } => {
            disconnect_peer(peer_id, senders, peerlist).await?;
        }

        Command::RemovePeer { peer_id } => {
            remove_peer(peer_id, senders, peerlist).await?;
        }

        Command::UnbanAddress { address } => {
            peerlist.0.write().await.unban_address(&address)?;

            senders
                .events
                .send(Event::AddressUnbanned { address })
                .map_err(|_| Error::SendingEventFailed)?;
        }

        Command::UnbanPeer { peer_id } => {
            peerlist.0.write().await.unban_peer(&peer_id)?;

            senders
                .events
                .send(Event::PeerUnbanned { peer_id })
                .map_err(|_| Error::SendingEventFailed)?;
        }
    }

    Ok(())
}

async fn process_internal_event(
    internal_event: InternalEvent,
    senders: &Senders,
    peerlist: &PeerList,
) -> Result<(), Error> {
    match internal_event {
        InternalEvent::AddressBound { address } => {
            senders
                .events
                .send(Event::AddressBound { address })
                .map_err(|_| Error::SendingEventFailed)?;
        }

        InternalEvent::ProtocolDropped { peer_id } => {
            let mut peerlist = peerlist.0.write().await;

            // Try to disconnect, but ignore errors in case the peer was already disconnected.
            let _ = peerlist.update_state(&peer_id, |state| state.set_disconnected());

            // Remove the peer in case it is an "unknown" peer.
            let _ = peerlist.filter_remove(&peer_id, |peer_info, _| peer_info.relation.is_unknown());

            // We no longer need to hold the lock.
            drop(peerlist);

            senders
                .events
                .send(Event::PeerDisconnected { peer_id })
                .map_err(|_| Error::SendingEventFailed)?;
        }

        InternalEvent::ProtocolEstablished {
            peer_id,
            peer_addr,
            origin,
            substream,
        } => {
            let mut peerlist = peerlist.0.write().await;
            let mut peer_added = false;

            // NOTE: Unfortunately, there is currently no way to inject custom criteria to prevent protocol
            // negotiation, so we have to check here whether we want to allow that peer (and spend resources
            // on it).

            let accepted = peerlist.accepts_incoming_peer(&peer_id, &peer_addr);

            if accepted.is_ok() {
                // If the peer doesn't exist yet, but is accepted as an "unknown" peer, we insert it now.
                if !peerlist.contains(&peer_id) {
                    let peer_info = PeerInfo {
                        address: peer_addr,
                        alias: alias!(peer_id).to_string(),
                        relation: PeerRelation::Unknown,
                    };
                    peerlist.insert_peer(peer_id, peer_info).map_err(|(_, _, e)| e)?;
                    peer_added = true;
                }

                // Panic:
                // We made sure above that the peer id exists in the peer list, hence unwrapping is fine.
                let peer_info = peerlist.info(&peer_id).unwrap();

                // Spin up a separate buffered reader and writer to efficiently process the gossip with that peer.
                let (r, w) = substream.split();

                let reader = BufReader::with_capacity(IO_BUFFER_LEN, r);
                let writer = BufWriter::with_capacity(IO_BUFFER_LEN, w);

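                // Channel wiring: `incoming_tx` goes to the incoming processor, which forwards gossip read
                // from the peer into `incoming_rx`; the outgoing processor drains `outgoing_rx` and writes it
                // to the peer, while `outgoing_tx` is handed to the rest of the node via the `PeerConnected`
                // event (and stored in the peer state for the shutdown signal).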
                let (incoming_tx, incoming_rx) = iota_gossip::channel();
                let (outgoing_tx, outgoing_rx) = iota_gossip::channel();

                iota_gossip::start_incoming_processor(peer_id, reader, incoming_tx, senders.internal_events.clone());
                iota_gossip::start_outgoing_processor(peer_id, writer, outgoing_rx, senders.internal_events.clone());

                // We store a clone of the gossip send channel in order to send a shutdown signal.
                let _ = peerlist.update_state(&peer_id, |state| state.set_connected(outgoing_tx.clone()));

                // We no longer need to hold the lock.
                drop(peerlist);

                // We only want to fire events when we are no longer holding the peer list lock, to make this
                // code more resilient against different channel implementations.

                if peer_added {
                    senders
                        .events
                        .send(Event::PeerAdded {
                            peer_id,
                            info: peer_info.clone(),
                        })
                        .map_err(|_| Error::SendingEventFailed)?;
                }

                info!(
                    "Established ({}) protocol with {} ({}).",
                    origin,
                    peer_info.alias,
                    alias!(peer_id)
                );

                senders
                    .events
                    .send(Event::PeerConnected {
                        peer_id,
                        info: peer_info,
                        gossip_in: incoming_rx,
                        gossip_out: outgoing_tx,
                    })
                    .map_err(|_| Error::SendingEventFailed)?;
            } else {
                // Panic:
                // This branch handles the error case, so unwrapping the error is fine.
                debug!("{}", accepted.unwrap_err());
            }
        }
    }

    Ok(())
}

async fn add_peer(
    peer_id: PeerId,
    address: Multiaddr,
    alias: String,
    relation: PeerRelation,
    senders: &Senders,
    peerlist: &PeerList,
) -> Result<(), Error> {
    let peer_info = PeerInfo {
        address,
        alias,
        relation,
    };

    let mut peerlist = peerlist.0.write().await;

    // If the insert fails for some reason, we get the peer data back, so it can be reused.
    match peerlist.insert_peer(peer_id, peer_info) {
        Ok(()) => {
            // Panic:
            // We just added the peer_id so unwrapping here is fine.
            let info = peerlist.info(&peer_id).unwrap();

            // We no longer need to hold the lock.
            drop(peerlist);

            senders
                .events
                .send(Event::PeerAdded { peer_id, info })
                .map_err(|_| Error::SendingEventFailed)?;

            Ok(())
        }
        Err((peer_id, peer_info, mut e)) => {
            // NB: This fixes an edge case where a peer that is in fact known connects before it has been added
            // by the manual peer manager, and is therefore stored as unknown. In such a case we simply update it
            // with the correct info (address, alias, relation).

            // TODO: Since we nowadays add static peers during initialization (`init`), the above mentioned edge
            // case can no longer occur, and hence this match arm can probably be removed. But this needs to be
            // tested thoroughly in a live setup to be really sure.

            if matches!(e, PeerError::PeerIsDuplicate(_)) {
                match peerlist.update_info(&peer_id, |info| *info = peer_info.clone()) {
                    Ok(()) => {
                        // We no longer need to hold the lock.
                        drop(peerlist);

                        senders
                            .events
                            .send(Event::PeerAdded {
                                peer_id,
                                info: peer_info,
                            })
                            .map_err(|_| Error::SendingEventFailed)?;

                        return Ok(());
                    }
                    Err(error) => e = error,
                }
            }

            // We no longer need to hold the lock.
            drop(peerlist);

            senders
                .events
                .send(Event::CommandFailed {
                    command: Command::AddPeer {
                        peer_id,
                        multiaddr: peer_info.address,
                        // NOTE: the returned failed command now has the default alias, if none was specified
                        // originally.
                        alias: Some(peer_info.alias),
                        relation: peer_info.relation,
                    },
                    reason: e.clone(),
                })
                .map_err(|_| Error::SendingEventFailed)?;

            Err(e.into())
        }
    }
}

async fn remove_peer(peer_id: PeerId, senders: &Senders, peerlist: &PeerList) -> Result<(), Error> {
    disconnect_peer(peer_id, senders, peerlist).await?;

    let peer_removal = peerlist.0.write().await.remove(&peer_id);

    match peer_removal {
        Ok(_peer_info) => {
            senders
                .events
                .send(Event::PeerRemoved { peer_id })
                .map_err(|_| Error::SendingEventFailed)?;

            Ok(())
        }
        Err(e) => {
            senders
                .events
                .send(Event::CommandFailed {
                    command: Command::RemovePeer { peer_id },
                    reason: e.clone(),
                })
                .map_err(|_| Error::SendingEventFailed)?;

            Err(e.into())
        }
    }
}

async fn disconnect_peer(peer_id: PeerId, senders: &Senders, peerlist: &PeerList) -> Result<(), Error> {
    let state_update = peerlist
        .0
        .write()
        .await
        .update_state(&peer_id, |state| state.set_disconnected());

    match state_update {
        Ok(Some(gossip_sender)) => {
            // We send the `PeerDisconnected` event *before* we send the shutdown signal to the stream writer task,
            // so that it can stop adding messages to the channel before we drop the receiver.

            senders
                .events
                .send(Event::PeerDisconnected { peer_id })
                .map_err(|_| Error::SendingEventFailed)?;

            // Try to send the shutdown signal. It has to be a `Vec<u8>`, but it doesn't have to allocate.
            // We ignore the potential error in case the peer has already disconnected from us in the meantime.
            let _ = gossip_sender.send(Vec::new());

            Ok(())
        }
        Ok(None) => {
            // already disconnected
            Ok(())
        }
        Err(e) => {
            senders
                .events
                .send(Event::CommandFailed {
                    command: Command::DisconnectPeer { peer_id },
                    reason: e.clone(),
                })
                .map_err(|_| Error::SendingEventFailed)?;

            Err(e.into())
        }
    }
}