1use std::{mem, net};
2
3use bytes::Buf;
4use bytes::BufMut;
5#[cfg(feature = "i2p")]
6use cypheraddr::i2p;
7#[cfg(feature = "tor")]
8use cypheraddr::tor;
9use cypheraddr::{HostName, NetAddr};
10use radicle::crypto::Signature;
11use radicle::git::Oid;
12use radicle::identity::RepoId;
13use radicle::node::Address;
14use radicle::node::NodeId;
15use radicle::node::Timestamp;
16
17use crate::bounded::BoundedVec;
18use crate::service::filter::Filter;
19use crate::service::message::*;
20use crate::wire;
21
22#[repr(u16)]
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum MessageType {
26 NodeAnnouncement = 2,
27 InventoryAnnouncement = 4,
28 RefsAnnouncement = 6,
29 Subscribe = 8,
30 Ping = 10,
31 Pong = 12,
32 Info = 14,
33}
34
35impl From<MessageType> for u16 {
36 fn from(other: MessageType) -> Self {
37 other as u16
38 }
39}
40
41impl TryFrom<u16> for MessageType {
42 type Error = u16;
43
44 fn try_from(other: u16) -> Result<Self, Self::Error> {
45 match other {
46 2 => Ok(MessageType::NodeAnnouncement),
47 4 => Ok(MessageType::InventoryAnnouncement),
48 6 => Ok(MessageType::RefsAnnouncement),
49 8 => Ok(MessageType::Subscribe),
50 10 => Ok(MessageType::Ping),
51 12 => Ok(MessageType::Pong),
52 14 => Ok(MessageType::Info),
53 _ => Err(other),
54 }
55 }
56}
57
58impl Message {
59 pub const MAX_SIZE: wire::Size =
61 wire::Size::MAX - (mem::size_of::<MessageType>() as wire::Size);
62
63 pub fn type_id(&self) -> u16 {
64 match self {
65 Self::Subscribe { .. } => MessageType::Subscribe,
66 Self::Announcement(Announcement { message, .. }) => match message {
67 AnnouncementMessage::Node(_) => MessageType::NodeAnnouncement,
68 AnnouncementMessage::Inventory(_) => MessageType::InventoryAnnouncement,
69 AnnouncementMessage::Refs(_) => MessageType::RefsAnnouncement,
70 },
71 Self::Info(_) => MessageType::Info,
72 Self::Ping { .. } => MessageType::Ping,
73 Self::Pong { .. } => MessageType::Pong,
74 }
75 .into()
76 }
77}
78
79#[repr(u8)]
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub enum AddressType {
83 Ipv4 = 1,
84 Ipv6 = 2,
85 Dns = 3,
86 #[cfg(feature = "tor")]
87 Onion = 4,
88 #[cfg(feature = "i2p")]
89 I2p = 5,
90}
91
92impl From<AddressType> for u8 {
93 fn from(other: AddressType) -> Self {
94 other as u8
95 }
96}
97
98impl From<&Address> for AddressType {
99 fn from(a: &Address) -> Self {
100 match a.host {
101 HostName::Ip(net::IpAddr::V4(_)) => AddressType::Ipv4,
102 HostName::Ip(net::IpAddr::V6(_)) => AddressType::Ipv6,
103 HostName::Dns(_) => AddressType::Dns,
104 #[cfg(feature = "tor")]
105 HostName::Tor(_) => AddressType::Onion,
106 #[cfg(feature = "i2p")]
107 HostName::I2p(_) => AddressType::I2p,
108 _ => todo!(), }
110 }
111}
112
113impl TryFrom<u8> for AddressType {
114 type Error = u8;
115
116 fn try_from(other: u8) -> Result<Self, Self::Error> {
117 match other {
118 1 => Ok(AddressType::Ipv4),
119 2 => Ok(AddressType::Ipv6),
120 3 => Ok(AddressType::Dns),
121 #[cfg(feature = "tor")]
122 4 => Ok(AddressType::Onion),
123 #[cfg(feature = "i2p")]
124 5 => Ok(AddressType::I2p),
125 _ => Err(other),
126 }
127 }
128}
129
130impl wire::Encode for AnnouncementMessage {
131 fn encode(&self, buf: &mut impl BufMut) {
132 match self {
133 Self::Node(ann) => ann.encode(buf),
134 Self::Inventory(ann) => ann.encode(buf),
135 Self::Refs(ann) => ann.encode(buf),
136 }
137 }
138}
139
140impl wire::Encode for RefsAnnouncement {
141 fn encode(&self, buf: &mut impl BufMut) {
142 self.rid.encode(buf);
143 self.refs.encode(buf);
144 self.timestamp.encode(buf);
145 }
146}
147
148impl wire::Decode for RefsAnnouncement {
149 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
150 let rid = RepoId::decode(buf)?;
151 let refs = BoundedVec::<_, REF_REMOTE_LIMIT>::decode(buf)?;
152 let timestamp = Timestamp::decode(buf)?;
153
154 Ok(Self {
155 rid,
156 refs,
157 timestamp,
158 })
159 }
160}
161
162impl wire::Encode for InventoryAnnouncement {
163 fn encode(&self, buf: &mut impl BufMut) {
164 self.inventory.encode(buf);
165 self.timestamp.encode(buf);
166 }
167}
168
169impl wire::Decode for InventoryAnnouncement {
170 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
171 let inventory = BoundedVec::decode(buf)?;
172 let timestamp = Timestamp::decode(buf)?;
173
174 Ok(Self {
175 inventory,
176 timestamp,
177 })
178 }
179}
180
181#[repr(u8)]
184#[derive(Debug, Clone, Copy, PartialEq, Eq)]
185pub enum InfoType {
186 RefsAlreadySynced = 1,
187}
188
189impl From<InfoType> for u16 {
190 fn from(other: InfoType) -> Self {
191 other as u16
192 }
193}
194
195impl TryFrom<u16> for InfoType {
196 type Error = u16;
197
198 fn try_from(other: u16) -> Result<Self, Self::Error> {
199 match other {
200 1 => Ok(Self::RefsAlreadySynced),
201 n => Err(n),
202 }
203 }
204}
205
206impl From<Info> for InfoType {
207 fn from(info: Info) -> Self {
208 (&info).into()
209 }
210}
211
212impl From<&Info> for InfoType {
213 fn from(info: &Info) -> Self {
214 match info {
215 Info::RefsAlreadySynced { .. } => Self::RefsAlreadySynced,
216 }
217 }
218}
219
220impl wire::Encode for Info {
221 fn encode(&self, buf: &mut impl BufMut) {
222 u16::from(InfoType::from(self)).encode(buf);
223 match self {
224 Info::RefsAlreadySynced { rid, at } => {
225 rid.encode(buf);
226 at.encode(buf);
227 }
228 }
229 }
230}
231
232impl wire::Decode for Info {
233 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
234 let info_type = buf.try_get_u16()?;
235
236 match InfoType::try_from(info_type) {
237 Ok(InfoType::RefsAlreadySynced) => {
238 let rid = RepoId::decode(buf)?;
239 let at = Oid::decode(buf)?;
240
241 Ok(Self::RefsAlreadySynced { rid, at })
242 }
243 Err(other) => Err(wire::Invalid::InfoMessageType { actual: other }.into()),
244 }
245 }
246}
247
248impl wire::Encode for Message {
249 fn encode(&self, buf: &mut impl BufMut) {
250 let buf = &mut buf.limit(wire::Size::MAX as usize);
251
252 self.type_id().encode(buf);
253
254 match self {
255 Self::Subscribe(Subscribe {
256 filter,
257 since,
258 until,
259 }) => {
260 filter.encode(buf);
261 since.encode(buf);
262 until.encode(buf);
263 }
264 Self::Announcement(Announcement {
265 node,
266 message,
267 signature,
268 }) => {
269 node.encode(buf);
270 signature.encode(buf);
271 message.encode(buf);
272 }
273 Self::Info(info) => {
274 info.encode(buf);
275 }
276 Self::Ping(Ping { ponglen, zeroes }) => {
277 ponglen.encode(buf);
278 zeroes.encode(buf);
279 }
280 Self::Pong { zeroes } => {
281 zeroes.encode(buf);
282 }
283 }
284 }
285}
286
287impl wire::Decode for Message {
288 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
289 let type_id = buf.try_get_u16()?;
290
291 match MessageType::try_from(type_id) {
292 Ok(MessageType::Subscribe) => {
293 let filter = Filter::decode(buf)?;
294 let since = Timestamp::decode(buf)?;
295 let until = Timestamp::decode(buf)?;
296
297 Ok(Self::Subscribe(Subscribe {
298 filter,
299 since,
300 until,
301 }))
302 }
303 Ok(MessageType::NodeAnnouncement) => {
304 let node = NodeId::decode(buf)?;
305 let signature = Signature::decode(buf)?;
306 let message = NodeAnnouncement::decode(buf)?.into();
307
308 Ok(Announcement {
309 node,
310 message,
311 signature,
312 }
313 .into())
314 }
315 Ok(MessageType::InventoryAnnouncement) => {
316 let node = NodeId::decode(buf)?;
317 let signature = Signature::decode(buf)?;
318 let message = InventoryAnnouncement::decode(buf)?.into();
319
320 Ok(Announcement {
321 node,
322 message,
323 signature,
324 }
325 .into())
326 }
327 Ok(MessageType::RefsAnnouncement) => {
328 let node = NodeId::decode(buf)?;
329 let signature = Signature::decode(buf)?;
330 let message = RefsAnnouncement::decode(buf)?.into();
331
332 Ok(Announcement {
333 node,
334 message,
335 signature,
336 }
337 .into())
338 }
339 Ok(MessageType::Info) => {
340 let info = Info::decode(buf)?;
341 Ok(Self::Info(info))
342 }
343 Ok(MessageType::Ping) => {
344 let ponglen = u16::decode(buf)?;
345 let zeroes = ZeroBytes::decode(buf)?;
346 Ok(Self::Ping(Ping { ponglen, zeroes }))
347 }
348 Ok(MessageType::Pong) => {
349 let zeroes = ZeroBytes::decode(buf)?;
350 Ok(Self::Pong { zeroes })
351 }
352 Err(other) => Err(wire::Invalid::MessageType { actual: other }.into()),
353 }
354 }
355}
356
357impl wire::Encode for Address {
358 fn encode(&self, buf: &mut impl BufMut) {
359 match self.host {
360 HostName::Ip(net::IpAddr::V4(ip)) => {
361 u8::from(AddressType::Ipv4).encode(buf);
362 ip.octets().encode(buf);
363 }
364 HostName::Ip(net::IpAddr::V6(ip)) => {
365 u8::from(AddressType::Ipv6).encode(buf);
366 ip.octets().encode(buf);
367 }
368 HostName::Dns(ref dns) => {
369 u8::from(AddressType::Dns).encode(buf);
370 dns.encode(buf);
371 }
372 #[cfg(feature = "tor")]
373 HostName::Tor(addr) => {
374 u8::from(AddressType::Onion).encode(buf);
375 addr.encode(buf);
376 }
377 #[cfg(feature = "i2p")]
378 HostName::I2p(ref addr) => {
379 u8::from(AddressType::I2p).encode(buf);
380 addr.encode(buf);
381 }
382 _ => {
383 unimplemented!(
384 "Encoding not defined for addresses of the same type as the following: {:?}",
385 self.host
386 );
387 }
388 }
389 self.port().encode(buf);
390 }
391}
392
393impl wire::Decode for Address {
394 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
395 let addrtype = buf.try_get_u8()?;
396
397 let host = match AddressType::try_from(addrtype) {
398 Ok(AddressType::Ipv4) => {
399 let octets: [u8; 4] = wire::Decode::decode(buf)?;
400 let ip = net::Ipv4Addr::from(octets);
401
402 HostName::Ip(net::IpAddr::V4(ip))
403 }
404 Ok(AddressType::Ipv6) => {
405 let octets: [u8; 16] = wire::Decode::decode(buf)?;
406 let ip = net::Ipv6Addr::from(octets);
407
408 HostName::Ip(net::IpAddr::V6(ip))
409 }
410 Ok(AddressType::Dns) => {
411 let dns: String = wire::Decode::decode(buf)?;
412
413 HostName::Dns(dns)
414 }
415 #[cfg(feature = "tor")]
416 Ok(AddressType::Onion) => {
417 let onion: tor::OnionAddrV3 = wire::Decode::decode(buf)?;
418
419 HostName::Tor(onion)
420 }
421 #[cfg(feature = "i2p")]
422 Ok(AddressType::I2p) => {
423 let i2p: i2p::I2pAddr = wire::Decode::decode(buf)?;
424
425 HostName::I2p(i2p)
426 }
427 Err(other) => return Err(wire::Invalid::AddressType { actual: other }.into()),
428 };
429 let port = u16::decode(buf)?;
430
431 Ok(Self::from(NetAddr { host, port }))
432 }
433}
434
435impl wire::Encode for ZeroBytes {
436 fn encode(&self, buf: &mut impl BufMut) {
437 (self.len() as u16).encode(buf);
438 buf.put_bytes(0u8, self.len());
439 }
440}
441
442impl wire::Decode for ZeroBytes {
443 fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
444 let zeroes = u16::decode(buf)?;
445 for _ in 0..zeroes {
446 _ = u8::decode(buf)?;
447 }
448 Ok(ZeroBytes::new(zeroes))
449 }
450}
451
452#[cfg(test)]
453mod tests {
454 use qcheck_macros::quickcheck;
455 use radicle::node::UserAgent;
456 use radicle::node::device::Device;
457 use radicle::storage::refs::RefsAt;
458 use radicle::test::arbitrary;
459
460 use crate::deserializer::Deserializer;
461 use crate::prop_roundtrip;
462 use crate::wire::{Encode as _, roundtrip};
463
464 use super::*;
465
466 prop_roundtrip!(Address);
467 prop_roundtrip!(Message);
468
469 #[test]
470 fn test_refs_ann_max_size() {
471 let signer = Device::mock();
472 let refs: [RefsAt; REF_REMOTE_LIMIT] = arbitrary::r#gen(1);
473 let ann = AnnouncementMessage::Refs(RefsAnnouncement {
474 rid: arbitrary::r#gen(1),
475 refs: BoundedVec::collect_from(&mut refs.into_iter()),
476 timestamp: arbitrary::r#gen(1),
477 });
478 let ann = ann.signed(&signer);
479 let msg = Message::Announcement(ann);
480 let data = msg.encode_to_vec();
481
482 assert!(data.len() < wire::Size::MAX as usize);
483 }
484
485 #[test]
486 fn test_inv_ann_max_size() {
487 let signer = Device::mock();
488 let inv: [RepoId; INVENTORY_LIMIT] = arbitrary::r#gen(1);
489 let ann = AnnouncementMessage::Inventory(InventoryAnnouncement {
490 inventory: BoundedVec::collect_from(&mut inv.into_iter()),
491 timestamp: arbitrary::r#gen(1),
492 });
493 let ann = ann.signed(&signer);
494 let msg = Message::Announcement(ann);
495 let data = msg.encode_to_vec();
496
497 assert!(data.len() < wire::Size::MAX as usize);
498 }
499
500 #[test]
501 fn test_node_ann_max_size() {
502 let signer = Device::mock();
503 let addrs: [Address; ADDRESS_LIMIT] = arbitrary::r#gen(1);
504 let alias = ['@'; radicle::node::MAX_ALIAS_LENGTH];
505 let ann = AnnouncementMessage::Node(NodeAnnouncement {
506 version: 1,
507 features: Default::default(),
508 alias: radicle::node::Alias::new(String::from_iter(alias)),
509 addresses: BoundedVec::collect_from(&mut addrs.into_iter()),
510 timestamp: arbitrary::r#gen(1),
511 nonce: u64::MAX,
512 agent: UserAgent::default(),
513 });
514 let ann = ann.signed(&signer);
515 let msg = Message::Announcement(ann);
516 let data = msg.encode_to_vec();
517
518 assert!(data.len() < wire::Size::MAX as usize);
519 }
520
521 #[test]
522 fn test_pingpong_encode_max_size() {
523 Message::Ping(Ping {
524 ponglen: 0,
525 zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES),
526 })
527 .encode_to_vec();
528
529 (Message::Pong {
530 zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES),
531 })
532 .encode_to_vec();
533 }
534
535 #[test]
536 #[should_panic(expected = "advance out of bounds")]
537 fn test_ping_encode_size_overflow() {
538 Message::Ping(Ping {
539 ponglen: 0,
540 zeroes: ZeroBytes::new(Ping::MAX_PING_ZEROES + 1),
541 })
542 .encode_to_vec();
543 }
544
545 #[test]
546 #[should_panic(expected = "advance out of bounds")]
547 fn test_pong_encode_size_overflow() {
548 Message::Pong {
549 zeroes: ZeroBytes::new(Ping::MAX_PONG_ZEROES + 1),
550 }
551 .encode_to_vec();
552 }
553
554 #[test]
555 fn prop_message_decoder() {
556 fn property(items: Vec<Message>) {
557 let mut decoder = Deserializer::<1048576, Message>::new(8);
558
559 for item in &items {
560 item.encode(&mut decoder);
561 }
562 for item in items {
563 assert_eq!(decoder.next().unwrap().unwrap(), item);
564 }
565 }
566
567 qcheck::QuickCheck::new()
568 .r#gen(qcheck::Gen::new(16))
569 .quickcheck(property as fn(items: Vec<Message>));
570 }
571
572 #[quickcheck]
573 fn prop_zero_bytes_encode_decode(zeroes: wire::Size) -> qcheck::TestResult {
574 if zeroes > Ping::MAX_PING_ZEROES {
575 return qcheck::TestResult::discard();
576 }
577
578 roundtrip(ZeroBytes::new(zeroes));
579
580 qcheck::TestResult::passed()
581 }
582}