tycho_core/block_strider/provider/
blockchain_provider.rs1use std::future::Future;
2use std::pin::{Pin, pin};
3use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use anyhow::Result;
8use futures_util::FutureExt;
9use futures_util::future::{BoxFuture, Either};
10use serde::{Deserialize, Serialize};
11use tycho_block_util::archive::WithArchiveData;
12use tycho_block_util::block::{BlockIdRelation, BlockProofStuff, BlockStuff};
13use tycho_block_util::queue::QueueDiffStuff;
14use tycho_types::models::*;
15use tycho_util::serde_helpers;
16use tycho_util::sync::rayon_run;
17
18use crate::block_strider::BlockProvider;
19use crate::block_strider::provider::{
20 BoxBlockProvider, CheckProof, OptionalBlockStuff, ProofChecker,
21};
22use crate::blockchain_rpc::{BlockDataFull, BlockchainRpcClient, DataRequirement};
23use crate::overlay_client::{Neighbour, PunishReason};
24use crate::storage::CoreStorage;
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(default)]
30#[non_exhaustive]
31pub struct BlockchainBlockProviderConfig {
32 #[serde(with = "serde_helpers::humantime")]
36 pub get_next_block_polling_interval: Duration,
37
38 #[serde(with = "serde_helpers::humantime")]
42 pub get_block_polling_interval: Duration,
43
44 #[serde(with = "serde_helpers::humantime")]
49 pub get_next_block_timeout: Duration,
50
51 #[serde(with = "serde_helpers::humantime")]
56 pub get_block_timeout: Duration,
57}
58
59impl Default for BlockchainBlockProviderConfig {
60 fn default() -> Self {
61 Self {
62 get_next_block_polling_interval: Duration::from_secs(1),
63 get_block_polling_interval: Duration::from_secs(1),
64 get_next_block_timeout: Duration::from_secs(120),
65 get_block_timeout: Duration::from_secs(60),
66 }
67 }
68}
69
70pub struct BlockchainBlockProvider {
71 client: BlockchainRpcClient,
72 config: BlockchainBlockProviderConfig,
73 proof_checker: ProofChecker,
74 fallback: Option<BoxBlockProvider>,
75 use_fallback: AtomicBool,
76 cleanup_fallback_at: AtomicU32,
77}
78
79impl BlockchainBlockProvider {
80 pub fn new(
81 client: BlockchainRpcClient,
82 storage: CoreStorage,
83 config: BlockchainBlockProviderConfig,
84 ) -> Self {
85 let proof_checker = ProofChecker::new(storage);
86
87 Self {
88 client,
89 config,
90 proof_checker,
91 fallback: None,
92 use_fallback: AtomicBool::new(false),
93 cleanup_fallback_at: AtomicU32::new(u32::MAX),
94 }
95 }
96
97 pub fn with_fallback<P: BlockProvider>(mut self, fallback: P) -> Self {
98 self.fallback = Some(BoxBlockProvider::new(fallback));
100 self
101 }
102
103 async fn get_next_block_impl(&self, prev_block_id: &BlockId) -> OptionalBlockStuff {
104 fn is_next_for(block_id: &BlockId, prev_block_id: &BlockId) -> bool {
105 block_id.shard == prev_block_id.shard && block_id.seqno == prev_block_id.seqno + 1
106 }
107
108 let primary = || {
109 loop_with_timeout(
110 self.config.get_next_block_polling_interval,
111 self.config.get_next_block_timeout,
112 self.fallback.is_some(),
113 || {
114 tracing::debug!(%prev_block_id, "get_next_block_full requested");
115 self.client
116 .get_next_block_full(prev_block_id, DataRequirement::Optional)
117 },
118 |res| async move {
119 match res {
120 Ok(res) => match res.data {
121 Some(data) if !is_next_for(&data.block_id, prev_block_id) => {
122 res.neighbour.punish(PunishReason::Malicious);
123 tracing::warn!("got response for an unknown block id");
124 }
125 Some(data) => {
126 let mc_block_id = data.block_id;
127 let parsed = self
128 .process_received_block(&mc_block_id, data, res.neighbour)
129 .await;
130 if parsed.is_some() {
131 return parsed;
132 }
133 }
134 None => tracing::warn!(?prev_block_id, "block not found"),
135 },
136 Err(e) => tracing::error!("failed to get next block: {e}"),
137 }
138 None
139 },
140 )
141 };
142
143 loop {
144 if !self.use_fallback.load(Ordering::Relaxed)
146 && let res @ Some(_) = primary().await
147 {
148 return res;
149 }
150
151 if let Some(fallback) = &self.fallback {
153 tracing::debug!(%prev_block_id, "get_next_block_full fallback");
154 self.use_fallback.store(true, Ordering::Relaxed);
155 if let res @ Some(_) = fallback.get_next_block(prev_block_id).await {
156 return res;
157 }
158 }
159
160 self.use_fallback.store(false, Ordering::Relaxed);
162
163 self.cleanup_fallback_at
165 .store(prev_block_id.seqno.saturating_add(1), Ordering::Release);
166 }
167 }
168
169 async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
170 let BlockIdRelation {
171 mc_block_id,
172 block_id,
173 } = block_id_relation;
174
175 let primary = || {
176 loop_with_timeout(
177 self.config.get_block_polling_interval,
178 self.config.get_block_timeout,
179 self.fallback.is_some(),
180 || {
181 tracing::debug!(%block_id, "get_block_full requested");
182
183 self.client
186 .get_block_full(block_id, DataRequirement::Optional)
187 },
188 |res| async move {
189 match res {
190 Ok(res) => match res.data {
191 Some(data) => {
192 let parsed = self
193 .process_received_block(mc_block_id, data, res.neighbour)
194 .await;
195 if parsed.is_some() {
196 return parsed;
197 }
198 }
199 None => tracing::warn!(%block_id, "block not found"),
200 },
201 Err(e) => tracing::error!("failed to get block: {e}"),
202 }
203 None
204 },
205 )
206 };
207
208 loop {
209 if !self.use_fallback.load(Ordering::Relaxed)
211 && let res @ Some(_) = primary().await
212 {
213 return res;
214 }
215
216 if let Some(fallback) = &self.fallback {
218 tracing::debug!(%block_id, "get_block_full fallback");
219 self.use_fallback.store(true, Ordering::Relaxed);
220 if let res @ Some(_) = fallback.get_block(block_id_relation).await {
221 return res;
222 }
223 }
224
225 self.use_fallback.store(false, Ordering::Relaxed);
227
228 }
230 }
231
232 #[tracing::instrument(
233 skip(self,mc_block_id, block_full, neighbour),
234 fields(mc_block_id = %mc_block_id.as_short_id())
235 )]
236 async fn process_received_block(
237 &self,
238 mc_block_id: &BlockId,
239 block_full: BlockDataFull,
240 neighbour: Neighbour,
241 ) -> OptionalBlockStuff {
242 let block_stuff_fut = pin!(rayon_run({
243 let block_id = block_full.block_id;
244 let block_data = block_full.block_data.clone();
245 move || BlockStuff::deserialize_checked(&block_id, &block_data)
246 }));
247
248 let other_data_fut = pin!(rayon_run({
249 let block_id = block_full.block_id;
250 let proof_data = block_full.proof_data.clone();
251 let queue_diff_data = block_full.queue_diff_data.clone();
252 move || {
253 (
254 BlockProofStuff::deserialize(&block_id, &proof_data),
255 QueueDiffStuff::deserialize(&block_id, &queue_diff_data),
256 )
257 }
258 }));
259
260 let (block_stuff, (block_proof, queue_diff)) =
261 futures_util::future::join(block_stuff_fut, other_data_fut).await;
262
263 match (block_stuff, block_proof, queue_diff) {
264 (Ok(block), Ok(proof), Ok(diff)) => {
265 let proof = WithArchiveData::new(proof, block_full.proof_data);
266 let diff = WithArchiveData::new(diff, block_full.queue_diff_data);
267 if let Err(e) = self
268 .proof_checker
269 .check_proof(CheckProof {
270 mc_block_id,
271 block: &block,
272 proof: &proof,
273 queue_diff: &diff,
274 store_on_success: true,
275 })
276 .await
277 {
278 neighbour.punish(PunishReason::Malicious);
279 tracing::error!("got invalid block proof: {e}");
280 return None;
281 }
282
283 Some(Ok(block.with_archive_data(block_full.block_data)))
284 }
285 (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
286 neighbour.punish(PunishReason::Malicious);
287 tracing::error!("failed to deserialize shard block or block proof: {e}");
288 None
289 }
290 }
291 }
292}
293
294impl BlockProvider for BlockchainBlockProvider {
295 type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
296 type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
297 type CleanupFut<'a> = BlockchainBlockProviderCleanupFut<'a>;
298
299 fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
300 Box::pin(self.get_next_block_impl(prev_block_id))
301 }
302
303 fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
304 Box::pin(self.get_block_impl(block_id_relation))
305 }
306
307 fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
308 match &self.fallback {
309 Some(fallback) if self.cleanup_fallback_at.load(Ordering::Acquire) <= mc_seqno => {
310 BlockchainBlockProviderCleanupFut::Fallback {
311 fut: fallback.cleanup_until(mc_seqno),
312 cleanup_fallback_at: &self.cleanup_fallback_at,
313 mc_seqno,
314 }
315 }
316 _ => BlockchainBlockProviderCleanupFut::Noop,
317 }
318 }
319}
320
321pub enum BlockchainBlockProviderCleanupFut<'a> {
322 Noop,
323 Fallback {
324 fut: BoxFuture<'a, Result<()>>,
325 cleanup_fallback_at: &'a AtomicU32,
326 mc_seqno: u32,
327 },
328}
329
330impl Future for BlockchainBlockProviderCleanupFut<'_> {
331 type Output = Result<()>;
332
333 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
334 match self.get_mut() {
335 Self::Noop => Poll::Ready(Ok(())),
336 Self::Fallback {
337 fut,
338 cleanup_fallback_at,
339 mc_seqno,
340 } => {
341 let res = fut.poll_unpin(cx);
342
343 if matches!(&res, Poll::Ready(r) if r.is_ok()) {
345 cleanup_fallback_at
346 .compare_exchange(*mc_seqno, u32::MAX, Ordering::Release, Ordering::Relaxed)
347 .ok();
348 }
349
350 res
351 }
352 }
353 }
354}
355
356async fn loop_with_timeout<E, EFut, P, PFut, R, T>(
357 interval: Duration,
358 timeout: Duration,
359 use_timeout: bool,
360 request: E,
361 process: P,
362) -> Option<T>
363where
364 E: Fn() -> EFut,
365 EFut: Future<Output = R>,
366 P: Fn(R) -> PFut,
367 PFut: Future<Output = Option<T>>,
368{
369 let mut interval = tokio::time::interval(interval);
371
372 let mut timeout = pin!(if use_timeout {
373 Either::Left(tokio::time::sleep(timeout))
374 } else {
375 Either::Right(futures_util::future::pending::<()>())
376 });
377
378 loop {
379 tokio::select! {
380 res = request() => {
381 if let res @ Some(_) = process(res).await {
382 return res;
383 }
384 },
385 _ = &mut timeout => return None,
386 }
387
388 tokio::select! {
389 _ = interval.tick() => {},
390 _ = &mut timeout => return None,
391 }
392 }
393}