use std::collections::{BTreeSet, HashSet};
use std::fmt::Debug;
use std::marker::{Send, Sync};
use std::net::SocketAddr;
use std::sync::Arc;
use std::{iter, process, thread, time};
use crossbeam;
use log::{debug, error};
use crate::network::messaging::Messaging;
use crate::network::{commst, connection};
use hbbft::broadcast::{Broadcast, Message};
use hbbft::crypto::{poly::Poly, SecretKey, SecretKeySet};
use hbbft::{ConsensusProtocol, NetworkInfo, SourcedMessage};
/// A single participant in a reliable-broadcast run.
///
/// `T` is the type of the value that may be proposed; the bounds it must satisfy are
/// placed on the `impl` block rather than on the struct itself.
#[derive(Debug)]
pub struct Node<T> {
    /// Socket address handed to `connection::make` as this node's own address.
    addr: SocketAddr,
    /// Socket addresses of the other nodes, handed to `connection::make`.
    remotes: HashSet<SocketAddr>,
    /// Value this node proposes, if any. `run` requires it to be `Some` on exactly the
    /// node that ends up with ID 0 and `None` on every other node.
    value: Option<T>,
}
impl<T: Clone + Debug + AsRef<[u8]> + PartialEq + Send + Sync + From<Vec<u8>> + Into<Vec<u8>>>
Node<T>
{
pub fn new(addr: SocketAddr, remotes: HashSet<SocketAddr>, value: Option<T>) -> Self {
Node {
addr,
remotes,
value,
}
}
pub fn run(&self) -> Result<T, Box<(dyn std::any::Any + Send + 'static)>> {
let value = &self.value;
let (our_str, connections) = connection::make(&self.addr, &self.remotes);
let mut node_strs: Vec<String> = iter::once(our_str.clone())
.chain(connections.iter().map(|c| c.node_str.clone()))
.collect();
node_strs.sort();
let our_id = node_strs.binary_search(&our_str).unwrap();
let all_ids: BTreeSet<_> = (0..node_strs.len()).collect();
let secret_key_set = SecretKeySet::from(Poly::zero());
let sk_share = secret_key_set.secret_key_share(our_id);
let pub_key_set = secret_key_set.public_keys();
let sk = SecretKey::default();
let pub_keys = all_ids
.iter()
.map(|id| (*id, SecretKey::default().public_key()))
.collect();
let netinfo = NetworkInfo::new(our_id, sk_share, pub_key_set, sk, pub_keys);
if value.is_some() != (our_id == 0) {
panic!("Exactly the first node must propose a value.");
}
let messaging: Messaging<Message> = Messaging::new(all_ids.len());
let rxs_to_comms = messaging.rxs_to_comms();
let tx_from_comms = messaging.tx_from_comms();
let rx_to_algo = messaging.rx_to_algo();
let tx_from_algo = messaging.tx_from_algo();
let stop_tx = messaging.stop_tx();
let mut rng = rand::rngs::OsRng::new().unwrap();
crossbeam::scope(|scope| {
let _msg_handle = messaging.spawn(scope);
let broadcast_handle = scope.spawn(move |_| {
let mut broadcast =
Broadcast::new(Arc::new(netinfo), 0).expect("failed to instantiate broadcast");
if let Some(v) = value {
let step = broadcast
.handle_input(v.clone().into(), &mut rng)
.expect("propose value");
for msg in step.messages {
tx_from_algo.send(msg).expect("send from algo");
}
}
loop {
let message = rx_to_algo.recv().expect("receive from algo");
let SourcedMessage { source: i, message } = message;
debug!("{} received from {}: {:?}", our_id, i, message);
let step = broadcast
.handle_message(&i, message)
.expect("handle broadcast message");
for msg in step.messages {
debug!("{} sending to {:?}: {:?}", our_id, msg.target, msg.message);
tx_from_algo.send(msg).expect("send from algo");
}
if let Some(output) = step.output.into_iter().next() {
println!(
"Broadcast succeeded! Node {} output: {}",
our_id,
String::from_utf8(output).unwrap()
);
break;
}
}
});
for (i, c) in connections.iter().enumerate() {
let node_index = if c.node_str < our_str { i } else { i + 1 };
let rx_to_comms = &rxs_to_comms[node_index];
scope.spawn(move |_| {
match commst::CommsTask::<Message>::new(
tx_from_comms,
rx_to_comms,
c.stream.try_clone().unwrap(),
node_index,
)
.run()
{
Ok(_) => debug!("Comms task {} succeeded", node_index),
Err(e) => error!("Comms task {}: {:?}", node_index, e),
}
});
}
let _ = broadcast_handle.join();
thread::sleep(time::Duration::from_secs(1));
stop_tx
.send(())
.map_err(|e| {
error!("{}", e);
})
.unwrap();
process::exit(0);
}) }
}