forest/chain_sync/
network_context.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::{
5    convert::TryFrom,
6    num::NonZeroU64,
7    sync::{
8        Arc, LazyLock,
9        atomic::{AtomicU64, Ordering},
10    },
11    time::{Duration, Instant},
12};
13
14use crate::{
15    blocks::{FullTipset, Tipset, TipsetKey},
16    libp2p::{
17        NetworkMessage, PeerId, PeerManager,
18        chain_exchange::{
19            ChainExchangeRequest, ChainExchangeResponse, HEADERS, MESSAGES, TipsetBundle,
20        },
21        hello::{HelloRequest, HelloResponse},
22        rpc::RequestResponseError,
23    },
24    utils::{
25        misc::{AdaptiveValueProvider, ExponentialAdaptiveValueProvider},
26        stats::Stats,
27    },
28};
29use anyhow::Context as _;
30use fvm_ipld_blockstore::Blockstore;
31use parking_lot::Mutex;
32use std::future::Future;
33use tokio::sync::Semaphore;
34use tokio::task::JoinSet;
35use tracing::{debug, trace};
36
/// Timeout, in milliseconds, for receiving the response to a single chain exchange request.
// The value adapts to network conditions: `adapt_on_failure` is called (and logged as an
// increase) when a racing request fails for every peer, and `adapt_on_success` is fed the
// mean latency of successful requests (logged as a decrease when it changes the value).
// It is constructed with `(5000, 2000, 60000, false)`. NOTE(review): presumably these are
// (initial, min, max, ...) in milliseconds, i.e. a 5s start bounded to [2s, 60s] — confirm
// against `ExponentialAdaptiveValueProvider::new`.
static CHAIN_EXCHANGE_TIMEOUT_MILLIS: LazyLock<ExponentialAdaptiveValueProvider<u64>> =
    LazyLock::new(|| ExponentialAdaptiveValueProvider::new(5000, 2000, 60000, false));

/// Maximum number of chain exchange requests that a single racing lookup over the top peers
/// keeps in flight on the network at the same time.
const MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS: usize = 2;
46
/// Context used in chain sync to issue network requests (chain exchange and hello).
/// It bundles the channel to the P2P service, the peer manager used to pick and score
/// peers, and a [`Blockstore`] handle.
pub struct SyncNetworkContext<DB> {
    /// Channel to send network messages through P2P service
    network_send: flume::Sender<NetworkMessage>,
    /// Manages peers to send requests to and updates request stats for the
    /// respective peers.
    peer_manager: Arc<PeerManager>,
    /// Shared blockstore handle.
    // NOTE(review): no method in this file reads `db`; confirm it is still needed.
    db: Arc<DB>,
}
58
59impl<DB> Clone for SyncNetworkContext<DB> {
60    fn clone(&self) -> Self {
61        Self {
62            network_send: self.network_send.clone(),
63            peer_manager: self.peer_manager.clone(),
64            db: self.db.clone(),
65        }
66    }
67}
68
69/// Race tasks to completion while limiting the number of tasks that may execute concurrently.
70/// Once a task finishes without error, the rest of the tasks are canceled.
71struct RaceBatch<T> {
72    tasks: JoinSet<Result<T, String>>,
73    semaphore: Arc<Semaphore>,
74}
75
76impl<T> RaceBatch<T>
77where
78    T: Send + 'static,
79{
80    pub fn new(max_concurrent_jobs: usize) -> Self {
81        RaceBatch {
82            tasks: JoinSet::new(),
83            semaphore: Arc::new(Semaphore::new(max_concurrent_jobs)),
84        }
85    }
86
87    pub fn add(&mut self, future: impl Future<Output = Result<T, String>> + Send + 'static) {
88        let sem = self.semaphore.clone();
89        self.tasks.spawn(async move {
90            let permit = sem
91                .acquire_owned()
92                .await
93                .map_err(|_| "Semaphore unexpectedly closed")?;
94            let result = future.await;
95            drop(permit);
96            result
97        });
98    }
99
100    /// Return first finishing `Ok` future that passes validation else return `None` if all jobs failed
101    pub async fn get_ok_validated<F>(mut self, validate: F) -> Option<T>
102    where
103        F: Fn(&T) -> bool,
104    {
105        while let Some(result) = self.tasks.join_next().await {
106            if let Ok(Ok(value)) = result
107                && validate(&value)
108            {
109                return Some(value);
110            }
111        }
112        // So far every task have failed
113        None
114    }
115}
116
117impl<DB> SyncNetworkContext<DB>
118where
119    DB: Blockstore,
120{
121    pub fn new(
122        network_send: flume::Sender<NetworkMessage>,
123        peer_manager: Arc<PeerManager>,
124        db: Arc<DB>,
125    ) -> Self {
126        Self {
127            network_send,
128            peer_manager,
129            db,
130        }
131    }
132
133    /// Returns a reference to the peer manager of the network context.
134    pub fn peer_manager(&self) -> &PeerManager {
135        self.peer_manager.as_ref()
136    }
137
138    /// Returns a reference to the channel for sending network messages through P2P service.
139    pub fn network_send(&self) -> &flume::Sender<NetworkMessage> {
140        &self.network_send
141    }
142
143    /// Send a `chain_exchange` request for only block headers (ignore
144    /// messages). If `peer_id` is `None`, requests will be sent to a set of
145    /// shuffled peers.
146    pub async fn chain_exchange_headers(
147        &self,
148        peer_id: Option<PeerId>,
149        tsk: &TipsetKey,
150        count: NonZeroU64,
151    ) -> Result<Vec<Arc<Tipset>>, String> {
152        self.handle_chain_exchange_request(
153            peer_id,
154            tsk,
155            count,
156            HEADERS,
157            |tipsets: &Vec<Arc<Tipset>>| validate_network_tipsets(tipsets, tsk),
158        )
159        .await
160    }
161
162    /// Send a `chain_exchange` request for messages to assemble a full tipset with a local tipset,
163    /// If `peer_id` is `None`, requests will be sent to a set of shuffled peers.
164    pub async fn chain_exchange_messages(
165        &self,
166        peer_id: Option<PeerId>,
167        ts: &Tipset,
168    ) -> Result<FullTipset, String> {
169        let mut bundles: Vec<TipsetBundle> = self
170            .handle_chain_exchange_request(
171                peer_id,
172                ts.key(),
173                NonZeroU64::new(1).expect("Infallible"),
174                MESSAGES,
175                |_| true,
176            )
177            .await?;
178
179        if bundles.len() != 1 {
180            return Err(format!(
181                "chain exchange request returned {} tipsets, 1 expected.",
182                bundles.len()
183            ));
184        }
185        let mut bundle = bundles.remove(0);
186        bundle.blocks = ts.block_headers().to_vec();
187        bundle.try_into()
188    }
189
190    /// Send a `chain_exchange` request for a single full tipset (includes
191    /// messages) If `peer_id` is `None`, requests will be sent to a set of
192    /// shuffled peers.
193    pub async fn chain_exchange_full_tipset(
194        &self,
195        peer_id: Option<PeerId>,
196        tsk: &TipsetKey,
197    ) -> Result<FullTipset, String> {
198        let mut fts = self
199            .handle_chain_exchange_request(
200                peer_id,
201                tsk,
202                NonZeroU64::new(1).expect("Infallible"),
203                HEADERS | MESSAGES,
204                |_| true,
205            )
206            .await?;
207
208        if fts.len() != 1 {
209            return Err(format!(
210                "Full tipset request returned {} tipsets, 1 expected.",
211                fts.len()
212            ));
213        }
214        Ok(fts.remove(0))
215    }
216
217    pub async fn chain_exchange_full_tipsets(
218        &self,
219        peer_id: Option<PeerId>,
220        tsk: &TipsetKey,
221    ) -> Result<Vec<FullTipset>, String> {
222        self.handle_chain_exchange_request(
223            peer_id,
224            tsk,
225            NonZeroU64::new(16).expect("Infallible"),
226            HEADERS | MESSAGES,
227            |_| true,
228        )
229        .await
230    }
231
232    /// Helper function to handle the peer retrieval if no peer supplied as well
233    /// as the logging and updating of the peer info in the `PeerManager`.
234    async fn handle_chain_exchange_request<T, F>(
235        &self,
236        peer_id: Option<PeerId>,
237        tsk: &TipsetKey,
238        request_len: NonZeroU64,
239        options: u64,
240        validate: F,
241    ) -> Result<Vec<T>, String>
242    where
243        T: TryFrom<TipsetBundle> + Send + Sync + 'static,
244        <T as TryFrom<TipsetBundle>>::Error: std::fmt::Display,
245        F: Fn(&Vec<T>) -> bool,
246    {
247        let request = ChainExchangeRequest {
248            start: tsk.to_cids(),
249            request_len: request_len.get(),
250            options,
251        };
252
253        let global_pre_time = Instant::now();
254        let network_failures = Arc::new(AtomicU64::new(0));
255        let lookup_failures = Arc::new(AtomicU64::new(0));
256        let chain_exchange_result = match peer_id {
257            // Specific peer is given to send request, send specifically to that peer.
258            Some(id) => Self::chain_exchange_request(
259                self.peer_manager.clone(),
260                self.network_send.clone(),
261                id,
262                request,
263            )
264            .await?
265            .into_result()?,
266            None => {
267                // No specific peer set, send requests to a shuffled set of top peers until
268                // a request succeeds.
269                let peers = self.peer_manager.top_peers_shuffled();
270                if peers.is_empty() {
271                    return Err("chain exchange failed: no peers are available".into());
272                }
273                let n_peers = peers.len();
274                let mut batch = RaceBatch::new(MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS);
275                let success_time_cost_millis_stats = Arc::new(Mutex::new(Stats::new()));
276                for peer_id in peers.into_iter() {
277                    let peer_manager = self.peer_manager.clone();
278                    let network_send = self.network_send.clone();
279                    let request = request.clone();
280                    let network_failures = network_failures.clone();
281                    let lookup_failures = lookup_failures.clone();
282                    let success_time_cost_millis_stats = success_time_cost_millis_stats.clone();
283                    batch.add(async move {
284                        let start = Instant::now();
285                        match Self::chain_exchange_request(
286                            peer_manager,
287                            network_send,
288                            peer_id,
289                            request,
290                        )
291                        .await
292                        {
293                            Ok(chain_exchange_result) => {
294                                match chain_exchange_result.into_result::<T>() {
295                                    Ok(r) => {
296                                        success_time_cost_millis_stats.lock().update(
297                                            start.elapsed().as_millis()
298                                        );
299                                        Ok(r)
300                                    }
301                                    Err(error) => {
302                                        lookup_failures.fetch_add(1, Ordering::Relaxed);
303                                        debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange response");
304                                        Err(error)
305                                    }
306                                }
307                            }
308                            Err(error) => {
309                                network_failures.fetch_add(1, Ordering::Relaxed);
310                                debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange request to peer");
311                                Err(error)
312                            }
313                        }
314                    });
315                }
316
317                let make_failure_message = || {
318                    CHAIN_EXCHANGE_TIMEOUT_MILLIS.adapt_on_failure();
319                    tracing::debug!(
320                        "Increased chain exchange timeout to {}ms",
321                        CHAIN_EXCHANGE_TIMEOUT_MILLIS.get()
322                    );
323                    let mut message = String::new();
324                    message.push_str("ChainExchange request failed for all top peers. ");
325                    message.push_str(&format!(
326                        "{} network failures, ",
327                        network_failures.load(Ordering::Relaxed)
328                    ));
329                    message.push_str(&format!(
330                        "{} lookup failures, ",
331                        lookup_failures.load(Ordering::Relaxed)
332                    ));
333                    message.push_str(&format!("request:\n{request:?}",));
334                    message
335                };
336
337                let v = batch
338                    .get_ok_validated(validate)
339                    .await
340                    .ok_or_else(make_failure_message)?;
341                if let Ok(mean) = success_time_cost_millis_stats.lock().mean()
342                    && CHAIN_EXCHANGE_TIMEOUT_MILLIS.adapt_on_success(mean as _)
343                {
344                    tracing::debug!(
345                        "Decreased chain exchange timeout to {}ms. Current average: {}ms",
346                        CHAIN_EXCHANGE_TIMEOUT_MILLIS.get(),
347                        mean,
348                    );
349                }
350                trace!("Succeed: handle_chain_exchange_request");
351                v
352            }
353        };
354
355        // Log success for the global request with the latency from before sending.
356        self.peer_manager
357            .log_global_success(Instant::now().duration_since(global_pre_time));
358
359        Ok(chain_exchange_result)
360    }
361
362    /// Send a `chain_exchange` request to the network and await response.
363    async fn chain_exchange_request(
364        peer_manager: Arc<PeerManager>,
365        network_send: flume::Sender<NetworkMessage>,
366        peer_id: PeerId,
367        request: ChainExchangeRequest,
368    ) -> Result<ChainExchangeResponse, String> {
369        trace!("Sending ChainExchange Request to {peer_id}");
370
371        let req_pre_time = Instant::now();
372
373        let (tx, rx) = flume::bounded(1);
374        if network_send
375            .send_async(NetworkMessage::ChainExchangeRequest {
376                peer_id,
377                request,
378                response_channel: tx,
379            })
380            .await
381            .is_err()
382        {
383            return Err("Failed to send chain exchange request to network".to_string());
384        };
385
386        // Add timeout to receiving response from p2p service to avoid stalling.
387        // There is also a timeout inside the request-response calls, but this ensures
388        // this.
389        let res = tokio::task::spawn_blocking(move || {
390            rx.recv_timeout(Duration::from_millis(CHAIN_EXCHANGE_TIMEOUT_MILLIS.get()))
391        })
392        .await;
393        let res_duration = Instant::now().duration_since(req_pre_time);
394        match res {
395            Ok(Ok(Ok(bs_res))) => {
396                // Successful response
397                peer_manager.log_success(&peer_id, res_duration);
398                trace!("Succeeded: ChainExchange Request to {peer_id}");
399                Ok(bs_res)
400            }
401            Ok(Ok(Err(e))) => {
402                // Internal libp2p error, score failure for peer and potentially disconnect
403                match e {
404                    RequestResponseError::UnsupportedProtocols => {
405                        // refactor this into Networkevent if user agent logging is critical here
406                        peer_manager
407                            .ban_peer_with_default_duration(
408                                peer_id,
409                                "ChainExchange protocol unsupported",
410                                |_| None,
411                            )
412                            .await;
413                    }
414                    RequestResponseError::ConnectionClosed | RequestResponseError::DialFailure => {
415                        peer_manager.mark_peer_bad(peer_id, format!("chain exchange error {e:?}"));
416                    }
417                    // Ignore dropping peer on timeout for now. Can't be confident yet that the
418                    // specified timeout is adequate time.
419                    RequestResponseError::Timeout | RequestResponseError::Io(_) => {
420                        peer_manager.log_failure(&peer_id, res_duration);
421                    }
422                }
423                debug!("Failed: ChainExchange Request to {peer_id}");
424                Err(format!("Internal libp2p error: {e:?}"))
425            }
426            Ok(Err(_)) | Err(_) => {
427                // Sender channel internally dropped or timeout, both should log failure which
428                // will negatively score the peer, but not drop yet.
429                peer_manager.log_failure(&peer_id, res_duration);
430                debug!("Timeout: ChainExchange Request to {peer_id}");
431                Err(format!("Chain exchange request to {peer_id} timed out"))
432            }
433        }
434    }
435
436    /// Send a hello request to the network (does not immediately await
437    /// response).
438    pub async fn hello_request(
439        &self,
440        peer_id: PeerId,
441        request: HelloRequest,
442    ) -> anyhow::Result<(PeerId, Instant, Option<HelloResponse>)> {
443        trace!("Sending Hello Message to {}", peer_id);
444
445        // Create oneshot channel for receiving response from sent hello.
446        let (tx, rx) = flume::bounded(1);
447
448        // Send request into libp2p service
449        self.network_send
450            .send_async(NetworkMessage::HelloRequest {
451                peer_id,
452                request,
453                response_channel: tx,
454            })
455            .await
456            .context("Failed to send hello request: receiver dropped")?;
457
458        const HELLO_TIMEOUT: Duration = Duration::from_secs(30);
459        let sent = Instant::now();
460        let res = tokio::task::spawn_blocking(move || rx.recv_timeout(HELLO_TIMEOUT))
461            .await?
462            .ok();
463        Ok((peer_id, sent, res))
464    }
465}
466
467/// Validates network tipsets that are sorted by epoch in descending order with the below checks
468/// 1. The latest(first) tipset has the desired tipset key
469/// 2. The sorted tipsets are chained by their tipset keys
470fn validate_network_tipsets(tipsets: &[Arc<Tipset>], start_tipset_key: &TipsetKey) -> bool {
471    if let Some(start) = tipsets.first() {
472        if start.key() != start_tipset_key {
473            tracing::warn!(epoch=%start.epoch(), expected=%start_tipset_key, actual=%start.key(), "start tipset key mismatch");
474            return false;
475        }
476        for (ts, pts) in tipsets.iter().zip(tipsets.iter().skip(1)) {
477            if ts.parents() != pts.key() {
478                tracing::warn!(epoch=%ts.epoch(), expected_parent=%pts.key(), actual_parent=%ts.parents(), "invalid chain");
479                return false;
480            }
481        }
482        true
483    } else {
484        tracing::warn!("invalid empty chain_exchange_headers response");
485        false
486    }
487}
488
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicBool, AtomicUsize};

    impl<T> RaceBatch<T>
    where
        T: Send + 'static,
    {
        /// Returns the first successful result, accepting any `Ok` value.
        pub async fn get_ok(self) -> Option<T> {
            self.get_ok_validated(|_| true).await
        }
    }

    #[tokio::test]
    async fn race_batch_ok() {
        let mut batch = RaceBatch::new(3);
        batch.add(async move { Ok(1) });
        batch.add(async move { Err("kaboom".into()) });

        assert_eq!(batch.get_ok().await, Some(1));
    }

    #[tokio::test]
    async fn race_batch_ok_faster() {
        let mut batch = RaceBatch::new(3);
        batch.add(async move {
            tokio::time::sleep(Duration::from_secs(100)).await;
            Ok(1)
        });
        batch.add(async move { Ok(2) });
        batch.add(async move { Err("kaboom".into()) });

        assert_eq!(batch.get_ok().await, Some(2));
    }

    #[tokio::test]
    async fn race_batch_none() {
        let mut batch: RaceBatch<i32> = RaceBatch::new(3);
        batch.add(async move { Err("kaboom".into()) });
        batch.add(async move { Err("banana".into()) });

        assert_eq!(batch.get_ok().await, None);
    }

    #[tokio::test]
    async fn race_batch_validated_skips_rejected() {
        // Whatever the completion order, the value rejected by the validator never wins.
        let mut batch: RaceBatch<i32> = RaceBatch::new(3);
        batch.add(async move { Ok(1) });
        batch.add(async move {
            tokio::task::yield_now().await;
            Ok(2)
        });

        assert_eq!(batch.get_ok_validated(|v| *v == 2).await, Some(2));
    }

    #[tokio::test]
    async fn race_batch_validated_none() {
        // Successful but rejected values are treated like failures.
        let mut batch: RaceBatch<i32> = RaceBatch::new(3);
        batch.add(async move { Ok(1) });
        batch.add(async move { Ok(2) });

        assert_eq!(batch.get_ok_validated(|_| false).await, None);
    }

    #[tokio::test]
    async fn race_batch_semaphore() {
        const MAX_JOBS: usize = 30;
        let counter = Arc::new(AtomicUsize::new(0));
        let exceeded = Arc::new(AtomicBool::new(false));

        let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS);
        for _ in 0..10000 {
            let c = counter.clone();
            let e = exceeded.clone();
            batch.add(async move {
                let prev = c.fetch_add(1, Ordering::Relaxed);
                if prev >= MAX_JOBS {
                    e.fetch_or(true, Ordering::Relaxed);
                }

                tokio::task::yield_now().await;
                c.fetch_sub(1, Ordering::Relaxed);

                Err("banana".into())
            });
        }

        assert_eq!(batch.get_ok().await, None);
        assert!(!exceeded.load(Ordering::Relaxed));
    }

    #[tokio::test]
    async fn race_batch_semaphore_exceeded() {
        const MAX_JOBS: usize = 30;
        let counter = Arc::new(AtomicUsize::new(0));
        let exceeded = Arc::new(AtomicBool::new(false));

        // We add one more job to exceed the limit
        let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS + 1);
        for _ in 0..10000 {
            let c = counter.clone();
            let e = exceeded.clone();
            batch.add(async move {
                let prev = c.fetch_add(1, Ordering::Relaxed);
                if prev >= MAX_JOBS {
                    e.fetch_or(true, Ordering::Relaxed);
                }

                tokio::task::yield_now().await;
                c.fetch_sub(1, Ordering::Relaxed);

                Err("banana".into())
            });
        }

        assert_eq!(batch.get_ok().await, None);
        assert!(exceeded.load(Ordering::Relaxed));
    }

    #[test]
    // `chain4u!` presumably binds the bracketed block-header names (e.g. `genesis_header`),
    // which these assertions do not use.
    #[allow(unused_variables)]
    fn validate_network_tipsets_tests() {
        use crate::blocks::{Chain4U, chain4u};

        let c4u = Chain4U::new();
        chain4u! {
            in c4u;
            t0 @ [genesis_header]
            -> t1 @ [first_header]
            -> t2 @ [second_left, second_right]
            -> t3 @ [third]
            -> t4 @ [fourth]
        };
        let t0 = Arc::new(t0.clone());
        let t1 = Arc::new(t1.clone());
        let t2 = Arc::new(t2.clone());
        let t3 = Arc::new(t3.clone());
        let t4 = Arc::new(t4.clone());
        assert!(validate_network_tipsets(
            &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
            t4.key()
        ));
        // Wrong start key.
        assert!(!validate_network_tipsets(
            &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
            t3.key()
        ));
        // Gap in the chain (t3 missing).
        assert!(!validate_network_tipsets(
            &[t4.clone(), t2.clone(), t1.clone(), t0.clone()],
            t4.key()
        ));
        // A single tipset is a valid chain when its key matches the requested one.
        assert!(validate_network_tipsets(&[t4.clone()], t4.key()));
        // An empty response never validates.
        assert!(!validate_network_tipsets(&[], t4.key()));
    }
}