Skip to main content

forest/chain_sync/
network_context.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::{
5    convert::TryFrom,
6    num::NonZeroU64,
7    sync::{
8        Arc, LazyLock,
9        atomic::{AtomicU64, Ordering},
10    },
11    time::{Duration, Instant},
12};
13
14use crate::{
15    blocks::{FullTipset, Tipset, TipsetKey},
16    libp2p::{
17        NetworkMessage, PeerId, PeerManager,
18        chain_exchange::{
19            ChainExchangeRequest, ChainExchangeResponse, HEADERS, MESSAGES, TipsetBundle,
20        },
21        hello::{HelloRequest, HelloResponse},
22        rpc::RequestResponseError,
23    },
24    utils::{
25        misc::{AdaptiveValueProvider, ExponentialAdaptiveValueProvider},
26        stats::Stats,
27    },
28};
29use anyhow::Context as _;
30use fvm_ipld_blockstore::Blockstore;
31use parking_lot::Mutex;
32use std::future::Future;
33use tokio::sync::Semaphore;
34use tokio::task::JoinSet;
35use tracing::{debug, trace};
36
/// Timeout in milliseconds for a response to an RPC request.
// This value is automatically adapted within [5000, 60000] ms (i.e. 5s-60s) for
// different network conditions, being decreased on success and increased on failure.
// NOTE(review): the middle argument (2000) is presumably the adaptation step —
// confirm against `ExponentialAdaptiveValueProvider::new`.
static CHAIN_EXCHANGE_TIMEOUT_MILLIS: LazyLock<ExponentialAdaptiveValueProvider<u64>> =
    LazyLock::new(|| ExponentialAdaptiveValueProvider::new(5000, 2000, 60000, false));
42
/// Maximum number of concurrent chain exchange requests being sent to the
/// network. Enforced by the `RaceBatch` semaphore in
/// `handle_chain_exchange_request`.
const MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS: usize = 2;
46
/// Context used in chain sync to handle network requests.
/// This contains the peer manager, P2P service interface, and [`Blockstore`]
/// required to make network requests.
#[derive(derive_more::Constructor)]
pub struct SyncNetworkContext<DB> {
    /// Channel to send network messages through P2P service
    network_send: flume::Sender<NetworkMessage>,
    /// Manages peers to send requests to and updates request stats for the
    /// respective peers.
    peer_manager: Arc<PeerManager>,
    /// Shared handle to the local blockstore (`DB: Blockstore` in the impl below).
    db: Arc<DB>,
}
59
60impl<DB> Clone for SyncNetworkContext<DB> {
61    fn clone(&self) -> Self {
62        Self {
63            network_send: self.network_send.clone(),
64            peer_manager: self.peer_manager.clone(),
65            db: self.db.clone(),
66        }
67    }
68}
69
/// Race tasks to completion while limiting the number of tasks that may execute concurrently.
/// Once a task finishes without error, the rest of the tasks are canceled.
struct RaceBatch<T> {
    // Spawned tasks; dropping the `JoinSet` aborts any still-running tasks.
    tasks: JoinSet<Result<T, String>>,
    // Caps how many of the spawned tasks make progress at the same time.
    semaphore: Arc<Semaphore>,
}
76
77impl<T> RaceBatch<T>
78where
79    T: Send + 'static,
80{
81    pub fn new(max_concurrent_jobs: usize) -> Self {
82        RaceBatch {
83            tasks: JoinSet::new(),
84            semaphore: Arc::new(Semaphore::new(max_concurrent_jobs)),
85        }
86    }
87
88    pub fn add(&mut self, future: impl Future<Output = Result<T, String>> + Send + 'static) {
89        let sem = self.semaphore.clone();
90        self.tasks.spawn(async move {
91            let permit = sem
92                .acquire_owned()
93                .await
94                .map_err(|_| "Semaphore unexpectedly closed")?;
95            let result = future.await;
96            drop(permit);
97            result
98        });
99    }
100
101    /// Return first finishing `Ok` future that passes validation else return `None` if all jobs failed
102    pub async fn get_ok_validated<F>(mut self, validate: F) -> Option<T>
103    where
104        F: Fn(&T) -> bool,
105    {
106        while let Some(result) = self.tasks.join_next().await {
107            if let Ok(Ok(value)) = result
108                && validate(&value)
109            {
110                return Some(value);
111            }
112        }
113        // So far every task have failed
114        None
115    }
116}
117
impl<DB> SyncNetworkContext<DB>
where
    DB: Blockstore,
{
    /// Returns a reference to the peer manager of the network context.
    pub fn peer_manager(&self) -> &PeerManager {
        self.peer_manager.as_ref()
    }

    /// Returns a reference to the channel for sending network messages through P2P service.
    pub fn network_send(&self) -> &flume::Sender<NetworkMessage> {
        &self.network_send
    }

    /// Send a `chain_exchange` request for only block headers (ignore
    /// messages). If `peer_id` is `None`, requests will be sent to a set of
    /// shuffled peers.
    pub async fn chain_exchange_headers(
        &self,
        peer_id: Option<PeerId>,
        tsk: &TipsetKey,
        count: NonZeroU64,
    ) -> Result<Vec<Tipset>, String> {
        // Responses are only accepted if they start at `tsk` and form a
        // correctly parent-linked chain (see `validate_network_tipsets`).
        self.handle_chain_exchange_request(peer_id, tsk, count, HEADERS, |tipsets: &Vec<Tipset>| {
            validate_network_tipsets(tipsets, tsk)
        })
        .await
    }

    /// Send a `chain_exchange` request for messages to assemble a full tipset with a local tipset,
    /// If `peer_id` is `None`, requests will be sent to a set of shuffled peers.
    pub async fn chain_exchange_messages(
        &self,
        peer_id: Option<PeerId>,
        ts: &Tipset,
    ) -> Result<FullTipset, String> {
        // Request only the MESSAGES portion for a single tipset; the headers
        // come from the local tipset `ts` and are spliced in below.
        let mut bundles: Vec<TipsetBundle> = self
            .handle_chain_exchange_request(
                peer_id,
                ts.key(),
                NonZeroU64::new(1).expect("Infallible"),
                MESSAGES,
                |_| true,
            )
            .await?;

        if bundles.len() != 1 {
            return Err(format!(
                "chain exchange request returned {} tipsets, 1 expected.",
                bundles.len()
            ));
        }
        // Pair the remote messages with our local block headers, then convert
        // the bundle into a `FullTipset`.
        let mut bundle = bundles.remove(0);
        bundle.blocks = ts.block_headers().to_vec();
        bundle.try_into()
    }

    /// Send a `chain_exchange` request for a single full tipset (includes
    /// messages) If `peer_id` is `None`, requests will be sent to a set of
    /// shuffled peers.
    pub async fn chain_exchange_full_tipset(
        &self,
        peer_id: Option<PeerId>,
        tsk: &TipsetKey,
    ) -> Result<FullTipset, String> {
        let mut fts = self
            .handle_chain_exchange_request(
                peer_id,
                tsk,
                NonZeroU64::new(1).expect("Infallible"),
                HEADERS | MESSAGES,
                |_| true,
            )
            .await?;

        if fts.len() != 1 {
            return Err(format!(
                "Full tipset request returned {} tipsets, 1 expected.",
                fts.len()
            ));
        }
        Ok(fts.remove(0))
    }

    /// Send a `chain_exchange` request for up to 16 full tipsets (headers and
    /// messages) starting from `tsk`. If `peer_id` is `None`, requests will be
    /// sent to a set of shuffled peers.
    pub async fn chain_exchange_full_tipsets(
        &self,
        peer_id: Option<PeerId>,
        tsk: &TipsetKey,
    ) -> Result<Vec<FullTipset>, String> {
        self.handle_chain_exchange_request(
            peer_id,
            tsk,
            NonZeroU64::new(16).expect("Infallible"),
            HEADERS | MESSAGES,
            |_| true,
        )
        .await
    }

    /// Helper function to handle the peer retrieval if no peer supplied as well
    /// as the logging and updating of the peer info in the `PeerManager`.
    ///
    /// When `peer_id` is `Some`, the request goes to exactly that peer. When it
    /// is `None`, requests race against a shuffled set of top peers (at most
    /// `MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS` in flight) and the first
    /// response that passes `validate` wins. Failure/success counts feed the
    /// adaptive `CHAIN_EXCHANGE_TIMEOUT_MILLIS`.
    async fn handle_chain_exchange_request<T, F>(
        &self,
        peer_id: Option<PeerId>,
        tsk: &TipsetKey,
        request_len: NonZeroU64,
        options: u64,
        validate: F,
    ) -> Result<Vec<T>, String>
    where
        T: TryFrom<TipsetBundle> + Send + Sync + 'static,
        <T as TryFrom<TipsetBundle>>::Error: std::fmt::Display,
        F: Fn(&Vec<T>) -> bool,
    {
        let request = ChainExchangeRequest {
            start: tsk.to_cids(),
            request_len: request_len.get(),
            options,
        };

        let global_pre_time = Instant::now();
        // Counters distinguish transport-level failures from bad/unparseable responses.
        let network_failures = Arc::new(AtomicU64::new(0));
        let lookup_failures = Arc::new(AtomicU64::new(0));
        let chain_exchange_result = match peer_id {
            // Specific peer is given to send request, send specifically to that peer.
            Some(id) => Self::chain_exchange_request(
                self.peer_manager.clone(),
                self.network_send.clone(),
                id,
                request,
            )
            .await?
            .into_result()?,
            None => {
                // No specific peer set, send requests to a shuffled set of top peers until
                // a request succeeds.
                let peers = self.peer_manager.top_peers_shuffled();
                if peers.is_empty() {
                    return Err("chain exchange failed: no peers are available".into());
                }
                let n_peers = peers.len();
                let mut batch = RaceBatch::new(MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS);
                // Tracks latency of successful requests to inform timeout adaptation below.
                let success_time_cost_millis_stats = Arc::new(Mutex::new(Stats::new()));
                for peer_id in peers.into_iter() {
                    let peer_manager = self.peer_manager.clone();
                    let network_send = self.network_send.clone();
                    let request = request.clone();
                    let network_failures = network_failures.clone();
                    let lookup_failures = lookup_failures.clone();
                    let success_time_cost_millis_stats = success_time_cost_millis_stats.clone();
                    batch.add(async move {
                        let start = Instant::now();
                        match Self::chain_exchange_request(
                            peer_manager,
                            network_send,
                            peer_id,
                            request,
                        )
                        .await
                        {
                            Ok(chain_exchange_result) => {
                                match chain_exchange_result.into_result::<T>() {
                                    Ok(r) => {
                                        success_time_cost_millis_stats.lock().update(
                                            start.elapsed().as_millis()
                                        );
                                        Ok(r)
                                    }
                                    Err(error) => {
                                        // Peer responded, but the payload could not be decoded/used.
                                        lookup_failures.fetch_add(1, Ordering::Relaxed);
                                        debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange response");
                                        Err(error)
                                    }
                                }
                            }
                            Err(error) => {
                                // Request never completed (send/timeout/transport error).
                                network_failures.fetch_add(1, Ordering::Relaxed);
                                debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange request to peer");
                                Err(error)
                            }
                        }
                    });
                }

                // Built lazily: only evaluated when every racing request has failed.
                // Also bumps the adaptive timeout upward on total failure.
                let make_failure_message = || {
                    CHAIN_EXCHANGE_TIMEOUT_MILLIS.adapt_on_failure();
                    tracing::debug!(
                        "Increased chain exchange timeout to {}ms",
                        CHAIN_EXCHANGE_TIMEOUT_MILLIS.get()
                    );
                    let mut message = String::new();
                    message.push_str("ChainExchange request failed for all top peers. ");
                    message.push_str(&format!(
                        "{} network failures, ",
                        network_failures.load(Ordering::Relaxed)
                    ));
                    message.push_str(&format!(
                        "{} lookup failures, ",
                        lookup_failures.load(Ordering::Relaxed)
                    ));
                    message.push_str(&format!("request:\n{request:?}",));
                    message
                };

                let v = batch
                    .get_ok_validated(validate)
                    .await
                    .ok_or_else(make_failure_message)?;
                // On success, possibly tighten the adaptive timeout toward the
                // observed mean latency of successful requests.
                if let Ok(mean) = success_time_cost_millis_stats.lock().mean()
                    && CHAIN_EXCHANGE_TIMEOUT_MILLIS.adapt_on_success(mean as _)
                {
                    tracing::debug!(
                        "Decreased chain exchange timeout to {}ms. Current average: {}ms",
                        CHAIN_EXCHANGE_TIMEOUT_MILLIS.get(),
                        mean,
                    );
                }
                trace!("Succeed: handle_chain_exchange_request");
                v
            }
        };

        // Log success for the global request with the latency from before sending.
        self.peer_manager
            .log_global_success(Instant::now().duration_since(global_pre_time));

        Ok(chain_exchange_result)
    }

    /// Send a `chain_exchange` request to the network and await response.
    ///
    /// Associated (not `&self`) so the racing tasks in
    /// `handle_chain_exchange_request` can own their captures. Updates the
    /// peer's score in `peer_manager` according to the outcome.
    async fn chain_exchange_request(
        peer_manager: Arc<PeerManager>,
        network_send: flume::Sender<NetworkMessage>,
        peer_id: PeerId,
        request: ChainExchangeRequest,
    ) -> Result<ChainExchangeResponse, String> {
        trace!("Sending ChainExchange Request to {peer_id}");

        let req_pre_time = Instant::now();

        let (tx, rx) = flume::bounded(1);
        if network_send
            .send_async(NetworkMessage::ChainExchangeRequest {
                peer_id,
                request,
                response_channel: tx,
            })
            .await
            .is_err()
        {
            return Err("Failed to send chain exchange request to network".to_string());
        };

        // Add timeout to receiving response from p2p service to avoid stalling.
        // There is also a timeout inside the request-response calls, but this ensures
        // this.
        // `recv_timeout` blocks, so it runs on the blocking thread pool.
        let res = tokio::task::spawn_blocking(move || {
            rx.recv_timeout(Duration::from_millis(CHAIN_EXCHANGE_TIMEOUT_MILLIS.get()))
        })
        .await;
        let res_duration = Instant::now().duration_since(req_pre_time);
        match res {
            Ok(Ok(Ok(bs_res))) => {
                // Successful response
                peer_manager.log_success(&peer_id, res_duration);
                trace!("Succeeded: ChainExchange Request to {peer_id}");
                Ok(bs_res)
            }
            Ok(Ok(Err(e))) => {
                // Internal libp2p error, score failure for peer and potentially disconnect
                match e {
                    RequestResponseError::UnsupportedProtocols => {
                        // refactor this into Networkevent if user agent logging is critical here
                        peer_manager
                            .ban_peer_with_default_duration(
                                peer_id,
                                "ChainExchange protocol unsupported",
                                |_| None,
                            )
                            .await;
                    }
                    RequestResponseError::ConnectionClosed | RequestResponseError::DialFailure => {
                        peer_manager.mark_peer_bad(peer_id, format!("chain exchange error {e:?}"));
                    }
                    // Ignore dropping peer on timeout for now. Can't be confident yet that the
                    // specified timeout is adequate time.
                    RequestResponseError::Timeout | RequestResponseError::Io(_) => {
                        peer_manager.log_failure(&peer_id, res_duration);
                    }
                }
                debug!("Failed: ChainExchange Request to {peer_id}");
                Err(format!("Internal libp2p error: {e:?}"))
            }
            Ok(Err(_)) | Err(_) => {
                // Sender channel internally dropped or timeout, both should log failure which
                // will negatively score the peer, but not drop yet.
                peer_manager.log_failure(&peer_id, res_duration);
                debug!("Timeout: ChainExchange Request to {peer_id}");
                Err(format!("Chain exchange request to {peer_id} timed out"))
            }
        }
    }

    /// Send a hello request to the network (does not immediately await
    /// response).
    ///
    /// Returns the peer, the send timestamp, and the response — `None` when the
    /// response timed out or the response channel was dropped.
    pub async fn hello_request(
        &self,
        peer_id: PeerId,
        request: HelloRequest,
    ) -> anyhow::Result<(PeerId, Instant, Option<HelloResponse>)> {
        trace!("Sending Hello Message to {}", peer_id);

        // Create oneshot channel for receiving response from sent hello.
        let (tx, rx) = flume::bounded(1);

        // Send request into libp2p service
        self.network_send
            .send_async(NetworkMessage::HelloRequest {
                peer_id,
                request,
                response_channel: tx,
            })
            .await
            .context("Failed to send hello request: receiver dropped")?;

        const HELLO_TIMEOUT: Duration = Duration::from_secs(30);
        let sent = Instant::now();
        // `.ok()` maps a receive timeout / dropped sender to `None` rather than an error.
        let res = tokio::task::spawn_blocking(move || rx.recv_timeout(HELLO_TIMEOUT))
            .await?
            .ok();
        Ok((peer_id, sent, res))
    }
}
451
452/// Validates network tipsets that are sorted by epoch in descending order with the below checks
453/// 1. The latest(first) tipset has the desired tipset key
454/// 2. The sorted tipsets are chained by their tipset keys
455fn validate_network_tipsets(tipsets: &[Tipset], start_tipset_key: &TipsetKey) -> bool {
456    if let Some(start) = tipsets.first() {
457        if start.key() != start_tipset_key {
458            tracing::warn!(epoch=%start.epoch(), expected=%start_tipset_key, actual=%start.key(), "start tipset key mismatch");
459            return false;
460        }
461        for (ts, pts) in tipsets.iter().zip(tipsets.iter().skip(1)) {
462            if ts.parents() != pts.key() {
463                tracing::warn!(epoch=%ts.epoch(), expected_parent=%pts.key(), actual_parent=%ts.parents(), "invalid chain");
464                return false;
465            }
466        }
467        true
468    } else {
469        tracing::warn!("invalid empty chain_exchange_headers response");
470        false
471    }
472}
473
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicBool, AtomicUsize};

    // Test-only convenience: race the batch with no extra validation.
    impl<T> RaceBatch<T>
    where
        T: Send + 'static,
    {
        pub async fn get_ok(self) -> Option<T> {
            self.get_ok_validated(|_| true).await
        }
    }

    // A single Ok wins even when another task errors.
    #[tokio::test]
    async fn race_batch_ok() {
        let mut batch = RaceBatch::new(3);
        batch.add(async move { Ok(1) });
        batch.add(async move { Err("kaboom".into()) });

        assert_eq!(batch.get_ok().await, Some(1));
    }

    // The first task to *finish* wins, not the first added: the slow Ok(1) loses.
    #[tokio::test]
    async fn race_batch_ok_faster() {
        let mut batch = RaceBatch::new(3);
        batch.add(async move {
            tokio::time::sleep(Duration::from_secs(100)).await;
            Ok(1)
        });
        batch.add(async move { Ok(2) });
        batch.add(async move { Err("kaboom".into()) });

        assert_eq!(batch.get_ok().await, Some(2));
    }

    // All tasks failing yields None.
    #[tokio::test]
    async fn race_batch_none() {
        let mut batch: RaceBatch<i32> = RaceBatch::new(3);
        batch.add(async move { Err("kaboom".into()) });
        batch.add(async move { Err("banana".into()) });

        assert_eq!(batch.get_ok().await, None);
    }

    // The semaphore keeps the number of concurrently-running tasks at MAX_JOBS:
    // the counter never observes more than MAX_JOBS tasks inside the critical section.
    #[tokio::test]
    async fn race_batch_semaphore() {
        const MAX_JOBS: usize = 30;
        let counter = Arc::new(AtomicUsize::new(0));
        let exceeded = Arc::new(AtomicBool::new(false));

        let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS);
        for _ in 0..10000 {
            let c = counter.clone();
            let e = exceeded.clone();
            batch.add(async move {
                let prev = c.fetch_add(1, Ordering::Relaxed);
                if prev >= MAX_JOBS {
                    e.fetch_or(true, Ordering::Relaxed);
                }

                tokio::task::yield_now().await;
                c.fetch_sub(1, Ordering::Relaxed);

                Err("banana".into())
            });
        }

        assert_eq!(batch.get_ok().await, None);
        assert!(!exceeded.load(Ordering::Relaxed));
    }

    // Sanity check of the test above: raising the limit by one lets the
    // "exceeded" condition trigger, proving the detection mechanism works.
    #[tokio::test]
    async fn race_batch_semaphore_exceeded() {
        const MAX_JOBS: usize = 30;
        let counter = Arc::new(AtomicUsize::new(0));
        let exceeded = Arc::new(AtomicBool::new(false));

        // We add one more job to exceed the limit
        let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS + 1);
        for _ in 0..10000 {
            let c = counter.clone();
            let e = exceeded.clone();
            batch.add(async move {
                let prev = c.fetch_add(1, Ordering::Relaxed);
                if prev >= MAX_JOBS {
                    e.fetch_or(true, Ordering::Relaxed);
                }

                tokio::task::yield_now().await;
                c.fetch_sub(1, Ordering::Relaxed);

                Err("banana".into())
            });
        }

        assert_eq!(batch.get_ok().await, None);
        assert!(exceeded.load(Ordering::Relaxed));
    }

    // Builds a small chain t0 -> t4 and checks: a correctly-keyed descending
    // chain validates; a wrong start key fails; a chain with a gap (t3 missing) fails.
    #[test]
    #[allow(unused_variables)]
    fn validate_network_tipsets_tests() {
        use crate::blocks::{Chain4U, chain4u};

        let c4u = Chain4U::new();
        chain4u! {
            in c4u;
            t0 @ [genesis_header]
            -> t1 @ [first_header]
            -> t2 @ [second_left, second_right]
            -> t3 @ [third]
            -> t4 @ [fourth]
        };
        assert!(validate_network_tipsets(
            &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
            t4.key()
        ));
        assert!(!validate_network_tipsets(
            &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
            t3.key()
        ));
        assert!(!validate_network_tipsets(
            &[t4.clone(), t2.clone(), t1.clone(), t0.clone()],
            t4.key()
        ));
    }
}