1use super::{InstanceMessageRequest, InstanceMessageResponse};
2use crate::blueprint_protocol::HandshakeMessage;
3use crate::discovery::PeerManager;
4use crate::discovery::peers::VerificationIdentifierKey;
5use crate::discovery::utils::get_address_from_compressed_pubkey;
6use crate::types::ProtocolMessage;
7use bincode;
8use blueprint_core::{debug, error, info, warn};
9use blueprint_crypto::BytesEncoding;
10use blueprint_crypto::KeyType;
11use crossbeam_channel::Sender;
12use dashmap::DashMap;
13use libp2p::{
14 Multiaddr, PeerId, StreamProtocol,
15 core::transport::PortUse,
16 gossipsub::{self, IdentTopic, MessageId, Sha256Topic},
17 identity::Keypair,
18 request_response::{self, OutboundRequestId, ResponseChannel},
19 swarm::{
20 ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
21 THandlerOutEvent, ToSwarm,
22 },
23};
24use std::{
25 sync::Arc,
26 task::Poll,
27 time::{Duration, Instant},
28};
29
/// Composite libp2p behaviour bundling the two sub-behaviours used by the blueprint protocol.
///
/// `#[derive(NetworkBehaviour)]` generates the `DerivedBlueprintProtocolBehaviourEvent` enum
/// (with `RequestResponse` and `Gossipsub` variants) that the wrapping
/// `BlueprintProtocolBehaviour::poll` matches on.
#[derive(NetworkBehaviour)]
pub struct DerivedBlueprintProtocolBehaviour<K: KeyType> {
    /// CBOR-encoded request/response behaviour carrying `InstanceMessageRequest` /
    /// `InstanceMessageResponse` values.
    request_response:
        request_response::cbor::Behaviour<InstanceMessageRequest<K>, InstanceMessageResponse<K>>,
    /// Gossipsub behaviour used for topic-based publish/subscribe.
    gossipsub: gossipsub::Behaviour,
}
38
/// Event type declared as `NetworkBehaviour::ToSwarm` for `BlueprintProtocolBehaviour`.
///
/// NOTE(review): the `poll` implementation visible in this file consumes the inner
/// behaviour's generated events itself and does not construct any of these variants;
/// confirm whether they are emitted from code outside this file.
#[derive(Debug)]
pub enum BlueprintProtocolEvent<K: KeyType> {
    /// A request together with the channel on which a response can be sent.
    Request {
        /// Peer associated with the request.
        peer: PeerId,
        /// The request payload.
        request: InstanceMessageRequest<K>,
        /// Channel used to deliver the matching `InstanceMessageResponse`.
        channel: ResponseChannel<InstanceMessageResponse<K>>,
    },
    /// A response to a previously issued outbound request.
    Response {
        /// Peer associated with the response.
        peer: PeerId,
        /// Identifier of the outbound request this response belongs to.
        request_id: OutboundRequestId,
        /// The response payload.
        response: InstanceMessageResponse<K>,
    },
    /// A message received over gossipsub.
    GossipMessage {
        /// Peer the message is attributed to.
        source: PeerId,
        /// Raw message bytes.
        message: Vec<u8>,
        /// Topic the message belongs to.
        topic: IdentTopic,
    },
}
61
/// Network behaviour that wraps [`DerivedBlueprintProtocolBehaviour`] and layers handshake
/// signing/verification and gossip forwarding on top of it.
pub struct BlueprintProtocolBehaviour<K: KeyType> {
    /// Inner request/response + gossipsub behaviour that every `NetworkBehaviour` call is
    /// delegated to.
    blueprint_protocol: DerivedBlueprintProtocolBehaviour<K>,
    /// Protocol name this behaviour was constructed with.
    pub(crate) blueprint_protocol_name: String,
    /// Shared peer manager consulted for verification status, failure counts and bans.
    pub(crate) peer_manager: Arc<PeerManager<K>>,
    /// Peer id derived from the local libp2p keypair.
    pub(crate) local_peer_id: PeerId,
    /// Instance secret key used to sign outgoing handshakes.
    pub(crate) instance_key_pair: K::Secret,
    /// Per-peer timestamps for inbound handshakes; entries are dropped when the last
    /// connection to the peer closes. NOTE(review): insertion happens outside this file.
    pub(crate) inbound_handshakes: DashMap<PeerId, Instant>,
    /// Per-peer timestamps recorded when an outbound handshake is sent; entries are dropped
    /// when the last connection to the peer closes.
    pub(crate) outbound_handshakes: DashMap<PeerId, Instant>,
    /// Channel on which successfully decoded gossip messages are forwarded.
    pub(crate) protocol_message_sender: Sender<ProtocolMessage>,
    /// When `true`, outgoing handshakes identify this node by an EVM address derived from the
    /// instance public key instead of by the public key itself.
    pub(crate) use_address_for_handshake_verification: bool,
}
83
84impl<K: KeyType> BlueprintProtocolBehaviour<K> {
85 #[must_use]
87 #[allow(clippy::missing_panics_doc)] pub fn new(
89 local_key: &Keypair,
90 instance_key_pair: &K::Secret,
91 peer_manager: Arc<PeerManager<K>>,
92 blueprint_protocol_name: &str,
93 protocol_message_sender: Sender<ProtocolMessage>,
94 use_address_for_handshake_verification: bool,
95 ) -> Self {
96 let blueprint_protocol_name = blueprint_protocol_name.to_string();
97 let protocols = vec![(
98 StreamProtocol::try_from_owned(blueprint_protocol_name.to_string())
99 .unwrap_or_else(|_| StreamProtocol::new("/blueprint_protocol/1.0.0")),
100 request_response::ProtocolSupport::Full,
101 )];
102
103 let gossipsub_config = gossipsub::ConfigBuilder::default()
105 .heartbeat_interval(Duration::from_secs(1))
106 .validation_mode(gossipsub::ValidationMode::Strict)
107 .mesh_n_low(2)
108 .mesh_n(4)
109 .mesh_n_high(8)
110 .gossip_lazy(3)
111 .history_length(10)
112 .history_gossip(3)
113 .flood_publish(true)
114 .build()
115 .expect("Valid gossipsub config");
116
117 let gossipsub = gossipsub::Behaviour::new(
118 gossipsub::MessageAuthenticity::Signed(local_key.clone()),
119 gossipsub_config,
120 )
121 .expect("Valid gossipsub behaviour");
122
123 let config = request_response::Config::default()
124 .with_request_timeout(Duration::from_secs(30))
125 .with_max_concurrent_streams(50);
126
127 let blueprint_protocol = DerivedBlueprintProtocolBehaviour {
128 request_response: request_response::cbor::Behaviour::new(protocols, config),
129 gossipsub,
130 };
131
132 let local_peer_id = local_key.public().to_peer_id();
133
134 Self {
135 blueprint_protocol,
136 blueprint_protocol_name,
137 peer_manager,
138 local_peer_id,
139 instance_key_pair: instance_key_pair.clone(),
140 inbound_handshakes: DashMap::new(),
141 outbound_handshakes: DashMap::new(),
142 protocol_message_sender,
143 use_address_for_handshake_verification,
144 }
145 }
146
147 #[allow(clippy::unused_self)]
149 pub(crate) fn sign_handshake(
150 &self,
151 key_pair: &mut K::Secret,
152 peer: &PeerId,
153 handshake_msg: &HandshakeMessage,
154 ) -> Option<K::Signature> {
155 let msg = handshake_msg.to_bytes(peer);
156 match K::sign_with_secret(key_pair, &msg) {
157 Ok(signature) => {
158 let public_key = K::public_from_secret(key_pair);
159 let hex_msg = hex::encode(msg);
160
161 debug!(%peer, ?hex_msg, ?public_key, ?signature, "signing handshake");
162 Some(signature)
163 }
164 Err(e) => {
165 warn!("Failed to sign handshake message: {e:?}");
166 None
167 }
168 }
169 }
170
171 pub fn send_request(
173 &mut self,
174 peer: &PeerId,
175 request: InstanceMessageRequest<K>,
176 ) -> OutboundRequestId {
177 debug!(%peer, ?request, "sending request");
178 self.blueprint_protocol
179 .request_response
180 .send_request(peer, request)
181 }
182
183 pub fn send_response(
189 &mut self,
190 channel: ResponseChannel<InstanceMessageResponse<K>>,
191 response: InstanceMessageResponse<K>,
192 ) -> Result<(), InstanceMessageResponse<K>> {
193 debug!(?response, "sending response");
194 self.blueprint_protocol
195 .request_response
196 .send_response(channel, response)
197 }
198
199 pub fn subscribe(&mut self, topic: &str) -> Result<bool, gossipsub::SubscriptionError> {
205 let topic = Sha256Topic::new(topic);
206 self.blueprint_protocol.gossipsub.subscribe(&topic)
207 }
208
209 pub fn unsubscribe(&mut self, topic: &str) -> bool {
211 let topic = Sha256Topic::new(topic);
212 self.blueprint_protocol.gossipsub.unsubscribe(&topic)
213 }
214
215 pub fn publish(
221 &mut self,
222 topic: &str,
223 data: impl Into<Vec<u8>>,
224 ) -> Result<MessageId, gossipsub::PublishError> {
225 let topic = Sha256Topic::new(topic);
226 self.blueprint_protocol.gossipsub.publish(topic, data)
227 }
228
229 pub fn send_handshake(&mut self, peer: &PeerId) -> Result<(), InstanceMessageResponse<K>> {
235 let public_key = K::public_from_secret(&self.instance_key_pair);
236 let handshake_msg = HandshakeMessage::new(self.local_peer_id);
237 let signature =
238 self.sign_handshake(&mut self.instance_key_pair.clone(), peer, &handshake_msg);
239
240 if let Some(signature) = signature {
241 self.send_request(
242 peer,
243 InstanceMessageRequest::Handshake {
244 verification_id_key: if self.use_address_for_handshake_verification {
245 VerificationIdentifierKey::EvmAddress(get_address_from_compressed_pubkey(
246 &public_key.to_bytes(),
247 ))
248 } else {
249 VerificationIdentifierKey::InstancePublicKey(public_key)
250 },
251 signature,
252 msg: handshake_msg,
253 },
254 );
255 }
256
257 Ok(())
258 }
259
260 pub fn verify_handshake(
266 &self,
267 msg: &HandshakeMessage,
268 verification_id_key: &VerificationIdentifierKey<K>,
269 signature: &K::Signature,
270 ) -> Result<(), InstanceMessageResponse<K>> {
271 if msg.is_expired(HandshakeMessage::MAX_AGE) {
272 error!(%msg.sender, "Handshake message expired");
273 return Err(InstanceMessageResponse::Error {
274 code: 400,
275 message: "Handshake message expired".to_string(),
276 });
277 }
278
279 let msg_bytes = msg.to_bytes(&self.local_peer_id);
280 let hex_msg = hex::encode(msg_bytes.clone());
281
282 info!(%hex_msg, ?verification_id_key, ?signature, "verifying handshake");
283
284 let valid = verification_id_key
285 .verify(&msg_bytes, signature.to_bytes().as_ref())
286 .map_err(|e| InstanceMessageResponse::Error {
287 code: 400,
288 message: format!("Invalid handshake signature: {e}"),
289 })?;
290
291 if !valid {
292 warn!(%msg.sender, "Invalid handshake signature for peer");
293 return Err(InstanceMessageResponse::Error {
294 code: 400,
295 message: "Invalid handshake signature".to_string(),
296 });
297 }
298
299 Ok(())
300 }
301
302 pub fn handle_handshake(
308 &self,
309 msg: &HandshakeMessage,
310 verification_id_key: &VerificationIdentifierKey<K>,
311 signature: &K::Signature,
312 ) -> Result<(), InstanceMessageResponse<K>> {
313 self.verify_handshake(msg, verification_id_key, signature)?;
314 self.peer_manager
315 .link_peer_id_to_verification_id_key(&msg.sender, verification_id_key);
316
317 Ok(())
318 }
319
320 pub fn handle_handshake_failure(&self, peer: &PeerId, reason: &str) {
322 if let Some(mut peer_info) = self.peer_manager.get_peer_info(peer) {
324 peer_info.failures += 1;
325 self.peer_manager.update_peer(*peer, peer_info.clone());
326
327 if peer_info.failures >= 3 {
329 self.peer_manager
330 .ban_peer(*peer, reason, Some(Duration::from_secs(300)));
331 }
332 }
333 }
334
335 pub fn handle_gossipsub_event(&mut self, event: gossipsub::Event) {
336 match event {
337 gossipsub::Event::Message {
338 propagation_source,
339 message_id: _,
340 message,
341 } => {
342 if !self.peer_manager.is_peer_verified(&propagation_source) {
344 warn!(%propagation_source, "Received gossip from unverified peer");
345 return;
346 }
347
348 debug!(%propagation_source, "Received gossip message");
349
350 let Ok(protocol_message) = bincode::deserialize::<ProtocolMessage>(&message.data)
352 else {
353 warn!(%propagation_source, "Failed to deserialize gossip message");
354 return;
355 };
356
357 debug!(%propagation_source, %protocol_message, "Forwarding gossip message to protocol handler");
358 if let Err(e) = self.protocol_message_sender.send(protocol_message) {
359 warn!(%propagation_source, "Failed to forward gossip message: {e}");
360 }
361 }
362 gossipsub::Event::Subscribed { peer_id, topic } => {
363 debug!(%peer_id, %topic, "Peer subscribed to topic");
364 }
365 gossipsub::Event::Unsubscribed { peer_id, topic } => {
366 debug!(%peer_id, %topic, "Peer unsubscribed from topic");
367 }
368 _ => {}
369 }
370 }
371}
372
373impl<K: KeyType> NetworkBehaviour for BlueprintProtocolBehaviour<K> {
374 type ConnectionHandler =
375 <DerivedBlueprintProtocolBehaviour<K> as NetworkBehaviour>::ConnectionHandler;
376
377 type ToSwarm = BlueprintProtocolEvent<K>;
378
379 fn handle_established_inbound_connection(
380 &mut self,
381 connection_id: ConnectionId,
382 peer: PeerId,
383 local_addr: &libp2p::Multiaddr,
384 remote_addr: &libp2p::Multiaddr,
385 ) -> Result<THandler<Self>, ConnectionDenied> {
386 debug!(%peer, ?connection_id, ?local_addr, ?remote_addr, "Established inbound connection");
387 self.blueprint_protocol
388 .handle_established_inbound_connection(connection_id, peer, local_addr, remote_addr)
389 }
390
391 fn handle_established_outbound_connection(
392 &mut self,
393 connection_id: ConnectionId,
394 peer: PeerId,
395 addr: &Multiaddr,
396 role_override: libp2p::core::Endpoint,
397 port_use: PortUse,
398 ) -> Result<THandler<Self>, ConnectionDenied> {
399 self.blueprint_protocol
400 .handle_established_outbound_connection(
401 connection_id,
402 peer,
403 addr,
404 role_override,
405 port_use,
406 )
407 }
408
409 fn handle_pending_inbound_connection(
410 &mut self,
411 connection_id: ConnectionId,
412 local_addr: &libp2p::Multiaddr,
413 remote_addr: &libp2p::Multiaddr,
414 ) -> Result<(), ConnectionDenied> {
415 self.blueprint_protocol.handle_pending_inbound_connection(
416 connection_id,
417 local_addr,
418 remote_addr,
419 )
420 }
421
422 fn handle_pending_outbound_connection(
423 &mut self,
424 connection_id: ConnectionId,
425 maybe_peer: Option<PeerId>,
426 addresses: &[libp2p::Multiaddr],
427 effective_role: libp2p::core::Endpoint,
428 ) -> Result<Vec<libp2p::Multiaddr>, ConnectionDenied> {
429 self.blueprint_protocol.handle_pending_outbound_connection(
430 connection_id,
431 maybe_peer,
432 addresses,
433 effective_role,
434 )
435 }
436
437 fn on_connection_handler_event(
438 &mut self,
439 peer_id: PeerId,
440 connection_id: ConnectionId,
441 event: THandlerOutEvent<Self>,
442 ) {
443 self.blueprint_protocol
444 .on_connection_handler_event(peer_id, connection_id, event);
445 }
446
447 fn on_swarm_event(&mut self, event: FromSwarm<'_>) {
448 match &event {
449 FromSwarm::ConnectionEstablished(conn) if conn.other_established == 0 => {
450 if !self.peer_manager.is_peer_verified(&conn.peer_id) {
452 debug!(
453 "Established connection with unverified peer {:?}, sending handshake",
454 conn.peer_id
455 );
456
457 match self.send_handshake(&conn.peer_id) {
458 Ok(()) => {
459 debug!(%conn.peer_id, "Sent handshake request");
460 self.outbound_handshakes
461 .insert(conn.peer_id, Instant::now());
462 }
463 Err(e) => {
464 warn!(%conn.peer_id, "Failed to send handshake: {e:?}");
465 }
466 }
467 }
468
469 self.blueprint_protocol
470 .gossipsub
471 .add_explicit_peer(&conn.peer_id);
472 }
473 FromSwarm::ConnectionClosed(e) if e.remaining_established == 0 => {
474 if self.inbound_handshakes.contains_key(&e.peer_id) {
475 self.inbound_handshakes.remove(&e.peer_id);
476 }
477
478 if self.outbound_handshakes.contains_key(&e.peer_id) {
479 self.outbound_handshakes.remove(&e.peer_id);
480 }
481
482 if self.peer_manager.is_peer_verified(&e.peer_id) {
483 self.peer_manager
484 .remove_peer(&e.peer_id, "connection closed");
485 }
486
487 self.blueprint_protocol
488 .gossipsub
489 .remove_explicit_peer(&e.peer_id);
490
491 self.peer_manager
492 .remove_peer_id_from_verification_id_key(&e.peer_id);
493 }
494
495 _ => {}
496 }
497
498 self.blueprint_protocol.on_swarm_event(event);
499 }
500
501 fn poll(
502 &mut self,
503 cx: &mut std::task::Context<'_>,
504 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
505 while let Poll::Ready(ev) = self.blueprint_protocol.poll(cx) {
506 match ev {
507 ToSwarm::GenerateEvent(ev) => match ev {
508 DerivedBlueprintProtocolBehaviourEvent::RequestResponse(
509 blueprint_protocol_event,
510 ) => self.handle_request_response_event(blueprint_protocol_event),
511 DerivedBlueprintProtocolBehaviourEvent::Gossipsub(gossip_event) => {
512 self.handle_gossipsub_event(gossip_event);
513 }
514 },
515 ToSwarm::Dial { opts } => {
516 return Poll::Ready(ToSwarm::Dial { opts });
517 }
518 ToSwarm::NotifyHandler {
519 peer_id,
520 handler,
521 event,
522 } => {
523 return Poll::Ready(ToSwarm::NotifyHandler {
524 peer_id,
525 handler,
526 event,
527 });
528 }
529 ToSwarm::CloseConnection {
530 peer_id,
531 connection,
532 } => {
533 return Poll::Ready(ToSwarm::CloseConnection {
534 peer_id,
535 connection,
536 });
537 }
538 ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
539 ToSwarm::RemoveListener { id } => {
540 return Poll::Ready(ToSwarm::RemoveListener { id });
541 }
542 ToSwarm::NewExternalAddrCandidate(addr) => {
543 return Poll::Ready(ToSwarm::NewExternalAddrCandidate(addr));
544 }
545 ToSwarm::ExternalAddrConfirmed(addr) => {
546 return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr));
547 }
548 ToSwarm::ExternalAddrExpired(addr) => {
549 return Poll::Ready(ToSwarm::ExternalAddrExpired(addr));
550 }
551 _ => {}
552 }
553 }
554 Poll::Pending
555 }
556}