common/peer/sync/
ping_peer.rs1use anyhow::Result;
6use uuid::Uuid;
7
8use crate::bucket_log::BucketLogProvider;
9use crate::crypto::PublicKey;
10use crate::peer::protocol::bidirectional::BidirectionalHandler;
11use crate::peer::protocol::{Ping, PingMessage};
12use crate::peer::Peer;
13
14#[derive(Debug, Clone)]
16pub struct PingPeerJob {
17 pub bucket_id: Uuid,
18 pub peer_id: PublicKey,
19}
20
21pub async fn execute<L>(peer: &Peer<L>, job: PingPeerJob) -> Result<()>
26where
27 L: BucketLogProvider + Clone + Send + Sync + 'static,
28 L::Error: std::error::Error + Send + Sync + 'static,
29{
30 tracing::info!(
31 "Processing ping job: bucket_id={}, peer_id={}",
32 job.bucket_id,
33 job.peer_id.to_hex()
34 );
35
36 let (our_link, our_height) = match peer.log_provider().head(job.bucket_id, None).await {
38 Ok((link, height)) => (link, height),
39 Err(e) => {
40 tracing::warn!(
41 "Failed to get head for bucket {} when pinging peer {}: {}",
42 job.bucket_id,
43 job.peer_id.to_hex(),
44 e
45 );
46 return Ok(());
47 }
48 };
49
50 let ping = PingMessage {
52 bucket_id: job.bucket_id,
53 link: our_link,
54 height: our_height,
55 };
56
57 tracing::info!("Sending ping to peer {}", job.peer_id.to_hex());
59 match Ping::send::<L>(peer, &job.peer_id, ping).await {
60 Ok(pong) => {
61 tracing::info!(
62 "Received pong from peer {} for bucket {} | {:?}",
63 job.peer_id.to_hex(),
64 job.bucket_id,
65 pong
66 );
67 Ok(())
68 }
69 Err(e) => {
70 tracing::debug!(
71 "Failed to ping peer {} for bucket {}: {}",
72 job.peer_id.to_hex(),
73 job.bucket_id,
74 e
75 );
76 Err(anyhow::anyhow!(
77 "Ping job failed for bucket {} to peer {}: {}",
78 job.bucket_id,
79 job.peer_id.to_hex(),
80 e
81 ))
82 }
83 }
84}