sc-network-sync 0.55.0

Substrate sync network protocol
Documentation
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use futures::{channel::oneshot, StreamExt};
use sc_network_types::PeerId;

use sc_network::{
	request_responses::{IfDisconnected, RequestFailure},
	types::ProtocolName,
	NetworkPeers, NetworkRequest, ReputationChange,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};

use std::sync::Arc;

/// Network-related services required by `sc-network-sync`
pub trait Network: NetworkPeers + NetworkRequest {}

impl<T> Network for T where T: NetworkPeers + NetworkRequest {}

/// Network service provider for `ChainSync`
///
/// It runs as an asynchronous task and listens to commands coming from `ChainSync` and
/// calls the `NetworkService` on its behalf.
pub struct NetworkServiceProvider {
	rx: TracingUnboundedReceiver<ToServiceCommand>,
	handle: NetworkServiceHandle,
}

/// Commands that `ChainSync` wishes to send to `NetworkService`
#[derive(Debug)]
pub enum ToServiceCommand {
	/// Call `NetworkPeers::disconnect_peer()`
	DisconnectPeer(PeerId, ProtocolName),

	/// Call `NetworkPeers::report_peer()`
	ReportPeer(PeerId, ReputationChange),

	/// Call `NetworkRequest::start_request()`
	StartRequest(
		PeerId,
		ProtocolName,
		Vec<u8>,
		oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
		IfDisconnected,
	),
}

/// Handle that is (temporarily) passed to `ChainSync` so it can
/// communicate with `NetworkService` through `SyncingEngine`
#[derive(Debug, Clone)]
pub struct NetworkServiceHandle {
	tx: TracingUnboundedSender<ToServiceCommand>,
}

impl NetworkServiceHandle {
	/// Create new service handle
	pub fn new(tx: TracingUnboundedSender<ToServiceCommand>) -> NetworkServiceHandle {
		Self { tx }
	}

	/// Report peer
	pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) {
		let _ = self.tx.unbounded_send(ToServiceCommand::ReportPeer(who, cost_benefit));
	}

	/// Disconnect peer
	pub fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) {
		let _ = self.tx.unbounded_send(ToServiceCommand::DisconnectPeer(who, protocol));
	}

	/// Send request to peer
	pub fn start_request(
		&self,
		who: PeerId,
		protocol: ProtocolName,
		request: Vec<u8>,
		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
		connect: IfDisconnected,
	) {
		let _ = self
			.tx
			.unbounded_send(ToServiceCommand::StartRequest(who, protocol, request, tx, connect));
	}
}

impl NetworkServiceProvider {
	/// Create new `NetworkServiceProvider`
	pub fn new() -> Self {
		let (tx, rx) = tracing_unbounded("mpsc_network_service_provider", 100_000);

		Self { rx, handle: NetworkServiceHandle::new(tx) }
	}

	/// Get handle to talk to the provider
	pub fn handle(&self) -> NetworkServiceHandle {
		self.handle.clone()
	}

	/// Run the `NetworkServiceProvider`
	pub async fn run(self, service: Arc<dyn Network + Send + Sync>) {
		let Self { mut rx, handle } = self;
		drop(handle);

		while let Some(inner) = rx.next().await {
			match inner {
				ToServiceCommand::DisconnectPeer(peer, protocol_name) => {
					service.disconnect_peer(peer, protocol_name)
				},
				ToServiceCommand::ReportPeer(peer, reputation_change) => {
					service.report_peer(peer, reputation_change)
				},
				ToServiceCommand::StartRequest(peer, protocol, request, tx, connect) => {
					service.start_request(peer, protocol, request, None, tx, connect)
				},
			}
		}
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use crate::service::mock::MockNetwork;

	// typical pattern in `Protocol` code where peer is disconnected
	// and then reported
	#[tokio::test]
	async fn disconnect_and_report_peer() {
		let provider = NetworkServiceProvider::new();
		let handle = provider.handle();

		let peer = PeerId::random();
		let proto = ProtocolName::from("test-protocol");
		let proto_clone = proto.clone();
		let change = sc_network::ReputationChange::new_fatal("test-change");

		let mut mock_network = MockNetwork::new();
		mock_network
			.expect_disconnect_peer()
			.withf(move |in_peer, in_proto| &peer == in_peer && &proto == in_proto)
			.once()
			.returning(|_, _| ());
		mock_network
			.expect_report_peer()
			.withf(move |in_peer, in_change| &peer == in_peer && &change == in_change)
			.once()
			.returning(|_, _| ());

		tokio::spawn(async move {
			provider.run(Arc::new(mock_network)).await;
		});

		handle.disconnect_peer(peer, proto_clone);
		handle.report_peer(peer, change);
	}
}