radicle_protocol/wire/
message.rs

1use std::{mem, net};
2
3use bytes::Buf;
4use bytes::BufMut;
5use cyphernet::addr::{tor, HostName, NetAddr};
6use radicle::crypto::Signature;
7use radicle::git::Oid;
8use radicle::identity::RepoId;
9use radicle::node::Address;
10use radicle::node::NodeId;
11use radicle::node::Timestamp;
12
13use crate::bounded::BoundedVec;
14use crate::service::filter::Filter;
15use crate::service::message::*;
16use crate::wire;
17
/// Message type.
///
/// Each variant's discriminant is the `u16` tag reported by
/// [`Message::type_id`], which the wire encoding of [`Message`] writes ahead
/// of the message body and which decoding reads back to select the body
/// layout.
#[repr(u16)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageType {
    /// Tag of an announcement carrying [`AnnouncementMessage::Node`].
    NodeAnnouncement = 2,
    /// Tag of an announcement carrying [`AnnouncementMessage::Inventory`].
    InventoryAnnouncement = 4,
    /// Tag of an announcement carrying [`AnnouncementMessage::Refs`].
    RefsAnnouncement = 6,
    /// Tag of [`Message::Subscribe`].
    Subscribe = 8,
    /// Tag of [`Message::Ping`].
    Ping = 10,
    /// Tag of [`Message::Pong`].
    Pong = 12,
    /// Tag of [`Message::Info`].
    Info = 14,
}
30
impl From<MessageType> for u16 {
    /// Returns the numeric discriminant declared for the given message type.
    fn from(other: MessageType) -> Self {
        // `MessageType` is `#[repr(u16)]`, so this cast is lossless.
        other as u16
    }
}
36
37impl TryFrom<u16> for MessageType {
38    type Error = u16;
39
40    fn try_from(other: u16) -> Result<Self, Self::Error> {
41        match other {
42            2 => Ok(MessageType::NodeAnnouncement),
43            4 => Ok(MessageType::InventoryAnnouncement),
44            6 => Ok(MessageType::RefsAnnouncement),
45            8 => Ok(MessageType::Subscribe),
46            10 => Ok(MessageType::Ping),
47            12 => Ok(MessageType::Pong),
48            14 => Ok(MessageType::Info),
49            _ => Err(other),
50        }
51    }
52}
53
54impl Message {
55    /// The maximum supported message size in bytes.
56    pub const MAX_SIZE: wire::Size =
57        wire::Size::MAX - (mem::size_of::<MessageType>() as wire::Size);
58
59    pub fn type_id(&self) -> u16 {
60        match self {
61            Self::Subscribe { .. } => MessageType::Subscribe,
62            Self::Announcement(Announcement { message, .. }) => match message {
63                AnnouncementMessage::Node(_) => MessageType::NodeAnnouncement,
64                AnnouncementMessage::Inventory(_) => MessageType::InventoryAnnouncement,
65                AnnouncementMessage::Refs(_) => MessageType::RefsAnnouncement,
66            },
67            Self::Info(_) => MessageType::Info,
68            Self::Ping { .. } => MessageType::Ping,
69            Self::Pong { .. } => MessageType::Pong,
70        }
71        .into()
72    }
73}
74
/// Address type.
///
/// Each variant's discriminant is the `u8` tag that the wire encoding of
/// [`Address`] writes ahead of the host, and that decoding reads back to
/// select how the host is parsed.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddressType {
    /// Host is an IPv4 address.
    Ipv4 = 1,
    /// Host is an IPv6 address.
    Ipv6 = 2,
    /// Host is a DNS name.
    Dns = 3,
    /// Host is a Tor onion address.
    Onion = 4,
}
84
impl From<AddressType> for u8 {
    /// Returns the numeric discriminant declared for the given address type.
    fn from(other: AddressType) -> Self {
        // `AddressType` is `#[repr(u8)]`, so this cast is lossless.
        other as u8
    }
}
90
91impl From<&Address> for AddressType {
92    fn from(a: &Address) -> Self {
93        match a.host {
94            HostName::Ip(net::IpAddr::V4(_)) => AddressType::Ipv4,
95            HostName::Ip(net::IpAddr::V6(_)) => AddressType::Ipv6,
96            HostName::Dns(_) => AddressType::Dns,
97            HostName::Tor(_) => AddressType::Onion,
98            _ => todo!(), // FIXME(cloudhead): Maxim will remove `non-exhaustive`
99        }
100    }
101}
102
103impl TryFrom<u8> for AddressType {
104    type Error = u8;
105
106    fn try_from(other: u8) -> Result<Self, Self::Error> {
107        match other {
108            1 => Ok(AddressType::Ipv4),
109            2 => Ok(AddressType::Ipv6),
110            3 => Ok(AddressType::Dns),
111            4 => Ok(AddressType::Onion),
112            _ => Err(other),
113        }
114    }
115}
116
117impl wire::Encode for AnnouncementMessage {
118    fn encode(&self, buf: &mut impl BufMut) {
119        match self {
120            Self::Node(ann) => ann.encode(buf),
121            Self::Inventory(ann) => ann.encode(buf),
122            Self::Refs(ann) => ann.encode(buf),
123        }
124    }
125}
126
127impl wire::Encode for RefsAnnouncement {
128    fn encode(&self, buf: &mut impl BufMut) {
129        self.rid.encode(buf);
130        self.refs.encode(buf);
131        self.timestamp.encode(buf);
132    }
133}
134
135impl wire::Decode for RefsAnnouncement {
136    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
137        let rid = RepoId::decode(buf)?;
138        let refs = BoundedVec::<_, REF_REMOTE_LIMIT>::decode(buf)?;
139        let timestamp = Timestamp::decode(buf)?;
140
141        Ok(Self {
142            rid,
143            refs,
144            timestamp,
145        })
146    }
147}
148
149impl wire::Encode for InventoryAnnouncement {
150    fn encode(&self, buf: &mut impl BufMut) {
151        self.inventory.encode(buf);
152        self.timestamp.encode(buf);
153    }
154}
155
156impl wire::Decode for InventoryAnnouncement {
157    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
158        let inventory = BoundedVec::decode(buf)?;
159        let timestamp = Timestamp::decode(buf)?;
160
161        Ok(Self {
162            inventory,
163            timestamp,
164        })
165    }
166}
167
/// The type tracking the different variants of [`Info`] for encoding and
/// decoding purposes.
///
/// The discriminant is the tag written ahead of an [`Info`] body. That tag is
/// a `u16` on the wire, so the enum uses a `u16` representation to match the
/// width it is converted to and from.
#[repr(u16)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InfoType {
    /// Tag of [`Info::RefsAlreadySynced`].
    RefsAlreadySynced = 1,
}
175
impl From<InfoType> for u16 {
    /// Returns the numeric discriminant declared for the given info type.
    fn from(other: InfoType) -> Self {
        other as u16
    }
}
181
182impl TryFrom<u16> for InfoType {
183    type Error = u16;
184
185    fn try_from(other: u16) -> Result<Self, Self::Error> {
186        match other {
187            1 => Ok(Self::RefsAlreadySynced),
188            n => Err(n),
189        }
190    }
191}
192
193impl From<Info> for InfoType {
194    fn from(info: Info) -> Self {
195        (&info).into()
196    }
197}
198
199impl From<&Info> for InfoType {
200    fn from(info: &Info) -> Self {
201        match info {
202            Info::RefsAlreadySynced { .. } => Self::RefsAlreadySynced,
203        }
204    }
205}
206
207impl wire::Encode for Info {
208    fn encode(&self, buf: &mut impl BufMut) {
209        u16::from(InfoType::from(self)).encode(buf);
210        match self {
211            Info::RefsAlreadySynced { rid, at } => {
212                rid.encode(buf);
213                at.encode(buf);
214            }
215        }
216    }
217}
218
219impl wire::Decode for Info {
220    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
221        let info_type = buf.try_get_u16()?;
222
223        match InfoType::try_from(info_type) {
224            Ok(InfoType::RefsAlreadySynced) => {
225                let rid = RepoId::decode(buf)?;
226                let at = Oid::decode(buf)?;
227
228                Ok(Self::RefsAlreadySynced { rid, at })
229            }
230            Err(other) => Err(wire::Error::UnknownInfoType(other)),
231        }
232    }
233}
234
235impl wire::Encode for Message {
236    fn encode(&self, buf: &mut impl BufMut) {
237        self.type_id().encode(buf);
238
239        match self {
240            Self::Subscribe(Subscribe {
241                filter,
242                since,
243                until,
244            }) => {
245                filter.encode(buf);
246                since.encode(buf);
247                until.encode(buf);
248            }
249            Self::Announcement(Announcement {
250                node,
251                message,
252                signature,
253            }) => {
254                node.encode(buf);
255                signature.encode(buf);
256                message.encode(buf);
257            }
258            Self::Info(info) => {
259                info.encode(buf);
260            }
261            Self::Ping(Ping { ponglen, zeroes }) => {
262                ponglen.encode(buf);
263                zeroes.encode(buf);
264            }
265            Self::Pong { zeroes } => {
266                zeroes.encode(buf);
267            }
268        }
269    }
270}
271
272impl wire::Decode for Message {
273    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
274        let type_id = buf.try_get_u16()?;
275
276        match MessageType::try_from(type_id) {
277            Ok(MessageType::Subscribe) => {
278                let filter = Filter::decode(buf)?;
279                let since = Timestamp::decode(buf)?;
280                let until = Timestamp::decode(buf)?;
281
282                Ok(Self::Subscribe(Subscribe {
283                    filter,
284                    since,
285                    until,
286                }))
287            }
288            Ok(MessageType::NodeAnnouncement) => {
289                let node = NodeId::decode(buf)?;
290                let signature = Signature::decode(buf)?;
291                let message = NodeAnnouncement::decode(buf)?.into();
292
293                Ok(Announcement {
294                    node,
295                    message,
296                    signature,
297                }
298                .into())
299            }
300            Ok(MessageType::InventoryAnnouncement) => {
301                let node = NodeId::decode(buf)?;
302                let signature = Signature::decode(buf)?;
303                let message = InventoryAnnouncement::decode(buf)?.into();
304
305                Ok(Announcement {
306                    node,
307                    message,
308                    signature,
309                }
310                .into())
311            }
312            Ok(MessageType::RefsAnnouncement) => {
313                let node = NodeId::decode(buf)?;
314                let signature = Signature::decode(buf)?;
315                let message = RefsAnnouncement::decode(buf)?.into();
316
317                Ok(Announcement {
318                    node,
319                    message,
320                    signature,
321                }
322                .into())
323            }
324            Ok(MessageType::Info) => {
325                let info = Info::decode(buf)?;
326                Ok(Self::Info(info))
327            }
328            Ok(MessageType::Ping) => {
329                let ponglen = u16::decode(buf)?;
330                let zeroes = ZeroBytes::decode(buf)?;
331                Ok(Self::Ping(Ping { ponglen, zeroes }))
332            }
333            Ok(MessageType::Pong) => {
334                let zeroes = ZeroBytes::decode(buf)?;
335                Ok(Self::Pong { zeroes })
336            }
337            Err(other) => Err(wire::Error::UnknownMessageType(other)),
338        }
339    }
340}
341
342impl wire::Encode for Address {
343    fn encode(&self, buf: &mut impl BufMut) {
344        match self.host {
345            HostName::Ip(net::IpAddr::V4(ip)) => {
346                u8::from(AddressType::Ipv4).encode(buf);
347                ip.octets().encode(buf);
348            }
349            HostName::Ip(net::IpAddr::V6(ip)) => {
350                u8::from(AddressType::Ipv6).encode(buf);
351                ip.octets().encode(buf);
352            }
353            HostName::Dns(ref dns) => {
354                u8::from(AddressType::Dns).encode(buf);
355                dns.encode(buf);
356            }
357            HostName::Tor(addr) => {
358                u8::from(AddressType::Onion).encode(buf);
359                addr.encode(buf);
360            }
361            _ => {
362                unimplemented!(
363                    "Encoding not defined for addresses of the same type as the following: {:?}",
364                    self.host
365                );
366            }
367        }
368        self.port().encode(buf);
369    }
370}
371
372impl wire::Decode for Address {
373    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
374        let addrtype = buf.try_get_u8()?;
375
376        let host = match AddressType::try_from(addrtype) {
377            Ok(AddressType::Ipv4) => {
378                let octets: [u8; 4] = wire::Decode::decode(buf)?;
379                let ip = net::Ipv4Addr::from(octets);
380
381                HostName::Ip(net::IpAddr::V4(ip))
382            }
383            Ok(AddressType::Ipv6) => {
384                let octets: [u8; 16] = wire::Decode::decode(buf)?;
385                let ip = net::Ipv6Addr::from(octets);
386
387                HostName::Ip(net::IpAddr::V6(ip))
388            }
389            Ok(AddressType::Dns) => {
390                let dns: String = wire::Decode::decode(buf)?;
391
392                HostName::Dns(dns)
393            }
394            Ok(AddressType::Onion) => {
395                let onion: tor::OnionAddrV3 = wire::Decode::decode(buf)?;
396
397                HostName::Tor(onion)
398            }
399            Err(other) => return Err(wire::Error::UnknownAddressType(other)),
400        };
401        let port = u16::decode(buf)?;
402
403        Ok(Self::from(NetAddr { host, port }))
404    }
405}
406
407impl wire::Encode for ZeroBytes {
408    fn encode(&self, buf: &mut impl BufMut) {
409        (self.len() as u16).encode(buf);
410        buf.put_bytes(0u8, self.len());
411    }
412}
413
414impl wire::Decode for ZeroBytes {
415    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
416        let zeroes = u16::decode(buf)?;
417        for _ in 0..zeroes {
418            _ = u8::decode(buf)?;
419        }
420        Ok(ZeroBytes::new(zeroes))
421    }
422}
423
// Unit and property tests for the wire encoding of messages, addresses and
// zero padding.
#[cfg(test)]
mod tests {
    use super::*;
    use qcheck_macros::quickcheck;
    use radicle::node::device::Device;
    use radicle::node::UserAgent;
    use radicle::storage::refs::RefsAt;

    use crate::deserializer::Deserializer;
    use crate::wire::{self, Encode};
    use radicle::test::arbitrary;

    // A signed refs announcement holding `REF_REMOTE_LIMIT` entries must
    // serialize to fewer than `wire::Size::MAX` bytes.
    #[test]
    fn test_refs_ann_max_size() {
        let signer = Device::mock();
        let refs: [RefsAt; REF_REMOTE_LIMIT] = arbitrary::gen(1);
        let ann = AnnouncementMessage::Refs(RefsAnnouncement {
            rid: arbitrary::gen(1),
            refs: BoundedVec::collect_from(&mut refs.into_iter()),
            timestamp: arbitrary::gen(1),
        });
        let ann = ann.signed(&signer);
        let msg = Message::Announcement(ann);
        let data = wire::serialize(&msg);

        assert!(data.len() < wire::Size::MAX as usize);
    }

    // A signed inventory announcement holding `INVENTORY_LIMIT` repository ids
    // must serialize to fewer than `wire::Size::MAX` bytes.
    #[test]
    fn test_inv_ann_max_size() {
        let signer = Device::mock();
        let inv: [RepoId; INVENTORY_LIMIT] = arbitrary::gen(1);
        let ann = AnnouncementMessage::Inventory(InventoryAnnouncement {
            inventory: BoundedVec::collect_from(&mut inv.into_iter()),
            timestamp: arbitrary::gen(1),
        });
        let ann = ann.signed(&signer);
        let msg = Message::Announcement(ann);
        let data = wire::serialize(&msg);

        assert!(data.len() < wire::Size::MAX as usize);
    }

    // A signed node announcement with `ADDRESS_LIMIT` addresses and an alias of
    // `MAX_ALIAS_LENGTH` characters must serialize to fewer than
    // `wire::Size::MAX` bytes.
    #[test]
    fn test_node_ann_max_size() {
        let signer = Device::mock();
        let addrs: [Address; ADDRESS_LIMIT] = arbitrary::gen(1);
        let alias = ['@'; radicle::node::MAX_ALIAS_LENGTH];
        let ann = AnnouncementMessage::Node(NodeAnnouncement {
            version: 1,
            features: Default::default(),
            alias: radicle::node::Alias::new(String::from_iter(alias)),
            addresses: BoundedVec::collect_from(&mut addrs.into_iter()),
            timestamp: arbitrary::gen(1),
            nonce: u64::MAX,
            agent: UserAgent::default(),
        });
        let ann = ann.signed(&signer);
        let msg = Message::Announcement(ann);
        let data = wire::serialize(&msg);

        assert!(data.len() < wire::Size::MAX as usize);
    }

    // Serializing a ping and a pong padded to their respective maxima must
    // complete without panicking; the output itself is not inspected.
    #[test]
    fn test_pingpong_encode_max_size() {
        wire::serialize(&Message::Ping(Ping {
            ponglen: 0,
            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES),
        }));

        wire::serialize(&Message::Pong {
            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES),
        });
    }

    // One zero byte beyond `MAX_PING_ZEROES` must make serialization panic
    // with a message containing "advance out of bounds".
    #[test]
    #[should_panic(expected = "advance out of bounds")]
    fn test_ping_encode_size_overflow() {
        wire::serialize(&Message::Ping(Ping {
            ponglen: 0,
            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES + 1),
        }));
    }

    // One zero byte beyond `MAX_PONG_ZEROES` must make serialization panic
    // with a message containing "advance out of bounds".
    #[test]
    #[should_panic(expected = "advance out of bounds")]
    fn test_pong_encode_size_overflow() {
        wire::serialize(&Message::Pong {
            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES + 1),
        });
    }

    // Any generated message must survive a serialize/deserialize round trip
    // unchanged.
    #[quickcheck]
    fn prop_message_encode_decode(message: Message) {
        let encoded = &wire::serialize(&message);
        let decoded = wire::deserialize::<Message>(encoded).unwrap();

        assert_eq!(message, decoded);
    }

    // Messages encoded back to back into a `Deserializer` must come out of it
    // again in the same order and equal to the originals.
    #[test]
    fn prop_message_decoder() {
        fn property(items: Vec<Message>) {
            let mut decoder = Deserializer::<1048576, Message>::new(8);

            // First write every message, then read them all back.
            for item in &items {
                item.encode(&mut decoder);
            }
            for item in items {
                assert_eq!(decoder.next().unwrap().unwrap(), item);
            }
        }

        qcheck::QuickCheck::new()
            .gen(qcheck::Gen::new(16))
            .quickcheck(property as fn(items: Vec<Message>));
    }

    // `ZeroBytes` of any length up to `MAX_PING_ZEROES` must survive a
    // serialize/deserialize round trip unchanged.
    #[test]
    fn prop_zero_bytes_encode_decode() {
        fn property(zeroes: wire::Size) {
            // Lengths above the ping maximum are not exercised by this property.
            if zeroes > Ping::MAX_PING_ZEROES {
                return;
            }

            let zeroes = ZeroBytes::new(zeroes);

            assert_eq!(
                wire::deserialize::<ZeroBytes>(&wire::serialize(&zeroes)).unwrap(),
                zeroes
            );
        }

        qcheck::QuickCheck::new()
            .gen(qcheck::Gen::new(16))
            .quickcheck(property as fn(zeroes: wire::Size));
    }

    // Any generated address must survive a serialize/deserialize round trip
    // unchanged.
    #[quickcheck]
    fn prop_addr(addr: Address) {
        assert_eq!(
            wire::deserialize::<Address>(&wire::serialize(&addr)).unwrap(),
            addr
        );
    }
}