1use std::{
5 convert::TryFrom,
6 num::NonZeroU64,
7 sync::{
8 Arc, LazyLock,
9 atomic::{AtomicU64, Ordering},
10 },
11 time::{Duration, Instant},
12};
13
14use crate::{
15 blocks::{FullTipset, Tipset, TipsetKey},
16 libp2p::{
17 NetworkMessage, PeerId, PeerManager,
18 chain_exchange::{
19 ChainExchangeRequest, ChainExchangeResponse, HEADERS, MESSAGES, TipsetBundle,
20 },
21 hello::{HelloRequest, HelloResponse},
22 rpc::RequestResponseError,
23 },
24 utils::{
25 misc::{AdaptiveValueProvider, ExponentialAdaptiveValueProvider},
26 stats::Stats,
27 },
28};
29use anyhow::Context as _;
30use fvm_ipld_blockstore::Blockstore;
31use parking_lot::Mutex;
32use std::future::Future;
33use tokio::sync::Semaphore;
34use tokio::task::JoinSet;
35use tracing::{debug, trace};
36
37static CHAIN_EXCHANGE_TIMEOUT_MILLIS: LazyLock<ExponentialAdaptiveValueProvider<u64>> =
41 LazyLock::new(|| ExponentialAdaptiveValueProvider::new(5000, 2000, 60000, false));
42
43const MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS: usize = 2;
46
/// Handle used by the syncer to issue chain-exchange and hello requests to
/// network peers.
///
/// `Clone` is implemented by hand (not derived), so `DB` itself does not need
/// to implement `Clone`.
// NOTE: `derive_more::Constructor` generates `new(network_send, peer_manager, db)`
// with arguments in field order — do not reorder the fields.
#[derive(derive_more::Constructor)]
pub struct SyncNetworkContext<DB> {
    /// Sender used to hand `NetworkMessage` requests to the network service.
    network_send: flume::Sender<NetworkMessage>,
    /// Shared peer bookkeeping: peer selection, success/failure logging, and
    /// banning / marking peers bad.
    peer_manager: Arc<PeerManager>,
    /// Shared blockstore handle; not read by any method visible in this file
    /// section.
    db: Arc<DB>,
}
59
60impl<DB> Clone for SyncNetworkContext<DB> {
61 fn clone(&self) -> Self {
62 Self {
63 network_send: self.network_send.clone(),
64 peer_manager: self.peer_manager.clone(),
65 db: self.db.clone(),
66 }
67 }
68}
69
70struct RaceBatch<T> {
73 tasks: JoinSet<Result<T, String>>,
74 semaphore: Arc<Semaphore>,
75}
76
77impl<T> RaceBatch<T>
78where
79 T: Send + 'static,
80{
81 pub fn new(max_concurrent_jobs: usize) -> Self {
82 RaceBatch {
83 tasks: JoinSet::new(),
84 semaphore: Arc::new(Semaphore::new(max_concurrent_jobs)),
85 }
86 }
87
88 pub fn add(&mut self, future: impl Future<Output = Result<T, String>> + Send + 'static) {
89 let sem = self.semaphore.clone();
90 self.tasks.spawn(async move {
91 let permit = sem
92 .acquire_owned()
93 .await
94 .map_err(|_| "Semaphore unexpectedly closed")?;
95 let result = future.await;
96 drop(permit);
97 result
98 });
99 }
100
101 pub async fn get_ok_validated<F>(mut self, validate: F) -> Option<T>
103 where
104 F: Fn(&T) -> bool,
105 {
106 while let Some(result) = self.tasks.join_next().await {
107 if let Ok(Ok(value)) = result
108 && validate(&value)
109 {
110 return Some(value);
111 }
112 }
113 None
115 }
116}
117
118impl<DB> SyncNetworkContext<DB>
119where
120 DB: Blockstore,
121{
122 pub fn peer_manager(&self) -> &PeerManager {
124 self.peer_manager.as_ref()
125 }
126
127 pub fn network_send(&self) -> &flume::Sender<NetworkMessage> {
129 &self.network_send
130 }
131
132 pub async fn chain_exchange_headers(
136 &self,
137 peer_id: Option<PeerId>,
138 tsk: &TipsetKey,
139 count: NonZeroU64,
140 ) -> Result<Vec<Tipset>, String> {
141 self.handle_chain_exchange_request(peer_id, tsk, count, HEADERS, |tipsets: &Vec<Tipset>| {
142 validate_network_tipsets(tipsets, tsk)
143 })
144 .await
145 }
146
147 pub async fn chain_exchange_messages(
150 &self,
151 peer_id: Option<PeerId>,
152 ts: &Tipset,
153 ) -> Result<FullTipset, String> {
154 let mut bundles: Vec<TipsetBundle> = self
155 .handle_chain_exchange_request(
156 peer_id,
157 ts.key(),
158 NonZeroU64::new(1).expect("Infallible"),
159 MESSAGES,
160 |_| true,
161 )
162 .await?;
163
164 if bundles.len() != 1 {
165 return Err(format!(
166 "chain exchange request returned {} tipsets, 1 expected.",
167 bundles.len()
168 ));
169 }
170 let mut bundle = bundles.remove(0);
171 bundle.blocks = ts.block_headers().to_vec();
172 bundle.try_into()
173 }
174
175 pub async fn chain_exchange_full_tipset(
179 &self,
180 peer_id: Option<PeerId>,
181 tsk: &TipsetKey,
182 ) -> Result<FullTipset, String> {
183 let mut fts = self
184 .handle_chain_exchange_request(
185 peer_id,
186 tsk,
187 NonZeroU64::new(1).expect("Infallible"),
188 HEADERS | MESSAGES,
189 |_| true,
190 )
191 .await?;
192
193 if fts.len() != 1 {
194 return Err(format!(
195 "Full tipset request returned {} tipsets, 1 expected.",
196 fts.len()
197 ));
198 }
199 Ok(fts.remove(0))
200 }
201
202 pub async fn chain_exchange_full_tipsets(
203 &self,
204 peer_id: Option<PeerId>,
205 tsk: &TipsetKey,
206 ) -> Result<Vec<FullTipset>, String> {
207 self.handle_chain_exchange_request(
208 peer_id,
209 tsk,
210 NonZeroU64::new(16).expect("Infallible"),
211 HEADERS | MESSAGES,
212 |_| true,
213 )
214 .await
215 }
216
217 async fn handle_chain_exchange_request<T, F>(
220 &self,
221 peer_id: Option<PeerId>,
222 tsk: &TipsetKey,
223 request_len: NonZeroU64,
224 options: u64,
225 validate: F,
226 ) -> Result<Vec<T>, String>
227 where
228 T: TryFrom<TipsetBundle> + Send + Sync + 'static,
229 <T as TryFrom<TipsetBundle>>::Error: std::fmt::Display,
230 F: Fn(&Vec<T>) -> bool,
231 {
232 let request = ChainExchangeRequest {
233 start: tsk.to_cids(),
234 request_len: request_len.get(),
235 options,
236 };
237
238 let global_pre_time = Instant::now();
239 let network_failures = Arc::new(AtomicU64::new(0));
240 let lookup_failures = Arc::new(AtomicU64::new(0));
241 let chain_exchange_result = match peer_id {
242 Some(id) => Self::chain_exchange_request(
244 self.peer_manager.clone(),
245 self.network_send.clone(),
246 id,
247 request,
248 )
249 .await?
250 .into_result()?,
251 None => {
252 let peers = self.peer_manager.top_peers_shuffled();
255 if peers.is_empty() {
256 return Err("chain exchange failed: no peers are available".into());
257 }
258 let n_peers = peers.len();
259 let mut batch = RaceBatch::new(MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS);
260 let success_time_cost_millis_stats = Arc::new(Mutex::new(Stats::new()));
261 for peer_id in peers.into_iter() {
262 let peer_manager = self.peer_manager.clone();
263 let network_send = self.network_send.clone();
264 let request = request.clone();
265 let network_failures = network_failures.clone();
266 let lookup_failures = lookup_failures.clone();
267 let success_time_cost_millis_stats = success_time_cost_millis_stats.clone();
268 batch.add(async move {
269 let start = Instant::now();
270 match Self::chain_exchange_request(
271 peer_manager,
272 network_send,
273 peer_id,
274 request,
275 )
276 .await
277 {
278 Ok(chain_exchange_result) => {
279 match chain_exchange_result.into_result::<T>() {
280 Ok(r) => {
281 success_time_cost_millis_stats.lock().update(
282 start.elapsed().as_millis()
283 );
284 Ok(r)
285 }
286 Err(error) => {
287 lookup_failures.fetch_add(1, Ordering::Relaxed);
288 debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange response");
289 Err(error)
290 }
291 }
292 }
293 Err(error) => {
294 network_failures.fetch_add(1, Ordering::Relaxed);
295 debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange request to peer");
296 Err(error)
297 }
298 }
299 });
300 }
301
302 let make_failure_message = || {
303 CHAIN_EXCHANGE_TIMEOUT_MILLIS.adapt_on_failure();
304 tracing::debug!(
305 "Increased chain exchange timeout to {}ms",
306 CHAIN_EXCHANGE_TIMEOUT_MILLIS.get()
307 );
308 let mut message = String::new();
309 message.push_str("ChainExchange request failed for all top peers. ");
310 message.push_str(&format!(
311 "{} network failures, ",
312 network_failures.load(Ordering::Relaxed)
313 ));
314 message.push_str(&format!(
315 "{} lookup failures, ",
316 lookup_failures.load(Ordering::Relaxed)
317 ));
318 message.push_str(&format!("request:\n{request:?}",));
319 message
320 };
321
322 let v = batch
323 .get_ok_validated(validate)
324 .await
325 .ok_or_else(make_failure_message)?;
326 if let Ok(mean) = success_time_cost_millis_stats.lock().mean()
327 && CHAIN_EXCHANGE_TIMEOUT_MILLIS.adapt_on_success(mean as _)
328 {
329 tracing::debug!(
330 "Decreased chain exchange timeout to {}ms. Current average: {}ms",
331 CHAIN_EXCHANGE_TIMEOUT_MILLIS.get(),
332 mean,
333 );
334 }
335 trace!("Succeed: handle_chain_exchange_request");
336 v
337 }
338 };
339
340 self.peer_manager
342 .log_global_success(Instant::now().duration_since(global_pre_time));
343
344 Ok(chain_exchange_result)
345 }
346
347 async fn chain_exchange_request(
349 peer_manager: Arc<PeerManager>,
350 network_send: flume::Sender<NetworkMessage>,
351 peer_id: PeerId,
352 request: ChainExchangeRequest,
353 ) -> Result<ChainExchangeResponse, String> {
354 trace!("Sending ChainExchange Request to {peer_id}");
355
356 let req_pre_time = Instant::now();
357
358 let (tx, rx) = flume::bounded(1);
359 if network_send
360 .send_async(NetworkMessage::ChainExchangeRequest {
361 peer_id,
362 request,
363 response_channel: tx,
364 })
365 .await
366 .is_err()
367 {
368 return Err("Failed to send chain exchange request to network".to_string());
369 };
370
371 let res = tokio::task::spawn_blocking(move || {
375 rx.recv_timeout(Duration::from_millis(CHAIN_EXCHANGE_TIMEOUT_MILLIS.get()))
376 })
377 .await;
378 let res_duration = Instant::now().duration_since(req_pre_time);
379 match res {
380 Ok(Ok(Ok(bs_res))) => {
381 peer_manager.log_success(&peer_id, res_duration);
383 trace!("Succeeded: ChainExchange Request to {peer_id}");
384 Ok(bs_res)
385 }
386 Ok(Ok(Err(e))) => {
387 match e {
389 RequestResponseError::UnsupportedProtocols => {
390 peer_manager
392 .ban_peer_with_default_duration(
393 peer_id,
394 "ChainExchange protocol unsupported",
395 |_| None,
396 )
397 .await;
398 }
399 RequestResponseError::ConnectionClosed | RequestResponseError::DialFailure => {
400 peer_manager.mark_peer_bad(peer_id, format!("chain exchange error {e:?}"));
401 }
402 RequestResponseError::Timeout | RequestResponseError::Io(_) => {
405 peer_manager.log_failure(&peer_id, res_duration);
406 }
407 }
408 debug!("Failed: ChainExchange Request to {peer_id}");
409 Err(format!("Internal libp2p error: {e:?}"))
410 }
411 Ok(Err(_)) | Err(_) => {
412 peer_manager.log_failure(&peer_id, res_duration);
415 debug!("Timeout: ChainExchange Request to {peer_id}");
416 Err(format!("Chain exchange request to {peer_id} timed out"))
417 }
418 }
419 }
420
421 pub async fn hello_request(
424 &self,
425 peer_id: PeerId,
426 request: HelloRequest,
427 ) -> anyhow::Result<(PeerId, Instant, Option<HelloResponse>)> {
428 trace!("Sending Hello Message to {}", peer_id);
429
430 let (tx, rx) = flume::bounded(1);
432
433 self.network_send
435 .send_async(NetworkMessage::HelloRequest {
436 peer_id,
437 request,
438 response_channel: tx,
439 })
440 .await
441 .context("Failed to send hello request: receiver dropped")?;
442
443 const HELLO_TIMEOUT: Duration = Duration::from_secs(30);
444 let sent = Instant::now();
445 let res = tokio::task::spawn_blocking(move || rx.recv_timeout(HELLO_TIMEOUT))
446 .await?
447 .ok();
448 Ok((peer_id, sent, res))
449 }
450}
451
452fn validate_network_tipsets(tipsets: &[Tipset], start_tipset_key: &TipsetKey) -> bool {
456 if let Some(start) = tipsets.first() {
457 if start.key() != start_tipset_key {
458 tracing::warn!(epoch=%start.epoch(), expected=%start_tipset_key, actual=%start.key(), "start tipset key mismatch");
459 return false;
460 }
461 for (ts, pts) in tipsets.iter().zip(tipsets.iter().skip(1)) {
462 if ts.parents() != pts.key() {
463 tracing::warn!(epoch=%ts.epoch(), expected_parent=%pts.key(), actual_parent=%ts.parents(), "invalid chain");
464 return false;
465 }
466 }
467 true
468 } else {
469 tracing::warn!("invalid empty chain_exchange_headers response");
470 false
471 }
472}
473
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicBool, AtomicUsize};

    // Test-only shorthand: race the batch and accept any `Ok` value.
    impl<T> RaceBatch<T>
    where
        T: Send + 'static,
    {
        pub async fn get_ok(self) -> Option<T> {
            self.get_ok_validated(|_| true).await
        }
    }

    // A single success is returned even when another job fails.
    #[tokio::test]
    async fn race_batch_ok() {
        let mut batch = RaceBatch::new(3);
        batch.add(async move { Ok(1) });
        batch.add(async move { Err("kaboom".into()) });

        assert_eq!(batch.get_ok().await, Some(1));
    }

    // The fast success is returned without waiting for the 100 s job.
    #[tokio::test]
    async fn race_batch_ok_faster() {
        let mut batch = RaceBatch::new(3);
        batch.add(async move {
            tokio::time::sleep(Duration::from_secs(100)).await;
            Ok(1)
        });
        batch.add(async move { Ok(2) });
        batch.add(async move { Err("kaboom".into()) });

        assert_eq!(batch.get_ok().await, Some(2));
    }

    // When every job fails, the race yields `None`.
    #[tokio::test]
    async fn race_batch_none() {
        let mut batch: RaceBatch<i32> = RaceBatch::new(3);
        batch.add(async move { Err("kaboom".into()) });
        batch.add(async move { Err("banana".into()) });

        assert_eq!(batch.get_ok().await, None);
    }

    // Each job increments `counter` while it runs; `yield_now` lets other jobs
    // start in the meantime. With a limit of MAX_JOBS, the counter must never
    // be observed at MAX_JOBS or higher before an increment.
    #[tokio::test]
    async fn race_batch_semaphore() {
        const MAX_JOBS: usize = 30;
        let counter = Arc::new(AtomicUsize::new(0));
        let exceeded = Arc::new(AtomicBool::new(false));

        let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS);
        for _ in 0..10000 {
            let c = counter.clone();
            let e = exceeded.clone();
            batch.add(async move {
                let prev = c.fetch_add(1, Ordering::Relaxed);
                if prev >= MAX_JOBS {
                    e.fetch_or(true, Ordering::Relaxed);
                }

                tokio::task::yield_now().await;
                c.fetch_sub(1, Ordering::Relaxed);

                Err("banana".into())
            });
        }

        assert_eq!(batch.get_ok().await, None);
        assert!(!exceeded.load(Ordering::Relaxed));
    }

    // Control case for `race_batch_semaphore`: with one extra permit, the
    // same workload must trip the detector. This shows the check above can
    // actually fail.
    #[tokio::test]
    async fn race_batch_semaphore_exceeded() {
        const MAX_JOBS: usize = 30;
        let counter = Arc::new(AtomicUsize::new(0));
        let exceeded = Arc::new(AtomicBool::new(false));

        let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS + 1);
        for _ in 0..10000 {
            let c = counter.clone();
            let e = exceeded.clone();
            batch.add(async move {
                let prev = c.fetch_add(1, Ordering::Relaxed);
                if prev >= MAX_JOBS {
                    e.fetch_or(true, Ordering::Relaxed);
                }

                tokio::task::yield_now().await;
                c.fetch_sub(1, Ordering::Relaxed);

                Err("banana".into())
            });
        }

        assert_eq!(batch.get_ok().await, None);
        assert!(exceeded.load(Ordering::Relaxed));
    }

    // `unused_variables` is allowed presumably because `chain4u!` also binds the
    // bracketed header names (e.g. `genesis_header`), which this test never
    // reads — TODO confirm.
    #[test]
    #[allow(unused_variables)]
    fn validate_network_tipsets_tests() {
        use crate::blocks::{Chain4U, chain4u};

        let c4u = Chain4U::new();
        // Builds the chain t0 -> t1 -> t2 (two blocks) -> t3 -> t4.
        chain4u! {
            in c4u;
            t0 @ [genesis_header]
            -> t1 @ [first_header]
            -> t2 @ [second_left, second_right]
            -> t3 @ [third]
            -> t4 @ [fourth]
        };
        // Newest-first, complete chain starting at the requested key: valid.
        assert!(validate_network_tipsets(
            &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
            t4.key()
        ));
        // Same chain, but the requested start key is t3's: rejected.
        assert!(!validate_network_tipsets(
            &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
            t3.key()
        ));
        // t3 is missing, so t4's parents do not match t2's key: rejected.
        assert!(!validate_network_tipsets(
            &[t4.clone(), t2.clone(), t1.clone(), t0.clone()],
            t4.key()
        ));
    }
}