1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Copyright (c) DUSK NETWORK. All rights reserved.
use std::collections::HashSet;
use std::net::AddrParseError;
use std::net::SocketAddr;
use config::Config;
use encoding::message::{Header, Message};
use encoding::payload::BroadcastPayload;
use handling::MessageHandler;
pub use handling::MessageInfo;
use itertools::Itertools;
use kbucket::{BucketHeight, Tree};
use maintainer::TableMaintainer;
use peer::{PeerInfo, PeerNode};
use rand::prelude::IteratorRandom;
pub(crate) use rwlock::RwLock;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task;
use tracing::{error, info};
use transport::{MessageBeanOut, WireNetwork};
pub mod config;
mod encoding;
mod handling;
mod kbucket;
mod maintainer;
mod peer;
mod rwlock;
pub mod transport;
// Max amount of nodes a bucket should contain
const DEFAULT_K_K: usize = 20;
const K_K: usize = get_k_k();
const K_ID_LEN_BYTES: usize = 16;
const K_NONCE_LEN: usize = 4;
const K_DIFF_MIN_BIT: usize = 8;
const K_DIFF_PRODUCED_BIT: usize = 8;
const fn get_k_k() -> usize {
match option_env!("KADCAST_K") {
Some(v) => match konst::primitive::parse_usize(v) {
Ok(e) => e,
Err(_) => DEFAULT_K_K,
},
None => DEFAULT_K_K,
}
}
// Redundacy factor for lookup
const K_ALPHA: usize = 3;
// Redundacy factor for broadcast
const K_BETA: usize = 3;
/// Struct representing the Kadcast Network Peer
pub struct Peer {
outbound_sender: Sender<MessageBeanOut>,
ktable: RwLock<Tree<PeerInfo>>,
header: Header,
blocklist: RwLock<HashSet<SocketAddr>>,
}
/// The [NetworkListen] trait receives notifications whenever a broadcasted
/// message is received from the network.
pub trait NetworkListen: Send {
fn on_message(&self, message: Vec<u8>, metadata: MessageInfo);
}
impl Peer {
/// Create a [Peer].
///
/// * `config` - The [Config] used to create the Peer
/// * `listener` - The [NetworkListen] impl notified each time a broadcasted
/// message is received from the network
pub fn new<L: NetworkListen + 'static>(
config: Config,
listener: L,
) -> Result<Self, AddrParseError> {
let network_id = config.kadcast_id.unwrap_or_default();
let tree = Tree::new(
PeerNode::generate(&config.public_address[..], network_id)?,
config.bucket,
);
let (inbound_channel_tx, inbound_channel_rx) =
mpsc::channel(config.channel_size);
let (outbound_channel_tx, outbound_channel_rx) =
mpsc::channel(config.channel_size);
let (notification_channel_tx, listener_channel_rx) =
mpsc::channel(config.channel_size);
let header = tree.root().to_header();
let table = rwlock::new(tree);
let blocklist = rwlock::new(HashSet::new());
let peer = Peer {
outbound_sender: outbound_channel_tx.clone(),
ktable: table.clone(),
header,
blocklist: blocklist.clone(),
};
let nodes = config.bootstrapping_nodes.clone();
let idle_time = config.bucket.bucket_ttl;
MessageHandler::start(
table.clone(),
inbound_channel_rx,
outbound_channel_tx.clone(),
notification_channel_tx,
&config,
);
WireNetwork::start(
inbound_channel_tx,
outbound_channel_rx,
config,
blocklist,
);
TableMaintainer::start(nodes, table, outbound_channel_tx, idle_time);
task::spawn(Peer::notifier(listener_channel_rx, listener));
Ok(peer)
}
async fn notifier(
mut listener_channel_rx: Receiver<(Vec<u8>, MessageInfo)>,
listener: impl NetworkListen,
) {
while let Some(notif) = listener_channel_rx.recv().await {
listener.on_message(notif.0, notif.1);
}
}
/// Return the [SocketAddr] of a set of random active nodes.
///
/// * `amount` - The max amount of nodes to return
pub async fn alive_nodes(&self, amount: usize) -> Vec<SocketAddr> {
let table_read = self.ktable.read().await;
// If the `rng` is generated between the `await`, it leads into "the
// trait `std::marker::Send` is not implemented for
// `Rc<UnsafeCell<ReseedingRng<rand_chacha::chacha::ChaCha12Core,
// OsRng>>>`" when used outside this crate
let rng = &mut rand::thread_rng();
table_read
.alive_nodes()
.map(|i| i.as_peer_info().to_socket_address())
.choose_multiple(rng, amount)
}
#[doc(hidden)]
pub async fn report(&self) {
let table_read = self.ktable.read().await;
/*
The usage of `info!` macro can potentially raise a compilation error
depending of which `tracing` crate features are used.
Eg: if you use the `log` feature (or if you have any dependency which
enables it), the `info` macro perform some `move` internally which
conflicts with the `move` performed by the `map` method.
Refactoring this way, we are sure there will be only one `move`
independently to which features are you using
See also: https://github.com/dusk-network/kadcast/issues/60
*/
table_read.all_sorted().for_each(|(h, nodes)| {
let nodes_joined = nodes.map(|p| p.value().address()).join(",");
info!("H: {h} - Nodes {nodes_joined}");
});
}
/// Broadcast a message to the network
///
/// # Arguments
///
/// * `message` - Byte array containing the message to be broadcasted
/// * `height` - (Optional) Overrides default Kadcast broadcast height
///
/// Note:
/// The function returns just after the message is put on the internal queue
/// system. It **does not guarantee** the message will be broadcasted
pub async fn broadcast(
&self,
message: &[u8],
height: Option<BucketHeight>,
) {
if message.is_empty() {
error!("Message empty");
return;
}
let tosend: Vec<_> = self
.ktable
.read()
.await
.extract(height)
.map(|(height, nodes)| {
let msg = Message::Broadcast(
self.header,
BroadcastPayload {
height,
gossip_frame: message.to_vec(), //FIX_ME: avoid clone
},
);
let targets =
nodes.map(|node| *node.value().address()).collect();
(msg, targets)
})
.collect();
for i in tosend {
self.outbound_sender.send(i).await.unwrap_or_else(|e| {
error!("Unable to send from broadcast {e}")
});
}
}
/// Send a message to a peer in the network
///
/// # Arguments
///
/// * `message` - Byte array containing the message to be sent
/// * `target` - Receiver address
///
/// Note:
/// The function returns just after the message is put on the internal queue
/// system. It **does not guarantee** the message will be broadcasted
pub async fn send(&self, message: &[u8], target: SocketAddr) {
if message.is_empty() {
return;
}
// We use the Broadcast message type while setting height to 0
// to prevent further propagation at the receiver
let msg = Message::Broadcast(
self.header,
BroadcastPayload {
height: 0,
gossip_frame: message.to_vec(), //FIX_ME: avoid clone
},
);
let targets = vec![target];
self.outbound_sender
.send((msg, targets))
.await
.unwrap_or_else(|e| error!("Unable to send from send method {e}"));
}
/// Blocks a network source and removes it from the routing table.
///
/// # Arguments
///
/// * `source` - The address of the network source to be blocked.
///
/// This method blocks a network source by adding its address to the
/// blocklist and subsequently removes the corresponding peer from the
/// routing table. This action prevents further communication with the
/// blocked source.
pub async fn block_source(&self, source: SocketAddr) {
self.blocklist.write().await.insert(source);
let binary_key = PeerNode::compute_id(&source.ip(), source.port());
self.ktable.write().await.remove_peer(&binary_key);
}
}
#[cfg(test)]
mod tests {
pub type Result<T> = core::result::Result<T, Box<dyn std::error::Error>>;
}