#![allow(clippy::style)]
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::{mpsc, Arc, Mutex};
use std::thread::JoinHandle;
use bit_vec::BitVec;
use primitive_types::U256;
use priority_queue::PriorityQueue;
use rand::seq::IteratorRandom;
use serde::{Deserialize, Serialize};
use sha3::Digest;
use thiserror::Error;
use kindelia_common::crypto::{self, Hashed, Keccakable};
use kindelia_common::Name;
use kindelia_lang::ast::Statement;
use kindelia_lang::parser;
use crate::api::{self, CtrInfo, RegInfo};
use crate::api::{BlockInfo, FuncInfo, NodeRequest};
use crate::bits;
use crate::bits::ProtoSerialize;
use crate::config::MineConfig;
use crate::constants;
use crate::net::{ProtoAddr, ProtoComm};
use crate::persistence::{BlockStorage, BlockStorageError};
use crate::runtime::*;
use crate::util::*;
use crate::events::{NodeEventEmittedInfo, NodeEventType};
use crate::heartbeat;
/// Sends a node event (paired with the current time in microseconds)
/// through an optional event channel. Expands to nothing unless the
/// "events" feature is enabled. Send failures are only logged.
/// The tagged arm accepts `tags = ...` for call-site documentation but
/// currently ignores the tags and behaves exactly like the untagged arm.
macro_rules! emit_event {
($tx: expr, $event: expr) => {
#[cfg(feature = "events")]
if let Some(ref tx) = $tx {
if let Err(_) = tx.send(($event, get_time_micro())) {
eprintln!("Could not send event");
}
}
};
// Tagged variant: identical expansion; tags are documentation only.
($tx: expr, $event: expr, tags = $($tag:ident),+) => {
#[cfg(feature = "events")]
if let Some(ref tx) = $tx {
if let Err(_) = tx.send(($event, get_time_micro())) {
eprintln!("Could not send event");
}
}
};
}
/// A serialized statement together with its Keccak-256 hash.
/// Equality and hashing are based solely on `hash` — see the manual
/// `PartialEq` and `Hash` impls below.
#[derive(Debug, Clone, Eq)]
pub struct Transaction {
// Raw payload; `Transaction::new` pads it to a non-zero multiple of 5 bytes.
data: Vec<u8>,
pub hash: U256,
}
/// Errors produced when constructing a `Transaction`.
#[derive(Debug, Error, Serialize, Deserialize)]
pub enum TransactionError {
// The (padded) payload exceeds MAX_TRANSACTION_SIZE.
#[error(
"Transaction size ({len}) is greater than max transaction size ({})",
MAX_TRANSACTION_SIZE
)]
Oversize { len: usize },
}
impl Transaction {
  /// Builds a transaction from raw bytes.
  ///
  /// The payload is zero-padded until it is a non-zero multiple of 5
  /// bytes, then hashed. Returns `TransactionError::Oversize` when the
  /// padded payload exceeds `MAX_TRANSACTION_SIZE`.
  pub fn new(mut data: Vec<u8>) -> Result<Self, TransactionError> {
    // Pad: length must be > 0 and a multiple of 5.
    while data.len() == 0 || data.len() % 5 != 0 {
      data.push(0);
    }
    if data.len() > MAX_TRANSACTION_SIZE {
      return Err(TransactionError::Oversize { len: data.len() });
    }
    let hash = hash_bytes(&data);
    Ok(Transaction { data, hash })
  }
  /// Encodes the payload length as two bytes: the length as a u16 with
  /// its bits reversed, split into (high byte, low byte).
  pub fn encode_length(&self) -> (u8, u8) {
    let len = self.data.len() as u16;
    // `len` is already u16 — the original re-cast it redundantly.
    let num = len.reverse_bits();
    (((num >> 8) & 0xFF) as u8, (num & 0xFF) as u8)
  }
  /// Inverse of `encode_length`.
  fn decode_length(pair: (u8, u8)) -> usize {
    (((pair.0 as u16) << 8) | (pair.1 as u16)).reverse_bits() as usize
  }
  /// Deserializes the payload back into a `Statement`, if well-formed.
  pub fn to_statement(&self) -> Option<Statement> {
    Statement::proto_deserialized(&BitVec::from_bytes(&self.data))
  }
}
impl Deref for Transaction {
  type Target = [u8];
  /// Transactions transparently dereference to their raw byte payload.
  fn deref(&self) -> &[u8] {
    self.data.as_slice()
  }
}
impl TryFrom<&Statement> for Transaction {
  type Error = TransactionError;
  /// Serializes the statement into bytes and wraps it in a transaction;
  /// fails only when the serialized form is oversized.
  fn try_from(stmt: &Statement) -> Result<Self, Self::Error> {
    let bytes = stmt.proto_serialized().to_bytes();
    Transaction::new(bytes)
  }
}
// Transactions compare by hash only; this stays consistent with the
// manual `Hash` impl below, as required by the Eq/Hash contract.
impl PartialEq for Transaction {
fn eq(&self, other: &Self) -> bool {
self.hash == other.hash
}
}
// Hashes only the precomputed Keccak hash, matching the `PartialEq`
// impl above (equal transactions hash equally).
impl std::hash::Hash for Transaction {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.hash.hash(state);
}
}
/// The serialized transaction list of a block.
/// Byte 0 holds the bit-reversed transaction count; each transaction
/// follows as a 2-byte encoded length plus its payload.
#[derive(Debug, Clone, PartialEq)]
pub struct Body {
pub data: Vec<u8>,
}
impl Body {
/// Builds a body from as many transactions as fit.
/// Empty transactions are skipped; anything past the 255th transaction
/// or past MAX_BODY_SIZE is silently dropped.
pub fn fill_from<I, T>(transactions: I) -> Body
where
I: IntoIterator<Item = T>,
T: Into<Transaction>,
{
// Byte 0 is reserved for the transaction count, patched in at the end.
let mut body_vec = vec![0];
let mut tx_count = 0;
for transaction in transactions.into_iter() {
let transaction = transaction.into();
let tx_len = transaction.data.len();
// Zero-length transactions cannot be represented; skip them.
if tx_len == 0 {
continue;
}
// The single count byte caps the body at 255 transactions.
if tx_count + 1 > 255 {
break;
}
// Stop at the first transaction that no longer fits the body.
if add_transaction_to_body_vec(&mut body_vec, &transaction).is_err() {
break;
}
tx_count += 1;
}
body_vec[0] = (tx_count as u8).reverse_bits();
Body { data: body_vec }
}
/// Strict variant of `fill_from`: returns Err(()) instead of truncating
/// when a transaction is invalid, the body would exceed MAX_BODY_SIZE,
/// or there are more than 255 transactions.
pub fn from_transactions_iter<I, T>(transactions: I) -> Result<Body, ()>
where
I: IntoIterator<Item = T>,
T: TryInto<Transaction>,
{
// Byte 0 is reserved for the transaction count, patched in at the end.
let mut data = vec![0];
let mut tx_count = 0;
for transaction in transactions.into_iter() {
let transaction = transaction.try_into().map_err(|_| ())?;
let tx_len = transaction.data.len();
if tx_len == 0 {
continue;
}
// +2 accounts for the encoded length prefix.
if data.len() + 2 + tx_len > MAX_BODY_SIZE {
return Err(());
}
if tx_count + 1 > 255 {
return Err(());
}
tx_count += 1;
let len_bytes = transaction.encode_length();
data.push(len_bytes.0);
data.push(len_bytes.1);
data.extend_from_slice(&transaction.data);
}
data[0] = (tx_count as u8).reverse_bits();
Ok(Body { data })
}
}
/// A block paired with its (cached) Keccak-256 hash.
pub type HashedBlock = Hashed<Block>;
/// A single chain block.
#[derive(Debug, Clone)]
pub struct Block {
// Hash of the previous block (zero for the genesis block).
pub prev: U256,
// Timestamp in milliseconds.
pub time: u128,
// Metadata word; doubles as the mining nonce (see `try_mine`).
pub meta: u128,
// Serialized transaction list.
pub body: Body,
}
impl Block {
  /// Assembles a block from its four fields.
  pub fn new(prev: U256, time: u128, meta: u128, body: Body) -> Block {
    Block { body, meta, time, prev }
  }
}
impl crypto::Keccakable for Block {
  /// Hashes the block fields in a fixed order:
  /// prev (32 bytes) | time (16 bytes) | meta (16 bytes) | body.
  /// Changing this order or widths would change every block hash.
  fn keccak256(&self) -> crypto::Hash {
    // Preallocate the exact serialized size to avoid grow-and-copy.
    let mut bytes: Vec<u8> =
      Vec::with_capacity(32 + 16 + 16 + self.body.data.len());
    bytes.extend_from_slice(&u256_to_bytes(self.prev));
    bytes.extend_from_slice(&u128_to_bytes(self.time));
    bytes.extend_from_slice(&u128_to_bytes(self.meta));
    bytes.extend_from_slice(&self.body.data);
    crypto::Hash::keccak256_from_bytes(&bytes)
  }
}
/// Where a block hash currently stands in this node's view.
#[derive(Debug, Clone, PartialEq)]
pub enum InclusionState {
// Never heard of this hash.
UNSEEN,
// Some pending block waits for it, but we don't have it.
MISSING,
// We have the block, but not (yet) its parent.
PENDING,
// Fully added to the chain state.
INCLUDED,
}
// Node state. rustfmt is skipped to keep the aligned one-line field groups.
#[rustfmt::skip]
pub struct Node<C: ProtoComm, S: BlockStorage> {
// Identity, networking, storage, runtime, API channel, mempool, peers.
pub network_id : u32, pub comm : C, pub addr : C::Address, pub storage : S, pub runtime : Runtime, pub query_recv : mpsc::Receiver<NodeRequest<C>>, pub pool : PriorityQueue<Transaction, u64>, pub peers : PeersStore<C::Address>, pub genesis_hash : U256,
// Consensus state; every map below is keyed by block hash.
pub tip : U256, pub block : U256Map<HashedBlock>, pub pending : U256Map<HashedBlock>, pub ancestor : U256Map<U256>, pub wait_list : U256Map<Vec<U256>>, pub children : U256Map<Vec<U256>>, pub work : U256Map<U256>, pub target : U256Map<U256>, pub height : U256Map<u128>, pub results : U256Map<Vec<StatementResult>>,
#[cfg(feature = "events")]
pub event_emitter : Option<mpsc::Sender<NodeEventEmittedInfo>>,
// Shared mailbox to the miner thread, when mining is enabled.
pub miner_comm : Option<MinerCommunication>,
}
/// Errors produced when adding a transaction to the mempool.
#[derive(Debug, Error, Serialize, Deserialize)]
pub enum PoolError {
#[error("Transaction {hash} already included on pool")]
AlreadyIncluded { hash: api::Hash },
}
/// A known peer and the last time it was seen (milliseconds).
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub struct Peer<A: ProtoAddr> {
pub seen_at: u128,
pub address: A,
}
/// Peer bookkeeping: `seen` accumulates every peer ever observed, while
/// `active` holds only peers seen within PEER_TIMEOUT (see `timeout`).
pub struct PeersStore<A: ProtoAddr> {
seen: HashMap<A, Peer<A>>,
active: HashMap<A, Peer<A>>,
}
impl<A: ProtoAddr> PeersStore<A> {
/// Creates an empty peer store.
pub fn new() -> PeersStore<A> {
PeersStore { seen: HashMap::new(), active: HashMap::new() }
}
/// Marks a peer active, but only if its sighting is within PEER_TIMEOUT.
pub fn activate(&mut self, addr: &A, peer: Peer<A>) {
let now = get_time();
// NOTE(review): `now - PEER_TIMEOUT` underflows (u128) if the clock
// is within 10 s of epoch zero — presumably unreachable; confirm.
if peer.seen_at >= now - PEER_TIMEOUT {
self.active.insert(*addr, peer);
}
}
/// Records a sighting: first contact inserts into `seen` and activates;
/// a known-but-inactive peer is re-activated; an already-active peer
/// keeps the most recent `seen_at` timestamp.
pub fn see_peer(
&mut self,
peer: Peer<A>,
#[cfg(feature = "events")] event_emitter: Option<
mpsc::Sender<NodeEventEmittedInfo>,
>,
) {
let addr = peer.address;
match self.seen.get(&addr) {
// First sighting ever.
None => {
self.seen.insert(addr, peer);
emit_event!(
event_emitter,
NodeEventType::see_peer_not_seen(&peer),
tags = peers,
see_peer
);
self.activate(&addr, peer);
}
// Seen before: re-activate or refresh the timestamp.
Some(_) => {
let old_peer = self.active.get_mut(&addr);
match old_peer {
None => {
emit_event!(
event_emitter,
NodeEventType::see_peer_activated(&peer),
tags = peers,
see_peer
);
self.activate(&addr, peer);
}
Some(old_peer) => {
// Keep whichever sighting is most recent.
let new_seen_at = std::cmp::max(peer.seen_at, old_peer.seen_at);
emit_event!(
event_emitter,
NodeEventType::see_peer_already_active(&old_peer, new_seen_at),
tags = peers,
see_peer
);
old_peer.seen_at = new_seen_at;
}
}
}
}
}
/// Deactivates every active peer not seen within PEER_TIMEOUT.
/// Peers remain in `seen` and can be re-activated later.
fn timeout(
&mut self,
#[cfg(feature = "events")] event_emitter: Option<
mpsc::Sender<NodeEventEmittedInfo>,
>,
) {
// Collect first, remove after: can't mutate `active` while iterating.
let mut forget = Vec::new();
for (_, peer) in &self.active {
if peer.seen_at < get_time() - PEER_TIMEOUT {
emit_event!(
event_emitter,
NodeEventType::timeout(&peer),
tags = peers,
timeout
);
forget.push(peer.address);
}
}
for addr in forget {
self.inactivate_peer(&addr);
}
}
/// Removes a peer from the active set (it stays in `seen`).
pub fn inactivate_peer(&mut self, addr: &A) {
self.active.remove(addr);
}
/// Snapshot of all currently-active peers.
pub fn get_all_active(&self) -> Vec<Peer<A>> {
self.active.values().cloned().collect()
}
/// Snapshot of every peer ever seen.
pub fn get_all(&self) -> Vec<Peer<A>> {
self.seen.values().cloned().collect()
}
/// Uniformly samples up to `amount` distinct active peers.
pub fn get_random_active(&self, amount: u128) -> Vec<Peer<A>> {
let amount = amount as usize;
let mut rng = rand::thread_rng();
let peers = self.active.values().cloned().choose_multiple(&mut rng, amount);
peers
}
}
/// Messages exchanged between the node and the miner thread through the
/// shared `MinerCommunication` mailbox.
#[derive(Debug, Clone)]
pub enum MinerMessage {
// Node -> miner: mine a block on `prev` with this body and target.
Request { prev: U256, body: Body, targ: U256 },
// Miner -> node: a block was successfully mined.
Answer { block: HashedBlock },
// Node -> miner: stop working on the current request.
Stop,
}
/// Single-slot mailbox shared between node and miner threads.
/// Cloning shares the same underlying slot (Arc).
#[derive(Debug, Clone)]
pub struct MinerCommunication {
message: Arc<Mutex<MinerMessage>>,
}
/// Network protocol messages. Every variant carries the network `magic`
/// so nodes on different networks ignore each other's traffic.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
pub enum Message<A: ProtoAddr> {
// Announce blocks, optionally gossiping, plus a sample of known peers.
NoticeTheseBlocks {
magic: u32,
gossip: bool,
blocks: Vec<Block>,
peers: Vec<Peer<A>>,
},
// Ask a peer for a specific block (and, in practice, its ancestors).
GiveMeThatBlock {
magic: u32,
bhash: Hash,
},
// Relay a transaction for inclusion in a future block.
PleaseMineThisTransaction {
magic: u32,
tx: Transaction,
},
}
// Sizes (bytes). Underscore-prefixed constants are currently unused.
pub const _HASH_SIZE: usize = 32;
// Maximum serialized body size; bounds every block's transaction list.
pub const MAX_BODY_SIZE: usize = 1280;
// Each transaction is prefixed by a 2-byte encoded length.
pub const TRANSACTION_LENGTH_ENCODE_SIZE: usize = 2;
// One byte of the body is the transaction count, hence the extra -1.
pub const MAX_TRANSACTION_SIZE: usize =
MAX_BODY_SIZE - TRANSACTION_LENGTH_ENCODE_SIZE - 1;
// UDP payload budgets.
pub const MAX_UDP_SIZE_SLOW: usize = 8000;
pub const _MAX_UDP_SIZE_FAST: usize = 1500;
pub const _IPV4_SIZE: usize = 4;
pub const _IPV6_SIZE: usize = 16;
pub const _PORT_SIZE: usize = 2;
pub const _GOSSIP_FACTOR: u128 = 16;
// Nonces tried per mining round before re-reading the miner mailbox.
pub const MINE_ATTEMPTS: u128 = 1024;
// Consensus timing (milliseconds) and difficulty-adjustment period.
pub const TIME_PER_BLOCK: u128 = 1000;
// Blocks timestamped further than this into the future are rejected.
pub const DELAY_TOLERANCE: u128 = 60 * 60 * 1000;
pub const BLOCKS_PER_PERIOD: u128 = 20;
pub const TIME_PER_PERIOD: u128 = TIME_PER_BLOCK * BLOCKS_PER_PERIOD;
pub const INITIAL_DIFFICULTY: u128 = 256;
// Peers unseen for this long (ms) are deactivated.
pub const PEER_TIMEOUT: u128 = 10 * 1000;
pub const _PEER_COUNT_MINIMUM: u128 = 256;
pub const _SHARE_PEER_COUNT: u128 = 3;
pub const _LAST_SEEN_SIZE: u128 = 2;
// Main-loop task periods (milliseconds).
pub const HANDLE_MESSAGE_DELAY: u128 = 20;
pub const HANDLE_REQUEST_DELAY: u128 = 20;
pub const _HANDLE_MESSAGE_LIMIT: u128 = 5;
/// Converts a mining target into its difficulty:
/// `difficulty = p / (p - target)` where `p = 2^256 - 1`.
/// NOTE(review): divides by zero if `target == p` — presumably
/// unreachable for real targets; confirm.
pub fn target_to_difficulty(target: U256) -> U256 {
  let p256 = U256::from(
    "0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
  );
  p256 / (p256 - target)
}
/// Inverse of `target_to_difficulty`:
/// `target = p - p / difficulty` where `p = 2^256 - 1`.
/// NOTE(review): divides by zero for `difficulty == 0`; confirm callers
/// never pass zero.
pub fn difficulty_to_target(difficulty: U256) -> U256 {
  let p256 = U256::from(
    "0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
  );
  p256 - p256 / difficulty
}
/// Scales the previous target's difficulty by `scale / 2^32` (rounded
/// up, minimum 1) and converts the result back into a target.
pub fn compute_next_target(last_target: U256, scale: U256) -> U256 {
  let p32 = U256::from("0x100000000");
  let prev_difficulty = target_to_difficulty(last_target);
  // Ceiling division of `prev_difficulty * scale` by 2^32.
  let next_difficulty = u256(1) + (prev_difficulty * scale - u256(1)) / p32;
  difficulty_to_target(next_difficulty)
}
/// Work contributed by a block hash: zero for the zero hash, otherwise
/// the difficulty equivalent of the hash value.
pub fn get_hash_work(hash: U256) -> U256 {
  if hash == u256(0) {
    u256(0)
  } else {
    target_to_difficulty(hash)
  }
}
/// Keccak-256 of a U256 value, reinterpreted as a U256.
pub fn hash_u256(value: U256) -> U256 {
  let bytes = u256_to_bytes(value);
  hash_bytes(&bytes)
}
/// Keccak-256 of a byte slice, interpreted as a little-endian U256.
pub fn hash_bytes(bytes: &[u8]) -> U256 {
  let mut hasher = sha3::Keccak256::new();
  // `bytes` is already a slice — the original passed `&bytes` (a
  // needless double borrow, clippy: needless_borrow).
  hasher.update(bytes);
  let digest = hasher.finalize();
  U256::from_little_endian(&digest)
}
/// Appends a length-prefixed transaction to a body byte vector.
///
/// Returns an error — leaving `body_vec` untouched — when the encoded
/// transaction (payload + 2-byte length prefix) would push the body past
/// `MAX_BODY_SIZE`.
pub fn add_transaction_to_body_vec(
  body_vec: &mut Vec<u8>,
  transaction: &Transaction,
) -> Result<(), String> {
  let tx_len = transaction.data.len() + TRANSACTION_LENGTH_ENCODE_SIZE;
  let len_info = transaction.encode_length();
  if body_vec.len() + tx_len > MAX_BODY_SIZE {
    // Fixed message typo ("No enough" -> "Not enough"); callers in this
    // file only test `is_err()`, not the text.
    return Err("Not enough space in block".to_string());
  }
  body_vec.push(len_info.0);
  body_vec.push(len_info.1);
  body_vec.extend_from_slice(&transaction.data);
  Ok(())
}
/// Parses the transactions stored in a block body.
///
/// Layout: byte 0 is the bit-reversed transaction count; each entry is a
/// 2-byte encoded length followed by the payload. Truncated bodies are
/// tolerated — parsing stops at the first incomplete entry.
pub fn extract_transactions(body: &Body) -> Vec<Transaction> {
  let mut transactions = Vec::new();
  // Defensive: an empty body has no count byte (bodies built by this
  // file always have one, but the body may come from the network).
  if body.data.is_empty() {
    return transactions;
  }
  let tx_count = body.data[0].reverse_bits();
  let mut index = 1;
  for _ in 0..tx_count {
    // Both length bytes must be in bounds. The original checked only
    // `index`, so `body.data[index + 1]` could panic on a body that
    // ends exactly between the two length bytes.
    if index + 1 >= body.data.len() {
      break;
    }
    let tx_len =
      Transaction::decode_length((body.data[index], body.data[index + 1]));
    index += 2;
    if index + tx_len > body.data.len() {
      break;
    }
    let transaction_body = body.data[index..index + tx_len].to_vec();
    match Transaction::new(transaction_body) {
      Ok(transaction) => transactions.push(transaction),
      Err(err) => eprintln!(
        "A transaction bigger than block was created from a block{}",
        err
      ),
    };
    index += tx_len;
  }
  transactions
}
/// The genesis mining target, derived from INITIAL_DIFFICULTY.
pub fn initial_target() -> U256 {
difficulty_to_target(u256(INITIAL_DIFFICULTY))
}
/// The all-zero hash, used as the genesis block's `prev` sentinel.
pub fn zero_hash() -> U256 {
u256(0)
}
/// Builds the genesis block (prev = zero hash, time = 0, meta = 0) from
/// the parsed genesis statements. Panics if they do not fit in one body.
pub fn build_genesis_block(stmts: &[Statement]) -> Block {
let body = Body::from_transactions_iter(stmts)
.expect("Genesis statements should fit in a block body");
Block::new(zero_hash(), 0, 0, body)
}
/// Attempts up to `max_attempts` proof-of-work nonces on top of `prev`.
///
/// A block counts as mined when its hash is at or above `targ` (targets
/// in this scheme grow with difficulty). The nonce lives in `meta`:
/// seeded randomly, then bumped (wrapping) after each failed attempt.
pub fn try_mine(
  prev: U256,
  body: Body,
  targ: U256,
  max_attempts: u128,
) -> Option<HashedBlock> {
  let nonce_seed = rand::random::<u128>();
  let time = get_time();
  let mut block = Block::new(prev, time, nonce_seed, body);
  for _ in 0..max_attempts {
    let hashed = block.hashed();
    if U256::from(hashed.get_hash()) >= targ {
      return Some(hashed);
    }
    // Reclaim the block and try the next nonce.
    block = hashed.take();
    block.meta = block.meta.wrapping_add(1);
  }
  None
}
impl MinerCommunication {
  /// Creates a mailbox initialized to `Stop` (the idle state).
  pub fn new() -> Self {
    MinerCommunication { message: Arc::new(Mutex::new(MinerMessage::Stop)) }
  }
  /// Overwrites the mailbox with a new message.
  pub fn write(&mut self, new_message: MinerMessage) {
    *self.message.lock().unwrap() = new_message;
  }
  /// Returns a clone of the current message.
  pub fn read(&self) -> MinerMessage {
    self.message.lock().unwrap().clone()
  }
}
/// Miner thread entry point.
///
/// Repeatedly reads the shared mailbox; on a `Request`, tries
/// MINE_ATTEMPTS nonces and writes a successful result back as an
/// `Answer`. `slow_mining` throttles by sleeping a multiple of the time
/// each round took (dev/test knob). NOTE(review): the loop busy-spins
/// while the mailbox holds `Stop`/`Answer` — confirm this is intended.
pub fn miner_loop(
mut miner_comm: MinerCommunication,
slow_mining: Option<u64>,
#[cfg(feature = "events")] event_emitter: Option<
mpsc::Sender<NodeEventEmittedInfo>,
>,
) {
loop {
if let MinerMessage::Request { prev, body, targ } = miner_comm.read() {
let before = std::time::Instant::now();
let mined = try_mine(prev, body, targ, MINE_ATTEMPTS);
// Throttle: sleep `slow_ratio` times as long as mining took.
if let Some(slow_ratio) = slow_mining {
let elapsed = before.elapsed();
let sleep_time = elapsed.saturating_mul(slow_ratio as u32);
std::thread::sleep(sleep_time);
}
if let Some(block) = mined {
emit_event!(
event_emitter,
NodeEventType::mined(block.get_hash().into(), targ),
tags = mining,
mined
);
// Hand the mined block back to the node thread.
miner_comm.write(MinerMessage::Answer { block });
} else {
emit_event!(
event_emitter,
NodeEventType::failed_mined(targ),
tags = mining,
failed_mined
);
}
}
}
}
/// Node-level errors; currently only wraps block-storage failures.
#[derive(Error, Debug)]
pub enum NodeError {
#[error(transparent)]
BlockStorage(#[from] BlockStorageError),
}
impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
#[allow(clippy::too_many_arguments)]
/// Builds a node seeded with the genesis block (parsed from
/// `constants::GENESIS_CODE`), an initialized runtime, and the given
/// initial peers. Returns the sender half of the API request channel
/// alongside the node itself.
pub fn new(
data_path: PathBuf,
network_id: u32,
addr: C::Address, initial_peers: Vec<C::Address>,
comm: C,
miner_comm: Option<MinerCommunication>,
storage: S,
#[cfg(feature = "events")] event_emitter: Option<
mpsc::Sender<NodeEventEmittedInfo>,
>,
) -> (mpsc::SyncSender<NodeRequest<C>>, Self) {
// Bounded(1): at most one API request is queued at a time.
let (query_sender, query_receiver) = mpsc::sync_channel(1);
let genesis_stmts =
parser::parse_code(constants::GENESIS_CODE).expect("Genesis code parses");
let genesis_block = build_genesis_block(&genesis_stmts);
let genesis_block = genesis_block.hashed();
let genesis_hash = genesis_block.get_hash().into();
let runtime = init_runtime(data_path.join("heaps"), &genesis_stmts);
// Every per-block map starts seeded with its genesis entry; the tip
// starts at genesis.
#[rustfmt::skip]
let mut node = Node {
network_id,
addr,
comm,
runtime,
pool : PriorityQueue:: new(),
peers : PeersStore:: new(),
genesis_hash,
tip : genesis_hash,
block : u256map_from([(genesis_hash, genesis_block)]),
pending : u256map_new(),
ancestor : u256map_new(),
wait_list: u256map_new(),
children : u256map_from([(genesis_hash, vec![] )]),
work : u256map_from([(genesis_hash, u256(0) )]),
height : u256map_from([(genesis_hash, 0 )]),
target : u256map_from([(genesis_hash, initial_target())]),
results : u256map_from([(genesis_hash, vec![] )]),
#[cfg(feature = "events")]
event_emitter: event_emitter.clone(),
storage,
query_recv : query_receiver,
miner_comm,
};
let now = get_time();
// Register the bootstrap peers as just-seen (activates them too).
initial_peers.iter().for_each(|address| {
return node.peers.see_peer(
Peer { address: *address, seen_at: now },
#[cfg(feature = "events")] event_emitter.clone(),
);
});
(query_sender, node)
}
/// Inserts a transaction into the mempool, scored by the low 64 bits of
/// its hash. Fails if an equal transaction is already pooled.
pub fn add_transaction(
  &mut self,
  transaction: Transaction,
) -> Result<(), PoolError> {
  let hash = transaction.hash;
  if self.pool.get(&transaction).is_some() {
    return Err(PoolError::AlreadyIncluded { hash: hash.into() });
  }
  self.pool.push(transaction, hash.low_u64());
  Ok(())
}
/// Adds a block — and any wait-listed descendants it unblocks — to the
/// chain state.
///
/// Handles, in order: rejecting far-future blocks; skipping blocks
/// already included; attaching to a known parent (work/height/target
/// bookkeeping, difficulty retargeting, tip selection and reorgs that
/// roll back and recompute the runtime); and parking blocks whose
/// parent is unknown on the wait list.
pub fn add_block(&mut self, block: &HashedBlock) {
// Work stack: the given block plus any pending children released as
// their parents get included.
let mut must_include = vec![block.clone()];
while let Some(block) = must_include.pop() {
let btime = block.time;
// Reject blocks timestamped too far into the future.
if btime >= get_time() + DELAY_TOLERANCE {
emit_event!(
self.event_emitter,
NodeEventType::too_late(&block),
tags = add_block,
too_late
);
continue;
}
let bhash = block.get_hash().into();
// Already included: nothing to do.
if let Some(block) = self.block.get(&bhash) {
let height = self.height[&bhash];
emit_event!(
self.event_emitter,
NodeEventType::already_included(block, height),
tags = add_block,
already_included
);
continue;
}
let phash = block.prev;
if self.block.get(&phash).is_some() {
// Parent known: register the block with placeholder
// work/height/target, refined below if it extends the chain.
let work = get_hash_work(bhash); self.block.insert(bhash, block.clone()); self.work.insert(bhash, u256(0)); self.height.insert(bhash, 0); self.target.insert(bhash, u256(0)); self.children.insert(bhash, vec![]); self.ancestor.remove(&bhash);
// The block must meet the parent's target and strictly advance time.
let has_enough_work = bhash >= self.target[&phash];
let advances_time = btime > self.block[&phash].time;
if has_enough_work && advances_time {
self.work.insert(bhash, self.work[&phash] + work); self.height.insert(bhash, self.height[&phash] + 1);
// Difficulty retarget: on the first block after each period of
// BLOCKS_PER_PERIOD blocks, scale the target by
// (expected period time) / (actual period time).
if self.height[&bhash] > 0
&& self.height[&bhash] > BLOCKS_PER_PERIOD
&& self.height[&bhash] % BLOCKS_PER_PERIOD == 1
{
// Walk back to the first block of the completed period.
let mut checkpoint_hash = phash;
for _ in 0..BLOCKS_PER_PERIOD - 1 {
checkpoint_hash = self.block[&checkpoint_hash].prev;
}
let period_time = btime - self.block[&checkpoint_hash].time;
let last_target = self.target[&phash];
// Scale is fixed-point with 32 fractional bits.
let next_scaler = 2u128.pow(32) * TIME_PER_PERIOD / period_time;
let next_target =
compute_next_target(last_target, u256(next_scaler));
self.target.insert(bhash, next_target);
} else {
// Within a period the target is inherited from the parent.
self.target.insert(bhash, self.target[&phash]);
}
// Tip selection: adopt this block if it accumulates more work.
let cur_tip = self.tip;
let new_tip = bhash;
if self.work[&new_tip] > self.work[&cur_tip] {
self.send_to_miner(MinerMessage::Stop);
emit_event!(
self.event_emitter,
NodeEventType::stop_mining(),
tags = mining,
stopped
);
self.tip = bhash;
if true {
// Reorg: find the common ancestor of old and new tips,
// persist the new branch, drop its transactions from the
// mempool, roll the runtime back, then recompute.
let mut must_compute = Vec::new();
let mut old_bhash = cur_tip;
let mut new_bhash = new_tip;
// Equalize heights, recording new-branch blocks on the way.
while self.height[&new_bhash] > self.height[&old_bhash] {
must_compute.push(new_bhash);
new_bhash = self.block[&new_bhash].prev;
}
while self.height[&old_bhash] > self.height[&new_bhash] {
old_bhash = self.block[&old_bhash].prev;
}
// Walk both branches in lockstep to the common ancestor.
while old_bhash != new_bhash {
must_compute.push(new_bhash);
old_bhash = self.block[&old_bhash].prev;
new_bhash = self.block[&new_bhash].prev;
}
for bhash_comp in must_compute.iter().rev() {
let block_height = self.height[bhash_comp];
// Persistence failure is logged, not fatal.
if let Err(e) = self
.storage
.write_block(block_height, self.block[bhash_comp].clone())
{
eprintln!("WARN: Error writing block to disk.\n{}", e)
}
// Transactions now included on-chain leave the mempool.
let block_comp = &self.block[bhash_comp];
for tx in extract_transactions(&block_comp.body) {
self.pool.remove(&tx);
}
}
let mut tick = self.height[&old_bhash];
let runtime_old_tick = self.runtime.get_tick();
// Roll the runtime back to the common ancestor's height.
self.runtime.rollback(tick as u64);
let old = (&self.block[&cur_tip], self.height[&cur_tip]);
let new = (&self.block[&new_tip], self.height[&new_tip]);
let common = (&self.block[&old_bhash], self.height[&old_bhash]); let ticks =
(runtime_old_tick as u128, self.runtime.get_tick() as u128);
emit_event!(
self.event_emitter,
NodeEventType::reorg(old, new, common, ticks, work),
tags = add_block,
reorg
);
// The rollback may land below the requested tick; queue
// additional blocks until runtime tick and height line up.
while tick as u64 > self.runtime.get_tick() {
must_compute.push(new_bhash);
new_bhash = self.block[&new_bhash].prev;
tick -= 1;
}
// Re-run the statements of every block on the new branch.
for bhash_comp in must_compute.iter().rev() {
let block_comp = &self.block[bhash_comp].clone();
self.compute_block(block_comp, &block); }
}
}
} else {
emit_event!(
self.event_emitter,
NodeEventType::not_enough_work(&self.block[&bhash]),
tags = add_block,
not_enough_work
);
}
let height = self.height.get(&bhash).copied();
let siblings: Vec<_> =
self.children[&block.prev].iter().copied().collect();
emit_event!(
self.event_emitter,
NodeEventType::included(&block, height, &siblings, work),
tags = add_block,
block_included
);
// Register as a child of its parent, then release any blocks that
// were waiting for this one.
self.children.entry(phash).or_insert_with(Vec::new).push(bhash);
if let Some(wait_list) = self.wait_list.get(&bhash) {
for waiting_for_me in wait_list {
must_include
.push(self.pending.remove(waiting_for_me).expect("block"));
}
self.wait_list.remove(&bhash);
}
} else if self.pending.get(&bhash).is_none() {
// Parent unknown: park the block and record what it waits for.
self.pending.insert(bhash, block.clone());
self.wait_list.entry(phash).or_insert_with(|| Vec::new()).push(bhash);
emit_event!(
self.event_emitter,
NodeEventType::missing_parent(&block),
tags = add_block,
missing_parent
);
}
}
}
/// Executes a block's statements on the runtime and records the results.
/// `added_block` is the block whose addition triggered this computation
/// (used only for the emitted event).
pub fn compute_block(
&mut self,
block: &HashedBlock,
added_block: &HashedBlock,
) {
// Only transactions that deserialize into statements are executed.
let transactions = extract_transactions(&block.body);
let mut statements = Vec::new();
for transaction in transactions {
if let Some(statement) = transaction.to_statement() {
statements.push(statement);
}
}
let bhash = U256::from(block.get_hash());
// Feed block context into the runtime. Each value drops its low 8
// bits; hax0/hax1 split the hash into bits 8..128 and 128..248.
// NOTE(review): presumably the >>8 truncation matches the runtime's
// 120-bit word encoding — confirm against the runtime.
self.runtime.set_time(block.time >> 8);
self.runtime.set_meta(block.meta >> 8);
self.runtime.set_hax0((bhash >> 000).low_u128() >> 8);
self.runtime.set_hax1((bhash >> 120).low_u128() >> 8);
self.runtime.open();
let result = self.runtime.run_statements(&statements, false, false);
emit_event!(
self.event_emitter,
NodeEventType::computed(
(added_block, self.height[&self.tip]),
(block, self.height[&bhash]),
&result
),
tags = add_block,
computed_block
);
// Results are kept per block hash for API queries.
self.results.insert(bhash, result);
self.runtime.commit();
}
/// The mining target of the current tip block.
pub fn get_tip_target(&self) -> U256 {
self.target[&self.tip]
}
/// Returns the chain from (at most `max` blocks below) the tip up to
/// the tip, oldest first. Stops early at the zero hash or any missing
/// block.
pub fn get_longest_chain(&self, max: Option<usize>) -> Vec<U256> {
  let mut chain = Vec::new();
  let mut cursor = self.tip;
  while cursor != zero_hash() {
    let block = match self.block.get(&cursor) {
      Some(block) => block,
      None => break,
    };
    chain.push(cursor);
    cursor = block.prev;
    if let Some(limit) = max {
      if chain.len() >= limit {
        break;
      }
    }
  }
  // Collected tip-first; callers expect oldest-first.
  chain.reverse();
  chain
}
/// Drains and handles every message currently available on the
/// network channel.
/// (The original kept a `count` variable that was incremented but never
/// read — dead code, removed.)
pub fn receive_message(&mut self) {
  for (addr, msg) in self.comm.proto_recv() {
    self.handle_message(addr, &msg);
  }
}
/// Handles at most one pending API request (non-blocking); one per
/// scheduler tick keeps request handling from starving other tasks.
fn receive_request(&mut self) {
if let Ok(request) = self.query_recv.try_recv() {
self.handle_request(request);
}
}
/// Hash of the block at height `index` on the current longest chain,
/// found by walking `prev` links back from the tip. Returns None when
/// `index` is above the tip height.
pub fn get_block_hash_by_index(&self, index: u64) -> Option<U256> {
  let mut cursor = self.tip;
  let mut height = self.height[&cursor] as u64;
  if index > height {
    return None;
  }
  while height > index {
    cursor = self.block[&cursor].prev;
    height -= 1;
  }
  Some(cursor)
}
/// Assembles the API view of an included block: the block itself, its
/// hash, height, and (if computed) its statement results. Returns None
/// for unknown hashes.
pub fn get_block_info(&self, hash: &U256) -> Option<BlockInfo> {
  let block = self.block.get(hash)?;
  // Every included block has a height; a gap here is a node-state bug.
  let height = self.height.get(hash).expect("Missing block height.");
  let height: u64 = (*height).try_into().expect("Block height is too big.");
  // `.cloned()` replaces the original `.map(|r| r.clone())`
  // (clippy: map_clone).
  let results = self.results.get(hash).cloned();
  let info = BlockInfo {
    block: (&**block).into(),
    hash: (*hash).into(),
    height,
    results,
  };
  Some(info)
}
/// Looks up a function's compiled definition in the runtime.
pub fn get_func_info(&self, name: &Name) -> Option<FuncInfo> {
  let compiled = self.runtime.read_file(name)?;
  Some(FuncInfo { func: compiled.func })
}
/// Looks up a constructor's arity in the runtime.
pub fn get_ctr_info(&self, name: &Name) -> Option<CtrInfo> {
  // `map` replaces the manual Some/None match (clippy: manual_map).
  self.runtime.get_arity(name).map(|arit| CtrInfo { arit: arit as u64 })
}
/// Collects registry info for a namespace: its owner plus every
/// constructor, function and namespace entry whose name starts with
/// `"{name}."`. Returns None when the namespace has no owner.
pub fn get_reg_info(&self, name: Name) -> Option<RegInfo> {
  let ownr = self.runtime.get_owner(&name)?;
  let ownr = Name::from(ownr);
  // Hoisted: the original rebuilt this `format!` string inside the
  // predicate, once per candidate name; the prefix is loop-invariant.
  let prefix = format!("{}.", name);
  let pred = |c: &Name| c.to_string().starts_with(&prefix);
  let ctrs: Vec<Name> =
    self.runtime.get_all_ctr().into_iter().filter(pred).collect();
  let funs: Vec<Name> =
    self.runtime.get_all_funs().into_iter().filter(pred).collect();
  let ns: Vec<Name> =
    self.runtime.get_all_ns().into_iter().filter(pred).collect();
  let stmt = [ctrs, funs, ns].concat();
  Some(RegInfo { ownr, stmt })
}
/// Answers one API request against current node/runtime state, sending
/// the result back over the request's channel. A failed send is only
/// logged — the requester may have stopped waiting.
pub fn handle_request(&mut self, request: NodeRequest<C>) {
// Logs (rather than propagates) a failure to send the answer back.
fn handle_ans_err<T>(req_txt: &str, res: Result<(), T>) {
if let Err(_) = res {
eprintln!("WARN: failed to send node request {} answer back", req_txt);
}
}
match request {
// Aggregate runtime statistics: tick, mana/space usage, counts.
NodeRequest::GetStats { tx } => {
let tick = self.runtime.get_tick();
let mana = {
let limit = self.runtime.get_mana_limit();
let used = self.runtime.get_mana();
let available = limit - used;
api::LimitStats { limit, used, available }
};
let size = {
let limit = self.runtime.get_size_limit();
let used = self.runtime.get_size();
let available = limit - used;
api::LimitStats { limit, used, available }
};
// Counts are accumulated across all runtime heaps.
let mut fun_count = 0;
self.runtime.reduce_with(&mut fun_count, |acc, heap| {
*acc += heap.get_fn_count();
});
let mut ctr_count = 0;
self.runtime.reduce_with(&mut ctr_count, |acc, heap| {
*acc += heap.get_ct_count();
});
let mut reg_count = 0;
self.runtime.reduce_with(&mut reg_count, |acc, heap| {
*acc += heap.get_ns_count();
});
let stats = api::Stats {
tick,
mana,
space: size,
fun_count,
ctr_count,
reg_count,
};
handle_ans_err("GetStats", tx.send(stats));
}
// NOTE(review): only ranges ending at -1 (the tip) are supported —
// `end` is debug-asserted to be -1; confirm callers never pass more.
NodeRequest::GetBlocks { range, tx } => {
let (start, end) = range;
debug_assert!(start <= end);
debug_assert!(end == -1);
let num = (end - start + 1) as usize;
let hashes = self.get_longest_chain(Some(num));
let infos = hashes
.iter()
.map(|h| self.get_block_info(h).expect("Missing block."))
.collect();
handle_ans_err("GetBlocks", tx.send(infos));
}
// Single block lookup by hash.
NodeRequest::GetBlock { hash, tx } => {
let info = self.get_block_info(&hash);
handle_ans_err("GetBlock", tx.send(info));
}
// Block-hash lookup by height on the longest chain.
NodeRequest::GetBlockHash { index, tx } => {
let info = self.get_block_hash_by_index(index);
handle_ans_err("GetBlockHash", tx.send(info));
}
// All function ids present across runtime heaps.
NodeRequest::GetFunctions { tx } => {
let mut funcs: HashSet<u128> = HashSet::new();
self.runtime.reduce_with(&mut funcs, |acc, heap| {
for func in heap.disk.links.keys() {
acc.insert(**func);
}
});
handle_ans_err("GetFunctions", tx.send(funcs));
}
NodeRequest::GetFunction { name, tx } => {
let info = self.get_func_info(&name);
handle_ans_err("GetFunction", tx.send(info));
}
// Reads a function's state as a term, bounded to 2^16 nodes.
NodeRequest::GetState { name, tx } => {
let state = self.runtime.read_disk_as_term(name.into(), Some(1 << 16));
handle_ans_err("GetState", tx.send(state));
}
NodeRequest::GetPeers { all, tx } => {
let peers =
if all { self.peers.get_all() } else { self.peers.get_all_active() };
handle_ans_err("GetPeers", tx.send(peers));
}
NodeRequest::GetConstructor { name, tx } => {
let info = self.get_ctr_info(&name);
handle_ans_err("GetConstructor", tx.send(info));
}
NodeRequest::GetReg { name, tx } => {
let info = self.get_reg_info(name);
handle_ans_err("GetReg", tx.send(info));
}
// Dry-runs code against the runtime (no chain mutation).
NodeRequest::RunCode { code, tx } => {
let result = self.runtime.test_statements_from_code(&code);
handle_ans_err("RunCode", tx.send(result));
}
// Parses code and submits each statement to the mempool as its own
// transaction; per-statement results are collected.
NodeRequest::PublishCode { code, tx } => {
let statements = parser::parse_statements(&code)
.map_err(|err| err.erro)
.map(|(_, s)| s);
let res = match statements {
Err(err) => Err(err),
Ok(stmts) => {
let results: Vec<_> = stmts
.into_iter()
.map(|stmt| {
let result: Result<(), api::PublishError> = {
let bytes = bitvec_to_bytes(&stmt.proto_serialized());
let t = Transaction::new(bytes)?; self.add_transaction(t)?; Ok(())
};
result
})
.collect();
Ok(results)
}
};
handle_ans_err("PublishCode", tx.send(res));
}
// Like RunCode, but with pre-parsed statements.
NodeRequest::Run { code, tx } => {
let result = self.runtime.test_statements(&code);
handle_ans_err("Run", tx.send(result));
}
// Like PublishCode, but with pre-parsed statements.
NodeRequest::Publish { code, tx } => {
let result: Vec<_> = code
.into_iter()
.map(|stmt| {
let result: Result<(), api::PublishError> = {
let bytes = bitvec_to_bytes(&stmt.proto_serialized());
let t = Transaction::new(bytes)?; self.add_transaction(t)?; Ok(())
};
result
})
.collect();
handle_ans_err("Publish", tx.send(result));
}
}
}
/// Sends a NoticeTheseBlocks message (with up to `share_peers` random
/// active peers attached) to each of the given addresses.
pub fn send_blocks_to(
  &mut self,
  addrs: Vec<C::Address>,
  gossip: bool,
  blocks: Vec<Block>,
  share_peers: u128,
) {
  let msg = Message::NoticeTheseBlocks {
    magic: self.network_id,
    gossip,
    blocks,
    peers: self.peers.get_random_active(share_peers),
  };
  self.comm.proto_send(addrs, &msg);
}
/// Classifies a block hash: INCLUDED beats PENDING beats MISSING beats
/// UNSEEN (checked in that priority order).
pub fn inclusion_state(&self, bhash: &U256) -> InclusionState {
  if self.block.contains_key(bhash) {
    InclusionState::INCLUDED
  } else if self.pending.contains_key(bhash) {
    InclusionState::PENDING
  } else if self.wait_list.contains_key(bhash) {
    InclusionState::MISSING
  } else {
    InclusionState::UNSEEN
  }
}
/// For a PENDING block, walks `prev` links through the pending set
/// (using the `ancestor` cache to skip ahead) until it reaches a hash
/// that is not pending — the oldest missing ancestor — caches it and
/// returns it. Returns None for blocks that are already included.
pub fn find_missing_ancestor(&mut self, bhash: &U256) -> Option<U256> {
if self.inclusion_state(bhash) == InclusionState::PENDING {
let ancestor = {
let mut ancestor = bhash;
while self.inclusion_state(ancestor) == InclusionState::PENDING {
// Jump through the cached ancestor first, if present.
if let Some(jump_to) = self.ancestor.get(&ancestor) {
ancestor = jump_to;
}
// A PENDING hash is always in `pending`, so this branch fires
// and the walk advances via `prev` — the loop terminates.
if let Some(ancestor_block) = self.pending.get(&ancestor) {
ancestor = &ancestor_block.prev;
}
}
*ancestor
};
// Cache so later walks from this block jump straight there.
self.ancestor.insert(*bhash, ancestor);
return Some(ancestor);
}
// Callers only pass hashes of blocks this node holds, so the state
// here should be INCLUDED — never MISSING or UNSEEN.
debug_assert!(self.inclusion_state(bhash) != InclusionState::MISSING);
debug_assert!(self.inclusion_state(bhash) != InclusionState::UNSEEN);
return None;
}
/// If `bhash` has a missing ancestor, asks `addr` for that ancestor
/// block with a GiveMeThatBlock message.
pub fn request_missing_ancestor(&mut self, addr: C::Address, bhash: &U256) {
  let missing = match self.find_missing_ancestor(bhash) {
    Some(hash) => hash,
    None => return,
  };
  let msg =
    Message::GiveMeThatBlock { magic: self.network_id, bhash: missing };
  self.comm.proto_send(vec![addr], &msg);
}
/// Processes one network message: drops self-addressed traffic and
/// messages with a foreign network magic, refreshes the sender in the
/// peer store, then dispatches on the message kind.
pub fn handle_message(
&mut self,
addr: C::Address,
msg: &Message<C::Address>,
) {
// Ignore messages that appear to come from ourselves.
if addr != self.addr {
// Every message kind carries the network magic; drop mismatches.
match msg {
Message::GiveMeThatBlock { magic, .. }
| Message::NoticeTheseBlocks { magic, .. }
| Message::PleaseMineThisTransaction { magic, .. } => {
if magic != &self.network_id {
return;
}
}
}
// Any valid message counts as a fresh sighting of the sender.
self.peers.see_peer(
Peer { address: addr, seen_at: get_time() },
#[cfg(feature = "events")]
self.event_emitter.clone(),
);
match msg {
// Peer asked for a block: reply with it plus as many ancestors as
// fit within one slow-path UDP payload.
Message::GiveMeThatBlock { magic, bhash } => {
emit_event!(
self.event_emitter,
NodeEventType::give_me_block(*magic, *bhash),
tags = handle_message,
give_me_block
);
let mut bhash = bhash;
let mut chunk = vec![];
// `tsize` tracks the serialized size of the reply so far.
let mut tsize = 0; loop {
if !self.block.contains_key(&bhash) {
break;
}
if *bhash == zero_hash() {
break;
}
let block = &self.block[bhash];
let bsize = bits::serialized_block_size(block) as usize;
if tsize + bsize > MAX_UDP_SIZE_SLOW {
break;
}
chunk.push((**block).clone());
tsize += bsize;
bhash = &block.prev;
}
// Non-gossip reply, no extra peers shared.
self.send_blocks_to(vec![addr], false, chunk, 0);
}
// Peer announced blocks, along with a sample of its peers.
Message::NoticeTheseBlocks { magic, gossip, blocks, peers } => {
let blocks: Vec<_> =
blocks.iter().cloned().map(|block| block.hashed()).collect();
emit_event!(
self.event_emitter,
NodeEventType::notice_blocks(*magic, *gossip, &blocks, peers),
tags = handle_message,
notice_blocks
);
// Learn the sender's peers too.
for peer in peers {
self.peers.see_peer(
*peer,
#[cfg(feature = "events")]
self.event_emitter.clone(),
);
}
for block in &blocks {
self.add_block(&block);
}
// On gossip, request any still-missing ancestor of the first block.
if *gossip && blocks.len() > 0 {
let bhash = U256::from(&blocks[0].keccak256());
self.request_missing_ancestor(addr, &bhash);
}
}
// Peer relayed a transaction: pool it (if new) and re-gossip it
// to 5 random active peers.
Message::PleaseMineThisTransaction { magic, tx } => {
emit_event!(
self.event_emitter,
NodeEventType::mine_trans(*magic, tx.hash),
tags = handle_message,
mine_trans
);
if self.pool.get(&tx).is_none() {
self.pool.push(tx.clone(), tx.hash.low_u64());
self.gossip(5, msg);
}
}
}
}
}
/// Forwards a message to up to `peer_count` random active peers.
pub fn gossip(&mut self, peer_count: u128, message: &Message<C::Address>) {
  let targets: Vec<_> = self
    .peers
    .get_random_active(peer_count)
    .into_iter()
    .map(|peer| peer.address)
    .collect();
  self.comm.proto_send(targets, message);
}
/// Announces the current tip block to every active peer (as gossip,
/// sharing up to 3 random peers alongside).
fn broadcast_tip_block(&mut self) {
  let tip_block = (*self.block[&self.tip]).clone();
  let addrs: Vec<C::Address> =
    self.peers.get_all_active().iter().map(|peer| peer.address).collect();
  self.send_blocks_to(addrs, true, vec![tip_block], 3);
}
/// Announces the current tip block to up to `peer_count` random active
/// peers (as gossip, sharing up to 3 random peers alongside).
fn gossip_tip_block(&mut self, peer_count: u128) {
  let tip_block = (*self.block[&self.tip]).clone();
  let addrs: Vec<C::Address> = self
    .peers
    .get_random_active(peer_count)
    .iter()
    .map(|peer| peer.address)
    .collect();
  self.send_blocks_to(addrs, true, vec![tip_block], 3);
}
/// Replays every block persisted on disk through `add_block`.
/// Storage is disabled during the replay so re-adding the blocks does
/// not write them back out; it is re-enabled afterwards.
pub fn load_blocks(&mut self) -> Result<(), NodeError> {
self.storage.disable();
// Clone the storage handle so the closure can borrow `self` mutably.
let storage = self.storage.clone();
storage
.read_blocks(|(block, _file_path)| self.add_block(&block.hashed()))?;
self.storage.enable();
Ok(())
}
/// Writes a message into the miner mailbox; a no-op when mining is
/// disabled.
fn send_to_miner(&mut self, msg: MinerMessage) {
  match &mut self.miner_comm {
    Some(comm) => comm.write(msg),
    None => {}
  }
}
/// Asks the miner to mine `body` on top of the current tip, using the
/// tip's target.
fn do_ask_mine(&mut self, body: Body) {
let targ = self.get_tip_target();
emit_event!(
self.event_emitter,
NodeEventType::ask_mine(targ),
tags = mining,
ask_mine
);
self.send_to_miner(MinerMessage::Request { prev: self.tip, body, targ });
}
/// If the miner has produced a block, adds it to the chain and
/// broadcasts the (possibly new) tip.
fn do_handle_mined_block(&mut self) {
  let message = match &mut self.miner_comm {
    Some(comm) => comm.read(),
    None => return,
  };
  if let MinerMessage::Answer { block } = message {
    self.add_block(&block);
    self.broadcast_tip_block();
  }
}
/// Builds a block body from as many pooled transactions as fit.
pub fn build_body_from_pool(&self) -> Body {
  Body::fill_from(self.pool.iter().map(|(tx, _score)| tx.clone()))
}
/// Emits a periodic heartbeat event summarizing tip state, block
/// counts, runtime mana/size usage, active peers, and the last (up to)
/// 10 tip ancestors.
fn log_heartbeat(&self) {
let tip = self.tip;
let tip_height = *self.height.get(&tip).unwrap() as u64;
let tip_work = *self.work.get(&tip).unwrap();
let tip_target = *self.target.get(&tip).unwrap();
let difficulty = target_to_difficulty(tip_target);
let included_count = self.block.keys().count();
// Wait-listed hashes are either pending (we hold the block) or
// missing (we only know something waits for it).
let mut missing_count: u64 = 0;
let mut pending_count: u64 = 0;
for (bhash, _) in self.wait_list.iter() {
if self.pending.get(bhash).is_some() {
pending_count += 1;
} else {
missing_count += 1;
}
}
let mana_cur = self.runtime.get_mana() as i64;
let mana_lim = self.runtime.get_mana_limit() as i64;
let size_cur = self.runtime.get_size() as i64;
let size_lim = self.runtime.get_size_limit() as i64;
let mana_avail = mana_lim - mana_cur;
let size_avail = size_lim - size_cur;
debug_assert!(size_avail >= 0);
debug_assert!(mana_avail >= 0);
let peers_num = self.peers.get_all_active().len();
// Collect up to 10 recent blocks walking back from the tip.
let mut tip_blocks = vec![];
let mut block = &self.block[&self.tip];
let mut count = 0;
while block.prev != u256(0) && count < 10 {
tip_blocks.push(block.get_hash().into());
block = &self.block[&block.prev];
count += 1;
}
// Oldest first in the emitted event.
tip_blocks.reverse();
let event = heartbeat! {
peers: { num: peers_num },
tip: {
height: tip_height,
difficulty: difficulty.low_u64(),
work: tip_work,
},
blocks: {
missing: missing_count,
pending: pending_count,
included: included_count,
},
runtime: {
mana: {
current: mana_cur,
limit: mana_lim,
available: mana_avail,
},
size: {
current: size_cur,
limit: size_lim,
available: size_avail,
}
},
tip_blocks: tip_blocks
};
emit_event!(self.event_emitter, event, tags = heartbeat);
}
/// Node main loop: prints startup info, replays persisted blocks, then
/// drives a fixed task table (gossip, message/request handling, peer
/// timeout, heartbeat, and — when enabled — mining) on a ~1 ms
/// scheduler tick. Never returns.
pub fn main(mut self) -> ! {
eprintln!("Genesis hash: {:#34x}", self.genesis_hash);
eprintln!("UDP/protocol port: {}", self.addr);
eprintln!("Initial peers: ");
for peer in self.peers.get_all_active() {
eprintln!(" - {}", peer.address);
}
eprintln!("Loading blocks from disk...");
// A load failure is logged but does not stop the node.
if let Err(e) = self.load_blocks() {
eprintln!("Error loading blocks from disk.\n{}", e);
};
// A periodic task: run `action` whenever `delay` ms have elapsed
// since its last run.
struct Task<C: ProtoComm, S: BlockStorage> {
pub delay: u128,
pub action: fn(&mut Node<C, S>) -> (),
}
let mut tasks = vec![
// Re-announce the tip to 8 random peers every 20 ms.
Task {
delay: 20,
action: |node| {
node.gossip_tip_block(8);
},
},
Task {
delay: HANDLE_MESSAGE_DELAY,
action: |node| {
node.receive_message();
},
},
Task {
delay: HANDLE_REQUEST_DELAY,
action: |node| {
node.receive_request();
},
},
// Expire inactive peers every 5 s.
Task {
delay: 5_000,
action: |node| {
node.peers.timeout(
#[cfg(feature = "events")]
node.event_emitter.clone(),
);
},
},
#[cfg(feature = "events")]
Task {
delay: 5_000,
action: |node| {
node.log_heartbeat();
},
},
];
// Mining tasks are only scheduled when a miner thread exists.
if let Some(_) = self.miner_comm {
let miner_tasks = vec![
// When the miner is idle (Stop), hand it a fresh body to mine.
Task {
delay: 25,
action: |node| {
if let Some(comm) = &mut node.miner_comm {
if let MinerMessage::Stop { .. } = comm.read() {
node.do_ask_mine(node.build_body_from_pool());
}
}
},
},
// Poll frequently for mined blocks.
Task {
delay: 5,
action: |node| {
node.do_handle_mined_block();
},
},
];
tasks.extend(miner_tasks);
}
// Per-task timestamp (ms) of its last run.
let mut last_tick_time: Vec<u128> = vec![0; tasks.len()];
loop {
let now = std::time::Instant::now();
let system_time = get_time(); for (i, task) in tasks.iter().enumerate() {
if last_tick_time[i] + task.delay <= system_time {
(task.action)(&mut self);
last_tick_time[i] = system_time;
}
}
// Sleep out the remainder of the 1 ms scheduler tick.
let elapsed = now.elapsed();
let extra = std::time::Duration::from_millis(1).checked_sub(elapsed);
if let Some(extra) = extra {
std::thread::sleep(extra);
}
}
}
}
/// Spawns the miner thread when mining is enabled, returning the node's
/// half of the shared mailbox plus the thread handle. Returns
/// `(None, [])` when mining is disabled.
pub fn spawn_miner(
mine_config: MineConfig,
#[cfg(feature = "events")] event_tx: Option<
mpsc::Sender<NodeEventEmittedInfo>,
>,
) -> (Option<MinerCommunication>, Vec<JoinHandle<()>>) {
if mine_config.enabled {
// Two clones of the same mailbox: one for the miner, one for the node.
let miner_comm_0 = MinerCommunication::new();
let miner_comm_1 = miner_comm_0.clone();
let handle = std::thread::spawn(move || {
miner_loop(
miner_comm_0,
mine_config.slow_mining,
#[cfg(feature = "events")]
event_tx,
);
});
(Some(miner_comm_1), vec![handle])
} else {
(None, vec![])
}
}