use std::{
fmt::{Debug, Display},
hash::Hash,
time::Duration,
};
use async_trait::async_trait;
use futures::stream::BoxStream;
use nimiq_serde::{Deserialize, DeserializeError, Serialize};
use nimiq_utils::tagged_signing::{TaggedKeyPair, TaggedSignable};
use thiserror::Error;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use crate::{
peer_info::*,
request::{Message, Request, RequestError},
};
/// Events delivered through the stream returned by `Network::subscribe_events`,
/// generic over the peer identifier type `P`.
#[derive(Clone, Debug)]
pub enum NetworkEvent<P> {
/// A peer joined; carries the peer's identifier and its `PeerInfo`.
PeerJoined(P, PeerInfo),
/// A peer left; carries the peer's identifier.
PeerLeft(P),
/// The DHT is ready.
/// NOTE(review): the exact readiness criteria are decided by the implementation — confirm.
DhtReady,
}
/// Boxed `'static` stream of network events.
///
/// Each item is either a `NetworkEvent<PeerId>` or a `BroadcastStreamRecvError`.
/// This is the return type of `Network::subscribe_events`.
pub type SubscribeEvents<PeerId> =
BoxStream<'static, Result<NetworkEvent<PeerId>, BroadcastStreamRecvError>>;
/// Default value of `Topic::TIME_WINDOW`: 10 seconds.
const DEFAULT_GOSSIPSUB_RATE_LIMIT_TIME_WINDOW: Duration = Duration::from_secs(10);
/// Compile-time description of a pubsub topic.
///
/// Used as the type parameter `T` of the pubsub methods on `Network`
/// (`subscribe`, `publish`, `validate_message`, and their subtopic variants).
pub trait Topic {
/// Type of the items exchanged on this topic. It must be serializable and
/// deserializable, thread-safe, debuggable and `'static`.
type Item: Serialize + Deserialize + Send + Sync + Debug + 'static;
/// Buffer size associated with this topic.
/// NOTE(review): presumably the capacity of the per-topic receive buffer — confirm against the implementation.
const BUFFER_SIZE: usize;
/// Name of the topic.
const NAME: &'static str;
/// Validation flag for this topic.
/// NOTE(review): presumably `true` means received items must be explicitly validated
/// through `Network::validate_message` — confirm.
const VALIDATE: bool;
/// Message count limit for this topic.
/// NOTE(review): presumably the maximum number of messages allowed per `TIME_WINDOW` — confirm.
const MAX_MESSAGES: u32;
/// Time window associated with this topic; defaults to
/// `DEFAULT_GOSSIPSUB_RATE_LIMIT_TIME_WINDOW` unless overridden by the implementor.
const TIME_WINDOW: Duration = DEFAULT_GOSSIPSUB_RATE_LIMIT_TIME_WINDOW;
}
/// Minimum supported message size: 1 MiB (`1 << 20`).
/// NOTE(review): unit assumed to be bytes — confirm.
pub const MIN_SUPPORTED_MSG_SIZE: usize = 1 << 20;
/// Verdict handed to `Network::validate_message` for a received pubsub message.
///
/// Derives `PartialEq`/`Eq` so verdicts can be compared with `==` and used in
/// `assert_eq!`.
///
/// NOTE(review): the variant descriptions below follow from the variant names only;
/// how an implementation reacts to each verdict is not visible here.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum MsgAcceptance {
    /// The message is accepted.
    Accept,
    /// The message is rejected.
    Reject,
    /// The message is ignored.
    Ignore,
}
/// Identifier attached to a received pubsub item, generic over the peer identifier type.
///
/// Values of this kind are yielded alongside each item by `Network::subscribe` and are
/// passed back to `Network::validate_message`.
pub trait PubsubId<PeerId>: Clone + Send + Sync + Debug {
/// Returns the peer identifier of the propagation source.
/// NOTE(review): presumably the peer that forwarded the message to us, which need not be
/// the original publisher — confirm.
fn propagation_source(&self) -> PeerId;
}
/// Minimum supported request size: 20 KiB (`20 << 10`).
/// NOTE(review): unit assumed to be bytes — confirm.
pub const MIN_SUPPORTED_REQ_SIZE: usize = 20 << 10;
/// Minimum supported response size: 10 MiB (`10 << 20`).
/// NOTE(review): unit assumed to be bytes — confirm.
pub const MIN_SUPPORTED_RESP_SIZE: usize = 10 << 20;
/// Reason passed to `Network::disconnect_peer` when closing a peer connection.
///
/// Derives `PartialEq`/`Eq` so reasons can be compared with `==` and used in
/// `assert_eq!`.
///
/// NOTE(review): the variant descriptions below follow from the variant names only;
/// any implementation-specific consequences (e.g. banning) are not visible here.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum CloseReason {
    /// A reason not covered by the other variants.
    Other,
    /// The remote side closed the connection.
    RemoteClosed,
    /// The local node is going offline.
    GoingOffline,
    /// An error occurred.
    Error,
    /// The peer is considered malicious.
    MaliciousPeer,
}
/// Error type for send operations.
///
/// NOTE(review): this type is not referenced by any other item in the visible part of
/// this file; its users live elsewhere.
#[derive(Debug, Error)]
pub enum SendError {
/// Wraps a `DeserializeError`. The display output is the inner error's message, and
/// `#[from]` provides the `From<DeserializeError>` conversion.
/// NOTE(review): the variant is named `Serialization` but carries a `DeserializeError` —
/// confirm this is intended.
#[error("{0}")]
Serialization(#[from] DeserializeError),
/// The peer connection was already closed.
#[error("Peer connection already closed")]
AlreadyClosed,
}
/// Abstraction over a peer-to-peer network: peer bookkeeping, pubsub topics, DHT access
/// and direct message/request exchange with individual peers.
///
/// `#[async_trait]` is applied so that the trait can declare `async fn` methods.
#[async_trait]
pub trait Network: Send + Sync + Unpin + 'static {
/// Identifier of a peer.
type PeerId: Copy + Debug + Display + Ord + Hash + Send + Sync + Unpin + 'static;
/// Address type accepted by `dial_address`.
type AddressType: Debug + Display + 'static;
/// Error type returned by the fallible peer, pubsub, DHT and dialing operations.
type Error: std::error::Error;
/// Identifier yielded with every received pubsub item; see `PubsubId`.
type PubsubId: PubsubId<Self::PeerId> + Send + Sync + Unpin;
/// Identifier of an incoming request: yielded by `receive_requests` and passed back
/// to `respond`.
type RequestId: Copy + Debug + Display + Eq + Send + Sync + 'static;
/// Returns a list of peer identifiers.
/// NOTE(review): presumably the currently connected peers — confirm.
fn get_peers(&self) -> Vec<Self::PeerId>;
/// Returns whether the network has the peer `peer_id`.
/// NOTE(review): presumably "has" means currently connected — confirm.
fn has_peer(&self, peer_id: Self::PeerId) -> bool;
/// Returns the `PeerInfo` for `peer_id`, or `None` if none is available.
fn get_peer_info(&self, peer_id: Self::PeerId) -> Option<PeerInfo>;
/// Asynchronously obtains peers for the given `services`.
///
/// NOTE(review): presumably returns peers providing `services` and tries to reach at
/// least `min_peers` of them (possibly by dialing new peers) — confirm.
///
/// # Errors
/// Returns `Self::Error` if the operation fails.
async fn get_peers_by_services(
&self,
services: Services,
min_peers: usize,
) -> Result<Vec<Self::PeerId>, Self::Error>;
/// Returns whether `peer_id` provides the required services.
/// NOTE(review): which services count as "required" is decided by the implementation.
fn peer_provides_required_services(&self, peer_id: Self::PeerId) -> bool;
/// Returns whether `peer_id` provides the given `services`.
fn peer_provides_services(&self, peer_id: Self::PeerId, services: Services) -> bool;
/// Disconnects `peer_id`, giving `close_reason` as the reason.
async fn disconnect_peer(&self, peer_id: Self::PeerId, close_reason: CloseReason);
/// Returns a stream of `NetworkEvent`s; see `SubscribeEvents` for the item type.
fn subscribe_events(&self) -> SubscribeEvents<Self::PeerId>;
/// Subscribes to topic `T` and returns a stream of `(item, pubsub id)` pairs.
///
/// # Errors
/// Returns `Self::Error` if subscribing fails.
async fn subscribe<T>(
&self,
) -> Result<BoxStream<'static, (T::Item, Self::PubsubId)>, Self::Error>
where
T: Topic + Sync;
/// Unsubscribes from topic `T`.
///
/// # Errors
/// Returns `Self::Error` if unsubscribing fails.
async fn unsubscribe<T>(&self) -> Result<(), Self::Error>
where
T: Topic + Sync;
/// Publishes `item` on topic `T`.
///
/// # Errors
/// Returns `Self::Error` if publishing fails.
async fn publish<T>(&self, item: T::Item) -> Result<(), Self::Error>
where
T: Topic + Sync;
/// Like `subscribe`, but for the given `subtopic` of topic `T`.
/// NOTE(review): how `subtopic` is combined with `T::NAME` is decided by the implementation.
///
/// # Errors
/// Returns `Self::Error` if subscribing fails.
async fn subscribe_subtopic<T>(
&self,
subtopic: String,
) -> Result<BoxStream<'static, (T::Item, Self::PubsubId)>, Self::Error>
where
T: Topic + Sync;
/// Like `unsubscribe`, but for the given `subtopic` of topic `T`.
///
/// # Errors
/// Returns `Self::Error` if unsubscribing fails.
async fn unsubscribe_subtopic<T>(&self, subtopic: String) -> Result<(), Self::Error>
where
T: Topic + Sync;
/// Like `publish`, but publishes `item` on the given `subtopic` of topic `T`.
///
/// # Errors
/// Returns `Self::Error` if publishing fails.
async fn publish_subtopic<T>(&self, subtopic: String, item: T::Item) -> Result<(), Self::Error>
where
T: Topic + Sync;
/// Reports the verdict `acceptance` for the pubsub message identified by `id` on topic `T`.
fn validate_message<T>(&self, id: Self::PubsubId, acceptance: MsgAcceptance)
where
T: Topic + Sync;
/// Looks up the value stored under key `k` in the DHT; yields `Ok(None)` when no value
/// is returned.
///
/// NOTE(review): `T` is a key pair type that does not appear in the arguments; presumably
/// it is used to verify the tagged signature of the stored record — confirm.
///
/// # Errors
/// Returns `Self::Error` if the lookup fails.
async fn dht_get<K, V, T>(&self, k: &K) -> Result<Option<V>, Self::Error>
where
K: AsRef<[u8]> + Send + Sync,
V: Deserialize + Send + Sync + TaggedSignable + Ord,
T: TaggedKeyPair + Send + Sync + Serialize + Deserialize;
/// Stores the value `v` under key `k` in the DHT.
///
/// NOTE(review): `keypair` is presumably used to sign the record before storing — confirm.
///
/// # Errors
/// Returns `Self::Error` if the store operation fails.
async fn dht_put<K, V, T>(&self, k: &K, v: &V, keypair: &T) -> Result<(), Self::Error>
where
K: AsRef<[u8]> + Send + Sync,
V: Serialize + Send + Sync + TaggedSignable + Clone + Ord,
T: TaggedKeyPair + Send + Sync + Serialize + Deserialize;
/// Dials the peer identified by `peer_id`.
///
/// # Errors
/// Returns `Self::Error` if dialing fails.
async fn dial_peer(&self, peer_id: Self::PeerId) -> Result<(), Self::Error>;
/// Dials the given `address`.
///
/// # Errors
/// Returns `Self::Error` if dialing fails.
async fn dial_address(&self, address: Self::AddressType) -> Result<(), Self::Error>;
/// Returns the peer identifier of the local node.
fn get_local_peer_id(&self) -> Self::PeerId;
/// Sends the message `request` to `peer_id`; no response value is returned.
///
/// # Errors
/// Returns a `RequestError` on failure.
async fn message<M: Message>(
&self,
request: M,
peer_id: Self::PeerId,
) -> Result<(), RequestError>;
/// Sends `request` to `peer_id` and returns the corresponding `Req::Response`.
///
/// # Errors
/// Returns a `RequestError` on failure.
async fn request<Req: Request>(
&self,
request: Req,
peer_id: Self::PeerId,
) -> Result<Req::Response, RequestError>;
/// Returns a stream of incoming messages of type `M`, each paired with a peer identifier
/// (presumably the sender — confirm).
fn receive_messages<M: Message>(&self) -> BoxStream<'static, (M, Self::PeerId)>;
/// Returns a stream of incoming requests of type `Req`, each paired with its request
/// identifier and a peer identifier (presumably the requester — confirm).
fn receive_requests<Req: Request>(
&self,
) -> BoxStream<'static, (Req, Self::RequestId, Self::PeerId)>;
/// Sends `response` as the answer to the request identified by `request_id`.
///
/// # Errors
/// Returns `Self::Error` if the response cannot be sent.
async fn respond<Req: Request>(
&self,
request_id: Self::RequestId,
response: Req::Response,
) -> Result<(), Self::Error>;
}