radicle_node/wire/
message.rs

1use std::{io, mem, net};
2
3use byteorder::{NetworkEndian, ReadBytesExt};
4use cyphernet::addr::{tor, Addr, HostName, NetAddr};
5use radicle::git::Oid;
6use radicle::node::Address;
7
8use crate::prelude::*;
9use crate::service::message::*;
10use crate::wire;
11use crate::wire::{Decode, Encode};
12
13/// Message type.
14#[repr(u16)]
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum MessageType {
17    NodeAnnouncement = 2,
18    InventoryAnnouncement = 4,
19    RefsAnnouncement = 6,
20    Subscribe = 8,
21    Ping = 10,
22    Pong = 12,
23    Info = 14,
24}
25
26impl From<MessageType> for u16 {
27    fn from(other: MessageType) -> Self {
28        other as u16
29    }
30}
31
32impl TryFrom<u16> for MessageType {
33    type Error = u16;
34
35    fn try_from(other: u16) -> Result<Self, Self::Error> {
36        match other {
37            2 => Ok(MessageType::NodeAnnouncement),
38            4 => Ok(MessageType::InventoryAnnouncement),
39            6 => Ok(MessageType::RefsAnnouncement),
40            8 => Ok(MessageType::Subscribe),
41            10 => Ok(MessageType::Ping),
42            12 => Ok(MessageType::Pong),
43            14 => Ok(MessageType::Info),
44            _ => Err(other),
45        }
46    }
47}
48
49impl Message {
50    /// The maximum supported message size in bytes.
51    pub const MAX_SIZE: wire::Size =
52        wire::Size::MAX - (mem::size_of::<MessageType>() as wire::Size);
53
54    pub fn type_id(&self) -> u16 {
55        match self {
56            Self::Subscribe { .. } => MessageType::Subscribe,
57            Self::Announcement(Announcement { message, .. }) => match message {
58                AnnouncementMessage::Node(_) => MessageType::NodeAnnouncement,
59                AnnouncementMessage::Inventory(_) => MessageType::InventoryAnnouncement,
60                AnnouncementMessage::Refs(_) => MessageType::RefsAnnouncement,
61            },
62            Self::Info(_) => MessageType::Info,
63            Self::Ping { .. } => MessageType::Ping,
64            Self::Pong { .. } => MessageType::Pong,
65        }
66        .into()
67    }
68}
69
70impl netservices::Frame for Message {
71    type Error = wire::Error;
72
73    fn unmarshall(mut reader: impl io::Read) -> Result<Option<Self>, Self::Error> {
74        match Message::decode(&mut reader) {
75            Ok(msg) => Ok(Some(msg)),
76            Err(wire::Error::Io(_)) => Ok(None),
77            Err(err) => Err(err),
78        }
79    }
80
81    fn marshall(&self, mut writer: impl io::Write) -> Result<usize, Self::Error> {
82        self.encode(&mut writer).map_err(wire::Error::from)
83    }
84}
85
86/// Address type.
87#[repr(u8)]
88#[derive(Debug, Clone, Copy, PartialEq, Eq)]
89pub enum AddressType {
90    Ipv4 = 1,
91    Ipv6 = 2,
92    Dns = 3,
93    Onion = 4,
94}
95
96impl From<AddressType> for u8 {
97    fn from(other: AddressType) -> Self {
98        other as u8
99    }
100}
101
102impl From<&Address> for AddressType {
103    fn from(a: &Address) -> Self {
104        match a.host {
105            HostName::Ip(net::IpAddr::V4(_)) => AddressType::Ipv4,
106            HostName::Ip(net::IpAddr::V6(_)) => AddressType::Ipv6,
107            HostName::Dns(_) => AddressType::Dns,
108            HostName::Tor(_) => AddressType::Onion,
109            _ => todo!(), // FIXME(cloudhead): Maxim will remove `non-exhaustive`
110        }
111    }
112}
113
114impl TryFrom<u8> for AddressType {
115    type Error = u8;
116
117    fn try_from(other: u8) -> Result<Self, Self::Error> {
118        match other {
119            1 => Ok(AddressType::Ipv4),
120            2 => Ok(AddressType::Ipv6),
121            3 => Ok(AddressType::Dns),
122            4 => Ok(AddressType::Onion),
123            _ => Err(other),
124        }
125    }
126}
127
128impl wire::Encode for AnnouncementMessage {
129    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
130        match self {
131            Self::Node(ann) => ann.encode(writer),
132            Self::Inventory(ann) => ann.encode(writer),
133            Self::Refs(ann) => ann.encode(writer),
134        }
135    }
136}
137
138impl wire::Encode for RefsAnnouncement {
139    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
140        let mut n = 0;
141
142        n += self.rid.encode(writer)?;
143        n += self.refs.encode(writer)?;
144        n += self.timestamp.encode(writer)?;
145
146        Ok(n)
147    }
148}
149
150impl wire::Decode for RefsAnnouncement {
151    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
152        let rid = RepoId::decode(reader)?;
153        let refs = BoundedVec::<_, REF_REMOTE_LIMIT>::decode(reader)?;
154        let timestamp = Timestamp::decode(reader)?;
155
156        Ok(Self {
157            rid,
158            refs,
159            timestamp,
160        })
161    }
162}
163
164impl wire::Encode for InventoryAnnouncement {
165    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
166        let mut n = 0;
167
168        n += self.inventory.encode(writer)?;
169        n += self.timestamp.encode(writer)?;
170
171        Ok(n)
172    }
173}
174
175impl wire::Decode for InventoryAnnouncement {
176    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
177        let inventory = BoundedVec::decode(reader)?;
178        let timestamp = Timestamp::decode(reader)?;
179
180        Ok(Self {
181            inventory,
182            timestamp,
183        })
184    }
185}
186
187/// The type tracking the different variants of [`Info`] for encoding and
188/// decoding purposes.
189#[repr(u8)]
190#[derive(Debug, Clone, Copy, PartialEq, Eq)]
191pub enum InfoType {
192    RefsAlreadySynced = 1,
193}
194
195impl From<InfoType> for u16 {
196    fn from(other: InfoType) -> Self {
197        other as u16
198    }
199}
200
201impl TryFrom<u16> for InfoType {
202    type Error = u16;
203
204    fn try_from(other: u16) -> Result<Self, Self::Error> {
205        match other {
206            1 => Ok(Self::RefsAlreadySynced),
207            n => Err(n),
208        }
209    }
210}
211
212impl From<Info> for InfoType {
213    fn from(info: Info) -> Self {
214        (&info).into()
215    }
216}
217
218impl From<&Info> for InfoType {
219    fn from(info: &Info) -> Self {
220        match info {
221            Info::RefsAlreadySynced { .. } => Self::RefsAlreadySynced,
222        }
223    }
224}
225
226impl wire::Encode for Info {
227    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
228        let mut n = 0;
229        n += u16::from(InfoType::from(self)).encode(writer)?;
230        match self {
231            Info::RefsAlreadySynced { rid, at } => {
232                n += rid.encode(writer)?;
233                n += at.encode(writer)?;
234            }
235        }
236
237        Ok(n)
238    }
239}
240
241impl wire::Decode for Info {
242    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
243        let info_type = reader.read_u16::<NetworkEndian>()?;
244
245        match InfoType::try_from(info_type) {
246            Ok(InfoType::RefsAlreadySynced) => {
247                let rid = RepoId::decode(reader)?;
248                let at = Oid::decode(reader)?;
249
250                Ok(Self::RefsAlreadySynced { rid, at })
251            }
252            Err(other) => Err(wire::Error::UnknownInfoType(other)),
253        }
254    }
255}
256
257impl wire::Encode for Message {
258    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
259        let mut n = self.type_id().encode(writer)?;
260
261        match self {
262            Self::Subscribe(Subscribe {
263                filter,
264                since,
265                until,
266            }) => {
267                n += filter.encode(writer)?;
268                n += since.encode(writer)?;
269                n += until.encode(writer)?;
270            }
271            Self::Announcement(Announcement {
272                node,
273                message,
274                signature,
275            }) => {
276                n += node.encode(writer)?;
277                n += signature.encode(writer)?;
278                n += message.encode(writer)?;
279            }
280            Self::Info(info) => {
281                n += info.encode(writer)?;
282            }
283            Self::Ping(Ping { ponglen, zeroes }) => {
284                n += ponglen.encode(writer)?;
285                n += zeroes.encode(writer)?;
286            }
287            Self::Pong { zeroes } => {
288                n += zeroes.encode(writer)?;
289            }
290        }
291
292        if n > wire::Size::MAX as usize {
293            return Err(io::Error::new(
294                io::ErrorKind::InvalidData,
295                "Message exceeds maximum size",
296            ));
297        }
298        Ok(n)
299    }
300}
301
302impl wire::Decode for Message {
303    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
304        let type_id = reader.read_u16::<NetworkEndian>()?;
305
306        match MessageType::try_from(type_id) {
307            Ok(MessageType::Subscribe) => {
308                let filter = Filter::decode(reader)?;
309                let since = Timestamp::decode(reader)?;
310                let until = Timestamp::decode(reader)?;
311
312                Ok(Self::Subscribe(Subscribe {
313                    filter,
314                    since,
315                    until,
316                }))
317            }
318            Ok(MessageType::NodeAnnouncement) => {
319                let node = NodeId::decode(reader)?;
320                let signature = Signature::decode(reader)?;
321                let message = NodeAnnouncement::decode(reader)?.into();
322
323                Ok(Announcement {
324                    node,
325                    message,
326                    signature,
327                }
328                .into())
329            }
330            Ok(MessageType::InventoryAnnouncement) => {
331                let node = NodeId::decode(reader)?;
332                let signature = Signature::decode(reader)?;
333                let message = InventoryAnnouncement::decode(reader)?.into();
334
335                Ok(Announcement {
336                    node,
337                    message,
338                    signature,
339                }
340                .into())
341            }
342            Ok(MessageType::RefsAnnouncement) => {
343                let node = NodeId::decode(reader)?;
344                let signature = Signature::decode(reader)?;
345                let message = RefsAnnouncement::decode(reader)?.into();
346
347                Ok(Announcement {
348                    node,
349                    message,
350                    signature,
351                }
352                .into())
353            }
354            Ok(MessageType::Info) => {
355                let info = Info::decode(reader)?;
356                Ok(Self::Info(info))
357            }
358            Ok(MessageType::Ping) => {
359                let ponglen = u16::decode(reader)?;
360                let zeroes = ZeroBytes::decode(reader)?;
361                Ok(Self::Ping(Ping { ponglen, zeroes }))
362            }
363            Ok(MessageType::Pong) => {
364                let zeroes = ZeroBytes::decode(reader)?;
365                Ok(Self::Pong { zeroes })
366            }
367            Err(other) => Err(wire::Error::UnknownMessageType(other)),
368        }
369    }
370}
371
372impl wire::Encode for Address {
373    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
374        let mut n = 0;
375
376        match self.host {
377            HostName::Ip(net::IpAddr::V4(ip)) => {
378                n += u8::from(AddressType::Ipv4).encode(writer)?;
379                n += ip.octets().encode(writer)?;
380            }
381            HostName::Ip(net::IpAddr::V6(ip)) => {
382                n += u8::from(AddressType::Ipv6).encode(writer)?;
383                n += ip.octets().encode(writer)?;
384            }
385            HostName::Dns(ref dns) => {
386                n += u8::from(AddressType::Dns).encode(writer)?;
387                n += dns.encode(writer)?;
388            }
389            HostName::Tor(addr) => {
390                n += u8::from(AddressType::Onion).encode(writer)?;
391                n += addr.encode(writer)?;
392            }
393            _ => {
394                return Err(io::ErrorKind::Unsupported.into());
395            }
396        }
397        n += self.port().encode(writer)?;
398
399        Ok(n)
400    }
401}
402
403impl wire::Decode for Address {
404    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
405        let addrtype = reader.read_u8()?;
406        let host = match AddressType::try_from(addrtype) {
407            Ok(AddressType::Ipv4) => {
408                let octets: [u8; 4] = wire::Decode::decode(reader)?;
409                let ip = net::Ipv4Addr::from(octets);
410
411                HostName::Ip(net::IpAddr::V4(ip))
412            }
413            Ok(AddressType::Ipv6) => {
414                let octets: [u8; 16] = wire::Decode::decode(reader)?;
415                let ip = net::Ipv6Addr::from(octets);
416
417                HostName::Ip(net::IpAddr::V6(ip))
418            }
419            Ok(AddressType::Dns) => {
420                let dns: String = wire::Decode::decode(reader)?;
421
422                HostName::Dns(dns)
423            }
424            Ok(AddressType::Onion) => {
425                let onion: tor::OnionAddrV3 = wire::Decode::decode(reader)?;
426
427                HostName::Tor(onion)
428            }
429            Err(other) => return Err(wire::Error::UnknownAddressType(other)),
430        };
431        let port = u16::decode(reader)?;
432
433        Ok(Self::from(NetAddr { host, port }))
434    }
435}
436
437impl wire::Encode for ZeroBytes {
438    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
439        let mut n = (self.len() as u16).encode(writer)?;
440        for _ in 0..self.len() {
441            n += 0u8.encode(writer)?;
442        }
443        Ok(n)
444    }
445}
446
447impl wire::Decode for ZeroBytes {
448    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
449        let zeroes = u16::decode(reader)?;
450        for _ in 0..zeroes {
451            _ = u8::decode(reader)?;
452        }
453        Ok(ZeroBytes::new(zeroes))
454    }
455}
456
457#[cfg(test)]
458mod tests {
459    use super::*;
460    use qcheck_macros::quickcheck;
461    use radicle::node::UserAgent;
462    use radicle::storage::refs::RefsAt;
463    use radicle_crypto::test::signer::MockSigner;
464
465    use crate::deserializer::Deserializer;
466    use crate::test::arbitrary;
467    use crate::wire::{self, Encode};
468
469    #[test]
470    fn test_refs_ann_max_size() {
471        let signer = MockSigner::default();
472        let refs: [RefsAt; REF_REMOTE_LIMIT] = arbitrary::gen(1);
473        let ann = AnnouncementMessage::Refs(RefsAnnouncement {
474            rid: arbitrary::gen(1),
475            refs: BoundedVec::collect_from(&mut refs.into_iter()),
476            timestamp: arbitrary::gen(1),
477        });
478        let ann = ann.signed(&signer);
479        let msg = Message::Announcement(ann);
480        let data = wire::serialize(&msg);
481
482        assert!(data.len() < wire::Size::MAX as usize);
483    }
484
485    #[test]
486    fn test_inv_ann_max_size() {
487        let signer = MockSigner::default();
488        let inv: [RepoId; INVENTORY_LIMIT] = arbitrary::gen(1);
489        let ann = AnnouncementMessage::Inventory(InventoryAnnouncement {
490            inventory: BoundedVec::collect_from(&mut inv.into_iter()),
491            timestamp: arbitrary::gen(1),
492        });
493        let ann = ann.signed(&signer);
494        let msg = Message::Announcement(ann);
495        let data = wire::serialize(&msg);
496
497        assert!(data.len() < wire::Size::MAX as usize);
498    }
499
500    #[test]
501    fn test_node_ann_max_size() {
502        let signer = MockSigner::default();
503        let addrs: [Address; ADDRESS_LIMIT] = arbitrary::gen(1);
504        let alias = ['@'; radicle::node::MAX_ALIAS_LENGTH];
505        let ann = AnnouncementMessage::Node(NodeAnnouncement {
506            version: 1,
507            features: Default::default(),
508            alias: radicle::node::Alias::new(String::from_iter(alias)),
509            addresses: BoundedVec::collect_from(&mut addrs.into_iter()),
510            timestamp: arbitrary::gen(1),
511            nonce: u64::MAX,
512            agent: UserAgent::default(),
513        });
514        let ann = ann.signed(&signer);
515        let msg = Message::Announcement(ann);
516        let data = wire::serialize(&msg);
517
518        assert!(data.len() < wire::Size::MAX as usize);
519    }
520
521    #[test]
522    fn test_pingpong_encode_max_size() {
523        let mut buf = Vec::new();
524
525        let ping = Message::Ping(Ping {
526            ponglen: 0,
527            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES),
528        });
529        ping.encode(&mut buf)
530            .expect("ping should be within max message size");
531
532        let pong = Message::Pong {
533            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES),
534        };
535        pong.encode(&mut buf)
536            .expect("pong should be within max message size");
537    }
538
539    #[test]
540    fn test_pingpong_encode_size_overflow() {
541        let ping = Message::Ping(Ping {
542            ponglen: 0,
543            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES + 1),
544        });
545
546        let mut buf = Vec::new();
547        ping.encode(&mut buf)
548            .expect_err("ping should exceed max message size");
549
550        let pong = Message::Pong {
551            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES + 1),
552        };
553
554        let mut buf = Vec::new();
555        pong.encode(&mut buf)
556            .expect_err("pong should exceed max message size");
557    }
558
559    #[quickcheck]
560    fn prop_message_encode_decode(message: Message) {
561        let encoded = &wire::serialize(&message);
562        let decoded = wire::deserialize::<Message>(encoded).unwrap();
563
564        assert_eq!(message, decoded);
565    }
566
567    #[test]
568    fn prop_message_decoder() {
569        fn property(items: Vec<Message>) {
570            let mut decoder = Deserializer::<1048576, Message>::new(8);
571
572            for item in &items {
573                item.encode(&mut decoder).unwrap();
574            }
575            for item in items {
576                assert_eq!(decoder.next().unwrap().unwrap(), item);
577            }
578        }
579
580        qcheck::QuickCheck::new()
581            .gen(qcheck::Gen::new(16))
582            .quickcheck(property as fn(items: Vec<Message>));
583    }
584
585    #[quickcheck]
586    fn prop_zero_bytes_encode_decode(zeroes: ZeroBytes) {
587        assert_eq!(
588            wire::deserialize::<ZeroBytes>(&wire::serialize(&zeroes)).unwrap(),
589            zeroes
590        );
591    }
592
593    #[quickcheck]
594    fn prop_addr(addr: Address) {
595        assert_eq!(
596            wire::deserialize::<Address>(&wire::serialize(&addr)).unwrap(),
597            addr
598        );
599    }
600}