use futures::stream::FuturesOrdered;
use futures::StreamExt;
use gadget_io::tokio::task::JoinHandle;
use gadget_sdk::clients::tangle::runtime::TangleRuntimeClient;
use gadget_sdk::network::Network;
use gadget_sdk::prometheus::PrometheusConfig;
use gadget_sdk::store::{ECDSAKeyStore, KeyValueStoreBackend};
use sp_core::{keccak_256, sr25519, Pair};
use std::collections::BTreeMap;
use std::time::Duration;
use crate::sdk::config::SingleGadgetConfig;
pub use gadget_io::KeystoreContainer;
use gadget_io::SubstrateKeystore;
use gadget_sdk::debug;
use gadget_sdk::network::gossip::GossipHandle;
use gadget_sdk::network::setup::NetworkConfig;
use itertools::Itertools;
use libp2p::Multiaddr;
pub struct NodeInput<N: Network, KBE: KeyValueStoreBackend, D> {
pub clients: Vec<TangleRuntimeClient>,
pub networks: Vec<N>,
pub account_id: sr25519::Public,
pub keystore: ECDSAKeyStore<KBE>,
pub node_index: usize,
pub additional_params: D,
pub prometheus_config: PrometheusConfig,
}
pub type SingleGadgetInput<KBE> = NodeInput<GossipHandle, KBE, ()>;
#[tracing::instrument(skip(config))]
pub async fn generate_node_input<KBE: KeyValueStoreBackend>(
config: SingleGadgetConfig<KBE>,
) -> color_eyre::Result<(SingleGadgetInput<KBE>, JoinHandle<()>)> {
let keystore_config = KeystoreContainer::new(&config.keystore)?;
let (ecdsa_key, acco_key) = (keystore_config.ecdsa_key()?, keystore_config.sr25519_key()?);
let keystore = ECDSAKeyStore::new(config.keystore_backend.clone(), ecdsa_key.clone());
let network_key = &mut acco_key.as_ref().to_half_ed25519_bytes()[..32];
let libp2p_key = libp2p::identity::Keypair::ed25519_from_bytes(network_key)
.map_err(|e| color_eyre::eyre::eyre!("Failed to create libp2p keypair: {e}"))?;
let network_ids = (0..config.n_protocols)
.map(|_| format!("{:?}", config.services[0]))
.map(|r| keccak_256(r.as_bytes()))
.map(hex::encode)
.enumerate()
.map(|(id, r)| format!("/tangle/{r}-{id}/1.0.0"))
.sorted()
.collect::<Vec<_>>();
let libp2p_config = NetworkConfig::new(
libp2p_key,
ecdsa_key,
config.bootnodes.clone(),
config.bind_port,
network_ids.clone(),
);
let (networks, network_task) =
gadget_sdk::network::setup::multiplexed_libp2p_network(libp2p_config)
.map_err(|e| color_eyre::eyre::eyre!("Failed to setup network: {e}"))?;
debug!("Successfully initialized network, now waiting for bootnodes to connect ...");
wait_for_connection_to_bootnodes(&config.bootnodes, &networks).await?;
let client =
TangleRuntimeClient::from_url(&config.bind_ip.to_string(), acco_key.public().0.into())
.await?;
let networks = networks
.into_iter()
.sorted_by_key(|r| r.0.clone())
.map(|r| r.1)
.collect::<Vec<_>>();
let clients = (0..networks.len())
.map(|_| client.clone())
.collect::<Vec<_>>();
let node_input = NodeInput::<GossipHandle, KBE, ()> {
clients,
account_id: acco_key.public(),
keystore,
node_index: 0,
additional_params: (),
prometheus_config: PrometheusConfig::Disabled,
networks,
};
Ok((node_input, network_task))
}
pub async fn wait_for_connection_to_bootnodes(
bootnodes: &[Multiaddr],
handles: &BTreeMap<String, GossipHandle>,
) -> color_eyre::Result<()> {
let n_required = bootnodes.len();
let n_networks = handles.len();
debug!("Waiting for {n_required} peers to show up across {n_networks} networks");
let mut tasks = FuturesOrdered::new();
for handle in handles.values() {
tasks.push_back(wait_for_peers(handle, n_required));
}
tasks.collect::<()>().await;
Ok(())
}
async fn wait_for_peers(handle: &GossipHandle, required: usize) {
loop {
let n_connected = handle.connected_peers();
if n_connected >= required {
return;
}
let topic = handle.topic();
debug!("`{topic}`: We currently have {n_connected}/{required} peers connected to network");
gadget_io::tokio::time::sleep(Duration::from_millis(1000)).await;
}
}