Skip to main content

common/peer/protocol/messages/
ping.rs

1use anyhow::Result;
2use serde::{Deserialize, Serialize};
3use uuid::Uuid;
4
5use crate::bucket_log::BucketLogProvider;
6use crate::crypto::PublicKey;
7use crate::linked_data::Link;
8use crate::mount::Manifest;
9use crate::peer::protocol::bidirectional::BidirectionalHandler;
10use crate::peer::protocol::messages::Message;
11use crate::peer::Peer;
12
13/// Request to ping a peer and check bucket sync status
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct PingMessage {
16    /// The bucket ID to check
17    pub bucket_id: Uuid,
18    /// The current link the requesting peer has for this bucket
19    pub link: Link,
20    /// The height of the link we are responding to
21    pub height: u64,
22}
23
24/// Sync status between two peers for a bucket
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
26pub enum PingReplyStatus {
27    /// The peer does not have this bucket at all
28    NotFound,
29    /// We are ahead of the current peer's history,
30    ///  report where we are
31    Ahead(Link, u64),
32    /// We are behind, report where we are
33    Behind(Link, u64),
34    /// Both agree on the current link (in sync)
35    InSync,
36}
37
38/// Response to a ping request
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct PingReply {
41    /// The bucket ID being responded to
42    pub bucket_id: Uuid,
43    /// The sync status
44    pub status: PingReplyStatus,
45}
46
47impl PingReply {
48    /// Create a new pong message indicating bucket not found
49    pub fn not_found(bucket_id: Uuid) -> Self {
50        Self {
51            bucket_id,
52            status: PingReplyStatus::NotFound,
53        }
54    }
55
56    /// Create a new pong message indicating we are ahead
57    pub fn ahead(bucket_id: Uuid, link: Link, height: u64) -> Self {
58        Self {
59            bucket_id,
60            status: PingReplyStatus::Ahead(link, height),
61        }
62    }
63
64    /// Create a new pong message indicating we are behind
65    pub fn behind(bucket_id: Uuid, link: Link, height: u64) -> Self {
66        Self {
67            bucket_id,
68            status: PingReplyStatus::Behind(link, height),
69        }
70    }
71
72    /// Create a new pong message indicating we are in sync
73    pub fn in_sync(bucket_id: Uuid) -> Self {
74        Self {
75            bucket_id,
76            status: PingReplyStatus::InSync,
77        }
78    }
79}
80
81/// Ping handler implementing the bidirectional handler pattern
82///
83/// This demonstrates the complete protocol flow in one place:
84/// - Responder: what to send back + side effects after sending
85/// - Initiator: what to do with the response
86pub struct Ping;
87
88impl BidirectionalHandler for Ping {
89    type Message = PingMessage;
90    type Reply = PingReply;
91
92    /// Wrap the request in the Message enum for proper serialization
93    fn wrap_request(request: Self::Message) -> Message {
94        Message::Ping(request)
95    }
96
97    // ========================================
98    // RESPONDER SIDE: When we receive a ping
99    // ========================================
100
101    /// Generate response: compare our state with peer's state
102    async fn handle_message<L: BucketLogProvider>(
103        peer: &Peer<L>,
104        _sender_node_id: &PublicKey,
105        ping: &PingMessage,
106    ) -> PingReply {
107        let logs = peer.logs();
108        let bucket_id = ping.bucket_id;
109
110        // Try to get our head for this bucket
111        let (link, height) = match logs.head(bucket_id, None).await {
112            Ok((link, height)) => (link, height),
113            Err(_) => {
114                // We don't have this bucket, return NotFound
115                return PingReply::not_found(bucket_id);
116            }
117        };
118
119        // Compare heights and determine sync status
120        if height < ping.height {
121            PingReply::behind(bucket_id, link, height)
122        } else if height == ping.height {
123            // At same height, we're in sync
124            PingReply::in_sync(bucket_id)
125        } else {
126            // We're ahead of the remote peer
127            PingReply::ahead(bucket_id, link, height)
128        }
129    }
130
131    /// Side effects after sending response
132    ///
133    /// This is called AFTER we've sent the pong back to the peer.
134    /// Use this to trigger background operations without blocking the response.
135    async fn handle_message_side_effect<L: BucketLogProvider>(
136        peer: &Peer<L>,
137        sender_node_id: &PublicKey,
138        ping: &PingMessage,
139        pong: &PingReply,
140    ) -> Result<()>
141    where
142        L::Error: std::error::Error + Send + Sync + 'static,
143    {
144        // Check if this bucket should be synced
145        let should_sync = peer
146            .logs()
147            .should_sync_content(ping.bucket_id)
148            .await
149            .unwrap_or(true);
150
151        match &pong.status {
152            PingReplyStatus::Behind(our_link, our_height) => {
153                if !should_sync {
154                    tracing::debug!("Skipping sync for bucket {} (not active)", ping.bucket_id);
155                    return Ok(());
156                }
157
158                // We told them we're behind, so we should dispatch a sync job
159                tracing::info!(
160                    "We're behind peer for bucket {} (our height: {}, their height: {}), dispatching sync job",
161                    ping.bucket_id,
162                    our_height,
163                    ping.height
164                );
165
166                // Load our manifest to get all peer IDs from shares
167                let peer_ids = match peer.blobs().get_cbor::<Manifest>(&our_link.hash()).await {
168                    Ok(manifest) => manifest.get_peer_ids(),
169                    Err(e) => {
170                        tracing::warn!(
171                            "Failed to load manifest for peer list, using sender only: {}",
172                            e
173                        );
174                        vec![*sender_node_id]
175                    }
176                };
177
178                // Dispatch sync job to background worker
179                use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
180                if let Err(e) = peer
181                    .dispatch(SyncJob::SyncBucket(SyncBucketJob {
182                        bucket_id: ping.bucket_id,
183                        target: SyncTarget {
184                            link: ping.link.clone(),
185                            height: ping.height,
186                            peer_ids,
187                        },
188                    }))
189                    .await
190                {
191                    tracing::error!("Failed to dispatch sync job: {}", e);
192                }
193            }
194            PingReplyStatus::Ahead(_, our_height) => {
195                // We told them we're ahead, they might fetch from us
196                tracing::debug!(
197                    "We're ahead of peer for bucket {} (our height: {}, their height: {})",
198                    ping.bucket_id,
199                    our_height,
200                    ping.height
201                );
202                // Nothing to do - they'll fetch from us if they want
203            }
204            PingReplyStatus::InSync => {
205                tracing::debug!("In sync with peer for bucket {}", ping.bucket_id);
206                // All good, nothing to do
207            }
208            PingReplyStatus::NotFound => {
209                tracing::debug!(
210                    "We don't have bucket {} that peer is asking about",
211                    ping.bucket_id
212                );
213                // We don't have the bucket locally, so we can't get peer list from our manifest.
214                // Use only the sender for now; once we sync we'll have the full peer list.
215                let peer_ids = vec![*sender_node_id];
216
217                // Dispatch sync job to background worker
218                // The sync_bucket::execute will call on_new_bucket_discovered
219                // to set the bucket to pending status
220                use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
221                if let Err(e) = peer
222                    .dispatch(SyncJob::SyncBucket(SyncBucketJob {
223                        bucket_id: ping.bucket_id,
224                        target: SyncTarget {
225                            link: ping.link.clone(),
226                            height: ping.height,
227                            peer_ids,
228                        },
229                    }))
230                    .await
231                {
232                    tracing::error!("Failed to dispatch sync job: {}", e);
233                }
234            }
235        }
236        Ok(())
237    }
238
239    // ========================================
240    // INITIATOR SIDE: When we receive a pong
241    // ========================================
242
243    /// Handle pong response: decide what to do based on sync status
244    async fn handle_reply<L: BucketLogProvider>(
245        peer: &Peer<L>,
246        recipient_node_id: &PublicKey,
247        pong: &PingReply,
248    ) -> Result<()>
249    where
250        L::Error: std::error::Error + Send + Sync + 'static,
251    {
252        match &pong.status {
253            PingReplyStatus::NotFound => {
254                tracing::info!(
255                    "Remote peer {} doesn't have bucket {}",
256                    recipient_node_id.to_hex(),
257                    pong.bucket_id
258                );
259                // The peer should attemp to fetch from us after this
260            }
261            PingReplyStatus::Ahead(link, height) => {
262                // Remote peer is ahead, dispatch a sync job
263                tracing::info!(
264                    "Remote peer {} is ahead for bucket {} at height {} with link {:?}, dispatching sync job",
265                    recipient_node_id.to_hex(),
266                    pong.bucket_id,
267                    height,
268                    link
269                );
270
271                // Load our manifest to get all peer IDs from shares
272                let peer_ids = match peer.logs().head(pong.bucket_id, None).await {
273                    Ok((our_link, _)) => {
274                        match peer.blobs().get_cbor::<Manifest>(&our_link.hash()).await {
275                            Ok(manifest) => manifest.get_peer_ids(),
276                            Err(e) => {
277                                tracing::warn!(
278                                    "Failed to load manifest for peer list, using recipient only: {}",
279                                    e
280                                );
281                                vec![*recipient_node_id]
282                            }
283                        }
284                    }
285                    Err(e) => {
286                        tracing::warn!(
287                            "Failed to get head for peer list, using recipient only: {}",
288                            e
289                        );
290                        vec![*recipient_node_id]
291                    }
292                };
293
294                // Dispatch sync job to background worker
295                use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
296                if let Err(e) = peer
297                    .dispatch(SyncJob::SyncBucket(SyncBucketJob {
298                        bucket_id: pong.bucket_id,
299                        target: SyncTarget {
300                            link: link.clone(),
301                            height: *height,
302                            peer_ids,
303                        },
304                    }))
305                    .await
306                {
307                    tracing::error!("Failed to dispatch sync job: {}", e);
308                }
309            }
310            PingReplyStatus::Behind(link, height) => {
311                tracing::info!(
312                    "Remote peer {} is behind for bucket {} at height {} with link {:?}",
313                    recipient_node_id.to_hex(),
314                    pong.bucket_id,
315                    height,
316                    link
317                );
318                // Remote peer is behind, they might fetch from us
319                // Nothing to do on our side
320            }
321            PingReplyStatus::InSync => {
322                tracing::info!(
323                    "In sync with peer {} for bucket {}",
324                    recipient_node_id.to_hex(),
325                    pong.bucket_id
326                );
327                // All good
328            }
329        }
330        Ok(())
331    }
332}