common/peer/protocol/messages/ping.rs
1use anyhow::Result;
2use serde::{Deserialize, Serialize};
3use uuid::Uuid;
4
5use crate::bucket_log::BucketLogProvider;
6use crate::crypto::PublicKey;
7use crate::linked_data::Link;
8use crate::mount::Manifest;
9use crate::peer::protocol::bidirectional::BidirectionalHandler;
10use crate::peer::protocol::messages::Message;
11use crate::peer::Peer;
12
13/// Request to ping a peer and check bucket sync status
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct PingMessage {
16 /// The bucket ID to check
17 pub bucket_id: Uuid,
18 /// The current link the requesting peer has for this bucket
19 pub link: Link,
20 /// The height of the link we are responding to
21 pub height: u64,
22}
23
24/// Sync status between two peers for a bucket
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
26pub enum PingReplyStatus {
27 /// The peer does not have this bucket at all
28 NotFound,
29 /// We are ahead of the current peer's history,
30 /// report where we are
31 Ahead(Link, u64),
32 /// We are behind, report where we are
33 Behind(Link, u64),
34 /// Both agree on the current link (in sync)
35 InSync,
36}
37
38/// Response to a ping request
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct PingReply {
41 /// The bucket ID being responded to
42 pub bucket_id: Uuid,
43 /// The sync status
44 pub status: PingReplyStatus,
45}
46
47impl PingReply {
48 /// Create a new pong message indicating bucket not found
49 pub fn not_found(bucket_id: Uuid) -> Self {
50 Self {
51 bucket_id,
52 status: PingReplyStatus::NotFound,
53 }
54 }
55
56 /// Create a new pong message indicating we are ahead
57 pub fn ahead(bucket_id: Uuid, link: Link, height: u64) -> Self {
58 Self {
59 bucket_id,
60 status: PingReplyStatus::Ahead(link, height),
61 }
62 }
63
64 /// Create a new pong message indicating we are behind
65 pub fn behind(bucket_id: Uuid, link: Link, height: u64) -> Self {
66 Self {
67 bucket_id,
68 status: PingReplyStatus::Behind(link, height),
69 }
70 }
71
72 /// Create a new pong message indicating we are in sync
73 pub fn in_sync(bucket_id: Uuid) -> Self {
74 Self {
75 bucket_id,
76 status: PingReplyStatus::InSync,
77 }
78 }
79}
80
/// Zero-sized handler type for the ping/pong exchange.
///
/// Its `BidirectionalHandler` implementation keeps both halves of the
/// protocol side by side:
/// - the responder half: which reply to send, and what to do once it is sent
/// - the initiator half: how to act on the reply that comes back
pub struct Ping;
87
88impl BidirectionalHandler for Ping {
89 type Message = PingMessage;
90 type Reply = PingReply;
91
92 /// Wrap the request in the Message enum for proper serialization
93 fn wrap_request(request: Self::Message) -> Message {
94 Message::Ping(request)
95 }
96
97 // ========================================
98 // RESPONDER SIDE: When we receive a ping
99 // ========================================
100
101 /// Generate response: compare our state with peer's state
102 async fn handle_message<L: BucketLogProvider>(
103 peer: &Peer<L>,
104 _sender_node_id: &PublicKey,
105 ping: &PingMessage,
106 ) -> PingReply {
107 let logs = peer.logs();
108 let bucket_id = ping.bucket_id;
109
110 // Try to get our head for this bucket
111 let (link, height) = match logs.head(bucket_id, None).await {
112 Ok((link, height)) => (link, height),
113 Err(_) => {
114 // We don't have this bucket, return NotFound
115 return PingReply::not_found(bucket_id);
116 }
117 };
118
119 // Compare heights and determine sync status
120 if height < ping.height {
121 PingReply::behind(bucket_id, link, height)
122 } else if height == ping.height {
123 // At same height, we're in sync
124 PingReply::in_sync(bucket_id)
125 } else {
126 // We're ahead of the remote peer
127 PingReply::ahead(bucket_id, link, height)
128 }
129 }
130
131 /// Side effects after sending response
132 ///
133 /// This is called AFTER we've sent the pong back to the peer.
134 /// Use this to trigger background operations without blocking the response.
135 async fn handle_message_side_effect<L: BucketLogProvider>(
136 peer: &Peer<L>,
137 sender_node_id: &PublicKey,
138 ping: &PingMessage,
139 pong: &PingReply,
140 ) -> Result<()>
141 where
142 L::Error: std::error::Error + Send + Sync + 'static,
143 {
144 match &pong.status {
145 PingReplyStatus::Behind(our_link, our_height) => {
146 // We told them we're behind, so we should dispatch a sync job
147 tracing::info!(
148 "We're behind peer for bucket {} (our height: {}, their height: {}), dispatching sync job",
149 ping.bucket_id,
150 our_height,
151 ping.height
152 );
153
154 // Load our manifest to get all peer IDs from shares
155 let peer_ids = match peer.blobs().get_cbor::<Manifest>(&our_link.hash()).await {
156 Ok(manifest) => manifest.get_peer_ids(),
157 Err(e) => {
158 tracing::warn!(
159 "Failed to load manifest for peer list, using sender only: {}",
160 e
161 );
162 vec![*sender_node_id]
163 }
164 };
165
166 // Dispatch sync job to background worker
167 use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
168 if let Err(e) = peer
169 .dispatch(SyncJob::SyncBucket(SyncBucketJob {
170 bucket_id: ping.bucket_id,
171 target: SyncTarget {
172 link: ping.link.clone(),
173 height: ping.height,
174 peer_ids,
175 },
176 }))
177 .await
178 {
179 tracing::error!("Failed to dispatch sync job: {}", e);
180 }
181 }
182 PingReplyStatus::Ahead(_, our_height) => {
183 // We told them we're ahead, they might fetch from us
184 tracing::debug!(
185 "We're ahead of peer for bucket {} (our height: {}, their height: {})",
186 ping.bucket_id,
187 our_height,
188 ping.height
189 );
190 // Nothing to do - they'll fetch from us if they want
191 }
192 PingReplyStatus::InSync => {
193 tracing::debug!("In sync with peer for bucket {}", ping.bucket_id);
194 // All good, nothing to do
195 }
196 PingReplyStatus::NotFound => {
197 tracing::debug!(
198 "We don't have bucket {} that peer is asking about",
199 ping.bucket_id
200 );
201 // TODO (amiller68): there should probably be a share message instead
202 // of this
203 // We don't have the bucket locally, so we can't get peer list from our manifest.
204 // Use only the sender for now; once we sync we'll have the full peer list.
205 let peer_ids = vec![*sender_node_id];
206
207 // Dispatch sync job to background worker
208 use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
209 if let Err(e) = peer
210 .dispatch(SyncJob::SyncBucket(SyncBucketJob {
211 bucket_id: ping.bucket_id,
212 target: SyncTarget {
213 link: ping.link.clone(),
214 height: ping.height,
215 peer_ids,
216 },
217 }))
218 .await
219 {
220 tracing::error!("Failed to dispatch sync job: {}", e);
221 }
222 }
223 }
224 Ok(())
225 }
226
227 // ========================================
228 // INITIATOR SIDE: When we receive a pong
229 // ========================================
230
231 /// Handle pong response: decide what to do based on sync status
232 async fn handle_reply<L: BucketLogProvider>(
233 peer: &Peer<L>,
234 recipient_node_id: &PublicKey,
235 pong: &PingReply,
236 ) -> Result<()>
237 where
238 L::Error: std::error::Error + Send + Sync + 'static,
239 {
240 match &pong.status {
241 PingReplyStatus::NotFound => {
242 tracing::info!(
243 "Remote peer {} doesn't have bucket {}",
244 recipient_node_id.to_hex(),
245 pong.bucket_id
246 );
247 // The peer should attemp to fetch from us after this
248 }
249 PingReplyStatus::Ahead(link, height) => {
250 // Remote peer is ahead, dispatch a sync job
251 tracing::info!(
252 "Remote peer {} is ahead for bucket {} at height {} with link {:?}, dispatching sync job",
253 recipient_node_id.to_hex(),
254 pong.bucket_id,
255 height,
256 link
257 );
258
259 // Load our manifest to get all peer IDs from shares
260 let peer_ids = match peer.logs().head(pong.bucket_id, None).await {
261 Ok((our_link, _)) => {
262 match peer.blobs().get_cbor::<Manifest>(&our_link.hash()).await {
263 Ok(manifest) => manifest.get_peer_ids(),
264 Err(e) => {
265 tracing::warn!(
266 "Failed to load manifest for peer list, using recipient only: {}",
267 e
268 );
269 vec![*recipient_node_id]
270 }
271 }
272 }
273 Err(e) => {
274 tracing::warn!(
275 "Failed to get head for peer list, using recipient only: {}",
276 e
277 );
278 vec![*recipient_node_id]
279 }
280 };
281
282 // Dispatch sync job to background worker
283 use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
284 if let Err(e) = peer
285 .dispatch(SyncJob::SyncBucket(SyncBucketJob {
286 bucket_id: pong.bucket_id,
287 target: SyncTarget {
288 link: link.clone(),
289 height: *height,
290 peer_ids,
291 },
292 }))
293 .await
294 {
295 tracing::error!("Failed to dispatch sync job: {}", e);
296 }
297 }
298 PingReplyStatus::Behind(link, height) => {
299 tracing::info!(
300 "Remote peer {} is behind for bucket {} at height {} with link {:?}",
301 recipient_node_id.to_hex(),
302 pong.bucket_id,
303 height,
304 link
305 );
306 // Remote peer is behind, they might fetch from us
307 // Nothing to do on our side
308 }
309 PingReplyStatus::InSync => {
310 tracing::info!(
311 "In sync with peer {} for bucket {}",
312 recipient_node_id.to_hex(),
313 pong.bucket_id
314 );
315 // All good
316 }
317 }
318 Ok(())
319 }
320}