soil_network/sync/service/
syncing_service.rs1use crate::sync::types::{
8 ExtendedPeerInfo, SyncEvent, SyncEventStream, SyncStatus, SyncStatusProvider,
9};
10
11use futures::{channel::oneshot, Stream};
12use soil_client::import::{
13 BlockImportError, BlockImportStatus, JustificationImportResult, JustificationSyncLink, Link,
14 RuntimeOrigin,
15};
16use soil_network::types::PeerId;
17
18use soil_client::utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
19use soil_network::{NetworkBlock, NetworkSyncForkRequest};
20use subsoil::runtime::traits::{Block as BlockT, NumberFor};
21
22use std::{
23 pin::Pin,
24 sync::{
25 atomic::{AtomicBool, AtomicUsize, Ordering},
26 Arc,
27 },
28};
29
30pub enum ToServiceCommand<B: BlockT> {
32 SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
33 RequestJustification(B::Hash, NumberFor<B>),
34 ClearJustificationRequests,
35 BlocksProcessed(
36 usize,
37 usize,
38 Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
39 ),
40 JustificationImported(PeerId, B::Hash, NumberFor<B>, JustificationImportResult),
41 AnnounceBlock(B::Hash, Option<Vec<u8>>),
42 NewBestBlockImported(B::Hash, NumberFor<B>),
43 EventStream(TracingUnboundedSender<SyncEvent>),
44 Status(oneshot::Sender<SyncStatus<B>>),
45 NumActivePeers(oneshot::Sender<usize>),
46 NumDownloadedBlocks(oneshot::Sender<usize>),
47 NumSyncRequests(oneshot::Sender<usize>),
48 PeersInfo(oneshot::Sender<Vec<(PeerId, ExtendedPeerInfo<B>)>>),
49 OnBlockFinalized(B::Hash, B::Header),
50 }
54
55#[derive(Clone)]
57pub struct SyncingService<B: BlockT> {
58 tx: TracingUnboundedSender<ToServiceCommand<B>>,
59 num_connected: Arc<AtomicUsize>,
61 is_major_syncing: Arc<AtomicBool>,
63}
64
65impl<B: BlockT> SyncingService<B> {
66 pub fn new(
68 tx: TracingUnboundedSender<ToServiceCommand<B>>,
69 num_connected: Arc<AtomicUsize>,
70 is_major_syncing: Arc<AtomicBool>,
71 ) -> Self {
72 Self { tx, num_connected, is_major_syncing }
73 }
74
75 pub fn num_connected_peers(&self) -> usize {
77 self.num_connected.load(Ordering::Relaxed)
78 }
79
80 pub async fn num_active_peers(&self) -> Result<usize, oneshot::Canceled> {
82 let (tx, rx) = oneshot::channel();
83 let _ = self.tx.unbounded_send(ToServiceCommand::NumActivePeers(tx));
84
85 rx.await
86 }
87
88 pub async fn num_downloaded_blocks(&self) -> Result<usize, oneshot::Canceled> {
90 let (tx, rx) = oneshot::channel();
91 let _ = self.tx.unbounded_send(ToServiceCommand::NumDownloadedBlocks(tx));
92
93 rx.await
94 }
95
96 pub async fn num_sync_requests(&self) -> Result<usize, oneshot::Canceled> {
98 let (tx, rx) = oneshot::channel();
99 let _ = self.tx.unbounded_send(ToServiceCommand::NumSyncRequests(tx));
100
101 rx.await
102 }
103
104 pub async fn peers_info(
106 &self,
107 ) -> Result<Vec<(PeerId, ExtendedPeerInfo<B>)>, oneshot::Canceled> {
108 let (tx, rx) = oneshot::channel();
109 let _ = self.tx.unbounded_send(ToServiceCommand::PeersInfo(tx));
110
111 rx.await
112 }
113
114 pub fn on_block_finalized(&self, hash: B::Hash, header: B::Header) {
116 let _ = self.tx.unbounded_send(ToServiceCommand::OnBlockFinalized(hash, header));
117 }
118
119 pub async fn status(&self) -> Result<SyncStatus<B>, oneshot::Canceled> {
123 let (tx, rx) = oneshot::channel();
124 let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx));
125
126 rx.await
127 }
128}
129
130impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>> for SyncingService<B> {
131 fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
140 let _ = self
141 .tx
142 .unbounded_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number));
143 }
144}
145
146impl<B: BlockT> JustificationSyncLink<B> for SyncingService<B> {
147 fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
152 let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
153 }
154
155 fn clear_justification_requests(&self) {
156 let _ = self.tx.unbounded_send(ToServiceCommand::ClearJustificationRequests);
157 }
158}
159
160#[async_trait::async_trait]
161impl<B: BlockT> SyncStatusProvider<B> for SyncingService<B> {
162 async fn status(&self) -> Result<SyncStatus<B>, ()> {
164 let (rtx, rrx) = oneshot::channel();
165
166 let _ = self.tx.unbounded_send(ToServiceCommand::Status(rtx));
167 rrx.await.map_err(|_| ())
168 }
169}
170
171impl<B: BlockT> Link<B> for SyncingService<B> {
172 fn blocks_processed(
173 &self,
174 imported: usize,
175 count: usize,
176 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
177 ) {
178 let _ = self
179 .tx
180 .unbounded_send(ToServiceCommand::BlocksProcessed(imported, count, results));
181 }
182
183 fn justification_imported(
184 &self,
185 who: RuntimeOrigin,
186 hash: &B::Hash,
187 number: NumberFor<B>,
188 import_result: JustificationImportResult,
189 ) {
190 let Ok(who) = PeerId::try_from(&who) else { return };
191 let _ = self.tx.unbounded_send(ToServiceCommand::JustificationImported(
192 who,
193 *hash,
194 number,
195 import_result,
196 ));
197 }
198
199 fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
200 let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
201 }
202}
203
204impl<B: BlockT> SyncEventStream for SyncingService<B> {
205 fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
207 let (tx, rx) = tracing_unbounded(name, 100_000);
208 let _ = self.tx.unbounded_send(ToServiceCommand::EventStream(tx));
209 Box::pin(rx)
210 }
211}
212
213impl<B: BlockT> NetworkBlock<B::Hash, NumberFor<B>> for SyncingService<B> {
214 fn announce_block(&self, hash: B::Hash, data: Option<Vec<u8>>) {
215 let _ = self.tx.unbounded_send(ToServiceCommand::AnnounceBlock(hash, data));
216 }
217
218 fn new_best_block_imported(&self, hash: B::Hash, number: NumberFor<B>) {
219 let _ = self.tx.unbounded_send(ToServiceCommand::NewBestBlockImported(hash, number));
220 }
221}
222
223impl<B: BlockT> soil_client::consensus::SyncOracle for SyncingService<B> {
224 fn is_major_syncing(&self) -> bool {
225 self.is_major_syncing.load(Ordering::Relaxed)
226 }
227
228 fn is_offline(&self) -> bool {
229 self.num_connected.load(Ordering::Relaxed) == 0
230 }
231}