// forest/chain_sync/network_context.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::{
5    convert::TryFrom,
6    num::{NonZeroU64, NonZeroUsize},
7    sync::{
8        Arc, LazyLock,
9        atomic::{AtomicU64, Ordering},
10    },
11    time::{Duration, Instant},
12};
13
14use crate::{
15    blocks::{FullTipset, Tipset, TipsetKey, TipsetLike},
16    libp2p::{
17        NetworkMessage, PeerId, PeerManager,
18        chain_exchange::{
19            ChainExchangeRequest, ChainExchangeResponse, HEADERS, MESSAGES, TipsetBundle,
20        },
21        hello::{HelloRequest, HelloResponse},
22        rpc::RequestResponseError,
23    },
24    utils::{
25        ShallowClone,
26        misc::{AdaptiveValueProvider, ExponentialAdaptiveValueProvider},
27        stats::Stats,
28    },
29};
30use anyhow::Context as _;
31use fvm_ipld_blockstore::Blockstore;
32use nonzero_ext::nonzero;
33use parking_lot::Mutex;
34use std::future::Future;
35use tokio::sync::Semaphore;
36use tokio::task::JoinSet;
37use tracing::{debug, trace};
38
/// Timeout in milliseconds for a response to an RPC request.
// This value is automatically adapted in the range of [5000, 60000] ms
// ([5, 60] seconds) for different network conditions, being decreased on
// success and increased on failure.
static CHAIN_EXCHANGE_TIMEOUT_MILLIS: LazyLock<ExponentialAdaptiveValueProvider<u64>> =
    LazyLock::new(|| ExponentialAdaptiveValueProvider::new(5000, 2000, 60000, false));
44
45/// Maximum number of concurrent chain exchange request being sent to the
46/// network.
47static MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS: LazyLock<NonZeroUsize> = LazyLock::new(|| {
48    std::env::var("FOREST_MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS")
49        .ok()
50        .and_then(|i| {
51            i.parse().ok().inspect(|i| {
52                tracing::info!("max concurrent chain exchange requests set to {i} from `FOREST_MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS`");
53            })
54        }).unwrap_or(nonzero!(3_usize))
55});
56
/// Context used in chain sync to handle network requests.
/// This contains the peer manager, P2P service interface, and [`Blockstore`]
/// required to make network requests.
#[derive(derive_more::Constructor)]
pub struct SyncNetworkContext<DB> {
    /// Channel to send network messages through P2P service
    network_send: flume::Sender<NetworkMessage>,
    /// Manages peers to send requests to and updates request stats for the
    /// respective peers.
    peer_manager: Arc<PeerManager>,
    /// Shared handle to the local blockstore.
    db: Arc<DB>,
}
69
70impl<DB> ShallowClone for SyncNetworkContext<DB> {
71    fn shallow_clone(&self) -> Self {
72        Self {
73            network_send: self.network_send.clone(),
74            peer_manager: self.peer_manager.shallow_clone(),
75            db: self.db.shallow_clone(),
76        }
77    }
78}
79
/// Race tasks to completion while limiting the number of tasks that may execute concurrently.
/// Once a task finishes without error, the rest of the tasks are canceled.
struct RaceBatch<T> {
    /// The set of spawned tasks being raced against each other.
    tasks: JoinSet<anyhow::Result<T>>,
    /// Caps how many of the spawned tasks may run at the same time.
    semaphore: Arc<Semaphore>,
}
86
87impl<T> RaceBatch<T>
88where
89    T: Send + 'static,
90{
91    pub fn new(max_concurrent_jobs: NonZeroUsize) -> Self {
92        RaceBatch {
93            tasks: JoinSet::new(),
94            semaphore: Arc::new(Semaphore::new(max_concurrent_jobs.get())),
95        }
96    }
97
98    pub fn add(&mut self, future: impl Future<Output = anyhow::Result<T>> + Send + 'static) {
99        let sem = self.semaphore.clone();
100        self.tasks.spawn(async move {
101            let permit = sem
102                .acquire_owned()
103                .await
104                .context("Semaphore unexpectedly closed")?;
105            let result = future.await;
106            drop(permit);
107            result
108        });
109    }
110
111    /// Return first finishing `Ok` future that passes validation else return `None` if all jobs failed
112    pub async fn get_ok_validated<F>(mut self, validate: F) -> Option<T>
113    where
114        F: Fn(&T) -> bool,
115    {
116        while let Some(result) = self.tasks.join_next().await {
117            if let Ok(Ok(value)) = result
118                && validate(&value)
119            {
120                return Some(value);
121            }
122        }
123        // So far every task have failed
124        None
125    }
126}
127
128impl<DB> SyncNetworkContext<DB>
129where
130    DB: Blockstore,
131{
132    /// Returns a reference to the peer manager of the network context.
133    pub fn peer_manager(&self) -> &PeerManager {
134        self.peer_manager.as_ref()
135    }
136
137    /// Returns a reference to the channel for sending network messages through P2P service.
138    pub fn network_send(&self) -> &flume::Sender<NetworkMessage> {
139        &self.network_send
140    }
141
142    /// Send a `chain_exchange` request for only block headers (ignore
143    /// messages). If `peer_id` is `None`, requests will be sent to a set of
144    /// shuffled peers.
145    pub async fn chain_exchange_headers(
146        &self,
147        peer_id: Option<PeerId>,
148        tsk: &TipsetKey,
149        count: NonZeroU64,
150    ) -> anyhow::Result<Vec<Tipset>> {
151        self.handle_chain_exchange_request(peer_id, tsk, count, HEADERS, |tipsets| {
152            validate_network_tipsets(tipsets, tsk)
153        })
154        .await
155    }
156
157    /// Send a `chain_exchange` request for messages to assemble a full tipset with a local tipset,
158    /// If `peer_id` is `None`, requests will be sent to a set of shuffled peers.
159    pub async fn chain_exchange_messages(
160        &self,
161        peer_id: Option<PeerId>,
162        ts: &Tipset,
163    ) -> anyhow::Result<FullTipset> {
164        let mut bundles: Vec<TipsetBundle> = self
165            .handle_chain_exchange_request(peer_id, ts.key(), nonzero!(1_u64), MESSAGES, |_| true)
166            .await?;
167
168        if bundles.len() != 1 {
169            anyhow::bail!(
170                "chain exchange request returned {} tipsets, 1 expected.",
171                bundles.len()
172            );
173        }
174        let mut bundle = bundles.remove(0);
175        bundle.blocks = ts.block_headers().to_vec();
176        bundle.try_into()
177    }
178
179    /// Send a `chain_exchange` request for a single full tipset (includes
180    /// messages) If `peer_id` is `None`, requests will be sent to a set of
181    /// shuffled peers.
182    pub async fn chain_exchange_full_tipset(
183        &self,
184        peer_id: Option<PeerId>,
185        tsk: &TipsetKey,
186    ) -> anyhow::Result<FullTipset> {
187        let mut fts = self
188            .handle_chain_exchange_request(
189                peer_id,
190                tsk,
191                nonzero!(1_u64),
192                HEADERS | MESSAGES,
193                |tipsets| validate_network_tipsets(tipsets, tsk),
194            )
195            .await?;
196
197        anyhow::ensure!(
198            fts.len() == 1,
199            "Full tipset request returned {} tipsets, 1 expected.",
200            fts.len()
201        );
202
203        Ok(fts.remove(0))
204    }
205
    /// Send a `chain_exchange` request for up to 16 full tipsets (headers and
    /// messages). If `peer_id` is `None`, requests will be sent to a set of
    /// shuffled peers.
    pub async fn chain_exchange_full_tipsets(
        &self,
        peer_id: Option<PeerId>,
        tsk: &TipsetKey,
    ) -> anyhow::Result<Vec<FullTipset>> {
        self.handle_chain_exchange_request(
            peer_id,
            tsk,
            nonzero!(16_u64),
            HEADERS | MESSAGES,
            // No chain validation here; callers accept any returned tipsets.
            |_| true,
        )
        .await
    }
220
    /// Helper function to handle the peer retrieval if no peer supplied as well
    /// as the logging and updating of the peer info in the `PeerManager`.
    ///
    /// When `peer_id` is `Some`, the request is sent only to that peer and its
    /// response (or error) is returned directly. When `None`, the request is
    /// raced across a shuffled set of top peers, bounded by
    /// `MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS`; the first response that
    /// passes `validate` wins. The adaptive timeout is widened when all peers
    /// fail and narrowed based on the mean latency of successful requests.
    pub async fn handle_chain_exchange_request<T, F>(
        &self,
        peer_id: Option<PeerId>,
        tsk: &TipsetKey,
        request_len: NonZeroU64,
        options: u64,
        validate: F,
    ) -> anyhow::Result<Vec<T>>
    where
        T: TryFrom<TipsetBundle> + Send + Sync + 'static,
        <T as TryFrom<TipsetBundle>>::Error: Into<anyhow::Error>,
        F: Fn(&Vec<T>) -> bool,
    {
        let request = ChainExchangeRequest {
            start: tsk.to_cids(),
            request_len: request_len.get(),
            options,
        };

        let global_pre_time = Instant::now();
        // Failure counters shared with the racing tasks, reported in the
        // aggregate error message if every peer fails.
        let network_failures = Arc::new(AtomicU64::new(0));
        let lookup_failures = Arc::new(AtomicU64::new(0));
        let chain_exchange_result = match peer_id {
            // Specific peer is given to send request, send specifically to that peer.
            Some(id) => Self::chain_exchange_request(
                self.peer_manager.clone(),
                self.network_send.clone(),
                id,
                request,
            )
            .await?
            .into_result()?,
            None => {
                // No specific peer set, send requests to a shuffled set of top peers until
                // a request succeeds.
                let peers = self.peer_manager.top_peers_shuffled();
                anyhow::ensure!(
                    !peers.is_empty(),
                    "chain exchange failed: no peers are available"
                );

                let n_peers = peers.len();
                let mut batch = RaceBatch::new(*MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS);
                // Latencies of successful requests, used below to adapt the
                // chain-exchange timeout.
                let success_time_cost_millis_stats = Arc::new(Mutex::new(Stats::new()));
                for peer_id in peers.into_iter() {
                    let peer_manager = self.peer_manager.clone();
                    let network_send = self.network_send.clone();
                    let request = request.clone();
                    let network_failures = network_failures.clone();
                    let lookup_failures = lookup_failures.clone();
                    let success_time_cost_millis_stats = success_time_cost_millis_stats.clone();
                    batch.add(async move {
                        let start = Instant::now();
                        match Self::chain_exchange_request(
                            peer_manager,
                            network_send,
                            peer_id,
                            request,
                        )
                        .await
                        {
                            Ok(chain_exchange_result) => {
                                match chain_exchange_result.into_result::<T>() {
                                    Ok(r) => {
                                        success_time_cost_millis_stats.lock().update(
                                            start.elapsed().as_millis()
                                        );
                                        Ok(r)
                                    }
                                    Err(error) => {
                                        // Response arrived but could not be decoded/converted.
                                        lookup_failures.fetch_add(1, Ordering::Relaxed);
                                        debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange response");
                                        Err(error)
                                    }
                                }
                            }
                            Err(error) => {
                                // Transport-level failure (send, timeout, libp2p error).
                                network_failures.fetch_add(1, Ordering::Relaxed);
                                debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange request to peer");
                                Err(error)
                            }
                        }
                    });
                }

                // Built lazily: only evaluated when every racing task failed.
                // Also widens the adaptive timeout for subsequent requests.
                let make_failure_message = || {
                    CHAIN_EXCHANGE_TIMEOUT_MILLIS.adapt_on_failure();
                    tracing::debug!(
                        "Increased chain exchange timeout to {}ms",
                        CHAIN_EXCHANGE_TIMEOUT_MILLIS.get()
                    );
                    let mut message = String::new();
                    message.push_str("ChainExchange request failed for all top peers. ");
                    message.push_str(&format!(
                        "{} network failures, ",
                        network_failures.load(Ordering::Relaxed)
                    ));
                    message.push_str(&format!(
                        "{} lookup failures, ",
                        lookup_failures.load(Ordering::Relaxed)
                    ));
                    message.push_str(&format!("request:\n{request:?}"));
                    anyhow::anyhow!(message)
                };

                let v = batch
                    .get_ok_validated(validate)
                    .await
                    .ok_or_else(make_failure_message)?;
                // Narrow the adaptive timeout when successful requests are
                // completing faster than the current setting.
                if let Ok(mean) = success_time_cost_millis_stats.lock().mean()
                    && CHAIN_EXCHANGE_TIMEOUT_MILLIS.adapt_on_success(mean as _)
                {
                    tracing::debug!(
                        "Decreased chain exchange timeout to {}ms. Current average: {}ms",
                        CHAIN_EXCHANGE_TIMEOUT_MILLIS.get(),
                        mean,
                    );
                }
                trace!("Succeed: handle_chain_exchange_request");
                v
            }
        };

        // Log success for the global request with the latency from before sending.
        self.peer_manager
            .log_global_success(Instant::now().duration_since(global_pre_time));

        Ok(chain_exchange_result)
    }
352
    /// Send a `chain_exchange` request to the network and await response.
    ///
    /// Also scores the peer in the `PeerManager` according to the outcome:
    /// success/timeout update its request stats, while protocol or connection
    /// errors may ban the peer or mark it bad.
    async fn chain_exchange_request(
        peer_manager: Arc<PeerManager>,
        network_send: flume::Sender<NetworkMessage>,
        peer_id: PeerId,
        request: ChainExchangeRequest,
    ) -> anyhow::Result<ChainExchangeResponse> {
        trace!("Sending ChainExchange Request to {peer_id}");

        let req_pre_time = Instant::now();

        // Single-slot channel on which the P2P service delivers the response.
        let (tx, rx) = flume::bounded(1);
        if network_send
            .send_async(NetworkMessage::ChainExchangeRequest {
                peer_id,
                request,
                response_channel: tx,
            })
            .await
            .is_err()
        {
            anyhow::bail!("Failed to send chain exchange request to network");
        };

        // Add timeout to receiving response from p2p service to avoid stalling.
        // There is also a timeout inside the request-response calls, but this
        // ensures the receive itself cannot block indefinitely.
        // `recv_timeout` is blocking, hence the blocking thread pool.
        let res = tokio::task::spawn_blocking(move || {
            rx.recv_timeout(Duration::from_millis(CHAIN_EXCHANGE_TIMEOUT_MILLIS.get()))
        })
        .await;
        let res_duration = Instant::now().duration_since(req_pre_time);
        // Result layers: JoinError (spawn_blocking), RecvTimeoutError (flume),
        // then the RPC result itself.
        match res {
            Ok(Ok(Ok(bs_res))) => {
                // Successful response
                peer_manager.log_success(&peer_id, res_duration);
                trace!("Succeeded: ChainExchange Request to {peer_id}");
                Ok(bs_res)
            }
            Ok(Ok(Err(e))) => {
                // Internal libp2p error, score failure for peer and potentially disconnect
                match e {
                    RequestResponseError::UnsupportedProtocols => {
                        // refactor this into Networkevent if user agent logging is critical here
                        peer_manager
                            .ban_peer_with_default_duration(
                                peer_id,
                                "ChainExchange protocol unsupported",
                                |_| None,
                            )
                            .await;
                    }
                    RequestResponseError::ConnectionClosed | RequestResponseError::DialFailure => {
                        peer_manager.mark_peer_bad(peer_id, format!("chain exchange error {e:?}"));
                    }
                    // Ignore dropping peer on timeout for now. Can't be confident yet that the
                    // specified timeout is adequate time.
                    RequestResponseError::Timeout | RequestResponseError::Io(_) => {
                        peer_manager.log_failure(&peer_id, res_duration);
                    }
                }
                debug!("Failed: ChainExchange Request to {peer_id}");
                anyhow::bail!("Internal libp2p error: {e:?}");
            }
            Ok(Err(_)) | Err(_) => {
                // Sender channel internally dropped or timeout, both should log failure which
                // will negatively score the peer, but not drop yet.
                peer_manager.log_failure(&peer_id, res_duration);
                debug!("Timeout: ChainExchange Request to {peer_id}");
                anyhow::bail!("Chain exchange request to {peer_id} timed out");
            }
        }
    }
426
427    /// Send a hello request to the network (does not immediately await
428    /// response).
429    pub async fn hello_request(
430        &self,
431        peer_id: PeerId,
432        request: HelloRequest,
433    ) -> anyhow::Result<(PeerId, Instant, Option<HelloResponse>)> {
434        trace!("Sending Hello Message to {}", peer_id);
435
436        // Create oneshot channel for receiving response from sent hello.
437        let (tx, rx) = flume::bounded(1);
438
439        // Send request into libp2p service
440        self.network_send
441            .send_async(NetworkMessage::HelloRequest {
442                peer_id,
443                request,
444                response_channel: tx,
445            })
446            .await
447            .context("Failed to send hello request: receiver dropped")?;
448
449        const HELLO_TIMEOUT: Duration = Duration::from_secs(30);
450        let sent = Instant::now();
451        let res = tokio::task::spawn_blocking(move || rx.recv_timeout(HELLO_TIMEOUT))
452            .await?
453            .ok();
454        Ok((peer_id, sent, res))
455    }
456}
457
458/// Validates network tipsets that are sorted by epoch in descending order with the below checks
459/// 1. The latest(first) tipset has the desired tipset key
460/// 2. The sorted tipsets are chained by their tipset keys
461fn validate_network_tipsets<T: TipsetLike>(tipsets: &[T], start_tipset_key: &TipsetKey) -> bool {
462    if let Some(start) = tipsets.first() {
463        if start.key() != start_tipset_key {
464            tracing::warn!(epoch=%start.epoch(), expected=%start_tipset_key, actual=%start.key(), "start tipset key mismatch");
465            return false;
466        }
467        for (ts, pts) in tipsets.iter().zip(tipsets.iter().skip(1)) {
468            if ts.parents() != pts.key() {
469                tracing::warn!(epoch=%ts.epoch(), expected_parent=%pts.key(), actual_parent=%ts.parents(), "invalid chain");
470                return false;
471            }
472        }
473        true
474    } else {
475        tracing::warn!("invalid empty chain_exchange_headers response");
476        false
477    }
478}
479
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicBool, AtomicUsize};

    // Test-only convenience: race the batch without validating the winner.
    impl<T> RaceBatch<T>
    where
        T: Send + 'static,
    {
        pub async fn get_ok(self) -> Option<T> {
            self.get_ok_validated(|_| true).await
        }
    }

    // A successful task wins even when a sibling task fails.
    #[tokio::test]
    async fn race_batch_ok() {
        let mut batch = RaceBatch::new(nonzero!(3_usize));
        batch.add(async move { Ok(1) });
        batch.add(async move { anyhow::bail!("kaboom") });

        assert_eq!(batch.get_ok().await, Some(1));
    }

    // The first task to finish wins; the slow task never completes.
    #[tokio::test]
    async fn race_batch_ok_faster() {
        let mut batch = RaceBatch::new(nonzero!(3_usize));
        batch.add(async move {
            tokio::time::sleep(Duration::from_secs(100)).await;
            Ok(1)
        });
        batch.add(async move { Ok(2) });
        batch.add(async move { anyhow::bail!("kaboom") });

        assert_eq!(batch.get_ok().await, Some(2));
    }

    // When every task fails, the batch yields `None`.
    #[tokio::test]
    async fn race_batch_none() {
        let mut batch: RaceBatch<i32> = RaceBatch::new(nonzero!(3_usize));
        batch.add(async move { anyhow::bail!("kaboom") });
        batch.add(async move { anyhow::bail!("banana") });

        assert_eq!(batch.get_ok().await, None);
    }

    // The semaphore must cap concurrently running jobs at MAX_JOBS.
    #[tokio::test]
    async fn race_batch_semaphore() {
        const MAX_JOBS: NonZeroUsize = nonzero!(30_usize);
        let counter = Arc::new(AtomicUsize::new(0));
        let exceeded = Arc::new(AtomicBool::new(false));

        let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS);
        for _ in 0..10000 {
            let c = counter.clone();
            let e = exceeded.clone();
            batch.add(async move {
                let prev = c.fetch_add(1, Ordering::Relaxed);
                if prev >= MAX_JOBS.get() {
                    e.fetch_or(true, Ordering::Relaxed);
                }

                tokio::task::yield_now().await;
                c.fetch_sub(1, Ordering::Relaxed);

                anyhow::bail!("banana")
            });
        }

        assert_eq!(batch.get_ok().await, None);
        assert!(!exceeded.load(Ordering::Relaxed));
    }

    // Sanity check of the test above: with a limit of MAX_JOBS + 1 the
    // MAX_JOBS threshold must be observably exceeded.
    #[tokio::test]
    async fn race_batch_semaphore_exceeded() {
        const MAX_JOBS: NonZeroUsize = nonzero!(30_usize);
        let counter = Arc::new(AtomicUsize::new(0));
        let exceeded = Arc::new(AtomicBool::new(false));

        // We add one more job to exceed the limit
        let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS.checked_add(1).unwrap());
        for _ in 0..10000 {
            let c = counter.clone();
            let e = exceeded.clone();
            batch.add(async move {
                let prev = c.fetch_add(1, Ordering::Relaxed);
                if prev >= MAX_JOBS.get() {
                    e.fetch_or(true, Ordering::Relaxed);
                }

                tokio::task::yield_now().await;
                c.fetch_sub(1, Ordering::Relaxed);

                anyhow::bail!("banana")
            });
        }

        assert_eq!(batch.get_ok().await, None);
        assert!(exceeded.load(Ordering::Relaxed));
    }

    // Exercises both validation rules: start-key match and parent chaining.
    #[test]
    #[allow(unused_variables)]
    fn validate_network_tipsets_tests() {
        use crate::blocks::{Chain4U, chain4u};

        let c4u = Chain4U::new();
        chain4u! {
            in c4u;
            t0 @ [genesis_header]
            -> t1 @ [first_header]
            -> t2 @ [second_left, second_right]
            -> t3 @ [third]
            -> t4 @ [fourth]
        };
        // Correct descending chain starting at t4.
        assert!(validate_network_tipsets(
            &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
            t4.key()
        ));
        // First tipset does not match the requested key.
        assert!(!validate_network_tipsets(
            &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
            t3.key()
        ));
        // Broken chain: t3 is missing between t4 and t2.
        assert!(!validate_network_tipsets(
            &[t4.clone(), t2.clone(), t1.clone(), t0.clone()],
            t4.key()
        ));
    }
}