tycho_core/block_strider/provider/
blockchain_provider.rs1use std::future::Future;
2use std::pin::{Pin, pin};
3use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use anyhow::Result;
8use futures_util::FutureExt;
9use futures_util::future::{BoxFuture, Either};
10use serde::{Deserialize, Serialize};
11use tycho_block_util::archive::WithArchiveData;
12use tycho_block_util::block::{BlockIdRelation, BlockProofStuff, BlockStuff};
13use tycho_block_util::queue::QueueDiffStuff;
14use tycho_types::models::*;
15use tycho_util::serde_helpers;
16use tycho_util::sync::rayon_run;
17
18use crate::block_strider::BlockProvider;
19use crate::block_strider::provider::{
20 BoxBlockProvider, CheckProof, OptionalBlockStuff, ProofChecker,
21};
22use crate::blockchain_rpc::{BlockDataFull, BlockchainRpcClient, DataRequirement};
23use crate::overlay_client::{Neighbour, PunishReason};
24use crate::storage::CoreStorage;
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(default)]
30#[non_exhaustive]
31pub struct BlockchainBlockProviderConfig {
32 #[serde(with = "serde_helpers::humantime")]
36 pub get_next_block_polling_interval: Duration,
37
38 #[serde(with = "serde_helpers::humantime")]
42 pub get_block_polling_interval: Duration,
43
44 #[serde(with = "serde_helpers::humantime")]
49 pub get_next_block_timeout: Duration,
50
51 #[serde(with = "serde_helpers::humantime")]
56 pub get_block_timeout: Duration,
57}
58
59impl Default for BlockchainBlockProviderConfig {
60 fn default() -> Self {
61 Self {
62 get_next_block_polling_interval: Duration::from_secs(1),
63 get_block_polling_interval: Duration::from_secs(1),
64 get_next_block_timeout: Duration::from_secs(120),
65 get_block_timeout: Duration::from_secs(60),
66 }
67 }
68}
69
70pub struct BlockchainBlockProvider {
71 client: BlockchainRpcClient,
72 config: BlockchainBlockProviderConfig,
73 proof_checker: ProofChecker,
74 fallback: Option<BoxBlockProvider>,
75 use_fallback: AtomicBool,
76 cleanup_fallback_at: AtomicU32,
77}
78
79impl BlockchainBlockProvider {
80 pub fn new(
81 client: BlockchainRpcClient,
82 storage: CoreStorage,
83 config: BlockchainBlockProviderConfig,
84 ) -> Self {
85 let proof_checker = ProofChecker::new(storage);
86
87 Self {
88 client,
89 config,
90 proof_checker,
91 fallback: None,
92 use_fallback: AtomicBool::new(false),
93 cleanup_fallback_at: AtomicU32::new(u32::MAX),
94 }
95 }
96
97 pub fn with_fallback<P: BlockProvider>(mut self, fallback: P) -> Self {
98 self.fallback = Some(BoxBlockProvider::new(fallback));
100 self
101 }
102
103 async fn get_next_block_impl(&self, prev_block_id: &BlockId) -> OptionalBlockStuff {
104 fn is_next_for(block_id: &BlockId, prev_block_id: &BlockId) -> bool {
105 block_id.shard == prev_block_id.shard && block_id.seqno == prev_block_id.seqno + 1
106 }
107
108 let primary = || {
109 loop_with_timeout(
110 self.config.get_next_block_polling_interval,
111 self.config.get_next_block_timeout,
112 self.fallback.is_some(),
113 || {
114 tracing::debug!(%prev_block_id, "get_next_block_full requested");
115 self.client
116 .get_next_block_full(prev_block_id, DataRequirement::Optional)
117 },
118 |res| async move {
119 match res {
120 Ok(res) => match res.data {
121 Some(data) if !is_next_for(&data.block_id, prev_block_id) => {
122 res.neighbour.punish(PunishReason::Malicious);
123 tracing::warn!("got response for an unknown block id");
124 }
125 Some(data) => {
126 let mc_block_id = data.block_id;
127 let parsed = self
128 .process_received_block(&mc_block_id, data, res.neighbour)
129 .await;
130 if parsed.is_some() {
131 return parsed;
132 }
133 }
134 None => tracing::warn!(?prev_block_id, "block not found"),
135 },
136 Err(e) => tracing::error!("failed to get next block: {e}"),
137 }
138 None
139 },
140 )
141 };
142
143 loop {
144 if !self.use_fallback.load(Ordering::Relaxed) {
146 if let res @ Some(_) = primary().await {
147 return res;
148 }
149 }
150
151 if let Some(fallback) = &self.fallback {
153 tracing::debug!(%prev_block_id, "get_next_block_full fallback");
154 self.use_fallback.store(true, Ordering::Relaxed);
155 if let res @ Some(_) = fallback.get_next_block(prev_block_id).await {
156 return res;
157 }
158 }
159
160 self.use_fallback.store(false, Ordering::Relaxed);
162
163 self.cleanup_fallback_at
165 .store(prev_block_id.seqno.saturating_add(1), Ordering::Release);
166 }
167 }
168
169 async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
170 let BlockIdRelation {
171 mc_block_id,
172 block_id,
173 } = block_id_relation;
174
175 let primary = || {
176 loop_with_timeout(
177 self.config.get_block_polling_interval,
178 self.config.get_block_timeout,
179 self.fallback.is_some(),
180 || {
181 tracing::debug!(%block_id, "get_block_full requested");
182 self.client
183 .get_block_full(block_id, DataRequirement::Expected)
184 },
185 |res| async move {
186 match res {
187 Ok(res) => match res.data {
188 Some(data) => {
189 let parsed = self
190 .process_received_block(mc_block_id, data, res.neighbour)
191 .await;
192 if parsed.is_some() {
193 return parsed;
194 }
195 }
196 None => tracing::warn!(%block_id, "block not found"),
197 },
198 Err(e) => tracing::error!("failed to get block: {e}"),
199 }
200 None
201 },
202 )
203 };
204
205 loop {
206 if !self.use_fallback.load(Ordering::Relaxed) {
208 if let res @ Some(_) = primary().await {
209 return res;
210 }
211 }
212
213 if let Some(fallback) = &self.fallback {
215 tracing::debug!(%block_id, "get_block_full fallback");
216 self.use_fallback.store(true, Ordering::Relaxed);
217 if let res @ Some(_) = fallback.get_block(block_id_relation).await {
218 return res;
219 }
220 }
221
222 self.use_fallback.store(false, Ordering::Relaxed);
224
225 }
227 }
228
229 async fn process_received_block(
230 &self,
231 mc_block_id: &BlockId,
232 block_full: BlockDataFull,
233 neighbour: Neighbour,
234 ) -> OptionalBlockStuff {
235 let block_stuff_fut = pin!(rayon_run({
236 let block_id = block_full.block_id;
237 let block_data = block_full.block_data.clone();
238 move || BlockStuff::deserialize_checked(&block_id, &block_data)
239 }));
240
241 let other_data_fut = pin!(rayon_run({
242 let block_id = block_full.block_id;
243 let proof_data = block_full.proof_data.clone();
244 let queue_diff_data = block_full.queue_diff_data.clone();
245 move || {
246 (
247 BlockProofStuff::deserialize(&block_id, &proof_data),
248 QueueDiffStuff::deserialize(&block_id, &queue_diff_data),
249 )
250 }
251 }));
252
253 let (block_stuff, (block_proof, queue_diff)) =
254 futures_util::future::join(block_stuff_fut, other_data_fut).await;
255
256 match (block_stuff, block_proof, queue_diff) {
257 (Ok(block), Ok(proof), Ok(diff)) => {
258 let proof = WithArchiveData::new(proof, block_full.proof_data);
259 let diff = WithArchiveData::new(diff, block_full.queue_diff_data);
260 if let Err(e) = self
261 .proof_checker
262 .check_proof(CheckProof {
263 mc_block_id,
264 block: &block,
265 proof: &proof,
266 queue_diff: &diff,
267 store_on_success: true,
268 })
269 .await
270 {
271 neighbour.punish(PunishReason::Malicious);
272 tracing::error!("got invalid block proof: {e}");
273 return None;
274 }
275
276 Some(Ok(block.with_archive_data(block_full.block_data)))
277 }
278 (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
279 neighbour.punish(PunishReason::Malicious);
280 tracing::error!("failed to deserialize shard block or block proof: {e}");
281 None
282 }
283 }
284 }
285}
286
287impl BlockProvider for BlockchainBlockProvider {
288 type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
289 type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
290 type CleanupFut<'a> = BlockchainBlockProviderCleanupFut<'a>;
291
292 fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
293 Box::pin(self.get_next_block_impl(prev_block_id))
294 }
295
296 fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
297 Box::pin(self.get_block_impl(block_id_relation))
298 }
299
300 fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
301 match &self.fallback {
302 Some(fallback) if self.cleanup_fallback_at.load(Ordering::Acquire) <= mc_seqno => {
303 BlockchainBlockProviderCleanupFut::Fallback {
304 fut: fallback.cleanup_until(mc_seqno),
305 cleanup_fallback_at: &self.cleanup_fallback_at,
306 mc_seqno,
307 }
308 }
309 _ => BlockchainBlockProviderCleanupFut::Noop,
310 }
311 }
312}
313
314pub enum BlockchainBlockProviderCleanupFut<'a> {
315 Noop,
316 Fallback {
317 fut: BoxFuture<'a, Result<()>>,
318 cleanup_fallback_at: &'a AtomicU32,
319 mc_seqno: u32,
320 },
321}
322
323impl Future for BlockchainBlockProviderCleanupFut<'_> {
324 type Output = Result<()>;
325
326 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
327 match self.get_mut() {
328 Self::Noop => Poll::Ready(Ok(())),
329 Self::Fallback {
330 fut,
331 cleanup_fallback_at,
332 mc_seqno,
333 } => {
334 let res = fut.poll_unpin(cx);
335
336 if matches!(&res, Poll::Ready(r) if r.is_ok()) {
338 cleanup_fallback_at
339 .compare_exchange(*mc_seqno, u32::MAX, Ordering::Release, Ordering::Relaxed)
340 .ok();
341 }
342
343 res
344 }
345 }
346 }
347}
348
349async fn loop_with_timeout<E, EFut, P, PFut, R, T>(
350 interval: Duration,
351 timeout: Duration,
352 use_timeout: bool,
353 request: E,
354 process: P,
355) -> Option<T>
356where
357 E: Fn() -> EFut,
358 EFut: Future<Output = R>,
359 P: Fn(R) -> PFut,
360 PFut: Future<Output = Option<T>>,
361{
362 let mut interval = tokio::time::interval(interval);
364
365 let mut timeout = pin!(if use_timeout {
366 Either::Left(tokio::time::sleep(timeout))
367 } else {
368 Either::Right(futures_util::future::pending::<()>())
369 });
370
371 loop {
372 tokio::select! {
373 res = request() => {
374 if let res @ Some(_) = process(res).await {
375 return res;
376 }
377 },
378 _ = &mut timeout => return None,
379 }
380
381 tokio::select! {
382 _ = interval.tick() => {},
383 _ = &mut timeout => return None,
384 }
385 }
386}