lumina_node/
syncer.rs

1//! Component responsible for synchronizing block headers announced in the Celestia network.
2
3use std::marker::PhantomData;
4use std::pin::pin;
5use std::sync::Arc;
6use std::time::Duration;
7
8use backoff::ExponentialBackoffBuilder;
9use backoff::backoff::Backoff;
10use celestia_types::ExtendedHeader;
11use lumina_utils::executor::{JoinHandle, spawn};
12use lumina_utils::time::{Instant, Interval, sleep};
13use serde::{Deserialize, Serialize};
14use tendermint::Time;
15use tokio::select;
16use tokio::sync::{mpsc, oneshot};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, instrument, warn};
19
20use crate::block_ranges::{BlockRange, BlockRangeExt, BlockRanges};
21use crate::events::{EventPublisher, NodeEvent};
22use crate::p2p::{P2p, P2pError};
23use crate::store::{Store, StoreError};
24use crate::utils::{FusedReusableFuture, OneshotSenderExt, TimeExt};
25
26type Result<T, E = SyncerError> = std::result::Result<T, E>;
27
28const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60);
29const SLOW_SYNC_MIN_THRESHOLD: u64 = 50;
30
31/// Representation of all the errors that can occur in `Syncer` component.
32#[derive(Debug, thiserror::Error)]
33pub enum SyncerError {
34    /// An error propagated from the `P2p` component.
35    #[error("P2p: {0}")]
36    P2p(#[from] P2pError),
37
38    /// An error propagated from the [`Store`] component.
39    #[error("Store: {0}")]
40    Store(#[from] StoreError),
41
42    /// The worker has died.
43    #[error("Worker died")]
44    WorkerDied,
45
46    /// Channel closed unexpectedly.
47    #[error("Channel closed unexpectedly")]
48    ChannelClosedUnexpectedly,
49}
50
51impl SyncerError {
52    pub(crate) fn is_fatal(&self) -> bool {
53        match self {
54            SyncerError::P2p(e) => e.is_fatal(),
55            SyncerError::Store(e) => e.is_fatal(),
56            SyncerError::WorkerDied | SyncerError::ChannelClosedUnexpectedly => true,
57        }
58    }
59}
60
61impl From<oneshot::error::RecvError> for SyncerError {
62    fn from(_value: oneshot::error::RecvError) -> Self {
63        SyncerError::ChannelClosedUnexpectedly
64    }
65}
66
67/// Component responsible for synchronizing block headers from the network.
68#[derive(Debug)]
69pub(crate) struct Syncer<S>
70where
71    S: Store + 'static,
72{
73    cmd_tx: mpsc::Sender<SyncerCmd>,
74    cancellation_token: CancellationToken,
75    join_handle: JoinHandle,
76    _store: PhantomData<S>,
77}
78
79/// Arguments used to configure the [`Syncer`].
80pub(crate) struct SyncerArgs<S>
81where
82    S: Store + 'static,
83{
84    /// Handler for the peer to peer messaging.
85    pub(crate) p2p: Arc<P2p>,
86    /// Headers storage.
87    pub(crate) store: Arc<S>,
88    /// Event publisher.
89    pub(crate) event_pub: EventPublisher,
90    /// Batch size.
91    pub(crate) batch_size: u64,
92    /// Sampling window
93    pub(crate) sampling_window: Duration,
94    /// Pruning window
95    pub(crate) pruning_window: Duration,
96}
97
98#[derive(Debug)]
99enum SyncerCmd {
100    GetInfo {
101        respond_to: oneshot::Sender<SyncingInfo>,
102    },
103    #[cfg(test)]
104    TriggerFetchNextBatch,
105}
106
107/// Status of the synchronization.
108#[derive(Debug, Serialize, Deserialize)]
109pub struct SyncingInfo {
110    /// Ranges of headers that are already synchronised
111    pub stored_headers: BlockRanges,
112    /// Syncing target. The latest height seen in the network that was successfully verified.
113    pub subjective_head: u64,
114}
115
116impl<S> Syncer<S>
117where
118    S: Store,
119{
120    /// Create and start the [`Syncer`].
121    pub(crate) fn start(args: SyncerArgs<S>) -> Result<Self> {
122        let cancellation_token = CancellationToken::new();
123        let event_pub = args.event_pub.clone();
124        let (cmd_tx, cmd_rx) = mpsc::channel(16);
125        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;
126
127        let join_handle = spawn(async move {
128            if let Err(e) = worker.run().await {
129                error!("Syncer stopped because of a fatal error: {e}");
130
131                event_pub.send(NodeEvent::FatalSyncerError {
132                    error: e.to_string(),
133                });
134            }
135        });
136
137        Ok(Syncer {
138            cancellation_token,
139            cmd_tx,
140            join_handle,
141            _store: PhantomData,
142        })
143    }
144
145    /// Stop the worker.
146    pub(crate) fn stop(&self) {
147        // Singal the Worker to stop.
148        self.cancellation_token.cancel();
149    }
150
151    /// Wait until worker is completely stopped.
152    pub(crate) async fn join(&self) {
153        self.join_handle.join().await;
154    }
155
156    async fn send_command(&self, cmd: SyncerCmd) -> Result<()> {
157        self.cmd_tx
158            .send(cmd)
159            .await
160            .map_err(|_| SyncerError::WorkerDied)
161    }
162
163    /// Get the current synchronization status.
164    ///
165    /// # Errors
166    ///
167    /// This function will return an error if the [`Syncer`] has been stopped.
168    pub(crate) async fn info(&self) -> Result<SyncingInfo> {
169        let (tx, rx) = oneshot::channel();
170
171        self.send_command(SyncerCmd::GetInfo { respond_to: tx })
172            .await?;
173
174        Ok(rx.await?)
175    }
176
177    #[cfg(test)]
178    async fn trigger_fetch_next_batch(&self) -> Result<()> {
179        self.send_command(SyncerCmd::TriggerFetchNextBatch).await
180    }
181}
182
183impl<S> Drop for Syncer<S>
184where
185    S: Store,
186{
187    fn drop(&mut self) {
188        self.stop();
189    }
190}
191
192struct Worker<S>
193where
194    S: Store + 'static,
195{
196    cancellation_token: CancellationToken,
197    cmd_rx: mpsc::Receiver<SyncerCmd>,
198    event_pub: EventPublisher,
199    p2p: Arc<P2p>,
200    store: Arc<S>,
201    header_sub_rx: Option<mpsc::Receiver<ExtendedHeader>>,
202    subjective_head_height: Option<u64>,
203    highest_slow_sync_height: Option<u64>,
204    batch_size: u64,
205    ongoing_batch: Ongoing,
206    sampling_window: Duration,
207    pruning_window: Duration,
208}
209
210struct Ongoing {
211    range: Option<BlockRange>,
212    task: FusedReusableFuture<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
213}
214
215impl<S> Worker<S>
216where
217    S: Store,
218{
219    fn new(
220        args: SyncerArgs<S>,
221        cancellation_token: CancellationToken,
222        cmd_rx: mpsc::Receiver<SyncerCmd>,
223    ) -> Result<Self> {
224        Ok(Worker {
225            cancellation_token,
226            cmd_rx,
227            event_pub: args.event_pub,
228            p2p: args.p2p,
229            store: args.store,
230            header_sub_rx: None,
231            subjective_head_height: None,
232            highest_slow_sync_height: None,
233            batch_size: args.batch_size,
234            ongoing_batch: Ongoing {
235                range: None,
236                task: FusedReusableFuture::terminated(),
237            },
238            sampling_window: args.sampling_window,
239            pruning_window: args.pruning_window,
240        })
241    }
242
243    async fn run(&mut self) -> Result<()> {
244        loop {
245            if self.cancellation_token.is_cancelled() {
246                break;
247            }
248
249            self.connecting_event_loop().await?;
250
251            if self.cancellation_token.is_cancelled() {
252                break;
253            }
254
255            self.connected_event_loop().await?;
256        }
257
258        debug!("Syncer stopped");
259        Ok(())
260    }
261
262    /// The responsibility of this event loop is to await a trusted peer to
263    /// connect and get the network head, while accepting commands.
264    ///
265    /// NOTE: Only fatal errors should be propagated!
266    async fn connecting_event_loop(&mut self) -> Result<()> {
267        debug!("Entering connecting_event_loop");
268
269        let mut report_interval = Interval::new(Duration::from_secs(60));
270        self.report().await?;
271
272        let mut try_init_fut = pin!(try_init_task(
273            self.p2p.clone(),
274            self.store.clone(),
275            self.event_pub.clone()
276        ));
277
278        loop {
279            select! {
280                _ = self.cancellation_token.cancelled() => {
281                    break;
282                }
283                _ = report_interval.tick() => {
284                    self.report().await?;
285                }
286                res = &mut try_init_fut => {
287                    // `try_init_task` propagates only fatal errors
288                    let (network_head, took) = res?;
289                    let network_head_height = network_head.height().value();
290
291                    info!("Setting initial subjective head to {network_head_height}");
292                    self.set_subjective_head_height(network_head_height);
293
294                    let (header_sub_tx, header_sub_rx) = mpsc::channel(16);
295                    self.p2p.init_header_sub(network_head, header_sub_tx).await?;
296                    self.header_sub_rx = Some(header_sub_rx);
297
298                    self.event_pub.send(NodeEvent::FetchingHeadHeaderFinished {
299                        height: network_head_height,
300                        took,
301                    });
302
303                    break;
304                }
305                Some(cmd) = self.cmd_rx.recv() => {
306                    self.on_cmd(cmd).await?;
307                }
308            }
309        }
310
311        Ok(())
312    }
313
314    /// The reponsibility of this event loop is to start the syncing process,
315    /// handles events from HeaderSub, and accept commands.
316    ///
317    /// NOTE: Only fatal errors should be propagated!
318    async fn connected_event_loop(&mut self) -> Result<()> {
319        debug!("Entering connected_event_loop");
320
321        let mut report_interval = Interval::new(Duration::from_secs(60));
322        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();
323
324        // Check if connection status changed before creating the watcher
325        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
326            warn!("All peers disconnected");
327            return Ok(());
328        }
329
330        self.fetch_next_batch().await?;
331        self.report().await?;
332
333        loop {
334            select! {
335                _ = self.cancellation_token.cancelled() => {
336                    break;
337                }
338                _ = peer_tracker_info_watcher.changed() => {
339                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
340                        warn!("All peers disconnected");
341                        break;
342                    }
343                }
344                _ = report_interval.tick() => {
345                    self.report().await?;
346                }
347                res = header_sub_recv(self.header_sub_rx.as_mut()) => {
348                    let header = res?;
349                    self.on_header_sub_message(header).await?;
350                    self.fetch_next_batch().await?;
351                }
352                Some(cmd) = self.cmd_rx.recv() => {
353                    self.on_cmd(cmd).await?;
354                }
355                (res, took) = &mut self.ongoing_batch.task => {
356                    self.on_fetch_next_batch_result(res, took).await?;
357                    self.fetch_next_batch().await?;
358                }
359            }
360        }
361
362        if let Some(ongoing) = self.ongoing_batch.range.take() {
363            warn!("Cancelling fetching of {}", ongoing.display());
364            self.ongoing_batch.task.terminate();
365        }
366
367        self.header_sub_rx.take();
368
369        Ok(())
370    }
371
372    async fn syncing_info(&self) -> Result<SyncingInfo> {
373        Ok(SyncingInfo {
374            stored_headers: self.store.get_stored_header_ranges().await?,
375            subjective_head: self.subjective_head_height.unwrap_or(0),
376        })
377    }
378
379    #[instrument(skip_all)]
380    async fn report(&mut self) -> Result<()> {
381        let SyncingInfo {
382            stored_headers,
383            subjective_head,
384        } = self.syncing_info().await?;
385
386        let ongoing_batch = self
387            .ongoing_batch
388            .range
389            .as_ref()
390            .map(|range| format!("{}", range.display()))
391            .unwrap_or_else(|| "None".to_string());
392
393        info!(
394            "syncing: head: {subjective_head}, stored headers: {stored_headers}, ongoing batches: {ongoing_batch}"
395        );
396        Ok(())
397    }
398
399    async fn on_cmd(&mut self, cmd: SyncerCmd) -> Result<()> {
400        match cmd {
401            SyncerCmd::GetInfo { respond_to } => {
402                let info = self.syncing_info().await?;
403                respond_to.maybe_send(info);
404            }
405            #[cfg(test)]
406            SyncerCmd::TriggerFetchNextBatch => {
407                self.fetch_next_batch().await?;
408            }
409        }
410
411        Ok(())
412    }
413
414    #[instrument(skip_all)]
415    async fn on_header_sub_message(&mut self, new_head: ExtendedHeader) -> Result<()> {
416        let new_head_height = new_head.height().value();
417
418        self.set_subjective_head_height(new_head_height);
419
420        if let Ok(store_head_height) = self.store.head_height().await {
421            // If our new header is adjacent to the HEAD of the store
422            if store_head_height + 1 == new_head_height {
423                // Header is already verified by HeaderSub and will be validated against previous
424                // head on insert
425                if self.store.insert(new_head).await.is_ok() {
426                    self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub {
427                        height: new_head_height,
428                    });
429                }
430            }
431        }
432
433        Ok(())
434    }
435
436    fn set_subjective_head_height(&mut self, height: u64) {
437        if self
438            .subjective_head_height
439            .is_some_and(|old_height| height <= old_height)
440        {
441            return;
442        }
443
444        self.subjective_head_height = Some(height);
445    }
446
447    #[instrument(skip_all)]
448    async fn fetch_next_batch(&mut self) -> Result<()> {
449        debug_assert_eq!(
450            self.ongoing_batch.range.is_none(),
451            self.ongoing_batch.task.is_terminated()
452        );
453
454        if !self.ongoing_batch.task.is_terminated() {
455            // Another batch is ongoing. We do not parallelize `Syncer`
456            // by design. Any parallel requests are done in the
457            // HeaderEx client through `Session`.
458            //
459            // Nothing to schedule
460            return Ok(());
461        }
462
463        if self.p2p.peer_tracker_info().num_connected_peers == 0 {
464            // No connected peers. We can't do the request.
465            // We will recover from this in `run`.
466            return Ok(());
467        }
468
469        let Some(subjective_head_height) = self.subjective_head_height else {
470            // Nothing to schedule
471            return Ok(());
472        };
473
474        let store_ranges = self.store.get_stored_header_ranges().await?;
475        let pruned_ranges = self.store.get_pruned_ranges().await?;
476
477        // Pruner removes already sampled headers and creates gaps in the ranges.
478        // Syncer must ignore those gaps.
479        let synced_ranges = pruned_ranges + &store_ranges;
480
481        let next_batch = calculate_range_to_fetch(
482            subjective_head_height,
483            synced_ranges.as_ref(),
484            self.batch_size,
485        );
486
487        if next_batch.is_empty() {
488            // no headers to fetch
489            return Ok(());
490        }
491
492        // If all heights of next batch is within the slow sync range
493        if self
494            .highest_slow_sync_height
495            .is_some_and(|height| *next_batch.end() <= height)
496        {
497            // Threshold is the half of batch size but it should be at least 50.
498            let threshold = (self.batch_size / 2).max(SLOW_SYNC_MIN_THRESHOLD);
499
500            // Calculate how many headers are locally available for sampling.
501            let sampled_ranges = self.store.get_sampled_ranges().await?;
502            let available_for_sampling = (store_ranges - sampled_ranges).len();
503
504            // Do not fetch next batch if we have more headers than the threshold.
505            if available_for_sampling > threshold {
506                // NOTE: Recheck will happen when we receive a header from header sub (~6 secs).
507                return Ok(());
508            }
509        }
510
511        // make sure we're inside the sampling window before we start
512        match self.store.get_by_height(next_batch.end() + 1).await {
513            Ok(known_header) => {
514                if !self.in_sampling_window(&known_header) {
515                    return Ok(());
516                }
517            }
518            Err(StoreError::NotFound) => {}
519            Err(e) => return Err(e.into()),
520        }
521
522        self.event_pub.send(NodeEvent::FetchingHeadersStarted {
523            from_height: *next_batch.start(),
524            to_height: *next_batch.end(),
525        });
526
527        let p2p = self.p2p.clone();
528
529        self.ongoing_batch.range = Some(next_batch.clone());
530
531        self.ongoing_batch.task.set(async move {
532            let now = Instant::now();
533            let res = p2p.get_unverified_header_range(next_batch).await;
534            (res, now.elapsed())
535        });
536
537        Ok(())
538    }
539
540    /// Handle the result of the batch request and propagate fatal errors.
541    #[instrument(skip_all)]
542    async fn on_fetch_next_batch_result(
543        &mut self,
544        res: Result<Vec<ExtendedHeader>, P2pError>,
545        took: Duration,
546    ) -> Result<()> {
547        let range = self
548            .ongoing_batch
549            .range
550            .take()
551            .expect("ongoing_batch not initialized correctly");
552
553        let from_height = *range.start();
554        let to_height = *range.end();
555
556        let headers = match res {
557            Ok(headers) => headers,
558            Err(e) => {
559                if e.is_fatal() {
560                    return Err(e.into());
561                }
562
563                self.event_pub.send(NodeEvent::FetchingHeadersFailed {
564                    from_height,
565                    to_height,
566                    error: e.to_string(),
567                    took,
568                });
569
570                return Ok(());
571            }
572        };
573
574        let pruning_cutoff = Time::now().saturating_sub(self.pruning_window);
575
576        // Iterate headers from highest to lowest and check if there is
577        // a new highest "slow sync" height.
578        for header in headers.iter().rev() {
579            if self
580                .highest_slow_sync_height
581                .is_some_and(|height| header.height().value() <= height)
582            {
583                // `highest_slow_sync_height` is already higher than `header`
584                // so we don't need to check anything else.
585                break;
586            }
587
588            // If `header` is after the pruning edge, we mark it as the
589            // `highest_slow_sync_height` and we stop checking lower headers.
590            if header.time() <= pruning_cutoff {
591                self.highest_slow_sync_height = Some(header.height().value());
592                break;
593            }
594        }
595
596        if let Err(e) = self.store.insert(headers).await {
597            if e.is_fatal() {
598                return Err(e.into());
599            }
600
601            self.event_pub.send(NodeEvent::FetchingHeadersFailed {
602                from_height,
603                to_height,
604                error: format!("Failed to store headers: {e}"),
605                took,
606            });
607        }
608
609        self.event_pub.send(NodeEvent::FetchingHeadersFinished {
610            from_height,
611            to_height,
612            took,
613        });
614
615        Ok(())
616    }
617
618    fn in_sampling_window(&self, header: &ExtendedHeader) -> bool {
619        let sampling_window_end = Time::now().saturating_sub(self.sampling_window);
620        header.time().after(sampling_window_end)
621    }
622}
623
624/// based on the synced headers and current network head height, calculate range of headers that
625/// should be fetched from the network, anchored on already synced header range
626fn calculate_range_to_fetch(
627    subjective_head_height: u64,
628    synced_headers: &[BlockRange],
629    limit: u64,
630) -> BlockRange {
631    let mut synced_headers_iter = synced_headers.iter().rev();
632
633    let Some(synced_head_range) = synced_headers_iter.next() else {
634        // empty synced ranges, we're missing everything
635        let range = 1..=subjective_head_height;
636        return range.tailn(limit);
637    };
638
639    if synced_head_range.end() < &subjective_head_height {
640        // if we haven't caught up with the network head, start from there
641        let range = synced_head_range.end() + 1..=subjective_head_height;
642        return range.tailn(limit);
643    }
644
645    // there exists a range contiguous with network head. inspect previous range end
646    let penultimate_range_end = synced_headers_iter.next().map(|r| *r.end()).unwrap_or(0);
647
648    let range = penultimate_range_end + 1..=synced_head_range.start().saturating_sub(1);
649    range.headn(limit)
650}
651
652#[instrument(skip_all)]
653async fn try_init_task<S>(
654    p2p: Arc<P2p>,
655    store: Arc<S>,
656    event_pub: EventPublisher,
657) -> Result<(ExtendedHeader, Duration)>
658where
659    S: Store + 'static,
660{
661    let now = Instant::now();
662    let mut event_reported = false;
663    let mut backoff = ExponentialBackoffBuilder::default()
664        .with_max_interval(TRY_INIT_BACKOFF_MAX_INTERVAL)
665        .with_max_elapsed_time(None)
666        .build();
667
668    loop {
669        match try_init(&p2p, &*store, &event_pub, &mut event_reported).await {
670            Ok(network_head) => {
671                return Ok((network_head, now.elapsed()));
672            }
673            Err(e) if e.is_fatal() => {
674                return Err(e);
675            }
676            Err(e) => {
677                let sleep_dur = backoff
678                    .next_backoff()
679                    .expect("backoff never stops retrying");
680
681                warn!(
682                    "Initialization of subjective head failed: {e}. Trying again in {sleep_dur:?}."
683                );
684                sleep(sleep_dur).await;
685            }
686        }
687    }
688}
689
690async fn try_init<S>(
691    p2p: &P2p,
692    store: &S,
693    event_pub: &EventPublisher,
694    event_reported: &mut bool,
695) -> Result<ExtendedHeader>
696where
697    S: Store,
698{
699    p2p.wait_connected_trusted().await?;
700
701    if !*event_reported {
702        event_pub.send(NodeEvent::FetchingHeadHeaderStarted);
703        *event_reported = true;
704    }
705
706    let network_head = p2p.get_head_header().await?;
707
708    // If the network head and the store head have the same height,
709    // then `insert` will error because of insertion contraints.
710    // However, if both headers are the exactly the same, we
711    // can skip inserting, as the header is already there.
712    //
713    // This can happen in case of fast node restart.
714    let try_insert = match store.get_head().await {
715        // `ExtendedHeader.commit.signatures` can be different set on each fetch
716        // so we compare only hashes.
717        Ok(store_head) => store_head.hash() != network_head.hash(),
718        Err(StoreError::NotFound) => true,
719        Err(e) => return Err(e.into()),
720    };
721
722    if try_insert {
723        // Insert HEAD to the store and initialize header-sub.
724        // Normal insertion checks still apply here.
725        store.insert(network_head.clone()).await?;
726    }
727
728    Ok(network_head)
729}
730
731async fn header_sub_recv(
732    rx: Option<&mut mpsc::Receiver<ExtendedHeader>>,
733) -> Result<ExtendedHeader> {
734    rx.expect("header-sub not initialized")
735        .recv()
736        .await
737        .ok_or(SyncerError::P2p(P2pError::WorkerDied))
738}
739
740#[cfg(test)]
741mod tests {
742    use std::ops::RangeInclusive;
743
744    use super::*;
745    use crate::block_ranges::{BlockRange, BlockRangeExt};
746    use crate::events::EventChannel;
747    use crate::node::{DEFAULT_PRUNING_WINDOW, HeaderExError, SAMPLING_WINDOW};
748    use crate::p2p::header_session;
749    use crate::store::InMemoryStore;
750    use crate::test_utils::{MockP2pHandle, gen_filled_store};
751    use crate::utils::OneshotResultSenderExt;
752    use celestia_types::test_utils::ExtendedHeaderGenerator;
753    use libp2p::request_response::OutboundFailure;
754    use lumina_utils::test_utils::async_test;
755
756    #[test]
757    fn calculate_range_to_fetch_test_header_limit() {
758        let head_height = 1024;
759        let ranges = [256..=512];
760
761        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16);
762        assert_eq!(fetch_range, 513..=528);
763
764        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511);
765        assert_eq!(fetch_range, 513..=1023);
766        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512);
767        assert_eq!(fetch_range, 513..=1024);
768        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513);
769        assert_eq!(fetch_range, 513..=1024);
770
771        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024);
772        assert_eq!(fetch_range, 513..=1024);
773    }
774
775    #[test]
776    fn calculate_range_to_fetch_empty_store() {
777        let fetch_range = calculate_range_to_fetch(1, &[], 100);
778        assert_eq!(fetch_range, 1..=1);
779
780        let fetch_range = calculate_range_to_fetch(100, &[], 10);
781        assert_eq!(fetch_range, 1..=10);
782
783        let fetch_range = calculate_range_to_fetch(100, &[], 50);
784        assert_eq!(fetch_range, 1..=50);
785    }
786
787    #[test]
788    fn calculate_range_to_fetch_fully_synced() {
789        let fetch_range = calculate_range_to_fetch(1, &[1..=1], 100);
790        assert!(fetch_range.is_empty());
791
792        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
793        assert!(fetch_range.is_empty());
794
795        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
796        assert!(fetch_range.is_empty());
797    }
798
799    #[test]
800    fn calculate_range_to_fetch_caught_up() {
801        let head_height = 4000;
802
803        let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], 500);
804        assert_eq!(fetch_range, 2500..=2999);
805        let fetch_range = calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], 500);
806        assert_eq!(fetch_range, 2500..=2999);
807        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
808        assert_eq!(fetch_range, 2801..=2999);
809        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
810        assert_eq!(fetch_range, 2801..=2999);
811        let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], 500);
812        assert_eq!(fetch_range, 1..=299);
813    }
814
815    #[test]
816    fn calculate_range_to_fetch_catching_up() {
817        let head_height = 4000;
818
819        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], 500);
820        assert_eq!(fetch_range, 3001..=3500);
821        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3500], 500);
822        assert_eq!(fetch_range, 3501..=4000);
823        let fetch_range = calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], 500);
824        assert_eq!(fetch_range, 3801..=4000);
825    }
826
827    #[async_test]
828    async fn init_without_genesis_hash() {
829        let events = EventChannel::new();
830        let (mock, mut handle) = P2p::mocked();
831        let mut generator = ExtendedHeaderGenerator::new();
832        let header = generator.next();
833
834        let _syncer = Syncer::start(SyncerArgs {
835            p2p: Arc::new(mock),
836            store: Arc::new(InMemoryStore::new()),
837            event_pub: events.publisher(),
838            batch_size: 512,
839            sampling_window: SAMPLING_WINDOW,
840            pruning_window: DEFAULT_PRUNING_WINDOW,
841        })
842        .unwrap();
843
844        // Syncer is waiting for a trusted peer to connect
845        handle.expect_no_cmd().await;
846        handle.announce_peer_connected();
847        handle.expect_no_cmd().await;
848        handle.announce_trusted_peer_connected();
849
850        // We're syncing from the front, so ask for head first
851        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
852        assert_eq!(height, 0);
853        assert_eq!(amount, 1);
854        respond_to.send(Ok(vec![header.clone()])).unwrap();
855
856        // Now Syncer initializes HeaderSub with the latest HEAD
857        let head_from_syncer = handle.expect_init_header_sub().await;
858        assert_eq!(head_from_syncer, header);
859
860        // network head = local head, so nothing else is produced.
861        handle.expect_no_cmd().await;
862    }
863
864    #[async_test]
865    async fn init_with_genesis_hash() {
866        let mut generator = ExtendedHeaderGenerator::new();
867        let head = generator.next();
868
869        let (_syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;
870
871        // network head = local head, so nothing else is produced.
872        p2p_mock.expect_no_cmd().await;
873    }
874
875    #[async_test]
876    async fn syncing() {
877        let mut generator = ExtendedHeaderGenerator::new();
878        let headers = generator.next_many(1500);
879
880        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[1499].clone()).await;
881        assert_syncing(&syncer, &store, &[1500..=1500], 1500).await;
882
883        // Syncer is syncing backwards from the network head (batch 1)
884        handle_session_batch(&mut p2p_mock, &headers, 988..=1499, true).await;
885        assert_syncing(&syncer, &store, &[988..=1500], 1500).await;
886
887        // Syncer is syncing backwards from the network head (batch 2)
888        handle_session_batch(&mut p2p_mock, &headers, 476..=987, true).await;
889        assert_syncing(&syncer, &store, &[476..=1500], 1500).await;
890
891        // New HEAD was received by HeaderSub (height 1501)
892        let header1501 = generator.next();
893        p2p_mock.announce_new_head(header1501.clone());
894        // Height 1501 is adjacent to the last header of Store, so it is appended
895        // immediately
896        assert_syncing(&syncer, &store, &[476..=1501], 1501).await;
897
898        // Syncer is syncing backwards from the network head (batch 3, partial)
899        handle_session_batch(&mut p2p_mock, &headers, 1..=475, true).await;
900        assert_syncing(&syncer, &store, &[1..=1501], 1501).await;
901
902        // Syncer is fulling synced and awaiting for events
903        p2p_mock.expect_no_cmd().await;
904
905        // New HEAD was received by HeaderSub (height 1502), it should be appended immediately
906        let header1502 = generator.next();
907        p2p_mock.announce_new_head(header1502.clone());
908        assert_syncing(&syncer, &store, &[1..=1502], 1502).await;
909        p2p_mock.expect_no_cmd().await;
910
911        // New HEAD was received by HeaderSub (height 1505), it should NOT be appended
912        let headers_1503_1505 = generator.next_many(3);
913        p2p_mock.announce_new_head(headers_1503_1505[2].clone());
914        assert_syncing(&syncer, &store, &[1..=1502], 1505).await;
915
916        // New HEAD is not adjacent to store, so Syncer requests a range
917        handle_session_batch(&mut p2p_mock, &headers_1503_1505, 1503..=1505, true).await;
918        assert_syncing(&syncer, &store, &[1..=1505], 1505).await;
919
920        // New HEAD was received by HeaderSub (height 3000), it should NOT be appended
921        let mut headers = generator.next_many(1495);
922        p2p_mock.announce_new_head(headers[1494].clone());
923        assert_syncing(&syncer, &store, &[1..=1505], 3000).await;
924
925        // Syncer is syncing forwards, anchored on a range already in store (batch 1)
926        handle_session_batch(&mut p2p_mock, &headers, 1506..=2017, true).await;
927        assert_syncing(&syncer, &store, &[1..=2017], 3000).await;
928
929        // New head from header sub added, should NOT be appended
930        headers.push(generator.next());
931        p2p_mock.announce_new_head(headers.last().unwrap().clone());
932        assert_syncing(&syncer, &store, &[1..=2017], 3001).await;
933
934        // Syncer continues syncing forwads (batch 2)
935        handle_session_batch(&mut p2p_mock, &headers, 2018..=2529, true).await;
936        assert_syncing(&syncer, &store, &[1..=2529], 3001).await;
937
938        // Syncer continues syncing forwards, should include the new head received via HeaderSub (batch 3)
939        handle_session_batch(&mut p2p_mock, &headers, 2530..=3001, true).await;
940        assert_syncing(&syncer, &store, &[1..=3001], 3001).await;
941
942        // Syncer is fulling synced and awaiting for events
943        p2p_mock.expect_no_cmd().await;
944    }
945
946    #[async_test]
947    async fn slow_sync() {
948        let pruning_window = Duration::from_secs(600);
949        let sampling_window = SAMPLING_WINDOW;
950
951        let mut generator = ExtendedHeaderGenerator::new();
952
953        // Generate back in time 4 batches of 512 messages (= 2048 headers).
954        let first_header_time = (Time::now() - Duration::from_secs(2048)).unwrap();
955        generator.set_time(first_header_time, Duration::from_secs(1));
956        let headers = generator.next_many(2048);
957
958        let (syncer, store, mut p2p_mock) =
959            initialized_syncer_with_windows(headers[2047].clone(), sampling_window, pruning_window)
960                .await;
961        assert_syncing(&syncer, &store, &[2048..=2048], 2048).await;
962
963        // The whole 1st batch does not reach the pruning edge.
964        handle_session_batch(&mut p2p_mock, &headers, 1536..=2047, true).await;
965        assert_syncing(&syncer, &store, &[1536..=2048], 2048).await;
966
967        // 2nd batch is partially within pruning window, so fast sync applies.
968        handle_session_batch(&mut p2p_mock, &headers, 1024..=1535, true).await;
969        assert_syncing(&syncer, &store, &[1024..=2048], 2048).await;
970
971        // The whole 3rd batch is beyond pruning edge. So now slow sync applies.
972        // In order for the Syncer to send requests, more than a half of the previous
973        // batch needs to be sampled.
974        syncer.trigger_fetch_next_batch().await.unwrap();
975        p2p_mock.expect_no_cmd().await;
976
977        // After marking the 1st and more than of the 2nd batch, syncer
978        // will finally request the 3rd batch.
979        for height in 1250..=2048 {
980            // Simulate Daser
981            store.mark_as_sampled(height).await.unwrap();
982        }
983        for height in 1300..=1450 {
984            // Simulate Pruner
985            store.remove_height(height).await.unwrap();
986        }
987        syncer.trigger_fetch_next_batch().await.unwrap();
988        // Syncer should not request the pruned range
989        handle_session_batch(&mut p2p_mock, &headers, 512..=1023, true).await;
990        assert_syncing(&syncer, &store, &[512..=1299, 1451..=2048], 2048).await;
991
992        // Syncer should not request the 4th batch since there are plenty
993        // of blocks to be sampled.
994        syncer.trigger_fetch_next_batch().await.unwrap();
995        p2p_mock.expect_no_cmd().await;
996    }
997
998    #[async_test]
999    async fn window_edge() {
1000        let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60);
1001        let mut generator = ExtendedHeaderGenerator::new();
1002        generator.set_time(
1003            (Time::now() - month_and_day_ago).expect("to not underflow"),
1004            Duration::from_secs(1),
1005        );
1006        let mut headers = generator.next_many(1200);
1007        generator.reset_time();
1008        headers.append(&mut generator.next_many(2049 - 1200));
1009
1010        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await;
1011        assert_syncing(&syncer, &store, &[2049..=2049], 2049).await;
1012
1013        // Syncer requested the first batch
1014        handle_session_batch(&mut p2p_mock, &headers, 1537..=2048, true).await;
1015        assert_syncing(&syncer, &store, &[1537..=2049], 2049).await;
1016
1017        // Syncer requested the second batch hitting the sampling window
1018        handle_session_batch(&mut p2p_mock, &headers, 1025..=1536, true).await;
1019        assert_syncing(&syncer, &store, &[1025..=2049], 2049).await;
1020
1021        // Syncer is fully synced and awaiting for events
1022        p2p_mock.expect_no_cmd().await;
1023    }
1024
1025    #[async_test]
1026    async fn start_with_filled_store() {
1027        let events = EventChannel::new();
1028        let (p2p, mut p2p_mock) = P2p::mocked();
1029        let (store, mut generator) = gen_filled_store(25).await;
1030        let store = Arc::new(store);
1031
1032        let mut headers = generator.next_many(520);
1033        let network_head = generator.next(); // height 546
1034
1035        let syncer = Syncer::start(SyncerArgs {
1036            p2p: Arc::new(p2p),
1037            store: store.clone(),
1038            event_pub: events.publisher(),
1039            batch_size: 512,
1040            sampling_window: SAMPLING_WINDOW,
1041            pruning_window: DEFAULT_PRUNING_WINDOW,
1042        })
1043        .unwrap();
1044
1045        p2p_mock.announce_trusted_peer_connected();
1046
1047        // Syncer asks for current HEAD
1048        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1049        assert_eq!(height, 0);
1050        assert_eq!(amount, 1);
1051        respond_to.send(Ok(vec![network_head.clone()])).unwrap();
1052
1053        // Now Syncer initializes HeaderSub with the latest HEAD
1054        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
1055        assert_eq!(head_from_syncer, network_head);
1056
1057        assert_syncing(&syncer, &store, &[1..=25, 546..=546], 546).await;
1058
1059        // Syncer requested the first batch
1060        handle_session_batch(&mut p2p_mock, &headers, 34..=545, true).await;
1061        assert_syncing(&syncer, &store, &[1..=25, 34..=546], 546).await;
1062
1063        // Syncer requested the remaining batch ([26, 33])
1064        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1065        assert_eq!(height, 26);
1066        assert_eq!(amount, 8);
1067        respond_to
1068            .send(Ok(headers.drain(..8).collect()))
1069            .map_err(|_| "headers [538, 545]")
1070            .unwrap();
1071        assert_syncing(&syncer, &store, &[1..=546], 546).await;
1072
1073        // Syncer is fulling synced and awaiting for events
1074        p2p_mock.expect_no_cmd().await;
1075    }
1076
1077    #[async_test]
1078    async fn stop_syncer() {
1079        let mut generator = ExtendedHeaderGenerator::new();
1080        let head = generator.next();
1081
1082        let (syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;
1083
1084        // network head height == 1, so nothing else is produced.
1085        p2p_mock.expect_no_cmd().await;
1086
1087        syncer.stop();
1088        // Wait for Worker to stop
1089        sleep(Duration::from_millis(1)).await;
1090        assert!(matches!(
1091            syncer.info().await.unwrap_err(),
1092            SyncerError::WorkerDied
1093        ));
1094    }
1095
1096    #[async_test]
1097    async fn all_peers_disconnected() {
1098        let mut generator = ExtendedHeaderGenerator::new();
1099
1100        let _gap = generator.next_many(24);
1101        let header25 = generator.next();
1102        let _gap = generator.next_many(4);
1103        let header30 = generator.next();
1104        let _gap = generator.next_many(4);
1105        let header35 = generator.next();
1106
1107        // Start Syncer and report height 30 as HEAD
1108        let (syncer, store, mut p2p_mock) = initialized_syncer(header30).await;
1109
1110        // Wait for the request but do not reply to it.
1111        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;
1112
1113        p2p_mock.announce_all_peers_disconnected();
1114        // Syncer is now back to `connecting_event_loop`.
1115        p2p_mock.expect_no_cmd().await;
1116
1117        // Accounce a non-trusted peer. Syncer in `connecting_event_loop` can progress only
1118        // if a trusted peer is connected.
1119        p2p_mock.announce_peer_connected();
1120        p2p_mock.expect_no_cmd().await;
1121
1122        // Accounce a trusted peer.
1123        p2p_mock.announce_trusted_peer_connected();
1124
1125        // Now syncer will send request for HEAD.
1126        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1127        assert_eq!(height, 0);
1128        assert_eq!(amount, 1);
1129
1130        // Report an older head. Syncer should not accept it.
1131        respond_to.send(Ok(vec![header25])).unwrap();
1132        assert_syncing(&syncer, &store, &[30..=30], 30).await;
1133
1134        // Syncer will request HEAD again after some time.
1135        sleep(Duration::from_secs(1)).await;
1136        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1137        assert_eq!(height, 0);
1138        assert_eq!(amount, 1);
1139
1140        // Report newer HEAD than before.
1141        respond_to.send(Ok(vec![header35.clone()])).unwrap();
1142        assert_syncing(&syncer, &store, &[30..=30, 35..=35], 35).await;
1143
1144        // Syncer initializes HeaderSub with the latest HEAD.
1145        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
1146        assert_eq!(head_from_syncer, header35);
1147
1148        // Syncer now is in `connected_event_loop` and will try to sync the gap
1149        // that is closer to HEAD.
1150        let (height, amount, _respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1151        assert_eq!(height, 31);
1152        assert_eq!(amount, 4);
1153
1154        p2p_mock.announce_all_peers_disconnected();
1155        p2p_mock.expect_no_cmd().await;
1156    }
1157
1158    #[async_test]
1159    async fn all_peers_disconnected_and_no_network_head_progress() {
1160        let mut generator = ExtendedHeaderGenerator::new_from_height(30);
1161
1162        let header30 = generator.next();
1163
1164        // Start Syncer and report height 30 as HEAD
1165        let (syncer, store, mut p2p_mock) = initialized_syncer(header30.clone()).await;
1166
1167        // Wait for the request but do not reply to it.
1168        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;
1169
1170        p2p_mock.announce_all_peers_disconnected();
1171        // Syncer is now back to `connecting_event_loop`.
1172        p2p_mock.expect_no_cmd().await;
1173
1174        // Accounce a non-trusted peer. Syncer in `connecting_event_loop` can progress only
1175        // if a trusted peer is connected.
1176        p2p_mock.announce_peer_connected();
1177        p2p_mock.expect_no_cmd().await;
1178
1179        // Accounce a trusted peer.
1180        p2p_mock.announce_trusted_peer_connected();
1181
1182        // Now syncer will send request for HEAD.
1183        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
1184        assert_eq!(height, 0);
1185        assert_eq!(amount, 1);
1186
1187        // Report the same HEAD as before.
1188        respond_to.send(Ok(vec![header30.clone()])).unwrap();
1189        assert_syncing(&syncer, &store, &[30..=30], 30).await;
1190
1191        // Syncer initializes HeaderSub with the latest HEAD.
1192        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
1193        assert_eq!(head_from_syncer, header30);
1194
1195        // Syncer now is in `connected_event_loop` and will try to sync the gap
1196        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;
1197
1198        p2p_mock.announce_all_peers_disconnected();
1199        p2p_mock.expect_no_cmd().await;
1200    }
1201
1202    #[async_test]
1203    async fn non_contiguous_response() {
1204        let mut generator = ExtendedHeaderGenerator::new();
1205        let mut headers = generator.next_many(20);
1206
1207        // Start Syncer and report last header as network head
1208        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;
1209
1210        let header10 = headers[10].clone();
1211        // make a gap in response, preserving the amount of headers returned
1212        headers[10] = headers[11].clone();
1213
1214        // Syncer requests missing headers
1215        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1216
1217        // Syncer should not apply headers from invalid response
1218        assert_syncing(&syncer, &store, &[20..=20], 20).await;
1219
1220        // correct the response
1221        headers[10] = header10;
1222
1223        // Syncer requests missing headers again
1224        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1225
1226        // With a correct resposne, syncer should update the store
1227        assert_syncing(&syncer, &store, &[1..=20], 20).await;
1228    }
1229
1230    #[async_test]
1231    async fn another_chain_response() {
1232        let headers = ExtendedHeaderGenerator::new().next_many(20);
1233        let headers_prime = ExtendedHeaderGenerator::new().next_many(20);
1234
1235        // Start Syncer and report last header as network head
1236        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;
1237
1238        // Syncer requests missing headers
1239        handle_session_batch(&mut p2p_mock, &headers_prime, 1..=19, true).await;
1240
1241        // Syncer should not apply headers from invalid response
1242        assert_syncing(&syncer, &store, &[20..=20], 20).await;
1243
1244        // Syncer requests missing headers again
1245        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;
1246
1247        // With a correct resposne, syncer should update the store
1248        assert_syncing(&syncer, &store, &[1..=20], 20).await;
1249    }
1250
1251    async fn assert_syncing(
1252        syncer: &Syncer<InMemoryStore>,
1253        store: &InMemoryStore,
1254        expected_synced_ranges: &[RangeInclusive<u64>],
1255        expected_subjective_head: u64,
1256    ) {
1257        // Syncer receives responds on the same loop that receive other events.
1258        // Wait a bit to be processed.
1259        sleep(Duration::from_millis(1)).await;
1260
1261        let store_ranges = store.get_stored_header_ranges().await.unwrap();
1262        let syncing_info = syncer.info().await.unwrap();
1263
1264        assert_eq!(store_ranges.as_ref(), expected_synced_ranges);
1265        assert_eq!(syncing_info.stored_headers.as_ref(), expected_synced_ranges);
1266        assert_eq!(syncing_info.subjective_head, expected_subjective_head);
1267    }
1268
1269    async fn initialized_syncer(
1270        head: ExtendedHeader,
1271    ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
1272        initialized_syncer_with_windows(head, SAMPLING_WINDOW, DEFAULT_PRUNING_WINDOW).await
1273    }
1274
1275    async fn initialized_syncer_with_windows(
1276        head: ExtendedHeader,
1277        sampling_window: Duration,
1278        pruning_window: Duration,
1279    ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
1280        let events = EventChannel::new();
1281        let (mock, mut handle) = P2p::mocked();
1282        let store = Arc::new(InMemoryStore::new());
1283
1284        let syncer = Syncer::start(SyncerArgs {
1285            p2p: Arc::new(mock),
1286            store: store.clone(),
1287            event_pub: events.publisher(),
1288            batch_size: 512,
1289            sampling_window,
1290            pruning_window,
1291        })
1292        .unwrap();
1293
1294        // Syncer is waiting for a trusted peer to connect
1295        handle.expect_no_cmd().await;
1296        handle.announce_peer_connected();
1297        handle.expect_no_cmd().await;
1298        handle.announce_trusted_peer_connected();
1299
1300        // After connection is established Syncer asks current network HEAD
1301        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
1302        assert_eq!(height, 0);
1303        assert_eq!(amount, 1);
1304        respond_to.send(Ok(vec![head.clone()])).unwrap();
1305
1306        // Now Syncer initializes HeaderSub with the latest HEAD
1307        let head_from_syncer = handle.expect_init_header_sub().await;
1308        assert_eq!(head_from_syncer, head);
1309
1310        let head_height = head.height().value();
1311        assert_syncing(&syncer, &store, &[head_height..=head_height], head_height).await;
1312
1313        (syncer, store, handle)
1314    }
1315
1316    async fn handle_session_batch(
1317        p2p_mock: &mut MockP2pHandle,
1318        remaining_headers: &[ExtendedHeader],
1319        range: BlockRange,
1320        respond: bool,
1321    ) {
1322        range.validate().unwrap();
1323
1324        let mut ranges_to_request = BlockRanges::new();
1325        ranges_to_request.insert_relaxed(&range).unwrap();
1326
1327        let mut no_respond_chans = Vec::new();
1328
1329        for _ in 0..requests_in_session(range.len()) {
1330            let (height, amount, respond_to) =
1331                p2p_mock.expect_header_request_for_height_cmd().await;
1332
1333            let requested_range = height..=height + amount - 1;
1334            ranges_to_request.remove_strict(requested_range);
1335
1336            if respond {
1337                let header_index = remaining_headers
1338                    .iter()
1339                    .position(|h| h.height().value() == height)
1340                    .expect("height not found in provided headers");
1341
1342                let response_range =
1343                    remaining_headers[header_index..header_index + amount as usize].to_vec();
1344                respond_to
1345                    .send(Ok(response_range))
1346                    .map_err(|_| format!("headers [{}, {}]", height, height + amount - 1))
1347                    .unwrap();
1348            } else {
1349                no_respond_chans.push(respond_to);
1350            }
1351        }
1352
1353        // Real libp2p implementation will create a timeout error if the peer does not
1354        // respond. We need to simulate that, otherwise we end up with some undesirable
1355        // behaviours in Syncer.
1356        if !respond {
1357            spawn(async move {
1358                sleep(Duration::from_secs(10)).await;
1359
1360                for respond_chan in no_respond_chans {
1361                    respond_chan.maybe_send_err(P2pError::HeaderEx(
1362                        HeaderExError::OutboundFailure(OutboundFailure::Timeout),
1363                    ));
1364                }
1365            });
1366        }
1367
1368        assert!(
1369            ranges_to_request.is_empty(),
1370            "Some headers weren't requested. expected range: {}, not requested: {}",
1371            range.display(),
1372            ranges_to_request
1373        );
1374    }
1375
1376    fn requests_in_session(headers: u64) -> usize {
1377        let max_requests = headers.div_ceil(header_session::MAX_AMOUNT_PER_REQ) as usize;
1378        let min_requests = headers.div_ceil(header_session::MIN_AMOUNT_PER_REQ) as usize;
1379
1380        if max_requests > header_session::MAX_CONCURRENT_REQS {
1381            // if we have to do more requests than our concurrency limit anyway
1382            max_requests
1383        } else {
1384            // otherwise we can handle batch fully concurrent
1385            header_session::MAX_CONCURRENT_REQS.min(min_requests)
1386        }
1387    }
1388
1389    impl BlockRanges {
1390        fn remove_strict(&mut self, range: BlockRange) {
1391            for stored in self.as_ref() {
1392                if stored.contains(range.start()) && stored.contains(range.end()) {
1393                    self.remove_relaxed(range).unwrap();
1394                    return;
1395                }
1396            }
1397
1398            panic!("block ranges ({self}) don't contain {}", range.display());
1399        }
1400    }
1401}