1use std::collections::{BTreeMap, btree_map};
2use std::num::NonZeroU64;
3use std::pin::pin;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU8, Ordering};
6use std::time::Duration;
7
8use anyhow::Result;
9use async_trait::async_trait;
10use bytes::{BufMut, BytesMut};
11use bytesize::ByteSize;
12use futures_util::future::BoxFuture;
13use serde::{Deserialize, Serialize};
14use tokio::sync::watch;
15use tokio::task::AbortHandle;
16use tycho_block_util::archive::Archive;
17use tycho_block_util::block::{BlockIdRelation, BlockStuffAug};
18use tycho_types::models::BlockId;
19use tycho_util::fs::TargetWriter;
20use tycho_util::serde_helpers;
21
22use crate::block_strider::provider::{BlockProvider, CheckProof, OptionalBlockStuff, ProofChecker};
23use crate::blockchain_rpc;
24use crate::blockchain_rpc::BlockchainRpcClient;
25use crate::overlay_client::{Error, Neighbour, PunishReason};
26#[cfg(feature = "s3")]
27use crate::s3::S3Client;
28use crate::storage::CoreStorage;
29
/// Configuration for [`ArchiveBlockProvider`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ArchiveBlockProviderConfig {
    /// Archives up to this size are buffered in memory; larger ones are
    /// spilled to an unnamed temporary file (see
    /// `ArchiveDownloadContext::get_archive_writer`).
    pub max_archive_to_memory_size: ByteSize,

    /// If the previous masterchain block is younger than this threshold, a
    /// "not found" archive response is treated as "block is too recent"
    /// rather than as a retryable error (see `ArchiveDownloader::spawn`).
    #[serde(with = "serde_helpers::humantime")]
    pub recent_block_threshold: Duration,
}
41
42impl Default for ArchiveBlockProviderConfig {
43 fn default() -> Self {
44 Self {
45 max_archive_to_memory_size: ByteSize::mb(100),
46 recent_block_threshold: Duration::from_secs(600),
47 }
48 }
49}
50
/// Block provider that downloads whole archives and serves blocks from them.
///
/// A cheaply cloneable handle over the shared [`Inner`] state.
#[derive(Clone)]
#[repr(transparent)]
pub struct ArchiveBlockProvider {
    inner: Arc<Inner>,
}
56
57impl ArchiveBlockProvider {
58 pub fn new(
59 client: impl IntoArchiveClient,
60 storage: CoreStorage,
61 config: ArchiveBlockProviderConfig,
62 ) -> Self {
63 let proof_checker = ProofChecker::new(storage.clone());
64
65 let sync_tracker = SyncTracker::new();
66
67 Self {
68 inner: Arc::new(Inner {
69 client: client.into_archive_client(),
70 proof_checker,
71
72 known_archives: parking_lot::Mutex::new(Default::default()),
73
74 sync_tracker,
75 storage,
76 config,
77 }),
78 }
79 }
80
81 async fn get_next_block_impl(&self, block_id: &BlockId) -> OptionalBlockStuff {
82 let this = self.inner.as_ref();
83
84 let next_mc_seqno = block_id.seqno + 1;
85
86 loop {
87 let Some((archive_key, info)) = this
88 .get_archive(ArchiveRequest::NextMcBlock {
89 mc_seqno: next_mc_seqno,
90 prev_block_id: *block_id,
91 })
92 .await
93 else {
94 tracing::warn!(prev_block_id = ?block_id, "archive not found");
95 break None;
96 };
97
98 let Some(block_id) = info.archive.mc_block_ids.get(&next_mc_seqno) else {
99 tracing::error!(
100 "received archive does not contain mc block with seqno {next_mc_seqno}"
101 );
102 this.remove_archive_if_same(archive_key, &info);
103 if let Some(from) = &info.from {
104 from.punish(PunishReason::Malicious);
105 }
106 continue;
107 };
108
109 match self
110 .checked_get_entry_by_id(&info.archive, block_id, block_id)
111 .await
112 {
113 Ok(block) => {
114 self.inner.sync_tracker.try_log(archive_key, &block);
115 return Some(Ok(block.clone()));
116 }
117 Err(e) => {
118 tracing::error!(archive_key, %block_id, "invalid archive entry: {e:?}");
119 this.remove_archive_if_same(archive_key, &info);
120 if let Some(from) = &info.from {
121 from.punish(PunishReason::Malicious);
122 }
123 }
124 }
125 }
126 }
127
128 async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
129 let this = self.inner.as_ref();
130
131 let block_id = block_id_relation.block_id;
132 let mc_block_id = block_id_relation.mc_block_id;
133
134 loop {
135 let Some((archive_key, info)) = this
136 .get_archive(ArchiveRequest::ShardBlock {
137 mc_seqno: mc_block_id.seqno,
138 })
139 .await
140 else {
141 tracing::warn!("shard block is too new for archives");
142
143 tokio::time::sleep(Duration::from_secs(1)).await;
145 continue;
146 };
147
148 match self
149 .checked_get_entry_by_id(&info.archive, &mc_block_id, &block_id)
150 .await
151 {
152 Ok(block) => return Some(Ok(block.clone())),
153 Err(e) => {
154 tracing::error!(archive_key, %block_id, %mc_block_id, "invalid archive entry: {e:?}");
155 this.remove_archive_if_same(archive_key, &info);
156 if let Some(from) = &info.from {
157 from.punish(PunishReason::Malicious);
158 }
159 }
160 }
161 }
162 }
163
164 async fn checked_get_entry_by_id(
165 &self,
166 archive: &Arc<Archive>,
167 mc_block_id: &BlockId,
168 block_id: &BlockId,
169 ) -> Result<BlockStuffAug> {
170 let (block, ref proof, ref queue_diff) = match archive.get_entry_by_id(block_id).await {
171 Ok(entry) => entry,
172 Err(e) => anyhow::bail!("archive is corrupted: {e:?}"),
173 };
174
175 self.inner
176 .proof_checker
177 .check_proof(CheckProof {
178 mc_block_id,
179 block: &block,
180 proof,
181 queue_diff,
182 store_on_success: true,
183 })
184 .await?;
185
186 Ok(block)
187 }
188}
189
/// Shared state of [`ArchiveBlockProvider`].
struct Inner {
    /// Core storage; used for block handles and temp files.
    storage: CoreStorage,

    /// Source of archives (blockchain RPC, S3, or a hybrid of both).
    client: Arc<dyn ArchiveClient>,
    /// Verifies block proofs and queue diffs before a block is returned.
    proof_checker: ProofChecker,

    /// Cache of downloaded archives and in-flight download tasks, keyed by
    /// the masterchain seqno the download was requested for.
    known_archives: parking_lot::Mutex<ArchivesMap>,

    /// Logs sync progress (lag, rate, ETA) once per archive.
    sync_tracker: SyncTracker,

    config: ArchiveBlockProviderConfig,
}
202
impl Inner {
    /// Returns a downloaded archive (and its cache key) containing the
    /// masterchain block requested by `request`.
    ///
    /// If no suitable archive is cached, spawns a background download keyed
    /// by the requested seqno and waits on it. Returns `None` when the
    /// downloader reports the archive as unavailable (e.g. the block is too
    /// recent). Retries from scratch if the task is cancelled while awaited.
    async fn get_archive(&self, request: ArchiveRequest) -> Option<(u32, ArchiveInfo)> {
        let mc_seqno = request.mc_seqno();
        loop {
            let mut pending = 'pending: {
                let mut guard = self.known_archives.lock();

                // Check all cached slots first.
                for (archive_key, value) in guard.iter() {
                    match value {
                        ArchiveSlot::Downloaded(info) => {
                            if info.archive.mc_block_ids.contains_key(&mc_seqno) {
                                return Some((*archive_key, info.clone()));
                            }
                        }
                        // NOTE(review): the first pending task is awaited even
                        // if it was keyed for a different seqno — presumably to
                        // serialize downloads; confirm this is intended.
                        ArchiveSlot::Pending(task) => break 'pending task.clone(),
                    }
                }

                // Nothing cached and nothing in flight: start a new download.
                let task = self.make_downloader().spawn(request);
                guard.insert(mc_seqno, ArchiveSlot::Pending(task.clone()));

                task
            };

            // Wait for the pending task to finish or get cancelled.
            let mut res = None;
            let mut finished = false;
            loop {
                match &*pending.rx.borrow_and_update() {
                    ArchiveTaskState::None => {}
                    ArchiveTaskState::Finished(archive) => {
                        res = archive.clone();
                        finished = true;
                        break;
                    }
                    ArchiveTaskState::Cancelled => break,
                }
                // Sender dropped means the task is gone; stop waiting.
                if pending.rx.changed().await.is_err() {
                    break;
                }
            }

            // Publish the result into the slot (or drop the slot on failure),
            // unless the slot was removed concurrently in the meantime.
            match self.known_archives.lock().entry(pending.archive_key) {
                btree_map::Entry::Vacant(_) => {}
                btree_map::Entry::Occupied(mut entry) => match &res {
                    None => {
                        entry.remove();
                    }
                    Some(info) => {
                        entry.insert(ArchiveSlot::Downloaded(info.clone()));
                    }
                },
            }

            if finished {
                return res.map(|info| (pending.archive_key, info));
            }

            // Task was aborted (e.g. by `clear_outdated_archives`) while we
            // were waiting on it; yield and retry from scratch.
            tracing::warn!(mc_seqno, "archive task cancelled while in use");
            tokio::task::yield_now().await;
        }
    }

    /// Removes the cached slot under `archive_key`, but only if it still
    /// holds exactly the same archive as `prev` (pointer equality), so a
    /// concurrently re-downloaded archive is not evicted by mistake.
    /// Returns `true` if the entry was removed.
    fn remove_archive_if_same(&self, archive_key: u32, prev: &ArchiveInfo) -> bool {
        match self.known_archives.lock().entry(archive_key) {
            btree_map::Entry::Vacant(_) => false,
            btree_map::Entry::Occupied(entry) => {
                if matches!(
                    entry.get(),
                    ArchiveSlot::Downloaded(info)
                        if Arc::ptr_eq(&info.archive, &prev.archive)
                ) {
                    entry.remove();
                    true
                } else {
                    false
                }
            }
        }
    }

    /// Builds a one-shot downloader configured from this provider's config.
    fn make_downloader(&self) -> ArchiveDownloader {
        ArchiveDownloader {
            client: self.client.clone(),
            storage: self.storage.clone(),
            memory_threshold: self.config.max_archive_to_memory_size,
            recent_block_threshold: self.config.recent_block_threshold,
        }
    }

    /// Drops cached archives that cannot serve any block with seqno >= `bound`
    /// and aborts pending downloads that can no longer be useful.
    fn clear_outdated_archives(&self, bound: u32) {
        let mut entries_remaining = 0usize;
        let mut entries_removed = 0usize;

        let mut guard = self.known_archives.lock();
        guard.retain(|_, archive| {
            let retain;
            match archive {
                // Keep a downloaded archive only if its newest mc block is
                // still at or above the bound.
                ArchiveSlot::Downloaded(info) => match info.archive.mc_block_ids.last_key_value() {
                    None => retain = false,
                    Some((last_mc_seqno, _)) => retain = *last_mc_seqno >= bound,
                },
                // A pending download could still produce blocks up to
                // `key + MAX_MC_BLOCKS_PER_ARCHIVE`; otherwise abort it.
                ArchiveSlot::Pending(task) => {
                    retain = task
                        .archive_key
                        .saturating_add(Archive::MAX_MC_BLOCKS_PER_ARCHIVE)
                        >= bound;
                    if !retain {
                        task.abort_handle.abort();
                    }
                }
            };

            entries_remaining += retain as usize;
            entries_removed += !retain as usize;
            retain
        });
        drop(guard);

        tracing::debug!(
            entries_remaining,
            entries_removed,
            bound,
            "removed known archives"
        );
    }
}
338
/// Cached archives keyed by the masterchain seqno the download was requested for.
type ArchivesMap = BTreeMap<u32, ArchiveSlot>;
340
/// A slot in [`ArchivesMap`]: either a ready archive or an in-flight download.
enum ArchiveSlot {
    /// Archive has been downloaded, parsed and validated.
    Downloaded(ArchiveInfo),
    /// Archive is still being downloaded by a background task.
    Pending(ArchiveTask),
}
345
/// What a caller is looking for inside an archive.
#[derive(Clone, Copy)]
enum ArchiveRequest {
    /// The next masterchain block (with seqno `mc_seqno`) after `prev_block_id`.
    NextMcBlock {
        mc_seqno: u32,
        prev_block_id: BlockId,
    },
    /// A shard block referenced by the masterchain block with `mc_seqno`.
    ShardBlock {
        mc_seqno: u32,
    },
}
356
357impl ArchiveRequest {
358 fn mc_seqno(&self) -> u32 {
359 match self {
360 Self::NextMcBlock { mc_seqno, .. } | Self::ShardBlock { mc_seqno } => *mc_seqno,
361 }
362 }
363
364 fn prev_block_id(&self) -> Option<BlockId> {
365 match self {
366 Self::NextMcBlock { prev_block_id, .. } => Some(*prev_block_id),
367 Self::ShardBlock { .. } => None,
368 }
369 }
370}
371
372#[derive(Clone)]
373struct ArchiveInfo {
374 from: Option<Neighbour>, archive: Arc<Archive>,
376}
377
/// One-shot factory for background archive download tasks.
struct ArchiveDownloader {
    client: Arc<dyn ArchiveClient>,
    storage: CoreStorage,
    /// Max archive size to keep in memory (from config).
    memory_threshold: ByteSize,
    /// See [`ArchiveBlockProviderConfig::recent_block_threshold`].
    recent_block_threshold: Duration,
}
384
impl ArchiveDownloader {
    /// Spawns a background task that keeps retrying to download the archive
    /// containing `request.mc_seqno()` and publishes the outcome through a
    /// watch channel. The task is aborted when the last clone of the
    /// returned [`ArchiveTask`] is dropped.
    fn spawn(self, request: ArchiveRequest) -> ArchiveTask {
        // Delay between download retries.
        const INTERVAL: Duration = Duration::from_secs(1);

        let mc_seqno = request.mc_seqno();

        let (tx, rx) = watch::channel(ArchiveTaskState::None);

        // If the task is dropped/aborted before publishing a result, this
        // guard flips the state to `Cancelled` so waiters are not left hanging.
        let guard = scopeguard::guard(tx, move |tx| {
            tracing::warn!(mc_seqno, "cancelled preloading archive");
            tx.send_modify(|prev| {
                if !matches!(prev, ArchiveTaskState::Finished(..)) {
                    *prev = ArchiveTaskState::Cancelled;
                }
            });
        });

        let handle = tokio::spawn(async move {
            tracing::debug!(mc_seqno, "started preloading archive");
            scopeguard::defer! {
                tracing::debug!(mc_seqno, "finished preloading archive");
            }

            loop {
                match self.try_download(mc_seqno).await {
                    Ok(res) => {
                        // Disarm the cancellation guard and publish the result.
                        let tx = scopeguard::ScopeGuard::into_inner(guard);
                        tx.send_modify(move |prev| *prev = ArchiveTaskState::Finished(res));
                        break;
                    }
                    Err(e) => {
                        if let Some(Error::NotFound) = e.downcast_ref::<Error>() {
                            if let Some(prev_block_id) = request.prev_block_id()
                                && let Some(handle) = self
                                    .storage
                                    .block_handle_storage()
                                    .load_handle(&prev_block_id)
                            {
                                let now = tycho_util::time::now_sec();
                                let prev_gen_utime = handle.gen_utime();

                                // A missing archive for a recent block is
                                // expected: finish with `None` instead of
                                // retrying forever.
                                if now.saturating_sub(prev_gen_utime)
                                    < self.recent_block_threshold.as_secs() as u32
                                {
                                    tracing::info!(mc_seqno, prev_gen_utime, "block is recent");

                                    let tx = scopeguard::ScopeGuard::into_inner(guard);
                                    tx.send_modify(move |prev| {
                                        *prev = ArchiveTaskState::Finished(None);
                                    });

                                    break;
                                }
                            }
                        }

                        tracing::error!(mc_seqno, "failed to preload archive {e:?}");
                        tokio::time::sleep(INTERVAL).await;
                    }
                }
            }
        });

        ArchiveTask {
            archive_key: mc_seqno,
            rx,
            abort_handle: Arc::new(AbortOnDrop(handle.abort_handle())),
        }
    }

    /// Downloads and parses a single archive.
    ///
    /// Returns `Ok(None)` when the client reports the archive as not yet
    /// available. A peer serving a malformed archive is punished.
    async fn try_download(&self, mc_seqno: u32) -> Result<Option<ArchiveInfo>> {
        let ctx = ArchiveDownloadContext {
            storage: &self.storage,
            memory_threshold: self.memory_threshold,
        };
        let Some(found) = self.client.find_archive(mc_seqno, ctx).await? else {
            return Ok(None);
        };
        let res = (found.download)().await?;

        // Parsing can be CPU-heavy, so run it off the async runtime.
        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            let bytes = res.writer.try_freeze()?;

            let archive = match Archive::new(bytes) {
                Ok(array) => array,
                Err(e) => {
                    // Malformed payload: punish the peer it came from.
                    if let Some(neighbour) = res.neighbour {
                        neighbour.punish(PunishReason::Malicious);
                    }
                    return Err(e);
                }
            };

            // The archive must cover a contiguous masterchain block range.
            if let Err(e) = archive.check_mc_blocks_range() {
                if let Some(neighbour) = res.neighbour {
                    neighbour.punish(PunishReason::Malicious);
                }
                return Err(e);
            }

            Ok(ArchiveInfo {
                archive: Arc::new(archive),
                from: res.neighbour,
            })
        })
        .await?
        .map(Some)
    }
}
502
/// Handle to an in-flight archive download.
#[derive(Clone)]
struct ArchiveTask {
    /// Masterchain seqno the download was requested for (cache key).
    archive_key: u32,
    /// Receives state updates published by the download task.
    rx: watch::Receiver<ArchiveTaskState>,
    /// Shared handle; the task is aborted when the last clone is dropped.
    abort_handle: Arc<AbortOnDrop>,
}
509
/// [`AbortHandle`] wrapper that aborts its task on drop.
#[repr(transparent)]
struct AbortOnDrop(AbortHandle);
512
/// Exposes [`AbortHandle`] methods directly on the wrapper.
impl std::ops::Deref for AbortOnDrop {
    type Target = AbortHandle;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
521
impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        // Make sure the download task does not outlive its last handle.
        self.0.abort();
    }
}
527
/// Progress of an archive download, published through a watch channel.
#[derive(Default)]
enum ArchiveTaskState {
    /// Download still in progress.
    #[default]
    None,
    /// Download finished; `None` means the archive is unavailable
    /// (e.g. the requested block is too recent).
    Finished(Option<ArchiveInfo>),
    /// The download task was aborted before it finished.
    Cancelled,
}
535
impl BlockProvider for ArchiveBlockProvider {
    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type CleanupFut<'a> = futures_util::future::Ready<Result<()>>;

    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
        Box::pin(self.get_next_block_impl(prev_block_id))
    }

    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
        Box::pin(self.get_block_impl(block_id_relation))
    }

    // Cleanup is synchronous under the hood; wrap the result in a ready future.
    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
        self.inner.clear_outdated_archives(mc_seqno);
        futures_util::future::ready(Ok(()))
    }
}
554
/// Result of a finished archive download.
pub struct ArchiveResponse {
    /// Where the raw archive bytes were written (memory buffer or temp file).
    writer: TargetWriter,
    /// Peer the archive came from, if it was downloaded from the network.
    neighbour: Option<Neighbour>,
}
559
/// Context handed to archive clients during a download.
#[derive(Clone, Copy)]
pub struct ArchiveDownloadContext<'a> {
    pub storage: &'a CoreStorage,
    /// Archives larger than this are spilled to a temp file instead of memory.
    pub memory_threshold: ByteSize,
}
565
566impl<'a> ArchiveDownloadContext<'a> {
567 pub fn get_archive_writer(&self, size: NonZeroU64) -> Result<TargetWriter> {
568 Ok(if size.get() > self.memory_threshold.as_u64() {
569 let file = self.storage.context().temp_files().unnamed_file().open()?;
570 TargetWriter::File(std::io::BufWriter::new(file))
571 } else {
572 TargetWriter::Bytes(BytesMut::new().writer())
573 })
574 }
575
576 pub fn estimate_archive_id(&self, mc_seqno: u32) -> u32 {
577 self.storage.block_storage().estimate_archive_id(mc_seqno)
578 }
579}
580
/// Conversion into a type-erased [`ArchiveClient`].
pub trait IntoArchiveClient {
    fn into_archive_client(self) -> Arc<dyn ArchiveClient>;
}
584
/// A single client, used as-is.
impl<T: ArchiveClient> IntoArchiveClient for (T,) {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self.0)
    }
}
591
592impl<T1: ArchiveClient, T2: ArchiveClient> IntoArchiveClient for (T1, Option<T2>) {
593 fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
594 let (primary, secondary) = self;
595 match secondary {
596 None => Arc::new(primary),
597 Some(secondary) => Arc::new(HybridArchiveClient::new(primary, secondary)),
598 }
599 }
600}
601
/// An archive located by an [`ArchiveClient`], with its download deferred.
pub struct FoundArchive<'a> {
    /// Client-specific archive id.
    pub archive_id: u64,
    /// Starts the actual download when invoked.
    pub download: Box<dyn FnOnce() -> BoxFuture<'a, Result<ArchiveResponse>> + Send + 'a>,
}
606
/// Source of block archives.
#[async_trait]
pub trait ArchiveClient: Send + Sync + 'static {
    /// Locates the archive containing the masterchain block with `mc_seqno`.
    ///
    /// Returns `Ok(None)` when no such archive exists yet; the download
    /// itself is deferred via [`FoundArchive::download`].
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>>;
}
615
#[async_trait]
impl ArchiveClient for BlockchainRpcClient {
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        // NOTE: calls the inherent `BlockchainRpcClient::find_archive`
        // (single-argument), not this trait method.
        Ok(match self.find_archive(mc_seqno).await? {
            blockchain_rpc::PendingArchiveResponse::Found(found) => Some(FoundArchive {
                archive_id: found.id,
                // Deferred: the caller decides when to start the transfer.
                download: Box::new(move || {
                    Box::pin(async move {
                        let neighbour = found.neighbour.clone();

                        // Buffer in memory or spill to a temp file by size.
                        let output = ctx.get_archive_writer(found.size)?;
                        let writer = self.download_archive(found, output).await?;

                        Ok(ArchiveResponse {
                            writer,
                            neighbour: Some(neighbour),
                        })
                    })
                }),
            }),
            // The requested block is newer than any complete archive.
            blockchain_rpc::PendingArchiveResponse::TooNew => None,
        })
    }
}
644
/// Use the RPC client directly as the only archive source.
impl IntoArchiveClient for BlockchainRpcClient {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self)
    }
}
651
#[cfg(feature = "s3")]
#[async_trait]
impl ArchiveClient for S3Client {
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        // S3 archives are addressed by archive id, estimated from the seqno.
        let archive_id = ctx.estimate_archive_id(mc_seqno);

        let Some(info) = self.get_archive_info(archive_id).await? else {
            return Ok(None);
        };

        Ok(Some(FoundArchive {
            archive_id: info.archive_id as u64,
            download: Box::new(move || {
                Box::pin(async move {
                    // Buffer in memory or spill to a temp file by size.
                    let output = ctx.get_archive_writer(info.size)?;
                    let writer = self.download_archive(info.archive_id, output).await?;

                    Ok(ArchiveResponse {
                        writer,
                        // No peer to credit or punish for S3 downloads.
                        neighbour: None,
                    })
                })
            }),
        }))
    }
}
682
/// Use the S3 client directly as the only archive source.
#[cfg(feature = "s3")]
impl IntoArchiveClient for S3Client {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self)
    }
}
690
/// Archive client that races two underlying clients and remembers which one
/// served the previous archive.
pub struct HybridArchiveClient<T1, T2> {
    primary: T1,
    secondary: T2,
    /// Which client found an archive last time; it is tried first next time.
    prefer: HybridArchiveClientState,
}
696
697impl<T1, T2> HybridArchiveClient<T1, T2> {
698 pub fn new(primary: T1, secondary: T2) -> Self {
699 Self {
700 primary,
701 secondary,
702 prefer: Default::default(),
703 }
704 }
705}
706
#[async_trait]
impl<T1, T2> ArchiveClient for HybridArchiveClient<T1, T2>
where
    T1: ArchiveClient,
    T2: ArchiveClient,
{
    /// Asks the previously successful client first; if it has nothing (or
    /// errors), races both clients concurrently and remembers the winner.
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        // Fast path: try only the preferred client.
        if let Some(prefer) = self.prefer.get() {
            tracing::debug!(mc_seqno, ?prefer);
            match prefer {
                HybridArchiveClientPart::Primary => {
                    let res = self.primary.find_archive(mc_seqno, ctx).await;
                    if matches!(&res, Ok(Some(_))) {
                        return res;
                    }
                }
                HybridArchiveClientPart::Secondary => {
                    let res = self.secondary.find_archive(mc_seqno, ctx).await;
                    if matches!(&res, Ok(Some(_))) {
                        return res;
                    }
                }
            }
        }

        // The preferred client failed or there was none: forget the
        // preference and race both clients.
        self.prefer.set(None);

        let primary = pin!(self.primary.find_archive(mc_seqno, ctx));
        let secondary = pin!(self.secondary.find_archive(mc_seqno, ctx));
        match futures_util::future::select(primary, secondary).await {
            futures_util::future::Either::Left((found, other)) => {
                match found {
                    Ok(Some(found)) => {
                        self.prefer.set(Some(HybridArchiveClientPart::Primary));
                        return Ok(Some(found));
                    }
                    Ok(None) => {}
                    Err(e) => tracing::warn!("primary archive client error: {e:?}"),
                }
                // Primary finished first with nothing; wait for secondary.
                other.await.inspect(|res| {
                    if res.is_some() {
                        self.prefer.set(Some(HybridArchiveClientPart::Secondary));
                    }
                })
            }
            futures_util::future::Either::Right((found, other)) => {
                match found {
                    Ok(Some(found)) => {
                        self.prefer.set(Some(HybridArchiveClientPart::Secondary));
                        return Ok(Some(found));
                    }
                    Ok(None) => {}
                    Err(e) => tracing::warn!("secondary archive client error: {e:?}"),
                }
                // Secondary finished first with nothing; wait for primary.
                other.await.inspect(|res| {
                    if res.is_some() {
                        self.prefer.set(Some(HybridArchiveClientPart::Primary));
                    }
                })
            }
        }
    }
}
776
/// Lock-free memory of which client found the previous archive.
///
/// Encoded in a single atomic byte: 0 = none, 1 = primary, 2 = secondary.
#[derive(Default)]
struct HybridArchiveClientState(AtomicU8);

impl HybridArchiveClientState {
    /// Returns the currently preferred client part, if any.
    fn get(&self) -> Option<HybridArchiveClientPart> {
        match self.0.load(Ordering::Acquire) {
            1 => Some(HybridArchiveClientPart::Primary),
            2 => Some(HybridArchiveClientPart::Secondary),
            _ => None,
        }
    }

    /// Stores the preferred client part, or clears it with `None`.
    fn set(&self, value: Option<HybridArchiveClientPart>) {
        let encoded = match value {
            Some(HybridArchiveClientPart::Primary) => 1,
            Some(HybridArchiveClientPart::Secondary) => 2,
            None => 0,
        };
        self.0.store(encoded, Ordering::Release);
    }
}

/// Which half of a hybrid client is being referred to.
#[derive(Debug, Clone, Copy)]
enum HybridArchiveClientPart {
    Primary,
    Secondary,
}

impl std::ops::Not for HybridArchiveClientPart {
    type Output = Self;

    /// Returns the opposite part.
    #[inline]
    fn not(self) -> Self::Output {
        match self {
            Self::Primary => Self::Secondary,
            Self::Secondary => Self::Primary,
        }
    }
}
816
/// Tracks block application rate to periodically log sync progress.
struct SyncTracker {
    inner: parking_lot::Mutex<SyncTrackerInner>,
}
820
/// Mutable state behind [`SyncTracker`]'s mutex.
struct SyncTrackerInner {
    /// Sliding window of `(block seqno, block gen time ms, local insert time ms)`
    /// samples used to estimate the sync rate.
    window: std::collections::VecDeque<(u32, u64, u64)>,
    /// Key of the archive for which progress was last logged.
    last_archive_key: Option<u32>,
}
825
impl SyncTracker {
    /// Number of most recent blocks used to estimate the sync rate.
    const WINDOW_SIZE: usize = 300;

    fn new() -> Self {
        Self {
            inner: parking_lot::Mutex::new(SyncTrackerInner {
                window: std::collections::VecDeque::with_capacity(Self::WINDOW_SIZE),
                last_archive_key: None,
            }),
        }
    }

    /// Records `block` in the sliding window and logs sync progress at most
    /// once per archive. Returns `None` when nothing was logged.
    fn try_log(&self, archive_key: u32, block: &BlockStuffAug) -> Option<()> {
        let mut inner = self.inner.lock();

        let seqno = block.id().seqno;

        // Reset the window on a seqno gap: the rate estimate only makes
        // sense over a contiguous run of blocks.
        if let Some((last_seqno, _, _)) = inner.window.back()
            && seqno != last_seqno + 1
        {
            inner.window.clear();
            inner.last_archive_key = None;
        }

        let block_info = block.data.load_info().ok()?;

        // Window entries are (seqno, block gen time ms, local insert time ms).
        let now = tycho_util::time::now_millis();
        let gen_utime = (block_info.gen_utime as u64) * 1000 + (block_info.gen_utime_ms as u64);
        inner.window.push_back((seqno, gen_utime, now));

        if inner.window.len() > Self::WINDOW_SIZE {
            inner.window.pop_front();
        }

        // Log at most once per archive.
        if inner.last_archive_key == Some(archive_key) {
            return None;
        }
        inner.last_archive_key = Some(archive_key);

        let (_, first_gen_time, first_insert_time) = inner.window.front()?;
        let (_, last_gen_time, last_insert_time) = inner.window.back()?;

        // Need a non-degenerate window to compute a rate.
        if first_gen_time >= last_gen_time || first_insert_time >= last_insert_time {
            return None;
        }

        // How far behind real time the latest block is.
        let lag_ms = now.saturating_sub(*last_gen_time);

        // Blockchain time processed per unit of wall-clock time; > 1 means
        // we are catching up.
        let sync_rate =
            (last_gen_time - first_gen_time) as f64 / (last_insert_time - first_insert_time) as f64;

        let eta_ms = if sync_rate > 1.0 {
            (lag_ms as f64 / (sync_rate - 1.0)) as u64
        } else {
            // Not catching up; report the current lag as a lower bound.
            lag_ms
        };

        tracing::info!(
            seqno = block.id().seqno,
            lag = %humantime::format_duration(Duration::from_millis(lag_ms)),
            rate = format!("{:.2}x", sync_rate),
            eta = %humantime::format_duration(Duration::from_millis(eta_ms)),
            "sync progress"
        );

        Some(())
    }
}