radicle_protocol/wire/
message.rs

1use std::{mem, net};
2
3use bytes::Buf;
4use bytes::BufMut;
5use cyphernet::addr::{tor, HostName, NetAddr};
6use radicle::crypto::Signature;
7use radicle::git::Oid;
8use radicle::identity::RepoId;
9use radicle::node::Address;
10use radicle::node::NodeId;
11use radicle::node::Timestamp;
12
13use crate::bounded::BoundedVec;
14use crate::service::filter::Filter;
15use crate::service::message::*;
16use crate::wire;
17
18/// Message type.
19#[repr(u16)]
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum MessageType {
22    NodeAnnouncement = 2,
23    InventoryAnnouncement = 4,
24    RefsAnnouncement = 6,
25    Subscribe = 8,
26    Ping = 10,
27    Pong = 12,
28    Info = 14,
29}
30
31impl From<MessageType> for u16 {
32    fn from(other: MessageType) -> Self {
33        other as u16
34    }
35}
36
37impl TryFrom<u16> for MessageType {
38    type Error = u16;
39
40    fn try_from(other: u16) -> Result<Self, Self::Error> {
41        match other {
42            2 => Ok(MessageType::NodeAnnouncement),
43            4 => Ok(MessageType::InventoryAnnouncement),
44            6 => Ok(MessageType::RefsAnnouncement),
45            8 => Ok(MessageType::Subscribe),
46            10 => Ok(MessageType::Ping),
47            12 => Ok(MessageType::Pong),
48            14 => Ok(MessageType::Info),
49            _ => Err(other),
50        }
51    }
52}
53
54impl Message {
55    /// The maximum supported message size in bytes.
56    pub const MAX_SIZE: wire::Size =
57        wire::Size::MAX - (mem::size_of::<MessageType>() as wire::Size);
58
59    pub fn type_id(&self) -> u16 {
60        match self {
61            Self::Subscribe { .. } => MessageType::Subscribe,
62            Self::Announcement(Announcement { message, .. }) => match message {
63                AnnouncementMessage::Node(_) => MessageType::NodeAnnouncement,
64                AnnouncementMessage::Inventory(_) => MessageType::InventoryAnnouncement,
65                AnnouncementMessage::Refs(_) => MessageType::RefsAnnouncement,
66            },
67            Self::Info(_) => MessageType::Info,
68            Self::Ping { .. } => MessageType::Ping,
69            Self::Pong { .. } => MessageType::Pong,
70        }
71        .into()
72    }
73}
74
75/// Address type.
76#[repr(u8)]
77#[derive(Debug, Clone, Copy, PartialEq, Eq)]
78pub enum AddressType {
79    Ipv4 = 1,
80    Ipv6 = 2,
81    Dns = 3,
82    Onion = 4,
83}
84
85impl From<AddressType> for u8 {
86    fn from(other: AddressType) -> Self {
87        other as u8
88    }
89}
90
91impl From<&Address> for AddressType {
92    fn from(a: &Address) -> Self {
93        match a.host {
94            HostName::Ip(net::IpAddr::V4(_)) => AddressType::Ipv4,
95            HostName::Ip(net::IpAddr::V6(_)) => AddressType::Ipv6,
96            HostName::Dns(_) => AddressType::Dns,
97            HostName::Tor(_) => AddressType::Onion,
98            _ => todo!(), // FIXME(cloudhead): Maxim will remove `non-exhaustive`
99        }
100    }
101}
102
103impl TryFrom<u8> for AddressType {
104    type Error = u8;
105
106    fn try_from(other: u8) -> Result<Self, Self::Error> {
107        match other {
108            1 => Ok(AddressType::Ipv4),
109            2 => Ok(AddressType::Ipv6),
110            3 => Ok(AddressType::Dns),
111            4 => Ok(AddressType::Onion),
112            _ => Err(other),
113        }
114    }
115}
116
117impl wire::Encode for AnnouncementMessage {
118    fn encode(&self, buf: &mut impl BufMut) {
119        match self {
120            Self::Node(ann) => ann.encode(buf),
121            Self::Inventory(ann) => ann.encode(buf),
122            Self::Refs(ann) => ann.encode(buf),
123        }
124    }
125}
126
127impl wire::Encode for RefsAnnouncement {
128    fn encode(&self, buf: &mut impl BufMut) {
129        self.rid.encode(buf);
130        self.refs.encode(buf);
131        self.timestamp.encode(buf);
132    }
133}
134
135impl wire::Decode for RefsAnnouncement {
136    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
137        let rid = RepoId::decode(buf)?;
138        let refs = BoundedVec::<_, REF_REMOTE_LIMIT>::decode(buf)?;
139        let timestamp = Timestamp::decode(buf)?;
140
141        Ok(Self {
142            rid,
143            refs,
144            timestamp,
145        })
146    }
147}
148
149impl wire::Encode for InventoryAnnouncement {
150    fn encode(&self, buf: &mut impl BufMut) {
151        self.inventory.encode(buf);
152        self.timestamp.encode(buf);
153    }
154}
155
156impl wire::Decode for InventoryAnnouncement {
157    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
158        let inventory = BoundedVec::decode(buf)?;
159        let timestamp = Timestamp::decode(buf)?;
160
161        Ok(Self {
162            inventory,
163            timestamp,
164        })
165    }
166}
167
168/// The type tracking the different variants of [`Info`] for encoding and
169/// decoding purposes.
170#[repr(u8)]
171#[derive(Debug, Clone, Copy, PartialEq, Eq)]
172pub enum InfoType {
173    RefsAlreadySynced = 1,
174}
175
176impl From<InfoType> for u16 {
177    fn from(other: InfoType) -> Self {
178        other as u16
179    }
180}
181
182impl TryFrom<u16> for InfoType {
183    type Error = u16;
184
185    fn try_from(other: u16) -> Result<Self, Self::Error> {
186        match other {
187            1 => Ok(Self::RefsAlreadySynced),
188            n => Err(n),
189        }
190    }
191}
192
193impl From<Info> for InfoType {
194    fn from(info: Info) -> Self {
195        (&info).into()
196    }
197}
198
199impl From<&Info> for InfoType {
200    fn from(info: &Info) -> Self {
201        match info {
202            Info::RefsAlreadySynced { .. } => Self::RefsAlreadySynced,
203        }
204    }
205}
206
207impl wire::Encode for Info {
208    fn encode(&self, buf: &mut impl BufMut) {
209        u16::from(InfoType::from(self)).encode(buf);
210        match self {
211            Info::RefsAlreadySynced { rid, at } => {
212                rid.encode(buf);
213                at.encode(buf);
214            }
215        }
216    }
217}
218
219impl wire::Decode for Info {
220    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
221        let info_type = buf.try_get_u16()?;
222
223        match InfoType::try_from(info_type) {
224            Ok(InfoType::RefsAlreadySynced) => {
225                let rid = RepoId::decode(buf)?;
226                let at = Oid::decode(buf)?;
227
228                Ok(Self::RefsAlreadySynced { rid, at })
229            }
230            Err(other) => Err(wire::Invalid::InfoMessageType { actual: other }.into()),
231        }
232    }
233}
234
235impl wire::Encode for Message {
236    fn encode(&self, buf: &mut impl BufMut) {
237        let buf = &mut buf.limit(wire::Size::MAX as usize);
238
239        self.type_id().encode(buf);
240
241        match self {
242            Self::Subscribe(Subscribe {
243                filter,
244                since,
245                until,
246            }) => {
247                filter.encode(buf);
248                since.encode(buf);
249                until.encode(buf);
250            }
251            Self::Announcement(Announcement {
252                node,
253                message,
254                signature,
255            }) => {
256                node.encode(buf);
257                signature.encode(buf);
258                message.encode(buf);
259            }
260            Self::Info(info) => {
261                info.encode(buf);
262            }
263            Self::Ping(Ping { ponglen, zeroes }) => {
264                ponglen.encode(buf);
265                zeroes.encode(buf);
266            }
267            Self::Pong { zeroes } => {
268                zeroes.encode(buf);
269            }
270        }
271    }
272}
273
274impl wire::Decode for Message {
275    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
276        let type_id = buf.try_get_u16()?;
277
278        match MessageType::try_from(type_id) {
279            Ok(MessageType::Subscribe) => {
280                let filter = Filter::decode(buf)?;
281                let since = Timestamp::decode(buf)?;
282                let until = Timestamp::decode(buf)?;
283
284                Ok(Self::Subscribe(Subscribe {
285                    filter,
286                    since,
287                    until,
288                }))
289            }
290            Ok(MessageType::NodeAnnouncement) => {
291                let node = NodeId::decode(buf)?;
292                let signature = Signature::decode(buf)?;
293                let message = NodeAnnouncement::decode(buf)?.into();
294
295                Ok(Announcement {
296                    node,
297                    message,
298                    signature,
299                }
300                .into())
301            }
302            Ok(MessageType::InventoryAnnouncement) => {
303                let node = NodeId::decode(buf)?;
304                let signature = Signature::decode(buf)?;
305                let message = InventoryAnnouncement::decode(buf)?.into();
306
307                Ok(Announcement {
308                    node,
309                    message,
310                    signature,
311                }
312                .into())
313            }
314            Ok(MessageType::RefsAnnouncement) => {
315                let node = NodeId::decode(buf)?;
316                let signature = Signature::decode(buf)?;
317                let message = RefsAnnouncement::decode(buf)?.into();
318
319                Ok(Announcement {
320                    node,
321                    message,
322                    signature,
323                }
324                .into())
325            }
326            Ok(MessageType::Info) => {
327                let info = Info::decode(buf)?;
328                Ok(Self::Info(info))
329            }
330            Ok(MessageType::Ping) => {
331                let ponglen = u16::decode(buf)?;
332                let zeroes = ZeroBytes::decode(buf)?;
333                Ok(Self::Ping(Ping { ponglen, zeroes }))
334            }
335            Ok(MessageType::Pong) => {
336                let zeroes = ZeroBytes::decode(buf)?;
337                Ok(Self::Pong { zeroes })
338            }
339            Err(other) => Err(wire::Invalid::MessageType { actual: other }.into()),
340        }
341    }
342}
343
344impl wire::Encode for Address {
345    fn encode(&self, buf: &mut impl BufMut) {
346        match self.host {
347            HostName::Ip(net::IpAddr::V4(ip)) => {
348                u8::from(AddressType::Ipv4).encode(buf);
349                ip.octets().encode(buf);
350            }
351            HostName::Ip(net::IpAddr::V6(ip)) => {
352                u8::from(AddressType::Ipv6).encode(buf);
353                ip.octets().encode(buf);
354            }
355            HostName::Dns(ref dns) => {
356                u8::from(AddressType::Dns).encode(buf);
357                dns.encode(buf);
358            }
359            HostName::Tor(addr) => {
360                u8::from(AddressType::Onion).encode(buf);
361                addr.encode(buf);
362            }
363            _ => {
364                unimplemented!(
365                    "Encoding not defined for addresses of the same type as the following: {:?}",
366                    self.host
367                );
368            }
369        }
370        self.port().encode(buf);
371    }
372}
373
374impl wire::Decode for Address {
375    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
376        let addrtype = buf.try_get_u8()?;
377
378        let host = match AddressType::try_from(addrtype) {
379            Ok(AddressType::Ipv4) => {
380                let octets: [u8; 4] = wire::Decode::decode(buf)?;
381                let ip = net::Ipv4Addr::from(octets);
382
383                HostName::Ip(net::IpAddr::V4(ip))
384            }
385            Ok(AddressType::Ipv6) => {
386                let octets: [u8; 16] = wire::Decode::decode(buf)?;
387                let ip = net::Ipv6Addr::from(octets);
388
389                HostName::Ip(net::IpAddr::V6(ip))
390            }
391            Ok(AddressType::Dns) => {
392                let dns: String = wire::Decode::decode(buf)?;
393
394                HostName::Dns(dns)
395            }
396            Ok(AddressType::Onion) => {
397                let onion: tor::OnionAddrV3 = wire::Decode::decode(buf)?;
398
399                HostName::Tor(onion)
400            }
401            Err(other) => return Err(wire::Invalid::AddressType { actual: other }.into()),
402        };
403        let port = u16::decode(buf)?;
404
405        Ok(Self::from(NetAddr { host, port }))
406    }
407}
408
409impl wire::Encode for ZeroBytes {
410    fn encode(&self, buf: &mut impl BufMut) {
411        (self.len() as u16).encode(buf);
412        buf.put_bytes(0u8, self.len());
413    }
414}
415
416impl wire::Decode for ZeroBytes {
417    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
418        let zeroes = u16::decode(buf)?;
419        for _ in 0..zeroes {
420            _ = u8::decode(buf)?;
421        }
422        Ok(ZeroBytes::new(zeroes))
423    }
424}
425
426#[cfg(test)]
427mod tests {
428    use qcheck_macros::quickcheck;
429    use radicle::node::device::Device;
430    use radicle::node::UserAgent;
431    use radicle::storage::refs::RefsAt;
432    use radicle::test::arbitrary;
433
434    use crate::deserializer::Deserializer;
435    use crate::prop_roundtrip;
436    use crate::wire::{roundtrip, Encode as _};
437
438    use super::*;
439
440    prop_roundtrip!(Address);
441    prop_roundtrip!(Message);
442
443    #[test]
444    fn test_refs_ann_max_size() {
445        let signer = Device::mock();
446        let refs: [RefsAt; REF_REMOTE_LIMIT] = arbitrary::gen(1);
447        let ann = AnnouncementMessage::Refs(RefsAnnouncement {
448            rid: arbitrary::gen(1),
449            refs: BoundedVec::collect_from(&mut refs.into_iter()),
450            timestamp: arbitrary::gen(1),
451        });
452        let ann = ann.signed(&signer);
453        let msg = Message::Announcement(ann);
454        let data = msg.encode_to_vec();
455
456        assert!(data.len() < wire::Size::MAX as usize);
457    }
458
459    #[test]
460    fn test_inv_ann_max_size() {
461        let signer = Device::mock();
462        let inv: [RepoId; INVENTORY_LIMIT] = arbitrary::gen(1);
463        let ann = AnnouncementMessage::Inventory(InventoryAnnouncement {
464            inventory: BoundedVec::collect_from(&mut inv.into_iter()),
465            timestamp: arbitrary::gen(1),
466        });
467        let ann = ann.signed(&signer);
468        let msg = Message::Announcement(ann);
469        let data = msg.encode_to_vec();
470
471        assert!(data.len() < wire::Size::MAX as usize);
472    }
473
474    #[test]
475    fn test_node_ann_max_size() {
476        let signer = Device::mock();
477        let addrs: [Address; ADDRESS_LIMIT] = arbitrary::gen(1);
478        let alias = ['@'; radicle::node::MAX_ALIAS_LENGTH];
479        let ann = AnnouncementMessage::Node(NodeAnnouncement {
480            version: 1,
481            features: Default::default(),
482            alias: radicle::node::Alias::new(String::from_iter(alias)),
483            addresses: BoundedVec::collect_from(&mut addrs.into_iter()),
484            timestamp: arbitrary::gen(1),
485            nonce: u64::MAX,
486            agent: UserAgent::default(),
487        });
488        let ann = ann.signed(&signer);
489        let msg = Message::Announcement(ann);
490        let data = msg.encode_to_vec();
491
492        assert!(data.len() < wire::Size::MAX as usize);
493    }
494
495    #[test]
496    fn test_pingpong_encode_max_size() {
497        Message::Ping(Ping {
498            ponglen: 0,
499            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES),
500        })
501        .encode_to_vec();
502
503        (Message::Pong {
504            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES),
505        })
506        .encode_to_vec();
507    }
508
509    #[test]
510    #[should_panic(expected = "advance out of bounds")]
511    fn test_ping_encode_size_overflow() {
512        Message::Ping(Ping {
513            ponglen: 0,
514            zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES + 1),
515        })
516        .encode_to_vec();
517    }
518
519    #[test]
520    #[should_panic(expected = "advance out of bounds")]
521    fn test_pong_encode_size_overflow() {
522        Message::Pong {
523            zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES + 1),
524        }
525        .encode_to_vec();
526    }
527
528    #[test]
529    fn prop_message_decoder() {
530        fn property(items: Vec<Message>) {
531            let mut decoder = Deserializer::<1048576, Message>::new(8);
532
533            for item in &items {
534                item.encode(&mut decoder);
535            }
536            for item in items {
537                assert_eq!(decoder.next().unwrap().unwrap(), item);
538            }
539        }
540
541        qcheck::QuickCheck::new()
542            .gen(qcheck::Gen::new(16))
543            .quickcheck(property as fn(items: Vec<Message>));
544    }
545
546    #[quickcheck]
547    fn prop_zero_bytes_encode_decode(zeroes: wire::Size) -> qcheck::TestResult {
548        if zeroes > Ping::MAX_PING_ZEROES {
549            return qcheck::TestResult::discard();
550        }
551
552        roundtrip(ZeroBytes::new(zeroes));
553
554        qcheck::TestResult::passed()
555    }
556}