tycho_core/block_strider/provider/
blockchain_provider.rs1use std::future::Future;
2use std::pin::{Pin, pin};
3use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use anyhow::Result;
8use futures_util::FutureExt;
9use futures_util::future::{BoxFuture, Either};
10use serde::{Deserialize, Serialize};
11use tycho_block_util::archive::WithArchiveData;
12use tycho_block_util::block::{BlockIdRelation, BlockProofStuff, BlockStuff};
13use tycho_block_util::queue::QueueDiffStuff;
14use tycho_types::models::*;
15use tycho_util::serde_helpers;
16use tycho_util::sync::rayon_run;
17
18use crate::block_strider::BlockProvider;
19use crate::block_strider::provider::{
20 BoxBlockProvider, CheckProof, OptionalBlockStuff, ProofChecker,
21};
22use crate::blockchain_rpc::{BlockDataFull, BlockchainRpcClient, DataRequirement};
23use crate::overlay_client::{Neighbour, PunishReason};
24use crate::storage::CoreStorage;
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(default)]
30#[non_exhaustive]
31pub struct BlockchainBlockProviderConfig {
32 #[serde(with = "serde_helpers::humantime")]
36 pub get_next_block_polling_interval: Duration,
37
38 #[serde(with = "serde_helpers::humantime")]
42 pub get_block_polling_interval: Duration,
43
44 #[serde(with = "serde_helpers::humantime")]
49 pub get_next_block_timeout: Duration,
50
51 #[serde(with = "serde_helpers::humantime")]
56 pub get_block_timeout: Duration,
57
58 #[serde(with = "serde_helpers::humantime")]
62 pub fallback_interval: Duration,
63}
64
65impl Default for BlockchainBlockProviderConfig {
66 fn default() -> Self {
67 Self {
68 get_next_block_polling_interval: Duration::from_secs(1),
69 get_block_polling_interval: Duration::from_secs(1),
70 get_next_block_timeout: Duration::from_secs(120),
71 get_block_timeout: Duration::from_secs(60),
72 fallback_interval: Duration::from_secs(1),
73 }
74 }
75}
76
77pub struct BlockchainBlockProvider {
78 client: BlockchainRpcClient,
79 config: BlockchainBlockProviderConfig,
80 proof_checker: ProofChecker,
81 fallback: Option<BoxBlockProvider>,
82 use_fallback: AtomicBool,
83 cleanup_fallback_at: AtomicU32,
84}
85
86impl BlockchainBlockProvider {
87 pub fn new(
88 client: BlockchainRpcClient,
89 storage: CoreStorage,
90 config: BlockchainBlockProviderConfig,
91 ) -> Self {
92 let proof_checker = ProofChecker::new(storage);
93
94 Self {
95 client,
96 config,
97 proof_checker,
98 fallback: None,
99 use_fallback: AtomicBool::new(false),
100 cleanup_fallback_at: AtomicU32::new(u32::MAX),
101 }
102 }
103
104 pub fn with_fallback<P: BlockProvider>(mut self, fallback: P) -> Self {
105 self.fallback = Some(BoxBlockProvider::new(fallback));
107 self
108 }
109
110 async fn get_next_block_impl(&self, prev_block_id: &BlockId) -> OptionalBlockStuff {
111 fn is_next_for(block_id: &BlockId, prev_block_id: &BlockId) -> bool {
112 block_id.shard == prev_block_id.shard && block_id.seqno == prev_block_id.seqno + 1
113 }
114
115 let primary = || {
116 loop_with_timeout(
117 self.config.get_next_block_polling_interval,
118 self.config.get_next_block_timeout,
119 self.fallback.is_some(),
120 || {
121 tracing::debug!(%prev_block_id, "get_next_block_full requested");
122 self.client
123 .get_next_block_full(prev_block_id, DataRequirement::Optional)
124 },
125 |res| async move {
126 match res {
127 Ok(res) => match res.data {
128 Some(data) if !is_next_for(&data.block_id, prev_block_id) => {
129 res.neighbour.punish(PunishReason::Malicious);
130 tracing::warn!("got response for an unknown block id");
131 }
132 Some(data) => {
133 let mc_block_id = data.block_id;
134 let parsed = self
135 .process_received_block(&mc_block_id, data, res.neighbour)
136 .await;
137 if parsed.is_some() {
138 return parsed;
139 }
140 }
141 None => tracing::warn!(?prev_block_id, "block not found"),
142 },
143 Err(e) => tracing::error!("failed to get next block: {e:?}"),
144 }
145 None
146 },
147 )
148 };
149
150 loop {
151 if !self.use_fallback.load(Ordering::Relaxed)
153 && let res @ Some(_) = primary().await
154 {
155 return res;
156 }
157
158 if let Some(fallback) = &self.fallback {
160 tracing::debug!(%prev_block_id, "get_next_block_full fallback");
161 self.use_fallback.store(true, Ordering::Relaxed);
162 if let res @ Some(_) = fallback.get_next_block(prev_block_id).await {
163 return res;
164 }
165
166 tokio::time::sleep(self.config.fallback_interval).await;
168 }
169
170 self.use_fallback.store(false, Ordering::Relaxed);
172
173 self.cleanup_fallback_at
175 .store(prev_block_id.seqno.saturating_add(1), Ordering::Release);
176 }
177 }
178
179 async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
180 let BlockIdRelation {
181 mc_block_id,
182 block_id,
183 } = block_id_relation;
184
185 let primary = || {
186 loop_with_timeout(
187 self.config.get_block_polling_interval,
188 self.config.get_block_timeout,
189 self.fallback.is_some(),
190 || {
191 tracing::debug!(%block_id, "get_block_full requested");
192
193 self.client
196 .get_block_full(block_id, DataRequirement::Optional)
197 },
198 |res| async move {
199 match res {
200 Ok(res) => match res.data {
201 Some(data) => {
202 let parsed = self
203 .process_received_block(mc_block_id, data, res.neighbour)
204 .await;
205 if parsed.is_some() {
206 return parsed;
207 }
208 }
209 None => tracing::warn!(%block_id, "block not found"),
210 },
211 Err(e) => tracing::error!("failed to get block: {e:?}"),
212 }
213 None
214 },
215 )
216 };
217
218 loop {
219 if !self.use_fallback.load(Ordering::Relaxed)
221 && let res @ Some(_) = primary().await
222 {
223 return res;
224 }
225
226 if let Some(fallback) = &self.fallback {
228 tracing::debug!(%block_id, "get_block_full fallback");
229 self.use_fallback.store(true, Ordering::Relaxed);
230 if let res @ Some(_) = fallback.get_block(block_id_relation).await {
231 return res;
232 }
233
234 tokio::time::sleep(self.config.fallback_interval).await;
236 }
237
238 self.use_fallback.store(false, Ordering::Relaxed);
240
241 }
243 }
244
245 #[tracing::instrument(
246 skip(self,mc_block_id, block_full, neighbour),
247 fields(mc_block_id = %mc_block_id.as_short_id())
248 )]
249 async fn process_received_block(
250 &self,
251 mc_block_id: &BlockId,
252 block_full: BlockDataFull,
253 neighbour: Neighbour,
254 ) -> OptionalBlockStuff {
255 let block_stuff_fut = pin!(rayon_run({
256 let block_id = block_full.block_id;
257 let block_data = block_full.block_data.clone();
258 move || BlockStuff::deserialize_checked(&block_id, &block_data)
259 }));
260
261 let other_data_fut = pin!(rayon_run({
262 let block_id = block_full.block_id;
263 let proof_data = block_full.proof_data.clone();
264 let queue_diff_data = block_full.queue_diff_data.clone();
265 move || {
266 (
267 BlockProofStuff::deserialize(&block_id, &proof_data),
268 QueueDiffStuff::deserialize(&block_id, &queue_diff_data),
269 )
270 }
271 }));
272
273 let (block_stuff, (block_proof, queue_diff)) =
274 futures_util::future::join(block_stuff_fut, other_data_fut).await;
275
276 match (block_stuff, block_proof, queue_diff) {
277 (Ok(block), Ok(proof), Ok(diff)) => {
278 let proof = WithArchiveData::new(proof, block_full.proof_data);
279 let diff = WithArchiveData::new(diff, block_full.queue_diff_data);
280 if let Err(e) = self
281 .proof_checker
282 .check_proof(CheckProof {
283 mc_block_id,
284 block: &block,
285 proof: &proof,
286 queue_diff: &diff,
287 store_on_success: true,
288 })
289 .await
290 {
291 neighbour.punish(PunishReason::Malicious);
292 tracing::error!("got invalid block proof: {e:?}");
293 return None;
294 }
295
296 Some(Ok(block.with_archive_data(block_full.block_data)))
297 }
298 (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
299 neighbour.punish(PunishReason::Malicious);
300 tracing::error!("failed to deserialize shard block or block proof: {e:?}");
301 None
302 }
303 }
304 }
305}
306
307impl BlockProvider for BlockchainBlockProvider {
308 type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
309 type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
310 type CleanupFut<'a> = BlockchainBlockProviderCleanupFut<'a>;
311
312 fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
313 Box::pin(self.get_next_block_impl(prev_block_id))
314 }
315
316 fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
317 Box::pin(self.get_block_impl(block_id_relation))
318 }
319
320 fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
321 match &self.fallback {
322 Some(fallback) if self.cleanup_fallback_at.load(Ordering::Acquire) <= mc_seqno => {
323 BlockchainBlockProviderCleanupFut::Fallback {
324 fut: fallback.cleanup_until(mc_seqno),
325 cleanup_fallback_at: &self.cleanup_fallback_at,
326 mc_seqno,
327 }
328 }
329 _ => BlockchainBlockProviderCleanupFut::Noop,
330 }
331 }
332}
333
334pub enum BlockchainBlockProviderCleanupFut<'a> {
335 Noop,
336 Fallback {
337 fut: BoxFuture<'a, Result<()>>,
338 cleanup_fallback_at: &'a AtomicU32,
339 mc_seqno: u32,
340 },
341}
342
343impl Future for BlockchainBlockProviderCleanupFut<'_> {
344 type Output = Result<()>;
345
346 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
347 match self.get_mut() {
348 Self::Noop => Poll::Ready(Ok(())),
349 Self::Fallback {
350 fut,
351 cleanup_fallback_at,
352 mc_seqno,
353 } => {
354 let res = fut.poll_unpin(cx);
355
356 if matches!(&res, Poll::Ready(r) if r.is_ok()) {
358 cleanup_fallback_at
359 .compare_exchange(*mc_seqno, u32::MAX, Ordering::Release, Ordering::Relaxed)
360 .ok();
361 }
362
363 res
364 }
365 }
366 }
367}
368
369async fn loop_with_timeout<E, EFut, P, PFut, R, T>(
370 interval: Duration,
371 timeout: Duration,
372 use_timeout: bool,
373 request: E,
374 process: P,
375) -> Option<T>
376where
377 E: Fn() -> EFut,
378 EFut: Future<Output = R>,
379 P: Fn(R) -> PFut,
380 PFut: Future<Output = Option<T>>,
381{
382 let mut interval = tokio::time::interval(interval);
384
385 let mut timeout = pin!(if use_timeout {
386 Either::Left(tokio::time::sleep(timeout))
387 } else {
388 Either::Right(futures_util::future::pending::<()>())
389 });
390
391 loop {
392 tokio::select! {
393 res = request() => {
394 if let res @ Some(_) = process(res).await {
395 return res;
396 }
397 },
398 _ = &mut timeout => return None,
399 }
400
401 tokio::select! {
402 _ = interval.tick() => {},
403 _ = &mut timeout => return None,
404 }
405 }
406}