lumina_node/syncer.rs

//! Component responsible for synchronizing block headers announced in the Celestia network.

use std::marker::PhantomData;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use celestia_types::ExtendedHeader;
use lumina_utils::executor::{spawn, JoinHandle};
use lumina_utils::time::{sleep, Instant, Interval};
use serde::{Deserialize, Serialize};
use tendermint::Time;
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};

use crate::block_ranges::{BlockRange, BlockRangeExt, BlockRanges};
use crate::events::{EventPublisher, NodeEvent};
use crate::p2p::{P2p, P2pError};
use crate::store::{Store, StoreError};
use crate::utils::{FusedReusableFuture, OneshotSenderExt, TimeExt};

type Result<T, E = SyncerError> = std::result::Result<T, E>;

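/// Maximum backoff interval between retries of fetching the initial network head.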
const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60);
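/// Lower bound of the "slow sync" threshold; see `Worker::fetch_next_batch`.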
const SLOW_SYNC_MIN_THRESHOLD: u64 = 50;

/// Representation of all the errors that can occur in the `Syncer` component.
#[derive(Debug, thiserror::Error)]
pub enum SyncerError {
    /// An error propagated from the `P2p` component.
    #[error("P2p: {0}")]
    P2p(#[from] P2pError),

    /// An error propagated from the [`Store`] component.
    #[error("Store: {0}")]
    Store(#[from] StoreError),

    /// The worker has died.
    #[error("Worker died")]
    WorkerDied,

    /// Channel closed unexpectedly.
    #[error("Channel closed unexpectedly")]
    ChannelClosedUnexpectedly,
}

impl SyncerError {
    pub(crate) fn is_fatal(&self) -> bool {
        match self {
            SyncerError::P2p(e) => e.is_fatal(),
            SyncerError::Store(e) => e.is_fatal(),
            SyncerError::WorkerDied | SyncerError::ChannelClosedUnexpectedly => true,
        }
    }
}

impl From<oneshot::error::RecvError> for SyncerError {
    fn from(_value: oneshot::error::RecvError) -> Self {
        SyncerError::ChannelClosedUnexpectedly
    }
}

/// Component responsible for synchronizing block headers from the network.
#[derive(Debug)]
pub(crate) struct Syncer<S>
where
    S: Store + 'static,
{
    cmd_tx: mpsc::Sender<SyncerCmd>,
    cancellation_token: CancellationToken,
    join_handle: JoinHandle,
    _store: PhantomData<S>,
}

/// Arguments used to configure the [`Syncer`].
pub(crate) struct SyncerArgs<S>
where
    S: Store + 'static,
{
    /// Handler for the peer to peer messaging.
    pub(crate) p2p: Arc<P2p>,
    /// Headers storage.
    pub(crate) store: Arc<S>,
    /// Event publisher.
    pub(crate) event_pub: EventPublisher,
    /// Batch size.
    pub(crate) batch_size: u64,
    /// Sampling window.
    pub(crate) sampling_window: Duration,
    /// Pruning window.
    pub(crate) pruning_window: Duration,
}

#[derive(Debug)]
enum SyncerCmd {
    GetInfo {
        respond_to: oneshot::Sender<SyncingInfo>,
    },
    #[cfg(test)]
    TriggerFetchNextBatch,
}

/// Status of the synchronization.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncingInfo {
    /// Ranges of headers that are already synchronized.
    pub stored_headers: BlockRanges,
    /// Syncing target. The latest height seen in the network that was successfully verified.
    pub subjective_head: u64,
}

impl<S> Syncer<S>
where
    S: Store,
{
    /// Create and start the [`Syncer`].
    pub(crate) fn start(args: SyncerArgs<S>) -> Result<Self> {
        let cancellation_token = CancellationToken::new();
        let event_pub = args.event_pub.clone();
        let (cmd_tx, cmd_rx) = mpsc::channel(16);
        let mut worker = Worker::new(args, cancellation_token.child_token(), cmd_rx)?;

        let join_handle = spawn(async move {
            if let Err(e) = worker.run().await {
                error!("Syncer stopped because of a fatal error: {e}");

                event_pub.send(NodeEvent::FatalSyncerError {
                    error: e.to_string(),
                });
            }
        });

        Ok(Syncer {
            cancellation_token,
            cmd_tx,
            join_handle,
            _store: PhantomData,
        })
    }

    /// Stop the worker.
    pub(crate) fn stop(&self) {
        // Signal the Worker to stop.
        self.cancellation_token.cancel();
    }

    /// Wait until worker is completely stopped.
    pub(crate) async fn join(&self) {
        self.join_handle.join().await;
    }

    async fn send_command(&self, cmd: SyncerCmd) -> Result<()> {
        self.cmd_tx
            .send(cmd)
            .await
            .map_err(|_| SyncerError::WorkerDied)
    }

    /// Get the current synchronization status.
    ///
    /// # Errors
    ///
    /// This function will return an error if the [`Syncer`] has been stopped.
    pub(crate) async fn info(&self) -> Result<SyncingInfo> {
        let (tx, rx) = oneshot::channel();

        self.send_command(SyncerCmd::GetInfo { respond_to: tx })
            .await?;

        Ok(rx.await?)
    }

    #[cfg(test)]
    async fn trigger_fetch_next_batch(&self) -> Result<()> {
        self.send_command(SyncerCmd::TriggerFetchNextBatch).await
    }
}

impl<S> Drop for Syncer<S>
where
    S: Store,
{
    fn drop(&mut self) {
        self.stop();
    }
}

struct Worker<S>
where
    S: Store + 'static,
{
    cancellation_token: CancellationToken,
    cmd_rx: mpsc::Receiver<SyncerCmd>,
    event_pub: EventPublisher,
    p2p: Arc<P2p>,
    store: Arc<S>,
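    /// Receiver of headers announced on header-sub. `None` until the initial
    /// network head is fetched in `connecting_event_loop`.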
    header_sub_rx: Option<mpsc::Receiver<ExtendedHeader>>,
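    /// The latest height seen in the network that was successfully verified.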
    subjective_head_height: Option<u64>,
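    /// Highest header height known to be older than the pruning window.
    /// Batches that end at or below this height are fetched lazily ("slow sync"),
    /// throttled by the sampling progress.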
    highest_slow_sync_height: Option<u64>,
    batch_size: u64,
    ongoing_batch: Ongoing,
    sampling_window: Duration,
    pruning_window: Duration,
}

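/// The at-most-one in-flight batch request. `Syncer` does not parallelize
/// batches by design; parallel requests happen inside the HeaderEx client.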
struct Ongoing {
    range: Option<BlockRange>,
    task: FusedReusableFuture<(Result<Vec<ExtendedHeader>, P2pError>, Duration)>,
}

impl<S> Worker<S>
where
    S: Store,
{
    fn new(
        args: SyncerArgs<S>,
        cancellation_token: CancellationToken,
        cmd_rx: mpsc::Receiver<SyncerCmd>,
    ) -> Result<Self> {
        Ok(Worker {
            cancellation_token,
            cmd_rx,
            event_pub: args.event_pub,
            p2p: args.p2p,
            store: args.store,
            header_sub_rx: None,
            subjective_head_height: None,
            highest_slow_sync_height: None,
            batch_size: args.batch_size,
            ongoing_batch: Ongoing {
                range: None,
                task: FusedReusableFuture::terminated(),
            },
            sampling_window: args.sampling_window,
            pruning_window: args.pruning_window,
        })
    }

    async fn run(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connecting_event_loop().await?;

            if self.cancellation_token.is_cancelled() {
                break;
            }

            self.connected_event_loop().await?;
        }

        debug!("Syncer stopped");
        Ok(())
    }

    /// The responsibility of this event loop is to wait for a trusted peer to
    /// connect and fetch the network head, while accepting commands.
    ///
    /// NOTE: Only fatal errors should be propagated!
    async fn connecting_event_loop(&mut self) -> Result<()> {
        debug!("Entering connecting_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60)).await;
        self.report().await?;

        let mut try_init_fut = pin!(try_init_task(
            self.p2p.clone(),
            self.store.clone(),
            self.event_pub.clone()
        ));

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = report_interval.tick() => {
                    self.report().await?;
                }
                res = &mut try_init_fut => {
                    // `try_init_task` propagates only fatal errors
                    let (network_head, took) = res?;
                    let network_head_height = network_head.height().value();

                    info!("Setting initial subjective head to {network_head_height}");
                    self.set_subjective_head_height(network_head_height);

                    let (header_sub_tx, header_sub_rx) = mpsc::channel(16);
                    self.p2p.init_header_sub(network_head, header_sub_tx).await?;
                    self.header_sub_rx = Some(header_sub_rx);

                    self.event_pub.send(NodeEvent::FetchingHeadHeaderFinished {
                        height: network_head_height,
                        took,
                    });

                    break;
                }
                Some(cmd) = self.cmd_rx.recv() => {
                    self.on_cmd(cmd).await?;
                }
            }
        }

        Ok(())
    }

    /// The responsibility of this event loop is to run the syncing process,
    /// handle events from HeaderSub, and accept commands.
    ///
    /// NOTE: Only fatal errors should be propagated!
    async fn connected_event_loop(&mut self) -> Result<()> {
        debug!("Entering connected_event_loop");

        let mut report_interval = Interval::new(Duration::from_secs(60)).await;
        let mut peer_tracker_info_watcher = self.p2p.peer_tracker_info_watcher();

        // Check if all peers disconnected before the watcher was created
        if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
            warn!("All peers disconnected");
            return Ok(());
        }

        self.fetch_next_batch().await?;
        self.report().await?;

        loop {
            select! {
                _ = self.cancellation_token.cancelled() => {
                    break;
                }
                _ = peer_tracker_info_watcher.changed() => {
                    if peer_tracker_info_watcher.borrow().num_connected_peers == 0 {
                        warn!("All peers disconnected");
                        break;
                    }
                }
                _ = report_interval.tick() => {
                    self.report().await?;
                }
                res = header_sub_recv(self.header_sub_rx.as_mut()) => {
                    let header = res?;
                    self.on_header_sub_message(header).await?;
                    self.fetch_next_batch().await?;
                }
                Some(cmd) = self.cmd_rx.recv() => {
                    self.on_cmd(cmd).await?;
                }
                (res, took) = &mut self.ongoing_batch.task => {
                    self.on_fetch_next_batch_result(res, took).await?;
                    self.fetch_next_batch().await?;
                }
            }
        }

        if let Some(ongoing) = self.ongoing_batch.range.take() {
            warn!("Cancelling fetching of {}", ongoing.display());
            self.ongoing_batch.task.terminate();
        }

        self.header_sub_rx.take();

        Ok(())
    }

    async fn syncing_info(&self) -> Result<SyncingInfo> {
        Ok(SyncingInfo {
            stored_headers: self.store.get_stored_header_ranges().await?,
            subjective_head: self.subjective_head_height.unwrap_or(0),
        })
    }

    #[instrument(skip_all)]
    async fn report(&mut self) -> Result<()> {
        let SyncingInfo {
            stored_headers,
            subjective_head,
        } = self.syncing_info().await?;

        let ongoing_batch = self
            .ongoing_batch
            .range
            .as_ref()
            .map(|range| format!("{}", range.display()))
            .unwrap_or_else(|| "None".to_string());

        info!("syncing: head: {subjective_head}, stored headers: {stored_headers}, ongoing batch: {ongoing_batch}");
        Ok(())
    }

    async fn on_cmd(&mut self, cmd: SyncerCmd) -> Result<()> {
        match cmd {
            SyncerCmd::GetInfo { respond_to } => {
                let info = self.syncing_info().await?;
                respond_to.maybe_send(info);
            }
            #[cfg(test)]
            SyncerCmd::TriggerFetchNextBatch => {
                self.fetch_next_batch().await?;
            }
        }

        Ok(())
    }

    #[instrument(skip_all)]
    async fn on_header_sub_message(&mut self, new_head: ExtendedHeader) -> Result<()> {
        let new_head_height = new_head.height().value();

        self.set_subjective_head_height(new_head_height);

        if let Ok(store_head_height) = self.store.head_height().await {
            // If our new header is adjacent to the HEAD of the store
            if store_head_height + 1 == new_head_height {
                // Header is already verified by HeaderSub and will be validated against previous
                // head on insert
                if self.store.insert(new_head).await.is_ok() {
                    self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub {
                        height: new_head_height,
                    });
                }
            }
        }

        Ok(())
    }

    fn set_subjective_head_height(&mut self, height: u64) {
        if let Some(old_height) = self.subjective_head_height {
            if height <= old_height {
                return;
            }
        }

        self.subjective_head_height = Some(height);
    }

    #[instrument(skip_all)]
    async fn fetch_next_batch(&mut self) -> Result<()> {
        debug_assert_eq!(
            self.ongoing_batch.range.is_none(),
            self.ongoing_batch.task.is_terminated()
        );

        if !self.ongoing_batch.task.is_terminated() {
            // Another batch is ongoing. We do not parallelize `Syncer`
            // by design. Any parallel requests are done in the
            // HeaderEx client through `Session`.
            //
            // Nothing to schedule
            return Ok(());
        }

        if self.p2p.peer_tracker_info().num_connected_peers == 0 {
            // No connected peers. We can't do the request.
            // We will recover from this in `run`.
            return Ok(());
        }

        let Some(subjective_head_height) = self.subjective_head_height else {
            // Nothing to schedule
            return Ok(());
        };

        let store_ranges = self.store.get_stored_header_ranges().await?;
        let pruned_ranges = self.store.get_pruned_ranges().await?;

        // Pruner removes already sampled headers and creates gaps in the ranges.
        // Syncer must ignore those gaps.
        let synced_ranges = pruned_ranges + &store_ranges;

        let next_batch = calculate_range_to_fetch(
            subjective_head_height,
            synced_ranges.as_ref(),
            self.batch_size,
        );

        if next_batch.is_empty() {
            // no headers to fetch
            return Ok(());
        }

        // If all heights of the next batch are within the slow sync range
        if self
            .highest_slow_sync_height
            .is_some_and(|height| *next_batch.end() <= height)
        {
            // The threshold is half of the batch size, but at least `SLOW_SYNC_MIN_THRESHOLD`.
            let threshold = (self.batch_size / 2).max(SLOW_SYNC_MIN_THRESHOLD);
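            // E.g. with the batch size of 512 used in tests, the threshold
            // is 256 headers.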

            // Calculate how many headers are locally available for sampling.
            let sampled_ranges = self.store.get_sampled_ranges().await?;
            let available_for_sampling = (store_ranges - sampled_ranges).len();

            // Do not fetch the next batch if we have more headers than the threshold.
            if available_for_sampling > threshold {
                // NOTE: Recheck will happen when we receive a header from header sub (~6 secs).
                return Ok(());
            }
        }

        // make sure we're inside the sampling window before we start
        match self.store.get_by_height(next_batch.end() + 1).await {
            Ok(known_header) => {
                if !self.in_sampling_window(&known_header) {
                    return Ok(());
                }
            }
            Err(StoreError::NotFound) => {}
            Err(e) => return Err(e.into()),
        }

        self.event_pub.send(NodeEvent::FetchingHeadersStarted {
            from_height: *next_batch.start(),
            to_height: *next_batch.end(),
        });

        let p2p = self.p2p.clone();

        self.ongoing_batch.range = Some(next_batch.clone());

        self.ongoing_batch.task.set(async move {
            let now = Instant::now();
            let res = p2p.get_unverified_header_range(next_batch).await;
            (res, now.elapsed())
        });

        Ok(())
    }

    /// Handle the result of the batch request and propagate fatal errors.
    #[instrument(skip_all)]
    async fn on_fetch_next_batch_result(
        &mut self,
        res: Result<Vec<ExtendedHeader>, P2pError>,
        took: Duration,
    ) -> Result<()> {
        let range = self
            .ongoing_batch
            .range
            .take()
            .expect("ongoing_batch not initialized correctly");

        let from_height = *range.start();
        let to_height = *range.end();

        let headers = match res {
            Ok(headers) => headers,
            Err(e) => {
                if e.is_fatal() {
                    return Err(e.into());
                }

                self.event_pub.send(NodeEvent::FetchingHeadersFailed {
                    from_height,
                    to_height,
                    error: e.to_string(),
                    took,
                });

                return Ok(());
            }
        };

        let pruning_cutoff = Time::now().saturating_sub(self.pruning_window);

        // Iterate headers from highest to lowest and check if there is
        // a new highest "slow sync" height.
        for header in headers.iter().rev() {
            if self
                .highest_slow_sync_height
                .is_some_and(|height| header.height().value() <= height)
            {
                // `highest_slow_sync_height` is already higher than `header`
                // so we don't need to check anything else.
                break;
            }

            // If `header` is at or older than the pruning edge, we mark it as
            // the `highest_slow_sync_height` and stop checking lower headers.
            if header.time() <= pruning_cutoff {
                self.highest_slow_sync_height = Some(header.height().value());
                break;
            }
        }

        if let Err(e) = self.store.insert(headers).await {
            if e.is_fatal() {
                return Err(e.into());
            }

            self.event_pub.send(NodeEvent::FetchingHeadersFailed {
                from_height,
                to_height,
                error: format!("Failed to store headers: {e}"),
                took,
            });
        }

        self.event_pub.send(NodeEvent::FetchingHeadersFinished {
            from_height,
            to_height,
            took,
        });

        Ok(())
    }

    fn in_sampling_window(&self, header: &ExtendedHeader) -> bool {
        let sampling_window_end = Time::now().saturating_sub(self.sampling_window);
        header.time().after(sampling_window_end)
    }
}

/// Based on the synced headers and the current network head height, calculate the
/// range of headers that should be fetched from the network, anchored on the
/// already synced header ranges.
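///
/// For example, with `subjective_head_height = 4000`, `synced_headers = &[3000..=4000]`
/// and `limit = 500`, the returned range is `2500..=2999`: the synced range already
/// touches the head, so syncing continues backwards towards genesis from its lower end.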
fn calculate_range_to_fetch(
    subjective_head_height: u64,
    synced_headers: &[BlockRange],
    limit: u64,
) -> BlockRange {
    let mut synced_headers_iter = synced_headers.iter().rev();

    let Some(synced_head_range) = synced_headers_iter.next() else {
        // empty synced ranges, we're missing everything
        let range = 1..=subjective_head_height;
        return range.tailn(limit);
    };

    if synced_head_range.end() < &subjective_head_height {
        // if we haven't caught up with the network head, start from there
        let range = synced_head_range.end() + 1..=subjective_head_height;
        return range.tailn(limit);
    }

    // there exists a range contiguous with network head. inspect previous range end
    let penultimate_range_end = synced_headers_iter.next().map(|r| *r.end()).unwrap_or(0);

    let range = penultimate_range_end + 1..=synced_head_range.start().saturating_sub(1);
    range.headn(limit)
}

#[instrument(skip_all)]
async fn try_init_task<S>(
    p2p: Arc<P2p>,
    store: Arc<S>,
    event_pub: EventPublisher,
) -> Result<(ExtendedHeader, Duration)>
where
    S: Store + 'static,
{
    let now = Instant::now();
    let mut event_reported = false;
    let mut backoff = ExponentialBackoffBuilder::default()
        .with_max_interval(TRY_INIT_BACKOFF_MAX_INTERVAL)
        .with_max_elapsed_time(None)
        .build();

    loop {
        match try_init(&p2p, &*store, &event_pub, &mut event_reported).await {
            Ok(network_head) => {
                return Ok((network_head, now.elapsed()));
            }
            Err(e) if e.is_fatal() => {
                return Err(e);
            }
            Err(e) => {
                let sleep_dur = backoff
                    .next_backoff()
                    .expect("backoff never stops retrying");

                warn!(
                    "Initialization of subjective head failed: {e}. Trying again in {sleep_dur:?}."
                );
                sleep(sleep_dur).await;
            }
        }
    }
}

async fn try_init<S>(
    p2p: &P2p,
    store: &S,
    event_pub: &EventPublisher,
    event_reported: &mut bool,
) -> Result<ExtendedHeader>
where
    S: Store,
{
    p2p.wait_connected_trusted().await?;

    if !*event_reported {
        event_pub.send(NodeEvent::FetchingHeadHeaderStarted);
        *event_reported = true;
    }

    let network_head = p2p.get_head_header().await?;

    // If the network head and the store head have the same height,
    // then `insert` will error because of insertion constraints.
    // However, if both headers are exactly the same, we
    // can skip inserting, as the header is already there.
    //
    // This can happen in case of fast node restart.
    let try_insert = match store.get_head().await {
        // `ExtendedHeader.commit.signatures` can be a different set on each fetch
        // so we compare only hashes.
        Ok(store_head) => store_head.hash() != network_head.hash(),
        Err(StoreError::NotFound) => true,
        Err(e) => return Err(e.into()),
    };

    if try_insert {
        // Insert HEAD to the store and initialize header-sub.
        // Normal insertion checks still apply here.
        store.insert(network_head.clone()).await?;
    }

    Ok(network_head)
}

async fn header_sub_recv(
    rx: Option<&mut mpsc::Receiver<ExtendedHeader>>,
) -> Result<ExtendedHeader> {
    rx.expect("header-sub not initialized")
        .recv()
        .await
        .ok_or(SyncerError::P2p(P2pError::WorkerDied))
}

#[cfg(test)]
mod tests {
    use std::ops::RangeInclusive;

    use super::*;
    use crate::block_ranges::{BlockRange, BlockRangeExt};
    use crate::events::EventChannel;
    use crate::node::{HeaderExError, DEFAULT_PRUNING_WINDOW, SAMPLING_WINDOW};
    use crate::p2p::header_session;
    use crate::store::InMemoryStore;
    use crate::test_utils::{gen_filled_store, MockP2pHandle};
    use crate::utils::OneshotResultSenderExt;
    use celestia_types::test_utils::ExtendedHeaderGenerator;
    use libp2p::request_response::OutboundFailure;
    use lumina_utils::test_utils::async_test;

    #[test]
    fn calculate_range_to_fetch_test_header_limit() {
        let head_height = 1024;
        let ranges = [256..=512];

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16);
        assert_eq!(fetch_range, 513..=528);

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511);
        assert_eq!(fetch_range, 513..=1023);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512);
        assert_eq!(fetch_range, 513..=1024);
        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513);
        assert_eq!(fetch_range, 513..=1024);

        let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024);
        assert_eq!(fetch_range, 513..=1024);
    }

    #[test]
    fn calculate_range_to_fetch_empty_store() {
        let fetch_range = calculate_range_to_fetch(1, &[], 100);
        assert_eq!(fetch_range, 1..=1);

        let fetch_range = calculate_range_to_fetch(100, &[], 10);
        assert_eq!(fetch_range, 1..=10);

        let fetch_range = calculate_range_to_fetch(100, &[], 50);
        assert_eq!(fetch_range, 1..=50);
    }

    #[test]
    fn calculate_range_to_fetch_fully_synced() {
        let fetch_range = calculate_range_to_fetch(1, &[1..=1], 100);
        assert!(fetch_range.is_empty());

        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
        assert!(fetch_range.is_empty());

        let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10);
        assert!(fetch_range.is_empty());
    }

    #[test]
    fn calculate_range_to_fetch_caught_up() {
        let head_height = 4000;

        let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], 500);
        assert_eq!(fetch_range, 2500..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], 500);
        assert_eq!(fetch_range, 2500..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
        assert_eq!(fetch_range, 2801..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500);
        assert_eq!(fetch_range, 2801..=2999);
        let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], 500);
        assert_eq!(fetch_range, 1..=299);
    }

    #[test]
    fn calculate_range_to_fetch_catching_up() {
        let head_height = 4000;

        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], 500);
        assert_eq!(fetch_range, 3001..=3500);
        let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3500], 500);
        assert_eq!(fetch_range, 3501..=4000);
        let fetch_range = calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], 500);
        assert_eq!(fetch_range, 3801..=4000);
    }

    #[async_test]
    async fn init_without_genesis_hash() {
        let events = EventChannel::new();
        let (mock, mut handle) = P2p::mocked();
        let mut gen = ExtendedHeaderGenerator::new();
        let header = gen.next();

        let _syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(mock),
            store: Arc::new(InMemoryStore::new()),
            event_pub: events.publisher(),
            batch_size: 512,
            sampling_window: SAMPLING_WINDOW,
            pruning_window: DEFAULT_PRUNING_WINDOW,
        })
        .unwrap();

        // Syncer is waiting for a trusted peer to connect
        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;
        handle.announce_trusted_peer_connected();

        // We're syncing from the front, so ask for head first
        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![header.clone()])).unwrap();

        // Now Syncer initializes HeaderSub with the latest HEAD
        let head_from_syncer = handle.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header);

        // network head = local head, so nothing else is produced.
        handle.expect_no_cmd().await;
    }

    #[async_test]
    async fn init_with_genesis_hash() {
        let mut gen = ExtendedHeaderGenerator::new();
        let head = gen.next();

        let (_syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;

        // network head = local head, so nothing else is produced.
        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn syncing() {
        let mut gen = ExtendedHeaderGenerator::new();
        let headers = gen.next_many(1500);

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[1499].clone()).await;
        assert_syncing(&syncer, &store, &[1500..=1500], 1500).await;

        // Syncer is syncing backwards from the network head (batch 1)
        handle_session_batch(&mut p2p_mock, &headers, 988..=1499, true).await;
        assert_syncing(&syncer, &store, &[988..=1500], 1500).await;

        // Syncer is syncing backwards from the network head (batch 2)
        handle_session_batch(&mut p2p_mock, &headers, 476..=987, true).await;
        assert_syncing(&syncer, &store, &[476..=1500], 1500).await;

        // New HEAD was received by HeaderSub (height 1501)
        let header1501 = gen.next();
        p2p_mock.announce_new_head(header1501.clone());
        // Height 1501 is adjacent to the last header of Store, so it is appended
        // immediately
        assert_syncing(&syncer, &store, &[476..=1501], 1501).await;

        // Syncer is syncing backwards from the network head (batch 3, partial)
        handle_session_batch(&mut p2p_mock, &headers, 1..=475, true).await;
        assert_syncing(&syncer, &store, &[1..=1501], 1501).await;

        // Syncer is fully synced and awaiting events
        p2p_mock.expect_no_cmd().await;

        // New HEAD was received by HeaderSub (height 1502), it should be appended immediately
        let header1502 = gen.next();
        p2p_mock.announce_new_head(header1502.clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1502).await;
        p2p_mock.expect_no_cmd().await;

        // New HEAD was received by HeaderSub (height 1505), it should NOT be appended
        let headers_1503_1505 = gen.next_many(3);
        p2p_mock.announce_new_head(headers_1503_1505[2].clone());
        assert_syncing(&syncer, &store, &[1..=1502], 1505).await;

        // New HEAD is not adjacent to store, so Syncer requests a range
        handle_session_batch(&mut p2p_mock, &headers_1503_1505, 1503..=1505, true).await;
        assert_syncing(&syncer, &store, &[1..=1505], 1505).await;

        // New HEAD was received by HeaderSub (height 3000), it should NOT be appended
        let mut headers = gen.next_many(1495);
        p2p_mock.announce_new_head(headers[1494].clone());
        assert_syncing(&syncer, &store, &[1..=1505], 3000).await;

        // Syncer is syncing forwards, anchored on a range already in store (batch 1)
        handle_session_batch(&mut p2p_mock, &headers, 1506..=2017, true).await;
        assert_syncing(&syncer, &store, &[1..=2017], 3000).await;

        // New head from header sub added, should NOT be appended
        headers.push(gen.next());
        p2p_mock.announce_new_head(headers.last().unwrap().clone());
        assert_syncing(&syncer, &store, &[1..=2017], 3001).await;

        // Syncer continues syncing forwards (batch 2)
        handle_session_batch(&mut p2p_mock, &headers, 2018..=2529, true).await;
        assert_syncing(&syncer, &store, &[1..=2529], 3001).await;

        // Syncer continues syncing forwards, should include the new head received via HeaderSub (batch 3)
        handle_session_batch(&mut p2p_mock, &headers, 2530..=3001, true).await;
        assert_syncing(&syncer, &store, &[1..=3001], 3001).await;

        // Syncer is fully synced and awaiting events
        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn slow_sync() {
        let pruning_window = Duration::from_secs(600);
        let sampling_window = SAMPLING_WINDOW;

        let mut gen = ExtendedHeaderGenerator::new();

        // Generate 4 batches of 512 headers (= 2048 headers), going back in time.
        let first_header_time = (Time::now() - Duration::from_secs(2048)).unwrap();
        gen.set_time(first_header_time, Duration::from_secs(1));
        let headers = gen.next_many(2048);

        let (syncer, store, mut p2p_mock) =
            initialized_syncer_with_windows(headers[2047].clone(), sampling_window, pruning_window)
                .await;
        assert_syncing(&syncer, &store, &[2048..=2048], 2048).await;

        // The whole 1st batch does not reach the pruning edge.
        handle_session_batch(&mut p2p_mock, &headers, 1536..=2047, true).await;
        assert_syncing(&syncer, &store, &[1536..=2048], 2048).await;

        // 2nd batch is partially within pruning window, so fast sync applies.
        handle_session_batch(&mut p2p_mock, &headers, 1024..=1535, true).await;
        assert_syncing(&syncer, &store, &[1024..=2048], 2048).await;

        // The whole 3rd batch is beyond the pruning edge, so now slow sync applies.
        // In order for the Syncer to send requests, more than half of the previous
        // batch needs to be sampled.
        syncer.trigger_fetch_next_batch().await.unwrap();
        p2p_mock.expect_no_cmd().await;

        // After marking the 1st batch and more than half of the 2nd batch as
        // sampled, the Syncer will finally request the 3rd batch.
        for height in 1250..=2048 {
            // Simulate Daser
            store.mark_as_sampled(height).await.unwrap();
        }
        for height in 1300..=1450 {
            // Simulate Pruner
            store.remove_height(height).await.unwrap();
        }
        syncer.trigger_fetch_next_batch().await.unwrap();
        // Syncer should not request the pruned range
        handle_session_batch(&mut p2p_mock, &headers, 512..=1023, true).await;
        assert_syncing(&syncer, &store, &[512..=1299, 1451..=2048], 2048).await;

        // Syncer should not request the 4th batch since there are plenty
        // of blocks to be sampled.
        syncer.trigger_fetch_next_batch().await.unwrap();
        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn window_edge() {
        let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60);
        let mut gen = ExtendedHeaderGenerator::new();
        gen.set_time(
            (Time::now() - month_and_day_ago).expect("to not underflow"),
            Duration::from_secs(1),
        );
        let mut headers = gen.next_many(1200);
        gen.reset_time();
        headers.append(&mut gen.next_many(2049 - 1200));

        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await;
        assert_syncing(&syncer, &store, &[2049..=2049], 2049).await;

        // Syncer requested the first batch
        handle_session_batch(&mut p2p_mock, &headers, 1537..=2048, true).await;
        assert_syncing(&syncer, &store, &[1537..=2049], 2049).await;

        // Syncer requested the second batch hitting the sampling window
        handle_session_batch(&mut p2p_mock, &headers, 1025..=1536, true).await;
        assert_syncing(&syncer, &store, &[1025..=2049], 2049).await;

        // Syncer is fully synced and awaiting events
        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn start_with_filled_store() {
        let events = EventChannel::new();
        let (p2p, mut p2p_mock) = P2p::mocked();
        let (store, mut gen) = gen_filled_store(25).await;
        let store = Arc::new(store);

        let mut headers = gen.next_many(520);
        let network_head = gen.next(); // height 546

        let syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(p2p),
            store: store.clone(),
            event_pub: events.publisher(),
            batch_size: 512,
            sampling_window: SAMPLING_WINDOW,
            pruning_window: DEFAULT_PRUNING_WINDOW,
        })
        .unwrap();

        p2p_mock.announce_trusted_peer_connected();

        // Syncer asks for current HEAD
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![network_head.clone()])).unwrap();

        // Now Syncer initializes HeaderSub with the latest HEAD
        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, network_head);

        assert_syncing(&syncer, &store, &[1..=25, 546..=546], 546).await;

        // Syncer requested the first batch
        handle_session_batch(&mut p2p_mock, &headers, 34..=545, true).await;
        assert_syncing(&syncer, &store, &[1..=25, 34..=546], 546).await;

        // Syncer requested the remaining batch ([26, 33])
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 26);
        assert_eq!(amount, 8);
        respond_to
            .send(Ok(headers.drain(..8).collect()))
            .map_err(|_| "headers [26, 33]")
            .unwrap();
        assert_syncing(&syncer, &store, &[1..=546], 546).await;

        // Syncer is fully synced and awaiting events
        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn stop_syncer() {
        let mut gen = ExtendedHeaderGenerator::new();
        let head = gen.next();

        let (syncer, _store, mut p2p_mock) = initialized_syncer(head.clone()).await;

        // network head height == 1, so nothing else is produced.
        p2p_mock.expect_no_cmd().await;

        syncer.stop();
        // Wait for Worker to stop
        sleep(Duration::from_millis(1)).await;
        assert!(matches!(
            syncer.info().await.unwrap_err(),
            SyncerError::WorkerDied
        ));
    }

    #[async_test]
    async fn all_peers_disconnected() {
        let mut gen = ExtendedHeaderGenerator::new();

        let _gap = gen.next_many(24);
        let header25 = gen.next();
        let _gap = gen.next_many(4);
        let header30 = gen.next();
        let _gap = gen.next_many(4);
        let header35 = gen.next();

        // Start Syncer and report height 30 as HEAD
        let (syncer, store, mut p2p_mock) = initialized_syncer(header30).await;

        // Wait for the request but do not reply to it.
        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        // Syncer is now back to `connecting_event_loop`.
        p2p_mock.expect_no_cmd().await;

        // Announce a non-trusted peer. Syncer in `connecting_event_loop` can progress only
        // if a trusted peer is connected.
        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;

        // Announce a trusted peer.
        p2p_mock.announce_trusted_peer_connected();

        // Now syncer will send request for HEAD.
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        // Report an older head. Syncer should not accept it.
        respond_to.send(Ok(vec![header25])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        // Syncer will request HEAD again after some time.
        sleep(Duration::from_secs(1)).await;
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        // Report newer HEAD than before.
        respond_to.send(Ok(vec![header35.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30, 35..=35], 35).await;

        // Syncer initializes HeaderSub with the latest HEAD.
        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header35);

        // Syncer now is in `connected_event_loop` and will try to sync the gap
        // that is closer to HEAD.
        let (height, amount, _respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 31);
        assert_eq!(amount, 4);

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn all_peers_disconnected_and_no_network_head_progress() {
        let mut gen = ExtendedHeaderGenerator::new_from_height(30);

        let header30 = gen.next();

        // Start Syncer and report height 30 as HEAD
        let (syncer, store, mut p2p_mock) = initialized_syncer(header30.clone()).await;

        // Wait for the request but do not reply to it.
        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        // Syncer is now back to `connecting_event_loop`.
        p2p_mock.expect_no_cmd().await;

        // Announce a non-trusted peer. Syncer in `connecting_event_loop` can progress only
        // if a trusted peer is connected.
        p2p_mock.announce_peer_connected();
        p2p_mock.expect_no_cmd().await;

        // Announce a trusted peer.
        p2p_mock.announce_trusted_peer_connected();

        // Now syncer will send request for HEAD.
        let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);

        // Report the same HEAD as before.
        respond_to.send(Ok(vec![header30.clone()])).unwrap();
        assert_syncing(&syncer, &store, &[30..=30], 30).await;

        // Syncer initializes HeaderSub with the latest HEAD.
        let head_from_syncer = p2p_mock.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, header30);

        // Syncer now is in `connected_event_loop` and will try to sync the gap
        handle_session_batch(&mut p2p_mock, &[], 1..=29, false).await;

        p2p_mock.announce_all_peers_disconnected();
        p2p_mock.expect_no_cmd().await;
    }

    #[async_test]
    async fn non_contiguous_response() {
        let mut gen = ExtendedHeaderGenerator::new();
        let mut headers = gen.next_many(20);

        // Start Syncer and report last header as network head
        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;

        let header10 = headers[10].clone();
        // make a gap in response, preserving the amount of headers returned
        headers[10] = headers[11].clone();

        // Syncer requests missing headers
        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        // Syncer should not apply headers from invalid response
        assert_syncing(&syncer, &store, &[20..=20], 20).await;

        // correct the response
        headers[10] = header10;

        // Syncer requests missing headers again
        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        // With a correct response, syncer should update the store
        assert_syncing(&syncer, &store, &[1..=20], 20).await;
    }

    #[async_test]
    async fn another_chain_response() {
        let headers = ExtendedHeaderGenerator::new().next_many(20);
        let headers_prime = ExtendedHeaderGenerator::new().next_many(20);

        // Start Syncer and report last header as network head
        let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await;

        // Syncer requests missing headers
        handle_session_batch(&mut p2p_mock, &headers_prime, 1..=19, true).await;

        // Syncer should not apply headers from invalid response
        assert_syncing(&syncer, &store, &[20..=20], 20).await;

        // Syncer requests missing headers again
        handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await;

        // With a correct response, syncer should update the store
        assert_syncing(&syncer, &store, &[1..=20], 20).await;
    }

    async fn assert_syncing(
        syncer: &Syncer<InMemoryStore>,
        store: &InMemoryStore,
        expected_synced_ranges: &[RangeInclusive<u64>],
        expected_subjective_head: u64,
    ) {
        // Syncer receives responses on the same loop that receives other events.
        // Wait a bit for them to be processed.
        sleep(Duration::from_millis(1)).await;

        let store_ranges = store.get_stored_header_ranges().await.unwrap();
        let syncing_info = syncer.info().await.unwrap();

        assert_eq!(store_ranges.as_ref(), expected_synced_ranges);
        assert_eq!(syncing_info.stored_headers.as_ref(), expected_synced_ranges);
        assert_eq!(syncing_info.subjective_head, expected_subjective_head);
    }

    async fn initialized_syncer(
        head: ExtendedHeader,
    ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
        initialized_syncer_with_windows(head, SAMPLING_WINDOW, DEFAULT_PRUNING_WINDOW).await
    }

    async fn initialized_syncer_with_windows(
        head: ExtendedHeader,
        sampling_window: Duration,
        pruning_window: Duration,
    ) -> (Syncer<InMemoryStore>, Arc<InMemoryStore>, MockP2pHandle) {
        let events = EventChannel::new();
        let (mock, mut handle) = P2p::mocked();
        let store = Arc::new(InMemoryStore::new());

        let syncer = Syncer::start(SyncerArgs {
            p2p: Arc::new(mock),
            store: store.clone(),
            event_pub: events.publisher(),
            batch_size: 512,
            sampling_window,
            pruning_window,
        })
        .unwrap();

        // Syncer is waiting for a trusted peer to connect
        handle.expect_no_cmd().await;
        handle.announce_peer_connected();
        handle.expect_no_cmd().await;
        handle.announce_trusted_peer_connected();

        // After connection is established Syncer asks current network HEAD
        let (height, amount, respond_to) = handle.expect_header_request_for_height_cmd().await;
        assert_eq!(height, 0);
        assert_eq!(amount, 1);
        respond_to.send(Ok(vec![head.clone()])).unwrap();

        // Now Syncer initializes HeaderSub with the latest HEAD
        let head_from_syncer = handle.expect_init_header_sub().await;
        assert_eq!(head_from_syncer, head);

        let head_height = head.height().value();
        assert_syncing(&syncer, &store, &[head_height..=head_height], head_height).await;

        (syncer, store, handle)
    }

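    /// Answer every header request the `Syncer` issues for `range`, either with
    /// headers taken from `remaining_headers` or, when `respond` is `false`,
    /// with simulated timeouts.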
    async fn handle_session_batch(
        p2p_mock: &mut MockP2pHandle,
        remaining_headers: &[ExtendedHeader],
        range: BlockRange,
        respond: bool,
    ) {
        range.validate().unwrap();

        let mut ranges_to_request = BlockRanges::new();
        ranges_to_request.insert_relaxed(&range).unwrap();

        let mut no_respond_chans = Vec::new();

        for _ in 0..requests_in_session(range.len()) {
            let (height, amount, respond_to) =
                p2p_mock.expect_header_request_for_height_cmd().await;

            let requested_range = height..=height + amount - 1;
            ranges_to_request.remove_strict(requested_range);

            if respond {
                let header_index = remaining_headers
                    .iter()
                    .position(|h| h.height().value() == height)
                    .expect("height not found in provided headers");

                let response_range =
                    remaining_headers[header_index..header_index + amount as usize].to_vec();
                respond_to
                    .send(Ok(response_range))
                    .map_err(|_| format!("headers [{}, {}]", height, height + amount - 1))
                    .unwrap();
            } else {
                no_respond_chans.push(respond_to);
            }
        }

        // The real libp2p implementation will create a timeout error if the peer does
        // not respond. We need to simulate that; otherwise we end up with some
        // undesirable behaviours in Syncer.
        if !respond {
            spawn(async move {
                sleep(Duration::from_secs(10)).await;

                for respond_chan in no_respond_chans {
                    respond_chan.maybe_send_err(P2pError::HeaderEx(
                        HeaderExError::OutboundFailure(OutboundFailure::Timeout),
                    ));
                }
            });
        }

        assert!(
            ranges_to_request.is_empty(),
            "Some headers weren't requested. expected range: {}, not requested: {}",
            range.display(),
            ranges_to_request
        );
    }

    fn requests_in_session(headers: u64) -> usize {
        let max_requests = headers.div_ceil(header_session::MAX_AMOUNT_PER_REQ) as usize;
        let min_requests = headers.div_ceil(header_session::MIN_AMOUNT_PER_REQ) as usize;

        if max_requests > header_session::MAX_CONCURRENT_REQS {
            // if we have to do more requests than our concurrency limit anyway
            max_requests
        } else {
            // otherwise we can handle the batch fully concurrently
            header_session::MAX_CONCURRENT_REQS.min(min_requests)
        }
    }

    impl BlockRanges {
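        /// Remove `range`, panicking unless it is fully contained within a
        /// single stored range.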
        fn remove_strict(&mut self, range: BlockRange) {
            for stored in self.as_ref() {
                if stored.contains(range.start()) && stored.contains(range.end()) {
                    self.remove_relaxed(range).unwrap();
                    return;
                }
            }

            panic!("block ranges ({self}) don't contain {}", range.display());
        }
    }
}