Skip to main content

radicle_protocol/wire/
message.rs

1use std::{mem, net};
2
3use bytes::Buf;
4use bytes::BufMut;
5#[cfg(feature = "i2p")]
6use cypheraddr::i2p;
7#[cfg(feature = "tor")]
8use cypheraddr::tor;
9use cypheraddr::{HostName, NetAddr};
10use radicle::crypto::Signature;
11use radicle::git::Oid;
12use radicle::identity::RepoId;
13use radicle::node::Address;
14use radicle::node::NodeId;
15use radicle::node::Timestamp;
16
17use crate::bounded::BoundedVec;
18use crate::service::filter::Filter;
19use crate::service::message::*;
20use crate::wire;
21
22/// Message type.
23#[repr(u16)]
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum MessageType {
26    NodeAnnouncement = 2,
27    InventoryAnnouncement = 4,
28    RefsAnnouncement = 6,
29    Subscribe = 8,
30    Ping = 10,
31    Pong = 12,
32    Info = 14,
33}
34
35impl From<MessageType> for u16 {
36    fn from(other: MessageType) -> Self {
37        other as u16
38    }
39}
40
41impl TryFrom<u16> for MessageType {
42    type Error = u16;
43
44    fn try_from(other: u16) -> Result<Self, Self::Error> {
45        match other {
46            2 => Ok(MessageType::NodeAnnouncement),
47            4 => Ok(MessageType::InventoryAnnouncement),
48            6 => Ok(MessageType::RefsAnnouncement),
49            8 => Ok(MessageType::Subscribe),
50            10 => Ok(MessageType::Ping),
51            12 => Ok(MessageType::Pong),
52            14 => Ok(MessageType::Info),
53            _ => Err(other),
54        }
55    }
56}
57
58impl Message {
59    /// The maximum supported message size in bytes.
60    pub const MAX_SIZE: wire::Size =
61        wire::Size::MAX - (mem::size_of::<MessageType>() as wire::Size);
62
63    pub fn type_id(&self) -> u16 {
64        match self {
65            Self::Subscribe { .. } => MessageType::Subscribe,
66            Self::Announcement(Announcement { message, .. }) => match message {
67                AnnouncementMessage::Node(_) => MessageType::NodeAnnouncement,
68                AnnouncementMessage::Inventory(_) => MessageType::InventoryAnnouncement,
69                AnnouncementMessage::Refs(_) => MessageType::RefsAnnouncement,
70            },
71            Self::Info(_) => MessageType::Info,
72            Self::Ping { .. } => MessageType::Ping,
73            Self::Pong { .. } => MessageType::Pong,
74        }
75        .into()
76    }
77}
78
79/// Address type.
80#[repr(u8)]
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub enum AddressType {
83    Ipv4 = 1,
84    Ipv6 = 2,
85    Dns = 3,
86    #[cfg(feature = "tor")]
87    Onion = 4,
88    #[cfg(feature = "i2p")]
89    I2p = 5,
90}
91
92impl From<AddressType> for u8 {
93    fn from(other: AddressType) -> Self {
94        other as u8
95    }
96}
97
98impl From<&Address> for AddressType {
99    fn from(a: &Address) -> Self {
100        match a.host {
101            HostName::Ip(net::IpAddr::V4(_)) => AddressType::Ipv4,
102            HostName::Ip(net::IpAddr::V6(_)) => AddressType::Ipv6,
103            HostName::Dns(_) => AddressType::Dns,
104            #[cfg(feature = "tor")]
105            HostName::Tor(_) => AddressType::Onion,
106            #[cfg(feature = "i2p")]
107            HostName::I2p(_) => AddressType::I2p,
108            _ => todo!(), // FIXME(cloudhead): Maxim will remove `non-exhaustive`
109        }
110    }
111}
112
113impl TryFrom<u8> for AddressType {
114    type Error = u8;
115
116    fn try_from(other: u8) -> Result<Self, Self::Error> {
117        match other {
118            1 => Ok(AddressType::Ipv4),
119            2 => Ok(AddressType::Ipv6),
120            3 => Ok(AddressType::Dns),
121            #[cfg(feature = "tor")]
122            4 => Ok(AddressType::Onion),
123            #[cfg(feature = "i2p")]
124            5 => Ok(AddressType::I2p),
125            _ => Err(other),
126        }
127    }
128}
129
130impl wire::Encode for AnnouncementMessage {
131    fn encode(&self, buf: &mut impl BufMut) {
132        match self {
133            Self::Node(ann) => ann.encode(buf),
134            Self::Inventory(ann) => ann.encode(buf),
135            Self::Refs(ann) => ann.encode(buf),
136        }
137    }
138}
139
140impl wire::Encode for RefsAnnouncement {
141    fn encode(&self, buf: &mut impl BufMut) {
142        self.rid.encode(buf);
143        self.refs.encode(buf);
144        self.timestamp.encode(buf);
145    }
146}
147
148impl wire::Decode for RefsAnnouncement {
149    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
150        let rid = RepoId::decode(buf)?;
151        let refs = BoundedVec::<_, REF_REMOTE_LIMIT>::decode(buf)?;
152        let timestamp = Timestamp::decode(buf)?;
153
154        Ok(Self {
155            rid,
156            refs,
157            timestamp,
158        })
159    }
160}
161
162impl wire::Encode for InventoryAnnouncement {
163    fn encode(&self, buf: &mut impl BufMut) {
164        self.inventory.encode(buf);
165        self.timestamp.encode(buf);
166    }
167}
168
169impl wire::Decode for InventoryAnnouncement {
170    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
171        let inventory = BoundedVec::decode(buf)?;
172        let timestamp = Timestamp::decode(buf)?;
173
174        Ok(Self {
175            inventory,
176            timestamp,
177        })
178    }
179}
180
181/// The type tracking the different variants of [`Info`] for encoding and
182/// decoding purposes.
183#[repr(u8)]
184#[derive(Debug, Clone, Copy, PartialEq, Eq)]
185pub enum InfoType {
186    RefsAlreadySynced = 1,
187}
188
189impl From<InfoType> for u16 {
190    fn from(other: InfoType) -> Self {
191        other as u16
192    }
193}
194
195impl TryFrom<u16> for InfoType {
196    type Error = u16;
197
198    fn try_from(other: u16) -> Result<Self, Self::Error> {
199        match other {
200            1 => Ok(Self::RefsAlreadySynced),
201            n => Err(n),
202        }
203    }
204}
205
206impl From<Info> for InfoType {
207    fn from(info: Info) -> Self {
208        (&info).into()
209    }
210}
211
212impl From<&Info> for InfoType {
213    fn from(info: &Info) -> Self {
214        match info {
215            Info::RefsAlreadySynced { .. } => Self::RefsAlreadySynced,
216        }
217    }
218}
219
220impl wire::Encode for Info {
221    fn encode(&self, buf: &mut impl BufMut) {
222        u16::from(InfoType::from(self)).encode(buf);
223        match self {
224            Info::RefsAlreadySynced { rid, at } => {
225                rid.encode(buf);
226                at.encode(buf);
227            }
228        }
229    }
230}
231
232impl wire::Decode for Info {
233    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
234        let info_type = buf.try_get_u16()?;
235
236        match InfoType::try_from(info_type) {
237            Ok(InfoType::RefsAlreadySynced) => {
238                let rid = RepoId::decode(buf)?;
239                let at = Oid::decode(buf)?;
240
241                Ok(Self::RefsAlreadySynced { rid, at })
242            }
243            Err(other) => Err(wire::Invalid::InfoMessageType { actual: other }.into()),
244        }
245    }
246}
247
248impl wire::Encode for Message {
249    fn encode(&self, buf: &mut impl BufMut) {
250        let buf = &mut buf.limit(wire::Size::MAX as usize);
251
252        self.type_id().encode(buf);
253
254        match self {
255            Self::Subscribe(Subscribe {
256                filter,
257                since,
258                until,
259            }) => {
260                filter.encode(buf);
261                since.encode(buf);
262                until.encode(buf);
263            }
264            Self::Announcement(Announcement {
265                node,
266                message,
267                signature,
268            }) => {
269                node.encode(buf);
270                signature.encode(buf);
271                message.encode(buf);
272            }
273            Self::Info(info) => {
274                info.encode(buf);
275            }
276            Self::Ping(Ping { ponglen, zeroes }) => {
277                ponglen.encode(buf);
278                zeroes.encode(buf);
279            }
280            Self::Pong { zeroes } => {
281                zeroes.encode(buf);
282            }
283        }
284    }
285}
286
287impl wire::Decode for Message {
288    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
289        let type_id = buf.try_get_u16()?;
290
291        match MessageType::try_from(type_id) {
292            Ok(MessageType::Subscribe) => {
293                let filter = Filter::decode(buf)?;
294                let since = Timestamp::decode(buf)?;
295                let until = Timestamp::decode(buf)?;
296
297                Ok(Self::Subscribe(Subscribe {
298                    filter,
299                    since,
300                    until,
301                }))
302            }
303            Ok(MessageType::NodeAnnouncement) => {
304                let node = NodeId::decode(buf)?;
305                let signature = Signature::decode(buf)?;
306                let message = NodeAnnouncement::decode(buf)?.into();
307
308                Ok(Announcement {
309                    node,
310                    message,
311                    signature,
312                }
313                .into())
314            }
315            Ok(MessageType::InventoryAnnouncement) => {
316                let node = NodeId::decode(buf)?;
317                let signature = Signature::decode(buf)?;
318                let message = InventoryAnnouncement::decode(buf)?.into();
319
320                Ok(Announcement {
321                    node,
322                    message,
323                    signature,
324                }
325                .into())
326            }
327            Ok(MessageType::RefsAnnouncement) => {
328                let node = NodeId::decode(buf)?;
329                let signature = Signature::decode(buf)?;
330                let message = RefsAnnouncement::decode(buf)?.into();
331
332                Ok(Announcement {
333                    node,
334                    message,
335                    signature,
336                }
337                .into())
338            }
339            Ok(MessageType::Info) => {
340                let info = Info::decode(buf)?;
341                Ok(Self::Info(info))
342            }
343            Ok(MessageType::Ping) => {
344                let ponglen = u16::decode(buf)?;
345                let zeroes = ZeroBytes::decode(buf)?;
346                Ok(Self::Ping(Ping { ponglen, zeroes }))
347            }
348            Ok(MessageType::Pong) => {
349                let zeroes = ZeroBytes::decode(buf)?;
350                Ok(Self::Pong { zeroes })
351            }
352            Err(other) => Err(wire::Invalid::MessageType { actual: other }.into()),
353        }
354    }
355}
356
357impl wire::Encode for Address {
358    fn encode(&self, buf: &mut impl BufMut) {
359        match self.host {
360            HostName::Ip(net::IpAddr::V4(ip)) => {
361                u8::from(AddressType::Ipv4).encode(buf);
362                ip.octets().encode(buf);
363            }
364            HostName::Ip(net::IpAddr::V6(ip)) => {
365                u8::from(AddressType::Ipv6).encode(buf);
366                ip.octets().encode(buf);
367            }
368            HostName::Dns(ref dns) => {
369                u8::from(AddressType::Dns).encode(buf);
370                dns.encode(buf);
371            }
372            #[cfg(feature = "tor")]
373            HostName::Tor(addr) => {
374                u8::from(AddressType::Onion).encode(buf);
375                addr.encode(buf);
376            }
377            #[cfg(feature = "i2p")]
378            HostName::I2p(ref addr) => {
379                u8::from(AddressType::I2p).encode(buf);
380                addr.encode(buf);
381            }
382            _ => {
383                unimplemented!(
384                    "Encoding not defined for addresses of the same type as the following: {:?}",
385                    self.host
386                );
387            }
388        }
389        self.port().encode(buf);
390    }
391}
392
393impl wire::Decode for Address {
394    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
395        let addrtype = buf.try_get_u8()?;
396
397        let host = match AddressType::try_from(addrtype) {
398            Ok(AddressType::Ipv4) => {
399                let octets: [u8; 4] = wire::Decode::decode(buf)?;
400                let ip = net::Ipv4Addr::from(octets);
401
402                HostName::Ip(net::IpAddr::V4(ip))
403            }
404            Ok(AddressType::Ipv6) => {
405                let octets: [u8; 16] = wire::Decode::decode(buf)?;
406                let ip = net::Ipv6Addr::from(octets);
407
408                HostName::Ip(net::IpAddr::V6(ip))
409            }
410            Ok(AddressType::Dns) => {
411                let dns: String = wire::Decode::decode(buf)?;
412
413                HostName::Dns(dns)
414            }
415            #[cfg(feature = "tor")]
416            Ok(AddressType::Onion) => {
417                let onion: tor::OnionAddrV3 = wire::Decode::decode(buf)?;
418
419                HostName::Tor(onion)
420            }
421            #[cfg(feature = "i2p")]
422            Ok(AddressType::I2p) => {
423                let i2p: i2p::I2pAddr = wire::Decode::decode(buf)?;
424
425                HostName::I2p(i2p)
426            }
427            Err(other) => return Err(wire::Invalid::AddressType { actual: other }.into()),
428        };
429        let port = u16::decode(buf)?;
430
431        Ok(Self::from(NetAddr { host, port }))
432    }
433}
434
435impl wire::Encode for ZeroBytes {
436    fn encode(&self, buf: &mut impl BufMut) {
437        (self.len() as u16).encode(buf);
438        buf.put_bytes(0u8, self.len());
439    }
440}
441
442impl wire::Decode for ZeroBytes {
443    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
444        let zeroes = u16::decode(buf)?;
445        for _ in 0..zeroes {
446            _ = u8::decode(buf)?;
447        }
448        Ok(ZeroBytes::new(zeroes))
449    }
450}
451
452#[cfg(test)]
453mod tests {
454    use qcheck_macros::quickcheck;
455    use radicle::node::UserAgent;
456    use radicle::node::device::Device;
457    use radicle::storage::refs::RefsAt;
458    use radicle::test::arbitrary;
459
460    use crate::deserializer::Deserializer;
461    use crate::prop_roundtrip;
462    use crate::wire::{Encode as _, roundtrip};
463
464    use super::*;
465
466    prop_roundtrip!(Address);
467    prop_roundtrip!(Message);
468
469    #[test]
470    fn test_refs_ann_max_size() {
471        let signer = Device::mock();
472        let refs: [RefsAt; REF_REMOTE_LIMIT] = arbitrary::r#gen(1);
473        let ann = AnnouncementMessage::Refs(RefsAnnouncement {
474            rid: arbitrary::r#gen(1),
475            refs: BoundedVec::collect_from(&mut refs.into_iter()),
476            timestamp: arbitrary::r#gen(1),
477        });
478        let ann = ann.signed(&signer);
479        let msg = Message::Announcement(ann);
480        let data = msg.encode_to_vec();
481
482        assert!(data.len() < wire::Size::MAX as usize);
483    }
484
485    #[test]
486    fn test_inv_ann_max_size() {
487        let signer = Device::mock();
488        let inv: [RepoId; INVENTORY_LIMIT] = arbitrary::r#gen(1);
489        let ann = AnnouncementMessage::Inventory(InventoryAnnouncement {
490            inventory: BoundedVec::collect_from(&mut inv.into_iter()),
491            timestamp: arbitrary::r#gen(1),
492        });
493        let ann = ann.signed(&signer);
494        let msg = Message::Announcement(ann);
495        let data = msg.encode_to_vec();
496
497        assert!(data.len() < wire::Size::MAX as usize);
498    }
499
500    #[test]
501    fn test_node_ann_max_size() {
502        let signer = Device::mock();
503        let addrs: [Address; ADDRESS_LIMIT] = arbitrary::r#gen(1);
504        let alias = ['@'; radicle::node::MAX_ALIAS_LENGTH];
505        let ann = AnnouncementMessage::Node(NodeAnnouncement {
506            version: 1,
507            features: Default::default(),
508            alias: radicle::node::Alias::new(String::from_iter(alias)),
509            addresses: BoundedVec::collect_from(&mut addrs.into_iter()),
510            timestamp: arbitrary::r#gen(1),
511            nonce: u64::MAX,
512            agent: UserAgent::default(),
513        });
514        let ann = ann.signed(&signer);
515        let msg = Message::Announcement(ann);
516        let data = msg.encode_to_vec();
517
518        assert!(data.len() < wire::Size::MAX as usize);
519    }
520
521    #[test]
522    fn test_pingpong_encode_max_size() {
523        Message::Ping(Ping {
524            ponglen: 0,
525            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES),
526        })
527        .encode_to_vec();
528
529        (Message::Pong {
530            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES),
531        })
532        .encode_to_vec();
533    }
534
535    #[test]
536    #[should_panic(expected = "advance out of bounds")]
537    fn test_ping_encode_size_overflow() {
538        Message::Ping(Ping {
539            ponglen: 0,
540            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES + 1),
541        })
542        .encode_to_vec();
543    }
544
545    #[test]
546    #[should_panic(expected = "advance out of bounds")]
547    fn test_pong_encode_size_overflow() {
548        Message::Pong {
549            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES + 1),
550        }
551        .encode_to_vec();
552    }
553
554    #[test]
555    fn prop_message_decoder() {
556        fn property(items: Vec<Message>) {
557            let mut decoder = Deserializer::<1048576, Message>::new(8);
558
559            for item in &items {
560                item.encode(&mut decoder);
561            }
562            for item in items {
563                assert_eq!(decoder.next().unwrap().unwrap(), item);
564            }
565        }
566
567        qcheck::QuickCheck::new()
568            .r#gen(qcheck::Gen::new(16))
569            .quickcheck(property as fn(items: Vec<Message>));
570    }
571
572    #[quickcheck]
573    fn prop_zero_bytes_encode_decode(zeroes: wire::Size) -> qcheck::TestResult {
574        if zeroes > Ping::MAX_PING_ZEROES {
575            return qcheck::TestResult::discard();
576        }
577
578        roundtrip(ZeroBytes::new(zeroes));
579
580        qcheck::TestResult::passed()
581    }
582}