use crate::node::anr::{Anr, NodeAnrs};
use crate::node::peers::{HandshakeStatus, PeersSummary};
use crate::node::protocol::*;
use crate::node::protocol::{Catchup, CatchupHeight, Instruction, NewPhoneWhoDis, NewPhoneWhoDisReply, Typename};
use crate::node::{anr, peers};
use crate::socket::UdpSocketExt;
use crate::utils::misc::format_duration;
use crate::utils::{Hash, PublicKey};
use crate::{SystemStats, Ver, config, consensus, get_system_stats, metrics, node, utils};
use amadeus_utils::vecpak;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::Arc;
use tracing::{debug, info, instrument, warn};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SoftforkStatus {
#[serde(rename = "")]
Healthy,
Minor,
Major,
}
#[derive(Debug, thiserror::Error, strum_macros::IntoStaticStr)]
pub enum Error {
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Fabric(#[from] consensus::fabric::Error),
#[error(transparent)]
Archiver(#[from] utils::archiver::Error),
#[error(transparent)]
Protocol(#[from] node::protocol::Error),
#[error(transparent)]
Config(#[from] config::Error),
#[error(transparent)]
Anr(#[from] anr::Error),
#[error(transparent)]
Peers(#[from] peers::Error),
#[error("other error: {0}")]
Other(String),
}
impl Typename for Error {
fn typename(&self) -> &'static str {
self.into()
}
}
pub struct Context {
pub(crate) config: config::Config,
pub(crate) metrics: metrics::Metrics,
pub(crate) reassembler: node::ReedSolomonReassembler,
pub(crate) peers: peers::NodePeers,
pub(crate) anrs: NodeAnrs,
pub(crate) fabric: crate::consensus::fabric::Fabric,
pub(crate) socket: Arc<dyn UdpSocketExt>,
}
impl Context {
pub async fn with_config_and_socket(
config: config::Config,
socket: Arc<dyn UdpSocketExt>,
) -> Result<Arc<Self>, Error> {
use crate::config::{
ANR_PERIOD_MILLIS, BROADCAST_PERIOD_MILLIS, CATCHUP_PERIOD_MILLIS, CLEANUP_PERIOD_MILLIS,
CONSENSUS_PERIOD_MILLIS,
};
use crate::consensus::fabric::Fabric;
use crate::utils::archiver::init_storage;
use metrics::Metrics;
use node::ReedSolomonReassembler;
use tokio::time::{Duration, interval};
assert_ne!(config.get_root(), "");
init_storage(&config.get_root()).await?;
let fabric = Fabric::new(&config.get_root()).await?;
let metrics = Metrics::new();
let node_peers = peers::NodePeers::default();
let node_anrs = NodeAnrs::new();
let reassembler = ReedSolomonReassembler::new();
node_anrs.seed(&config).await; node_peers.seed(&fabric, &config, &node_anrs).await?;
let ctx = Arc::new(Self { config, metrics, reassembler, peers: node_peers, anrs: node_anrs, fabric, socket });
tokio::spawn({
let ctx = ctx.clone();
async move {
if let Err(e) = ctx.bootstrap_task().await {
warn!("bootstrap task error: {e}");
}
}
});
tokio::spawn({
let ctx = ctx.clone();
async move {
let mut ticker = interval(Duration::from_millis(CLEANUP_PERIOD_MILLIS));
loop {
ticker.tick().await;
ctx.cleanup_task().await;
}
}
});
tokio::spawn({
let ctx = ctx.clone();
async move {
let mut ticker = interval(Duration::from_millis(ANR_PERIOD_MILLIS));
ticker.tick().await;
loop {
ticker.tick().await;
if let Err(e) = ctx.anr_task().await {
warn!("anr task error: {e}");
}
}
}
});
tokio::spawn({
let ctx = ctx.clone();
async move {
let mut ticker = interval(Duration::from_millis(BROADCAST_PERIOD_MILLIS));
loop {
ticker.tick().await;
if let Err(e) = ctx.broadcast_task().await {
warn!("broadcast task error: {e}");
}
}
}
});
tokio::spawn({
let ctx = ctx.clone();
async move {
let mut ticker = interval(Duration::from_millis(CONSENSUS_PERIOD_MILLIS));
loop {
ticker.tick().await;
if let Err(e) = ctx.consensus_task().await {
warn!("consensus task error: {e}");
}
}
}
});
tokio::spawn({
let ctx = ctx.clone();
async move {
let mut is_syncing = false;
loop {
let tick_ms = if is_syncing { 30 } else { CATCHUP_PERIOD_MILLIS };
tokio::time::sleep(Duration::from_millis(tick_ms)).await;
match ctx.catchup_task().await {
Ok(syncing) => is_syncing = syncing,
Err(e) => warn!("catchup task error: {e}"),
}
}
}
});
tokio::spawn({
let ctx = ctx.clone();
async move {
let mut ticker = interval(Duration::from_millis(600_000));
loop {
ticker.tick().await;
if let Err(e) = ctx.autoupdate_task().await {
warn!("autoupdate task error: {e}");
}
}
}
});
Ok(ctx)
}
#[instrument(skip(self), name = "bootstrap_task")]
async fn bootstrap_task(&self) -> Result<(), Error> {
let new_phone_who_dis = Protocol::NewPhoneWhoDis(NewPhoneWhoDis::new());
for ip in &self.config.seed_ips {
match new_phone_who_dis.send_to_with_metrics(self, *ip).await {
Ok(_) => {
debug!("sent encrypted new_phone_who_dis to seed {ip}");
}
Err(e) => {
warn!("failed to send encrypted new_phone_who_dis to seed {ip}: {e}");
}
}
self.peers.set_handshake_status(*ip, HandshakeStatus::Initiated).await?;
}
info!("sent new_phone_who_dis to {} seed nodes", self.config.seed_ips.len());
Ok(())
}
#[instrument(skip(self), name = "cleanup_task")]
async fn cleanup_task(&self) {
self.anrs.update_rate_limiting_counters().await;
let cleared_shards = self.reassembler.clear_stale().await;
let cleared_peers = self.peers.clear_stale(&self.fabric, &self.anrs).await;
if cleared_shards > 0 || cleared_peers > 0 {
debug!("cleared {} stale shards, {} stale peers", cleared_shards, cleared_peers);
}
}
#[instrument(skip(self), name = "anr_task")]
async fn anr_task(&self) -> Result<(), Error> {
let unverified_ips = self.anrs.get_random_not_verified(3).await;
if !unverified_ips.is_empty() {
let new_phone_who_dis = Protocol::NewPhoneWhoDis(NewPhoneWhoDis::new());
for ip in &unverified_ips {
new_phone_who_dis.send_to_with_metrics(self, *ip).await?;
self.peers.set_handshake_status(*ip, HandshakeStatus::Initiated).await?;
}
}
let verified_ips = self.anrs.get_random_verified(3).await;
if !verified_ips.is_empty() {
let get_peer_anrs = Protocol::GetPeerAnrs(GetPeerAnrs::new(self.anrs.get_all_b3f4().await));
for ip in &verified_ips {
self.send_message_to(&get_peer_anrs, *ip).await?;
}
}
info!("sent new_phone_who_dis to {} and get_peer_anrs to {} nodes", unverified_ips.len(), verified_ips.len());
Ok(())
}
#[instrument(skip(self), name = "broadcast_task")]
async fn broadcast_task(&self) -> Result<(), Error> {
let ping = Protocol::Ping(Ping::new());
let tip = Protocol::EventTip(EventTip::from_current_tips_db(&self.fabric)?);
let my_ip = self.config.get_public_ipv4();
let peers = self.peers.get_all().await?;
if !peers.is_empty() {
let mut sent_count = 0;
for peer in peers {
if peer.ip != my_ip {
self.send_message_to(&ping, peer.ip).await?;
self.send_message_to(&tip, peer.ip).await?;
sent_count += 1;
}
}
debug!("sent {sent_count} ping and tip messages");
}
Ok(())
}
#[instrument(skip(self), name = "autoupdate_task")]
async fn autoupdate_task(&self) -> Result<(), Error> {
Ok(())
}
#[instrument(skip(self), name = "consensus_task")]
async fn consensus_task(&self) -> Result<(), Error> {
use consensus::consensus::{proc_consensus, proc_entries};
self.fabric.start_proc_consensus();
if let Err(e) = proc_entries(&self.fabric, &self.config, self).await {
warn!("proc_entries failed: {e}");
}
if let Err(e) = proc_consensus(&self.fabric) {
warn!("proc_consensus failed: {e}");
}
self.fabric.stop_proc_consensus();
Ok(())
}
#[instrument(skip(self), name = "catchup_task")]
async fn catchup_task(&self) -> Result<bool, Error> {
if self.fabric.is_proc_consensus() {
return Ok(true);
}
let temporal_height = match self.fabric.get_temporal_height() {
Ok(Some(h)) => h,
Ok(None) => 0,
Err(e) => return Err(e.into()),
};
let rooted_height = self.fabric.get_rooted_height()?.unwrap_or_default();
info!("Temporal: {} Rooted: {}", temporal_height, rooted_height);
let trainer_pks = self.fabric.trainers_for_height(temporal_height).unwrap_or_default();
let (peers_temporal, peers_rooted, peers_bft) = self.peers.get_heights(&trainer_pks).await?;
let behind_temporal = peers_temporal.saturating_sub(temporal_height);
let behind_rooted = peers_rooted.saturating_sub(rooted_height);
let behind_bft = peers_bft.saturating_sub(temporal_height);
let rooting_stuck = temporal_height.saturating_sub(rooted_height) > 1000;
if rooting_stuck {
warn!(
"Stopped syncing: getting {} consensuses starting {}",
temporal_height - rooted_height,
rooted_height + 1
);
let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(temporal_height, &trainer_pks).await?;
let heights: Vec<u64> = (rooted_height + 1..=temporal_height).take(200).collect();
let chunks: Vec<Vec<CatchupHeight>> = heights
.into_iter()
.map(|height| CatchupHeight { height, c: Some(true), e: None, a: None, hashes: None })
.collect::<Vec<_>>()
.chunks(20)
.map(|chunk| chunk.to_vec())
.collect();
self.fetch_heights(chunks, online_trainer_ips).await?;
return Ok(false);
}
if behind_bft > 0 {
info!("Behind BFT: Syncing {} entries", behind_bft);
let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_bft, &trainer_pks).await?;
let heights: Vec<u64> = (rooted_height + 1..=peers_bft).take(200).collect();
let chunks: Vec<Vec<CatchupHeight>> = heights
.into_iter()
.map(|height| CatchupHeight { height, c: Some(true), e: Some(true), a: None, hashes: None })
.collect::<Vec<_>>()
.chunks(20)
.map(|chunk| chunk.to_vec())
.collect();
self.fetch_heights(chunks, online_trainer_ips).await?;
return Ok(false);
}
if behind_rooted > 0 {
info!("Behind rooted: Syncing {} entries", behind_rooted);
let online_trainer_ips = self.peers.get_trainer_ips_above_rooted(peers_rooted, &trainer_pks).await?;
let heights: Vec<u64> = (rooted_height + 1..=peers_rooted).take(200).collect();
let chunks: Vec<Vec<CatchupHeight>> = heights
.into_iter()
.map(|height| {
let entries = self.fabric.entries_by_height(height).unwrap_or_default();
let hashes = entries;
CatchupHeight { height, c: Some(true), e: Some(true), a: None, hashes: Some(hashes) }
})
.collect::<Vec<_>>()
.chunks(20)
.map(|chunk| chunk.to_vec())
.collect();
self.fetch_heights(chunks, online_trainer_ips).await?;
return Ok(false);
}
if behind_temporal > 0 {
info!("Behind temporal: Syncing {} entries", behind_temporal);
let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_temporal, &trainer_pks).await?;
let heights: Vec<u64> = (temporal_height..=peers_temporal).take(200).collect();
let chunks: Vec<Vec<CatchupHeight>> = heights
.into_iter()
.map(|height| {
let entries = self.fabric.entries_by_height(height).unwrap_or_default();
let hashes = entries;
CatchupHeight { height, c: None, e: Some(true), a: Some(true), hashes: Some(hashes) }
})
.collect::<Vec<_>>()
.chunks(20)
.map(|chunk| chunk.to_vec())
.collect();
self.fetch_heights(chunks, online_trainer_ips).await?;
return Ok(false);
}
if behind_temporal == 0 {
info!("In sync: Fetching attestations for last entry");
let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_temporal, &trainer_pks).await?;
let entries = self.fabric.entries_by_height(temporal_height).unwrap_or_default();
let hashes = entries;
let chunk = vec![CatchupHeight {
height: temporal_height,
c: None,
e: Some(true),
a: Some(true),
hashes: Some(hashes),
}];
self.fetch_heights(vec![chunk], online_trainer_ips).await?;
}
Ok(behind_temporal > 0 || behind_rooted > 0 || behind_bft > 0 || rooting_stuck)
}
async fn fetch_heights(
&self,
chunks: Vec<Vec<CatchupHeight>>,
peers: Vec<std::net::Ipv4Addr>,
) -> Result<(), Error> {
use rand::seq::SliceRandom;
let mut shuffled_peers = peers;
{
let mut rng = rand::rng();
shuffled_peers.shuffle(&mut rng);
}
for (chunk, peer_ip) in chunks.into_iter().zip(shuffled_peers.into_iter().cycle()) {
Protocol::Catchup(Catchup { height_flags: chunk }).send_to_with_metrics(self, peer_ip).await?;
}
Ok(())
}
pub fn get_prometheus_metrics(&self) -> String {
self.metrics.get_prometheus()
}
pub fn get_json_health(&self) -> Value {
serde_json::json!({
"status": "ok",
"version": env!("CARGO_PKG_VERSION"),
"uptime": self.metrics.get_uptime()
})
}
pub async fn send_message_to(&self, message: &Protocol, dst: Ipv4Addr) -> Result<(), Error> {
message.send_to_with_metrics(self, dst).await.map_err(Into::into)
}
pub async fn recv_from(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)> {
self.socket.recv_from_with_metrics(buf, &self.metrics).await
}
pub async fn is_peer_handshaked(&self, ip: Ipv4Addr) -> bool {
if let Some(peer) = self.peers.by_ip(ip).await {
if let Some(ref pk) = peer.pk {
if self.anrs.is_handshaked(pk.as_ref()).await {
return true;
}
}
}
false
}
pub async fn get_peers_summary(&self) -> Result<PeersSummary, Error> {
let my_ip = self.config.get_public_ipv4();
let temporal_height = self.get_temporal_height();
let trainer_pks = self.fabric.trainers_for_height(temporal_height + 1).unwrap_or_default();
self.peers.get_peers_summary(my_ip, &trainer_pks).await.map_err(Into::into)
}
pub fn get_softfork_status(&self) -> SoftforkStatus {
let temporal_height = self.get_temporal_height();
let rooted_height = self.get_rooted_height();
let gap = temporal_height.saturating_sub(rooted_height);
match gap {
0 | 1 => SoftforkStatus::Healthy,
2..=10 => SoftforkStatus::Minor,
_ => SoftforkStatus::Major,
}
}
pub fn get_config(&self) -> &config::Config {
&self.config
}
pub fn get_socket(&self) -> Arc<dyn UdpSocketExt> {
self.socket.clone()
}
pub fn get_metrics(&self) -> &metrics::Metrics {
&self.metrics
}
pub fn get_metrics_snapshot(&self) -> metrics::MetricsSnapshot {
self.metrics.get_snapshot()
}
pub fn get_system_stats(&self) -> SystemStats {
get_system_stats()
}
pub fn get_uptime(&self) -> String {
format_duration(self.metrics.get_uptime())
}
pub fn inc_tasks(&self) {
self.metrics.inc_tasks();
}
pub fn dec_tasks(&self) {
self.metrics.dec_tasks();
}
pub fn get_temporal_height(&self) -> u64 {
self.fabric.get_temporal_height().ok().flatten().unwrap_or_default() as u64
}
pub fn get_rooted_height(&self) -> u64 {
self.fabric.get_rooted_height().ok().flatten().unwrap_or_default() as u64
}
pub async fn get_entries(&self) -> Vec<(u64, u64, u64)> {
tokio::task::spawn_blocking(|| {
tokio::runtime::Handle::current().block_on(async {
consensus::doms::entry::get_archived_entries().await.unwrap_or_else(|_| {
vec![
(201, 20100123, 1024), (201, 20100456, 2048),
(202, 20200789, 1536),
(202, 20201012, 3072),
(203, 20300345, 2560),
]
})
})
})
.await
.unwrap_or_else(|_| {
vec![
(201, 20100123, 1024),
(201, 20100456, 2048),
(202, 20200789, 1536),
(202, 20201012, 3072),
(203, 20300345, 2560),
]
})
}
pub async fn set_peer_handshake_status(&self, ip: Ipv4Addr, status: HandshakeStatus) -> Result<(), peers::Error> {
self.peers.set_handshake_status(ip, status).await
}
pub async fn update_peer_from_anr(
&self,
ip: Ipv4Addr,
pk: &PublicKey,
version: &Ver,
status: Option<HandshakeStatus>,
) {
self.peers.update_peer_from_anr(ip, pk, version, status).await
}
pub async fn get_all_anrs(&self) -> Vec<anr::Anr> {
self.anrs.get_all().await
}
pub async fn get_anr_by_pk_b58(&self, pk_b58: &str) -> Option<anr::Anr> {
if let Ok(pk_bytes) = bs58::decode(pk_b58).into_vec() {
let pk_array: Result<[u8; 48], _> = pk_bytes.try_into();
if let Ok(pk_array) = pk_array {
let pk = PublicKey::from(pk_array);
return self.get_anr_by_pk(&pk).await;
}
}
None
}
pub async fn get_anr_by_pk(&self, pk: &PublicKey) -> Option<anr::Anr> {
let all_anrs = self.anrs.get_all().await;
all_anrs.into_iter().find(|anr| &anr.pk == pk)
}
pub async fn get_validator_anrs(&self) -> Vec<anr::Anr> {
let all_anrs = self.anrs.get_all().await;
all_anrs.into_iter().filter(|anr| anr.handshaked).collect()
}
pub fn get_entries_by_height(&self, height: u64) -> Result<Vec<consensus::doms::entry::Entry>, Error> {
let entries_raw = self.fabric.entries_by_height(height)?;
entries_raw
.into_iter()
.map(|raw| {
consensus::doms::entry::Entry::from_vecpak_bin(&raw)
.map_err(|_| Error::Other("entry not vecpak in fabric".into()))
})
.collect()
}
pub fn get_temporal_entry(&self) -> Result<Option<consensus::doms::entry::Entry>, Error> {
self.fabric.get_temporal_entry().map_err(Into::into)
}
pub fn get_trainers_for_height(&self, height: u64) -> Option<Vec<PublicKey>> {
self.fabric.trainers_for_height(height)
}
pub fn get_public_key(&self) -> PublicKey {
self.config.get_pk()
}
pub fn get_wallet_balance(&self, public_key: &PublicKey, symbol: &[u8]) -> i128 {
self.fabric.chain_balance_symbol(public_key.as_ref(), symbol)
}
pub fn get_contract_state(&self, contract: &PublicKey, key: &[u8]) -> Option<Vec<u8>> {
use amadeus_utils::constants::CF_CONTRACTSTATE;
let full_key = [b"bic:contract:" as &[u8], contract.as_ref(), b":" as &[u8], key].concat();
self.fabric.db().get(CF_CONTRACTSTATE, &full_key).ok().flatten()
}
pub fn get_chain_diff_bits(&self) -> u32 {
self.fabric.chain_diff_bits() as u32
}
pub fn get_chain_total_sols(&self) -> i128 {
self.fabric.chain_total_sols() as i128
}
pub fn get_all_wallet_balances(&self, public_key: &PublicKey) -> Vec<(Vec<u8>, i128)> {
use amadeus_utils::constants::CF_CONTRACTSTATE;
let prefix = [b"account:" as &[u8], public_key.as_ref() as &[u8], b":balance:" as &[u8]].concat();
self.fabric
.db()
.iter_prefix(CF_CONTRACTSTATE, &prefix)
.unwrap_or_default()
.into_iter()
.filter_map(|(key, value)| {
if key.len() <= prefix.len() {
return None;
}
let symbol = key[prefix.len()..].to_vec();
let amount = std::str::from_utf8(&value).ok()?.parse::<i128>().ok()?;
if amount > 0 { Some((symbol, amount)) } else { None }
})
.collect()
}
pub fn get_entry_by_hash(&self, hash: &Hash) -> Option<consensus::doms::entry::Entry> {
self.fabric.get_entry_by_hash(hash)
}
pub fn db_get(&self, cf: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
self.fabric.db().get(cf, key).map_err(|e| Error::Other(e.to_string()))
}
pub fn db_iter_prefix(&self, cf: &str, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Error> {
self.fabric.db().iter_prefix(cf, prefix).map_err(|e| Error::Other(e.to_string()))
}
pub async fn parse_udp(&self, buf: &[u8], src: Ipv4Addr) -> Option<Protocol> {
self.metrics.add_incoming_udp_packet(buf.len());
if !self.anrs.is_within_udp_limit(src).await? {
return None; }
match self.reassembler.add_shard(buf, &self.config.get_sk()).await {
Ok(Some((packet, pk))) => {
match vecpak::from_slice::<Protocol>(&packet) {
Ok(proto) => {
self.peers.update_peer_from_proto(src, proto.typename()).await;
let is_allowed =
matches!(proto, Protocol::NewPhoneWhoDis(_) | Protocol::NewPhoneWhoDisReply(_))
|| self.anrs.handshaked_and_valid_ip4(pk.as_ref(), &src).await;
if !is_allowed {
self.anrs.unset_handshaked(pk.as_ref()).await;
warn!("handshake needed {src}");
return None; }
if !self.anrs.is_within_proto_limit(pk.as_ref(), proto.typename()).await? {
return None; }
self.metrics.add_incoming_proto(proto.typename());
return Some(proto);
}
Err(e) => warn!("parse error: {e} {}", hex::encode(packet)),
}
}
Ok(None) => {} Err(e) => warn!("bad udp frame from {src} - {e}"),
}
None
}
pub async fn handle(&self, message: Protocol, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
self.metrics.add_incoming_proto(message.typename());
message.handle(self, src).await.map_err(|e| {
warn!("can't handle {}: {e}", message.typename());
e.into()
})
}
pub async fn execute(&self, instruction: Instruction) -> Result<(), Error> {
let name = instruction.typename();
self.execute_inner(instruction).await.inspect_err(|e| warn!("can't execute {name}: {e}"))
}
pub async fn execute_inner(&self, instruction: Instruction) -> Result<(), Error> {
match instruction {
Instruction::Noop { why } => {
debug!("noop: {why}");
}
Instruction::SendNewPhoneWhoDisReply { dst } => {
let anr = Anr::from_config(&self.config)?;
let reply = Protocol::NewPhoneWhoDisReply(NewPhoneWhoDisReply::new(anr));
self.send_message_to(&reply, dst).await?;
}
Instruction::SendGetPeerAnrsReply { dst, anrs } => {
let peers_v2 = Protocol::GetPeerAnrsReply(GetPeerAnrsReply::new(anrs));
self.send_message_to(&peers_v2, dst).await?;
}
Instruction::SendPingReply { ts_m, dst } => {
let pong = Protocol::PingReply(PingReply::new(ts_m));
self.send_message_to(&pong, dst).await?;
}
Instruction::ValidTxs { txs } => {
info!("received {} valid transactions", txs.len());
}
Instruction::ReceivedEntry { entry: _ } => {
}
Instruction::ReceivedAttestation { attestation } => {
info!("received attestation for entry {:?}", &attestation.entry_hash[..8]);
debug!("Attestation handling not fully implemented yet");
}
Instruction::ReceivedConsensus { consensus } => {
let _ = self.fabric.insert_consensus(&consensus);
}
Instruction::SpecialBusiness { business: _ } => {
info!("received special business");
}
Instruction::SpecialBusinessReply { business: _ } => {
info!("received special business reply");
}
Instruction::SolicitEntry { hash: _ } => {
info!("received solicit entry request");
}
Instruction::SolicitEntry2 => {
info!("received solicit entry2 request");
}
};
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::socket::MockSocket;
#[tokio::test(flavor = "multi_thread")]
async fn tokio_rwlock_allows_concurrent_reads() {
let lock = tokio::sync::RwLock::new(0usize);
let r1 = lock.read().await;
assert_eq!(*r1, 0);
let r2 = lock.try_read();
assert!(r2.is_ok(), "try_read should succeed when another reader holds the lock");
drop(r2);
drop(r1);
let mut w = lock.write().await;
*w += 1;
assert_eq!(*w, 1);
}
#[tokio::test]
async fn test_anr_verification_request_creation() {
use crate::utils::bls12_381 as bls;
use std::net::Ipv4Addr;
let sk = bls::generate_sk();
let pk = bls::get_public_key(&sk).expect("pk");
let pop = bls::sign(&sk, &*pk, consensus::DST_POP).expect("pop");
let config = config::Config {
work_folder: "/tmp/test".to_string(),
version: Ver::new(1, 2, 3),
offline: false,
http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
http_port: 3000,
udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
udp_port: 36969,
public_ipv4: Some("127.0.0.1".to_string()),
seed_ips: Vec::new(),
seed_anrs: Vec::new(),
other_nodes: Vec::new(),
trust_factor: 0.8,
max_peers: 100,
trainer_sk: sk,
trainer_pk: pk,
trainer_pk_b58: String::new(),
trainer_pop: pop.to_vec(),
archival_node: false,
autoupdate: false,
computor_type: None,
snapshot_height: 0,
anr: None,
anr_desc: None,
anr_name: None,
};
let target_ip = Ipv4Addr::new(127, 0, 0, 1);
let my_anr =
Anr::build(&config.trainer_sk, &config.trainer_pk, &config.trainer_pop, target_ip, Ver::new(1, 0, 0));
assert!(my_anr.is_ok());
}
#[tokio::test]
async fn test_get_random_unverified_anrs() {
let registry = NodeAnrs::new();
let result = registry.get_random_not_verified(3).await;
assert!(result.len() <= 3);
}
#[tokio::test]
async fn test_cleanup_stale_manual_trigger() {
use crate::utils::bls12_381 as bls;
use std::net::Ipv4Addr;
let sk = bls::generate_sk();
let pk = bls::get_public_key(&sk).expect("pk");
let pop = bls::sign(&sk, &*pk, consensus::DST_POP).expect("pop");
let unique_id = format!("{}_{}", std::process::id(), utils::misc::get_unix_nanos_now());
let config = config::Config {
work_folder: format!("/tmp/test_cleanup_{}", unique_id),
version: Ver::new(1, 2, 3),
offline: false,
http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
http_port: 3000,
udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
udp_port: 36969,
public_ipv4: Some("127.0.0.1".to_string()),
seed_ips: vec!["127.0.0.1".parse().unwrap()],
seed_anrs: Vec::new(),
other_nodes: Vec::new(),
trust_factor: 0.8,
max_peers: 100,
trainer_sk: sk,
trainer_pk: pk,
trainer_pk_b58: String::new(),
trainer_pop: pop.to_vec(),
archival_node: false,
autoupdate: false,
computor_type: None,
snapshot_height: 0,
anr: None,
anr_desc: None,
anr_name: None,
};
let socket = Arc::new(MockSocket::new());
match Context::with_config_and_socket(config, socket).await {
Ok(ctx) => {
ctx.cleanup_task().await;
}
Err(_) => {
}
}
}
#[tokio::test]
async fn test_bootstrap_handshake_manual_trigger() {
use crate::utils::bls12_381 as bls;
use std::net::Ipv4Addr;
let sk = bls::generate_sk();
let pk = bls::get_public_key(&sk).expect("pk");
let pop = bls::sign(&sk, &*pk, consensus::DST_POP).expect("pop");
let work_folder = format!("/tmp/test_bootstrap_{}", std::process::id());
let config = config::Config {
work_folder,
version: Ver::new(1, 2, 3),
offline: false,
http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
http_port: 3000,
udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
udp_port: 36969,
public_ipv4: Some("127.0.0.1".to_string()),
seed_ips: vec!["127.0.0.1".parse().unwrap()], seed_anrs: Vec::new(),
other_nodes: Vec::new(),
trust_factor: 0.8,
max_peers: 100,
trainer_sk: sk,
trainer_pk: pk,
trainer_pk_b58: String::new(),
trainer_pop: pop.to_vec(),
archival_node: false,
autoupdate: false,
computor_type: None,
snapshot_height: 0,
anr: None,
anr_desc: None,
anr_name: None,
};
let socket = Arc::new(MockSocket::new());
let ctx = Context::with_config_and_socket(config, socket).await.expect("context creation");
match ctx.bootstrap_task().await {
Ok(()) => {
}
Err(_e) => {
}
}
}
#[tokio::test]
async fn test_context_task_tracking() {
use crate::utils::bls12_381 as bls;
use std::net::Ipv4Addr;
let sk = bls::generate_sk();
let pk = bls::get_public_key(&sk).expect("pk");
let pop = bls::sign(&sk, &*pk, consensus::DST_POP).expect("pop");
let unique_id = format!("{}_{}", std::process::id(), utils::misc::get_unix_nanos_now());
let config = config::Config {
work_folder: format!("/tmp/test_tasks_{}", unique_id),
version: Ver::new(1, 2, 3),
offline: false,
http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
http_port: 3000,
udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
udp_port: 36969,
public_ipv4: Some("127.0.0.1".to_string()),
seed_ips: Vec::new(),
seed_anrs: Vec::new(),
other_nodes: Vec::new(),
trust_factor: 0.8,
max_peers: 100,
trainer_sk: sk,
trainer_pk: pk,
trainer_pk_b58: String::new(),
trainer_pop: pop.to_vec(),
archival_node: false,
autoupdate: false,
computor_type: None,
snapshot_height: 0,
anr: None,
anr_desc: None,
anr_name: None,
};
let socket = Arc::new(MockSocket::new());
let metrics = metrics::Metrics::new();
let node_peers = peers::NodePeers::default();
let node_anrs = crate::node::anr::NodeAnrs::new();
let reassembler = node::ReedSolomonReassembler::new();
let fabric = crate::consensus::fabric::Fabric::new(&config.get_root()).await.unwrap();
let ctx = Context { config, metrics, reassembler, peers: node_peers, anrs: node_anrs, fabric, socket };
let snapshot = ctx.get_metrics_snapshot();
assert_eq!(snapshot.tasks, 0);
ctx.inc_tasks();
ctx.inc_tasks();
let snapshot = ctx.get_metrics_snapshot();
assert_eq!(snapshot.tasks, 2);
ctx.dec_tasks();
let snapshot = ctx.get_metrics_snapshot();
assert_eq!(snapshot.tasks, 1);
}
#[tokio::test]
async fn test_context_convenience_socket_functions() {
use std::sync::Arc;
use crate::utils::bls12_381 as bls;
use std::net::Ipv4Addr;
let sk = bls::generate_sk();
let pk = bls::get_public_key(&sk).expect("pk");
let pop = bls::sign(&sk, &*pk, consensus::DST_POP).expect("pop");
let unique_id = format!("{}_{}", std::process::id(), utils::misc::get_unix_nanos_now());
let config = config::Config {
work_folder: format!("/tmp/test_convenience_{}", unique_id),
version: Ver::new(1, 2, 3),
offline: false,
http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
http_port: 3000,
udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
udp_port: 36969,
public_ipv4: Some("127.0.0.1".to_string()),
seed_ips: Vec::new(),
seed_anrs: Vec::new(),
other_nodes: Vec::new(),
trust_factor: 0.8,
max_peers: 100,
trainer_sk: sk,
trainer_pk: pk,
trainer_pk_b58: String::new(),
trainer_pop: pop.to_vec(),
archival_node: false,
autoupdate: false,
computor_type: None,
snapshot_height: 0,
anr: None,
anr_desc: None,
anr_name: None,
};
let socket = Arc::new(MockSocket::new());
match Context::with_config_and_socket(config, socket).await {
Ok(context) => {
let mut buf = [0u8; 1024];
let target: Ipv4Addr = "127.0.0.1".parse().unwrap();
let pong = Protocol::PingReply(PingReply { ts_m: 1234567890, seen_time: 1234567890123 });
match context.send_message_to(&pong, target).await {
Ok(_) => {
}
Err(_) => {
}
}
match context.recv_from(&mut buf).await {
Ok(_) => {
}
Err(_) => {
}
}
}
Err(_) => {
}
}
}
}