use clap::{value_parser, Arg, Command};
use commonware_bridge::{
application, APPLICATION_NAMESPACE, CONSENSUS_SUFFIX, INDEXER_NAMESPACE, P2P_SUFFIX,
};
use commonware_codec::{Decode, DecodeExt};
use commonware_consensus::{
simplex::{self, elector::RoundRobin, scheme::bls12381_threshold::standard::Scheme, Engine},
types::{Epoch, ViewDelta},
};
use commonware_cryptography::{
bls12381::primitives::{
group,
sharing::{ModeVersion, Sharing},
variant::{MinSig, Variant},
},
ed25519, Sha256, Signer as _,
};
use commonware_p2p::{authenticated, Manager as _};
use commonware_runtime::{
buffer::paged::CacheRef, tokio, Metrics, Network, Quota, Runner, ThreadPooler,
};
use commonware_stream::encrypted::{dial, Config as StreamConfig};
use commonware_utils::{from_hex, ordered::Set, union, NZUsize, TryCollect, NZU16, NZU32};
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
str::FromStr,
time::Duration,
};
/// Splits `value` into the two halves of a `<left>@<right>` pair.
///
/// # Panics
///
/// Panics with `malformed` as the message unless splitting `value` on `@`
/// yields exactly two parts.
fn split_pair<'a>(value: &'a str, malformed: &str) -> (&'a str, &'a str) {
    let mut parts = value.split('@');
    match (parts.next(), parts.next(), parts.next()) {
        (Some(left), Some(right), None) => (left, right),
        _ => panic!("{}", malformed),
    }
}

/// Entry point of the `validator` binary.
///
/// Parses the command line, dials the indexer, starts the authenticated p2p
/// network and the simplex consensus engine, and finally runs the bridge
/// application.
///
/// # Panics
///
/// Panics if a required argument is missing or malformed, if the participant
/// keys are not unique, or if the connection to the indexer cannot be
/// established.
fn main() {
    // Parse arguments.
    let matches = Command::new("validator")
        .about("produce finality certificates and verify external finality certificates")
        .arg(
            Arg::new("bootstrappers")
                .long("bootstrappers")
                .required(false)
                .value_delimiter(',')
                .value_parser(value_parser!(String)),
        )
        .arg(Arg::new("me").long("me").required(true))
        .arg(
            Arg::new("participants")
                .long("participants")
                .required(true)
                .value_delimiter(',')
                .value_parser(value_parser!(u64))
                .help("All participants"),
        )
        .arg(Arg::new("storage-dir").long("storage-dir").required(true))
        .arg(Arg::new("indexer").long("indexer").required(true))
        .arg(Arg::new("identity").long("identity").required(true))
        .arg(Arg::new("share").long("share").required(true))
        .arg(Arg::new("other-public").long("other-public").required(true))
        .get_matches();

    // Create logger.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    // Configure my identity: `--me` has the form `<seed>@<port>`.
    let me = matches
        .get_one::<String>("me")
        .expect("Please provide identity");
    let (seed, port) = split_pair(me, "Identity not well-formed");
    let key = seed.parse::<u64>().expect("Key not well-formed");
    let signer = ed25519::PrivateKey::from_seed(key);
    tracing::info!(key = ?signer.public_key(), "loaded signer");
    let port = port.parse::<u16>().expect("Port not well-formed");
    tracing::info!(port, "loaded port");

    // Configure allowed peers: every participant is given as a numeric seed
    // from which its ed25519 public key is derived.
    let participants = matches
        .get_many::<u64>("participants")
        .expect("Please provide allowed keys")
        .copied();
    if participants.len() == 0 {
        panic!("Please provide at least one participant");
    }
    let validators: Set<_> = participants
        .into_iter()
        .map(|peer| {
            let verifier = ed25519::PrivateKey::from_seed(peer).public_key();
            tracing::info!(key = ?verifier, "registered authorized key");
            verifier
        })
        .try_collect()
        .expect("public keys are unique");

    // Configure bootstrappers (if provided): each entry has the form
    // `<seed>@<socket address>`.
    let mut bootstrapper_identities = Vec::new();
    if let Some(bootstrappers) = matches.get_many::<String>("bootstrappers") {
        for bootstrapper in bootstrappers {
            // Validate the shape up front so a missing `@` yields a clear
            // message instead of an index-out-of-bounds panic.
            let (seed, address) = split_pair(bootstrapper, "Bootstrapper not well-formed");
            let bootstrapper_key = seed
                .parse::<u64>()
                .expect("Bootstrapper key not well-formed");
            let verifier = ed25519::PrivateKey::from_seed(bootstrapper_key).public_key();
            let bootstrapper_address =
                SocketAddr::from_str(address).expect("Bootstrapper address not well-formed");
            bootstrapper_identities.push((verifier, bootstrapper_address.into()));
        }
    }

    // Configure storage directory.
    let storage_directory = matches
        .get_one::<String>("storage-dir")
        .expect("Please provide storage directory");

    // Configure threshold material: `--identity` is the hex-encoded sharing
    // and `--share` is this validator's hex-encoded share.
    let identity = matches
        .get_one::<String>("identity")
        .expect("Please provide identity");
    let identity = from_hex(identity).expect("Identity not well-formed");
    let identity: Sharing<MinSig> = Sharing::decode_cfg(
        identity.as_ref(),
        // The cast cannot truncate in practice: the validator set is built
        // from the `--participants` command-line values above.
        &(NZU32!(validators.len() as u32), ModeVersion::v0()),
    )
    .expect("Identity not well-formed");
    let share = matches
        .get_one::<String>("share")
        .expect("Please provide share");
    let share = from_hex(share).expect("Share not well-formed");
    let share = group::Share::decode(share.as_ref()).expect("Share not well-formed");

    // Configure indexer: `--indexer` has the form `<seed>@<socket address>`.
    let indexer = matches
        .get_one::<String>("indexer")
        .expect("Please provide indexer");
    let (seed, address) = split_pair(indexer, "Indexer not well-formed");
    let indexer_key = seed.parse::<u64>().expect("Indexer key not well-formed");
    let indexer = ed25519::PrivateKey::from_seed(indexer_key).public_key();
    let indexer_address = SocketAddr::from_str(address).expect("Indexer address not well-formed");

    // Configure the public key of the other network, given as hex.
    let other_public = matches
        .get_one::<String>("other-public")
        .expect("Please provide other public");
    let other_public = from_hex(other_public).expect("Other identity not well-formed");
    let other_public = <MinSig as Variant>::Public::decode(other_public.as_ref())
        .expect("Other identity not well-formed");

    // Initialize the runtime.
    let runtime_cfg = tokio::Config::new().with_storage_directory(storage_directory);
    let executor = tokio::Runner::new(runtime_cfg);

    // Configure the encrypted stream used to talk to the indexer. The signer
    // is cloned here because it is moved into the p2p configuration below.
    let indexer_cfg = StreamConfig {
        signing_key: signer.clone(),
        namespace: INDEXER_NAMESPACE.to_vec(),
        max_message_size: 1024 * 1024,
        synchrony_bound: Duration::from_secs(1),
        max_handshake_age: Duration::from_secs(60),
        handshake_timeout: Duration::from_secs(5),
    };

    // Configure the p2p network. The same localhost address is passed twice
    // (presumably as the listen and the dialable address; confirm against
    // `Config::local`).
    let p2p_cfg = authenticated::discovery::Config::local(
        signer,
        &union(APPLICATION_NAMESPACE, P2P_SUFFIX),
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port),
        bootstrapper_identities,
        1024 * 1024, // NOTE(review): presumably the maximum message size in bytes; confirm
    );

    // Start the runtime.
    executor.start(|context| async move {
        // Dial the indexer and upgrade the raw connection to an encrypted one.
        let (sink, stream) = context
            .dial(indexer_address)
            .await
            .expect("Failed to dial indexer");
        let indexer = dial(
            context.with_label("dialer"),
            indexer_cfg,
            indexer,
            stream,
            sink,
        )
        .await
        .expect("Failed to upgrade connection with indexer");

        // Set up the p2p network and hand it the validator set.
        let (mut network, mut oracle) =
            authenticated::discovery::Network::new(context.with_label("network"), p2p_cfg);
        oracle.track(0, validators.clone()).await;

        // Register one channel per kind of traffic handed to the engine below:
        // 0 = votes, 1 = certificates, 2 = resolver.
        // NOTE(review): the trailing `256` is presumably the channel backlog; confirm.
        let (vote_sender, vote_receiver) =
            network.register(0, Quota::per_second(NZU32!(10)), 256);
        let (certificate_sender, certificate_receiver) =
            network.register(1, Quota::per_second(NZU32!(10)), 256);
        let (resolver_sender, resolver_receiver) =
            network.register(2, Quota::per_second(NZU32!(10)), 256);

        let strategy = context.clone().create_strategy(NZUsize!(2)).unwrap();

        // Build the signing scheme for this network and a certificate-only
        // verifier for the other network.
        let consensus_namespace = union(APPLICATION_NAMESPACE, CONSENSUS_SUFFIX);
        let this_network = Scheme::signer(&consensus_namespace, validators, identity, share)
            .expect("share must be in participants");
        let other_network = Scheme::certificate_verifier(&consensus_namespace, other_public);

        // Initialize the application.
        let (application, scheme, mailbox) = application::Application::new(
            context.with_label("application"),
            application::Config {
                indexer,
                hasher: Sha256::default(),
                this_network,
                other_network,
                mailbox_size: 1024,
            },
        );

        // Initialize consensus; the application mailbox serves as automaton,
        // relay and reporter.
        let engine = Engine::new(
            context.with_label("engine"),
            simplex::Config {
                scheme,
                elector: RoundRobin::<Sha256>::default(),
                blocker: oracle,
                automaton: mailbox.clone(),
                relay: mailbox.clone(),
                reporter: mailbox,
                partition: String::from("log"),
                mailbox_size: 1024,
                epoch: Epoch::zero(),
                replay_buffer: NZUsize!(1024 * 1024),
                write_buffer: NZUsize!(1024 * 1024),
                leader_timeout: Duration::from_secs(1),
                certification_timeout: Duration::from_secs(2),
                timeout_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout: ViewDelta::new(10),
                skip_timeout: ViewDelta::new(5),
                fetch_concurrent: 32,
                page_cache: CacheRef::from_pooler(&context, NZU16!(16_384), NZUsize!(10_000)),
                strategy,
                forwarding: simplex::ForwardingPolicy::Disabled,
            },
        );

        // Start the network and the consensus engine, then run the application.
        network.start();
        engine.start(
            (vote_sender, vote_receiver),
            (certificate_sender, certificate_receiver),
            (resolver_sender, resolver_receiver),
        );
        application.run().await;
    });
}