use std::marker::PhantomData;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use celestia_types::ExtendedHeader;
use lumina_utils::executor::{spawn, JoinHandle};
use lumina_utils::time::{sleep, Instant, Interval};
use serde::{Deserialize, Serialize};
use tendermint::Time;
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};

use crate::block_ranges::{BlockRange, BlockRangeExt, BlockRanges};
use crate::events::{EventPublisher, NodeEvent};
use crate::p2p::{P2p, P2pError};
use crate::store::{Store, StoreError};
use crate::utils::{FusedReusableFuture, OneshotSenderExt};

type Result<T, E = SyncerError> = std::result::Result<T, E>;

const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60);

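/// Representation of all the errors that can occur in the [`Syncer`] component.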
#[derive(Debug, thiserror::Error)]
pub enum SyncerError {
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    #[error("Store: {0}")]
    Store(#[from] StoreError),

    #[error("Worker died")]
    WorkerDied,

    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,
}

impl SyncerError {
    pub(crate) fn is_fatal(&self) -> bool {
        match self {
            SyncerError::P2p(e) => e.is_fatal(),
            SyncerError::Store(e) => e.is_fatal(),
            SyncerError::WorkerDied | SyncerError::ChannelClosedUnexpectedly => true,
        }
    }
}

impl From<oneshot::error::RecvError> for SyncerError {
    fn from(_value: oneshot::error::RecvError) -> Self {
        SyncerError::ChannelClosedUnexpectedly
    }
}

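/// Synchronizes the local header [`Store`] with the current head of the network.
///
/// [`Syncer::start`] spawns a background worker task; this handle communicates with
/// it over a command channel and cancels it on [`Syncer::stop`] or drop.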
#[derive(Debug)]
pub(crate) struct Syncer<S>
where
    S: Store + 'static,
{
    cmd_tx: mpsc::Sender<SyncerCmd>,
    cancellation_token: CancellationToken,
    join_handle: JoinHandle,
    _store: PhantomData<S>,
}

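/// Arguments used to configure the [`Syncer`].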
pub(crate) struct SyncerArgs<S>
where
    S: Store + 'static,
{
    /// Handle to the P2P component.
    pub(crate) p2p: Arc<P2p>,
    /// Header store.
    pub(crate) store: Arc<S>,
    /// Event publisher.
    pub(crate) event_pub: EventPublisher,
    /// Maximum number of headers fetched in a single batch.
    pub(crate) batch_size: u64,
    /// Size of the syncing window: headers older than this are not fetched.
    pub(crate) syncing_window: Duration,
}

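/// Commands handled by the syncer worker.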
#[derive(Debug)]
enum SyncerCmd {
    GetInfo {
        respond_to: oneshot::Sender<SyncingInfo>,
    },
}

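/// Status of the header synchronization.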
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncingInfo {
    /// Ranges of headers that are already synchronized and stored.
    pub stored_headers: BlockRanges,
    /// Subjective head height of the node.
    pub subjective_head: u64,
}

impl<S> Syncer<S>
where
    S: Store,
{
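    /// Create and start the [`Syncer`] worker task.
    ///
    /// Illustrative usage, a sketch following the tests at the bottom of this file
    /// (constructing the `P2p` handle, the store, and the event channel is up to the caller):
    ///
    /// ```ignore
    /// let syncer = Syncer::start(SyncerArgs {
    ///     p2p: Arc::new(p2p),
    ///     store: store.clone(),
    ///     event_pub: events.publisher(),
    ///     batch_size: 512,
    ///     syncing_window: DEFAULT_SAMPLING_WINDOW,
    /// })?;
    ///
    /// let info = syncer.info().await?;
    /// println!("stored: {}, head: {}", info.stored_headers, info.subjective_head);
    /// ```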
    pub(crate) fn start(args: SyncerArgs<S>) -> Result<Self> {
        let cancellation_token = CancellationToken::new();
        let event_pub = args.event_pub.clone();
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;

        let join_handle = spawn(async move {
            if let Err(e) = worker.run().await {
                error!("Syncer stopped because of a fatal error: {e}");

                event_pub.send(NodeEvent::FatalSyncerError {
                    error: e.to_string(),
                });
            }
        });

        Ok(Syncer {
            cancellation_token,
            cmd_tx,
            join_handle,
            _store: PhantomData,
        })
    }

    pub(crate) fn stop(&self) {
        self.cancellation_token.cancel();
    }

    pub(crate) async fn join(&self) {
        self.join_handle.join().await;
    }

    async fn send_command(&self, cmd: SyncerCmd) -> Result<()> {
        self.cmd_tx
            .send(cmd)
            .await
            .map_err(|_| SyncerError::WorkerDied)
    }

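    /// Get the current synchronization status of the node.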
    pub(crate) async fn info(&self) -> Result<SyncingInfo> {
        let (tx, rx) = oneshot::channel();

        self.send_command(SyncerCmd::GetInfo { respond_to: tx })
            .await?;

        Ok(rx.await?)
    }
}

impl<S> Drop for Syncer<S>
where
    S: Store,
{
    fn drop(&mut self) {
        self.stop();
    }
}

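/// Background worker that drives the synchronization and owns its state.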
struct Worker<S>
where
    S: Store + 'static,
{
    cancellation_token: CancellationToken,
    cmd_rx: mpsc::Receiver<SyncerCmd>,
    event_pub: EventPublisher,
    p2p: Arc<P2p>,
    store: Arc<S>,
    header_sub_rx: Option<mpsc::Receiver<ExtendedHeader>>,
    subjective_head_height: Option<u64>,
    batch_size: u64,
    ongoing_batch: Ongoing,
    syncing_window: Duration,
}

struct Ongoing {
    range: Option<BlockRange>,
    task: FusedReusableFuture<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
}

impl<S> Worker<S>
where
    S: Store,
{
    fn new(
        args: SyncerArgs<S>,
        cancellation_token: CancellationToken,
        cmd_rx: mpsc::Receiver<SyncerCmd>,
    ) -> Result<Self> {
        Ok(Worker {
            cancellation_token,
            cmd_rx,
            event_pub: args.event_pub,
            p2p: args.p2p,
            store: args.store,
            header_sub_rx: None,
            subjective_head_height: None,
            batch_size: args.batch_size,
            ongoing_batch: Ongoing {
                range: None,
                task: FusedReusableFuture::terminated(),
            },
            syncing_window: args.syncing_window,
        })
    }

    async fn run(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connecting_event_loop().await?;

            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connected_event_loop().await?;
        }

        debug!("Syncer stopped");
        Ok(())
    }

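    /// Event loop for the phase when the syncer is not yet initialized.
    ///
    /// It waits for a trusted peer, fetches the network head, initializes
    /// header-sub, and keeps serving commands in the meantime.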
    async fn connecting_event_loop(&mut self) -> Result<()> {
        debug!("Entering connecting_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60)).await;
        self.report().await?;

        let mut try_init_fut = pin!(try_init_task(
            self.p2p.clone(),
            self.store.clone(),
            self.event_pub.clone()
        ));

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = report_interval.tick() => {
                    self.report().await?;
                }
                res = &mut try_init_fut => {
                    let (network_head, took) = res?;
                    let network_head_height = network_head.height().value();

                    info!("Setting initial subjective head to {network_head_height}");
                    self.set_subjective_head_height(network_head_height);

                    let (header_sub_tx, header_sub_rx) = mpsc::channel(16);
                    self.p2p.init_header_sub(network_head, header_sub_tx).await?;
                    self.header_sub_rx = Some(header_sub_rx);

                    self.event_pub.send(NodeEvent::FetchingHeadHeaderFinished {
                        height: network_head_height,
                        took,
                    });

                    break;
                }
                Some(cmd) = self.cmd_rx.recv() => {
                    self.on_cmd(cmd).await?;
                }
            }
        }

        Ok(())
    }

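    /// Event loop for the phase when the syncer is initialized.
    ///
    /// It keeps the store in sync with the subjective head, batch by batch,
    /// and goes back to the connecting phase when all peers disconnect.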
    async fn connected_event_loop(&mut self) -> Result<()> {
        debug!("Entering connected_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60)).await;
        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
            warn!("All peers disconnected");
            return Ok(());
        }

        self.fetch_next_batch().await?;
        self.report().await?;

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        break;
                    }
                }
                _ = report_interval.tick() => {
                    self.report().await?;
                }
                res = header_sub_recv(self.header_sub_rx.as_mut()) => {
                    let header = res?;
                    self.on_header_sub_message(header).await?;
                    self.fetch_next_batch().await?;
                }
                Some(cmd) = self.cmd_rx.recv() => {
                    self.on_cmd(cmd).await?;
                }
                (res, took) = &mut self.ongoing_batch.task => {
                    self.on_fetch_next_batch_result(res, took).await?;
                    self.fetch_next_batch().await?;
                }
            }
        }

        if let Some(ongoing) = self.ongoing_batch.range.take() {
            warn!("Cancelling fetching of {}", ongoing.display());
            self.ongoing_batch.task.terminate();
        }

        self.header_sub_rx.take();

        Ok(())
    }

    async fn syncing_info(&self) -> Result<SyncingInfo> {
        Ok(SyncingInfo {
            stored_headers: self.store.get_stored_header_ranges().await?,
            subjective_head: self.subjective_head_height.unwrap_or(0),
        })
    }

    #[instrument(skip_all)]
    async fn report(&mut self) -> Result<()> {
        let SyncingInfo {
            stored_headers,
            subjective_head,
        } = self.syncing_info().await?;

        let ongoing_batch = self
            .ongoing_batch
            .range
            .as_ref()
            .map(|range| format!("{}", range.display()))
            .unwrap_or_else(|| "None".to_string());

        info!("syncing: head: {subjective_head}, stored headers: {stored_headers}, ongoing batches: {ongoing_batch}");
        Ok(())
    }

    async fn on_cmd(&mut self, cmd: SyncerCmd) -> Result<()> {
        match cmd {
            SyncerCmd::GetInfo { respond_to } => {
                let info = self.syncing_info().await?;
                respond_to.maybe_send(info);
            }
        }

        Ok(())
    }

    #[instrument(skip_all)]
    async fn on_header_sub_message(&mut self, new_head: ExtendedHeader) -> Result<()> {
        let new_head_height = new_head.height().value();

        self.set_subjective_head_height(new_head_height);

        if let Ok(store_head_height) = self.store.head_height().await {
            // If the new header follows the current store head directly, insert it
            // right away instead of waiting for a batch fetch.
            if store_head_height + 1 == new_head_height {
                if self.store.insert(new_head).await.is_ok() {
                    self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub {
                        height: new_head_height,
                    });
                }
            }
        }

        Ok(())
    }

    fn set_subjective_head_height(&mut self, height: u64) {
        if let Some(old_height) = self.subjective_head_height {
            if height <= old_height {
                return;
            }
        }

        self.subjective_head_height = Some(height);
    }

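    /// Start fetching the next batch of headers, if there is anything to fetch
    /// and no other batch is in flight.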
    #[instrument(skip_all)]
    async fn fetch_next_batch(&mut self) -> Result<()> {
        debug_assert_eq!(
            self.ongoing_batch.range.is_none(),
            self.ongoing_batch.task.is_terminated()
        );

        if !self.ongoing_batch.task.is_terminated() {
            // Another batch is already being fetched.
            return Ok(());
        }

        if self.p2p.peer_tracker_info().num_connected_peers == 0 {
            // No connected peers at the moment, so don't schedule a batch.
            return Ok(());
        }

        let Some(subjective_head_height) = self.subjective_head_height else {
            // Nothing to fetch until the subjective head is known.
            return Ok(());
        };

        let store_ranges = self.store.get_stored_header_ranges().await?;

        let next_batch = calculate_range_to_fetch(
            subjective_head_height,
            store_ranges.as_ref(),
            self.batch_size,
        );

        if next_batch.is_empty() {
            // Nothing to fetch.
            return Ok(());
        }

        // If the header right above the batch is already stored, only fetch the batch
        // when that header is still within the syncing window.
        match self.store.get_by_height(next_batch.end() + 1).await {
            Ok(known_header) => {
                if !self.in_syncing_window(&known_header) {
                    return Ok(());
                }
            }
            Err(StoreError::NotFound) => {}
            Err(e) => return Err(e.into()),
        }

        self.event_pub.send(NodeEvent::FetchingHeadersStarted {
            from_height: *next_batch.start(),
            to_height: *next_batch.end(),
        });

        let p2p = self.p2p.clone();

        self.ongoing_batch.range = Some(next_batch.clone());

        self.ongoing_batch.task.set(async move {
            let now = Instant::now();
            let res = p2p.get_unverified_header_range(next_batch).await;
            (res, now.elapsed())
        });

        Ok(())
    }

    #[instrument(skip_all)]
    async fn on_fetch_next_batch_result(
        &mut self,
        res: Result<Vec<ExtendedHeader>, P2pError>,
        took: Duration,
    ) -> Result<()> {
        let range = self
            .ongoing_batch
            .range
            .take()
            .expect("ongoing_batch not initialized correctly");

        let from_height = *range.start();
        let to_height = *range.end();

        let headers = match res {
            Ok(headers) => headers,
            Err(e) => {
                if e.is_fatal() {
                    return Err(e.into());
                }

                self.event_pub.send(NodeEvent::FetchingHeadersFailed {
                    from_height,
                    to_height,
                    error: e.to_string(),
                    took,
                });

                return Ok(());
            }
        };

        if let Err(e) = self.store.insert(headers).await {
            if e.is_fatal() {
                return Err(e.into());
            }

            self.event_pub.send(NodeEvent::FetchingHeadersFailed {
                from_height,
                to_height,
                error: format!("Failed to store headers: {e}"),
                took,
            });
        }

        self.event_pub.send(NodeEvent::FetchingHeadersFinished {
            from_height,
            to_height,
            took,
        });

        Ok(())
    }

    fn in_syncing_window(&self, header: &ExtendedHeader) -> bool {
        let syncing_window_start =
            Time::now()
                .checked_sub(self.syncing_window)
                .unwrap_or_else(|| {
                    warn!(
                        "underflow when computing syncing window start, defaulting to unix epoch"
                    );
                    Time::unix_epoch()
                });

        header.time().after(syncing_window_start)
    }
}

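/// Calculate the next range of headers to fetch.
///
/// If the store head is behind `subjective_head_height`, the range extends forward from
/// the store head (up to `limit` headers); otherwise it fills the gap directly below the
/// highest stored range. A sketch of the behaviour, taken from the unit tests below:
///
/// ```ignore
/// // Catching up with the network head.
/// assert_eq!(calculate_range_to_fetch(4000, &[2000..=3000], 500), 3001..=3500);
/// // Caught up, so fill the gap below the highest stored range.
/// assert_eq!(calculate_range_to_fetch(4000, &[3000..=4000], 500), 2500..=2999);
/// ```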
fn calculate_range_to_fetch(
    subjective_head_height: u64,
    store_headers: &[BlockRange],
    limit: u64,
) -> BlockRange {
    let mut store_headers_iter = store_headers.iter().rev();

    let Some(store_head_range) = store_headers_iter.next() else {
        // Nothing is stored yet, so fetch from the beginning up to the subjective head.
        let range = 1..=subjective_head_height;
        return range.truncate_right(limit);
    };

    if store_head_range.end() < &subjective_head_height {
        // The store is behind the subjective head, so catch up with the network first.
        let range = store_head_range.end() + 1..=subjective_head_height;
        return range.truncate_right(limit);
    }

    // We are up to date with the network head, so fill the gap directly below
    // the highest stored range.
    let penultimate_range_end = store_headers_iter.next().map(|r| *r.end()).unwrap_or(0);

    let range = penultimate_range_end + 1..=store_head_range.start().saturating_sub(1);
    range.truncate_left(limit)
}

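/// Fetch the network head from a trusted peer, retrying with an exponential backoff
/// until it succeeds or hits a fatal error.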
#[instrument(skip_all)]
async fn try_init_task<S>(
    p2p: Arc<P2p>,
    store: Arc<S>,
    event_pub: EventPublisher,
) -> Result<(ExtendedHeader, Duration)>
where
    S: Store + 'static,
{
    let now = Instant::now();
    let mut event_reported = false;
    let mut backoff = ExponentialBackoffBuilder::default()
        .with_max_interval(TRY_INIT_BACKOFF_MAX_INTERVAL)
        .with_max_elapsed_time(None)
        .build();

    loop {
        match try_init(&p2p, &*store, &event_pub, &mut event_reported).await {
            Ok(network_head) => {
                return Ok((network_head, now.elapsed()));
            }
            Err(e) if e.is_fatal() => {
                return Err(e);
            }
            Err(e) => {
                let sleep_dur = backoff
                    .next_backoff()
                    .expect("backoff never stops retrying");

                warn!(
                    "Initialization of subjective head failed: {e}. Trying again in {sleep_dur:?}."
                );
                sleep(sleep_dur).await;
            }
        }
    }
}

async fn try_init<S>(
    p2p: &P2p,
    store: &S,
    event_pub: &EventPublisher,
    event_reported: &mut bool,
) -> Result<ExtendedHeader>
where
    S: Store,
{
    p2p.wait_connected_trusted().await?;

    if !*event_reported {
        event_pub.send(NodeEvent::FetchingHeadHeaderStarted);
        *event_reported = true;
    }

    let network_head = p2p.get_head_header().await?;

    // Insert the network head into the store, unless it is already the store head.
    let try_insert = match store.get_head().await {
        Ok(store_head) => store_head.hash() != network_head.hash(),
        Err(StoreError::NotFound) => true,
        Err(e) => return Err(e.into()),
    };

    if try_insert {
        store.insert(network_head.clone()).await?;
    }

    Ok(network_head)
}

async fn header_sub_recv(
    rx: Option<&mut mpsc::Receiver<ExtendedHeader>>,
) -> Result<ExtendedHeader> {
    rx.expect("header-sub not initialized")
        .recv()
        .await
        .ok_or(SyncerError::P2p(P2pError::WorkerDied))
}

#[cfg(test)]
mod tests {
    use std::ops::RangeInclusive;

    use super::*;
    use crate::block_ranges::{BlockRange, BlockRangeExt};
    use crate::events::EventChannel;
    use crate::node::HeaderExError;
    use crate::node::DEFAULT_SAMPLING_WINDOW;
    use crate::p2p::header_session;
    use crate::store::InMemoryStore;
    use crate::test_utils::{gen_filled_store, MockP2pHandle};
    use crate::utils::OneshotResultSenderExt;
    use celestia_types::test_utils::ExtendedHeaderGenerator;
    use libp2p::request_response::OutboundFailure;
    use lumina_utils::test_utils::async_test;

    #[test]
    fn calculate_range_to_fetch_test_header_limit() {
        let head_height = 1024;
        let ranges = [256..=512];

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16);
        assert_eq!(fetch_range, 513..=528);

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511);
        assert_eq!(fetch_range, 513..=1023);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512);
        assert_eq!(fetch_range, 513..=1024);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513);
        assert_eq!(fetch_range, 513..=1024);

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024);
        assert_eq!(fetch_range, 513..=1024);
    }

    #[test]
    fn calculate_range_to_fetch_empty_store() {
        let fetch_range = calculate_range_to_fetch(1, &[], 100);
        assert_eq!(fetch_range, 1..=1);

        let fetch_range = calculate_range_to_fetch(100, &[], 10);
        assert_eq!(fetch_range, 1..=10);

        let fetch_range = calculate_range_to_fetch(100, &[], 50);
        assert_eq!(fetch_range, 1..=50);
    }

    #[test]
    fn calculate_range_to_fetch_fully_synced() {
        let fetch_range = calculate_range_to_fetch(1, &[1..=1], 100);
        assert!(fetch_range.is_empty());

        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
        assert!(fetch_range.is_empty());

        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
        assert!(fetch_range.is_empty());
    }

    #[test]
    fn calculate_range_to_fetch_caught_up() {
        let head_height = 4000;

        let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], 500);
        assert_eq!(fetch_range, 2500..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], 500);
        assert_eq!(fetch_range, 2500..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
        assert_eq!(fetch_range, 2801..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
        assert_eq!(fetch_range, 2801..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], 500);
        assert_eq!(fetch_range, 1..=299);
    }

    #[test]
    fn calculate_range_to_fetch_catching_up() {
        let head_height = 4000;

        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], 500);
        assert_eq!(fetch_range, 3001..=3500);
        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3500], 500);
        assert_eq!(fetch_range, 3501..=4000);
        let fetch_range = calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], 500);
        assert_eq!(fetch_range, 3801..=4000);
    }

    #[async_test]
    async fn init_without_genesis_hash() {
        let events = EventChannel::new();
        let (mock, mut handle) = P2p::mocked();
        let mut gen = ExtendedHeaderGenerator::new();
        let header = gen.next();

        let _syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(mock),
            store: Arc::new(InMemoryStore::new()),
            event_pub: events.publisher(),
            batch_size: 512,
            syncing_window: DEFAULT_SAMPLING_WINDOW,
        })
        .unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;
        handle.announce_trusted_peer_connected();

        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![header.clone()])).unwrap();

        let head_from_syncer = handle.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header);

        handle.expect_no_cmd().await;
    }

    #[async_test]
    async fn init_with_genesis_hash() {
        let mut gen = ExtendedHeaderGenerator::new();
        let head = gen.next();

        let (_syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;

        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn syncing() {
        let mut gen = ExtendedHeaderGenerator::new();
        let headers = gen.next_many(1500);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[1499].clone()).await;
        assert_syncing(&syncer, &store, &[1500..=1500], 1500).await;

        handle_session_batch(&mut p2p_mock, &headers, 988..=1499, true).await;
        assert_syncing(&syncer, &store, &[988..=1500], 1500).await;

        handle_session_batch(&mut p2p_mock, &headers, 476..=987, true).await;
        assert_syncing(&syncer, &store, &[476..=1500], 1500).await;

        let header1501 = gen.next();
        p2p_mock.announce_new_head(header1501.clone());
        assert_syncing(&syncer, &store, &[476..=1501], 1501).await;

        handle_session_batch(&mut p2p_mock, &headers, 1..=475, true).await;
        assert_syncing(&syncer, &store, &[1..=1501], 1501).await;

        p2p_mock.expect_no_cmd().await;

        let header1502 = gen.next();
        p2p_mock.announce_new_head(header1502.clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1502).await;
        p2p_mock.expect_no_cmd().await;

        let headers_1503_1505 = gen.next_many(3);
        p2p_mock.announce_new_head(headers_1503_1505[2].clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1505).await;

        handle_session_batch(&mut p2p_mock, &headers_1503_1505, 1503..=1505, true).await;
        assert_syncing(&syncer, &store, &[1..=1505], 1505).await;

        let mut headers = gen.next_many(1495);
        p2p_mock.announce_new_head(headers[1494].clone());
        assert_syncing(&syncer, &store, &[1..=1505], 3000).await;

        handle_session_batch(&mut p2p_mock, &headers, 1506..=2017, true).await;
        assert_syncing(&syncer, &store, &[1..=2017], 3000).await;

        headers.push(gen.next());
        p2p_mock.announce_new_head(headers.last().unwrap().clone());
        assert_syncing(&syncer, &store, &[1..=2017], 3001).await;

        handle_session_batch(&mut p2p_mock, &headers, 2018..=2529, true).await;
        assert_syncing(&syncer, &store, &[1..=2529], 3001).await;

        handle_session_batch(&mut p2p_mock, &headers, 2530..=3001, true).await;
        assert_syncing(&syncer, &store, &[1..=3001], 3001).await;

        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn window_edge() {
        let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60);
        let mut gen = ExtendedHeaderGenerator::new();
        gen.set_time(
            (Time::now() - month_and_day_ago).expect("to not underflow"),
            Duration::from_secs(1),
        );
        let mut headers = gen.next_many(1200);
        gen.reset_time();
        headers.append(&mut gen.next_many(2049 - 1200));

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await;
        assert_syncing(&syncer, &store, &[2049..=2049], 2049).await;

        handle_session_batch(&mut p2p_mock, &headers, 1537..=2048, true).await;
        assert_syncing(&syncer, &store, &[1537..=2049], 2049).await;

        handle_session_batch(&mut p2p_mock, &headers, 1025..=1536, true).await;
        assert_syncing(&syncer, &store, &[1025..=2049], 2049).await;

        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn start_with_filled_store() {
        let events = EventChannel::new();
        let (p2p, mut p2p_mock) = P2p::mocked();
        let (store, mut gen) = gen_filled_store(25).await;
        let store = Arc::new(store);

        let mut headers = gen.next_many(520);
        let network_head = gen.next();

        let syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(p2p),
            store: store.clone(),
            event_pub: events.publisher(),
            batch_size: 512,
            syncing_window: DEFAULT_SAMPLING_WINDOW,
        })
        .unwrap();

        p2p_mock.announce_trusted_peer_connected();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![network_head.clone()])).unwrap();

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, network_head);

        assert_syncing(&syncer, &store, &[1..=25, 546..=546], 546).await;

        handle_session_batch(&mut p2p_mock, &headers, 34..=545, true).await;
        assert_syncing(&syncer, &store, &[1..=25, 34..=546], 546).await;

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 26);
        assert_eq!(amount, 8);
        respond_to
            .send(Ok(headers.drain(..8).collect()))
            .map_err(|_| "headers [26, 33]")
            .unwrap();
        assert_syncing(&syncer, &store, &[1..=546], 546).await;

        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn stop_syncer() {
        let mut gen = ExtendedHeaderGenerator::new();
        let head = gen.next();

        let (syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;

        p2p_mock.expect_no_cmd().await;

        syncer.stop();
        sleep(Duration::from_millis(1)).await;
        assert!(matches!(
            syncer.info().await.unwrap_err(),
            SyncerError::WorkerDied
        ));
    }

    #[async_test]
    async fn all_peers_disconnected() {
        let mut gen = ExtendedHeaderGenerator::new();

        let _gap = gen.next_many(24);
        let header25 = gen.next();
        let _gap = gen.next_many(4);
        let header30 = gen.next();
        let _gap = gen.next_many(4);
        let header35 = gen.next();

        let (syncer, store, mut p2p_mock) = initialized_syncer(header30).await;

        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;

        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;

        p2p_mock.announce_trusted_peer_connected();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        respond_to.send(Ok(vec![header25])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        sleep(Duration::from_secs(1)).await;
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        respond_to.send(Ok(vec![header35.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30, 35..=35], 35).await;

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header35);

        let (height, amount, _respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 31);
        assert_eq!(amount, 4);

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn all_peers_disconnected_and_no_network_head_progress() {
        let mut gen = ExtendedHeaderGenerator::new_from_height(30);

        let header30 = gen.next();

        let (syncer, store, mut p2p_mock) = initialized_syncer(header30.clone()).await;

        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;

        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;

        p2p_mock.announce_trusted_peer_connected();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        respond_to.send(Ok(vec![header30.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header30);

        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn non_contiguous_response() {
        let mut gen = ExtendedHeaderGenerator::new();
        let mut headers = gen.next_many(20);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;

        let header10 = headers[10].clone();
        // Make the response non-contiguous by duplicating a header.
        headers[10] = headers[11].clone();

        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        // Syncer rejects the non-contiguous batch, so nothing new is stored.
        assert_syncing(&syncer, &store, &[20..=20], 20).await;

        // Restore the correct header and let the syncer retry.
        headers[10] = header10;

        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        assert_syncing(&syncer, &store, &[1..=20], 20).await;
    }

    #[async_test]
    async fn another_chain_response() {
        let headers = ExtendedHeaderGenerator::new().next_many(20);
        let headers_prime = ExtendedHeaderGenerator::new().next_many(20);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;

        // Responding with headers from a different chain is rejected.
        handle_session_batch(&mut p2p_mock, &headers_prime, 1..=19, true).await;

        assert_syncing(&syncer, &store, &[20..=20], 20).await;

        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        assert_syncing(&syncer, &store, &[1..=20], 20).await;
    }

    async fn assert_syncing(
        syncer: &Syncer<InMemoryStore>,
        store: &InMemoryStore,
        expected_synced_ranges: &[RangeInclusive<u64>],
        expected_subjective_head: u64,
    ) {
        // Syncer is asynchronous, so give it a moment to catch up.
        sleep(Duration::from_millis(1)).await;

        let store_ranges = store.get_stored_header_ranges().await.unwrap();
        let syncing_info = syncer.info().await.unwrap();

        assert_eq!(store_ranges.as_ref(), expected_synced_ranges);
        assert_eq!(syncing_info.stored_headers.as_ref(), expected_synced_ranges);
        assert_eq!(syncing_info.subjective_head, expected_subjective_head);
    }

    async fn initialized_syncer(
        head: ExtendedHeader,
    ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
        let events = EventChannel::new();
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());

        let syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(mock),
            store: store.clone(),
            event_pub: events.publisher(),
            batch_size: 512,
            syncing_window: DEFAULT_SAMPLING_WINDOW,
        })
        .unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;
        handle.announce_trusted_peer_connected();

        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![head.clone()])).unwrap();

        let head_from_syncer = handle.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, head);

        let head_height = head.height().value();
        assert_syncing(&syncer, &store, &[head_height..=head_height], head_height).await;

        (syncer, store, handle)
    }

    async fn handle_session_batch(
        p2p_mock: &mut MockP2pHandle,
        remaining_headers: &[ExtendedHeader],
        range: BlockRange,
        respond: bool,
    ) {
        range.validate().unwrap();

        let mut ranges_to_request = BlockRanges::new();
        ranges_to_request.insert_relaxed(&range).unwrap();

        let mut no_respond_chans = Vec::new();

        for _ in 0..requests_in_session(range.len()) {
            let (height, amount, respond_to) =
                p2p_mock.expect_header_request_for_height_cmd().await;

            let requested_range = height..=height + amount - 1;
            ranges_to_request.remove_strict(requested_range);

            if respond {
                let header_index = remaining_headers
                    .iter()
                    .position(|h| h.height().value() == height)
                    .expect("height not found in provided headers");

                let response_range =
                    remaining_headers[header_index..header_index + amount as usize].to_vec();
                respond_to
                    .send(Ok(response_range))
                    .map_err(|_| format!("headers [{}, {}]", height, height + amount - 1))
                    .unwrap();
            } else {
                no_respond_chans.push(respond_to);
            }
        }

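        // When not responding, fail the outstanding requests with a timeout error after
        // a delay, mimicking peers that never answer.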
        if !respond {
            spawn(async move {
                sleep(Duration::from_secs(10)).await;

                for respond_chan in no_respond_chans {
                    respond_chan.maybe_send_err(P2pError::HeaderEx(
                        HeaderExError::OutboundFailure(OutboundFailure::Timeout),
                    ));
                }
            });
        }

        assert!(
            ranges_to_request.is_empty(),
            "Some headers weren't requested. expected range: {}, not requested: {}",
            range.display(),
            ranges_to_request
        );
    }

    fn requests_in_session(headers: u64) -> usize {
        // Minimum and maximum number of requests the session may split the batch into.
        let min_requests = headers.div_ceil(header_session::MAX_AMOUNT_PER_REQ) as usize;
        let max_requests = headers.div_ceil(header_session::MIN_AMOUNT_PER_REQ) as usize;

        if min_requests > header_session::MAX_CONCURRENT_REQS {
            // Even maximally sized requests exceed the concurrency limit,
            // so the session issues `min_requests` of them.
            min_requests
        } else {
            // Otherwise the session issues up to MAX_CONCURRENT_REQS requests, but never
            // more than the batch can be split into with minimally sized requests.
            header_session::MAX_CONCURRENT_REQS.min(max_requests)
        }
    }

    impl BlockRanges {
        fn remove_strict(&mut self, range: BlockRange) {
            for stored in self.as_ref() {
                if stored.contains(range.start()) && stored.contains(range.end()) {
                    self.remove_relaxed(range).unwrap();
                    return;
                }
            }

            panic!("block ranges ({self}) don't contain {}", range.display());
        }
    }
}