Skip to main content

soil_network/sync/service/
network.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7use futures::{channel::oneshot, StreamExt};
8use soil_network::types::PeerId;
9
10use soil_client::utils::mpsc::{
11	tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
12};
13use soil_network::{
14	request_responses::{IfDisconnected, RequestFailure},
15	types::ProtocolName,
16	NetworkPeers, NetworkRequest, ReputationChange,
17};
18
19use std::sync::Arc;
20
21/// Network-related services required by `soil-network::sync`
22pub trait Network: NetworkPeers + NetworkRequest {}
23
24impl<T> Network for T where T: NetworkPeers + NetworkRequest {}
25
26/// Network service provider for `ChainSync`
27///
28/// It runs as an asynchronous task and listens to commands coming from `ChainSync` and
29/// calls the `NetworkService` on its behalf.
30pub struct NetworkServiceProvider {
31	rx: TracingUnboundedReceiver<ToServiceCommand>,
32	handle: NetworkServiceHandle,
33}
34
35/// Commands that `ChainSync` wishes to send to `NetworkService`
36#[derive(Debug)]
37pub enum ToServiceCommand {
38	/// Call `NetworkPeers::disconnect_peer()`
39	DisconnectPeer(PeerId, ProtocolName),
40
41	/// Call `NetworkPeers::report_peer()`
42	ReportPeer(PeerId, ReputationChange),
43
44	/// Call `NetworkRequest::start_request()`
45	StartRequest(
46		PeerId,
47		ProtocolName,
48		Vec<u8>,
49		oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
50		IfDisconnected,
51	),
52}
53
54/// Handle that is (temporarily) passed to `ChainSync` so it can
55/// communicate with `NetworkService` through `SyncingEngine`
56#[derive(Debug, Clone)]
57pub struct NetworkServiceHandle {
58	tx: TracingUnboundedSender<ToServiceCommand>,
59}
60
61impl NetworkServiceHandle {
62	/// Create new service handle
63	pub fn new(tx: TracingUnboundedSender<ToServiceCommand>) -> NetworkServiceHandle {
64		Self { tx }
65	}
66
67	/// Report peer
68	pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) {
69		let _ = self.tx.unbounded_send(ToServiceCommand::ReportPeer(who, cost_benefit));
70	}
71
72	/// Disconnect peer
73	pub fn disconnect_peer(&self, who: PeerId, protocol: ProtocolName) {
74		let _ = self.tx.unbounded_send(ToServiceCommand::DisconnectPeer(who, protocol));
75	}
76
77	/// Send request to peer
78	pub fn start_request(
79		&self,
80		who: PeerId,
81		protocol: ProtocolName,
82		request: Vec<u8>,
83		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
84		connect: IfDisconnected,
85	) {
86		let _ = self
87			.tx
88			.unbounded_send(ToServiceCommand::StartRequest(who, protocol, request, tx, connect));
89	}
90}
91
92impl NetworkServiceProvider {
93	/// Create new `NetworkServiceProvider`
94	pub fn new() -> Self {
95		let (tx, rx) = tracing_unbounded("mpsc_network_service_provider", 100_000);
96
97		Self { rx, handle: NetworkServiceHandle::new(tx) }
98	}
99
100	/// Get handle to talk to the provider
101	pub fn handle(&self) -> NetworkServiceHandle {
102		self.handle.clone()
103	}
104
105	/// Run the `NetworkServiceProvider`
106	pub async fn run(self, service: Arc<dyn Network + Send + Sync>) {
107		let Self { mut rx, handle } = self;
108		drop(handle);
109
110		while let Some(inner) = rx.next().await {
111			match inner {
112				ToServiceCommand::DisconnectPeer(peer, protocol_name) => {
113					service.disconnect_peer(peer, protocol_name)
114				},
115				ToServiceCommand::ReportPeer(peer, reputation_change) => {
116					service.report_peer(peer, reputation_change)
117				},
118				ToServiceCommand::StartRequest(peer, protocol, request, tx, connect) => {
119					service.start_request(peer, protocol, request, None, tx, connect)
120				},
121			}
122		}
123	}
124}
125
#[cfg(test)]
mod tests {
	use super::*;
	use crate::sync::service::mock::MockNetwork;

	// typical pattern in `Protocol` code where peer is disconnected
	// and then reported
	#[tokio::test]
	async fn disconnect_and_report_peer() {
		let provider = NetworkServiceProvider::new();
		let handle = provider.handle();

		let peer = PeerId::random();
		let proto = ProtocolName::from("test-protocol");
		let proto_clone = proto.clone();
		let change = soil_network::ReputationChange::new_fatal("test-change");

		// Both calls must reach the network exactly once with the expected arguments.
		let mut mock_network = MockNetwork::new();
		mock_network
			.expect_disconnect_peer()
			.withf(move |in_peer, in_proto| &peer == in_peer && &proto == in_proto)
			.once()
			.returning(|_, _| ());
		mock_network
			.expect_report_peer()
			.withf(move |in_peer, in_change| &peer == in_peer && &change == in_change)
			.once()
			.returning(|_, _| ());

		let provider_task = tokio::spawn(async move {
			provider.run(Arc::new(mock_network)).await;
		});

		handle.disconnect_peer(peer, proto_clone);
		handle.report_peer(peer, change);

		// Dropping the last external handle closes the channel, so `run()` processes
		// the two queued commands and then returns. Awaiting the task guarantees the
		// commands were actually handled before the test finishes, and surfaces any
		// panic raised inside the task (e.g. by a failed mock expectation).
		drop(handle);
		provider_task.await.expect("provider task should finish without panicking");
	}
}