tycho_core/block_strider/provider/
blockchain_provider.rs

1use std::future::Future;
2use std::pin::{Pin, pin};
3use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use anyhow::Result;
8use futures_util::FutureExt;
9use futures_util::future::{BoxFuture, Either};
10use serde::{Deserialize, Serialize};
11use tycho_block_util::archive::WithArchiveData;
12use tycho_block_util::block::{BlockIdRelation, BlockProofStuff, BlockStuff};
13use tycho_block_util::queue::QueueDiffStuff;
14use tycho_types::models::*;
15use tycho_util::serde_helpers;
16use tycho_util::sync::rayon_run;
17
18use crate::block_strider::BlockProvider;
19use crate::block_strider::provider::{
20    BoxBlockProvider, CheckProof, OptionalBlockStuff, ProofChecker,
21};
22use crate::blockchain_rpc::{BlockDataFull, BlockchainRpcClient, DataRequirement};
23use crate::overlay_client::{Neighbour, PunishReason};
24use crate::storage::CoreStorage;
25
26// TODO: Use backoff instead of simple polling.
27
/// Polling and timeout settings for [`BlockchainBlockProvider`].
///
/// Because of `#[serde(default)]`, any field missing from the serialized form
/// takes the value from the [`Default`] implementation. All durations are
/// (de)serialized through `serde_helpers::humantime`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
#[non_exhaustive]
pub struct BlockchainBlockProviderConfig {
    /// Polling interval for `get_next_block` method.
    ///
    /// Default: 1 second.
    #[serde(with = "serde_helpers::humantime")]
    pub get_next_block_polling_interval: Duration,

    /// Polling interval for `get_block` method.
    ///
    /// Default: 1 second.
    #[serde(with = "serde_helpers::humantime")]
    pub get_block_polling_interval: Duration,

    /// Timeout of `get_next_block` for the primary logic (get full block request).
    /// Ignored if no fallback is configured (the primary logic then polls
    /// without a deadline).
    ///
    /// Default: 120 seconds.
    #[serde(with = "serde_helpers::humantime")]
    pub get_next_block_timeout: Duration,

    /// Timeout of `get_block` for the primary logic (get full block request).
    /// Ignored if no fallback is configured (the primary logic then polls
    /// without a deadline).
    ///
    /// Default: 60 seconds.
    #[serde(with = "serde_helpers::humantime")]
    pub get_block_timeout: Duration,
}
58
59impl Default for BlockchainBlockProviderConfig {
60    fn default() -> Self {
61        Self {
62            get_next_block_polling_interval: Duration::from_secs(1),
63            get_block_polling_interval: Duration::from_secs(1),
64            get_next_block_timeout: Duration::from_secs(120),
65            get_block_timeout: Duration::from_secs(60),
66        }
67    }
68}
69
/// Block provider that requests blocks through a [`BlockchainRpcClient`] and
/// can optionally switch to a fallback provider when the primary requests do
/// not produce a block before the configured timeout.
pub struct BlockchainBlockProvider {
    /// RPC client used for the `get_next_block_full` / `get_block_full` requests.
    client: BlockchainRpcClient,
    /// Polling intervals and primary-path timeouts.
    config: BlockchainBlockProviderConfig,
    /// Checks the proof of every received block (with `store_on_success` enabled).
    proof_checker: ProofChecker,
    /// Optional provider consulted after the primary path yields nothing.
    fallback: Option<BoxBlockProvider>,
    /// Set when a request switches to the fallback; while set, requests skip
    /// the primary path. Cleared once the fallback returns `None`.
    /// Shared by `get_next_block` and `get_block`.
    use_fallback: AtomicBool,
    /// Seqno threshold from which `cleanup_until` is forwarded to the fallback.
    /// `u32::MAX` means that no fallback cleanup is scheduled.
    cleanup_fallback_at: AtomicU32,
}
78
79impl BlockchainBlockProvider {
80    pub fn new(
81        client: BlockchainRpcClient,
82        storage: CoreStorage,
83        config: BlockchainBlockProviderConfig,
84    ) -> Self {
85        let proof_checker = ProofChecker::new(storage);
86
87        Self {
88            client,
89            config,
90            proof_checker,
91            fallback: None,
92            use_fallback: AtomicBool::new(false),
93            cleanup_fallback_at: AtomicU32::new(u32::MAX),
94        }
95    }
96
97    pub fn with_fallback<P: BlockProvider>(mut self, fallback: P) -> Self {
98        // TODO: Don't wrap if `typeid::of::<P>() == typeid::of::<BoxBlockProvider>()`
99        self.fallback = Some(BoxBlockProvider::new(fallback));
100        self
101    }
102
103    async fn get_next_block_impl(&self, prev_block_id: &BlockId) -> OptionalBlockStuff {
104        fn is_next_for(block_id: &BlockId, prev_block_id: &BlockId) -> bool {
105            block_id.shard == prev_block_id.shard && block_id.seqno == prev_block_id.seqno + 1
106        }
107
108        let primary = || {
109            loop_with_timeout(
110                self.config.get_next_block_polling_interval,
111                self.config.get_next_block_timeout,
112                self.fallback.is_some(),
113                || {
114                    tracing::debug!(%prev_block_id, "get_next_block_full requested");
115                    self.client
116                        .get_next_block_full(prev_block_id, DataRequirement::Optional)
117                },
118                |res| async move {
119                    match res {
120                        Ok(res) => match res.data {
121                            Some(data) if !is_next_for(&data.block_id, prev_block_id) => {
122                                res.neighbour.punish(PunishReason::Malicious);
123                                tracing::warn!("got response for an unknown block id");
124                            }
125                            Some(data) => {
126                                let mc_block_id = data.block_id;
127                                let parsed = self
128                                    .process_received_block(&mc_block_id, data, res.neighbour)
129                                    .await;
130                                if parsed.is_some() {
131                                    return parsed;
132                                }
133                            }
134                            None => tracing::warn!(?prev_block_id, "block not found"),
135                        },
136                        Err(e) => tracing::error!("failed to get next block: {e}"),
137                    }
138                    None
139                },
140            )
141        };
142
143        loop {
144            // Primary
145            if !self.use_fallback.load(Ordering::Relaxed)
146                && let res @ Some(_) = primary().await
147            {
148                return res;
149            }
150
151            // Fallback
152            if let Some(fallback) = &self.fallback {
153                tracing::debug!(%prev_block_id, "get_next_block_full fallback");
154                self.use_fallback.store(true, Ordering::Relaxed);
155                if let res @ Some(_) = fallback.get_next_block(prev_block_id).await {
156                    return res;
157                }
158            }
159
160            // Reset fallback
161            self.use_fallback.store(false, Ordering::Relaxed);
162
163            // Schedule next cleanup
164            self.cleanup_fallback_at
165                .store(prev_block_id.seqno.saturating_add(1), Ordering::Release);
166        }
167    }
168
    /// Fetches the block identified by `block_id_relation.block_id`.
    ///
    /// Two sources are tried in a loop until one of them yields a result:
    ///
    /// 1. **Primary** — polls `get_block_full` every
    ///    `get_block_polling_interval`. The polling is bounded by
    ///    `get_block_timeout` only when a fallback is configured; otherwise it
    ///    continues until a block is obtained.
    /// 2. **Fallback** — the optional fallback provider's `get_block`.
    ///
    /// The `use_fallback` flag is the same one used by `get_next_block_impl`:
    /// it is set when the fallback is entered and cleared when the fallback
    /// returns `None`. Unlike `get_next_block_impl`, no fallback cleanup is
    /// scheduled here.
    ///
    /// The loop only exits with a `Some(..)` value.
    async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
        let BlockIdRelation {
            mc_block_id,
            block_id,
        } = block_id_relation;

        // Builds a fresh primary polling future on every call, so that each
        // attempt gets its own interval and timeout.
        let primary = || {
            loop_with_timeout(
                self.config.get_block_polling_interval,
                self.config.get_block_timeout,
                // The timeout only makes sense if there is something to fall back to.
                self.fallback.is_some(),
                || {
                    tracing::debug!(%block_id, "get_block_full requested");

                    // The data requirement must stay optional to avoid banning
                    // all neighbours when requesting blocks that are too new.
                    self.client
                        .get_block_full(block_id, DataRequirement::Optional)
                },
                |res| async move {
                    match res {
                        Ok(res) => match res.data {
                            Some(data) => {
                                let parsed = self
                                    .process_received_block(mc_block_id, data, res.neighbour)
                                    .await;
                                if parsed.is_some() {
                                    return parsed;
                                }
                            }
                            None => tracing::warn!(%block_id, "block not found"),
                        },
                        Err(e) => tracing::error!("failed to get block: {e}"),
                    }
                    // Nothing usable in this response, keep polling.
                    None
                },
            )
        };

        loop {
            // Primary
            if !self.use_fallback.load(Ordering::Relaxed)
                && let res @ Some(_) = primary().await
            {
                return res;
            }

            // Fallback
            if let Some(fallback) = &self.fallback {
                tracing::debug!(%block_id, "get_block_full fallback");
                self.use_fallback.store(true, Ordering::Relaxed);
                if let res @ Some(_) = fallback.get_block(block_id_relation).await {
                    return res;
                }
            }

            // Reset fallback
            self.use_fallback.store(false, Ordering::Relaxed);

            // NOTE: Don't schedule next cleanup for fallback, get_next is enough for that.
        }
    }
231
232    #[tracing::instrument(
233        skip(self,mc_block_id, block_full, neighbour),
234        fields(mc_block_id = %mc_block_id.as_short_id())
235    )]
236    async fn process_received_block(
237        &self,
238        mc_block_id: &BlockId,
239        block_full: BlockDataFull,
240        neighbour: Neighbour,
241    ) -> OptionalBlockStuff {
242        let block_stuff_fut = pin!(rayon_run({
243            let block_id = block_full.block_id;
244            let block_data = block_full.block_data.clone();
245            move || BlockStuff::deserialize_checked(&block_id, &block_data)
246        }));
247
248        let other_data_fut = pin!(rayon_run({
249            let block_id = block_full.block_id;
250            let proof_data = block_full.proof_data.clone();
251            let queue_diff_data = block_full.queue_diff_data.clone();
252            move || {
253                (
254                    BlockProofStuff::deserialize(&block_id, &proof_data),
255                    QueueDiffStuff::deserialize(&block_id, &queue_diff_data),
256                )
257            }
258        }));
259
260        let (block_stuff, (block_proof, queue_diff)) =
261            futures_util::future::join(block_stuff_fut, other_data_fut).await;
262
263        match (block_stuff, block_proof, queue_diff) {
264            (Ok(block), Ok(proof), Ok(diff)) => {
265                let proof = WithArchiveData::new(proof, block_full.proof_data);
266                let diff = WithArchiveData::new(diff, block_full.queue_diff_data);
267                if let Err(e) = self
268                    .proof_checker
269                    .check_proof(CheckProof {
270                        mc_block_id,
271                        block: &block,
272                        proof: &proof,
273                        queue_diff: &diff,
274                        store_on_success: true,
275                    })
276                    .await
277                {
278                    neighbour.punish(PunishReason::Malicious);
279                    tracing::error!("got invalid block proof: {e}");
280                    return None;
281                }
282
283                Some(Ok(block.with_archive_data(block_full.block_data)))
284            }
285            (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
286                neighbour.punish(PunishReason::Malicious);
287                tracing::error!("failed to deserialize shard block or block proof: {e}");
288                None
289            }
290        }
291    }
292}
293
294impl BlockProvider for BlockchainBlockProvider {
295    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
296    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
297    type CleanupFut<'a> = BlockchainBlockProviderCleanupFut<'a>;
298
299    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
300        Box::pin(self.get_next_block_impl(prev_block_id))
301    }
302
303    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
304        Box::pin(self.get_block_impl(block_id_relation))
305    }
306
307    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
308        match &self.fallback {
309            Some(fallback) if self.cleanup_fallback_at.load(Ordering::Acquire) <= mc_seqno => {
310                BlockchainBlockProviderCleanupFut::Fallback {
311                    fut: fallback.cleanup_until(mc_seqno),
312                    cleanup_fallback_at: &self.cleanup_fallback_at,
313                    mc_seqno,
314                }
315            }
316            _ => BlockchainBlockProviderCleanupFut::Noop,
317        }
318    }
319}
320
/// Future returned by `BlockchainBlockProvider::cleanup_until`.
pub enum BlockchainBlockProviderCleanupFut<'a> {
    /// Nothing to clean up; resolves immediately with `Ok(())`.
    Noop,
    /// Drives the cleanup of the fallback provider.
    Fallback {
        /// Cleanup future obtained from the fallback provider.
        fut: BoxFuture<'a, Result<()>>,
        /// Schedule marker of the owning provider. It is reset to `u32::MAX`
        /// after a successful cleanup if it still equals `mc_seqno`.
        cleanup_fallback_at: &'a AtomicU32,
        /// Seqno that was passed to `cleanup_until`.
        mc_seqno: u32,
    },
}
329
330impl Future for BlockchainBlockProviderCleanupFut<'_> {
331    type Output = Result<()>;
332
333    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
334        match self.get_mut() {
335            Self::Noop => Poll::Ready(Ok(())),
336            Self::Fallback {
337                fut,
338                cleanup_fallback_at,
339                mc_seqno,
340            } => {
341                let res = fut.poll_unpin(cx);
342
343                // Reset `cleanup_fallback_at` when future is ready.
344                if matches!(&res, Poll::Ready(r) if r.is_ok()) {
345                    cleanup_fallback_at
346                        .compare_exchange(*mc_seqno, u32::MAX, Ordering::Release, Ordering::Relaxed)
347                        .ok();
348                }
349
350                res
351            }
352        }
353    }
354}
355
356async fn loop_with_timeout<E, EFut, P, PFut, R, T>(
357    interval: Duration,
358    timeout: Duration,
359    use_timeout: bool,
360    request: E,
361    process: P,
362) -> Option<T>
363where
364    E: Fn() -> EFut,
365    EFut: Future<Output = R>,
366    P: Fn(R) -> PFut,
367    PFut: Future<Output = Option<T>>,
368{
369    // TODO: Backoff?
370    let mut interval = tokio::time::interval(interval);
371
372    let mut timeout = pin!(if use_timeout {
373        Either::Left(tokio::time::sleep(timeout))
374    } else {
375        Either::Right(futures_util::future::pending::<()>())
376    });
377
378    loop {
379        tokio::select! {
380            res = request() => {
381                if let res @ Some(_) = process(res).await {
382                    return res;
383                }
384            },
385            _ = &mut timeout => return None,
386        }
387
388        tokio::select! {
389            _ = interval.tick() => {},
390            _ = &mut timeout => return None,
391        }
392    }
393}