1use std::collections::{BTreeMap, btree_map};
2use std::num::NonZeroU64;
3use std::pin::pin;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU8, Ordering};
6use std::time::Duration;
7
8use anyhow::Result;
9use async_trait::async_trait;
10use bytes::{BufMut, BytesMut};
11use bytesize::ByteSize;
12use futures_util::future::BoxFuture;
13use serde::{Deserialize, Serialize};
14use tokio::sync::watch;
15use tokio::task::AbortHandle;
16use tycho_block_util::archive::Archive;
17use tycho_block_util::block::{BlockIdRelation, BlockStuffAug};
18use tycho_types::models::BlockId;
19use tycho_util::fs::TargetWriter;
20
21use crate::block_strider::provider::{BlockProvider, CheckProof, OptionalBlockStuff, ProofChecker};
22use crate::blockchain_rpc;
23use crate::blockchain_rpc::BlockchainRpcClient;
24use crate::overlay_client::{Neighbour, PunishReason};
25#[cfg(feature = "s3")]
26use crate::s3::S3Client;
27use crate::storage::CoreStorage;
28
/// Configuration for [`ArchiveBlockProvider`].
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ArchiveBlockProviderConfig {
    /// Archives up to this size are buffered in memory while downloading;
    /// larger ones are spilled to a temporary file instead.
    pub max_archive_to_memory_size: ByteSize,
}
34
35impl Default for ArchiveBlockProviderConfig {
36 fn default() -> Self {
37 Self {
38 max_archive_to_memory_size: ByteSize::mb(100),
39 }
40 }
41}
42
/// Block provider that serves blocks from downloaded archives.
///
/// Cheap to clone: all state lives behind a single `Arc`.
#[derive(Clone)]
#[repr(transparent)]
pub struct ArchiveBlockProvider {
    inner: Arc<Inner>,
}
48
49impl ArchiveBlockProvider {
50 pub fn new(
51 client: impl IntoArchiveClient,
52 storage: CoreStorage,
53 config: ArchiveBlockProviderConfig,
54 ) -> Self {
55 let proof_checker = ProofChecker::new(storage.clone());
56
57 let sync_tracker = SyncTracker::new();
58
59 Self {
60 inner: Arc::new(Inner {
61 client: client.into_archive_client(),
62 proof_checker,
63
64 known_archives: parking_lot::Mutex::new(Default::default()),
65
66 sync_tracker,
67 storage,
68 config,
69 }),
70 }
71 }
72
73 async fn get_next_block_impl(&self, block_id: &BlockId) -> OptionalBlockStuff {
74 let this = self.inner.as_ref();
75
76 let next_mc_seqno = block_id.seqno + 1;
77
78 loop {
79 let Some((archive_key, info)) = this.get_archive(next_mc_seqno).await else {
80 tracing::warn!(prev_block_id = ?block_id, "archive not found");
81 break None;
82 };
83
84 let Some(block_id) = info.archive.mc_block_ids.get(&next_mc_seqno) else {
85 tracing::error!(
86 "received archive does not contain mc block with seqno {next_mc_seqno}"
87 );
88 this.remove_archive_if_same(archive_key, &info);
89 if let Some(from) = &info.from {
90 from.punish(PunishReason::Malicious);
91 }
92 continue;
93 };
94
95 match self
96 .checked_get_entry_by_id(&info.archive, block_id, block_id)
97 .await
98 {
99 Ok(block) => {
100 self.inner.sync_tracker.try_log(archive_key, &block);
101 return Some(Ok(block.clone()));
102 }
103 Err(e) => {
104 tracing::error!(archive_key, %block_id, "invalid archive entry: {e:?}");
105 this.remove_archive_if_same(archive_key, &info);
106 if let Some(from) = &info.from {
107 from.punish(PunishReason::Malicious);
108 }
109 }
110 }
111 }
112 }
113
114 async fn get_block_impl(&self, block_id_relation: &BlockIdRelation) -> OptionalBlockStuff {
115 let this = self.inner.as_ref();
116
117 let block_id = block_id_relation.block_id;
118 let mc_block_id = block_id_relation.mc_block_id;
119
120 loop {
121 let Some((archive_key, info)) = this.get_archive(mc_block_id.seqno).await else {
122 tracing::warn!("shard block is too new for archives");
123
124 tokio::time::sleep(Duration::from_secs(1)).await;
126 continue;
127 };
128
129 match self
130 .checked_get_entry_by_id(&info.archive, &mc_block_id, &block_id)
131 .await
132 {
133 Ok(block) => return Some(Ok(block.clone())),
134 Err(e) => {
135 tracing::error!(archive_key, %block_id, %mc_block_id, "invalid archive entry: {e:?}");
136 this.remove_archive_if_same(archive_key, &info);
137 if let Some(from) = &info.from {
138 from.punish(PunishReason::Malicious);
139 }
140 }
141 }
142 }
143 }
144
145 async fn checked_get_entry_by_id(
146 &self,
147 archive: &Arc<Archive>,
148 mc_block_id: &BlockId,
149 block_id: &BlockId,
150 ) -> Result<BlockStuffAug> {
151 let (block, ref proof, ref queue_diff) = match archive.get_entry_by_id(block_id).await {
152 Ok(entry) => entry,
153 Err(e) => anyhow::bail!("archive is corrupted: {e:?}"),
154 };
155
156 self.inner
157 .proof_checker
158 .check_proof(CheckProof {
159 mc_block_id,
160 block: &block,
161 proof,
162 queue_diff,
163 store_on_success: true,
164 })
165 .await?;
166
167 Ok(block)
168 }
169}
170
/// Shared state behind [`ArchiveBlockProvider`].
struct Inner {
    storage: CoreStorage,

    client: Arc<dyn ArchiveClient>,
    proof_checker: ProofChecker,

    /// Cache of downloaded archives and in-flight download tasks,
    /// keyed by the mc seqno the download was requested for.
    known_archives: parking_lot::Mutex<ArchivesMap>,

    /// Logs sync lag/rate/ETA once per archive.
    sync_tracker: SyncTracker,

    config: ArchiveBlockProviderConfig,
}
183
impl Inner {
    /// Returns a downloaded archive containing the mc block with `mc_seqno`,
    /// together with the cache key it is stored under.
    ///
    /// If nothing cached matches, waits on an in-flight download (or spawns a
    /// new one keyed by `mc_seqno`) and retries. Returns `None` when a
    /// download finished without producing an archive.
    async fn get_archive(&self, mc_seqno: u32) -> Option<(u32, ArchiveInfo)> {
        loop {
            let mut pending = 'pending: {
                let mut guard = self.known_archives.lock();

                for (archive_key, value) in guard.iter() {
                    match value {
                        ArchiveSlot::Downloaded(info) => {
                            if info.archive.mc_block_ids.contains_key(&mc_seqno) {
                                return Some((*archive_key, info.clone()));
                            }
                        }
                        // Some download is already in flight; wait on it
                        // before spawning another one.
                        ArchiveSlot::Pending(task) => break 'pending task.clone(),
                    }
                }

                // Nothing cached or pending: start a new download.
                let task = self.make_downloader().spawn(mc_seqno);
                guard.insert(mc_seqno, ArchiveSlot::Pending(task.clone()));

                task
            };

            // Wait (lock released) until the task reaches a terminal state.
            let mut res = None;
            let mut finished = false;
            loop {
                match &*pending.rx.borrow_and_update() {
                    ArchiveTaskState::None => {}
                    ArchiveTaskState::Finished(archive) => {
                        res = archive.clone();
                        finished = true;
                        break;
                    }
                    ArchiveTaskState::Cancelled => break,
                }
                // Sender dropped without a new value: stop waiting.
                if pending.rx.changed().await.is_err() {
                    break;
                }
            }

            // Publish the outcome under the task's own key: swap the pending
            // slot for the downloaded archive, or drop it on failure.
            match self.known_archives.lock().entry(pending.archive_key) {
                btree_map::Entry::Vacant(_) => {
                    // Slot was already removed (e.g. by cleanup); nothing to do.
                }
                btree_map::Entry::Occupied(mut entry) => match &res {
                    None => {
                        entry.remove();
                    }
                    Some(info) => {
                        entry.insert(ArchiveSlot::Downloaded(info.clone()));
                    }
                },
            }

            if finished {
                return res.map(|info| (pending.archive_key, info));
            }

            // The task we were waiting on was cancelled; yield so other tasks
            // can make progress, then retry from scratch.
            tracing::warn!(mc_seqno, "archive task cancelled while in use");
            tokio::task::yield_now().await;
        }
    }

    /// Removes the cached archive under `archive_key` only if it is still the
    /// exact same `Arc` as `prev.archive` (avoids evicting a re-downloaded
    /// replacement). Returns whether an entry was removed.
    fn remove_archive_if_same(&self, archive_key: u32, prev: &ArchiveInfo) -> bool {
        match self.known_archives.lock().entry(archive_key) {
            btree_map::Entry::Vacant(_) => false,
            btree_map::Entry::Occupied(entry) => {
                if matches!(
                    entry.get(),
                    ArchiveSlot::Downloaded(info)
                        if Arc::ptr_eq(&info.archive, &prev.archive)
                ) {
                    entry.remove();
                    true
                } else {
                    false
                }
            }
        }
    }

    /// Builds a downloader configured from this provider's client, storage
    /// and memory threshold.
    fn make_downloader(&self) -> ArchiveDownloader {
        ArchiveDownloader {
            client: self.client.clone(),
            storage: self.storage.clone(),
            memory_threshold: self.config.max_archive_to_memory_size,
        }
    }

    /// Drops cached archives whose last mc seqno is below `bound` and aborts
    /// pending downloads that can no longer cover it.
    fn clear_outdated_archives(&self, bound: u32) {
        let mut entries_remaining = 0usize;
        let mut entries_removed = 0usize;

        let mut guard = self.known_archives.lock();
        guard.retain(|_, archive| {
            let retain;
            match archive {
                ArchiveSlot::Downloaded(info) => match info.archive.mc_block_ids.last_key_value() {
                    None => retain = false,
                    Some((last_mc_seqno, _)) => retain = *last_mc_seqno >= bound,
                },
                ArchiveSlot::Pending(task) => {
                    // A pending task keyed at `archive_key` can cover at most
                    // `MAX_MC_BLOCKS_PER_ARCHIVE` blocks past its key.
                    retain = task
                        .archive_key
                        .saturating_add(Archive::MAX_MC_BLOCKS_PER_ARCHIVE)
                        >= bound;
                    if !retain {
                        task.abort_handle.abort();
                    }
                }
            };

            entries_remaining += retain as usize;
            entries_removed += !retain as usize;
            retain
        });
        drop(guard);

        tracing::debug!(
            entries_remaining,
            entries_removed,
            bound,
            "removed known archives"
        );
    }
}
317
/// Archives and in-flight downloads keyed by the mc seqno they were requested for.
type ArchivesMap = BTreeMap<u32, ArchiveSlot>;
319
/// A cache slot: either a fully downloaded archive or an in-flight download.
enum ArchiveSlot {
    Downloaded(ArchiveInfo),
    Pending(ArchiveTask),
}
324
/// A downloaded archive together with its origin.
#[derive(Clone)]
struct ArchiveInfo {
    /// Neighbour the archive was downloaded from; `None` for sources with
    /// no overlay peer (e.g. S3).
    from: Option<Neighbour>,
    /// Parsed archive data, shared between cache and readers.
    archive: Arc<Archive>,
}
330
/// One-shot helper that downloads and parses a single archive.
struct ArchiveDownloader {
    client: Arc<dyn ArchiveClient>,
    storage: CoreStorage,
    /// Archives larger than this are written to a temp file instead of memory.
    memory_threshold: ByteSize,
}
336
impl ArchiveDownloader {
    /// Spawns a background task that retries [`Self::try_download`] every
    /// second until it succeeds, and returns a handle to observe it.
    fn spawn(self, mc_seqno: u32) -> ArchiveTask {
        const INTERVAL: Duration = Duration::from_secs(1);

        let (tx, rx) = watch::channel(ArchiveTaskState::None);

        // If the task is dropped/aborted before finishing, flip the state to
        // `Cancelled` so waiters do not hang on the watch channel.
        let guard = scopeguard::guard(tx, move |tx| {
            tracing::warn!(mc_seqno, "cancelled preloading archive");
            tx.send_modify(|prev| {
                if !matches!(prev, ArchiveTaskState::Finished(..)) {
                    *prev = ArchiveTaskState::Cancelled;
                }
            });
        });

        let handle = tokio::spawn(async move {
            tracing::debug!(mc_seqno, "started preloading archive");
            scopeguard::defer! {
                tracing::debug!(mc_seqno, "finished preloading archive");
            }

            loop {
                match self.try_download(mc_seqno).await {
                    Ok(res) => {
                        // Disarm the cancellation guard before publishing the
                        // final state.
                        let tx = scopeguard::ScopeGuard::into_inner(guard);
                        tx.send_modify(move |prev| *prev = ArchiveTaskState::Finished(res));
                        break;
                    }
                    Err(e) => {
                        tracing::error!(mc_seqno, "failed to preload archive {e:?}");
                        tokio::time::sleep(INTERVAL).await;
                    }
                }
            }
        });

        ArchiveTask {
            archive_key: mc_seqno,
            rx,
            abort_handle: Arc::new(AbortOnDrop(handle.abort_handle())),
        }
    }

    /// Performs a single download attempt: locate the archive via the client,
    /// run the deferred download, then parse and validate the bytes on a
    /// blocking thread. Malformed data punishes the source neighbour.
    ///
    /// Returns `Ok(None)` when the client reports no archive is available.
    async fn try_download(&self, mc_seqno: u32) -> Result<Option<ArchiveInfo>> {
        let ctx = ArchiveDownloadContext {
            storage: &self.storage,
            memory_threshold: self.memory_threshold,
        };
        let Some(found) = self.client.find_archive(mc_seqno, ctx).await? else {
            return Ok(None);
        };
        let res = (found.download)().await?;

        // Parsing/validation is CPU-bound; move it off the async runtime.
        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _span = span.enter();

            let bytes = res.writer.try_freeze()?;

            let archive = match Archive::new(bytes) {
                Ok(array) => array,
                Err(e) => {
                    if let Some(neighbour) = res.neighbour {
                        neighbour.punish(PunishReason::Malicious);
                    }
                    return Err(e);
                }
            };

            if let Err(e) = archive.check_mc_blocks_range() {
                if let Some(neighbour) = res.neighbour {
                    neighbour.punish(PunishReason::Malicious);
                }
                return Err(e);
            }

            Ok(ArchiveInfo {
                archive: Arc::new(archive),
                from: res.neighbour,
            })
        })
        .await?
        .map(Some)
    }
}
425
/// Shared handle to a background archive download.
#[derive(Clone)]
struct ArchiveTask {
    /// The mc seqno the download was spawned for (also its cache key).
    archive_key: u32,
    /// Receives state updates from the download task.
    rx: watch::Receiver<ArchiveTaskState>,
    /// Aborts the task when the last clone of this handle is dropped.
    abort_handle: Arc<AbortOnDrop>,
}
432
/// Wrapper around [`AbortHandle`] that aborts the task when dropped.
#[repr(transparent)]
struct AbortOnDrop(AbortHandle);
435
436impl std::ops::Deref for AbortOnDrop {
437 type Target = AbortHandle;
438
439 #[inline]
440 fn deref(&self) -> &Self::Target {
441 &self.0
442 }
443}
444
445impl Drop for AbortOnDrop {
446 fn drop(&mut self) {
447 self.0.abort();
448 }
449}
450
/// Progress of a background archive download, broadcast over a watch channel.
#[derive(Default)]
enum ArchiveTaskState {
    /// No result produced yet.
    #[default]
    None,
    /// Download finished; `None` means the client found no archive.
    Finished(Option<ArchiveInfo>),
    /// The task was dropped or aborted before finishing.
    Cancelled,
}
458
impl BlockProvider for ArchiveBlockProvider {
    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type CleanupFut<'a> = futures_util::future::Ready<Result<()>>;

    // Thin trait adapters: the real logic lives in the `_impl` methods above.
    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
        Box::pin(self.get_next_block_impl(prev_block_id))
    }

    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
        Box::pin(self.get_block_impl(block_id_relation))
    }

    // Synchronous cleanup wrapped in an already-ready future.
    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
        self.inner.clear_outdated_archives(mc_seqno);
        futures_util::future::ready(Ok(()))
    }
}
477
/// Raw result of an archive download, before parsing/validation.
pub struct ArchiveResponse {
    /// Destination the archive bytes were written to (memory or temp file).
    writer: TargetWriter,
    /// Source neighbour, when downloaded from the overlay.
    neighbour: Option<Neighbour>,
}
482
/// Context handed to [`ArchiveClient`] implementations when locating archives.
#[derive(Clone, Copy)]
pub struct ArchiveDownloadContext<'a> {
    pub storage: &'a CoreStorage,
    /// Archives larger than this are written to a temp file.
    pub memory_threshold: ByteSize,
}
488
489impl<'a> ArchiveDownloadContext<'a> {
490 pub fn get_archive_writer(&self, size: NonZeroU64) -> Result<TargetWriter> {
491 Ok(if size.get() > self.memory_threshold.as_u64() {
492 let file = self.storage.context().temp_files().unnamed_file().open()?;
493 TargetWriter::File(std::io::BufWriter::new(file))
494 } else {
495 TargetWriter::Bytes(BytesMut::new().writer())
496 })
497 }
498
499 pub fn estimate_archive_id(&self, mc_seqno: u32) -> u32 {
500 self.storage.block_storage().estimate_archive_id(mc_seqno)
501 }
502}
503
/// Conversion into a type-erased [`ArchiveClient`].
pub trait IntoArchiveClient {
    fn into_archive_client(self) -> Arc<dyn ArchiveClient>;
}
507
508impl<T: ArchiveClient> IntoArchiveClient for (T,) {
509 #[inline]
510 fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
511 Arc::new(self.0)
512 }
513}
514
515impl<T1: ArchiveClient, T2: ArchiveClient> IntoArchiveClient for (T1, Option<T2>) {
516 fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
517 let (primary, secondary) = self;
518 match secondary {
519 None => Arc::new(primary),
520 Some(secondary) => Arc::new(HybridArchiveClient::new(primary, secondary)),
521 }
522 }
523}
524
/// An archive located by an [`ArchiveClient`], with its download deferred.
pub struct FoundArchive<'a> {
    pub archive_id: u64,
    /// One-shot closure that performs the actual download.
    pub download: Box<dyn FnOnce() -> BoxFuture<'a, Result<ArchiveResponse>> + Send + 'a>,
}
529
/// A source of block archives (e.g. overlay RPC or S3).
#[async_trait]
pub trait ArchiveClient: Send + Sync + 'static {
    /// Locates an archive containing the mc block with `mc_seqno`.
    ///
    /// Returns `Ok(None)` when no suitable archive is available (yet).
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>>;
}
538
#[async_trait]
impl ArchiveClient for BlockchainRpcClient {
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        // NOTE(review): this resolves to the inherent
        // `BlockchainRpcClient::find_archive` (one-arg) rather than recursing
        // into this trait method — confirm against the client's API.
        Ok(match self.find_archive(mc_seqno).await? {
            blockchain_rpc::PendingArchiveResponse::Found(found) => Some(FoundArchive {
                archive_id: found.id,
                // Deferred download: sizes the writer from the pending
                // response and remembers the neighbour for accountability.
                download: Box::new(move || {
                    Box::pin(async move {
                        let neighbour = found.neighbour.clone();

                        let output = ctx.get_archive_writer(found.size)?;
                        let writer = self.download_archive(found, output).await?;

                        Ok(ArchiveResponse {
                            writer,
                            neighbour: Some(neighbour),
                        })
                    })
                }),
            }),
            // The requested block is newer than any published archive.
            blockchain_rpc::PendingArchiveResponse::TooNew => None,
        })
    }
}
567
/// An RPC client can be used as an archive client directly.
impl IntoArchiveClient for BlockchainRpcClient {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self)
    }
}
574
#[cfg(feature = "s3")]
#[async_trait]
impl ArchiveClient for S3Client {
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        // Map the seqno to an archive id using local storage's estimate.
        let archive_id = ctx.estimate_archive_id(mc_seqno);

        let Some(info) = self.get_archive_info(archive_id).await? else {
            return Ok(None);
        };

        Ok(Some(FoundArchive {
            archive_id: info.archive_id as u64,
            download: Box::new(move || {
                Box::pin(async move {
                    let output = ctx.get_archive_writer(info.size)?;
                    let writer = self.download_archive(info.archive_id, output).await?;

                    // No overlay neighbour to credit/punish for S3 downloads.
                    Ok(ArchiveResponse {
                        writer,
                        neighbour: None,
                    })
                })
            }),
        }))
    }
}
605
/// An S3 client can be used as an archive client directly.
#[cfg(feature = "s3")]
impl IntoArchiveClient for S3Client {
    #[inline]
    fn into_archive_client(self) -> Arc<dyn ArchiveClient> {
        Arc::new(self)
    }
}
613
/// Archive client that combines two clients, remembering which one answered
/// successfully last so it can be tried first next time.
pub struct HybridArchiveClient<T1, T2> {
    primary: T1,
    secondary: T2,
    /// Last client that successfully found an archive, if any.
    prefer: HybridArchiveClientState,
}
619
620impl<T1, T2> HybridArchiveClient<T1, T2> {
621 pub fn new(primary: T1, secondary: T2) -> Self {
622 Self {
623 primary,
624 secondary,
625 prefer: Default::default(),
626 }
627 }
628}
629
#[async_trait]
impl<T1, T2> ArchiveClient for HybridArchiveClient<T1, T2>
where
    T1: ArchiveClient,
    T2: ArchiveClient,
{
    async fn find_archive<'a>(
        &'a self,
        mc_seqno: u32,
        ctx: ArchiveDownloadContext<'a>,
    ) -> Result<Option<FoundArchive<'a>>> {
        // Fast path: ask the previously successful client first, and stick
        // with it while it keeps finding archives.
        if let Some(prefer) = self.prefer.get() {
            tracing::debug!(mc_seqno, ?prefer);
            match prefer {
                HybridArchiveClientPart::Primary => {
                    let res = self.primary.find_archive(mc_seqno, ctx).await;
                    if matches!(&res, Ok(Some(_))) {
                        return res;
                    }
                }
                HybridArchiveClientPart::Secondary => {
                    let res = self.secondary.find_archive(mc_seqno, ctx).await;
                    if matches!(&res, Ok(Some(_))) {
                        return res;
                    }
                }
            }
        }

        // The preferred client failed (or none was set): clear the preference
        // and race both clients, keeping whichever answers with an archive.
        self.prefer.set(None);

        let primary = pin!(self.primary.find_archive(mc_seqno, ctx));
        let secondary = pin!(self.secondary.find_archive(mc_seqno, ctx));
        match futures_util::future::select(primary, secondary).await {
            futures_util::future::Either::Left((found, other)) => {
                match found {
                    Ok(Some(found)) => {
                        self.prefer.set(Some(HybridArchiveClientPart::Primary));
                        return Ok(Some(found));
                    }
                    Ok(None) => {}
                    Err(e) => tracing::warn!("primary archive client error: {e:?}"),
                }
                // Primary came back empty or failed; fall through to whatever
                // the secondary produces.
                other.await.inspect(|res| {
                    if res.is_some() {
                        self.prefer.set(Some(HybridArchiveClientPart::Secondary));
                    }
                })
            }
            futures_util::future::Either::Right((found, other)) => {
                match found {
                    Ok(Some(found)) => {
                        self.prefer.set(Some(HybridArchiveClientPart::Secondary));
                        return Ok(Some(found));
                    }
                    Ok(None) => {}
                    Err(e) => tracing::warn!("secondary archive client error: {e:?}"),
                }
                other.await.inspect(|res| {
                    if res.is_some() {
                        self.prefer.set(Some(HybridArchiveClientPart::Primary));
                    }
                })
            }
        }
    }
}
699
/// Atomic storage for an optional [`HybridArchiveClientPart`]
/// (0 = none, 1 = primary, 2 = secondary).
#[derive(Default)]
struct HybridArchiveClientState(AtomicU8);
702
703impl HybridArchiveClientState {
704 fn get(&self) -> Option<HybridArchiveClientPart> {
705 match self.0.load(Ordering::Acquire) {
706 1 => Some(HybridArchiveClientPart::Primary),
707 2 => Some(HybridArchiveClientPart::Secondary),
708 _ => None,
709 }
710 }
711
712 fn set(&self, value: Option<HybridArchiveClientPart>) {
713 let value = match value {
714 None => 0,
715 Some(HybridArchiveClientPart::Primary) => 1,
716 Some(HybridArchiveClientPart::Secondary) => 2,
717 };
718 self.0.store(value, Ordering::Release);
719 }
720}
721
/// Identifies one half of a [`HybridArchiveClient`].
#[derive(Debug, Clone, Copy)]
enum HybridArchiveClientPart {
    Primary,
    Secondary,
}
727
728impl std::ops::Not for HybridArchiveClientPart {
729 type Output = Self;
730
731 #[inline]
732 fn not(self) -> Self::Output {
733 match self {
734 Self::Primary => Self::Secondary,
735 Self::Secondary => Self::Primary,
736 }
737 }
738}
739
/// Tracks archive-sync progress and logs lag/rate/ETA estimates.
struct SyncTracker {
    inner: parking_lot::Mutex<SyncTrackerInner>,
}
743
/// Mutable state of [`SyncTracker`].
struct SyncTrackerInner {
    /// Sliding window of `(seqno, gen_time_ms, insert_time_ms)` samples for
    /// consecutive mc blocks.
    window: std::collections::VecDeque<(u32, u64, u64)>,
    /// Key of the archive a progress line was last logged for.
    last_archive_key: Option<u32>,
}
748
impl SyncTracker {
    /// Maximum number of block samples kept in the sliding window.
    const WINDOW_SIZE: usize = 300;

    fn new() -> Self {
        Self {
            inner: parking_lot::Mutex::new(SyncTrackerInner {
                window: std::collections::VecDeque::with_capacity(Self::WINDOW_SIZE),
                last_archive_key: None,
            }),
        }
    }

    /// Records a processed block and, at most once per archive, logs the
    /// current sync lag, processing rate and ETA.
    ///
    /// Returns `None` when nothing was logged (duplicate archive, unloadable
    /// block info, or a window too small/degenerate for estimates).
    fn try_log(&self, archive_key: u32, block: &BlockStuffAug) -> Option<()> {
        let mut inner = self.inner.lock();

        let seqno = block.id().seqno;

        // Rate estimates assume consecutive seqnos; reset on any gap.
        if let Some((last_seqno, _, _)) = inner.window.back()
            && seqno != last_seqno + 1
        {
            inner.window.clear();
            inner.last_archive_key = None;
        }

        let block_info = block.data.load_info().ok()?;

        let now = tycho_util::time::now_millis();
        // Block generation time in milliseconds.
        let gen_utime = (block_info.gen_utime as u64) * 1000 + (block_info.gen_utime_ms as u64);
        inner.window.push_back((seqno, gen_utime, now));

        if inner.window.len() > Self::WINDOW_SIZE {
            inner.window.pop_front();
        }

        // Log only once per archive.
        if inner.last_archive_key == Some(archive_key) {
            return None;
        }
        inner.last_archive_key = Some(archive_key);

        let (_, first_gen_time, first_insert_time) = inner.window.front()?;
        let (_, last_gen_time, last_insert_time) = inner.window.back()?;

        // Need strictly increasing timestamps for a meaningful rate.
        if first_gen_time >= last_gen_time || first_insert_time >= last_insert_time {
            return None;
        }

        // How far behind real time the latest processed block is.
        let lag_ms = now.saturating_sub(*last_gen_time);

        // Chain-time covered per wall-clock time; > 1.0 means catching up.
        let sync_rate =
            (last_gen_time - first_gen_time) as f64 / (last_insert_time - first_insert_time) as f64;

        // ETA only converges when we gain on the chain; otherwise report the
        // raw lag as a lower bound.
        let eta_ms = if sync_rate > 1.0 {
            (lag_ms as f64 / (sync_rate - 1.0)) as u64
        } else {
            lag_ms
        };

        tracing::info!(
            seqno = block.id().seqno,
            lag = %humantime::format_duration(Duration::from_millis(lag_ms)),
            rate = format!("{:.2}x", sync_rate),
            eta = %humantime::format_duration(Duration::from_millis(eta_ms)),
            "sync progress"
        );

        Some(())
    }
}
820}