blueprint_networking/blueprint_protocol/
handler.rs1use std::time::{Duration, Instant};
2
3use alloy_primitives::Address;
4use blueprint_core::{debug, warn};
5use blueprint_crypto::{BytesEncoding, KeyType, hashing::keccak_256};
6use libp2p::{PeerId, request_response};
7
8use crate::blueprint_protocol::HandshakeMessage;
9use crate::discovery::peers::VerificationIdentifierKey;
10use crate::types::ProtocolMessage;
11
12use super::{BlueprintProtocolBehaviour, InstanceMessageRequest, InstanceMessageResponse};
13
/// Maximum age of an entry in `inbound_handshakes` before
/// `check_expired_handshakes` removes it and reports an inbound handshake timeout.
const INBOUND_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30);
/// Maximum age of an entry in `outbound_handshakes` before
/// `check_expired_handshakes` removes it and reports an outbound handshake timeout.
const OUTBOUND_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30);
16
17impl<K: KeyType> BlueprintProtocolBehaviour<K> {
18 pub fn handle_request_response_event(
19 &mut self,
20 event: request_response::Event<InstanceMessageRequest<K>, InstanceMessageResponse<K>>,
21 ) {
22 match event {
23 request_response::Event::Message {
24 peer,
25 message:
26 request_response::Message::Request {
27 request:
28 InstanceMessageRequest::Handshake {
29 verification_id_key,
30 signature,
31 msg,
32 },
33 channel,
34 ..
35 },
36 ..
37 } => {
38 debug!(%peer, "Received handshake request");
39
40 if self.outbound_handshakes.contains_key(&peer) {
42 debug!(%peer, "Responding to inbound handshake request while outbound is pending");
45 }
46
47 if !self.peer_manager.is_key_whitelisted(&verification_id_key) {
48 warn!(
50 "Received handshake response from unwhitelisted peer {:?} with key {:?}",
51 peer, verification_id_key
52 );
53 self.peer_manager.handle_nonwhitelisted_peer(&peer);
54 return;
55 }
56
57 match self.verify_handshake(&msg, &verification_id_key, &signature) {
59 Ok(()) => {
60 self.inbound_handshakes.insert(peer, Instant::now());
62 self.peer_manager
63 .link_peer_id_to_verification_id_key(&peer, &verification_id_key);
64
65 let mut key_pair = self.instance_key_pair.clone();
67 let public_key = K::public_from_secret(&key_pair);
68 let self_verification_id_key =
69 if self.use_address_for_handshake_verification {
70 let pre_truncation = keccak_256(public_key.to_bytes().as_ref());
71 VerificationIdentifierKey::EvmAddress(Address::from_slice(
72 &pre_truncation[12..],
73 ))
74 } else {
75 VerificationIdentifierKey::InstancePublicKey(public_key)
76 };
77
78 let handshake_msg = HandshakeMessage::new(self.local_peer_id);
79 let Some(signature) =
80 self.sign_handshake(&mut key_pair, &peer, &handshake_msg)
81 else {
82 return;
83 };
84
85 let response = InstanceMessageResponse::Handshake {
86 verification_id_key: self_verification_id_key,
87 signature,
88 msg: handshake_msg,
89 };
90
91 if let Err(e) = self.send_response(channel, response) {
92 warn!(%peer, "Failed to send handshake response: {:?}", e);
93 return;
94 }
95 }
96 Err(e) => {
97 warn!(%peer, "Invalid handshake request: {:?}", e);
98 let response = InstanceMessageResponse::Error {
99 code: 400,
100 message: format!("Invalid handshake: {:?}", e),
101 };
102 if let Err(e) = self.send_response(channel, response) {
103 warn!(%peer, "Failed to send error response: {:?}", e);
104 }
105 }
106 }
107 }
108 request_response::Event::Message {
109 peer,
110 message:
111 request_response::Message::Response {
112 response:
113 InstanceMessageResponse::Handshake {
114 verification_id_key,
115 signature,
116 msg,
117 },
118 ..
119 },
120 ..
121 } => {
122 debug!(%peer, "Received handshake response");
123
124 if !self.outbound_handshakes.contains_key(&peer) {
126 warn!(%peer, "Received unexpected handshake response");
127 return;
128 }
129
130 if !self.peer_manager.is_key_whitelisted(&verification_id_key) {
131 warn!(%peer, ?verification_id_key, "Received handshake response from unwhitelisted peer");
132 self.peer_manager.handle_nonwhitelisted_peer(&peer);
133 return;
134 }
135
136 match self.verify_handshake(&msg, &verification_id_key, &signature) {
138 Ok(()) => {
139 self.complete_handshake(&peer, &verification_id_key);
141 }
142 Err(e) => {
143 warn!(%peer, "Invalid handshake verification: {:?}", e);
144 self.outbound_handshakes.remove(&peer);
145 self.handle_handshake_failure(&peer, "Invalid handshake verification");
146 }
147 }
148 }
149 request_response::Event::Message {
150 peer,
151 message:
152 request_response::Message::Request {
153 request:
154 InstanceMessageRequest::Protocol {
155 protocol,
156 payload,
157 metadata: _,
158 },
159 channel,
160 ..
161 },
162 ..
163 } => {
164 if peer == self.local_peer_id {
166 return;
167 }
168
169 if !self.peer_manager.is_peer_verified(&peer) {
171 warn!(%peer, "Received protocol message from unverified peer");
172 let response = InstanceMessageResponse::Error {
173 code: 403,
174 message: "Handshake required".to_string(),
175 };
176 if let Err(e) = self.send_response(channel, response) {
177 warn!(%peer, "Failed to send error response: {:?}", e);
178 }
179 return;
180 }
181
182 let protocol_message: ProtocolMessage = match bincode::deserialize(&payload) {
183 Ok(message) => message,
184 Err(e) => {
185 warn!(%peer, "Failed to deserialize protocol message: {:?}", e);
186 let response = InstanceMessageResponse::Error {
187 code: 400,
188 message: format!("Invalid protocol message: {:?}", e),
189 };
190 if let Err(e) = self.send_response(channel, response) {
191 warn!(%peer, "Failed to send error response: {:?}", e);
192 }
193 return;
194 }
195 };
196
197 debug!(%peer, %protocol, %protocol_message, "Received protocol request");
198 if let Err(e) = self.protocol_message_sender.send(protocol_message) {
199 warn!(%peer, "Failed to send protocol message: {:?}", e);
200 }
201 }
202 request_response::Event::Message {
203 peer,
204 message:
205 request_response::Message::Response {
206 response: InstanceMessageResponse::Error { code, message },
207 ..
208 },
209 ..
210 } => {
211 if !self.peer_manager.is_peer_verified(&peer) {
212 warn!(%peer, code, %message, "Received error response from unverified peer");
213 return;
214 }
215 }
216 request_response::Event::Message {
217 peer,
218 message:
219 request_response::Message::Response {
220 response: InstanceMessageResponse::Success { protocol, data: _ },
221 ..
222 },
223 ..
224 } => {
225 debug!(%peer, %protocol, "Received successful protocol response");
226 }
227 _ => {}
228 }
229
230 self.check_expired_handshakes();
232 }
233
234 fn check_expired_handshakes(&mut self) {
236 let now = Instant::now();
237
238 let expired_inbound: Vec<_> = self
240 .inbound_handshakes
241 .clone()
242 .into_read_only()
243 .iter()
244 .filter(|&(_, &time)| now.duration_since(time) > INBOUND_HANDSHAKE_TIMEOUT)
245 .map(|(peer_id, _)| *peer_id)
246 .collect();
247
248 for peer_id in expired_inbound {
249 self.inbound_handshakes.remove(&peer_id);
250 self.handle_handshake_failure(&peer_id, "Inbound handshake timeout");
251 }
252
253 let expired_outbound: Vec<_> = self
255 .outbound_handshakes
256 .clone()
257 .into_read_only()
258 .iter()
259 .filter(|&(_, &time)| now.duration_since(time) > OUTBOUND_HANDSHAKE_TIMEOUT)
260 .map(|(peer_id, _)| *peer_id)
261 .collect();
262
263 for peer_id in expired_outbound {
264 self.outbound_handshakes.remove(&peer_id);
265 self.handle_handshake_failure(&peer_id, "Outbound handshake timeout");
266 }
267 }
268
269 fn complete_handshake(
271 &mut self,
272 peer: &PeerId,
273 verification_id_key: &VerificationIdentifierKey<K>,
274 ) {
275 debug!(%peer, ?verification_id_key, "Completed handshake");
276
277 self.inbound_handshakes.remove(peer);
279 self.outbound_handshakes.remove(peer);
280
281 self.peer_manager
283 .link_peer_id_to_verification_id_key(peer, verification_id_key);
284
285 self.peer_manager.verify_peer(peer);
287 }
288}