lumina_node/
syncer.rs

1//! Component responsible for synchronizing block headers announced in the Celestia network.
2
3use std::marker::PhantomData;
4use std::pin::pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use backoff::ExponentialBackoffBuilder;
9use backoff::backoff::Backoff;
10use celestia_types::ExtendedHeader;
11use lumina_utils::executor::{JoinHandle, spawn};
12use lumina_utils::time::{Instant, Interval, sleep};
13use serde::{Deserialize, Serialize};
14use tendermint::Time;
15use tokio::select;
16use tokio::sync::{broadcast, mpsc, oneshot};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, instrument, warn};
19
20use crate::block_ranges::{BlockRange, BlockRangeExt, BlockRanges};
21use crate::events::{EventPublisher, NodeEvent};
22use crate::node::subscriptions::BroadcastingStore;
23use crate::p2p::{P2p, P2pError};
24use crate::store::{Store, StoreError};
25use crate::utils::{FusedReusableFuture, OneshotSenderExt, TimeExt};
26
27type Result<T, E = SyncerError> = std::result::Result<T, E>;
28
29const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60);
30const SLOW_SYNC_MIN_THRESHOLD: u64 = 50;
31
32/// Representation of all the errors that can occur in `Syncer` component.
33#[derive(Debug, thiserror::Error)]
34pub enum SyncerError {
35    /// An error propagated from the `P2p` component.
36    #[error("P2p: {0}")]
37    P2p(#[from] P2pError),
38
39    /// An error propagated from the [`Store`] component.
40    #[error("Store: {0}")]
41    Store(#[from] StoreError),
42
43    /// The worker has died.
44    #[error("Worker died")]
45    WorkerDied,
46
47    /// Channel closed unexpectedly.
48    #[error("Channel closed unexpectedly")]
49    ChannelClosedUnexpectedly,
50}
51
52impl SyncerError {
53    pub(crate) fn is_fatal(&self) -> bool {
54        match self {
55            SyncerError::P2p(e) => e.is_fatal(),
56            SyncerError::Store(e) => e.is_fatal(),
57            SyncerError::WorkerDied | SyncerError::ChannelClosedUnexpectedly => true,
58        }
59    }
60}
61
62impl From<oneshot::error::RecvError> for SyncerError {
63    fn from(_value: oneshot::error::RecvError) -> Self {
64        SyncerError::ChannelClosedUnexpectedly
65    }
66}
67
68/// Component responsible for synchronizing block headers from the network.
69#[derive(Debug)]
70pub(crate) struct Syncer<S>
71where
72    S: Store + 'static,
73{
74    cmd_tx: mpsc::Sender<SyncerCmd>,
75    cancellation_token: CancellationToken,
76    join_handle: JoinHandle,
77    _store: PhantomData<S>,
78}
79
80/// Arguments used to configure the [`Syncer`].
81pub(crate) struct SyncerArgs<S>
82where
83    S: Store + 'static,
84{
85    /// Handler for the peer to peer messaging.
86    pub(crate) p2p: Arc<P2p>,
87    /// Headers storage.
88    pub(crate) store: Arc<S>,
89    /// Event publisher.
90    pub(crate) event_pub: EventPublisher,
91    /// Batch size.
92    pub(crate) batch_size: u64,
93    /// Sampling window
94    pub(crate) sampling_window: Duration,
95    /// Pruning window
96    pub(crate) pruning_window: Duration,
97}
98
99#[derive(Debug)]
100enum SyncerCmd {
101    GetInfo {
102        respond_to: oneshot::Sender<SyncingInfo>,
103    },
104    SubscribeHeights {
105        respond_to: oneshot::Sender<broadcast::Receiver<ExtendedHeader>>,
106    },
107    #[cfg(test)]
108    TriggerFetchNextBatch,
109}
110
111/// Status of the synchronization.
112#[derive(Debug, Serialize, Deserialize)]
113pub struct SyncingInfo {
114    /// Ranges of headers that are already synchronised
115    pub stored_headers: BlockRanges,
116    /// Syncing target. The latest height seen in the network that was successfully verified.
117    pub subjective_head: u64,
118}
119
120impl<S> Syncer<S>
121where
122    S: Store,
123{
124    /// Create and start the [`Syncer`].
125    pub(crate) fn start(args: SyncerArgs<S>) -> Result<Self> {
126        let cancellation_token = CancellationToken::new();
127        let event_pub = args.event_pub.clone();
128        let (cmd_tx, cmd_rx) = mpsc::channel(16);
129        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;
130
131        let join_handle = spawn(async move {
132            if let Err(e) = worker.run().await {
133                error!("Syncer stopped because of a fatal error: {e}");
134
135                event_pub.send(NodeEvent::FatalSyncerError {
136                    error: e.to_string(),
137                });
138            }
139        });
140
141        Ok(Syncer {
142            cancellation_token,
143            cmd_tx,
144            join_handle,
145            _store: PhantomData,
146        })
147    }
148
149    /// Stop the worker.
150    pub(crate) fn stop(&self) {
151        // Signal the Worker to stop.
152        self.cancellation_token.cancel();
153    }
154
155    /// Wait until worker is completely stopped.
156    pub(crate) async fn join(&self) {
157        self.join_handle.join().await;
158    }
159
160    async fn send_command(&self, cmd: SyncerCmd) -> Result<()> {
161        self.cmd_tx
162            .send(cmd)
163            .await
164            .map_err(|_| SyncerError::WorkerDied)
165    }
166
167    /// Get the current synchronization status.
168    ///
169    /// # Errors
170    ///
171    /// This function will return an error if the [`Syncer`] has been stopped.
172    pub(crate) async fn info(&self) -> Result<SyncingInfo> {
173        let (tx, rx) = oneshot::channel();
174
175        self.send_command(SyncerCmd::GetInfo { respond_to: tx })
176            .await?;
177
178        Ok(rx.await?)
179    }
180
181    /// Subscribe to a broadcast of new header heights as they are being synchronised
182    ///
183    /// # Errors
184    ///
185    /// This function will return an error if the [`Syncer`] has been stopped.
186    pub(crate) async fn subscribe_headers(&self) -> Result<broadcast::Receiver<ExtendedHeader>> {
187        let (tx, rx) = oneshot::channel();
188
189        self.send_command(SyncerCmd::SubscribeHeights { respond_to: tx })
190            .await?;
191
192        Ok(rx.await?)
193    }
194
195    #[cfg(test)]
196    async fn trigger_fetch_next_batch(&self) -> Result<()> {
197        self.send_command(SyncerCmd::TriggerFetchNextBatch).await
198    }
199}
200
201impl<S> Drop for Syncer<S>
202where
203    S: Store,
204{
205    fn drop(&mut self) {
206        self.stop();
207    }
208}
209
210struct Worker<S>
211where
212    S: Store + 'static,
213{
214    cancellation_token: CancellationToken,
215    cmd_rx: mpsc::Receiver<SyncerCmd>,
216    event_pub: EventPublisher,
217    p2p: Arc<P2p>,
218    store: BroadcastingStore<S>,
219    header_sub_rx: Option<mpsc::Receiver<ExtendedHeader>>,
220    subjective_head_height: Option<u64>,
221    highest_slow_sync_height: Option<u64>,
222    batch_size: u64,
223    ongoing_batch: Ongoing,
224    sampling_window: Duration,
225    pruning_window: Duration,
226}
227
228struct Ongoing {
229    range: Option<BlockRange>,
230    task: FusedReusableFuture<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
231}
232
233impl<S> Worker<S>
234where
235    S: Store,
236{
237    fn new(
238        args: SyncerArgs<S>,
239        cancellation_token: CancellationToken,
240        cmd_rx: mpsc::Receiver<SyncerCmd>,
241    ) -> Result<Self> {
242        Ok(Worker {
243            cancellation_token,
244            cmd_rx,
245            event_pub: args.event_pub,
246            p2p: args.p2p,
247            store: BroadcastingStore::new(args.store),
248            header_sub_rx: None,
249            subjective_head_height: None,
250            highest_slow_sync_height: None,
251            batch_size: args.batch_size,
252            ongoing_batch: Ongoing {
253                range: None,
254                task: FusedReusableFuture::terminated(),
255            },
256            sampling_window: args.sampling_window,
257            pruning_window: args.pruning_window,
258        })
259    }
260
261    async fn run(&mut self) -> Result<()> {
262        loop {
263            if self.cancellation_token.is_cancelled() {
264                break;
265            }
266
267            self.connecting_event_loop().await?;
268
269            if self.cancellation_token.is_cancelled() {
270                break;
271            }
272
273            self.connected_event_loop().await?;
274        }
275
276        debug!("Syncer stopped");
277        Ok(())
278    }
279
280    /// The responsibility of this event loop is to await a trusted peer to
281    /// connect and get the network head, while accepting commands.
282    ///
283    /// NOTE: Only fatal errors should be propagated!
284    async fn connecting_event_loop(&mut self) -> Result<()> {
285        debug!("Entering connecting_event_loop");
286
287        let mut report_interval = Interval::new(Duration::from_secs(60));
288        self.report().await?;
289
290        let mut try_init_fut = pin!(try_init_task(
291            self.p2p.clone(),
292            self.store.clone_inner_store(),
293            self.event_pub.clone()
294        ));
295
296        loop {
297            select! {
298                _ = self.cancellation_token.cancelled() => {
299                    break;
300                }
301                _ = report_interval.tick() => {
302                    self.report().await?;
303                }
304                res = &mut try_init_fut => {
305                    // `try_init_task` propagates only fatal errors
306                    let (network_head, took) = res?;
307                    let network_head_height = network_head.height();
308
309                    info!("Setting initial subjective head to {network_head_height}");
310                    self.set_subjective_head_height(network_head_height);
311                    self.store.init_broadcast(network_head.clone());
312
313                    let (header_sub_tx, header_sub_rx) = mpsc::channel(16);
314                    self.p2p.init_header_sub(network_head, header_sub_tx).await?;
315                    self.header_sub_rx = Some(header_sub_rx);
316
317                    self.event_pub.send(NodeEvent::FetchingHeadHeaderFinished {
318                        height: network_head_height,
319                        took,
320                    });
321
322                    break;
323                }
324                Some(cmd) = self.cmd_rx.recv() => {
325                    self.on_cmd(cmd).await?;
326                }
327            }
328        }
329
330        Ok(())
331    }
332
333    /// The responsibility of this event loop is to start the syncing process,
334    /// handles events from HeaderSub, and accept commands.
335    ///
336    /// NOTE: Only fatal errors should be propagated!
337    async fn connected_event_loop(&mut self) -> Result<()> {
338        debug!("Entering connected_event_loop");
339
340        let mut report_interval = Interval::new(Duration::from_secs(60));
341        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();
342
343        // Check if connection status changed before creating the watcher
344        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
345            warn!("All peers disconnected");
346            return Ok(());
347        }
348
349        self.fetch_next_batch().await?;
350        self.report().await?;
351
352        loop {
353            select! {
354                _ = self.cancellation_token.cancelled() => {
355                    break;
356                }
357                _ = peer_tracker_info_watcher.changed() => {
358                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
359                        warn!("All peers disconnected");
360                        break;
361                    }
362                }
363                _ = report_interval.tick() => {
364                    self.report().await?;
365                }
366                res = header_sub_recv(self.header_sub_rx.as_mut()) => {
367                    let header = res?;
368                    self.on_header_sub_message(header).await?;
369                    self.fetch_next_batch().await?;
370                }
371                Some(cmd) = self.cmd_rx.recv() => {
372                    self.on_cmd(cmd).await?;
373                }
374                (res, took) = &mut self.ongoing_batch.task => {
375                    self.on_fetch_next_batch_result(res, took).await?;
376                    self.fetch_next_batch().await?;
377                }
378            }
379        }
380
381        if let Some(ongoing) = self.ongoing_batch.range.take() {
382            warn!("Cancelling fetching of {}", ongoing.display());
383            self.ongoing_batch.task.terminate();
384        }
385
386        self.header_sub_rx.take();
387
388        Ok(())
389    }
390
391    async fn syncing_info(&self) -> Result<SyncingInfo> {
392        Ok(SyncingInfo {
393            stored_headers: self.store.get_stored_header_ranges().await?,
394            subjective_head: self.subjective_head_height.unwrap_or(0),
395        })
396    }
397
398    #[instrument(skip_all)]
399    async fn report(&mut self) -> Result<()> {
400        let SyncingInfo {
401            stored_headers,
402            subjective_head,
403        } = self.syncing_info().await?;
404
405        let ongoing_batch = self
406            .ongoing_batch
407            .range
408            .as_ref()
409            .map(|range| format!("{}", range.display()))
410            .unwrap_or_else(|| "None".to_string());
411
412        info!(
413            "syncing: head: {subjective_head}, stored headers: {stored_headers}, ongoing batches: {ongoing_batch}"
414        );
415        Ok(())
416    }
417
418    async fn on_cmd(&mut self, cmd: SyncerCmd) -> Result<()> {
419        match cmd {
420            SyncerCmd::GetInfo { respond_to } => {
421                let info = self.syncing_info().await?;
422                respond_to.maybe_send(info);
423            }
424            SyncerCmd::SubscribeHeights { respond_to } => {
425                let receiver = self.store.subscribe();
426                respond_to.maybe_send(receiver);
427            }
428            #[cfg(test)]
429            SyncerCmd::TriggerFetchNextBatch => {
430                self.fetch_next_batch().await?;
431            }
432        }
433
434        Ok(())
435    }
436
437    #[instrument(skip_all)]
438    async fn on_header_sub_message(&mut self, new_head: ExtendedHeader) -> Result<()> {
439        let new_head_height = new_head.height();
440
441        self.set_subjective_head_height(new_head_height);
442
443        if let Ok(store_head_height) = self.store.head_height().await {
444            // If our new header is adjacent to the HEAD of the store
445            if store_head_height + 1 == new_head_height {
446                // Header is already verified by HeaderSub and will be validated against previous
447                // head on insert
448                if self.store.announce_insert(vec![new_head]).await.is_ok() {
449                    self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub {
450                        height: new_head_height,
451                    });
452                }
453            }
454        }
455
456        Ok(())
457    }
458
459    fn set_subjective_head_height(&mut self, height: u64) {
460        if self
461            .subjective_head_height
462            .is_some_and(|old_height| height <= old_height)
463        {
464            return;
465        }
466
467        self.subjective_head_height = Some(height);
468    }
469
470    #[instrument(skip_all)]
471    async fn fetch_next_batch(&mut self) -> Result<()> {
472        debug_assert_eq!(
473            self.ongoing_batch.range.is_none(),
474            self.ongoing_batch.task.is_terminated()
475        );
476
477        if !self.ongoing_batch.task.is_terminated() {
478            // Another batch is ongoing. We do not parallelize `Syncer`
479            // by design. Any parallel requests are done in the
480            // HeaderEx client through `Session`.
481            //
482            // Nothing to schedule
483            return Ok(());
484        }
485
486        if self.p2p.peer_tracker_info().num_connected_peers == 0 {
487            // No connected peers. We can't do the request.
488            // We will recover from this in `run`.
489            return Ok(());
490        }
491
492        let Some(subjective_head_height) = self.subjective_head_height else {
493            // Nothing to schedule
494            return Ok(());
495        };
496
497        let store_ranges = self.store.get_stored_header_ranges().await?;
498        let pruned_ranges = self.store.get_pruned_ranges().await?;
499
500        // Pruner removes already sampled headers and creates gaps in the ranges.
501        // Syncer must ignore those gaps.
502        let synced_ranges = pruned_ranges + &store_ranges;
503
504        let next_batch = calculate_range_to_fetch(
505            subjective_head_height,
506            synced_ranges.as_ref(),
507            self.batch_size,
508        );
509
510        if next_batch.is_empty() {
511            // no headers to fetch
512            return Ok(());
513        }
514
515        // If all heights of next batch is within the slow sync range
516        if self
517            .highest_slow_sync_height
518            .is_some_and(|height| *next_batch.end() <= height)
519        {
520            // Threshold is the half of batch size but it should be at least 50.
521            let threshold = (self.batch_size / 2).max(SLOW_SYNC_MIN_THRESHOLD);
522
523            // Calculate how many headers are locally available for sampling.
524            let sampled_ranges = self.store.get_sampled_ranges().await?;
525            let available_for_sampling = (store_ranges - sampled_ranges).len();
526
527            // Do not fetch next batch if we have more headers than the threshold.
528            if available_for_sampling > threshold {
529                // NOTE: Recheck will happen when we receive a header from header sub (~6 secs).
530                return Ok(());
531            }
532        }
533
534        // make sure we're inside the sampling window before we start
535        match self.store.get_by_height(next_batch.end() + 1).await {
536            Ok(known_header) => {
537                if !self.in_sampling_window(&known_header) {
538                    return Ok(());
539                }
540            }
541            Err(StoreError::NotFound) => {}
542            Err(e) => return Err(e.into()),
543        }
544
545        self.event_pub.send(NodeEvent::FetchingHeadersStarted {
546            from_height: *next_batch.start(),
547            to_height: *next_batch.end(),
548        });
549
550        let p2p = self.p2p.clone();
551
552        self.ongoing_batch.range = Some(next_batch.clone());
553
554        self.ongoing_batch.task.set(async move {
555            let now = Instant::now();
556            let res = p2p.get_unverified_header_range(next_batch).await;
557            (res, now.elapsed())
558        });
559
560        Ok(())
561    }
562
563    /// Handle the result of the batch request and propagate fatal errors.
564    #[instrument(skip_all)]
565    async fn on_fetch_next_batch_result(
566        &mut self,
567        res: Result<Vec<ExtendedHeader>, P2pError>,
568        took: Duration,
569    ) -> Result<()> {
570        let range = self
571            .ongoing_batch
572            .range
573            .take()
574            .expect("ongoing_batch not initialized correctly");
575
576        let from_height = *range.start();
577        let to_height = *range.end();
578
579        let headers = match res {
580            Ok(headers) => headers,
581            Err(e) => {
582                if e.is_fatal() {
583                    return Err(e.into());
584                }
585
586                self.event_pub.send(NodeEvent::FetchingHeadersFailed {
587                    from_height,
588                    to_height,
589                    error: e.to_string(),
590                    took,
591                });
592
593                return Ok(());
594            }
595        };
596
597        let pruning_cutoff = Time::now().saturating_sub(self.pruning_window);
598
599        // Iterate headers from highest to lowest and check if there is
600        // a new highest "slow sync" height.
601        for header in headers.iter().rev() {
602            if self
603                .highest_slow_sync_height
604                .is_some_and(|height| header.height() <= height)
605            {
606                // `highest_slow_sync_height` is already higher than `header`
607                // so we don't need to check anything else.
608                break;
609            }
610
611            // If `header` is after the pruning edge, we mark it as the
612            // `highest_slow_sync_height` and we stop checking lower headers.
613            if header.time() <= pruning_cutoff {
614                self.highest_slow_sync_height = Some(header.height());
615                break;
616            }
617        }
618
619        if let Err(e) = self.store.announce_insert(headers).await {
620            if e.is_fatal() {
621                return Err(e.into());
622            }
623
624            self.event_pub.send(NodeEvent::FetchingHeadersFailed {
625                from_height,
626                to_height,
627                error: format!("Failed to store headers: {e}"),
628                took,
629            });
630        }
631
632        self.event_pub.send(NodeEvent::FetchingHeadersFinished {
633            from_height,
634            to_height,
635            took,
636        });
637
638        Ok(())
639    }
640
641    fn in_sampling_window(&self, header: &ExtendedHeader) -> bool {
642        let sampling_window_end = Time::now().saturating_sub(self.sampling_window);
643        header.time().after(sampling_window_end)
644    }
645}
646
647/// based on the synced headers and current network head height, calculate range of headers that
648/// should be fetched from the network, anchored on already synced header range
649fn calculate_range_to_fetch(
650    subjective_head_height: u64,
651    synced_headers: &[BlockRange],
652    limit: u64,
653) -> BlockRange {
654    let mut synced_headers_iter = synced_headers.iter().rev();
655
656    let Some(synced_head_range) = synced_headers_iter.next() else {
657        // empty synced ranges, we're missing everything
658        let range = 1..=subjective_head_height;
659        return range.tailn(limit);
660    };
661
662    if synced_head_range.end() < &subjective_head_height {
663        // if we haven't caught up with the network head, start from there
664        let range = synced_head_range.end() + 1..=subjective_head_height;
665        return range.tailn(limit);
666    }
667
668    // there exists a range contiguous with network head. inspect previous range end
669    let penultimate_range_end = synced_headers_iter.next().map(|r| *r.end()).unwrap_or(0);
670
671    let range = penultimate_range_end + 1..=synced_head_range.start().saturating_sub(1);
672    range.headn(limit)
673}
674
675#[instrument(skip_all)]
676async fn try_init_task<S>(
677    p2p: Arc<P2p>,
678    store: Arc<S>,
679    event_pub: EventPublisher,
680) -> Result<(ExtendedHeader, Duration)>
681where
682    S: Store + 'static,
683{
684    let now = Instant::now();
685    let mut event_reported = false;
686    let mut backoff = ExponentialBackoffBuilder::default()
687        .with_max_interval(TRY_INIT_BACKOFF_MAX_INTERVAL)
688        .with_max_elapsed_time(None)
689        .build();
690
691    loop {
692        match try_init(&p2p, &*store, &event_pub, &mut event_reported).await {
693            Ok(network_head) => {
694                return Ok((network_head, now.elapsed()));
695            }
696            Err(e) if e.is_fatal() => {
697                return Err(e);
698            }
699            Err(e) => {
700                let sleep_dur = backoff
701                    .next_backoff()
702                    .expect("backoff never stops retrying");
703
704                warn!(
705                    "Initialization of subjective head failed: {e}. Trying again in {sleep_dur:?}."
706                );
707                sleep(sleep_dur).await;
708            }
709        }
710    }
711}
712
713async fn try_init<S>(
714    p2p: &P2p,
715    store: &S,
716    event_pub: &EventPublisher,
717    event_reported: &mut bool,
718) -> Result<ExtendedHeader>
719where
720    S: Store,
721{
722    p2p.wait_connected_trusted().await?;
723
724    // Wait a bit to increase the possibility of multiple
725    // trusted nodes to be connected. This benefits the
726    // "best head" algorithm of HeaderEx. In test cases
727    // this is not needed.
728    #[cfg(not(test))]
729    sleep(Duration::from_secs(1)).await;
730
731    if !*event_reported {
732        event_pub.send(NodeEvent::FetchingHeadHeaderStarted);
733        *event_reported = true;
734    }
735
736    let network_head = p2p.get_head_header().await?;
737
738    // If the network head and the store head have the same height,
739    // then `insert` will error because of insertion constraints.
740    // However, if both headers are the exactly the same, we
741    // can skip inserting, as the header is already there.
742    //
743    // This can happen in case of fast node restart.
744    let try_insert = match store.get_head().await {
745        // `ExtendedHeader.commit.signatures` can be different set on each fetch
746        // so we compare only hashes.
747        Ok(store_head) => store_head.hash() != network_head.hash(),
748        Err(StoreError::NotFound) => true,
749        Err(e) => return Err(e.into()),
750    };
751
752    if try_insert {
753        // Insert HEAD to the store and initialize header-sub.
754        // Normal insertion checks still apply here.
755        store.insert(network_head.clone()).await?;
756    }
757
758    Ok(network_head)
759}
760
761async fn header_sub_recv(
762    rx: Option<&mut mpsc::Receiver<ExtendedHeader>>,
763) -> Result<ExtendedHeader> {
764    rx.expect("header-sub not initialized")
765        .recv()
766        .await
767        .ok_or(SyncerError::P2p(P2pError::WorkerDied))
768}
769
770#[cfg(test)]
771mod tests {
772    use std::ops::RangeInclusive;
773
774    use super::*;
775    use crate::block_ranges::{BlockRange, BlockRangeExt};
776    use crate::events::EventChannel;
777    use crate::node::{DEFAULT_PRUNING_WINDOW, HeaderExError, SAMPLING_WINDOW};
778    use crate::p2p::header_session;
779    use crate::store::InMemoryStore;
780    use crate::test_utils::{MockP2pHandle, gen_filled_store};
781    use crate::utils::OneshotResultSenderExt;
782    use celestia_types::test_utils::ExtendedHeaderGenerator;
783    use libp2p::request_response::OutboundFailure;
784    use lumina_utils::test_utils::async_test;
785    use tokio::sync::broadcast::error::RecvError;
786
787    #[test]
788    fn calculate_range_to_fetch_test_header_limit() {
789        let head_height = 1024;
790        let ranges = [256..=512];
791
792        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16);
793        assert_eq!(fetch_range, 513..=528);
794
795        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511);
796        assert_eq!(fetch_range, 513..=1023);
797        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512);
798        assert_eq!(fetch_range, 513..=1024);
799        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513);
800        assert_eq!(fetch_range, 513..=1024);
801
802        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024);
803        assert_eq!(fetch_range, 513..=1024);
804    }
805
806    #[test]
807    fn calculate_range_to_fetch_empty_store() {
808        let fetch_range = calculate_range_to_fetch(1, &[], 100);
809        assert_eq!(fetch_range, 1..=1);
810
811        let fetch_range = calculate_range_to_fetch(100, &[], 10);
812        assert_eq!(fetch_range, 1..=10);
813
814        let fetch_range = calculate_range_to_fetch(100, &[], 50);
815        assert_eq!(fetch_range, 1..=50);
816    }
817
818    #[test]
819    fn calculate_range_to_fetch_fully_synced() {
820        let fetch_range = calculate_range_to_fetch(1, &[1..=1], 100);
821        assert!(fetch_range.is_empty());
822
823        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
824        assert!(fetch_range.is_empty());
825
826        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
827        assert!(fetch_range.is_empty());
828    }
829
830    #[test]
831    fn calculate_range_to_fetch_caught_up() {
832        let head_height = 4000;
833
834        let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], 500);
835        assert_eq!(fetch_range, 2500..=2999);
836        let fetch_range = calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], 500);
837        assert_eq!(fetch_range, 2500..=2999);
838        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
839        assert_eq!(fetch_range, 2801..=2999);
840        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
841        assert_eq!(fetch_range, 2801..=2999);
842        let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], 500);
843        assert_eq!(fetch_range, 1..=299);
844    }
845
846    #[test]
847    fn calculate_range_to_fetch_catching_up() {
848        let head_height = 4000;
849
850        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], 500);
851        assert_eq!(fetch_range, 3001..=3500);
852        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3500], 500);
853        assert_eq!(fetch_range, 3501..=4000);
854        let fetch_range = calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], 500);
855        assert_eq!(fetch_range, 3801..=4000);
856    }
857
858    #[async_test]
859    async fn init_without_genesis_hash() {
860        let events = EventChannel::new();
861        let (mock, mut handle) = P2p::mocked();
862        let mut generator = ExtendedHeaderGenerator::new();
863        let header = generator.next();
864
865        let _syncer = Syncer::start(SyncerArgs {
866            p2p: Arc::new(mock),
867            store: Arc::new(InMemoryStore::new()),
868            event_pub: events.publisher(),
869            batch_size: 512,
870            sampling_window: SAMPLING_WINDOW,
871            pruning_window: DEFAULT_PRUNING_WINDOW,
872        })
873        .unwrap();
874
875        // Syncer is waiting for a trusted peer to connect
876        handle.expect_no_cmd().await;
877        handle.announce_peer_connected();
878        handle.expect_no_cmd().await;
879        handle.announce_trusted_peer_connected();
880
881        // We're syncing from the front, so ask for head first
882        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
883        assert_eq!(height, 0);
884        assert_eq!(amount, 1);
885        respond_to.send(Ok(vec![header.clone()])).unwrap();
886
887        // Now Syncer initializes HeaderSub with the latest HEAD
888        let head_from_syncer = handle.expect_init_header_sub().await;
889        assert_eq!(head_from_syncer, header);
890
891        // network head = local head, so nothing else is produced.
892        handle.expect_no_cmd().await;
893    }
894
895    #[async_test]
896    async fn init_with_genesis_hash() {
897        let mut generator = ExtendedHeaderGenerator::new();
898        let head = generator.next();
899
900        let (_syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;
901
902        // network head = local head, so nothing else is produced.
903        p2p_mock.expect_no_cmd().await;
904    }
905
    /// End-to-end walk through the syncing flow against a mocked P2p.
    ///
    /// The scenario covers, in order: backward syncing from the network head in
    /// batches, new heads announced by HeaderSub that are adjacent to the store
    /// (appended immediately) or not adjacent (fetched as a range), and forward
    /// syncing towards a head that is far ahead of the stored headers.
    #[async_test]
    async fn syncing() {
        let mut generator = ExtendedHeaderGenerator::new();
        let headers = generator.next_many(1500);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[1499].clone()).await;
        assert_syncing(&syncer, &store, &[1500..=1500], 1500).await;

        // Syncer is syncing backwards from the network head (batch 1)
        handle_session_batch(&mut p2p_mock, &headers, 988..=1499, true).await;
        assert_syncing(&syncer, &store, &[988..=1500], 1500).await;

        // Syncer is syncing backwards from the network head (batch 2)
        handle_session_batch(&mut p2p_mock, &headers, 476..=987, true).await;
        assert_syncing(&syncer, &store, &[476..=1500], 1500).await;

        // New HEAD was received by HeaderSub (height 1501)
        let header1501 = generator.next();
        p2p_mock.announce_new_head(header1501.clone());
        // Height 1501 is adjacent to the last header of Store, so it is appended
        // immediately
        assert_syncing(&syncer, &store, &[476..=1501], 1501).await;

        // Syncer is syncing backwards from the network head (batch 3, partial)
        handle_session_batch(&mut p2p_mock, &headers, 1..=475, true).await;
        assert_syncing(&syncer, &store, &[1..=1501], 1501).await;

        // Syncer is fully synced and waiting for events
        p2p_mock.expect_no_cmd().await;

        // New HEAD was received by HeaderSub (height 1502), it should be appended immediately
        let header1502 = generator.next();
        p2p_mock.announce_new_head(header1502.clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1502).await;
        p2p_mock.expect_no_cmd().await;

        // New HEAD was received by HeaderSub (height 1505), it should NOT be appended
        let headers_1503_1505 = generator.next_many(3);
        p2p_mock.announce_new_head(headers_1503_1505[2].clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1505).await;

        // New HEAD is not adjacent to store, so Syncer requests a range
        handle_session_batch(&mut p2p_mock, &headers_1503_1505, 1503..=1505, true).await;
        assert_syncing(&syncer, &store, &[1..=1505], 1505).await;

        // New HEAD was received by HeaderSub (height 3000), it should NOT be appended
        let mut headers = generator.next_many(1495);
        p2p_mock.announce_new_head(headers[1494].clone());
        assert_syncing(&syncer, &store, &[1..=1505], 3000).await;

        // Syncer is syncing forwards, anchored on a range already in store (batch 1)
        handle_session_batch(&mut p2p_mock, &headers, 1506..=2017, true).await;
        assert_syncing(&syncer, &store, &[1..=2017], 3000).await;

        // New head from header sub added, should NOT be appended
        headers.push(generator.next());
        p2p_mock.announce_new_head(headers.last().unwrap().clone());
        assert_syncing(&syncer, &store, &[1..=2017], 3001).await;

        // Syncer continues syncing forwards (batch 2)
        handle_session_batch(&mut p2p_mock, &headers, 2018..=2529, true).await;
        assert_syncing(&syncer, &store, &[1..=2529], 3001).await;

        // Syncer continues syncing forwards, should include the new head received via HeaderSub (batch 3)
        handle_session_batch(&mut p2p_mock, &headers, 2530..=3001, true).await;
        assert_syncing(&syncer, &store, &[1..=3001], 3001).await;

        // Syncer is fully synced and waiting for events
        p2p_mock.expect_no_cmd().await;
    }
976
977    #[async_test]
978    async fn slow_sync() {
979        let pruning_window = Duration::from_secs(600);
980        let sampling_window = SAMPLING_WINDOW;
981
982        let mut generator = ExtendedHeaderGenerator::new();
983
984        // Generate back in time 4 batches of 512 messages (= 2048 headers).
985        let first_header_time = (Time::now() - Duration::from_secs(2048)).unwrap();
986        generator.set_time(first_header_time, Duration::from_secs(1));
987        let headers = generator.next_many(2048);
988
989        let (syncer, store, mut p2p_mock) =
990            initialized_syncer_with_windows(headers[2047].clone(), sampling_window, pruning_window)
991                .await;
992        assert_syncing(&syncer, &store, &[2048..=2048], 2048).await;
993
994        // The whole 1st batch does not reach the pruning edge.
995        handle_session_batch(&mut p2p_mock, &headers, 1536..=2047, true).await;
996        assert_syncing(&syncer, &store, &[1536..=2048], 2048).await;
997
998        // 2nd batch is partially within pruning window, so fast sync applies.
999        handle_session_batch(&mut p2p_mock, &headers, 1024..=1535, true).await;
1000        assert_syncing(&syncer, &store, &[1024..=2048], 2048).await;
1001
1002        // The whole 3rd batch is beyond pruning edge. So now slow sync applies.
1003        // In order for the Syncer to send requests, more than a half of the previous
1004        // batch needs to be sampled.
1005        syncer.trigger_fetch_next_batch().await.unwrap();
1006        p2p_mock.expect_no_cmd().await;
1007
1008        // After marking the 1st and more than of the 2nd batch, Syncer
1009        // will finally request the 3rd batch.
1010        for height in 1250..=2048 {
1011            // Simulate Daser
1012            store.mark_as_sampled(height).await.unwrap();
1013        }
1014        for height in 1300..=1450 {
1015            // Simulate Pruner
1016            store.remove_height(height).await.unwrap();
1017        }
1018        syncer.trigger_fetch_next_batch().await.unwrap();
1019        // Syncer should not request the pruned range
1020        handle_session_batch(&mut p2p_mock, &headers, 512..=1023, true).await;
1021        assert_syncing(&syncer, &store, &[512..=1299, 1451..=2048], 2048).await;
1022
1023        // Syncer should not request the 4th batch since there are plenty
1024        // of blocks to be sampled.
1025        syncer.trigger_fetch_next_batch().await.unwrap();
1026        p2p_mock.expect_no_cmd().await;
1027    }
1028
1029    #[async_test]
1030    async fn window_edge() {
1031        let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60);
1032        let mut generator = ExtendedHeaderGenerator::new();
1033        generator.set_time(
1034            (Time::now() - month_and_day_ago).expect("to not underflow"),
1035            Duration::from_secs(1),
1036        );
1037        let mut headers = generator.next_many(1200);
1038        generator.reset_time();
1039        headers.append(&mut generator.next_many(2049 - 1200));
1040
1041        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await;
1042        assert_syncing(&syncer, &store, &[2049..=2049], 2049).await;
1043
1044        // Syncer requested the first batch
1045        handle_session_batch(&mut p2p_mock, &headers, 1537..=2048, true).await;
1046        assert_syncing(&syncer, &store, &[1537..=2049], 2049).await;
1047
1048        // Syncer requested the second batch hitting the sampling window
1049        handle_session_batch(&mut p2p_mock, &headers, 1025..=1536, true).await;
1050        assert_syncing(&syncer, &store, &[1025..=2049], 2049).await;
1051
1052        // Syncer is fully synced and awaiting for events
1053        p2p_mock.expect_no_cmd().await;
1054    }
1055
1056    #[async_test]
1057    async fn start_with_filled_store() {
1058        let events = EventChannel::new();
1059        let (p2p, mut p2p_mock) = P2p::mocked();
1060        let (store, mut generator) = gen_filled_store(25).await;
1061        let store = Arc::new(store);
1062
1063        let mut headers = generator.next_many(520);
1064        let network_head = generator.next(); // height 546
1065
1066        let syncer = Syncer::start(SyncerArgs {
1067            p2p: Arc::new(p2p),
1068            store: store.clone(),
1069            event_pub: events.publisher(),
1070            batch_size: 512,
1071            sampling_window: SAMPLING_WINDOW,
1072            pruning_window: DEFAULT_PRUNING_WINDOW,
1073        })
1074        .unwrap();
1075
1076        p2p_mock.announce_trusted_peer_connected();
1077
1078        // Syncer asks for current HEAD
1079        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1080        assert_eq!(height, 0);
1081        assert_eq!(amount, 1);
1082        respond_to.send(Ok(vec![network_head.clone()])).unwrap();
1083
1084        // Now Syncer initializes HeaderSub with the latest HEAD
1085        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
1086        assert_eq!(head_from_syncer, network_head);
1087
1088        assert_syncing(&syncer, &store, &[1..=25, 546..=546], 546).await;
1089
1090        // Syncer requested the first batch
1091        handle_session_batch(&mut p2p_mock, &headers, 34..=545, true).await;
1092        assert_syncing(&syncer, &store, &[1..=25, 34..=546], 546).await;
1093
1094        // Syncer requested the remaining batch ([26, 33])
1095        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1096        assert_eq!(height, 26);
1097        assert_eq!(amount, 8);
1098        respond_to
1099            .send(Ok(headers.drain(..8).collect()))
1100            .map_err(|_| "headers [538, 545]")
1101            .unwrap();
1102        assert_syncing(&syncer, &store, &[1..=546], 546).await;
1103
1104        // Syncer is fulling synced and awaiting for events
1105        p2p_mock.expect_no_cmd().await;
1106    }
1107
1108    #[async_test]
1109    async fn stop_syncer() {
1110        let mut generator = ExtendedHeaderGenerator::new();
1111        let head = generator.next();
1112
1113        let (syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;
1114
1115        // network head height == 1, so nothing else is produced.
1116        p2p_mock.expect_no_cmd().await;
1117
1118        syncer.stop();
1119        // Wait for Worker to stop
1120        sleep(Duration::from_millis(1)).await;
1121        assert!(matches!(
1122            syncer.info().await.unwrap_err(),
1123            SyncerError::WorkerDied
1124        ));
1125    }
1126
1127    #[async_test]
1128    async fn all_peers_disconnected() {
1129        let mut generator = ExtendedHeaderGenerator::new();
1130
1131        let _gap = generator.next_many(24);
1132        let header25 = generator.next();
1133        let _gap = generator.next_many(4);
1134        let header30 = generator.next();
1135        let _gap = generator.next_many(4);
1136        let header35 = generator.next();
1137
1138        // Start Syncer and report height 30 as HEAD
1139        let (syncer, store, mut p2p_mock) = initialized_syncer(header30).await;
1140
1141        // Wait for the request but do not reply to it.
1142        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;
1143
1144        p2p_mock.announce_all_peers_disconnected();
1145        // Syncer is now back to `connecting_event_loop`.
1146        p2p_mock.expect_no_cmd().await;
1147
1148        // Announce a non-trusted peer. Syncer in `connecting_event_loop` can progress only
1149        // if a trusted peer is connected.
1150        p2p_mock.announce_peer_connected();
1151        p2p_mock.expect_no_cmd().await;
1152
1153        // Announce a trusted peer.
1154        p2p_mock.announce_trusted_peer_connected();
1155
1156        // Now Syncer will send request for HEAD.
1157        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1158        assert_eq!(height, 0);
1159        assert_eq!(amount, 1);
1160
1161        // Report an older head. Syncer should not accept it.
1162        respond_to.send(Ok(vec![header25])).unwrap();
1163        assert_syncing(&syncer, &store, &[30..=30], 30).await;
1164
1165        // Syncer will request HEAD again after some time.
1166        sleep(Duration::from_secs(1)).await;
1167        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1168        assert_eq!(height, 0);
1169        assert_eq!(amount, 1);
1170
1171        // Report newer HEAD than before.
1172        respond_to.send(Ok(vec![header35.clone()])).unwrap();
1173        assert_syncing(&syncer, &store, &[30..=30, 35..=35], 35).await;
1174
1175        // Syncer initializes HeaderSub with the latest HEAD.
1176        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
1177        assert_eq!(head_from_syncer, header35);
1178
1179        // Syncer now is in `connected_event_loop` and will try to sync the gap
1180        // that is closer to HEAD.
1181        let (height, amount, _respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1182        assert_eq!(height, 31);
1183        assert_eq!(amount, 4);
1184
1185        p2p_mock.announce_all_peers_disconnected();
1186        p2p_mock.expect_no_cmd().await;
1187    }
1188
1189    #[async_test]
1190    async fn all_peers_disconnected_and_no_network_head_progress() {
1191        let mut generator = ExtendedHeaderGenerator::new_from_height(30);
1192
1193        let header30 = generator.next();
1194
1195        // Start Syncer and report height 30 as HEAD
1196        let (syncer, store, mut p2p_mock) = initialized_syncer(header30.clone()).await;
1197
1198        // Wait for the request but do not reply to it.
1199        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;
1200
1201        p2p_mock.announce_all_peers_disconnected();
1202        // Syncer is now back to `connecting_event_loop`.
1203        p2p_mock.expect_no_cmd().await;
1204
1205        // Announce a non-trusted peer. Syncer in `connecting_event_loop` can progress only
1206        // if a trusted peer is connected.
1207        p2p_mock.announce_peer_connected();
1208        p2p_mock.expect_no_cmd().await;
1209
1210        // Announce a trusted peer.
1211        p2p_mock.announce_trusted_peer_connected();
1212
1213        // Now Syncer will send request for HEAD.
1214        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1215        assert_eq!(height, 0);
1216        assert_eq!(amount, 1);
1217
1218        // Report the same HEAD as before.
1219        respond_to.send(Ok(vec![header30.clone()])).unwrap();
1220        assert_syncing(&syncer, &store, &[30..=30], 30).await;
1221
1222        // Syncer initializes HeaderSub with the latest HEAD.
1223        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
1224        assert_eq!(head_from_syncer, header30);
1225
1226        // Syncer now is in `connected_event_loop` and will try to sync the gap
1227        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;
1228
1229        p2p_mock.announce_all_peers_disconnected();
1230        p2p_mock.expect_no_cmd().await;
1231    }
1232
1233    #[async_test]
1234    async fn non_contiguous_response() {
1235        let mut generator = ExtendedHeaderGenerator::new();
1236        let mut headers = generator.next_many(20);
1237
1238        // Start Syncer and report last header as network head
1239        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;
1240
1241        let header10 = headers[10].clone();
1242        // make a gap in response, preserving the amount of headers returned
1243        headers[10] = headers[11].clone();
1244
1245        // Syncer requests missing headers
1246        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1247
1248        // Syncer should not apply headers from invalid response
1249        assert_syncing(&syncer, &store, &[20..=20], 20).await;
1250
1251        // correct the response
1252        headers[10] = header10;
1253
1254        // Syncer requests missing headers again
1255        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1256
1257        // With a correct response, Syncer should update the store
1258        assert_syncing(&syncer, &store, &[1..=20], 20).await;
1259    }
1260
1261    #[async_test]
1262    async fn another_chain_response() {
1263        let headers = ExtendedHeaderGenerator::new().next_many(20);
1264        let headers_prime = ExtendedHeaderGenerator::new().next_many(20);
1265
1266        // Start Syncer and report last header as network head
1267        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;
1268
1269        // Syncer requests missing headers
1270        handle_session_batch(&mut p2p_mock, &headers_prime, 1..=19, true).await;
1271
1272        // Syncer should not apply headers from invalid response
1273        assert_syncing(&syncer, &store, &[20..=20], 20).await;
1274
1275        // Syncer requests missing headers again
1276        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1277
1278        // With a correct response, Syncer should update the store
1279        assert_syncing(&syncer, &store, &[1..=20], 20).await;
1280    }
1281
1282    #[async_test]
1283    async fn height_sequence_smoke() {
1284        let mut generator = ExtendedHeaderGenerator::new();
1285        let headers = generator.next_many(20);
1286        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[9].clone()).await;
1287
1288        let mut header_receiver = syncer.subscribe_headers().await.unwrap();
1289        let (tx, mut received_heights) = mpsc::unbounded_channel();
1290        spawn(async move {
1291            while let Ok(header) = header_receiver.recv().await {
1292                tx.send(header.height()).unwrap();
1293            }
1294        });
1295
1296        // header sub receiving header at height 11 (adjacent, should be added immediately)
1297        p2p_mock.announce_new_head(headers[10].clone());
1298        assert_syncing(&syncer, &store, &[10..=11], 11).await;
1299        assert_eq!(received_heights.recv().await.unwrap(), 11);
1300
1301        // header sub receiving header at height 15 (gap, should NOT be added immediately)
1302        p2p_mock.announce_new_head(headers[14].clone());
1303        assert_syncing(&syncer, &store, &[10..=11], 15).await;
1304        received_heights.try_recv().unwrap_err();
1305
1306        // Syncer requests past header range, should be ignored
1307        handle_session_batch(&mut p2p_mock, &headers, 2..=9, true).await;
1308        handle_session_batch(&mut p2p_mock, &headers, 1..=1, true).await;
1309        received_heights.try_recv().unwrap_err();
1310
1311        // Syncer should request the gap [12, 15]
1312        handle_session_batch(&mut p2p_mock, &headers, 12..=15, true).await;
1313        assert_syncing(&syncer, &store, &[1..=15], 15).await;
1314
1315        for i in 12..=15 {
1316            let height = received_heights.recv().await.unwrap();
1317            assert_eq!(height, i);
1318        }
1319
1320        // header sub receives header at height 19 (gap)
1321        p2p_mock.announce_new_head(headers[18].clone());
1322        assert_syncing(&syncer, &store, &[1..=15], 19).await;
1323        received_heights.try_recv().unwrap_err();
1324
1325        // Syncer should request the gap [16, 18]
1326        handle_session_batch(&mut p2p_mock, &headers, 16..=19, true).await;
1327        assert_syncing(&syncer, &store, &[1..=19], 19).await;
1328
1329        for i in 16..=19 {
1330            let height = received_heights.recv().await.unwrap();
1331            assert_eq!(height, i);
1332        }
1333    }
1334
    /// Header subscribers must keep receiving headers in ascending height order
    /// across a disconnect/reconnect cycle during which the network progressed.
    ///
    /// A background task records every header delivered to the subscription and
    /// inserts a `None` marker at the moment the test signals the disconnection,
    /// so the final assertions can check what was delivered before and after it.
    #[async_test]
    async fn height_sequence_handles_disconnect() {
        let mut generator = ExtendedHeaderGenerator::new();
        let headers = generator.next_many(20);
        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[0].clone()).await;
        assert_syncing(&syncer, &store, &[1..=1], 1).await;

        let mut header_receiver = syncer.subscribe_headers().await.unwrap();
        let (disconnect_tx, mut disconnect_signal) = oneshot::channel();
        let (tx, mut received_headers) = mpsc::unbounded_channel();
        spawn(async move {
            loop {
                // gather all the headers received, marking the moment disconnection
                // happened with a None.
                select! {
                    header = header_receiver.recv() => {
                        // A closed subscription ends the task, any other error
                        // fails the test through `unwrap`.
                        let header = match header {
                            Err(RecvError::Closed) => break,
                            header_or_error => header_or_error.unwrap()
                        };
                        tx.send(Some(header)).unwrap();
                    },
                    // The guard stops polling the oneshot once it has completed.
                    _ = &mut disconnect_signal, if !disconnect_signal.is_terminated() => {
                        tx.send(None).unwrap();
                    }
                };
            }
        });

        // header sub receiving header at height 2
        p2p_mock.announce_new_head(headers[1].clone());
        assert_syncing(&syncer, &store, &[1..=2], 2).await;

        // disconnect and reconnect
        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
        p2p_mock.announce_trusted_peer_connected();

        // Tell the collecting task to record the disconnection marker.
        disconnect_tx.send(()).unwrap();

        // Now syncer will send request for HEAD.
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!((height, amount), (0, 1));

        // Network progressed while we were disconnected
        respond_to.send(Ok(vec![headers[9].clone()])).unwrap();
        let _ = p2p_mock.expect_init_header_sub().await;
        assert_syncing(&syncer, &store, &[1..=2, 10..=10], 10).await;

        handle_session_batch(&mut p2p_mock, &headers, 3..=9, true).await;
        assert_syncing(&syncer, &store, &[1..=10], 10).await;

        // Header sub receives header at height 11
        p2p_mock.announce_new_head(headers[10].clone());
        assert_syncing(&syncer, &store, &[1..=11], 11).await;

        // Expected sequence: height 2, the disconnection marker, then heights
        // 3 through 11 in order.
        assert_eq!(
            received_headers.recv().await.unwrap().map(|h| h.height()),
            Some(2)
        );
        assert_eq!(received_headers.recv().await.unwrap(), None);
        for i in 3..=11 {
            let header = received_headers.recv().await.unwrap().unwrap();
            assert_eq!(header.height(), i);
        }
    }
1401
1402    async fn assert_syncing(
1403        syncer: &Syncer<InMemoryStore>,
1404        store: &InMemoryStore,
1405        expected_synced_ranges: &[RangeInclusive<u64>],
1406        expected_subjective_head: u64,
1407    ) {
1408        // Syncer receives responds on the same loop that receive other events.
1409        // Wait a bit to be processed.
1410        sleep(Duration::from_millis(1)).await;
1411
1412        let store_ranges = store.get_stored_header_ranges().await.unwrap();
1413        let syncing_info = syncer.info().await.unwrap();
1414
1415        assert_eq!(store_ranges.as_ref(), expected_synced_ranges);
1416        assert_eq!(syncing_info.stored_headers.as_ref(), expected_synced_ranges);
1417        assert_eq!(syncing_info.subjective_head, expected_subjective_head);
1418    }
1419
    /// Starts a Syncer with `head` as the network head, using `SAMPLING_WINDOW`
    /// and `DEFAULT_PRUNING_WINDOW`.
    ///
    /// Thin wrapper over `initialized_syncer_with_windows`; returns the Syncer,
    /// its store and the handle of the mocked P2p.
    async fn initialized_syncer(
        head: ExtendedHeader,
    ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
        initialized_syncer_with_windows(head, SAMPLING_WINDOW, DEFAULT_PRUNING_WINDOW).await
    }
1425
1426    async fn initialized_syncer_with_windows(
1427        head: ExtendedHeader,
1428        sampling_window: Duration,
1429        pruning_window: Duration,
1430    ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
1431        let events = EventChannel::new();
1432        let (mock, mut handle) = P2p::mocked();
1433        let store = Arc::new(InMemoryStore::new());
1434
1435        let syncer = Syncer::start(SyncerArgs {
1436            p2p: Arc::new(mock),
1437            store: store.clone(),
1438            event_pub: events.publisher(),
1439            batch_size: 512,
1440            sampling_window,
1441            pruning_window,
1442        })
1443        .unwrap();
1444
1445        // Syncer is waiting for a trusted peer to connect
1446        handle.expect_no_cmd().await;
1447        handle.announce_peer_connected();
1448        handle.expect_no_cmd().await;
1449        handle.announce_trusted_peer_connected();
1450
1451        // After connection is established Syncer asks current network HEAD
1452        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
1453        assert_eq!(height, 0);
1454        assert_eq!(amount, 1);
1455        respond_to.send(Ok(vec![head.clone()])).unwrap();
1456
1457        // Now Syncer initializes HeaderSub with the latest HEAD
1458        let head_from_syncer = handle.expect_init_header_sub().await;
1459        assert_eq!(head_from_syncer, head);
1460
1461        let head_height = head.height();
1462        assert_syncing(&syncer, &store, &[head_height..=head_height], head_height).await;
1463
1464        (syncer, store, handle)
1465    }
1466
1467    async fn handle_session_batch(
1468        p2p_mock: &mut MockP2pHandle,
1469        remaining_headers: &[ExtendedHeader],
1470        range: BlockRange,
1471        respond: bool,
1472    ) {
1473        range.validate().unwrap();
1474
1475        let mut ranges_to_request = BlockRanges::new();
1476        ranges_to_request.insert_relaxed(&range).unwrap();
1477
1478        let mut no_respond_chans = Vec::new();
1479
1480        for _ in 0..requests_in_session(range.len()) {
1481            let (height, amount, respond_to) =
1482                p2p_mock.expect_header_request_for_height_cmd().await;
1483
1484            let requested_range = height..=height + amount - 1;
1485            ranges_to_request.remove_strict(requested_range);
1486
1487            if respond {
1488                let header_index = remaining_headers
1489                    .iter()
1490                    .position(|h| h.height() == height)
1491                    .expect("height not found in provided headers");
1492
1493                let response_range =
1494                    remaining_headers[header_index..header_index + amount as usize].to_vec();
1495                respond_to
1496                    .send(Ok(response_range))
1497                    .map_err(|_| format!("headers [{}, {}]", height, height + amount - 1))
1498                    .unwrap();
1499            } else {
1500                no_respond_chans.push(respond_to);
1501            }
1502        }
1503
1504        // Real libp2p implementation will create a timeout error if the peer does not
1505        // respond. We need to simulate that, otherwise we end up with some undesirable
1506        // behaviours in Syncer.
1507        if !respond {
1508            spawn(async move {
1509                sleep(Duration::from_secs(10)).await;
1510
1511                for respond_chan in no_respond_chans {
1512                    respond_chan.maybe_send_err(P2pError::HeaderEx(
1513                        HeaderExError::OutboundFailure(OutboundFailure::Timeout),
1514                    ));
1515                }
1516            });
1517        }
1518
1519        assert!(
1520            ranges_to_request.is_empty(),
1521            "Some headers weren't requested. expected range: {}, not requested: {}",
1522            range.display(),
1523            ranges_to_request
1524        );
1525    }
1526
1527    fn requests_in_session(headers: u64) -> usize {
1528        let max_requests = headers.div_ceil(header_session::MAX_AMOUNT_PER_REQ) as usize;
1529        let min_requests = headers.div_ceil(header_session::MIN_AMOUNT_PER_REQ) as usize;
1530
1531        if max_requests > header_session::MAX_CONCURRENT_REQS {
1532            // if we have to do more requests than our concurrency limit anyway
1533            max_requests
1534        } else {
1535            // otherwise, we can handle batch fully concurrent
1536            header_session::MAX_CONCURRENT_REQS.min(min_requests)
1537        }
1538    }
1539
1540    impl BlockRanges {
1541        fn remove_strict(&mut self, range: BlockRange) {
1542            for stored in self.as_ref() {
1543                if stored.contains(range.start()) && stored.contains(range.end()) {
1544                    self.remove_relaxed(range).unwrap();
1545                    return;
1546                }
1547            }
1548
1549            panic!("block ranges ({self}) don't contain {}", range.display());
1550        }
1551    }
1552}