bee_network/swarm/protocols/iota_gossip/
io.rs

1// Copyright 2020-2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    alias,
6    service::event::{InternalEvent, InternalEventSender},
7};
8
9use futures::{
10    io::{BufReader, BufWriter, ReadHalf, WriteHalf},
11    AsyncReadExt, AsyncWriteExt, StreamExt,
12};
13use libp2p::{swarm::NegotiatedSubstream, PeerId};
14use log::*;
15use tokio::sync::mpsc;
16use tokio_stream::wrappers::UnboundedReceiverStream;
17
18const MSG_BUFFER_LEN: usize = 32768;
19
20/// A type alias for an unbounded channel sender.
21pub type GossipSender = mpsc::UnboundedSender<Vec<u8>>;
22
23/// A type alias for an unbounded channel receiver.
24pub type GossipReceiver = UnboundedReceiverStream<Vec<u8>>;
25
26pub fn channel() -> (GossipSender, GossipReceiver) {
27    let (sender, receiver) = mpsc::unbounded_channel();
28    (sender, UnboundedReceiverStream::new(receiver))
29}
30
31pub fn start_incoming_processor(
32    peer_id: PeerId,
33    mut reader: BufReader<ReadHalf<Box<NegotiatedSubstream>>>,
34    incoming_tx: GossipSender,
35    internal_event_sender: InternalEventSender,
36) {
37    tokio::spawn(async move {
38        let mut msg_buf = vec![0u8; MSG_BUFFER_LEN];
39
40        loop {
41            if let Some(len) = (&mut reader).read(&mut msg_buf).await.ok().filter(|len| *len > 0) {
42                if incoming_tx.send(msg_buf[..len].to_vec()).is_err() {
43                    debug!("gossip-in: receiver dropped locally.");
44
45                    // The receiver of this channel was dropped, maybe due to a shutdown. There is nothing we can do to
46                    // salvage this situation, hence we drop the connection.
47                    break;
48                }
49            } else {
50                debug!("gossip-in: stream closed remotely.");
51
52                // NB: The network service will not shut down before it has received the `ProtocolDropped` event from
53                // all once connected peers, hence if the following send fails, then it must be
54                // considered a bug.
55
56                // The remote peer dropped the connection.
57                internal_event_sender
58                    .send(InternalEvent::ProtocolDropped { peer_id })
59                    .expect("The service must not shutdown as long as there are gossip tasks running.");
60
61                break;
62            }
63        }
64
65        // Reasons why this task might end:
66        // (1) The remote dropped the TCP connection.
67        // (2) The local dropped the gossip_in receiver channel.
68
69        debug!("gossip-in: exiting gossip-in processor for {}.", alias!(peer_id));
70    });
71}
72
73pub fn start_outgoing_processor(
74    peer_id: PeerId,
75    mut writer: BufWriter<WriteHalf<Box<NegotiatedSubstream>>>,
76    outgoing_rx: GossipReceiver,
77    internal_event_sender: InternalEventSender,
78) {
79    tokio::spawn(async move {
80        let mut outgoing_gossip_receiver = outgoing_rx.fuse();
81
82        // If the gossip sender dropped we end the connection.
83        while let Some(message) = outgoing_gossip_receiver.next().await {
84            // NB: Instead of polling another shutdown channel, we use an empty message
85            // to signal that we want to end the connection. We use this "trick" whenever the network
86            // receives the `DisconnectPeer` command to enforce that the connection will be dropped.
87
88            if message.is_empty() {
89                debug!("gossip-out: received shutdown message.");
90
91                // NB: The network service will not shut down before it has received the `ConnectionDropped` event from
92                // all once connected peers, hence if the following send fails, then it must be
93                // considered a bug.
94
95                internal_event_sender
96                    .send(InternalEvent::ProtocolDropped { peer_id })
97                    .expect("The service must not shutdown as long as there are gossip tasks running.");
98
99                break;
100            }
101
102            // If sending to the stream fails we end the connection.
103            // TODO: buffer for x milliseconds before flushing.
104            if (&mut writer).write_all(&message).await.is_err() || (&mut writer).flush().await.is_err() {
105                debug!("gossip-out: stream closed remotely");
106                break;
107            }
108        }
109
110        // Reasons why this task might end:
111        // (1) The local send the shutdown message (len = 0)
112        // (2) The remote dropped the TCP connection.
113
114        debug!("gossip-out: exiting gossip-out processor for {}.", alias!(peer_id));
115    });
116}