use std::marker::PhantomData;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use backoff::ExponentialBackoffBuilder;
use backoff::backoff::Backoff;
use celestia_types::ExtendedHeader;
use lumina_utils::executor::{JoinHandle, spawn};
use lumina_utils::time::{Instant, Interval, sleep};
use serde::{Deserialize, Serialize};
use tendermint::Time;
use tokio::select;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};

use crate::block_ranges::{BlockRange, BlockRangeExt, BlockRanges};
use crate::events::{EventPublisher, NodeEvent};
use crate::node::subscriptions::BroadcastingStore;
use crate::p2p::{P2p, P2pError};
use crate::store::{Store, StoreError};
use crate::utils::{FusedReusableFuture, OneshotSenderExt, TimeExt};

type Result<T, E = SyncerError> = std::result::Result<T, E>;

const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60);
const SLOW_SYNC_MIN_THRESHOLD: u64 = 50;

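/// Errors that can be produced by the [`Syncer`].
///
/// `P2p` and `Store` wrap lower-level failures; `is_fatal` decides whether the
/// worker should give up or keep retrying.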
#[derive(Debug, thiserror::Error)]
pub enum SyncerError {
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    #[error("Store: {0}")]
    Store(#[from] StoreError),

    #[error("Worker died")]
    WorkerDied,

    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,
}

impl SyncerError {
    pub(crate) fn is_fatal(&self) -> bool {
        match self {
            SyncerError::P2p(e) => e.is_fatal(),
            SyncerError::Store(e) => e.is_fatal(),
            SyncerError::WorkerDied | SyncerError::ChannelClosedUnexpectedly => true,
        }
    }
}

impl From<oneshot::error::RecvError> for SyncerError {
    fn from(_value: oneshot::error::RecvError) -> Self {
        SyncerError::ChannelClosedUnexpectedly
    }
}

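/// Header syncing service.
///
/// A thin handle that spawns the [`Worker`] task, forwards commands to it over
/// an mpsc channel, and cancels it on [`Syncer::stop`] or drop.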
#[derive(Debug)]
pub(crate) struct Syncer<S>
where
    S: Store + 'static,
{
    cmd_tx: mpsc::Sender<SyncerCmd>,
    cancellation_token: CancellationToken,
    join_handle: JoinHandle,
    _store: PhantomData<S>,
}

pub(crate) struct SyncerArgs<S>
where
    S: Store + 'static,
{
    pub(crate) p2p: Arc<P2p>,
    pub(crate) store: Arc<S>,
    pub(crate) event_pub: EventPublisher,
    pub(crate) batch_size: u64,
    pub(crate) sampling_window: Duration,
    pub(crate) pruning_window: Duration,
}

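/// Commands accepted by the worker task.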
#[derive(Debug)]
enum SyncerCmd {
    GetInfo {
        respond_to: oneshot::Sender<SyncingInfo>,
    },
    SubscribeHeights {
        respond_to: oneshot::Sender<broadcast::Receiver<ExtendedHeader>>,
    },
    #[cfg(test)]
    TriggerFetchNextBatch,
}

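/// Status of the syncing.
///
/// `stored_headers` lists the header ranges currently present in the store and
/// `subjective_head` is the highest height announced by the network so far.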
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncingInfo {
    pub stored_headers: BlockRanges,
    pub subjective_head: u64,
}

impl<S> Syncer<S>
where
    S: Store,
{
    pub(crate) fn start(args: SyncerArgs<S>) -> Result<Self> {
        let cancellation_token = CancellationToken::new();
        let event_pub = args.event_pub.clone();
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;

        let join_handle = spawn(async move {
            if let Err(e) = worker.run().await {
                error!("Syncer stopped because of a fatal error: {e}");

                event_pub.send(NodeEvent::FatalSyncerError {
                    error: e.to_string(),
                });
            }
        });

        Ok(Syncer {
            cancellation_token,
            cmd_tx,
            join_handle,
            _store: PhantomData,
        })
    }

    pub(crate) fn stop(&self) {
        self.cancellation_token.cancel();
    }

    pub(crate) async fn join(&self) {
        self.join_handle.join().await;
    }

    async fn send_command(&self, cmd: SyncerCmd) -> Result<()> {
        self.cmd_tx
            .send(cmd)
            .await
            .map_err(|_| SyncerError::WorkerDied)
    }

    pub(crate) async fn info(&self) -> Result<SyncingInfo> {
        let (tx, rx) = oneshot::channel();

        self.send_command(SyncerCmd::GetInfo { respond_to: tx })
            .await?;

        Ok(rx.await?)
    }

    pub(crate) async fn subscribe_headers(&self) -> Result<broadcast::Receiver<ExtendedHeader>> {
        let (tx, rx) = oneshot::channel();

        self.send_command(SyncerCmd::SubscribeHeights { respond_to: tx })
            .await?;

        Ok(rx.await?)
    }

    #[cfg(test)]
    async fn trigger_fetch_next_batch(&self) -> Result<()> {
        self.send_command(SyncerCmd::TriggerFetchNextBatch).await
    }
}

impl<S> Drop for Syncer<S>
where
    S: Store,
{
    fn drop(&mut self) {
        self.stop();
    }
}

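/// State owned by the worker task spawned from [`Syncer::start`].
///
/// It tracks the subjective head height and the currently ongoing header batch
/// request.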
struct Worker<S>
where
    S: Store + 'static,
{
    cancellation_token: CancellationToken,
    cmd_rx: mpsc::Receiver<SyncerCmd>,
    event_pub: EventPublisher,
    p2p: Arc<P2p>,
    store: BroadcastingStore<S>,
    header_sub_rx: Option<mpsc::Receiver<ExtendedHeader>>,
    subjective_head_height: Option<u64>,
    highest_slow_sync_height: Option<u64>,
    batch_size: u64,
    ongoing_batch: Ongoing,
    sampling_window: Duration,
    pruning_window: Duration,
}

struct Ongoing {
    range: Option<BlockRange>,
    task: FusedReusableFuture<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
}

impl<S> Worker<S>
where
    S: Store,
{
    fn new(
        args: SyncerArgs<S>,
        cancellation_token: CancellationToken,
        cmd_rx: mpsc::Receiver<SyncerCmd>,
    ) -> Result<Self> {
        Ok(Worker {
            cancellation_token,
            cmd_rx,
            event_pub: args.event_pub,
            p2p: args.p2p,
            store: BroadcastingStore::new(args.store),
            header_sub_rx: None,
            subjective_head_height: None,
            highest_slow_sync_height: None,
            batch_size: args.batch_size,
            ongoing_batch: Ongoing {
                range: None,
                task: FusedReusableFuture::terminated(),
            },
            sampling_window: args.sampling_window,
            pruning_window: args.pruning_window,
        })
    }

    async fn run(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connecting_event_loop().await?;

            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connected_event_loop().await?;
        }

        debug!("Syncer stopped");
        Ok(())
    }

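    // Phase 1: wait for a trusted peer, fetch the network head, and initialize
    // header-sub. Exits to the connected loop once the subjective head is set.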
    async fn connecting_event_loop(&mut self) -> Result<()> {
        debug!("Entering connecting_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60));
        self.report().await?;

        let mut try_init_fut = pin!(try_init_task(
            self.p2p.clone(),
            self.store.clone_inner_store(),
            self.event_pub.clone()
        ));

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = report_interval.tick() => {
                    self.report().await?;
                }
                res = &mut try_init_fut => {
                    let (network_head, took) = res?;
                    let network_head_height = network_head.height();

                    info!("Setting initial subjective head to {network_head_height}");
                    self.set_subjective_head_height(network_head_height);
                    self.store.init_broadcast(network_head.clone());

                    let (header_sub_tx, header_sub_rx) = mpsc::channel(16);
                    self.p2p.init_header_sub(network_head, header_sub_tx).await?;
                    self.header_sub_rx = Some(header_sub_rx);

                    self.event_pub.send(NodeEvent::FetchingHeadHeaderFinished {
                        height: network_head_height,
                        took,
                    });

                    break;
                }
                Some(cmd) = self.cmd_rx.recv() => {
                    self.on_cmd(cmd).await?;
                }
            }
        }

        Ok(())
    }

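    // Phase 2: follow header-sub announcements and backfill missing ranges in
    // batches. Returns to the connecting loop when all peers disconnect.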
    async fn connected_event_loop(&mut self) -> Result<()> {
        debug!("Entering connected_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60));
        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
            warn!("All peers disconnected");
            return Ok(());
        }

        self.fetch_next_batch().await?;
        self.report().await?;

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        break;
                    }
                }
                _ = report_interval.tick() => {
                    self.report().await?;
                }
                res = header_sub_recv(self.header_sub_rx.as_mut()) => {
                    let header = res?;
                    self.on_header_sub_message(header).await?;
                    self.fetch_next_batch().await?;
                }
                Some(cmd) = self.cmd_rx.recv() => {
                    self.on_cmd(cmd).await?;
                }
                (res, took) = &mut self.ongoing_batch.task => {
                    self.on_fetch_next_batch_result(res, took).await?;
                    self.fetch_next_batch().await?;
                }
            }
        }

        if let Some(ongoing) = self.ongoing_batch.range.take() {
            warn!("Cancelling fetching of {}", ongoing.display());
            self.ongoing_batch.task.terminate();
        }

        self.header_sub_rx.take();

        Ok(())
    }

    async fn syncing_info(&self) -> Result<SyncingInfo> {
        Ok(SyncingInfo {
            stored_headers: self.store.get_stored_header_ranges().await?,
            subjective_head: self.subjective_head_height.unwrap_or(0),
        })
    }

    #[instrument(skip_all)]
    async fn report(&mut self) -> Result<()> {
        let SyncingInfo {
            stored_headers,
            subjective_head,
        } = self.syncing_info().await?;

        let ongoing_batch = self
            .ongoing_batch
            .range
            .as_ref()
            .map(|range| format!("{}", range.display()))
            .unwrap_or_else(|| "None".to_string());

        info!(
            "syncing: head: {subjective_head}, stored headers: {stored_headers}, ongoing batches: {ongoing_batch}"
        );
        Ok(())
    }

    async fn on_cmd(&mut self, cmd: SyncerCmd) -> Result<()> {
        match cmd {
            SyncerCmd::GetInfo { respond_to } => {
                let info = self.syncing_info().await?;
                respond_to.maybe_send(info);
            }
            SyncerCmd::SubscribeHeights { respond_to } => {
                let receiver = self.store.subscribe();
                respond_to.maybe_send(receiver);
            }
            #[cfg(test)]
            SyncerCmd::TriggerFetchNextBatch => {
                self.fetch_next_batch().await?;
            }
        }

        Ok(())
    }

    #[instrument(skip_all)]
    async fn on_header_sub_message(&mut self, new_head: ExtendedHeader) -> Result<()> {
        let new_head_height = new_head.height();

        self.set_subjective_head_height(new_head_height);

        if let Ok(store_head_height) = self.store.head_height().await {
            if store_head_height + 1 == new_head_height {
                if self.store.announce_insert(vec![new_head]).await.is_ok() {
                    self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub {
                        height: new_head_height,
                    });
                }
            }
        }

        Ok(())
    }

    fn set_subjective_head_height(&mut self, height: u64) {
        if self
            .subjective_head_height
            .is_some_and(|old_height| height <= old_height)
        {
            return;
        }

        self.subjective_head_height = Some(height);
    }

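    // Decides the next header range to request. Batches are capped at
    // `batch_size`; once the batch lies entirely below `highest_slow_sync_height`
    // (headers already older than the pruning window), fetching is throttled until
    // the backlog of stored-but-unsampled headers drops below a threshold,
    // presumably so old headers are not downloaded faster than they can be sampled.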
    #[instrument(skip_all)]
    async fn fetch_next_batch(&mut self) -> Result<()> {
        debug_assert_eq!(
            self.ongoing_batch.range.is_none(),
            self.ongoing_batch.task.is_terminated()
        );

        if !self.ongoing_batch.task.is_terminated() {
            return Ok(());
        }

        if self.p2p.peer_tracker_info().num_connected_peers == 0 {
            return Ok(());
        }

        let Some(subjective_head_height) = self.subjective_head_height else {
            return Ok(());
        };

        let store_ranges = self.store.get_stored_header_ranges().await?;
        let pruned_ranges = self.store.get_pruned_ranges().await?;

        let synced_ranges = pruned_ranges + &store_ranges;

        let next_batch = calculate_range_to_fetch(
            subjective_head_height,
            synced_ranges.as_ref(),
            self.batch_size,
        );

        if next_batch.is_empty() {
            return Ok(());
        }

        if self
            .highest_slow_sync_height
            .is_some_and(|height| *next_batch.end() <= height)
        {
            let threshold = (self.batch_size / 2).max(SLOW_SYNC_MIN_THRESHOLD);

            let sampled_ranges = self.store.get_sampled_ranges().await?;
            let available_for_sampling = (store_ranges - sampled_ranges).len();

            if available_for_sampling > threshold {
                return Ok(());
            }
        }

        match self.store.get_by_height(next_batch.end() + 1).await {
            Ok(known_header) => {
                if !self.in_sampling_window(&known_header) {
                    return Ok(());
                }
            }
            Err(StoreError::NotFound) => {}
            Err(e) => return Err(e.into()),
        }

        self.event_pub.send(NodeEvent::FetchingHeadersStarted {
            from_height: *next_batch.start(),
            to_height: *next_batch.end(),
        });

        let p2p = self.p2p.clone();

        self.ongoing_batch.range = Some(next_batch.clone());

        self.ongoing_batch.task.set(async move {
            let now = Instant::now();
            let res = p2p.get_unverified_header_range(next_batch).await;
            (res, now.elapsed())
        });

        Ok(())
    }

    #[instrument(skip_all)]
    async fn on_fetch_next_batch_result(
        &mut self,
        res: Result<Vec<ExtendedHeader>, P2pError>,
        took: Duration,
    ) -> Result<()> {
        let range = self
            .ongoing_batch
            .range
            .take()
            .expect("ongoing_batch not initialized correctly");

        let from_height = *range.start();
        let to_height = *range.end();

        let headers = match res {
            Ok(headers) => headers,
            Err(e) => {
                if e.is_fatal() {
                    return Err(e.into());
                }

                self.event_pub.send(NodeEvent::FetchingHeadersFailed {
                    from_height,
                    to_height,
                    error: e.to_string(),
                    took,
                });

                return Ok(());
            }
        };

        let pruning_cutoff = Time::now().saturating_sub(self.pruning_window);

        for header in headers.iter().rev() {
            if self
                .highest_slow_sync_height
                .is_some_and(|height| header.height() <= height)
            {
                break;
            }

            if header.time() <= pruning_cutoff {
                self.highest_slow_sync_height = Some(header.height());
                break;
            }
        }

        if let Err(e) = self.store.announce_insert(headers).await {
            if e.is_fatal() {
                return Err(e.into());
            }

            self.event_pub.send(NodeEvent::FetchingHeadersFailed {
                from_height,
                to_height,
                error: format!("Failed to store headers: {e}"),
                took,
            });
        }

        self.event_pub.send(NodeEvent::FetchingHeadersFinished {
            from_height,
            to_height,
            took,
        });

        Ok(())
    }

    fn in_sampling_window(&self, header: &ExtendedHeader) -> bool {
        let sampling_window_end = Time::now().saturating_sub(self.sampling_window);
        header.time().after(sampling_window_end)
    }
}

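/// Calculates the next range of headers to fetch.
///
/// Headers above the highest synced range are fetched first, walking up towards
/// `subjective_head_height`; once caught up, the gap directly below the highest
/// synced range is filled, working downwards. The returned range holds at most
/// `limit` headers.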
fn calculate_range_to_fetch(
    subjective_head_height: u64,
    synced_headers: &[BlockRange],
    limit: u64,
) -> BlockRange {
    let mut synced_headers_iter = synced_headers.iter().rev();

    let Some(synced_head_range) = synced_headers_iter.next() else {
        let range = 1..=subjective_head_height;
        return range.tailn(limit);
    };

    if synced_head_range.end() < &subjective_head_height {
        let range = synced_head_range.end() + 1..=subjective_head_height;
        return range.tailn(limit);
    }

    let penultimate_range_end = synced_headers_iter.next().map(|r| *r.end()).unwrap_or(0);

    let range = penultimate_range_end + 1..=synced_head_range.start().saturating_sub(1);
    range.headn(limit)
}

#[instrument(skip_all)]
async fn try_init_task<S>(
    p2p: Arc<P2p>,
    store: Arc<S>,
    event_pub: EventPublisher,
) -> Result<(ExtendedHeader, Duration)>
where
    S: Store + 'static,
{
    let now = Instant::now();
    let mut event_reported = false;
    let mut backoff = ExponentialBackoffBuilder::default()
        .with_max_interval(TRY_INIT_BACKOFF_MAX_INTERVAL)
        .with_max_elapsed_time(None)
        .build();

    loop {
        match try_init(&p2p, &*store, &event_pub, &mut event_reported).await {
            Ok(network_head) => {
                return Ok((network_head, now.elapsed()));
            }
            Err(e) if e.is_fatal() => {
                return Err(e);
            }
            Err(e) => {
                let sleep_dur = backoff
                    .next_backoff()
                    .expect("backoff never stops retrying");

                warn!(
                    "Initialization of subjective head failed: {e}. Trying again in {sleep_dur:?}."
                );
                sleep(sleep_dur).await;
            }
        }
    }
}

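/// Single attempt at initializing the subjective head: wait for a trusted peer,
/// fetch the current network head, and store it unless it already is the store
/// head.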
async fn try_init<S>(
    p2p: &P2p,
    store: &S,
    event_pub: &EventPublisher,
    event_reported: &mut bool,
) -> Result<ExtendedHeader>
where
    S: Store,
{
    p2p.wait_connected_trusted().await?;

    #[cfg(not(test))]
    sleep(Duration::from_secs(1)).await;

    if !*event_reported {
        event_pub.send(NodeEvent::FetchingHeadHeaderStarted);
        *event_reported = true;
    }

    let network_head = p2p.get_head_header().await?;

    let try_insert = match store.get_head().await {
        Ok(store_head) => store_head.hash() != network_head.hash(),
        Err(StoreError::NotFound) => true,
        Err(e) => return Err(e.into()),
    };

    if try_insert {
        store.insert(network_head.clone()).await?;
    }

    Ok(network_head)
}

async fn header_sub_recv(
    rx: Option<&mut mpsc::Receiver<ExtendedHeader>>,
) -> Result<ExtendedHeader> {
    rx.expect("header-sub not initialized")
        .recv()
        .await
        .ok_or(SyncerError::P2p(P2pError::WorkerDied))
}

#[cfg(test)]
mod tests {
    use std::ops::RangeInclusive;

    use super::*;
    use crate::block_ranges::{BlockRange, BlockRangeExt};
    use crate::events::EventChannel;
    use crate::node::{DEFAULT_PRUNING_WINDOW, HeaderExError, SAMPLING_WINDOW};
    use crate::p2p::header_session;
    use crate::store::InMemoryStore;
    use crate::test_utils::{MockP2pHandle, gen_filled_store};
    use crate::utils::OneshotResultSenderExt;
    use celestia_types::test_utils::ExtendedHeaderGenerator;
    use libp2p::request_response::OutboundFailure;
    use lumina_utils::test_utils::async_test;
    use tokio::sync::broadcast::error::RecvError;

    #[test]
    fn calculate_range_to_fetch_test_header_limit() {
        let head_height = 1024;
        let ranges = [256..=512];

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16);
        assert_eq!(fetch_range, 513..=528);

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511);
        assert_eq!(fetch_range, 513..=1023);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512);
        assert_eq!(fetch_range, 513..=1024);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513);
        assert_eq!(fetch_range, 513..=1024);

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024);
        assert_eq!(fetch_range, 513..=1024);
    }

    #[test]
    fn calculate_range_to_fetch_empty_store() {
        let fetch_range = calculate_range_to_fetch(1, &[], 100);
        assert_eq!(fetch_range, 1..=1);

        let fetch_range = calculate_range_to_fetch(100, &[], 10);
        assert_eq!(fetch_range, 1..=10);

        let fetch_range = calculate_range_to_fetch(100, &[], 50);
        assert_eq!(fetch_range, 1..=50);
    }

    #[test]
    fn calculate_range_to_fetch_fully_synced() {
        let fetch_range = calculate_range_to_fetch(1, &[1..=1], 100);
        assert!(fetch_range.is_empty());

        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
        assert!(fetch_range.is_empty());

        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
        assert!(fetch_range.is_empty());
    }

    #[test]
    fn calculate_range_to_fetch_caught_up() {
        let head_height = 4000;

        let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], 500);
        assert_eq!(fetch_range, 2500..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], 500);
        assert_eq!(fetch_range, 2500..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
        assert_eq!(fetch_range, 2801..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
        assert_eq!(fetch_range, 2801..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], 500);
        assert_eq!(fetch_range, 1..=299);
    }

    #[test]
    fn calculate_range_to_fetch_catching_up() {
        let head_height = 4000;

        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], 500);
        assert_eq!(fetch_range, 3001..=3500);
        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3500], 500);
        assert_eq!(fetch_range, 3501..=4000);
        let fetch_range = calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], 500);
        assert_eq!(fetch_range, 3801..=4000);
    }

    #[async_test]
    async fn init_without_genesis_hash() {
        let events = EventChannel::new();
        let (mock, mut handle) = P2p::mocked();
        let mut generator = ExtendedHeaderGenerator::new();
        let header = generator.next();

        let _syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(mock),
            store: Arc::new(InMemoryStore::new()),
            event_pub: events.publisher(),
            batch_size: 512,
            sampling_window: SAMPLING_WINDOW,
            pruning_window: DEFAULT_PRUNING_WINDOW,
        })
        .unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;
        handle.announce_trusted_peer_connected();

        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![header.clone()])).unwrap();

        let head_from_syncer = handle.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header);

        handle.expect_no_cmd().await;
    }

    #[async_test]
    async fn init_with_genesis_hash() {
        let mut generator = ExtendedHeaderGenerator::new();
        let head = generator.next();

        let (_syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;

        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn syncing() {
        let mut generator = ExtendedHeaderGenerator::new();
        let headers = generator.next_many(1500);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[1499].clone()).await;
        assert_syncing(&syncer, &store, &[1500..=1500], 1500).await;

        handle_session_batch(&mut p2p_mock, &headers, 988..=1499, true).await;
        assert_syncing(&syncer, &store, &[988..=1500], 1500).await;

        handle_session_batch(&mut p2p_mock, &headers, 476..=987, true).await;
        assert_syncing(&syncer, &store, &[476..=1500], 1500).await;

        let header1501 = generator.next();
        p2p_mock.announce_new_head(header1501.clone());
        assert_syncing(&syncer, &store, &[476..=1501], 1501).await;

        handle_session_batch(&mut p2p_mock, &headers, 1..=475, true).await;
        assert_syncing(&syncer, &store, &[1..=1501], 1501).await;

        p2p_mock.expect_no_cmd().await;

        let header1502 = generator.next();
        p2p_mock.announce_new_head(header1502.clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1502).await;
        p2p_mock.expect_no_cmd().await;

        let headers_1503_1505 = generator.next_many(3);
        p2p_mock.announce_new_head(headers_1503_1505[2].clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1505).await;

        handle_session_batch(&mut p2p_mock, &headers_1503_1505, 1503..=1505, true).await;
        assert_syncing(&syncer, &store, &[1..=1505], 1505).await;

        let mut headers = generator.next_many(1495);
        p2p_mock.announce_new_head(headers[1494].clone());
        assert_syncing(&syncer, &store, &[1..=1505], 3000).await;

        handle_session_batch(&mut p2p_mock, &headers, 1506..=2017, true).await;
        assert_syncing(&syncer, &store, &[1..=2017], 3000).await;

        headers.push(generator.next());
        p2p_mock.announce_new_head(headers.last().unwrap().clone());
        assert_syncing(&syncer, &store, &[1..=2017], 3001).await;

        handle_session_batch(&mut p2p_mock, &headers, 2018..=2529, true).await;
        assert_syncing(&syncer, &store, &[1..=2529], 3001).await;

        handle_session_batch(&mut p2p_mock, &headers, 2530..=3001, true).await;
        assert_syncing(&syncer, &store, &[1..=3001], 3001).await;

        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn slow_sync() {
        let pruning_window = Duration::from_secs(600);
        let sampling_window = SAMPLING_WINDOW;

        let mut generator = ExtendedHeaderGenerator::new();

        let first_header_time = (Time::now() - Duration::from_secs(2048)).unwrap();
        generator.set_time(first_header_time, Duration::from_secs(1));
        let headers = generator.next_many(2048);

        let (syncer, store, mut p2p_mock) =
            initialized_syncer_with_windows(headers[2047].clone(), sampling_window, pruning_window)
                .await;
        assert_syncing(&syncer, &store, &[2048..=2048], 2048).await;

        handle_session_batch(&mut p2p_mock, &headers, 1536..=2047, true).await;
        assert_syncing(&syncer, &store, &[1536..=2048], 2048).await;

        handle_session_batch(&mut p2p_mock, &headers, 1024..=1535, true).await;
        assert_syncing(&syncer, &store, &[1024..=2048], 2048).await;

        syncer.trigger_fetch_next_batch().await.unwrap();
        p2p_mock.expect_no_cmd().await;

        for height in 1250..=2048 {
            store.mark_as_sampled(height).await.unwrap();
        }
        for height in 1300..=1450 {
            store.remove_height(height).await.unwrap();
        }
        syncer.trigger_fetch_next_batch().await.unwrap();
        handle_session_batch(&mut p2p_mock, &headers, 512..=1023, true).await;
        assert_syncing(&syncer, &store, &[512..=1299, 1451..=2048], 2048).await;

        syncer.trigger_fetch_next_batch().await.unwrap();
        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn window_edge() {
        let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60);
        let mut generator = ExtendedHeaderGenerator::new();
        generator.set_time(
            (Time::now() - month_and_day_ago).expect("to not underflow"),
            Duration::from_secs(1),
        );
        let mut headers = generator.next_many(1200);
        generator.reset_time();
        headers.append(&mut generator.next_many(2049 - 1200));

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await;
        assert_syncing(&syncer, &store, &[2049..=2049], 2049).await;

        handle_session_batch(&mut p2p_mock, &headers, 1537..=2048, true).await;
        assert_syncing(&syncer, &store, &[1537..=2049], 2049).await;

        handle_session_batch(&mut p2p_mock, &headers, 1025..=1536, true).await;
        assert_syncing(&syncer, &store, &[1025..=2049], 2049).await;

        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn start_with_filled_store() {
        let events = EventChannel::new();
        let (p2p, mut p2p_mock) = P2p::mocked();
        let (store, mut generator) = gen_filled_store(25).await;
        let store = Arc::new(store);

        let mut headers = generator.next_many(520);
        let network_head = generator.next();

        let syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(p2p),
            store: store.clone(),
            event_pub: events.publisher(),
            batch_size: 512,
            sampling_window: SAMPLING_WINDOW,
            pruning_window: DEFAULT_PRUNING_WINDOW,
        })
        .unwrap();

        p2p_mock.announce_trusted_peer_connected();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![network_head.clone()])).unwrap();

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, network_head);

        assert_syncing(&syncer, &store, &[1..=25, 546..=546], 546).await;

        handle_session_batch(&mut p2p_mock, &headers, 34..=545, true).await;
        assert_syncing(&syncer, &store, &[1..=25, 34..=546], 546).await;

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 26);
        assert_eq!(amount, 8);
        respond_to
            .send(Ok(headers.drain(..8).collect()))
            .map_err(|_| "headers [538, 545]")
            .unwrap();
        assert_syncing(&syncer, &store, &[1..=546], 546).await;

        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn stop_syncer() {
        let mut generator = ExtendedHeaderGenerator::new();
        let head = generator.next();

        let (syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;

        p2p_mock.expect_no_cmd().await;

        syncer.stop();
        sleep(Duration::from_millis(1)).await;
        assert!(matches!(
            syncer.info().await.unwrap_err(),
            SyncerError::WorkerDied
        ));
    }

    #[async_test]
    async fn all_peers_disconnected() {
        let mut generator = ExtendedHeaderGenerator::new();

        let _gap = generator.next_many(24);
        let header25 = generator.next();
        let _gap = generator.next_many(4);
        let header30 = generator.next();
        let _gap = generator.next_many(4);
        let header35 = generator.next();

        let (syncer, store, mut p2p_mock) = initialized_syncer(header30).await;

        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;

        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;

        p2p_mock.announce_trusted_peer_connected();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        respond_to.send(Ok(vec![header25])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        sleep(Duration::from_secs(1)).await;
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        respond_to.send(Ok(vec![header35.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30, 35..=35], 35).await;

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header35);

        let (height, amount, _respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 31);
        assert_eq!(amount, 4);

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn all_peers_disconnected_and_no_network_head_progress() {
        let mut generator = ExtendedHeaderGenerator::new_from_height(30);

        let header30 = generator.next();

        let (syncer, store, mut p2p_mock) = initialized_syncer(header30.clone()).await;

        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;

        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;

        p2p_mock.announce_trusted_peer_connected();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        respond_to.send(Ok(vec![header30.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header30);

        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn non_contiguous_response() {
        let mut generator = ExtendedHeaderGenerator::new();
        let mut headers = generator.next_many(20);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;

        let header10 = headers[10].clone();
        headers[10] = headers[11].clone();

        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        assert_syncing(&syncer, &store, &[20..=20], 20).await;

        headers[10] = header10;

        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        assert_syncing(&syncer, &store, &[1..=20], 20).await;
    }

    #[async_test]
    async fn another_chain_response() {
        let headers = ExtendedHeaderGenerator::new().next_many(20);
        let headers_prime = ExtendedHeaderGenerator::new().next_many(20);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;

        handle_session_batch(&mut p2p_mock, &headers_prime, 1..=19, true).await;

        assert_syncing(&syncer, &store, &[20..=20], 20).await;

        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        assert_syncing(&syncer, &store, &[1..=20], 20).await;
    }

    #[async_test]
    async fn height_sequence_smoke() {
        let mut generator = ExtendedHeaderGenerator::new();
        let headers = generator.next_many(20);
        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[9].clone()).await;

        let mut header_receiver = syncer.subscribe_headers().await.unwrap();
        let (tx, mut received_heights) = mpsc::unbounded_channel();
        spawn(async move {
            while let Ok(header) = header_receiver.recv().await {
                tx.send(header.height()).unwrap();
            }
        });

        p2p_mock.announce_new_head(headers[10].clone());
        assert_syncing(&syncer, &store, &[10..=11], 11).await;
        assert_eq!(received_heights.recv().await.unwrap(), 11);

        p2p_mock.announce_new_head(headers[14].clone());
        assert_syncing(&syncer, &store, &[10..=11], 15).await;
        received_heights.try_recv().unwrap_err();

        handle_session_batch(&mut p2p_mock, &headers, 2..=9, true).await;
        handle_session_batch(&mut p2p_mock, &headers, 1..=1, true).await;
        received_heights.try_recv().unwrap_err();

        handle_session_batch(&mut p2p_mock, &headers, 12..=15, true).await;
        assert_syncing(&syncer, &store, &[1..=15], 15).await;

        for i in 12..=15 {
            let height = received_heights.recv().await.unwrap();
            assert_eq!(height, i);
        }

        p2p_mock.announce_new_head(headers[18].clone());
        assert_syncing(&syncer, &store, &[1..=15], 19).await;
        received_heights.try_recv().unwrap_err();

        handle_session_batch(&mut p2p_mock, &headers, 16..=19, true).await;
        assert_syncing(&syncer, &store, &[1..=19], 19).await;

        for i in 16..=19 {
            let height = received_heights.recv().await.unwrap();
            assert_eq!(height, i);
        }
    }

    #[async_test]
    async fn height_sequence_handles_disconnect() {
        let mut generator = ExtendedHeaderGenerator::new();
        let headers = generator.next_many(20);
        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[0].clone()).await;
        assert_syncing(&syncer, &store, &[1..=1], 1).await;

        let mut header_receiver = syncer.subscribe_headers().await.unwrap();
        let (disconnect_tx, mut disconnect_signal) = oneshot::channel();
        let (tx, mut received_headers) = mpsc::unbounded_channel();
        spawn(async move {
            loop {
                select! {
                    header = header_receiver.recv() => {
                        let header = match header {
                            Err(RecvError::Closed) => break,
                            header_or_error => header_or_error.unwrap()
                        };
                        tx.send(Some(header)).unwrap();
                    },
                    _ = &mut disconnect_signal, if !disconnect_signal.is_terminated() => {
                        tx.send(None).unwrap();
                    }
                };
            }
        });

        p2p_mock.announce_new_head(headers[1].clone());
        assert_syncing(&syncer, &store, &[1..=2], 2).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
        p2p_mock.announce_trusted_peer_connected();

        disconnect_tx.send(()).unwrap();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!((height, amount), (0, 1));

        respond_to.send(Ok(vec![headers[9].clone()])).unwrap();
        let _ = p2p_mock.expect_init_header_sub().await;
        assert_syncing(&syncer, &store, &[1..=2, 10..=10], 10).await;

        handle_session_batch(&mut p2p_mock, &headers, 3..=9, true).await;
        assert_syncing(&syncer, &store, &[1..=10], 10).await;

        p2p_mock.announce_new_head(headers[10].clone());
        assert_syncing(&syncer, &store, &[1..=11], 11).await;

        assert_eq!(
            received_headers.recv().await.unwrap().map(|h| h.height()),
            Some(2)
        );
        assert_eq!(received_headers.recv().await.unwrap(), None);
        for i in 3..=11 {
            let header = received_headers.recv().await.unwrap().unwrap();
            assert_eq!(header.height(), i);
        }
    }

    async fn assert_syncing(
        syncer: &Syncer<InMemoryStore>,
        store: &InMemoryStore,
        expected_synced_ranges: &[RangeInclusive<u64>],
        expected_subjective_head: u64,
    ) {
        sleep(Duration::from_millis(1)).await;

        let store_ranges = store.get_stored_header_ranges().await.unwrap();
        let syncing_info = syncer.info().await.unwrap();

        assert_eq!(store_ranges.as_ref(), expected_synced_ranges);
        assert_eq!(syncing_info.stored_headers.as_ref(), expected_synced_ranges);
        assert_eq!(syncing_info.subjective_head, expected_subjective_head);
    }

    async fn initialized_syncer(
        head: ExtendedHeader,
    ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
        initialized_syncer_with_windows(head, SAMPLING_WINDOW, DEFAULT_PRUNING_WINDOW).await
    }

    async fn initialized_syncer_with_windows(
        head: ExtendedHeader,
        sampling_window: Duration,
        pruning_window: Duration,
    ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
        let events = EventChannel::new();
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());

        let syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(mock),
            store: store.clone(),
            event_pub: events.publisher(),
            batch_size: 512,
            sampling_window,
            pruning_window,
        })
        .unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;
        handle.announce_trusted_peer_connected();

        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![head.clone()])).unwrap();

        let head_from_syncer = handle.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, head);

        let head_height = head.height();
        assert_syncing(&syncer, &store, &[head_height..=head_height], head_height).await;

        (syncer, store, handle)
    }

    async fn handle_session_batch(
        p2p_mock: &mut MockP2pHandle,
        remaining_headers: &[ExtendedHeader],
        range: BlockRange,
        respond: bool,
    ) {
        range.validate().unwrap();

        let mut ranges_to_request = BlockRanges::new();
        ranges_to_request.insert_relaxed(&range).unwrap();

        let mut no_respond_chans = Vec::new();

        for _ in 0..requests_in_session(range.len()) {
            let (height, amount, respond_to) =
                p2p_mock.expect_header_request_for_height_cmd().await;

            let requested_range = height..=height + amount - 1;
            ranges_to_request.remove_strict(requested_range);

            if respond {
                let header_index = remaining_headers
                    .iter()
                    .position(|h| h.height() == height)
                    .expect("height not found in provided headers");

                let response_range =
                    remaining_headers[header_index..header_index + amount as usize].to_vec();
                respond_to
                    .send(Ok(response_range))
                    .map_err(|_| format!("headers [{}, {}]", height, height + amount - 1))
                    .unwrap();
            } else {
                no_respond_chans.push(respond_to);
            }
        }

        if !respond {
            spawn(async move {
                sleep(Duration::from_secs(10)).await;

                for respond_chan in no_respond_chans {
                    respond_chan.maybe_send_err(P2pError::HeaderEx(
                        HeaderExError::OutboundFailure(OutboundFailure::Timeout),
                    ));
                }
            });
        }

        assert!(
            ranges_to_request.is_empty(),
            "Some headers weren't requested. expected range: {}, not requested: {}",
            range.display(),
            ranges_to_request
        );
    }

    fn requests_in_session(headers: u64) -> usize {
        let max_requests = headers.div_ceil(header_session::MAX_AMOUNT_PER_REQ) as usize;
        let min_requests = headers.div_ceil(header_session::MIN_AMOUNT_PER_REQ) as usize;

        if max_requests > header_session::MAX_CONCURRENT_REQS {
            max_requests
        } else {
            header_session::MAX_CONCURRENT_REQS.min(min_requests)
        }
    }

    impl BlockRanges {
        fn remove_strict(&mut self, range: BlockRange) {
            for stored in self.as_ref() {
                if stored.contains(range.start()) && stored.contains(range.end()) {
                    self.remove_relaxed(range).unwrap();
                    return;
                }
            }

            panic!("block ranges ({self}) don't contain {}", range.display());
        }
    }
}