bee_network/swarm/protocols/iota_gossip/
io.rs1use crate::{
5 alias,
6 service::event::{InternalEvent, InternalEventSender},
7};
8
9use futures::{
10 io::{BufReader, BufWriter, ReadHalf, WriteHalf},
11 AsyncReadExt, AsyncWriteExt, StreamExt,
12};
13use libp2p::{swarm::NegotiatedSubstream, PeerId};
14use log::*;
15use tokio::sync::mpsc;
16use tokio_stream::wrappers::UnboundedReceiverStream;
17
18const MSG_BUFFER_LEN: usize = 32768;
19
20pub type GossipSender = mpsc::UnboundedSender<Vec<u8>>;
22
23pub type GossipReceiver = UnboundedReceiverStream<Vec<u8>>;
25
26pub fn channel() -> (GossipSender, GossipReceiver) {
27 let (sender, receiver) = mpsc::unbounded_channel();
28 (sender, UnboundedReceiverStream::new(receiver))
29}
30
31pub fn start_incoming_processor(
32 peer_id: PeerId,
33 mut reader: BufReader<ReadHalf<Box<NegotiatedSubstream>>>,
34 incoming_tx: GossipSender,
35 internal_event_sender: InternalEventSender,
36) {
37 tokio::spawn(async move {
38 let mut msg_buf = vec![0u8; MSG_BUFFER_LEN];
39
40 loop {
41 if let Some(len) = (&mut reader).read(&mut msg_buf).await.ok().filter(|len| *len > 0) {
42 if incoming_tx.send(msg_buf[..len].to_vec()).is_err() {
43 debug!("gossip-in: receiver dropped locally.");
44
45 break;
48 }
49 } else {
50 debug!("gossip-in: stream closed remotely.");
51
52 internal_event_sender
58 .send(InternalEvent::ProtocolDropped { peer_id })
59 .expect("The service must not shutdown as long as there are gossip tasks running.");
60
61 break;
62 }
63 }
64
65 debug!("gossip-in: exiting gossip-in processor for {}.", alias!(peer_id));
70 });
71}
72
73pub fn start_outgoing_processor(
74 peer_id: PeerId,
75 mut writer: BufWriter<WriteHalf<Box<NegotiatedSubstream>>>,
76 outgoing_rx: GossipReceiver,
77 internal_event_sender: InternalEventSender,
78) {
79 tokio::spawn(async move {
80 let mut outgoing_gossip_receiver = outgoing_rx.fuse();
81
82 while let Some(message) = outgoing_gossip_receiver.next().await {
84 if message.is_empty() {
89 debug!("gossip-out: received shutdown message.");
90
91 internal_event_sender
96 .send(InternalEvent::ProtocolDropped { peer_id })
97 .expect("The service must not shutdown as long as there are gossip tasks running.");
98
99 break;
100 }
101
102 if (&mut writer).write_all(&message).await.is_err() || (&mut writer).flush().await.is_err() {
105 debug!("gossip-out: stream closed remotely");
106 break;
107 }
108 }
109
110 debug!("gossip-out: exiting gossip-out processor for {}.", alias!(peer_id));
115 });
116}