use clap::{value_parser, Arg, Command};
use commonware_bridge::{
types::{
block::BlockFormat,
inbound::{self, Inbound},
outbound::Outbound,
},
APPLICATION_NAMESPACE, CONSENSUS_SUFFIX, INDEXER_NAMESPACE,
};
use commonware_codec::{DecodeExt, Encode};
use commonware_consensus::{
simplex::{scheme::bls12381_threshold::standard as bls12381_threshold, types::Finalization},
types::View,
Viewable,
};
use commonware_cryptography::{
bls12381::primitives::{
group::G2,
variant::{MinSig, Variant},
},
ed25519::{self, PublicKey},
sha256::Digest as Sha256Digest,
Digest, Hasher, Sha256, Signer as _,
};
use commonware_parallel::Sequential;
use commonware_runtime::{tokio, Listener, Metrics, Network, Runner, Spawner};
use commonware_stream::encrypted::{listen, Config as StreamConfig};
use commonware_utils::{
channel::{mpsc, oneshot},
from_hex,
ordered::Set,
union, TryCollect,
};
use std::{
collections::{BTreeMap, HashMap},
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
use tracing::{debug, info};
type Scheme = bls12381_threshold::Scheme<PublicKey, MinSig>;
#[allow(clippy::large_enum_variant)]
enum Message<D: Digest> {
PutBlock {
incoming: inbound::PutBlock<D>,
response: oneshot::Sender<bool>, },
GetBlock {
incoming: inbound::GetBlock<D>,
response: oneshot::Sender<Option<BlockFormat<D>>>,
},
PutFinalization {
incoming: inbound::PutFinalization<D>,
response: oneshot::Sender<bool>, },
GetFinalization {
incoming: inbound::GetFinalization,
response: oneshot::Sender<Option<Finalization<Scheme, D>>>,
},
}
fn main() {
let matches = Command::new("indexer")
.about("collect blocks and finality certificates")
.arg(Arg::new("me").long("me").required(true))
.arg(
Arg::new("participants")
.long("participants")
.required(true)
.value_delimiter(',')
.value_parser(value_parser!(u64))
.help("All participants"),
)
.arg(
Arg::new("networks")
.long("networks")
.required(true)
.value_delimiter(',')
.value_parser(value_parser!(String))
.help("All networks"),
)
.get_matches();
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
let me = matches
.get_one::<String>("me")
.expect("Please provide identity");
let parts = me.split('@').collect::<Vec<&str>>();
if parts.len() != 2 {
panic!("Identity not well-formed");
}
let key = parts[0].parse::<u64>().expect("Key not well-formed");
let signer = ed25519::PrivateKey::from_seed(key);
tracing::info!(key = ?signer.public_key(), "loaded signer");
let port = parts[1].parse::<u16>().expect("Port not well-formed");
tracing::info!(port, "loaded port");
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let participants = matches
.get_many::<u64>("participants")
.expect("Please provide allowed keys")
.copied();
if participants.len() == 0 {
panic!("Please provide at least one participant");
}
let validators: Set<_> = participants
.into_iter()
.map(|peer| {
let verifier = ed25519::PrivateKey::from_seed(peer).public_key();
tracing::info!(key = ?verifier, "registered authorized key");
verifier
})
.try_collect()
.expect("public keys are unique");
let mut verifiers: HashMap<G2, Scheme> = HashMap::new();
let mut blocks: HashMap<G2, HashMap<Sha256Digest, BlockFormat<Sha256Digest>>> = HashMap::new();
let mut finalizations: HashMap<G2, BTreeMap<View, Finalization<Scheme, Sha256Digest>>> =
HashMap::new();
let networks = matches
.get_many::<String>("networks")
.expect("Please provide networks");
if networks.len() == 0 {
panic!("Please provide at least one network");
}
let executor = tokio::Runner::default();
executor.start(|context| async move {
for network in networks {
let network = from_hex(network).expect("Network not well-formed");
let public = <MinSig as Variant>::Public::decode(network.as_ref())
.expect("Network not well-formed");
let namespace = union(APPLICATION_NAMESPACE, CONSENSUS_SUFFIX);
verifiers.insert(
public,
bls12381_threshold::Scheme::certificate_verifier(&namespace, public),
);
blocks.insert(public, HashMap::new());
finalizations.insert(public, BTreeMap::new());
}
let (handler, mut receiver) = mpsc::unbounded_channel();
let mut hasher = Sha256::new();
context.with_label("handler").spawn(|mut ctx| async move {
while let Some(msg) = receiver.recv().await {
match msg {
Message::PutBlock { incoming, response } => {
let Some(network) = blocks.get_mut(&incoming.network) else {
let _ = response.send(false);
continue;
};
hasher.update(&incoming.block.encode());
let digest = hasher.finalize();
network.insert(digest, incoming.block);
let _ = response.send(true);
info!(
network = ?incoming.network,
block = ?digest,
"stored block"
);
}
Message::GetBlock { incoming, response } => {
let Some(network) = blocks.get(&incoming.network) else {
let _ = response.send(None);
continue;
};
let data = network.get(&incoming.digest);
let _ = response.send(data.cloned());
}
Message::PutFinalization { incoming, response } => {
let Some(network) = finalizations.get_mut(&incoming.network) else {
let _ = response.send(false);
continue;
};
let Some(verifier) = verifiers.get(&incoming.network) else {
let _ = response.send(false);
continue;
};
if !incoming
.finalization
.verify(&mut ctx, verifier, &Sequential)
{
let _ = response.send(false);
continue;
}
let view = incoming.finalization.view();
network.insert(view, incoming.finalization);
let _ = response.send(true);
info!(
network = ?incoming.network,
%view,
"stored finalization"
);
}
Message::GetFinalization { incoming, response } => {
let Some(network) = finalizations.get(&incoming.network) else {
let _ = response.send(None);
continue;
};
let Some(data) = network.iter().next_back().map(|(_, data)| data.clone())
else {
let _ = response.send(None);
continue;
};
let _ = response.send(Some(data));
}
}
}
});
let mut listener = context.bind(socket).await.expect("failed to bind listener");
let config = StreamConfig {
signing_key: signer,
namespace: INDEXER_NAMESPACE.to_vec(),
max_message_size: 1024 * 1024,
synchrony_bound: Duration::from_secs(1),
max_handshake_age: Duration::from_secs(60),
handshake_timeout: Duration::from_secs(5),
};
loop {
let Ok((_, sink, stream)) = listener.accept().await else {
debug!("failed to accept connection");
continue;
};
let (peer, mut sender, mut receiver) = match listen(
context.with_label("listener"),
|peer| {
let out = validators.position(&peer).is_some();
async move { out }
},
config.clone(),
stream,
sink,
)
.await
{
Ok(x) => x,
Err(e) => {
debug!(error = ?e, "failed to upgrade connection");
continue;
}
};
info!(?peer, "upgraded connection");
context.with_label("connection").spawn({
let handler = handler.clone();
move |_| async move {
while let Ok(msg) = receiver.recv().await {
let msg = match Inbound::decode(msg) {
Ok(msg) => msg,
Err(err) => {
debug!(?err, ?peer, "failed to decode message");
return;
}
};
match msg {
Inbound::PutBlock(msg) => {
let (response, receiver) = oneshot::channel();
handler
.send(Message::PutBlock {
incoming: msg,
response,
})
.expect("failed to send message");
let success = receiver.await.expect("failed to receive response");
let msg = Outbound::<Sha256Digest>::Success(success).encode();
if sender.send(msg).await.is_err() {
debug!(?peer, "failed to send message");
return;
}
}
Inbound::GetBlock(msg) => {
let (response, receiver) = oneshot::channel();
handler
.send(Message::GetBlock {
incoming: msg,
response,
})
.expect("failed to send message");
let response = receiver.await.expect("failed to receive response");
match response {
Some(block) => {
let msg = Outbound::Block(block).encode();
if sender.send(msg).await.is_err() {
debug!(?peer, "failed to send message");
return;
}
}
None => {
let msg = Outbound::<Sha256Digest>::Success(false).encode();
if sender.send(msg).await.is_err() {
debug!(?peer, "failed to send message");
return;
}
}
}
}
Inbound::PutFinalization(msg) => {
let (response, receiver) = oneshot::channel();
handler
.send(Message::PutFinalization {
incoming: msg,
response,
})
.expect("failed to send message");
let success = receiver.await.expect("failed to receive response");
let msg = Outbound::<Sha256Digest>::Success(success).encode();
if sender.send(msg).await.is_err() {
debug!(?peer, "failed to send message");
return;
}
}
Inbound::GetFinalization(msg) => {
let (response, receiver) = oneshot::channel();
handler
.send(Message::GetFinalization {
incoming: msg,
response,
})
.expect("failed to send message");
let response = receiver.await.expect("failed to receive response");
match response {
Some(data) => {
let msg = Outbound::Finalization(data).encode();
if sender.send(msg).await.is_err() {
debug!(?peer, "failed to send message");
return;
}
}
None => {
let msg = Outbound::<Sha256Digest>::Success(false).encode();
if sender.send(msg).await.is_err() {
debug!(?peer, "failed to send message");
return;
}
}
}
}
}
}
}
});
}
});
}