// ethrex_p2p/discv5/messages.rs

1use aes::cipher::{KeyIvInit, StreamCipher, StreamCipherError};
2use aes_gcm::{Aes128Gcm, KeyInit, aead::AeadMutInPlace};
3use bytes::{BufMut, Bytes};
4use ethrex_common::H256;
5use ethrex_rlp::{
6    decode::RLPDecode,
7    encode::RLPEncode,
8    error::RLPDecodeError,
9    structs::{Decoder, Encoder},
10};
11use std::{array::TryFromSliceError, fmt::Display, net::SocketAddr};
12
13use crate::types::NodeRecord;
14
15type Aes128Ctr64BE = ctr::Ctr64BE<aes::Aes128>;
16
// Packet size bounds, as defined in
// https://github.com/ethereum/devp2p/blob/master/discv5/discv5-wire.md#udp-communication
// They are used to validate incoming packets.
/// Smallest encoded packet accepted by the decoder, in bytes.
const MIN_PACKET_SIZE: usize = 63;
/// Largest encoded packet accepted by the decoder, in bytes.
const MAX_PACKET_SIZE: usize = 1280;
/// Fixed-size prefix of handshake authdata: 32 src-id + 1 sig-size + 1 eph-key-size.
const HANDSHAKE_AUTHDATA_HEAD: usize = 34;
/// Protocol identifier written at the start of the static header.
const PROTOCOL_ID: &[u8] = b"discv5";
/// Protocol version written right after the protocol identifier.
const PROTOCOL_VERSION: u16 = 0x0001;
/// Size in bytes of the masking IV (a u128).
const IV_MASKING_SIZE: usize = 16;
/// Size in bytes of the static header.
const STATIC_HEADER_SIZE: usize = 23;
/// Offset in an encoded packet where the static header ends and authdata starts.
const STATIC_HEADER_END: usize = IV_MASKING_SIZE + STATIC_HEADER_SIZE;
/// Number of distances to include in a FindNode message.
pub const DISTANCES_PER_FIND_NODE_MSG: u8 = 3;
34
35#[derive(Debug, thiserror::Error)]
36pub enum PacketCodecError {
37    #[error("RLP decoding error")]
38    RLPDecodeError(#[from] RLPDecodeError),
39    #[error("Packet header decoding error")]
40    InvalidHeader,
41    #[error("Message decoding error, message type: {0}")]
42    InvalidMessage(u8),
43    #[error("Invalid packet size")]
44    InvalidSize,
45    #[error("Session not established yet")]
46    SessionNotEstablished,
47    #[error("Invalid protocol: {0}")]
48    InvalidProtocol(String),
49    #[error("Stream Cipher Error: {0}")]
50    CipherError(String),
51    #[error("TryFromSliceError: {0}")]
52    TryFromSliceError(#[from] TryFromSliceError),
53    #[error("Io Error: {0}")]
54    IoError(#[from] std::io::Error),
55    #[error("Malformed Data")]
56    MalformedData,
57}
58
59impl From<StreamCipherError> for PacketCodecError {
60    fn from(error: StreamCipherError) -> Self {
61        PacketCodecError::CipherError(error.to_string())
62    }
63}
64
65#[derive(Debug, Clone, PartialEq, Eq)]
66pub struct Packet {
67    pub masking_iv: [u8; IV_MASKING_SIZE],
68    pub header: PacketHeader,
69    pub encrypted_message: Vec<u8>,
70}
71
72impl Packet {
73    pub fn decode(dest_id: &H256, encoded_packet: &[u8]) -> Result<Packet, PacketCodecError> {
74        if encoded_packet.len() < MIN_PACKET_SIZE || encoded_packet.len() > MAX_PACKET_SIZE {
75            return Err(PacketCodecError::InvalidSize);
76        }
77
78        // the packet structure is
79        // masking-iv || masked-header || message
80        // 16 bytes for an u128
81        let masking_iv = &encoded_packet[..IV_MASKING_SIZE];
82
83        let mut cipher = <Aes128Ctr64BE as KeyIvInit>::new(dest_id[..16].into(), masking_iv.into());
84
85        let header = PacketHeader::decode(&mut cipher, encoded_packet)
86            .map_err(|_e| PacketCodecError::InvalidHeader)?;
87        let encrypted_message = encoded_packet[header.header_end_offset..].to_vec();
88        Ok(Packet {
89            masking_iv: masking_iv.try_into()?,
90            header,
91            encrypted_message,
92        })
93    }
94
95    pub fn encode(&self, buf: &mut dyn BufMut, dest_id: &H256) -> Result<(), PacketCodecError> {
96        let masking_iv = self.masking_iv;
97        buf.put_slice(&masking_iv);
98
99        let mut cipher =
100            <Aes128Ctr64BE as KeyIvInit>::new(dest_id[..16].into(), masking_iv[..].into());
101
102        self.header.encode(buf, &mut cipher)?;
103        buf.put_slice(&self.encrypted_message);
104
105        Ok(())
106    }
107}
108
109#[derive(Debug, Clone, PartialEq, Eq)]
110pub struct PacketHeader {
111    pub static_header: [u8; STATIC_HEADER_SIZE],
112    pub flag: u8,
113    pub nonce: [u8; 12],
114    pub authdata: Vec<u8>,
115    /// Offset in the encoded packet where authdata ends, i.e where the header ends.
116    pub header_end_offset: usize,
117}
118
119impl PacketHeader {
120    fn decode<T: StreamCipher>(
121        cipher: &mut T,
122        encoded_packet: &[u8],
123    ) -> Result<PacketHeader, PacketCodecError> {
124        // static header
125        let mut static_header: [u8; STATIC_HEADER_SIZE] =
126            encoded_packet[IV_MASKING_SIZE..STATIC_HEADER_END].try_into()?;
127
128        cipher.try_apply_keystream(&mut static_header)?;
129
130        // static-header = protocol-id || version || flag || nonce || authdata-size
131        //protocol check
132        let protocol_id = &static_header[..6];
133        let version = u16::from_be_bytes(static_header[6..8].try_into()?);
134        if protocol_id != PROTOCOL_ID || version != PROTOCOL_VERSION {
135            return Err(PacketCodecError::InvalidProtocol(
136                match str::from_utf8(protocol_id) {
137                    Ok(result) => format!("{} v{}", result, version),
138                    Err(_) => format!("{:?} v{}", protocol_id, version),
139                },
140            ));
141        }
142
143        let flag = static_header[8];
144        let nonce = static_header[9..21].try_into()?;
145        let authdata_size = u16::from_be_bytes(static_header[21..23].try_into()?) as usize;
146        let authdata_end = STATIC_HEADER_END + authdata_size;
147
148        if encoded_packet.len() < authdata_end {
149            return Err(PacketCodecError::InvalidSize);
150        }
151
152        let mut authdata = encoded_packet[STATIC_HEADER_END..authdata_end].to_vec();
153
154        cipher.try_apply_keystream(&mut authdata)?;
155
156        Ok(PacketHeader {
157            static_header,
158            flag,
159            nonce,
160            authdata,
161            header_end_offset: authdata_end,
162        })
163    }
164
165    fn encode<T: StreamCipher>(
166        &self,
167        buf: &mut dyn BufMut,
168        cipher: &mut T,
169    ) -> Result<(), PacketCodecError> {
170        let mut static_header = Vec::new();
171        static_header.put_slice(PROTOCOL_ID);
172        static_header.put_slice(&PROTOCOL_VERSION.to_be_bytes());
173        static_header.put_u8(self.flag);
174        static_header.put_slice(&self.nonce);
175        static_header.put_slice(&(self.authdata.len() as u16).to_be_bytes());
176        cipher.try_apply_keystream(&mut static_header)?;
177        buf.put_slice(&static_header);
178
179        let mut authdata = self.authdata.clone();
180        cipher.try_apply_keystream(&mut authdata)?;
181        buf.put_slice(&authdata);
182
183        Ok(())
184    }
185}
186
187pub trait PacketTrait {
188    const TYPE_FLAG: u8;
189    fn encode_authdata(&self, buf: &mut dyn BufMut) -> Result<(), PacketCodecError>;
190    fn get_encoded_message(&self) -> Vec<u8>;
191
192    fn build_header(&self, nonce: &[u8; 12]) -> Result<PacketHeader, PacketCodecError> {
193        let mut authdata = Vec::new();
194        self.encode_authdata(&mut authdata)?;
195
196        let authdata_size =
197            u16::try_from(authdata.len()).map_err(|_| PacketCodecError::InvalidSize)?;
198
199        let mut static_header: [u8; 23] = [0; 23];
200        static_header[0..6].copy_from_slice(PROTOCOL_ID);
201        static_header[6..8].copy_from_slice(&PROTOCOL_VERSION.to_be_bytes());
202        static_header[8] = Self::TYPE_FLAG;
203        static_header[9..21].copy_from_slice(nonce);
204        static_header[21..].copy_from_slice(&authdata_size.to_be_bytes());
205        let header_end_offset = 16 + authdata.len() + static_header.len();
206        Ok(PacketHeader {
207            static_header,
208            flag: Self::TYPE_FLAG,
209            nonce: *nonce,
210            authdata,
211            header_end_offset,
212        })
213    }
214
215    /// Encodes the packet
216    fn encode(
217        &self,
218        nonce: &[u8; 12],
219        masking_iv: [u8; 16],
220        encrypt_key: &[u8],
221    ) -> Result<Packet, PacketCodecError> {
222        if encrypt_key.len() < 16 {
223            return Err(PacketCodecError::InvalidSize);
224        }
225        let header = self.build_header(nonce)?;
226
227        let mut message = self.get_encoded_message();
228        let mut message_ad = masking_iv.to_vec();
229        message_ad.extend_from_slice(&header.static_header);
230        message_ad.extend_from_slice(&header.authdata);
231
232        let mut cipher = Aes128Gcm::new(encrypt_key[..16].into());
233        cipher
234            .encrypt_in_place(&header.nonce.into(), &message_ad, &mut message)
235            .map_err(|e| PacketCodecError::CipherError(e.to_string()))?;
236
237        Ok(Packet {
238            masking_iv,
239            header,
240            encrypted_message: message,
241        })
242    }
243}
244
245#[derive(Debug, Clone, PartialEq, Eq)]
246pub struct Ordinary {
247    pub src_id: H256,
248    pub message: Message,
249}
250
251impl PacketTrait for Ordinary {
252    const TYPE_FLAG: u8 = 0x00;
253
254    fn encode_authdata(&self, buf: &mut dyn BufMut) -> Result<(), PacketCodecError> {
255        buf.put_slice(self.src_id.as_bytes());
256        Ok(())
257    }
258
259    fn get_encoded_message(&self) -> Vec<u8> {
260        let mut message = Vec::new();
261        self.message.encode(&mut message);
262        message
263    }
264}
265
266impl Ordinary {
267    pub fn decode(packet: &Packet, decrypt_key: &[u8]) -> Result<Ordinary, PacketCodecError> {
268        if packet.header.authdata.len() != 32 {
269            return Err(PacketCodecError::InvalidSize);
270        }
271
272        let mut message = packet.encrypted_message.to_vec();
273        decrypt_message(decrypt_key, packet, &mut message)?;
274
275        let src_id = H256::from_slice(&packet.header.authdata);
276
277        let message = Message::decode(&message).map_err(|_e| {
278            PacketCodecError::InvalidMessage(message.first().copied().unwrap_or(0))
279        })?;
280        Ok(Ordinary { src_id, message })
281    }
282}
283
284/// Decrypts a message using AES-128-GCM.
285/// The message is decrypted in place.
286pub fn decrypt_message(
287    key: &[u8],
288    packet: &Packet,
289    message: &mut Vec<u8>,
290) -> Result<(), PacketCodecError> {
291    if key.len() < 16 {
292        return Err(PacketCodecError::InvalidSize);
293    }
294
295    // message-ad = masking-iv || static-header || authdata
296    let mut message_ad = packet.masking_iv.to_vec();
297    message_ad.extend_from_slice(&packet.header.static_header);
298    message_ad.extend_from_slice(&packet.header.authdata);
299
300    let mut cipher = Aes128Gcm::new(key[..16].into());
301    cipher
302        .decrypt_in_place(packet.header.nonce.as_slice().into(), &message_ad, message)
303        .map_err(|e| PacketCodecError::CipherError(e.to_string()))?;
304    Ok(())
305}
306
/// WHOAREYOU packet (type flag `0x01`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WhoAreYou {
    /// First 16 bytes of the authdata, as a big-endian integer.
    pub id_nonce: u128,
    /// Remaining 8 bytes of the authdata, as a big-endian integer.
    pub enr_seq: u64,
}
312
313impl PacketTrait for WhoAreYou {
314    const TYPE_FLAG: u8 = 0x01;
315
316    fn encode_authdata(&self, buf: &mut dyn BufMut) -> Result<(), PacketCodecError> {
317        buf.put_slice(&self.id_nonce.to_be_bytes());
318        buf.put_slice(&self.enr_seq.to_be_bytes());
319        Ok(())
320    }
321
322    fn get_encoded_message(&self) -> Vec<u8> {
323        Vec::new()
324    }
325
326    /// Encodes the WhoAreYou packet.
327    /// No encryption needed, just an empty message
328    fn encode(
329        &self,
330        nonce: &[u8; 12],
331        masking_iv: [u8; 16],
332        _encrypt_key: &[u8],
333    ) -> Result<Packet, PacketCodecError> {
334        Ok(Packet {
335            masking_iv,
336            header: self.build_header(nonce)?,
337            encrypted_message: Vec::new(),
338        })
339    }
340}
341
342impl WhoAreYou {
343    pub fn decode(packet: &Packet) -> Result<WhoAreYou, PacketCodecError> {
344        let authdata = packet.header.authdata.clone();
345        let id_nonce = u128::from_be_bytes(authdata[..16].try_into()?);
346        let enr_seq = u64::from_be_bytes(authdata[16..].try_into()?);
347
348        Ok(WhoAreYou { id_nonce, enr_seq })
349    }
350}
351
352/// Parsed handshake authdata, used for signature verification and session key derivation
353/// before decrypting the message.
354#[derive(Debug, Clone, PartialEq, Eq)]
355pub struct HandshakeAuthdata {
356    pub src_id: H256,
357    pub id_signature: Vec<u8>,
358    pub eph_pubkey: Vec<u8>,
359    pub record: Option<NodeRecord>,
360}
361
362impl HandshakeAuthdata {
363    /// Decodes the authdata from a handshake packet header.
364    /// This can be called before decryption to extract the ephemeral public key
365    /// needed for session key derivation.
366    pub fn decode(authdata: &[u8]) -> Result<Self, PacketCodecError> {
367        if authdata.len() < HANDSHAKE_AUTHDATA_HEAD {
368            return Err(PacketCodecError::InvalidSize);
369        }
370
371        let src_id = H256::from_slice(&authdata[..32]);
372        let sig_size = authdata[32] as usize;
373        let eph_key_size = authdata[33] as usize;
374
375        let authdata_head = HANDSHAKE_AUTHDATA_HEAD + sig_size + eph_key_size;
376        if authdata.len() < authdata_head {
377            return Err(PacketCodecError::InvalidSize);
378        }
379
380        let id_signature =
381            authdata[HANDSHAKE_AUTHDATA_HEAD..HANDSHAKE_AUTHDATA_HEAD + sig_size].to_vec();
382
383        let eph_key_start = HANDSHAKE_AUTHDATA_HEAD + sig_size;
384        let eph_pubkey = authdata[eph_key_start..authdata_head].to_vec();
385
386        let record = if authdata.len() > authdata_head {
387            let record_bytes = &authdata[authdata_head..];
388            if record_bytes.is_empty() {
389                None
390            } else {
391                Some(NodeRecord::decode(record_bytes)?)
392            }
393        } else {
394            None
395        };
396
397        Ok(HandshakeAuthdata {
398            src_id,
399            id_signature,
400            eph_pubkey,
401            record,
402        })
403    }
404}
405
406#[derive(Debug, Clone, PartialEq, Eq)]
407pub struct Handshake {
408    pub src_id: H256,
409    pub id_signature: Vec<u8>,
410    pub eph_pubkey: Vec<u8>,
411    /// The record field may be omitted if the enr-seq of WHOAREYOU is recent enough, i.e. when it matches the current sequence number of the sending node.
412    /// If enr-seq is zero, the record must be sent.
413    pub record: Option<NodeRecord>,
414    pub message: Message,
415}
416
417impl PacketTrait for Handshake {
418    const TYPE_FLAG: u8 = 0x02;
419
420    fn encode_authdata(&self, buf: &mut dyn BufMut) -> Result<(), PacketCodecError> {
421        let sig_size: u8 = self
422            .id_signature
423            .len()
424            .try_into()
425            .map_err(|_| PacketCodecError::InvalidSize)?;
426        let eph_key_size: u8 = self
427            .eph_pubkey
428            .len()
429            .try_into()
430            .map_err(|_| PacketCodecError::InvalidSize)?;
431
432        buf.put_slice(self.src_id.as_bytes());
433        buf.put_u8(sig_size);
434        buf.put_u8(eph_key_size);
435        buf.put_slice(&self.id_signature);
436        buf.put_slice(&self.eph_pubkey);
437        if let Some(record) = &self.record {
438            record.encode(buf);
439        }
440
441        Ok(())
442    }
443
444    fn get_encoded_message(&self) -> Vec<u8> {
445        let mut message = Vec::new();
446        self.message.encode(&mut message);
447        message
448    }
449}
450
451impl Handshake {
452    /// Decodes a handshake packet, including decrypting the message.
453    pub fn decode(packet: &Packet, decrypt_key: &[u8]) -> Result<Handshake, PacketCodecError> {
454        let authdata = HandshakeAuthdata::decode(&packet.header.authdata)?;
455
456        let mut encrypted = packet.encrypted_message.to_vec();
457        decrypt_message(decrypt_key, packet, &mut encrypted)?;
458        let message = Message::decode(&encrypted)?;
459
460        Ok(Handshake {
461            src_id: authdata.src_id,
462            id_signature: authdata.id_signature,
463            eph_pubkey: authdata.eph_pubkey,
464            record: authdata.record,
465            message,
466        })
467    }
468
469    /// Creates a Handshake from pre-parsed authdata and a decrypted message.
470    /// Useful when authdata was already parsed for signature verification.
471    pub fn from_authdata(authdata: HandshakeAuthdata, message: Message) -> Self {
472        Handshake {
473            src_id: authdata.src_id,
474            id_signature: authdata.id_signature,
475            eph_pubkey: authdata.eph_pubkey,
476            record: authdata.record,
477            message,
478        }
479    }
480}
481
482#[derive(Debug, Eq, PartialEq, Clone)]
483pub enum Message {
484    Ping(PingMessage),
485    Pong(PongMessage),
486    FindNode(FindNodeMessage),
487    Nodes(NodesMessage),
488    TalkReq(TalkReqMessage),
489    TalkRes(TalkResMessage),
490    Ticket(TicketMessage),
491    // TODO: add the other messages
492}
493
494impl Message {
495    /// Returns a short, stable label suitable for use as a Prometheus metric label value.
496    pub fn metric_label(&self) -> &'static str {
497        match self {
498            Message::Ping(_) => "Ping",
499            Message::Pong(_) => "Pong",
500            Message::FindNode(_) => "FindNode",
501            Message::Nodes(_) => "Nodes",
502            Message::TalkReq(_) => "TalkReq",
503            Message::TalkRes(_) => "TalkRes",
504            Message::Ticket(_) => "Ticket",
505        }
506    }
507
508    fn msg_type(&self) -> u8 {
509        match self {
510            Message::Ping(_) => 0x01,
511            Message::Pong(_) => 0x02,
512            Message::FindNode(_) => 0x03,
513            Message::Nodes(_) => 0x04,
514            Message::TalkReq(_) => 0x05,
515            Message::TalkRes(_) => 0x06,
516            Message::Ticket(_) => 0x08,
517        }
518    }
519
520    pub fn encode(&self, buf: &mut dyn BufMut) {
521        buf.put_u8(self.msg_type());
522        match self {
523            Message::Ping(ping) => ping.encode(buf),
524            Message::Pong(pong) => pong.encode(buf),
525            Message::FindNode(find_node) => find_node.encode(buf),
526            Message::Nodes(nodes) => nodes.encode(buf),
527            Message::TalkReq(talk_req) => talk_req.encode(buf),
528            Message::TalkRes(talk_res) => talk_res.encode(buf),
529            Message::Ticket(ticket) => ticket.encode(buf),
530        }
531    }
532
533    pub fn decode(message: &[u8]) -> Result<Message, RLPDecodeError> {
534        let &message_type = message.first().ok_or(RLPDecodeError::InvalidLength)?;
535        match message_type {
536            0x01 => {
537                let ping = PingMessage::decode(&message[1..])?;
538                Ok(Message::Ping(ping))
539            }
540            0x02 => {
541                let pong = PongMessage::decode(&message[1..])?;
542                Ok(Message::Pong(pong))
543            }
544            0x03 => {
545                let find_node_msg = FindNodeMessage::decode(&message[1..])?;
546                Ok(Message::FindNode(find_node_msg))
547            }
548            0x04 => {
549                let nodes_msg = NodesMessage::decode(&message[1..])?;
550                Ok(Message::Nodes(nodes_msg))
551            }
552            0x05 => {
553                let talk_req_msg = TalkReqMessage::decode(&message[1..])?;
554                Ok(Message::TalkReq(talk_req_msg))
555            }
556            0x06 => {
557                let enr_response_msg = TalkResMessage::decode(&message[1..])?;
558                Ok(Message::TalkRes(enr_response_msg))
559            }
560            0x08 => {
561                let ticket_msg = TicketMessage::decode(&message[1..])?;
562                Ok(Message::Ticket(ticket_msg))
563            }
564            _ => Err(RLPDecodeError::MalformedData),
565        }
566    }
567}
568
569impl Display for Message {
570    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
571        match self {
572            Message::Ping(_) => write!(f, "Ping"),
573            Message::Pong(_) => write!(f, "Pong"),
574            Message::FindNode(_) => write!(f, "FindNode"),
575            Message::Nodes(_) => write!(f, "Nodes"),
576            Message::TalkReq(_) => write!(f, "TalkReq"),
577            Message::TalkRes(_) => write!(f, "TalkRes"),
578            Message::Ticket(_) => write!(f, "Ticket"),
579        }
580    }
581}
582
583#[derive(Debug, Clone, PartialEq, Eq)]
584pub struct PingMessage {
585    /// The request id of the sender.
586    pub req_id: Bytes,
587    /// The ENR sequence number of the sender.
588    pub enr_seq: u64,
589}
590
591impl PingMessage {
592    pub fn new(req_id: Bytes, enr_seq: u64) -> Self {
593        Self { req_id, enr_seq }
594    }
595}
596
597impl RLPEncode for PingMessage {
598    fn encode(&self, buf: &mut dyn BufMut) {
599        Encoder::new(buf)
600            .encode_field(&self.req_id)
601            .encode_field(&self.enr_seq)
602            .finish();
603    }
604}
605
606impl RLPDecode for PingMessage {
607    fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
608        let decoder = Decoder::new(rlp)?;
609        let (req_id, decoder) = decoder.decode_field("req_id")?;
610        let (enr_seq, decoder) = decoder.decode_field("enr_seq")?;
611        let ping = PingMessage { req_id, enr_seq };
612        Ok((ping, decoder.finish()?))
613    }
614}
615
616#[derive(Debug, Clone, PartialEq, Eq)]
617pub struct PongMessage {
618    pub req_id: Bytes,
619    pub enr_seq: u64,
620    pub recipient_addr: SocketAddr,
621}
622
623impl RLPEncode for PongMessage {
624    fn encode(&self, buf: &mut dyn BufMut) {
625        Encoder::new(buf)
626            .encode_field(&self.req_id)
627            .encode_field(&self.enr_seq)
628            .encode_field(&self.recipient_addr.ip())
629            .encode_field(&self.recipient_addr.port())
630            .finish();
631    }
632}
633
634impl RLPDecode for PongMessage {
635    fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
636        use std::net::IpAddr;
637        let decoder = Decoder::new(rlp)?;
638        let (req_id, decoder) = decoder.decode_field("req_id")?;
639        let (enr_seq, decoder) = decoder.decode_field("enr_seq")?;
640        let (recipient_ip, decoder): (IpAddr, _) = decoder.decode_field("recipient_ip")?;
641        let (recipient_port, decoder): (u16, _) = decoder.decode_field("recipient_port")?;
642
643        Ok((
644            Self {
645                req_id,
646                enr_seq,
647                recipient_addr: SocketAddr::new(recipient_ip, recipient_port),
648            },
649            decoder.finish()?,
650        ))
651    }
652}
653
654#[derive(Debug, Clone, PartialEq, Eq)]
655pub struct FindNodeMessage {
656    pub req_id: Bytes,
657    pub distances: Vec<u32>,
658}
659
660impl RLPEncode for FindNodeMessage {
661    fn encode(&self, buf: &mut dyn BufMut) {
662        Encoder::new(buf)
663            .encode_field(&self.req_id)
664            .encode_field(&self.distances)
665            .finish();
666    }
667}
668
669impl RLPDecode for FindNodeMessage {
670    fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
671        let decoder = Decoder::new(rlp)?;
672        let (req_id, decoder) = decoder.decode_field("req_id")?;
673        let (distance, decoder) = decoder.decode_field("distance")?;
674
675        Ok((
676            Self {
677                req_id,
678                distances: distance,
679            },
680            decoder.finish()?,
681        ))
682    }
683}
684
685#[derive(Debug, Clone, PartialEq, Eq)]
686pub struct NodesMessage {
687    pub req_id: Bytes,
688    pub total: u64,
689    pub nodes: Vec<NodeRecord>,
690}
691
692impl RLPEncode for NodesMessage {
693    fn encode(&self, buf: &mut dyn BufMut) {
694        Encoder::new(buf)
695            .encode_field(&self.req_id)
696            .encode_field(&self.total)
697            .encode_field(&self.nodes)
698            .finish();
699    }
700}
701
702impl RLPDecode for NodesMessage {
703    fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
704        let decoder = Decoder::new(rlp)?;
705        let (req_id, decoder) = decoder.decode_field("req_id")?;
706        let (total, decoder) = decoder.decode_field("total")?;
707        let (nodes, decoder) = decoder.decode_field("nodes")?;
708
709        Ok((
710            Self {
711                req_id,
712                total,
713                nodes,
714            },
715            decoder.finish()?,
716        ))
717    }
718}
719
720#[derive(Debug, Clone, PartialEq, Eq)]
721pub struct TalkReqMessage {
722    pub req_id: Bytes,
723    pub protocol: Bytes,
724    pub request: Bytes,
725}
726
727impl RLPEncode for TalkReqMessage {
728    fn encode(&self, buf: &mut dyn BufMut) {
729        Encoder::new(buf)
730            .encode_field(&self.req_id)
731            .encode_field(&self.protocol)
732            .encode_field(&self.request)
733            .finish();
734    }
735}
736
737impl RLPDecode for TalkReqMessage {
738    fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
739        let decoder = Decoder::new(rlp)?;
740        let (req_id, decoder) = decoder.decode_field("req_id")?;
741        let (protocol, decoder) = decoder.decode_field("protocol")?;
742        let (request, decoder) = decoder.decode_field("request")?;
743
744        Ok((
745            Self {
746                req_id,
747                protocol,
748                request,
749            },
750            decoder.finish()?,
751        ))
752    }
753}
754
755#[derive(Debug, Clone, PartialEq, Eq)]
756pub struct TalkResMessage {
757    pub req_id: Bytes,
758    pub response: Vec<u8>,
759}
760
761impl RLPEncode for TalkResMessage {
762    fn encode(&self, buf: &mut dyn BufMut) {
763        Encoder::new(buf)
764            .encode_field(&self.req_id)
765            .encode_field(&Bytes::copy_from_slice(&self.response))
766            .finish();
767    }
768}
769
770impl RLPDecode for TalkResMessage {
771    fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
772        let ((req_id, response), remaining) =
773            <(Bytes, Bytes) as RLPDecode>::decode_unfinished(rlp)?;
774
775        Ok((
776            Self {
777                req_id,
778                response: response.to_vec(),
779            },
780            remaining,
781        ))
782    }
783}
784
785#[derive(Debug, Clone, PartialEq, Eq)]
786pub struct TicketMessage {
787    pub req_id: Bytes,
788    pub ticket: Bytes,
789    pub wait_time: u64,
790}
791
792impl RLPEncode for TicketMessage {
793    fn encode(&self, buf: &mut dyn BufMut) {
794        Encoder::new(buf)
795            .encode_field(&self.req_id)
796            .encode_field(&self.ticket)
797            .encode_field(&self.wait_time)
798            .finish();
799    }
800}
801
802impl RLPDecode for TicketMessage {
803    fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> {
804        let decoder = Decoder::new(rlp)?;
805        let (req_id, decoder) = decoder.decode_field("req_id")?;
806        let (ticket, decoder) = decoder.decode_field("ticket")?;
807        let (wait_time, decoder) = decoder.decode_field("wait_time")?;
808
809        Ok((
810            Self {
811                req_id,
812                ticket,
813                wait_time,
814            },
815            decoder.finish()?,
816        ))
817    }
818}