use battleware_client::Client;
use battleware_node::{engine, Config, Peers};
use battleware_types::NAMESPACE;
use clap::{Arg, Command};
use commonware_codec::{Decode, DecodeExt};
use commonware_cryptography::{
bls12381::primitives::{group, poly, variant::MinSig},
ed25519::{PrivateKey, PublicKey},
Signer,
};
use commonware_deployer::ec2::Hosts;
use commonware_p2p::authenticated::discovery as authenticated;
use commonware_runtime::{tokio, Metrics, Runner};
use commonware_utils::{from_hex_formatted, quorum, union_unique, NZUsize};
use futures::future::try_join_all;
use governor::Quota;
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr, SocketAddr},
num::{NonZeroU32, NonZeroUsize},
path::PathBuf,
str::FromStr,
time::Duration,
};
use tracing::{error, info, Level};
const PENDING_CHANNEL: u32 = 0;
const RECOVERED_CHANNEL: u32 = 1;
const RESOLVER_CHANNEL: u32 = 2;
const BROADCASTER_CHANNEL: u32 = 3;
const BACKFILL_BY_DIGEST_CHANNEL: u32 = 4;
const SEEDER_CHANNEL: u32 = 5;
const AGGREGATOR_CHANNEL: u32 = 6;
const AGGREGATION_CHANNEL: u32 = 7;
const LEADER_TIMEOUT: Duration = Duration::from_secs(1);
const NOTARIZATION_TIMEOUT: Duration = Duration::from_secs(2);
const NULLIFY_RETRY: Duration = Duration::from_secs(10);
const ACTIVITY_TIMEOUT: u64 = 256;
const SKIP_TIMEOUT: u64 = 32;
const FETCH_TIMEOUT: Duration = Duration::from_secs(2);
const FETCH_CONCURRENT: usize = 16;
const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024; const MAX_FETCH_COUNT: usize = 16;
const MAX_FETCH_SIZE: usize = 1024 * 1024; const BLOCKS_FREEZER_TABLE_INITIAL_SIZE: u32 = 2u32.pow(21); const FINALIZED_FREEZER_TABLE_INITIAL_SIZE: u32 = 2u32.pow(21); const BUFFER_POOL_PAGE_SIZE: NonZeroUsize = NZUsize!(4_096); const BUFFER_POOL_CAPACITY: NonZeroUsize = NZUsize!(32_768); const MAX_UPLOADS_OUTSTANDING: usize = 4;
fn main() {
let matches = Command::new("node")
.about("Node for a battleware chain.")
.arg(Arg::new("hosts").long("hosts").required(false))
.arg(Arg::new("peers").long("peers").required(false))
.arg(Arg::new("config").long("config").required(true))
.get_matches();
let hosts_file = matches.get_one::<String>("hosts");
let peers_file = matches.get_one::<String>("peers");
assert!(
hosts_file.is_some() || peers_file.is_some(),
"Either --hosts or --peers must be provided"
);
let config_file = matches.get_one::<String>("config").unwrap();
let config_file = std::fs::read_to_string(config_file).expect("Could not read config file");
let config: Config = serde_yaml::from_str(&config_file).expect("Could not parse config file");
let key = from_hex_formatted(&config.private_key).expect("Could not parse private key");
let signer = PrivateKey::decode(key.as_ref()).expect("Private key is invalid");
let public_key = signer.public_key();
let cfg = tokio::Config::default()
.with_tcp_nodelay(Some(true))
.with_worker_threads(config.worker_threads)
.with_storage_directory(PathBuf::from(config.directory))
.with_catch_panics(true);
let executor = tokio::Runner::new(cfg);
executor.start(|context| async move {
let log_level = Level::from_str(&config.log_level).expect("Invalid log level");
tokio::telemetry::init(
context.with_label("telemetry"),
tokio::telemetry::Logging {
level: log_level,
json: hosts_file.is_some(),
},
Some(SocketAddr::new(
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
config.metrics_port,
)),
None,
);
let (ip, peers, bootstrappers) = if let Some(hosts_file) = hosts_file {
let hosts_file = std::fs::read_to_string(hosts_file).unwrap();
let hosts: Hosts =
serde_yaml::from_str(&hosts_file).expect("Could not parse peers file");
let peers: HashMap<PublicKey, IpAddr> = hosts
.hosts
.into_iter()
.filter_map(|peer| {
match from_hex_formatted(&peer.name)
.and_then(|key| PublicKey::decode(key.as_ref()).ok())
{
Some(key) => Some((key, peer.ip)),
None => {
info!(name = peer.name, "Skipping non-peer host");
None
}
}
})
.collect();
let peer_keys = peers.keys().cloned().collect::<Vec<_>>();
let mut bootstrappers = Vec::new();
for bootstrapper in &config.bootstrappers {
let key =
from_hex_formatted(bootstrapper).expect("Could not parse bootstrapper key");
let key = PublicKey::decode(key.as_ref()).expect("Bootstrapper key is invalid");
let ip = peers.get(&key).expect("Could not find bootstrapper in IPs");
let bootstrapper_socket = format!("{}:{}", ip, config.port);
let bootstrapper_socket = SocketAddr::from_str(&bootstrapper_socket)
.expect("Could not parse bootstrapper socket");
bootstrappers.push((key, bootstrapper_socket));
}
let ip = peers.get(&public_key).expect("Could not find self in IPs");
(*ip, peer_keys, bootstrappers)
} else {
let peers_file = std::fs::read_to_string(peers_file.unwrap()).unwrap();
let peers: Peers =
serde_yaml::from_str(&peers_file).expect("Could not parse peers file");
let peers: HashMap<PublicKey, SocketAddr> = peers
.addresses
.into_iter()
.filter_map(|peer| {
match from_hex_formatted(&peer.0)
.and_then(|key| PublicKey::decode(key.as_ref()).ok())
{
Some(key) => Some((key, peer.1)),
None => {
info!(name = peer.0, "Skipping non-peer address");
None
}
}
})
.collect();
let peer_keys = peers.keys().cloned().collect::<Vec<_>>();
let mut bootstrappers = Vec::new();
for bootstrapper in &config.bootstrappers {
let key =
from_hex_formatted(bootstrapper).expect("Could not parse bootstrapper key");
let key = PublicKey::decode(key.as_ref()).expect("Bootstrapper key is invalid");
let socket = peers.get(&key).expect("Could not find bootstrapper in IPs");
bootstrappers.push((key, *socket));
}
let ip = peers
.get(&public_key)
.expect("Could not find self in IPs")
.ip();
(ip, peer_keys, bootstrappers)
};
info!(peers = peers.len(), "loaded peers");
let peers_u32 = peers.len() as u32;
let share = from_hex_formatted(&config.share).expect("Could not parse share");
let share = group::Share::decode(share.as_ref()).expect("Share is invalid");
let threshold = quorum(peers_u32);
let polynomial =
from_hex_formatted(&config.polynomial).expect("Could not parse polynomial");
let polynomial =
poly::Public::<MinSig>::decode_cfg(polynomial.as_ref(), &(threshold as usize))
.expect("polynomial is invalid");
let identity = *poly::public::<MinSig>(&polynomial);
info!(
?public_key,
?identity,
?ip,
port = config.port,
"loaded config"
);
let p2p_namespace = union_unique(NAMESPACE, b"_P2P");
let mut p2p_cfg = authenticated::Config::aggressive(
signer.clone(),
&p2p_namespace,
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), config.port),
SocketAddr::new(ip, config.port),
bootstrappers,
MAX_MESSAGE_SIZE,
);
p2p_cfg.mailbox_size = config.mailbox_size;
let (mut network, mut oracle) =
authenticated::Network::new(context.with_label("network"), p2p_cfg);
oracle.register(0, peers.clone()).await;
let pending_limit = Quota::per_second(NonZeroU32::new(128).unwrap());
let pending = network.register(PENDING_CHANNEL, pending_limit, config.message_backlog);
let recovered_limit = Quota::per_second(NonZeroU32::new(128).unwrap());
let recovered =
network.register(RECOVERED_CHANNEL, recovered_limit, config.message_backlog);
let resolver_limit = Quota::per_second(NonZeroU32::new(128).unwrap());
let resolver = network.register(RESOLVER_CHANNEL, resolver_limit, config.message_backlog);
let broadcaster_limit = Quota::per_second(NonZeroU32::new(8).unwrap());
let broadcaster = network.register(
BROADCASTER_CHANNEL,
broadcaster_limit,
config.message_backlog,
);
let backfill_quota = Quota::per_second(NonZeroU32::new(8).unwrap());
let backfill = network.register(
BACKFILL_BY_DIGEST_CHANNEL,
backfill_quota,
config.message_backlog,
);
let seeder = network.register(SEEDER_CHANNEL, backfill_quota, config.message_backlog);
let aggregator =
network.register(AGGREGATOR_CHANNEL, backfill_quota, config.message_backlog);
let aggregation_quota = Quota::per_second(NonZeroU32::new(128).unwrap());
let aggregation = network.register(
AGGREGATION_CHANNEL,
aggregation_quota,
config.message_backlog,
);
let p2p = network.start();
let indexer = Client::new(&config.indexer, identity);
let config = engine::Config {
blocker: oracle,
partition_prefix: "engine".to_string(),
blocks_freezer_table_initial_size: BLOCKS_FREEZER_TABLE_INITIAL_SIZE,
finalized_freezer_table_initial_size: FINALIZED_FREEZER_TABLE_INITIAL_SIZE,
signer,
polynomial,
share,
participants: peers,
mailbox_size: config.mailbox_size,
deque_size: config.deque_size,
backfill_quota,
leader_timeout: LEADER_TIMEOUT,
notarization_timeout: NOTARIZATION_TIMEOUT,
nullify_retry: NULLIFY_RETRY,
activity_timeout: ACTIVITY_TIMEOUT,
skip_timeout: SKIP_TIMEOUT,
fetch_timeout: FETCH_TIMEOUT,
max_fetch_count: MAX_FETCH_COUNT,
max_fetch_size: MAX_FETCH_SIZE,
fetch_concurrent: FETCH_CONCURRENT,
fetch_rate_per_peer: resolver_limit,
buffer_pool_page_size: BUFFER_POOL_PAGE_SIZE,
buffer_pool_capacity: BUFFER_POOL_CAPACITY,
indexer,
execution_concurrency: config.execution_concurrency,
max_uploads_outstanding: MAX_UPLOADS_OUTSTANDING,
};
let engine = engine::Engine::new(context.with_label("engine"), config).await;
let engine = engine.start(
pending,
recovered,
resolver,
broadcaster,
backfill,
seeder,
aggregator,
aggregation,
);
if let Err(e) = try_join_all(vec![p2p, engine]).await {
error!(?e, "task failed");
}
});
}