commonware-bridge 2026.4.0

Send succinct consensus certificates between two networks.
Documentation
use clap::{value_parser, Arg, Command};
use commonware_bridge::{
    types::{
        block::BlockFormat,
        inbound::{self, Inbound},
        outbound::Outbound,
    },
    APPLICATION_NAMESPACE, CONSENSUS_SUFFIX, INDEXER_NAMESPACE,
};
use commonware_codec::{DecodeExt, Encode};
use commonware_consensus::{
    simplex::{scheme::bls12381_threshold::standard as bls12381_threshold, types::Finalization},
    types::View,
    Viewable,
};
use commonware_cryptography::{
    bls12381::primitives::{
        group::G2,
        variant::{MinSig, Variant},
    },
    ed25519::{self, PublicKey},
    sha256::Digest as Sha256Digest,
    Digest, Hasher, Sha256, Signer as _,
};
use commonware_parallel::Sequential;
use commonware_runtime::{tokio, Listener, Metrics, Network, Runner, Spawner};
use commonware_stream::encrypted::{listen, Config as StreamConfig};
use commonware_utils::{
    channel::{mpsc, oneshot},
    from_hex,
    ordered::Set,
    union, TryCollect,
};
use std::{
    collections::{BTreeMap, HashMap},
    net::{IpAddr, Ipv4Addr, SocketAddr},
    time::Duration,
};
use tracing::{debug, info};

type Scheme = bls12381_threshold::Scheme<PublicKey, MinSig>;

#[allow(clippy::large_enum_variant)]
enum Message<D: Digest> {
    PutBlock {
        incoming: inbound::PutBlock<D>,
        response: oneshot::Sender<bool>, // wait to broadcast consensus message
    },
    GetBlock {
        incoming: inbound::GetBlock<D>,
        response: oneshot::Sender<Option<BlockFormat<D>>>,
    },
    PutFinalization {
        incoming: inbound::PutFinalization<D>,
        response: oneshot::Sender<bool>, // wait to delete from validator storage
    },
    GetFinalization {
        incoming: inbound::GetFinalization,
        response: oneshot::Sender<Option<Finalization<Scheme, D>>>,
    },
}

fn main() {
    // Parse arguments
    let matches = Command::new("indexer")
        .about("collect blocks and finality certificates")
        .arg(Arg::new("me").long("me").required(true))
        .arg(
            Arg::new("participants")
                .long("participants")
                .required(true)
                .value_delimiter(',')
                .value_parser(value_parser!(u64))
                .help("All participants"),
        )
        .arg(
            Arg::new("networks")
                .long("networks")
                .required(true)
                .value_delimiter(',')
                .value_parser(value_parser!(String))
                .help("All networks"),
        )
        .get_matches();

    // Create logger
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    // Configure my identity
    let me = matches
        .get_one::<String>("me")
        .expect("Please provide identity");
    let parts = me.split('@').collect::<Vec<&str>>();
    if parts.len() != 2 {
        panic!("Identity not well-formed");
    }
    let key = parts[0].parse::<u64>().expect("Key not well-formed");
    let signer = ed25519::PrivateKey::from_seed(key);
    tracing::info!(key = ?signer.public_key(), "loaded signer");

    // Configure my port
    let port = parts[1].parse::<u16>().expect("Port not well-formed");
    tracing::info!(port, "loaded port");
    let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);

    // Configure allowed peers
    let participants = matches
        .get_many::<u64>("participants")
        .expect("Please provide allowed keys")
        .copied();
    if participants.len() == 0 {
        panic!("Please provide at least one participant");
    }
    let validators: Set<_> = participants
        .into_iter()
        .map(|peer| {
            let verifier = ed25519::PrivateKey::from_seed(peer).public_key();
            tracing::info!(key = ?verifier, "registered authorized key");
            verifier
        })
        .try_collect()
        .expect("public keys are unique");

    // Configure networks
    let mut verifiers: HashMap<G2, Scheme> = HashMap::new();
    let mut blocks: HashMap<G2, HashMap<Sha256Digest, BlockFormat<Sha256Digest>>> = HashMap::new();
    let mut finalizations: HashMap<G2, BTreeMap<View, Finalization<Scheme, Sha256Digest>>> =
        HashMap::new();
    let networks = matches
        .get_many::<String>("networks")
        .expect("Please provide networks");
    if networks.len() == 0 {
        panic!("Please provide at least one network");
    }

    // Create context
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        for network in networks {
            let network = from_hex(network).expect("Network not well-formed");
            let public = <MinSig as Variant>::Public::decode(network.as_ref())
                .expect("Network not well-formed");
            let namespace = union(APPLICATION_NAMESPACE, CONSENSUS_SUFFIX);
            verifiers.insert(
                public,
                bls12381_threshold::Scheme::certificate_verifier(&namespace, public),
            );
            blocks.insert(public, HashMap::new());
            finalizations.insert(public, BTreeMap::new());
        }

        // Create message handler
        let (handler, mut receiver) = mpsc::unbounded_channel();

        // Start handler
        let mut hasher = Sha256::new();
        context.with_label("handler").spawn(|mut ctx| async move {
            while let Some(msg) = receiver.recv().await {
                match msg {
                    Message::PutBlock { incoming, response } => {
                        // Ensure we care
                        let Some(network) = blocks.get_mut(&incoming.network) else {
                            let _ = response.send(false);
                            continue;
                        };

                        // Compute digest
                        hasher.update(&incoming.block.encode());
                        let digest = hasher.finalize();

                        // Store block
                        network.insert(digest, incoming.block);
                        let _ = response.send(true);
                        info!(
                            network = ?incoming.network,
                            block = ?digest,
                            "stored block"
                        );
                    }
                    Message::GetBlock { incoming, response } => {
                        let Some(network) = blocks.get(&incoming.network) else {
                            let _ = response.send(None);
                            continue;
                        };
                        let data = network.get(&incoming.digest);
                        let _ = response.send(data.cloned());
                    }
                    Message::PutFinalization { incoming, response } => {
                        // Ensure we care
                        let Some(network) = finalizations.get_mut(&incoming.network) else {
                            let _ = response.send(false);
                            continue;
                        };

                        // Verify signature
                        let Some(verifier) = verifiers.get(&incoming.network) else {
                            let _ = response.send(false);
                            continue;
                        };
                        if !incoming
                            .finalization
                            .verify(&mut ctx, verifier, &Sequential)
                        {
                            let _ = response.send(false);
                            continue;
                        }

                        // Store finalization
                        let view = incoming.finalization.view();
                        network.insert(view, incoming.finalization);
                        let _ = response.send(true);
                        info!(
                            network = ?incoming.network,
                            %view,
                            "stored finalization"
                        );
                    }
                    Message::GetFinalization { incoming, response } => {
                        // Ensure we care
                        let Some(network) = finalizations.get(&incoming.network) else {
                            let _ = response.send(None);
                            continue;
                        };

                        // Get latest finalization
                        let Some(data) = network.iter().next_back().map(|(_, data)| data.clone())
                        else {
                            let _ = response.send(None);
                            continue;
                        };
                        let _ = response.send(Some(data));
                    }
                }
            }
        });

        // Start listener
        let mut listener = context.bind(socket).await.expect("failed to bind listener");
        let config = StreamConfig {
            signing_key: signer,
            namespace: INDEXER_NAMESPACE.to_vec(),
            max_message_size: 1024 * 1024,
            synchrony_bound: Duration::from_secs(1),
            max_handshake_age: Duration::from_secs(60),
            handshake_timeout: Duration::from_secs(5),
        };
        loop {
            // Listen for connection
            let Ok((_, sink, stream)) = listener.accept().await else {
                debug!("failed to accept connection");
                continue;
            };

            let (peer, mut sender, mut receiver) = match listen(
                context.with_label("listener"),
                |peer| {
                    let out = validators.position(&peer).is_some();
                    async move { out }
                },
                config.clone(),
                stream,
                sink,
            )
            .await
            {
                Ok(x) => x,
                Err(e) => {
                    debug!(error = ?e, "failed to upgrade connection");
                    continue;
                }
            };
            info!(?peer, "upgraded connection");

            // Spawn message handler
            context.with_label("connection").spawn({
                let handler = handler.clone();
                move |_| async move {
                    // Handle messages
                    while let Ok(msg) = receiver.recv().await {
                        // Decode message
                        let msg = match Inbound::decode(msg) {
                            Ok(msg) => msg,
                            Err(err) => {
                                debug!(?err, ?peer, "failed to decode message");
                                return;
                            }
                        };

                        // Handle message
                        match msg {
                            Inbound::PutBlock(msg) => {
                                let (response, receiver) = oneshot::channel();
                                handler
                                    .send(Message::PutBlock {
                                        incoming: msg,
                                        response,
                                    })
                                    .expect("failed to send message");
                                let success = receiver.await.expect("failed to receive response");
                                let msg = Outbound::<Sha256Digest>::Success(success).encode();
                                if sender.send(msg).await.is_err() {
                                    debug!(?peer, "failed to send message");
                                    return;
                                }
                            }
                            Inbound::GetBlock(msg) => {
                                let (response, receiver) = oneshot::channel();
                                handler
                                    .send(Message::GetBlock {
                                        incoming: msg,
                                        response,
                                    })
                                    .expect("failed to send message");
                                let response = receiver.await.expect("failed to receive response");
                                match response {
                                    Some(block) => {
                                        let msg = Outbound::Block(block).encode();
                                        if sender.send(msg).await.is_err() {
                                            debug!(?peer, "failed to send message");
                                            return;
                                        }
                                    }
                                    None => {
                                        let msg = Outbound::<Sha256Digest>::Success(false).encode();
                                        if sender.send(msg).await.is_err() {
                                            debug!(?peer, "failed to send message");
                                            return;
                                        }
                                    }
                                }
                            }
                            Inbound::PutFinalization(msg) => {
                                let (response, receiver) = oneshot::channel();
                                handler
                                    .send(Message::PutFinalization {
                                        incoming: msg,
                                        response,
                                    })
                                    .expect("failed to send message");
                                let success = receiver.await.expect("failed to receive response");
                                let msg = Outbound::<Sha256Digest>::Success(success).encode();
                                if sender.send(msg).await.is_err() {
                                    debug!(?peer, "failed to send message");
                                    return;
                                }
                            }
                            Inbound::GetFinalization(msg) => {
                                let (response, receiver) = oneshot::channel();
                                handler
                                    .send(Message::GetFinalization {
                                        incoming: msg,
                                        response,
                                    })
                                    .expect("failed to send message");
                                let response = receiver.await.expect("failed to receive response");
                                match response {
                                    Some(data) => {
                                        let msg = Outbound::Finalization(data).encode();
                                        if sender.send(msg).await.is_err() {
                                            debug!(?peer, "failed to send message");
                                            return;
                                        }
                                    }
                                    None => {
                                        let msg = Outbound::<Sha256Digest>::Success(false).encode();
                                        if sender.send(msg).await.is_err() {
                                            debug!(?peer, "failed to send message");
                                            return;
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            });
        }
    });
}