Skip to main content

common/peer/protocol/messages/
ping.rs

1use anyhow::Result;
2use serde::{Deserialize, Serialize};
3use uuid::Uuid;
4
5use crate::bucket_log::BucketLogProvider;
6use crate::crypto::PublicKey;
7use crate::linked_data::Link;
8use crate::mount::Manifest;
9use crate::peer::protocol::bidirectional::BidirectionalHandler;
10use crate::peer::protocol::messages::Message;
11use crate::peer::Peer;
12
13/// Request to ping a peer and check bucket sync status
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct PingMessage {
16    /// The bucket ID to check
17    pub bucket_id: Uuid,
18    /// The current link the requesting peer has for this bucket
19    pub link: Link,
20    /// The height of the link we are responding to
21    pub height: u64,
22}
23
24/// Sync status between two peers for a bucket
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
26pub enum PingReplyStatus {
27    /// The peer does not have this bucket at all
28    NotFound,
29    /// We are ahead of the current peer's history,
30    ///  report where we are
31    Ahead(Link, u64),
32    /// We are behind, report where we are
33    Behind(Link, u64),
34    /// Both agree on the current link (in sync)
35    InSync,
36}
37
38/// Response to a ping request
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct PingReply {
41    /// The bucket ID being responded to
42    pub bucket_id: Uuid,
43    /// The sync status
44    pub status: PingReplyStatus,
45}
46
47impl PingReply {
48    /// Create a new pong message indicating bucket not found
49    pub fn not_found(bucket_id: Uuid) -> Self {
50        Self {
51            bucket_id,
52            status: PingReplyStatus::NotFound,
53        }
54    }
55
56    /// Create a new pong message indicating we are ahead
57    pub fn ahead(bucket_id: Uuid, link: Link, height: u64) -> Self {
58        Self {
59            bucket_id,
60            status: PingReplyStatus::Ahead(link, height),
61        }
62    }
63
64    /// Create a new pong message indicating we are behind
65    pub fn behind(bucket_id: Uuid, link: Link, height: u64) -> Self {
66        Self {
67            bucket_id,
68            status: PingReplyStatus::Behind(link, height),
69        }
70    }
71
72    /// Create a new pong message indicating we are in sync
73    pub fn in_sync(bucket_id: Uuid) -> Self {
74        Self {
75            bucket_id,
76            status: PingReplyStatus::InSync,
77        }
78    }
79}
80
81/// Ping handler implementing the bidirectional handler pattern
82///
83/// This demonstrates the complete protocol flow in one place:
84/// - Responder: what to send back + side effects after sending
85/// - Initiator: what to do with the response
86pub struct Ping;
87
88impl BidirectionalHandler for Ping {
89    type Message = PingMessage;
90    type Reply = PingReply;
91
92    /// Wrap the request in the Message enum for proper serialization
93    fn wrap_request(request: Self::Message) -> Message {
94        Message::Ping(request)
95    }
96
97    // ========================================
98    // RESPONDER SIDE: When we receive a ping
99    // ========================================
100
101    /// Generate response: compare our state with peer's state
102    async fn handle_message<L: BucketLogProvider>(
103        peer: &Peer<L>,
104        _sender_node_id: &PublicKey,
105        ping: &PingMessage,
106    ) -> PingReply {
107        let logs = peer.logs();
108        let bucket_id = ping.bucket_id;
109
110        // Try to get our head for this bucket
111        let (link, height) = match logs.head(bucket_id, None).await {
112            Ok((link, height)) => (link, height),
113            Err(_) => {
114                // We don't have this bucket, return NotFound
115                return PingReply::not_found(bucket_id);
116            }
117        };
118
119        // Compare heights and determine sync status
120        if height < ping.height {
121            PingReply::behind(bucket_id, link, height)
122        } else if height == ping.height {
123            // At same height, we're in sync
124            PingReply::in_sync(bucket_id)
125        } else {
126            // We're ahead of the remote peer
127            PingReply::ahead(bucket_id, link, height)
128        }
129    }
130
131    /// Side effects after sending response
132    ///
133    /// This is called AFTER we've sent the pong back to the peer.
134    /// Use this to trigger background operations without blocking the response.
135    async fn handle_message_side_effect<L: BucketLogProvider>(
136        peer: &Peer<L>,
137        sender_node_id: &PublicKey,
138        ping: &PingMessage,
139        pong: &PingReply,
140    ) -> Result<()>
141    where
142        L::Error: std::error::Error + Send + Sync + 'static,
143    {
144        match &pong.status {
145            PingReplyStatus::Behind(our_link, our_height) => {
146                // We told them we're behind, so we should dispatch a sync job
147                tracing::info!(
148                    "We're behind peer for bucket {} (our height: {}, their height: {}), dispatching sync job",
149                    ping.bucket_id,
150                    our_height,
151                    ping.height
152                );
153
154                // Load our manifest to get all peer IDs from shares
155                let peer_ids = match peer.blobs().get_cbor::<Manifest>(&our_link.hash()).await {
156                    Ok(manifest) => manifest.get_peer_ids(),
157                    Err(e) => {
158                        tracing::warn!(
159                            "Failed to load manifest for peer list, using sender only: {}",
160                            e
161                        );
162                        vec![*sender_node_id]
163                    }
164                };
165
166                // Dispatch sync job to background worker
167                use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
168                if let Err(e) = peer
169                    .dispatch(SyncJob::SyncBucket(SyncBucketJob {
170                        bucket_id: ping.bucket_id,
171                        target: SyncTarget {
172                            link: ping.link.clone(),
173                            height: ping.height,
174                            peer_ids,
175                        },
176                    }))
177                    .await
178                {
179                    tracing::error!("Failed to dispatch sync job: {}", e);
180                }
181            }
182            PingReplyStatus::Ahead(_, our_height) => {
183                // We told them we're ahead, they might fetch from us
184                tracing::debug!(
185                    "We're ahead of peer for bucket {} (our height: {}, their height: {})",
186                    ping.bucket_id,
187                    our_height,
188                    ping.height
189                );
190                // Nothing to do - they'll fetch from us if they want
191            }
192            PingReplyStatus::InSync => {
193                tracing::debug!("In sync with peer for bucket {}", ping.bucket_id);
194                // All good, nothing to do
195            }
196            PingReplyStatus::NotFound => {
197                tracing::debug!(
198                    "We don't have bucket {} that peer is asking about",
199                    ping.bucket_id
200                );
201                // TODO (amiller68): there should probably be a share message instead
202                //  of this
203                // We don't have the bucket locally, so we can't get peer list from our manifest.
204                // Use only the sender for now; once we sync we'll have the full peer list.
205                let peer_ids = vec![*sender_node_id];
206
207                // Dispatch sync job to background worker
208                use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
209                if let Err(e) = peer
210                    .dispatch(SyncJob::SyncBucket(SyncBucketJob {
211                        bucket_id: ping.bucket_id,
212                        target: SyncTarget {
213                            link: ping.link.clone(),
214                            height: ping.height,
215                            peer_ids,
216                        },
217                    }))
218                    .await
219                {
220                    tracing::error!("Failed to dispatch sync job: {}", e);
221                }
222            }
223        }
224        Ok(())
225    }
226
227    // ========================================
228    // INITIATOR SIDE: When we receive a pong
229    // ========================================
230
231    /// Handle pong response: decide what to do based on sync status
232    async fn handle_reply<L: BucketLogProvider>(
233        peer: &Peer<L>,
234        recipient_node_id: &PublicKey,
235        pong: &PingReply,
236    ) -> Result<()>
237    where
238        L::Error: std::error::Error + Send + Sync + 'static,
239    {
240        match &pong.status {
241            PingReplyStatus::NotFound => {
242                tracing::info!(
243                    "Remote peer {} doesn't have bucket {}",
244                    recipient_node_id.to_hex(),
245                    pong.bucket_id
246                );
247                // The peer should attemp to fetch from us after this
248            }
249            PingReplyStatus::Ahead(link, height) => {
250                // Remote peer is ahead, dispatch a sync job
251                tracing::info!(
252                    "Remote peer {} is ahead for bucket {} at height {} with link {:?}, dispatching sync job",
253                    recipient_node_id.to_hex(),
254                    pong.bucket_id,
255                    height,
256                    link
257                );
258
259                // Load our manifest to get all peer IDs from shares
260                let peer_ids = match peer.logs().head(pong.bucket_id, None).await {
261                    Ok((our_link, _)) => {
262                        match peer.blobs().get_cbor::<Manifest>(&our_link.hash()).await {
263                            Ok(manifest) => manifest.get_peer_ids(),
264                            Err(e) => {
265                                tracing::warn!(
266                                    "Failed to load manifest for peer list, using recipient only: {}",
267                                    e
268                                );
269                                vec![*recipient_node_id]
270                            }
271                        }
272                    }
273                    Err(e) => {
274                        tracing::warn!(
275                            "Failed to get head for peer list, using recipient only: {}",
276                            e
277                        );
278                        vec![*recipient_node_id]
279                    }
280                };
281
282                // Dispatch sync job to background worker
283                use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
284                if let Err(e) = peer
285                    .dispatch(SyncJob::SyncBucket(SyncBucketJob {
286                        bucket_id: pong.bucket_id,
287                        target: SyncTarget {
288                            link: link.clone(),
289                            height: *height,
290                            peer_ids,
291                        },
292                    }))
293                    .await
294                {
295                    tracing::error!("Failed to dispatch sync job: {}", e);
296                }
297            }
298            PingReplyStatus::Behind(link, height) => {
299                tracing::info!(
300                    "Remote peer {} is behind for bucket {} at height {} with link {:?}",
301                    recipient_node_id.to_hex(),
302                    pong.bucket_id,
303                    height,
304                    link
305                );
306                // Remote peer is behind, they might fetch from us
307                // Nothing to do on our side
308            }
309            PingReplyStatus::InSync => {
310                tracing::info!(
311                    "In sync with peer {} for bucket {}",
312                    recipient_node_id.to_hex(),
313                    pong.bucket_id
314                );
315                // All good
316            }
317        }
318        Ok(())
319    }
320}