Skip to main content

tycho_core/block_strider/provider/
blockchain_provider.rs

1use std::future::Future;
2use std::pin::{Pin, pin};
3use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use anyhow::Result;
8use futures_util::FutureExt;
9use futures_util::future::{BoxFuture, Either};
10use serde::{Deserialize, Serialize};
11use tycho_block_util::archive::WithArchiveData;
12use tycho_block_util::block::{BlockIdRelation, BlockProofStuff, BlockStuff};
13use tycho_block_util::queue::QueueDiffStuff;
14use tycho_types::models::*;
15use tycho_util::serde_helpers;
16use tycho_util::sync::rayon_run;
17
18use crate::block_strider::BlockProvider;
19use crate::block_strider::provider::{
20    BoxBlockProvider, CheckProof, OptionalBlockStuff, ProofChecker,
21};
22use crate::blockchain_rpc::{BlockDataFull, BlockchainRpcClient, DataRequirement};
23use crate::overlay_client::{Neighbour, PunishReason};
24use crate::storage::CoreStorage;
25
26// TODO: Use backoff instead of simple polling.
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(default)]
30#[non_exhaustive]
31pub struct BlockchainBlockProviderConfig {
32    /// Polling interval for `get_next_block` method.
33    ///
34    /// Default: 1 second.
35    #[serde(with = "serde_helpers::humantime")]
36    pub get_next_block_polling_interval: Duration,
37
38    /// Polling interval for `get_block` method.
39    ///
40    /// Default: 1 second.
41    #[serde(with = "serde_helpers::humantime")]
42    pub get_block_polling_interval: Duration,
43
44    /// Timeout of `get_next_block` for the primary logic (get full block request).
45    /// Ignored if no fallback.
46    ///
47    /// Default: 120 seconds.
48    #[serde(with = "serde_helpers::humantime")]
49    pub get_next_block_timeout: Duration,
50
51    /// Timeout of `get_block` for the primary logic (get full block request).
52    /// Ignored if no fallback.
53    ///
54    /// Default: 60 seconds.
55    #[serde(with = "serde_helpers::humantime")]
56    pub get_block_timeout: Duration,
57
58    /// How long to wait after the fallback.
59    ///
60    /// Default: 1 second.
61    #[serde(with = "serde_helpers::humantime")]
62    pub fallback_interval: Duration,
63}
64
65impl Default for BlockchainBlockProviderConfig {
66    fn default() -> Self {
67        Self {
68            get_next_block_polling_interval: Duration::from_secs(1),
69            get_block_polling_interval: Duration::from_secs(1),
70            get_next_block_timeout: Duration::from_secs(120),
71            get_block_timeout: Duration::from_secs(60),
72            fallback_interval: Duration::from_secs(1),
73        }
74    }
75}
76
77pub struct BlockchainBlockProvider {
78    client: BlockchainRpcClient,
79    config: BlockchainBlockProviderConfig,
80    proof_checker: ProofChecker,
81    fallback: Option<BoxBlockProvider>,
82    use_fallback: AtomicBool,
83    cleanup_fallback_at: AtomicU32,
84}
85
86impl BlockchainBlockProvider {
87    pub fn new(
88        client: BlockchainRpcClient,
89        storage: CoreStorage,
90        config: BlockchainBlockProviderConfig,
91    ) -> Self {
92        let proof_checker = ProofChecker::new(storage);
93
94        Self {
95            client,
96            config,
97            proof_checker,
98            fallback: None,
99            use_fallback: AtomicBool::new(false),
100            cleanup_fallback_at: AtomicU32::new(u32::MAX),
101        }
102    }
103
104    pub fn with_fallback<P: BlockProvider>(mut self, fallback: P) -> Self {
105        // TODO: Don't wrap if `typeid::of::<P>() == typeid::of::<BoxBlockProvider>()`
106        self.fallback = Some(BoxBlockProvider::new(fallback));
107        self
108    }
109
110    async fn get_next_block_impl(&self, prev_block_id: &BlockId) -> OptionalBlockStuff {
111        fn is_next_for(block_id: &BlockId, prev_block_id: &BlockId) -> bool {
112            block_id.shard == prev_block_id.shard && block_id.seqno == prev_block_id.seqno + 1
113        }
114
115        let primary = || {
116            loop_with_timeout(
117                self.config.get_next_block_polling_interval,
118                self.config.get_next_block_timeout,
119                self.fallback.is_some(),
120                || {
121                    tracing::debug!(%prev_block_id, "get_next_block_full requested");
122                    self.client
123                        .get_next_block_full(prev_block_id, DataRequirement::Optional)
124                },
125                |res| async move {
126                    match res {
127                        Ok(res) => match res.data {
128                            Some(data) if !is_next_for(&data.block_id, prev_block_id) => {
129                                res.neighbour.punish(PunishReason::Malicious);
130                                tracing::warn!("got response for an unknown block id");
131                            }
132                            Some(data) => {
133                                let mc_block_id = data.block_id;
134                                let parsed = self
135                                    .process_received_block(&mc_block_id, data, res.neighbour)
136                                    .await;
137                                if parsed.is_some() {
138                                    return parsed;
139                                }
140                            }
141                            None => tracing::warn!(?prev_block_id, "block not found"),
142                        },
143                        Err(e) => tracing::error!("failed to get next block: {e:?}"),
144                    }
145                    None
146                },
147            )
148        };
149
150        loop {
151            // Primary
152            if !self.use_fallback.load(Ordering::Relaxed)
153                && let res @ Some(_) = primary().await
154            {
155                return res;
156            }
157
158            // Fallback
159            if let Some(fallback) = &self.fallback {
160                tracing::debug!(%prev_block_id, "get_next_block_full fallback");
161                self.use_fallback.store(true, Ordering::Relaxed);
162                if let res @ Some(_) = fallback.get_next_block(prev_block_id).await {
163                    return res;
164                }
165
166                // Wait for some time before the next request
167                tokio::time::sleep(self.config.fallback_interval).await;
168            }
169
170            // Reset fallback
171            self.use_fallback.store(false, Ordering::Relaxed);
172
173            // Schedule next cleanup
174            self.cleanup_fallback_at
175                .store(prev_block_id.seqno.saturating_add(1), Ordering::Release);
176        }
177    }
178
179    async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
180        let BlockIdRelation {
181            mc_block_id,
182            block_id,
183        } = block_id_relation;
184
185        let primary = || {
186            loop_with_timeout(
187                self.config.get_block_polling_interval,
188                self.config.get_block_timeout,
189                self.fallback.is_some(),
190                || {
191                    tracing::debug!(%block_id, "get_block_full requested");
192
193                    // Should be optional to avoid of banning of all neighbors
194                    // when requests too new blocks
195                    self.client
196                        .get_block_full(block_id, DataRequirement::Optional)
197                },
198                |res| async move {
199                    match res {
200                        Ok(res) => match res.data {
201                            Some(data) => {
202                                let parsed = self
203                                    .process_received_block(mc_block_id, data, res.neighbour)
204                                    .await;
205                                if parsed.is_some() {
206                                    return parsed;
207                                }
208                            }
209                            None => tracing::warn!(%block_id, "block not found"),
210                        },
211                        Err(e) => tracing::error!("failed to get block: {e:?}"),
212                    }
213                    None
214                },
215            )
216        };
217
218        loop {
219            // Primary
220            if !self.use_fallback.load(Ordering::Relaxed)
221                && let res @ Some(_) = primary().await
222            {
223                return res;
224            }
225
226            // Fallback
227            if let Some(fallback) = &self.fallback {
228                tracing::debug!(%block_id, "get_block_full fallback");
229                self.use_fallback.store(true, Ordering::Relaxed);
230                if let res @ Some(_) = fallback.get_block(block_id_relation).await {
231                    return res;
232                }
233
234                // Wait for some time before the next request
235                tokio::time::sleep(self.config.fallback_interval).await;
236            }
237
238            // Reset fallback
239            self.use_fallback.store(false, Ordering::Relaxed);
240
241            // NOTE: Don't schedule next cleanup for fallback, get_next is enough for that.
242        }
243    }
244
245    #[tracing::instrument(
246        skip(self,mc_block_id, block_full, neighbour),
247        fields(mc_block_id = %mc_block_id.as_short_id())
248    )]
249    async fn process_received_block(
250        &self,
251        mc_block_id: &BlockId,
252        block_full: BlockDataFull,
253        neighbour: Neighbour,
254    ) -> OptionalBlockStuff {
255        let block_stuff_fut = pin!(rayon_run({
256            let block_id = block_full.block_id;
257            let block_data = block_full.block_data.clone();
258            move || BlockStuff::deserialize_checked(&block_id, &block_data)
259        }));
260
261        let other_data_fut = pin!(rayon_run({
262            let block_id = block_full.block_id;
263            let proof_data = block_full.proof_data.clone();
264            let queue_diff_data = block_full.queue_diff_data.clone();
265            move || {
266                (
267                    BlockProofStuff::deserialize(&block_id, &proof_data),
268                    QueueDiffStuff::deserialize(&block_id, &queue_diff_data),
269                )
270            }
271        }));
272
273        let (block_stuff, (block_proof, queue_diff)) =
274            futures_util::future::join(block_stuff_fut, other_data_fut).await;
275
276        match (block_stuff, block_proof, queue_diff) {
277            (Ok(block), Ok(proof), Ok(diff)) => {
278                let proof = WithArchiveData::new(proof, block_full.proof_data);
279                let diff = WithArchiveData::new(diff, block_full.queue_diff_data);
280                if let Err(e) = self
281                    .proof_checker
282                    .check_proof(CheckProof {
283                        mc_block_id,
284                        block: &block,
285                        proof: &proof,
286                        queue_diff: &diff,
287                        store_on_success: true,
288                    })
289                    .await
290                {
291                    neighbour.punish(PunishReason::Malicious);
292                    tracing::error!("got invalid block proof: {e:?}");
293                    return None;
294                }
295
296                Some(Ok(block.with_archive_data(block_full.block_data)))
297            }
298            (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
299                neighbour.punish(PunishReason::Malicious);
300                tracing::error!("failed to deserialize shard block or block proof: {e:?}");
301                None
302            }
303        }
304    }
305}
306
307impl BlockProvider for BlockchainBlockProvider {
308    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
309    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
310    type CleanupFut<'a> = BlockchainBlockProviderCleanupFut<'a>;
311
312    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
313        Box::pin(self.get_next_block_impl(prev_block_id))
314    }
315
316    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
317        Box::pin(self.get_block_impl(block_id_relation))
318    }
319
320    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
321        match &self.fallback {
322            Some(fallback) if self.cleanup_fallback_at.load(Ordering::Acquire) <= mc_seqno => {
323                BlockchainBlockProviderCleanupFut::Fallback {
324                    fut: fallback.cleanup_until(mc_seqno),
325                    cleanup_fallback_at: &self.cleanup_fallback_at,
326                    mc_seqno,
327                }
328            }
329            _ => BlockchainBlockProviderCleanupFut::Noop,
330        }
331    }
332}
333
334pub enum BlockchainBlockProviderCleanupFut<'a> {
335    Noop,
336    Fallback {
337        fut: BoxFuture<'a, Result<()>>,
338        cleanup_fallback_at: &'a AtomicU32,
339        mc_seqno: u32,
340    },
341}
342
343impl Future for BlockchainBlockProviderCleanupFut<'_> {
344    type Output = Result<()>;
345
346    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
347        match self.get_mut() {
348            Self::Noop => Poll::Ready(Ok(())),
349            Self::Fallback {
350                fut,
351                cleanup_fallback_at,
352                mc_seqno,
353            } => {
354                let res = fut.poll_unpin(cx);
355
356                // Reset `cleanup_fallback_at` when future is ready.
357                if matches!(&res, Poll::Ready(r) if r.is_ok()) {
358                    cleanup_fallback_at
359                        .compare_exchange(*mc_seqno, u32::MAX, Ordering::Release, Ordering::Relaxed)
360                        .ok();
361                }
362
363                res
364            }
365        }
366    }
367}
368
369async fn loop_with_timeout<E, EFut, P, PFut, R, T>(
370    interval: Duration,
371    timeout: Duration,
372    use_timeout: bool,
373    request: E,
374    process: P,
375) -> Option<T>
376where
377    E: Fn() -> EFut,
378    EFut: Future<Output = R>,
379    P: Fn(R) -> PFut,
380    PFut: Future<Output = Option<T>>,
381{
382    // TODO: Backoff?
383    let mut interval = tokio::time::interval(interval);
384
385    let mut timeout = pin!(if use_timeout {
386        Either::Left(tokio::time::sleep(timeout))
387    } else {
388        Either::Right(futures_util::future::pending::<()>())
389    });
390
391    loop {
392        tokio::select! {
393            res = request() => {
394                if let res @ Some(_) = process(res).await {
395                    return res;
396                }
397            },
398            _ = &mut timeout => return None,
399        }
400
401        tokio::select! {
402            _ = interval.tick() => {},
403            _ = &mut timeout => return None,
404        }
405    }
406}