use crate::{error::{Error, Result}, interval::ExpIncInterval, ServicetoWorkerMsg};
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use futures::channel::mpsc;
use futures::{FutureExt, Stream, StreamExt, stream::Fuse};
use addr_cache::AddrCache;
use async_trait::async_trait;
use codec::Decode;
use libp2p::{core::multiaddr, multihash::{Multihash, Hasher}};
use log::{debug, error, log_enabled};
use prometheus_endpoint::{Counter, CounterVec, Gauge, Opts, U64, register};
use prost::Message;
use rand::{seq::SliceRandom, thread_rng};
use sc_client_api::blockchain::HeaderBackend;
use sc_network::{
DhtEvent,
ExHashT,
Multiaddr,
NetworkStateInfo,
PeerId,
};
use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair};
use sp_core::crypto::{key_types, Pair};
use sp_keystore::CryptoStore;
use sp_runtime::{traits::Block as BlockT, generic::BlockId};
use sp_api::ProvideRuntimeApi;
mod addr_cache;
mod schema { include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); }
#[cfg(test)]
pub mod tests;
/// Logging target used by all log macros in this module.
const LOG_TARGET: &'static str = "sub-authority-discovery";
/// Upper bound on addresses cached per authority; addresses found on the DHT
/// beyond this limit are dropped (see `handle_dht_value_found_event`).
const MAX_ADDRESSES_PER_AUTHORITY: usize = 10;
/// Maximum number of concurrently in-flight DHT lookups
/// (see `start_new_lookups`).
const MAX_IN_FLIGHT_LOOKUPS: usize = 8;
/// Role an authority discovery [`Worker`] can run as.
pub enum Role {
/// Publish the local node's addresses (signed with keys from the given
/// keystore) and discover addresses of other authorities.
PublishAndDiscover(Arc<dyn CryptoStore>),
/// Only discover addresses of other authorities; publish nothing.
Discover,
}
/// An authority discovery worker: publishes the local node's addresses on the
/// DHT and/or discovers addresses of other authorities, depending on `role`.
pub struct Worker<Client, Network, Block, DhtEventStream> {
/// Receiver for requests coming from the matching service handle.
from_service: Fuse<mpsc::Receiver<ServicetoWorkerMsg>>,
client: Arc<Client>,
network: Arc<Network>,
/// Stream of DHT events to react to.
dht_event_rx: DhtEventStream,
/// Exponentially increasing interval driving address publication.
publish_interval: ExpIncInterval,
/// Exponentially increasing interval driving lookup-queue refills.
query_interval: ExpIncInterval,
/// Authorities still waiting for a DHT lookup to be started.
pending_lookups: Vec<AuthorityId>,
/// Lookups currently in flight, keyed by the DHT key queried.
in_flight_lookups: HashMap<libp2p::kad::record::Key, AuthorityId>,
/// Cache mapping authority ids to discovered addresses (and back).
addr_cache: addr_cache::AddrCache,
/// Prometheus metrics; `None` when no registry was supplied or
/// registration failed.
metrics: Option<Metrics>,
role: Role,
/// `Block` only appears in trait bounds, not in any field.
phantom: PhantomData<Block>,
}
impl<Client, Network, Block, DhtEventStream> Worker<Client, Network, Block, DhtEventStream>
where
Block: BlockT + Unpin + 'static,
Network: NetworkProvider,
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static + HeaderBackend<Block>,
<Client as ProvideRuntimeApi<Block>>::Api:
AuthorityDiscoveryApi<Block, Error = sp_blockchain::Error>,
DhtEventStream: Stream<Item = DhtEvent> + Unpin,
{
/// Construct a new [`Worker`].
///
/// Both the publish and the query interval start at two seconds and back off
/// exponentially up to the maxima given in `config`, so a freshly started
/// node publishes and queries quickly at first.
///
/// If a Prometheus registry is supplied but metric registration fails, the
/// error is logged and the worker runs without metrics.
pub(crate) fn new(
    from_service: mpsc::Receiver<ServicetoWorkerMsg>,
    client: Arc<Client>,
    network: Arc<Network>,
    dht_event_rx: DhtEventStream,
    role: Role,
    prometheus_registry: Option<prometheus_endpoint::Registry>,
    config: crate::WorkerConfig,
) -> Self {
    let publish_interval = ExpIncInterval::new(
        Duration::from_secs(2),
        config.max_publish_interval,
    );
    let query_interval = ExpIncInterval::new(
        Duration::from_secs(2),
        config.max_query_interval,
    );
    let addr_cache = AddrCache::new();
    let metrics = match prometheus_registry {
        // Fixed: the original read `Metrics::register(®istry)` — a
        // mojibake corruption of `&registry` that does not compile.
        Some(registry) => match Metrics::register(&registry) {
            Ok(metrics) => Some(metrics),
            Err(e) => {
                // Metrics are optional; log and continue without them.
                error!(target: LOG_TARGET, "Failed to register metrics: {:?}", e);
                None
            },
        },
        None => None,
    };
    Worker {
        from_service: from_service.fuse(),
        client,
        network,
        dht_event_rx,
        publish_interval,
        query_interval,
        pending_lookups: Vec::new(),
        in_flight_lookups: HashMap::new(),
        addr_cache,
        role,
        metrics,
        phantom: PhantomData,
    }
}
/// Drive the worker: start pending lookups, then react to whichever event
/// source fires first. Returns only when the DHT event stream terminates.
pub async fn run(mut self) {
loop {
// Fill the in-flight lookup set from the pending queue before blocking.
self.start_new_lookups();
futures::select! {
// DHT events: value found/not found, put succeeded/failed.
event = self.dht_event_rx.next().fuse() => {
if let Some(event) = event {
self.handle_dht_event(event).await;
} else {
// Event stream ended — shut the worker down.
return;
}
},
// Requests arriving from the service handle.
msg = self.from_service.select_next_some() => {
self.process_message_from_service(msg);
},
// Periodically publish our own external addresses.
_ = self.publish_interval.next().fuse() => {
if let Err(e) = self.publish_ext_addresses().await {
error!(
target: LOG_TARGET,
"Failed to publish external addresses: {:?}", e,
);
}
},
// Periodically rebuild the queue of authorities to look up.
_ = self.query_interval.next().fuse() => {
if let Err(e) = self.refill_pending_lookups_queue().await {
error!(
target: LOG_TARGET,
"Failed to request addresses of authorities: {:?}", e,
);
}
},
}
}
}
/// Answer a single request received over the service channel by reading the
/// address cache. A failed send means the requester dropped the receiver,
/// which is not an error.
fn process_message_from_service(&self, msg: ServicetoWorkerMsg) {
    match msg {
        ServicetoWorkerMsg::GetAddressesByAuthorityId(authority, sender) => {
            let addresses = self.addr_cache
                .get_addresses_by_authority_id(&authority)
                .map(Clone::clone);
            let _ = sender.send(addresses);
        }
        ServicetoWorkerMsg::GetAuthorityIdByPeerId(peer_id, sender) => {
            let authority_id = self.addr_cache
                .get_authority_id_by_peer_id(&peer_id)
                .map(Clone::clone);
            let _ = sender.send(authority_id);
        }
    }
}
/// The local node's external addresses, each guaranteed to end in a
/// `/p2p/<local peer id>` component (appended when missing).
fn addresses_to_publish(&self) -> impl ExactSizeIterator<Item = Multiaddr> {
    let peer_id: Multihash = self.network.local_peer_id().into();
    self.network.external_addresses()
        .into_iter()
        .map(move |address| {
            let already_has_p2p = address
                .iter()
                .any(|protocol| matches!(protocol, multiaddr::Protocol::P2p(_)));
            if already_has_p2p {
                address
            } else {
                address.with(multiaddr::Protocol::P2p(peer_id.clone()))
            }
        })
}
/// Publish the local node's external addresses on the DHT: one signed record
/// per own authority key that is also in the current authority set.
///
/// No-op when running as [`Role::Discover`]. Errors on serialization or
/// signing failure, or when a key yields no signature.
async fn publish_ext_addresses(&mut self) -> Result<()> {
let key_store = match &self.role {
Role::PublishAndDiscover(key_store) => key_store,
// Discover-only workers never publish.
Role::Discover => return Ok(()),
};
let addresses = self.addresses_to_publish();
if let Some(metrics) = &self.metrics {
metrics.publish.inc();
metrics.amount_addresses_last_published.set(
addresses.len().try_into().unwrap_or(std::u64::MAX),
);
}
// Protobuf-encode the address list once; the same payload is signed by
// every local authority key below.
let mut serialized_addresses = vec![];
schema::AuthorityAddresses { addresses: addresses.map(|a| a.to_vec()).collect() }
.encode(&mut serialized_addresses)
.map_err(Error::EncodingProto)?;
// Only keys that are both in the keystore and in the current authority
// set are published under.
let keys = Worker::<Client, Network, Block, DhtEventStream>::get_own_public_keys_within_authority_set(
key_store.clone(),
self.client.as_ref(),
).await?.into_iter().map(Into::into).collect::<Vec<_>>();
let signatures = key_store.sign_with_all(
key_types::AUTHORITY_DISCOVERY,
keys.clone(),
serialized_addresses.as_slice(),
).await.map_err(|_| Error::Signing)?;
for (sign_result, key) in signatures.into_iter().zip(keys) {
let mut signed_addresses = vec![];
// Every key passed to `sign_with_all` above came from the keystore,
// so a missing signature is treated as an error.
let signature = sign_result.map_err(|_| Error::MissingSignature(key.clone()))?;
schema::SignedAuthorityAddresses {
addresses: serialized_addresses.clone(),
signature,
}
.encode(&mut signed_addresses)
.map_err(Error::EncodingProto)?;
// Store the signed record under the hash of the authority key.
self.network.put_value(
hash_authority_id(key.1.as_ref()),
signed_addresses,
);
}
Ok(())
}
/// Rebuild the queue of authorities whose addresses should be looked up.
///
/// Fetches the authority set from the runtime at the best block, filters out
/// the local node's own keys, prunes cache entries for authorities that left
/// the set, and shuffles the remainder. Any lookups still in flight are
/// discarded; late responses for them will be treated as unexpected records.
async fn refill_pending_lookups_queue(&mut self) -> Result<()> {
let id = BlockId::hash(self.client.info().best_hash);
let local_keys = match &self.role {
Role::PublishAndDiscover(key_store) => {
key_store.sr25519_public_keys(
key_types::AUTHORITY_DISCOVERY
).await.into_iter().collect::<HashSet<_>>()
},
// A discover-only node has no own keys to exclude.
Role::Discover => HashSet::new(),
};
let mut authorities = self
.client
.runtime_api()
.authorities(&id)
.map_err(Error::CallingRuntime)?
.into_iter()
// No point looking up our own addresses.
.filter(|id| !local_keys.contains(id.as_ref()))
.collect();
// Drop cached addresses of authorities no longer in the set.
self.addr_cache.retain_ids(&authorities);
// Shuffle so that nodes do not all query the same authorities in the
// same order.
authorities.shuffle(&mut thread_rng());
self.pending_lookups = authorities;
self.in_flight_lookups.clear();
if let Some(metrics) = &self.metrics {
metrics.requests_pending.set(
self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX),
);
}
Ok(())
}
/// Pop authorities off the pending queue and fire DHT lookups for them,
/// stopping when `MAX_IN_FLIGHT_LOOKUPS` is reached or the queue is empty.
fn start_new_lookups(&mut self) {
    loop {
        // Respect the cap on concurrently running lookups.
        if self.in_flight_lookups.len() >= MAX_IN_FLIGHT_LOOKUPS {
            return;
        }
        let authority_id = if let Some(authority) = self.pending_lookups.pop() {
            authority
        } else {
            return;
        };
        let hash = hash_authority_id(authority_id.as_ref());
        self.network.get_value(&hash);
        self.in_flight_lookups.insert(hash, authority_id);
        if let Some(metrics) = &self.metrics {
            metrics.requests.inc();
            metrics.requests_pending.set(
                self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX),
            );
        }
    }
}
/// Dispatch a single DHT event, updating metrics and logging the outcome.
async fn handle_dht_event(&mut self, event: DhtEvent) {
match event {
DhtEvent::ValueFound(v) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_found"]).inc();
}
// Collect the hashes only when debug logging is actually enabled.
if log_enabled!(log::Level::Debug) {
let hashes: Vec<_> = v.iter().map(|(hash, _value)| hash.clone()).collect();
debug!(
target: LOG_TARGET,
"Value for hash '{:?}' found on Dht.", hashes,
);
}
// Validation failures are logged at debug level, not propagated:
// a bad record from a remote peer is not a local error.
if let Err(e) = self.handle_dht_value_found_event(v) {
if let Some(metrics) = &self.metrics {
metrics.handle_value_found_event_failure.inc();
}
debug!(
target: LOG_TARGET,
"Failed to handle Dht value found event: {:?}", e,
);
}
}
DhtEvent::ValueNotFound(hash) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
}
// Only treat the event as ours when the hash matches a lookup we
// started; removing it also completes that lookup.
if self.in_flight_lookups.remove(&hash).is_some() {
debug!(
target: LOG_TARGET,
"Value for hash '{:?}' not found on Dht.", hash
)
} else {
debug!(
target: LOG_TARGET,
"Received 'ValueNotFound' for unexpected hash '{:?}'.", hash
)
}
},
DhtEvent::ValuePut(hash) => {
// Publishing succeeded, so back the publish interval off to its
// configured maximum.
self.publish_interval.set_to_max();
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put"]).inc();
}
debug!(
target: LOG_TARGET,
"Successfully put hash '{:?}' on Dht.", hash,
)
},
DhtEvent::ValuePutFailed(hash) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
}
debug!(
target: LOG_TARGET,
"Failed to put hash '{:?}' on Dht.", hash
)
}
}
}
/// Validate and ingest a batch of DHT records returned for a single lookup.
///
/// All records must share one key, that key must correspond to an in-flight
/// lookup, and each record's payload must carry a signature verifying
/// against the looked-up authority id. Valid addresses — excluding any that
/// name the local peer, capped at `MAX_ADDRESSES_PER_AUTHORITY` — are
/// inserted into the address cache.
fn handle_dht_value_found_event(
&mut self,
values: Vec<(libp2p::kad::record::Key, Vec<u8>)>,
) -> Result<()> {
// Ensure all records share the same key, erroring if keys differ or the
// event carried no records at all.
let remote_key = values.iter().fold(Ok(None), |acc, (key, _)| {
match acc {
Ok(None) => Ok(Some(key.clone())),
Ok(Some(ref prev_key)) if prev_key != key => Err(
Error::ReceivingDhtValueFoundEventWithDifferentKeys
),
// Same key as before — keep the accumulator unchanged.
x @ Ok(_) => x,
Err(e) => Err(e),
}
})?.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?;
// The key must match a lookup we started; removing it also marks the
// lookup as completed.
let authority_id: AuthorityId = self.in_flight_lookups
.remove(&remote_key)
.ok_or(Error::ReceivingUnexpectedRecord)?;
let local_peer_id = self.network.local_peer_id();
let remote_addresses: Vec<Multiaddr> = values.into_iter()
.map(|(_k, v)| {
let schema::SignedAuthorityAddresses { signature, addresses } =
schema::SignedAuthorityAddresses::decode(v.as_slice())
.map_err(Error::DecodingProto)?;
let signature = AuthoritySignature::decode(&mut &signature[..])
.map_err(Error::EncodingDecodingScale)?;
// The signature must be by the looked-up authority over the exact
// serialized address payload.
if !AuthorityPair::verify(&signature, &addresses, &authority_id) {
return Err(Error::VerifyingDhtPayload);
}
let addresses = schema::AuthorityAddresses::decode(addresses.as_slice())
.map(|a| a.addresses)
.map_err(Error::DecodingProto)?
.into_iter()
.map(|a| a.try_into())
.collect::<std::result::Result<_, _>>()
.map_err(Error::ParsingMultiaddress)?;
Ok(addresses)
})
.collect::<Result<Vec<Vec<Multiaddr>>>>()?
.into_iter()
.flatten()
// Keep only addresses containing a parseable peer id that differs
// from our own; records claiming to be us are dropped.
.filter(|addr| addr.iter().any(|protocol| {
if let multiaddr::Protocol::P2p(hash) = protocol {
let peer_id = match PeerId::from_multihash(hash) {
Ok(peer_id) => peer_id,
Err(_) => return false,
};
return !(peer_id == local_peer_id);
}
false
}))
.take(MAX_ADDRESSES_PER_AUTHORITY)
.collect();
if !remote_addresses.is_empty() {
self.addr_cache.insert(authority_id, remote_addresses);
if let Some(metrics) = &self.metrics {
metrics.known_authorities_count.set(
self.addr_cache.num_ids().try_into().unwrap_or(std::u64::MAX)
);
}
}
Ok(())
}
/// Intersection of the keystore's sr25519 authority-discovery public keys
/// with the runtime's authority set at the best block, i.e. the keys this
/// node may legitimately publish addresses under.
async fn get_own_public_keys_within_authority_set(
key_store: Arc<dyn CryptoStore>,
client: &Client,
) -> Result<HashSet<AuthorityId>> {
let local_pub_keys = key_store
.sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
.await
.into_iter()
.collect::<HashSet<_>>();
let id = BlockId::hash(client.info().best_hash);
let authorities = client.runtime_api()
.authorities(&id)
.map_err(Error::CallingRuntime)?
.into_iter()
// Convert runtime authority ids into the key type used locally so
// the intersection below compares like with like.
.map(std::convert::Into::into)
.collect::<HashSet<_>>();
let intersection = local_pub_keys.intersection(&authorities)
.cloned()
.map(std::convert::Into::into)
.collect();
Ok(intersection)
}
}
/// Hooks into the underlying network that the [`Worker`] needs; abstracted
/// as a trait so tests can substitute their own implementation.
#[async_trait]
pub trait NetworkProvider: NetworkStateInfo {
/// Start putting the given key/value record into the DHT.
fn put_value(&self, key: libp2p::kad::record::Key, value: Vec<u8>);
/// Start a DHT lookup for the given key.
fn get_value(&self, key: &libp2p::kad::record::Key);
}
#[async_trait::async_trait]
impl<B, H> NetworkProvider for sc_network::NetworkService<B, H>
where
B: BlockT + 'static,
H: ExHashT,
{
fn put_value(&self, key: libp2p::kad::record::Key, value: Vec<u8>) {
// Delegates to the inherent `NetworkService::put_value`; inherent
// methods take precedence over trait methods, so this does not recurse.
self.put_value(key, value)
}
fn get_value(&self, key: &libp2p::kad::record::Key) {
// Delegates to the inherent `NetworkService::get_value` (no recursion,
// see above).
self.get_value(key)
}
}
/// DHT key under which an authority's address record is stored: the
/// SHA-2-256 digest of the authority id's raw bytes.
fn hash_authority_id(id: &[u8]) -> libp2p::kad::record::Key {
    let digest = libp2p::multihash::Sha2_256::digest(id);
    libp2p::kad::record::Key::new(&digest)
}
/// Prometheus metrics exposed by the authority discovery worker.
#[derive(Clone)]
pub(crate) struct Metrics {
/// Total number of address publications.
publish: Counter<U64>,
/// Address count of the most recent publication.
amount_addresses_last_published: Gauge<U64>,
/// Total number of authority address lookups started.
requests: Counter<U64>,
/// Current length of the pending-lookup queue.
requests_pending: Gauge<U64>,
/// DHT events received, labelled by event name.
dht_event_received: CounterVec<U64>,
/// Failures while handling a `ValueFound` event.
handle_value_found_event_failure: Counter<U64>,
/// Number of distinct authorities in the address cache.
known_authorities_count: Gauge<U64>,
}
impl Metrics {
/// Register all authority-discovery metrics with the given Prometheus
/// registry, returning the handles. Errors if any registration fails
/// (e.g. a name collision in the registry).
pub(crate) fn register(registry: &prometheus_endpoint::Registry) -> Result<Self> {
Ok(Self {
publish: register(
Counter::new(
"authority_discovery_times_published_total",
"Number of times authority discovery has published external addresses."
)?,
registry,
)?,
amount_addresses_last_published: register(
Gauge::new(
"authority_discovery_amount_external_addresses_last_published",
"Number of external addresses published when authority discovery last \
published addresses."
)?,
registry,
)?,
requests: register(
Counter::new(
"authority_discovery_authority_addresses_requested_total",
"Number of times authority discovery has requested external addresses of a \
single authority."
)?,
registry,
)?,
requests_pending: register(
Gauge::new(
"authority_discovery_authority_address_requests_pending",
"Number of pending authority address requests."
)?,
registry,
)?,
dht_event_received: register(
CounterVec::new(
Opts::new(
"authority_discovery_dht_event_received",
"Number of dht events received by authority discovery."
),
&["name"],
)?,
registry,
)?,
handle_value_found_event_failure: register(
Counter::new(
"authority_discovery_handle_value_found_event_failure",
"Number of times handling a dht value found event failed."
)?,
registry,
)?,
known_authorities_count: register(
Gauge::new(
"authority_discovery_known_authorities_count",
"Number of authorities known by authority discovery."
)?,
registry,
)?,
})
}
}
#[cfg(test)]
impl<Block, Client, Network, DhtEventStream> Worker<Client, Network, Block, DhtEventStream> {
/// Test helper: seed the address cache with the given authority/address
/// mapping directly, bypassing DHT discovery and signature verification.
pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec<Multiaddr>) {
self.addr_cache.insert(authority, addresses);
}
}