use std::marker::PhantomData;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use celestia_types::ExtendedHeader;
use lumina_utils::executor::{spawn, JoinHandle};
use lumina_utils::time::{sleep, Interval};
use serde::{Deserialize, Serialize};
use tendermint::Time;
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use web_time::Instant;

use crate::block_ranges::{BlockRange, BlockRangeExt, BlockRanges};
use crate::events::{EventPublisher, NodeEvent};
use crate::p2p::{P2p, P2pError};
use crate::store::{Store, StoreError};
use crate::utils::{FusedReusableFuture, OneshotSenderExt};

type Result<T, E = SyncerError> = std::result::Result<T, E>;

const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60);

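/// Error emitted by the [`Syncer`] component.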
#[derive(Debug, thiserror::Error)]
pub enum SyncerError {
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    #[error("Store: {0}")]
    Store(#[from] StoreError),

    #[error("Worker died")]
    WorkerDied,

    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,
}

impl SyncerError {
    pub(crate) fn is_fatal(&self) -> bool {
        match self {
            SyncerError::P2p(e) => e.is_fatal(),
            SyncerError::Store(e) => e.is_fatal(),
            SyncerError::WorkerDied | SyncerError::ChannelClosedUnexpectedly => true,
        }
    }
}

impl From<oneshot::error::RecvError> for SyncerError {
    fn from(_value: oneshot::error::RecvError) -> Self {
        SyncerError::ChannelClosedUnexpectedly
    }
}

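/// Component that keeps the header [`Store`] in sync with the network.
///
/// It resolves the initial subjective head from a trusted peer, listens for new
/// heads on header-sub, and backfills missing header ranges in batches.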
#[derive(Debug)]
pub(crate) struct Syncer<S>
where
    S: Store + 'static,
{
    cmd_tx: mpsc::Sender<SyncerCmd>,
    cancellation_token: CancellationToken,
    join_handle: JoinHandle,
    _store: PhantomData<S>,
}

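/// Arguments used to configure the [`Syncer`].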
pub(crate) struct SyncerArgs<S>
where
    S: Store + 'static,
{
    pub(crate) p2p: Arc<P2p>,
    pub(crate) store: Arc<S>,
    pub(crate) event_pub: EventPublisher,
    pub(crate) batch_size: u64,
    pub(crate) syncing_window: Duration,
}

#[derive(Debug)]
enum SyncerCmd {
    GetInfo {
        respond_to: oneshot::Sender<SyncingInfo>,
    },
}

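/// Status of the header synchronization.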
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncingInfo {
    pub stored_headers: BlockRanges,
    pub subjective_head: u64,
}

impl<S> Syncer<S>
where
    S: Store,
{
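    /// Spawn the [`Syncer`] worker task and return a handle to it.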
    pub(crate) fn start(args: SyncerArgs<S>) -> Result<Self> {
        let cancellation_token = CancellationToken::new();
        let event_pub = args.event_pub.clone();
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;

        let join_handle = spawn(async move {
            if let Err(e) = worker.run().await {
                error!("Syncer stopped because of a fatal error: {e}");

                event_pub.send(NodeEvent::FatalSyncerError {
                    error: e.to_string(),
                });
            }
        });

        Ok(Syncer {
            cancellation_token,
            cmd_tx,
            join_handle,
            _store: PhantomData,
        })
    }

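    /// Signal the worker task to stop.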
    pub(crate) fn stop(&self) {
        self.cancellation_token.cancel();
    }

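    /// Wait until the worker task finishes.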
    pub(crate) async fn join(&self) {
        self.join_handle.join().await;
    }

    async fn send_command(&self, cmd: SyncerCmd) -> Result<()> {
        self.cmd_tx
            .send(cmd)
            .await
            .map_err(|_| SyncerError::WorkerDied)
    }

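    /// Get the current synchronization status.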
    pub(crate) async fn info(&self) -> Result<SyncingInfo> {
        let (tx, rx) = oneshot::channel();

        self.send_command(SyncerCmd::GetInfo { respond_to: tx })
            .await?;

        Ok(rx.await?)
    }
}

impl<S> Drop for Syncer<S>
where
    S: Store,
{
    fn drop(&mut self) {
        self.stop();
    }
}

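/// Background worker that runs the synchronization loops on behalf of [`Syncer`].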
struct Worker<S>
where
    S: Store + 'static,
{
    cancellation_token: CancellationToken,
    cmd_rx: mpsc::Receiver<SyncerCmd>,
    event_pub: EventPublisher,
    p2p: Arc<P2p>,
    store: Arc<S>,
    header_sub_rx: Option<mpsc::Receiver<ExtendedHeader>>,
    subjective_head_height: Option<u64>,
    batch_size: u64,
    ongoing_batch: Ongoing,
    syncing_window: Duration,
}

struct Ongoing {
    range: Option<BlockRange>,
    task: FusedReusableFuture<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
}

impl<S> Worker<S>
where
    S: Store,
{
    fn new(
        args: SyncerArgs<S>,
        cancellation_token: CancellationToken,
        cmd_rx: mpsc::Receiver<SyncerCmd>,
    ) -> Result<Self> {
        Ok(Worker {
            cancellation_token,
            cmd_rx,
            event_pub: args.event_pub,
            p2p: args.p2p,
            store: args.store,
            header_sub_rx: None,
            subjective_head_height: None,
            batch_size: args.batch_size,
            ongoing_batch: Ongoing {
                range: None,
                task: FusedReusableFuture::terminated(),
            },
            syncing_window: args.syncing_window,
        })
    }

    async fn run(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connecting_event_loop().await?;

            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connected_event_loop().await?;
        }

        debug!("Syncer stopped");
        Ok(())
    }

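    /// Event loop that runs while header-sub is not initialized yet.
    ///
    /// It keeps retrying to fetch the network head from a trusted peer and,
    /// once that succeeds, initializes header-sub and returns.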
    async fn connecting_event_loop(&mut self) -> Result<()> {
        debug!("Entering connecting_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60)).await;
        self.report().await?;

        let mut try_init_fut = pin!(try_init_task(
            self.p2p.clone(),
            self.store.clone(),
            self.event_pub.clone()
        ));

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = report_interval.tick() => {
                    self.report().await?;
                }
                res = &mut try_init_fut => {
                    let (network_head, took) = res?;
                    let network_head_height = network_head.height().value();

                    info!("Setting initial subjective head to {network_head_height}");
                    self.set_subjective_head_height(network_head_height);

                    let (header_sub_tx, header_sub_rx) = mpsc::channel(16);
                    self.p2p.init_header_sub(network_head, header_sub_tx).await?;
                    self.header_sub_rx = Some(header_sub_rx);

                    self.event_pub.send(NodeEvent::FetchingHeadHeaderFinished {
                        height: network_head_height,
                        took,
                    });

                    break;
                }
                Some(cmd) = self.cmd_rx.recv() => {
                    self.on_cmd(cmd).await?;
                }
            }
        }

        Ok(())
    }

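    /// Event loop that runs while header-sub is initialized.
    ///
    /// It handles new heads from header-sub and fetches missing headers in
    /// batches, returning when all peers disconnect.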
    async fn connected_event_loop(&mut self) -> Result<()> {
        debug!("Entering connected_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60)).await;
        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
            warn!("All peers disconnected");
            return Ok(());
        }

        self.fetch_next_batch().await?;
        self.report().await?;

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        break;
                    }
                }
                _ = report_interval.tick() => {
                    self.report().await?;
                }
                res = header_sub_recv(self.header_sub_rx.as_mut()) => {
                    let header = res?;
                    self.on_header_sub_message(header).await?;
                    self.fetch_next_batch().await?;
                }
                Some(cmd) = self.cmd_rx.recv() => {
                    self.on_cmd(cmd).await?;
                }
                (res, took) = &mut self.ongoing_batch.task => {
                    self.on_fetch_next_batch_result(res, took).await?;
                    self.fetch_next_batch().await?;
                }
            }
        }

        if let Some(ongoing) = self.ongoing_batch.range.take() {
            warn!("Cancelling fetching of {}", ongoing.display());
            self.ongoing_batch.task.terminate();
        }

        self.header_sub_rx.take();

        Ok(())
    }

    async fn syncing_info(&self) -> Result<SyncingInfo> {
        Ok(SyncingInfo {
            stored_headers: self.store.get_stored_header_ranges().await?,
            subjective_head: self.subjective_head_height.unwrap_or(0),
        })
    }

    #[instrument(skip_all)]
    async fn report(&mut self) -> Result<()> {
        let SyncingInfo {
            stored_headers,
            subjective_head,
        } = self.syncing_info().await?;

        let ongoing_batch = self
            .ongoing_batch
            .range
            .as_ref()
            .map(|range| format!("{}", range.display()))
            .unwrap_or_else(|| "None".to_string());

        info!("syncing: head: {subjective_head}, stored headers: {stored_headers}, ongoing batches: {ongoing_batch}");
        Ok(())
    }

    async fn on_cmd(&mut self, cmd: SyncerCmd) -> Result<()> {
        match cmd {
            SyncerCmd::GetInfo { respond_to } => {
                let info = self.syncing_info().await?;
                respond_to.maybe_send(info);
            }
        }

        Ok(())
    }

    #[instrument(skip_all)]
    async fn on_header_sub_message(&mut self, new_head: ExtendedHeader) -> Result<()> {
        let new_head_height = new_head.height().value();

        self.set_subjective_head_height(new_head_height);

        if let Ok(store_head_height) = self.store.head_height().await {
            if store_head_height + 1 == new_head_height {
                if self.store.insert(new_head).await.is_ok() {
                    self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub {
                        height: new_head_height,
                    });
                }
            }
        }

        Ok(())
    }

    fn set_subjective_head_height(&mut self, height: u64) {
        if let Some(old_height) = self.subjective_head_height {
            if height <= old_height {
                return;
            }
        }

        self.subjective_head_height = Some(height);
    }

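    /// Schedule fetching of the next batch of missing headers.
    ///
    /// Does nothing if a batch is already in flight, no peers are connected,
    /// or the headers to fetch fall outside of the syncing window.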
    #[instrument(skip_all)]
    async fn fetch_next_batch(&mut self) -> Result<()> {
        debug_assert_eq!(
            self.ongoing_batch.range.is_none(),
            self.ongoing_batch.task.is_terminated()
        );

        if !self.ongoing_batch.task.is_terminated() {
            return Ok(());
        }

        if self.p2p.peer_tracker_info().num_connected_peers == 0 {
            return Ok(());
        }

        let Some(subjective_head_height) = self.subjective_head_height else {
            return Ok(());
        };

        let store_ranges = self.store.get_stored_header_ranges().await?;

        let next_batch = calculate_range_to_fetch(
            subjective_head_height,
            store_ranges.as_ref(),
            self.batch_size,
        );

        if next_batch.is_empty() {
            return Ok(());
        }

        match self.store.get_by_height(next_batch.end() + 1).await {
            Ok(known_header) => {
                if !self.in_syncing_window(&known_header) {
                    return Ok(());
                }
            }
            Err(StoreError::NotFound) => {}
            Err(e) => return Err(e.into()),
        }

        self.event_pub.send(NodeEvent::FetchingHeadersStarted {
            from_height: *next_batch.start(),
            to_height: *next_batch.end(),
        });

        let p2p = self.p2p.clone();

        self.ongoing_batch.range = Some(next_batch.clone());

        self.ongoing_batch.task.set(async move {
            let now = Instant::now();
            let res = p2p.get_unverified_header_range(next_batch).await;
            (res, now.elapsed())
        });

        Ok(())
    }

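    /// Handle the result of a finished batch fetch, storing the received
    /// headers and reporting the corresponding events.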
    #[instrument(skip_all)]
    async fn on_fetch_next_batch_result(
        &mut self,
        res: Result<Vec<ExtendedHeader>, P2pError>,
        took: Duration,
    ) -> Result<()> {
        let range = self
            .ongoing_batch
            .range
            .take()
            .expect("ongoing_batch not initialized correctly");

        let from_height = *range.start();
        let to_height = *range.end();

        let headers = match res {
            Ok(headers) => headers,
            Err(e) => {
                if e.is_fatal() {
                    return Err(e.into());
                }

                self.event_pub.send(NodeEvent::FetchingHeadersFailed {
                    from_height,
                    to_height,
                    error: e.to_string(),
                    took,
                });

                return Ok(());
            }
        };

        if let Err(e) = self.store.insert(headers).await {
            if e.is_fatal() {
                return Err(e.into());
            }

            self.event_pub.send(NodeEvent::FetchingHeadersFailed {
                from_height,
                to_height,
                error: format!("Failed to store headers: {e}"),
                took,
            });

            // Storing failed, so don't report the batch as finished.
            return Ok(());
        }

        self.event_pub.send(NodeEvent::FetchingHeadersFinished {
            from_height,
            to_height,
            took,
        });

        Ok(())
    }

    fn in_syncing_window(&self, header: &ExtendedHeader) -> bool {
        let syncing_window_start = Time::now()
            .checked_sub(self.syncing_window)
            .unwrap_or_else(|| {
                warn!("underflow when computing syncing window start, defaulting to unix epoch");
                Time::unix_epoch()
            });

        header.time().after(syncing_window_start)
    }
}

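/// Calculate the next range of headers to fetch, at most `limit` headers long.
///
/// Headers between the store head and `subjective_head_height` are fetched first.
/// Once caught up, the gap directly below the newest stored range is backfilled,
/// newest headers first. E.g. for stored ranges `[500..=1000, 3000..=4000]`, a
/// subjective head of 4000 and a limit of 500, the next range is `2500..=2999`.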
fn calculate_range_to_fetch(
    subjective_head_height: u64,
    store_headers: &[BlockRange],
    limit: u64,
) -> BlockRange {
    let mut store_headers_iter = store_headers.iter().rev();

    let Some(store_head_range) = store_headers_iter.next() else {
        let range = 1..=subjective_head_height;
        return range.truncate_right(limit);
    };

    if store_head_range.end() < &subjective_head_height {
        let range = store_head_range.end() + 1..=subjective_head_height;
        return range.truncate_right(limit);
    }

    let penultimate_range_end = store_headers_iter.next().map(|r| *r.end()).unwrap_or(0);

    let range = penultimate_range_end + 1..=store_head_range.start().saturating_sub(1);
    range.truncate_left(limit)
}

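/// Resolve the initial network head from a trusted peer, retrying with an
/// exponential backoff on non-fatal errors.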
#[instrument(skip_all)]
async fn try_init_task<S>(
    p2p: Arc<P2p>,
    store: Arc<S>,
    event_pub: EventPublisher,
) -> Result<(ExtendedHeader, Duration)>
where
    S: Store + 'static,
{
    let now = Instant::now();
    let mut event_reported = false;
    let mut backoff = ExponentialBackoffBuilder::default()
        .with_max_interval(TRY_INIT_BACKOFF_MAX_INTERVAL)
        .with_max_elapsed_time(None)
        .build();

    loop {
        match try_init(&p2p, &*store, &event_pub, &mut event_reported).await {
            Ok(network_head) => {
                return Ok((network_head, now.elapsed()));
            }
            Err(e) if e.is_fatal() => {
                return Err(e);
            }
            Err(e) => {
                let sleep_dur = backoff
                    .next_backoff()
                    .expect("backoff never stops retrying");

                warn!(
                    "Initialization of subjective head failed: {e}. Trying again in {sleep_dur:?}."
                );
                sleep(sleep_dur).await;
            }
        }
    }
}

async fn try_init<S>(
    p2p: &P2p,
    store: &S,
    event_pub: &EventPublisher,
    event_reported: &mut bool,
) -> Result<ExtendedHeader>
where
    S: Store,
{
    p2p.wait_connected_trusted().await?;

    if !*event_reported {
        event_pub.send(NodeEvent::FetchingHeadHeaderStarted);
        *event_reported = true;
    }

    let network_head = p2p.get_head_header().await?;

    let try_insert = match store.get_head().await {
        Ok(store_head) => store_head.hash() != network_head.hash(),
        Err(StoreError::NotFound) => true,
        Err(e) => return Err(e.into()),
    };

    if try_insert {
        store.insert(network_head.clone()).await?;
    }

    Ok(network_head)
}

async fn header_sub_recv(
    rx: Option<&mut mpsc::Receiver<ExtendedHeader>>,
) -> Result<ExtendedHeader> {
    rx.expect("header-sub not initialized")
        .recv()
        .await
        .ok_or(SyncerError::P2p(P2pError::WorkerDied))
}

#[cfg(test)]
mod tests {
    use std::ops::RangeInclusive;

    use super::*;
    use crate::block_ranges::{BlockRange, BlockRangeExt};
    use crate::events::EventChannel;
    use crate::node::HeaderExError;
    use crate::node::DEFAULT_SAMPLING_WINDOW;
    use crate::p2p::header_session;
    use crate::store::InMemoryStore;
    use crate::test_utils::{gen_filled_store, MockP2pHandle};
    use crate::utils::OneshotResultSenderExt;
    use celestia_types::test_utils::ExtendedHeaderGenerator;
    use libp2p::request_response::OutboundFailure;
    use lumina_utils::test_utils::async_test;

    #[test]
    fn calculate_range_to_fetch_test_header_limit() {
        let head_height = 1024;
        let ranges = [256..=512];

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16);
        assert_eq!(fetch_range, 513..=528);

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511);
        assert_eq!(fetch_range, 513..=1023);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512);
        assert_eq!(fetch_range, 513..=1024);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513);
        assert_eq!(fetch_range, 513..=1024);

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024);
        assert_eq!(fetch_range, 513..=1024);
    }

    #[test]
    fn calculate_range_to_fetch_empty_store() {
        let fetch_range = calculate_range_to_fetch(1, &[], 100);
        assert_eq!(fetch_range, 1..=1);

        let fetch_range = calculate_range_to_fetch(100, &[], 10);
        assert_eq!(fetch_range, 1..=10);

        let fetch_range = calculate_range_to_fetch(100, &[], 50);
        assert_eq!(fetch_range, 1..=50);
    }

    #[test]
    fn calculate_range_to_fetch_fully_synced() {
        let fetch_range = calculate_range_to_fetch(1, &[1..=1], 100);
        assert!(fetch_range.is_empty());

        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
        assert!(fetch_range.is_empty());

        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
        assert!(fetch_range.is_empty());
    }

    #[test]
    fn calculate_range_to_fetch_caught_up() {
        let head_height = 4000;

        let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], 500);
        assert_eq!(fetch_range, 2500..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], 500);
        assert_eq!(fetch_range, 2500..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
        assert_eq!(fetch_range, 2801..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
        assert_eq!(fetch_range, 2801..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], 500);
        assert_eq!(fetch_range, 1..=299);
    }

    #[test]
    fn calculate_range_to_fetch_catching_up() {
        let head_height = 4000;

        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], 500);
        assert_eq!(fetch_range, 3001..=3500);
        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3500], 500);
        assert_eq!(fetch_range, 3501..=4000);
        let fetch_range = calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], 500);
        assert_eq!(fetch_range, 3801..=4000);
    }

    #[async_test]
    async fn init_without_genesis_hash() {
        let events = EventChannel::new();
        let (mock, mut handle) = P2p::mocked();
        let mut gen = ExtendedHeaderGenerator::new();
        let header = gen.next();

        let _syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(mock),
            store: Arc::new(InMemoryStore::new()),
            event_pub: events.publisher(),
            batch_size: 512,
            syncing_window: DEFAULT_SAMPLING_WINDOW,
        })
        .unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;
        handle.announce_trusted_peer_connected();

        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![header.clone()])).unwrap();

        let head_from_syncer = handle.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header);

        handle.expect_no_cmd().await;
    }

    #[async_test]
    async fn init_with_genesis_hash() {
        let mut gen = ExtendedHeaderGenerator::new();
        let head = gen.next();

        let (_syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;

        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn syncing() {
        let mut gen = ExtendedHeaderGenerator::new();
        let headers = gen.next_many(1500);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[1499].clone()).await;
        assert_syncing(&syncer, &store, &[1500..=1500], 1500).await;

        handle_session_batch(&mut p2p_mock, &headers, 988..=1499, true).await;
        assert_syncing(&syncer, &store, &[988..=1500], 1500).await;

        handle_session_batch(&mut p2p_mock, &headers, 476..=987, true).await;
        assert_syncing(&syncer, &store, &[476..=1500], 1500).await;

        let header1501 = gen.next();
        p2p_mock.announce_new_head(header1501.clone());
        assert_syncing(&syncer, &store, &[476..=1501], 1501).await;

        handle_session_batch(&mut p2p_mock, &headers, 1..=475, true).await;
        assert_syncing(&syncer, &store, &[1..=1501], 1501).await;

        p2p_mock.expect_no_cmd().await;

        let header1502 = gen.next();
        p2p_mock.announce_new_head(header1502.clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1502).await;
        p2p_mock.expect_no_cmd().await;

        let headers_1503_1505 = gen.next_many(3);
        p2p_mock.announce_new_head(headers_1503_1505[2].clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1505).await;

        handle_session_batch(&mut p2p_mock, &headers_1503_1505, 1503..=1505, true).await;
        assert_syncing(&syncer, &store, &[1..=1505], 1505).await;

        let mut headers = gen.next_many(1495);
        p2p_mock.announce_new_head(headers[1494].clone());
        assert_syncing(&syncer, &store, &[1..=1505], 3000).await;

        handle_session_batch(&mut p2p_mock, &headers, 1506..=2017, true).await;
        assert_syncing(&syncer, &store, &[1..=2017], 3000).await;

        headers.push(gen.next());
        p2p_mock.announce_new_head(headers.last().unwrap().clone());
        assert_syncing(&syncer, &store, &[1..=2017], 3001).await;

        handle_session_batch(&mut p2p_mock, &headers, 2018..=2529, true).await;
        assert_syncing(&syncer, &store, &[1..=2529], 3001).await;

        handle_session_batch(&mut p2p_mock, &headers, 2530..=3001, true).await;
        assert_syncing(&syncer, &store, &[1..=3001], 3001).await;

        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn window_edge() {
        let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60);
        let mut gen = ExtendedHeaderGenerator::new();
        gen.set_time(
            (Time::now() - month_and_day_ago).expect("to not underflow"),
            Duration::from_secs(1),
        );
        let mut headers = gen.next_many(1200);
        gen.reset_time();
        headers.append(&mut gen.next_many(2049 - 1200));

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await;
        assert_syncing(&syncer, &store, &[2049..=2049], 2049).await;

        handle_session_batch(&mut p2p_mock, &headers, 1537..=2048, true).await;
        assert_syncing(&syncer, &store, &[1537..=2049], 2049).await;

        handle_session_batch(&mut p2p_mock, &headers, 1025..=1536, true).await;
        assert_syncing(&syncer, &store, &[1025..=2049], 2049).await;

        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn start_with_filled_store() {
        let events = EventChannel::new();
        let (p2p, mut p2p_mock) = P2p::mocked();
        let (store, mut gen) = gen_filled_store(25).await;
        let store = Arc::new(store);

        let mut headers = gen.next_many(520);
        let network_head = gen.next();

        let syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(p2p),
            store: store.clone(),
            event_pub: events.publisher(),
            batch_size: 512,
            syncing_window: DEFAULT_SAMPLING_WINDOW,
        })
        .unwrap();

        p2p_mock.announce_trusted_peer_connected();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![network_head.clone()])).unwrap();

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, network_head);

        assert_syncing(&syncer, &store, &[1..=25, 546..=546], 546).await;

        handle_session_batch(&mut p2p_mock, &headers, 34..=545, true).await;
        assert_syncing(&syncer, &store, &[1..=25, 34..=546], 546).await;

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 26);
        assert_eq!(amount, 8);
        respond_to
            .send(Ok(headers.drain(..8).collect()))
            .map_err(|_| "headers [538, 545]")
            .unwrap();
        assert_syncing(&syncer, &store, &[1..=546], 546).await;

        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn stop_syncer() {
        let mut gen = ExtendedHeaderGenerator::new();
        let head = gen.next();

        let (syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;

        p2p_mock.expect_no_cmd().await;

        syncer.stop();
        sleep(Duration::from_millis(1)).await;
        assert!(matches!(
            syncer.info().await.unwrap_err(),
            SyncerError::WorkerDied
        ));
    }

    #[async_test]
    async fn all_peers_disconnected() {
        let mut gen = ExtendedHeaderGenerator::new();

        let _gap = gen.next_many(24);
        let header25 = gen.next();
        let _gap = gen.next_many(4);
        let header30 = gen.next();
        let _gap = gen.next_many(4);
        let header35 = gen.next();

        let (syncer, store, mut p2p_mock) = initialized_syncer(header30).await;

        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;

        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;

        p2p_mock.announce_trusted_peer_connected();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        respond_to.send(Ok(vec![header25])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        sleep(Duration::from_secs(1)).await;
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        respond_to.send(Ok(vec![header35.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30, 35..=35], 35).await;

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header35);

        let (height, amount, _respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 31);
        assert_eq!(amount, 4);

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn all_peers_disconnected_and_no_network_head_progress() {
        let mut gen = ExtendedHeaderGenerator::new_from_height(30);

        let header30 = gen.next();

        let (syncer, store, mut p2p_mock) = initialized_syncer(header30.clone()).await;

        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;

        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;

        p2p_mock.announce_trusted_peer_connected();

        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        respond_to.send(Ok(vec![header30.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header30);

        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn non_contiguous_response() {
        let mut gen = ExtendedHeaderGenerator::new();
        let mut headers = gen.next_many(20);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;

        let header10 = headers[10].clone();
        headers[10] = headers[11].clone();

        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        assert_syncing(&syncer, &store, &[20..=20], 20).await;

        headers[10] = header10;

        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        assert_syncing(&syncer, &store, &[1..=20], 20).await;
    }

    #[async_test]
    async fn another_chain_response() {
        let headers = ExtendedHeaderGenerator::new().next_many(20);
        let headers_prime = ExtendedHeaderGenerator::new().next_many(20);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;

        handle_session_batch(&mut p2p_mock, &headers_prime, 1..=19, true).await;

        assert_syncing(&syncer, &store, &[20..=20], 20).await;

        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        assert_syncing(&syncer, &store, &[1..=20], 20).await;
    }

    async fn assert_syncing(
        syncer: &Syncer<InMemoryStore>,
        store: &InMemoryStore,
        expected_synced_ranges: &[RangeInclusive<u64>],
        expected_subjective_head: u64,
    ) {
        sleep(Duration::from_millis(1)).await;

        let store_ranges = store.get_stored_header_ranges().await.unwrap();
        let syncing_info = syncer.info().await.unwrap();

        assert_eq!(store_ranges.as_ref(), expected_synced_ranges);
        assert_eq!(syncing_info.stored_headers.as_ref(), expected_synced_ranges);
        assert_eq!(syncing_info.subjective_head, expected_subjective_head);
    }

    async fn initialized_syncer(
        head: ExtendedHeader,
    ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
        let events = EventChannel::new();
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());

        let syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(mock),
            store: store.clone(),
            event_pub: events.publisher(),
            batch_size: 512,
            syncing_window: DEFAULT_SAMPLING_WINDOW,
        })
        .unwrap();

        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;
        handle.announce_trusted_peer_connected();

        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![head.clone()])).unwrap();

        let head_from_syncer = handle.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, head);

        let head_height = head.height().value();
        assert_syncing(&syncer, &store, &[head_height..=head_height], head_height).await;

        (syncer, store, handle)
    }

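    /// Serve the header requests that the syncer issues for `range`.
    ///
    /// When `respond` is `false`, the requests are answered with a timeout
    /// error after a delay instead of with headers.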
    async fn handle_session_batch(
        p2p_mock: &mut MockP2pHandle,
        remaining_headers: &[ExtendedHeader],
        range: BlockRange,
        respond: bool,
    ) {
        range.validate().unwrap();

        let mut ranges_to_request = BlockRanges::new();
        ranges_to_request.insert_relaxed(&range).unwrap();

        let mut no_respond_chans = Vec::new();

        for _ in 0..requests_in_session(range.len()) {
            let (height, amount, respond_to) =
                p2p_mock.expect_header_request_for_height_cmd().await;

            let requested_range = height..=height + amount - 1;
            ranges_to_request.remove_strict(requested_range);

            if respond {
                let header_index = remaining_headers
                    .iter()
                    .position(|h| h.height().value() == height)
                    .expect("height not found in provided headers");

                let response_range =
                    remaining_headers[header_index..header_index + amount as usize].to_vec();
                respond_to
                    .send(Ok(response_range))
                    .map_err(|_| format!("headers [{}, {}]", height, height + amount - 1))
                    .unwrap();
            } else {
                no_respond_chans.push(respond_to);
            }
        }

        if !respond {
            spawn(async move {
                sleep(Duration::from_secs(10)).await;

                for respond_chan in no_respond_chans {
                    respond_chan.maybe_send_err(P2pError::HeaderEx(
                        HeaderExError::OutboundFailure(OutboundFailure::Timeout),
                    ));
                }
            });
        }

        assert!(
            ranges_to_request.is_empty(),
            "Some headers weren't requested. expected range: {}, not requested: {}",
            range.display(),
            ranges_to_request
        );
    }

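    /// Number of requests the header session is expected to split a batch of
    /// `headers` headers into.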
    fn requests_in_session(headers: u64) -> usize {
        let max_requests = headers.div_ceil(header_session::MAX_AMOUNT_PER_REQ) as usize;
        let min_requests = headers.div_ceil(header_session::MIN_AMOUNT_PER_REQ) as usize;

        if max_requests > header_session::MAX_CONCURRENT_REQS {
            max_requests
        } else {
            header_session::MAX_CONCURRENT_REQS.min(min_requests)
        }
    }

    impl BlockRanges {
        fn remove_strict(&mut self, range: BlockRange) {
            // Iterate over a snapshot of the stored ranges so that `self` can be
            // mutated once a containing range is found.
            for stored in self.as_ref().to_vec() {
                if stored.contains(range.start()) && stored.contains(range.end()) {
                    self.remove_relaxed(range).unwrap();
                    return;
                }
            }

            panic!("block ranges ({self}) don't contain {}", range.display());
        }
    }
}