tycho_core/block_strider/provider/
blockchain_provider.rs

1use std::future::Future;
2use std::pin::{Pin, pin};
3use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use anyhow::Result;
8use futures_util::FutureExt;
9use futures_util::future::{BoxFuture, Either};
10use serde::{Deserialize, Serialize};
11use tycho_block_util::archive::WithArchiveData;
12use tycho_block_util::block::{BlockIdRelation, BlockProofStuff, BlockStuff};
13use tycho_block_util::queue::QueueDiffStuff;
14use tycho_types::models::*;
15use tycho_util::serde_helpers;
16use tycho_util::sync::rayon_run;
17
18use crate::block_strider::BlockProvider;
19use crate::block_strider::provider::{
20    BoxBlockProvider, CheckProof, OptionalBlockStuff, ProofChecker,
21};
22use crate::blockchain_rpc::{BlockDataFull, BlockchainRpcClient, DataRequirement};
23use crate::overlay_client::{Neighbour, PunishReason};
24use crate::storage::CoreStorage;
25
26// TODO: Use backoff instead of simple polling.
27
/// Settings for [`BlockchainBlockProvider`].
///
/// Controls how often the primary (RPC based) logic repeats its requests and
/// for how long it keeps trying before a configured fallback provider is used.
///
/// The struct is marked `#[serde(default)]`, so fields that are absent from
/// the serialized form are filled in from the `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
#[non_exhaustive]
pub struct BlockchainBlockProviderConfig {
    /// Polling interval for `get_next_block` method.
    ///
    /// Default: 1 second.
    #[serde(with = "serde_helpers::humantime")]
    pub get_next_block_polling_interval: Duration,

    /// Polling interval for `get_block` method.
    ///
    /// Default: 1 second.
    #[serde(with = "serde_helpers::humantime")]
    pub get_block_polling_interval: Duration,

    /// Timeout of `get_next_block` for the primary logic (get full block request).
    /// Ignored if no fallback.
    ///
    /// Default: 120 seconds.
    #[serde(with = "serde_helpers::humantime")]
    pub get_next_block_timeout: Duration,

    /// Timeout of `get_block` for the primary logic (get full block request).
    /// Ignored if no fallback.
    ///
    /// Default: 60 seconds.
    #[serde(with = "serde_helpers::humantime")]
    pub get_block_timeout: Duration,
}
58
59impl Default for BlockchainBlockProviderConfig {
60    fn default() -> Self {
61        Self {
62            get_next_block_polling_interval: Duration::from_secs(1),
63            get_block_polling_interval: Duration::from_secs(1),
64            get_next_block_timeout: Duration::from_secs(120),
65            get_block_timeout: Duration::from_secs(60),
66        }
67    }
68}
69
/// Block provider that requests blocks through a [`BlockchainRpcClient`],
/// checks their proofs and can optionally hand over to a fallback provider
/// when the primary requests time out.
pub struct BlockchainBlockProvider {
    /// RPC client used for the `get_next_block_full` / `get_block_full` requests.
    client: BlockchainRpcClient,
    /// Polling intervals and primary-logic timeouts.
    config: BlockchainBlockProviderConfig,
    /// Verifies (and stores on success) the proof of every received block.
    proof_checker: ProofChecker,
    /// Optional provider that is asked when the primary logic gives up.
    fallback: Option<BoxBlockProvider>,
    /// Set right before the fallback is queried and cleared once the fallback
    /// returns `None`; while set, the primary logic is skipped.
    use_fallback: AtomicBool,
    /// Masterchain seqno starting from which `cleanup_until` is forwarded to
    /// the fallback. `u32::MAX` is the initial / reset value.
    cleanup_fallback_at: AtomicU32,
}
78
79impl BlockchainBlockProvider {
80    pub fn new(
81        client: BlockchainRpcClient,
82        storage: CoreStorage,
83        config: BlockchainBlockProviderConfig,
84    ) -> Self {
85        let proof_checker = ProofChecker::new(storage);
86
87        Self {
88            client,
89            config,
90            proof_checker,
91            fallback: None,
92            use_fallback: AtomicBool::new(false),
93            cleanup_fallback_at: AtomicU32::new(u32::MAX),
94        }
95    }
96
97    pub fn with_fallback<P: BlockProvider>(mut self, fallback: P) -> Self {
98        // TODO: Don't wrap if `typeid::of::<P>() == typeid::of::<BoxBlockProvider>()`
99        self.fallback = Some(BoxBlockProvider::new(fallback));
100        self
101    }
102
103    async fn get_next_block_impl(&self, prev_block_id: &BlockId) -> OptionalBlockStuff {
104        fn is_next_for(block_id: &BlockId, prev_block_id: &BlockId) -> bool {
105            block_id.shard == prev_block_id.shard && block_id.seqno == prev_block_id.seqno + 1
106        }
107
108        let primary = || {
109            loop_with_timeout(
110                self.config.get_next_block_polling_interval,
111                self.config.get_next_block_timeout,
112                self.fallback.is_some(),
113                || {
114                    tracing::debug!(%prev_block_id, "get_next_block_full requested");
115                    self.client
116                        .get_next_block_full(prev_block_id, DataRequirement::Optional)
117                },
118                |res| async move {
119                    match res {
120                        Ok(res) => match res.data {
121                            Some(data) if !is_next_for(&data.block_id, prev_block_id) => {
122                                res.neighbour.punish(PunishReason::Malicious);
123                                tracing::warn!("got response for an unknown block id");
124                            }
125                            Some(data) => {
126                                let mc_block_id = data.block_id;
127                                let parsed = self
128                                    .process_received_block(&mc_block_id, data, res.neighbour)
129                                    .await;
130                                if parsed.is_some() {
131                                    return parsed;
132                                }
133                            }
134                            None => tracing::warn!(?prev_block_id, "block not found"),
135                        },
136                        Err(e) => tracing::error!("failed to get next block: {e}"),
137                    }
138                    None
139                },
140            )
141        };
142
143        loop {
144            // Primary
145            if !self.use_fallback.load(Ordering::Relaxed) {
146                if let res @ Some(_) = primary().await {
147                    return res;
148                }
149            }
150
151            // Fallback
152            if let Some(fallback) = &self.fallback {
153                tracing::debug!(%prev_block_id, "get_next_block_full fallback");
154                self.use_fallback.store(true, Ordering::Relaxed);
155                if let res @ Some(_) = fallback.get_next_block(prev_block_id).await {
156                    return res;
157                }
158            }
159
160            // Reset fallback
161            self.use_fallback.store(false, Ordering::Relaxed);
162
163            // Schedule next cleanup
164            self.cleanup_fallback_at
165                .store(prev_block_id.seqno.saturating_add(1), Ordering::Release);
166        }
167    }
168
169    async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
170        let BlockIdRelation {
171            mc_block_id,
172            block_id,
173        } = block_id_relation;
174
175        let primary = || {
176            loop_with_timeout(
177                self.config.get_block_polling_interval,
178                self.config.get_block_timeout,
179                self.fallback.is_some(),
180                || {
181                    tracing::debug!(%block_id, "get_block_full requested");
182                    self.client
183                        .get_block_full(block_id, DataRequirement::Expected)
184                },
185                |res| async move {
186                    match res {
187                        Ok(res) => match res.data {
188                            Some(data) => {
189                                let parsed = self
190                                    .process_received_block(mc_block_id, data, res.neighbour)
191                                    .await;
192                                if parsed.is_some() {
193                                    return parsed;
194                                }
195                            }
196                            None => tracing::warn!(%block_id, "block not found"),
197                        },
198                        Err(e) => tracing::error!("failed to get block: {e}"),
199                    }
200                    None
201                },
202            )
203        };
204
205        loop {
206            // Primary
207            if !self.use_fallback.load(Ordering::Relaxed) {
208                if let res @ Some(_) = primary().await {
209                    return res;
210                }
211            }
212
213            // Fallback
214            if let Some(fallback) = &self.fallback {
215                tracing::debug!(%block_id, "get_block_full fallback");
216                self.use_fallback.store(true, Ordering::Relaxed);
217                if let res @ Some(_) = fallback.get_block(block_id_relation).await {
218                    return res;
219                }
220            }
221
222            // Reset fallback
223            self.use_fallback.store(false, Ordering::Relaxed);
224
225            // NOTE: Don't schedule next cleanup for fallback, get_next is enough for that.
226        }
227    }
228
229    async fn process_received_block(
230        &self,
231        mc_block_id: &BlockId,
232        block_full: BlockDataFull,
233        neighbour: Neighbour,
234    ) -> OptionalBlockStuff {
235        let block_stuff_fut = pin!(rayon_run({
236            let block_id = block_full.block_id;
237            let block_data = block_full.block_data.clone();
238            move || BlockStuff::deserialize_checked(&block_id, &block_data)
239        }));
240
241        let other_data_fut = pin!(rayon_run({
242            let block_id = block_full.block_id;
243            let proof_data = block_full.proof_data.clone();
244            let queue_diff_data = block_full.queue_diff_data.clone();
245            move || {
246                (
247                    BlockProofStuff::deserialize(&block_id, &proof_data),
248                    QueueDiffStuff::deserialize(&block_id, &queue_diff_data),
249                )
250            }
251        }));
252
253        let (block_stuff, (block_proof, queue_diff)) =
254            futures_util::future::join(block_stuff_fut, other_data_fut).await;
255
256        match (block_stuff, block_proof, queue_diff) {
257            (Ok(block), Ok(proof), Ok(diff)) => {
258                let proof = WithArchiveData::new(proof, block_full.proof_data);
259                let diff = WithArchiveData::new(diff, block_full.queue_diff_data);
260                if let Err(e) = self
261                    .proof_checker
262                    .check_proof(CheckProof {
263                        mc_block_id,
264                        block: &block,
265                        proof: &proof,
266                        queue_diff: &diff,
267                        store_on_success: true,
268                    })
269                    .await
270                {
271                    neighbour.punish(PunishReason::Malicious);
272                    tracing::error!("got invalid block proof: {e}");
273                    return None;
274                }
275
276                Some(Ok(block.with_archive_data(block_full.block_data)))
277            }
278            (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
279                neighbour.punish(PunishReason::Malicious);
280                tracing::error!("failed to deserialize shard block or block proof: {e}");
281                None
282            }
283        }
284    }
285}
286
287impl BlockProvider for BlockchainBlockProvider {
288    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
289    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
290    type CleanupFut<'a> = BlockchainBlockProviderCleanupFut<'a>;
291
292    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
293        Box::pin(self.get_next_block_impl(prev_block_id))
294    }
295
296    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
297        Box::pin(self.get_block_impl(block_id_relation))
298    }
299
300    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
301        match &self.fallback {
302            Some(fallback) if self.cleanup_fallback_at.load(Ordering::Acquire) <= mc_seqno => {
303                BlockchainBlockProviderCleanupFut::Fallback {
304                    fut: fallback.cleanup_until(mc_seqno),
305                    cleanup_fallback_at: &self.cleanup_fallback_at,
306                    mc_seqno,
307                }
308            }
309            _ => BlockchainBlockProviderCleanupFut::Noop,
310        }
311    }
312}
313
/// Future returned by `BlockchainBlockProvider::cleanup_until`.
pub enum BlockchainBlockProviderCleanupFut<'a> {
    /// Nothing to clean up; resolves to `Ok(())` as soon as it is polled.
    Noop,
    /// Drives the cleanup of the fallback provider.
    Fallback {
        /// Cleanup future produced by the fallback provider.
        fut: BoxFuture<'a, Result<()>>,
        /// The provider's scheduled cleanup seqno. It is reset to `u32::MAX`
        /// after a successful cleanup if it still equals `mc_seqno`.
        cleanup_fallback_at: &'a AtomicU32,
        /// Masterchain seqno this cleanup was requested for.
        mc_seqno: u32,
    },
}
322
323impl Future for BlockchainBlockProviderCleanupFut<'_> {
324    type Output = Result<()>;
325
326    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
327        match self.get_mut() {
328            Self::Noop => Poll::Ready(Ok(())),
329            Self::Fallback {
330                fut,
331                cleanup_fallback_at,
332                mc_seqno,
333            } => {
334                let res = fut.poll_unpin(cx);
335
336                // Reset `cleanup_fallback_at` when future is ready.
337                if matches!(&res, Poll::Ready(r) if r.is_ok()) {
338                    cleanup_fallback_at
339                        .compare_exchange(*mc_seqno, u32::MAX, Ordering::Release, Ordering::Relaxed)
340                        .ok();
341                }
342
343                res
344            }
345        }
346    }
347}
348
349async fn loop_with_timeout<E, EFut, P, PFut, R, T>(
350    interval: Duration,
351    timeout: Duration,
352    use_timeout: bool,
353    request: E,
354    process: P,
355) -> Option<T>
356where
357    E: Fn() -> EFut,
358    EFut: Future<Output = R>,
359    P: Fn(R) -> PFut,
360    PFut: Future<Output = Option<T>>,
361{
362    // TODO: Backoff?
363    let mut interval = tokio::time::interval(interval);
364
365    let mut timeout = pin!(if use_timeout {
366        Either::Left(tokio::time::sleep(timeout))
367    } else {
368        Either::Right(futures_util::future::pending::<()>())
369    });
370
371    loop {
372        tokio::select! {
373            res = request() => {
374                if let res @ Some(_) = process(res).await {
375                    return res;
376                }
377            },
378            _ = &mut timeout => return None,
379        }
380
381        tokio::select! {
382            _ = interval.tick() => {},
383            _ = &mut timeout => return None,
384        }
385    }
386}