blueprint_networking/blueprint_protocol/
behaviour.rs

1use super::{InstanceMessageRequest, InstanceMessageResponse};
2use crate::blueprint_protocol::HandshakeMessage;
3use crate::discovery::PeerManager;
4use crate::discovery::peers::VerificationIdentifierKey;
5use crate::discovery::utils::get_address_from_compressed_pubkey;
6use crate::types::ProtocolMessage;
7use bincode;
8use blueprint_core::{debug, error, info, warn};
9use blueprint_crypto::BytesEncoding;
10use blueprint_crypto::KeyType;
11use crossbeam_channel::Sender;
12use dashmap::DashMap;
13use libp2p::{
14    Multiaddr, PeerId, StreamProtocol,
15    core::transport::PortUse,
16    gossipsub::{self, IdentTopic, MessageId, Sha256Topic},
17    identity::Keypair,
18    request_response::{self, OutboundRequestId, ResponseChannel},
19    swarm::{
20        ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
21        THandlerOutEvent, ToSwarm,
22    },
23};
24use std::{
25    sync::Arc,
26    task::Poll,
27    time::{Duration, Instant},
28};
29
30#[derive(NetworkBehaviour)]
31pub struct DerivedBlueprintProtocolBehaviour<K: KeyType> {
32    /// Request/response protocol for p2p messaging
33    request_response:
34        request_response::cbor::Behaviour<InstanceMessageRequest<K>, InstanceMessageResponse<K>>,
35    /// Gossipsub for broadcast messaging
36    gossipsub: gossipsub::Behaviour,
37}
38
39/// Events emitted by the `BlueprintProtocolBehaviour`
40#[derive(Debug)]
41pub enum BlueprintProtocolEvent<K: KeyType> {
42    /// Request received from a peer
43    Request {
44        peer: PeerId,
45        request: InstanceMessageRequest<K>,
46        channel: ResponseChannel<InstanceMessageResponse<K>>,
47    },
48    /// Response received from a peer
49    Response {
50        peer: PeerId,
51        request_id: OutboundRequestId,
52        response: InstanceMessageResponse<K>,
53    },
54    /// Gossip message received
55    GossipMessage {
56        source: PeerId,
57        message: Vec<u8>,
58        topic: IdentTopic,
59    },
60}
61
62/// Behaviour that handles the blueprint protocol request/response and gossip
63pub struct BlueprintProtocolBehaviour<K: KeyType> {
64    /// Request/response protocol for direct messaging
65    blueprint_protocol: DerivedBlueprintProtocolBehaviour<K>,
66    /// Name of the blueprint protocol
67    pub(crate) blueprint_protocol_name: String,
68    /// Peer manager for tracking peer states
69    pub(crate) peer_manager: Arc<PeerManager<K>>,
70    /// Libp2p peer ID
71    pub(crate) local_peer_id: PeerId,
72    /// Instance key pair for handshakes and blueprint protocol
73    pub(crate) instance_key_pair: K::Secret,
74    /// Peers with pending inbound handshakes
75    pub(crate) inbound_handshakes: DashMap<PeerId, Instant>,
76    /// Peers with pending outbound handshakes
77    pub(crate) outbound_handshakes: DashMap<PeerId, Instant>,
78    /// Protocol message sender
79    pub(crate) protocol_message_sender: Sender<ProtocolMessage>,
80    /// Flag for using addresses for whitelisting and handshake verification
81    pub(crate) use_address_for_handshake_verification: bool,
82}
83
84impl<K: KeyType> BlueprintProtocolBehaviour<K> {
85    /// Create a new blueprint protocol behaviour
86    #[must_use]
87    #[allow(clippy::missing_panics_doc)] // Known good gossipsub config
88    pub fn new(
89        local_key: &Keypair,
90        instance_key_pair: &K::Secret,
91        peer_manager: Arc<PeerManager<K>>,
92        blueprint_protocol_name: &str,
93        protocol_message_sender: Sender<ProtocolMessage>,
94        use_address_for_handshake_verification: bool,
95    ) -> Self {
96        let blueprint_protocol_name = blueprint_protocol_name.to_string();
97        let protocols = vec![(
98            StreamProtocol::try_from_owned(blueprint_protocol_name.to_string())
99                .unwrap_or_else(|_| StreamProtocol::new("/blueprint_protocol/1.0.0")),
100            request_response::ProtocolSupport::Full,
101        )];
102
103        // Initialize gossipsub with message signing
104        let gossipsub_config = gossipsub::ConfigBuilder::default()
105            .heartbeat_interval(Duration::from_secs(1))
106            .validation_mode(gossipsub::ValidationMode::Strict)
107            .mesh_n_low(2)
108            .mesh_n(4)
109            .mesh_n_high(8)
110            .gossip_lazy(3)
111            .history_length(10)
112            .history_gossip(3)
113            .flood_publish(true)
114            .build()
115            .expect("Valid gossipsub config");
116
117        let gossipsub = gossipsub::Behaviour::new(
118            gossipsub::MessageAuthenticity::Signed(local_key.clone()),
119            gossipsub_config,
120        )
121        .expect("Valid gossipsub behaviour");
122
123        let config = request_response::Config::default()
124            .with_request_timeout(Duration::from_secs(30))
125            .with_max_concurrent_streams(50);
126
127        let blueprint_protocol = DerivedBlueprintProtocolBehaviour {
128            request_response: request_response::cbor::Behaviour::new(protocols, config),
129            gossipsub,
130        };
131
132        let local_peer_id = local_key.public().to_peer_id();
133
134        Self {
135            blueprint_protocol,
136            blueprint_protocol_name,
137            peer_manager,
138            local_peer_id,
139            instance_key_pair: instance_key_pair.clone(),
140            inbound_handshakes: DashMap::new(),
141            outbound_handshakes: DashMap::new(),
142            protocol_message_sender,
143            use_address_for_handshake_verification,
144        }
145    }
146
147    /// Sign a handshake message for a peer
148    #[allow(clippy::unused_self)]
149    pub(crate) fn sign_handshake(
150        &self,
151        key_pair: &mut K::Secret,
152        peer: &PeerId,
153        handshake_msg: &HandshakeMessage,
154    ) -> Option<K::Signature> {
155        let msg = handshake_msg.to_bytes(peer);
156        match K::sign_with_secret(key_pair, &msg) {
157            Ok(signature) => {
158                let public_key = K::public_from_secret(key_pair);
159                let hex_msg = hex::encode(msg);
160
161                debug!(%peer, ?hex_msg, ?public_key, ?signature, "signing handshake");
162                Some(signature)
163            }
164            Err(e) => {
165                warn!("Failed to sign handshake message: {e:?}");
166                None
167            }
168        }
169    }
170
171    /// Send a request to a peer
172    pub fn send_request(
173        &mut self,
174        peer: &PeerId,
175        request: InstanceMessageRequest<K>,
176    ) -> OutboundRequestId {
177        debug!(%peer, ?request, "sending request");
178        self.blueprint_protocol
179            .request_response
180            .send_request(peer, request)
181    }
182
183    /// Send a response through a response channel
184    ///
185    /// # Errors
186    ///
187    /// See [`libp2p::request_response::Behaviour::send_response`]
188    pub fn send_response(
189        &mut self,
190        channel: ResponseChannel<InstanceMessageResponse<K>>,
191        response: InstanceMessageResponse<K>,
192    ) -> Result<(), InstanceMessageResponse<K>> {
193        debug!(?response, "sending response");
194        self.blueprint_protocol
195            .request_response
196            .send_response(channel, response)
197    }
198
199    /// Subscribe to a gossip topic
200    ///
201    /// # Errors
202    ///
203    /// See [`libp2p::gossipsub::SubscriptionError`]
204    pub fn subscribe(&mut self, topic: &str) -> Result<bool, gossipsub::SubscriptionError> {
205        let topic = Sha256Topic::new(topic);
206        self.blueprint_protocol.gossipsub.subscribe(&topic)
207    }
208
209    /// Unsubscribe from a gossip topic
210    pub fn unsubscribe(&mut self, topic: &str) -> bool {
211        let topic = Sha256Topic::new(topic);
212        self.blueprint_protocol.gossipsub.unsubscribe(&topic)
213    }
214
215    /// Publish a message to a gossip topic
216    ///
217    /// # Errors
218    ///
219    /// See [`libp2p::gossipsub::PublishError`]
220    pub fn publish(
221        &mut self,
222        topic: &str,
223        data: impl Into<Vec<u8>>,
224    ) -> Result<MessageId, gossipsub::PublishError> {
225        let topic = Sha256Topic::new(topic);
226        self.blueprint_protocol.gossipsub.publish(topic, data)
227    }
228
229    /// Send a handshake to a peer
230    ///
231    /// # Errors
232    ///
233    /// See [`Self::send_request()`]
234    pub fn send_handshake(&mut self, peer: &PeerId) -> Result<(), InstanceMessageResponse<K>> {
235        let public_key = K::public_from_secret(&self.instance_key_pair);
236        let handshake_msg = HandshakeMessage::new(self.local_peer_id);
237        let signature =
238            self.sign_handshake(&mut self.instance_key_pair.clone(), peer, &handshake_msg);
239
240        if let Some(signature) = signature {
241            self.send_request(
242                peer,
243                InstanceMessageRequest::Handshake {
244                    verification_id_key: if self.use_address_for_handshake_verification {
245                        VerificationIdentifierKey::EvmAddress(get_address_from_compressed_pubkey(
246                            &public_key.to_bytes(),
247                        ))
248                    } else {
249                        VerificationIdentifierKey::InstancePublicKey(public_key)
250                    },
251                    signature,
252                    msg: handshake_msg,
253                },
254            );
255        }
256
257        Ok(())
258    }
259
260    /// Verify and handle a handshake with a peer
261    ///
262    /// # Errors
263    ///
264    /// The handshake expired or has an invalid signature
265    pub fn verify_handshake(
266        &self,
267        msg: &HandshakeMessage,
268        verification_id_key: &VerificationIdentifierKey<K>,
269        signature: &K::Signature,
270    ) -> Result<(), InstanceMessageResponse<K>> {
271        if msg.is_expired(HandshakeMessage::MAX_AGE) {
272            error!(%msg.sender, "Handshake message expired");
273            return Err(InstanceMessageResponse::Error {
274                code: 400,
275                message: "Handshake message expired".to_string(),
276            });
277        }
278
279        let msg_bytes = msg.to_bytes(&self.local_peer_id);
280        let hex_msg = hex::encode(msg_bytes.clone());
281
282        info!(%hex_msg, ?verification_id_key, ?signature, "verifying handshake");
283
284        let valid = verification_id_key
285            .verify(&msg_bytes, signature.to_bytes().as_ref())
286            .map_err(|e| InstanceMessageResponse::Error {
287                code: 400,
288                message: format!("Invalid handshake signature: {e}"),
289            })?;
290
291        if !valid {
292            warn!(%msg.sender, "Invalid handshake signature for peer");
293            return Err(InstanceMessageResponse::Error {
294                code: 400,
295                message: "Invalid handshake signature".to_string(),
296            });
297        }
298
299        Ok(())
300    }
301
302    /// Handle a handshake message, verifying the peer on success
303    ///
304    /// # Errors
305    ///
306    /// See [`Self::verify_handshake()`]
307    pub fn handle_handshake(
308        &self,
309        msg: &HandshakeMessage,
310        verification_id_key: &VerificationIdentifierKey<K>,
311        signature: &K::Signature,
312    ) -> Result<(), InstanceMessageResponse<K>> {
313        self.verify_handshake(msg, verification_id_key, signature)?;
314        self.peer_manager
315            .link_peer_id_to_verification_id_key(&msg.sender, verification_id_key);
316
317        Ok(())
318    }
319
320    /// Handle a failed handshake with a peer
321    pub fn handle_handshake_failure(&self, peer: &PeerId, reason: &str) {
322        // Update peer info and potentially ban peer
323        if let Some(mut peer_info) = self.peer_manager.get_peer_info(peer) {
324            peer_info.failures += 1;
325            self.peer_manager.update_peer(*peer, peer_info.clone());
326
327            // Ban peer if too many failures
328            if peer_info.failures >= 3 {
329                self.peer_manager
330                    .ban_peer(*peer, reason, Some(Duration::from_secs(300)));
331            }
332        }
333    }
334
335    pub fn handle_gossipsub_event(&mut self, event: gossipsub::Event) {
336        match event {
337            gossipsub::Event::Message {
338                propagation_source,
339                message_id: _,
340                message,
341            } => {
342                // Only accept gossip from verified peers
343                if !self.peer_manager.is_peer_verified(&propagation_source) {
344                    warn!(%propagation_source, "Received gossip from unverified peer");
345                    return;
346                }
347
348                debug!(%propagation_source, "Received gossip message");
349
350                // Deserialize the protocol message
351                let Ok(protocol_message) = bincode::deserialize::<ProtocolMessage>(&message.data)
352                else {
353                    warn!(%propagation_source, "Failed to deserialize gossip message");
354                    return;
355                };
356
357                debug!(%propagation_source, %protocol_message, "Forwarding gossip message to protocol handler");
358                if let Err(e) = self.protocol_message_sender.send(protocol_message) {
359                    warn!(%propagation_source, "Failed to forward gossip message: {e}");
360                }
361            }
362            gossipsub::Event::Subscribed { peer_id, topic } => {
363                debug!(%peer_id, %topic, "Peer subscribed to topic");
364            }
365            gossipsub::Event::Unsubscribed { peer_id, topic } => {
366                debug!(%peer_id, %topic, "Peer unsubscribed from topic");
367            }
368            _ => {}
369        }
370    }
371}
372
373impl<K: KeyType> NetworkBehaviour for BlueprintProtocolBehaviour<K> {
374    type ConnectionHandler =
375        <DerivedBlueprintProtocolBehaviour<K> as NetworkBehaviour>::ConnectionHandler;
376
377    type ToSwarm = BlueprintProtocolEvent<K>;
378
379    fn handle_established_inbound_connection(
380        &mut self,
381        connection_id: ConnectionId,
382        peer: PeerId,
383        local_addr: &libp2p::Multiaddr,
384        remote_addr: &libp2p::Multiaddr,
385    ) -> Result<THandler<Self>, ConnectionDenied> {
386        debug!(%peer, ?connection_id, ?local_addr, ?remote_addr, "Established inbound connection");
387        self.blueprint_protocol
388            .handle_established_inbound_connection(connection_id, peer, local_addr, remote_addr)
389    }
390
391    fn handle_established_outbound_connection(
392        &mut self,
393        connection_id: ConnectionId,
394        peer: PeerId,
395        addr: &Multiaddr,
396        role_override: libp2p::core::Endpoint,
397        port_use: PortUse,
398    ) -> Result<THandler<Self>, ConnectionDenied> {
399        self.blueprint_protocol
400            .handle_established_outbound_connection(
401                connection_id,
402                peer,
403                addr,
404                role_override,
405                port_use,
406            )
407    }
408
409    fn handle_pending_inbound_connection(
410        &mut self,
411        connection_id: ConnectionId,
412        local_addr: &libp2p::Multiaddr,
413        remote_addr: &libp2p::Multiaddr,
414    ) -> Result<(), ConnectionDenied> {
415        self.blueprint_protocol.handle_pending_inbound_connection(
416            connection_id,
417            local_addr,
418            remote_addr,
419        )
420    }
421
422    fn handle_pending_outbound_connection(
423        &mut self,
424        connection_id: ConnectionId,
425        maybe_peer: Option<PeerId>,
426        addresses: &[libp2p::Multiaddr],
427        effective_role: libp2p::core::Endpoint,
428    ) -> Result<Vec<libp2p::Multiaddr>, ConnectionDenied> {
429        self.blueprint_protocol.handle_pending_outbound_connection(
430            connection_id,
431            maybe_peer,
432            addresses,
433            effective_role,
434        )
435    }
436
437    fn on_connection_handler_event(
438        &mut self,
439        peer_id: PeerId,
440        connection_id: ConnectionId,
441        event: THandlerOutEvent<Self>,
442    ) {
443        self.blueprint_protocol
444            .on_connection_handler_event(peer_id, connection_id, event);
445    }
446
447    fn on_swarm_event(&mut self, event: FromSwarm<'_>) {
448        match &event {
449            FromSwarm::ConnectionEstablished(conn) if conn.other_established == 0 => {
450                // Start handshake if this peer is not verified
451                if !self.peer_manager.is_peer_verified(&conn.peer_id) {
452                    debug!(
453                        "Established connection with unverified peer {:?}, sending handshake",
454                        conn.peer_id
455                    );
456
457                    match self.send_handshake(&conn.peer_id) {
458                        Ok(()) => {
459                            debug!(%conn.peer_id, "Sent handshake request");
460                            self.outbound_handshakes
461                                .insert(conn.peer_id, Instant::now());
462                        }
463                        Err(e) => {
464                            warn!(%conn.peer_id, "Failed to send handshake: {e:?}");
465                        }
466                    }
467                }
468
469                self.blueprint_protocol
470                    .gossipsub
471                    .add_explicit_peer(&conn.peer_id);
472            }
473            FromSwarm::ConnectionClosed(e) if e.remaining_established == 0 => {
474                if self.inbound_handshakes.contains_key(&e.peer_id) {
475                    self.inbound_handshakes.remove(&e.peer_id);
476                }
477
478                if self.outbound_handshakes.contains_key(&e.peer_id) {
479                    self.outbound_handshakes.remove(&e.peer_id);
480                }
481
482                if self.peer_manager.is_peer_verified(&e.peer_id) {
483                    self.peer_manager
484                        .remove_peer(&e.peer_id, "connection closed");
485                }
486
487                self.blueprint_protocol
488                    .gossipsub
489                    .remove_explicit_peer(&e.peer_id);
490
491                self.peer_manager
492                    .remove_peer_id_from_verification_id_key(&e.peer_id);
493            }
494
495            _ => {}
496        }
497
498        self.blueprint_protocol.on_swarm_event(event);
499    }
500
501    fn poll(
502        &mut self,
503        cx: &mut std::task::Context<'_>,
504    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
505        while let Poll::Ready(ev) = self.blueprint_protocol.poll(cx) {
506            match ev {
507                ToSwarm::GenerateEvent(ev) => match ev {
508                    DerivedBlueprintProtocolBehaviourEvent::RequestResponse(
509                        blueprint_protocol_event,
510                    ) => self.handle_request_response_event(blueprint_protocol_event),
511                    DerivedBlueprintProtocolBehaviourEvent::Gossipsub(gossip_event) => {
512                        self.handle_gossipsub_event(gossip_event);
513                    }
514                },
515                ToSwarm::Dial { opts } => {
516                    return Poll::Ready(ToSwarm::Dial { opts });
517                }
518                ToSwarm::NotifyHandler {
519                    peer_id,
520                    handler,
521                    event,
522                } => {
523                    return Poll::Ready(ToSwarm::NotifyHandler {
524                        peer_id,
525                        handler,
526                        event,
527                    });
528                }
529                ToSwarm::CloseConnection {
530                    peer_id,
531                    connection,
532                } => {
533                    return Poll::Ready(ToSwarm::CloseConnection {
534                        peer_id,
535                        connection,
536                    });
537                }
538                ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }),
539                ToSwarm::RemoveListener { id } => {
540                    return Poll::Ready(ToSwarm::RemoveListener { id });
541                }
542                ToSwarm::NewExternalAddrCandidate(addr) => {
543                    return Poll::Ready(ToSwarm::NewExternalAddrCandidate(addr));
544                }
545                ToSwarm::ExternalAddrConfirmed(addr) => {
546                    return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr));
547                }
548                ToSwarm::ExternalAddrExpired(addr) => {
549                    return Poll::Ready(ToSwarm::ExternalAddrExpired(addr));
550                }
551                _ => {}
552            }
553        }
554        Poll::Pending
555    }
556}