1use std::marker::PhantomData;
4use std::pin::pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use backoff::ExponentialBackoffBuilder;
9use backoff::backoff::Backoff;
10use celestia_types::ExtendedHeader;
11use lumina_utils::executor::{JoinHandle, spawn};
12use lumina_utils::time::{Instant, Interval, sleep};
13use serde::{Deserialize, Serialize};
14use tendermint::Time;
15use tokio::select;
16use tokio::sync::{mpsc, oneshot};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, instrument, warn};
19
20use crate::block_ranges::{BlockRange, BlockRangeExt, BlockRanges};
21use crate::events::{EventPublisher, NodeEvent};
22use crate::p2p::{P2p, P2pError};
23use crate::store::{Store, StoreError};
24use crate::utils::{FusedReusableFuture, OneshotSenderExt, TimeExt};
25
/// Module-local result type defaulting the error to [`SyncerError`].
type Result<T, E = SyncerError> = std::result::Result<T, E>;

/// Cap for the exponential backoff between initialization retries in `try_init_task`.
const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60);
/// Minimum sampling backlog used by the slow-sync throttle in `fetch_next_batch`.
const SLOW_SYNC_MIN_THRESHOLD: u64 = 50;
30
/// Errors produced by the [`Syncer`] and its background worker.
#[derive(Debug, thiserror::Error)]
pub enum SyncerError {
    /// An error propagated from the P2P layer.
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// An error propagated from the header store.
    #[error("Store: {0}")]
    Store(#[from] StoreError),

    /// The background worker task is no longer running.
    #[error("Worker died")]
    WorkerDied,

    /// A command/response channel closed while a request was in flight.
    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,
}
50
51impl SyncerError {
52 pub(crate) fn is_fatal(&self) -> bool {
53 match self {
54 SyncerError::P2p(e) => e.is_fatal(),
55 SyncerError::Store(e) => e.is_fatal(),
56 SyncerError::WorkerDied | SyncerError::ChannelClosedUnexpectedly => true,
57 }
58 }
59}
60
61impl From<oneshot::error::RecvError> for SyncerError {
62 fn from(_value: oneshot::error::RecvError) -> Self {
63 SyncerError::ChannelClosedUnexpectedly
64 }
65}
66
/// Handle to the header syncing task.
#[derive(Debug)]
pub(crate) struct Syncer<S>
where
    S: Store + 'static,
{
    // Channel for delivering `SyncerCmd`s to the worker task.
    cmd_tx: mpsc::Sender<SyncerCmd>,
    // Cancelling this token stops the worker (a child token is handed to it).
    cancellation_token: CancellationToken,
    // Handle used by `join` to await worker termination.
    join_handle: JoinHandle,
    // The store type only flows through `SyncerArgs`; nothing is stored here.
    _store: PhantomData<S>,
}
78
/// Configuration for [`Syncer::start`].
pub(crate) struct SyncerArgs<S>
where
    S: Store + 'static,
{
    /// Handle to the P2P layer used for fetching headers.
    pub(crate) p2p: Arc<P2p>,
    /// Store that fetched headers are inserted into.
    pub(crate) store: Arc<S>,
    /// Publisher for node-level events.
    pub(crate) event_pub: EventPublisher,
    /// Maximum number of headers requested in a single batch.
    pub(crate) batch_size: u64,
    /// Age window within which headers are considered for sampling.
    pub(crate) sampling_window: Duration,
    /// Age window past which headers are subject to pruning.
    pub(crate) pruning_window: Duration,
}
97
/// Commands sent from the [`Syncer`] handle to its worker task.
#[derive(Debug)]
enum SyncerCmd {
    /// Request a snapshot of the current syncing state.
    GetInfo {
        respond_to: oneshot::Sender<SyncingInfo>,
    },
    /// Test-only hook: force an immediate `fetch_next_batch` attempt.
    #[cfg(test)]
    TriggerFetchNextBatch,
}
106
/// Snapshot of the syncing status.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncingInfo {
    /// Ranges of headers already present in the store.
    pub stored_headers: BlockRanges,
    /// Current subjective head height; 0 while not yet initialized.
    pub subjective_head: u64,
}
115
116impl<S> Syncer<S>
117where
118 S: Store,
119{
120 pub(crate) fn start(args: SyncerArgs<S>) -> Result<Self> {
122 let cancellation_token = CancellationToken::new();
123 let event_pub = args.event_pub.clone();
124 let (cmd_tx, cmd_rx) = mpsc::channel(16);
125 let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;
126
127 let join_handle = spawn(async move {
128 if let Err(e) = worker.run().await {
129 error!("Syncer stopped because of a fatal error: {e}");
130
131 event_pub.send(NodeEvent::FatalSyncerError {
132 error: e.to_string(),
133 });
134 }
135 });
136
137 Ok(Syncer {
138 cancellation_token,
139 cmd_tx,
140 join_handle,
141 _store: PhantomData,
142 })
143 }
144
145 pub(crate) fn stop(&self) {
147 self.cancellation_token.cancel();
149 }
150
151 pub(crate) async fn join(&self) {
153 self.join_handle.join().await;
154 }
155
156 async fn send_command(&self, cmd: SyncerCmd) -> Result<()> {
157 self.cmd_tx
158 .send(cmd)
159 .await
160 .map_err(|_| SyncerError::WorkerDied)
161 }
162
163 pub(crate) async fn info(&self) -> Result<SyncingInfo> {
169 let (tx, rx) = oneshot::channel();
170
171 self.send_command(SyncerCmd::GetInfo { respond_to: tx })
172 .await?;
173
174 Ok(rx.await?)
175 }
176
177 #[cfg(test)]
178 async fn trigger_fetch_next_batch(&self) -> Result<()> {
179 self.send_command(SyncerCmd::TriggerFetchNextBatch).await
180 }
181}
182
impl<S> Drop for Syncer<S>
where
    S: Store,
{
    fn drop(&mut self) {
        // Ensure the worker task gets cancelled when the handle goes away.
        self.stop();
    }
}
191
/// Background task that performs the actual header syncing.
struct Worker<S>
where
    S: Store + 'static,
{
    // Cancelled by the owning `Syncer` handle to stop the worker.
    cancellation_token: CancellationToken,
    // Commands from the `Syncer` handle.
    cmd_rx: mpsc::Receiver<SyncerCmd>,
    // Publisher for node-level events.
    event_pub: EventPublisher,
    p2p: Arc<P2p>,
    store: Arc<S>,
    // Receives new heads from header-sub; `Some` only while connected.
    header_sub_rx: Option<mpsc::Receiver<ExtendedHeader>>,
    // Highest network head height seen so far; `None` until initialized.
    subjective_head_height: Option<u64>,
    // Highest height observed to be older than the pruning window; batches at
    // or below it are throttled ("slow sync").
    highest_slow_sync_height: Option<u64>,
    // Maximum number of headers requested per batch.
    batch_size: u64,
    // The batch fetch currently in flight, if any.
    ongoing_batch: Ongoing,
    sampling_window: Duration,
    pruning_window: Duration,
}
209
/// State of the batch fetch currently in flight (if any).
struct Ongoing {
    // Range being fetched; `None` while no fetch is in progress.
    range: Option<BlockRange>,
    // Future resolving to the fetched headers plus the fetch duration.
    // Kept terminated whenever `range` is `None`.
    task: FusedReusableFuture<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
}
214
impl<S> Worker<S>
where
    S: Store,
{
    /// Build a worker from [`SyncerArgs`]; no tasks are started here.
    fn new(
        args: SyncerArgs<S>,
        cancellation_token: CancellationToken,
        cmd_rx: mpsc::Receiver<SyncerCmd>,
    ) -> Result<Self> {
        Ok(Worker {
            cancellation_token,
            cmd_rx,
            event_pub: args.event_pub,
            p2p: args.p2p,
            store: args.store,
            header_sub_rx: None,
            subjective_head_height: None,
            highest_slow_sync_height: None,
            batch_size: args.batch_size,
            ongoing_batch: Ongoing {
                range: None,
                task: FusedReusableFuture::terminated(),
            },
            sampling_window: args.sampling_window,
            pruning_window: args.pruning_window,
        })
    }

    /// Main loop: alternate between the "connecting" phase (acquire a
    /// subjective head) and the "connected" phase (sync headers) until the
    /// cancellation token fires or a fatal error is returned.
    async fn run(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connecting_event_loop().await?;

            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connected_event_loop().await?;
        }

        debug!("Syncer stopped");
        Ok(())
    }

    /// Phase 1: keep retrying initialization (`try_init_task`) until a
    /// network head is obtained, then subscribe to header-sub and return.
    async fn connecting_event_loop(&mut self) -> Result<()> {
        debug!("Entering connecting_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60));
        self.report().await?;

        let mut try_init_fut = pin!(try_init_task(
            self.p2p.clone(),
            self.store.clone(),
            self.event_pub.clone()
        ));

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = report_interval.tick() => {
                    self.report().await?;
                }
                res = &mut try_init_fut => {
                    // Only fatal errors propagate; `try_init_task` retries
                    // everything else internally.
                    let (network_head, took) = res?;
                    let network_head_height = network_head.height().value();

                    info!("Setting initial subjective head to {network_head_height}");
                    self.set_subjective_head_height(network_head_height);

                    // From now on, new heads arrive through this channel.
                    let (header_sub_tx, header_sub_rx) = mpsc::channel(16);
                    self.p2p.init_header_sub(network_head, header_sub_tx).await?;
                    self.header_sub_rx = Some(header_sub_rx);

                    self.event_pub.send(NodeEvent::FetchingHeadHeaderFinished {
                        height: network_head_height,
                        took,
                    });

                    break;
                }
                Some(cmd) = self.cmd_rx.recv() => {
                    self.on_cmd(cmd).await?;
                }
            }
        }

        Ok(())
    }

    /// Phase 2: fetch header batches and follow header-sub until all peers
    /// disconnect (drop back to phase 1) or the worker is cancelled.
    async fn connected_event_loop(&mut self) -> Result<()> {
        debug!("Entering connected_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60));
        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        // Bail out immediately if peers vanished between the phases.
        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
            warn!("All peers disconnected");
            return Ok(());
        }

        self.fetch_next_batch().await?;
        self.report().await?;

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        break;
                    }
                }
                _ = report_interval.tick() => {
                    self.report().await?;
                }
                res = header_sub_recv(self.header_sub_rx.as_mut()) => {
                    let header = res?;
                    self.on_header_sub_message(header).await?;
                    self.fetch_next_batch().await?;
                }
                Some(cmd) = self.cmd_rx.recv() => {
                    self.on_cmd(cmd).await?;
                }
                (res, took) = &mut self.ongoing_batch.task => {
                    self.on_fetch_next_batch_result(res, took).await?;
                    self.fetch_next_batch().await?;
                }
            }
        }

        // Abort any in-flight batch so phase 1 restarts from a clean state.
        if let Some(ongoing) = self.ongoing_batch.range.take() {
            warn!("Cancelling fetching of {}", ongoing.display());
            self.ongoing_batch.task.terminate();
        }

        self.header_sub_rx.take();

        Ok(())
    }

    /// Snapshot of stored ranges and the current subjective head.
    async fn syncing_info(&self) -> Result<SyncingInfo> {
        Ok(SyncingInfo {
            stored_headers: self.store.get_stored_header_ranges().await?,
            // 0 doubles as "not initialized yet".
            subjective_head: self.subjective_head_height.unwrap_or(0),
        })
    }

    /// Log a one-line syncing status summary.
    #[instrument(skip_all)]
    async fn report(&mut self) -> Result<()> {
        let SyncingInfo {
            stored_headers,
            subjective_head,
        } = self.syncing_info().await?;

        let ongoing_batch = self
            .ongoing_batch
            .range
            .as_ref()
            .map(|range| format!("{}", range.display()))
            .unwrap_or_else(|| "None".to_string());

        info!(
            "syncing: head: {subjective_head}, stored headers: {stored_headers}, ongoing batches: {ongoing_batch}"
        );
        Ok(())
    }

    /// Dispatch a command received from the `Syncer` handle.
    async fn on_cmd(&mut self, cmd: SyncerCmd) -> Result<()> {
        match cmd {
            SyncerCmd::GetInfo { respond_to } => {
                let info = self.syncing_info().await?;
                respond_to.maybe_send(info);
            }
            #[cfg(test)]
            SyncerCmd::TriggerFetchNextBatch => {
                self.fetch_next_batch().await?;
            }
        }

        Ok(())
    }

    /// Handle a new head announced over header-sub.
    #[instrument(skip_all)]
    async fn on_header_sub_message(&mut self, new_head: ExtendedHeader) -> Result<()> {
        let new_head_height = new_head.height().value();

        self.set_subjective_head_height(new_head_height);

        if let Ok(store_head_height) = self.store.head_height().await {
            // Fast path: a head directly extending the store is inserted here;
            // any bigger gap is left for `fetch_next_batch` to fill.
            if store_head_height + 1 == new_head_height {
                // Insert failure is deliberately ignored — the header will be
                // re-fetched as part of a normal batch.
                if self.store.insert(new_head).await.is_ok() {
                    self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub {
                        height: new_head_height,
                    });
                }
            }
        }

        Ok(())
    }

    /// Raise the subjective head; lower or equal heights are ignored so the
    /// head never moves backwards.
    fn set_subjective_head_height(&mut self, height: u64) {
        if self
            .subjective_head_height
            .is_some_and(|old_height| height <= old_height)
        {
            return;
        }

        self.subjective_head_height = Some(height);
    }

    /// Schedule fetching of the next header batch, if one is needed and
    /// nothing is currently in flight.
    #[instrument(skip_all)]
    async fn fetch_next_batch(&mut self) -> Result<()> {
        // `range` and `task` must always be set/cleared together.
        debug_assert_eq!(
            self.ongoing_batch.range.is_none(),
            self.ongoing_batch.task.is_terminated()
        );

        // Only one batch at a time.
        if !self.ongoing_batch.task.is_terminated() {
            return Ok(());
        }

        // No point requesting headers without any connected peers.
        if self.p2p.peer_tracker_info().num_connected_peers == 0 {
            return Ok(());
        }

        let Some(subjective_head_height) = self.subjective_head_height else {
            // Not initialized yet.
            return Ok(());
        };

        let store_ranges = self.store.get_stored_header_ranges().await?;
        let pruned_ranges = self.store.get_pruned_ranges().await?;

        // Pruned heights count as already synced — we never re-fetch them.
        let synced_ranges = pruned_ranges + &store_ranges;

        let next_batch = calculate_range_to_fetch(
            subjective_head_height,
            synced_ranges.as_ref(),
            self.batch_size,
        );

        if next_batch.is_empty() {
            // Fully synced.
            return Ok(());
        }

        // Slow-sync throttle: once the next batch lies entirely at or below a
        // height known to be older than the pruning window, only fetch more
        // when the sampling backlog (stored but not yet sampled) runs low.
        if self
            .highest_slow_sync_height
            .is_some_and(|height| *next_batch.end() <= height)
        {
            let threshold = (self.batch_size / 2).max(SLOW_SYNC_MIN_THRESHOLD);

            let sampled_ranges = self.store.get_sampled_ranges().await?;
            let available_for_sampling = (store_ranges - sampled_ranges).len();

            if available_for_sampling > threshold {
                // Sampler still has plenty of queued work.
                return Ok(());
            }
        }

        // If the header just above the batch is already stored and falls
        // outside the sampling window, the batch below it is presumably even
        // older and not worth fetching.
        match self.store.get_by_height(next_batch.end() + 1).await {
            Ok(known_header) => {
                if !self.in_sampling_window(&known_header) {
                    return Ok(());
                }
            }
            Err(StoreError::NotFound) => {}
            Err(e) => return Err(e.into()),
        }

        self.event_pub.send(NodeEvent::FetchingHeadersStarted {
            from_height: *next_batch.start(),
            to_height: *next_batch.end(),
        });

        let p2p = self.p2p.clone();

        self.ongoing_batch.range = Some(next_batch.clone());

        // Verification happens on store insertion, hence "unverified" here.
        self.ongoing_batch.task.set(async move {
            let now = Instant::now();
            let res = p2p.get_unverified_header_range(next_batch).await;
            (res, now.elapsed())
        });

        Ok(())
    }

    /// Handle completion of a batch fetch: update slow-sync tracking and
    /// insert the received headers into the store.
    #[instrument(skip_all)]
    async fn on_fetch_next_batch_result(
        &mut self,
        res: Result<Vec<ExtendedHeader>, P2pError>,
        took: Duration,
    ) -> Result<()> {
        let range = self
            .ongoing_batch
            .range
            .take()
            .expect("ongoing_batch not initialized correctly");

        let from_height = *range.start();
        let to_height = *range.end();

        let headers = match res {
            Ok(headers) => headers,
            Err(e) => {
                if e.is_fatal() {
                    return Err(e.into());
                }

                self.event_pub.send(NodeEvent::FetchingHeadersFailed {
                    from_height,
                    to_height,
                    error: e.to_string(),
                    took,
                });

                return Ok(());
            }
        };

        let pruning_cutoff = Time::now().saturating_sub(self.pruning_window);

        // Scan newest-to-oldest for the highest header already older than the
        // pruning window; everything at or below that height will be fetched
        // in slow-sync mode. Stop at the first hit.
        for header in headers.iter().rev() {
            if self
                .highest_slow_sync_height
                .is_some_and(|height| header.height().value() <= height)
            {
                // Already tracked at this height or above.
                break;
            }

            if header.time() <= pruning_cutoff {
                self.highest_slow_sync_height = Some(header.height().value());
                break;
            }
        }

        if let Err(e) = self.store.insert(headers).await {
            if e.is_fatal() {
                return Err(e.into());
            }

            self.event_pub.send(NodeEvent::FetchingHeadersFailed {
                from_height,
                to_height,
                error: format!("Failed to store headers: {e}"),
                took,
            });
            // NOTE(review): execution falls through here, so a non-fatal store
            // failure emits both FetchingHeadersFailed and
            // FetchingHeadersFinished for the same range — confirm intentional.
        }

        self.event_pub.send(NodeEvent::FetchingHeadersFinished {
            from_height,
            to_height,
            took,
        });

        Ok(())
    }

    /// `true` when the header's timestamp is within `sampling_window` of now.
    fn in_sampling_window(&self, header: &ExtendedHeader) -> bool {
        let sampling_window_end = Time::now().saturating_sub(self.sampling_window);
        header.time().after(sampling_window_end)
    }
}
623
624fn calculate_range_to_fetch(
627 subjective_head_height: u64,
628 synced_headers: &[BlockRange],
629 limit: u64,
630) -> BlockRange {
631 let mut synced_headers_iter = synced_headers.iter().rev();
632
633 let Some(synced_head_range) = synced_headers_iter.next() else {
634 let range = 1..=subjective_head_height;
636 return range.tailn(limit);
637 };
638
639 if synced_head_range.end() < &subjective_head_height {
640 let range = synced_head_range.end() + 1..=subjective_head_height;
642 return range.tailn(limit);
643 }
644
645 let penultimate_range_end = synced_headers_iter.next().map(|r| *r.end()).unwrap_or(0);
647
648 let range = penultimate_range_end + 1..=synced_head_range.start().saturating_sub(1);
649 range.headn(limit)
650}
651
652#[instrument(skip_all)]
653async fn try_init_task<S>(
654 p2p: Arc<P2p>,
655 store: Arc<S>,
656 event_pub: EventPublisher,
657) -> Result<(ExtendedHeader, Duration)>
658where
659 S: Store + 'static,
660{
661 let now = Instant::now();
662 let mut event_reported = false;
663 let mut backoff = ExponentialBackoffBuilder::default()
664 .with_max_interval(TRY_INIT_BACKOFF_MAX_INTERVAL)
665 .with_max_elapsed_time(None)
666 .build();
667
668 loop {
669 match try_init(&p2p, &*store, &event_pub, &mut event_reported).await {
670 Ok(network_head) => {
671 return Ok((network_head, now.elapsed()));
672 }
673 Err(e) if e.is_fatal() => {
674 return Err(e);
675 }
676 Err(e) => {
677 let sleep_dur = backoff
678 .next_backoff()
679 .expect("backoff never stops retrying");
680
681 warn!(
682 "Initialization of subjective head failed: {e}. Trying again in {sleep_dur:?}."
683 );
684 sleep(sleep_dur).await;
685 }
686 }
687 }
688}
689
690async fn try_init<S>(
691 p2p: &P2p,
692 store: &S,
693 event_pub: &EventPublisher,
694 event_reported: &mut bool,
695) -> Result<ExtendedHeader>
696where
697 S: Store,
698{
699 p2p.wait_connected_trusted().await?;
700
701 if !*event_reported {
702 event_pub.send(NodeEvent::FetchingHeadHeaderStarted);
703 *event_reported = true;
704 }
705
706 let network_head = p2p.get_head_header().await?;
707
708 let try_insert = match store.get_head().await {
715 Ok(store_head) => store_head.hash() != network_head.hash(),
718 Err(StoreError::NotFound) => true,
719 Err(e) => return Err(e.into()),
720 };
721
722 if try_insert {
723 store.insert(network_head.clone()).await?;
726 }
727
728 Ok(network_head)
729}
730
731async fn header_sub_recv(
732 rx: Option<&mut mpsc::Receiver<ExtendedHeader>>,
733) -> Result<ExtendedHeader> {
734 rx.expect("header-sub not initialized")
735 .recv()
736 .await
737 .ok_or(SyncerError::P2p(P2pError::WorkerDied))
738}
739
740#[cfg(test)]
741mod tests {
742 use std::ops::RangeInclusive;
743
744 use super::*;
745 use crate::block_ranges::{BlockRange, BlockRangeExt};
746 use crate::events::EventChannel;
747 use crate::node::{DEFAULT_PRUNING_WINDOW, HeaderExError, SAMPLING_WINDOW};
748 use crate::p2p::header_session;
749 use crate::store::InMemoryStore;
750 use crate::test_utils::{MockP2pHandle, gen_filled_store};
751 use crate::utils::OneshotResultSenderExt;
752 use celestia_types::test_utils::ExtendedHeaderGenerator;
753 use libp2p::request_response::OutboundFailure;
754 use lumina_utils::test_utils::async_test;
755
    /// `limit` must cap the size of the batch above the highest synced range.
    #[test]
    fn calculate_range_to_fetch_test_header_limit() {
        let head_height = 1024;
        let ranges = [256..=512];

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16);
        assert_eq!(fetch_range, 513..=528);

        // The gap above the synced range is 513..=1024 (512 headers); the
        // newest headers win once the limit exceeds the gap size.
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511);
        assert_eq!(fetch_range, 513..=1023);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512);
        assert_eq!(fetch_range, 513..=1024);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513);
        assert_eq!(fetch_range, 513..=1024);

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024);
        assert_eq!(fetch_range, 513..=1024);
    }
774
    /// With nothing synced, fetching starts from genesis up to the limit.
    #[test]
    fn calculate_range_to_fetch_empty_store() {
        let fetch_range = calculate_range_to_fetch(1, &[], 100);
        assert_eq!(fetch_range, 1..=1);

        let fetch_range = calculate_range_to_fetch(100, &[], 10);
        assert_eq!(fetch_range, 1..=10);

        let fetch_range = calculate_range_to_fetch(100, &[], 50);
        assert_eq!(fetch_range, 1..=50);
    }
786
    /// A store covering 1..=head yields an empty range.
    #[test]
    fn calculate_range_to_fetch_fully_synced() {
        let fetch_range = calculate_range_to_fetch(1, &[1..=1], 100);
        assert!(fetch_range.is_empty());

        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
        assert!(fetch_range.is_empty());

        // NOTE(review): exact duplicate of the previous case — possibly meant
        // to use a different limit.
        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
        assert!(fetch_range.is_empty());
    }
798
    /// Once the head is reached, the gap below the highest range is filled.
    #[test]
    fn calculate_range_to_fetch_caught_up() {
        let head_height = 4000;

        let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], 500);
        assert_eq!(fetch_range, 2500..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], 500);
        assert_eq!(fetch_range, 2500..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
        assert_eq!(fetch_range, 2801..=2999);
        // NOTE(review): exact duplicate of the previous case.
        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
        assert_eq!(fetch_range, 2801..=2999);
        // The gap down to genesis is smaller than the limit.
        let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], 500);
        assert_eq!(fetch_range, 1..=299);
    }
814
    /// While below the head, the newest missing headers are fetched first.
    #[test]
    fn calculate_range_to_fetch_catching_up() {
        let head_height = 4000;

        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], 500);
        assert_eq!(fetch_range, 3001..=3500);
        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3500], 500);
        assert_eq!(fetch_range, 3501..=4000);
        let fetch_range = calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], 500);
        assert_eq!(fetch_range, 3801..=4000);
    }
826
    /// Initialization flow: only a trusted peer triggers the head request.
    #[async_test]
    async fn init_without_genesis_hash() {
        let events = EventChannel::new();
        let (mock, mut handle) = P2p::mocked();
        let mut generator = ExtendedHeaderGenerator::new();
        let header = generator.next();

        let _syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(mock),
            store: Arc::new(InMemoryStore::new()),
            event_pub: events.publisher(),
            batch_size: 512,
            sampling_window: SAMPLING_WINDOW,
            pruning_window: DEFAULT_PRUNING_WINDOW,
        })
        .unwrap();

        // A non-trusted peer alone must not start initialization.
        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;
        handle.announce_trusted_peer_connected();

        // Height 0 / amount 1 is the "give me the current head" request.
        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![header.clone()])).unwrap();

        let head_from_syncer = handle.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header);

        // Head is height 1, so there is nothing else to fetch.
        handle.expect_no_cmd().await;
    }
863
    /// A genesis-only chain initializes and then stays idle.
    #[async_test]
    async fn init_with_genesis_hash() {
        let mut generator = ExtendedHeaderGenerator::new();
        let head = generator.next();

        let (_syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;

        // Head is also genesis, so nothing else needs fetching.
        p2p_mock.expect_no_cmd().await;
    }
874
    /// End-to-end syncing: backward batches plus header-sub announcements.
    #[async_test]
    async fn syncing() {
        let mut generator = ExtendedHeaderGenerator::new();
        let headers = generator.next_many(1500);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[1499].clone()).await;
        assert_syncing(&syncer, &store, &[1500..=1500], 1500).await;

        // Sync backwards from the head in batches of 512.
        handle_session_batch(&mut p2p_mock, &headers, 988..=1499, true).await;
        assert_syncing(&syncer, &store, &[988..=1500], 1500).await;

        handle_session_batch(&mut p2p_mock, &headers, 476..=987, true).await;
        assert_syncing(&syncer, &store, &[476..=1500], 1500).await;

        // A new head that directly extends the store is inserted on the spot.
        let header1501 = generator.next();
        p2p_mock.announce_new_head(header1501.clone());
        assert_syncing(&syncer, &store, &[476..=1501], 1501).await;

        handle_session_batch(&mut p2p_mock, &headers, 1..=475, true).await;
        assert_syncing(&syncer, &store, &[1..=1501], 1501).await;

        p2p_mock.expect_no_cmd().await;

        let header1502 = generator.next();
        p2p_mock.announce_new_head(header1502.clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1502).await;
        p2p_mock.expect_no_cmd().await;

        // A head several blocks ahead leaves a gap that a batch must fill.
        let headers_1503_1505 = generator.next_many(3);
        p2p_mock.announce_new_head(headers_1503_1505[2].clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1505).await;

        handle_session_batch(&mut p2p_mock, &headers_1503_1505, 1503..=1505, true).await;
        assert_syncing(&syncer, &store, &[1..=1505], 1505).await;

        // A large jump of the head is handled with multiple batches.
        let mut headers = generator.next_many(1495);
        p2p_mock.announce_new_head(headers[1494].clone());
        assert_syncing(&syncer, &store, &[1..=1505], 3000).await;

        handle_session_batch(&mut p2p_mock, &headers, 1506..=2017, true).await;
        assert_syncing(&syncer, &store, &[1..=2017], 3000).await;

        headers.push(generator.next());
        p2p_mock.announce_new_head(headers.last().unwrap().clone());
        assert_syncing(&syncer, &store, &[1..=2017], 3001).await;

        handle_session_batch(&mut p2p_mock, &headers, 2018..=2529, true).await;
        assert_syncing(&syncer, &store, &[1..=2529], 3001).await;

        handle_session_batch(&mut p2p_mock, &headers, 2530..=3001, true).await;
        assert_syncing(&syncer, &store, &[1..=3001], 3001).await;

        p2p_mock.expect_no_cmd().await;
    }
945
    /// Headers older than the pruning window are fetched only when the
    /// sampling backlog runs low (slow-sync mode).
    #[async_test]
    async fn slow_sync() {
        let pruning_window = Duration::from_secs(600);
        let sampling_window = SAMPLING_WINDOW;

        let mut generator = ExtendedHeaderGenerator::new();

        // 2048 headers at 1s intervals; the older half is past the 600s
        // pruning window.
        let first_header_time = (Time::now() - Duration::from_secs(2048)).unwrap();
        generator.set_time(first_header_time, Duration::from_secs(1));
        let headers = generator.next_many(2048);

        let (syncer, store, mut p2p_mock) =
            initialized_syncer_with_windows(headers[2047].clone(), sampling_window, pruning_window)
                .await;
        assert_syncing(&syncer, &store, &[2048..=2048], 2048).await;

        handle_session_batch(&mut p2p_mock, &headers, 1536..=2047, true).await;
        assert_syncing(&syncer, &store, &[1536..=2048], 2048).await;

        handle_session_batch(&mut p2p_mock, &headers, 1024..=1535, true).await;
        assert_syncing(&syncer, &store, &[1024..=2048], 2048).await;

        // Slow-sync engaged: plenty of unsampled headers remain, so no new
        // batch is scheduled.
        syncer.trigger_fetch_next_batch().await.unwrap();
        p2p_mock.expect_no_cmd().await;

        // Shrink the sampling backlog (mark sampled / remove heights)...
        for height in 1250..=2048 {
            store.mark_as_sampled(height).await.unwrap();
        }
        for height in 1300..=1450 {
            store.remove_height(height).await.unwrap();
        }
        // ...after which fetching resumes.
        syncer.trigger_fetch_next_batch().await.unwrap();
        handle_session_batch(&mut p2p_mock, &headers, 512..=1023, true).await;
        assert_syncing(&syncer, &store, &[512..=1299, 1451..=2048], 2048).await;

        // Backlog is large again, so slow-sync pauses fetching once more.
        syncer.trigger_fetch_next_batch().await.unwrap();
        p2p_mock.expect_no_cmd().await;
    }
997
    /// Syncing stops at the edge of the sampling window.
    #[async_test]
    async fn window_edge() {
        let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60);
        let mut generator = ExtendedHeaderGenerator::new();
        // First 1200 headers are older than the (30 day) sampling window.
        generator.set_time(
            (Time::now() - month_and_day_ago).expect("to not underflow"),
            Duration::from_secs(1),
        );
        let mut headers = generator.next_many(1200);
        generator.reset_time();
        headers.append(&mut generator.next_many(2049 - 1200));

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await;
        assert_syncing(&syncer, &store, &[2049..=2049], 2049).await;

        handle_session_batch(&mut p2p_mock, &headers, 1537..=2048, true).await;
        assert_syncing(&syncer, &store, &[1537..=2049], 2049).await;

        handle_session_batch(&mut p2p_mock, &headers, 1025..=1536, true).await;
        assert_syncing(&syncer, &store, &[1025..=2049], 2049).await;

        // Everything below 1025 is outside the sampling window, so no
        // further batch is requested.
        p2p_mock.expect_no_cmd().await;
    }
1024
    /// Starting with a pre-filled store only fetches the missing ranges.
    #[async_test]
    async fn start_with_filled_store() {
        let events = EventChannel::new();
        let (p2p, mut p2p_mock) = P2p::mocked();
        // Store is pre-filled with headers 1..=25.
        let (store, mut generator) = gen_filled_store(25).await;
        let store = Arc::new(store);

        let mut headers = generator.next_many(520);
        let network_head = generator.next();

        let syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(p2p),
            store: store.clone(),
            event_pub: events.publisher(),
            batch_size: 512,
            sampling_window: SAMPLING_WINDOW,
            pruning_window: DEFAULT_PRUNING_WINDOW,
        })
        .unwrap();

        p2p_mock.announce_trusted_peer_connected();

        // Height 0 / amount 1 requests the current network head.
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![network_head.clone()])).unwrap();

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, network_head);

        assert_syncing(&syncer, &store, &[1..=25, 546..=546], 546).await;

        // First a full 512-header batch below the head...
        handle_session_batch(&mut p2p_mock, &headers, 34..=545, true).await;
        assert_syncing(&syncer, &store, &[1..=25, 34..=546], 546).await;

        // ...then the remaining 8-header gap above the pre-filled range.
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 26);
        assert_eq!(amount, 8);
        respond_to
            .send(Ok(headers.drain(..8).collect()))
            .map_err(|_| "headers [538, 545]")
            .unwrap();
        assert_syncing(&syncer, &store, &[1..=546], 546).await;

        p2p_mock.expect_no_cmd().await;
    }
1076
    /// After `stop()`, commands to the worker fail with `WorkerDied`.
    #[async_test]
    async fn stop_syncer() {
        let mut generator = ExtendedHeaderGenerator::new();
        let head = generator.next();

        let (syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;

        p2p_mock.expect_no_cmd().await;

        syncer.stop();
        // Give the worker a moment to observe the cancellation.
        sleep(Duration::from_millis(1)).await;
        assert!(matches!(
            syncer.info().await.unwrap_err(),
            SyncerError::WorkerDied
        ));
    }
1095
    /// Losing all peers drops the syncer back to the connecting phase; it
    /// re-initializes once a trusted peer returns.
    #[async_test]
    async fn all_peers_disconnected() {
        let mut generator = ExtendedHeaderGenerator::new();

        let _gap = generator.next_many(24);
        let header25 = generator.next();
        let _gap = generator.next_many(4);
        let header30 = generator.next();
        let _gap = generator.next_many(4);
        let header35 = generator.next();

        let (syncer, store, mut p2p_mock) = initialized_syncer(header30).await;

        // Fail the in-flight batch, then drop all peers: syncer goes idle.
        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;

        // A non-trusted peer is not enough to restart initialization.
        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;

        p2p_mock.announce_trusted_peer_connected();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        // A head older than the stored one does not move the subjective head
        // backwards; initialization retries (presumably because the old head
        // cannot be inserted — TODO confirm).
        respond_to.send(Ok(vec![header25])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        // The retry loop asks again after backoff.
        sleep(Duration::from_secs(1)).await;
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        respond_to.send(Ok(vec![header35.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30, 35..=35], 35).await;

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header35);

        // The gap 31..=34 between the stored ranges gets requested next.
        let (height, amount, _respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 31);
        assert_eq!(amount, 4);

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }
1157
    /// Reconnecting while the network head has not advanced re-initializes
    /// with the same head and resumes syncing.
    #[async_test]
    async fn all_peers_disconnected_and_no_network_head_progress() {
        let mut generator = ExtendedHeaderGenerator::new_from_height(30);

        let header30 = generator.next();

        let (syncer, store, mut p2p_mock) = initialized_syncer(header30.clone()).await;

        // Fail the in-flight batch, then drop all peers: syncer goes idle.
        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;

        // A non-trusted peer is not enough to restart initialization.
        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;

        p2p_mock.announce_trusted_peer_connected();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        // The network head did not change while we were offline.
        respond_to.send(Ok(vec![header30.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header30);

        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }
1201
    /// A batch with a break in contiguity must be rejected in full.
    #[async_test]
    async fn non_contiguous_response() {
        let mut generator = ExtendedHeaderGenerator::new();
        let mut headers = generator.next_many(20);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;

        // Corrupt the batch: the header at height 12 appears twice.
        let header10 = headers[10].clone();
        headers[10] = headers[11].clone();

        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        // Nothing from the corrupted batch was stored.
        assert_syncing(&syncer, &store, &[20..=20], 20).await;

        // Restore the batch and retry.
        headers[10] = header10;

        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        assert_syncing(&syncer, &store, &[1..=20], 20).await;
    }
1229
    /// Headers from a different chain (same heights, different hashes) must
    /// be rejected.
    #[async_test]
    async fn another_chain_response() {
        // Two independent generators produce two distinct chains.
        let headers = ExtendedHeaderGenerator::new().next_many(20);
        let headers_prime = ExtendedHeaderGenerator::new().next_many(20);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;

        // The foreign chain is rejected; nothing is stored.
        handle_session_batch(&mut p2p_mock, &headers_prime, 1..=19, true).await;

        assert_syncing(&syncer, &store, &[20..=20], 20).await;

        // The matching chain is accepted on retry.
        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        assert_syncing(&syncer, &store, &[1..=20], 20).await;
    }
1250
    /// Assert that both the store and the syncer report the expected synced
    /// ranges and subjective head.
    async fn assert_syncing(
        syncer: &Syncer<InMemoryStore>,
        store: &InMemoryStore,
        expected_synced_ranges: &[RangeInclusive<u64>],
        expected_subjective_head: u64,
    ) {
        // The syncer processes messages asynchronously; let it catch up.
        sleep(Duration::from_millis(1)).await;

        let store_ranges = store.get_stored_header_ranges().await.unwrap();
        let syncing_info = syncer.info().await.unwrap();

        assert_eq!(store_ranges.as_ref(), expected_synced_ranges);
        assert_eq!(syncing_info.stored_headers.as_ref(), expected_synced_ranges);
        assert_eq!(syncing_info.subjective_head, expected_subjective_head);
    }
1268
    /// Spawns a syncer using the default sampling and pruning windows, drives it
    /// through its initialization handshake, and returns the syncer together
    /// with its backing store and the mocked P2P handle.
    async fn initialized_syncer(
        head: ExtendedHeader,
    ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
        initialized_syncer_with_windows(head, SAMPLING_WINDOW, DEFAULT_PRUNING_WINDOW).await
    }
1274
1275 async fn initialized_syncer_with_windows(
1276 head: ExtendedHeader,
1277 sampling_window: Duration,
1278 pruning_window: Duration,
1279 ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
1280 let events = EventChannel::new();
1281 let (mock, mut handle) = P2p::mocked();
1282 let store = Arc::new(InMemoryStore::new());
1283
1284 let syncer = Syncer::start(SyncerArgs {
1285 p2p: Arc::new(mock),
1286 store: store.clone(),
1287 event_pub: events.publisher(),
1288 batch_size: 512,
1289 sampling_window,
1290 pruning_window,
1291 })
1292 .unwrap();
1293
1294 handle.expect_no_cmd().await;
1296 handle.announce_peer_connected();
1297 handle.expect_no_cmd().await;
1298 handle.announce_trusted_peer_connected();
1299
1300 let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
1302 assert_eq!(height, 0);
1303 assert_eq!(amount, 1);
1304 respond_to.send(Ok(vec![head.clone()])).unwrap();
1305
1306 let head_from_syncer = handle.expect_init_header_sub().await;
1308 assert_eq!(head_from_syncer, head);
1309
1310 let head_height = head.height().value();
1311 assert_syncing(&syncer, &store, &[head_height..=head_height], head_height).await;
1312
1313 (syncer, store, handle)
1314 }
1315
1316 async fn handle_session_batch(
1317 p2p_mock: &mut MockP2pHandle,
1318 remaining_headers: &[ExtendedHeader],
1319 range: BlockRange,
1320 respond: bool,
1321 ) {
1322 range.validate().unwrap();
1323
1324 let mut ranges_to_request = BlockRanges::new();
1325 ranges_to_request.insert_relaxed(&range).unwrap();
1326
1327 let mut no_respond_chans = Vec::new();
1328
1329 for _ in 0..requests_in_session(range.len()) {
1330 let (height, amount, respond_to) =
1331 p2p_mock.expect_header_request_for_height_cmd().await;
1332
1333 let requested_range = height..=height + amount - 1;
1334 ranges_to_request.remove_strict(requested_range);
1335
1336 if respond {
1337 let header_index = remaining_headers
1338 .iter()
1339 .position(|h| h.height().value() == height)
1340 .expect("height not found in provided headers");
1341
1342 let response_range =
1343 remaining_headers[header_index..header_index + amount as usize].to_vec();
1344 respond_to
1345 .send(Ok(response_range))
1346 .map_err(|_| format!("headers [{}, {}]", height, height + amount - 1))
1347 .unwrap();
1348 } else {
1349 no_respond_chans.push(respond_to);
1350 }
1351 }
1352
1353 if !respond {
1357 spawn(async move {
1358 sleep(Duration::from_secs(10)).await;
1359
1360 for respond_chan in no_respond_chans {
1361 respond_chan.maybe_send_err(P2pError::HeaderEx(
1362 HeaderExError::OutboundFailure(OutboundFailure::Timeout),
1363 ));
1364 }
1365 });
1366 }
1367
1368 assert!(
1369 ranges_to_request.is_empty(),
1370 "Some headers weren't requested. expected range: {}, not requested: {}",
1371 range.display(),
1372 ranges_to_request
1373 );
1374 }
1375
1376 fn requests_in_session(headers: u64) -> usize {
1377 let max_requests = headers.div_ceil(header_session::MAX_AMOUNT_PER_REQ) as usize;
1378 let min_requests = headers.div_ceil(header_session::MIN_AMOUNT_PER_REQ) as usize;
1379
1380 if max_requests > header_session::MAX_CONCURRENT_REQS {
1381 max_requests
1383 } else {
1384 header_session::MAX_CONCURRENT_REQS.min(min_requests)
1386 }
1387 }
1388
1389 impl BlockRanges {
1390 fn remove_strict(&mut self, range: BlockRange) {
1391 for stored in self.as_ref() {
1392 if stored.contains(range.start()) && stored.contains(range.end()) {
1393 self.remove_relaxed(range).unwrap();
1394 return;
1395 }
1396 }
1397
1398 panic!("block ranges ({self}) don't contain {}", range.display());
1399 }
1400 }
1401}