// lumina_node/syncer.rs

1//! Component responsible for synchronizing block headers announced in the Celestia network.
2//!
3//! It starts by asking the trusted peers for their current head headers and picks
4//! the latest header returned by at least two of them as the initial synchronization target
5//! called `subjective_head`.
6//!
7//! Then it starts synchronizing from the genesis header up to the target requesting headers
8//! on the `header-ex` p2p protocol. In the meantime, it constantly checks for the latest
9//! headers announced on the `header-sub` p2p protocol to keep the `subjective_head` as close
10//! to the `network_head` as possible.
11
12use std::marker::PhantomData;
13use std::pin::pin;
14use std::sync::Arc;
15use std::time::Duration;
16
17use backoff::backoff::Backoff;
18use backoff::ExponentialBackoffBuilder;
19use celestia_types::ExtendedHeader;
20use lumina_utils::executor::{spawn, JoinHandle};
21use lumina_utils::time::{sleep, Interval};
22use serde::{Deserialize, Serialize};
23use tendermint::Time;
24use tokio::select;
25use tokio::sync::{mpsc, oneshot};
26use tokio_util::sync::CancellationToken;
27use tracing::{debug, error, info, instrument, warn};
28use web_time::Instant;
29
30use crate::block_ranges::{BlockRange, BlockRangeExt, BlockRanges};
31use crate::events::{EventPublisher, NodeEvent};
32use crate::p2p::{P2p, P2pError};
33use crate::store::{Store, StoreError};
34use crate::utils::{FusedReusableFuture, OneshotSenderExt};
35
/// Module-local result alias defaulting the error type to [`SyncerError`].
type Result<T, E = SyncerError> = std::result::Result<T, E>;

/// Upper bound on the exponential backoff delay between subjective head
/// initialization retries (see `try_init_task`).
const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60);
39
/// Representation of all the errors that can occur in `Syncer` component.
#[derive(Debug, thiserror::Error)]
pub enum SyncerError {
    /// An error propagated from the `P2p` component.
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// An error propagated from the [`Store`] component.
    #[error("Store: {0}")]
    Store(#[from] StoreError),

    /// The worker has died.
    ///
    /// Produced when a command cannot be delivered to the worker task.
    #[error("Worker died")]
    WorkerDied,

    /// Channel closed unexpectedly.
    ///
    /// Produced when a response (oneshot) channel is dropped before a reply
    /// is sent (see the `From<oneshot::error::RecvError>` impl below).
    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,
}
59
60impl SyncerError {
61    pub(crate) fn is_fatal(&self) -> bool {
62        match self {
63            SyncerError::P2p(e) => e.is_fatal(),
64            SyncerError::Store(e) => e.is_fatal(),
65            SyncerError::WorkerDied | SyncerError::ChannelClosedUnexpectedly => true,
66        }
67    }
68}
69
70impl From<oneshot::error::RecvError> for SyncerError {
71    fn from(_value: oneshot::error::RecvError) -> Self {
72        SyncerError::ChannelClosedUnexpectedly
73    }
74}
75
/// Component responsible for synchronizing block headers from the network.
///
/// This is a thin handle around the background [`Worker`] task: it forwards
/// commands over a channel and controls the task's lifetime.
#[derive(Debug)]
pub(crate) struct Syncer<S>
where
    S: Store + 'static,
{
    // Channel for sending commands to the worker task.
    cmd_tx: mpsc::Sender<SyncerCmd>,
    // Token used to signal the worker to stop.
    cancellation_token: CancellationToken,
    // Handle used to await the worker's termination.
    join_handle: JoinHandle,
    // Ties the `S: Store` type parameter to the handle without owning an `Arc<S>`.
    _store: PhantomData<S>,
}
87
/// Arguments used to configure the [`Syncer`].
pub(crate) struct SyncerArgs<S>
where
    S: Store + 'static,
{
    /// Handler for the peer to peer messaging.
    pub(crate) p2p: Arc<P2p>,
    /// Headers storage.
    pub(crate) store: Arc<S>,
    /// Event publisher.
    pub(crate) event_pub: EventPublisher,
    /// Batch size.
    ///
    /// Maximum amount of headers requested per HeaderEx batch.
    pub(crate) batch_size: u64,
    /// Syncing window
    ///
    /// Only headers whose timestamp falls within this duration from now are
    /// synchronized backwards (see `Worker::in_syncing_window`).
    pub(crate) syncing_window: Duration,
}
104
/// Commands accepted by the syncer worker task.
#[derive(Debug)]
enum SyncerCmd {
    /// Request the current [`SyncingInfo`].
    GetInfo {
        // Oneshot channel the worker replies on.
        respond_to: oneshot::Sender<SyncingInfo>,
    },
}
111
/// Status of the synchronization.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncingInfo {
    /// Ranges of headers that are already synchronised
    pub stored_headers: BlockRanges,
    /// Syncing target. The latest height seen in the network that was successfully verified.
    ///
    /// Reported as 0 if the subjective head was not initialized yet.
    pub subjective_head: u64,
}
120
impl<S> Syncer<S>
where
    S: Store,
{
    /// Create and start the [`Syncer`].
    ///
    /// Spawns the background [`Worker`] task; a fatal worker error is
    /// reported to event subscribers before the task exits.
    pub(crate) fn start(args: SyncerArgs<S>) -> Result<Self> {
        let cancellation_token = CancellationToken::new();
        let event_pub = args.event_pub.clone();
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        // Worker gets a child token so cancelling the handle stops it too.
        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;

        let join_handle = spawn(async move {
            if let Err(e) = worker.run().await {
                error!("Syncer stopped because of a fatal error: {e}");

                event_pub.send(NodeEvent::FatalSyncerError {
                    error: e.to_string(),
                });
            }
        });

        Ok(Syncer {
            cancellation_token,
            cmd_tx,
            join_handle,
            _store: PhantomData,
        })
    }

    /// Stop the worker.
    pub(crate) fn stop(&self) {
        // Signal the Worker to stop.
        self.cancellation_token.cancel();
    }

    /// Wait until worker is completely stopped.
    pub(crate) async fn join(&self) {
        self.join_handle.join().await;
    }

    /// Send a command to the worker, failing if it already stopped.
    async fn send_command(&self, cmd: SyncerCmd) -> Result<()> {
        self.cmd_tx
            .send(cmd)
            .await
            .map_err(|_| SyncerError::WorkerDied)
    }

    /// Get the current synchronization status.
    ///
    /// # Errors
    ///
    /// This function will return an error if the [`Syncer`] has been stopped.
    pub(crate) async fn info(&self) -> Result<SyncingInfo> {
        let (tx, rx) = oneshot::channel();

        self.send_command(SyncerCmd::GetInfo { respond_to: tx })
            .await?;

        Ok(rx.await?)
    }
}
182
impl<S> Drop for Syncer<S>
where
    S: Store,
{
    fn drop(&mut self) {
        // Ensure the worker is signalled to stop when the handle goes away.
        self.stop();
    }
}
191
/// Background task that drives the synchronization loop.
struct Worker<S>
where
    S: Store + 'static,
{
    // Token signalling that the worker should shut down.
    cancellation_token: CancellationToken,
    // Commands coming from the `Syncer` handle.
    cmd_rx: mpsc::Receiver<SyncerCmd>,
    // Publisher for node events (progress, failures, etc.).
    event_pub: EventPublisher,
    // Handler for the peer to peer messaging.
    p2p: Arc<P2p>,
    // Headers storage.
    store: Arc<S>,
    // Receiver of headers announced on header-sub; `Some` only while connected.
    header_sub_rx: Option<mpsc::Receiver<ExtendedHeader>>,
    // Latest verified network height; `None` until initialization succeeds.
    subjective_head_height: Option<u64>,
    // Maximum amount of headers fetched per batch.
    batch_size: u64,
    // State of the single in-flight batch request.
    ongoing_batch: Ongoing,
    // Only headers within this duration from now are synced backwards.
    syncing_window: Duration,
}
207
/// State of the single in-flight batch request. `range` is `Some` exactly
/// when `task` is not terminated (see the debug assertion in
/// `fetch_next_batch`).
struct Ongoing {
    // Height range currently being fetched, if any.
    range: Option<BlockRange>,
    // Future resolving to the fetched headers and how long the request took.
    task: FusedReusableFuture<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
}
212
213impl<S> Worker<S>
214where
215    S: Store,
216{
    /// Construct a new worker from the given args.
    ///
    /// Nothing runs until [`Worker::run`] is called.
    fn new(
        args: SyncerArgs<S>,
        cancellation_token: CancellationToken,
        cmd_rx: mpsc::Receiver<SyncerCmd>,
    ) -> Result<Self> {
        Ok(Worker {
            cancellation_token,
            cmd_rx,
            event_pub: args.event_pub,
            p2p: args.p2p,
            store: args.store,
            header_sub_rx: None,
            subjective_head_height: None,
            batch_size: args.batch_size,
            ongoing_batch: Ongoing {
                range: None,
                // No batch in flight initially.
                task: FusedReusableFuture::terminated(),
            },
            syncing_window: args.syncing_window,
        })
    }
238
239    async fn run(&mut self) -> Result<()> {
240        loop {
241            if self.cancellation_token.is_cancelled() {
242                break;
243            }
244
245            self.connecting_event_loop().await?;
246
247            if self.cancellation_token.is_cancelled() {
248                break;
249            }
250
251            self.connected_event_loop().await?;
252        }
253
254        debug!("Syncer stopped");
255        Ok(())
256    }
257
    /// The responsibility of this event loop is to await a trusted peer to
    /// connect and get the network head, while accepting commands.
    ///
    /// NOTE: Only fatal errors should be propagated!
    async fn connecting_event_loop(&mut self) -> Result<()> {
        debug!("Entering connecting_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60)).await;
        self.report().await?;

        // Keeps retrying (with backoff) until the network head is fetched.
        let mut try_init_fut = pin!(try_init_task(
            self.p2p.clone(),
            self.store.clone(),
            self.event_pub.clone()
        ));

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = report_interval.tick() => {
                    self.report().await?;
                }
                res = &mut try_init_fut => {
                    // `try_init_task` propagates only fatal errors
                    let (network_head, took) = res?;
                    let network_head_height = network_head.height().value();

                    info!("Setting initial subjective head to {network_head_height}");
                    self.set_subjective_head_height(network_head_height);

                    // Start receiving new heads announced on header-sub.
                    let (header_sub_tx, header_sub_rx) = mpsc::channel(16);
                    self.p2p.init_header_sub(network_head, header_sub_tx).await?;
                    self.header_sub_rx = Some(header_sub_rx);

                    self.event_pub.send(NodeEvent::FetchingHeadHeaderFinished {
                        height: network_head_height,
                        took,
                    });

                    // Initialization done; switch to the connected loop.
                    break;
                }
                Some(cmd) = self.cmd_rx.recv() => {
                    self.on_cmd(cmd).await?;
                }
            }
        }

        Ok(())
    }
309
    /// The responsibility of this event loop is to start the syncing process,
    /// handle events from HeaderSub, and accept commands.
    ///
    /// Returns (back to the connecting phase in `run`) when all peers
    /// disconnect.
    ///
    /// NOTE: Only fatal errors should be propagated!
    async fn connected_event_loop(&mut self) -> Result<()> {
        debug!("Entering connected_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60)).await;
        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        // Check if connection status changed before creating the watcher
        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
            warn!("All peers disconnected");
            return Ok(());
        }

        self.fetch_next_batch().await?;
        self.report().await?;

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        break;
                    }
                }
                _ = report_interval.tick() => {
                    self.report().await?;
                }
                res = header_sub_recv(self.header_sub_rx.as_mut()) => {
                    let header = res?;
                    self.on_header_sub_message(header).await?;
                    // A new head may have extended the range we need to fetch.
                    self.fetch_next_batch().await?;
                }
                Some(cmd) = self.cmd_rx.recv() => {
                    self.on_cmd(cmd).await?;
                }
                (res, took) = &mut self.ongoing_batch.task => {
                    self.on_fetch_next_batch_result(res, took).await?;
                    // Immediately schedule the next batch, if any.
                    self.fetch_next_batch().await?;
                }
            }
        }

        // Abort any in-flight batch before leaving the connected state.
        if let Some(ongoing) = self.ongoing_batch.range.take() {
            warn!("Cancelling fetching of {}", ongoing.display());
            self.ongoing_batch.task.terminate();
        }

        // header-sub is re-initialized on the next successful connection.
        self.header_sub_rx.take();

        Ok(())
    }
367
    /// Snapshot of the current syncing state (stored ranges + subjective head).
    async fn syncing_info(&self) -> Result<SyncingInfo> {
        Ok(SyncingInfo {
            stored_headers: self.store.get_stored_header_ranges().await?,
            // 0 means the subjective head was not initialized yet.
            subjective_head: self.subjective_head_height.unwrap_or(0),
        })
    }
374
375    #[instrument(skip_all)]
376    async fn report(&mut self) -> Result<()> {
377        let SyncingInfo {
378            stored_headers,
379            subjective_head,
380        } = self.syncing_info().await?;
381
382        let ongoing_batch = self
383            .ongoing_batch
384            .range
385            .as_ref()
386            .map(|range| format!("{}", range.display()))
387            .unwrap_or_else(|| "None".to_string());
388
389        info!("syncing: head: {subjective_head}, stored headers: {stored_headers}, ongoing batches: {ongoing_batch}");
390        Ok(())
391    }
392
393    async fn on_cmd(&mut self, cmd: SyncerCmd) -> Result<()> {
394        match cmd {
395            SyncerCmd::GetInfo { respond_to } => {
396                let info = self.syncing_info().await?;
397                respond_to.maybe_send(info);
398            }
399        }
400
401        Ok(())
402    }
403
404    #[instrument(skip_all)]
405    async fn on_header_sub_message(&mut self, new_head: ExtendedHeader) -> Result<()> {
406        let new_head_height = new_head.height().value();
407
408        self.set_subjective_head_height(new_head_height);
409
410        if let Ok(store_head_height) = self.store.head_height().await {
411            // If our new header is adjacent to the HEAD of the store
412            if store_head_height + 1 == new_head_height {
413                // Header is already verified by HeaderSub and will be validated against previous
414                // head on insert
415                if self.store.insert(new_head).await.is_ok() {
416                    self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub {
417                        height: new_head_height,
418                    });
419                }
420            }
421        }
422
423        Ok(())
424    }
425
426    fn set_subjective_head_height(&mut self, height: u64) {
427        if let Some(old_height) = self.subjective_head_height {
428            if height <= old_height {
429                return;
430            }
431        }
432
433        self.subjective_head_height = Some(height);
434    }
435
    /// Schedule fetching of the next missing batch of headers, if possible.
    ///
    /// Does nothing when a batch is already in flight, no peers are
    /// connected, the subjective head is unknown, everything is stored, or
    /// the next batch falls outside the syncing window.
    #[instrument(skip_all)]
    async fn fetch_next_batch(&mut self) -> Result<()> {
        // `range` and `task` must always be set/cleared together.
        debug_assert_eq!(
            self.ongoing_batch.range.is_none(),
            self.ongoing_batch.task.is_terminated()
        );

        if !self.ongoing_batch.task.is_terminated() {
            // Another batch is ongoing. We do not parallelize `Syncer`
            // by design. Any parallel requests are done in the
            // HeaderEx client through `Session`.
            //
            // Nothing to schedule
            return Ok(());
        }

        if self.p2p.peer_tracker_info().num_connected_peers == 0 {
            // No connected peers. We can't do the request.
            // We will recover from this in `run`.
            return Ok(());
        }

        let Some(subjective_head_height) = self.subjective_head_height else {
            // Nothing to schedule
            return Ok(());
        };

        let store_ranges = self.store.get_stored_header_ranges().await?;

        let next_batch = calculate_range_to_fetch(
            subjective_head_height,
            store_ranges.as_ref(),
            self.batch_size,
        );

        if next_batch.is_empty() {
            // no headers to fetch
            return Ok(());
        }

        // make sure we're inside the syncing window before we start
        match self.store.get_by_height(next_batch.end() + 1).await {
            Ok(known_header) => {
                if !self.in_syncing_window(&known_header) {
                    return Ok(());
                }
            }
            // The header just above the batch is unknown, so the window
            // cannot be checked against it; proceed with the fetch.
            Err(StoreError::NotFound) => {}
            Err(e) => return Err(e.into()),
        }

        self.event_pub.send(NodeEvent::FetchingHeadersStarted {
            from_height: *next_batch.start(),
            to_height: *next_batch.end(),
        });

        let p2p = self.p2p.clone();

        self.ongoing_batch.range = Some(next_batch.clone());

        // The request runs as the `ongoing_batch` future, polled by
        // `connected_event_loop`; its duration is measured for reporting.
        self.ongoing_batch.task.set(async move {
            let now = Instant::now();
            let res = p2p.get_unverified_header_range(next_batch).await;
            (res, now.elapsed())
        });

        Ok(())
    }
504
505    /// Handle the result of the batch request and propagate fatal errors.
506    #[instrument(skip_all)]
507    async fn on_fetch_next_batch_result(
508        &mut self,
509        res: Result<Vec<ExtendedHeader>, P2pError>,
510        took: Duration,
511    ) -> Result<()> {
512        let range = self
513            .ongoing_batch
514            .range
515            .take()
516            .expect("ongoing_batch not initialized correctly");
517
518        let from_height = *range.start();
519        let to_height = *range.end();
520
521        let headers = match res {
522            Ok(headers) => headers,
523            Err(e) => {
524                if e.is_fatal() {
525                    return Err(e.into());
526                }
527
528                self.event_pub.send(NodeEvent::FetchingHeadersFailed {
529                    from_height,
530                    to_height,
531                    error: e.to_string(),
532                    took,
533                });
534
535                return Ok(());
536            }
537        };
538
539        if let Err(e) = self.store.insert(headers).await {
540            if e.is_fatal() {
541                return Err(e.into());
542            }
543
544            self.event_pub.send(NodeEvent::FetchingHeadersFailed {
545                from_height,
546                to_height,
547                error: format!("Failed to store headers: {e}"),
548                took,
549            });
550        }
551
552        self.event_pub.send(NodeEvent::FetchingHeadersFinished {
553            from_height,
554            to_height,
555            took,
556        });
557
558        Ok(())
559    }
560
561    fn in_syncing_window(&self, header: &ExtendedHeader) -> bool {
562        let syncing_window_start =
563            Time::now()
564                .checked_sub(self.syncing_window)
565                .unwrap_or_else(|| {
566                    warn!(
567                        "underflow when computing syncing window start, defaulting to unix epoch"
568                    );
569                    Time::unix_epoch()
570                });
571
572        header.time().after(syncing_window_start)
573    }
574}
575
576/// based on the stored headers and current network head height, calculate range of headers that
577/// should be fetched from the network, anchored on already existing header range in store
578fn calculate_range_to_fetch(
579    subjective_head_height: u64,
580    store_headers: &[BlockRange],
581    limit: u64,
582) -> BlockRange {
583    let mut store_headers_iter = store_headers.iter().rev();
584
585    let Some(store_head_range) = store_headers_iter.next() else {
586        // empty store, we're missing everything
587        let range = 1..=subjective_head_height;
588        return range.truncate_right(limit);
589    };
590
591    if store_head_range.end() < &subjective_head_height {
592        // if we haven't caught up with the network head, start from there
593        let range = store_head_range.end() + 1..=subjective_head_height;
594        return range.truncate_right(limit);
595    }
596
597    // there exists a range contiguous with network head. inspect previous range end
598    let penultimate_range_end = store_headers_iter.next().map(|r| *r.end()).unwrap_or(0);
599
600    let range = penultimate_range_end + 1..=store_head_range.start().saturating_sub(1);
601    range.truncate_left(limit)
602}
603
/// Fetch the network head from trusted peers and persist it, retrying forever
/// with exponential backoff on non-fatal errors.
///
/// Returns the network head together with how long initialization took.
#[instrument(skip_all)]
async fn try_init_task<S>(
    p2p: Arc<P2p>,
    store: Arc<S>,
    event_pub: EventPublisher,
) -> Result<(ExtendedHeader, Duration)>
where
    S: Store + 'static,
{
    let now = Instant::now();
    // Ensures `FetchingHeadHeaderStarted` is published only once across retries.
    let mut event_reported = false;
    let mut backoff = ExponentialBackoffBuilder::default()
        .with_max_interval(TRY_INIT_BACKOFF_MAX_INTERVAL)
        // Never give up retrying.
        .with_max_elapsed_time(None)
        .build();

    loop {
        match try_init(&p2p, &*store, &event_pub, &mut event_reported).await {
            Ok(network_head) => {
                return Ok((network_head, now.elapsed()));
            }
            Err(e) if e.is_fatal() => {
                return Err(e);
            }
            Err(e) => {
                let sleep_dur = backoff
                    .next_backoff()
                    // Safe: `with_max_elapsed_time(None)` keeps the backoff alive.
                    .expect("backoff never stops retrying");

                warn!(
                    "Initialization of subjective head failed: {e}. Trying again in {sleep_dur:?}."
                );
                sleep(sleep_dur).await;
            }
        }
    }
}
641
/// Single attempt at initializing the subjective head: wait for a trusted
/// peer, fetch the network head, and store it unless already stored.
async fn try_init<S>(
    p2p: &P2p,
    store: &S,
    event_pub: &EventPublisher,
    event_reported: &mut bool,
) -> Result<ExtendedHeader>
where
    S: Store,
{
    p2p.wait_connected_trusted().await?;

    // Report the start event only on the first attempt.
    if !*event_reported {
        event_pub.send(NodeEvent::FetchingHeadHeaderStarted);
        *event_reported = true;
    }

    let network_head = p2p.get_head_header().await?;

    // If the network head and the store head have the same height,
    // then `insert` will error because of insertion constraints.
    // However, if both headers are exactly the same, we
    // can skip inserting, as the header is already there.
    //
    // This can happen in case of fast node restart.
    let try_insert = match store.get_head().await {
        // `ExtendedHeader.commit.signatures` can be different set on each fetch
        // so we compare only hashes.
        Ok(store_head) => store_head.hash() != network_head.hash(),
        Err(StoreError::NotFound) => true,
        Err(e) => return Err(e.into()),
    };

    if try_insert {
        // Insert HEAD to the store and initialize header-sub.
        // Normal insertion checks still apply here.
        store.insert(network_head.clone()).await?;
    }

    Ok(network_head)
}
682
683async fn header_sub_recv(
684    rx: Option<&mut mpsc::Receiver<ExtendedHeader>>,
685) -> Result<ExtendedHeader> {
686    rx.expect("header-sub not initialized")
687        .recv()
688        .await
689        .ok_or(SyncerError::P2p(P2pError::WorkerDied))
690}
691
692#[cfg(test)]
693mod tests {
694    use std::ops::RangeInclusive;
695
696    use super::*;
697    use crate::block_ranges::{BlockRange, BlockRangeExt};
698    use crate::events::EventChannel;
699    use crate::node::HeaderExError;
700    use crate::node::DEFAULT_SAMPLING_WINDOW;
701    use crate::p2p::header_session;
702    use crate::store::InMemoryStore;
703    use crate::test_utils::{gen_filled_store, MockP2pHandle};
704    use crate::utils::OneshotResultSenderExt;
705    use celestia_types::test_utils::ExtendedHeaderGenerator;
706    use libp2p::request_response::OutboundFailure;
707    use lumina_utils::test_utils::async_test;
708
    /// `calculate_range_to_fetch` must cap the returned range at `limit`.
    #[test]
    fn calculate_range_to_fetch_test_header_limit() {
        let head_height = 1024;
        let ranges = [256..=512];

        // Catching up forward from the stored head, capped at 16 headers.
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16);
        assert_eq!(fetch_range, 513..=528);

        // Limits around the exact distance (512) to the network head.
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511);
        assert_eq!(fetch_range, 513..=1023);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512);
        assert_eq!(fetch_range, 513..=1024);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513);
        assert_eq!(fetch_range, 513..=1024);

        // A limit larger than what is missing is clamped to the head.
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024);
        assert_eq!(fetch_range, 513..=1024);
    }
727
    /// With an empty store, syncing starts from genesis (height 1).
    #[test]
    fn calculate_range_to_fetch_empty_store() {
        let fetch_range = calculate_range_to_fetch(1, &[], 100);
        assert_eq!(fetch_range, 1..=1);

        let fetch_range = calculate_range_to_fetch(100, &[], 10);
        assert_eq!(fetch_range, 1..=10);

        let fetch_range = calculate_range_to_fetch(100, &[], 50);
        assert_eq!(fetch_range, 1..=50);
    }
739
    /// A store covering 1..=head leaves nothing to fetch.
    #[test]
    fn calculate_range_to_fetch_fully_synced() {
        let fetch_range = calculate_range_to_fetch(1, &[1..=1], 100);
        assert!(fetch_range.is_empty());

        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
        assert!(fetch_range.is_empty());

        // NOTE(review): exact duplicate of the case above; a different input
        // was probably intended here.
        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
        assert!(fetch_range.is_empty());
    }
751
    /// When the store is contiguous with the network head, syncing proceeds
    /// backwards, filling the gap down to the previous stored range.
    #[test]
    fn calculate_range_to_fetch_caught_up() {
        let head_height = 4000;

        let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], 500);
        assert_eq!(fetch_range, 2500..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], 500);
        assert_eq!(fetch_range, 2500..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
        assert_eq!(fetch_range, 2801..=2999);
        // NOTE(review): exact duplicate of the case above; a different input
        // was probably intended here.
        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
        assert_eq!(fetch_range, 2801..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], 500);
        assert_eq!(fetch_range, 1..=299);
    }
767
    /// When the store lags behind the network head, syncing extends forward
    /// from the highest stored range.
    #[test]
    fn calculate_range_to_fetch_catching_up() {
        let head_height = 4000;

        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], 500);
        assert_eq!(fetch_range, 3001..=3500);
        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3500], 500);
        assert_eq!(fetch_range, 3501..=4000);
        // Gaps below the head range do not affect the forward batch.
        let fetch_range = calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], 500);
        assert_eq!(fetch_range, 3801..=4000);
    }
779
    /// Full initialization flow: a trusted peer connects, the head is
    /// fetched, and header-sub is initialized with it.
    #[async_test]
    async fn init_without_genesis_hash() {
        let events = EventChannel::new();
        let (mock, mut handle) = P2p::mocked();
        let mut gen = ExtendedHeaderGenerator::new();
        let header = gen.next();

        let _syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(mock),
            store: Arc::new(InMemoryStore::new()),
            event_pub: events.publisher(),
            batch_size: 512,
            syncing_window: DEFAULT_SAMPLING_WINDOW,
        })
        .unwrap();

        // Syncer is waiting for a trusted peer to connect
        handle.expect_no_cmd().await;
        // A non-trusted peer is not enough to start initialization.
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;
        handle.announce_trusted_peer_connected();

        // We're syncing from the front, so ask for head first
        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![header.clone()])).unwrap();

        // Now Syncer initializes HeaderSub with the latest HEAD
        let head_from_syncer = handle.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header);

        // network head = local head, so nothing else is produced.
        handle.expect_no_cmd().await;
    }
815
    /// Initialization via the shared helper; an up-to-date head means no
    /// further requests are issued.
    #[async_test]
    async fn init_with_genesis_hash() {
        let mut gen = ExtendedHeaderGenerator::new();
        let head = gen.next();

        let (_syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;

        // network head = local head, so nothing else is produced.
        p2p_mock.expect_no_cmd().await;
    }
826
    /// End-to-end syncing: backwards batches from the head, immediate append
    /// of adjacent header-sub heads, and forward batches after falling behind.
    #[async_test]
    async fn syncing() {
        let mut gen = ExtendedHeaderGenerator::new();
        let headers = gen.next_many(1500);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[1499].clone()).await;
        assert_syncing(&syncer, &store, &[1500..=1500], 1500).await;

        // Syncer is syncing backwards from the network head (batch 1)
        handle_session_batch(&mut p2p_mock, &headers, 988..=1499, true).await;
        assert_syncing(&syncer, &store, &[988..=1500], 1500).await;

        // Syncer is syncing backwards from the network head (batch 2)
        handle_session_batch(&mut p2p_mock, &headers, 476..=987, true).await;
        assert_syncing(&syncer, &store, &[476..=1500], 1500).await;

        // New HEAD was received by HeaderSub (height 1501)
        let header1501 = gen.next();
        p2p_mock.announce_new_head(header1501.clone());
        // Height 1501 is adjacent to the last header of Store, so it is appended
        // immediately
        assert_syncing(&syncer, &store, &[476..=1501], 1501).await;

        // Syncer is syncing backwards from the network head (batch 3, partial)
        handle_session_batch(&mut p2p_mock, &headers, 1..=475, true).await;
        assert_syncing(&syncer, &store, &[1..=1501], 1501).await;

        // Syncer is fully synced and awaiting events
        p2p_mock.expect_no_cmd().await;

        // New HEAD was received by HeaderSub (height 1502), it should be appended immediately
        let header1502 = gen.next();
        p2p_mock.announce_new_head(header1502.clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1502).await;
        p2p_mock.expect_no_cmd().await;

        // New HEAD was received by HeaderSub (height 1505), it should NOT be appended
        let headers_1503_1505 = gen.next_many(3);
        p2p_mock.announce_new_head(headers_1503_1505[2].clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1505).await;

        // New HEAD is not adjacent to store, so Syncer requests a range
        handle_session_batch(&mut p2p_mock, &headers_1503_1505, 1503..=1505, true).await;
        assert_syncing(&syncer, &store, &[1..=1505], 1505).await;

        // New HEAD was received by HeaderSub (height 3000), it should NOT be appended
        let mut headers = gen.next_many(1495);
        p2p_mock.announce_new_head(headers[1494].clone());
        assert_syncing(&syncer, &store, &[1..=1505], 3000).await;

        // Syncer is syncing forwards, anchored on a range already in store (batch 1)
        handle_session_batch(&mut p2p_mock, &headers, 1506..=2017, true).await;
        assert_syncing(&syncer, &store, &[1..=2017], 3000).await;

        // New head from header sub added, should NOT be appended
        headers.push(gen.next());
        p2p_mock.announce_new_head(headers.last().unwrap().clone());
        assert_syncing(&syncer, &store, &[1..=2017], 3001).await;

        // Syncer continues syncing forwards (batch 2)
        handle_session_batch(&mut p2p_mock, &headers, 2018..=2529, true).await;
        assert_syncing(&syncer, &store, &[1..=2529], 3001).await;

        // Syncer continues syncing forwards, should include the new head received via HeaderSub (batch 3)
        handle_session_batch(&mut p2p_mock, &headers, 2530..=3001, true).await;
        assert_syncing(&syncer, &store, &[1..=3001], 3001).await;

        // Syncer is fully synced and awaiting events
        p2p_mock.expect_no_cmd().await;
    }
897
898    #[async_test]
899    async fn window_edge() {
900        let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60);
901        let mut gen = ExtendedHeaderGenerator::new();
902        gen.set_time(
903            (Time::now() - month_and_day_ago).expect("to not underflow"),
904            Duration::from_secs(1),
905        );
906        let mut headers = gen.next_many(1200);
907        gen.reset_time();
908        headers.append(&mut gen.next_many(2049 - 1200));
909
910        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await;
911        assert_syncing(&syncer, &store, &[2049..=2049], 2049).await;
912
913        // Syncer requested the first batch
914        handle_session_batch(&mut p2p_mock, &headers, 1537..=2048, true).await;
915        assert_syncing(&syncer, &store, &[1537..=2049], 2049).await;
916
917        // Syncer requested the second batch hitting the syncing window
918        handle_session_batch(&mut p2p_mock, &headers, 1025..=1536, true).await;
919        assert_syncing(&syncer, &store, &[1025..=2049], 2049).await;
920
921        // Syncer is fully synced and awaiting for events
922        p2p_mock.expect_no_cmd().await;
923    }
924
925    #[async_test]
926    async fn start_with_filled_store() {
927        let events = EventChannel::new();
928        let (p2p, mut p2p_mock) = P2p::mocked();
929        let (store, mut gen) = gen_filled_store(25).await;
930        let store = Arc::new(store);
931
932        let mut headers = gen.next_many(520);
933        let network_head = gen.next(); // height 546
934
935        let syncer = Syncer::start(SyncerArgs {
936            p2p: Arc::new(p2p),
937            store: store.clone(),
938            event_pub: events.publisher(),
939            batch_size: 512,
940            syncing_window: DEFAULT_SAMPLING_WINDOW,
941        })
942        .unwrap();
943
944        p2p_mock.announce_trusted_peer_connected();
945
946        // Syncer asks for current HEAD
947        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
948        assert_eq!(height, 0);
949        assert_eq!(amount, 1);
950        respond_to.send(Ok(vec![network_head.clone()])).unwrap();
951
952        // Now Syncer initializes HeaderSub with the latest HEAD
953        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
954        assert_eq!(head_from_syncer, network_head);
955
956        assert_syncing(&syncer, &store, &[1..=25, 546..=546], 546).await;
957
958        // Syncer requested the first batch
959        handle_session_batch(&mut p2p_mock, &headers, 34..=545, true).await;
960        assert_syncing(&syncer, &store, &[1..=25, 34..=546], 546).await;
961
962        // Syncer requested the remaining batch ([26, 33])
963        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
964        assert_eq!(height, 26);
965        assert_eq!(amount, 8);
966        respond_to
967            .send(Ok(headers.drain(..8).collect()))
968            .map_err(|_| "headers [538, 545]")
969            .unwrap();
970        assert_syncing(&syncer, &store, &[1..=546], 546).await;
971
972        // Syncer is fulling synced and awaiting for events
973        p2p_mock.expect_no_cmd().await;
974    }
975
976    #[async_test]
977    async fn stop_syncer() {
978        let mut gen = ExtendedHeaderGenerator::new();
979        let head = gen.next();
980
981        let (syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;
982
983        // network head height == 1, so nothing else is produced.
984        p2p_mock.expect_no_cmd().await;
985
986        syncer.stop();
987        // Wait for Worker to stop
988        sleep(Duration::from_millis(1)).await;
989        assert!(matches!(
990            syncer.info().await.unwrap_err(),
991            SyncerError::WorkerDied
992        ));
993    }
994
    #[async_test]
    async fn all_peers_disconnected() {
        let mut gen = ExtendedHeaderGenerator::new();

        // Keep only headers at heights 25, 30 and 35; the heights in between
        // are generated (to keep the chain contiguous) but discarded.
        let _gap = gen.next_many(24);
        let header25 = gen.next();
        let _gap = gen.next_many(4);
        let header30 = gen.next();
        let _gap = gen.next_many(4);
        let header35 = gen.next();

        // Start Syncer and report height 30 as HEAD
        let (syncer, store, mut p2p_mock) = initialized_syncer(header30).await;

        // Wait for the request but do not reply to it.
        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        // Syncer is now back to `connecting_event_loop`.
        p2p_mock.expect_no_cmd().await;

        // Announce a non-trusted peer. Syncer in `connecting_event_loop` can progress only
        // if a trusted peer is connected.
        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;

        // Announce a trusted peer.
        p2p_mock.announce_trusted_peer_connected();

        // Now syncer will send request for HEAD.
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        // Report an older head (25 < 30). Syncer should not accept it:
        // subjective head stays at 30.
        respond_to.send(Ok(vec![header25])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        // Syncer will request HEAD again after some time.
        sleep(Duration::from_secs(1)).await;
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        // Report newer HEAD than before.
        respond_to.send(Ok(vec![header35.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30, 35..=35], 35).await;

        // Syncer initializes HeaderSub with the latest HEAD.
        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header35);

        // Syncer now is in `connected_event_loop` and will try to sync the gap
        // that is closer to HEAD ([31, 34]).
        let (height, amount, _respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 31);
        assert_eq!(amount, 4);

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }
1056
    #[async_test]
    async fn all_peers_disconnected_and_no_network_head_progress() {
        let mut gen = ExtendedHeaderGenerator::new_from_height(30);

        let header30 = gen.next();

        // Start Syncer and report height 30 as HEAD
        let (syncer, store, mut p2p_mock) = initialized_syncer(header30.clone()).await;

        // Wait for the request but do not reply to it.
        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        // Syncer is now back to `connecting_event_loop`.
        p2p_mock.expect_no_cmd().await;

        // Announce a non-trusted peer. Syncer in `connecting_event_loop` can progress only
        // if a trusted peer is connected.
        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;

        // Announce a trusted peer.
        p2p_mock.announce_trusted_peer_connected();

        // Now syncer will send request for HEAD.
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        // Report the same HEAD as before.
        respond_to.send(Ok(vec![header30.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        // Syncer initializes HeaderSub with the latest HEAD.
        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header30);

        // Syncer now is in `connected_event_loop` and will try to sync the gap
        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }
1100
1101    #[async_test]
1102    async fn non_contiguous_response() {
1103        let mut gen = ExtendedHeaderGenerator::new();
1104        let mut headers = gen.next_many(20);
1105
1106        // Start Syncer and report last header as network head
1107        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;
1108
1109        let header10 = headers[10].clone();
1110        // make a gap in response, preserving the amount of headers returned
1111        headers[10] = headers[11].clone();
1112
1113        // Syncer requests missing headers
1114        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1115
1116        // Syncer should not apply headers from invalid response
1117        assert_syncing(&syncer, &store, &[20..=20], 20).await;
1118
1119        // correct the response
1120        headers[10] = header10;
1121
1122        // Syncer requests missing headers again
1123        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1124
1125        // With a correct resposne, syncer should update the store
1126        assert_syncing(&syncer, &store, &[1..=20], 20).await;
1127    }
1128
1129    #[async_test]
1130    async fn another_chain_response() {
1131        let headers = ExtendedHeaderGenerator::new().next_many(20);
1132        let headers_prime = ExtendedHeaderGenerator::new().next_many(20);
1133
1134        // Start Syncer and report last header as network head
1135        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;
1136
1137        // Syncer requests missing headers
1138        handle_session_batch(&mut p2p_mock, &headers_prime, 1..=19, true).await;
1139
1140        // Syncer should not apply headers from invalid response
1141        assert_syncing(&syncer, &store, &[20..=20], 20).await;
1142
1143        // Syncer requests missing headers again
1144        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1145
1146        // With a correct resposne, syncer should update the store
1147        assert_syncing(&syncer, &store, &[1..=20], 20).await;
1148    }
1149
1150    async fn assert_syncing(
1151        syncer: &Syncer<InMemoryStore>,
1152        store: &InMemoryStore,
1153        expected_synced_ranges: &[RangeInclusive<u64>],
1154        expected_subjective_head: u64,
1155    ) {
1156        // Syncer receives responds on the same loop that receive other events.
1157        // Wait a bit to be processed.
1158        sleep(Duration::from_millis(1)).await;
1159
1160        let store_ranges = store.get_stored_header_ranges().await.unwrap();
1161        let syncing_info = syncer.info().await.unwrap();
1162
1163        assert_eq!(store_ranges.as_ref(), expected_synced_ranges);
1164        assert_eq!(syncing_info.stored_headers.as_ref(), expected_synced_ranges);
1165        assert_eq!(syncing_info.subjective_head, expected_subjective_head);
1166    }
1167
1168    async fn initialized_syncer(
1169        head: ExtendedHeader,
1170    ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
1171        let events = EventChannel::new();
1172        let (mock, mut handle) = P2p::mocked();
1173        let store = Arc::new(InMemoryStore::new());
1174
1175        let syncer = Syncer::start(SyncerArgs {
1176            p2p: Arc::new(mock),
1177            store: store.clone(),
1178            event_pub: events.publisher(),
1179            batch_size: 512,
1180            syncing_window: DEFAULT_SAMPLING_WINDOW,
1181        })
1182        .unwrap();
1183
1184        // Syncer is waiting for a trusted peer to connect
1185        handle.expect_no_cmd().await;
1186        handle.announce_peer_connected();
1187        handle.expect_no_cmd().await;
1188        handle.announce_trusted_peer_connected();
1189
1190        // After connection is established Syncer asks current network HEAD
1191        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
1192        assert_eq!(height, 0);
1193        assert_eq!(amount, 1);
1194        respond_to.send(Ok(vec![head.clone()])).unwrap();
1195
1196        // Now Syncer initializes HeaderSub with the latest HEAD
1197        let head_from_syncer = handle.expect_init_header_sub().await;
1198        assert_eq!(head_from_syncer, head);
1199
1200        let head_height = head.height().value();
1201        assert_syncing(&syncer, &store, &[head_height..=head_height], head_height).await;
1202
1203        (syncer, store, handle)
1204    }
1205
    /// Serves (or deliberately ignores) the header requests that the Syncer
    /// issues for a single syncing session covering `range`.
    ///
    /// When `respond` is `false` the responder channels are held alive and a
    /// timeout error is injected after a delay, mimicking libp2p behaviour.
    /// Panics if the issued requests don't cover `range` exactly.
    async fn handle_session_batch(
        p2p_mock: &mut MockP2pHandle,
        remaining_headers: &[ExtendedHeader],
        range: BlockRange,
        respond: bool,
    ) {
        range.validate().unwrap();

        // Track which parts of `range` were actually requested; each request
        // removes its sub-range and the leftover must be empty at the end.
        let mut ranges_to_request = BlockRanges::new();
        ranges_to_request.insert_relaxed(&range).unwrap();

        let mut no_respond_chans = Vec::new();

        for _ in 0..requests_in_session(range.len()) {
            let (height, amount, respond_to) =
                p2p_mock.expect_header_request_for_height_cmd().await;

            // Each request must fall strictly within a not-yet-requested
            // part of the expected range (remove_strict panics otherwise).
            let requested_range = height..=height + amount - 1;
            ranges_to_request.remove_strict(requested_range);

            if respond {
                // Locate the first requested header by height in the provided slice.
                let header_index = remaining_headers
                    .iter()
                    .position(|h| h.height().value() == height)
                    .expect("height not found in provided headers");

                let response_range =
                    remaining_headers[header_index..header_index + amount as usize].to_vec();
                respond_to
                    .send(Ok(response_range))
                    .map_err(|_| format!("headers [{}, {}]", height, height + amount - 1))
                    .unwrap();
            } else {
                // Keep the channel alive; dropping it here would signal an
                // immediate failure instead of a timeout.
                no_respond_chans.push(respond_to);
            }
        }

        // Real libp2p implementation will create a timeout error if the peer does not
        // respond. We need to simulate that, otherwise we end up with some undesirable
        // behaviours in Syncer.
        if !respond {
            spawn(async move {
                sleep(Duration::from_secs(10)).await;

                for respond_chan in no_respond_chans {
                    respond_chan.maybe_send_err(P2pError::HeaderEx(
                        HeaderExError::OutboundFailure(OutboundFailure::Timeout),
                    ));
                }
            });
        }

        assert!(
            ranges_to_request.is_empty(),
            "Some headers weren't requested. expected range: {}, not requested: {}",
            range.display(),
            ranges_to_request
        );
    }
1265
1266    fn requests_in_session(headers: u64) -> usize {
1267        let max_requests = headers.div_ceil(header_session::MAX_AMOUNT_PER_REQ) as usize;
1268        let min_requests = headers.div_ceil(header_session::MIN_AMOUNT_PER_REQ) as usize;
1269
1270        if max_requests > header_session::MAX_CONCURRENT_REQS {
1271            // if we have to do more requests than our concurrency limit anyway
1272            max_requests
1273        } else {
1274            // otherwise we can handle batch fully concurrent
1275            header_session::MAX_CONCURRENT_REQS.min(min_requests)
1276        }
1277    }
1278
1279    impl BlockRanges {
1280        fn remove_strict(&mut self, range: BlockRange) {
1281            for stored in self.as_ref() {
1282                if stored.contains(range.start()) && stored.contains(range.end()) {
1283                    self.remove_relaxed(range).unwrap();
1284                    return;
1285                }
1286            }
1287
1288            panic!("block ranges ({self}) don't contain {}", range.display());
1289        }
1290    }
1291}