use self::ping_config::PingInfo;
use libp2p::gossipsub::MessageId;
use libp2p_identity::PublicKey;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::hash::Hash;
use std::{collections::VecDeque, time::Instant};
use thiserror::Error;
use super::*;
/// Timeout for reading data from the network layer, expressed in `Seconds`.
/// NOTE(review): `Seconds` is an alias defined outside this file; presumably whole seconds — confirm.
pub const NETWORK_READ_TIMEOUT: Seconds = 30;
/// Sleep duration for tasks, expressed in `Seconds`.
/// NOTE(review): the tasks that use this value are not visible in this file.
pub const TASK_SLEEP_DURATION: Seconds = 3;
/// Maximum number of items a `DataQueue` holds; `DataQueue::push` drops the
/// oldest item once this many are queued.
const MAX_QUEUE_ELEMENTS: usize = 300;
/// Outcome of a request: an `AppResponse` on success, a `NetworkError` otherwise.
pub type AppResponseResult = Result<AppResponse, NetworkError>;
/// Payload type used for RPC data; an alias of `ByteVector`.
pub type RpcData = ByteVector;
/// A list of byte buffers.
pub type ByteVector = Vec<Vec<u8>>;
/// Identifier of a shard, represented as a string.
pub type ShardId = String;
/// Generic result type whose error is always a `NetworkError`.
pub type NetworkResult<T> = Result<T, NetworkError>;
/// A list of strings.
pub type StringVector = Vec<String>;
/// A 64-bit nonce value.
pub type Nonce = u64;
/// Wait time related to booting, expressed in `Seconds`.
/// NOTE(review): usage is not visible in this file — confirm what is being waited on.
pub(super) const BOOT_WAIT_TIME: Seconds = 1;
/// Capacity value for stream buffers.
/// NOTE(review): presumably passed as `buffer_size` to `StreamRequestBuffer::new` /
/// `StreamResponseBuffer::new` — confirm at the call sites.
pub(super) const STREAM_BUFFER_CAPACITY: usize = 100;
/// Data exchanged between the application and the network layer, tagged with
/// the `StreamId` it belongs to.
#[derive(Debug, Clone)]
pub(super) enum StreamData {
/// A request (`AppData`) coming from the application.
FromApplication(StreamId, AppData),
/// A response (`AppResponse`) going back to the application.
ToApplication(StreamId, AppResponse),
}
/// Requests an application can issue.
///
/// Each variant carries the input for one operation. The code that handles
/// these requests lives outside this file, so the notes below describe only
/// the payload and what the variant name indicates.
#[derive(Debug, Clone)]
pub enum AppData {
/// Echo request carrying a string.
Echo(String),
/// Dial request carrying a peer id and a multiaddress string.
DailPeer(PeerId, MultiaddrString),
/// Request to store a record in the Kademlia DHT.
KademliaStoreRecord {
/// Record key bytes.
key: Vec<u8>,
/// Record value bytes.
value: Vec<u8>,
/// Optional expiration instant for the record.
expiration_time: Option<Instant>,
/// Optional list of peer id strings.
/// NOTE(review): presumably the peers that should receive the record — confirm.
explicit_peers: Option<Vec<PeerIdString>>,
},
/// Request to look up the record stored under `key`.
KademliaLookupRecord { key: Vec<u8> },
/// Request for the providers of `key`.
KademliaGetProviders { key: Vec<u8> },
/// Request to stop providing `key`.
KademliaStopProviding { key: Vec<u8> },
/// Request to delete the record stored under `key`.
KademliaDeleteRecord { key: Vec<u8> },
/// Request for routing table information.
KademliaGetRoutingTableInfo,
/// RPC request carrying `keys` (an `RpcData` payload) addressed to `peer`.
SendRpc { keys: RpcData, peer: PeerId },
/// Request for general network information.
GetNetworkInfo,
/// Request to broadcast `message` on the gossipsub `topic`.
GossipsubBroadcastMessage {
/// Topic name.
topic: String,
/// Message content as a list of byte buffers.
message: ByteVector,
},
/// Request to join a gossipsub network; carries a string.
/// NOTE(review): presumably the topic name — confirm.
GossipsubJoinNetwork(String),
/// Request for gossipsub information.
GossipsubGetInfo,
/// Request to exit a gossipsub network; carries a string.
/// NOTE(review): presumably the topic name — confirm.
GossipsubExitNetwork(String),
/// Request to blacklist the given peer.
GossipsubBlacklistPeer(PeerId),
/// Request concerning the given peer and the blacklist.
/// NOTE(review): presumably removes the peer from the blacklist — confirm against the handler.
GossipsubFilterBlacklist(PeerId),
}
/// Responses returned to the application.
///
/// The variants parallel those of `AppData`. The code that produces them lives
/// outside this file, so the notes below describe only the payload and what
/// the variant name indicates.
#[derive(Debug, Clone, PartialEq)]
pub enum AppResponse {
/// Echo response carrying a string.
Echo(String),
/// A peer was dialed successfully; carries a string.
/// NOTE(review): the meaning of the string is not visible here — confirm.
DailPeerSuccess(String),
/// A record was stored in the Kademlia DHT.
KademliaStoreRecordSuccess,
/// A record lookup succeeded; carries bytes (presumably the record value — confirm).
KademliaLookupSuccess(Vec<u8>),
/// Providers found for a key.
KademliaGetProviders {
/// Key bytes.
key: Vec<u8>,
/// Providers, as peer id strings.
providers: Vec<PeerIdString>,
},
/// No providers were found.
KademliaNoProvidersFound,
/// Routing table information; carries a protocol id string.
KademliaGetRoutingTableInfo { protocol_id: String },
/// RPC response carrying an `RpcData` payload.
SendRpc(RpcData),
/// An error response wrapping a `NetworkError`.
Error(NetworkError),
/// General network information.
GetNetworkInfo {
/// A peer id (presumably the local node's — confirm).
peer_id: PeerId,
/// Peers currently connected.
connected_peers: Vec<PeerId>,
/// External addresses as multiaddress strings.
external_addresses: Vec<MultiaddrString>,
},
/// A gossipsub broadcast succeeded.
GossipsubBroadcastSuccess,
/// Joining a gossipsub network succeeded.
GossipsubJoinSuccess,
/// Exiting a gossipsub network succeeded.
GossipsubExitSuccess,
/// Gossipsub information.
GossipsubGetInfo {
/// A list of topic names.
topics: StringVector,
/// Pairs of a peer id and a list of strings (presumably that peer's topics — confirm).
mesh_peers: Vec<(PeerId, StringVector)>,
/// Set of blacklisted peers.
blacklist: HashSet<PeerId>,
},
/// Blacklisting succeeded.
GossipsubBlacklistSuccess,
}
/// Errors reported by the network layer.
///
/// The `Display` text of each variant is the string in its `#[error(...)]`
/// attribute. Variants with a payload do not include that payload in the
/// message (no format placeholders are used).
#[derive(Error, Debug, Clone, PartialEq)]
pub enum NetworkError {
#[error("timeout occured waiting for data from network layer")]
NetworkReadTimeout,
#[error("internal request stream buffer is full")]
StreamBufferOverflow,
// Carries bytes (presumably the key of the record that failed to store — confirm).
#[error("failed to store record in DHT")]
KadStoreRecordError(Vec<u8>),
#[error("failed to fetch data from peer")]
RpcDataFetchError,
// Carries bytes (presumably the key of the record that failed to fetch — confirm).
#[error("failed to fetch record from the DHT")]
KadFetchRecordError(Vec<u8>),
#[error("task carrying app response panicked")]
InternalTaskError,
#[error("failed to dail peer")]
DailPeerError,
#[error("failed to broadcast message to peers in the topic")]
GossipsubBroadcastMessageError,
#[error("failed to join a mesh network")]
GossipsubJoinNetworkError,
#[error("failed to exit a mesh network")]
GossipsubExitNetworkError,
#[error("internal stream failed to transport data")]
InternalStreamError,
#[error("replica network not found")]
MissingReplNetwork,
#[error("network id for sharding has not been configured. See `CoreBuilder::with_shard()`")]
MissingShardingNetworkIdError,
#[error("threshold for data forwarding not met")]
DataForwardingError,
#[error("failed to shard data")]
ShardingFailureError,
#[error("failed to fetch sharded data")]
ShardingFetchError,
#[error("shard not found for input key")]
ShardNotFound,
#[error("no nodes found in logical shard")]
MissingShardNodesError,
}
/// Numeric identifier used to tag data moving through the internal stream,
/// so that a response can be matched with the request that caused it.
///
/// The wrapped counter is a `u32` that starts at `0` and wraps around on
/// overflow.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct StreamId(u32);

impl StreamId {
    /// Creates the initial stream id, which always wraps `0`.
    pub fn new() -> Self {
        Self(0)
    }

    /// Returns the id that follows `current_id`.
    ///
    /// The increment wraps: the successor of the id wrapping `u32::MAX` is
    /// the id wrapping `0`.
    pub fn next(current_id: StreamId) -> Self {
        Self(current_id.0.wrapping_add(1))
    }
}

/// `StreamId::default()` is the same value as `StreamId::new()`.
impl Default for StreamId {
    fn default() -> Self {
        Self::new()
    }
}
/// Bounded set of `StreamId`s.
///
/// The bound is enforced only by [`StreamRequestBuffer::insert`]; nothing in
/// this type removes ids.
#[derive(Clone, Debug)]
pub(super) struct StreamRequestBuffer {
    /// Number of ids at which the buffer stops accepting inserts.
    size: usize,
    /// Ids currently held.
    buffer: HashSet<StreamId>,
}

impl StreamRequestBuffer {
    /// Creates an empty buffer that accepts inserts while it holds fewer than
    /// `buffer_size` ids.
    pub fn new(buffer_size: usize) -> Self {
        let buffer = HashSet::new();
        Self {
            size: buffer_size,
            buffer,
        }
    }

    /// Tries to record `id`.
    ///
    /// Returns `true` when the buffer held fewer than `size` ids at the time
    /// of the call (this includes the case where `id` was already present),
    /// and `false` when the buffer was full, in which case nothing changes.
    pub fn insert(&mut self, id: StreamId) -> bool {
        let has_room = self.buffer.len() < self.size;
        if has_room {
            self.buffer.insert(id);
        }
        has_room
    }
}
/// Bounded map from a `StreamId` to the `AppResponseResult` stored for it.
pub(super) struct StreamResponseBuffer {
    /// Number of entries at which the buffer stops accepting inserts.
    size: usize,
    /// Responses currently held, keyed by stream id.
    buffer: HashMap<StreamId, AppResponseResult>,
}

impl StreamResponseBuffer {
    /// Creates an empty buffer that accepts inserts while it holds fewer than
    /// `buffer_size` entries.
    pub fn new(buffer_size: usize) -> Self {
        let buffer = HashMap::new();
        Self {
            size: buffer_size,
            buffer,
        }
    }

    /// Tries to store `response` under `id`.
    ///
    /// Returns `false` without storing anything when the buffer already holds
    /// `size` entries. Otherwise the response is stored (replacing any
    /// previous entry for `id`) and `true` is returned.
    pub fn insert(&mut self, id: StreamId, response: AppResponseResult) -> bool {
        if self.buffer.len() >= self.size {
            return false;
        }
        self.buffer.insert(id, response);
        true
    }

    /// Takes the response stored under `id` out of the buffer, if there is one.
    pub fn remove(&mut self, id: &StreamId) -> Option<AppResponseResult> {
        self.buffer.remove(id)
    }
}
/// RPC message type; derives serde `Serialize`/`Deserialize`.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub(super) enum Rpc {
/// A message carrying an `RpcData` payload.
/// NOTE(review): the name suggests it is used for both requests and responses — confirm.
ReqResponse { data: RpcData },
}
/// Configuration choice for RPC: defaults or explicit values.
pub enum RpcConfig {
/// Use default settings (the defaults themselves are not defined in this file).
Default,
/// Use the supplied settings.
Custom {
/// Timeout duration.
timeout: Duration,
/// Maximum number of concurrent streams.
max_concurrent_streams: usize,
},
}
/// Events describing activity in the network layer.
///
/// The code that emits these events lives outside this file; the notes below
/// describe payloads and what the variant names indicate.
/// NOTE(review): many variants carry the same kind of fields as libp2p swarm
/// and behaviour events and presumably mirror them — confirm against the
/// event handler.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum NetworkEvent {
/// A listener reported a new listen address.
NewListenAddr {
local_peer_id: PeerId,
listener_id: ListenerId,
address: Multiaddr,
},
/// The routing table was updated for `peer_id`.
RoutingTableUpdated { peer_id: PeerId },
/// A connection to `peer_id` was established.
ConnectionEstablished {
peer_id: PeerId,
connection_id: ConnectionId,
endpoint: ConnectedPoint,
// Non-zero count (presumably connections to this peer, including this one — confirm).
num_established: NonZeroU32,
established_in: Duration,
},
/// A connection to `peer_id` was closed.
ConnectionClosed {
peer_id: PeerId,
connection_id: ConnectionId,
endpoint: ConnectedPoint,
// May be zero, unlike in `ConnectionEstablished`.
num_established: u32,
},
/// A listen address expired.
ExpiredListenAddr {
listener_id: ListenerId,
address: Multiaddr,
},
/// A listener was closed; carries the addresses associated with it.
ListenerClosed {
listener_id: ListenerId,
addresses: Vec<Multiaddr>,
},
/// A listener reported an error.
ListenerError { listener_id: ListenerId },
/// A dial attempt is in progress; the peer id may be unknown.
Dialing {
peer_id: Option<PeerId>,
connection_id: ConnectionId,
},
/// A new external address candidate was reported.
NewExternalAddrCandidate { address: Multiaddr },
/// An external address was confirmed.
ExternalAddrConfirmed { address: Multiaddr },
/// An external address expired.
ExternalAddrExpired { address: Multiaddr },
/// An incoming connection was reported.
IncomingConnection {
connection_id: ConnectionId,
local_addr: Multiaddr,
send_back_addr: Multiaddr,
},
/// An incoming connection failed.
IncomingConnectionError {
connection_id: ConnectionId,
local_addr: Multiaddr,
send_back_addr: Multiaddr,
},
/// An outgoing connection failed; the peer id may be unknown.
OutgoingConnectionError {
connection_id: ConnectionId,
peer_id: Option<PeerId>,
},
/// An outbound ping to `peer_id` succeeded; carries a duration.
OutboundPingSuccess { peer_id: PeerId, duration: Duration },
/// An outbound ping to `peer_id` failed.
OutboundPingError { peer_id: PeerId },
/// Identify information was received from `peer_id`.
IdentifyInfoReceived { peer_id: PeerId, info: IdentifyInfo },
/// A Kademlia put-record operation succeeded for `key`.
KademliaPutRecordSuccess { key: Vec<u8> },
/// A Kademlia put-record operation failed.
KademliaPutRecordError,
/// A Kademlia start-providing operation succeeded for `key`.
KademliaStartProvidingSuccess { key: Vec<u8> },
/// A Kademlia start-providing operation failed.
KademliaStartProvidingError,
/// An incoming RPC message was handled; carries an `RpcData` payload.
RpcIncomingMessageHandled { data: RpcData },
/// A gossipsub unsubscribe message for `topic` was received from `peer_id`.
GossipsubUnsubscribeMessageReceived { peer_id: PeerId, topic: String },
/// A gossipsub subscribe message for `topic` was received from `peer_id`.
GossipsubSubscribeMessageReceived { peer_id: PeerId, topic: String },
/// Replica data arrived from `source`.
/// NOTE(review): field meanings below are inferred from their names — confirm.
ReplicaDataIncoming {
data: StringVector,
network: String,
outgoing_timestamp: Seconds,
incoming_timestamp: Seconds,
message_id: String,
source: PeerId,
},
/// Forwarded data arrived from `source`.
IncomingForwardedData {
data: StringVector,
source: PeerId,
},
/// An incoming gossipsub message from `source` was handled.
GossipsubIncomingMessageHandled { source: PeerId, data: StringVector },
}
/// Information about a peer, carried by `NetworkEvent::IdentifyInfoReceived`.
/// NOTE(review): presumably a subset of what the libp2p identify protocol reports — confirm.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct IdentifyInfo {
/// Public key (presumably the remote peer's — confirm).
pub public_key: PublicKey,
/// Listen addresses.
pub listen_addrs: Vec<Multiaddr>,
/// Stream protocols.
pub protocols: Vec<StreamProtocol>,
/// Observed address (presumably the local node's address as seen by the remote — confirm).
pub observed_addr: Multiaddr,
}
/// Bundle of per-protocol state and callbacks kept for the network.
#[derive(Clone)]
pub(super) struct NetworkInfo {
/// A stream protocol value (presumably the network's protocol id — confirm).
pub id: StreamProtocol,
/// Ping policy and counters.
pub ping: PingInfo,
/// Gossipsub state (holds the blacklist).
pub gossipsub: gossipsub_cfg::GossipsubInfo,
/// Function pointer that maps an `RpcData` payload to an `RpcData` payload.
/// NOTE(review): presumably called on incoming RPC requests to build the reply — confirm.
pub rpc_handler_fn: fn(RpcData) -> RpcData,
/// Function pointer returning a `bool` for a gossip message.
/// NOTE(review): the meaning of each argument and of the returned flag is not
/// visible in this file — confirm against the caller.
pub gossip_filter_fn: fn(PeerId, MessageId, Option<PeerId>, String, StringVector) -> bool,
/// Replication state (type defined outside this file).
pub replication: replication::ReplInfo,
/// Sharding state (type defined outside this file).
pub sharding: sharding::ShardingInfo,
}
/// Types for configuring and tracking the ping protocol.
pub mod ping_config {
use libp2p_identity::PeerId;
use std::{collections::HashMap, time::Duration};
/// Policy for reacting to ping failures.
/// NOTE(review): enforcement happens outside this file; the variant notes are
/// inferred from names — confirm.
#[derive(Debug, Clone)]
pub enum PingErrorPolicy {
/// Never disconnect because of ping failures.
NoDisconnect,
/// Disconnect after the given number of errors.
DisconnectAfterMaxErrors(u16),
/// Disconnect after the given number of timeouts.
DisconnectAfterMaxTimeouts(u16),
}
/// Per-peer `u16` values for ping timeouts and outbound errors
/// (presumably running counts — confirm).
#[derive(Debug, Clone)]
pub struct PingManager {
/// Timeout value per peer.
pub timeouts: HashMap<PeerId, u16>,
/// Outbound error value per peer.
pub outbound_errors: HashMap<PeerId, u16>,
}
/// User-facing ping settings.
#[derive(Debug, Clone)]
pub struct PingConfig {
/// Ping interval.
pub interval: Duration,
/// Ping timeout.
pub timeout: Duration,
/// Policy to apply on ping failures.
pub err_policy: PingErrorPolicy,
}
/// Ping policy together with its tracking state.
#[derive(Debug, Clone)]
pub struct PingInfo {
/// Policy to apply on ping failures.
pub policy: PingErrorPolicy,
/// Per-peer tracking state.
pub manager: PingManager,
}
}
/// Types for configuring gossipsub and tracking blacklisted peers.
pub mod gossipsub_cfg {
use super::*;
/// Set of blacklisted peers.
#[derive(Clone, Debug, Default)]
pub struct Blacklist {
/// The blacklisted peer ids.
pub list: HashSet<PeerId>,
}
/// Configuration choice for gossipsub: defaults or explicit values.
pub enum GossipsubConfig {
/// Use default settings (the defaults themselves are not defined in this file).
Default,
/// Use the supplied gossipsub configuration and message authenticity.
Custom {
config: gossipsub::Config,
auth: gossipsub::MessageAuthenticity,
},
}
impl Blacklist {
/// Returns a copy of the blacklisted peer set.
///
/// Despite the `into_` prefix this takes `&self` and clones the set; the
/// `Blacklist` itself is left untouched.
pub fn into_inner(&self) -> HashSet<PeerId> {
self.list.clone()
}
}
/// Gossipsub state kept for the network.
#[derive(Clone)]
pub struct GossipsubInfo {
/// Peers that have been blacklisted.
pub blacklist: Blacklist,
}
}
/// FIFO queue shared behind an `Arc` and guarded by an async mutex.
///
/// Cloning a `DataQueue` clones the `Arc`, so every clone operates on the same
/// underlying queue. The queue is bounded by `MAX_QUEUE_ELEMENTS`: pushing
/// onto a full queue discards the oldest item first.
#[derive(Clone)]
pub(super) struct DataQueue<T: Debug + Clone + Eq + PartialEq + Hash> {
    /// The shared, lock-protected storage.
    buffer: Arc<Mutex<VecDeque<T>>>,
}

impl<T> DataQueue<T>
where
    T: Debug + Clone + Eq + PartialEq + Hash,
{
    /// Capacity reserved up front when a queue is created.
    const INITIAL_BUFFER_CAPACITY: usize = 300;

    /// Creates an empty queue with `INITIAL_BUFFER_CAPACITY` slots preallocated.
    pub fn new() -> Self {
        let storage = VecDeque::with_capacity(Self::INITIAL_BUFFER_CAPACITY);
        Self {
            buffer: Arc::new(Mutex::new(storage)),
        }
    }

    /// Removes and returns the oldest item, or `None` when the queue is empty.
    pub async fn pop(&self) -> Option<T> {
        let mut queue = self.buffer.lock().await;
        queue.pop_front()
    }

    /// Appends `item` to the back of the queue.
    ///
    /// When the queue already holds `MAX_QUEUE_ELEMENTS` items (or more), the
    /// oldest item is dropped first to make room.
    pub async fn push(&self, item: T) {
        let mut queue = self.buffer.lock().await;
        let is_full = queue.len() >= MAX_QUEUE_ELEMENTS;
        if is_full {
            queue.pop_front();
        }
        queue.push_back(item);
    }

    /// Returns a snapshot (clone) of the queue's current contents.
    ///
    /// Despite the `into_` prefix this takes `&self`; the queue itself is left
    /// untouched.
    pub async fn into_inner(&self) -> VecDeque<T> {
        let queue = self.buffer.lock().await;
        queue.clone()
    }

    /// Removes every item from the queue, discarding them.
    pub async fn drain(&mut self) {
        let mut queue = self.buffer.lock().await;
        queue.clear();
    }
}