// common/peer/protocol/messages/ping.rs
use anyhow::Result;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::bucket_log::BucketLogProvider;
use crate::crypto::PublicKey;
use crate::linked_data::Link;
use crate::peer::protocol::bidirectional::BidirectionalHandler;
use crate::peer::protocol::messages::Message;
use crate::peer::Peer;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct PingMessage {
15 pub bucket_id: Uuid,
17 pub link: Link,
19 pub height: u64,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
25pub enum PingReplyStatus {
26 NotFound,
28 Ahead(Link, u64),
31 Behind(Link, u64),
33 InSync,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct PingReply {
40 pub bucket_id: Uuid,
42 pub status: PingReplyStatus,
44}
45
46impl PingReply {
47 pub fn not_found(bucket_id: Uuid) -> Self {
49 Self {
50 bucket_id,
51 status: PingReplyStatus::NotFound,
52 }
53 }
54
55 pub fn ahead(bucket_id: Uuid, link: Link, height: u64) -> Self {
57 Self {
58 bucket_id,
59 status: PingReplyStatus::Ahead(link, height),
60 }
61 }
62
63 pub fn behind(bucket_id: Uuid, link: Link, height: u64) -> Self {
65 Self {
66 bucket_id,
67 status: PingReplyStatus::Behind(link, height),
68 }
69 }
70
71 pub fn in_sync(bucket_id: Uuid) -> Self {
73 Self {
74 bucket_id,
75 status: PingReplyStatus::InSync,
76 }
77 }
78}
79
/// Marker type that carries the ping protocol's [`BidirectionalHandler`]
/// implementation; it holds no state.
pub struct Ping;
86
87impl BidirectionalHandler for Ping {
88 type Message = PingMessage;
89 type Reply = PingReply;
90
91 fn wrap_request(request: Self::Message) -> Message {
93 Message::Ping(request)
94 }
95
96 async fn handle_message<L: BucketLogProvider>(
102 peer: &Peer<L>,
103 _sender_node_id: &PublicKey,
104 ping: &PingMessage,
105 ) -> PingReply {
106 let logs = peer.logs();
107 let bucket_id = ping.bucket_id;
108
109 let (link, height) = match logs.head(bucket_id, None).await {
111 Ok((link, height)) => (link, height),
112 Err(_) => {
113 return PingReply::not_found(bucket_id);
115 }
116 };
117
118 if height < ping.height {
120 PingReply::behind(bucket_id, link, height)
121 } else if height == ping.height {
122 PingReply::in_sync(bucket_id)
124 } else {
125 PingReply::ahead(bucket_id, link, height)
127 }
128 }
129
130 async fn handle_message_side_effect<L: BucketLogProvider>(
135 peer: &Peer<L>,
136 sender_node_id: &PublicKey,
137 ping: &PingMessage,
138 pong: &PingReply,
139 ) -> Result<()>
140 where
141 L::Error: std::error::Error + Send + Sync + 'static,
142 {
143 match &pong.status {
144 PingReplyStatus::Behind(_, our_height) => {
145 tracing::info!(
147 "We're behind peer for bucket {} (our height: {}, their height: {}), dispatching sync job",
148 ping.bucket_id,
149 our_height,
150 ping.height
151 );
152
153 use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
155 if let Err(e) = peer
156 .dispatch(SyncJob::SyncBucket(SyncBucketJob {
157 bucket_id: ping.bucket_id,
158 target: SyncTarget {
159 link: ping.link.clone(),
160 height: ping.height,
161 peer_id: *sender_node_id,
162 },
163 }))
164 .await
165 {
166 tracing::error!("Failed to dispatch sync job: {}", e);
167 }
168 }
169 PingReplyStatus::Ahead(_, our_height) => {
170 tracing::debug!(
172 "We're ahead of peer for bucket {} (our height: {}, their height: {})",
173 ping.bucket_id,
174 our_height,
175 ping.height
176 );
177 }
179 PingReplyStatus::InSync => {
180 tracing::debug!("In sync with peer for bucket {}", ping.bucket_id);
181 }
183 PingReplyStatus::NotFound => {
184 tracing::debug!(
185 "We don't have bucket {} that peer is asking about",
186 ping.bucket_id
187 );
188 use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
192 if let Err(e) = peer
193 .dispatch(SyncJob::SyncBucket(SyncBucketJob {
194 bucket_id: ping.bucket_id,
195 target: SyncTarget {
196 link: ping.link.clone(),
197 height: ping.height,
198 peer_id: *sender_node_id,
199 },
200 }))
201 .await
202 {
203 tracing::error!("Failed to dispatch sync job: {}", e);
204 }
205 }
206 }
207 Ok(())
208 }
209
210 async fn handle_reply<L: BucketLogProvider>(
216 peer: &Peer<L>,
217 recipient_node_id: &PublicKey,
218 pong: &PingReply,
219 ) -> Result<()>
220 where
221 L::Error: std::error::Error + Send + Sync + 'static,
222 {
223 match &pong.status {
224 PingReplyStatus::NotFound => {
225 tracing::info!(
226 "Remote peer {} doesn't have bucket {}",
227 recipient_node_id.to_hex(),
228 pong.bucket_id
229 );
230 }
232 PingReplyStatus::Ahead(link, height) => {
233 tracing::info!(
235 "Remote peer {} is ahead for bucket {} at height {} with link {:?}, dispatching sync job",
236 recipient_node_id.to_hex(),
237 pong.bucket_id,
238 height,
239 link
240 );
241
242 use crate::peer::sync::{SyncBucketJob, SyncJob, SyncTarget};
244 if let Err(e) = peer
245 .dispatch(SyncJob::SyncBucket(SyncBucketJob {
246 bucket_id: pong.bucket_id,
247 target: SyncTarget {
248 link: link.clone(),
249 height: *height,
250 peer_id: *recipient_node_id,
251 },
252 }))
253 .await
254 {
255 tracing::error!("Failed to dispatch sync job: {}", e);
256 }
257 }
258 PingReplyStatus::Behind(link, height) => {
259 tracing::info!(
260 "Remote peer {} is behind for bucket {} at height {} with link {:?}",
261 recipient_node_id.to_hex(),
262 pong.bucket_id,
263 height,
264 link
265 );
266 }
269 PingReplyStatus::InSync => {
270 tracing::info!(
271 "In sync with peer {} for bucket {}",
272 recipient_node_id.to_hex(),
273 pong.bucket_id
274 );
275 }
277 }
278 Ok(())
279 }
280}