tycho_core/block_strider/provider/
archive_provider.rs

1use std::collections::{BTreeMap, btree_map};
2use std::num::NonZeroU64;
3use std::pin::pin;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU8, Ordering};
6use std::time::Duration;
7
8use anyhow::Result;
9use async_trait::async_trait;
10use bytes::{BufMut, BytesMut};
11use bytesize::ByteSize;
12use futures_util::future::BoxFuture;
13use serde::{Deserialize, Serialize};
14use tokio::sync::watch;
15use tokio::task::AbortHandle;
16use tycho_block_util::archive::Archive;
17use tycho_block_util::block::{BlockIdRelation, BlockStuffAug};
18use tycho_types::models::BlockId;
19use tycho_util::fs::TargetWriter;
20
21use crate::block_strider::provider::{BlockProvider, CheckProof, OptionalBlockStuff, ProofChecker};
22use crate::blockchain_rpc;
23use crate::blockchain_rpc::BlockchainRpcClient;
24use crate::overlay_client::{Neighbour, PunishReason};
25#[cfg(feature = "s3")]
26use crate::s3::S3Client;
27use crate::storage::CoreStorage;
28
/// Configuration for [`ArchiveBlockProvider`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ArchiveBlockProviderConfig {
    /// Archives up to this size are buffered in memory while downloading;
    /// larger ones are spilled to a temporary file (see `get_archive_writer`).
    pub max_archive_to_memory_size: ByteSize,
}
34
impl Default for ArchiveBlockProviderConfig {
    fn default() -> Self {
        Self {
            // By default, keep archives up to 100 MB in memory.
            max_archive_to_memory_size: ByteSize::mb(100),
        }
    }
}
42
/// Block provider that serves blocks out of downloaded archives.
///
/// Cheap to clone (a thin wrapper around an `Arc`).
#[derive(Clone)]
#[repr(transparent)]
pub struct ArchiveBlockProvider {
    inner: Arc<Inner>,
}
48
49impl ArchiveBlockProvider {
50    pub fn new(
51        client: impl IntoArchiveClient,
52        storage: CoreStorage,
53        config: ArchiveBlockProviderConfig,
54    ) -> Self {
55        let proof_checker = ProofChecker::new(storage.clone());
56
57        Self {
58            inner: Arc::new(Inner {
59                client: client.into_archive_client(),
60                proof_checker,
61
62                known_archives: parking_lot::Mutex::new(Default::default()),
63
64                storage,
65                config,
66            }),
67        }
68    }
69
70    async fn get_next_block_impl(&self, block_id: &BlockId) -> OptionalBlockStuff {
71        let this = self.inner.as_ref();
72
73        let next_mc_seqno = block_id.seqno + 1;
74
75        loop {
76            let Some((archive_key, info)) = this.get_archive(next_mc_seqno).await else {
77                tracing::warn!(prev_block_id = ?block_id, "archive not found");
78                break None;
79            };
80
81            let Some(block_id) = info.archive.mc_block_ids.get(&next_mc_seqno) else {
82                tracing::error!(
83                    "received archive does not contain mc block with seqno {next_mc_seqno}"
84                );
85                this.remove_archive_if_same(archive_key, &info);
86                if let Some(from) = &info.from {
87                    from.punish(PunishReason::Malicious);
88                }
89                continue;
90            };
91
92            match self
93                .checked_get_entry_by_id(&info.archive, block_id, block_id)
94                .await
95            {
96                Ok(block) => return Some(Ok(block.clone())),
97                Err(e) => {
98                    tracing::error!(archive_key, %block_id, "invalid archive entry: {e:?}");
99                    this.remove_archive_if_same(archive_key, &info);
100                    if let Some(from) = &info.from {
101                        from.punish(PunishReason::Malicious);
102                    }
103                }
104            }
105        }
106    }
107
108    async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
109        let this = self.inner.as_ref();
110
111        let block_id = block_id_relation.block_id;
112        let mc_block_id = block_id_relation.mc_block_id;
113
114        loop {
115            let Some((archive_key, info)) = this.get_archive(mc_block_id.seqno).await else {
116                tracing::warn!("shard block is too new for archives");
117
118                // NOTE: This is a strange situation, but if we wait a bit it might go away.
119                tokio::time::sleep(Duration::from_secs(1)).await;
120                continue;
121            };
122
123            match self
124                .checked_get_entry_by_id(&info.archive, &mc_block_id, &block_id)
125                .await
126            {
127                Ok(block) => return Some(Ok(block.clone())),
128                Err(e) => {
129                    tracing::error!(archive_key, %block_id, %mc_block_id, "invalid archive entry: {e:?}");
130                    this.remove_archive_if_same(archive_key, &info);
131                    if let Some(from) = &info.from {
132                        from.punish(PunishReason::Malicious);
133                    }
134                }
135            }
136        }
137    }
138
139    async fn checked_get_entry_by_id(
140        &self,
141        archive: &Arc<Archive>,
142        mc_block_id: &BlockId,
143        block_id: &BlockId,
144    ) -> Result<BlockStuffAug> {
145        let (block, ref proof, ref queue_diff) = match archive.get_entry_by_id(block_id).await {
146            Ok(entry) => entry,
147            Err(e) => anyhow::bail!("archive is corrupted: {e:?}"),
148        };
149
150        self.inner
151            .proof_checker
152            .check_proof(CheckProof {
153                mc_block_id,
154                block: &block,
155                proof,
156                queue_diff,
157                store_on_success: true,
158            })
159            .await?;
160
161        Ok(block)
162    }
163}
164
/// Shared state behind [`ArchiveBlockProvider`].
struct Inner {
    storage: CoreStorage,

    /// Source of archives (blockchain RPC, S3, or a hybrid of both).
    client: Arc<dyn ArchiveClient>,
    proof_checker: ProofChecker,

    /// Cache of downloaded archives and in-flight download tasks, keyed by
    /// the mc seqno the download was started for.
    known_archives: parking_lot::Mutex<ArchivesMap>,

    config: ArchiveBlockProviderConfig,
}
175
impl Inner {
    /// Returns a downloaded archive (and its cache key) containing the mc
    /// block with `mc_seqno`, starting a download task if none exists.
    ///
    /// Returns `None` only when the download task finished without producing
    /// an archive (e.g. the requested block is too new).
    async fn get_archive(&self, mc_seqno: u32) -> Option<(u32, ArchiveInfo)> {
        loop {
            let mut pending = 'pending: {
                let mut guard = self.known_archives.lock();

                // Search for the downloaded archive or for an existing downloader task.
                // NOTE(review): this waits on the first pending task it finds, even if
                // that task was started for a different seqno — confirm intended.
                for (archive_key, value) in guard.iter() {
                    match value {
                        ArchiveSlot::Downloaded(info) => {
                            if info.archive.mc_block_ids.contains_key(&mc_seqno) {
                                return Some((*archive_key, info.clone()));
                            }
                        }
                        ArchiveSlot::Pending(task) => break 'pending task.clone(),
                    }
                }

                // Start downloading otherwise
                let task = self.make_downloader().spawn(mc_seqno);
                guard.insert(mc_seqno, ArchiveSlot::Pending(task.clone()));

                task
            };

            // Wait until the pending task is finished or cancelled
            let mut res = None;
            let mut finished = false;
            loop {
                match &*pending.rx.borrow_and_update() {
                    ArchiveTaskState::None => {}
                    ArchiveTaskState::Finished(archive) => {
                        res = archive.clone();
                        finished = true;
                        break;
                    }
                    ArchiveTaskState::Cancelled => break,
                }
                // A dropped sender without a final state counts as cancellation.
                if pending.rx.changed().await.is_err() {
                    break;
                }
            }

            // Replace pending with downloaded
            match self.known_archives.lock().entry(pending.archive_key) {
                btree_map::Entry::Vacant(_) => {
                    // Do nothing if the entry was already removed.
                }
                btree_map::Entry::Occupied(mut entry) => match &res {
                    None => {
                        // Task was either cancelled or received `TooNew` so no archive received.
                        entry.remove();
                    }
                    Some(info) => {
                        // Task was finished with a non-empty result so store it.
                        entry.insert(ArchiveSlot::Downloaded(info.clone()));
                    }
                },
            }

            if finished {
                return res.map(|info| (pending.archive_key, info));
            }

            // The task was aborted while we were waiting; retry from scratch.
            tracing::warn!(mc_seqno, "archive task cancelled while in use");
            // Avoid spinloop just in case.
            tokio::task::yield_now().await;
        }
    }

    /// Removes the cached archive under `archive_key`, but only if it is still
    /// the exact same `Arc` instance as `prev.archive` (so a concurrently
    /// re-downloaded replacement is not evicted by mistake).
    /// Returns whether an entry was removed.
    fn remove_archive_if_same(&self, archive_key: u32, prev: &ArchiveInfo) -> bool {
        match self.known_archives.lock().entry(archive_key) {
            btree_map::Entry::Vacant(_) => false,
            btree_map::Entry::Occupied(entry) => {
                if matches!(
                    entry.get(),
                    ArchiveSlot::Downloaded(info)
                    if Arc::ptr_eq(&info.archive, &prev.archive)
                ) {
                    entry.remove();
                    true
                } else {
                    false
                }
            }
        }
    }

    /// Builds a one-shot downloader capturing the client, the storage and the
    /// in-memory size threshold from the config.
    fn make_downloader(&self) -> ArchiveDownloader {
        ArchiveDownloader {
            client: self.client.clone(),
            storage: self.storage.clone(),
            memory_threshold: self.config.max_archive_to_memory_size,
        }
    }

    /// Evicts archives that end before `bound` and aborts pending downloads
    /// that cannot possibly reach it.
    fn clear_outdated_archives(&self, bound: u32) {
        let mut entries_remaining = 0usize;
        let mut entries_removed = 0usize;

        let mut guard = self.known_archives.lock();
        guard.retain(|_, archive| {
            let retain;
            match archive {
                ArchiveSlot::Downloaded(info) => match info.archive.mc_block_ids.last_key_value() {
                    // An archive without mc blocks is useless.
                    None => retain = false,
                    Some((last_mc_seqno, _)) => retain = *last_mc_seqno >= bound,
                },
                ArchiveSlot::Pending(task) => {
                    // A pending task can still yield blocks up to
                    // `archive_key + MAX_MC_BLOCKS_PER_ARCHIVE`.
                    retain = task
                        .archive_key
                        .saturating_add(Archive::MAX_MC_BLOCKS_PER_ARCHIVE)
                        >= bound;
                    if !retain {
                        task.abort_handle.abort();
                    }
                }
            };

            entries_remaining += retain as usize;
            entries_removed += !retain as usize;
            retain
        });
        drop(guard);

        tracing::debug!(
            entries_remaining,
            entries_removed,
            bound,
            "removed known archives"
        );
    }
}
309
/// Cache of archives and download tasks, keyed by mc seqno.
type ArchivesMap = BTreeMap<u32, ArchiveSlot>;
311
/// A single cache slot: either a fully downloaded archive or a task that is
/// still downloading one.
enum ArchiveSlot {
    Downloaded(ArchiveInfo),
    Pending(ArchiveTask),
}
316
/// A downloaded archive together with its origin.
#[derive(Clone)]
struct ArchiveInfo {
    /// Neighbour the archive was downloaded from; `None` for S3.
    from: Option<Neighbour>,
    archive: Arc<Archive>,
}
322
/// One-shot helper that downloads a single archive on a spawned task.
struct ArchiveDownloader {
    client: Arc<dyn ArchiveClient>,
    storage: CoreStorage,
    /// Archives larger than this are written to a temp file instead of memory.
    memory_threshold: ByteSize,
}
328
impl ArchiveDownloader {
    /// Spawns a background task that keeps retrying to download the archive
    /// for `mc_seqno` and publishes its final state through a watch channel.
    fn spawn(self, mc_seqno: u32) -> ArchiveTask {
        // TODO: Use a proper backoff here?
        const INTERVAL: Duration = Duration::from_secs(1);

        let (tx, rx) = watch::channel(ArchiveTaskState::None);

        // If the task is dropped or aborted before finishing, make waiters
        // observe `Cancelled` instead of hanging on `None` forever.
        let guard = scopeguard::guard(tx, move |tx| {
            tracing::warn!(mc_seqno, "cancelled preloading archive");
            tx.send_modify(|prev| {
                if !matches!(prev, ArchiveTaskState::Finished(..)) {
                    *prev = ArchiveTaskState::Cancelled;
                }
            });
        });

        // NOTE: Use a separate downloader to prevent reference cycles
        let handle = tokio::spawn(async move {
            tracing::debug!(mc_seqno, "started preloading archive");
            scopeguard::defer! {
                tracing::debug!(mc_seqno, "finished preloading archive");
            }

            loop {
                match self.try_download(mc_seqno).await {
                    Ok(res) => {
                        // Defuse the cancellation guard, then publish the result.
                        let tx = scopeguard::ScopeGuard::into_inner(guard);
                        tx.send_modify(move |prev| *prev = ArchiveTaskState::Finished(res));
                        break;
                    }
                    Err(e) => {
                        // Transient failure: log and retry after a fixed delay.
                        tracing::error!(mc_seqno, "failed to preload archive {e:?}");
                        tokio::time::sleep(INTERVAL).await;
                    }
                }
            }
        });

        ArchiveTask {
            archive_key: mc_seqno,
            rx,
            abort_handle: Arc::new(AbortOnDrop(handle.abort_handle())),
        }
    }

    /// Downloads and parses a single archive.
    ///
    /// Returns `Ok(None)` when the client reports that no archive is
    /// available. Parsing and validation run on a blocking thread.
    async fn try_download(&self, mc_seqno: u32) -> Result<Option<ArchiveInfo>> {
        let ctx = ArchiveDownloadContext {
            storage: &self.storage,
            memory_threshold: self.memory_threshold,
        };
        let Some(found) = self.client.find_archive(mc_seqno, ctx).await? else {
            return Ok(None);
        };
        let res = (found.download)().await?;

        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            let bytes = res.writer.try_freeze()?;

            let archive = match Archive::new(bytes) {
                Ok(array) => array,
                Err(e) => {
                    // Malformed archive: punish the peer that served it.
                    if let Some(neighbour) = res.neighbour {
                        neighbour.punish(PunishReason::Malicious);
                    }
                    return Err(e);
                }
            };

            if let Err(e) = archive.check_mc_blocks_range() {
                // TODO: Punish a bit less for missing mc blocks?
                if let Some(neighbour) = res.neighbour {
                    neighbour.punish(PunishReason::Malicious);
                }
                return Err(e);
            }

            Ok(ArchiveInfo {
                archive: Arc::new(archive),
                from: res.neighbour,
            })
        })
        .await?
        .map(Some)
    }
}
417
/// Handle to an in-flight archive download.
#[derive(Clone)]
struct ArchiveTask {
    /// Mc seqno the download was started for (also the cache key).
    archive_key: u32,
    /// Receives the task's state updates.
    rx: watch::Receiver<ArchiveTaskState>,
    /// Aborts the download when the last handle is dropped.
    abort_handle: Arc<AbortOnDrop>,
}
424
/// Wrapper around [`AbortHandle`] that aborts the task on drop.
#[repr(transparent)]
struct AbortOnDrop(AbortHandle);
427
// Expose the underlying handle's methods (e.g. `abort`) directly.
impl std::ops::Deref for AbortOnDrop {
    type Target = AbortHandle;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
436
impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        // Abort the associated task when the last reference goes away.
        self.0.abort();
    }
}
442
/// Lifecycle of an archive download task, published via a watch channel.
#[derive(Default)]
enum ArchiveTaskState {
    /// Still downloading.
    #[default]
    None,
    /// Finished; `None` means no archive is available (e.g. block is too new).
    Finished(Option<ArchiveInfo>),
    /// Aborted before producing a result.
    Cancelled,
}
450
impl BlockProvider for ArchiveBlockProvider {
    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type CleanupFut<'a> = futures_util::future::Ready<Result<()>>;

    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
        Box::pin(self.get_next_block_impl(prev_block_id))
    }

    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
        Box::pin(self.get_block_impl(block_id_relation))
    }

    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
        // Eviction is synchronous, so the returned future is already ready.
        self.inner.clear_outdated_archives(mc_seqno);
        futures_util::future::ready(Ok(()))
    }
}
469
/// Raw result of an archive download, before parsing.
pub struct ArchiveResponse {
    /// Holds the downloaded bytes (in memory or in a temp file).
    writer: TargetWriter,
    /// Peer the archive came from; `None` when served from S3.
    neighbour: Option<Neighbour>,
}
474
/// Context passed to [`ArchiveClient::find_archive`].
#[derive(Clone, Copy)]
pub struct ArchiveDownloadContext<'a> {
    pub storage: &'a CoreStorage,
    /// See [`ArchiveBlockProviderConfig::max_archive_to_memory_size`].
    pub memory_threshold: ByteSize,
}
480
481impl<'a> ArchiveDownloadContext<'a> {
482    pub fn get_archive_writer(&self, size: NonZeroU64) -> Result<TargetWriter> {
483        Ok(if size.get() > self.memory_threshold.as_u64() {
484            let file = self.storage.context().temp_files().unnamed_file().open()?;
485            TargetWriter::File(std::io::BufWriter::new(file))
486        } else {
487            TargetWriter::Bytes(BytesMut::new().writer())
488        })
489    }
490
491    pub fn estimate_archive_id(&self, mc_seqno: u32) -> u32 {
492        self.storage.block_storage().estimate_archive_id(mc_seqno)
493    }
494}
495
/// Conversion into a type-erased [`ArchiveClient`].
pub trait IntoArchiveClient {
    fn into_archive_client(self) -> Arc<dyn ArchiveClient>;
}
499
// A single-element tuple simply unwraps into its client.
impl<T: ArchiveClient> IntoArchiveClient for (T,) {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self.0)
    }
}
506
507impl<T1: ArchiveClient, T2: ArchiveClient> IntoArchiveClient for (T1, Option<T2>) {
508    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
509        let (primary, secondary) = self;
510        match secondary {
511            None => Arc::new(primary),
512            Some(secondary) => Arc::new(HybridArchiveClient::new(primary, secondary)),
513        }
514    }
515}
516
/// An archive located by a client, with a deferred download closure.
pub struct FoundArchive<'a> {
    pub archive_id: u64,
    /// One-shot closure that performs the actual download.
    pub download: Box<dyn FnOnce() -> BoxFuture<'a, Result<ArchiveResponse>> + Send + 'a>,
}
521
/// Source of archives (e.g. blockchain RPC or S3).
#[async_trait]
pub trait ArchiveClient: Send + Sync + 'static {
    /// Locates an archive containing the mc block with `mc_seqno`.
    ///
    /// Returns `Ok(None)` when no such archive is available.
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>>;
}
530
#[async_trait]
impl ArchiveClient for BlockchainRpcClient {
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        Ok(match self.find_archive(mc_seqno).await? {
            blockchain_rpc::PendingArchiveResponse::Found(found) => Some(FoundArchive {
                archive_id: found.id,
                download: Box::new(move || {
                    Box::pin(async move {
                        // Remember the peer so invalid data can be punished later.
                        let neighbour = found.neighbour.clone();

                        // The size is known up front, so the writer (memory vs
                        // temp file) can be chosen before downloading.
                        let output = ctx.get_archive_writer(found.size)?;
                        let writer = self.download_archive(found, output).await?;

                        Ok(ArchiveResponse {
                            writer,
                            neighbour: Some(neighbour),
                        })
                    })
                }),
            }),
            // No archive for this seqno exists yet.
            blockchain_rpc::PendingArchiveResponse::TooNew => None,
        })
    }
}
559
// A bare RPC client is used as-is.
impl IntoArchiveClient for BlockchainRpcClient {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self)
    }
}
566
#[cfg(feature = "s3")]
#[async_trait]
impl ArchiveClient for S3Client {
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        // Map the mc seqno to an (estimated) archive id first.
        let archive_id = ctx.estimate_archive_id(mc_seqno);

        let Some(info) = self.get_archive_info(archive_id).await? else {
            return Ok(None);
        };

        Ok(Some(FoundArchive {
            archive_id: info.archive_id as u64,
            download: Box::new(move || {
                Box::pin(async move {
                    let output = ctx.get_archive_writer(info.size)?;
                    let writer = self.download_archive(info.archive_id, output).await?;

                    Ok(ArchiveResponse {
                        writer,
                        // No peer to punish when downloading from S3.
                        neighbour: None,
                    })
                })
            }),
        }))
    }
}
597
// A bare S3 client is used as-is.
#[cfg(feature = "s3")]
impl IntoArchiveClient for S3Client {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self)
    }
}
605
/// Archive client that combines a primary and a secondary client and
/// remembers which one answered last time.
pub struct HybridArchiveClient<T1, T2> {
    primary: T1,
    secondary: T2,
    /// Which side produced the previous successful result (if any).
    prefer: HybridArchiveClientState,
}
611
612impl<T1, T2> HybridArchiveClient<T1, T2> {
613    pub fn new(primary: T1, secondary: T2) -> Self {
614        Self {
615            primary,
616            secondary,
617            prefer: Default::default(),
618        }
619    }
620}
621
#[async_trait]
impl<T1, T2> ArchiveClient for HybridArchiveClient<T1, T2>
where
    T1: ArchiveClient,
    T2: ArchiveClient,
{
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        // FIXME: There should be a better way of writing this.

        // Fast path: ask only the previously successful client.
        if let Some(prefer) = self.prefer.get() {
            tracing::debug!(mc_seqno, ?prefer);
            match prefer {
                HybridArchiveClientPart::Primary => {
                    let res = self.primary.find_archive(mc_seqno, ctx).await;
                    if matches!(&res, Ok(Some(_))) {
                        return res;
                    }
                }
                HybridArchiveClientPart::Secondary => {
                    let res = self.secondary.find_archive(mc_seqno, ctx).await;
                    if matches!(&res, Ok(Some(_))) {
                        return res;
                    }
                }
            }
        }

        // The preferred client failed (or there was no preference yet):
        // clear the preference and race both clients.
        self.prefer.set(None);

        let primary = pin!(self.primary.find_archive(mc_seqno, ctx));
        let secondary = pin!(self.secondary.find_archive(mc_seqno, ctx));
        match futures_util::future::select(primary, secondary).await {
            futures_util::future::Either::Left((found, other)) => {
                match found {
                    Ok(Some(found)) => {
                        self.prefer.set(Some(HybridArchiveClientPart::Primary));
                        return Ok(Some(found));
                    }
                    Ok(None) => {}
                    Err(e) => tracing::warn!("primary archive client error: {e:?}"),
                }
                // Primary finished first but produced nothing; fall back to
                // the secondary and prefer it if it succeeds.
                other.await.inspect(|res| {
                    if res.is_some() {
                        self.prefer.set(Some(HybridArchiveClientPart::Secondary));
                    }
                })
            }
            futures_util::future::Either::Right((found, other)) => {
                match found {
                    Ok(Some(found)) => {
                        self.prefer.set(Some(HybridArchiveClientPart::Secondary));
                        return Ok(Some(found));
                    }
                    Ok(None) => {}
                    Err(e) => tracing::warn!("secondary archive client error: {e:?}"),
                }
                // Secondary finished first but produced nothing; fall back to
                // the primary and prefer it if it succeeds.
                other.await.inspect(|res| {
                    if res.is_some() {
                        self.prefer.set(Some(HybridArchiveClientPart::Primary));
                    }
                })
            }
        }
    }
}
691
/// Atomic storage for an `Option<HybridArchiveClientPart>`
/// (0 = none, 1 = primary, 2 = secondary).
#[derive(Default)]
struct HybridArchiveClientState(AtomicU8);
694
695impl HybridArchiveClientState {
696    fn get(&self) -> Option<HybridArchiveClientPart> {
697        match self.0.load(Ordering::Acquire) {
698            1 => Some(HybridArchiveClientPart::Primary),
699            2 => Some(HybridArchiveClientPart::Secondary),
700            _ => None,
701        }
702    }
703
704    fn set(&self, value: Option<HybridArchiveClientPart>) {
705        let value = match value {
706            None => 0,
707            Some(HybridArchiveClientPart::Primary) => 1,
708            Some(HybridArchiveClientPart::Secondary) => 2,
709        };
710        self.0.store(value, Ordering::Release);
711    }
712}
713
/// Identifies one side of a [`HybridArchiveClient`].
#[derive(Debug, Clone, Copy)]
enum HybridArchiveClientPart {
    Primary,
    Secondary,
}
719
720impl std::ops::Not for HybridArchiveClientPart {
721    type Output = Self;
722
723    #[inline]
724    fn not(self) -> Self::Output {
725        match self {
726            Self::Primary => Self::Secondary,
727            Self::Secondary => Self::Primary,
728        }
729    }
730}