1use std::collections::{BTreeMap, btree_map};
2use std::num::NonZeroU64;
3use std::pin::pin;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU8, Ordering};
6use std::time::Duration;
7
8use anyhow::Result;
9use async_trait::async_trait;
10use bytes::{BufMut, BytesMut};
11use bytesize::ByteSize;
12use futures_util::future::BoxFuture;
13use serde::{Deserialize, Serialize};
14use tokio::sync::watch;
15use tokio::task::AbortHandle;
16use tycho_block_util::archive::Archive;
17use tycho_block_util::block::{BlockIdRelation, BlockStuffAug};
18use tycho_types::models::BlockId;
19use tycho_util::fs::TargetWriter;
20
21use crate::block_strider::provider::{BlockProvider, CheckProof, OptionalBlockStuff, ProofChecker};
22use crate::blockchain_rpc;
23use crate::blockchain_rpc::BlockchainRpcClient;
24use crate::overlay_client::{Neighbour, PunishReason};
25#[cfg(feature = "s3")]
26use crate::s3::S3Client;
27use crate::storage::CoreStorage;
28
/// Configuration for the archive-based block provider.
///
/// All fields fall back to their defaults when absent (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ArchiveBlockProviderConfig {
    /// Archives whose reported size exceeds this threshold are streamed to a
    /// temporary file instead of being buffered in memory. Default: 100 MB.
    pub max_archive_to_memory_size: ByteSize,
}
34
35impl Default for ArchiveBlockProviderConfig {
36 fn default() -> Self {
37 Self {
38 max_archive_to_memory_size: ByteSize::mb(100),
39 }
40 }
41}
42
/// Block provider that serves blocks out of downloaded blockchain archives
/// (fetched via RPC and/or S3) rather than individual block downloads.
#[derive(Clone)]
#[repr(transparent)]
pub struct ArchiveBlockProvider {
    inner: Arc<Inner>,
}
48
49impl ArchiveBlockProvider {
50 pub fn new(
51 client: impl IntoArchiveClient,
52 storage: CoreStorage,
53 config: ArchiveBlockProviderConfig,
54 ) -> Self {
55 let proof_checker = ProofChecker::new(storage.clone());
56
57 Self {
58 inner: Arc::new(Inner {
59 client: client.into_archive_client(),
60 proof_checker,
61
62 known_archives: parking_lot::Mutex::new(Default::default()),
63
64 storage,
65 config,
66 }),
67 }
68 }
69
70 async fn get_next_block_impl(&self, block_id: &BlockId) -> OptionalBlockStuff {
71 let this = self.inner.as_ref();
72
73 let next_mc_seqno = block_id.seqno + 1;
74
75 loop {
76 let Some((archive_key, info)) = this.get_archive(next_mc_seqno).await else {
77 tracing::warn!(prev_block_id = ?block_id, "archive not found");
78 break None;
79 };
80
81 let Some(block_id) = info.archive.mc_block_ids.get(&next_mc_seqno) else {
82 tracing::error!(
83 "received archive does not contain mc block with seqno {next_mc_seqno}"
84 );
85 this.remove_archive_if_same(archive_key, &info);
86 if let Some(from) = &info.from {
87 from.punish(PunishReason::Malicious);
88 }
89 continue;
90 };
91
92 match self
93 .checked_get_entry_by_id(&info.archive, block_id, block_id)
94 .await
95 {
96 Ok(block) => return Some(Ok(block.clone())),
97 Err(e) => {
98 tracing::error!(archive_key, %block_id, "invalid archive entry: {e:?}");
99 this.remove_archive_if_same(archive_key, &info);
100 if let Some(from) = &info.from {
101 from.punish(PunishReason::Malicious);
102 }
103 }
104 }
105 }
106 }
107
108 async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
109 let this = self.inner.as_ref();
110
111 let block_id = block_id_relation.block_id;
112 let mc_block_id = block_id_relation.mc_block_id;
113
114 loop {
115 let Some((archive_key, info)) = this.get_archive(mc_block_id.seqno).await else {
116 tracing::warn!("shard block is too new for archives");
117
118 tokio::time::sleep(Duration::from_secs(1)).await;
120 continue;
121 };
122
123 match self
124 .checked_get_entry_by_id(&info.archive, &mc_block_id, &block_id)
125 .await
126 {
127 Ok(block) => return Some(Ok(block.clone())),
128 Err(e) => {
129 tracing::error!(archive_key, %block_id, %mc_block_id, "invalid archive entry: {e:?}");
130 this.remove_archive_if_same(archive_key, &info);
131 if let Some(from) = &info.from {
132 from.punish(PunishReason::Malicious);
133 }
134 }
135 }
136 }
137 }
138
139 async fn checked_get_entry_by_id(
140 &self,
141 archive: &Arc<Archive>,
142 mc_block_id: &BlockId,
143 block_id: &BlockId,
144 ) -> Result<BlockStuffAug> {
145 let (block, ref proof, ref queue_diff) = match archive.get_entry_by_id(block_id).await {
146 Ok(entry) => entry,
147 Err(e) => anyhow::bail!("archive is corrupted: {e:?}"),
148 };
149
150 self.inner
151 .proof_checker
152 .check_proof(CheckProof {
153 mc_block_id,
154 block: &block,
155 proof,
156 queue_diff,
157 store_on_success: true,
158 })
159 .await?;
160
161 Ok(block)
162 }
163}
164
/// Shared state behind [`ArchiveBlockProvider`].
struct Inner {
    storage: CoreStorage,

    /// Source of archives (RPC, S3, or a hybrid of both).
    client: Arc<dyn ArchiveClient>,
    proof_checker: ProofChecker,

    /// Cache of downloaded / in-flight archives, keyed by the mc seqno that
    /// triggered the download.
    known_archives: parking_lot::Mutex<ArchivesMap>,

    config: ArchiveBlockProviderConfig,
}
175
impl Inner {
    /// Returns a downloaded archive containing mc block `mc_seqno`, starting
    /// a download if no suitable archive is cached or pending.
    ///
    /// Returns `None` when the download task finished without finding an
    /// archive (e.g. the requested block is too new).
    async fn get_archive(&self, mc_seqno: u32) -> Option<(u32, ArchiveInfo)> {
        loop {
            // Either reuse an existing pending task or spawn a new one.
            let mut pending = 'pending: {
                let mut guard = self.known_archives.lock();

                for (archive_key, value) in guard.iter() {
                    match value {
                        ArchiveSlot::Downloaded(info) => {
                            // Cache hit.
                            if info.archive.mc_block_ids.contains_key(&mc_seqno) {
                                return Some((*archive_key, info.clone()));
                            }
                        }
                        // NOTE(review): awaits the first pending task even if it
                        // was keyed to a different seqno — presumably its archive
                        // is expected to cover this seqno too; confirm.
                        ArchiveSlot::Pending(task) => break 'pending task.clone(),
                    }
                }

                // Nothing cached or in flight: start a download for this seqno.
                let task = self.make_downloader().spawn(mc_seqno);
                guard.insert(mc_seqno, ArchiveSlot::Pending(task.clone()));

                task
            };

            // Wait for the task to settle without holding the map lock.
            let mut res = None;
            let mut finished = false;
            loop {
                match &*pending.rx.borrow_and_update() {
                    ArchiveTaskState::None => {}
                    ArchiveTaskState::Finished(archive) => {
                        res = archive.clone();
                        finished = true;
                        break;
                    }
                    ArchiveTaskState::Cancelled => break,
                }
                // A dropped sender means the task is gone; treat as cancelled.
                if pending.rx.changed().await.is_err() {
                    break;
                }
            }

            // Publish the outcome back into the map.
            match self.known_archives.lock().entry(pending.archive_key) {
                btree_map::Entry::Vacant(_) => {
                    // Slot was already removed (e.g. by `clear_outdated_archives`).
                }
                btree_map::Entry::Occupied(mut entry) => match &res {
                    None => {
                        // Not found or cancelled: drop the slot.
                        entry.remove();
                    }
                    Some(info) => {
                        entry.insert(ArchiveSlot::Downloaded(info.clone()));
                    }
                },
            }

            if finished {
                return res.map(|info| (pending.archive_key, info));
            }

            // Task was cancelled while we were waiting on it; retry the lookup.
            tracing::warn!(mc_seqno, "archive task cancelled while in use");
            tokio::task::yield_now().await;
        }
    }

    /// Removes the cached archive at `archive_key`, but only if it is still
    /// the same `Arc` as `prev` (avoids evicting a freshly re-downloaded one).
    fn remove_archive_if_same(&self, archive_key: u32, prev: &ArchiveInfo) -> bool {
        match self.known_archives.lock().entry(archive_key) {
            btree_map::Entry::Vacant(_) => false,
            btree_map::Entry::Occupied(entry) => {
                if matches!(
                    entry.get(),
                    ArchiveSlot::Downloaded(info)
                    if Arc::ptr_eq(&info.archive, &prev.archive)
                ) {
                    entry.remove();
                    true
                } else {
                    false
                }
            }
        }
    }

    /// Builds a downloader snapshot sharing this provider's client/storage.
    fn make_downloader(&self) -> ArchiveDownloader {
        ArchiveDownloader {
            client: self.client.clone(),
            storage: self.storage.clone(),
            memory_threshold: self.config.max_archive_to_memory_size,
        }
    }

    /// Evicts archives that can no longer serve blocks with seqno >= `bound`
    /// and aborts pending downloads that became useless.
    fn clear_outdated_archives(&self, bound: u32) {
        let mut entries_remaining = 0usize;
        let mut entries_removed = 0usize;

        let mut guard = self.known_archives.lock();
        guard.retain(|_, archive| {
            let retain;
            match archive {
                ArchiveSlot::Downloaded(info) => match info.archive.mc_block_ids.last_key_value() {
                    // An archive without mc blocks is useless.
                    None => retain = false,
                    Some((last_mc_seqno, _)) => retain = *last_mc_seqno >= bound,
                },
                ArchiveSlot::Pending(task) => {
                    // A pending task may still produce up to
                    // `MAX_MC_BLOCKS_PER_ARCHIVE` blocks past its key.
                    retain = task
                        .archive_key
                        .saturating_add(Archive::MAX_MC_BLOCKS_PER_ARCHIVE)
                        >= bound;
                    if !retain {
                        task.abort_handle.abort();
                    }
                }
            };

            entries_remaining += retain as usize;
            entries_removed += !retain as usize;
            retain
        });
        drop(guard);

        tracing::debug!(
            entries_remaining,
            entries_removed,
            bound,
            "removed known archives"
        );
    }
}
309
/// Downloaded archives (or in-flight downloads) keyed by mc seqno.
type ArchivesMap = BTreeMap<u32, ArchiveSlot>;

enum ArchiveSlot {
    /// Archive is fully downloaded and parsed.
    Downloaded(ArchiveInfo),
    /// Download task is still running; clones share the same watch channel.
    Pending(ArchiveTask),
}
316
317#[derive(Clone)]
318struct ArchiveInfo {
319 from: Option<Neighbour>, archive: Arc<Archive>,
321}
322
/// Spawns and drives a single archive download-and-parse loop.
struct ArchiveDownloader {
    client: Arc<dyn ArchiveClient>,
    storage: CoreStorage,
    /// Max archive size that is buffered in memory instead of a temp file.
    memory_threshold: ByteSize,
}
328
impl ArchiveDownloader {
    /// Spawns a background task that retries the download every second until
    /// it succeeds, then publishes the result via a watch channel.
    ///
    /// If the task is aborted before finishing, waiters observe `Cancelled`.
    fn spawn(self, mc_seqno: u32) -> ArchiveTask {
        // Delay between retries after a failed download attempt.
        const INTERVAL: Duration = Duration::from_secs(1);

        let (tx, rx) = watch::channel(ArchiveTaskState::None);

        // Runs when the task is dropped/aborted before disarming below:
        // flips the state to `Cancelled` so waiters do not hang.
        let guard = scopeguard::guard(tx, move |tx| {
            tracing::warn!(mc_seqno, "cancelled preloading archive");
            tx.send_modify(|prev| {
                if !matches!(prev, ArchiveTaskState::Finished(..)) {
                    *prev = ArchiveTaskState::Cancelled;
                }
            });
        });

        let handle = tokio::spawn(async move {
            tracing::debug!(mc_seqno, "started preloading archive");
            scopeguard::defer! {
                tracing::debug!(mc_seqno, "finished preloading archive");
            }

            loop {
                match self.try_download(mc_seqno).await {
                    Ok(res) => {
                        // Disarm the cancellation guard, then publish the result.
                        let tx = scopeguard::ScopeGuard::into_inner(guard);
                        tx.send_modify(move |prev| *prev = ArchiveTaskState::Finished(res));
                        break;
                    }
                    Err(e) => {
                        tracing::error!(mc_seqno, "failed to preload archive {e:?}");
                        tokio::time::sleep(INTERVAL).await;
                    }
                }
            }
        });

        ArchiveTask {
            archive_key: mc_seqno,
            rx,
            // Dropping the last clone of this handle aborts the download.
            abort_handle: Arc::new(AbortOnDrop(handle.abort_handle())),
        }
    }

    /// One download attempt: locate the archive, fetch it, then parse and
    /// validate it on a blocking thread.
    ///
    /// Returns `Ok(None)` when the archive is not available (too new).
    async fn try_download(&self, mc_seqno: u32) -> Result<Option<ArchiveInfo>> {
        let ctx = ArchiveDownloadContext {
            storage: &self.storage,
            memory_threshold: self.memory_threshold,
        };
        let Some(found) = self.client.find_archive(mc_seqno, ctx).await? else {
            return Ok(None);
        };
        let res = (found.download)().await?;

        // Parsing/validation is CPU-bound; keep it off the async runtime.
        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            let bytes = res.writer.try_freeze()?;

            let archive = match Archive::new(bytes) {
                Ok(array) => array,
                Err(e) => {
                    // A peer that served a malformed archive gets punished.
                    if let Some(neighbour) = res.neighbour {
                        neighbour.punish(PunishReason::Malicious);
                    }
                    return Err(e);
                }
            };

            if let Err(e) = archive.check_mc_blocks_range() {
                if let Some(neighbour) = res.neighbour {
                    neighbour.punish(PunishReason::Malicious);
                }
                return Err(e);
            }

            Ok(ArchiveInfo {
                archive: Arc::new(archive),
                from: res.neighbour,
            })
        })
        .await?
        .map(Some)
    }
}
417
/// Handle to a background archive download.
///
/// Cloning shares the same task; the task is aborted once the last clone of
/// `abort_handle` is dropped.
#[derive(Clone)]
struct ArchiveTask {
    /// The mc seqno this download was started for.
    archive_key: u32,
    /// Observes task state transitions (`None` -> `Finished`/`Cancelled`).
    rx: watch::Receiver<ArchiveTaskState>,
    abort_handle: Arc<AbortOnDrop>,
}
424
/// Aborts the wrapped tokio task when the wrapper is dropped.
#[repr(transparent)]
struct AbortOnDrop(AbortHandle);

impl std::ops::Deref for AbortOnDrop {
    type Target = AbortHandle;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        self.0.abort();
    }
}
442
/// State reported by an archive download task through its watch channel.
#[derive(Default)]
enum ArchiveTaskState {
    /// Download still in progress.
    #[default]
    None,
    /// Download finished; `None` means no archive was found (too new).
    Finished(Option<ArchiveInfo>),
    /// Task was aborted before producing a result.
    Cancelled,
}
450
impl BlockProvider for ArchiveBlockProvider {
    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type CleanupFut<'a> = futures_util::future::Ready<Result<()>>;

    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
        Box::pin(self.get_next_block_impl(prev_block_id))
    }

    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
        Box::pin(self.get_block_impl(block_id_relation))
    }

    // Eviction is synchronous; the ready future only satisfies the trait.
    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
        self.inner.clear_outdated_archives(mc_seqno);
        futures_util::future::ready(Ok(()))
    }
}
469
/// Raw result of a single archive download.
pub struct ArchiveResponse {
    /// Destination the archive bytes were written to (memory or temp file).
    writer: TargetWriter,
    /// Peer that served the archive; `None` for non-p2p sources (e.g. S3).
    neighbour: Option<Neighbour>,
}
474
/// Cheap, copyable context handed to an [`ArchiveClient`] for one download.
#[derive(Clone, Copy)]
pub struct ArchiveDownloadContext<'a> {
    pub storage: &'a CoreStorage,
    /// Archives larger than this are streamed to a temp file.
    pub memory_threshold: ByteSize,
}
480
481impl<'a> ArchiveDownloadContext<'a> {
482 pub fn get_archive_writer(&self, size: NonZeroU64) -> Result<TargetWriter> {
483 Ok(if size.get() > self.memory_threshold.as_u64() {
484 let file = self.storage.context().temp_files().unnamed_file().open()?;
485 TargetWriter::File(std::io::BufWriter::new(file))
486 } else {
487 TargetWriter::Bytes(BytesMut::new().writer())
488 })
489 }
490
491 pub fn estimate_archive_id(&self, mc_seqno: u32) -> u32 {
492 self.storage.block_storage().estimate_archive_id(mc_seqno)
493 }
494}
495
/// Conversion into a type-erased archive source.
pub trait IntoArchiveClient {
    fn into_archive_client(self) -> Arc<dyn ArchiveClient>;
}
499
500impl<T: ArchiveClient> IntoArchiveClient for (T,) {
501 #[inline]
502 fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
503 Arc::new(self.0)
504 }
505}
506
507impl<T1: ArchiveClient, T2: ArchiveClient> IntoArchiveClient for (T1, Option<T2>) {
508 fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
509 let (primary, secondary) = self;
510 match secondary {
511 None => Arc::new(primary),
512 Some(secondary) => Arc::new(HybridArchiveClient::new(primary, secondary)),
513 }
514 }
515}
516
/// An archive located by an [`ArchiveClient`], with the download deferred.
pub struct FoundArchive<'a> {
    pub archive_id: u64,
    /// Starts the actual download when invoked.
    pub download: Box<dyn FnOnce() -> BoxFuture<'a, Result<ArchiveResponse>> + Send + 'a>,
}
521
/// A source of blockchain archives.
#[async_trait]
pub trait ArchiveClient: Send + Sync + 'static {
    /// Looks up an archive containing mc block `mc_seqno`.
    ///
    /// Returns `Ok(None)` when no such archive is available yet.
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>>;
}
530
#[async_trait]
impl ArchiveClient for BlockchainRpcClient {
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        Ok(match self.find_archive(mc_seqno).await? {
            blockchain_rpc::PendingArchiveResponse::Found(found) => Some(FoundArchive {
                archive_id: found.id,
                download: Box::new(move || {
                    Box::pin(async move {
                        let neighbour = found.neighbour.clone();

                        // Pick memory vs temp-file output from the reported size.
                        let output = ctx.get_archive_writer(found.size)?;
                        let writer = self.download_archive(found, output).await?;

                        Ok(ArchiveResponse {
                            writer,
                            neighbour: Some(neighbour),
                        })
                    })
                }),
            }),
            // The requested block is newer than any published archive.
            blockchain_rpc::PendingArchiveResponse::TooNew => None,
        })
    }
}
559
/// Uses the RPC client as the sole archive source.
impl IntoArchiveClient for BlockchainRpcClient {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self)
    }
}
566
#[cfg(feature = "s3")]
#[async_trait]
impl ArchiveClient for S3Client {
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        // S3 archives are addressed by id, estimated from the seqno.
        let archive_id = ctx.estimate_archive_id(mc_seqno);

        let Some(info) = self.get_archive_info(archive_id).await? else {
            return Ok(None);
        };

        Ok(Some(FoundArchive {
            archive_id: info.archive_id as u64,
            download: Box::new(move || {
                Box::pin(async move {
                    let output = ctx.get_archive_writer(info.size)?;
                    let writer = self.download_archive(info.archive_id, output).await?;

                    Ok(ArchiveResponse {
                        writer,
                        // No peer to credit/punish for S3 downloads.
                        neighbour: None,
                    })
                })
            }),
        }))
    }
}
597
/// Uses the S3 client as the sole archive source.
#[cfg(feature = "s3")]
impl IntoArchiveClient for S3Client {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self)
    }
}
605
/// Archive client that races a primary and a secondary source and remembers
/// which one served the last archive.
pub struct HybridArchiveClient<T1, T2> {
    primary: T1,
    secondary: T2,
    /// Which side to try first on the next lookup.
    prefer: HybridArchiveClientState,
}
611
612impl<T1, T2> HybridArchiveClient<T1, T2> {
613 pub fn new(primary: T1, secondary: T2) -> Self {
614 Self {
615 primary,
616 secondary,
617 prefer: Default::default(),
618 }
619 }
620}
621
#[async_trait]
impl<T1, T2> ArchiveClient for HybridArchiveClient<T1, T2>
where
    T1: ArchiveClient,
    T2: ArchiveClient,
{
    /// Tries the previously successful client first; on a miss or error,
    /// races both clients and remembers whichever one finds the archive.
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        // Fast path: reuse the client that served the previous archive.
        if let Some(prefer) = self.prefer.get() {
            tracing::debug!(mc_seqno, ?prefer);
            match prefer {
                HybridArchiveClientPart::Primary => {
                    let res = self.primary.find_archive(mc_seqno, ctx).await;
                    if matches!(&res, Ok(Some(_))) {
                        return res;
                    }
                }
                HybridArchiveClientPart::Secondary => {
                    let res = self.secondary.find_archive(mc_seqno, ctx).await;
                    if matches!(&res, Ok(Some(_))) {
                        return res;
                    }
                }
            }
        }

        // Preferred client failed (or none was set): clear it and race both.
        self.prefer.set(None);

        let primary = pin!(self.primary.find_archive(mc_seqno, ctx));
        let secondary = pin!(self.secondary.find_archive(mc_seqno, ctx));
        match futures_util::future::select(primary, secondary).await {
            futures_util::future::Either::Left((found, other)) => {
                match found {
                    Ok(Some(found)) => {
                        self.prefer.set(Some(HybridArchiveClientPart::Primary));
                        return Ok(Some(found));
                    }
                    Ok(None) => {}
                    Err(e) => tracing::warn!("primary archive client error: {e:?}"),
                }
                // Primary answered first but had nothing; await the secondary.
                other.await.inspect(|res| {
                    if res.is_some() {
                        self.prefer.set(Some(HybridArchiveClientPart::Secondary));
                    }
                })
            }
            futures_util::future::Either::Right((found, other)) => {
                match found {
                    Ok(Some(found)) => {
                        self.prefer.set(Some(HybridArchiveClientPart::Secondary));
                        return Ok(Some(found));
                    }
                    Ok(None) => {}
                    Err(e) => tracing::warn!("secondary archive client error: {e:?}"),
                }
                // Secondary answered first but had nothing; await the primary.
                other.await.inspect(|res| {
                    if res.is_some() {
                        self.prefer.set(Some(HybridArchiveClientPart::Primary));
                    }
                })
            }
        }
    }
}
691
/// Atomic encoding of `Option<HybridArchiveClientPart>`:
/// 0 = no preference, 1 = primary, 2 = secondary.
#[derive(Default)]
struct HybridArchiveClientState(AtomicU8);
694
695impl HybridArchiveClientState {
696 fn get(&self) -> Option<HybridArchiveClientPart> {
697 match self.0.load(Ordering::Acquire) {
698 1 => Some(HybridArchiveClientPart::Primary),
699 2 => Some(HybridArchiveClientPart::Secondary),
700 _ => None,
701 }
702 }
703
704 fn set(&self, value: Option<HybridArchiveClientPart>) {
705 let value = match value {
706 None => 0,
707 Some(HybridArchiveClientPart::Primary) => 1,
708 Some(HybridArchiveClientPart::Secondary) => 2,
709 };
710 self.0.store(value, Ordering::Release);
711 }
712}
713
/// Identifies one side of a [`HybridArchiveClient`].
#[derive(Debug, Clone, Copy)]
enum HybridArchiveClientPart {
    Primary,
    Secondary,
}
719
720impl std::ops::Not for HybridArchiveClientPart {
721 type Output = Self;
722
723 #[inline]
724 fn not(self) -> Self::Output {
725 match self {
726 Self::Primary => Self::Secondary,
727 Self::Secondary => Self::Primary,
728 }
729 }
730}