use crate::{config::MAX_ADDRESSES, schema::Response};
use codec::{CompactRef, Decode, Encode};
use cumulus_primitives_core::{relay_chain::Hash as RelayHash, ParaId};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
use futures::{
channel::oneshot,
future::{BoxFuture, Fuse, FusedFuture},
pin_mut,
stream::FuturesUnordered,
FutureExt, StreamExt,
};
use log::{debug, error, info, trace, warn};
use prost::Message;
use sc_network::{
event::{DhtEvent, Event},
request_responses::{IfDisconnected, RequestFailure},
service::traits::NetworkService,
KademliaKey, Multiaddr, PeerId, ProtocolName,
};
use sp_consensus_babe::{Epoch, Randomness};
use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};
use tokio::time::{sleep, Sleep};
/// `log` target shared by every message emitted from bootnode discovery.
const LOG_TARGET: &str = "bootnodes::discovery";
/// Delay before starting a new discovery attempt after a previous one found no bootnodes.
const RETRY_DELAY: Duration = Duration::from_millis(30_000);
/// Construction parameters for [`BootnodeDiscovery::new`].
pub struct BootnodeDiscoveryParams {
    /// ID of the parachain whose bootnodes are looked up.
    pub para_id: ParaId,
    /// Parachain network service; every discovered bootnode address is added to it.
    pub parachain_network: Arc<dyn NetworkService>,
    /// Genesis hash a bootnode response must carry to be accepted.
    pub parachain_genesis_hash: Vec<u8>,
    /// Fork ID a bootnode response must carry to be accepted (`None` only accepts responses
    /// that carry no fork ID either).
    pub parachain_fork_id: Option<String>,
    /// Relay chain client: source of block import notifications and runtime API calls.
    pub relay_chain_interface: Arc<dyn RelayChainInterface>,
    /// Relay chain network service: used for DHT lookups and bootnode requests.
    pub relay_chain_network: Arc<dyn NetworkService>,
    /// Request-response protocol used to ask relay chain nodes for parachain bootnodes.
    pub paranode_protocol_name: ProtocolName,
}
/// An in-flight bootnode request. It resolves to the queried relay chain peer together with
/// the request outcome, or `Canceled` if the response sender was dropped.
type PendingResponse = BoxFuture<
    'static,
    (PeerId, Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled>),
>;

/// Discovers parachain bootnodes advertised on the relay chain DHT.
///
/// It looks up providers of an epoch-specific DHT key, asks those providers for parachain
/// bootnode information, and feeds the validated addresses to the parachain network.
pub struct BootnodeDiscovery {
    /// SCALE-compact encoded para ID: the DHT key prefix and the bootnode request payload.
    para_id_scale_compact: Vec<u8>,
    /// Parachain network service that receives discovered bootnode addresses.
    parachain_network: Arc<dyn NetworkService>,
    /// Genesis hash expected in bootnode responses.
    parachain_genesis_hash: Vec<u8>,
    /// Fork ID expected in bootnode responses.
    parachain_fork_id: Option<String>,
    /// Relay chain client (runtime API calls, block import notifications).
    relay_chain_interface: Arc<dyn RelayChainInterface>,
    /// Relay chain network service (DHT queries, requests, DHT events).
    relay_chain_network: Arc<dyn NetworkService>,
    /// Hash of the most recent relay chain block seen in import notifications.
    latest_relay_chain_hash: Option<RelayHash>,
    /// DHT key whose providers are currently being looked up, if any.
    key_being_discovered: Option<KademliaKey>,
    /// Request-response protocol name used for bootnode requests.
    paranode_protocol_name: ProtocolName,
    /// Bootnode requests awaiting a response.
    pending_responses: FuturesUnordered<PendingResponse>,
    /// Providers queried directly (first attempt) whose response is still pending.
    direct_requests: HashSet<PeerId>,
    /// Providers whose addresses are being looked up with a FIND_NODE DHT query.
    find_node_queries: HashSet<PeerId>,
    /// Timer for the next discovery attempt; terminated when no attempt is scheduled.
    pending_start_discovery: Pin<Box<Fuse<Sleep>>>,
    /// Set once at least one bootnode address has been handed to the parachain network.
    succeeded: bool,
}
impl BootnodeDiscovery {
    /// Create a new parachain bootnode discovery service.
    ///
    /// No DHT queries or requests are issued until [`Self::run`] is awaited. The first
    /// discovery attempt is scheduled with a zero delay, so it starts as soon as `run` has
    /// received the first relay chain block import notification.
    ///
    /// NOTE(review): this creates a `tokio::time::sleep` timer, which presumably must happen
    /// inside a Tokio runtime context. Confirm this at the call site.
    pub fn new(
        BootnodeDiscoveryParams {
            para_id,
            parachain_network,
            parachain_genesis_hash,
            parachain_fork_id,
            relay_chain_interface,
            relay_chain_network,
            paranode_protocol_name,
        }: BootnodeDiscoveryParams,
    ) -> Self {
        Self {
            // The SCALE-compact para ID serves two purposes: it is the prefix of the DHT key
            // (see `epoch_key`) and the payload of bootnode requests (see `request_bootnode`).
            para_id_scale_compact: CompactRef(&para_id).encode(),
            parachain_network,
            parachain_genesis_hash,
            parachain_fork_id,
            relay_chain_interface,
            relay_chain_network,
            latest_relay_chain_hash: None,
            key_being_discovered: None,
            paranode_protocol_name,
            pending_responses: FuturesUnordered::default(),
            direct_requests: HashSet::new(),
            find_node_queries: HashSet::new(),
            // Zero delay: the first attempt starts the first time `run` polls this timer.
            pending_start_discovery: Box::pin(sleep(Duration::ZERO).fuse()),
            succeeded: false,
        }
    }

    /// Fetch the current BABE epoch at relay chain block `hash`.
    ///
    /// Calls the `BabeApi_current_epoch` runtime API with no arguments and SCALE-decodes the
    /// returned bytes into an [`Epoch`].
    ///
    /// # Errors
    /// Propagates runtime API call failures and decoding errors.
    async fn current_epoch(&mut self, hash: RelayHash) -> RelayChainResult<Epoch> {
        // Kept as `&mut self`: a `&Self` held across `.await` is only `Send` if `Self: Sync`,
        // and the `FuturesUnordered<BoxFuture<..>>` field is not `Sync`.
        let res = self
            .relay_chain_interface
            .call_runtime_api("BabeApi_current_epoch", hash, &[])
            .await?;
        Decode::decode(&mut &*res).map_err(Into::into)
    }

    /// Build the DHT key under which bootnode providers are looked up for an epoch. The key is
    /// the SCALE-compact para ID followed by the epoch `randomness`.
    fn epoch_key(&self, randomness: Randomness) -> KademliaKey {
        let mut key = Vec::with_capacity(self.para_id_scale_compact.len() + randomness.len());
        key.extend_from_slice(&self.para_id_scale_compact);
        key.extend_from_slice(&randomness);
        key.into()
    }

    /// Start looking up providers of the current epoch's key on the relay chain DHT.
    ///
    /// The key is derived from the BABE epoch randomness at the latest known relay chain
    /// block (see `epoch_key`). Results arrive later as DHT events, which are handled by
    /// `handle_dht_event`.
    ///
    /// # Errors
    /// Fails if no relay chain hash is known yet (`run` guarantees one before the first call),
    /// or if the current epoch cannot be fetched from the relay chain runtime.
    async fn start_discovery(&mut self) -> RelayChainResult<()> {
        let Some(hash) = self.latest_relay_chain_hash else {
            error!(
                target: LOG_TARGET,
                "Failed to start bootnode discovery: no relay chain hash available. This is a bug.",
            );
            return Err(RelayChainError::GenericError("no relay chain hash available".to_string()));
        };

        let current_epoch = self.current_epoch(hash).await?;
        let current_epoch_key = self.epoch_key(current_epoch.randomness);
        self.key_being_discovered = Some(current_epoch_key.clone());
        self.relay_chain_network.get_providers(current_epoch_key.clone());

        debug!(
            target: LOG_TARGET,
            "Started discovery of parachain bootnode providers for current epoch key {}",
            hex::encode(current_epoch_key),
        );

        Ok(())
    }

    /// Decide whether the discovery loop should keep running, and schedule a retry if needed.
    ///
    /// Returns `true` while a provider lookup, FIND_NODE query, or bootnode request is in
    /// flight, or while an attempt is scheduled. Once everything has settled:
    /// - if at least one bootnode was found, it returns `false`;
    /// - otherwise it schedules a new attempt after [`RETRY_DELAY`] and returns `true`.
    fn maybe_retry_discovery(&mut self) -> bool {
        let discovery_in_progress = self.key_being_discovered.is_some() ||
            !self.pending_responses.is_empty() ||
            !self.find_node_queries.is_empty();
        let discovery_scheduled = !self.pending_start_discovery.is_terminated();

        if discovery_in_progress || discovery_scheduled {
            return true;
        }

        if self.succeeded {
            info!(
                target: LOG_TARGET,
                "Parachain bootnode discovery on the relay chain DHT succeeded",
            );
            return false;
        }

        debug!(
            target: LOG_TARGET,
            "Retrying parachain bootnode discovery on the relay chain DHT in {RETRY_DELAY:?}",
        );
        self.pending_start_discovery = Box::pin(sleep(RETRY_DELAY).fuse());
        true
    }

    /// Send a bootnode request for our para ID to relay chain node `peer_id`.
    ///
    /// The request uses the paranode protocol and connects to the node if necessary. The
    /// response future is pushed onto `pending_responses`, tagged with `peer_id`.
    fn request_bootnode(&mut self, peer_id: PeerId) {
        trace!(
            target: LOG_TARGET,
            "Requesting parachain bootnode from the relay chain {peer_id:?}",
        );

        let (tx, rx) = oneshot::channel();
        self.relay_chain_network.start_request(
            peer_id,
            self.paranode_protocol_name.clone(),
            self.para_id_scale_compact.clone(),
            None,
            tx,
            IfDisconnected::TryConnect,
        );
        self.pending_responses.push(async move { (peer_id, rx.await) }.boxed());
    }

    /// Directly query each newly found provider for parachain bootnodes.
    ///
    /// Skips our own relay chain node and providers that already have a direct request or a
    /// FIND_NODE query in flight.
    fn handle_providers(&mut self, providers: Vec<PeerId>) {
        debug!(
            target: LOG_TARGET,
            "Found parachain bootnode providers on the relay chain: {providers:?}",
        );

        let local_peer_id = self.relay_chain_network.local_peer_id();
        for peer_id in providers {
            if peer_id == local_peer_id ||
                self.direct_requests.contains(&peer_id) ||
                self.find_node_queries.contains(&peer_id)
            {
                continue;
            }
            self.direct_requests.insert(peer_id);
            self.request_bootnode(peer_id);
        }
    }

    /// Process the outcome of a bootnode request sent to relay chain node `peer_id`.
    ///
    /// If a direct request fails, a FIND_NODE query for the provider is started. A successful
    /// response is accepted only if:
    /// - it decodes,
    /// - its genesis hash and fork ID match ours,
    /// - its peer ID is valid and not our own parachain node, and
    /// - its first [`MAX_ADDRESSES`] addresses all parse.
    ///
    /// Accepted addresses are added to the parachain network.
    fn handle_response(
        &mut self,
        peer_id: PeerId,
        res: Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled>,
    ) {
        // `true` if this was the first, direct request to the provider. It is `false` when
        // this is the retry issued after the provider's addresses were found on the DHT.
        let direct_request = self.direct_requests.remove(&peer_id);

        let response = match res {
            Ok(Ok((payload, _))) => match Response::decode(payload.as_slice()) {
                Ok(response) => response,
                Err(e) => {
                    warn!(
                        target: LOG_TARGET,
                        "Failed to decode parachain bootnode response from {peer_id:?}: {e}",
                    );
                    return;
                },
            },
            Ok(Err(e)) => {
                if direct_request {
                    debug!(
                        target: LOG_TARGET,
                        "Failed to directly query parachain bootnode from {peer_id:?}: {e}. \
                        Starting FIND_NODE query on the DHT",
                    );
                    // Look up the provider's addresses. If they are found, `handle_dht_event`
                    // adds them to the relay chain network and re-sends the request.
                    self.find_node_queries.insert(peer_id);
                    self.relay_chain_network.find_closest_peers(peer_id);
                } else {
                    debug!(
                        target: LOG_TARGET,
                        "Failed to query parachain bootnode from {peer_id:?} after finding \
                        the node addresses on the DHT: {e}",
                    );
                }
                return;
            },
            Err(_) => {
                debug!(
                    target: LOG_TARGET,
                    "Parachain bootnode request to {peer_id:?} canceled. \
                    The node is likely terminating.",
                );
                return;
            },
        };

        if response.genesis_hash != self.parachain_genesis_hash ||
            response.fork_id != self.parachain_fork_id
        {
            warn!(
                target: LOG_TARGET,
                "Received invalid parachain bootnode response from {peer_id:?}: \
                genesis hash {}, fork ID {:?} don't match expected genesis hash {}, fork ID {:?}",
                hex::encode(&response.genesis_hash),
                response.fork_id,
                hex::encode(&self.parachain_genesis_hash),
                self.parachain_fork_id,
            );
            return;
        }

        let paranode_peer_id = match PeerId::from_bytes(response.peer_id.as_slice()) {
            Ok(peer_id) => peer_id,
            Err(e) => {
                warn!(
                    target: LOG_TARGET,
                    "Failed to decode parachain peer ID in response from {peer_id:?}: {e}",
                );
                return;
            },
        };

        if paranode_peer_id == self.parachain_network.local_peer_id() {
            warn!(
                target: LOG_TARGET,
                "Received own parachain node peer ID in bootnode response from {peer_id:?}. \
                This should not happen as we don't request parachain bootnodes from self.",
            );
            return;
        }

        // Only the first `MAX_ADDRESSES` entries are parsed. A malformed address among them
        // rejects the whole response.
        let paranode_addresses = match response
            .addrs
            .into_iter()
            .map(Multiaddr::try_from)
            .take(MAX_ADDRESSES)
            .collect::<Result<Vec<_>, _>>()
        {
            Ok(paranode_addresses) => paranode_addresses,
            Err(e) => {
                warn!(
                    target: LOG_TARGET,
                    "Failed to decode parachain node addresses in response from {peer_id:?}: {e}",
                );
                return;
            },
        };

        debug!(
            target: LOG_TARGET,
            "Discovered parachain bootnode {paranode_peer_id:?} with addresses {paranode_addresses:?}",
        );

        // Discovery counts as successful once at least one bootnode address has been learned.
        self.succeeded |= !paranode_addresses.is_empty();
        for addr in paranode_addresses {
            self.parachain_network.add_known_address(paranode_peer_id, addr);
        }
    }

    /// React to relay chain DHT events that concern the current discovery attempt.
    ///
    /// - Provider events are only considered for `key_being_discovered`.
    /// - FIND_NODE results are only considered for peers in `find_node_queries`.
    /// - Every other event is ignored.
    fn handle_dht_event(&mut self, event: DhtEvent) {
        match event {
            DhtEvent::ProvidersFound(key, providers)
                if self.key_being_discovered.as_ref() == Some(&key) && !providers.is_empty() =>
                self.handle_providers(providers),
            DhtEvent::NoMoreProviders(key) if self.key_being_discovered.as_ref() == Some(&key) => {
                debug!(
                    target: LOG_TARGET,
                    "Parachain bootnode providers discovery finished for key {}",
                    hex::encode(key),
                );
                self.key_being_discovered = None;
            },
            DhtEvent::ProvidersNotFound(key)
                if self.key_being_discovered.as_ref() == Some(&key) =>
            {
                debug!(
                    target: LOG_TARGET,
                    "Parachain bootnode providers not found for key {}",
                    hex::encode(key),
                );
                self.key_being_discovered = None;
            },
            // The guard also marks the FIND_NODE query as finished.
            DhtEvent::ClosestPeersFound(peer_id, peers) if self.find_node_queries.remove(&peer_id) => {
                if let Some((_, addrs)) = peers
                    .into_iter()
                    .find(|(peer, addrs)| peer == &peer_id && !addrs.is_empty())
                {
                    trace!(
                        target: LOG_TARGET,
                        "Found addresses on the DHT for parachain bootnode provider {peer_id:?}: {addrs:?}",
                    );
                    for address in addrs {
                        self.relay_chain_network.add_known_address(peer_id, address);
                    }
                    self.request_bootnode(peer_id);
                } else {
                    debug!(
                        target: LOG_TARGET,
                        "Failed to find addresses on the DHT for parachain bootnode provider {peer_id:?}",
                    );
                }
            },
            DhtEvent::ClosestPeersNotFound(peer_id) if self.find_node_queries.remove(&peer_id) => {
                debug!(
                    target: LOG_TARGET,
                    "Failed to find addresses on the DHT for parachain bootnode provider {peer_id:?}",
                );
            },
            _ => {},
        }
    }

    /// Error returned by [`Self::run`] when one of the streams it depends on ends.
    fn stream_terminated(stream: &str) -> RelayChainError {
        RelayChainError::GenericError(format!("{stream} terminated"))
    }

    /// Drive bootnode discovery until at least one parachain bootnode has been found.
    ///
    /// Waits for the first relay chain block import notification, then keeps running and
    /// retrying discovery attempts (see `maybe_retry_discovery`). Returns `Ok(())` once
    /// discovery has succeeded and no work remains in flight.
    ///
    /// # Errors
    /// Returns an error in any of these cases:
    /// - the import notification stream cannot be opened;
    /// - a discovery attempt cannot be started (for example, the epoch lookup fails);
    /// - the relay chain import-notification stream or DHT event stream ends.
    pub async fn run(mut self) -> RelayChainResult<()> {
        let mut import_notification_stream =
            self.relay_chain_interface.import_notification_stream().await?.fuse();
        let dht_event_stream = self
            .relay_chain_network
            .event_stream("parachain-bootnode-discovery")
            .filter_map(|e| async move {
                match e {
                    Event::Dht(e) => Some(e),
                    _ => None,
                }
            })
            .fuse();
        pin_mut!(dht_event_stream);

        // Make sure a relay chain hash is available before the first discovery attempt.
        // `next()` is used instead of `select_next_some()`: the latter panics when polled
        // again after the stream has ended, and neither a plain `.await` nor `tokio::select!`
        // prevents that re-poll.
        let Some(header) = import_notification_stream.next().await else {
            return Err(Self::stream_terminated("relay chain import notification stream"));
        };
        self.latest_relay_chain_hash = Some(header.hash());

        loop {
            if !self.maybe_retry_discovery() {
                return Ok(());
            }

            tokio::select! {
                _ = &mut self.pending_start_discovery => {
                    self.start_discovery().await?;
                },
                header = import_notification_stream.next() => match header {
                    Some(header) => {
                        self.latest_relay_chain_hash = Some(header.hash());
                    },
                    None => {
                        return Err(Self::stream_terminated(
                            "relay chain import notification stream",
                        ));
                    },
                },
                event = dht_event_stream.next() => match event {
                    Some(event) => self.handle_dht_event(event),
                    None => {
                        return Err(Self::stream_terminated("relay chain DHT event stream"));
                    },
                },
                // The guard keeps `select_next_some` away from an empty (terminated) set.
                (peer_id, res) = self.pending_responses.select_next_some(),
                    if !self.pending_responses.is_empty() =>
                        self.handle_response(peer_id, res),
            }
        }
    }
}