1use std::{
5 convert::TryFrom,
6 num::NonZeroU64,
7 sync::{
8 Arc, LazyLock,
9 atomic::{AtomicU64, Ordering},
10 },
11 time::{Duration, Instant},
12};
13
14use crate::{
15 blocks::{FullTipset, Tipset, TipsetKey},
16 libp2p::{
17 NetworkMessage, PeerId, PeerManager,
18 chain_exchange::{
19 ChainExchangeRequest, ChainExchangeResponse, HEADERS, MESSAGES, TipsetBundle,
20 },
21 hello::{HelloRequest, HelloResponse},
22 rpc::RequestResponseError,
23 },
24 utils::{
25 misc::{AdaptiveValueProvider, ExponentialAdaptiveValueProvider},
26 stats::Stats,
27 },
28};
29use anyhow::Context as _;
30use fvm_ipld_blockstore::Blockstore;
31use parking_lot::Mutex;
32use std::future::Future;
33use tokio::sync::Semaphore;
34use tokio::task::JoinSet;
35use tracing::{debug, trace};
36
/// Adaptive timeout, in milliseconds, applied to each individual chain exchange request
/// (it is turned into a `Duration` with `Duration::from_millis`).
///
/// `adapt_on_failure` is called when a request raced across the top peers fails for all
/// of them. `adapt_on_success` is fed the mean latency of the successful requests.
// NOTE(review): the constructor arguments (5000, 2000, 60000, false) presumably are the
// initial value, lower bound and upper bound in ms plus a behaviour flag — confirm against
// `ExponentialAdaptiveValueProvider::new`.
static CHAIN_EXCHANGE_TIMEOUT_MILLIS: LazyLock<ExponentialAdaptiveValueProvider<u64>> =
    LazyLock::new(|| ExponentialAdaptiveValueProvider::new(5000, 2000, 60000, false));

/// Maximum number of chain exchange requests allowed in flight at the same time when
/// racing the top peers (used as the `RaceBatch` concurrency limit).
const MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS: usize = 2;
46
/// Networking helper that sends chain exchange and hello requests through the
/// network service and reports per-peer outcomes to the peer manager.
pub struct SyncNetworkContext<DB> {
    /// Channel used to hand requests to the networking service.
    network_send: flume::Sender<NetworkMessage>,
    /// Shared peer manager: source of candidate peers and sink for success/failure stats.
    peer_manager: Arc<PeerManager>,
    /// Blockstore handle.
    // NOTE(review): not read by any method visible in this file — confirm it is still needed.
    db: Arc<DB>,
}
58
59impl<DB> Clone for SyncNetworkContext<DB> {
60 fn clone(&self) -> Self {
61 Self {
62 network_send: self.network_send.clone(),
63 peer_manager: self.peer_manager.clone(),
64 db: self.db.clone(),
65 }
66 }
67}
68
/// A batch of fallible jobs that run concurrently, with at most a fixed number of
/// them running at once. Used to race the same request against several peers and
/// keep the first acceptable answer.
struct RaceBatch<T> {
    /// Spawned jobs. Per tokio's `JoinSet` docs, dropping the set aborts jobs still running.
    tasks: JoinSet<Result<T, String>>,
    /// Bounds how many jobs may run at the same time.
    semaphore: Arc<Semaphore>,
}
75
76impl<T> RaceBatch<T>
77where
78 T: Send + 'static,
79{
80 pub fn new(max_concurrent_jobs: usize) -> Self {
81 RaceBatch {
82 tasks: JoinSet::new(),
83 semaphore: Arc::new(Semaphore::new(max_concurrent_jobs)),
84 }
85 }
86
87 pub fn add(&mut self, future: impl Future<Output = Result<T, String>> + Send + 'static) {
88 let sem = self.semaphore.clone();
89 self.tasks.spawn(async move {
90 let permit = sem
91 .acquire_owned()
92 .await
93 .map_err(|_| "Semaphore unexpectedly closed")?;
94 let result = future.await;
95 drop(permit);
96 result
97 });
98 }
99
100 pub async fn get_ok_validated<F>(mut self, validate: F) -> Option<T>
102 where
103 F: Fn(&T) -> bool,
104 {
105 while let Some(result) = self.tasks.join_next().await {
106 if let Ok(Ok(value)) = result
107 && validate(&value)
108 {
109 return Some(value);
110 }
111 }
112 None
114 }
115}
116
117impl<DB> SyncNetworkContext<DB>
118where
119 DB: Blockstore,
120{
121 pub fn new(
122 network_send: flume::Sender<NetworkMessage>,
123 peer_manager: Arc<PeerManager>,
124 db: Arc<DB>,
125 ) -> Self {
126 Self {
127 network_send,
128 peer_manager,
129 db,
130 }
131 }
132
133 pub fn peer_manager(&self) -> &PeerManager {
135 self.peer_manager.as_ref()
136 }
137
138 pub fn network_send(&self) -> &flume::Sender<NetworkMessage> {
140 &self.network_send
141 }
142
143 pub async fn chain_exchange_headers(
147 &self,
148 peer_id: Option<PeerId>,
149 tsk: &TipsetKey,
150 count: NonZeroU64,
151 ) -> Result<Vec<Arc<Tipset>>, String> {
152 self.handle_chain_exchange_request(
153 peer_id,
154 tsk,
155 count,
156 HEADERS,
157 |tipsets: &Vec<Arc<Tipset>>| validate_network_tipsets(tipsets, tsk),
158 )
159 .await
160 }
161
162 pub async fn chain_exchange_messages(
165 &self,
166 peer_id: Option<PeerId>,
167 ts: &Tipset,
168 ) -> Result<FullTipset, String> {
169 let mut bundles: Vec<TipsetBundle> = self
170 .handle_chain_exchange_request(
171 peer_id,
172 ts.key(),
173 NonZeroU64::new(1).expect("Infallible"),
174 MESSAGES,
175 |_| true,
176 )
177 .await?;
178
179 if bundles.len() != 1 {
180 return Err(format!(
181 "chain exchange request returned {} tipsets, 1 expected.",
182 bundles.len()
183 ));
184 }
185 let mut bundle = bundles.remove(0);
186 bundle.blocks = ts.block_headers().to_vec();
187 bundle.try_into()
188 }
189
190 pub async fn chain_exchange_full_tipset(
194 &self,
195 peer_id: Option<PeerId>,
196 tsk: &TipsetKey,
197 ) -> Result<FullTipset, String> {
198 let mut fts = self
199 .handle_chain_exchange_request(
200 peer_id,
201 tsk,
202 NonZeroU64::new(1).expect("Infallible"),
203 HEADERS | MESSAGES,
204 |_| true,
205 )
206 .await?;
207
208 if fts.len() != 1 {
209 return Err(format!(
210 "Full tipset request returned {} tipsets, 1 expected.",
211 fts.len()
212 ));
213 }
214 Ok(fts.remove(0))
215 }
216
217 pub async fn chain_exchange_full_tipsets(
218 &self,
219 peer_id: Option<PeerId>,
220 tsk: &TipsetKey,
221 ) -> Result<Vec<FullTipset>, String> {
222 self.handle_chain_exchange_request(
223 peer_id,
224 tsk,
225 NonZeroU64::new(16).expect("Infallible"),
226 HEADERS | MESSAGES,
227 |_| true,
228 )
229 .await
230 }
231
232 async fn handle_chain_exchange_request<T, F>(
235 &self,
236 peer_id: Option<PeerId>,
237 tsk: &TipsetKey,
238 request_len: NonZeroU64,
239 options: u64,
240 validate: F,
241 ) -> Result<Vec<T>, String>
242 where
243 T: TryFrom<TipsetBundle> + Send + Sync + 'static,
244 <T as TryFrom<TipsetBundle>>::Error: std::fmt::Display,
245 F: Fn(&Vec<T>) -> bool,
246 {
247 let request = ChainExchangeRequest {
248 start: tsk.to_cids(),
249 request_len: request_len.get(),
250 options,
251 };
252
253 let global_pre_time = Instant::now();
254 let network_failures = Arc::new(AtomicU64::new(0));
255 let lookup_failures = Arc::new(AtomicU64::new(0));
256 let chain_exchange_result = match peer_id {
257 Some(id) => Self::chain_exchange_request(
259 self.peer_manager.clone(),
260 self.network_send.clone(),
261 id,
262 request,
263 )
264 .await?
265 .into_result()?,
266 None => {
267 let peers = self.peer_manager.top_peers_shuffled();
270 if peers.is_empty() {
271 return Err("chain exchange failed: no peers are available".into());
272 }
273 let n_peers = peers.len();
274 let mut batch = RaceBatch::new(MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS);
275 let success_time_cost_millis_stats = Arc::new(Mutex::new(Stats::new()));
276 for peer_id in peers.into_iter() {
277 let peer_manager = self.peer_manager.clone();
278 let network_send = self.network_send.clone();
279 let request = request.clone();
280 let network_failures = network_failures.clone();
281 let lookup_failures = lookup_failures.clone();
282 let success_time_cost_millis_stats = success_time_cost_millis_stats.clone();
283 batch.add(async move {
284 let start = Instant::now();
285 match Self::chain_exchange_request(
286 peer_manager,
287 network_send,
288 peer_id,
289 request,
290 )
291 .await
292 {
293 Ok(chain_exchange_result) => {
294 match chain_exchange_result.into_result::<T>() {
295 Ok(r) => {
296 success_time_cost_millis_stats.lock().update(
297 start.elapsed().as_millis()
298 );
299 Ok(r)
300 }
301 Err(error) => {
302 lookup_failures.fetch_add(1, Ordering::Relaxed);
303 debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange response");
304 Err(error)
305 }
306 }
307 }
308 Err(error) => {
309 network_failures.fetch_add(1, Ordering::Relaxed);
310 debug!(%peer_id, %request_len, %options, %n_peers, %error, "Failed chain_exchange request to peer");
311 Err(error)
312 }
313 }
314 });
315 }
316
317 let make_failure_message = || {
318 CHAIN_EXCHANGE_TIMEOUT_MILLIS.adapt_on_failure();
319 tracing::debug!(
320 "Increased chain exchange timeout to {}ms",
321 CHAIN_EXCHANGE_TIMEOUT_MILLIS.get()
322 );
323 let mut message = String::new();
324 message.push_str("ChainExchange request failed for all top peers. ");
325 message.push_str(&format!(
326 "{} network failures, ",
327 network_failures.load(Ordering::Relaxed)
328 ));
329 message.push_str(&format!(
330 "{} lookup failures, ",
331 lookup_failures.load(Ordering::Relaxed)
332 ));
333 message.push_str(&format!("request:\n{request:?}",));
334 message
335 };
336
337 let v = batch
338 .get_ok_validated(validate)
339 .await
340 .ok_or_else(make_failure_message)?;
341 if let Ok(mean) = success_time_cost_millis_stats.lock().mean()
342 && CHAIN_EXCHANGE_TIMEOUT_MILLIS.adapt_on_success(mean as _)
343 {
344 tracing::debug!(
345 "Decreased chain exchange timeout to {}ms. Current average: {}ms",
346 CHAIN_EXCHANGE_TIMEOUT_MILLIS.get(),
347 mean,
348 );
349 }
350 trace!("Succeed: handle_chain_exchange_request");
351 v
352 }
353 };
354
355 self.peer_manager
357 .log_global_success(Instant::now().duration_since(global_pre_time));
358
359 Ok(chain_exchange_result)
360 }
361
362 async fn chain_exchange_request(
364 peer_manager: Arc<PeerManager>,
365 network_send: flume::Sender<NetworkMessage>,
366 peer_id: PeerId,
367 request: ChainExchangeRequest,
368 ) -> Result<ChainExchangeResponse, String> {
369 trace!("Sending ChainExchange Request to {peer_id}");
370
371 let req_pre_time = Instant::now();
372
373 let (tx, rx) = flume::bounded(1);
374 if network_send
375 .send_async(NetworkMessage::ChainExchangeRequest {
376 peer_id,
377 request,
378 response_channel: tx,
379 })
380 .await
381 .is_err()
382 {
383 return Err("Failed to send chain exchange request to network".to_string());
384 };
385
386 let res = tokio::task::spawn_blocking(move || {
390 rx.recv_timeout(Duration::from_millis(CHAIN_EXCHANGE_TIMEOUT_MILLIS.get()))
391 })
392 .await;
393 let res_duration = Instant::now().duration_since(req_pre_time);
394 match res {
395 Ok(Ok(Ok(bs_res))) => {
396 peer_manager.log_success(&peer_id, res_duration);
398 trace!("Succeeded: ChainExchange Request to {peer_id}");
399 Ok(bs_res)
400 }
401 Ok(Ok(Err(e))) => {
402 match e {
404 RequestResponseError::UnsupportedProtocols => {
405 peer_manager
407 .ban_peer_with_default_duration(
408 peer_id,
409 "ChainExchange protocol unsupported",
410 |_| None,
411 )
412 .await;
413 }
414 RequestResponseError::ConnectionClosed | RequestResponseError::DialFailure => {
415 peer_manager.mark_peer_bad(peer_id, format!("chain exchange error {e:?}"));
416 }
417 RequestResponseError::Timeout | RequestResponseError::Io(_) => {
420 peer_manager.log_failure(&peer_id, res_duration);
421 }
422 }
423 debug!("Failed: ChainExchange Request to {peer_id}");
424 Err(format!("Internal libp2p error: {e:?}"))
425 }
426 Ok(Err(_)) | Err(_) => {
427 peer_manager.log_failure(&peer_id, res_duration);
430 debug!("Timeout: ChainExchange Request to {peer_id}");
431 Err(format!("Chain exchange request to {peer_id} timed out"))
432 }
433 }
434 }
435
436 pub async fn hello_request(
439 &self,
440 peer_id: PeerId,
441 request: HelloRequest,
442 ) -> anyhow::Result<(PeerId, Instant, Option<HelloResponse>)> {
443 trace!("Sending Hello Message to {}", peer_id);
444
445 let (tx, rx) = flume::bounded(1);
447
448 self.network_send
450 .send_async(NetworkMessage::HelloRequest {
451 peer_id,
452 request,
453 response_channel: tx,
454 })
455 .await
456 .context("Failed to send hello request: receiver dropped")?;
457
458 const HELLO_TIMEOUT: Duration = Duration::from_secs(30);
459 let sent = Instant::now();
460 let res = tokio::task::spawn_blocking(move || rx.recv_timeout(HELLO_TIMEOUT))
461 .await?
462 .ok();
463 Ok((peer_id, sent, res))
464 }
465}
466
467fn validate_network_tipsets(tipsets: &[Arc<Tipset>], start_tipset_key: &TipsetKey) -> bool {
471 if let Some(start) = tipsets.first() {
472 if start.key() != start_tipset_key {
473 tracing::warn!(epoch=%start.epoch(), expected=%start_tipset_key, actual=%start.key(), "start tipset key mismatch");
474 return false;
475 }
476 for (ts, pts) in tipsets.iter().zip(tipsets.iter().skip(1)) {
477 if ts.parents() != pts.key() {
478 tracing::warn!(epoch=%ts.epoch(), expected_parent=%pts.key(), actual_parent=%ts.parents(), "invalid chain");
479 return false;
480 }
481 }
482 true
483 } else {
484 tracing::warn!("invalid empty chain_exchange_headers response");
485 false
486 }
487}
488
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicBool, AtomicUsize};

    // Test-only convenience: take the first `Ok` result without any validation.
    impl<T> RaceBatch<T>
    where
        T: Send + 'static,
    {
        pub async fn get_ok(self) -> Option<T> {
            self.get_ok_validated(|_| true).await
        }
    }

    // A failing job must not hide a successful one.
    #[tokio::test]
    async fn race_batch_ok() {
        let mut batch = RaceBatch::new(3);
        batch.add(async move { Ok(1) });
        batch.add(async move { Err("kaboom".into()) });

        assert_eq!(batch.get_ok().await, Some(1));
    }

    // The fastest successful job wins; the result is returned without waiting
    // for the 100 s sleeper to finish.
    #[tokio::test]
    async fn race_batch_ok_faster() {
        let mut batch = RaceBatch::new(3);
        batch.add(async move {
            tokio::time::sleep(Duration::from_secs(100)).await;
            Ok(1)
        });
        batch.add(async move { Ok(2) });
        batch.add(async move { Err("kaboom".into()) });

        assert_eq!(batch.get_ok().await, Some(2));
    }

    // When every job fails, the batch yields `None`.
    #[tokio::test]
    async fn race_batch_none() {
        let mut batch: RaceBatch<i32> = RaceBatch::new(3);
        batch.add(async move { Err("kaboom".into()) });
        batch.add(async move { Err("banana".into()) });

        assert_eq!(batch.get_ok().await, None);
    }

    // With MAX_JOBS permits, no more than MAX_JOBS jobs may be between their
    // increment and decrement of `counter` at the same time. Each job yields in
    // between so that other jobs get a chance to run concurrently.
    #[tokio::test]
    async fn race_batch_semaphore() {
        const MAX_JOBS: usize = 30;
        let counter = Arc::new(AtomicUsize::new(0));
        let exceeded = Arc::new(AtomicBool::new(false));

        let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS);
        for _ in 0..10000 {
            let c = counter.clone();
            let e = exceeded.clone();
            batch.add(async move {
                let prev = c.fetch_add(1, Ordering::Relaxed);
                if prev >= MAX_JOBS {
                    e.fetch_or(true, Ordering::Relaxed);
                }

                tokio::task::yield_now().await;
                c.fetch_sub(1, Ordering::Relaxed);

                Err("banana".into())
            });
        }

        assert_eq!(batch.get_ok().await, None);
        assert!(!exceeded.load(Ordering::Relaxed));
    }

    // Control for the test above: with one extra permit the MAX_JOBS limit is
    // expected to be exceeded, which shows that the check can actually trip.
    #[tokio::test]
    async fn race_batch_semaphore_exceeded() {
        const MAX_JOBS: usize = 30;
        let counter = Arc::new(AtomicUsize::new(0));
        let exceeded = Arc::new(AtomicBool::new(false));

        let mut batch: RaceBatch<i32> = RaceBatch::new(MAX_JOBS + 1);
        for _ in 0..10000 {
            let c = counter.clone();
            let e = exceeded.clone();
            batch.add(async move {
                let prev = c.fetch_add(1, Ordering::Relaxed);
                if prev >= MAX_JOBS {
                    e.fetch_or(true, Ordering::Relaxed);
                }

                tokio::task::yield_now().await;
                c.fetch_sub(1, Ordering::Relaxed);

                Err("banana".into())
            });
        }

        assert_eq!(batch.get_ok().await, None);
        assert!(exceeded.load(Ordering::Relaxed));
    }

    // `unused_variables` is allowed presumably because `chain4u!` also binds the header
    // names (`genesis_header`, `first_header`, ...), which this test does not use.
    #[test]
    #[allow(unused_variables)]
    fn validate_network_tipsets_tests() {
        use crate::blocks::{Chain4U, chain4u};

        let c4u = Chain4U::new();
        chain4u! {
            in c4u;
            t0 @ [genesis_header]
            -> t1 @ [first_header]
            -> t2 @ [second_left, second_right]
            -> t3 @ [third]
            -> t4 @ [fourth]
        };
        let t0 = Arc::new(t0.clone());
        let t1 = Arc::new(t1.clone());
        let t2 = Arc::new(t2.clone());
        let t3 = Arc::new(t3.clone());
        let t4 = Arc::new(t4.clone());
        // Full chain from the head down to genesis, starting at the expected key.
        assert!(validate_network_tipsets(
            &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
            t4.key()
        ));
        // Valid chain, but the first tipset is not the requested start key.
        assert!(!validate_network_tipsets(
            &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()],
            t3.key()
        ));
        // `t3` is missing, so `t4`'s parents do not match `t2`'s key.
        assert!(!validate_network_tipsets(
            &[t4.clone(), t2.clone(), t1.clone(), t0.clone()],
            t4.key()
        ));
    }
}