use crate::platform::PlatformRef;
use alloc::{
boxed::Box,
format,
string::{String, ToString as _},
sync::Arc,
vec::Vec,
};
use async_lock::Mutex;
use core::{cmp, num::NonZeroUsize, task::Poll, time::Duration};
use futures_channel::{mpsc, oneshot};
use futures_util::{future, stream, FutureExt as _, SinkExt as _, StreamExt as _};
use hashbrown::{hash_map, HashMap, HashSet};
use itertools::Itertools as _;
use smoldot::{
header,
informant::{BytesDisplay, HashDisplay},
libp2p::{connection, multiaddr::Multiaddr, peer_id::PeerId, peers},
network::{protocol, service},
};
pub use service::EncodedMerkleProof;
mod tasks;
/// Configuration for a [`NetworkService`].
pub struct Config<TPlat> {
    /// Access to the platform's capabilities (spawning tasks, time, sleeping, ...).
    pub platform: TPlat,
    /// Value sent back in response to libp2p identify requests coming from remote peers.
    pub identify_agent_version: String,
    /// Noise key handed to the underlying network state machine to authenticate
    /// libp2p connections.
    pub noise_key: connection::NoiseKey,
    /// Number of event streams returned by [`NetworkService::new`]. Each stream
    /// receives a copy of every [`Event`].
    pub num_events_receivers: usize,
    /// One entry per chain the service must handle.
    pub chains: Vec<ConfigChain>,
}
/// Per-chain configuration. See [`Config::chains`].
pub struct ConfigChain {
    /// Name of the chain, used only in log messages.
    pub log_name: String,
    /// Hash of the genesis block of the chain.
    pub genesis_block_hash: [u8; 32],
    /// Height of the latest finalized block; used to initialize the GrandPa state
    /// when [`ConfigChain::has_grandpa_protocol`] is `true`.
    pub finalized_block_height: u64,
    /// Best block of the chain, as a `(height, hash)` pair.
    pub best_block: (u64, [u8; 32]),
    /// Optional fork identifier forwarded to the network state machine.
    pub fork_id: Option<String>,
    /// Number of bytes used to encode block numbers in networking messages for this chain.
    pub block_number_bytes: usize,
    /// If `true`, the GrandPa networking protocol is enabled for this chain.
    pub has_grandpa_protocol: bool,
}
/// Handle to the networking service. Cheap to clone through the surrounding `Arc`.
pub struct NetworkService<TPlat: PlatformRef> {
    /// State shared with the background tasks spawned in [`NetworkService::new`].
    shared: Arc<Shared<TPlat>>,
    /// Handles used to abort the background tasks when this service is dropped.
    abort_handles: Vec<future::AbortHandle>,
}
/// State shared between the public API and the background tasks.
struct Shared<TPlat: PlatformRef> {
    /// Mutable state, protected by an async mutex.
    guarded: Mutex<SharedGuarded<TPlat>>,
    /// Access to the platform's capabilities.
    platform: TPlat,
    /// See [`Config::identify_agent_version`]. Sent back on identify requests.
    identify_agent_version: String,
    /// Chain names indexed by chain index; used only for logging.
    log_chain_names: Vec<String>,
    /// Notified whenever the main background task must wake up and run another
    /// round of [`update_round`].
    wake_up_main_background_task: event_listener::Event,
}
/// Fields of [`Shared`] that require mutable access.
struct SharedGuarded<TPlat: PlatformRef> {
    /// State machine containing the state of all chains, peers, and connections.
    network: service::ChainNetwork<TPlat::Instant>,
    /// Peers marked as important through [`NetworkService::discover`]; consulted when
    /// spawning connection tasks.
    important_nodes: HashSet<PeerId, fnv::FnvBuildHasher>,
    /// For each `(peer, chain_index)`, instant before which the peer must not be
    /// re-assigned an outgoing slot on that chain. See
    /// [`SharedGuarded::unassign_slot_and_ban`].
    slots_assign_backoff: HashMap<(PeerId, usize), TPlat::Instant, fnv::FnvBuildHasher>,
    /// Sending side of [`SharedGuarded::messages_from_connections_rx`]; cloned and
    /// handed to each spawned connection task.
    messages_from_connections_tx:
        mpsc::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,
    /// Receives messages from connection tasks; drained by [`update_round`] and
    /// injected into [`SharedGuarded::network`].
    messages_from_connections_rx:
        mpsc::Receiver<(service::ConnectionId, service::ConnectionToCoordinator)>,
    /// Channel towards each alive connection task, used to forward messages pulled
    /// from the coordinator.
    active_connections: HashMap<
        service::ConnectionId,
        mpsc::Sender<service::CoordinatorToConnection<TPlat::Instant>>,
        fnv::FnvBuildHasher,
    >,
    /// One entry per blocks request in progress; the response is sent on the channel.
    blocks_requests: HashMap<
        service::OutRequestId,
        oneshot::Sender<Result<Vec<protocol::BlockData>, service::BlocksRequestError>>,
        fnv::FnvBuildHasher,
    >,
    /// One entry per GrandPa warp sync request in progress.
    grandpa_warp_sync_requests: HashMap<
        service::OutRequestId,
        oneshot::Sender<
            Result<service::EncodedGrandpaWarpSyncResponse, service::GrandpaWarpSyncRequestError>,
        >,
        fnv::FnvBuildHasher,
    >,
    /// One entry per storage proof request in progress.
    storage_proof_requests: HashMap<
        service::OutRequestId,
        oneshot::Sender<Result<service::EncodedMerkleProof, service::StorageProofRequestError>>,
        fnv::FnvBuildHasher,
    >,
    /// One entry per call proof request in progress.
    call_proof_requests: HashMap<
        service::OutRequestId,
        oneshot::Sender<Result<service::EncodedMerkleProof, service::CallProofRequestError>>,
        fnv::FnvBuildHasher,
    >,
    /// Maps each Kademlia discovery operation in progress to the index of the chain
    /// it concerns.
    kademlia_discovery_operations:
        HashMap<service::KademliaOperationId, usize, fnv::FnvBuildHasher>,
}
impl<TPlat: PlatformRef> NetworkService<TPlat> {
    /// Initializes the network service.
    ///
    /// Spawns the main background task (`network-service`) and the periodic discovery
    /// task (`network-discovery`) on the platform's executor, and returns the service
    /// handle together with [`Config::num_events_receivers`] streams on which
    /// [`Event`]s are delivered.
    pub async fn new(config: Config<TPlat>) -> (Arc<Self>, Vec<stream::BoxStream<'static, Event>>) {
        // One bounded (16 entries) channel per requested event stream.
        let (event_senders, event_receivers): (Vec<_>, Vec<_>) = (0..config.num_events_receivers)
            .map(|_| mpsc::channel(16))
            .unzip();

        let num_chains = config.chains.len();
        let mut chains = Vec::with_capacity(num_chains);
        let mut log_chain_names = Vec::with_capacity(num_chains);

        // Translate each `ConfigChain` into the configuration understood by the
        // underlying `service::ChainNetwork`.
        for chain in config.chains {
            chains.push(service::ChainConfig {
                in_slots: 3,
                out_slots: 4,
                grandpa_protocol_config: if chain.has_grandpa_protocol {
                    // Initial GrandPa state; presumably refreshed later through
                    // `set_local_grandpa_state` — TODO confirm with callers.
                    Some(service::GrandpaState {
                        commit_finalized_height: chain.finalized_block_height,
                        round_number: 1,
                        set_id: 0,
                    })
                } else {
                    None
                },
                fork_id: chain.fork_id.clone(),
                block_number_bytes: chain.block_number_bytes,
                best_hash: chain.best_block.1,
                best_number: chain.best_block.0,
                genesis_hash: chain.genesis_block_hash,
                role: protocol::Role::Light,
                allow_inbound_block_requests: false,
            });

            log_chain_names.push(chain.log_name);
        }

        let mut abort_handles = Vec::new();

        let (messages_from_connections_tx, messages_from_connections_rx) = mpsc::channel(32);

        let shared = Arc::new(Shared {
            guarded: Mutex::new(SharedGuarded {
                network: service::ChainNetwork::new(service::Config {
                    now: config.platform.now(),
                    chains,
                    connections_capacity: 32,
                    peers_capacity: 8,
                    max_addresses_per_peer: NonZeroUsize::new(5).unwrap(),
                    noise_key: config.noise_key,
                    handshake_timeout: Duration::from_secs(8),
                    randomness_seed: rand::random(),
                }),
                slots_assign_backoff: HashMap::with_capacity_and_hasher(32, Default::default()),
                important_nodes: HashSet::with_capacity_and_hasher(16, Default::default()),
                active_connections: HashMap::with_capacity_and_hasher(32, Default::default()),
                messages_from_connections_tx,
                messages_from_connections_rx,
                blocks_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
                grandpa_warp_sync_requests: HashMap::with_capacity_and_hasher(
                    8,
                    Default::default(),
                ),
                storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
                call_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
                kademlia_discovery_operations: HashMap::with_capacity_and_hasher(
                    2,
                    Default::default(),
                ),
            }),
            platform: config.platform,
            identify_agent_version: config.identify_agent_version,
            log_chain_names,
            wake_up_main_background_task: event_listener::Event::new(),
        });

        // Main background task driving the network state machine. Wrapped in
        // `abortable` so that dropping the `NetworkService` shuts it down.
        shared.platform.spawn_task(
            "network-service".into(),
            Box::pin({
                let shared = shared.clone();
                let future = background_task(shared, event_senders);
                let (abortable, abort_handle) = future::abortable(future);
                abort_handles.push(abort_handle);
                abortable.map(|_| ())
            }),
        );

        // Discovery task: periodically starts a Kademlia discovery round on every
        // chain. The interval doubles after each round, capped at 120 seconds.
        shared.platform.spawn_task(
            "network-discovery".into(),
            Box::pin({
                let shared = shared.clone();
                let future = async move {
                    let mut next_discovery = Duration::from_secs(5);
                    loop {
                        shared.platform.sleep(next_discovery).await;
                        next_discovery = cmp::min(next_discovery * 2, Duration::from_secs(120));
                        let mut guarded = shared.guarded.lock().await;
                        for chain_index in 0..shared.log_chain_names.len() {
                            let operation_id = guarded
                                .network
                                .start_kademlia_discovery_round(shared.platform.now(), chain_index);
                            let _prev_value = guarded
                                .kademlia_discovery_operations
                                .insert(operation_id, chain_index);
                            debug_assert!(_prev_value.is_none());
                        }
                        // Wake the main task so it processes the new operations.
                        shared.wake_up_main_background_task.notify(1);
                    }
                };
                let (abortable, abort_handle) = future::abortable(future);
                abort_handles.push(abort_handle);
                abortable.map(|_| ())
            }),
        );

        abort_handles.shrink_to_fit();
        let final_network_service = Arc::new(NetworkService {
            shared,
            abort_handles,
        });

        // Each returned event stream captures an `Arc` to the service (dropped once the
        // stream finishes), so the service and its background tasks stay alive at least
        // as long as any event stream is alive.
        let event_receivers = event_receivers
            .into_iter()
            .map(|rx| {
                let mut final_network_service = Some(final_network_service.clone());
                rx.chain(stream::poll_fn(move |_| {
                    drop(final_network_service.take());
                    Poll::Ready(None)
                }))
                .boxed()
            })
            .collect();

        (final_network_service, event_receivers)
    }
    /// Sends a blocks request to the given peer and waits for the response.
    ///
    /// Returns [`BlocksRequestError::NoConnection`] if no request can be started with
    /// the target, and [`BlocksRequestError::Request`] if the request itself fails.
    pub async fn blocks_request(
        self: Arc<Self>,
        target: PeerId,
        chain_index: usize,
        config: protocol::BlocksRequestConfig,
        timeout: Duration,
    ) -> Result<Vec<protocol::BlockData>, BlocksRequestError> {
        let rx = {
            let mut guarded = self.shared.guarded.lock().await;

            if !guarded.network.can_start_requests(&target) {
                return Err(BlocksRequestError::NoConnection);
            }

            // Log the request; the format of the `start` field depends on whether the
            // request starts at a hash or a block number.
            match &config.start {
                protocol::BlocksRequestConfigStart::Hash(hash) => {
                    log::debug!(
                        target: "network",
                        "Connection({}) <= BlocksRequest(chain={}, start={}, num={}, descending={:?}, header={:?}, body={:?}, justifications={:?})",
                        target, self.shared.log_chain_names[chain_index], HashDisplay(hash),
                        config.desired_count.get(),
                        matches!(config.direction, protocol::BlocksRequestDirection::Descending),
                        config.fields.header, config.fields.body, config.fields.justifications
                    );
                }
                protocol::BlocksRequestConfigStart::Number(number) => {
                    log::debug!(
                        target: "network",
                        "Connection({}) <= BlocksRequest(chain={}, start=#{}, num={}, descending={:?}, header={:?}, body={:?}, justifications={:?})",
                        target, self.shared.log_chain_names[chain_index], number,
                        config.desired_count.get(),
                        matches!(config.direction, protocol::BlocksRequestDirection::Descending),
                        config.fields.header, config.fields.body, config.fields.justifications
                    );
                }
            }

            let request_id = guarded.network.start_blocks_request(
                self.shared.platform.now(),
                &target,
                chain_index,
                config,
                timeout,
            );

            self.shared.wake_up_main_background_task.notify(1);

            // The background task sends the response on this channel once it receives it.
            let (tx, rx) = oneshot::channel();
            guarded.blocks_requests.insert(request_id, tx);
            rx
            // Note: the mutex guard is released here, before awaiting the response.
        };

        let result = rx.await.unwrap();

        match &result {
            Ok(blocks) => {
                log::debug!(
                    target: "network",
                    "Connection({}) => BlocksRequest(chain={}, num_blocks={}, block_data_total_size={})",
                    target,
                    self.shared.log_chain_names[chain_index],
                    blocks.len(),
                    BytesDisplay(blocks.iter().fold(0, |sum, block| {
                        // Total size = headers + bodies + justifications of every block.
                        let block_size = block.header.as_ref().map_or(0, |h| h.len()) +
                            block.body.as_ref().map_or(0, |b| b.iter().fold(0, |s, e| s + e.len())) +
                            block.justifications.as_ref().into_iter().flat_map(|l| l.iter()).fold(0, |s, j| s + j.1.len());
                        sum + u64::try_from(block_size).unwrap()
                    }))
                );
            }
            Err(err) => {
                log::debug!(
                    target: "network",
                    "Connection({}) => BlocksRequest(chain={}, error={:?})",
                    target,
                    self.shared.log_chain_names[chain_index],
                    err
                );
            }
        }

        // If debug logging is disabled, surface protocol-level failures as warnings
        // instead (so they aren't completely silent); benign failures are skipped.
        if !log::log_enabled!(log::Level::Debug) {
            match &result {
                Ok(_)
                | Err(service::BlocksRequestError::EmptyResponse)
                | Err(service::BlocksRequestError::NotVerifiable) => {}
                Err(service::BlocksRequestError::Request(err)) if !err.is_protocol_error() => {}
                Err(err) => {
                    log::warn!(
                        target: "network",
                        "Error in block request with {}. This might indicate an incompatibility. Error: {}",
                        target,
                        err
                    );
                }
            }
        }

        result.map_err(BlocksRequestError::Request)
    }
    /// Sends a GrandPa warp sync request to the given peer and waits for the response.
    ///
    /// Returns [`GrandpaWarpSyncRequestError::NoConnection`] if no request can be
    /// started with the target.
    pub async fn grandpa_warp_sync_request(
        self: Arc<Self>,
        target: PeerId,
        chain_index: usize,
        begin_hash: [u8; 32],
        timeout: Duration,
    ) -> Result<service::EncodedGrandpaWarpSyncResponse, GrandpaWarpSyncRequestError> {
        let rx = {
            let mut guarded = self.shared.guarded.lock().await;

            if !guarded.network.can_start_requests(&target) {
                return Err(GrandpaWarpSyncRequestError::NoConnection);
            }

            log::debug!(
                target: "network", "Connection({}) <= GrandpaWarpSyncRequest(chain={}, start={})",
                target, self.shared.log_chain_names[chain_index], HashDisplay(&begin_hash)
            );

            let request_id = guarded.network.start_grandpa_warp_sync_request(
                self.shared.platform.now(),
                &target,
                chain_index,
                begin_hash,
                timeout,
            );

            self.shared.wake_up_main_background_task.notify(1);

            // The background task sends the response on this channel once it receives it.
            let (tx, rx) = oneshot::channel();
            guarded.grandpa_warp_sync_requests.insert(request_id, tx);
            rx
            // Note: the mutex guard is released here, before awaiting the response.
        };

        let result = rx.await.unwrap();

        match &result {
            Ok(response) => {
                let decoded = response.decode();
                log::debug!(
                    target: "network",
                    "Connection({}) => GrandpaWarpSyncRequest(chain={}, num_fragments={}, finished={:?})",
                    target,
                    self.shared.log_chain_names[chain_index],
                    decoded.fragments.len(),
                    decoded.is_finished,
                );
            }
            Err(err) => {
                log::debug!(
                    target: "network",
                    "Connection({}) => GrandpaWarpSyncRequest(chain={}, error={:?})",
                    target,
                    self.shared.log_chain_names[chain_index],
                    err,
                );
            }
        }

        result.map_err(GrandpaWarpSyncRequestError::Request)
    }
pub async fn set_local_best_block(
&self,
chain_index: usize,
best_hash: [u8; 32],
best_number: u64,
) {
self.shared
.guarded
.lock()
.await
.network
.set_local_best_block(chain_index, best_hash, best_number)
}
pub async fn set_local_grandpa_state(
&self,
chain_index: usize,
grandpa_state: service::GrandpaState,
) {
log::debug!(
target: "network",
"Chain({}) <= SetLocalGrandpaState(set_id: {}, commit_finalized_height: {})",
self.shared.log_chain_names[chain_index],
grandpa_state.set_id,
grandpa_state.commit_finalized_height,
);
self.shared
.guarded
.lock()
.await
.network
.set_local_grandpa_state(chain_index, grandpa_state)
}
    /// Sends a storage proof request to the given peer and waits for the response.
    ///
    /// Returns [`StorageProofRequestError::NoConnection`] if no request can be started
    /// with the target, and [`StorageProofRequestError::RequestTooLarge`] if the
    /// request doesn't fit in a protocol message.
    pub async fn storage_proof_request(
        self: Arc<Self>,
        chain_index: usize,
        target: PeerId,
        config: protocol::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
        timeout: Duration,
    ) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
        let rx = {
            let mut guarded = self.shared.guarded.lock().await;

            if !guarded.network.can_start_requests(&target) {
                return Err(StorageProofRequestError::NoConnection);
            }

            log::debug!(
                target: "network",
                "Connection({}) <= StorageProofRequest(chain={}, block={})",
                target,
                self.shared.log_chain_names[chain_index],
                HashDisplay(&config.block_hash)
            );

            let request_id = match guarded.network.start_storage_proof_request(
                self.shared.platform.now(),
                &target,
                chain_index,
                config,
                timeout,
            ) {
                Ok(r) => r,
                Err(service::StartRequestError::RequestTooLarge) => {
                    // Release the lock and bail out without starting anything.
                    return Err(StorageProofRequestError::RequestTooLarge);
                }
            };

            self.shared.wake_up_main_background_task.notify(1);

            // The background task sends the response on this channel once it receives it.
            let (tx, rx) = oneshot::channel();
            guarded.storage_proof_requests.insert(request_id, tx);
            rx
            // Note: the mutex guard is released here, before awaiting the response.
        };

        let result = rx.await.unwrap();

        match &result {
            Ok(items) => {
                let decoded = items.decode();
                log::debug!(
                    target: "network",
                    "Connection({}) => StorageProofRequest(chain={}, total_size={})",
                    target,
                    self.shared.log_chain_names[chain_index],
                    BytesDisplay(u64::try_from(decoded.len()).unwrap()),
                );
            }
            Err(err) => {
                log::debug!(
                    target: "network",
                    "Connection({}) => StorageProofRequest(chain={}, error={:?})",
                    target,
                    self.shared.log_chain_names[chain_index],
                    err
                );
            }
        }

        result.map_err(StorageProofRequestError::Request)
    }
    /// Sends a call proof request to the given peer and waits for the response.
    ///
    /// Returns [`CallProofRequestError::NoConnection`] if no request can be started
    /// with the target, and [`CallProofRequestError::RequestTooLarge`] if the request
    /// doesn't fit in a protocol message.
    pub async fn call_proof_request(
        self: Arc<Self>,
        chain_index: usize,
        target: PeerId,
        config: protocol::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>,
        timeout: Duration,
    ) -> Result<EncodedMerkleProof, CallProofRequestError> {
        let rx = {
            let mut guarded = self.shared.guarded.lock().await;

            if !guarded.network.can_start_requests(&target) {
                return Err(CallProofRequestError::NoConnection);
            }

            log::debug!(
                target: "network",
                "Connection({}) <= CallProofRequest({}, {}, {})",
                target,
                self.shared.log_chain_names[chain_index],
                HashDisplay(&config.block_hash),
                config.method
            );

            let request_id = match guarded.network.start_call_proof_request(
                self.shared.platform.now(),
                &target,
                chain_index,
                config,
                timeout,
            ) {
                Ok(r) => r,
                Err(service::StartRequestError::RequestTooLarge) => {
                    // Release the lock and bail out without starting anything.
                    return Err(CallProofRequestError::RequestTooLarge)
                }
            };

            self.shared.wake_up_main_background_task.notify(1);

            // The background task sends the response on this channel once it receives it.
            let (tx, rx) = oneshot::channel();
            guarded.call_proof_requests.insert(request_id, tx);
            rx
            // Note: the mutex guard is released here, before awaiting the response.
        };

        let result = rx.await.unwrap();

        match &result {
            Ok(items) => {
                let decoded = items.decode();
                log::debug!(
                    target: "network",
                    "Connection({}) => CallProofRequest({}, total_size: {})",
                    target,
                    self.shared.log_chain_names[chain_index],
                    BytesDisplay(u64::try_from(decoded.len()).unwrap())
                );
            }
            Err(err) => {
                log::debug!(
                    target: "network",
                    "Connection({}) => CallProofRequest({}, {})",
                    target,
                    self.shared.log_chain_names[chain_index],
                    err
                );
            }
        }

        result.map_err(CallProofRequestError::Request)
    }
pub async fn announce_transaction(
self: Arc<Self>,
chain_index: usize,
transaction: &[u8],
) -> Vec<PeerId> {
let mut sent_peers = Vec::with_capacity(16);
let mut guarded = self.shared.guarded.lock().await;
for peer in guarded
.network
.opened_transactions_substream(chain_index)
.cloned()
.collect::<Vec<_>>()
{
if guarded
.network
.announce_transaction(&peer, chain_index, transaction)
.is_ok()
{
sent_peers.push(peer);
};
}
self.shared.wake_up_main_background_task.notify(1);
sent_peers
}
pub async fn send_block_announce(
self: Arc<Self>,
target: &PeerId,
chain_index: usize,
scale_encoded_header: &[u8],
is_best: bool,
) -> Result<(), QueueNotificationError> {
let mut guarded = self.shared.guarded.lock().await;
if !guarded
.network
.can_send_block_announces(target, chain_index)
{
return Err(QueueNotificationError::NoConnection);
}
let result = guarded
.network
.send_block_announce(target, chain_index, scale_encoded_header, is_best)
.map_err(QueueNotificationError::Queue);
self.shared.wake_up_main_background_task.notify(1);
result
}
pub async fn discover(
&self,
now: &TPlat::Instant,
chain_index: usize,
list: impl IntoIterator<Item = (PeerId, impl IntoIterator<Item = Multiaddr>)>,
important_nodes: bool,
) {
let mut guarded = self.shared.guarded.lock().await;
for (peer_id, addrs) in list {
if important_nodes {
guarded.important_nodes.insert(peer_id.clone());
}
guarded.network.discover(now, chain_index, peer_id, addrs);
}
self.shared.wake_up_main_background_task.notify(1);
}
pub async fn discovered_nodes(
&self,
chain_index: usize,
) -> impl Iterator<Item = (PeerId, impl Iterator<Item = Multiaddr>)> {
let guarded = self.shared.guarded.lock().await;
guarded
.network
.discovered_nodes(chain_index)
.map(|(peer_id, addresses)| {
(
peer_id.clone(),
addresses.cloned().collect::<Vec<_>>().into_iter(),
)
})
.collect::<Vec<_>>()
.into_iter()
}
pub async fn peers_list(&self) -> impl Iterator<Item = PeerId> {
self.shared
.guarded
.lock()
.await
.network
.peers_list()
.cloned()
.collect::<Vec<_>>()
.into_iter()
}
}
impl<TPlat: PlatformRef> Drop for NetworkService<TPlat> {
    fn drop(&mut self) {
        // Shut down every background task spawned by `NetworkService::new`.
        self.abort_handles.iter().for_each(|handle| handle.abort());
    }
}
/// Event delivered on the streams returned by [`NetworkService::new`].
#[derive(Debug, Clone)]
pub enum Event {
    /// Now connected to the given peer on the given chain.
    Connected {
        peer_id: PeerId,
        chain_index: usize,
        /// Role the peer reported for itself.
        role: protocol::Role,
        /// Best block height the peer reported.
        best_block_number: u64,
        /// Best block hash the peer reported.
        best_block_hash: [u8; 32],
    },
    /// No longer connected to the given peer on the given chain.
    Disconnected {
        peer_id: PeerId,
        chain_index: usize,
    },
    /// Received a block announcement from the given peer.
    BlockAnnounce {
        peer_id: PeerId,
        chain_index: usize,
        /// Still-encoded announce; decode it with its `decode` method.
        announce: service::EncodedBlockAnnounce,
    },
    /// Received a GrandPa neighbor packet from the given peer.
    GrandpaNeighborPacket {
        peer_id: PeerId,
        chain_index: usize,
        /// Finalized block height reported by the peer.
        finalized_block_height: u64,
    },
    /// Received a GrandPa commit message from the given peer.
    GrandpaCommitMessage {
        peer_id: PeerId,
        chain_index: usize,
        /// Still-encoded message; decode it with its `decode` method.
        message: service::EncodedGrandpaCommitMessage,
    },
}
/// Error returned by [`NetworkService::blocks_request`].
#[derive(Debug, derive_more::Display)]
pub enum BlocksRequestError {
    /// No request could be started because there is no connection with the target.
    NoConnection,
    /// Error during the request itself.
    #[display(fmt = "{_0}")]
    Request(service::BlocksRequestError),
}
/// Error returned by [`NetworkService::grandpa_warp_sync_request`].
#[derive(Debug, derive_more::Display)]
pub enum GrandpaWarpSyncRequestError {
    /// No request could be started because there is no connection with the target.
    NoConnection,
    /// Error during the request itself.
    #[display(fmt = "{_0}")]
    Request(service::GrandpaWarpSyncRequestError),
}
/// Error returned by [`NetworkService::storage_proof_request`].
#[derive(Debug, derive_more::Display, Clone)]
pub enum StorageProofRequestError {
    /// No request could be started because there is no connection with the target.
    NoConnection,
    /// The request doesn't fit in a protocol message.
    RequestTooLarge,
    /// Error during the request itself.
    #[display(fmt = "{_0}")]
    Request(service::StorageProofRequestError),
}
/// Error returned by [`NetworkService::call_proof_request`].
#[derive(Debug, derive_more::Display, Clone)]
pub enum CallProofRequestError {
    /// No request could be started because there is no connection with the target.
    NoConnection,
    /// The request doesn't fit in a protocol message.
    RequestTooLarge,
    /// Error during the request itself.
    #[display(fmt = "{_0}")]
    Request(service::CallProofRequestError),
}
impl CallProofRequestError {
    /// Returns `true` if the error is caused by the networking layer rather than by
    /// the content of the request itself.
    pub fn is_network_problem(&self) -> bool {
        match self {
            CallProofRequestError::NoConnection => true,
            CallProofRequestError::RequestTooLarge => false,
            CallProofRequestError::Request(err) => err.is_network_problem(),
        }
    }
}
/// Error returned by [`NetworkService::send_block_announce`].
#[derive(Debug, derive_more::Display)]
pub enum QueueNotificationError {
    /// Block announces cannot currently be sent to this peer.
    NoConnection,
    /// Error while queuing the notification.
    #[display(fmt = "{_0}")]
    Queue(peers::QueueNotificationError),
}
/// Main loop of the `network-service` background task: repeatedly runs
/// [`update_round`], then sleeps until [`Shared::wake_up_main_background_task`]
/// is notified.
async fn background_task<TPlat: PlatformRef>(
    shared: Arc<Shared<TPlat>>,
    mut event_senders: Vec<mpsc::Sender<Event>>,
) {
    loop {
        // Register the listener *before* running the round, so that a notification
        // fired while `update_round` is executing is not missed.
        let notified = shared.wake_up_main_background_task.listen();
        update_round(&shared, &mut event_senders).await;
        notified.await;
    }
}
/// Performs one round of work for the main background task: injects messages coming
/// from connection tasks into the coordinator, processes and dispatches coordinator
/// events, assigns outgoing slots, spawns pending connections, and forwards
/// coordinator messages back to the connection tasks.
async fn update_round<TPlat: PlatformRef>(
    shared: &Arc<Shared<TPlat>>,
    event_senders: &mut [mpsc::Sender<Event>],
) {
    let mut guarded = shared.guarded.lock().await;

    // Drain, without blocking, all the messages that connection tasks have sent, and
    // inject them into the network state machine.
    loop {
        let (connection_id, message) =
            match guarded.messages_from_connections_rx.next().now_or_never() {
                Some(Some(v)) => v,
                _ => break,
            };

        guarded
            .network
            .inject_connection_message(connection_id, message);
    }

    // Process events generated by the state machine. Some of them are translated into
    // an [`Event`] that is dispatched to all the event senders.
    'events_loop: loop {
        // Inner loop: poll the state machine until either an event that must be
        // forwarded is found (`break event`) or no event is left (`break 'events_loop`).
        let event = loop {
            let inner_event = match guarded.network.next_event(shared.platform.now()) {
                Some(ev) => ev,
                None => break 'events_loop,
            };

            match inner_event {
                service::Event::Connected(peer_id) => {
                    log::debug!(target: "network", "Connected({})", peer_id);
                }
                service::Event::Disconnected {
                    peer_id,
                    chain_indices,
                } => {
                    log::debug!(target: "network", "Disconnected({})", peer_id);
                    if !chain_indices.is_empty() {
                        // TODO(review): only the single-chain case is implemented;
                        // disconnecting from multiple chains at once hits `todo!()`.
                        if chain_indices.len() == 1 {
                            log::debug!(
                                target: "network",
                                "Connection({}, {}) => ChainDisconnected",
                                peer_id,
                                &shared.log_chain_names[chain_indices[0]],
                            );

                            break Event::Disconnected {
                                peer_id,
                                chain_index: chain_indices[0],
                            };
                        } else {
                            todo!()
                        }
                    }
                }
                service::Event::BlockAnnounce {
                    chain_index,
                    peer_id,
                    announce,
                } => {
                    log::debug!(
                        target: "network",
                        "Connection({}, {}) => BlockAnnounce(best_hash={}, is_best={})",
                        peer_id,
                        &shared.log_chain_names[chain_index],
                        HashDisplay(&header::hash_from_scale_encoded_header(announce.decode().scale_encoded_header)),
                        announce.decode().is_best
                    );
                    break Event::BlockAnnounce {
                        chain_index,
                        peer_id,
                        announce,
                    };
                }
                service::Event::ChainConnected {
                    peer_id,
                    chain_index,
                    role,
                    best_number,
                    best_hash,
                    slot_ty: _,
                } => {
                    log::debug!(
                        target: "network",
                        "Connection({}, {}) => ChainConnected(best_height={}, best_hash={})",
                        peer_id,
                        &shared.log_chain_names[chain_index],
                        best_number,
                        HashDisplay(&best_hash)
                    );
                    break Event::Connected {
                        peer_id,
                        chain_index,
                        role,
                        best_block_number: best_number,
                        best_block_hash: best_hash,
                    };
                }
                service::Event::ChainConnectAttemptFailed {
                    peer_id,
                    chain_index,
                    unassigned_slot_ty,
                    error,
                } => {
                    // NOTE(review): the arguments here appear swapped relative to the
                    // format string (chain name is printed where the peer id usually
                    // goes in the other arms) — confirm intended output.
                    log::debug!(
                        target: "network",
                        "Connection({}, {}) => ChainConnectAttemptFailed(error={:?})",
                        &shared.log_chain_names[chain_index],
                        peer_id, error,
                    );
                    log::debug!(
                        target: "connections",
                        "{}Slots({}) ∌ {}",
                        match unassigned_slot_ty {
                            service::SlotTy::Inbound => "In",
                            service::SlotTy::Outbound => "Out",
                        },
                        &shared.log_chain_names[chain_index],
                        peer_id
                    );
                    // Back off before re-assigning a slot to this peer on this chain.
                    guarded.unassign_slot_and_ban(&shared.platform, chain_index, peer_id);
                    shared.wake_up_main_background_task.notify(1);
                }
                service::Event::ChainDisconnected {
                    peer_id,
                    chain_index,
                    unassigned_slot_ty,
                } => {
                    log::debug!(
                        target: "network",
                        "Connection({}, {}) => ChainDisconnected",
                        peer_id,
                        &shared.log_chain_names[chain_index],
                    );
                    log::debug!(
                        target: "connections",
                        "{}Slots({}) ∌ {}",
                        match unassigned_slot_ty {
                            service::SlotTy::Inbound => "In",
                            service::SlotTy::Outbound => "Out",
                        },
                        &shared.log_chain_names[chain_index],
                        peer_id
                    );
                    guarded.unassign_slot_and_ban(&shared.platform, chain_index, peer_id.clone());
                    shared.wake_up_main_background_task.notify(1);
                    break Event::Disconnected {
                        peer_id,
                        chain_index,
                    };
                }
                // Request responses: deliver the result on the oneshot channel that was
                // registered when the request was started. The `unwrap` relies on the
                // entry always existing; the `send` result is ignored because the
                // requester may have dropped the receiving side.
                service::Event::RequestResult {
                    request_id,
                    response: service::RequestResult::Blocks(response),
                } => {
                    let _ = guarded
                        .blocks_requests
                        .remove(&request_id)
                        .unwrap()
                        .send(response);
                }
                service::Event::RequestResult {
                    request_id,
                    response: service::RequestResult::GrandpaWarpSync(response),
                } => {
                    let _ = guarded
                        .grandpa_warp_sync_requests
                        .remove(&request_id)
                        .unwrap()
                        .send(response);
                }
                service::Event::RequestResult {
                    request_id,
                    response: service::RequestResult::StorageProof(response),
                } => {
                    let _ = guarded
                        .storage_proof_requests
                        .remove(&request_id)
                        .unwrap()
                        .send(response);
                }
                service::Event::RequestResult {
                    request_id,
                    response: service::RequestResult::CallProof(response),
                } => {
                    let _ = guarded
                        .call_proof_requests
                        .remove(&request_id)
                        .unwrap()
                        .send(response);
                }
                service::Event::RequestResult { .. } => {
                    // This service never starts any other kind of request.
                    unreachable!()
                }
                service::Event::KademliaDiscoveryResult {
                    operation_id,
                    result,
                } => {
                    let chain_index = guarded
                        .kademlia_discovery_operations
                        .remove(&operation_id)
                        .unwrap();
                    match result {
                        Ok(nodes) => {
                            log::debug!(
                                target: "connections", "On chain {}, discovered: {}",
                                &shared.log_chain_names[chain_index],
                                nodes.iter().map(|(p, _)| p.to_string()).join(", ")
                            );

                            // Feed the discovered nodes back into the state machine.
                            for (peer_id, addrs) in nodes {
                                guarded.network.discover(
                                    &shared.platform.now(),
                                    chain_index,
                                    peer_id,
                                    addrs,
                                );
                            }
                        }
                        Err(error) => {
                            log::debug!(
                                target: "connections",
                                "Discovery => {:?}",
                                error
                            );

                            // Only surface unexpected failures as warnings.
                            match error {
                                service::DiscoveryError::NoPeer => {}
                                service::DiscoveryError::FindNode(
                                    service::KademliaFindNodeError::RequestFailed(err),
                                ) if !err.is_protocol_error() => {}
                                _ => {
                                    log::warn!(
                                        target: "connections",
                                        "Problem during discovery on {}: {}",
                                        &shared.log_chain_names[chain_index],
                                        error
                                    );
                                }
                            }
                        }
                    }
                }
                service::Event::InboundSlotAssigned {
                    peer_id,
                    chain_index,
                } => {
                    log::debug!(
                        target: "connections",
                        "InSlots({}) ∋ {}",
                        &shared.log_chain_names[chain_index],
                        peer_id
                    );
                }
                service::Event::IdentifyRequestIn {
                    peer_id,
                    request_id,
                } => {
                    log::debug!(
                        target: "network",
                        "Connection({}) => IdentifyRequest",
                        peer_id,
                    );
                    guarded
                        .network
                        .respond_identify(request_id, &shared.identify_agent_version);
                }
                // Inbound block requests are disabled in the configuration
                // (`allow_inbound_block_requests: false`), hence unreachable.
                service::Event::BlocksRequestIn { .. } => unreachable!(),
                service::Event::RequestInCancel { .. } => {
                    // This event can only happen if an in-request is received, which
                    // never happens for this service.
                    unreachable!()
                }
                service::Event::GrandpaNeighborPacket {
                    chain_index,
                    peer_id,
                    state,
                } => {
                    log::debug!(
                        target: "network",
                        "Connection({}, {}) => GrandpaNeighborPacket(round_number={}, set_id={}, commit_finalized_height={})",
                        peer_id,
                        &shared.log_chain_names[chain_index],
                        state.round_number,
                        state.set_id,
                        state.commit_finalized_height,
                    );
                    break Event::GrandpaNeighborPacket {
                        chain_index,
                        peer_id,
                        finalized_block_height: state.commit_finalized_height,
                    };
                }
                service::Event::GrandpaCommitMessage {
                    chain_index,
                    peer_id,
                    message,
                } => {
                    log::debug!(
                        target: "network",
                        "Connection({}, {}) => GrandpaCommitMessage(target_block_hash={})",
                        peer_id,
                        &shared.log_chain_names[chain_index],
                        HashDisplay(message.decode().message.target_hash),
                    );
                    break Event::GrandpaCommitMessage {
                        chain_index,
                        peer_id,
                        message,
                    };
                }
                service::Event::ProtocolError { peer_id, error } => {
                    log::warn!(
                        target: "network",
                        "Connection({}) => ProtocolError(error={:?})",
                        peer_id,
                        error,
                    );

                    // Misbehaving peer: unassign and back off its slots on every chain.
                    for chain_index in 0..guarded.network.num_chains() {
                        guarded.unassign_slot_and_ban(
                            &shared.platform,
                            chain_index,
                            peer_id.clone(),
                        );
                    }
                    shared.wake_up_main_background_task.notify(1);
                }
            }
        };

        // Release the lock while dispatching the event so that senders being slow
        // doesn't block the rest of the service. Clone only when there are multiple
        // senders.
        drop(guarded);
        if event_senders.len() == 1 {
            let _ = event_senders[0].send(event).await;
        } else {
            for sender in event_senders.iter_mut() {
                let _ = sender.send(event.clone()).await;
            }
        }
        guarded = shared.guarded.lock().await;
    }

    // Assign outgoing slots on every chain, skipping peers whose backoff hasn't
    // expired yet.
    for chain_index in 0..shared.log_chain_names.len() {
        let now = shared.platform.now();

        // Drop expired backoff entries.
        guarded
            .slots_assign_backoff
            .retain(|_, expiration| *expiration > now);

        loop {
            let peer_id = guarded
                .network
                .slots_to_assign(chain_index)
                .find(|peer_id| {
                    !guarded
                        .slots_assign_backoff
                        .contains_key(&((**peer_id).clone(), chain_index))
                })
                .cloned();

            let Some(peer_id) = peer_id else { break };
            log::debug!(
                target: "connections",
                "OutSlots({}) ∋ {}",
                &shared.log_chain_names[chain_index],
                peer_id
            );
            guarded.network.assign_out_slot(chain_index, peer_id);
        }
    }

    // Spawn a connection task for every connection the state machine wants opened.
    loop {
        let start_connect = match guarded.network.next_start_connect(|| shared.platform.now()) {
            Some(sc) => sc,
            None => break,
        };

        let is_important = guarded
            .important_nodes
            .contains(&start_connect.expected_peer_id);

        let task_name = format!(
            "connection-{}-{}",
            start_connect.expected_peer_id, start_connect.multiaddr
        );

        let task = tasks::connection_task(
            start_connect,
            shared.clone(),
            guarded.messages_from_connections_tx.clone(),
            is_important,
        );

        shared.platform.spawn_task(task_name.into(), Box::pin(task));
    }

    // Forward messages from the coordinator to the relevant connection tasks.
    loop {
        let (connection_id, message) = match guarded.network.pull_message_to_connection() {
            Some(m) => m,
            None => break,
        };

        // The connection task is expected to be alive as long as the coordinator has
        // messages for it; a missing entry or closed channel would be a bug.
        guarded
            .active_connections
            .get_mut(&connection_id)
            .unwrap()
            .send(message)
            .await
            .unwrap();
    }
}
impl<TPlat: PlatformRef> SharedGuarded<TPlat> {
    /// Unassigns the slot (if any) of the given peer on the given chain, and prevents
    /// the peer from being re-assigned a slot on that chain for the next 20 seconds.
    fn unassign_slot_and_ban(&mut self, platform: &TPlat, chain_index: usize, peer_id: PeerId) {
        self.network.unassign_slot(chain_index, &peer_id);

        let ban_until = platform.now() + Duration::from_secs(20);
        match self.slots_assign_backoff.entry((peer_id, chain_index)) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(ban_until);
            }
            hash_map::Entry::Occupied(mut entry) => {
                // Only ever extend an existing ban; never shorten it.
                if *entry.get() < ban_until {
                    entry.insert(ban_until);
                }
            }
        }
    }
}