common/peer/sync/
ping_peer.rs1use std::time::Duration;
6
7use anyhow::Result;
8use uuid::Uuid;
9
10use crate::bucket_log::BucketLogProvider;
11use crate::crypto::PublicKey;
12use crate::peer::protocol::bidirectional::BidirectionalHandler;
13use crate::peer::protocol::{Ping, PingMessage};
14use crate::peer::Peer;
15
/// Maximum time `execute` waits for a ping round-trip before giving up.
const PING_TIMEOUT: Duration = Duration::from_millis(5_000);
19
20#[derive(Debug, Clone)]
22pub struct PingPeerJob {
23 pub bucket_id: Uuid,
24 pub peer_id: PublicKey,
25}
26
27pub async fn execute<L>(peer: &Peer<L>, job: PingPeerJob) -> Result<()>
32where
33 L: BucketLogProvider + Clone + Send + Sync + 'static,
34 L::Error: std::error::Error + Send + Sync + 'static,
35{
36 tracing::info!(
37 "Processing ping job: bucket_id={}, peer_id={}",
38 job.bucket_id,
39 job.peer_id.to_hex()
40 );
41
42 let (our_link, our_height) = match peer.log_provider().head(job.bucket_id, None).await {
44 Ok((link, height)) => (link, height),
45 Err(e) => {
46 tracing::warn!(
47 "Failed to get head for bucket {} when pinging peer {}: {}",
48 job.bucket_id,
49 job.peer_id.to_hex(),
50 e
51 );
52 return Ok(());
53 }
54 };
55
56 let ping = PingMessage {
58 bucket_id: job.bucket_id,
59 link: our_link,
60 height: our_height,
61 };
62
63 tracing::info!("Sending ping to peer {}", job.peer_id.to_hex());
65 match tokio::time::timeout(PING_TIMEOUT, Ping::send::<L>(peer, &job.peer_id, ping)).await {
66 Ok(Ok(pong)) => {
67 tracing::info!(
68 "Received pong from peer {} for bucket {} | {:?}",
69 job.peer_id.to_hex(),
70 job.bucket_id,
71 pong
72 );
73 Ok(())
74 }
75 Ok(Err(e)) => {
76 tracing::debug!(
77 "Failed to ping peer {} for bucket {}: {}",
78 job.peer_id.to_hex(),
79 job.bucket_id,
80 e
81 );
82 Ok(())
83 }
84 Err(_) => {
85 tracing::debug!(
86 "Ping to peer {} for bucket {} timed out after {}s",
87 job.peer_id.to_hex(),
88 job.bucket_id,
89 PING_TIMEOUT.as_secs()
90 );
91 Ok(())
92 }
93 }
94}