blueprint_networking/blueprint_protocol/
handler.rs

1use std::time::{Duration, Instant};
2
3use alloy_primitives::Address;
4use blueprint_core::{debug, warn};
5use blueprint_crypto::{BytesEncoding, KeyType, hashing::keccak_256};
6use libp2p::{PeerId, request_response};
7
8use crate::blueprint_protocol::HandshakeMessage;
9use crate::discovery::peers::VerificationIdentifierKey;
10use crate::types::ProtocolMessage;
11
12use super::{BlueprintProtocolBehaviour, InstanceMessageRequest, InstanceMessageResponse};
13
14const INBOUND_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30);
15const OUTBOUND_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30);
16
17impl<K: KeyType> BlueprintProtocolBehaviour<K> {
18    pub fn handle_request_response_event(
19        &mut self,
20        event: request_response::Event<InstanceMessageRequest<K>, InstanceMessageResponse<K>>,
21    ) {
22        match event {
23            request_response::Event::Message {
24                peer,
25                message:
26                    request_response::Message::Request {
27                        request:
28                            InstanceMessageRequest::Handshake {
29                                verification_id_key,
30                                signature,
31                                msg,
32                            },
33                        channel,
34                        ..
35                    },
36                ..
37            } => {
38                debug!(%peer, "Received handshake request");
39
40                // Check if we already sent a handshake request to this peer
41                if self.outbound_handshakes.contains_key(&peer) {
42                    // If we have an outbound handshake pending, we should still respond to their request
43                    // This ensures both sides complete their handshakes even if messages cross on the wire
44                    debug!(%peer, "Responding to inbound handshake request while outbound is pending");
45                }
46
47                if !self.peer_manager.is_key_whitelisted(&verification_id_key) {
48                    // warn!(%peer, ?verification_id_key, "Received handshake response from unwhitelisted peer");
49                    warn!(
50                        "Received handshake response from unwhitelisted peer {:?} with key {:?}",
51                        peer, verification_id_key
52                    );
53                    self.peer_manager.handle_nonwhitelisted_peer(&peer);
54                    return;
55                }
56
57                // Verify the handshake
58                match self.verify_handshake(&msg, &verification_id_key, &signature) {
59                    Ok(()) => {
60                        // Store the handshake request
61                        self.inbound_handshakes.insert(peer, Instant::now());
62                        self.peer_manager
63                            .link_peer_id_to_verification_id_key(&peer, &verification_id_key);
64
65                        // Send handshake response
66                        let mut key_pair = self.instance_key_pair.clone();
67                        let public_key = K::public_from_secret(&key_pair);
68                        let self_verification_id_key =
69                            if self.use_address_for_handshake_verification {
70                                let pre_truncation = keccak_256(public_key.to_bytes().as_ref());
71                                VerificationIdentifierKey::EvmAddress(Address::from_slice(
72                                    &pre_truncation[12..],
73                                ))
74                            } else {
75                                VerificationIdentifierKey::InstancePublicKey(public_key)
76                            };
77
78                        let handshake_msg = HandshakeMessage::new(self.local_peer_id);
79                        let Some(signature) =
80                            self.sign_handshake(&mut key_pair, &peer, &handshake_msg)
81                        else {
82                            return;
83                        };
84
85                        let response = InstanceMessageResponse::Handshake {
86                            verification_id_key: self_verification_id_key,
87                            signature,
88                            msg: handshake_msg,
89                        };
90
91                        if let Err(e) = self.send_response(channel, response) {
92                            warn!(%peer, "Failed to send handshake response: {:?}", e);
93                            return;
94                        }
95                    }
96                    Err(e) => {
97                        warn!(%peer, "Invalid handshake request: {:?}", e);
98                        let response = InstanceMessageResponse::Error {
99                            code: 400,
100                            message: format!("Invalid handshake: {:?}", e),
101                        };
102                        if let Err(e) = self.send_response(channel, response) {
103                            warn!(%peer, "Failed to send error response: {:?}", e);
104                        }
105                    }
106                }
107            }
108            request_response::Event::Message {
109                peer,
110                message:
111                    request_response::Message::Response {
112                        response:
113                            InstanceMessageResponse::Handshake {
114                                verification_id_key,
115                                signature,
116                                msg,
117                            },
118                        ..
119                    },
120                ..
121            } => {
122                debug!(%peer, "Received handshake response");
123
124                // Verify we have a pending outbound handshake
125                if !self.outbound_handshakes.contains_key(&peer) {
126                    warn!(%peer, "Received unexpected handshake response");
127                    return;
128                }
129
130                if !self.peer_manager.is_key_whitelisted(&verification_id_key) {
131                    warn!(%peer, ?verification_id_key, "Received handshake response from unwhitelisted peer");
132                    self.peer_manager.handle_nonwhitelisted_peer(&peer);
133                    return;
134                }
135
136                // Verify the handshake
137                match self.verify_handshake(&msg, &verification_id_key, &signature) {
138                    Ok(()) => {
139                        // Mark handshake as completed
140                        self.complete_handshake(&peer, &verification_id_key);
141                    }
142                    Err(e) => {
143                        warn!(%peer, "Invalid handshake verification: {:?}", e);
144                        self.outbound_handshakes.remove(&peer);
145                        self.handle_handshake_failure(&peer, "Invalid handshake verification");
146                    }
147                }
148            }
149            request_response::Event::Message {
150                peer,
151                message:
152                    request_response::Message::Request {
153                        request:
154                            InstanceMessageRequest::Protocol {
155                                protocol,
156                                payload,
157                                metadata: _,
158                            },
159                        channel,
160                        ..
161                    },
162                ..
163            } => {
164                // Reject messages from self
165                if peer == self.local_peer_id {
166                    return;
167                }
168
169                // Only accept protocol messages from peers we've completed handshakes with
170                if !self.peer_manager.is_peer_verified(&peer) {
171                    warn!(%peer, "Received protocol message from unverified peer");
172                    let response = InstanceMessageResponse::Error {
173                        code: 403,
174                        message: "Handshake required".to_string(),
175                    };
176                    if let Err(e) = self.send_response(channel, response) {
177                        warn!(%peer, "Failed to send error response: {:?}", e);
178                    }
179                    return;
180                }
181
182                let protocol_message: ProtocolMessage = match bincode::deserialize(&payload) {
183                    Ok(message) => message,
184                    Err(e) => {
185                        warn!(%peer, "Failed to deserialize protocol message: {:?}", e);
186                        let response = InstanceMessageResponse::Error {
187                            code: 400,
188                            message: format!("Invalid protocol message: {:?}", e),
189                        };
190                        if let Err(e) = self.send_response(channel, response) {
191                            warn!(%peer, "Failed to send error response: {:?}", e);
192                        }
193                        return;
194                    }
195                };
196
197                debug!(%peer, %protocol, %protocol_message, "Received protocol request");
198                if let Err(e) = self.protocol_message_sender.send(protocol_message) {
199                    warn!(%peer, "Failed to send protocol message: {:?}", e);
200                }
201            }
202            request_response::Event::Message {
203                peer,
204                message:
205                    request_response::Message::Response {
206                        response: InstanceMessageResponse::Error { code, message },
207                        ..
208                    },
209                ..
210            } => {
211                if !self.peer_manager.is_peer_verified(&peer) {
212                    warn!(%peer, code, %message, "Received error response from unverified peer");
213                    return;
214                }
215            }
216            request_response::Event::Message {
217                peer,
218                message:
219                    request_response::Message::Response {
220                        response: InstanceMessageResponse::Success { protocol, data: _ },
221                        ..
222                    },
223                ..
224            } => {
225                debug!(%peer, %protocol, "Received successful protocol response");
226            }
227            _ => {}
228        }
229
230        // Check for expired handshakes
231        self.check_expired_handshakes();
232    }
233
234    /// Check for and remove expired handshakes
235    fn check_expired_handshakes(&mut self) {
236        let now = Instant::now();
237
238        // Check inbound handshakes
239        let expired_inbound: Vec<_> = self
240            .inbound_handshakes
241            .clone()
242            .into_read_only()
243            .iter()
244            .filter(|&(_, &time)| now.duration_since(time) > INBOUND_HANDSHAKE_TIMEOUT)
245            .map(|(peer_id, _)| *peer_id)
246            .collect();
247
248        for peer_id in expired_inbound {
249            self.inbound_handshakes.remove(&peer_id);
250            self.handle_handshake_failure(&peer_id, "Inbound handshake timeout");
251        }
252
253        // Check outbound handshakes
254        let expired_outbound: Vec<_> = self
255            .outbound_handshakes
256            .clone()
257            .into_read_only()
258            .iter()
259            .filter(|&(_, &time)| now.duration_since(time) > OUTBOUND_HANDSHAKE_TIMEOUT)
260            .map(|(peer_id, _)| *peer_id)
261            .collect();
262
263        for peer_id in expired_outbound {
264            self.outbound_handshakes.remove(&peer_id);
265            self.handle_handshake_failure(&peer_id, "Outbound handshake timeout");
266        }
267    }
268
269    /// Complete a successful handshake
270    fn complete_handshake(
271        &mut self,
272        peer: &PeerId,
273        verification_id_key: &VerificationIdentifierKey<K>,
274    ) {
275        debug!(%peer, ?verification_id_key, "Completed handshake");
276
277        // Remove from pending handshakes
278        self.inbound_handshakes.remove(peer);
279        self.outbound_handshakes.remove(peer);
280
281        // Update peer manager
282        self.peer_manager
283            .link_peer_id_to_verification_id_key(peer, verification_id_key);
284
285        // Add to verified peers
286        self.peer_manager.verify_peer(peer);
287    }
288}