bee_gossip/service/
event.rs

1// Copyright 2020-2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4use libp2p::swarm::NegotiatedSubstream;
5use libp2p_core::{Multiaddr, PeerId};
6use tokio::sync::mpsc;
7
8use super::command::Command;
9use crate::{
10    network::origin::Origin,
11    peer::{error::Error as PeerError, info::PeerInfo},
12    swarm::protocols::iota_gossip::{GossipReceiver, GossipSender},
13};
14
/// Sending half of the unbounded channel that carries public [`Event`]s.
pub type EventSender = mpsc::UnboundedSender<Event>;
/// Receiving half of the unbounded channel that carries public [`Event`]s.
pub type EventReceiver = mpsc::UnboundedReceiver<Event>;
/// Receiving half of the unbounded channel that carries [`InternalEvent`]s.
pub type InternalEventReceiver = mpsc::UnboundedReceiver<InternalEvent>;
/// Sending half of the unbounded channel that carries [`InternalEvent`]s.
pub type InternalEventSender = mpsc::UnboundedSender<InternalEvent>;
19
20pub fn event_channel<T>() -> (mpsc::UnboundedSender<T>, mpsc::UnboundedReceiver<T>) {
21    mpsc::unbounded_channel()
22}
23
/// Describes the public events produced by the networking layer.
///
/// The enum is marked `#[non_exhaustive]`, so code outside of this crate has to include a
/// wildcard arm when matching on it.
#[derive(Debug)]
#[non_exhaustive]
pub enum Event {
    /// An address was banned.
    AddressBanned {
        /// The address that was banned.
        address: Multiaddr,
    },

    /// An address was unbanned.
    AddressUnbanned {
        /// The address that was unbanned.
        address: Multiaddr,
    },

    /// An address was bound.
    AddressBound {
        /// The assigned bind address.
        address: Multiaddr,
    },

    /// A command failed.
    CommandFailed {
        /// The command that failed.
        command: Command,
        /// The reason for the failure.
        reason: PeerError,
    },

    /// The local peer id was created.
    LocalIdCreated {
        /// The peer id created from the Ed25519 keypair.
        local_id: PeerId,
    },

    /// A peer was added.
    PeerAdded {
        /// The peer's id.
        peer_id: PeerId,
        /// The peer's info.
        info: PeerInfo,
    },

    /// A peer was banned.
    PeerBanned {
        /// The peer's id.
        peer_id: PeerId,
    },

    /// A peer was connected.
    PeerConnected {
        /// The peer's id.
        peer_id: PeerId,
        /// The peer's info.
        info: PeerInfo,
        /// The peer's message receive channel.
        gossip_in: GossipReceiver,
        /// The peer's message send channel.
        gossip_out: GossipSender,
    },

    /// A peer was disconnected.
    PeerDisconnected {
        /// The peer's id.
        peer_id: PeerId,
    },

    /// A peer was removed.
    PeerRemoved {
        /// The peer's id.
        peer_id: PeerId,
    },

    /// A peer was unbanned.
    PeerUnbanned {
        /// The peer's id.
        peer_id: PeerId,
    },

    /// A peer didn't answer our repeated calls.
    PeerUnreachable {
        /// The peer's id.
        peer_id: PeerId,
        /// The peer's info (note: named `peer_info` here, unlike the `info` field of the other
        /// variants).
        peer_info: PeerInfo,
    },
}
112
/// Describes the internal events.
///
/// These are the messages carried by [`InternalEventSender`] / [`InternalEventReceiver`].
#[derive(Debug)]
pub enum InternalEvent {
    /// An address was bound.
    AddressBound {
        /// The assigned bind address.
        address: Multiaddr,
    },

    /// The gossip protocol has been established with a peer.
    ProtocolEstablished {
        /// The peer's id.
        peer_id: PeerId,
        /// The peer's address.
        peer_addr: Multiaddr,
        /// The connection info associated with that peer.
        origin: Origin,
        /// The negotiated substream the protocol is running on (boxed).
        substream: Box<NegotiatedSubstream>,
    },

    /// The gossip protocol with a peer was stopped.
    ProtocolStopped {
        /// The peer's id.
        peer_id: PeerId,
    },

    /// A peer didn't answer our repeated calls.
    PeerUnreachable {
        /// The peer's id.
        peer_id: PeerId,
    },

    /// A peer has identified itself via the `libp2p` Identify protocol.
    PeerIdentified {
        /// The peer's id.
        peer_id: PeerId,
    },
}
152
/// Allows the user to receive [`Event`]s published by the network layer.
///
/// This is a newtype around an [`EventReceiver`]; the wrapped receiver is private to this type.
pub struct NetworkEventReceiver(EventReceiver);
155
156impl NetworkEventReceiver {
157    pub(crate) fn new(inner: EventReceiver) -> Self {
158        Self(inner)
159    }
160
161    /// Waits for an event from the network.
162    pub async fn recv(&mut self) -> Option<Event> {
163        self.0.recv().await
164    }
165}
166
167impl From<NetworkEventReceiver> for EventReceiver {
168    fn from(rx: NetworkEventReceiver) -> EventReceiver {
169        rx.0
170    }
171}