lumina_node/
syncer.rs

1//! Component responsible for synchronizing block headers announced in the Celestia network.
2//!
3//! It starts by asking the trusted peers for their current head headers and picks
4//! the latest header returned by at least two of them as the initial synchronization target
5//! called `subjective_head`.
6//!
7//! Then it starts synchronizing from the genesis header up to the target requesting headers
8//! on the `header-ex` p2p protocol. In the meantime, it constantly checks for the latest
9//! headers announced on the `header-sub` p2p protocol to keep the `subjective_head` as close
10//! to the `network_head` as possible.
11
12use std::marker::PhantomData;
13use std::pin::pin;
14use std::sync::Arc;
15use std::time::Duration;
16
17use backoff::backoff::Backoff;
18use backoff::ExponentialBackoffBuilder;
19use celestia_types::ExtendedHeader;
20use lumina_utils::executor::{spawn, JoinHandle};
21use lumina_utils::time::{sleep, Instant, Interval};
22use serde::{Deserialize, Serialize};
23use tendermint::Time;
24use tokio::select;
25use tokio::sync::{mpsc, oneshot};
26use tokio_util::sync::CancellationToken;
27use tracing::{debug, error, info, instrument, warn};
28
29use crate::block_ranges::{BlockRange, BlockRangeExt, BlockRanges};
30use crate::events::{EventPublisher, NodeEvent};
31use crate::p2p::{P2p, P2pError};
32use crate::store::{Store, StoreError};
33use crate::utils::{FusedReusableFuture, OneshotSenderExt};
34
35type Result<T, E = SyncerError> = std::result::Result<T, E>;
36
37const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60);
38
/// Representation of all the errors that can occur in `Syncer` component.
#[derive(Debug, thiserror::Error)]
pub enum SyncerError {
    /// An error propagated from the [`P2p`] component.
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// An error propagated from the [`Store`] component.
    #[error("Store: {0}")]
    Store(#[from] StoreError),

    /// The worker has died.
    #[error("Worker died")]
    WorkerDied,

    /// Channel closed unexpectedly.
    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,
}
58
59impl SyncerError {
60    pub(crate) fn is_fatal(&self) -> bool {
61        match self {
62            SyncerError::P2p(e) => e.is_fatal(),
63            SyncerError::Store(e) => e.is_fatal(),
64            SyncerError::WorkerDied | SyncerError::ChannelClosedUnexpectedly => true,
65        }
66    }
67}
68
69impl From<oneshot::error::RecvError> for SyncerError {
70    fn from(_value: oneshot::error::RecvError) -> Self {
71        SyncerError::ChannelClosedUnexpectedly
72    }
73}
74
/// Component responsible for synchronizing block headers from the network.
#[derive(Debug)]
pub(crate) struct Syncer<S>
where
    S: Store + 'static,
{
    /// Channel used to send commands to the worker task.
    cmd_tx: mpsc::Sender<SyncerCmd>,
    /// Token used to signal the worker task to stop.
    cancellation_token: CancellationToken,
    /// Handle used to await the worker task's termination.
    join_handle: JoinHandle,
    /// Ties the `S: Store` type parameter to this handle without storing one.
    _store: PhantomData<S>,
}
86
/// Arguments used to configure the [`Syncer`].
pub(crate) struct SyncerArgs<S>
where
    S: Store + 'static,
{
    /// Handler for the peer to peer messaging.
    pub(crate) p2p: Arc<P2p>,
    /// Headers storage.
    pub(crate) store: Arc<S>,
    /// Event publisher.
    pub(crate) event_pub: EventPublisher,
    /// Batch size: the maximum amount of headers requested in a single batch.
    pub(crate) batch_size: u64,
    /// Syncing window: headers older than `now - syncing_window` are not fetched.
    pub(crate) syncing_window: Duration,
}
103
/// Commands accepted by the syncer worker task.
#[derive(Debug)]
enum SyncerCmd {
    /// Request the current [`SyncingInfo`].
    GetInfo {
        /// Channel on which the worker sends the reply.
        respond_to: oneshot::Sender<SyncingInfo>,
    },
}
110
/// Status of the synchronization.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncingInfo {
    /// Ranges of headers that are already synchronized.
    pub stored_headers: BlockRanges,
    /// Syncing target. The latest height seen in the network that was successfully verified.
    pub subjective_head: u64,
}
119
impl<S> Syncer<S>
where
    S: Store,
{
    /// Create and start the [`Syncer`].
    ///
    /// Spawns the background [`Worker`] task. If the worker exits with a
    /// fatal error, the error is reported through the event publisher.
    pub(crate) fn start(args: SyncerArgs<S>) -> Result<Self> {
        let cancellation_token = CancellationToken::new();
        let event_pub = args.event_pub.clone();
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;

        let join_handle = spawn(async move {
            if let Err(e) = worker.run().await {
                error!("Syncer stopped because of a fatal error: {e}");

                event_pub.send(NodeEvent::FatalSyncerError {
                    error: e.to_string(),
                });
            }
        });

        Ok(Syncer {
            cancellation_token,
            cmd_tx,
            join_handle,
            _store: PhantomData,
        })
    }

    /// Stop the worker.
    pub(crate) fn stop(&self) {
        // Signal the Worker to stop.
        self.cancellation_token.cancel();
    }

    /// Wait until worker is completely stopped.
    pub(crate) async fn join(&self) {
        self.join_handle.join().await;
    }

    /// Send a command to the worker, mapping a closed channel to
    /// [`SyncerError::WorkerDied`].
    async fn send_command(&self, cmd: SyncerCmd) -> Result<()> {
        self.cmd_tx
            .send(cmd)
            .await
            .map_err(|_| SyncerError::WorkerDied)
    }

    /// Get the current synchronization status.
    ///
    /// # Errors
    ///
    /// This function will return an error if the [`Syncer`] has been stopped.
    pub(crate) async fn info(&self) -> Result<SyncingInfo> {
        let (tx, rx) = oneshot::channel();

        self.send_command(SyncerCmd::GetInfo { respond_to: tx })
            .await?;

        Ok(rx.await?)
    }
}
181
182impl<S> Drop for Syncer<S>
183where
184    S: Store,
185{
186    fn drop(&mut self) {
187        self.stop();
188    }
189}
190
/// Background task that drives the header synchronization.
struct Worker<S>
where
    S: Store + 'static,
{
    /// Token used to detect stop requests from the [`Syncer`] handle.
    cancellation_token: CancellationToken,
    /// Receiving side of the command channel.
    cmd_rx: mpsc::Receiver<SyncerCmd>,
    /// Publisher for node events.
    event_pub: EventPublisher,
    /// Handler for the peer to peer messaging.
    p2p: Arc<P2p>,
    /// Headers storage.
    store: Arc<S>,
    /// Receiver of headers announced on HeaderSub; `None` until initialized.
    header_sub_rx: Option<mpsc::Receiver<ExtendedHeader>>,
    /// Highest verified height seen so far; `None` before initialization.
    subjective_head_height: Option<u64>,
    /// Maximum amount of headers requested in a single batch.
    batch_size: u64,
    /// State of the currently running batch request, if any.
    ongoing_batch: Ongoing,
    /// Headers older than `now - syncing_window` are not fetched.
    syncing_window: Duration,
}
206
/// Bookkeeping for the batch request that is currently in flight.
struct Ongoing {
    /// Range of heights being fetched; `None` when no batch is in flight.
    range: Option<BlockRange>,
    /// Future resolving to the fetched headers and the request's duration.
    task: FusedReusableFuture<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
}
211
212impl<S> Worker<S>
213where
214    S: Store,
215{
216    fn new(
217        args: SyncerArgs<S>,
218        cancellation_token: CancellationToken,
219        cmd_rx: mpsc::Receiver<SyncerCmd>,
220    ) -> Result<Self> {
221        Ok(Worker {
222            cancellation_token,
223            cmd_rx,
224            event_pub: args.event_pub,
225            p2p: args.p2p,
226            store: args.store,
227            header_sub_rx: None,
228            subjective_head_height: None,
229            batch_size: args.batch_size,
230            ongoing_batch: Ongoing {
231                range: None,
232                task: FusedReusableFuture::terminated(),
233            },
234            syncing_window: args.syncing_window,
235        })
236    }
237
238    async fn run(&mut self) -> Result<()> {
239        loop {
240            if self.cancellation_token.is_cancelled() {
241                break;
242            }
243
244            self.connecting_event_loop().await?;
245
246            if self.cancellation_token.is_cancelled() {
247                break;
248            }
249
250            self.connected_event_loop().await?;
251        }
252
253        debug!("Syncer stopped");
254        Ok(())
255    }
256
257    /// The responsibility of this event loop is to await a trusted peer to
258    /// connect and get the network head, while accepting commands.
259    ///
260    /// NOTE: Only fatal errors should be propagated!
261    async fn connecting_event_loop(&mut self) -> Result<()> {
262        debug!("Entering connecting_event_loop");
263
264        let mut report_interval = Interval::new(Duration::from_secs(60)).await;
265        self.report().await?;
266
267        let mut try_init_fut = pin!(try_init_task(
268            self.p2p.clone(),
269            self.store.clone(),
270            self.event_pub.clone()
271        ));
272
273        loop {
274            select! {
275                _ = self.cancellation_token.cancelled() => {
276                    break;
277                }
278                _ = report_interval.tick() => {
279                    self.report().await?;
280                }
281                res = &mut try_init_fut => {
282                    // `try_init_task` propagates only fatal errors
283                    let (network_head, took) = res?;
284                    let network_head_height = network_head.height().value();
285
286                    info!("Setting initial subjective head to {network_head_height}");
287                    self.set_subjective_head_height(network_head_height);
288
289                    let (header_sub_tx, header_sub_rx) = mpsc::channel(16);
290                    self.p2p.init_header_sub(network_head, header_sub_tx).await?;
291                    self.header_sub_rx = Some(header_sub_rx);
292
293                    self.event_pub.send(NodeEvent::FetchingHeadHeaderFinished {
294                        height: network_head_height,
295                        took,
296                    });
297
298                    break;
299                }
300                Some(cmd) = self.cmd_rx.recv() => {
301                    self.on_cmd(cmd).await?;
302                }
303            }
304        }
305
306        Ok(())
307    }
308
309    /// The reponsibility of this event loop is to start the syncing process,
310    /// handles events from HeaderSub, and accept commands.
311    ///
312    /// NOTE: Only fatal errors should be propagated!
313    async fn connected_event_loop(&mut self) -> Result<()> {
314        debug!("Entering connected_event_loop");
315
316        let mut report_interval = Interval::new(Duration::from_secs(60)).await;
317        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();
318
319        // Check if connection status changed before creating the watcher
320        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
321            warn!("All peers disconnected");
322            return Ok(());
323        }
324
325        self.fetch_next_batch().await?;
326        self.report().await?;
327
328        loop {
329            select! {
330                _ = self.cancellation_token.cancelled() => {
331                    break;
332                }
333                _ = peer_tracker_info_watcher.changed() => {
334                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
335                        warn!("All peers disconnected");
336                        break;
337                    }
338                }
339                _ = report_interval.tick() => {
340                    self.report().await?;
341                }
342                res = header_sub_recv(self.header_sub_rx.as_mut()) => {
343                    let header = res?;
344                    self.on_header_sub_message(header).await?;
345                    self.fetch_next_batch().await?;
346                }
347                Some(cmd) = self.cmd_rx.recv() => {
348                    self.on_cmd(cmd).await?;
349                }
350                (res, took) = &mut self.ongoing_batch.task => {
351                    self.on_fetch_next_batch_result(res, took).await?;
352                    self.fetch_next_batch().await?;
353                }
354            }
355        }
356
357        if let Some(ongoing) = self.ongoing_batch.range.take() {
358            warn!("Cancelling fetching of {}", ongoing.display());
359            self.ongoing_batch.task.terminate();
360        }
361
362        self.header_sub_rx.take();
363
364        Ok(())
365    }
366
367    async fn syncing_info(&self) -> Result<SyncingInfo> {
368        Ok(SyncingInfo {
369            stored_headers: self.store.get_stored_header_ranges().await?,
370            subjective_head: self.subjective_head_height.unwrap_or(0),
371        })
372    }
373
374    #[instrument(skip_all)]
375    async fn report(&mut self) -> Result<()> {
376        let SyncingInfo {
377            stored_headers,
378            subjective_head,
379        } = self.syncing_info().await?;
380
381        let ongoing_batch = self
382            .ongoing_batch
383            .range
384            .as_ref()
385            .map(|range| format!("{}", range.display()))
386            .unwrap_or_else(|| "None".to_string());
387
388        info!("syncing: head: {subjective_head}, stored headers: {stored_headers}, ongoing batches: {ongoing_batch}");
389        Ok(())
390    }
391
392    async fn on_cmd(&mut self, cmd: SyncerCmd) -> Result<()> {
393        match cmd {
394            SyncerCmd::GetInfo { respond_to } => {
395                let info = self.syncing_info().await?;
396                respond_to.maybe_send(info);
397            }
398        }
399
400        Ok(())
401    }
402
403    #[instrument(skip_all)]
404    async fn on_header_sub_message(&mut self, new_head: ExtendedHeader) -> Result<()> {
405        let new_head_height = new_head.height().value();
406
407        self.set_subjective_head_height(new_head_height);
408
409        if let Ok(store_head_height) = self.store.head_height().await {
410            // If our new header is adjacent to the HEAD of the store
411            if store_head_height + 1 == new_head_height {
412                // Header is already verified by HeaderSub and will be validated against previous
413                // head on insert
414                if self.store.insert(new_head).await.is_ok() {
415                    self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub {
416                        height: new_head_height,
417                    });
418                }
419            }
420        }
421
422        Ok(())
423    }
424
425    fn set_subjective_head_height(&mut self, height: u64) {
426        if let Some(old_height) = self.subjective_head_height {
427            if height <= old_height {
428                return;
429            }
430        }
431
432        self.subjective_head_height = Some(height);
433    }
434
435    #[instrument(skip_all)]
436    async fn fetch_next_batch(&mut self) -> Result<()> {
437        debug_assert_eq!(
438            self.ongoing_batch.range.is_none(),
439            self.ongoing_batch.task.is_terminated()
440        );
441
442        if !self.ongoing_batch.task.is_terminated() {
443            // Another batch is ongoing. We do not parallelize `Syncer`
444            // by design. Any parallel requests are done in the
445            // HeaderEx client through `Session`.
446            //
447            // Nothing to schedule
448            return Ok(());
449        }
450
451        if self.p2p.peer_tracker_info().num_connected_peers == 0 {
452            // No connected peers. We can't do the request.
453            // We will recover from this in `run`.
454            return Ok(());
455        }
456
457        let Some(subjective_head_height) = self.subjective_head_height else {
458            // Nothing to schedule
459            return Ok(());
460        };
461
462        let store_ranges = self.store.get_stored_header_ranges().await?;
463
464        let next_batch = calculate_range_to_fetch(
465            subjective_head_height,
466            store_ranges.as_ref(),
467            self.batch_size,
468        );
469
470        if next_batch.is_empty() {
471            // no headers to fetch
472            return Ok(());
473        }
474
475        // make sure we're inside the syncing window before we start
476        match self.store.get_by_height(next_batch.end() + 1).await {
477            Ok(known_header) => {
478                if !self.in_syncing_window(&known_header) {
479                    return Ok(());
480                }
481            }
482            Err(StoreError::NotFound) => {}
483            Err(e) => return Err(e.into()),
484        }
485
486        self.event_pub.send(NodeEvent::FetchingHeadersStarted {
487            from_height: *next_batch.start(),
488            to_height: *next_batch.end(),
489        });
490
491        let p2p = self.p2p.clone();
492
493        self.ongoing_batch.range = Some(next_batch.clone());
494
495        self.ongoing_batch.task.set(async move {
496            let now = Instant::now();
497            let res = p2p.get_unverified_header_range(next_batch).await;
498            (res, now.elapsed())
499        });
500
501        Ok(())
502    }
503
504    /// Handle the result of the batch request and propagate fatal errors.
505    #[instrument(skip_all)]
506    async fn on_fetch_next_batch_result(
507        &mut self,
508        res: Result<Vec<ExtendedHeader>, P2pError>,
509        took: Duration,
510    ) -> Result<()> {
511        let range = self
512            .ongoing_batch
513            .range
514            .take()
515            .expect("ongoing_batch not initialized correctly");
516
517        let from_height = *range.start();
518        let to_height = *range.end();
519
520        let headers = match res {
521            Ok(headers) => headers,
522            Err(e) => {
523                if e.is_fatal() {
524                    return Err(e.into());
525                }
526
527                self.event_pub.send(NodeEvent::FetchingHeadersFailed {
528                    from_height,
529                    to_height,
530                    error: e.to_string(),
531                    took,
532                });
533
534                return Ok(());
535            }
536        };
537
538        if let Err(e) = self.store.insert(headers).await {
539            if e.is_fatal() {
540                return Err(e.into());
541            }
542
543            self.event_pub.send(NodeEvent::FetchingHeadersFailed {
544                from_height,
545                to_height,
546                error: format!("Failed to store headers: {e}"),
547                took,
548            });
549        }
550
551        self.event_pub.send(NodeEvent::FetchingHeadersFinished {
552            from_height,
553            to_height,
554            took,
555        });
556
557        Ok(())
558    }
559
560    fn in_syncing_window(&self, header: &ExtendedHeader) -> bool {
561        let syncing_window_start =
562            Time::now()
563                .checked_sub(self.syncing_window)
564                .unwrap_or_else(|| {
565                    warn!(
566                        "underflow when computing syncing window start, defaulting to unix epoch"
567                    );
568                    Time::unix_epoch()
569                });
570
571        header.time().after(syncing_window_start)
572    }
573}
574
575/// based on the stored headers and current network head height, calculate range of headers that
576/// should be fetched from the network, anchored on already existing header range in store
577fn calculate_range_to_fetch(
578    subjective_head_height: u64,
579    store_headers: &[BlockRange],
580    limit: u64,
581) -> BlockRange {
582    let mut store_headers_iter = store_headers.iter().rev();
583
584    let Some(store_head_range) = store_headers_iter.next() else {
585        // empty store, we're missing everything
586        let range = 1..=subjective_head_height;
587        return range.truncate_right(limit);
588    };
589
590    if store_head_range.end() < &subjective_head_height {
591        // if we haven't caught up with the network head, start from there
592        let range = store_head_range.end() + 1..=subjective_head_height;
593        return range.truncate_right(limit);
594    }
595
596    // there exists a range contiguous with network head. inspect previous range end
597    let penultimate_range_end = store_headers_iter.next().map(|r| *r.end()).unwrap_or(0);
598
599    let range = penultimate_range_end + 1..=store_head_range.start().saturating_sub(1);
600    range.truncate_left(limit)
601}
602
603#[instrument(skip_all)]
604async fn try_init_task<S>(
605    p2p: Arc<P2p>,
606    store: Arc<S>,
607    event_pub: EventPublisher,
608) -> Result<(ExtendedHeader, Duration)>
609where
610    S: Store + 'static,
611{
612    let now = Instant::now();
613    let mut event_reported = false;
614    let mut backoff = ExponentialBackoffBuilder::default()
615        .with_max_interval(TRY_INIT_BACKOFF_MAX_INTERVAL)
616        .with_max_elapsed_time(None)
617        .build();
618
619    loop {
620        match try_init(&p2p, &*store, &event_pub, &mut event_reported).await {
621            Ok(network_head) => {
622                return Ok((network_head, now.elapsed()));
623            }
624            Err(e) if e.is_fatal() => {
625                return Err(e);
626            }
627            Err(e) => {
628                let sleep_dur = backoff
629                    .next_backoff()
630                    .expect("backoff never stops retrying");
631
632                warn!(
633                    "Initialization of subjective head failed: {e}. Trying again in {sleep_dur:?}."
634                );
635                sleep(sleep_dur).await;
636            }
637        }
638    }
639}
640
641async fn try_init<S>(
642    p2p: &P2p,
643    store: &S,
644    event_pub: &EventPublisher,
645    event_reported: &mut bool,
646) -> Result<ExtendedHeader>
647where
648    S: Store,
649{
650    p2p.wait_connected_trusted().await?;
651
652    if !*event_reported {
653        event_pub.send(NodeEvent::FetchingHeadHeaderStarted);
654        *event_reported = true;
655    }
656
657    let network_head = p2p.get_head_header().await?;
658
659    // If the network head and the store head have the same height,
660    // then `insert` will error because of insertion contraints.
661    // However, if both headers are the exactly the same, we
662    // can skip inserting, as the header is already there.
663    //
664    // This can happen in case of fast node restart.
665    let try_insert = match store.get_head().await {
666        // `ExtendedHeader.commit.signatures` can be different set on each fetch
667        // so we compare only hashes.
668        Ok(store_head) => store_head.hash() != network_head.hash(),
669        Err(StoreError::NotFound) => true,
670        Err(e) => return Err(e.into()),
671    };
672
673    if try_insert {
674        // Insert HEAD to the store and initialize header-sub.
675        // Normal insertion checks still apply here.
676        store.insert(network_head.clone()).await?;
677    }
678
679    Ok(network_head)
680}
681
682async fn header_sub_recv(
683    rx: Option<&mut mpsc::Receiver<ExtendedHeader>>,
684) -> Result<ExtendedHeader> {
685    rx.expect("header-sub not initialized")
686        .recv()
687        .await
688        .ok_or(SyncerError::P2p(P2pError::WorkerDied))
689}
690
691#[cfg(test)]
692mod tests {
693    use std::ops::RangeInclusive;
694
695    use super::*;
696    use crate::block_ranges::{BlockRange, BlockRangeExt};
697    use crate::events::EventChannel;
698    use crate::node::HeaderExError;
699    use crate::node::DEFAULT_SAMPLING_WINDOW;
700    use crate::p2p::header_session;
701    use crate::store::InMemoryStore;
702    use crate::test_utils::{gen_filled_store, MockP2pHandle};
703    use crate::utils::OneshotResultSenderExt;
704    use celestia_types::test_utils::ExtendedHeaderGenerator;
705    use libp2p::request_response::OutboundFailure;
706    use lumina_utils::test_utils::async_test;
707
708    #[test]
709    fn calculate_range_to_fetch_test_header_limit() {
710        let head_height = 1024;
711        let ranges = [256..=512];
712
713        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16);
714        assert_eq!(fetch_range, 513..=528);
715
716        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511);
717        assert_eq!(fetch_range, 513..=1023);
718        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512);
719        assert_eq!(fetch_range, 513..=1024);
720        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513);
721        assert_eq!(fetch_range, 513..=1024);
722
723        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024);
724        assert_eq!(fetch_range, 513..=1024);
725    }
726
727    #[test]
728    fn calculate_range_to_fetch_empty_store() {
729        let fetch_range = calculate_range_to_fetch(1, &[], 100);
730        assert_eq!(fetch_range, 1..=1);
731
732        let fetch_range = calculate_range_to_fetch(100, &[], 10);
733        assert_eq!(fetch_range, 1..=10);
734
735        let fetch_range = calculate_range_to_fetch(100, &[], 50);
736        assert_eq!(fetch_range, 1..=50);
737    }
738
739    #[test]
740    fn calculate_range_to_fetch_fully_synced() {
741        let fetch_range = calculate_range_to_fetch(1, &[1..=1], 100);
742        assert!(fetch_range.is_empty());
743
744        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
745        assert!(fetch_range.is_empty());
746
747        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
748        assert!(fetch_range.is_empty());
749    }
750
751    #[test]
752    fn calculate_range_to_fetch_caught_up() {
753        let head_height = 4000;
754
755        let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], 500);
756        assert_eq!(fetch_range, 2500..=2999);
757        let fetch_range = calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], 500);
758        assert_eq!(fetch_range, 2500..=2999);
759        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
760        assert_eq!(fetch_range, 2801..=2999);
761        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
762        assert_eq!(fetch_range, 2801..=2999);
763        let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], 500);
764        assert_eq!(fetch_range, 1..=299);
765    }
766
767    #[test]
768    fn calculate_range_to_fetch_catching_up() {
769        let head_height = 4000;
770
771        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], 500);
772        assert_eq!(fetch_range, 3001..=3500);
773        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3500], 500);
774        assert_eq!(fetch_range, 3501..=4000);
775        let fetch_range = calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], 500);
776        assert_eq!(fetch_range, 3801..=4000);
777    }
778
    #[async_test]
    async fn init_without_genesis_hash() {
        let events = EventChannel::new();
        let (mock, mut handle) = P2p::mocked();
        let mut gen = ExtendedHeaderGenerator::new();
        let header = gen.next();

        let _syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(mock),
            store: Arc::new(InMemoryStore::new()),
            event_pub: events.publisher(),
            batch_size: 512,
            syncing_window: DEFAULT_SAMPLING_WINDOW,
        })
        .unwrap();

        // Syncer is waiting for a trusted peer to connect.
        // A regular (non-trusted) peer connecting is not enough to start.
        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;
        handle.announce_trusted_peer_connected();

        // We're syncing from the front, so ask for head first
        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![header.clone()])).unwrap();

        // Now Syncer initializes HeaderSub with the latest HEAD
        let head_from_syncer = handle.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header);

        // network head = local head, so nothing else is produced.
        handle.expect_no_cmd().await;
    }
814
815    #[async_test]
816    async fn init_with_genesis_hash() {
817        let mut gen = ExtendedHeaderGenerator::new();
818        let head = gen.next();
819
820        let (_syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;
821
822        // network head = local head, so nothing else is produced.
823        p2p_mock.expect_no_cmd().await;
824    }
825
    #[async_test]
    async fn syncing() {
        let mut gen = ExtendedHeaderGenerator::new();
        let headers = gen.next_many(1500);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[1499].clone()).await;
        assert_syncing(&syncer, &store, &[1500..=1500], 1500).await;

        // Syncer is syncing backwards from the network head (batch 1)
        handle_session_batch(&mut p2p_mock, &headers, 988..=1499, true).await;
        assert_syncing(&syncer, &store, &[988..=1500], 1500).await;

        // Syncer is syncing backwards from the network head (batch 2)
        handle_session_batch(&mut p2p_mock, &headers, 476..=987, true).await;
        assert_syncing(&syncer, &store, &[476..=1500], 1500).await;

        // New HEAD was received by HeaderSub (height 1501)
        let header1501 = gen.next();
        p2p_mock.announce_new_head(header1501.clone());
        // Height 1501 is adjacent to the last header of Store, so it is appended
        // immediately
        assert_syncing(&syncer, &store, &[476..=1501], 1501).await;

        // Syncer is syncing backwards from the network head (batch 3, partial)
        handle_session_batch(&mut p2p_mock, &headers, 1..=475, true).await;
        assert_syncing(&syncer, &store, &[1..=1501], 1501).await;

        // Syncer is fully synced and awaiting for events
        p2p_mock.expect_no_cmd().await;

        // New HEAD was received by HeaderSub (height 1502), it should be appended immediately
        let header1502 = gen.next();
        p2p_mock.announce_new_head(header1502.clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1502).await;
        p2p_mock.expect_no_cmd().await;

        // New HEAD was received by HeaderSub (height 1505), it should NOT be appended
        // because it is not adjacent to the store head
        let headers_1503_1505 = gen.next_many(3);
        p2p_mock.announce_new_head(headers_1503_1505[2].clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1505).await;

        // New HEAD is not adjacent to store, so Syncer requests a range
        handle_session_batch(&mut p2p_mock, &headers_1503_1505, 1503..=1505, true).await;
        assert_syncing(&syncer, &store, &[1..=1505], 1505).await;

        // New HEAD was received by HeaderSub (height 3000), it should NOT be appended
        let mut headers = gen.next_many(1495);
        p2p_mock.announce_new_head(headers[1494].clone());
        assert_syncing(&syncer, &store, &[1..=1505], 3000).await;

        // Syncer is syncing forwards, anchored on a range already in store (batch 1)
        handle_session_batch(&mut p2p_mock, &headers, 1506..=2017, true).await;
        assert_syncing(&syncer, &store, &[1..=2017], 3000).await;

        // New head from header sub added, should NOT be appended
        headers.push(gen.next());
        p2p_mock.announce_new_head(headers.last().unwrap().clone());
        assert_syncing(&syncer, &store, &[1..=2017], 3001).await;

        // Syncer continues syncing forwards (batch 2)
        handle_session_batch(&mut p2p_mock, &headers, 2018..=2529, true).await;
        assert_syncing(&syncer, &store, &[1..=2529], 3001).await;

        // Syncer continues syncing forwards, should include the new head received via HeaderSub (batch 3)
        handle_session_batch(&mut p2p_mock, &headers, 2530..=3001, true).await;
        assert_syncing(&syncer, &store, &[1..=3001], 3001).await;

        // Syncer is fully synced and awaiting for events
        p2p_mock.expect_no_cmd().await;
    }
896
897    #[async_test]
898    async fn window_edge() {
899        let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60);
900        let mut gen = ExtendedHeaderGenerator::new();
901        gen.set_time(
902            (Time::now() - month_and_day_ago).expect("to not underflow"),
903            Duration::from_secs(1),
904        );
905        let mut headers = gen.next_many(1200);
906        gen.reset_time();
907        headers.append(&mut gen.next_many(2049 - 1200));
908
909        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await;
910        assert_syncing(&syncer, &store, &[2049..=2049], 2049).await;
911
912        // Syncer requested the first batch
913        handle_session_batch(&mut p2p_mock, &headers, 1537..=2048, true).await;
914        assert_syncing(&syncer, &store, &[1537..=2049], 2049).await;
915
916        // Syncer requested the second batch hitting the syncing window
917        handle_session_batch(&mut p2p_mock, &headers, 1025..=1536, true).await;
918        assert_syncing(&syncer, &store, &[1025..=2049], 2049).await;
919
920        // Syncer is fully synced and awaiting for events
921        p2p_mock.expect_no_cmd().await;
922    }
923
924    #[async_test]
925    async fn start_with_filled_store() {
926        let events = EventChannel::new();
927        let (p2p, mut p2p_mock) = P2p::mocked();
928        let (store, mut gen) = gen_filled_store(25).await;
929        let store = Arc::new(store);
930
931        let mut headers = gen.next_many(520);
932        let network_head = gen.next(); // height 546
933
934        let syncer = Syncer::start(SyncerArgs {
935            p2p: Arc::new(p2p),
936            store: store.clone(),
937            event_pub: events.publisher(),
938            batch_size: 512,
939            syncing_window: DEFAULT_SAMPLING_WINDOW,
940        })
941        .unwrap();
942
943        p2p_mock.announce_trusted_peer_connected();
944
945        // Syncer asks for current HEAD
946        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
947        assert_eq!(height, 0);
948        assert_eq!(amount, 1);
949        respond_to.send(Ok(vec![network_head.clone()])).unwrap();
950
951        // Now Syncer initializes HeaderSub with the latest HEAD
952        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
953        assert_eq!(head_from_syncer, network_head);
954
955        assert_syncing(&syncer, &store, &[1..=25, 546..=546], 546).await;
956
957        // Syncer requested the first batch
958        handle_session_batch(&mut p2p_mock, &headers, 34..=545, true).await;
959        assert_syncing(&syncer, &store, &[1..=25, 34..=546], 546).await;
960
961        // Syncer requested the remaining batch ([26, 33])
962        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
963        assert_eq!(height, 26);
964        assert_eq!(amount, 8);
965        respond_to
966            .send(Ok(headers.drain(..8).collect()))
967            .map_err(|_| "headers [538, 545]")
968            .unwrap();
969        assert_syncing(&syncer, &store, &[1..=546], 546).await;
970
971        // Syncer is fulling synced and awaiting for events
972        p2p_mock.expect_no_cmd().await;
973    }
974
975    #[async_test]
976    async fn stop_syncer() {
977        let mut gen = ExtendedHeaderGenerator::new();
978        let head = gen.next();
979
980        let (syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;
981
982        // network head height == 1, so nothing else is produced.
983        p2p_mock.expect_no_cmd().await;
984
985        syncer.stop();
986        // Wait for Worker to stop
987        sleep(Duration::from_millis(1)).await;
988        assert!(matches!(
989            syncer.info().await.unwrap_err(),
990            SyncerError::WorkerDied
991        ));
992    }
993
    #[async_test]
    async fn all_peers_disconnected() {
        let mut gen = ExtendedHeaderGenerator::new();

        // Generate a chain keeping only the headers at heights 25, 30 and 35;
        // the rest are thrown away so the store will have gaps around them.
        let _gap = gen.next_many(24);
        let header25 = gen.next();
        let _gap = gen.next_many(4);
        let header30 = gen.next();
        let _gap = gen.next_many(4);
        let header35 = gen.next();

        // Start Syncer and report height 30 as HEAD
        let (syncer, store, mut p2p_mock) = initialized_syncer(header30).await;

        // Wait for the request but do not reply to it.
        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        // Syncer is now back to `connecting_event_loop`.
        p2p_mock.expect_no_cmd().await;

        // Announce a non-trusted peer. Syncer in `connecting_event_loop` can progress only
        // if a trusted peer is connected.
        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;

        // Announce a trusted peer.
        p2p_mock.announce_trusted_peer_connected();

        // Now syncer will send request for HEAD.
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        // Report an older head. Syncer should not accept it.
        respond_to.send(Ok(vec![header25])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        // Syncer will request HEAD again after some time.
        sleep(Duration::from_secs(1)).await;
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        // Report newer HEAD than before.
        respond_to.send(Ok(vec![header35.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30, 35..=35], 35).await;

        // Syncer initializes HeaderSub with the latest HEAD.
        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header35);

        // Syncer now is in `connected_event_loop` and will try to sync the gap
        // that is closer to HEAD (heights [31, 34]).
        let (height, amount, _respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 31);
        assert_eq!(amount, 4);

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }
1055
    #[async_test]
    async fn all_peers_disconnected_and_no_network_head_progress() {
        let mut gen = ExtendedHeaderGenerator::new_from_height(30);

        let header30 = gen.next();

        // Start Syncer and report height 30 as HEAD
        let (syncer, store, mut p2p_mock) = initialized_syncer(header30.clone()).await;

        // Wait for the request but do not reply to it.
        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        // Syncer is now back to `connecting_event_loop`.
        p2p_mock.expect_no_cmd().await;

        // Announce a non-trusted peer. Syncer in `connecting_event_loop` can progress only
        // if a trusted peer is connected.
        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;

        // Announce a trusted peer.
        p2p_mock.announce_trusted_peer_connected();

        // Now syncer will send request for HEAD.
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        // Report the same HEAD as before.
        respond_to.send(Ok(vec![header30.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        // Syncer initializes HeaderSub with the latest HEAD.
        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header30);

        // Syncer now is in `connected_event_loop` and will try to sync the gap
        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }
1099
1100    #[async_test]
1101    async fn non_contiguous_response() {
1102        let mut gen = ExtendedHeaderGenerator::new();
1103        let mut headers = gen.next_many(20);
1104
1105        // Start Syncer and report last header as network head
1106        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;
1107
1108        let header10 = headers[10].clone();
1109        // make a gap in response, preserving the amount of headers returned
1110        headers[10] = headers[11].clone();
1111
1112        // Syncer requests missing headers
1113        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1114
1115        // Syncer should not apply headers from invalid response
1116        assert_syncing(&syncer, &store, &[20..=20], 20).await;
1117
1118        // correct the response
1119        headers[10] = header10;
1120
1121        // Syncer requests missing headers again
1122        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1123
1124        // With a correct resposne, syncer should update the store
1125        assert_syncing(&syncer, &store, &[1..=20], 20).await;
1126    }
1127
1128    #[async_test]
1129    async fn another_chain_response() {
1130        let headers = ExtendedHeaderGenerator::new().next_many(20);
1131        let headers_prime = ExtendedHeaderGenerator::new().next_many(20);
1132
1133        // Start Syncer and report last header as network head
1134        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;
1135
1136        // Syncer requests missing headers
1137        handle_session_batch(&mut p2p_mock, &headers_prime, 1..=19, true).await;
1138
1139        // Syncer should not apply headers from invalid response
1140        assert_syncing(&syncer, &store, &[20..=20], 20).await;
1141
1142        // Syncer requests missing headers again
1143        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1144
1145        // With a correct resposne, syncer should update the store
1146        assert_syncing(&syncer, &store, &[1..=20], 20).await;
1147    }
1148
1149    async fn assert_syncing(
1150        syncer: &Syncer<InMemoryStore>,
1151        store: &InMemoryStore,
1152        expected_synced_ranges: &[RangeInclusive<u64>],
1153        expected_subjective_head: u64,
1154    ) {
1155        // Syncer receives responds on the same loop that receive other events.
1156        // Wait a bit to be processed.
1157        sleep(Duration::from_millis(1)).await;
1158
1159        let store_ranges = store.get_stored_header_ranges().await.unwrap();
1160        let syncing_info = syncer.info().await.unwrap();
1161
1162        assert_eq!(store_ranges.as_ref(), expected_synced_ranges);
1163        assert_eq!(syncing_info.stored_headers.as_ref(), expected_synced_ranges);
1164        assert_eq!(syncing_info.subjective_head, expected_subjective_head);
1165    }
1166
1167    async fn initialized_syncer(
1168        head: ExtendedHeader,
1169    ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
1170        let events = EventChannel::new();
1171        let (mock, mut handle) = P2p::mocked();
1172        let store = Arc::new(InMemoryStore::new());
1173
1174        let syncer = Syncer::start(SyncerArgs {
1175            p2p: Arc::new(mock),
1176            store: store.clone(),
1177            event_pub: events.publisher(),
1178            batch_size: 512,
1179            syncing_window: DEFAULT_SAMPLING_WINDOW,
1180        })
1181        .unwrap();
1182
1183        // Syncer is waiting for a trusted peer to connect
1184        handle.expect_no_cmd().await;
1185        handle.announce_peer_connected();
1186        handle.expect_no_cmd().await;
1187        handle.announce_trusted_peer_connected();
1188
1189        // After connection is established Syncer asks current network HEAD
1190        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
1191        assert_eq!(height, 0);
1192        assert_eq!(amount, 1);
1193        respond_to.send(Ok(vec![head.clone()])).unwrap();
1194
1195        // Now Syncer initializes HeaderSub with the latest HEAD
1196        let head_from_syncer = handle.expect_init_header_sub().await;
1197        assert_eq!(head_from_syncer, head);
1198
1199        let head_height = head.height().value();
1200        assert_syncing(&syncer, &store, &[head_height..=head_height], head_height).await;
1201
1202        (syncer, store, handle)
1203    }
1204
    /// Serves (or deliberately ignores) the header requests the Syncer issues
    /// for a single syncing session covering `range`.
    ///
    /// Responses are looked up by height in `remaining_headers`. When `respond`
    /// is `false`, each request is instead answered with a simulated libp2p
    /// timeout error after 10 seconds. Panics if the Syncer requested heights
    /// outside of `range`, or did not request all of `range`.
    async fn handle_session_batch(
        p2p_mock: &mut MockP2pHandle,
        remaining_headers: &[ExtendedHeader],
        range: BlockRange,
        respond: bool,
    ) {
        range.validate().unwrap();

        // Track which parts of `range` the Syncer has requested so far.
        let mut ranges_to_request = BlockRanges::new();
        ranges_to_request.insert_relaxed(&range).unwrap();

        let mut no_respond_chans = Vec::new();

        for _ in 0..requests_in_session(range.len()) {
            let (height, amount, respond_to) =
                p2p_mock.expect_header_request_for_height_cmd().await;

            let requested_range = height..=height + amount - 1;
            // Panics if the requested range is not fully within `range`.
            ranges_to_request.remove_strict(requested_range);

            if respond {
                let header_index = remaining_headers
                    .iter()
                    .position(|h| h.height().value() == height)
                    .expect("height not found in provided headers");

                let response_range =
                    remaining_headers[header_index..header_index + amount as usize].to_vec();
                respond_to
                    .send(Ok(response_range))
                    .map_err(|_| format!("headers [{}, {}]", height, height + amount - 1))
                    .unwrap();
            } else {
                // Keep the channel alive so the Syncer stays waiting; it is
                // failed with a timeout error below.
                no_respond_chans.push(respond_to);
            }
        }

        // Real libp2p implementation will create a timeout error if the peer does not
        // respond. We need to simulate that, otherwise we end up with some undesirable
        // behaviours in Syncer.
        if !respond {
            spawn(async move {
                sleep(Duration::from_secs(10)).await;

                for respond_chan in no_respond_chans {
                    respond_chan.maybe_send_err(P2pError::HeaderEx(
                        HeaderExError::OutboundFailure(OutboundFailure::Timeout),
                    ));
                }
            });
        }

        assert!(
            ranges_to_request.is_empty(),
            "Some headers weren't requested. expected range: {}, not requested: {}",
            range.display(),
            ranges_to_request
        );
    }
1264
1265    fn requests_in_session(headers: u64) -> usize {
1266        let max_requests = headers.div_ceil(header_session::MAX_AMOUNT_PER_REQ) as usize;
1267        let min_requests = headers.div_ceil(header_session::MIN_AMOUNT_PER_REQ) as usize;
1268
1269        if max_requests > header_session::MAX_CONCURRENT_REQS {
1270            // if we have to do more requests than our concurrency limit anyway
1271            max_requests
1272        } else {
1273            // otherwise we can handle batch fully concurrent
1274            header_session::MAX_CONCURRENT_REQS.min(min_requests)
1275        }
1276    }
1277
1278    impl BlockRanges {
1279        fn remove_strict(&mut self, range: BlockRange) {
1280            for stored in self.as_ref() {
1281                if stored.contains(range.start()) && stored.contains(range.end()) {
1282                    self.remove_relaxed(range).unwrap();
1283                    return;
1284                }
1285            }
1286
1287            panic!("block ranges ({self}) don't contain {}", range.display());
1288        }
1289    }
1290}