1use std::marker::PhantomData;
4use std::pin::pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use backoff::backoff::Backoff;
9use backoff::ExponentialBackoffBuilder;
10use celestia_types::ExtendedHeader;
11use lumina_utils::executor::{spawn, JoinHandle};
12use lumina_utils::time::{sleep, Instant, Interval};
13use serde::{Deserialize, Serialize};
14use tendermint::Time;
15use tokio::select;
16use tokio::sync::{mpsc, oneshot};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, instrument, warn};
19
20use crate::block_ranges::{BlockRange, BlockRangeExt, BlockRanges};
21use crate::events::{EventPublisher, NodeEvent};
22use crate::p2p::{P2p, P2pError};
23use crate::store::{Store, StoreError};
24use crate::utils::{FusedReusableFuture, OneshotSenderExt, TimeExt};
25
26type Result<T, E = SyncerError> = std::result::Result<T, E>;
27
28const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60);
29const SLOW_SYNC_MIN_THRESHOLD: u64 = 50;
30
31#[derive(Debug, thiserror::Error)]
33pub enum SyncerError {
34 #[error("P2p: {0}")]
36 P2p(#[from] P2pError),
37
38 #[error("Store: {0}")]
40 Store(#[from] StoreError),
41
42 #[error("Worker died")]
44 WorkerDied,
45
46 #[error("Channel closed unexpectedly")]
48 ChannelClosedUnexpectedly,
49}
50
51impl SyncerError {
52 pub(crate) fn is_fatal(&self) -> bool {
53 match self {
54 SyncerError::P2p(e) => e.is_fatal(),
55 SyncerError::Store(e) => e.is_fatal(),
56 SyncerError::WorkerDied | SyncerError::ChannelClosedUnexpectedly => true,
57 }
58 }
59}
60
61impl From<oneshot::error::RecvError> for SyncerError {
62 fn from(_value: oneshot::error::RecvError) -> Self {
63 SyncerError::ChannelClosedUnexpectedly
64 }
65}
66
67#[derive(Debug)]
69pub(crate) struct Syncer<S>
70where
71 S: Store + 'static,
72{
73 cmd_tx: mpsc::Sender<SyncerCmd>,
74 cancellation_token: CancellationToken,
75 join_handle: JoinHandle,
76 _store: PhantomData<S>,
77}
78
79pub(crate) struct SyncerArgs<S>
81where
82 S: Store + 'static,
83{
84 pub(crate) p2p: Arc<P2p>,
86 pub(crate) store: Arc<S>,
88 pub(crate) event_pub: EventPublisher,
90 pub(crate) batch_size: u64,
92 pub(crate) sampling_window: Duration,
94 pub(crate) pruning_window: Duration,
96}
97
98#[derive(Debug)]
99enum SyncerCmd {
100 GetInfo {
101 respond_to: oneshot::Sender<SyncingInfo>,
102 },
103 #[cfg(test)]
104 TriggerFetchNextBatch,
105}
106
107#[derive(Debug, Serialize, Deserialize)]
109pub struct SyncingInfo {
110 pub stored_headers: BlockRanges,
112 pub subjective_head: u64,
114}
115
116impl<S> Syncer<S>
117where
118 S: Store,
119{
120 pub(crate) fn start(args: SyncerArgs<S>) -> Result<Self> {
122 let cancellation_token = CancellationToken::new();
123 let event_pub = args.event_pub.clone();
124 let (cmd_tx, cmd_rx) = mpsc::channel(16);
125 let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;
126
127 let join_handle = spawn(async move {
128 if let Err(e) = worker.run().await {
129 error!("Syncer stopped because of a fatal error: {e}");
130
131 event_pub.send(NodeEvent::FatalSyncerError {
132 error: e.to_string(),
133 });
134 }
135 });
136
137 Ok(Syncer {
138 cancellation_token,
139 cmd_tx,
140 join_handle,
141 _store: PhantomData,
142 })
143 }
144
145 pub(crate) fn stop(&self) {
147 self.cancellation_token.cancel();
149 }
150
151 pub(crate) async fn join(&self) {
153 self.join_handle.join().await;
154 }
155
156 async fn send_command(&self, cmd: SyncerCmd) -> Result<()> {
157 self.cmd_tx
158 .send(cmd)
159 .await
160 .map_err(|_| SyncerError::WorkerDied)
161 }
162
163 pub(crate) async fn info(&self) -> Result<SyncingInfo> {
169 let (tx, rx) = oneshot::channel();
170
171 self.send_command(SyncerCmd::GetInfo { respond_to: tx })
172 .await?;
173
174 Ok(rx.await?)
175 }
176
177 #[cfg(test)]
178 async fn trigger_fetch_next_batch(&self) -> Result<()> {
179 self.send_command(SyncerCmd::TriggerFetchNextBatch).await
180 }
181}
182
183impl<S> Drop for Syncer<S>
184where
185 S: Store,
186{
187 fn drop(&mut self) {
188 self.stop();
189 }
190}
191
192struct Worker<S>
193where
194 S: Store + 'static,
195{
196 cancellation_token: CancellationToken,
197 cmd_rx: mpsc::Receiver<SyncerCmd>,
198 event_pub: EventPublisher,
199 p2p: Arc<P2p>,
200 store: Arc<S>,
201 header_sub_rx: Option<mpsc::Receiver<ExtendedHeader>>,
202 subjective_head_height: Option<u64>,
203 highest_slow_sync_height: Option<u64>,
204 batch_size: u64,
205 ongoing_batch: Ongoing,
206 sampling_window: Duration,
207 pruning_window: Duration,
208}
209
210struct Ongoing {
211 range: Option<BlockRange>,
212 task: FusedReusableFuture<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
213}
214
215impl<S> Worker<S>
216where
217 S: Store,
218{
219 fn new(
220 args: SyncerArgs<S>,
221 cancellation_token: CancellationToken,
222 cmd_rx: mpsc::Receiver<SyncerCmd>,
223 ) -> Result<Self> {
224 Ok(Worker {
225 cancellation_token,
226 cmd_rx,
227 event_pub: args.event_pub,
228 p2p: args.p2p,
229 store: args.store,
230 header_sub_rx: None,
231 subjective_head_height: None,
232 highest_slow_sync_height: None,
233 batch_size: args.batch_size,
234 ongoing_batch: Ongoing {
235 range: None,
236 task: FusedReusableFuture::terminated(),
237 },
238 sampling_window: args.sampling_window,
239 pruning_window: args.pruning_window,
240 })
241 }
242
243 async fn run(&mut self) -> Result<()> {
244 loop {
245 if self.cancellation_token.is_cancelled() {
246 break;
247 }
248
249 self.connecting_event_loop().await?;
250
251 if self.cancellation_token.is_cancelled() {
252 break;
253 }
254
255 self.connected_event_loop().await?;
256 }
257
258 debug!("Syncer stopped");
259 Ok(())
260 }
261
262 async fn connecting_event_loop(&mut self) -> Result<()> {
267 debug!("Entering connecting_event_loop");
268
269 let mut report_interval = Interval::new(Duration::from_secs(60)).await;
270 self.report().await?;
271
272 let mut try_init_fut = pin!(try_init_task(
273 self.p2p.clone(),
274 self.store.clone(),
275 self.event_pub.clone()
276 ));
277
278 loop {
279 select! {
280 _ = self.cancellation_token.cancelled() => {
281 break;
282 }
283 _ = report_interval.tick() => {
284 self.report().await?;
285 }
286 res = &mut try_init_fut => {
287 let (network_head, took) = res?;
289 let network_head_height = network_head.height().value();
290
291 info!("Setting initial subjective head to {network_head_height}");
292 self.set_subjective_head_height(network_head_height);
293
294 let (header_sub_tx, header_sub_rx) = mpsc::channel(16);
295 self.p2p.init_header_sub(network_head, header_sub_tx).await?;
296 self.header_sub_rx = Some(header_sub_rx);
297
298 self.event_pub.send(NodeEvent::FetchingHeadHeaderFinished {
299 height: network_head_height,
300 took,
301 });
302
303 break;
304 }
305 Some(cmd) = self.cmd_rx.recv() => {
306 self.on_cmd(cmd).await?;
307 }
308 }
309 }
310
311 Ok(())
312 }
313
314 async fn connected_event_loop(&mut self) -> Result<()> {
319 debug!("Entering connected_event_loop");
320
321 let mut report_interval = Interval::new(Duration::from_secs(60)).await;
322 let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();
323
324 if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
326 warn!("All peers disconnected");
327 return Ok(());
328 }
329
330 self.fetch_next_batch().await?;
331 self.report().await?;
332
333 loop {
334 select! {
335 _ = self.cancellation_token.cancelled() => {
336 break;
337 }
338 _ = peer_tracker_info_watcher.changed() => {
339 if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
340 warn!("All peers disconnected");
341 break;
342 }
343 }
344 _ = report_interval.tick() => {
345 self.report().await?;
346 }
347 res = header_sub_recv(self.header_sub_rx.as_mut()) => {
348 let header = res?;
349 self.on_header_sub_message(header).await?;
350 self.fetch_next_batch().await?;
351 }
352 Some(cmd) = self.cmd_rx.recv() => {
353 self.on_cmd(cmd).await?;
354 }
355 (res, took) = &mut self.ongoing_batch.task => {
356 self.on_fetch_next_batch_result(res, took).await?;
357 self.fetch_next_batch().await?;
358 }
359 }
360 }
361
362 if let Some(ongoing) = self.ongoing_batch.range.take() {
363 warn!("Cancelling fetching of {}", ongoing.display());
364 self.ongoing_batch.task.terminate();
365 }
366
367 self.header_sub_rx.take();
368
369 Ok(())
370 }
371
372 async fn syncing_info(&self) -> Result<SyncingInfo> {
373 Ok(SyncingInfo {
374 stored_headers: self.store.get_stored_header_ranges().await?,
375 subjective_head: self.subjective_head_height.unwrap_or(0),
376 })
377 }
378
379 #[instrument(skip_all)]
380 async fn report(&mut self) -> Result<()> {
381 let SyncingInfo {
382 stored_headers,
383 subjective_head,
384 } = self.syncing_info().await?;
385
386 let ongoing_batch = self
387 .ongoing_batch
388 .range
389 .as_ref()
390 .map(|range| format!("{}", range.display()))
391 .unwrap_or_else(|| "None".to_string());
392
393 info!("syncing: head: {subjective_head}, stored headers: {stored_headers}, ongoing batches: {ongoing_batch}");
394 Ok(())
395 }
396
397 async fn on_cmd(&mut self, cmd: SyncerCmd) -> Result<()> {
398 match cmd {
399 SyncerCmd::GetInfo { respond_to } => {
400 let info = self.syncing_info().await?;
401 respond_to.maybe_send(info);
402 }
403 #[cfg(test)]
404 SyncerCmd::TriggerFetchNextBatch => {
405 self.fetch_next_batch().await?;
406 }
407 }
408
409 Ok(())
410 }
411
412 #[instrument(skip_all)]
413 async fn on_header_sub_message(&mut self, new_head: ExtendedHeader) -> Result<()> {
414 let new_head_height = new_head.height().value();
415
416 self.set_subjective_head_height(new_head_height);
417
418 if let Ok(store_head_height) = self.store.head_height().await {
419 if store_head_height + 1 == new_head_height {
421 if self.store.insert(new_head).await.is_ok() {
424 self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub {
425 height: new_head_height,
426 });
427 }
428 }
429 }
430
431 Ok(())
432 }
433
434 fn set_subjective_head_height(&mut self, height: u64) {
435 if let Some(old_height) = self.subjective_head_height {
436 if height <= old_height {
437 return;
438 }
439 }
440
441 self.subjective_head_height = Some(height);
442 }
443
444 #[instrument(skip_all)]
445 async fn fetch_next_batch(&mut self) -> Result<()> {
446 debug_assert_eq!(
447 self.ongoing_batch.range.is_none(),
448 self.ongoing_batch.task.is_terminated()
449 );
450
451 if !self.ongoing_batch.task.is_terminated() {
452 return Ok(());
458 }
459
460 if self.p2p.peer_tracker_info().num_connected_peers == 0 {
461 return Ok(());
464 }
465
466 let Some(subjective_head_height) = self.subjective_head_height else {
467 return Ok(());
469 };
470
471 let store_ranges = self.store.get_stored_header_ranges().await?;
472 let pruned_ranges = self.store.get_pruned_ranges().await?;
473
474 let synced_ranges = pruned_ranges + &store_ranges;
477
478 let next_batch = calculate_range_to_fetch(
479 subjective_head_height,
480 synced_ranges.as_ref(),
481 self.batch_size,
482 );
483
484 if next_batch.is_empty() {
485 return Ok(());
487 }
488
489 if self
491 .highest_slow_sync_height
492 .is_some_and(|height| *next_batch.end() <= height)
493 {
494 let threshold = (self.batch_size / 2).max(SLOW_SYNC_MIN_THRESHOLD);
496
497 let sampled_ranges = self.store.get_sampled_ranges().await?;
499 let available_for_sampling = (store_ranges - sampled_ranges).len();
500
501 if available_for_sampling > threshold {
503 return Ok(());
505 }
506 }
507
508 match self.store.get_by_height(next_batch.end() + 1).await {
510 Ok(known_header) => {
511 if !self.in_sampling_window(&known_header) {
512 return Ok(());
513 }
514 }
515 Err(StoreError::NotFound) => {}
516 Err(e) => return Err(e.into()),
517 }
518
519 self.event_pub.send(NodeEvent::FetchingHeadersStarted {
520 from_height: *next_batch.start(),
521 to_height: *next_batch.end(),
522 });
523
524 let p2p = self.p2p.clone();
525
526 self.ongoing_batch.range = Some(next_batch.clone());
527
528 self.ongoing_batch.task.set(async move {
529 let now = Instant::now();
530 let res = p2p.get_unverified_header_range(next_batch).await;
531 (res, now.elapsed())
532 });
533
534 Ok(())
535 }
536
537 #[instrument(skip_all)]
539 async fn on_fetch_next_batch_result(
540 &mut self,
541 res: Result<Vec<ExtendedHeader>, P2pError>,
542 took: Duration,
543 ) -> Result<()> {
544 let range = self
545 .ongoing_batch
546 .range
547 .take()
548 .expect("ongoing_batch not initialized correctly");
549
550 let from_height = *range.start();
551 let to_height = *range.end();
552
553 let headers = match res {
554 Ok(headers) => headers,
555 Err(e) => {
556 if e.is_fatal() {
557 return Err(e.into());
558 }
559
560 self.event_pub.send(NodeEvent::FetchingHeadersFailed {
561 from_height,
562 to_height,
563 error: e.to_string(),
564 took,
565 });
566
567 return Ok(());
568 }
569 };
570
571 let pruning_cutoff = Time::now().saturating_sub(self.pruning_window);
572
573 for header in headers.iter().rev() {
576 if self
577 .highest_slow_sync_height
578 .is_some_and(|height| header.height().value() <= height)
579 {
580 break;
583 }
584
585 if header.time() <= pruning_cutoff {
588 self.highest_slow_sync_height = Some(header.height().value());
589 break;
590 }
591 }
592
593 if let Err(e) = self.store.insert(headers).await {
594 if e.is_fatal() {
595 return Err(e.into());
596 }
597
598 self.event_pub.send(NodeEvent::FetchingHeadersFailed {
599 from_height,
600 to_height,
601 error: format!("Failed to store headers: {e}"),
602 took,
603 });
604 }
605
606 self.event_pub.send(NodeEvent::FetchingHeadersFinished {
607 from_height,
608 to_height,
609 took,
610 });
611
612 Ok(())
613 }
614
615 fn in_sampling_window(&self, header: &ExtendedHeader) -> bool {
616 let sampling_window_end = Time::now().saturating_sub(self.sampling_window);
617 header.time().after(sampling_window_end)
618 }
619}
620
621fn calculate_range_to_fetch(
624 subjective_head_height: u64,
625 synced_headers: &[BlockRange],
626 limit: u64,
627) -> BlockRange {
628 let mut synced_headers_iter = synced_headers.iter().rev();
629
630 let Some(synced_head_range) = synced_headers_iter.next() else {
631 let range = 1..=subjective_head_height;
633 return range.tailn(limit);
634 };
635
636 if synced_head_range.end() < &subjective_head_height {
637 let range = synced_head_range.end() + 1..=subjective_head_height;
639 return range.tailn(limit);
640 }
641
642 let penultimate_range_end = synced_headers_iter.next().map(|r| *r.end()).unwrap_or(0);
644
645 let range = penultimate_range_end + 1..=synced_head_range.start().saturating_sub(1);
646 range.headn(limit)
647}
648
649#[instrument(skip_all)]
650async fn try_init_task<S>(
651 p2p: Arc<P2p>,
652 store: Arc<S>,
653 event_pub: EventPublisher,
654) -> Result<(ExtendedHeader, Duration)>
655where
656 S: Store + 'static,
657{
658 let now = Instant::now();
659 let mut event_reported = false;
660 let mut backoff = ExponentialBackoffBuilder::default()
661 .with_max_interval(TRY_INIT_BACKOFF_MAX_INTERVAL)
662 .with_max_elapsed_time(None)
663 .build();
664
665 loop {
666 match try_init(&p2p, &*store, &event_pub, &mut event_reported).await {
667 Ok(network_head) => {
668 return Ok((network_head, now.elapsed()));
669 }
670 Err(e) if e.is_fatal() => {
671 return Err(e);
672 }
673 Err(e) => {
674 let sleep_dur = backoff
675 .next_backoff()
676 .expect("backoff never stops retrying");
677
678 warn!(
679 "Initialization of subjective head failed: {e}. Trying again in {sleep_dur:?}."
680 );
681 sleep(sleep_dur).await;
682 }
683 }
684 }
685}
686
687async fn try_init<S>(
688 p2p: &P2p,
689 store: &S,
690 event_pub: &EventPublisher,
691 event_reported: &mut bool,
692) -> Result<ExtendedHeader>
693where
694 S: Store,
695{
696 p2p.wait_connected_trusted().await?;
697
698 if !*event_reported {
699 event_pub.send(NodeEvent::FetchingHeadHeaderStarted);
700 *event_reported = true;
701 }
702
703 let network_head = p2p.get_head_header().await?;
704
705 let try_insert = match store.get_head().await {
712 Ok(store_head) => store_head.hash() != network_head.hash(),
715 Err(StoreError::NotFound) => true,
716 Err(e) => return Err(e.into()),
717 };
718
719 if try_insert {
720 store.insert(network_head.clone()).await?;
723 }
724
725 Ok(network_head)
726}
727
728async fn header_sub_recv(
729 rx: Option<&mut mpsc::Receiver<ExtendedHeader>>,
730) -> Result<ExtendedHeader> {
731 rx.expect("header-sub not initialized")
732 .recv()
733 .await
734 .ok_or(SyncerError::P2p(P2pError::WorkerDied))
735}
736
737#[cfg(test)]
738mod tests {
739 use std::ops::RangeInclusive;
740
741 use super::*;
742 use crate::block_ranges::{BlockRange, BlockRangeExt};
743 use crate::events::EventChannel;
744 use crate::node::{HeaderExError, DEFAULT_PRUNING_WINDOW, SAMPLING_WINDOW};
745 use crate::p2p::header_session;
746 use crate::store::InMemoryStore;
747 use crate::test_utils::{gen_filled_store, MockP2pHandle};
748 use crate::utils::OneshotResultSenderExt;
749 use celestia_types::test_utils::ExtendedHeaderGenerator;
750 use libp2p::request_response::OutboundFailure;
751 use lumina_utils::test_utils::async_test;
752
753 #[test]
754 fn calculate_range_to_fetch_test_header_limit() {
755 let head_height = 1024;
756 let ranges = [256..=512];
757
758 let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16);
759 assert_eq!(fetch_range, 513..=528);
760
761 let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511);
762 assert_eq!(fetch_range, 513..=1023);
763 let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512);
764 assert_eq!(fetch_range, 513..=1024);
765 let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513);
766 assert_eq!(fetch_range, 513..=1024);
767
768 let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024);
769 assert_eq!(fetch_range, 513..=1024);
770 }
771
772 #[test]
773 fn calculate_range_to_fetch_empty_store() {
774 let fetch_range = calculate_range_to_fetch(1, &[], 100);
775 assert_eq!(fetch_range, 1..=1);
776
777 let fetch_range = calculate_range_to_fetch(100, &[], 10);
778 assert_eq!(fetch_range, 1..=10);
779
780 let fetch_range = calculate_range_to_fetch(100, &[], 50);
781 assert_eq!(fetch_range, 1..=50);
782 }
783
784 #[test]
785 fn calculate_range_to_fetch_fully_synced() {
786 let fetch_range = calculate_range_to_fetch(1, &[1..=1], 100);
787 assert!(fetch_range.is_empty());
788
789 let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
790 assert!(fetch_range.is_empty());
791
792 let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
793 assert!(fetch_range.is_empty());
794 }
795
796 #[test]
797 fn calculate_range_to_fetch_caught_up() {
798 let head_height = 4000;
799
800 let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], 500);
801 assert_eq!(fetch_range, 2500..=2999);
802 let fetch_range = calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], 500);
803 assert_eq!(fetch_range, 2500..=2999);
804 let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
805 assert_eq!(fetch_range, 2801..=2999);
806 let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
807 assert_eq!(fetch_range, 2801..=2999);
808 let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], 500);
809 assert_eq!(fetch_range, 1..=299);
810 }
811
812 #[test]
813 fn calculate_range_to_fetch_catching_up() {
814 let head_height = 4000;
815
816 let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], 500);
817 assert_eq!(fetch_range, 3001..=3500);
818 let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3500], 500);
819 assert_eq!(fetch_range, 3501..=4000);
820 let fetch_range = calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], 500);
821 assert_eq!(fetch_range, 3801..=4000);
822 }
823
824 #[async_test]
825 async fn init_without_genesis_hash() {
826 let events = EventChannel::new();
827 let (mock, mut handle) = P2p::mocked();
828 let mut gen = ExtendedHeaderGenerator::new();
829 let header = gen.next();
830
831 let _syncer = Syncer::start(SyncerArgs {
832 p2p: Arc::new(mock),
833 store: Arc::new(InMemoryStore::new()),
834 event_pub: events.publisher(),
835 batch_size: 512,
836 sampling_window: SAMPLING_WINDOW,
837 pruning_window: DEFAULT_PRUNING_WINDOW,
838 })
839 .unwrap();
840
841 handle.expect_no_cmd().await;
843 handle.announce_peer_connected();
844 handle.expect_no_cmd().await;
845 handle.announce_trusted_peer_connected();
846
847 let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
849 assert_eq!(height, 0);
850 assert_eq!(amount, 1);
851 respond_to.send(Ok(vec![header.clone()])).unwrap();
852
853 let head_from_syncer = handle.expect_init_header_sub().await;
855 assert_eq!(head_from_syncer, header);
856
857 handle.expect_no_cmd().await;
859 }
860
861 #[async_test]
862 async fn init_with_genesis_hash() {
863 let mut gen = ExtendedHeaderGenerator::new();
864 let head = gen.next();
865
866 let (_syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;
867
868 p2p_mock.expect_no_cmd().await;
870 }
871
872 #[async_test]
873 async fn syncing() {
874 let mut gen = ExtendedHeaderGenerator::new();
875 let headers = gen.next_many(1500);
876
877 let (syncer, store, mut p2p_mock) = initialized_syncer(headers[1499].clone()).await;
878 assert_syncing(&syncer, &store, &[1500..=1500], 1500).await;
879
880 handle_session_batch(&mut p2p_mock, &headers, 988..=1499, true).await;
882 assert_syncing(&syncer, &store, &[988..=1500], 1500).await;
883
884 handle_session_batch(&mut p2p_mock, &headers, 476..=987, true).await;
886 assert_syncing(&syncer, &store, &[476..=1500], 1500).await;
887
888 let header1501 = gen.next();
890 p2p_mock.announce_new_head(header1501.clone());
891 assert_syncing(&syncer, &store, &[476..=1501], 1501).await;
894
895 handle_session_batch(&mut p2p_mock, &headers, 1..=475, true).await;
897 assert_syncing(&syncer, &store, &[1..=1501], 1501).await;
898
899 p2p_mock.expect_no_cmd().await;
901
902 let header1502 = gen.next();
904 p2p_mock.announce_new_head(header1502.clone());
905 assert_syncing(&syncer, &store, &[1..=1502], 1502).await;
906 p2p_mock.expect_no_cmd().await;
907
908 let headers_1503_1505 = gen.next_many(3);
910 p2p_mock.announce_new_head(headers_1503_1505[2].clone());
911 assert_syncing(&syncer, &store, &[1..=1502], 1505).await;
912
913 handle_session_batch(&mut p2p_mock, &headers_1503_1505, 1503..=1505, true).await;
915 assert_syncing(&syncer, &store, &[1..=1505], 1505).await;
916
917 let mut headers = gen.next_many(1495);
919 p2p_mock.announce_new_head(headers[1494].clone());
920 assert_syncing(&syncer, &store, &[1..=1505], 3000).await;
921
922 handle_session_batch(&mut p2p_mock, &headers, 1506..=2017, true).await;
924 assert_syncing(&syncer, &store, &[1..=2017], 3000).await;
925
926 headers.push(gen.next());
928 p2p_mock.announce_new_head(headers.last().unwrap().clone());
929 assert_syncing(&syncer, &store, &[1..=2017], 3001).await;
930
931 handle_session_batch(&mut p2p_mock, &headers, 2018..=2529, true).await;
933 assert_syncing(&syncer, &store, &[1..=2529], 3001).await;
934
935 handle_session_batch(&mut p2p_mock, &headers, 2530..=3001, true).await;
937 assert_syncing(&syncer, &store, &[1..=3001], 3001).await;
938
939 p2p_mock.expect_no_cmd().await;
941 }
942
943 #[async_test]
944 async fn slow_sync() {
945 let pruning_window = Duration::from_secs(600);
946 let sampling_window = SAMPLING_WINDOW;
947
948 let mut gen = ExtendedHeaderGenerator::new();
949
950 let first_header_time = (Time::now() - Duration::from_secs(2048)).unwrap();
952 gen.set_time(first_header_time, Duration::from_secs(1));
953 let headers = gen.next_many(2048);
954
955 let (syncer, store, mut p2p_mock) =
956 initialized_syncer_with_windows(headers[2047].clone(), sampling_window, pruning_window)
957 .await;
958 assert_syncing(&syncer, &store, &[2048..=2048], 2048).await;
959
960 handle_session_batch(&mut p2p_mock, &headers, 1536..=2047, true).await;
962 assert_syncing(&syncer, &store, &[1536..=2048], 2048).await;
963
964 handle_session_batch(&mut p2p_mock, &headers, 1024..=1535, true).await;
966 assert_syncing(&syncer, &store, &[1024..=2048], 2048).await;
967
968 syncer.trigger_fetch_next_batch().await.unwrap();
972 p2p_mock.expect_no_cmd().await;
973
974 for height in 1250..=2048 {
977 store.mark_as_sampled(height).await.unwrap();
979 }
980 for height in 1300..=1450 {
981 store.remove_height(height).await.unwrap();
983 }
984 syncer.trigger_fetch_next_batch().await.unwrap();
985 handle_session_batch(&mut p2p_mock, &headers, 512..=1023, true).await;
987 assert_syncing(&syncer, &store, &[512..=1299, 1451..=2048], 2048).await;
988
989 syncer.trigger_fetch_next_batch().await.unwrap();
992 p2p_mock.expect_no_cmd().await;
993 }
994
995 #[async_test]
996 async fn window_edge() {
997 let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60);
998 let mut gen = ExtendedHeaderGenerator::new();
999 gen.set_time(
1000 (Time::now() - month_and_day_ago).expect("to not underflow"),
1001 Duration::from_secs(1),
1002 );
1003 let mut headers = gen.next_many(1200);
1004 gen.reset_time();
1005 headers.append(&mut gen.next_many(2049 - 1200));
1006
1007 let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await;
1008 assert_syncing(&syncer, &store, &[2049..=2049], 2049).await;
1009
1010 handle_session_batch(&mut p2p_mock, &headers, 1537..=2048, true).await;
1012 assert_syncing(&syncer, &store, &[1537..=2049], 2049).await;
1013
1014 handle_session_batch(&mut p2p_mock, &headers, 1025..=1536, true).await;
1016 assert_syncing(&syncer, &store, &[1025..=2049], 2049).await;
1017
1018 p2p_mock.expect_no_cmd().await;
1020 }
1021
1022 #[async_test]
1023 async fn start_with_filled_store() {
1024 let events = EventChannel::new();
1025 let (p2p, mut p2p_mock) = P2p::mocked();
1026 let (store, mut gen) = gen_filled_store(25).await;
1027 let store = Arc::new(store);
1028
1029 let mut headers = gen.next_many(520);
1030 let network_head = gen.next(); let syncer = Syncer::start(SyncerArgs {
1033 p2p: Arc::new(p2p),
1034 store: store.clone(),
1035 event_pub: events.publisher(),
1036 batch_size: 512,
1037 sampling_window: SAMPLING_WINDOW,
1038 pruning_window: DEFAULT_PRUNING_WINDOW,
1039 })
1040 .unwrap();
1041
1042 p2p_mock.announce_trusted_peer_connected();
1043
1044 let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1046 assert_eq!(height, 0);
1047 assert_eq!(amount, 1);
1048 respond_to.send(Ok(vec![network_head.clone()])).unwrap();
1049
1050 let head_from_syncer = p2p_mock.expect_init_header_sub().await;
1052 assert_eq!(head_from_syncer, network_head);
1053
1054 assert_syncing(&syncer, &store, &[1..=25, 546..=546], 546).await;
1055
1056 handle_session_batch(&mut p2p_mock, &headers, 34..=545, true).await;
1058 assert_syncing(&syncer, &store, &[1..=25, 34..=546], 546).await;
1059
1060 let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1062 assert_eq!(height, 26);
1063 assert_eq!(amount, 8);
1064 respond_to
1065 .send(Ok(headers.drain(..8).collect()))
1066 .map_err(|_| "headers [538, 545]")
1067 .unwrap();
1068 assert_syncing(&syncer, &store, &[1..=546], 546).await;
1069
1070 p2p_mock.expect_no_cmd().await;
1072 }
1073
1074 #[async_test]
1075 async fn stop_syncer() {
1076 let mut gen = ExtendedHeaderGenerator::new();
1077 let head = gen.next();
1078
1079 let (syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;
1080
1081 p2p_mock.expect_no_cmd().await;
1083
1084 syncer.stop();
1085 sleep(Duration::from_millis(1)).await;
1087 assert!(matches!(
1088 syncer.info().await.unwrap_err(),
1089 SyncerError::WorkerDied
1090 ));
1091 }
1092
1093 #[async_test]
1094 async fn all_peers_disconnected() {
1095 let mut gen = ExtendedHeaderGenerator::new();
1096
1097 let _gap = gen.next_many(24);
1098 let header25 = gen.next();
1099 let _gap = gen.next_many(4);
1100 let header30 = gen.next();
1101 let _gap = gen.next_many(4);
1102 let header35 = gen.next();
1103
1104 let (syncer, store, mut p2p_mock) = initialized_syncer(header30).await;
1106
1107 handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;
1109
1110 p2p_mock.announce_all_peers_disconnected();
1111 p2p_mock.expect_no_cmd().await;
1113
1114 p2p_mock.announce_peer_connected();
1117 p2p_mock.expect_no_cmd().await;
1118
1119 p2p_mock.announce_trusted_peer_connected();
1121
1122 let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1124 assert_eq!(height, 0);
1125 assert_eq!(amount, 1);
1126
1127 respond_to.send(Ok(vec![header25])).unwrap();
1129 assert_syncing(&syncer, &store, &[30..=30], 30).await;
1130
1131 sleep(Duration::from_secs(1)).await;
1133 let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1134 assert_eq!(height, 0);
1135 assert_eq!(amount, 1);
1136
1137 respond_to.send(Ok(vec![header35.clone()])).unwrap();
1139 assert_syncing(&syncer, &store, &[30..=30, 35..=35], 35).await;
1140
1141 let head_from_syncer = p2p_mock.expect_init_header_sub().await;
1143 assert_eq!(head_from_syncer, header35);
1144
1145 let (height, amount, _respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1148 assert_eq!(height, 31);
1149 assert_eq!(amount, 4);
1150
1151 p2p_mock.announce_all_peers_disconnected();
1152 p2p_mock.expect_no_cmd().await;
1153 }
1154
1155 #[async_test]
1156 async fn all_peers_disconnected_and_no_network_head_progress() {
1157 let mut gen = ExtendedHeaderGenerator::new_from_height(30);
1158
1159 let header30 = gen.next();
1160
1161 let (syncer, store, mut p2p_mock) = initialized_syncer(header30.clone()).await;
1163
1164 handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;
1166
1167 p2p_mock.announce_all_peers_disconnected();
1168 p2p_mock.expect_no_cmd().await;
1170
1171 p2p_mock.announce_peer_connected();
1174 p2p_mock.expect_no_cmd().await;
1175
1176 p2p_mock.announce_trusted_peer_connected();
1178
1179 let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1181 assert_eq!(height, 0);
1182 assert_eq!(amount, 1);
1183
1184 respond_to.send(Ok(vec![header30.clone()])).unwrap();
1186 assert_syncing(&syncer, &store, &[30..=30], 30).await;
1187
1188 let head_from_syncer = p2p_mock.expect_init_header_sub().await;
1190 assert_eq!(head_from_syncer, header30);
1191
1192 handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;
1194
1195 p2p_mock.announce_all_peers_disconnected();
1196 p2p_mock.expect_no_cmd().await;
1197 }
1198
1199 #[async_test]
1200 async fn non_contiguous_response() {
1201 let mut gen = ExtendedHeaderGenerator::new();
1202 let mut headers = gen.next_many(20);
1203
1204 let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;
1206
1207 let header10 = headers[10].clone();
1208 headers[10] = headers[11].clone();
1210
1211 handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1213
1214 assert_syncing(&syncer, &store, &[20..=20], 20).await;
1216
1217 headers[10] = header10;
1219
1220 handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1222
1223 assert_syncing(&syncer, &store, &[1..=20], 20).await;
1225 }
1226
1227 #[async_test]
1228 async fn another_chain_response() {
1229 let headers = ExtendedHeaderGenerator::new().next_many(20);
1230 let headers_prime = ExtendedHeaderGenerator::new().next_many(20);
1231
1232 let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;
1234
1235 handle_session_batch(&mut p2p_mock, &headers_prime, 1..=19, true).await;
1237
1238 assert_syncing(&syncer, &store, &[20..=20], 20).await;
1240
1241 handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1243
1244 assert_syncing(&syncer, &store, &[1..=20], 20).await;
1246 }
1247
1248 async fn assert_syncing(
1249 syncer: &Syncer<InMemoryStore>,
1250 store: &InMemoryStore,
1251 expected_synced_ranges: &[RangeInclusive<u64>],
1252 expected_subjective_head: u64,
1253 ) {
1254 sleep(Duration::from_millis(1)).await;
1257
1258 let store_ranges = store.get_stored_header_ranges().await.unwrap();
1259 let syncing_info = syncer.info().await.unwrap();
1260
1261 assert_eq!(store_ranges.as_ref(), expected_synced_ranges);
1262 assert_eq!(syncing_info.stored_headers.as_ref(), expected_synced_ranges);
1263 assert_eq!(syncing_info.subjective_head, expected_subjective_head);
1264 }
1265
1266 async fn initialized_syncer(
1267 head: ExtendedHeader,
1268 ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
1269 initialized_syncer_with_windows(head, SAMPLING_WINDOW, DEFAULT_PRUNING_WINDOW).await
1270 }
1271
1272 async fn initialized_syncer_with_windows(
1273 head: ExtendedHeader,
1274 sampling_window: Duration,
1275 pruning_window: Duration,
1276 ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
1277 let events = EventChannel::new();
1278 let (mock, mut handle) = P2p::mocked();
1279 let store = Arc::new(InMemoryStore::new());
1280
1281 let syncer = Syncer::start(SyncerArgs {
1282 p2p: Arc::new(mock),
1283 store: store.clone(),
1284 event_pub: events.publisher(),
1285 batch_size: 512,
1286 sampling_window,
1287 pruning_window,
1288 })
1289 .unwrap();
1290
1291 handle.expect_no_cmd().await;
1293 handle.announce_peer_connected();
1294 handle.expect_no_cmd().await;
1295 handle.announce_trusted_peer_connected();
1296
1297 let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
1299 assert_eq!(height, 0);
1300 assert_eq!(amount, 1);
1301 respond_to.send(Ok(vec![head.clone()])).unwrap();
1302
1303 let head_from_syncer = handle.expect_init_header_sub().await;
1305 assert_eq!(head_from_syncer, head);
1306
1307 let head_height = head.height().value();
1308 assert_syncing(&syncer, &store, &[head_height..=head_height], head_height).await;
1309
1310 (syncer, store, handle)
1311 }
1312
1313 async fn handle_session_batch(
1314 p2p_mock: &mut MockP2pHandle,
1315 remaining_headers: &[ExtendedHeader],
1316 range: BlockRange,
1317 respond: bool,
1318 ) {
1319 range.validate().unwrap();
1320
1321 let mut ranges_to_request = BlockRanges::new();
1322 ranges_to_request.insert_relaxed(&range).unwrap();
1323
1324 let mut no_respond_chans = Vec::new();
1325
1326 for _ in 0..requests_in_session(range.len()) {
1327 let (height, amount, respond_to) =
1328 p2p_mock.expect_header_request_for_height_cmd().await;
1329
1330 let requested_range = height..=height + amount - 1;
1331 ranges_to_request.remove_strict(requested_range);
1332
1333 if respond {
1334 let header_index = remaining_headers
1335 .iter()
1336 .position(|h| h.height().value() == height)
1337 .expect("height not found in provided headers");
1338
1339 let response_range =
1340 remaining_headers[header_index..header_index + amount as usize].to_vec();
1341 respond_to
1342 .send(Ok(response_range))
1343 .map_err(|_| format!("headers [{}, {}]", height, height + amount - 1))
1344 .unwrap();
1345 } else {
1346 no_respond_chans.push(respond_to);
1347 }
1348 }
1349
1350 if !respond {
1354 spawn(async move {
1355 sleep(Duration::from_secs(10)).await;
1356
1357 for respond_chan in no_respond_chans {
1358 respond_chan.maybe_send_err(P2pError::HeaderEx(
1359 HeaderExError::OutboundFailure(OutboundFailure::Timeout),
1360 ));
1361 }
1362 });
1363 }
1364
1365 assert!(
1366 ranges_to_request.is_empty(),
1367 "Some headers weren't requested. expected range: {}, not requested: {}",
1368 range.display(),
1369 ranges_to_request
1370 );
1371 }
1372
1373 fn requests_in_session(headers: u64) -> usize {
1374 let max_requests = headers.div_ceil(header_session::MAX_AMOUNT_PER_REQ) as usize;
1375 let min_requests = headers.div_ceil(header_session::MIN_AMOUNT_PER_REQ) as usize;
1376
1377 if max_requests > header_session::MAX_CONCURRENT_REQS {
1378 max_requests
1380 } else {
1381 header_session::MAX_CONCURRENT_REQS.min(min_requests)
1383 }
1384 }
1385
1386 impl BlockRanges {
1387 fn remove_strict(&mut self, range: BlockRange) {
1388 for stored in self.as_ref() {
1389 if stored.contains(range.start()) && stored.contains(range.end()) {
1390 self.remove_relaxed(range).unwrap();
1391 return;
1392 }
1393 }
1394
1395 panic!("block ranges ({self}) don't contain {}", range.display());
1396 }
1397 }
1398}