1use std::{
5 convert::TryFrom,
6 num::{NonZeroU64, NonZeroUsize},
7 sync::{
8 Arc, LazyLock,
9 atomic::{AtomicU64, Ordering},
10 },
11 time::{Duration, Instant},
12};
13
14use crate::{
15 blocks::{FullTipset, Tipset, TipsetKey, TipsetLike},
16 libp2p::{
17 NetworkMessage, PeerId, PeerManager,
18 chain_exchange::{
19 ChainExchangeRequest, ChainExchangeResponse, HEADERS, MESSAGES, TipsetBundle,
20 },
21 hello::{HelloRequest, HelloResponse},
22 rpc::RequestResponseError,
23 },
24 utils::{
25 ShallowClone,
26 misc::{AdaptiveValueProvider, ExponentialAdaptiveValueProvider},
27 stats::Stats,
28 },
29};
30use anyhow::Context as _;
31use fvm_ipld_blockstore::Blockstore;
32use nonzero_ext::nonzero;
33use parking_lot::Mutex;
34use std::future::Future;
35use tokio::sync::Semaphore;
36use tokio::task::JoinSet;
37use tracing::{debug, trace};
38
/// Timeout, in milliseconds, for a single `chain_exchange` network request.
/// The value adapts at runtime: it is increased when a request fails against
/// all top peers (see `make_failure_message` in `handle_chain_exchange_request`)
/// and decreased again when the mean success latency is comfortably low.
// NOTE(review): arguments look like (initial=5000, step/min=2000, max=60000,
// flag) — confirm against `ExponentialAdaptiveValueProvider::new`.
static CHAIN_EXCHANGE_TIMEOUT_MILLIS: LazyLock<ExponentialAdaptiveValueProvider<u64>> =
    LazyLock::new(|| ExponentialAdaptiveValueProvider::new(5000, 2000, 60000, false));
44
/// Maximum number of `chain_exchange` requests raced concurrently against
/// different peers. Overridable via the
/// `FOREST_MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS` environment variable;
/// defaults to 3 when the variable is unset or fails to parse.
static MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS: LazyLock<NonZeroUsize> = LazyLock::new(|| {
    std::env::var("FOREST_MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS")
        .ok()
        .and_then(|i| {
            i.parse().ok().inspect(|i| {
                tracing::info!("max concurrent chain exchange requests set to {i} from `FOREST_MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS`");
            })
        }).unwrap_or(nonzero!(3_usize))
});
56
/// Context used by chain sync to issue network requests (hello and
/// chain_exchange) and to record peer success/failure statistics.
#[derive(derive_more::Constructor)]
pub struct SyncNetworkContext<DB> {
    /// Channel for outbound messages to the libp2p service.
    network_send: flume::Sender<NetworkMessage>,
    /// Tracks peers, their request latencies, and ban/bad-peer state.
    peer_manager: Arc<PeerManager>,
    /// Backing blockstore handle (shared, reference-counted).
    db: Arc<DB>,
}
69
impl<DB> ShallowClone for SyncNetworkContext<DB> {
    // Cheap clone: the flume sender and both `Arc`-backed fields are
    // reference-counted handles, so no deep data is copied.
    fn shallow_clone(&self) -> Self {
        Self {
            network_send: self.network_send.clone(),
            peer_manager: self.peer_manager.shallow_clone(),
            db: self.db.shallow_clone(),
        }
    }
}
79
/// Races a set of fallible async jobs and yields the first acceptable `Ok`
/// result. Jobs are spawned onto a `JoinSet` but gated by a semaphore so that
/// at most a fixed number run concurrently.
struct RaceBatch<T> {
    /// Spawned jobs; remaining tasks are aborted when the set is dropped.
    tasks: JoinSet<anyhow::Result<T>>,
    /// Concurrency limiter shared by every spawned job.
    semaphore: Arc<Semaphore>,
}
86
87impl<T> RaceBatch<T>
88where
89 T: Send + 'static,
90{
91 pub fn new(max_concurrent_jobs: NonZeroUsize) -> Self {
92 RaceBatch {
93 tasks: JoinSet::new(),
94 semaphore: Arc::new(Semaphore::new(max_concurrent_jobs.get())),
95 }
96 }
97
98 pub fn add(&mut self, future: impl Future<Output = anyhow::Result<T>> + Send + 'static) {
99 let sem = self.semaphore.clone();
100 self.tasks.spawn(async move {
101 let permit = sem
102 .acquire_owned()
103 .await
104 .context("Semaphore unexpectedly closed")?;
105 let result = future.await;
106 drop(permit);
107 result
108 });
109 }
110
111 pub async fn get_ok_validated<F>(mut self, validate: F) -> Option<T>
113 where
114 F: Fn(&T) -> bool,
115 {
116 while let Some(result) = self.tasks.join_next().await {
117 if let Ok(Ok(value)) = result
118 && validate(&value)
119 {
120 return Some(value);
121 }
122 }
123 None
125 }
126}
127
impl<DB> SyncNetworkContext<DB>
where
    DB: Blockstore,
{
    /// Returns a reference to the peer manager of the network context.
    pub fn peer_manager(&self) -> &PeerManager {
        self.peer_manager.as_ref()
    }

    /// Returns the channel used to send messages to the libp2p service.
    pub fn network_send(&self) -> &flume::Sender<NetworkMessage> {
        &self.network_send
    }

    /// Sends a `chain_exchange` request for block headers only (no messages),
    /// walking `count` tipsets backwards starting from `tsk`. If `peer_id` is
    /// `None`, the request is raced against a shuffled set of top peers.
    /// Responses are validated with [`validate_network_tipsets`].
    pub async fn chain_exchange_headers(
        &self,
        peer_id: Option<PeerId>,
        tsk: &TipsetKey,
        count: NonZeroU64,
    ) -> anyhow::Result<Vec<Tipset>> {
        self.handle_chain_exchange_request(peer_id, tsk, count, HEADERS, |tipsets| {
            validate_network_tipsets(tipsets, tsk)
        })
        .await
    }

    /// Fetches the messages for a tipset whose headers are already known:
    /// requests only the `MESSAGES` portion for `ts`'s key, then grafts the
    /// locally-held headers onto the returned bundle to build a `FullTipset`.
    pub async fn chain_exchange_messages(
        &self,
        peer_id: Option<PeerId>,
        ts: &Tipset,
    ) -> anyhow::Result<FullTipset> {
        // No structural validation here (`|_| true`): the bundle's headers are
        // replaced with our own below, so only the message payload matters.
        let mut bundles: Vec<TipsetBundle> = self
            .handle_chain_exchange_request(peer_id, ts.key(), nonzero!(1_u64), MESSAGES, |_| true)
            .await?;

        if bundles.len() != 1 {
            anyhow::bail!(
                "chain exchange request returned {} tipsets, 1 expected.",
                bundles.len()
            );
        }
        let mut bundle = bundles.remove(0);
        // Use the headers we already trust; the peer only supplied messages.
        bundle.blocks = ts.block_headers().to_vec();
        bundle.try_into()
    }

    /// Fetches a single full tipset (headers + messages) identified by `tsk`.
    /// If `peer_id` is `None`, the request is raced against top peers.
    pub async fn chain_exchange_full_tipset(
        &self,
        peer_id: Option<PeerId>,
        tsk: &TipsetKey,
    ) -> anyhow::Result<FullTipset> {
        let mut fts = self
            .handle_chain_exchange_request(
                peer_id,
                tsk,
                nonzero!(1_u64),
                HEADERS | MESSAGES,
                |tipsets| validate_network_tipsets(tipsets, tsk),
            )
            .await?;

        anyhow::ensure!(
            fts.len() == 1,
            "Full tipset request returned {} tipsets, 1 expected.",
            fts.len()
        );

        Ok(fts.remove(0))
    }

    /// Fetches up to 16 full tipsets starting from `tsk`, walking backwards.
    // NOTE(review): unlike the single-tipset variant, responses here are not
    // chain-validated (`|_| true`) — presumably the caller validates; confirm.
    pub async fn chain_exchange_full_tipsets(
        &self,
        peer_id: Option<PeerId>,
        tsk: &TipsetKey,
    ) -> anyhow::Result<Vec<FullTipset>> {
        self.handle_chain_exchange_request(
            peer_id,
            tsk,
            nonzero!(16_u64),
            HEADERS | MESSAGES,
            |_| true,
        )
        .await
    }

    /// Core `chain_exchange` driver.
    ///
    /// With `Some(peer_id)` the request goes to that single peer and any error
    /// is propagated directly. With `None`, the request is raced across the
    /// peer manager's shuffled top peers — at most
    /// `MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS` in flight at once — and the
    /// first response that converts successfully and passes `validate` wins.
    ///
    /// Side effects: failure/success counters feed the error message, success
    /// latencies feed the adaptive timeout, and a global success duration is
    /// logged to the peer manager on completion.
    pub async fn handle_chain_exchange_request<T, F>(
        &self,
        peer_id: Option<PeerId>,
        tsk: &TipsetKey,
        request_len: NonZeroU64,
        options: u64,
        validate: F,
    ) -> anyhow::Result<Vec<T>>
    where
        T: TryFrom<TipsetBundle> + Send + Sync + 'static,
        <T as TryFrom<TipsetBundle>>::Error: Into<anyhow::Error>,
        F: Fn(&Vec<T>) -> bool,
    {
        // Wire-format request: start CIDs, how many tipsets to walk back, and
        // the HEADERS/MESSAGES option bits.
        let request = ChainExchangeRequest {
            start: tsk.to_cids(),
            request_len: request_len.get(),
            options,
        };

        let global_pre_time = Instant::now();
        let network_failures = Arc::new(AtomicU64::new(0));
        let lookup_failures = Arc::new(AtomicU64::new(0));
        let chain_exchange_result = match peer_id {
            // Targeted request: single peer, errors propagate immediately.
            Some(id) => Self::chain_exchange_request(
                self.peer_manager.clone(),
                self.network_send.clone(),
                id,
                request,
            )
            .await?
            .into_result()?,
            // Untargeted request: race the top peers, first valid answer wins.
            None => {
                let peers = self.peer_manager.top_peers_shuffled();
                anyhow::ensure!(
                    !peers.is_empty(),
                    "chain exchange failed: no peers are available"
                );

                let n_peers = peers.len();
                let mut batch = RaceBatch::new(*MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS);
                // Latency stats from successful responses drive the adaptive
                // timeout below.
                let success_time_cost_millis_stats = Arc::new(Mutex::new(Stats::new()));
                for peer_id in peers.into_iter() {
                    let peer_manager = self.peer_manager.clone();
                    let network_send = self.network_send.clone();
                    let request = request.clone();
                    let network_failures = network_failures.clone();
                    let lookup_failures = lookup_failures.clone();
                    let success_time_cost_millis_stats = success_time_cost_millis_stats.clone();
                    batch.add(async move {
                        let start = Instant::now();
                        match Self::chain_exchange_request(
                            peer_manager,
                            network_send,
                            peer_id,
                            request,
                        )
                        .await
                        {
                            Ok(chain_exchange_result) => {
                                // Transport succeeded; the payload may still
                                // fail to decode/convert ("lookup failure").
                                match chain_exchange_result.into_result::<T>() {
                                    Ok(r) => {
                                        success_time_cost_millis_stats.lock().update(
                                            start.elapsed().as_millis()
                                        );
                                        Ok(r)
                                    }
                                    Err(error) => {
                                        lookup_failures.fetch_add(1, Ordering::Relaxed);
                                        debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange response");
                                        Err(error)
                                    }
                                }
                            }
                            Err(error) => {
                                // Transport-level failure (send/timeout/libp2p).
                                network_failures.fetch_add(1, Ordering::Relaxed);
                                debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange request to peer");
                                Err(error)
                            }
                        }
                    });
                }

                // Invoked lazily only when ALL peers failed. Note the side
                // effect: total failure also grows the adaptive timeout.
                let make_failure_message = || {
                    CHAIN_EXCHANGE_TIMEOUT_MILLIS.adapt_on_failure();
                    tracing::debug!(
                        "Increased chain exchange timeout to {}ms",
                        CHAIN_EXCHANGE_TIMEOUT_MILLIS.get()
                    );
                    let mut message = String::new();
                    message.push_str("ChainExchange request failed for all top peers. ");
                    message.push_str(&format!(
                        "{} network failures, ",
                        network_failures.load(Ordering::Relaxed)
                    ));
                    message.push_str(&format!(
                        "{} lookup failures, ",
                        lookup_failures.load(Ordering::Relaxed)
                    ));
                    message.push_str(&format!("request:\n{request:?}"));
                    anyhow::anyhow!(message)
                };

                let v = batch
                    .get_ok_validated(validate)
                    .await
                    .ok_or_else(make_failure_message)?;
                // On success, shrink the timeout if the mean latency warrants.
                if let Ok(mean) = success_time_cost_millis_stats.lock().mean()
                    && CHAIN_EXCHANGE_TIMEOUT_MILLIS.adapt_on_success(mean as _)
                {
                    tracing::debug!(
                        "Decreased chain exchange timeout to {}ms. Current average: {}ms",
                        CHAIN_EXCHANGE_TIMEOUT_MILLIS.get(),
                        mean,
                    );
                }
                trace!("Succeed: handle_chain_exchange_request");
                v
            }
        };

        // Record end-to-end latency for global peer statistics.
        self.peer_manager
            .log_global_success(Instant::now().duration_since(global_pre_time));

        Ok(chain_exchange_result)
    }

    /// Sends a single `chain_exchange` request to one peer and awaits the
    /// response, applying the adaptive timeout. Updates the peer's score in
    /// the peer manager based on the outcome (success, protocol-unsupported →
    /// ban, connection failure → mark bad, timeout/IO → log failure).
    async fn chain_exchange_request(
        peer_manager: Arc<PeerManager>,
        network_send: flume::Sender<NetworkMessage>,
        peer_id: PeerId,
        request: ChainExchangeRequest,
    ) -> anyhow::Result<ChainExchangeResponse> {
        trace!("Sending ChainExchange Request to {peer_id}");

        let req_pre_time = Instant::now();

        // One-shot response channel: capacity 1 is all the protocol needs.
        let (tx, rx) = flume::bounded(1);
        if network_send
            .send_async(NetworkMessage::ChainExchangeRequest {
                peer_id,
                request,
                response_channel: tx,
            })
            .await
            .is_err()
        {
            anyhow::bail!("Failed to send chain exchange request to network");
        };

        // `recv_timeout` blocks the thread, so it runs on the blocking pool
        // rather than stalling the async executor.
        let res = tokio::task::spawn_blocking(move || {
            rx.recv_timeout(Duration::from_millis(CHAIN_EXCHANGE_TIMEOUT_MILLIS.get()))
        })
        .await;
        let res_duration = Instant::now().duration_since(req_pre_time);
        // Triple-nested result: JoinError / RecvTimeoutError / libp2p error.
        match res {
            Ok(Ok(Ok(bs_res))) => {
                peer_manager.log_success(&peer_id, res_duration);
                trace!("Succeeded: ChainExchange Request to {peer_id}");
                Ok(bs_res)
            }
            Ok(Ok(Err(e))) => {
                match e {
                    RequestResponseError::UnsupportedProtocols => {
                        // Peer cannot serve chain_exchange at all: ban it so
                        // we stop wasting requests on it.
                        peer_manager
                            .ban_peer_with_default_duration(
                                peer_id,
                                "ChainExchange protocol unsupported",
                                |_| None,
                            )
                            .await;
                    }
                    RequestResponseError::ConnectionClosed | RequestResponseError::DialFailure => {
                        peer_manager.mark_peer_bad(peer_id, format!("chain exchange error {e:?}"));
                    }
                    RequestResponseError::Timeout | RequestResponseError::Io(_) => {
                        // Transient: just degrade the peer's stats.
                        peer_manager.log_failure(&peer_id, res_duration);
                    }
                }
                debug!("Failed: ChainExchange Request to {peer_id}");
                anyhow::bail!("Internal libp2p error: {e:?}");
            }
            // Channel timeout/disconnect, or the blocking task itself failed.
            Ok(Err(_)) | Err(_) => {
                peer_manager.log_failure(&peer_id, res_duration);
                debug!("Timeout: ChainExchange Request to {peer_id}");
                anyhow::bail!("Chain exchange request to {peer_id} timed out");
            }
        }
    }

    /// Sends a Hello message to `peer_id`. Returns the peer id, the send
    /// timestamp, and the response if one arrived within the 30s window
    /// (`None` on timeout — Hello responses are best-effort).
    pub async fn hello_request(
        &self,
        peer_id: PeerId,
        request: HelloRequest,
    ) -> anyhow::Result<(PeerId, Instant, Option<HelloResponse>)> {
        trace!("Sending Hello Message to {}", peer_id);

        let (tx, rx) = flume::bounded(1);

        self.network_send
            .send_async(NetworkMessage::HelloRequest {
                peer_id,
                request,
                response_channel: tx,
            })
            .await
            .context("Failed to send hello request: receiver dropped")?;

        const HELLO_TIMEOUT: Duration = Duration::from_secs(30);
        let sent = Instant::now();
        // Blocking receive on the blocking pool; `.ok()` turns a timeout or
        // dropped sender into `None` rather than an error.
        let res = tokio::task::spawn_blocking(move || rx.recv_timeout(HELLO_TIMEOUT))
            .await?
            .ok();
        Ok((peer_id, sent, res))
    }
}
457
458fn validate_network_tipsets<T: TipsetLike>(tipsets: &[T], start_tipset_key: &TipsetKey) -> bool {
462 if let Some(start) = tipsets.first() {
463 if start.key() != start_tipset_key {
464 tracing::warn!(epoch=%start.epoch(), expected=%start_tipset_key, actual=%start.key(), "start tipset key mismatch");
465 return false;
466 }
467 for (ts, pts) in tipsets.iter().zip(tipsets.iter().skip(1)) {
468 if ts.parents() != pts.key() {
469 tracing::warn!(epoch=%ts.epoch(), expected_parent=%pts.key(), actual_parent=%ts.parents(), "invalid chain");
470 return false;
471 }
472 }
473 true
474 } else {
475 tracing::warn!("invalid empty chain_exchange_headers response");
476 false
477 }
478}
479
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicBool, AtomicUsize};

    impl<T> RaceBatch<T>
    where
        T: Send + 'static,
    {
        /// Test-only convenience: first `Ok` result, no extra validation.
        pub async fn get_ok(self) -> Option<T> {
            self.get_ok_validated(|_| true).await
        }
    }

    // A failing job does not mask a successful one.
    #[tokio::test]
    async fn race_batch_ok() {
        let mut batch = RaceBatch::new(nonzero!(3_usize));
        batch.add(async move { Ok(1) });
        batch.add(async move { anyhow::bail!("kaboom") });

        assert_eq!(batch.get_ok().await, Some(1));
    }

    // The first job to COMPLETE wins, not the first one added: the slow
    // sleeper is outraced by the immediately-ready job.
    #[tokio::test]
    async fn race_batch_ok_faster() {
        let mut batch = RaceBatch::new(nonzero!(3_usize));
        batch.add(async move {
            tokio::time::sleep(Duration::from_secs(100)).await;
            Ok(1)
        });
        batch.add(async move { Ok(2) });
        batch.add(async move { anyhow::bail!("kaboom") });

        assert_eq!(batch.get_ok().await, Some(2));
    }

    // All jobs failing yields `None` rather than an error.
    #[tokio::test]
    async fn race_batch_none() {
        let mut batch: RaceBatch<i32> = RaceBatch::new(nonzero!(3_usize));
        batch.add(async move { anyhow::bail!("kaboom") });
        batch.add(async move { anyhow::bail!("banana") });

        assert_eq!(batch.get_ok().await, None);
    }

    // The semaphore caps in-flight jobs: with limit == MAX_JOBS, the live
    // counter must never exceed MAX_JOBS across 10k spawned jobs.
    #[tokio::test]
    async fn race_batch_semaphore() {
        const MAX_JOBS: NonZeroUsize = nonzero!(30_usize);
        let counter = Arc::new(AtomicUsize::new(0));
        let exceeded = Arc::new(AtomicBool::new(false));

        let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS);
        for _ in 0..10000 {
            let c = counter.clone();
            let e = exceeded.clone();
            batch.add(async move {
                let prev = c.fetch_add(1, Ordering::Relaxed);
                if prev >= MAX_JOBS.get() {
                    e.fetch_or(true, Ordering::Relaxed);
                }

                // Yield so other jobs get a chance to run concurrently.
                tokio::task::yield_now().await;
                c.fetch_sub(1, Ordering::Relaxed);

                anyhow::bail!("banana")
            });
        }

        assert_eq!(batch.get_ok().await, None);
        assert!(!exceeded.load(Ordering::Relaxed));
    }

    // Sanity check of the previous test's detection logic: with the limit
    // raised to MAX_JOBS + 1, the `exceeded` flag MUST trip.
    #[tokio::test]
    async fn race_batch_semaphore_exceeded() {
        const MAX_JOBS: NonZeroUsize = nonzero!(30_usize);
        let counter = Arc::new(AtomicUsize::new(0));
        let exceeded = Arc::new(AtomicBool::new(false));

        let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS.checked_add(1).unwrap());
        for _ in 0..10000 {
            let c = counter.clone();
            let e = exceeded.clone();
            batch.add(async move {
                let prev = c.fetch_add(1, Ordering::Relaxed);
                if prev >= MAX_JOBS.get() {
                    e.fetch_or(true, Ordering::Relaxed);
                }

                tokio::task::yield_now().await;
                c.fetch_sub(1, Ordering::Relaxed);

                anyhow::bail!("banana")
            });
        }

        assert_eq!(batch.get_ok().await, None);
        assert!(exceeded.load(Ordering::Relaxed));
    }

    // Builds a 5-tipset chain t0..t4 and checks the validator accepts the
    // proper newest→oldest ordering, rejects a wrong start key, and rejects
    // a chain with a missing link (t3 omitted).
    #[test]
    #[allow(unused_variables)]
    fn validate_network_tipsets_tests() {
        use crate::blocks::{Chain4U, chain4u};

        let c4u = Chain4U::new();
        chain4u! {
            in c4u;
            t0 @ [genesis_header]
            -> t1 @ [first_header]
            -> t2 @ [second_left, second_right]
            -> t3 @ [third]
            -> t4 @ [fourth]
        };
        assert!(validate_network_tipsets(
            &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
            t4.key()
        ));
        assert!(!validate_network_tipsets(
            &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
            t3.key()
        ));
        assert!(!validate_network_tipsets(
            &[t4.clone(), t2.clone(), t1.clone(), t0.clone()],
            t4.key()
        ));
    }
}
608}