1use std::{
2 fmt::{Debug, Display},
3 hash::Hash,
4 time::Duration,
5};
6
7use async_trait::async_trait;
8use futures::stream::BoxStream;
9use nimiq_serde::{Deserialize, DeserializeError, Serialize};
10use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable};
11use thiserror::Error;
12use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
13
14use crate::{
15 peer_info::*,
16 request::{Message, Request, RequestError},
17};
18
19#[derive(Clone, Debug)]
21pub enum NetworkEvent<P> {
22 PeerJoined(P, PeerInfo),
24 PeerLeft(P),
26 DhtReady,
28}
29
30pub type SubscribeEvents<PeerId> =
31 BoxStream<'static, Result<NetworkEvent<PeerId>, BroadcastStreamRecvError>>;
32
/// Default time window (10 seconds) used as the fallback value of
/// `Topic::TIME_WINDOW`.
const DEFAULT_GOSSIPSUB_RATE_LIMIT_TIME_WINDOW: Duration = Duration::from_secs(10);
34
35pub trait Topic {
36 type Item: Serialize + Deserialize + Send + Sync + Debug + 'static;
37
38 const BUFFER_SIZE: usize;
39 const NAME: &'static str;
40 const VALIDATE: bool;
41 const MAX_MESSAGES: u32;
42 const TIME_WINDOW: Duration = DEFAULT_GOSSIPSUB_RATE_LIMIT_TIME_WINDOW;
43}
44
/// Minimum message size that must be supported: `1024 * 1024`
/// (1 MiB, assuming the unit is bytes — TODO confirm).
pub const MIN_SUPPORTED_MSG_SIZE: usize = 1024 * 1024;
47
/// Verdict on a pubsub message, as passed to `Network::validate_message`.
///
/// The enum is fieldless, so it derives `PartialEq`/`Eq` in addition to
/// `Copy`/`Clone`/`Debug`; verdicts can be compared directly with `==`.
// NOTE(review): the practical difference between `Reject` and `Ignore`
// (e.g. whether the sender is penalized) is decided by the network
// implementation and is not visible from this file.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum MsgAcceptance {
    /// The message is accepted.
    Accept,
    /// The message is rejected.
    Reject,
    /// The message is ignored.
    Ignore,
}
54
/// Identifier attached to an item received via pubsub, generic over the
/// peer identifier type.
///
/// Implementors must be `Clone`, `Debug`, `Send` and `Sync`.
pub trait PubsubId<PeerId>: Debug + Clone + Send + Sync {
    /// Returns the peer identifier of the propagation source.
    // NOTE(review): presumably the peer that forwarded the message to us,
    // which need not be its original author; confirm with implementors.
    fn propagation_source(&self) -> PeerId;
}
58
/// Minimum request size that must be supported: `20 * 1024`
/// (20 KiB, assuming the unit is bytes — TODO confirm).
pub const MIN_SUPPORTED_REQ_SIZE: usize = 20 * 1024;
/// Minimum response size that must be supported: `10 * 1024 * 1024`
/// (10 MiB, assuming the unit is bytes — TODO confirm).
pub const MIN_SUPPORTED_RESP_SIZE: usize = 10 * 1024 * 1024;
63
/// Reason given when closing the connection to a peer (see
/// `Network::disconnect_peer`).
///
/// The enum is fieldless, so it derives `PartialEq`/`Eq` in addition to
/// `Copy`/`Clone`/`Debug`; reasons can be compared directly with `==`.
// NOTE(review): what an implementation does per reason (e.g. banning a peer
// closed as `MaliciousPeer`) is not visible from this file; confirm there.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum CloseReason {
    /// No more specific reason applies.
    Other,
    /// The remote side closed the connection.
    RemoteClosed,
    /// The local node is going offline.
    GoingOffline,
    /// The connection is closed because of an error.
    Error,
    /// The peer is considered malicious.
    MaliciousPeer,
}
79
80#[derive(Debug, Error)]
81pub enum SendError {
82 #[error("{0}")]
83 Serialization(#[from] DeserializeError),
84 #[error("Peer connection already closed")]
85 AlreadyClosed,
86}
87
88#[async_trait]
89pub trait Network: Send + Sync + Unpin + 'static {
90 type PeerId: Copy + Debug + Display + Ord + Hash + Send + Sync + Unpin + 'static;
91 type AddressType: Debug + Display + 'static;
92 type Error: std::error::Error;
93 type PubsubId: PubsubId<Self::PeerId> + Send + Sync + Unpin;
94 type RequestId: Copy + Debug + Display + Eq + Send + Sync + 'static;
95
96 fn get_peers(&self) -> Vec<Self::PeerId>;
98
99 fn has_peer(&self, peer_id: Self::PeerId) -> bool;
101
102 fn get_peer_info(&self, peer_id: Self::PeerId) -> Option<PeerInfo>;
105
106 async fn get_peers_by_services(
111 &self,
112 services: Services,
113 min_peers: usize,
114 ) -> Result<Vec<Self::PeerId>, Self::Error>;
115
116 fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool;
118
119 fn peer_provides_services(&self, peer_id: Self::PeerId, services: Services) -> bool;
121
122 async fn disconnect_peer(&self, peer_id: Self::PeerId, close_reason: CloseReason);
124
125 fn subscribe_events(&self) -> SubscribeEvents<Self::PeerId>;
127
128 async fn subscribe<T>(
130 &self,
131 ) -> Result<BoxStream<'static, (T::Item, Self::PubsubId)>, Self::Error>
132 where
133 T: Topic + Sync;
134
135 async fn unsubscribe<T>(&self) -> Result<(), Self::Error>
137 where
138 T: Topic + Sync;
139
140 async fn publish<T>(&self, item: T::Item) -> Result<(), Self::Error>
142 where
143 T: Topic + Sync;
144
145 async fn subscribe_subtopic<T>(
147 &self,
148 subtopic: String,
149 ) -> Result<BoxStream<'static, (T::Item, Self::PubsubId)>, Self::Error>
150 where
151 T: Topic + Sync;
152
153 async fn unsubscribe_subtopic<T>(&self, subtopic: String) -> Result<(), Self::Error>
155 where
156 T: Topic + Sync;
157
158 async fn publish_subtopic<T>(&self, subtopic: String, item: T::Item) -> Result<(), Self::Error>
160 where
161 T: Topic + Sync;
162
163 fn validate_message<T>(&self, id: Self::PubsubId, acceptance: MsgAcceptance)
165 where
166 T: Topic + Sync;
167
168 async fn dht_get<K, V, T>(&self, k: &K) -> Result<Option<V>, Self::Error>
170 where
171 K: AsRef<[u8]> + Send + Sync,
172 V: Deserialize + Send + Sync + TaggedSignable + Ord,
173 T: TaggedKeyPair + Send + Sync + Serialize + Deserialize;
174
175 async fn dht_put<K, V, T>(&self, k: &K, v: &V, keypair: &T) -> Result<(), Self::Error>
177 where
178 K: AsRef<[u8]> + Send + Sync,
179 V: Serialize + Send + Sync + TaggedSignable + Clone + Ord,
180 T: TaggedKeyPair + Send + Sync + Serialize + Deserialize;
181
182 async fn dial_peer(&self, peer_id: Self::PeerId) -> Result<(), Self::Error>;
184
185 async fn dial_address(&self, address: Self::AddressType) -> Result<(), Self::Error>;
187
188 fn get_local_peer_id(&self) -> Self::PeerId;
190
191 async fn message<M: Message>(
193 &self,
194 request: M,
195 peer_id: Self::PeerId,
196 ) -> Result<(), RequestError>;
197
198 async fn request<Req: Request>(
200 &self,
201 request: Req,
202 peer_id: Self::PeerId,
203 ) -> Result<Req::Response, RequestError>;
204
205 fn receive_messages<M: Message>(&self) -> BoxStream<'static, (M, Self::PeerId)>;
208
209 fn receive_requests<Req: Request>(
212 &self,
213 ) -> BoxStream<'static, (Req, Self::RequestId, Self::PeerId)>;
214
215 async fn respond<Req: Request>(
217 &self,
218 request_id: Self::RequestId,
219 response: Req::Response,
220 ) -> Result<(), Self::Error>;
221}