Skip to main content

soil_network/sync/service/
syncing_service.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7use crate::sync::types::{
8	ExtendedPeerInfo, SyncEvent, SyncEventStream, SyncStatus, SyncStatusProvider,
9};
10
11use futures::{channel::oneshot, Stream};
12use soil_client::import::{
13	BlockImportError, BlockImportStatus, JustificationImportResult, JustificationSyncLink, Link,
14	RuntimeOrigin,
15};
16use soil_network::types::PeerId;
17
18use soil_client::utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
19use soil_network::{NetworkBlock, NetworkSyncForkRequest};
20use subsoil::runtime::traits::{Block as BlockT, NumberFor};
21
22use std::{
23	pin::Pin,
24	sync::{
25		atomic::{AtomicBool, AtomicUsize, Ordering},
26		Arc,
27	},
28};
29
30/// Commands send to `SyncingEngine`
31pub enum ToServiceCommand<B: BlockT> {
32	SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
33	RequestJustification(B::Hash, NumberFor<B>),
34	ClearJustificationRequests,
35	BlocksProcessed(
36		usize,
37		usize,
38		Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
39	),
40	JustificationImported(PeerId, B::Hash, NumberFor<B>, JustificationImportResult),
41	AnnounceBlock(B::Hash, Option<Vec<u8>>),
42	NewBestBlockImported(B::Hash, NumberFor<B>),
43	EventStream(TracingUnboundedSender<SyncEvent>),
44	Status(oneshot::Sender<SyncStatus<B>>),
45	NumActivePeers(oneshot::Sender<usize>),
46	NumDownloadedBlocks(oneshot::Sender<usize>),
47	NumSyncRequests(oneshot::Sender<usize>),
48	PeersInfo(oneshot::Sender<Vec<(PeerId, ExtendedPeerInfo<B>)>>),
49	OnBlockFinalized(B::Hash, B::Header),
50	// Status {
51	// 	pending_response: oneshot::Sender<SyncStatus<B>>,
52	// },
53}
54
55/// Handle for communicating with `SyncingEngine` asynchronously
56#[derive(Clone)]
57pub struct SyncingService<B: BlockT> {
58	tx: TracingUnboundedSender<ToServiceCommand<B>>,
59	/// Number of peers we're connected to.
60	num_connected: Arc<AtomicUsize>,
61	/// Are we actively catching up with the chain?
62	is_major_syncing: Arc<AtomicBool>,
63}
64
65impl<B: BlockT> SyncingService<B> {
66	/// Create new handle
67	pub fn new(
68		tx: TracingUnboundedSender<ToServiceCommand<B>>,
69		num_connected: Arc<AtomicUsize>,
70		is_major_syncing: Arc<AtomicBool>,
71	) -> Self {
72		Self { tx, num_connected, is_major_syncing }
73	}
74
75	/// Get the number of peers known to `SyncingEngine` (both full and light).
76	pub fn num_connected_peers(&self) -> usize {
77		self.num_connected.load(Ordering::Relaxed)
78	}
79
80	/// Get the number of active peers.
81	pub async fn num_active_peers(&self) -> Result<usize, oneshot::Canceled> {
82		let (tx, rx) = oneshot::channel();
83		let _ = self.tx.unbounded_send(ToServiceCommand::NumActivePeers(tx));
84
85		rx.await
86	}
87
88	/// Get the number of downloaded blocks.
89	pub async fn num_downloaded_blocks(&self) -> Result<usize, oneshot::Canceled> {
90		let (tx, rx) = oneshot::channel();
91		let _ = self.tx.unbounded_send(ToServiceCommand::NumDownloadedBlocks(tx));
92
93		rx.await
94	}
95
96	/// Get the number of sync requests.
97	pub async fn num_sync_requests(&self) -> Result<usize, oneshot::Canceled> {
98		let (tx, rx) = oneshot::channel();
99		let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncRequests(tx));
100
101		rx.await
102	}
103
104	/// Get peer information.
105	pub async fn peers_info(
106		&self,
107	) -> Result<Vec<(PeerId, ExtendedPeerInfo<B>)>, oneshot::Canceled> {
108		let (tx, rx) = oneshot::channel();
109		let _ = self.tx.unbounded_send(ToServiceCommand::PeersInfo(tx));
110
111		rx.await
112	}
113
114	/// Notify the `SyncingEngine` that a block has been finalized.
115	pub fn on_block_finalized(&self, hash: B::Hash, header: B::Header) {
116		let _ = self.tx.unbounded_send(ToServiceCommand::OnBlockFinalized(hash, header));
117	}
118
119	/// Get sync status
120	///
121	/// Returns an error if `SyncingEngine` has terminated.
122	pub async fn status(&self) -> Result<SyncStatus<B>, oneshot::Canceled> {
123		let (tx, rx) = oneshot::channel();
124		let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx));
125
126		rx.await
127	}
128}
129
130impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>> for SyncingService<B> {
131	/// Configure an explicit fork sync request.
132	///
133	/// Note that this function should not be used for recent blocks.
134	/// Sync should be able to download all the recent forks normally.
135	/// `set_sync_fork_request` should only be used if external code detects that there's
136	/// a stale fork missing.
137	///
138	/// Passing empty `peers` set effectively removes the sync request.
139	fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
140		let _ = self
141			.tx
142			.unbounded_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number));
143	}
144}
145
146impl<B: BlockT> JustificationSyncLink<B> for SyncingService<B> {
147	/// Request a justification for the given block from the network.
148	///
149	/// On success, the justification will be passed to the import queue that was part at
150	/// initialization as part of the configuration.
151	fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
152		let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
153	}
154
155	fn clear_justification_requests(&self) {
156		let _ = self.tx.unbounded_send(ToServiceCommand::ClearJustificationRequests);
157	}
158}
159
160#[async_trait::async_trait]
161impl<B: BlockT> SyncStatusProvider<B> for SyncingService<B> {
162	/// Get high-level view of the syncing status.
163	async fn status(&self) -> Result<SyncStatus<B>, ()> {
164		let (rtx, rrx) = oneshot::channel();
165
166		let _ = self.tx.unbounded_send(ToServiceCommand::Status(rtx));
167		rrx.await.map_err(|_| ())
168	}
169}
170
171impl<B: BlockT> Link<B> for SyncingService<B> {
172	fn blocks_processed(
173		&self,
174		imported: usize,
175		count: usize,
176		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
177	) {
178		let _ = self
179			.tx
180			.unbounded_send(ToServiceCommand::BlocksProcessed(imported, count, results));
181	}
182
183	fn justification_imported(
184		&self,
185		who: RuntimeOrigin,
186		hash: &B::Hash,
187		number: NumberFor<B>,
188		import_result: JustificationImportResult,
189	) {
190		let Ok(who) = PeerId::try_from(&who) else { return };
191		let _ = self.tx.unbounded_send(ToServiceCommand::JustificationImported(
192			who,
193			*hash,
194			number,
195			import_result,
196		));
197	}
198
199	fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
200		let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
201	}
202}
203
204impl<B: BlockT> SyncEventStream for SyncingService<B> {
205	/// Get syncing event stream.
206	fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
207		let (tx, rx) = tracing_unbounded(name, 100_000);
208		let _ = self.tx.unbounded_send(ToServiceCommand::EventStream(tx));
209		Box::pin(rx)
210	}
211}
212
213impl<B: BlockT> NetworkBlock<B::Hash, NumberFor<B>> for SyncingService<B> {
214	fn announce_block(&self, hash: B::Hash, data: Option<Vec<u8>>) {
215		let _ = self.tx.unbounded_send(ToServiceCommand::AnnounceBlock(hash, data));
216	}
217
218	fn new_best_block_imported(&self, hash: B::Hash, number: NumberFor<B>) {
219		let _ = self.tx.unbounded_send(ToServiceCommand::NewBestBlockImported(hash, number));
220	}
221}
222
223impl<B: BlockT> soil_client::consensus::SyncOracle for SyncingService<B> {
224	fn is_major_syncing(&self) -> bool {
225		self.is_major_syncing.load(Ordering::Relaxed)
226	}
227
228	fn is_offline(&self) -> bool {
229		self.num_connected.load(Ordering::Relaxed) == 0
230	}
231}