Skip to main content

nimiq_network_interface/
network.rs

1use std::{
2    fmt::{Debug, Display},
3    hash::Hash,
4    time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::stream::BoxStream;
9use nimiq_serde::{Deserialize, DeserializeError, Serialize};
10use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable};
11use thiserror::Error;
12use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
13
14use crate::{
15    peer_info::*,
16    request::{Message, Request, RequestError},
17};
18
19/// Network events that the network will report when subscribing
20#[derive(Clone, Debug)]
21pub enum NetworkEvent<P> {
22    /// A connection to a new peer has been started
23    PeerJoined(P, PeerInfo),
24    /// A peer disconnected
25    PeerLeft(P),
26    /// DHT is ready (bootstrapped and in server mode) to publish records
27    DhtReady,
28}
29
30pub type SubscribeEvents<PeerId> =
31    BoxStream<'static, Result<NetworkEvent<PeerId>, BroadcastStreamRecvError>>;
32
33const DEFAULT_GOSSIPSUB_RATE_LIMIT_TIME_WINDOW: Duration = Duration::from_secs(10);
34
35pub trait Topic {
36    type Item: Serialize + Deserialize + Send + Sync + Debug + 'static;
37
38    const BUFFER_SIZE: usize;
39    const NAME: &'static str;
40    const VALIDATE: bool;
41    const MAX_MESSAGES: u32;
42    const TIME_WINDOW: Duration = DEFAULT_GOSSIPSUB_RATE_LIMIT_TIME_WINDOW;
43}
44
45/// Network implementations have to at least support messages of this size.
46pub const MIN_SUPPORTED_MSG_SIZE: usize = 1024 * 1024;
47
48#[derive(Copy, Clone, Debug)]
49pub enum MsgAcceptance {
50    Accept,
51    Reject,
52    Ignore,
53}
54
55pub trait PubsubId<PeerId>: Clone + Send + Sync + Debug {
56    fn propagation_source(&self) -> PeerId;
57}
58
59/// Network implementations have to at least support requests of this size.
60pub const MIN_SUPPORTED_REQ_SIZE: usize = 20 * 1024;
61/// Network implementations have to at least support responses of this size.
62pub const MIN_SUPPORTED_RESP_SIZE: usize = 10 * 1024 * 1024;
63
64#[derive(Copy, Clone, Debug)]
65/// Reasons for closing a connection
66pub enum CloseReason {
67    /// Reason is unknown or doesn't fit the other reasons
68    Other,
69    /// The other peer closed the connection
70    RemoteClosed,
71    /// We need to close the connection to this peer because we are going offline
72    /// and don't want new connections.
73    GoingOffline,
74    /// There was an error and there is need to close the connection
75    Error,
76    /// Peer is malicious. This will cause the peer ID and address to get banned.
77    MaliciousPeer,
78}
79
80#[derive(Debug, Error)]
81pub enum SendError {
82    #[error("{0}")]
83    Serialization(#[from] DeserializeError),
84    #[error("Peer connection already closed")]
85    AlreadyClosed,
86}
87
88#[async_trait]
89pub trait Network: Send + Sync + Unpin + 'static {
90    type PeerId: Copy + Debug + Display + Ord + Hash + Send + Sync + Unpin + 'static;
91    type AddressType: Debug + Display + 'static;
92    type Error: std::error::Error;
93    type PubsubId: PubsubId<Self::PeerId> + Send + Sync + Unpin;
94    type RequestId: Copy + Debug + Display + Eq + Send + Sync + 'static;
95
96    /// Gets the set of connected peers
97    fn get_peers(&self) -> Vec<Self::PeerId>;
98
99    /// Returns whether the current peer has a connection to another peer
100    fn has_peer(&self, peer_id: Self::PeerId) -> bool;
101
102    /// Gets a peer information.
103    /// If the peer isn't found, `None` is returned.
104    fn get_peer_info(&self, peer_id: Self::PeerId) -> Option<PeerInfo>;
105
106    /// Gets the set of connected peers that provide the supplied services.
107    /// If we currently don't have min number of connected peer that provides those services,
108    /// we dial peers.
109    /// If there aren't enough peers in the network that provides the required services, we return an error
110    async fn get_peers_by_services(
111        &self,
112        services: Services,
113        min_peers: usize,
114    ) -> Result<Vec<Self::PeerId>, Self::Error>;
115
116    /// Returns true when the given peer provides the services flags that are required by us
117    fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool;
118
119    /// Returns true when the given peer provides the services flags that are required by us
120    fn peer_provides_services(&self, peer_id: Self::PeerId, services: Services) -> bool;
121
122    /// Disconnects a peer with a close reason
123    async fn disconnect_peer(&self, peer_id: Self::PeerId, close_reason: CloseReason);
124
125    /// Subscribes to network events
126    fn subscribe_events(&self) -> SubscribeEvents<Self::PeerId>;
127
128    /// Subscribes to a Gossipsub topic
129    async fn subscribe<T>(
130        &self,
131    ) -> Result<BoxStream<'static, (T::Item, Self::PubsubId)>, Self::Error>
132    where
133        T: Topic + Sync;
134
135    /// Unsubscribes from a Gossipsub topic
136    async fn unsubscribe<T>(&self) -> Result<(), Self::Error>
137    where
138        T: Topic + Sync;
139
140    /// Publishes a message to a Gossipsub topic
141    async fn publish<T>(&self, item: T::Item) -> Result<(), Self::Error>
142    where
143        T: Topic + Sync;
144
145    /// Subscribes to a Gossipsub subtopic, providing the subtopic name
146    async fn subscribe_subtopic<T>(
147        &self,
148        subtopic: String,
149    ) -> Result<BoxStream<'static, (T::Item, Self::PubsubId)>, Self::Error>
150    where
151        T: Topic + Sync;
152
153    /// Unsubscribes from a Gossipsub subtopic
154    async fn unsubscribe_subtopic<T>(&self, subtopic: String) -> Result<(), Self::Error>
155    where
156        T: Topic + Sync;
157
158    /// Publishes a message to a Gossipsub subtopic
159    async fn publish_subtopic<T>(&self, subtopic: String, item: T::Item) -> Result<(), Self::Error>
160    where
161        T: Topic + Sync;
162
163    /// Validates a message received from a Gossipsub topic
164    fn validate_message<T>(&self, id: Self::PubsubId, acceptance: MsgAcceptance)
165    where
166        T: Topic + Sync;
167
168    /// Gets a value from the distributed hash table
169    async fn dht_get<K, V, T>(&self, k: &K) -> Result<Option<V>, Self::Error>
170    where
171        K: AsRef<[u8]> + Send + Sync,
172        V: Deserialize + Send + Sync + TaggedSignable + Ord,
173        T: TaggedKeyPair + Send + Sync + Serialize + Deserialize;
174
175    /// Puts a value to the distributed hash table
176    async fn dht_put<K, V, T>(&self, k: &K, v: &V, keypair: &T) -> Result<(), Self::Error>
177    where
178        K: AsRef<[u8]> + Send + Sync,
179        V: Serialize + Send + Sync + TaggedSignable + Clone + Ord,
180        T: TaggedKeyPair + Send + Sync + Serialize + Deserialize;
181
182    /// Dials a peer
183    async fn dial_peer(&self, peer_id: Self::PeerId) -> Result<(), Self::Error>;
184
185    /// Dials an address
186    async fn dial_address(&self, address: Self::AddressType) -> Result<(), Self::Error>;
187
188    /// Gets the local peer ID
189    fn get_local_peer_id(&self) -> Self::PeerId;
190
191    /// Sends a message to a specific peer
192    async fn message<M: Message>(
193        &self,
194        request: M,
195        peer_id: Self::PeerId,
196    ) -> Result<(), RequestError>;
197
198    /// Requests data from a specific peer
199    async fn request<Req: Request>(
200        &self,
201        request: Req,
202        peer_id: Self::PeerId,
203    ) -> Result<Req::Response, RequestError>;
204
205    /// Receives messages from peers.
206    /// This function returns a stream where the messages are going to be propagated.
207    fn receive_messages<M: Message>(&self) -> BoxStream<'static, (M, Self::PeerId)>;
208
209    /// Receives requests from peers.
210    /// This function returns a stream where the requests are going to be propagated.
211    fn receive_requests<Req: Request>(
212        &self,
213    ) -> BoxStream<'static, (Req, Self::RequestId, Self::PeerId)>;
214
215    /// Sends a response to a specific request
216    async fn respond<Req: Request>(
217        &self,
218        request_id: Self::RequestId,
219        response: Req::Response,
220    ) -> Result<(), Self::Error>;
221}