common/peer/protocol/messages/ping.rs
1use anyhow::Result;
2use serde::{Deserialize, Serialize};
3use uuid::Uuid;
4
5use crate::bucket_log::BucketLogProvider;
6use crate::crypto::PublicKey;
7use crate::linked_data::Link;
8use crate::mount::Manifest;
9use crate::peer::protocol::bidirectional::BidirectionalHandler;
10use crate::peer::protocol::messages::Message;
11use crate::peer::Peer;
12
13/// Request to ping a peer and check bucket sync status
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct PingMessage {
16 /// The bucket ID to check
17 pub bucket_id: Uuid,
18 /// The current link the requesting peer has for this bucket
19 pub link: Link,
20 /// The height of the link we are responding to
21 pub height: u64,
22}
23
24/// Sync status between two peers for a bucket
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
26pub enum PingReplyStatus {
27 /// The peer does not have this bucket at all
28 NotFound,
29 /// We are ahead of the current peer's history,
30 /// report where we are
31 Ahead(Link, u64),
32 /// We are behind, report where we are
33 Behind(Link, u64),
34 /// Both agree on the current link (in sync)
35 InSync,
36}
37
38/// Response to a ping request
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct PingReply {
41 /// The bucket ID being responded to
42 pub bucket_id: Uuid,
43 /// The sync status
44 pub status: PingReplyStatus,
45}
46
47impl PingReply {
48 /// Create a new pong message indicating bucket not found
49 pub fn not_found(bucket_id: Uuid) -> Self {
50 Self {
51 bucket_id,
52 status: PingReplyStatus::NotFound,
53 }
54 }
55
56 /// Create a new pong message indicating we are ahead
57 pub fn ahead(bucket_id: Uuid, link: Link, height: u64) -> Self {
58 Self {
59 bucket_id,
60 status: PingReplyStatus::Ahead(link, height),
61 }
62 }
63
64 /// Create a new pong message indicating we are behind
65 pub fn behind(bucket_id: Uuid, link: Link, height: u64) -> Self {
66 Self {
67 bucket_id,
68 status: PingReplyStatus::Behind(link, height),
69 }
70 }
71
72 /// Create a new pong message indicating we are in sync
73 pub fn in_sync(bucket_id: Uuid) -> Self {
74 Self {
75 bucket_id,
76 status: PingReplyStatus::InSync,
77 }
78 }
79}
80
/// Marker type carrying the ping protocol's [`BidirectionalHandler`] impl.
///
/// Both halves of the exchange live on this one type:
/// - Responder: builds the reply, then runs side effects once it is sent
/// - Initiator: reacts to the reply it gets back
pub struct Ping;
87
88impl BidirectionalHandler for Ping {
89 type Message = PingMessage;
90 type Reply = PingReply;
91
92 /// Wrap the request in the Message enum for proper serialization
93 fn wrap_request(request: Self::Message) -> Message {
94 Message::Ping(request)
95 }
96
97 // ========================================
98 // RESPONDER SIDE: When we receive a ping
99 // ========================================
100
101 /// Generate response: compare our state with peer's state
102 async fn handle_message<L: BucketLogProvider>(
103 peer: &Peer<L>,
104 _sender_node_id: &PublicKey,
105 ping: &PingMessage,
106 ) -> PingReply {
107 let logs = peer.logs();
108 let bucket_id = ping.bucket_id;
109
110 // Try to get our head for this bucket
111 let (link, height) = match logs.head(bucket_id, None).await {
112 Ok((link, height)) => (link, height),
113 Err(_) => {
114 // We don't have this bucket, return NotFound
115 return PingReply::not_found(bucket_id);
116 }
117 };
118
119 // Compare heights and determine sync status
120 if height < ping.height {
121 PingReply::behind(bucket_id, link, height)
122 } else if height == ping.height {
123 // At same height, we're in sync
124 PingReply::in_sync(bucket_id)
125 } else {
126 // We're ahead of the remote peer
127 PingReply::ahead(bucket_id, link, height)
128 }
129 }
130
131 /// Side effects after sending response
132 ///
133 /// This is called AFTER we've sent the pong back to the peer.
134 /// Use this to trigger background operations without blocking the response.
135 async fn handle_message_side_effect<L: BucketLogProvider>(
136 peer: &Peer<L>,
137 sender_node_id: &PublicKey,
138 ping: &PingMessage,
139 pong: &PingReply,
140 ) -> Result<()>
141 where
142 L::Error: std::error::Error + Send + Sync + 'static,
143 {
144 // Check if this bucket should be synced
145 let should_sync = peer
146 .logs()
147 .should_sync_content(ping.bucket_id)
148 .await
149 .unwrap_or(true);
150
151 match &pong.status {
152 PingReplyStatus::Behind(our_link, our_height) => {
153 if !should_sync {
154 tracing::debug!("Skipping sync for bucket {} (not active)", ping.bucket_id);
155 return Ok(());
156 }
157
158 // We told them we're behind, so we should dispatch a sync job
159 tracing::info!(
160 "We're behind peer for bucket {} (our height: {}, their height: {}), dispatching sync job",
161 ping.bucket_id,
162 our_height,
163 ping.height
164 );
165
166 // Load our manifest to get all peer IDs from shares
167 let peer_ids = match peer.blobs().get_cbor::<Manifest>(&our_link.hash()).await {
168 Ok(manifest) => manifest.get_peer_ids(),
169 Err(e) => {
170 tracing::warn!(
171 "Failed to load manifest for peer list, using sender only: {}",
172 e
173 );
174 vec![*sender_node_id]
175 }
176 };
177
178 // Dispatch sync job to background worker
179 use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
180 if let Err(e) = peer
181 .dispatch(SyncJob::SyncBucket(SyncBucketJob {
182 bucket_id: ping.bucket_id,
183 target: SyncTarget {
184 link: ping.link.clone(),
185 height: ping.height,
186 peer_ids,
187 },
188 }))
189 .await
190 {
191 tracing::error!("Failed to dispatch sync job: {}", e);
192 }
193 }
194 PingReplyStatus::Ahead(_, our_height) => {
195 // We told them we're ahead, they might fetch from us
196 tracing::debug!(
197 "We're ahead of peer for bucket {} (our height: {}, their height: {})",
198 ping.bucket_id,
199 our_height,
200 ping.height
201 );
202 // Nothing to do - they'll fetch from us if they want
203 }
204 PingReplyStatus::InSync => {
205 tracing::debug!("In sync with peer for bucket {}", ping.bucket_id);
206 // All good, nothing to do
207 }
208 PingReplyStatus::NotFound => {
209 tracing::debug!(
210 "We don't have bucket {} that peer is asking about",
211 ping.bucket_id
212 );
213 // We don't have the bucket locally, so we can't get peer list from our manifest.
214 // Use only the sender for now; once we sync we'll have the full peer list.
215 let peer_ids = vec![*sender_node_id];
216
217 // Dispatch sync job to background worker
218 // The sync_bucket::execute will call on_new_bucket_discovered
219 // to set the bucket to pending status
220 use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
221 if let Err(e) = peer
222 .dispatch(SyncJob::SyncBucket(SyncBucketJob {
223 bucket_id: ping.bucket_id,
224 target: SyncTarget {
225 link: ping.link.clone(),
226 height: ping.height,
227 peer_ids,
228 },
229 }))
230 .await
231 {
232 tracing::error!("Failed to dispatch sync job: {}", e);
233 }
234 }
235 }
236 Ok(())
237 }
238
239 // ========================================
240 // INITIATOR SIDE: When we receive a pong
241 // ========================================
242
243 /// Handle pong response: decide what to do based on sync status
244 async fn handle_reply<L: BucketLogProvider>(
245 peer: &Peer<L>,
246 recipient_node_id: &PublicKey,
247 pong: &PingReply,
248 ) -> Result<()>
249 where
250 L::Error: std::error::Error + Send + Sync + 'static,
251 {
252 match &pong.status {
253 PingReplyStatus::NotFound => {
254 tracing::info!(
255 "Remote peer {} doesn't have bucket {}",
256 recipient_node_id.to_hex(),
257 pong.bucket_id
258 );
259 // The peer should attemp to fetch from us after this
260 }
261 PingReplyStatus::Ahead(link, height) => {
262 // Remote peer is ahead, dispatch a sync job
263 tracing::info!(
264 "Remote peer {} is ahead for bucket {} at height {} with link {:?}, dispatching sync job",
265 recipient_node_id.to_hex(),
266 pong.bucket_id,
267 height,
268 link
269 );
270
271 // Load our manifest to get all peer IDs from shares
272 let peer_ids = match peer.logs().head(pong.bucket_id, None).await {
273 Ok((our_link, _)) => {
274 match peer.blobs().get_cbor::<Manifest>(&our_link.hash()).await {
275 Ok(manifest) => manifest.get_peer_ids(),
276 Err(e) => {
277 tracing::warn!(
278 "Failed to load manifest for peer list, using recipient only: {}",
279 e
280 );
281 vec![*recipient_node_id]
282 }
283 }
284 }
285 Err(e) => {
286 tracing::warn!(
287 "Failed to get head for peer list, using recipient only: {}",
288 e
289 );
290 vec![*recipient_node_id]
291 }
292 };
293
294 // Dispatch sync job to background worker
295 use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
296 if let Err(e) = peer
297 .dispatch(SyncJob::SyncBucket(SyncBucketJob {
298 bucket_id: pong.bucket_id,
299 target: SyncTarget {
300 link: link.clone(),
301 height: *height,
302 peer_ids,
303 },
304 }))
305 .await
306 {
307 tracing::error!("Failed to dispatch sync job: {}", e);
308 }
309 }
310 PingReplyStatus::Behind(link, height) => {
311 tracing::info!(
312 "Remote peer {} is behind for bucket {} at height {} with link {:?}",
313 recipient_node_id.to_hex(),
314 pong.bucket_id,
315 height,
316 link
317 );
318 // Remote peer is behind, they might fetch from us
319 // Nothing to do on our side
320 }
321 PingReplyStatus::InSync => {
322 tracing::info!(
323 "In sync with peer {} for bucket {}",
324 recipient_node_id.to_hex(),
325 pong.bucket_id
326 );
327 // All good
328 }
329 }
330 Ok(())
331 }
332}