use std::collections::BTreeMap;
use std::collections::HashSet;
use std::net::AddrParseError;
use std::net::SocketAddr;
use std::time::Instant;
use config::Config;
use encoding::message::{Header, Message};
use encoding::payload::BroadcastPayload;
use handling::MessageHandler;
pub use handling::MessageInfo;
use itertools::Itertools;
use kbucket::MAX_BUCKET_HEIGHT;
use kbucket::{BucketHeight, Tree};
use maintainer::TableMaintainer;
use peer::{PeerInfo, PeerNode};
use rand::prelude::IteratorRandom;
pub(crate) use rwlock::RwLock;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task;
use tracing::warn;
use tracing::{error, info};
use transport::{MessageBeanOut, WireNetwork};
pub mod config;
mod encoding;
mod handling;
mod kbucket;
mod maintainer;
mod peer;
mod rwlock;
pub mod transport;
const DEFAULT_K_K: usize = 20;
const K_K: usize = get_k_k();
const K_ID_LEN_BYTES: usize = 16;
const K_NONCE_LEN: usize = 4;
const K_DIFF_MIN_BIT: usize = 8;
const K_DIFF_PRODUCED_BIT: usize = 8;
const fn get_k_k() -> usize {
match option_env!("KADCAST_K") {
Some(v) => match konst::primitive::parse_usize(v) {
Ok(e) => e,
Err(_) => DEFAULT_K_K,
},
None => DEFAULT_K_K,
}
}
const K_ALPHA: usize = 3;
const K_BETA: usize = 3;
pub struct Peer {
outbound_sender: Sender<MessageBeanOut>,
ktable: RwLock<Tree<PeerInfo>>,
header: Header,
blocklist: RwLock<HashSet<SocketAddr>>,
}
pub trait NetworkListen: Send {
fn on_message(&self, message: Vec<u8>, metadata: MessageInfo);
}
impl Peer {
pub fn new<L: NetworkListen + 'static>(
config: Config,
listener: L,
) -> Result<Self, AddrParseError> {
let network_id = config.kadcast_id.unwrap_or_default();
let tree = Tree::new(
PeerNode::generate(&config.public_address[..], network_id)?,
config.bucket,
);
let (inbound_channel_tx, inbound_channel_rx) =
mpsc::channel(config.channel_size);
let (outbound_channel_tx, outbound_channel_rx) =
mpsc::channel(config.channel_size);
let (notification_channel_tx, listener_channel_rx) =
mpsc::channel(config.channel_size);
let header = tree.root().to_header();
let table = rwlock::new(tree);
let blocklist = rwlock::new(HashSet::new());
let peer = Peer {
outbound_sender: outbound_channel_tx.clone(),
ktable: table.clone(),
header,
blocklist: blocklist.clone(),
};
let nodes = config.bootstrapping_nodes.clone();
let idle_time = config.bucket.bucket_ttl;
let min_peers = config.bucket.min_peers;
let version =
semver::Version::parse(&config.version).expect("Invalid version");
MessageHandler::start(
table.clone(),
inbound_channel_rx,
outbound_channel_tx.clone(),
notification_channel_tx,
&config,
);
WireNetwork::start(
inbound_channel_tx,
outbound_channel_rx,
config,
blocklist,
);
TableMaintainer::start(
nodes,
table,
outbound_channel_tx,
idle_time,
min_peers,
version,
);
task::spawn(Peer::notifier(listener_channel_rx, listener));
Ok(peer)
}
async fn notifier(
mut listener_channel_rx: Receiver<(Vec<u8>, MessageInfo)>,
listener: impl NetworkListen,
) {
while let Some(notif) = listener_channel_rx.recv().await {
listener.on_message(notif.0, notif.1);
}
}
pub async fn alive_nodes(&self, amount: usize) -> Vec<SocketAddr> {
let table_read = self.ktable.read().await;
let rng = &mut rand::thread_rng();
table_read
.alive_nodes()
.map(|i| i.as_peer_info().to_socket_address())
.choose_multiple(rng, amount)
}
#[doc(hidden)]
pub async fn report(&self) {
let table_read = self.ktable.read().await;
table_read.all_sorted().for_each(|(h, nodes)| {
let nodes_joined = nodes.map(|p| p.value().address()).join(",");
info!("H: {h} - Nodes {nodes_joined}");
});
}
pub async fn to_route_table(
&self,
) -> BTreeMap<u8, Vec<(SocketAddr, Instant)>> {
let mut route_table = BTreeMap::new();
let table_read = self.ktable.read().await;
table_read.buckets().for_each(|(h, nodes)| {
let nodes = nodes
.map(|p| (*p.value().address(), *p.seen_at()))
.collect::<Vec<_>>();
route_table.insert(h, nodes);
});
route_table
}
pub async fn broadcast(
&self,
message: &[u8],
height: Option<BucketHeight>,
) {
if message.is_empty() {
error!("Message empty");
return;
}
for i in self.extract(message, height).await {
self.outbound_sender.send(i).await.unwrap_or_else(|e| {
error!("Unable to send from broadcast {e}")
});
}
}
async fn extract(
&self,
message: &[u8],
height: Option<BucketHeight>,
) -> Vec<(Message, Vec<SocketAddr>)> {
const LAST_BUCKET_IDX: u8 = MAX_BUCKET_HEIGHT as u8 - 1;
let ktable = self.ktable.read().await;
if height.is_none() && ktable.bucket_size(LAST_BUCKET_IDX) == 0 {
warn!("Broadcasting a new message with empty bucket height {LAST_BUCKET_IDX}")
}
ktable
.extract(height)
.map(|(height, nodes)| {
let msg = Message::broadcast(
self.header,
BroadcastPayload {
height,
gossip_frame: message.to_vec(),
},
);
let targets =
nodes.map(|node| *node.value().address()).collect();
(msg, targets)
})
.collect()
}
pub async fn send(&self, message: &[u8], target: SocketAddr) {
self.send_to_peers(message, vec![target]).await
}
pub async fn send_to_peers(
&self,
message: &[u8],
targets: Vec<SocketAddr>,
) {
if message.is_empty() {
return;
}
let msg = Message::broadcast(
self.header,
BroadcastPayload {
height: 0,
gossip_frame: message.to_vec(),
},
);
self.outbound_sender
.send((msg, targets))
.await
.unwrap_or_else(|e| error!("Unable to send from send method {e}"));
}
pub async fn block_source(&self, source: SocketAddr) {
self.blocklist.write().await.insert(source);
let binary_key = PeerNode::compute_id(&source.ip(), source.port());
self.ktable.write().await.remove_peer(&binary_key);
}
}
#[cfg(test)]
mod tests {
pub type Result<T> = core::result::Result<T, Box<dyn std::error::Error>>;
}