common/peer/protocol/messages/
ping.rs

1use anyhow::Result;
2use serde::{Deserialize, Serialize};
3use uuid::Uuid;
4
5use crate::bucket_log::BucketLogProvider;
6use crate::crypto::PublicKey;
7use crate::linked_data::Link;
8use crate::peer::protocol::bidirectional::BidirectionalHandler;
9use crate::peer::protocol::messages::Message;
10use crate::peer::Peer;
11
/// Request to ping a peer and check bucket sync status.
///
/// The sender includes its own view of the bucket (`link` and `height`) so
/// that the receiving peer can compare it against its local head.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PingMessage {
    /// The bucket ID to check
    pub bucket_id: Uuid,
    /// The current link the requesting peer has for this bucket
    pub link: Link,
    /// The height reported alongside `link`; the responder compares this
    /// value with the height of its own head to decide the reply status
    pub height: u64,
}
22
/// Sync status between two peers for a bucket.
///
/// Variants are phrased from the point of view of the peer that answers the
/// ping: `Ahead` / `Behind` describe the responder relative to the height
/// carried in the ping.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PingReplyStatus {
    /// The peer does not have this bucket at all
    NotFound,
    /// The responder is ahead of the pinging peer's history;
    ///  carries the responder's head link and height
    Ahead(Link, u64),
    /// The responder is behind the pinging peer;
    ///  carries the responder's head link and height
    Behind(Link, u64),
    /// Both peers are considered in sync. The ping handler in this file
    /// reports this when the two heights are equal.
    InSync,
}
36
/// Response to a ping request.
///
/// Echoes the bucket ID from the request together with the responder's
/// sync status for that bucket.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PingReply {
    /// The bucket ID being responded to
    pub bucket_id: Uuid,
    /// The sync status
    pub status: PingReplyStatus,
}
45
46impl PingReply {
47    /// Create a new pong message indicating bucket not found
48    pub fn not_found(bucket_id: Uuid) -> Self {
49        Self {
50            bucket_id,
51            status: PingReplyStatus::NotFound,
52        }
53    }
54
55    /// Create a new pong message indicating we are ahead
56    pub fn ahead(bucket_id: Uuid, link: Link, height: u64) -> Self {
57        Self {
58            bucket_id,
59            status: PingReplyStatus::Ahead(link, height),
60        }
61    }
62
63    /// Create a new pong message indicating we are behind
64    pub fn behind(bucket_id: Uuid, link: Link, height: u64) -> Self {
65        Self {
66            bucket_id,
67            status: PingReplyStatus::Behind(link, height),
68        }
69    }
70
71    /// Create a new pong message indicating we are in sync
72    pub fn in_sync(bucket_id: Uuid) -> Self {
73        Self {
74            bucket_id,
75            status: PingReplyStatus::InSync,
76        }
77    }
78}
79
/// Ping handler implementing the bidirectional handler pattern.
///
/// A zero-sized marker type: all behavior lives in its
/// `BidirectionalHandler` implementation, which keeps the complete protocol
/// flow in one place:
/// - Responder: what to send back + side effects after sending
/// - Initiator: what to do with the response
pub struct Ping;
86
87impl BidirectionalHandler for Ping {
88    type Message = PingMessage;
89    type Reply = PingReply;
90
91    /// Wrap the request in the Message enum for proper serialization
92    fn wrap_request(request: Self::Message) -> Message {
93        Message::Ping(request)
94    }
95
96    // ========================================
97    // RESPONDER SIDE: When we receive a ping
98    // ========================================
99
100    /// Generate response: compare our state with peer's state
101    async fn handle_message<L: BucketLogProvider>(
102        peer: &Peer<L>,
103        _sender_node_id: &PublicKey,
104        ping: &PingMessage,
105    ) -> PingReply {
106        let logs = peer.logs();
107        let bucket_id = ping.bucket_id;
108
109        // Try to get our head for this bucket
110        let (link, height) = match logs.head(bucket_id, None).await {
111            Ok((link, height)) => (link, height),
112            Err(_) => {
113                // We don't have this bucket, return NotFound
114                return PingReply::not_found(bucket_id);
115            }
116        };
117
118        // Compare heights and determine sync status
119        if height < ping.height {
120            PingReply::behind(bucket_id, link, height)
121        } else if height == ping.height {
122            // At same height, we're in sync
123            PingReply::in_sync(bucket_id)
124        } else {
125            // We're ahead of the remote peer
126            PingReply::ahead(bucket_id, link, height)
127        }
128    }
129
130    /// Side effects after sending response
131    ///
132    /// This is called AFTER we've sent the pong back to the peer.
133    /// Use this to trigger background operations without blocking the response.
134    async fn handle_message_side_effect<L: BucketLogProvider>(
135        peer: &Peer<L>,
136        sender_node_id: &PublicKey,
137        ping: &PingMessage,
138        pong: &PingReply,
139    ) -> Result<()>
140    where
141        L::Error: std::error::Error + Send + Sync + 'static,
142    {
143        match &pong.status {
144            PingReplyStatus::Behind(_, our_height) => {
145                // We told them we're behind, so we should dispatch a sync job
146                tracing::info!(
147                    "We're behind peer for bucket {} (our height: {}, their height: {}), dispatching sync job",
148                    ping.bucket_id,
149                    our_height,
150                    ping.height
151                );
152
153                // Dispatch sync job to background worker
154                use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
155                if let Err(e) = peer
156                    .dispatch(SyncJob::SyncBucket(SyncBucketJob {
157                        bucket_id: ping.bucket_id,
158                        target: SyncTarget {
159                            link: ping.link.clone(),
160                            height: ping.height,
161                            peer_id: *sender_node_id,
162                        },
163                    }))
164                    .await
165                {
166                    tracing::error!("Failed to dispatch sync job: {}", e);
167                }
168            }
169            PingReplyStatus::Ahead(_, our_height) => {
170                // We told them we're ahead, they might fetch from us
171                tracing::debug!(
172                    "We're ahead of peer for bucket {} (our height: {}, their height: {})",
173                    ping.bucket_id,
174                    our_height,
175                    ping.height
176                );
177                // Nothing to do - they'll fetch from us if they want
178            }
179            PingReplyStatus::InSync => {
180                tracing::debug!("In sync with peer for bucket {}", ping.bucket_id);
181                // All good, nothing to do
182            }
183            PingReplyStatus::NotFound => {
184                tracing::debug!(
185                    "We don't have bucket {} that peer is asking about",
186                    ping.bucket_id
187                );
188                // TODO (amiller68): there should probably be a share message instead
189                //  of this
190                // Dispatch sync job to background worker
191                use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
192                if let Err(e) = peer
193                    .dispatch(SyncJob::SyncBucket(SyncBucketJob {
194                        bucket_id: ping.bucket_id,
195                        target: SyncTarget {
196                            link: ping.link.clone(),
197                            height: ping.height,
198                            peer_id: *sender_node_id,
199                        },
200                    }))
201                    .await
202                {
203                    tracing::error!("Failed to dispatch sync job: {}", e);
204                }
205            }
206        }
207        Ok(())
208    }
209
210    // ========================================
211    // INITIATOR SIDE: When we receive a pong
212    // ========================================
213
214    /// Handle pong response: decide what to do based on sync status
215    async fn handle_reply<L: BucketLogProvider>(
216        peer: &Peer<L>,
217        recipient_node_id: &PublicKey,
218        pong: &PingReply,
219    ) -> Result<()>
220    where
221        L::Error: std::error::Error + Send + Sync + 'static,
222    {
223        match &pong.status {
224            PingReplyStatus::NotFound => {
225                tracing::info!(
226                    "Remote peer {} doesn't have bucket {}",
227                    recipient_node_id.to_hex(),
228                    pong.bucket_id
229                );
230                // The peer should attemp to fetch from us after this
231            }
232            PingReplyStatus::Ahead(link, height) => {
233                // Remote peer is ahead, dispatch a sync job
234                tracing::info!(
235                    "Remote peer {} is ahead for bucket {} at height {} with link {:?}, dispatching sync job",
236                    recipient_node_id.to_hex(),
237                    pong.bucket_id,
238                    height,
239                    link
240                );
241
242                // Dispatch sync job to background worker
243                use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
244                if let Err(e) = peer
245                    .dispatch(SyncJob::SyncBucket(SyncBucketJob {
246                        bucket_id: pong.bucket_id,
247                        target: SyncTarget {
248                            link: link.clone(),
249                            height: *height,
250                            peer_id: *recipient_node_id,
251                        },
252                    }))
253                    .await
254                {
255                    tracing::error!("Failed to dispatch sync job: {}", e);
256                }
257            }
258            PingReplyStatus::Behind(link, height) => {
259                tracing::info!(
260                    "Remote peer {} is behind for bucket {} at height {} with link {:?}",
261                    recipient_node_id.to_hex(),
262                    pong.bucket_id,
263                    height,
264                    link
265                );
266                // Remote peer is behind, they might fetch from us
267                // Nothing to do on our side
268            }
269            PingReplyStatus::InSync => {
270                tracing::info!(
271                    "In sync with peer {} for bucket {}",
272                    recipient_node_id.to_hex(),
273                    pong.bucket_id
274                );
275                // All good
276            }
277        }
278        Ok(())
279    }
280}