Skip to main content

common/peer/sync/
ping_peer.rs

1//! Peer ping job and execution logic
2//!
3//! This module contains the logic for pinging peers to check sync status.
4
5use std::time::Duration;
6
7use anyhow::Result;
8use uuid::Uuid;
9
10use crate::bucket_log::BucketLogProvider;
11use crate::crypto::PublicKey;
12use crate::peer::protocol::bidirectional::BidirectionalHandler;
13use crate::peer::protocol::{Ping, PingMessage};
14use crate::peer::Peer;
15
/// Upper bound on how long a single ping exchange may take.
///
/// A ping is only a lightweight status check, so a peer that has not answered
/// within this window is most likely offline.
const PING_TIMEOUT: Duration = Duration::from_secs(5);
19
20/// Ping peer job definition
21#[derive(Debug, Clone)]
22pub struct PingPeerJob {
23    pub bucket_id: Uuid,
24    pub peer_id: PublicKey,
25}
26
27/// Execute a ping peer job
28///
29/// This sends a ping to the specified peer with our current bucket state
30/// and processes the response.
31pub async fn execute<L>(peer: &Peer<L>, job: PingPeerJob) -> Result<()>
32where
33    L: BucketLogProvider + Clone + Send + Sync + 'static,
34    L::Error: std::error::Error + Send + Sync + 'static,
35{
36    tracing::info!(
37        "Processing ping job: bucket_id={}, peer_id={}",
38        job.bucket_id,
39        job.peer_id.to_hex()
40    );
41
42    // Get our bucket state
43    let (our_link, our_height) = match peer.log_provider().head(job.bucket_id, None).await {
44        Ok((link, height)) => (link, height),
45        Err(e) => {
46            tracing::warn!(
47                "Failed to get head for bucket {} when pinging peer {}: {}",
48                job.bucket_id,
49                job.peer_id.to_hex(),
50                e
51            );
52            return Ok(());
53        }
54    };
55
56    // Construct ping
57    let ping = PingMessage {
58        bucket_id: job.bucket_id,
59        link: our_link,
60        height: our_height,
61    };
62
63    // Send ping with timeout — peer unavailability is expected, not an error
64    tracing::info!("Sending ping to peer {}", job.peer_id.to_hex());
65    match tokio::time::timeout(PING_TIMEOUT, Ping::send::<L>(peer, &job.peer_id, ping)).await {
66        Ok(Ok(pong)) => {
67            tracing::info!(
68                "Received pong from peer {} for bucket {} | {:?}",
69                job.peer_id.to_hex(),
70                job.bucket_id,
71                pong
72            );
73            Ok(())
74        }
75        Ok(Err(e)) => {
76            tracing::debug!(
77                "Failed to ping peer {} for bucket {}: {}",
78                job.peer_id.to_hex(),
79                job.bucket_id,
80                e
81            );
82            Ok(())
83        }
84        Err(_) => {
85            tracing::debug!(
86                "Ping to peer {} for bucket {} timed out after {}s",
87                job.peer_id.to_hex(),
88                job.bucket_id,
89                PING_TIMEOUT.as_secs()
90            );
91            Ok(())
92        }
93    }
94}